Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
Binary file not shown.
7 changes: 7 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,11 @@ def test_bulk_update_embeddings_exit_bulk_operation_error(
@patch("tim.opensearch.create_index")
@patch("tim.opensearch.promote_index")
@patch("tim.opensearch.get_index_aliases")
@patch("tim.opensearch.bulk_update")
@patch("tim.opensearch.bulk_index")
def test_reindex_source_success(
mock_bulk_index,
mock_bulk_update,
mock_get_index_aliases,
mock_promote_index,
mock_create_index,
Expand All @@ -362,6 +364,11 @@ def test_reindex_source_success(
"errors": 0,
"total": 1000,
}
mock_bulk_update.return_value = {
"updated": 10,
"errors": 0,
"total": 10,
}

result = runner.invoke(
main,
Expand Down
35 changes: 26 additions & 9 deletions tim/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ def bulk_update_embeddings(
"embedding_object",
],
run_id=run_id,
action="index",
)
embeddings_to_index = helpers.format_embeddings(embeddings)

Expand Down Expand Up @@ -429,11 +430,10 @@ def reindex_source(

This CLI command performs the following:
1. creates a new index for the source
2. promotes this index as the primary for the source alias, and added to any other
2. promotes this index as the primary for the source alias and adds it to any other
aliases passed (e.g. 'timdex')
3. uses the TDA library to yield only current records from the parquet dataset
for the source
4. bulk index these records to the new Opensearch index
3. bulk index current records from the parquet dataset to the index
4. bulk update current embeddings (if any) from the parquet dataset to the index

The net effect is a full refresh for a source in Opensearch, ensuring only current,
non-deleted versions of records are used from the parquet dataset.
Expand All @@ -454,12 +454,11 @@ def reindex_source(
tim_os.get_index_aliases(client, index),
)

# perform bulk indexing of current records from source
index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}

# reindex current records from source
td = TIMDEXDataset(location=dataset_path)

# bulk index records
index_results = {"created": 0, "updated": 0, "errors": 0, "total": 0}
records_to_index = td.read_transformed_records_iter(
table="current_records",
source=source,
Expand All @@ -468,7 +467,25 @@ def reindex_source(
try:
index_results.update(tim_os.bulk_index(client, index, records_to_index))
except BulkIndexingError as exception:
logger.info(f"Bulk indexing failed: {exception}")
logger.error(f"Bulk indexing failed: {exception}") # noqa: TRY400

# bulk update embeddings
update_results = {"updated": 0, "errors": 0, "total": 0}
embeddings = td.embeddings.read_dicts_iter(
table="current_embeddings",
columns=[
"timdex_record_id",
"embedding_strategy",
"embedding_object",
],
source=source,
action="index",
)
embeddings_to_index = helpers.format_embeddings(embeddings)
try:
update_results.update(tim_os.bulk_update(client, index, embeddings_to_index))
except BulkOperationError as exception:
logger.error(f"Bulk update with embeddings failed: {exception}") # noqa: TRY400

summary_results = {"index": index_results}
summary_results = {"index": index_results, "update": update_results}
logger.info(f"Reindex source complete: {json.dumps(summary_results)}")