From 2558d5818d7be4a8a9a6bb54c633c22704f166d8 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 10:07:56 +0000 Subject: [PATCH 01/19] Refactor Snakefile to make it easier to write a unit test for date exclusion. --- src/pipeline/Snakefile | 15 ++---- src/pipeline/__init__.py | 0 src/pipeline/utils.py | 111 +++++++++++++++++++++++++++++++++++++++ tests/snakemake_tests.py | 22 ++++++++ 4 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 src/pipeline/__init__.py create mode 100644 src/pipeline/utils.py create mode 100644 tests/snakemake_tests.py diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 5bff46a..c376d15 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -21,18 +21,9 @@ from locations import ( CSV_PATTERN, make_file_name, ) -from pseudon.hashing import do_hash from pseudon.pseudon import csv_to_parquets - -def get_file_age(file_path: Path) -> timedelta: - # need to use UTC to avoid DST issues - file_time_utc = datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc) - now_utc = datetime.now(timezone.utc) - return now_utc - file_time_utc - -def hash_csn(csn: str) -> str: - return do_hash("csn", csn) +from utils import determine_eventual_outputs # How long before we assume that no more data will be written to the file, and @@ -52,6 +43,7 @@ CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MI # be fed into snakemake. +<<<<<<< HEAD class InputCsvFile: """Represent the different files in the pipeline from the point of view of one csn + day + variable + channel combination (ie. one "original CSV" file). @@ -128,6 +120,9 @@ def determine_eventual_outputs(): all_outputs, hash_to_csn = determine_eventual_outputs() +======= +all_outputs, hash_to_csn = determine_eventual_outputs(CSV_PATTERN, CSV_WAIT_TIME) +>>>>>>> 601f2c3 (Refactor Snakefile to make it easier to write a unit test for date exclusion.) ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) diff --git a/src/pipeline/__init__.py b/src/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py new file mode 100644 index 0000000..5dd657d --- /dev/null +++ b/src/pipeline/utils.py @@ -0,0 +1,111 @@ +import json +import time +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pyarrow.parquet as pq +from snakemake.io import glob_wildcards + +from pseudon.hashing import do_hash +from locations import ( + WAVEFORM_HASH_LOOKUPS, + WAVEFORM_ORIGINAL_CSV, + WAVEFORM_PSEUDONYMISED_PARQUET, + WAVEFORM_FTPS_LOGS, + ORIGINAL_PARQUET_PATTERN, + FILE_STEM_PATTERN_HASHED, + CSV_PATTERN, + make_file_name, +) + + +def hash_csn(csn: str) -> str: + return do_hash("csn", csn) + + +class InputCsvFile: + """Represent the different files in the pipeline from the point of view of one csn + + day + variable + channel combination (ie. + + one "original CSV" file). These files are glued together by the Snakemake rules. + """ + + def __init__( + self, date: str, csn: str, variable_id: str, channel_id: str, units: str + ): + self.date = date + self.csn = csn + self.hashed_csn = hash_csn(csn) + self.variable_id = variable_id + self.channel_id = channel_id + self.units = units + self._subs_dict = dict( + date=self.date, + csn=self.csn, + hashed_csn=self.hashed_csn, + variable_id=self.variable_id, + channel_id=self.channel_id, + units=self.units, + ) + + def get_original_csv_path(self) -> Path: + return Path(make_file_name(str(CSV_PATTERN), self._subs_dict)) + + def get_original_parquet_path(self) -> Path: + return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict)) + + def get_pseudonymised_parquet_path(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) + return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" + + def get_ftps_uploaded_file(self) -> Path: + final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) + return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") + + def get_daily_hash_lookup(self) -> Path: + return WAVEFORM_HASH_LOOKUPS / f"{self.date}.hashes.json" + + +def get_file_age(file_path: Path) -> timedelta: + # need to use UTC to avoid DST issues + file_time_utc = datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc) + now_utc = datetime.now(timezone.utc) + return now_utc - file_time_utc + + +def determine_eventual_outputs(csv_pattern: str, csv_wait_time: timedelta): + # Discover all CSVs using the basic file name pattern + before = time.perf_counter() + all_wc = glob_wildcards(csv_pattern) + + # all_wc.date, all_wc.csn, all_wc.streamId, all_wc.units are parallel lists + # e.g. all_wc.csn[0] corresponds to all_wc.date[0], etc. + + # Build reverse lookup using named wildcards + _hash_to_csn: dict[str, str] = {} + for csn in all_wc.csn: + _hash_to_csn[hash_csn(csn)] = csn + # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems + _all_outputs = [] + for date, csn, variable_id, channel_id, units in zip( + all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units + ): + input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) + orig_file = input_file_obj.get_original_csv_path() + ## uncomment to sort by file date + # if date == '2025-01-01': + # print(f"Skipping file with bad date: {orig_file}") + # continue + if csn == "unmatched_csn": + print(f"Skipping file with unmatched CSN: {orig_file}") + continue + file_age = get_file_age(orig_file) + if file_age < csv_wait_time: + print(f"File too new (age={file_age}): {orig_file}") + continue + _all_outputs.append(input_file_obj) + after = time.perf_counter() + print( + f"Calculated output files using newness threshold {csv_wait_time} in {after - before} seconds" + ) + return _all_outputs, _hash_to_csn diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py new file mode 100644 index 0000000..8572116 --- /dev/null +++ b/tests/snakemake_tests.py @@ -0,0 +1,22 @@ +import json +import math +import os +import re +from dataclasses import dataclass +from decimal import Decimal +import random + +import pyarrow as pa +import pyarrow.parquet as pq +import subprocess +import time +from pathlib import Path + +import pytest +from stablehash import stablehash + +from src.pipeline import utils + + +def test_determine_eventual_outputs(): + assert True From 61a5b59d7bcdd94cf0b0074525ef7015fe545fa5 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 13:49:17 +0000 Subject: [PATCH 02/19] Implementing test --- src/pipeline/utils.py | 4 ++-- tests/snakemake_tests.py | 36 +++++++++++++++++++----------------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index 5dd657d..3cb1ec0 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -73,10 +73,10 @@ def get_file_age(file_path: Path) -> timedelta: return now_utc - file_time_utc -def determine_eventual_outputs(csv_pattern: str, csv_wait_time: timedelta): +def determine_eventual_outputs(csv_wait_time: timedelta): # Discover all CSVs using the basic file name pattern before = time.perf_counter() - all_wc = glob_wildcards(csv_pattern) + all_wc = glob_wildcards(CSV_PATTERN) # all_wc.date, all_wc.csn, all_wc.streamId, all_wc.units are parallel lists # e.g. all_wc.csn[0] corresponds to all_wc.date[0], etc. diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index 8572116..42c4665 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -1,22 +1,24 @@ -import json -import math -import os -import re -from dataclasses import dataclass -from decimal import Decimal -import random - -import pyarrow as pa -import pyarrow.parquet as pq -import subprocess -import time -from pathlib import Path +from datetime import timedelta import pytest -from stablehash import stablehash +import locations + +import src.pipeline.utils as utils + -from src.pipeline import utils +def mock_do_hash(csn: str): + return "no-hash" -def test_determine_eventual_outputs(): - assert True +def test_determine_eventual_outputs(monkeypatch): + tmp_path = "../" + original_csv_dir = tmp_path / locations.WAVEFORM_ORIGINAL_CSV.relative_to("/") + monkeypatch.setattr( + utils, + "CSV_PATTERN", + original_csv_dir / "{date}.{csn}.{variable_id}.{channel_id}.{units}.csv", + ) + monkeypatch.setattr("src.pipeline.utils.hash_csn", mock_do_hash) + csv_wait_time = timedelta(minutes=5) + print(utils.determine_eventual_outputs(csv_wait_time)) + assert False From 452f713d9d1e6e7496cbc2bc36f50a28d7a2b2ec Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 13:58:07 +0000 Subject: [PATCH 03/19] Fix errors from rebase --- src/pipeline/Snakefile | 82 +----------------------------------------- 1 file changed, 1 insertion(+), 81 deletions(-) diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index c376d15..5bd36ae 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -42,87 +42,7 @@ CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MI # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. - -<<<<<<< HEAD -class InputCsvFile: - """Represent the different files in the pipeline from the point of view of one - csn + day + variable + channel combination (ie. one "original CSV" file). - These files are glued together by the Snakemake rules. - """ - def __init__(self, - date: str, - csn: str, - variable_id: str, - channel_id: str, - units: str): - self.date = date - self.csn = csn - self.hashed_csn = hash_csn(csn) - self.variable_id = variable_id - self.channel_id = channel_id - self.units = units - self._subs_dict = dict( - date=self.date, - csn=self.csn, - hashed_csn=self.hashed_csn, - variable_id=self.variable_id, - channel_id=self.channel_id, - units=self.units - ) - - def get_original_csv_path(self) -> Path: - return Path(make_file_name(str(CSV_PATTERN), self._subs_dict)) - - def get_original_parquet_path(self) -> Path: - return Path(make_file_name(str(ORIGINAL_PARQUET_PATTERN), self._subs_dict)) - - def get_pseudonymised_parquet_path(self) -> Path: - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) - return WAVEFORM_PSEUDONYMISED_PARQUET / f"{final_stem}.parquet" - - def get_ftps_uploaded_file(self) -> Path: - final_stem = make_file_name(FILE_STEM_PATTERN_HASHED, self._subs_dict) - return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") - - def get_daily_hash_lookup(self) -> Path: - return Path(make_file_name(str(HASH_LOOKUP_JSON), self._subs_dict)) - - -def determine_eventual_outputs(): - # Discover all CSVs using the basic file name pattern - before = time.perf_counter() - all_wc = glob_wildcards(CSV_PATTERN) - - # all_wc.date, all_wc.csn, all_wc.streamId, all_wc.units are parallel lists - # e.g. all_wc.csn[0] corresponds to all_wc.date[0], etc. - - # Build reverse lookup using named wildcards - _hash_to_csn: dict[str, str] = {} - for csn in all_wc.csn: - _hash_to_csn[hash_csn(csn)] = csn - # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems - _all_outputs = [] - for date, csn, variable_id, channel_id, units \ - in zip(all_wc.date, all_wc.csn, all_wc.variable_id, all_wc.channel_id, all_wc.units): - input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) - orig_file = input_file_obj.get_original_csv_path() - if csn == 'unmatched_csn': - print(f"Skipping file with unmatched CSN: {orig_file}") - continue - file_age = get_file_age(orig_file) - if file_age < CSV_AGE_THRESHOLD_MINUTES: - print(f"File too new (age={file_age}): {orig_file}") - continue - _all_outputs.append(input_file_obj) - after = time.perf_counter() - print(f"Snakemake [pid={os.getpid()}]: Determined {len(_all_outputs)} output files using newness threshold {CSV_AGE_THRESHOLD_MINUTES} in {after - before} seconds") - return _all_outputs, _hash_to_csn - - -all_outputs, hash_to_csn = determine_eventual_outputs() -======= -all_outputs, hash_to_csn = determine_eventual_outputs(CSV_PATTERN, CSV_WAIT_TIME) ->>>>>>> 601f2c3 (Refactor Snakefile to make it easier to write a unit test for date exclusion.) +all_outputs, hash_to_csn = determine_eventual_outputs(CSV_WAIT_TIME) ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) From 50483fb25a2440a5a3577afa39923c54e6c35dab Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 15:23:39 +0000 Subject: [PATCH 04/19] Debug after rebase --- src/pipeline/Snakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 5bd36ae..ad0100f 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -42,7 +42,7 @@ CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MI # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. -all_outputs, hash_to_csn = determine_eventual_outputs(CSV_WAIT_TIME) +all_outputs, hash_to_csn = determine_eventual_outputs(CSV_AGE_THRESHOLD_MINUTES) ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) From dbc774ef5f6ecd5283d72b4ecc5f8b704f954d78 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 15:33:15 +0000 Subject: [PATCH 05/19] More post rebase fixes --- src/pipeline/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index 3cb1ec0..04407ea 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -12,6 +12,7 @@ WAVEFORM_ORIGINAL_CSV, WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, + HASH_LOOKUP_JSON, ORIGINAL_PARQUET_PATTERN, FILE_STEM_PATTERN_HASHED, CSV_PATTERN, @@ -63,7 +64,7 @@ def get_ftps_uploaded_file(self) -> Path: return WAVEFORM_FTPS_LOGS / (final_stem + ".ftps.uploaded.json") def get_daily_hash_lookup(self) -> Path: - return WAVEFORM_HASH_LOOKUPS / f"{self.date}.hashes.json" + return Path(make_file_name(str(HASH_LOOKUP_JSON), self._subs_dict)) def get_file_age(file_path: Path) -> timedelta: From 67e763b82c5dfff2e0264c9fba595e3f0bdc7a98 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 16:11:49 +0000 Subject: [PATCH 06/19] Put test file descript in a separate helper file so we can use it for multiple tests. --- tests/helpers.py | 90 +++++++++++++++++++++++++++++ tests/snakemake_tests.py | 47 ++++++++++++++- tests/test_snakemake_integration.py | 90 +---------------------------- 3 files changed, 136 insertions(+), 91 deletions(-) create mode 100644 tests/helpers.py diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..63d82f2 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,90 @@ +import math +from dataclasses import dataclass +from decimal import Decimal +import random +from stablehash import stablehash + + +@dataclass +class TestFileDescription: + __test__ = False + date: str + start_timestamp: float + csn: str + mrn: str + location: str + variable_id: str + channel_id: str + sampling_rate: int + units: str + num_rows: int + _test_values: list = None + + def get_hashed_csn(self): + """This test runs outside of a docker container and the hasher doesn't expose a + fixed port, so is somewhat hard to find. + + Easier to just hard code the CSNs since we have a limited set of input CSNs. We + are defining expected values here; the test exporter will still use the test + hasher to generate the actual values. + """ + static_lookup = { + # These hashes are not secrets, they're keyed hashes of the input values + "SECRET_CSN_1234": "253d32c67e0d5aa4cdc7e9fc8442710dee8338c92abc3b905ab4b2f03194fc7e", # pragma: allowlist secret + "SECRET_CSN_1235": "ea2fda353f54926ae9d43fbc0ff4253912c250a137e9bd38bed860abacfe03ef", # pragma: allowlist secret + } + try: + return static_lookup[self.csn] + except KeyError as e: + # See develop.md "Manual hash lookup" if you need to add value + raise KeyError( + f"Unknown CSN '{self.csn}' passed to static get_hashed_csn(). " + f"You may need to manually add the known hash for your CSN. See docs for details." + ) from e + + def get_orig_csv(self): + return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.csv" + + def get_orig_parquet(self): + return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_pseudon_parquet(self): + return f"{self.date}/{self.date}.{self.get_hashed_csn()}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" + + def get_hashes(self): + return f"{self.date}/{self.date}.hashes.json" + + def get_stable_hash(self): + """To aid in generating different but repeatable test data for each file.""" + return stablehash( + ( + self.date, + self.csn, + self.mrn, + self.location, + self.variable_id, + self.channel_id, + ) + ) + + def get_stable_seed(self): + byte_hash = self.get_stable_hash().digest()[:4] + return int.from_bytes(byte_hash) + + def generate_data(self, vals_per_row: int) -> list[list[Decimal]]: + if self._test_values is None: + seed = self.get_stable_seed() + rng = random.Random(seed) + base_ampl = rng.normalvariate(1, 0.2) + base_offset = rng.normalvariate(0, 0.2) + self._test_values = [] + for row_num in range(self.num_rows): + values_row = [ + Decimal.from_float( + base_ampl * math.sin(base_offset + row_num * vals_per_row + i) + ).quantize(Decimal("1.0000")) + for i in range(vals_per_row) + ] + self._test_values.append(values_row) + # return as string but keep the numerical representation for comparison to parquet later + return self._test_values diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index 42c4665..3237db7 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -1,17 +1,60 @@ from datetime import timedelta +from pathlib import Path import pytest import locations import src.pipeline.utils as utils +from tests.helpers import TestFileDescription + def mock_do_hash(csn: str): return "no-hash" -def test_determine_eventual_outputs(monkeypatch): - tmp_path = "../" +def _make_test_input_csv(tmp_path): + files = [] + files.append( + TestFileDescription( + "2025-01-01", + 1735740783.0, + "SECRET_CSN_1235", + "SECRET_MRN_12346", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 4, + ) + ) + # new day, first CSN again + files.append( + TestFileDescription( + "2025-01-02", + 1735801965.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 5, + ) + ) + + for t in files: + original_csv_dir = tmp_path / "original-csv" + csv_path = original_csv_dir / t.get_orig_csv() + csv_path.parent.mkdir(parents=True, exist_ok=True) + with open(csv_path, "w") as f: + f.write("") + + +def test_determine_eventual_outputs(tmp_path: Path, monkeypatch): + _make_test_input_csv(tmp_path) original_csv_dir = tmp_path / locations.WAVEFORM_ORIGINAL_CSV.relative_to("/") monkeypatch.setattr( utils, diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 47b4270..1b3c870 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -1,10 +1,7 @@ import json -import math import os import re -from dataclasses import dataclass from decimal import Decimal -import random import pyarrow as pa import pyarrow.parquet as pq @@ -13,7 +10,7 @@ from pathlib import Path import pytest -from stablehash import stablehash +from tests.helpers import TestFileDescription def _run_compose( @@ -52,91 +49,6 @@ def build_required_images(): result.check_returncode() -@dataclass -class TestFileDescription: - __test__ = False - date: str - start_timestamp: float - csn: str - mrn: str - location: str - variable_id: str - channel_id: str - sampling_rate: int - units: str - num_rows: int - _test_values: list = None - - def get_hashed_csn(self): - """This test runs outside of a docker container and the hasher doesn't expose a - fixed port, so is somewhat hard to find. - - Easier to just hard code the CSNs since we have a limited set of input CSNs. We - are defining expected values here; the test exporter will still use the test - hasher to generate the actual values. - """ - static_lookup = { - # These hashes are not secrets, they're keyed hashes of the input values - "SECRET_CSN_1234": "253d32c67e0d5aa4cdc7e9fc8442710dee8338c92abc3b905ab4b2f03194fc7e", # pragma: allowlist secret - "SECRET_CSN_1235": "ea2fda353f54926ae9d43fbc0ff4253912c250a137e9bd38bed860abacfe03ef", # pragma: allowlist secret - } - try: - return static_lookup[self.csn] - except KeyError as e: - # See develop.md "Manual hash lookup" if you need to add value - raise KeyError( - f"Unknown CSN '{self.csn}' passed to static get_hashed_csn(). " - f"You may need to manually add the known hash for your CSN. See docs for details." - ) from e - - def get_orig_csv(self): - return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.csv" - - def get_orig_parquet(self): - return f"{self.date}/{self.date}.{self.csn}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" - - def get_pseudon_parquet(self): - return f"{self.date}/{self.date}.{self.get_hashed_csn()}.{self.variable_id}.{self.channel_id}.{self.units}.parquet" - - def get_hashes(self): - return f"{self.date}/{self.date}.hashes.json" - - def get_stable_hash(self): - """To aid in generating different but repeatable test data for each file.""" - return stablehash( - ( - self.date, - self.csn, - self.mrn, - self.location, - self.variable_id, - self.channel_id, - ) - ) - - def get_stable_seed(self): - byte_hash = self.get_stable_hash().digest()[:4] - return int.from_bytes(byte_hash) - - def generate_data(self, vals_per_row: int) -> list[list[Decimal]]: - if self._test_values is None: - seed = self.get_stable_seed() - rng = random.Random(seed) - base_ampl = rng.normalvariate(1, 0.2) - base_offset = rng.normalvariate(0, 0.2) - self._test_values = [] - for row_num in range(self.num_rows): - values_row = [ - Decimal.from_float( - base_ampl * math.sin(base_offset + row_num * vals_per_row + i) - ).quantize(Decimal("1.0000")) - for i in range(vals_per_row) - ] - self._test_values.append(values_row) - # return as string but keep the numerical representation for comparison to parquet later - return self._test_values - - def _make_test_input_csv(tmp_path, t: TestFileDescription) -> list[list[Decimal]]: original_csv_dir = tmp_path / "original-csv" csv_path = original_csv_dir / t.get_orig_csv() From 9e6caac028790b2825655a0488762ebe07fc74e8 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Wed, 11 Feb 2026 17:08:51 +0000 Subject: [PATCH 07/19] determine_eventual_outputs takes a parameter that allows us to only process files from yesterday, plus a unit test. TODO get Snakefile to pass parameter based on environment variable. --- src/pipeline/utils.py | 16 +++++++++++----- tests/snakemake_tests.py | 37 +++++++++++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index 04407ea..a519bb1 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -74,7 +74,12 @@ def get_file_age(file_path: Path) -> timedelta: return now_utc - file_time_utc -def determine_eventual_outputs(csv_wait_time: timedelta): +def determine_eventual_outputs(csv_wait_time: timedelta, process_yesterday_only: bool): + """ + :param timedelta: only process files older than this + :param process_one_day_only: if true we only allow files from yesterday + :returns: A list of InputCsvFile and a dictionary containing the hash and csn values. + """ # Discover all CSVs using the basic file name pattern before = time.perf_counter() all_wc = glob_wildcards(CSV_PATTERN) @@ -93,10 +98,11 @@ def determine_eventual_outputs(csv_wait_time: timedelta): ): input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) orig_file = input_file_obj.get_original_csv_path() - ## uncomment to sort by file date - # if date == '2025-01-01': - # print(f"Skipping file with bad date: {orig_file}") - # continue + if process_yesterday_only: + yesterday = datetime.now() - timedelta(1) + if date != datetime.strftime(yesterday, "%Y-%m-%d"): + print(f"Skipping file not from yesterday {orig_file}") + continue if csn == "unmatched_csn": print(f"Skipping file with unmatched CSN: {orig_file}") continue diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index 3237db7..9cbdd9c 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import timedelta, datetime from pathlib import Path import pytest @@ -15,8 +15,10 @@ def mock_do_hash(csn: str): def _make_test_input_csv(tmp_path): files = [] + # today files.append( TestFileDescription( + datetime.strftime(datetime.now(), "%Y-%m-%d"), "2025-01-01", 1735740783.0, "SECRET_CSN_1235", @@ -29,10 +31,27 @@ def _make_test_input_csv(tmp_path): 4, ) ) - # new day, first CSN again + + # yesterday + files.append( + TestFileDescription( + datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"), + "2025-01-01", + 1735740783.0, + "SECRET_CSN_1235", + "SECRET_MRN_12346", + "SECRET_LOCATION_123", + "27", + "noCh", + 50, + "uV", + 4, + ) + ) + # two days ago, first CSN again files.append( TestFileDescription( - "2025-01-02", + datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d"), 1735801965.0, "SECRET_CSN_1234", "SECRET_MRN_12345", @@ -46,7 +65,7 @@ def _make_test_input_csv(tmp_path): ) for t in files: - original_csv_dir = tmp_path / "original-csv" + original_csv_dir = tmp_path / "waveform-export/original-csv" csv_path = original_csv_dir / t.get_orig_csv() csv_path.parent.mkdir(parents=True, exist_ok=True) with open(csv_path, "w") as f: @@ -59,9 +78,11 @@ def test_determine_eventual_outputs(tmp_path: Path, monkeypatch): monkeypatch.setattr( utils, "CSV_PATTERN", - original_csv_dir / "{date}.{csn}.{variable_id}.{channel_id}.{units}.csv", + original_csv_dir / "{date}/{date}.{csn}.{variable_id}.{channel_id}.{units}.csv", ) monkeypatch.setattr("src.pipeline.utils.hash_csn", mock_do_hash) - csv_wait_time = timedelta(minutes=5) - print(utils.determine_eventual_outputs(csv_wait_time)) - assert False + csv_wait_time = timedelta(0) + files, hash_to_csn = utils.determine_eventual_outputs(csv_wait_time, True) + assert len(files) == 1 + files, hash_to_csn = utils.determine_eventual_outputs(csv_wait_time, False) + assert len(files) == 3 From 8bbd9354289555107075736b6766da0d9a73d5c6 Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 12 Feb 2026 09:03:29 +0000 Subject: [PATCH 08/19] Implemented pass through of environment variable to control processing dates. --- config.EXAMPLE/exporter.env.EXAMPLE | 5 +++++ exporter-scripts/scheduled-script.sh | 2 +- src/pipeline/Snakefile | 10 +++++----- tests/test_snakemake_integration.py | 1 + 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/config.EXAMPLE/exporter.env.EXAMPLE b/config.EXAMPLE/exporter.env.EXAMPLE index 214801c..989c607 100644 --- a/config.EXAMPLE/exporter.env.EXAMPLE +++ b/config.EXAMPLE/exporter.env.EXAMPLE @@ -22,3 +22,8 @@ INSTANCE_NAME= # How long must it have been since an original CSV was last modified before # progressing it down the pipeline? CSV_AGE_THRESHOLD_MINUTES=180 + +# For production use only process CSV files that have a date matching to +# yesterday. Setting this to False will process all relevant csv, which may +# be useful for development or restarting a stopped service. +ONLY_USE_CSV_FROM_YESTERDAY=True diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh index eb494fa..3d3b674 100755 --- a/exporter-scripts/scheduled-script.sh +++ b/exporter-scripts/scheduled-script.sh @@ -27,7 +27,7 @@ set +e snakemake --snakefile /app/src/pipeline/Snakefile \ --cores "$SNAKEMAKE_CORES" \ --until "$SNAKEMAKE_RULE_UNTIL" \ - --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" \ + --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" ONLY_USE_CSV_FROM_YESTERDAY="${ONLY_USE_CSV_FROM_YESTERDAY}" \ >> "$outer_log_file" 2>&1 ret_code=$? set -e diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index ad0100f..469886f 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -1,5 +1,4 @@ import json -import os import time from datetime import datetime, timedelta, timezone from pathlib import Path @@ -9,16 +8,13 @@ from snakemake.io import glob_wildcards from exporter.ftps import do_upload from locations import ( - WAVEFORM_HASH_LOOKUPS, WAVEFORM_ORIGINAL_CSV, WAVEFORM_SNAKEMAKE_LOGS, WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, HASH_LOOKUP_JSON, - ORIGINAL_PARQUET_PATTERN, FILE_STEM_PATTERN, FILE_STEM_PATTERN_HASHED, - CSV_PATTERN, make_file_name, ) from pseudon.pseudon import csv_to_parquets @@ -36,13 +32,17 @@ from utils import determine_eventual_outputs # so being very cautious here may not be necessary. CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MINUTES'])) +# To prevent Snakemake picking up old files in production, we can set a flag to to only +# use files with a date prefix matching yesterday's date. +ONLY_USE_CSV_FROM_YESTERDAY = bool(config['ONLY_USE_CSV_FROM_YESTERDAY']) + # The problem here is that snakemake works "backwards" in that you have to define which # files you want to be produced, yet we don't know what output files we need until the # input files (original CSVs) are present. # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. -all_outputs, hash_to_csn = determine_eventual_outputs(CSV_AGE_THRESHOLD_MINUTES) +all_outputs, hash_to_csn = determine_eventual_outputs(CSV_AGE_THRESHOLD_MINUTES, ONLY_USE_CSV_FROM_YESTERDAY) ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index 1b3c870..d59fffb 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -267,6 +267,7 @@ def _run_snakemake(tmp_path): "SNAKEMAKE_CORES=1\n" "INSTANCE_NAME=pytest\n" "CSV_AGE_THRESHOLD_MINUTES=5\n" + "ONLY_USE_CSV_FROM_YESTERDAY=False\n" ) # run system under test (exporter container) in foreground compose_args = [ From d82212afcf2598eec0a8dc42993098bff7549dce Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 12 Feb 2026 09:13:08 +0000 Subject: [PATCH 09/19] Added an explanatory note. --- tests/snakemake_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index 9cbdd9c..e9803ee 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -69,6 +69,8 @@ def _make_test_input_csv(tmp_path): csv_path = original_csv_dir / t.get_orig_csv() csv_path.parent.mkdir(parents=True, exist_ok=True) with open(csv_path, "w") as f: + # for this test doesn't use the contents of the files, only their + # filenames, so we'll save some processing by creating empty files. f.write("") From a023f8d2441937a1bf54d1849830255152bfbb7c Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 12 Feb 2026 09:26:37 +0000 Subject: [PATCH 10/19] Fix unused imports --- src/pipeline/utils.py | 4 ---- tests/snakemake_tests.py | 1 - 2 files changed, 5 deletions(-) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index a519bb1..449d2a6 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -1,15 +1,11 @@ -import json import time from datetime import datetime, timedelta, timezone from pathlib import Path -import pyarrow.parquet as pq from snakemake.io import glob_wildcards from pseudon.hashing import do_hash from locations import ( - WAVEFORM_HASH_LOOKUPS, - WAVEFORM_ORIGINAL_CSV, WAVEFORM_PSEUDONYMISED_PARQUET, WAVEFORM_FTPS_LOGS, HASH_LOOKUP_JSON, diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index e9803ee..7511b02 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -1,7 +1,6 @@ from datetime import timedelta, datetime from pathlib import Path -import pytest import locations import src.pipeline.utils as utils From 28f693a24a25938c021eb317cdc496e2e931443b Mon Sep 17 00:00:00 2001 From: Stephen Thompson Date: Thu, 12 Feb 2026 13:16:26 +0000 Subject: [PATCH 11/19] Implemented a regex match so we can optionally specify a date or date range to process from the config file. --- config.EXAMPLE/exporter.env.EXAMPLE | 5 ++++ exporter-scripts/scheduled-script.sh | 6 +++- src/pipeline/Snakefile | 7 +++-- src/pipeline/utils.py | 23 +++++++++++----- tests/snakemake_tests.py | 41 ++++++++++++++++++++++++---- 5 files changed, 67 insertions(+), 15 deletions(-) diff --git a/config.EXAMPLE/exporter.env.EXAMPLE b/config.EXAMPLE/exporter.env.EXAMPLE index 989c607..302c079 100644 --- a/config.EXAMPLE/exporter.env.EXAMPLE +++ b/config.EXAMPLE/exporter.env.EXAMPLE @@ -27,3 +27,8 @@ CSV_AGE_THRESHOLD_MINUTES=180 # yesterday. Setting this to False will process all relevant csv, which may # be useful for development or restarting a stopped service. ONLY_USE_CSV_FROM_YESTERDAY=True + +# if ONLY_USE_CSV_FROM_YESTERDAY set False, you can optionally +# specify a date to process format YYYY-MM-DD also accepts a regular +# expression to match multiple date +PROCESS_CSV_FROM_DATE= diff --git a/exporter-scripts/scheduled-script.sh b/exporter-scripts/scheduled-script.sh index 3d3b674..7692e5d 100755 --- a/exporter-scripts/scheduled-script.sh +++ b/exporter-scripts/scheduled-script.sh @@ -23,11 +23,15 @@ SNAKEMAKE_CORES="${SNAKEMAKE_CORES:-1}" echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file" # For telling the pipeline not to go all the way SNAKEMAKE_RULE_UNTIL="${SNAKEMAKE_RULE_UNTIL:-all}" +# We default to only matching files from yesterday +ONLY_USE_CSV_FROM_YESTERDAY="${ONLY_USE_CSV_FROM_YESTERDAY:-True}" +# Default will match any date containing numbers +PROCESS_CSV_FROM_DATE="${PROCESS_CSV_FROM_DATE:-'[0-9]'}" set +e snakemake --snakefile /app/src/pipeline/Snakefile \ --cores "$SNAKEMAKE_CORES" \ --until "$SNAKEMAKE_RULE_UNTIL" \ - --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" ONLY_USE_CSV_FROM_YESTERDAY="${ONLY_USE_CSV_FROM_YESTERDAY}" \ + --config CSV_AGE_THRESHOLD_MINUTES="${CSV_AGE_THRESHOLD_MINUTES}" ONLY_USE_CSV_FROM_YESTERDAY="${ONLY_USE_CSV_FROM_YESTERDAY}" PROCESS_CSV_FROM_DATE="${PROCESS_CSV_FROM_DATE}"\ >> "$outer_log_file" 2>&1 ret_code=$? set -e diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 469886f..45ca508 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -33,16 +33,19 @@ from utils import determine_eventual_outputs CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MINUTES'])) # To prevent Snakemake picking up old files in production, we can set a flag to to only -# use files with a date prefix matching yesterday's date. +# use files with a date prefix matching yesterday's date. Defaults to True ONLY_USE_CSV_FROM_YESTERDAY = bool(config['ONLY_USE_CSV_FROM_YESTERDAY']) +# We can optionally select a single date to process or a regular expression to match multiple dates. +PROCESS_CSV_FROM_DATE = str(config['PROCESS_CSV_FROM_DATE']) + # The problem here is that snakemake works "backwards" in that you have to define which # files you want to be produced, yet we don't know what output files we need until the # input files (original CSVs) are present. # Therefore, look at the input files and work out the eventual output files so they can # be fed into snakemake. -all_outputs, hash_to_csn = determine_eventual_outputs(CSV_AGE_THRESHOLD_MINUTES, ONLY_USE_CSV_FROM_YESTERDAY) +all_outputs, hash_to_csn = determine_eventual_outputs(CSV_AGE_THRESHOLD_MINUTES, ONLY_USE_CSV_FROM_YESTERDAY, PROCESS_CSV_FROM_DATE) ALL_FTPS_UPLOADED = [ao.get_ftps_uploaded_file() for ao in all_outputs] ALL_DAILY_HASH_LOOKUPS = sorted({ao.get_daily_hash_lookup() for ao in all_outputs}) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index 449d2a6..e255292 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -1,6 +1,7 @@ import time from datetime import datetime, timedelta, timezone from pathlib import Path +import re from snakemake.io import glob_wildcards @@ -70,10 +71,14 @@ def get_file_age(file_path: Path) -> timedelta: return now_utc - file_time_utc -def determine_eventual_outputs(csv_wait_time: timedelta, process_yesterday_only: bool): +def determine_eventual_outputs( + csv_wait_time: timedelta, process_only_yesterday: bool, process_datestring: str +): """ :param timedelta: only process files older than this - :param process_one_day_only: if true we only allow files from yesterday + :param process_only_yesterday: if false we process all dates, true only from yesterday + :param process_datestring: a regular expression to match datestrings. Has no effect if + process_only_yesterday is true :returns: A list of InputCsvFile and a dictionary containing the hash and csn values. """ # Discover all CSVs using the basic file name pattern @@ -85,6 +90,12 @@ def determine_eventual_outputs(csv_wait_time: timedelta, process_yesterday_only: # Build reverse lookup using named wildcards _hash_to_csn: dict[str, str] = {} + + if process_only_yesterday: + process_datestring = datetime.strftime( + datetime.now() - timedelta(1), "%Y-%m-%d" + ) + for csn in all_wc.csn: _hash_to_csn[hash_csn(csn)] = csn # Apply all_wc to FILE_STEM_PATTERN_HASHED to generate the output stems @@ -94,11 +105,9 @@ def determine_eventual_outputs(csv_wait_time: timedelta, process_yesterday_only: ): input_file_obj = InputCsvFile(date, csn, variable_id, channel_id, units) orig_file = input_file_obj.get_original_csv_path() - if process_yesterday_only: - yesterday = datetime.now() - timedelta(1) - if date != datetime.strftime(yesterday, "%Y-%m-%d"): - print(f"Skipping file not from yesterday {orig_file}") - continue + if re.search(process_datestring, date) is None: + print(f"Skipping file not from {process_datestring} {orig_file}") + continue if csn == "unmatched_csn": print(f"Skipping file with unmatched CSN: {orig_file}") continue diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index 7511b02..4bb261e 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -31,7 +31,7 @@ def _make_test_input_csv(tmp_path): ) ) - # yesterday + # from yesterday files.append( TestFileDescription( datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"), @@ -47,7 +47,7 @@ def _make_test_input_csv(tmp_path): 4, ) ) - # two days ago, first CSN again + # two files from 2 days ago files.append( TestFileDescription( datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d"), @@ -62,6 +62,20 @@ def _make_test_input_csv(tmp_path): 5, ) ) + files.append( + TestFileDescription( + datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d"), + 1735801965.0, + "SECRET_CSN_1234", + "SECRET_MRN_12345", + "SECRET_LOCATION_123", + "27", + "14", + 50, + "uV", + 4, + ) + ) for t in files: original_csv_dir = tmp_path / "waveform-export/original-csv" @@ -83,7 +97,24 @@ def test_determine_eventual_outputs(tmp_path: Path, monkeypatch): ) monkeypatch.setattr("src.pipeline.utils.hash_csn", mock_do_hash) csv_wait_time = timedelta(0) - files, hash_to_csn = utils.determine_eventual_outputs(csv_wait_time, True) + # with process only yesterday true we should return only the single file from yesterday + process_only_yesterday = True + process_datestring = "" + files, hash_to_csn = utils.determine_eventual_outputs( + csv_wait_time, process_only_yesterday, process_datestring + ) assert len(files) == 1 - files, hash_to_csn = utils.determine_eventual_outputs(csv_wait_time, False) - assert len(files) == 3 + # with process only yesterday false and an empty process_datestring we should return all 4 files. + process_only_yesterday = False + process_datestring = "" + files, hash_to_csn = utils.determine_eventual_outputs( + csv_wait_time, process_only_yesterday, process_datestring + ) + assert len(files) == 4 + # with process only yesterday false and process_datestring set to two days ago we should return the two files from two days ago + process_only_yesterday = False + process_datestring = datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d") + files, hash_to_csn = utils.determine_eventual_outputs( + csv_wait_time, process_only_yesterday, process_datestring + ) + assert len(files) == 2 From d0257cb0ee70307810f9093e8670ad19203ae09b Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 12 Feb 2026 16:52:02 +0000 Subject: [PATCH 12/19] Remove stray arguments --- tests/snakemake_tests.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/snakemake_tests.py b/tests/snakemake_tests.py index 4bb261e..ae9f20e 100644 --- a/tests/snakemake_tests.py +++ b/tests/snakemake_tests.py @@ -18,7 +18,6 @@ def _make_test_input_csv(tmp_path): files.append( TestFileDescription( datetime.strftime(datetime.now(), "%Y-%m-%d"), - "2025-01-01", 1735740783.0, "SECRET_CSN_1235", "SECRET_MRN_12346", @@ -35,7 +34,6 @@ def _make_test_input_csv(tmp_path): files.append( TestFileDescription( datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"), - "2025-01-01", 1735740783.0, "SECRET_CSN_1235", "SECRET_MRN_12346", From f2f7a162184312e082cb2d0d5daeb36b557264f1 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 12 Feb 2026 17:41:59 +0000 Subject: [PATCH 13/19] Fix conversion of config booleans --- config.EXAMPLE/exporter.env.EXAMPLE | 6 +++--- src/pipeline/Snakefile | 4 ++-- src/pipeline/utils.py | 10 ++++++++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/config.EXAMPLE/exporter.env.EXAMPLE b/config.EXAMPLE/exporter.env.EXAMPLE index 302c079..39a4119 100644 --- a/config.EXAMPLE/exporter.env.EXAMPLE +++ b/config.EXAMPLE/exporter.env.EXAMPLE @@ -24,11 +24,11 @@ INSTANCE_NAME= CSV_AGE_THRESHOLD_MINUTES=180 # For production use only process CSV files that have a date matching to -# yesterday. Setting this to False will process all relevant csv, which may +# yesterday. Setting this to FALSE will process all relevant csv, which may # be useful for development or restarting a stopped service. -ONLY_USE_CSV_FROM_YESTERDAY=True +ONLY_USE_CSV_FROM_YESTERDAY=TRUE -# if ONLY_USE_CSV_FROM_YESTERDAY set False, you can optionally +# if ONLY_USE_CSV_FROM_YESTERDAY set to FALSE, you can optionally # specify a date to process format YYYY-MM-DD also accepts a regular # expression to match multiple date PROCESS_CSV_FROM_DATE= diff --git a/src/pipeline/Snakefile b/src/pipeline/Snakefile index 45ca508..ae0bf6b 100644 --- a/src/pipeline/Snakefile +++ b/src/pipeline/Snakefile @@ -19,7 +19,7 @@ from locations import ( ) from pseudon.pseudon import csv_to_parquets -from utils import determine_eventual_outputs +from utils import config_bool, determine_eventual_outputs # How long before we assume that no more data will be written to the file, and @@ -34,7 +34,7 @@ CSV_AGE_THRESHOLD_MINUTES = timedelta(minutes=float(config['CSV_AGE_THRESHOLD_MI # To prevent Snakemake picking up old files in production, we can set a flag to to only # use files with a date prefix matching yesterday's date. Defaults to True -ONLY_USE_CSV_FROM_YESTERDAY = bool(config['ONLY_USE_CSV_FROM_YESTERDAY']) +ONLY_USE_CSV_FROM_YESTERDAY = config_bool(config['ONLY_USE_CSV_FROM_YESTERDAY']) # We can optionally select a single date to process or a regular expression to match multiple dates. PROCESS_CSV_FROM_DATE = str(config['PROCESS_CSV_FROM_DATE']) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index e255292..ee37ac5 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -17,6 +17,16 @@ ) +def config_bool(value): + """Convert a config value from env/CLI to a bool.""" + s = str(value).strip().lower() + if s in {"", "0", "false"}: + return False + if s in {"1", "true"}: + return True + raise ValueError(f'Can\'t interpret value "{value}" as a boolean') + + def hash_csn(csn: str) -> str: return do_hash("csn", csn) From a74a57df78766c19b1c527d8016c68c5da1fc6e6 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 12 Feb 2026 17:45:23 +0000 Subject: [PATCH 14/19] Rename test module so pytest can see it --- tests/{snakemake_tests.py => test_snakemake.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{snakemake_tests.py => test_snakemake.py} (100%) diff --git a/tests/snakemake_tests.py b/tests/test_snakemake.py similarity index 100% rename from tests/snakemake_tests.py rename to tests/test_snakemake.py From 1afdeccf71c160d6b1154337cc1756bb953bca7a Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 12 Feb 2026 18:48:34 +0000 Subject: [PATCH 15/19] Use UTC when working out today's date --- src/pipeline/utils.py | 8 ++++---- tests/test_snakemake.py | 11 ++++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/pipeline/utils.py b/src/pipeline/utils.py index ee37ac5..6bf58b4 100644 --- a/src/pipeline/utils.py +++ b/src/pipeline/utils.py @@ -85,7 +85,7 @@ def determine_eventual_outputs( csv_wait_time: timedelta, process_only_yesterday: bool, process_datestring: str ): """ - :param timedelta: only process files older than this + :param csv_wait_time: only process files older than this :param process_only_yesterday: if false we process all dates, true only from yesterday :param process_datestring: a regular expression to match datestrings. Has no effect if process_only_yesterday is true @@ -102,9 +102,9 @@ def determine_eventual_outputs( _hash_to_csn: dict[str, str] = {} if process_only_yesterday: - process_datestring = datetime.strftime( - datetime.now() - timedelta(1), "%Y-%m-%d" - ) + process_datestring = ( + datetime.now(tz=timezone.utc).date() - timedelta(days=1) + ).isoformat() for csn in all_wc.csn: _hash_to_csn[hash_csn(csn)] = csn diff --git a/tests/test_snakemake.py b/tests/test_snakemake.py index ae9f20e..e9ef50a 100644 --- a/tests/test_snakemake.py +++ b/tests/test_snakemake.py @@ -1,4 +1,4 @@ -from datetime import timedelta, datetime +from datetime import timedelta, datetime, timezone from pathlib import Path import locations @@ -15,9 +15,10 @@ def mock_do_hash(csn: str): def _make_test_input_csv(tmp_path): files = [] # today + today = datetime.now(tz=timezone.utc).date() files.append( TestFileDescription( - datetime.strftime(datetime.now(), "%Y-%m-%d"), + today.isoformat(), 1735740783.0, "SECRET_CSN_1235", "SECRET_MRN_12346", @@ -33,7 +34,7 @@ def _make_test_input_csv(tmp_path): # from yesterday files.append( TestFileDescription( - datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"), + (today - timedelta(days=1)).isoformat(), 1735740783.0, "SECRET_CSN_1235", "SECRET_MRN_12346", @@ -48,7 +49,7 @@ def _make_test_input_csv(tmp_path): # two files from 2 days ago files.append( TestFileDescription( - datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d"), + (today - timedelta(days=2)).isoformat(), 1735801965.0, "SECRET_CSN_1234", "SECRET_MRN_12345", @@ -62,7 +63,7 @@ def _make_test_input_csv(tmp_path): ) files.append( TestFileDescription( - datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d"), + (today - timedelta(days=2)).isoformat(), 1735801965.0, "SECRET_CSN_1234", "SECRET_MRN_12345", From b569b6dcead219cef070e7a0464ec068603fad4a Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 12 Feb 2026 19:26:49 +0000 Subject: [PATCH 16/19] Reduce some repetition and tautology --- tests/test_snakemake.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_snakemake.py b/tests/test_snakemake.py index e9ef50a..59899a4 100644 --- a/tests/test_snakemake.py +++ b/tests/test_snakemake.py @@ -12,7 +12,7 @@ def mock_do_hash(csn: str): return "no-hash" -def _make_test_input_csv(tmp_path): +def _make_test_input_csv(csv_dir: Path): files = [] # today today = datetime.now(tz=timezone.utc).date() @@ -77,22 +77,22 @@ def _make_test_input_csv(tmp_path): ) for t in files: - original_csv_dir = tmp_path / "waveform-export/original-csv" - csv_path = original_csv_dir / t.get_orig_csv() + csv_path = csv_dir / t.get_orig_csv() csv_path.parent.mkdir(parents=True, exist_ok=True) with open(csv_path, "w") as f: # for this test doesn't use the contents of the files, only their # filenames, so we'll save some processing by creating empty files. f.write("") + return [f.get_orig_csv() for f in files] def test_determine_eventual_outputs(tmp_path: Path, monkeypatch): - _make_test_input_csv(tmp_path) original_csv_dir = tmp_path / locations.WAVEFORM_ORIGINAL_CSV.relative_to("/") + expected_paths = _make_test_input_csv(original_csv_dir) monkeypatch.setattr( utils, "CSV_PATTERN", - original_csv_dir / "{date}/{date}.{csn}.{variable_id}.{channel_id}.{units}.csv", + original_csv_dir / (locations.FILE_STEM_PATTERN + ".csv") ) monkeypatch.setattr("src.pipeline.utils.hash_csn", mock_do_hash) csv_wait_time = timedelta(0) From b1baf118097bc1a6959458e7697d22daa3a80a3c Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 12 Feb 2026 19:45:00 +0000 Subject: [PATCH 17/19] Parameterize test and check the right files are being returned --- tests/test_snakemake.py | 51 +++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/tests/test_snakemake.py b/tests/test_snakemake.py index 59899a4..c9ab5ea 100644 --- a/tests/test_snakemake.py +++ b/tests/test_snakemake.py @@ -1,6 +1,8 @@ from datetime import timedelta, datetime, timezone from pathlib import Path +import pytest + import locations import src.pipeline.utils as utils @@ -86,34 +88,39 @@ def _make_test_input_csv(csv_dir: Path): return [f.get_orig_csv() for f in files] -def test_determine_eventual_outputs(tmp_path: Path, monkeypatch): +@pytest.mark.parametrize( + "process_only_yesterday, process_datestring, expected_file_indexes", + [ + # with process only yesterday true we should return only the single file from yesterday + (True, "", [1]), + # with process only yesterday false and an empty process_datestring we should return all 4 files. + (False, "", [0, 1, 2, 3]), + # with process only yesterday false and process_datestring set to two days ago we should return the two files from two days ago + ( + False, + (datetime.now(tz=timezone.utc).date() - timedelta(days=2)).isoformat(), + [2, 3], + ), + ], +) +def test_determine_eventual_outputs( + tmp_path: Path, + monkeypatch, + process_only_yesterday, + process_datestring, + expected_file_indexes, +): original_csv_dir = tmp_path / locations.WAVEFORM_ORIGINAL_CSV.relative_to("/") expected_paths = _make_test_input_csv(original_csv_dir) monkeypatch.setattr( - utils, - "CSV_PATTERN", - original_csv_dir / (locations.FILE_STEM_PATTERN + ".csv") + utils, "CSV_PATTERN", original_csv_dir / (locations.FILE_STEM_PATTERN + ".csv") ) monkeypatch.setattr("src.pipeline.utils.hash_csn", mock_do_hash) csv_wait_time = timedelta(0) - # with process only yesterday true we should return only the single file from yesterday - process_only_yesterday = True - process_datestring = "" - files, hash_to_csn = utils.determine_eventual_outputs( - csv_wait_time, process_only_yesterday, process_datestring - ) - assert len(files) == 1 - # with process only yesterday false and an empty process_datestring we should return all 4 files. - process_only_yesterday = False - process_datestring = "" - files, hash_to_csn = utils.determine_eventual_outputs( - csv_wait_time, process_only_yesterday, process_datestring - ) - assert len(files) == 4 - # with process only yesterday false and process_datestring set to two days ago we should return the two files from two days ago - process_only_yesterday = False - process_datestring = datetime.strftime(datetime.now() - timedelta(2), "%Y-%m-%d") + files, hash_to_csn = utils.determine_eventual_outputs( csv_wait_time, process_only_yesterday, process_datestring ) - assert len(files) == 2 + assert { + str(f.get_original_csv_path().relative_to(original_csv_dir)) for f in files + } == {expected_paths[ex_i] for ex_i in expected_file_indexes} From 9c97ccbb399f9832a7c352db77c25463a5b80871 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 13 Feb 2026 12:24:03 +0000 Subject: [PATCH 18/19] Verify yesterday mode overrides explicit date --- tests/test_snakemake.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_snakemake.py b/tests/test_snakemake.py index c9ab5ea..c32b07d 100644 --- a/tests/test_snakemake.py +++ b/tests/test_snakemake.py @@ -93,6 +93,12 @@ def _make_test_input_csv(csv_dir: Path): [ # with process only yesterday true we should return only the single file from yesterday (True, "", [1]), + # process only yesterday overrides process_datestring + ( + True, + (datetime.now(tz=timezone.utc).date() - timedelta(days=2)).isoformat(), + [1], + ), # with process only yesterday false and an empty process_datestring we should return all 4 files. (False, "", [0, 1, 2, 3]), # with process only yesterday false and process_datestring set to two days ago we should return the two files from two days ago From ac0c24bfbcedeb5090638eb2b38d15fb861b0890 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 13 Feb 2026 13:11:33 +0000 Subject: [PATCH 19/19] Set new env var explicitly in test config so that values you have on your dev machine won't intermittently break the tests --- tests/test_snakemake_integration.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_snakemake_integration.py b/tests/test_snakemake_integration.py index d59fffb..50d7b9c 100644 --- a/tests/test_snakemake_integration.py +++ b/tests/test_snakemake_integration.py @@ -260,6 +260,9 @@ def _run_snakemake(tmp_path): # It works around this in normal use by reading env vars only from the bind-mounted exporter.env file. # So to use a different config during test, we must override that file with a special version # that we create here. + # Note that since we bypass cron here, any config on your dev machine will be picked up and then + # overwritten by what is below. Therefore, you should explicitly set values below even if you only + # want the default value to be used. tmp_exporter_env_path = tmp_path / "config/exporter.env" tmp_exporter_env_path.parent.mkdir(exist_ok=True) tmp_exporter_env_path.write_text( @@ -268,6 +271,7 @@ def _run_snakemake(tmp_path): "INSTANCE_NAME=pytest\n" "CSV_AGE_THRESHOLD_MINUTES=5\n" "ONLY_USE_CSV_FROM_YESTERDAY=False\n" + "PROCESS_CSV_FROM_DATE=\n" ) # run system under test (exporter container) in foreground compose_args = [