Made the notebook use a pandas pipeline and updated HES files location
SamHollings authored May 28, 2024
commit b19645b1117597b31ab535c2af9cc1671496c325
2 changes: 1 addition & 1 deletion config.toml
@@ -1,6 +1,6 @@
project_name = "example_pipeline_pyspark_version"

data_url = "https://s3.eu-west-2.amazonaws.com/files.digital.nhs.uk/assets/Services/Artificial+data/Artificial+HES+final/artificial_hes_ae_202302_v1_sample.zip"
data_url = "https://files.digital.nhs.uk/assets/Services/Artificial%20data/Artificial%20HES%20final/artificial_hes_ae_202302_v1_sample.zip"
path_to_downloaded_data = "data_in/artificial_hes_ae_202302_v1_sample.zip/artificial_hes_ae_202302_v1_sample/artificial_hes_ae_2122.csv"

# Here we describe where the output and logs are saved, change as necessary
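(As an aside: a minimal sketch of how the values in this config.toml can be read with the toml package listed in requirements.txt. The project's own file_paths.get_config() helper, used in the notebook below, is the canonical route and may differ in detail.)

import toml

# Read the project parameters from config.toml into a plain dict
config = toml.load("config.toml")

# The two values touched or used by this commit
print(config["data_url"])                  # source URL for the artificial HES A&E sample zip
print(config["path_to_downloaded_data"])   # expected location of the extracted CSV
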
143 changes: 100 additions & 43 deletions rap_example_pipeline_python.ipynb
@@ -39,6 +39,13 @@
"Note that in the src folder, each of these phases has its own folder, to neatly organise the code used for each one."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -63,49 +70,33 @@
"logger = logging.getLogger(__name__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Config"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def main():\n",
" \n",
" # load config, here we load our project's parameters from the config.toml file\n",
" config = file_paths.get_config() \n",
"\n",
" # configure logging\n",
" logging_config.configure_logging(config['log_dir'])\n",
" logger.info(f\"Configured logging with log folder: {config['log_dir']}.\")\n",
" logger.info(f\"Logging the config settings:\\n\\n\\t{config}\\n\")\n",
" logger.info(f\"Starting run at:\\t{datetime.now().time()}\")\n",
"\n",
" # get artificial HES data as CSV\n",
" get_data.download_zip_from_url(config['data_url'], overwrite=True)\n",
" logger.info(f\"Downloaded artificial hes as zip.\")\n",
"\n",
" # create spark session\n",
" spark = spark_utils.create_spark_session(config['project_name'])\n",
" logger.info(f\"created spark session with app name: {config['project_name']}\")\n",
"\n",
" # Loading data from CSV as spark data frame\n",
" df_hes_data = reading_data.load_csv_into_spark_data_frame(spark, config['path_to_downloaded_data'])\n",
"\n",
" # Creating dictionary to hold outputs\n",
" outputs = {}\n",
"\n",
" # Count number of episodes in England - place this in the outputs dictionary\n",
" outputs[\"df_hes_england_count\"] = aggregate_counts.get_distinct_count(df_hes_data, 'epikey', 'number_of_episodes')\n",
"\n",
" # Rename and save spark dataframes as CSVs:\n",
" for output_name, output in outputs.items():\n",
" write_csv.save_spark_dataframe_as_csv(output, output_name)\n",
" logger.info(f\"saved output df {output_name} as csv\")\n",
" write_csv.rename_csv_output(output_name)\n",
" logger.info(f\"renamed {output_name} file\")\n",
" \n",
" # stop the spark session\n",
" spark.stop()"
"config = file_paths.get_config() \n",
"\n",
"# configure logging\n",
"logging_config.configure_logging(config['log_dir'])\n",
"logger.info(f\"Configured logging with log folder: {config['log_dir']}.\")\n",
"logger.info(f\"Logging the config settings:\\n\\n\\t{config}\\n\")\n",
"logger.info(f\"Starting run at:\\t{datetime.now().time()}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load Data"
]
},
{
@@ -114,17 +105,83 @@
"metadata": {},
"outputs": [],
"source": [
" print(f\"Running create_publication script\")\n",
" start_time = timeit.default_timer()\n",
" main()\n",
" total_time = timeit.default_timer() - start_time\n",
" logger.info(f\"Running time of create_publication script: {int(total_time / 60)} minutes and {round(total_time%60)} seconds.\\n\")"
"# get artificial HES data as CSV\n",
"get_data.download_zip_from_url(config['data_url'], overwrite=True)\n",
"logger.info(f\"Downloaded artificial hes as zip.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"df_hes_data = pd.read_csv(config['path_to_downloaded_data'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_distinct_count(df: pd.DataFrame, col_to_aggregate: str) -> int:\n",
" \"\"\"Returns the number of distinct values in a column of a pandas DataFrame.\"\"\"\n",
" return df[col_to_aggregate].nunique()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"# Create a dictionary to hold outputs\n",
"outputs = {}\n",
"\n",
"# Count number of episodes in England - place this in the outputs dictionary\n",
"outputs[\"df_hes_england_count\"] = get_distinct_count(df_hes_data, 'EPIKEY')\n",
"\n",
"# Save each output as a CSV in its own folder under data_out:\n",
"for output_name, output in outputs.items():\n",
"\n",
"    # Wrap the integer count in a single-row DataFrame so it can be written to CSV\n",
"    df_output = pd.DataFrame({'england_count': [output]})\n",
"\n",
"    # Prep the filepath and ensure the directory exists\n",
"    output_dir = Path(f'data_out/{output_name}')\n",
"    output_dir.mkdir(parents=True, exist_ok=True)\n",
"    output_filename = output_dir / f'{output_name}.csv'\n",
"\n",
"    # Save the DataFrame to a CSV file\n",
"    df_output.to_csv(output_filename, index=False)\n",
"    logger.info(f\"saved output df to {output_filename}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python"
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
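(For reference, a self-contained sketch of how the new pandas-based get_distinct_count helper behaves. The toy EPIKEY values below are invented purely for illustration.)

import pandas as pd

def get_distinct_count(df: pd.DataFrame, col_to_aggregate: str) -> int:
    """Returns the number of distinct values in a column of a pandas DataFrame."""
    return df[col_to_aggregate].nunique()

# Three rows but only two distinct episode keys
df_toy = pd.DataFrame({"EPIKEY": [1001, 1001, 1002]})
print(get_distinct_count(df_toy, "EPIKEY"))  # prints 2
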
18 changes: 10 additions & 8 deletions requirements.txt
@@ -2,30 +2,32 @@
# See https://nhsd-git.digital.nhs.uk/data-services/analytics-service/iuod/rap-community-of-practice/-/blob/master/python/project-structure-and-packaging.md

# PySpark
pyspark==3.2.1
pyspark

# Requests - used to collect data
requests

# Python version = 3.10.*

# Data manipulation
numpy==1.21.5
pandas==1.3.5
numpy
pandas

# SQL connections
pyodbc==4.0.35
sqlalchemy==1.4.46
pyodbc
sqlalchemy

# Excel output
#openpyxl==3.0.9

# Testing
pytest==6.2.5
pytest-html==3.1.1
pytest
pytest-html

# Dependencies of the above packages
#ipykernel==6.9.0
#nbformat==5.1.3
toml==0.10.2
toml
#pathlib2==2.3.6

jupyter