[Do not submit] Timeseries backfill avro #501
rohitkumarbhagat wants to merge 4 commits into datacommonsorg:master from
Conversation
…alidating observation data
… processing of serialized observations
Not up to standards ⛔

🔴 Issues

| Category | Results |
|---|---|
| BestPractice | 5 medium |
| Security | 1 high |
| CodeStyle | 47 minor |
| Complexity | 4 medium |

🟢 Metrics

| Metric | Results |
|---|---|
| Complexity | 249 |
| Duplication | 22 |
TIP: This summary will be updated as you push new changes.
Code Review
This pull request introduces the timeseries-backfill module, which provides Beam/Dataflow pipelines and a standalone validator for backfilling normalized timeseries tables from legacy Spanner data. The implementation includes support for both live Spanner sources and Avro exports. Review feedback focuses on improving the portability and maintainability of the module by addressing hardcoded local file paths in the documentation and scripts, replacing non-standard shell utilities with more common alternatives, and consolidating redundant command-line examples in the README.
Destination schema:

- [rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql](/home/rohitrkumar_google_com/Documents/dc/github/rohitkumarbhagat/dc_local/rk-experiments/mixer/spanner/bq_spanner_ingestion/timeseries_schema.sql)
```bash
mvn -Pgit-worktree compile exec:java \
  -pl timeseries-backfill -am \
  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=/usr/local/google/home/rohitrkumar/Documents/dc/github/rohitkumarbhagat/import/pipeline/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DirectRunner"
```

```bash
mvn -Pgit-worktree compile exec:java \
  -pl timeseries-backfill -am \
  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/<export_subdir> --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --variableMeasured=Count_Person --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
```

```bash
mvn -Pgit-worktree compile exec:java \
  -pl timeseries-backfill -am \
  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputFiles=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587/Observation.avro-00042-of-00303 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
```

```bash
mvn -Pgit-worktree compile exec:java \
  -pl timeseries-backfill -am \
  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
```

```bash
mvn -Pgit-worktree compile exec:java \
  -pl timeseries-backfill -am \
  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
  -Dexec.args="--project=datcom-store --spannerInstanceId=dc-kg-test --spannerDatabaseId=dc_graph_2026_01_27 --inputExportDir=gs://rohitrkumar-dataflow/spanner_obs_dump_2026_04_21/dc-kg-test-dc_graph_2026_01_27-2026-04-23_05_47_24-8439747614048276587 --destinationTimeSeriesTableName=TimeSeries_rk --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk --destinationStatVarObservationTableName=StatVarObservation_rk --runner=DataflowRunner --region=us-central1 --tempLocation=gs://keyurs-dataflow/temp --stagingLocation=gs://keyurs-dataflow/temp --numWorkers=20 --maxNumWorkers=100 --workerMachineType=n2-custom-4-32768 --numberOfWorkerHarnessThreads=2"
```
This section contains multiple duplicated and inconsistently formatted command examples, which can be confusing. It's recommended to consolidate them into one or two clear examples: one for `--inputExportDir` and one for `--inputFiles`. Also, please use placeholders for user-specific values like GCS paths.
Here is a suggested cleaned-up version for the --inputExportDir case:
```bash
mvn -Pgit-worktree compile exec:java \
  -pl timeseries-backfill -am \
  -Dexec.mainClass=org.datacommons.ingestion.timeseries.TimeseriesBackfillAvroPipeline \
  -Dexec.args="--project=<gcp-project> \
    --spannerInstanceId=<instance-id> \
    --spannerDatabaseId=<database-id> \
    --inputExportDir=gs://<your-bucket>/<path-to-export-dir> \
    --destinationTimeSeriesTableName=TimeSeries_rk \
    --destinationTimeSeriesAttributeTableName=TimeSeriesAttribute_rk \
    --destinationStatVarObservationTableName=StatVarObservation_rk \
    --variableMeasured=Count_Person \
    --runner=DataflowRunner \
    --region=us-central1 \
    --tempLocation=gs://<your-bucket>/temp \
    --stagingLocation=gs://<your-bucket>/staging \
    --numWorkers=20 \
    --maxNumWorkers=100 \
    --workerMachineType=n2-custom-4-32768 \
    --numberOfWorkerHarnessThreads=2"
```

```bash
append_if_present() {
  local pattern="$1"
  local ddl="$2"
  if rg -q "${pattern}" "${current_schema_file}"; then
```
The script uses rg (ripgrep), which is not a standard command-line utility and may not be installed on all systems. For better portability, please use a more common tool like grep. The -q flag in grep provides the same "quiet" functionality.
```diff
- if rg -q "${pattern}" "${current_schema_file}"; then
+ if grep -q "${pattern}" "${current_schema_file}"; then
```
…rt instructions, and add sample Avro files
No description provided.