
Commit 82686bc

Merge pull request #1509 from NREL/sdr_2025_release_1_postprocessing
SDR postprocessing
2 parents bc033b6 + 6a3a823 commit 82686bc

14 files changed (+62173, -1059 lines)

postprocessing/README.md

Lines changed: 11 additions & 3 deletions
````diff
@@ -35,11 +35,19 @@ To install the package, we recommend using `uv` for Python package management.
    uv run --group dev pre-commit install
    ```
 
-# Usage
-
-
 4. Run the scripts as desired
    ```bash
+   # Output the failure log
    cd path/to/postprocessing
    uv run resstockpostproc/get_failures.py <csv_path> --verbose
+
+   # Export metadata and annual results from files on S3
+   uv run resstockpostproc/process_bsb_results.py "s3://res-sdr/testing-sdr-fy25/a_run" "C:/path/to/bsb/output/a_run_output"
+
+   # Export metadata and annual results from local files
+   # (It is faster to download the /baseline and /upgrades directories from S3 once instead of reading from S3 each time)
+   uv run resstockpostproc/process_bsb_results.py "C:/path/to/bsb/output/a_run" "C:/path/to/bsb/output/a_run_output"
+
+   # Export metadata and annual results to OEDI
+   uv run resstockpostproc/process_bsb_results.py "C:/path/to/bsb/output/a_run" "s3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/resstock_amy2018_release_1"
    ```
````
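The same processing can also be driven from Python instead of the command line. A minimal sketch, assuming the `postprocessing` package directory is on the Python path; the paths are the illustrative ones from the commands above, and the `aws_profile_name` keyword comes from the new function signature in `process_bsb_results.py` further down:

```python
from resstockpostproc.process_bsb_results import export_metadata_and_annual_results

# Local raw results in, processed metadata/annual results out (paths are illustrative)
export_metadata_and_annual_results(
    "C:/path/to/bsb/output/a_run",
    "C:/path/to/bsb/output/a_run_output",
    aws_profile_name=None,  # set to a named AWS profile when reading from or writing to S3
)
```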
Lines changed: 1 addition & 6 deletions
```diff
@@ -1,6 +1 @@
-from resstockpostproc.process_metadata import publish_baseline_annual_results, publish_upgrade_annual_results
-
-__all__ = [
-    "publish_baseline_annual_results",
-    "publish_upgrade_annual_results",
-]
+from .process_metadata import process_simulation_outputs
```
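Downstream code that imported the old names at package level needs to switch to the new export. A minimal sketch, assuming this diff is the package's `resstockpostproc/__init__.py` (the file path is not shown in this extract):

```python
# Old package-level imports removed by this PR:
# from resstockpostproc import publish_baseline_annual_results, publish_upgrade_annual_results

# New package-level export (assumes the diff above is resstockpostproc/__init__.py)
from resstockpostproc import process_simulation_outputs
```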
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
```python
import pandas as pd
import pathlib


def data_dictionary(df_sdr):
    """
    Generate data dictionary based on sdr_column_definitions.csv.
    """
    df_sdr_meta = df_sdr[(df_sdr['Publish In Full'] == 'yes') & (df_sdr['Published Annual Name'].notnull())]
    df_sdr_tsagg = df_sdr[(df_sdr['Timeseries Publish In Full'] == 'yes') & (df_sdr['Published Timeseries Name'].notnull())]

    # metadata_and_annual_results column names, units, and description
    df_meta = df_sdr_meta[['Published Annual Name',
                           'Data Type',
                           'Published Annual Unit',
                           'Notes']].rename(columns={
        'Published Annual Name': 'field_name',
        'Data Type': 'data_type',
        'Published Annual Unit': 'units',
        'Notes': 'field_description'
    })
    df_meta.insert(loc=0, column='field_location', value='metadata_and_annual')

    # timeseries_aggregates column names, units, and description
    df_tsagg_sdr = df_sdr_tsagg[['Published Timeseries Name',
                                 'Data Type',
                                 'Published Timeseries Unit',
                                 'Notes']].rename(columns={
        'Published Timeseries Name': 'field_name',
        'Data Type': 'data_type',
        'Published Timeseries Unit': 'units',
        'Notes': 'field_description'
    })
    df_tsagg_sdr.insert(loc=0, column='field_location', value='timeseries_aggregates')

    # Combine metadata_and_annual_results and timeseries_aggregates
    df_data_dict = pd.concat([df_meta, df_tsagg_sdr], ignore_index=True)
    df_data_dict['units'] = df_data_dict['units'].fillna('n/a')

    return df_data_dict


def main():
    here = pathlib.Path(__file__).resolve().parent
    df_sdr = pd.read_csv(here / "resources" / "publication" / "sdr_column_definitions.csv")
    df_data_dict = data_dictionary(df_sdr)
    df_data_dict.to_csv(here / "resources" / "publication" / "data_dictionary.tsv", sep='\t', index=None)


if __name__ == "__main__":
    main()
```
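For orientation, a minimal sketch of how `data_dictionary` consumes a column-definition table. The frame below is fabricated purely for illustration and only mimics the columns the function reads; it assumes `data_dictionary` from the script above is in scope:

```python
import pandas as pd

# Toy stand-in for sdr_column_definitions.csv (all values are illustrative)
df_sdr = pd.DataFrame({
    'Publish In Full': ['yes', 'no'],
    'Published Annual Name': ['out.example.total.energy_consumption', None],
    'Timeseries Publish In Full': ['yes', 'yes'],
    'Published Timeseries Name': ['out.example.total.energy_consumption', 'out.example.other.energy_consumption'],
    'Data Type': ['float', 'float'],
    'Published Annual Unit': ['kwh', None],
    'Published Timeseries Unit': ['kwh', 'kwh'],
    'Notes': ['Example annual and timeseries field', 'Example timeseries-only field'],
})

df_dict = data_dictionary(df_sdr)  # assumes the function defined above is in scope
# Expect one 'metadata_and_annual' row (only row 0 passes the annual filters)
# and two 'timeseries_aggregates' rows (both rows pass the timeseries filters).
print(df_dict[['field_location', 'field_name', 'units']])
```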
Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
```python
import pandas as pd
import pathlib


def enum(df):
    """
    Enumerations for a dataframe.
    """
    df_enum = (
        pd.concat(
            [pd.DataFrame({'metadata_column': col, 'enumeration': df[col].unique()})
             for col in df.columns],
            ignore_index=True
        )
    )

    return df_enum


def enum_dict(df_data_dict, df_bs_csv, df_meta_up, up_files):
    # Format buildstock.csv column names
    df_bs_csv.columns = ['in.' + col.lower().replace(' ', '_') for col in df_bs_csv.columns]
    df_bs_csv = df_bs_csv.drop('in.building', axis=1)
    df_bs_csv = df_bs_csv.rename(columns={
        'in.ashrae_iecc_climate_zone_2004_-_sub-cz_split': 'in.ashrae_iecc_climate_zone_2004_sub_cz_split',
        'in.income_recs2015': 'in.income_recs_2015',
        'in.income_recs2020': 'in.income_recs_2020'
    })

    # Enumerations from buildstock.csv
    df_enum_bs_csv = enum(df_bs_csv)

    df_data_dict_filter = df_data_dict[df_data_dict['field_location'] == 'metadata_and_annual']
    data_dict_columns = df_data_dict_filter['field_name']
    data_dict_columns = [x for x in data_dict_columns if not x.startswith(("out.", "calc.weighted", "bldg_id"))]

    bs_csv_columns = df_bs_csv.columns
    leftover_columns = list(set(data_dict_columns) - set(bs_csv_columns))

    # Enumerations from released data
    df_enum_meta = pd.DataFrame(columns=['metadata_column', 'enumeration'])
    for up in up_files:
        # Do not need the renaming for the released parquet file
        df_meta_up[up] = df_meta_up[up].rename(columns={
            'in.sqft': 'in.sqft..ft2',
            'in.air_leakage_to_outside_ach_50': 'in.air_leakage_to_outside_ach50',
            'upgrade_name': 'in.upgrade_name',
            'in.electric_panel_service_rating': 'in.electric_panel_service_rating..a',
            'in.electric_panel_service_rating_bin': 'in.electric_panel_service_rating_bin..a'
        })
        existing_cols = [c for c in leftover_columns if c in df_meta_up[up].columns]
        df_meta_filter = df_meta_up[up][existing_cols]
        df_meta_filter_enum = enum(df_meta_filter)
        df_enum_meta = pd.concat([df_enum_meta, df_meta_filter_enum]).drop_duplicates(keep='first')

    df_enum_dict = pd.concat([df_enum_bs_csv, df_enum_meta]).drop_duplicates(keep='first')
    df_enum_dict['enumeration'] = df_enum_dict['enumeration'].fillna("None")
    df_enum_dict = df_enum_dict.sort_values(by=['metadata_column', 'enumeration'])

    df_enum_dict_columns = df_enum_dict['metadata_column'].unique().tolist()
    missing_cols = [c for c in data_dict_columns if c not in df_enum_dict_columns]
    print("Missing columns:", missing_cols)

    return df_enum_dict


def main():
    here = pathlib.Path(__file__).resolve().parent
    test_path = here.parent.parent
    df_data_dict = pd.read_csv(here / "resources" / "publication" / "data_dictionary.tsv", sep='\t')
    df_bs_csv = pd.read_csv(test_path / "test" / "base_results" / "baseline" / "annual" / "buildstock.csv")
    df_meta_up = {}
    up_path = (test_path / "test" / "base_results" / "upgrades" / "sdr_annual")
    up_files = [f.name for f in up_path.glob('*.csv')]
    for up in up_files:
        df_meta_up[up] = pd.read_csv(test_path / "test" / "base_results" / "upgrades" / "sdr_annual" / up)

    df_enum_dict = enum_dict(df_data_dict, df_bs_csv, df_meta_up, up_files)
    df_enum_dict.to_csv(here / "resources" / "publication" / "enumeration_dictionary.tsv", sep='\t', index=None)


if __name__ == "__main__":
    main()
```
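A quick sketch of the long format `enum` produces. The two columns and their category values below are made up for illustration, and the snippet assumes `enum` from the script above is in scope:

```python
import pandas as pd

# Illustrative metadata frame (column names and categories are made up)
df = pd.DataFrame({
    'in.vintage': ['<1940', '1940s', '<1940'],
    'in.heating_fuel': ['Natural Gas', 'Electricity', 'Natural Gas'],
})

# enum() emits one (metadata_column, enumeration) row per unique value per column
df_enum = enum(df)
print(df_enum)
#    metadata_column  enumeration     (output shown approximately)
# 0       in.vintage        <1940
# 1       in.vintage        1940s
# 2  in.heating_fuel  Natural Gas
# 3  in.heating_fuel  Electricity
```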

postprocessing/resstockpostproc/income_mapper.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -108,7 +108,7 @@ def assign_representative_income(df: pl.LazyFrame | pl.DataFrame, return_map_onl
     check_df = check_df.collect() if lazy else check_df
     assert len(check_df) == 0, f"rep_income could not be mapped for {len(check_df)} rows\n{check_df}"
 
-    print(f"Note: {rep_inc} is not available for vacant units, which have 'Not Available' for in.income")
+    # print(f"Note: {rep_inc} is not available for vacant units, which have 'Not Available' for in.income")
 
     df3 = df2.select([bldg_id, rep_inc])
     if return_map_only:
```

postprocessing/resstockpostproc/process_bsb_results.py

Lines changed: 82 additions & 70 deletions
```diff
@@ -4,87 +4,99 @@
 
 Example usage:
 uv run resstockpostproc/process_bsb_results.py /path/to/bsb_raw_results /path/to/output_dir
+uv run resstockpostproc/process_bsb_results.py "C:/Scratch/ResStock/efforts/full_550k_run" "s3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/resstock_amy2018_release_1"
 
 Note: bsb_raw_results folder must contain both baseline and upgrade files. Baseline file should be named
 results_up00.parquet and upgrade files should be named results_upXX.parquet where XX is the upgrade number. They can
 either be in their own folders (baseline and upgrades) or all be in the same folder.
 """
 
-import sys
+import re
 import polars as pl
 from pathlib import Path
 from resstockpostproc.process_metadata import (
-    publish_baseline_annual_results,
-    publish_upgrade_annual_results,
+    get_schema_superset,
+    get_upgrade_rename_dict,
+    get_failed_building_list,
+    process_simulation_outputs,
+    export_metadata_and_annual_results_for_upgrade,
+    cache_simulation_outputs_file
+)
+from resstockpostproc.utils import (
+    setup_fsspec_filesystem
 )
-import re
-
-
-def process_results(raw_results_dir: str, output_dir: str) -> None:
-    output_path = Path(output_dir)
-    output_path.mkdir(parents=True, exist_ok=True)
-    result_files = list(Path(raw_results_dir).rglob("*"))
-    baseline_files = [f for f in result_files if "up00" in f.name.lower()]
-    upgrade_files = [f for f in result_files if "up00" not in f.name.lower()]
-
-    if not baseline_files:
-        print("Error: No baseline or upgrade files found")
-        sys.exit(1)
-    if len(baseline_files) > 1:
-        print("Error: More than one baseline file found")
-        sys.exit(1)
-
-    baseline_file = baseline_files[0]
-    print(f"Processing baseline file: {baseline_file}")
-    baseline_df = read_file(baseline_file)
-
-    failed_bldgs = (
-        baseline_df.filter(pl.col("completed_status") == "Fail")
-        .select(pl.col("building_id"))
-        .collect()["building_id"]
-        .to_list()
-    )
-    print(f"Removing {len(failed_bldgs)} buildings that failed in baseline")
-    bs_pub_df = publish_baseline_annual_results(baseline_df)
-    write_file(bs_pub_df, output_path, upgrade=0)
-
-    for upgrade_file in upgrade_files:
-        up_info = re.search(r"up(\d+)", upgrade_file.name)
-        if up_info is None:
-            continue
-        upgrade_num = int(up_info.group(1))
-
-        print(f"Processing upgrade file: {upgrade_file}, upgrade number: {upgrade_num}")
-        upgrade_df = read_file(upgrade_file)
-        up_up_df = publish_upgrade_annual_results(
-            failed_bldgs, bs_pub_df, upgrade_df, upgrade_num
-        )
-        write_file(up_up_df, output_path, upgrade_num)
-
-
-def read_file(file: Path) -> pl.LazyFrame:
-    match file.suffix:
-        case ".parquet":
-            return pl.scan_parquet(file)
-        case ".csv":
-            return pl.scan_csv(file)
-        case ".gz":
-            assert file.stem.endswith(".csv"), f"gz file is not a csv: {file}"
-            return pl.scan_csv(file)
-        case _:
-            raise ValueError(f"Unsupported file type: {file}")
-
-
-def write_file(df: pl.LazyFrame, output_path: Path, upgrade: int):
-    parquet_file_dir = output_path / "parquet" / f"upgrade={upgrade}"
-    parquet_file_dir.mkdir(parents=True, exist_ok=True)
-    csv_file_dir = output_path / "results_csvs_pub"
-    csv_file_dir.mkdir(parents=True, exist_ok=True)
-    csv_file = csv_file_dir / f"results_up{upgrade:02d}.csv"
-    parquet_file = parquet_file_dir / f"results_up{upgrade:02d}.parquet"
-    df.sink_parquet(parquet_file)
-    df.sink_csv(csv_file)
-    print(f"Wrote {upgrade} to {parquet_file} and {csv_file}")
+
+def export_metadata_and_annual_results(raw_results_dir: str,
+                                       output_dir: str,
+                                       aws_profile_name=None) -> None:
+    # Set up filesystem objects for raw results and output directories
+    raw_results_dir = setup_fsspec_filesystem(raw_results_dir, aws_profile_name)
+    output_dir = setup_fsspec_filesystem(output_dir, aws_profile_name)
+
+    # Find the raw results files
+    pqt_glob = f'{raw_results_dir["fs_path"]}/**/*.parquet'
+    result_files = raw_results_dir['fs'].glob(pqt_glob)
+    baseline_file = [f for f in result_files if "up00" in Path(f).name.lower()][0]
+    upgrade_ids = [int(re.search(r'up(\d+)', p).group(1)) for p in result_files]
+    upgrade_ids.sort()
+
+    # Information used across upgrades
+    upgrade_renamer = get_upgrade_rename_dict(raw_results_dir)
+    col_schema = get_schema_superset(result_files, raw_results_dir)
+    sim_out_cache_dir = Path(f"{output_dir['fs_path']}/cached_simulation_outputs")
+
+    # Process and cache the simulation outputs, starting with the baseline
+    baseline_df = pl.scan_parquet(baseline_file, storage_options=raw_results_dir['storage_options'])
+    failed_bldgs = get_failed_building_list(baseline_df)
+    processed_baseline_df = None
+    for upgrade_id in upgrade_ids:
+        upgrade_file = f'{raw_results_dir["fs_path"]}/upgrades/upgrade={upgrade_id}/results_up{upgrade_id:02d}.parquet'
+        if upgrade_id == 0:
+            upgrade_file = f'{raw_results_dir["fs_path"]}/baseline/results_up{upgrade_id:02d}.parquet'
+
+        print(f"Processing upgrade file: {upgrade_file}, upgrade number: {upgrade_id} {'*'*100}")
+        raw_upgrade_df = pl.scan_parquet(upgrade_file, storage_options=raw_results_dir['storage_options'])
+        processed_upgrade_df = process_simulation_outputs(
+            failed_bldgs,
+            baseline_df,
+            processed_baseline_df,
+            raw_upgrade_df,
+            upgrade_id,
+            upgrade_renamer,
+            col_schema
+        )
+        cache_simulation_outputs_file(output_dir, sim_out_cache_dir, upgrade_id, processed_upgrade_df)
+        up_cols = set(sorted(processed_upgrade_df.collect_schema().names()))
+
+        if upgrade_id == 0:
+            processed_baseline_df = processed_upgrade_df
+        base_cols = set(sorted(processed_baseline_df.collect_schema().names()))
+
+        if not base_cols == up_cols:
+            raise ValueError("Column set in baseline and upgrade don't match")
+
+    # Export files to specified geographic partitions
+    geo_exports = [
+        {
+            'geo_top_dir': 'national',
+            'partition_cols': {},
+            'data_types': ['full'],  # TODO add basic
+            'file_types': ['csv', 'parquet'],
+        },
+        {
+            'geo_top_dir': 'by_state',
+            'partition_cols': {
+                'in.state': 'state'
+            },
+            'data_types': ['full'],  # TODO add basic
+            'file_types': ['csv', 'parquet'],
+        }
+    ]
+    for upgrade_id in upgrade_ids:
+        export_metadata_and_annual_results_for_upgrade(
+            output_dir,
+            upgrade_id,
+            geo_exports)
 
 
 if __name__ == "__main__":
@@ -104,4 +116,4 @@ def write_file(df: pl.LazyFrame, output_path: Path, upgrade: int):
         help="Directory to write transformed results",
     )
     args = parser.parse_args()
-    process_results(args.raw_results_dir, args.output_dir)
+    export_metadata_and_annual_results(args.raw_results_dir, args.output_dir)
```
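The processing loop derives per-upgrade file paths from the raw-results root rather than globbing them, so the input layout matters. A minimal sketch of the expected layout, mirroring the f-strings in `export_metadata_and_annual_results`; the root is one of the illustrative paths from the README and the upgrade numbers are examples:

```python
# Sketch of the raw-results layout the processing loop expects.
# The root below is an illustrative local path from the README; an fsspec-compatible
# remote root (e.g. an s3:// prefix) would follow the same relative structure.
raw_root = "C:/path/to/bsb/output/a_run"
for upgrade_id in [0, 1, 2]:  # example upgrade numbers
    if upgrade_id == 0:
        path = f"{raw_root}/baseline/results_up{upgrade_id:02d}.parquet"
    else:
        path = f"{raw_root}/upgrades/upgrade={upgrade_id}/results_up{upgrade_id:02d}.parquet"
    print(path)
# C:/path/to/bsb/output/a_run/baseline/results_up00.parquet
# C:/path/to/bsb/output/a_run/upgrades/upgrade=1/results_up01.parquet
# C:/path/to/bsb/output/a_run/upgrades/upgrade=2/results_up02.parquet
```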
