Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions import-automation/workflow/ingestion-helper/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.12-slim

# Copy the uv binary from the official distribution image.
# NOTE(review): pinning a specific uv version instead of :latest would make builds reproducible.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Allow statements and log messages to immediately appear in the logs.
# (key=value form; the legacy space-separated ENV form is deprecated by Docker.)
ENV PYTHONUNBUFFERED=True

WORKDIR /app

# Install the protobuf compiler (to build the descriptor set below) and curl (to fetch the proto).
RUN apt-get update && apt-get install -y --no-install-recommends protobuf-compiler curl && rm -rf /var/lib/apt/lists/*

# Copy local code to the container image.
COPY . .

# Install production dependencies (declared in pyproject.toml) using uv.
RUN uv pip install --system --no-cache .

# Fetch the proto file from the datacommonsorg/import GitHub repository.
# -f makes curl fail on an HTTP error instead of saving the error page,
# which protoc would otherwise try (and fail) to compile.
RUN curl -fsSL -o storage.proto https://raw.githubusercontent.com/datacommonsorg/import/master/pipeline/data/src/main/proto/storage.proto

# Generate the proto descriptor set consumed by the initialize_database action.
RUN protoc --include_imports --descriptor_set_out=storage.pb storage.proto

# Run the functions framework, serving the ingestion_helper target.
CMD ["functions-framework", "--target", "ingestion_helper"]
26 changes: 26 additions & 0 deletions import-automation/workflow/ingestion-helper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,29 @@ Updates the version of an import, records version history, and updates the statu
* `version` (Required): The version string. If set to `'STAGING'`, it resolves to the current staging version.
* `comment` (Required): A comment for the audit log explaining the version update.
* `override` (Optional): Override version without checking import status (boolean)

#### `initialize_database`
Initializes the Spanner database by creating all necessary tables and uploading proto descriptors.

* This action requires no payload parameters. It automatically reads `schema.sql` and `storage.pb` from the container directory to provision the database schema and proto descriptors.
* **Note on Protos**: The `storage.pb` file is generated during the Docker build process. The `Dockerfile` fetches `storage.proto` from the `datacommonsorg/import` GitHub repository and compiles it into `storage.pb`.

## Local Development and Testing

To run the helper service locally and test its functionality:

### Running the Server
Ensure you have installed the project dependencies declared in `pyproject.toml` (`uv pip install .`), then start the functions framework:

```bash
uv run functions-framework --target ingestion_helper
```
By default, this will start serving on `http://localhost:8080`.

### Triggering Actions
You can test specific actions by sending a POST request with a JSON payload. For example, to trigger database initialization locally:
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"actionType": "initialize_database"}'
```
48 changes: 48 additions & 0 deletions import-automation/workflow/ingestion-helper/cloudbuild.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

steps:
# Build the container image, tagged with both the provided $_TAG and 'latest'.
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG', '-t', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:latest', '.']

# Push the container image - Needed before deployment
# (the 'latest' tag is pushed by the `images:` section after all steps finish)
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG']

# Deploy container image to Cloud Run (Conditional)
# $_DEPLOY, $_SERVICE_NAME, etc. are Cloud Build substitutions: they are
# expanded before bash runs, so the single-$ references are intentional.
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: 'bash'
args:
- '-c'
- |
if [ "$_DEPLOY" = "true" ]; then
gcloud run deploy '$_SERVICE_NAME' \
--image '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG' \
--region '$_AR_REGION' \
--project '$PROJECT_ID'
else
echo "Skipping deployment as _DEPLOY is not set to true"
fi

# Default substitution values; override with --substitutions at submit time.
substitutions:
_AR_REGION: 'us'
_AR_REPO: 'gcr.io'
_SERVICE_NAME: 'spanner-ingestion-helper'
_DEPLOY: 'false'
_TAG: 'latest'

images:
- '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG'
- '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:latest'
5 changes: 5 additions & 0 deletions import-automation/workflow/ingestion-helper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,10 @@ def ingestion_helper(request):
f"OK [Import: {import_name} Version: {version} Status: {params['status']}]",
200)

elif actionType == 'initialize_database':
# Initializes the database by creating all required tables and proto bundles.
logging.info("Action: initialize_database")
spanner.initialize_database()
return ('OK', 200)
else:
return (f'Unknown actionType: {actionType}', 400)
33 changes: 33 additions & 0 deletions import-automation/workflow/ingestion-helper/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ingestion-helper"
version = "0.1.0"
description = "Ingestion helper for Data Commons Spanner ingestion"
dependencies = [
"functions-framework==3.*",
"google-cloud-spanner",
"google-api-python-client",
"google-cloud-storage",
"google-auth",
"absl-py",
]

# Skip hatchling's package auto-detection and build a metadata-only wheel.
# The service source is copied into the container directly (see Dockerfile's
# `COPY . .`); installing `.` is only used to pull in the dependencies above.
[tool.hatch.build.targets.wheel]
bypass-selection = true
6 changes: 0 additions & 6 deletions import-automation/workflow/ingestion-helper/requirements.txt

This file was deleted.

110 changes: 110 additions & 0 deletions import-automation/workflow/ingestion-helper/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
-- Copyright 2026 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Register the Observations proto so it can be used as a column type below.
-- The compiled descriptor set (storage.pb, built in the Dockerfile) must be
-- supplied alongside this DDL when the database is initialized.
CREATE PROTO BUNDLE (
`org.datacommons.Observations`
);

-- One row per graph node, keyed by subject_id
-- (presumably the Data Commons entity id — confirm with the ingestion pipeline).
CREATE TABLE Node (
subject_id STRING(1024) NOT NULL,
value STRING(MAX),
bytes BYTES(MAX),
name STRING(MAX),
types ARRAY<STRING(1024)>,
-- Hidden token list enabling full-text search over the node name.
name_tokenlist TOKENLIST AS (TOKENIZE_FULLTEXT(name)) HIDDEN,
) PRIMARY KEY(subject_id);

-- Directed edges between Node rows. The key includes provenance so the same
-- triple can be asserted by multiple sources.
-- NOTE(review): `INTERLEAVE IN` (without PARENT) co-locates Edge rows with the
-- Node sharing subject_id without enforcing parent-row existence — confirm
-- this is intended over `INTERLEAVE IN PARENT`.
CREATE TABLE Edge (
subject_id STRING(1024) NOT NULL,
predicate STRING(1024) NOT NULL,
object_id STRING(1024) NOT NULL,
provenance STRING(1024) NOT NULL,
) PRIMARY KEY(subject_id, predicate, object_id, provenance),
INTERLEAVE IN Node;

-- Statistical observations: one row per (entity, variable, facet).
-- The time series itself is packed into the proto-typed `observations` column.
CREATE TABLE Observation (
observation_about STRING(1024) NOT NULL,
variable_measured STRING(1024) NOT NULL,
facet_id STRING(1024) NOT NULL,
observation_period STRING(1024),
measurement_method STRING(1024),
unit STRING(1024),
scaling_factor STRING(1024),
-- Proto column; type comes from the PROTO BUNDLE declared above.
observations org.datacommons.Observations,
import_name STRING(1024),
provenance_url STRING(1024),
is_dc_aggregate BOOL,
) PRIMARY KEY(observation_about, variable_measured, facet_id);

-- Current state of each import, one row per import name.
CREATE TABLE ImportStatus (
ImportName STRING(MAX) NOT NULL,
LatestVersion STRING(MAX),
GraphPath STRING(MAX),
State STRING(1024) NOT NULL,
JobId STRING(1024),
WorkflowId STRING(1024),
ExecutionTime INT64,
DataVolume INT64,
-- allow_commit_timestamp lets writers store the Spanner commit timestamp
-- via the PENDING_COMMIT_TIMESTAMP() sentinel.
DataImportTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
StatusUpdateTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
NextRefreshTimestamp TIMESTAMP,
) PRIMARY KEY(ImportName);

-- Audit log of ingestion runs; the DESC key orders rows newest-first.
CREATE TABLE IngestionHistory (
CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ),
-- BOOL uppercased for consistency with every other type name in this schema.
IngestionFailure BOOL NOT NULL,
WorkflowExecutionID STRING(1024) NOT NULL,
DataflowJobID STRING(1024),
IngestedImports ARRAY<STRING(MAX)>,
ExecutionTime INT64,
NodeCount INT64,
EdgeCount INT64,
ObservationCount INT64,
) PRIMARY KEY(CompletionTimestamp DESC);

-- Per-import version audit trail; newest versions first within each import.
CREATE TABLE ImportVersionHistory (
ImportName STRING(MAX) NOT NULL,
Version STRING(MAX) NOT NULL,
UpdateTimestamp TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
Comment STRING(MAX),
) PRIMARY KEY (ImportName, UpdateTimestamp DESC);

-- Advisory lock rows used to serialize ingestion work
-- (presumably one well-known LockID per resource — confirm with callers).
CREATE TABLE IngestionLock (
LockID STRING(1024) NOT NULL,
LockOwner STRING(1024),
AcquiredTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
) PRIMARY KEY(LockID);

-- Expose the Node/Edge tables as a Spanner property graph so they can be
-- queried with GQL (GRAPH DCGraph MATCH ...).
CREATE PROPERTY GRAPH DCGraph
NODE TABLES(
Node
KEY(subject_id)
LABEL Node PROPERTIES(
bytes,
name,
subject_id,
types,
value)
)
EDGE TABLES(
Edge
KEY(subject_id, predicate, object_id, provenance)
-- Both endpoints resolve into Node: subject_id is the source node,
-- object_id the destination node.
SOURCE KEY(subject_id) REFERENCES Node(subject_id)
DESTINATION KEY(object_id) REFERENCES Node(subject_id)
LABEL Edge PROPERTIES(
object_id,
predicate,
provenance,
subject_id)
);
Loading
Loading