Skip to content

Commit b35bbb0

Browse files
committed
Remove TIMDEXDataset load and filtering, use metadata
Why these changes are being introduced:
TIMDEXDataset no longer requires .load(), and now provides information about records in the dataset via metadata queries. As such, we can determine whether there are records in a run just by performing a metadata SQL query.

How this addresses that need:
* Remove the `TIMDEXDataset.load()` call
* Replace the row count with a metadata SQL query for the ETL run's record count

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-535
1 parent 8653224 commit b35bbb0

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

lambdas/format_input.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,9 @@ def lambda_handler(event: dict, _context: dict) -> dict:
9292
return result
9393

9494
if next_step == "load":
95-
if not helpers.dataset_records_exist_for_run(run_date, run_id):
95+
if not helpers.dataset_records_exist_for_run(run_id):
9696
result["failure"] = (
97-
"No records were found in the TIMDEX dataset for run_date "
98-
f"'{run_date}', run_id '{run_id}'."
97+
f"No records were found in the TIMDEX dataset for run_id '{run_id}'."
9998
)
10099
return result
101100
result["load"] = commands.generate_load_commands(

lambdas/helpers.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# ruff: noqa: S608
2+
13
import contextlib
24
import logging
35
from datetime import UTC, datetime, timedelta
@@ -157,14 +159,22 @@ def list_s3_files_by_prefix(bucket: str, prefix: str) -> list[str]:
157159
return s3_files
158160

159161

160-
def dataset_records_exist_for_run(run_date: str, run_id: str) -> bool:
161-
"""Query TIMDEX dataset to confirm records to load and/or delete.
162+
def dataset_records_exist_for_run(run_id: str) -> bool:
163+
"""Query TIMDEX dataset metadata to confirm records to load and/or delete.
162164
163165
A "run" is defined by a run-date + run-id, both provided as inputs to this lambda
164166
invocation provided by the StepFunction. We are interested only in records where
165167
action is "index" or "delete". If zero records exist, or have action "skip" or
166168
"error", we do not need to perform any load commands.
167169
"""
168170
td = TIMDEXDataset(location=CONFIG.s3_timdex_dataset_location)
169-
td.load(run_date=run_date, run_id=run_id, action=["index", "delete"])
170-
return td.row_count > 0
171+
172+
etl_run_count = td.metadata.conn.query(
173+
f"""
174+
select count(*)
175+
from metadata.records
176+
where run_id = '{run_id}'
177+
and action in ('index','delete')
178+
"""
179+
).fetchone()[0]
180+
return etl_run_count > 0

tests/test_format_input.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,6 @@ def test_lambda_handler_with_next_step_load_no_files_present():
280280
"source": "testsource",
281281
"verbose": False,
282282
"failure": (
283-
"No records were found in the TIMDEX dataset for "
284-
"run_date '2022-01-02', run_id 'run-abc-123'."
283+
"No records were found in the TIMDEX dataset for run_id 'run-abc-123'."
285284
),
286285
}

0 commit comments

Comments
 (0)