diff --git a/config.EXAMPLE/exporter.env.EXAMPLE b/config.EXAMPLE/exporter.env.EXAMPLE index 214801c..39a4119 100644 --- a/config.EXAMPLE/exporter.env.EXAMPLE +++ b/config.EXAMPLE/exporter.env.EXAMPLE @@ -22,3 +22,13 @@ INSTANCE_NAME= # How long must it have been since an original CSV was last modified before # progressing it down the pipeline? CSV_AGE_THRESHOLD_MINUTES=180 + +# For production use, only process CSV files whose date matches +# yesterday. Setting this to FALSE will process all relevant CSVs, which may +# be useful for development or restarting a stopped service. +ONLY_USE_CSV_FROM_YESTERDAY=TRUE + +# If ONLY_USE_CSV_FROM_YESTERDAY is set to FALSE, you can optionally +# specify a date to process (format YYYY-MM-DD); a regular expression is +# also accepted, to match multiple dates. +PROCESS_CSV_FROM_DATE= diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh index eb494fa..7692e5d 100755 --- a/exporter-scripts/scheduled-script.sh +++ b/exporter-scripts/scheduled-script.sh @@ -23,11 +23,15 @@ SNAKEMAKE_CORES="${SNAKEMAKE_CORES:-1}" echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file" # For telling the pipeline not to go all the way SNAKEMAKE_RULE_UNTIL="${SNAKEMAKE_RULE_UNTIL:-all}" +# We default to only matching files from yesterday +ONLY_USE_CSV_FROM_YESTERDAY="${ONLY_USE_CSV_FROM_YESTERDAY:-True}" +# Default will match any date containing numbers +PROCESS_CSV_FROM_DATE="${PROCESS_CSV_FROM_DATE:-'[0-9]'}" set +e snakemake --snakefile /app/src/pipeline/Snakefile \ --cores "$SNAKEMAKE_CORES" \ --until "$SNAKEMAKE_RULE_UNTIL" \ - --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" \ + --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" ONLY_USE_CSV_FROM_YESTERDAY="${ONLY_USE_CSV_FROM_YESTERDAY}" PROCESS_CSV_FROM_DATE="${PROCESS_CSV_FROM_DATE}"\ >> "$outer_log_file" 2>&1 ret_code=$?
set -e diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 5bff46a..ae0bf6b 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -1,5 +1,4 @@ import json -import os import time from datetime import datetime, timedelta, timezone from pathlib import Path @@ -9,30 +8,18 @@ from snakemake.io import glob_wildcards from exporter.ftps import do_upload from locations import ( - WAVEFORM_HASH_LOOKUPS, WAVEFORM_ORIGINAL_CSV, WAVEFORM_SNAKEMAKE_LOGS, WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, HASH_LOOKUP_JSON, - ORIGINAL_PARQUET_PATTERN, FILE_STEM_PATTERN, FILE_STEM_PATTERN_HASHED, - CSV_PATTERN, make_file_name, ) -from pseudon.hashing import do_hash from pseudon.pseudon import csv_to_parquets - -def get_file_age(file_path: Path) -> timedelta: - # need to use UTC to avoid DST issues - file_time_utc = datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc) - now_utc = datetime.now(timezone.utc) - return now_utc - file_time_utc - -def hash_csn(csn: str) -> str: - return do_hash("csn", csn) +from utils import config_bool, determine_eventual_outputs # How long before we assume that no more data will be written to the file, and @@ -45,89 +32,20 @@ def hash_csn(csn: str) -> str: # so being very cautious here may not be necessary. CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MINUTES'])) +# To prevent Snakemake picking up old files in production, we can set a flag to only +# use files with a date prefix matching yesterday's date. The default (True) is applied by scheduled-script.sh. +ONLY_USE_CSV_FROM_YESTERDAY = config_bool(config['ONLY_USE_CSV_FROM_YESTERDAY']) + +# We can optionally select a single date to process or a regular expression to match multiple dates.
+PROCESS_CSV_FROM_DATE = str(config['PROCESS_CSV_FROM_DATE']) + # The problem here is that snakemake works "backwards" in that you have to define which # files you want to be produced, yet we don't know what output files we need until the # input files (original CSVs) are present. # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. - -class InputCsvFile: - """Represent the different files in the pipeline from the point of view of one - csn + day + variable + channel combination (ie. one "original CSV" file). - These files are glued together by the Snakemake rules. - """ - def __init__(self, - date: str, - csn: str, - variable_id: str, - channel_id: str, - units: str): - self.date = date - self.csn = csn - self.hashed_csn = hash_csn(csn) - self.variable_id = variable_id - self.channel_id = channel_id - self.units = units - self._subs_dict = dict( - date=self.date, - csn=self.csn, - hashed_csn=self.hashed_csn, - variable_id=self.variable_id, - channel_id=self.channel_id, - units=self.units - ) - - def get_original_csv_path(self) -> Path: - return Path(make_file_name(str(CSV_PATTERN), self._subs_dict)) - - def get_original_parquet_path(self) -> Path: - return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict)) - - def get_pseudonymised_parquet_path(self) -> Path: - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) - return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" - - def get_ftps_uploaded_file(self) -> Path: - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) - return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") - - def get_daily_hash_lookup(self) -> Path: - return Path(make_file_name(str(HASH_LOOKUP_JSON), self._subs_dict)) - - -def determine_eventual_outputs(): - # Discover all CSVs using the basic file name pattern - before = time.perf_counter() - all_wc = glob_wildcards(CSV_PATTERN) - - # all_wc.date, all_wc.csn, 
all_wc.streamId, all_wc.units are parallel lists - # e.g. all_wc.csn[0] corresponds to all_wc.date[0], etc. - - # Build reverse lookup using named wildcards - _hash_to_csn: dict[str, str] = {} - for csn in all_wc.csn: - _hash_to_csn[hash_csn(csn)] = csn - # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems - _all_outputs = [] - for date, csn, variable_id, channel_id, units \ - in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units): - input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) - orig_file = input_file_obj.get_original_csv_path() - if csn == 'unmatched_csn': - print(f"Skipping file with unmatched CSN: {orig_file}") - continue - file_age = get_file_age(orig_file) - if file_age < CSV_AGE_THRESHOLD_MINUTES: - print(f"File too new (age={file_age}): {orig_file}") - continue - _all_outputs.append(input_file_obj) - after = time.perf_counter() - print(f"Snakemake [pid={os.getpid()}]: Determined {len(_all_outputs)} output files using newness threshold {CSV_AGE_THRESHOLD_MINUTES} in {after - before} seconds") - return _all_outputs, _hash_to_csn - - -all_outputs, hash_to_csn = determine_eventual_outputs() +all_outputs, hash_to_csn = determine_eventual_outputs(CSV_AGE_THRESHOLD_MINUTES, ONLY_USE_CSV_FROM_YESTERDAY, PROCESS_CSV_FROM_DATE) ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) diff --git a/src/pipeline/__init__.py b/src/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py new file mode 100644 index 0000000..6bf58b4 --- /dev/null +++ b/src/pipeline/utils.py @@ -0,0 +1,133 @@ +import time +from datetime import datetime, timedelta, timezone +from pathlib import Path +import re + +from snakemake.io import glob_wildcards + +from pseudon.hashing import do_hash +from locations import ( + WAVEFORM_PSEUDONYMISED_PARQUET, + 
WAVEFORM_FTPS_LOGS, + HASH_LOOKUP_JSON, + ORIGINAL_PARQUET_PATTERN, + FILE_STEM_PATTERN_HASHED, + CSV_PATTERN, + make_file_name, +) + + +def config_bool(value): + """Convert a config value from env/CLI to a bool.""" + s = str(value).strip().lower() + if s in {"", "0", "false"}: + return False + if s in {"1", "true"}: + return True + raise ValueError(f'Can\'t interpret value "{value}" as a boolean') + + +def hash_csn(csn: str) -> str: + return do_hash("csn", csn) + + +class InputCsvFile: + """Represent the different files in the pipeline from the point of view of one csn + + day + variable + channel combination (ie. + + one "original CSV" file). These files are glued together by the Snakemake rules. + """ + + def __init__( + self, date: str, csn: str, variable_id: str, channel_id: str, units: str + ): + self.date = date + self.csn = csn + self.hashed_csn = hash_csn(csn) + self.variable_id = variable_id + self.channel_id = channel_id + self.units = units + self._subs_dict = dict( + date=self.date, + csn=self.csn, + hashed_csn=self.hashed_csn, + variable_id=self.variable_id, + channel_id=self.channel_id, + units=self.units, + ) + + def get_original_csv_path(self) -> Path: + return Path(make_file_name(str(CSV_PATTERN), self._subs_dict)) + + def get_original_parquet_path(self) -> Path: + return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict)) + + def get_pseudonymised_parquet_path(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) + return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" + + def get_ftps_uploaded_file(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) + return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") + + def get_daily_hash_lookup(self) -> Path: + return Path(make_file_name(str(HASH_LOOKUP_JSON), self._subs_dict)) + + +def get_file_age(file_path: Path) -> timedelta: + # need to use UTC to avoid DST issues + file_time_utc = 
datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc) + now_utc = datetime.now(timezone.utc) + return now_utc - file_time_utc + + +def determine_eventual_outputs( + csv_wait_time: timedelta, process_only_yesterday: bool, process_datestring: str +): + """ + :param csv_wait_time: only process files older than this + :param process_only_yesterday: if false we process all dates, true only from yesterday + :param process_datestring: a regular expression to match datestrings. Has no effect if + process_only_yesterday is true + :returns: A list of InputCsvFile and a dictionary containing the hash and csn values. + """ + # Discover all CSVs using the basic file name pattern + before = time.perf_counter() + all_wc = glob_wildcards(CSV_PATTERN) + + # all_wc.date, all_wc.csn, all_wc.streamId, all_wc.units are parallel lists + # e.g. all_wc.csn[0] corresponds to all_wc.date[0], etc. + + # Build reverse lookup using named wildcards + _hash_to_csn: dict[str, str] = {} + + if process_only_yesterday: + process_datestring = ( + datetime.now(tz=timezone.utc).date() - timedelta(days=1) + ).isoformat() + + for csn in all_wc.csn: + _hash_to_csn[hash_csn(csn)] = csn + # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems + _all_outputs = [] + for date, csn, variable_id, channel_id, units in zip( + all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units + ): + input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) + orig_file = input_file_obj.get_original_csv_path() + if re.search(process_datestring, date) is None: + print(f"Skipping file not from {process_datestring} {orig_file}") + continue + if csn == "unmatched_csn": + print(f"Skipping file with unmatched CSN: {orig_file}") + continue + file_age = get_file_age(orig_file) + if file_age < csv_wait_time: + print(f"File too new (age={file_age}): {orig_file}") + continue + _all_outputs.append(input_file_obj) + after = time.perf_counter() + print( + f"Calculated output 
files using newness threshold {csv_wait_time} in {after - before} seconds" + ) + return _all_outputs, _hash_to_csn diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..63d82f2 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,90 @@ +import math +from dataclasses import dataclass +from decimal import Decimal +import random +from stablehash import stablehash + + +@dataclass +class TestFileDescription: + __test__ = False + date: str + start_timestamp: float + csn: str + mrn: str + location: str + variable_id: str + channel_id: str + sampling_rate: int + units: str + num_rows: int + _test_values: list = None + + def get_hashed_csn(self): + """This test runs outside of a docker container and the hasher doesn't expose a + fixed port, so is somewhat hard to find. + + Easier to just hard code the CSNs since we have a limited set of input CSNs. We + are defining expected values here; the test exporter will still use the test + hasher to generate the actual values. + """ + static_lookup = { + # These hashes are not secrets, they're keyed hashes of the input values + "SECRET_CSN_1234": "253d32c67e0d5aa4cdc7e9fc8442710dee8338c92abc3b905ab4b2f03194fc7e", # pragma: allowlist secret + "SECRET_CSN_1235": "ea2fda353f54926ae9d43fbc0ff4253912c250a137e9bd38bed860abacfe03ef", # pragma: allowlist secret + } + try: + return static_lookup[self.csn] + except KeyError as e: + # See develop.md "Manual hash lookup" if you need to add value + raise KeyError( + f"Unknown CSN '{self.csn}' passed to static get_hashed_csn(). " + f"You may need to manually add the known hash for your CSN. See docs for details." 
+ ) from e + + def get_orig_csv(self): + return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.csv" + + def get_orig_parquet(self): + return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_pseudon_parquet(self): + return f"{self.date}/{self.date}.{self.get_hashed_csn()}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_hashes(self): + return f"{self.date}/{self.date}.hashes.json" + + def get_stable_hash(self): + """To aid in generating different but repeatable test data for each file.""" + return stablehash( + ( + self.date, + self.csn, + self.mrn, + self.location, + self.variable_id, + self.channel_id, + ) + ) + + def get_stable_seed(self): + byte_hash = self.get_stable_hash().digest()[:4] + return int.from_bytes(byte_hash) + + def generate_data(self, vals_per_row: int) -> list[list[Decimal]]: + if self._test_values is None: + seed = self.get_stable_seed() + rng = random.Random(seed) + base_ampl = rng.normalvariate(1, 0.2) + base_offset = rng.normalvariate(0, 0.2) + self._test_values = [] + for row_num in range(self.num_rows): + values_row = [ + Decimal.from_float( + base_ampl * math.sin(base_offset + row_num * vals_per_row + i) + ).quantize(Decimal("1.0000")) + for i in range(vals_per_row) + ] + self._test_values.append(values_row) + # return as string but keep the numerical representation for comparison to parquet later + return self._test_values diff --git a/tests/test_snakemake.py b/tests/test_snakemake.py new file mode 100644 index 0000000..c32b07d --- /dev/null +++ b/tests/test_snakemake.py @@ -0,0 +1,132 @@ +from datetime import timedelta, datetime, timezone +from pathlib import Path + +import pytest + +import locations + +import src.pipeline.utils as utils + +from tests.helpers import TestFileDescription + + +def mock_do_hash(csn: str): + return "no-hash" + + +def _make_test_input_csv(csv_dir: Path): + files = [] + # today + today = 
datetime.now(tz=timezone.utc).date() + files.append( + TestFileDescription( + today.isoformat(), + 1735740783.0, + "SECRET_CSN_1235", + "SECRET_MRN_12346", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 4, + ) + ) + + # from yesterday + files.append( + TestFileDescription( + (today - timedelta(days=1)).isoformat(), + 1735740783.0, + "SECRET_CSN_1235", + "SECRET_MRN_12346", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 4, + ) + ) + # two files from 2 days ago + files.append( + TestFileDescription( + (today - timedelta(days=2)).isoformat(), + 1735801965.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 5, + ) + ) + files.append( + TestFileDescription( + (today - timedelta(days=2)).isoformat(), + 1735801965.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "14", + 50, + "uV", + 4, + ) + ) + + for t in files: + csv_path = csv_dir / t.get_orig_csv() + csv_path.parent.mkdir(parents=True, exist_ok=True) + with open(csv_path, "w") as f: + # for this test doesn't use the contents of the files, only their + # filenames, so we'll save some processing by creating empty files. + f.write("") + return [f.get_orig_csv() for f in files] + + +@pytest.mark.parametrize( + "process_only_yesterday, process_datestring, expected_file_indexes", + [ + # with process only yesterday true we should return only the single file from yesterday + (True, "", [1]), + # process only yesterday overrides process_datestring + ( + True, + (datetime.now(tz=timezone.utc).date() - timedelta(days=2)).isoformat(), + [1], + ), + # with process only yesterday false and an empty process_datestring we should return all 4 files. 
+ (False, "", [0, 1, 2, 3]), + # with process only yesterday false and process_datestring set to two days ago we should return the two files from two days ago + ( + False, + (datetime.now(tz=timezone.utc).date() - timedelta(days=2)).isoformat(), + [2, 3], + ), + ], +) +def test_determine_eventual_outputs( + tmp_path: Path, + monkeypatch, + process_only_yesterday, + process_datestring, + expected_file_indexes, +): + original_csv_dir = tmp_path / locations.WAVEFORM_ORIGINAL_CSV.relative_to("/") + expected_paths = _make_test_input_csv(original_csv_dir) + monkeypatch.setattr( + utils, "CSV_PATTERN", original_csv_dir / (locations.FILE_STEM_PATTERN + ".csv") + ) + monkeypatch.setattr("src.pipeline.utils.hash_csn", mock_do_hash) + csv_wait_time = timedelta(0) + + files, hash_to_csn = utils.determine_eventual_outputs( + csv_wait_time, process_only_yesterday, process_datestring + ) + assert { + str(f.get_original_csv_path().relative_to(original_csv_dir)) for f in files + } == {expected_paths[ex_i] for ex_i in expected_file_indexes} diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 47b4270..50d7b9c 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -1,10 +1,7 @@ import json -import math import os import re -from dataclasses import dataclass from decimal import Decimal -import random import pyarrow as pa import pyarrow.parquet as pq @@ -13,7 +10,7 @@ from pathlib import Path import pytest -from stablehash import stablehash +from tests.helpers import TestFileDescription def _run_compose( @@ -52,91 +49,6 @@ def build_required_images(): result.check_returncode() -@dataclass -class TestFileDescription: - __test__ = False - date: str - start_timestamp: float - csn: str - mrn: str - location: str - variable_id: str - channel_id: str - sampling_rate: int - units: str - num_rows: int - _test_values: list = None - - def get_hashed_csn(self): - """This test runs outside of a docker container and the 
hasher doesn't expose a - fixed port, so is somewhat hard to find. - - Easier to just hard code the CSNs since we have a limited set of input CSNs. We - are defining expected values here; the test exporter will still use the test - hasher to generate the actual values. - """ - static_lookup = { - # These hashes are not secrets, they're keyed hashes of the input values - "SECRET_CSN_1234": "253d32c67e0d5aa4cdc7e9fc8442710dee8338c92abc3b905ab4b2f03194fc7e", # pragma: allowlist secret - "SECRET_CSN_1235": "ea2fda353f54926ae9d43fbc0ff4253912c250a137e9bd38bed860abacfe03ef", # pragma: allowlist secret - } - try: - return static_lookup[self.csn] - except KeyError as e: - # See develop.md "Manual hash lookup" if you need to add value - raise KeyError( - f"Unknown CSN '{self.csn}' passed to static get_hashed_csn(). " - f"You may need to manually add the known hash for your CSN. See docs for details." - ) from e - - def get_orig_csv(self): - return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.csv" - - def get_orig_parquet(self): - return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" - - def get_pseudon_parquet(self): - return f"{self.date}/{self.date}.{self.get_hashed_csn()}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" - - def get_hashes(self): - return f"{self.date}/{self.date}.hashes.json" - - def get_stable_hash(self): - """To aid in generating different but repeatable test data for each file.""" - return stablehash( - ( - self.date, - self.csn, - self.mrn, - self.location, - self.variable_id, - self.channel_id, - ) - ) - - def get_stable_seed(self): - byte_hash = self.get_stable_hash().digest()[:4] - return int.from_bytes(byte_hash) - - def generate_data(self, vals_per_row: int) -> list[list[Decimal]]: - if self._test_values is None: - seed = self.get_stable_seed() - rng = random.Random(seed) - base_ampl = rng.normalvariate(1, 0.2) - base_offset = 
rng.normalvariate(0, 0.2) - self._test_values = [] - for row_num in range(self.num_rows): - values_row = [ - Decimal.from_float( - base_ampl * math.sin(base_offset + row_num * vals_per_row + i) - ).quantize(Decimal("1.0000")) - for i in range(vals_per_row) - ] - self._test_values.append(values_row) - # return as string but keep the numerical representation for comparison to parquet later - return self._test_values - - def _make_test_input_csv(tmp_path, t: TestFileDescription) -> list[list[Decimal]]: original_csv_dir = tmp_path / "original-csv" csv_path = original_csv_dir / t.get_orig_csv() @@ -348,6 +260,9 @@ def _run_snakemake(tmp_path): # It works around this in normal use by reading env vars only from the bind-mounted exporter.env file. # So to use a different config during test, we must override that file with a special version # that we create here. + # Note that since we bypass cron here, any config on your dev machine will be picked up and then + # overwritten by what is below. Therefore, you should explicitly set values below even if you only + # want the default value to be used. tmp_exporter_env_path = tmp_path / "config/exporter.env" tmp_exporter_env_path.parent.mkdir(exist_ok=True) tmp_exporter_env_path.write_text( @@ -355,6 +270,8 @@ def _run_snakemake(tmp_path): "SNAKEMAKE_CORES=1\n" "INSTANCE_NAME=pytest\n" "CSV_AGE_THRESHOLD_MINUTES=5\n" + "ONLY_USE_CSV_FROM_YESTERDAY=False\n" + "PROCESS_CSV_FROM_DATE=\n" ) # run system under test (exporter container) in foreground compose_args = [