Skip to content

Commit 2c1f2b9

Browse files
feat: NHS record extractor script (#1804)
* add nhs record extractor script * change nhs number example and adding logging * md linting correct
1 parent ec67c21 commit 2c1f2b9

File tree

3 files changed

+301
-0
lines changed

3 files changed

+301
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# NHS Record Extractor
2+
3+
A Python script that finds the most recent records for a list of NHS numbers from parquet files in the current directory and saves them to a new parquet file. The most recent record is determined by the date in the filename.
4+
5+
## Requirements
6+
7+
Install the required dependencies:
8+
9+
```sh
10+
pip install -r requirements.txt
11+
```
12+
13+
## Usage
14+
15+
### Using example NHS numbers
16+
17+
```sh
18+
python nhs_record_extractor.py
19+
```
20+
21+
### Using NHS numbers from a file
22+
23+
```sh
24+
python nhs_record_extractor.py nhs_numbers.txt
25+
```
26+
27+
### Specifying an output file
28+
29+
```sh
30+
python nhs_record_extractor.py nhs_numbers.txt output_file.parquet
31+
```
32+
33+
## Input Format
34+
35+
The input file should contain one NHS number per line.
36+
37+
## How It Works
38+
39+
1. The script scans the current directory for all .parquet files
40+
2. It extracts the date from each filename (everything before the first underscore)
41+
- Example: From '20251027100135103118_BEFB67_-_CAAS_BREAST_SCREENING_COHORT.parquet', it extracts '20251027100135103118'
42+
3. It reads each parquet file and looks for records matching the provided NHS numbers
43+
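4. For each NHS number, it keeps track of the most recent record based on the date extracted from the filename (date prefixes are compared as plain strings, so all filenames should use the same fixed-width date format)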
44+
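5. Finally, it saves all the most recent records to a new parquet file (`most_recent_records.parquet` by default), reusing the schema of the first usable source file and falling back to an inferred schema if that schema cannot be applied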
45+
46+
## Expected File Format
47+
48+
### Parquet Files
49+
50+
The script expects parquet files to have at least this column:
51+
52+
- `nhs_number`: The NHS number
53+
54+
### Filenames
55+
56+
The script expects parquet filenames to follow this pattern:
57+
58+
- `DATE_other_information.parquet` where DATE is the date received
59+
- Example: '20251027100135103118_BEFB67_-_CAAS_BREAST_SCREENING_COHORT.parquet'
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
#!/usr/bin/env python3
2+
"""
3+
NHS Record Extractor
4+
5+
This script finds the most recent records for a list of NHS numbers
6+
from parquet files in the current directory and saves them to a new parquet file.
7+
The most recent record is determined by the date in the filename.
8+
"""
9+
10+
from pandas.core.frame import DataFrame
11+
12+
13+
from typing import Any
14+
15+
16+
import os
17+
import sys
18+
import glob
19+
import pandas as pd
20+
import pyarrow
21+
import pyarrow.parquet as pq
22+
from datetime import datetime
23+
import re
24+
25+
def extract_date_from_filename(filename):
    """
    Return the date prefix of a parquet filename.

    The prefix is the part of the base name (directory components are
    ignored) that precedes the first underscore.
    Example: '20251027100135103118_BEFB67_-_CAAS_BREAST_SCREENING_COHORT.parquet'
    yields '20251027100135103118'.

    Args:
        filename (str): File name or path to inspect

    Returns:
        str: The text before the first underscore, or an empty string when
            the base name has no underscore or starts with one
    """
    # Directory names may contain underscores themselves, so drop them first
    name_only = os.path.basename(filename)

    prefix, underscore, _rest = name_only.partition('_')

    # A usable prefix needs a terminating underscore and at least one character
    if underscore and prefix:
        return prefix
    return ""
49+
50+
def find_parquet_files(directory='.'):
    """
    List the parquet files located directly inside a directory.

    Args:
        directory (str): Directory to search; defaults to the current directory

    Returns:
        list: Paths matching '<directory>/*.parquet', in the order glob yields them
    """
    pattern = os.path.join(directory, '*.parquet')
    matches = glob.glob(pattern)
    return matches
61+
62+
def read_nhs_numbers(file_path=None):
    """
    Read NHS numbers from a text file, or fall back to built-in example numbers.

    Args:
        file_path (str, optional): Path to a text file holding one NHS number
            per line. When omitted (or empty), the hard-coded example numbers
            are returned instead.

    Returns:
        list: NHS numbers as strings, in file order, with surrounding
            whitespace removed and blank lines dropped

    Raises:
        SystemExit: With exit status 1 when ``file_path`` does not exist
    """
    if not file_path:
        # Example NHS numbers if no file is provided
        print('using default nhs numbers from code not from the file.')
        return ['9999987109', '1234567890']

    try:
        # 'utf-8-sig' reads plain ASCII/UTF-8 and also drops a leading
        # byte-order mark. Without it the BOM stays glued to the first NHS
        # number (str.strip() does not remove it) and that number never matches.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            stripped_lines = (line.strip() for line in file)
            return [number for number in stripped_lines if number]
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        sys.exit(1)
84+
85+
def find_most_recent_records(nhs_numbers, parquet_files) -> tuple[DataFrame, Any | None, dict[Any, Any]]:
86+
"""
87+
Find the most recent record for each NHS number from the parquet files.
88+
The most recent record is determined by the date in the filename.
89+
90+
Args:
91+
nhs_numbers (list): List of NHS numbers to search for
92+
parquet_files (list): List of parquet file paths
93+
94+
Returns:
95+
tuple: (DataFrame of most recent records, schema of the parquet files)
96+
"""
97+
# Convert NHS numbers to a set for faster lookup
98+
nhs_set = set(nhs_numbers)
99+
100+
# Dictionary to store the most recent record for each NHS number
101+
most_recent_records = {}
102+
103+
# Store the schema from the first valid parquet file
104+
schema = None
105+
106+
# Process each parquet file
107+
for file_path in parquet_files:
108+
print(f"Processing file: {file_path}")
109+
110+
# Extract date from filename
111+
file_date_str = extract_date_from_filename(file_path)
112+
113+
if not file_date_str:
114+
print(f"Warning: Could not extract date from filename {file_path}. Skipping.")
115+
continue
116+
117+
try:
118+
# Read the parquet file
119+
df = pd.read_parquet(file_path)
120+
121+
# Store the schema from the first valid parquet file
122+
if schema is None:
123+
# Get the schema from the parquet file
124+
parquet_file = pq.ParquetFile(file_path)
125+
# Convert ParquetSchema to pyarrow.lib.Schema
126+
schema = parquet_file.schema_arrow
127+
128+
# Check if the dataframe has the necessary column
129+
if 'nhs_number' not in df.columns:
130+
print(f"Warning: File {file_path} does not have an 'nhs_number' column. Skipping.")
131+
continue
132+
133+
# Filter for records with matching NHS numbers
134+
matching_records = df[df['nhs_number'].astype(str).isin(list(nhs_set))]
135+
136+
# Process each matching record
137+
for _, record in matching_records.iterrows():
138+
nhs = str(record['nhs_number'])
139+
140+
# Check if this is the most recent record for this NHS number
141+
# based on the file date
142+
if nhs not in most_recent_records or file_date_str > most_recent_records[nhs]['file_date']:
143+
most_recent_records[nhs] = {
144+
'record': record,
145+
'file_date': file_date_str,
146+
'file_path': file_path
147+
}
148+
149+
except Exception as e:
150+
print(f"Error processing file {file_path}: {e}")
151+
152+
# Create a DataFrame from the most recent records
153+
if most_recent_records:
154+
records_list = [record_data['record'] for record_data in most_recent_records.values()]
155+
result_df = pd.DataFrame(records_list)
156+
return result_df, schema, most_recent_records
157+
else:
158+
return pd.DataFrame(), schema, {}
159+
160+
def save_to_parquet(df, schema, output_file='most_recent_records.parquet'):
    """
    Write a DataFrame to a parquet file, preferring the supplied schema.

    If the supplied schema cannot be applied to the data, a warning is printed
    and the schema is inferred from the DataFrame instead.

    Args:
        df (DataFrame): Records to write
        schema: Arrow schema to apply, or None to let pyarrow infer one
        output_file (str): Destination path of the parquet file

    Returns:
        bool: True when the file was written; False when ``df`` is empty or
            writing failed (the reason is printed)
    """
    if df.empty:
        print("No records to save.")
        return False

    try:
        arrow_table = None

        if schema is not None:
            try:
                # Try to use the provided schema
                arrow_table = pyarrow.Table.from_pandas(df, schema=schema)
            except Exception as schema_error:
                print(f"Warning: Could not use provided schema: {schema_error}")
                print("Falling back to inferred schema.")

        # Reached when no schema was given or the given one was rejected
        if arrow_table is None:
            arrow_table = pyarrow.Table.from_pandas(df)

        pq.write_table(arrow_table, output_file)
        print(f"Saved {len(df)} records to {output_file}")
    except Exception as e:
        print(f"Error saving to parquet file: {e}")
        return False

    return True
193+
194+
def main():
    """
    Command-line entry point: extract the most recent record per NHS number.

    Command-line arguments (both optional, positional):
        1. Path to a text file with one NHS number per line; the example
           numbers are used when it is omitted.
        2. Output parquet path; defaults to 'most_recent_records.parquet'.

    Exits with status 1 when no input parquet files are found in the current
    directory or when the results could not be saved.
    """
    # Check if a file path was provided as a command-line argument
    if len(sys.argv) > 1:
        file_path = sys.argv[1]
        print(f"Reading NHS numbers from {file_path}...")
        nhs_numbers = read_nhs_numbers(file_path)
    else:
        nhs_numbers = read_nhs_numbers()
        print(f"Using example NHS numbers: {nhs_numbers}")

    # Resolve the output name up front so it can be excluded from the inputs
    output_file = sys.argv[2] if len(sys.argv) > 2 else 'most_recent_records.parquet'
    output_key = os.path.normcase(os.path.abspath(output_file))

    # The output of a previous run may sit in the scanned directory. Its
    # filename prefix (e.g. 'most') sorts after every numeric date prefix, so
    # it would otherwise be treated as the newest source and override real data.
    parquet_files = [
        path for path in find_parquet_files()
        if os.path.normcase(os.path.abspath(path)) != output_key
    ]
    print(f"Found {len(parquet_files)} parquet files.")

    if not parquet_files:
        print("No parquet files found in the current directory.")
        sys.exit(1)

    # Find the most recent records
    result_df, schema, most_recent_records = find_most_recent_records(nhs_numbers, parquet_files)

    if result_df.empty:
        print("No matching records found for the provided NHS numbers.")
        return

    print(f"\nFound most recent records for {len(result_df)} NHS numbers.")

    # Print source files for each NHS number
    print("\nSource files for each NHS number:")
    for nhs, data in most_recent_records.items():
        print(f"NHS {nhs}: {data['file_path']} (Date: {data['file_date']})")

    # Report a failed save through the exit status instead of ignoring it
    if not save_to_parquet(result_df, schema, output_file):
        sys.exit(1)
236+
237+
if __name__ == "__main__":
238+
main()
239+
240+
# Made with Bob
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pandas>=1.3.0
2+
pyarrow>=5.0.0

0 commit comments

Comments
 (0)