Skip to content

Commit 96f0096

Browse files
Use metadata in 'bulk-update-embeddings' (#375)
Why these changes are being introduced: * With TDA 3.8.0, we can now retrieve record metadata columns in embeddings read methods. Filtering embeddings by `action="index"` prevents any attempt to update documents that do not exist in OpenSearch (`action="delete"`), which results in an API error.. This is important especially with the current state of tim.opensearch.bulk_update, which will raise a BulkOperationError and cause the 'bulk_update_embeddings' CLI command to exit early. This also includes an additional change to also index embeddings when performing a reindex. How this addresses that need: * Filter embeddings by action="index" * Install latest version of timdex-dataset-api (latest commit) * Update embeddings in fixtures/test/dataset to use 'embeddings_timestamp" Side effects of this change: * None Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/USE-273
1 parent fc4a958 commit 96f0096

File tree

5 files changed

+55
-31
lines changed

5 files changed

+55
-31
lines changed

Pipfile.lock

Lines changed: 22 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=10/629d15f4-84e4-4b32-92c1-1b1debd377fb-0.parquet renamed to tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/4a40cef5-5629-4bc7-b743-7804a34f9593-0.parquet

22.8 KB
Binary file not shown.

tests/test_cli.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,11 @@ def test_bulk_update_embeddings_exit_bulk_operation_error(
344344
@patch("tim.opensearch.create_index")
345345
@patch("tim.opensearch.promote_index")
346346
@patch("tim.opensearch.get_index_aliases")
347+
@patch("tim.opensearch.bulk_update")
347348
@patch("tim.opensearch.bulk_index")
348349
def test_reindex_source_success(
349350
mock_bulk_index,
351+
mock_bulk_update,
350352
mock_get_index_aliases,
351353
mock_promote_index,
352354
mock_create_index,
@@ -362,6 +364,11 @@ def test_reindex_source_success(
362364
"errors": 0,
363365
"total": 1000,
364366
}
367+
mock_bulk_update.return_value = {
368+
"updated": 10,
369+
"errors": 0,
370+
"total": 10,
371+
}
365372

366373
result = runner.invoke(
367374
main,

tim/cli.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ def bulk_update_embeddings(
385385
"embedding_object",
386386
],
387387
run_id=run_id,
388+
action="index",
388389
)
389390
embeddings_to_index = helpers.format_embeddings(embeddings)
390391

@@ -429,11 +430,10 @@ def reindex_source(
429430
430431
This CLI command performs the following:
431432
1. creates a new index for the source
432-
2. promotes this index as the primary for the source alias, and added to any other
433+
2. promotes this index as the primary for the source alias and add to any other
433434
aliases passed (e.g. 'timdex')
434-
3. uses the TDA library to yield only current records from the parquet dataset
435-
for the source
436-
4. bulk index these records to the new Opensearch index
435+
3. bulk index current records from the parquet dataset to the index
436+
4. bulk update current embeddings (if any) from the parquet dataset to the index
437437
438438
The net effect is a full refresh for a source in Opensearch, ensuring only current,
439439
non-deleted versions of records are used from the parquet dataset.
@@ -454,12 +454,11 @@ def reindex_source(
454454
tim_os.get_index_aliases(client, index),
455455
)
456456

457-
# perform bulk indexing of current records from source
458-
index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
459-
457+
# reindex current records from source
460458
td = TIMDEXDataset(location=dataset_path)
461459

462460
# bulk index records
461+
index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
463462
records_to_index = td.read_transformed_records_iter(
464463
table="current_records",
465464
source=source,
@@ -468,7 +467,25 @@ def reindex_source(
468467
try:
469468
index_results.update(tim_os.bulk_index(client, index, records_to_index))
470469
except BulkIndexingError as exception:
471-
logger.info(f"Bulk indexing failed: {exception}")
470+
logger.error(f"Bulk indexing failed: {exception}") # noqa: TRY400
471+
472+
# bulk index embeddings
473+
update_results = {"updated": 0, "errors": 0, "total": 0}
474+
embeddings = td.embeddings.read_dicts_iter(
475+
table="current_embeddings",
476+
columns=[
477+
"timdex_record_id",
478+
"embedding_strategy",
479+
"embedding_object",
480+
],
481+
source=source,
482+
action="index",
483+
)
484+
embeddings_to_index = helpers.format_embeddings(embeddings)
485+
try:
486+
update_results.update(tim_os.bulk_update(client, index, embeddings_to_index))
487+
except BulkOperationError as exception:
488+
logger.error(f"Bulk update with embeddings failed: {exception}") # noqa: TRY400
472489

473-
summary_results = {"index": index_results}
490+
summary_results = {"index": index_results, "update": update_results}
474491
logger.info(f"Reindex source complete: {json.dumps(summary_results)}")

0 commit comments

Comments
 (0)