diff --git a/Pipfile.lock b/Pipfile.lock
index c0e2065..20c0553 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -26,20 +26,20 @@
         },
         "boto3": {
             "hashes": [
-                "sha256:cdd4cc3e5bb08ed8a0c5cc77eca78f98f0239521de0991f14e44b788b0c639b2",
-                "sha256:d21d22af9aeb1bad8e9b670a221d6534c0120f7e7baf523dafaca83f1f5c3f90"
+                "sha256:649b134d25b278c24fcc8b3f94519de3884283b7848dc32f42b0ffdd9d19ce99",
+                "sha256:8112e1beb5978bb455ea4b41a9ef26fc408f6340d8ff69ef93dded4f80fd53e9"
             ],
             "index": "pypi",
             "markers": "python_version >= '3.9'",
-            "version": "==1.42.9"
+            "version": "==1.42.12"
         },
         "botocore": {
             "hashes": [
-                "sha256:74f69bfd116cc7c8215481284957eecdb48580e071dd50cb8c64356a866abd8c",
-                "sha256:f99ba2ca34e24c4ebec150376c815646970753c032eb84f230874b2975a185a8"
+                "sha256:1f9f63c3d6bb1f768519da30d6018706443c5d8af5472274d183a4945f3d81f8",
+                "sha256:4f163880350f6d831857ce5d023875b7c6534be862e5affd9fcf82b8d1ab3537"
             ],
             "markers": "python_version >= '3.9'",
-            "version": "==1.42.9"
+            "version": "==1.42.12"
         },
         "certifi": {
             "hashes": [
@@ -644,12 +644,12 @@
         },
         "sentry-sdk": {
             "hashes": [
-                "sha256:8218891d5e41b4ea8d61d2aed62ed10c80e39d9f2959d6f939efbf056857e050",
-                "sha256:d72f8c61025b7d1d9e52510d03a6247b280094a327dd900d987717a4fce93412"
+                "sha256:5213190977ff7fdff8a58b722fb807f8d5524a80488626ebeda1b5676c0c1473",
+                "sha256:6b12ac256769d41825d9b7518444e57fa35b5642df4c7c5e322af4d2c8721172"
             ],
             "index": "pypi",
             "markers": "python_version >= '3.6'",
-            "version": "==2.47.0"
+            "version": "==2.48.0"
         },
         "six": {
             "hashes": [
@@ -735,7 +735,7 @@
         },
         "timdex-dataset-api": {
             "git": "https://github.com/MITLibraries/timdex-dataset-api.git",
-            "ref": "184d87fd0647cae9e1db8488f2a3d88de96a7226"
+            "ref": "a1d8ad7662d8d864d1fb2b52376312c7c92e7398"
         },
         "typing-extensions": {
             "hashes": [
@@ -927,20 +927,20 @@
         },
         "boto3-stubs": {
             "hashes": [
-                "sha256:b132be8260eb56010b47499658cf6c485f99ea0190969e7d1adb74c505c83e68",
-                "sha256:d9b108fb3e0af33fedb0a9f0e214bc1d474a6e092a4b8755e88ccc847dc9c624"
+                "sha256:18a9a970fd9d8867558f091608f2d9944d6f55143d23cb11785f1c143d91aa54",
+                "sha256:f69f12c884519a62a6e6d7f296932d4cdc14f47ea987b1eb7b226337f1127d13"
             ],
             "index": "pypi",
             "markers": "python_version >= '3.9'",
-            "version": "==1.42.9"
+            "version": "==1.42.12"
         },
         "botocore-stubs": {
             "hashes": [
-                "sha256:92fdd2a1d911355166da3e30e9bb9b1803f7e2caec0d913f5fad3a920352ce6d",
-                "sha256:9f8b652549d4f727aa69e09d462d18e54a1bd10f3dbb593da56d5d0aafe9756e"
+                "sha256:788ecde81894f149bf210286f82ff3e49b97ce736a2ecb89f210be3f0374d35e",
+                "sha256:c2bac920d8c302e15e6bec9593daeaf04777714f8309fe052fd24cf28eb85a76"
             ],
             "markers": "python_version >= '3.9'",
-            "version": "==1.42.9"
+            "version": "==1.42.11"
         },
         "cachecontrol": {
             "extras": [
@@ -1253,11 +1253,11 @@
         },
         "filelock": {
             "hashes": [
-                "sha256:339b4732ffda5cd79b13f4e2711a31b0365ce445d95d243bb996273d072546a2",
-                "sha256:711e943b4ec6be42e1d4e6690b48dc175c822967466bb31c0c293f34334c13f4"
+                "sha256:15d9e9a67306188a44baa72f569d2bfd803076269365fdea0934385da4dc361a",
+                "sha256:b8360948b351b80f420878d8516519a2204b07aefcdcfd24912a5d33127f188c"
             ],
             "markers": "python_version >= '3.10'",
-            "version": "==3.20.0"
+            "version": "==3.20.1"
         },
         "freezegun": {
             "hashes": [
@@ -1651,12 +1651,12 @@
         },
         "pre-commit": {
             "hashes": [
-                "sha256:25e2ce09595174d9c97860a95609f9f852c0614ba602de3561e267547f2335e1",
-                "sha256:dc5a065e932b19fc1d4c653c6939068fe54325af8e741e74e88db4d28a4dd66b"
+                "sha256:3b3afd891e97337708c1674210f8eba659b52a38ea5f822ff142d10786221f77",
+                "sha256:eb545fcff725875197837263e977ea257a402056661f09dae08e4b149b030a61"
             ],
             "index": "pypi",
"markers": "python_version >= '3.10'", - "version": "==4.5.0" + "version": "==4.5.1" }, "prompt-toolkit": { "hashes": [ diff --git a/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=10/629d15f4-84e4-4b32-92c1-1b1debd377fb-0.parquet b/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/4a40cef5-5629-4bc7-b743-7804a34f9593-0.parquet similarity index 94% rename from tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=10/629d15f4-84e4-4b32-92c1-1b1debd377fb-0.parquet rename to tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/4a40cef5-5629-4bc7-b743-7804a34f9593-0.parquet index 90b2f2a..81e5d79 100644 Binary files a/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=10/629d15f4-84e4-4b32-92c1-1b1debd377fb-0.parquet and b/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/4a40cef5-5629-4bc7-b743-7804a34f9593-0.parquet differ diff --git a/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/8a790473-7274-4ccd-bb90-f3c9bcc1801d-0.parquet b/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/8a790473-7274-4ccd-bb90-f3c9bcc1801d-0.parquet new file mode 100644 index 0000000..af9e013 Binary files /dev/null and b/tests/fixtures/dataset/data/embeddings/year=2025/month=12/day=17/8a790473-7274-4ccd-bb90-f3c9bcc1801d-0.parquet differ diff --git a/tests/test_cli.py b/tests/test_cli.py index 61dbc4f..07a6546 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -344,9 +344,11 @@ def test_bulk_update_embeddings_exit_bulk_operation_error( @patch("tim.opensearch.create_index") @patch("tim.opensearch.promote_index") @patch("tim.opensearch.get_index_aliases") +@patch("tim.opensearch.bulk_update") @patch("tim.opensearch.bulk_index") def test_reindex_source_success( mock_bulk_index, + mock_bulk_update, mock_get_index_aliases, mock_promote_index, mock_create_index, @@ -362,6 +364,11 @@ def test_reindex_source_success( "errors": 0, "total": 1000, } + mock_bulk_update.return_value = { + "updated": 10, + "errors": 0, + "total": 10, + } result = runner.invoke( main, diff --git a/tim/cli.py b/tim/cli.py index a97f4c6..58219bd 100644 --- a/tim/cli.py +++ b/tim/cli.py @@ -385,6 +385,7 @@ def bulk_update_embeddings( "embedding_object", ], run_id=run_id, + action="index", ) embeddings_to_index = helpers.format_embeddings(embeddings) @@ -429,11 +430,10 @@ def reindex_source( This CLI command performs the following: 1. creates a new index for the source - 2. promotes this index as the primary for the source alias, and added to any other + 2. promotes this index as the primary for the source alias and add to any other aliases passed (e.g. 'timdex') - 3. uses the TDA library to yield only current records from the parquet dataset - for the source - 4. bulk index these records to the new Opensearch index + 3. bulk index current records from the parquet dataset to the index + 4. bulk update current embeddings (if any) from the parquet dataset to the index The net effect is a full refresh for a source in Opensearch, ensuring only current, non-deleted versions of records are used from the parquet dataset. 
@@ -454,12 +454,11 @@ def reindex_source(
         tim_os.get_index_aliases(client, index),
     )
 
-    # perform bulk indexing of current records from source
-    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
-
+    # reindex current records from source
     td = TIMDEXDataset(location=dataset_path)
 
     # bulk index records
+    index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
     records_to_index = td.read_transformed_records_iter(
         table="current_records",
         source=source,
@@ -468,7 +467,25 @@
     try:
         index_results.update(tim_os.bulk_index(client, index, records_to_index))
     except BulkIndexingError as exception:
-        logger.info(f"Bulk indexing failed: {exception}")
+        logger.error(f"Bulk indexing failed: {exception}")  # noqa: TRY400
+
+    # bulk update embeddings
+    update_results = {"updated": 0, "errors": 0, "total": 0}
+    embeddings = td.embeddings.read_dicts_iter(
+        table="current_embeddings",
+        columns=[
+            "timdex_record_id",
+            "embedding_strategy",
+            "embedding_object",
+        ],
+        source=source,
+        action="index",
+    )
+    embeddings_to_index = helpers.format_embeddings(embeddings)
+    try:
+        update_results.update(tim_os.bulk_update(client, index, embeddings_to_index))
+    except BulkOperationError as exception:
+        logger.error(f"Bulk update with embeddings failed: {exception}")  # noqa: TRY400
 
-    summary_results = {"index": index_results}
+    summary_results = {"index": index_results, "update": update_results}
     logger.info(f"Reindex source complete: {json.dumps(summary_results)}")
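For context on the embeddings step: tim_os.bulk_update is this repo's own helper, but the underlying mechanics are standard OpenSearch partial updates, which merge new fields into existing documents rather than replacing them. A minimal sketch of that pattern with the opensearch-py bulk helper; the function name, result-dict shape, and timdex_record_id id field echo the diff, but everything else is an assumption, not the repo's actual implementation:

    from opensearchpy import OpenSearch
    from opensearchpy.helpers import streaming_bulk

    def bulk_update_sketch(client: OpenSearch, index: str, docs) -> dict:
        """Partially update existing documents in bulk (illustrative only)."""
        actions = (
            {
                "_op_type": "update",            # partial update, not a full reindex
                "_index": index,
                "_id": doc["timdex_record_id"],  # embeddings attach to existing records
                "doc": {k: v for k, v in doc.items() if k != "timdex_record_id"},
            }
            for doc in docs
        )
        results = {"updated": 0, "errors": 0, "total": 0}
        # raise_on_error=False lets one failed update increment the error count
        # instead of aborting the whole run, matching the summary-dict reporting style
        for ok, _item in streaming_bulk(client, actions, raise_on_error=False):
            results["total"] += 1
            results["updated" if ok else "errors"] += 1
        return results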