Commit b0c1b91

Merge pull request #321 from MITLibraries/TIMX-521-new-etl-records-data-location
TIMX 521 - New ETL records data location
2 parents 5619aad + 504a392 · commit b0c1b91

12 files changed: +712 -652 lines changed

Pipfile.lock

Lines changed: 418 additions & 430 deletions
Some generated files are not rendered by default.

README.md

Lines changed: 1 addition & 3 deletions
````diff
@@ -178,9 +178,7 @@ WORKSPACE=### Set to `dev` for local development; this will be set to `stage` an
 
 ### Optional
 
-```shell
-ETL_VERSION=### Version number of the TIMDEX ETL infrastructure. This can be used to align application behavior with the requirements of other applications in the TIMDEX ETL pipeline.
-```
+None at this time.
 
 
 
````

lambdas/alma_prep.py

Lines changed: 4 additions & 2 deletions
```diff
@@ -1,5 +1,4 @@
 import logging
-import os
 import tarfile
 from collections.abc import Generator
 from typing import IO, TYPE_CHECKING
@@ -11,9 +10,12 @@
     from mypy_boto3_s3.client import S3Client  # pragma: no cover
 
 from lambdas import helpers
+from lambdas.config import Config
 
 logger = logging.getLogger(__name__)
 
+CONFIG = Config()
+
 
 def extract_file_from_source_bucket_to_target_bucket(
     s3_client: "S3Client",
@@ -100,7 +102,7 @@ def prepare_alma_export_files(run_date: str, run_type: str, timdex_bucket: str)
     then performs the extract, unzip, rename and upload steps.
     """
     export_job_date = run_date.replace("-", "")
-    alma_bucket = os.environ["TIMDEX_ALMA_EXPORT_BUCKET_ID"]
+    alma_bucket = CONFIG.alma_export_bucket
     alma_export_files = helpers.list_s3_files_by_prefix(
         alma_bucket,
         f"exlibris/timdex/TIMDEX_ALMA_EXPORT_{run_type.upper()}_{export_job_date}",
```

lambdas/commands.py

Lines changed: 17 additions & 9 deletions
```diff
@@ -1,9 +1,12 @@
 import logging
 
-from lambdas import config, helpers
+from lambdas import helpers
+from lambdas.config import Config
 
 logger = logging.getLogger(__name__)
 
+CONFIG = Config()
+
 
 def generate_extract_command(
     # ruff: noqa: FBT001
@@ -28,7 +31,7 @@ def generate_extract_command(
     if verbose:
         extract_command.append("--verbose")
 
-    if source in config.GIS_SOURCES:
+    if source in CONFIG.GIS_SOURCES:
         extract_command.append("harvest")
         if run_type == "daily":
             extract_command.append("--harvest-type=incremental")
@@ -80,7 +83,7 @@ def generate_transform_commands(
     for extract_output_file in extract_output_files:
         transform_command = [
             f"--input-file=s3://{timdex_bucket}/{extract_output_file}",
-            f"--output-location=s3://{timdex_bucket}/dataset",
+            f"--output-location={CONFIG.s3_timdex_dataset_data_location}",
             f"--source={source}",
             f"--run-id={run_id}",
             f"--run-timestamp={run_timestamp}",
@@ -94,11 +97,8 @@ def generate_load_commands(
     run_date: str,
     run_type: str,
     run_id: str,
-    timdex_bucket: str,
 ) -> dict:
     """Generate task run command for TIMDEX load."""
-    dataset_location = f"s3://{timdex_bucket}/dataset"
-
     update_command = [
         "bulk-update",
         "--run-date",
@@ -108,14 +108,22 @@ def generate_load_commands(
     ]
 
     if run_type == "daily":
-        update_command.extend(["--source", source, dataset_location])
+        update_command.extend(
+            [
+                "--source",
+                source,
+                CONFIG.s3_timdex_dataset_data_location,
+            ]
+        )
         return {"bulk-update-command": update_command}
 
     if run_type == "full":
         new_index_name = helpers.generate_index_name(source)
-        update_command.extend(["--index", new_index_name, dataset_location])
+        update_command.extend(
+            ["--index", new_index_name, CONFIG.s3_timdex_dataset_data_location]
+        )
         promote_index_command = ["promote", "--index", new_index_name]
-        for alias, sources in config.INDEX_ALIASES.items():
+        for alias, sources in CONFIG.INDEX_ALIASES.items():
             if source in sources:
                 promote_index_command.append("--alias")
                 promote_index_command.append(alias)
```
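
Since `generate_load_commands` no longer takes a `timdex_bucket` argument, the dataset URI it emits comes entirely from `Config`. A sketch of what a daily-run call might produce, assuming `TIMDEX_S3_EXTRACT_BUCKET_ID` is set to the placeholder `timdex-extract-dev` (all argument values are illustrative):

```python
import os

from lambdas import commands

# Placeholder bucket for illustration only.
os.environ["TIMDEX_S3_EXTRACT_BUCKET_ID"] = "timdex-extract-dev"

result = commands.generate_load_commands(
    source="alma",
    run_date="2024-01-02",  # illustrative values
    run_type="daily",
    run_id="run-abc-123",
)

# Expected shape per the diff above (middle arguments elided):
# {"bulk-update-command": ["bulk-update", "--run-date", ...,
#     "--source", "alma", "s3://timdex-extract-dev/dataset/data/records"]}
print(result["bulk-update-command"][-1])
# s3://timdex-extract-dev/dataset/data/records
```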

lambdas/config.py

Lines changed: 68 additions & 78 deletions
```diff
@@ -1,29 +1,71 @@
+# ruff: noqa: EM102, TRY003
+
 import logging
 import os
+from typing import Any, ClassVar
+
 
-GIS_SOURCES = ["gismit", "gisogm"]
-INDEX_ALIASES = {
-    "rdi": ["jpal", "whoas", "zenodo"],
-    "timdex": ["alma", "aspace", "dspace"],
-    "geo": GIS_SOURCES,
-}
-REQUIRED_ENV = {
-    "TIMDEX_ALMA_EXPORT_BUCKET_ID",
-    "TIMDEX_S3_EXTRACT_BUCKET_ID",
-    "WORKSPACE",
-}
-REQUIRED_FIELDS = ("next-step", "run-date", "run-type", "source")
-REQUIRED_OAI_HARVEST_FIELDS = ("oai-pmh-host", "oai-metadata-format")
-VALID_DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ")
-VALID_RUN_TYPES = ("full", "daily")
-VALID_STEPS = ("extract", "transform", "load")
-
-
-def check_verbosity(verbose: bool | str) -> bool:  # noqa: FBT001
-    """Determine whether verbose is True or False given a boolean or string value."""
-    if isinstance(verbose, bool):
-        return verbose
-    return verbose.lower() == "true"
+class Config:
+    REQUIRED_ENV_VARS = (
+        "TIMDEX_ALMA_EXPORT_BUCKET_ID",
+        "TIMDEX_S3_EXTRACT_BUCKET_ID",
+        "WORKSPACE",
+    )
+    OPTIONAL_ENV_VARS = ()
+
+    GIS_SOURCES = ("gismit", "gisogm")
+    INDEX_ALIASES: ClassVar = {
+        "rdi": ["jpal", "whoas", "zenodo"],
+        "timdex": ["alma", "aspace", "dspace"],
+        "geo": GIS_SOURCES,
+    }
+    REQUIRED_FIELDS = ("next-step", "run-date", "run-type", "source")
+    REQUIRED_OAI_HARVEST_FIELDS = ("oai-pmh-host", "oai-metadata-format")
+    VALID_DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ")
+    VALID_RUN_TYPES = ("full", "daily")
+    VALID_STEPS = ("extract", "transform", "load")
+
+    def __getattr__(self, name: str) -> Any:  # noqa: ANN401
+        """Provide dot notation access to configurations and env vars on this class."""
+        if name in self.REQUIRED_ENV_VARS or name in self.OPTIONAL_ENV_VARS:
+            return os.getenv(name)
+        message = f"'{name}' not a valid configuration variable"
+        raise AttributeError(message)
+
+    def check_required_env_vars(self) -> None:
+        """Method to raise exception if required env vars not set."""
+        missing_vars = [var for var in self.REQUIRED_ENV_VARS if not os.getenv(var)]
+        if missing_vars:
+            message = f"Missing required environment variables: {', '.join(missing_vars)}"
+            raise OSError(message)
+
+    @staticmethod
+    def get_verbose_flag(verbose: bool | str) -> bool:  # noqa: FBT001
+        """Determine whether verbose is True or False given a boolean or string value."""
+        if isinstance(verbose, bool):
+            return verbose
+        return verbose.lower() == "true"
+
+    @property
+    def alma_export_bucket(self) -> str:
+        var = "TIMDEX_ALMA_EXPORT_BUCKET_ID"
+        value = os.getenv(var)
+        if not value:
+            raise OSError(f"Env var '{var}' must be defined")
+        return value
+
+    @property
+    def timdex_bucket(self) -> str:
+        var = "TIMDEX_S3_EXTRACT_BUCKET_ID"
+        value = os.getenv(var)
+        if not value:
+            raise OSError(f"Env var '{var}' must be defined")
+        return value
+
+    @property
+    def s3_timdex_dataset_data_location(self) -> str:
+        """Return full S3 URI (bucket + prefix) of ETL records data location."""
+        return f"s3://{self.timdex_bucket}/dataset/data/records"
 
 
 def configure_logger(
@@ -34,9 +76,9 @@ def configure_logger(
 ) -> str:
     """Configure application via passed application root logger.
 
-    If verbose=True, 3rd party libraries can be quite chatty. For convenience, they can
-    be set to WARNING level by either passing a comma seperated list of logger names to
-    'warning_only_loggers' or by setting the env var WARNING_ONLY_LOGGERS.
+    If verbose=True, 3rd party libraries can be quite chatty. For convenience, they
+    can be set to WARNING level by either passing a comma seperated list of logger
+    names to 'warning_only_loggers' or by setting the env var WARNING_ONLY_LOGGERS.
     """
     if verbose:
         root_logger.setLevel(logging.DEBUG)
@@ -61,55 +103,3 @@ def configure_logger(
         f"Logger '{root_logger.name}' configured with level="
         f"{logging.getLevelName(root_logger.getEffectiveLevel())}"
     )
-
-
-def validate_input(input_data: dict) -> None:
-    """Validate input to the lambda function.
-
-    Ensures that all requiered input fields are present and contain valid data.
-    """
-    # All required fields are present
-    if missing_fields := [field for field in REQUIRED_FIELDS if field not in input_data]:
-        message = (
-            f"Input must include all required fields. Missing fields: {missing_fields}"
-        )
-        raise ValueError(message)
-
-    # Valid next step
-    next_step = input_data["next-step"]
-    if next_step not in VALID_STEPS:
-        message = (
-            f"Input 'next-step' value must be one of: {VALID_STEPS}. Value "
-            f"provided was '{next_step}'"
-        )
-        raise ValueError(message)
-
-    # Valid run type
-    run_type = input_data["run-type"]
-    if run_type not in VALID_RUN_TYPES:
-        message = (
-            f"Input 'run-type' value must be one of: {VALID_RUN_TYPES}. Value "
-            f"provided was '{run_type}'"
-        )
-        raise ValueError(message)
-
-    # If next step is extract step, required harvest fields are present
-    # ruff: noqa: SIM102
-    if input_data["next-step"] == "extract":
-        if input_data["source"] not in GIS_SOURCES:
-            if missing_harvest_fields := [
-                field for field in REQUIRED_OAI_HARVEST_FIELDS if field not in input_data
-            ]:
-                message = (
-                    "Input must include all required harvest fields when starting with "
-                    f"harvest step. Missing fields: {missing_harvest_fields}"
-                )
-                raise ValueError(message)
-
-
-def verify_env() -> None:
-    """Confirm that required env variables are set."""
-    for key in REQUIRED_ENV:
-        if not os.getenv(key):
-            message = f"Required env variable {key} is not set"
-            raise RuntimeError(message)
```
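
The rewritten module replaces free functions (`check_verbosity`, `verify_env`, `validate_input`) with a `Config` class: constants become class attributes, names listed in `REQUIRED_ENV_VARS`/`OPTIONAL_ENV_VARS` resolve through `__getattr__`, and the bucket properties enforce presence. A brief usage sketch (env values are placeholders):

```python
import os

from lambdas.config import Config

CONFIG = Config()

# Class-level constants are plain attributes; __getattr__ is never invoked.
assert "gismit" in CONFIG.GIS_SOURCES

# Names listed in REQUIRED_ENV_VARS/OPTIONAL_ENV_VARS fall through
# __getattr__ to os.getenv, so they may be None if unset.
os.environ["WORKSPACE"] = "dev"  # placeholder
assert CONFIG.WORKSPACE == "dev"

# Anything else raises AttributeError rather than silently returning None.
try:
    _ = CONFIG.NOT_A_REAL_SETTING
except AttributeError as exc:
    print(exc)  # 'NOT_A_REAL_SETTING' not a valid configuration variable
```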

lambdas/format_input.py

Lines changed: 33 additions & 15 deletions
```diff
@@ -1,29 +1,30 @@
 import json
 import logging
-import os
 import uuid
 from datetime import UTC, datetime
 
-from lambdas import alma_prep, commands, config, errors, helpers
+from lambdas import alma_prep, commands, errors, helpers
+from lambdas.config import Config, configure_logger
 
 logger = logging.getLogger(__name__)
 
+CONFIG = Config()
+
 
 def lambda_handler(event: dict, _context: dict) -> dict:
     """Format data into the necessary input for TIMDEX pipeline processing."""
-    config.verify_env()
-    verbose = config.check_verbosity(event.get("verbose", False))
-    config.configure_logger(logging.getLogger(), verbose=verbose)
+    verbose = CONFIG.get_verbose_flag(event.get("verbose", False))
+    configure_logger(logging.getLogger(), verbose=verbose)
     logger.debug(json.dumps(event))
-    config.validate_input(event)
+
+    helpers.validate_input(event)
 
     run_date = helpers.format_run_date(event["run-date"])
     run_type = event["run-type"]
     source = event["source"]
     next_step = event["next-step"]
     run_id = event.get("run-id", str(uuid.uuid4()))
     run_timestamp = event.get("run-timestamp", datetime.now(UTC).isoformat())
-    timdex_bucket = os.environ["TIMDEX_S3_EXTRACT_BUCKET_ID"]
 
     result = {
         "run-date": run_date,
@@ -33,24 +34,34 @@ def lambda_handler(event: dict, _context: dict) -> dict:
     }
 
     if next_step == "extract":
-        if source in config.GIS_SOURCES:
+        if source in CONFIG.GIS_SOURCES:
             result["harvester-type"] = "geo"
         else:
             result["harvester-type"] = "oai"
         result["next-step"] = "transform"
         result["extract"] = commands.generate_extract_command(
-            event, run_date, timdex_bucket, verbose
+            event,
+            run_date,
+            CONFIG.timdex_bucket,
+            verbose,
         )
         return result
 
     if next_step == "transform":
         try:
             if source == "alma":
-                alma_prep.prepare_alma_export_files(run_date, run_type, timdex_bucket)
+                alma_prep.prepare_alma_export_files(
+                    run_date,
+                    run_type,
+                    CONFIG.timdex_bucket,
+                )
             extract_output_files = helpers.list_s3_files_by_prefix(
-                timdex_bucket,
+                CONFIG.timdex_bucket,
                 helpers.generate_step_output_prefix(
-                    source, run_date, run_type, "extract"
+                    source,
+                    run_date,
+                    run_type,
+                    "extract",
                 ),
             )
         except errors.NoFilesError:
@@ -72,19 +83,26 @@ def lambda_handler(event: dict, _context: dict) -> dict:
         )
         result["next-step"] = "load"
         result["transform"] = commands.generate_transform_commands(
-            extract_output_files, event, timdex_bucket, run_id, run_timestamp
+            extract_output_files,
+            event,
+            CONFIG.timdex_bucket,
+            run_id,
+            run_timestamp,
        )
         return result
 
     if next_step == "load":
-        if not helpers.dataset_records_exist_for_run(timdex_bucket, run_date, run_id):
+        if not helpers.dataset_records_exist_for_run(run_date, run_id):
             result["failure"] = (
                 "No records were found in the TIMDEX dataset for run_date "
                 f"'{run_date}', run_id '{run_id}'."
             )
             return result
         result["load"] = commands.generate_load_commands(
-            source, run_date, run_type, run_id, timdex_bucket
+            source,
+            run_date,
+            run_type,
+            run_id,
         )
         return result
 
```
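
Taken together, the handler now leans on `CONFIG` for buckets and on `helpers.validate_input` (relocated from `config.py`) for input checks. A hedged sketch of invoking it locally with a made-up event; the field names come from `REQUIRED_FIELDS` and `REQUIRED_OAI_HARVEST_FIELDS` in `config.py`, and all values are illustrative:

```python
import os

from lambdas.format_input import lambda_handler

# Placeholder env vars required by Config (values are illustrative).
os.environ["TIMDEX_ALMA_EXPORT_BUCKET_ID"] = "example-alma-export-bucket"
os.environ["TIMDEX_S3_EXTRACT_BUCKET_ID"] = "timdex-extract-dev"
os.environ["WORKSPACE"] = "dev"

event = {
    "next-step": "extract",
    "run-date": "2024-01-02",
    "run-type": "daily",
    "source": "aspace",
    "oai-pmh-host": "https://example.org/oai",  # assumed OAI fields for a
    "oai-metadata-format": "oai_dc",            # non-GIS source
    "verbose": "true",
}

result = lambda_handler(event, {})
print(result["next-step"])  # "transform" -- extract hands off to transform
```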
