33 changes: 33 additions & 0 deletions contrib/workflow_with_ai_parse_document/.gitignore
@@ -0,0 +1,33 @@
# Databricks
.databricks/

# Python
build/
dist/
__pycache__/
*.egg-info
.venv/
*.py[cod]

# Local configuration (keep your settings private)
databricks.local.yml

# IDE
.idea/
.vscode/
.DS_Store

# Scratch/temporary files
scratch/**
!scratch/README.md

# Test documents (don't commit large PDFs)
*.pdf
*.png
*.jpg
*.jpeg

# Exception: Allow documentation images in assets folder
!assets/*.png
!assets/*.jpg
!assets/*.jpeg
178 changes: 178 additions & 0 deletions contrib/workflow_with_ai_parse_document/README.md
@@ -0,0 +1,178 @@
# AI Document Processing Job with Structured Streaming

A Databricks Asset Bundle demonstrating **incremental document processing** using `ai_parse_document`, `ai_query`, and Databricks Jobs with Structured Streaming.

## Overview

This example shows how to build an incremental job that:
1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
2. **Extracts** clean text with incremental processing
3. **Analyzes** content using [`ai_query`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) with LLMs

All stages run as Python notebook tasks in a Databricks Job using Structured Streaming with serverless compute.

## Architecture

```
Source Documents (UC Volume)
        │
        ▼
Task 1: ai_parse_document  →  parsed_documents_raw (variant)
        │
        ▼
Task 2: text extraction    →  parsed_documents_text (string)
        │
        ▼
Task 3: ai_query           →  parsed_documents_structured (json)
```

### Key Features

- **Incremental processing**: Only new files are processed using Structured Streaming checkpoints
- **Serverless compute**: Runs on serverless compute for cost efficiency
- **Task dependencies**: Sequential execution with automatic dependency management
- **Parameterized**: Catalog, schema, volumes, and table names configurable via variables
- **Error handling**: Gracefully handles parsing failures
- **Visual debugging**: Interactive notebook for inspecting results

## Prerequisites

- Databricks workspace with Unity Catalog
- Databricks CLI v0.218.0+
- Unity Catalog volumes for:
- Source documents (PDFs/images)
- Parsed output images
- Streaming checkpoints
- Workspace access to AI Functions (`ai_parse_document`, `ai_query`)

## Quick Start

1. **Install and authenticate**

   ```bash
   databricks auth login --host https://your-workspace.cloud.databricks.com
   ```

2. **Configure** `databricks.yml` with your workspace settings

3. **Validate** the bundle configuration

   ```bash
   databricks bundle validate
   ```

4. **Deploy**

   ```bash
   databricks bundle deploy
   ```

5. **Upload documents** to your source volume

6. **Run the job** from the Databricks UI (Workflows)

## Configuration

Edit `databricks.yml`:

```yaml
variables:
  catalog: main                                               # Your catalog
  schema: default                                             # Your schema
  source_volume_path: /Volumes/main/default/source_documents  # Source PDFs
  output_volume_path: /Volumes/main/default/parsed_output     # Parsed images
  checkpoint_base_path: /Volumes/main/default/checkpoints     # Checkpoints
  raw_table_name: parsed_documents_raw                        # Table names
  text_table_name: parsed_documents_text
  structured_table_name: parsed_documents_structured
```

## Job Tasks

### Task 1: Document Parsing
**File**: `src/transformations/01_parse_documents.py`

Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images:
- Reads files from volume using Structured Streaming
- Stores variant output with bounding boxes
- Incremental: checkpointed streaming prevents reprocessing
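
The pattern behind this task can be sketched as follows. This is an illustrative skeleton, not the shipped notebook: the Auto Loader options, column names, and the `ai_parse_document` call shape are assumptions to check against the real source file.

```python
def parse_documents_stream(spark, source_volume_path, checkpoint_location, table_name):
    """Sketch of the checkpointed parse step (assumed option and column names)."""
    from pyspark.sql import functions as F  # imported lazily; available on Databricks

    # Auto Loader picks up only files not yet recorded in the checkpoint.
    raw = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")
        .load(source_volume_path)
    )

    # ai_parse_document returns a variant payload with elements and bounding boxes.
    parsed = raw.select(
        F.col("path"),
        F.expr("ai_parse_document(content)").alias("parsed"),
    )

    # availableNow processes the current backlog and stops, so the job can be
    # scheduled without leaving a continuous stream running.
    return (
        parsed.writeStream
        .option("checkpointLocation", checkpoint_location)
        .trigger(availableNow=True)
        .toTable(table_name)
    )
```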

### Task 2: Text Extraction
**File**: `src/transformations/02_extract_text.py`

Extracts clean concatenated text using `transform()`:
- Reads from previous task's table via streaming
- Handles both parser v1.0 and v2.0 formats
- Uses `transform()` for efficient text extraction
- Includes error handling for failed parses
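
Outside Spark, the core of the extraction can be illustrated on a plain dictionary. The element layout below is a simplified assumption rather than the exact `ai_parse_document` output schema; the real notebook operates on the variant column with `transform()`.

```python
def extract_text(parsed: dict) -> str:
    """Concatenate element text, tolerating two hypothetical schema layouts."""
    doc = parsed.get("document", {})

    # Assumed v2.0-style layout: elements listed at the document level.
    elements = doc.get("elements")
    if elements is None:
        # Assumed v1.0-style layout: elements nested under each page.
        elements = [
            element
            for page in doc.get("pages", [])
            for element in page.get("elements", [])
        ]

    return "\n".join(e["content"] for e in elements if e.get("content"))
```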

### Task 3: AI Query Extraction
**File**: `src/transformations/03_extract_structured_data.py`

Applies LLM to extract structured insights:
- Reads from text table via streaming
- Uses `ai_query` with Claude Sonnet 4
- Customizable prompt for domain-specific extraction
- Outputs structured JSON
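
One testable slice of this task is assembling the SQL expression handed to `F.expr`. The endpoint name and prompt in the usage comment are placeholders; swap in the workspace's actual serving endpoint.

```python
def build_ai_query_expr(endpoint: str, prompt: str, text_col: str) -> str:
    """Build the ai_query(...) SQL expression used in the streaming select.

    Single quotes in the prompt are doubled so the literal stays valid SQL.
    """
    escaped = prompt.replace("'", "''")
    return f"ai_query('{endpoint}', concat('{escaped} ', {text_col}))"


# Hypothetical usage inside the task (endpoint name is an assumption):
#   stream = spark.readStream.table(source_table_name)
#   result = stream.select(
#       "path",
#       F.expr(build_ai_query_expr(
#           "databricks-claude-sonnet-4",
#           "Summarize the key entities in this document as JSON:",
#           "text",
#       )).alias("structured_json"),
#   )
```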

## Visual Debugger

The included notebook visualizes parsing results with interactive bounding boxes.

**Open**: `src/explorations/ai_parse_document -- debug output.py`

**Configure widgets**:
- `input_file`: `/Volumes/main/default/source_docs/sample.pdf`
- `image_output_path`: `/Volumes/main/default/parsed_out/`
- `page_selection`: `all` (or `1-3`, `1,5,10`)

**Features**:
- Color-coded bounding boxes by element type
- Hover tooltips showing extracted content
- Automatic image scaling
- Page selection support
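
A `page_selection` string like `1-3` or `1,5,10` could be expanded with a small helper along these lines (hypothetical; not necessarily how the notebook implements it):

```python
def parse_page_selection(selection: str, num_pages: int) -> list:
    """Expand 'all', 'a-b' ranges, and comma lists into sorted page numbers."""
    if selection.strip().lower() == "all":
        return list(range(1, num_pages + 1))

    pages = set()
    for part in selection.split(","):
        part = part.strip()
        if "-" in part:
            start, end = part.split("-", 1)
            pages.update(range(int(start), int(end) + 1))
        elif part:
            pages.add(int(part))

    # Silently drop out-of-range pages rather than failing the whole render.
    return sorted(p for p in pages if 1 <= p <= num_pages)
```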

### Example Output

**Document Summary with Element Types**

![Document Summary](assets/document_summary.png)

**Visual Bounding Boxes on Page 1**

![Page 1 Bounding Boxes](assets/page1_bounding_boxes.png)

**Extracted Elements with Descriptions**

![Page 1 Elements List](assets/page1_elements_list.png)

**Page 2 Contents Table with Bounding Boxes**

![Page 2 Contents Table](assets/page2_contents_table.png)

**Table Extraction Example**

![Table Extraction](assets/table_extraction.png)

**Figure Description**

![Figure Description](assets/figure_description.png)

## Project Structure

```
.
├── databricks.yml                       # Bundle configuration
├── resources/
│   └── ai_parse_document_job.job.yml
├── src/
│   ├── transformations/
│   │   ├── 01_parse_documents.py
│   │   ├── 02_extract_text.py
│   │   └── 03_extract_structured_data.py
│   └── explorations/
│       └── ai_parse_document -- debug output.py
└── README.md
```

## Resources

- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/)
- [Databricks Workflows](https://docs.databricks.com/workflows/)
- [Structured Streaming](https://docs.databricks.com/structured-streaming/)
- [`ai_parse_document` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document)
- [`ai_query` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query)
52 changes: 52 additions & 0 deletions contrib/workflow_with_ai_parse_document/databricks.yml
@@ -0,0 +1,52 @@
# This is a Databricks asset bundle definition for ai_parse_document_workflow.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: ai_parse_document_workflow

variables:
  catalog:
    description: The catalog name for the workflow
    default: main
  schema:
    description: The schema name for the workflow
    default: default
  source_volume_path:
    description: Source volume path for PDF files
    default: /Volumes/main/default/source_documents
  output_volume_path:
    description: Output volume path for processed images
    default: /Volumes/main/default/parsed_output
  checkpoint_base_path:
    description: Base path for Structured Streaming checkpoints
    default: /tmp/checkpoints/ai_parse_workflow
> **Reviewer comment (Contributor):** This path is local to a driver. Checkpoints will be gone by the time the cluster terminates. Intentional?
  raw_table_name:
    description: Table name for raw parsed documents
    default: parsed_documents_raw
  text_table_name:
    description: Table name for extracted text
    default: parsed_documents_text
  structured_table_name:
    description: Table name for structured data
    default: parsed_documents_structured

include:
  - resources/*.yml

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://your-workspace.cloud.databricks.com

  prod:
    mode: production
    workspace:
      host: https://your-workspace.cloud.databricks.com
    permissions:
      - group_name: users
        level: CAN_VIEW
68 changes: 68 additions & 0 deletions contrib/workflow_with_ai_parse_document/resources/ai_parse_document_job.job.yml
@@ -0,0 +1,68 @@
resources:
  jobs:
    ai_parse_document_job:
      name: ai_parse_document_job

      # Optional: Add a schedule
      # schedule:
      #   quartz_cron_expression: "0 0 * * * ?"
      #   timezone_id: "UTC"

      # Job-level parameters shared across all tasks
      parameters:
        - name: catalog
          default: ${var.catalog}
        - name: schema
          default: ${var.schema}
        - name: source_volume_path
          default: ${var.source_volume_path}
        - name: output_volume_path
          default: ${var.output_volume_path}
        - name: checkpoint_base_path
          default: ${var.checkpoint_base_path}
        - name: raw_table_name
          default: ${var.raw_table_name}
        - name: text_table_name
          default: ${var.text_table_name}
        - name: structured_table_name
          default: ${var.structured_table_name}

      environments:
        - environment_key: serverless_env
          spec:
            client: "3"

      tasks:
        - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/01_parse_documents.py
            base_parameters:
              source_volume_path: "{{job.parameters.source_volume_path}}"
              output_volume_path: "{{job.parameters.output_volume_path}}"
              checkpoint_location: "{{job.parameters.checkpoint_base_path}}/01_parse_documents"
              table_name: "{{job.parameters.raw_table_name}}"

        - task_key: extract_text
          depends_on:
            - task_key: parse_documents
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/02_extract_text.py
            base_parameters:
              checkpoint_location: "{{job.parameters.checkpoint_base_path}}/02_extract_text"
              source_table_name: "{{job.parameters.raw_table_name}}"
              table_name: "{{job.parameters.text_table_name}}"

        - task_key: extract_structured_data
          depends_on:
            - task_key: extract_text
          environment_key: serverless_env
          notebook_task:
            notebook_path: ../src/transformations/03_extract_structured_data.py
            base_parameters:
              checkpoint_location: "{{job.parameters.checkpoint_base_path}}/03_extract_structured_data"
              source_table_name: "{{job.parameters.text_table_name}}"
              table_name: "{{job.parameters.structured_table_name}}"

      max_concurrent_runs: 1