Merged
1 change: 1 addition & 0 deletions README.md
@@ -198,6 +198,7 @@ them to fit your particular use case.
approach
* [Dataflow Streaming XML to GCS](examples/dataflow-xml-pubsub-to-gcs) -
Dataflow example to handle streaming of xml encoded messages and write them to Google Cloud Storage
* [Dataflow DLP Flex De-ID (CSV from GCS to BigQuery)](examples/dataflow-dlp-flex-deid) - Dataflow Flex Template that batches CSV rows from Cloud Storage, de-identifies them with Sensitive Data Protection (DLP), and writes the results to BigQuery.
* [Dataflow DLP Hashpipeline](examples/dataflow-dlp-hash-pipeline) - Match DLP
Social Security Number findings against a hashed dictionary in Firestore.
Use Secret Manager for the hash key.
10 changes: 10 additions & 0 deletions examples/dataflow-dlp-flex-deid/.dockerignore
@@ -0,0 +1,10 @@
**/.git
**/.github
**/__pycache__
**/.pytest_cache
*.pyc
*.pyo
*.pyd
*.egg-info
*.log
.DS_Store
10 changes: 10 additions & 0 deletions examples/dataflow-dlp-flex-deid/.gcloudignore
@@ -0,0 +1,10 @@
.git/
.github/
__pycache__/
.pytest_cache/
*.pyc
*.pyo
*.pyd
*.egg-info/
*.log
.DS_Store
15 changes: 15 additions & 0 deletions examples/dataflow-dlp-flex-deid/.gitignore
@@ -0,0 +1,15 @@
# Python / build
__pycache__/
*.py[cod]
*.egg-info/
.build/
dist/

# Local env / IDE
.venv/
.env
.idea/
.vscode/

# OS / misc
.DS_Store
25 changes: 25 additions & 0 deletions examples/dataflow-dlp-flex-deid/Dockerfile
@@ -0,0 +1,25 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base:PY311-2024-12-01

ENV FLEX_TEMPLATE_PYTHON_PY_FILE=/app/main.py
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=/app/requirements.txt

WORKDIR /app
COPY . /app

RUN pip install --no-cache-dir -r /app/requirements.txt

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
144 changes: 144 additions & 0 deletions examples/dataflow-dlp-flex-deid/README.md
@@ -0,0 +1,144 @@
# Dataflow Flex Template: De-identify CSVs in GCS (DLP) → BigQuery

A runnable Flex Template that reads CSV files from Cloud Storage, calls DLP to de-identify sensitive fields, and writes the sanitized rows to BigQuery.

> **Prerequisites**
> - Google Cloud project with billing enabled
> - You can run commands either in **Cloud Shell** or locally with the **gcloud** CLI
> - A DLP **De-identification Template** (`projects/<P>/locations/<L>/deidentifyTemplates/<ID>`)

---

## Parameters

| Name | Required | Default | Description |
|------|:--------:|---------|-------------|
| `file_pattern` | ✅ | — | GCS glob to input CSVs |
| `dataset` | ✅ | — | BigQuery dataset (table is created if needed) |
| `deidentify_template_name` | ✅ | — | `projects/<P>/locations/<L>/deidentifyTemplates/<ID>` |
| `csv_headers` **or** `headers_gcs_uri` | ⚙️ | — | Provide headers inline (comma-separated) **or** via a `gs://` file (first line is header) |
| `batch_size` | ⚙️ | 500 | CSV lines per DLP call |
| `dlp_api_retry_count` | ⚙️ | 3 | Retries per batch |
| `skip_header_lines` | ⚙️ | 1 | Header lines to skip in `ReadFromText` |
| `output_table` | ⚙️ | `output_<templateId>` | Output table name within `dataset` |

> **CSV & headers:** The CSV **must** have the same number and order of columns as the header names you supply. The DLP `table` item uses those names when returning de-identified rows.
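For illustration, a minimal input file and matching headers file might look like this (filenames and values are hypothetical):

```bash
# Headers file: its first line is the header row (hypothetical example).
printf 'name,email,phone\n' > headers.txt

# Data file: column count and order must match the headers above.
# skip_header_lines defaults to 1, so the CSV's own header row is skipped.
cat > sample.csv <<'EOF'
name,email,phone
Alice Example,alice@example.com,555-0100
Bob Example,bob@example.com,555-0101
EOF
```

Upload both files to your input bucket, e.g. `gcloud storage cp sample.csv headers.txt gs://<INPUT_BUCKET>/`.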

---

## Quickstart (Cloud Shell or local with gcloud)

### Variables
```bash
# Run from: examples/dataflow-dlp-flex-deid/
export PROJECT_ID="<YOUR_PROJECT_ID>"
export REGION="us-central1"
export DATASET="sensitive_data"
export STAGING_BUCKET_NAME="${PROJECT_ID}-dataflow-assets"
export AR_REPO_NAME="dataflow-images"
export IMAGE_TAG="${REGION}-docker.pkg.dev/${PROJECT_ID}/${AR_REPO_NAME}/dlp-csv-deid:latest"
export TEMPLATE_SPEC="gs://${STAGING_BUCKET_NAME}/templates/dlp-csv-deid.json"
export DEID_TEMPLATE_NAME="projects/${PROJECT_ID}/locations/global/deidentifyTemplates/<TEMPLATE_ID>"
export SERVICE_ACCOUNT_NAME="dlp-flex-template-runner"
export SERVICE_ACCOUNT_EMAIL="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
```

### One-time resources
```bash
# Run from: examples/dataflow-dlp-flex-deid/
gcloud config set project "$PROJECT_ID"
gcloud services enable \
dataflow.googleapis.com dlp.googleapis.com cloudbuild.googleapis.com \
artifactregistry.googleapis.com bigquery.googleapis.com compute.googleapis.com

gcloud storage buckets create "gs://${STAGING_BUCKET_NAME}" \
--location="$REGION" --uniform-bucket-level-access || true

bq --location="$REGION" mk --dataset "${PROJECT_ID}:${DATASET}" || true

gcloud artifacts repositories create "${AR_REPO_NAME}" \
--repository-format=docker --location="${REGION}" || true

gcloud iam service-accounts create "${SERVICE_ACCOUNT_NAME}" \
--display-name="DLP Flex Template Runner" || true

for ROLE in roles/dataflow.worker roles/storage.objectAdmin \
roles/bigquery.jobUser roles/bigquery.dataEditor \
roles/artifactregistry.reader roles/dlp.user
do
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
--member="serviceAccount:${SERVICE_ACCOUNT_EMAIL}" \
--role="$ROLE" --condition=None
done
```

### Build and push the template
```bash
# Run from: examples/dataflow-dlp-flex-deid/
gcloud builds submit \
--config cloudbuild.yaml \
--substitutions=_IMAGE_TAG="${IMAGE_TAG}" \
--project="${PROJECT_ID}" .
```

### Build the Flex Template spec
```bash
# Run from: examples/dataflow-dlp-flex-deid/
gcloud dataflow flex-template build "${TEMPLATE_SPEC}" \
--image "${IMAGE_TAG}" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json" \
--project "${PROJECT_ID}"
```

### Run the job
```bash
# Run from: anywhere
JOB_NAME="dlp-deid-csv-$(date +%Y%m%d-%H%M%S)"

# Option A — inline headers:
CSV_HEADERS="name,email,phone"

gcloud dataflow flex-template run "${JOB_NAME}" \
--template-file-gcs-location "${TEMPLATE_SPEC}" \
--region "${REGION}" \
--service-account-email "${SERVICE_ACCOUNT_EMAIL}" \
--staging-location "gs://${STAGING_BUCKET_NAME}/staging" \
--temp-location "gs://${STAGING_BUCKET_NAME}/temp" \
--parameters file_pattern="gs://<INPUT_BUCKET>/<PATH>/*.csv" \
--parameters dataset="${DATASET}" \
--parameters deidentify_template_name="${DEID_TEMPLATE_NAME}" \
--parameters csv_headers="${CSV_HEADERS}" \
--parameters output_table="output_example"

# Option B — headers file in GCS (first line is the header row):
HEADERS_GCS_URI="gs://<INPUT_BUCKET>/headers.txt"

gcloud dataflow flex-template run "${JOB_NAME}" \
--template-file-gcs-location "${TEMPLATE_SPEC}" \
--region "${REGION}" \
--service-account-email "${SERVICE_ACCOUNT_EMAIL}" \
--staging-location "gs://${STAGING_BUCKET_NAME}/staging" \
--temp-location "gs://${STAGING_BUCKET_NAME}/temp" \
--parameters file_pattern="gs://<INPUT_BUCKET>/<PATH>/*.csv" \
--parameters dataset="${DATASET}" \
--parameters deidentify_template_name="${DEID_TEMPLATE_NAME}" \
--parameters headers_gcs_uri="${HEADERS_GCS_URI}" \
--parameters output_table="output_example"
```
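After submission, the job can be watched with standard Dataflow commands (a sketch; `<JOB_ID>` comes from the submit output or from the list command):

```bash
# List active jobs in the region.
gcloud dataflow jobs list --region "${REGION}" --status active

# Check a specific job's state once you have its ID.
gcloud dataflow jobs describe <JOB_ID> --region "${REGION}" \
  --format='value(currentState)'
```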

> **Output table name:** Defaults to `output_<templateId>` (derived from the last segment of `deidentify_template_name`) unless you set `output_table`.
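The default name can be reproduced with plain shell parameter expansion (a local sketch of the derivation; the template name below is hypothetical):

```bash
DEID_TEMPLATE_NAME="projects/my-project/locations/global/deidentifyTemplates/my-template"

# Take the last path segment as the template ID, then prefix it.
TEMPLATE_ID="${DEID_TEMPLATE_NAME##*/}"
OUTPUT_TABLE="output_${TEMPLATE_ID}"
echo "${OUTPUT_TABLE}"   # output_my-template
```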

---

**Optional Private IPs:** add `--network`, `--subnetwork`, and `--disable-public-ips` to the run command, and ensure Private Google Access is enabled (or Cloud NAT is configured) so workers can still reach Google APIs.
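As a sketch, the extra flags on the run command would look like this (network and subnetwork names are placeholders; all other flags stay the same as in the run examples above):

```bash
gcloud dataflow flex-template run "${JOB_NAME}" \
  --template-file-gcs-location "${TEMPLATE_SPEC}" \
  --region "${REGION}" \
  --service-account-email "${SERVICE_ACCOUNT_EMAIL}" \
  --network "<VPC_NETWORK>" \
  --subnetwork "regions/${REGION}/subnetworks/<SUBNET_NAME>" \
  --disable-public-ips \
  --staging-location "gs://${STAGING_BUCKET_NAME}/staging" \
  --temp-location "gs://${STAGING_BUCKET_NAME}/temp" \
  --parameters file_pattern="gs://<INPUT_BUCKET>/<PATH>/*.csv" \
  --parameters dataset="${DATASET}" \
  --parameters deidentify_template_name="${DEID_TEMPLATE_NAME}" \
  --parameters csv_headers="name,email,phone"
```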

---

## Troubleshooting

- **Header mismatch** → ensure headers match CSV columns and your DLP template.
- **Permission denied** → verify runner service account roles listed in “One-time resources”.
- **Template not found** → check `deidentify_template_name` and its location (`global` or regional).
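There is no dedicated `gcloud` surface for DLP templates, so one way to confirm the template exists is a direct call to the DLP REST API (a sketch; requires the caller to have DLP read access):

```bash
curl -s -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://dlp.googleapis.com/v2/${DEID_TEMPLATE_NAME}"
# A 200 response echoes the template; a 404 means the name or location is wrong.
```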

---
28 changes: 28 additions & 0 deletions examples/dataflow-dlp-flex-deid/cloudbuild.yaml
@@ -0,0 +1,28 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

steps:
- name: gcr.io/cloud-builders/docker
args: ['build','-t','${_IMAGE_TAG}','.']
- name: gcr.io/cloud-builders/docker
args: ['push','${_IMAGE_TAG}']

images:
- '${_IMAGE_TAG}'

substitutions:
_IMAGE_TAG: 'REGION-docker.pkg.dev/PROJECT/REPO/dlp-csv-deid:latest'

options:
logging: CLOUD_LOGGING_ONLY