diff --git a/contrib/workflow_with_ai_parse_document/.gitignore b/contrib/workflow_with_ai_parse_document/.gitignore new file mode 100644 index 00000000..f1a09579 --- /dev/null +++ b/contrib/workflow_with_ai_parse_document/.gitignore @@ -0,0 +1,33 @@ +# Databricks +.databricks/ + +# Python +build/ +dist/ +__pycache__/ +*.egg-info +.venv/ +*.py[cod] + +# Local configuration (keep your settings private) +databricks.local.yml + +# IDE +.idea/ +.vscode/ +.DS_Store + +# Scratch/temporary files +scratch/** +!scratch/README.md + +# Test documents (don't commit large PDFs) +*.pdf +*.png +*.jpg +*.jpeg + +# Exception: Allow documentation images in assets folder +!assets/*.png +!assets/*.jpg +!assets/*.jpeg diff --git a/contrib/workflow_with_ai_parse_document/README.md b/contrib/workflow_with_ai_parse_document/README.md new file mode 100644 index 00000000..2e0e921e --- /dev/null +++ b/contrib/workflow_with_ai_parse_document/README.md @@ -0,0 +1,178 @@ +# AI Document Processing Job with Structured Streaming + +A Databricks Asset Bundle demonstrating **incremental document processing** using `ai_parse_document`, `ai_query`, and Databricks Jobs with Structured Streaming. + +## Overview + +This example shows how to build an incremental job that: +1. **Parses** PDFs and images using [`ai_parse_document`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document) +2. **Extracts** clean text with incremental processing +3. **Analyzes** content using [`ai_query`](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) with LLMs + +All stages run as Python notebook tasks in a Databricks Job using Structured Streaming with serverless compute. 
+ +## Architecture + +``` +Source Documents (UC Volume) + ↓ + Task 1: ai_parse_document → parsed_documents_raw (variant) + ↓ + Task 2: text extraction → parsed_documents_text (string) + ↓ + Task 3: ai_query → parsed_documents_structured (json) +``` + +### Key Features + +- **Incremental processing**: Only new files are processed using Structured Streaming checkpoints +- **Serverless compute**: Runs on serverless compute for cost efficiency +- **Task dependencies**: Sequential execution with automatic dependency management +- **Parameterized**: Catalog, schema, volumes, and table names configurable via variables +- **Error handling**: Gracefully handles parsing failures +- **Visual debugging**: Interactive notebook for inspecting results + +## Prerequisites + +- Databricks workspace with Unity Catalog +- Databricks CLI v0.218.0+ +- Unity Catalog volumes for: + - Source documents (PDFs/images) + - Parsed output images + - Streaming checkpoints +- AI functions (`ai_parse_document`, `ai_query`) + +## Quick Start + +1. **Install and authenticate** + ```bash + databricks auth login --host https://your-workspace.cloud.databricks.com + ``` + +2. **Configure** `databricks.yml` with your workspace settings + +3. **Validate** the bundle configuration + ```bash + databricks bundle validate + ``` + +4. **Deploy** + ```bash + databricks bundle deploy + ``` + +5. **Upload documents** to your source volume + +6. 
**Run job** from the Databricks UI (Workflows) + +## Configuration + +Edit `databricks.yml`: + +```yaml +variables: + catalog: main # Your catalog + schema: default # Your schema + source_volume_path: /Volumes/main/default/source_documents # Source PDFs + output_volume_path: /Volumes/main/default/parsed_output # Parsed images + checkpoint_base_path: /Volumes/main/default/checkpoints # Checkpoints + raw_table_name: parsed_documents_raw # Table names + text_table_name: parsed_documents_text + structured_table_name: parsed_documents_structured +``` + +## Job Tasks + +### Task 1: Document Parsing +**File**: `src/transformations/01_parse_documents.py` + +Uses `ai_parse_document` to extract text, tables, and metadata from PDFs/images: +- Reads files from volume using Structured Streaming +- Stores variant output with bounding boxes +- Incremental: checkpointed streaming prevents reprocessing + +### Task 2: Text Extraction +**File**: `src/transformations/02_extract_text.py` + +Extracts clean concatenated text using `transform()`: +- Reads from previous task's table via streaming +- Handles both parser v1.0 and v2.0 formats +- Uses `transform()` for efficient text extraction +- Includes error handling for failed parses + +### Task 3: AI Query Extraction +**File**: `src/transformations/03_extract_structured_data.py` + +Applies LLM to extract structured insights: +- Reads from text table via streaming +- Uses `ai_query` with Claude Sonnet 4 +- Customizable prompt for domain-specific extraction +- Outputs structured JSON + +## Visual Debugger + +The included notebook visualizes parsing results with interactive bounding boxes. 
+ +**Open**: `src/explorations/ai_parse_document -- debug output.py` + +**Configure widgets**: +- `input_file`: `/Volumes/main/default/source_documents/sample.pdf` +- `image_output_path`: `/Volumes/main/default/parsed_output/` +- `page_selection`: `all` (or `1-3`, `1,5,10`) + +**Features**: +- Color-coded bounding boxes by element type +- Hover tooltips showing extracted content +- Automatic image scaling +- Page selection support + +### Example Output + +**Document Summary with Element Types** + +![Document Summary](assets/document_summary.png) + +**Visual Bounding Boxes on Page 1** + +![Page 1 Bounding Boxes](assets/page1_bounding_boxes.png) + +**Extracted Elements with Descriptions** + +![Page 1 Elements List](assets/page1_elements_list.png) + +**Page 2 Contents Table with Bounding Boxes** + +![Page 2 Contents Table](assets/page2_contents_table.png) + +**Table Extraction Example** + +![Table Extraction](assets/table_extraction.png) + +**Figure Description** + +![Figure Description](assets/figure_description.png) + +## Project Structure + +``` +. 
+├── databricks.yml # Bundle configuration +├── resources/ +│ └── ai_parse_document_job.job.yml +├── src/ +│ ├── transformations/ +│ │ ├── 01_parse_documents.py +│ │ ├── 02_extract_text.py +│ │ └── 03_extract_structured_data.py +│ └── explorations/ +│ └── ai_parse_document -- debug output.py +└── README.md +``` + +## Resources + +- [Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/) +- [Databricks Workflows](https://docs.databricks.com/workflows/) +- [Structured Streaming](https://docs.databricks.com/structured-streaming/) +- [`ai_parse_document` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document) +- [`ai_query` Function](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_query) diff --git a/contrib/workflow_with_ai_parse_document/assets/document_summary.png b/contrib/workflow_with_ai_parse_document/assets/document_summary.png new file mode 100644 index 00000000..4508b367 Binary files /dev/null and b/contrib/workflow_with_ai_parse_document/assets/document_summary.png differ diff --git a/contrib/workflow_with_ai_parse_document/assets/figure_description.png b/contrib/workflow_with_ai_parse_document/assets/figure_description.png new file mode 100644 index 00000000..a1b54efc Binary files /dev/null and b/contrib/workflow_with_ai_parse_document/assets/figure_description.png differ diff --git a/contrib/workflow_with_ai_parse_document/assets/page1_bounding_boxes.png b/contrib/workflow_with_ai_parse_document/assets/page1_bounding_boxes.png new file mode 100644 index 00000000..4e2cd16e Binary files /dev/null and b/contrib/workflow_with_ai_parse_document/assets/page1_bounding_boxes.png differ diff --git a/contrib/workflow_with_ai_parse_document/assets/page1_elements_list.png b/contrib/workflow_with_ai_parse_document/assets/page1_elements_list.png new file mode 100644 index 00000000..a44fc988 Binary files /dev/null and b/contrib/workflow_with_ai_parse_document/assets/page1_elements_list.png 
differ diff --git a/contrib/workflow_with_ai_parse_document/assets/page2_contents_table.png b/contrib/workflow_with_ai_parse_document/assets/page2_contents_table.png new file mode 100644 index 00000000..af9ba869 Binary files /dev/null and b/contrib/workflow_with_ai_parse_document/assets/page2_contents_table.png differ diff --git a/contrib/workflow_with_ai_parse_document/assets/table_extraction.png b/contrib/workflow_with_ai_parse_document/assets/table_extraction.png new file mode 100644 index 00000000..9f9290cb Binary files /dev/null and b/contrib/workflow_with_ai_parse_document/assets/table_extraction.png differ diff --git a/contrib/workflow_with_ai_parse_document/databricks.yml b/contrib/workflow_with_ai_parse_document/databricks.yml new file mode 100644 index 00000000..c8784a9a --- /dev/null +++ b/contrib/workflow_with_ai_parse_document/databricks.yml @@ -0,0 +1,52 @@ +# This is a Databricks asset bundle definition for ai_parse_document_workflow. +# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation. 
+bundle: + name: ai_parse_document_workflow + +variables: + catalog: + description: The catalog name for the workflow + default: main + schema: + description: The schema name for the workflow + default: default + source_volume_path: + description: Source volume path for PDF files + default: /Volumes/main/default/source_documents + output_volume_path: + description: Output volume path for processed images + default: /Volumes/main/default/parsed_output + checkpoint_base_path: + description: Base path for Structured Streaming checkpoints + default: /tmp/checkpoints/ai_parse_workflow + raw_table_name: + description: Table name for raw parsed documents + default: parsed_documents_raw + text_table_name: + description: Table name for extracted text + default: parsed_documents_text + structured_table_name: + description: Table name for structured data + default: parsed_documents_structured + +include: + - resources/*.yml + +targets: + dev: + # The default target uses 'mode: development' to create a development copy. + # - Deployed resources get prefixed with '[dev my_user_name]' + # - Any job schedules and triggers are paused by default. + # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html. + mode: development + default: true + workspace: + host: https://your-workspace.cloud.databricks.com + + prod: + mode: production + workspace: + host: https://your-workspace.cloud.databricks.com + permissions: + - group_name: users + level: CAN_VIEW diff --git a/contrib/workflow_with_ai_parse_document/resources/ai_parse_document_job.job.yml b/contrib/workflow_with_ai_parse_document/resources/ai_parse_document_job.job.yml new file mode 100644 index 00000000..ea8e7abd --- /dev/null +++ b/contrib/workflow_with_ai_parse_document/resources/ai_parse_document_job.job.yml @@ -0,0 +1,68 @@ +resources: + jobs: + ai_parse_document_job: + name: ai_parse_document_job + + # Optional: Add a schedule + # schedule: + # quartz_cron_expression: "0 0 * * * ?" 
+ # timezone_id: "UTC" + + # Job-level parameters shared across all tasks + parameters: + - name: catalog + default: ${var.catalog} + - name: schema + default: ${var.schema} + - name: source_volume_path + default: ${var.source_volume_path} + - name: output_volume_path + default: ${var.output_volume_path} + - name: checkpoint_base_path + default: ${var.checkpoint_base_path} + - name: raw_table_name + default: ${var.raw_table_name} + - name: text_table_name + default: ${var.text_table_name} + - name: structured_table_name + default: ${var.structured_table_name} + + environments: + - environment_key: serverless_env + spec: + client: "3" + + tasks: + - task_key: parse_documents + environment_key: serverless_env + notebook_task: + notebook_path: ../src/transformations/01_parse_documents.py + base_parameters: + source_volume_path: "{{job.parameters.source_volume_path}}" + output_volume_path: "{{job.parameters.output_volume_path}}" + checkpoint_location: "{{job.parameters.checkpoint_base_path}}/01_parse_documents" + table_name: "{{job.parameters.raw_table_name}}" + + - task_key: extract_text + depends_on: + - task_key: parse_documents + environment_key: serverless_env + notebook_task: + notebook_path: ../src/transformations/02_extract_text.py + base_parameters: + checkpoint_location: "{{job.parameters.checkpoint_base_path}}/02_extract_text" + source_table_name: "{{job.parameters.raw_table_name}}" + table_name: "{{job.parameters.text_table_name}}" + + - task_key: extract_structured_data + depends_on: + - task_key: extract_text + environment_key: serverless_env + notebook_task: + notebook_path: ../src/transformations/03_extract_structured_data.py + base_parameters: + checkpoint_location: "{{job.parameters.checkpoint_base_path}}/03_extract_structured_data" + source_table_name: "{{job.parameters.text_table_name}}" + table_name: "{{job.parameters.structured_table_name}}" + + max_concurrent_runs: 1 diff --git 
a/contrib/workflow_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py b/contrib/workflow_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py new file mode 100644 index 00000000..9f3f507d --- /dev/null +++ b/contrib/workflow_with_ai_parse_document/src/explorations/ai_parse_document -- debug output.py @@ -0,0 +1,824 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # 🔍 AI Parse Document Debug Interface +# MAGIC +# MAGIC Version 1.3 +# MAGIC +# MAGIC Last update: Oct 6, 2025 +# MAGIC +# MAGIC Changelog: +# MAGIC - Simplified widget parameters: `input_file` and `image_output_path` now accept full volume paths +# MAGIC - Removed separate `catalog`, `schema`, `volume` widgets +# MAGIC - `input_file` supports wildcards for processing multiple files (e.g., `/Volumes/catalog/schema/volume/input/*`) +# MAGIC +# MAGIC ## Overview +# MAGIC This notebook provides a **visual debugging interface** for analyzing the output of Databricks' `ai_parse_document` function. It renders parsed documents with interactive bounding box overlays, allowing you to inspect what content was extracted from each region of your documents. +# MAGIC +# MAGIC ## Features +# MAGIC - 📊 **Visual Bounding Boxes**: Color-coded overlays showing the exact regions where text/elements were detected +# MAGIC - 🎯 **Interactive Tooltips**: Hover over any bounding box to see the parsed content from that region +# MAGIC - 📐 **Automatic Scaling**: Large documents are automatically scaled to fit within 1024px width for optimal viewing +# MAGIC - 🎨 **Element Type Visualization**: Different colors for different element types (text, headers, tables, figures, etc.) +# MAGIC +# MAGIC ## Required Parameters +# MAGIC +# MAGIC This interface requires widget parameters to be configured before running: +# MAGIC +# MAGIC ### 1. 
`input_file` +# MAGIC - **Description**: Full Unity Catalog volume path to the document(s) you want to parse and visualize +# MAGIC - **Examples**: +# MAGIC - Single file: `/Volumes/catalog/schema/volume/input/document.pdf` +# MAGIC - All files in directory: `/Volumes/catalog/schema/volume/input/*` +# MAGIC - Pattern matching: `/Volumes/catalog/schema/volume/input/*.pdf` +# MAGIC - **Requirements**: Read access to the volume containing your PDF/image files +# MAGIC +# MAGIC ### 2. `image_output_path` +# MAGIC - **Description**: Full Unity Catalog volume path where `ai_parse_document` will store the extracted page images +# MAGIC - **Example**: `/Volumes/catalog/schema/volume/output/` +# MAGIC - **Requirements**: Write access is required for storing intermediate image outputs +# MAGIC - **Note**: As documented in the [official Databricks documentation](https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_parse_document), this path is used by the parsing function to store page images that are referenced in the output +# MAGIC +# MAGIC ### 3. `page_selection` +# MAGIC - **Description**: Specifies which pages to display in the visualization +# MAGIC - **Supported formats**: +# MAGIC - `"all"` or leave empty: Display all pages +# MAGIC - `"3"`: Display only page 3 (1-indexed) +# MAGIC - `"1-5"`: Display pages 1 through 5 (inclusive, 1-indexed) +# MAGIC - `"1,3,5"`: Display specific pages (1-indexed) +# MAGIC - `"1-3,7,10-12"`: Mixed ranges and individual pages +# MAGIC +# MAGIC ## Usage Instructions +# MAGIC +# MAGIC 1. **Clone this notebook** to your workspace: +# MAGIC - Select **File -> Clone** from the menu bar at the top of the notebook +# MAGIC - Choose your desired location in your workspace +# MAGIC - This ensures you have a personal copy you can modify and run +# MAGIC +# MAGIC 2. 
**Prepare your Unity Catalog volumes**: +# MAGIC - Create or identify a volume for your PDF/image files +# MAGIC - Create or identify a volume for output images +# MAGIC - Upload your PDF files to the input location +# MAGIC +# MAGIC 3. **Configure the widget parameters** at the top of this notebook: +# MAGIC - Set `input_file` to the full volume path (file or directory with wildcard) +# MAGIC - Set `image_output_path` to the full volume path for outputs +# MAGIC - Set `page_selection` to control which pages to visualize +# MAGIC +# MAGIC 4. **Run all code cells** which will generate visual debugging results. +# MAGIC +# MAGIC ## What You'll See +# MAGIC +# MAGIC - **Document Summary**: Overview of pages, element counts, and document metadata +# MAGIC - **Color Legend**: Visual guide showing which colors represent which element types +# MAGIC - **Annotated Images**: Each page with overlaid bounding boxes +# MAGIC - Hover over any box to see the extracted content +# MAGIC - Yellow highlight indicates the currently hovered element +# MAGIC - **Parsed Elements List**: Complete list of all extracted elements with their content + +# COMMAND ---------- + +# Exec Parameters + +dbutils.widgets.text("input_file", "/Volumes/main/default/source_documents/sample.pdf") +dbutils.widgets.text("image_output_path", "/Volumes/main/default/parsed_output/") +dbutils.widgets.text("page_selection", "all") + +input_file = dbutils.widgets.get("input_file") +image_output_path = dbutils.widgets.get("image_output_path") +page_selection = dbutils.widgets.get("page_selection") + +# COMMAND ---------- + +# DBTITLE 1,Configuration Parameters +# Path configuration - use widget values as-is + +source_files = input_file + +# Page selection string, kept as a plain str here; it is parsed into page indices later by DocumentRenderer. 
+# +# Supported formats: +# - "all" or None: Display all pages +# - "3": Display specific page (1-indexed) +# - "1-5": Display page range (inclusive, 1-indexed) +# - "1,3,5": Display list of specific pages (1-indexed) +# - "1-3,7,10-12": Mixed ranges and individual pages +page_selection = f"{page_selection}" + +# COMMAND ---------- + +# DBTITLE 1,Run Document Parse Code (may take some time) +# SQL statement with ai_parse_document() +# Note: input_file can be a single file path or a directory path with wildcard +sql = f""" +with parsed_documents AS ( + SELECT + path, + ai_parse_document(content + , + map( + 'version', '2.0', + 'imageOutputPath', '{image_output_path}', + 'descriptionElementTypes', '*' + ) + ) as parsed + FROM + read_files('{source_files}', format => 'binaryFile') +) +select * from parsed_documents +""" + +parsed_results = [row.parsed for row in spark.sql(sql).collect()] + +# COMMAND ---------- + +import json +from typing import Dict, List, Any, Optional, Tuple, Set, Union +from IPython.display import HTML, display +import base64 +import os +from PIL import Image +import io + + +class DocumentRenderer: + def __init__(self): + # Color mapping for different element types + self.element_colors = { + "section_header": "#FF6B6B", + "text": "#4ECDC4", + "figure": "#45B7D1", + "caption": "#96CEB4", + "page_footer": "#FFEAA7", + "page_header": "#DDA0DD", + "table": "#98D8C8", + "list": "#F7DC6F", + "default": "#BDC3C7", + } + + def _parse_page_selection( + self, page_selection: Union[str, None], total_pages: int + ) -> Set[int]: + """Parse page selection string and return set of page indices (0-based). 
+ + Args: + page_selection: Selection string or None + total_pages: Total number of pages available + + Returns: + Set of 0-based page indices to display + """ + # Handle None or "all" - return all pages + if page_selection is None or page_selection.lower() == "all": + return set(range(total_pages)) + + selected_pages = set() + + # Clean the input + page_selection = page_selection.strip() + + # Split by commas for multiple selections + parts = page_selection.split(",") + + for part in parts: + part = part.strip() + + # Check if it's a range (contains hyphen) + if "-" in part: + try: + # Split range and convert to integers + range_parts = part.split("-") + if len(range_parts) == 2: + start = int(range_parts[0].strip()) + end = int(range_parts[1].strip()) + + # Convert from 1-indexed to 0-indexed + start_idx = start - 1 + end_idx = end - 1 + + # Add all pages in range (inclusive) + for i in range(start_idx, end_idx + 1): + if 0 <= i < total_pages: + selected_pages.add(i) + except ValueError: + print(f"Warning: Invalid range '{part}' in page selection") + else: + # Single page number + try: + page_num = int(part.strip()) + # Convert from 1-indexed to 0-indexed + page_idx = page_num - 1 + if 0 <= page_idx < total_pages: + selected_pages.add(page_idx) + else: + print( + f"Warning: Page {page_num} is out of range (1-{total_pages})" + ) + except ValueError: + print(f"Warning: Invalid page number '{part}' in page selection") + + # If no valid pages were selected, default to all pages + if not selected_pages: + print( + f"Warning: No valid pages in selection '{page_selection}'. Showing all pages." 
+ ) + return set(range(total_pages)) + + return selected_pages + + def _get_element_color(self, element_type: str) -> str: + """Get color for element type.""" + return self.element_colors.get( + element_type.lower(), self.element_colors["default"] + ) + + def _get_image_dimensions(self, image_path: str) -> Optional[Tuple[int, int]]: + """Get dimensions of an image file.""" + try: + if os.path.exists(image_path): + with Image.open(image_path) as img: + return img.size # Returns (width, height) + return None + except Exception as e: + print(f"Error getting image dimensions for {image_path}: {e}") + return None + + def _load_image_as_base64(self, image_path: str) -> Optional[str]: + """Load image from file path and convert to base64.""" + try: + if os.path.exists(image_path): + with open(image_path, "rb") as img_file: + img_data = img_file.read() + img_base64 = base64.b64encode(img_data).decode("utf-8") + ext = os.path.splitext(image_path)[1].lower() + if ext in [".jpg", ".jpeg"]: + return f"data:image/jpeg;base64,{img_base64}" + elif ext in [".png"]: + return f"data:image/png;base64,{img_base64}" + else: + return f"data:image/jpeg;base64,{img_base64}" + return None + except Exception as e: + print(f"Error loading image {image_path}: {e}") + return None + + def _render_element_content(self, element: Dict, for_tooltip: bool = False) -> str: + """Render element content with appropriate formatting for both tooltip and element list display. 
+ + Args: + element: The element dictionary containing content/description + for_tooltip: Whether this is for tooltip display (affects styling and truncation) + """ + element_type = element.get("type", "unknown") + content = element.get("content", "") + description = element.get("description", "") + + display_content = "" + + if content: + if element_type == "table": + # Render the HTML table with styling + table_html = content + + # Apply different styling based on context + if for_tooltip: + # Compact styling for tooltips with light theme + # Use full width available for tooltip tables + table_style = f'''style="width: 100%; border-collapse: collapse; margin: 5px 0; font-size: 10px;"''' + th_style = 'style="border: 1px solid #ddd; padding: 4px; background: #f8f9fa; color: #333; font-weight: bold; text-align: left; font-size: 10px;"' + td_style = 'style="border: 1px solid #ddd; padding: 4px; color: #333; font-size: 10px;"' + thead_style = 'style="background: #e9ecef;"' + else: + # Full styling for element list + table_style = '''style="width: 100%; border-collapse: collapse; margin: 10px 0; font-size: 13px;"''' + th_style = 'style="border: 1px solid #ddd; padding: 8px; background: #f5f5f5; font-weight: bold; text-align: left;"' + td_style = 'style="border: 1px solid #ddd; padding: 8px;"' + thead_style = 'style="background: #f0f0f0;"' + + # Apply styling transformations + if "" in table_html: + table_html = table_html.replace("
", f"
") + if "" in table_html: + table_html = table_html.replace("", f"") + + if for_tooltip: + display_content = table_html + else: + display_content = f"
{table_html}
" + else: + # Regular content handling + if for_tooltip and len(content) > 500: + # Truncate for tooltip display and escape HTML for safety + display_content = self._escape_for_html_attribute( + content[:500] + "..." + ) + else: + display_content = ( + self._escape_for_html_attribute(content) + if for_tooltip + else content + ) + elif description: + desc_content = description + if for_tooltip and len(desc_content) > 500: + desc_content = desc_content[:500] + "..." + + if for_tooltip: + display_content = self._escape_for_html_attribute( + f"Description: {desc_content}" + ) + else: + display_content = f"Description: {desc_content}" + else: + display_content = ( + "No content available" if for_tooltip else "No content" + ) + + return display_content + + def _escape_for_html_attribute(self, text: str) -> str: + """Escape text for safe use in HTML attributes.""" + return ( + text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + .replace("\n", "
") + ) + + def _calculate_tooltip_width(self, element: Dict, image_width: int) -> int: + """Calculate dynamic tooltip width based on table content.""" + element_type = element.get("type", "unknown") + content = element.get("content", "") + + if element_type == "table" and content: + # Count columns by looking for ", content, re.DOTALL | re.IGNORECASE + ) + if first_row_match: + first_row = first_row_match.group(1) + # Count th or td tags + th_count = len(re.findall(r"]*>", first_row, re.IGNORECASE)) + td_count = len(re.findall(r"]*>", first_row, re.IGNORECASE)) + column_count = max(th_count, td_count) + + if column_count > 0: + # Base width + additional width per column + base_width = 300 + width_per_column = 80 + calculated_width = base_width + (column_count * width_per_column) + + # Cap at 4/5th of image width + max_width = int(image_width * 0.8) + return min(calculated_width, max_width) + + # Default width for non-tables or when calculation fails + return 400 + + def _create_annotated_image(self, page: Dict, elements: List[Dict]) -> str: + """Create annotated image with SCALING to fit within 1024px width.""" + image_uri = page.get("image_uri", "") + page_id = page.get("id", 0) + + if not image_uri: + return "

No image URI found for this page

" + + # Load image + img_data_uri = self._load_image_as_base64(image_uri) + if not img_data_uri: + return f""" +
+ Could not load image: {image_uri}
+ Make sure the file exists and is accessible. +
+ """ + + # Get original image dimensions + original_dimensions = self._get_image_dimensions(image_uri) + if not original_dimensions: + # Fallback: display without explicit scaling + original_width, original_height = 1024, 768 # Default fallback + else: + original_width, original_height = original_dimensions + + # Calculate scaling factor to fit within 1024px width + max_display_width = 1024 + scale_factor = 1.0 + display_width = original_width + display_height = original_height + + if original_width > max_display_width: + scale_factor = max_display_width / original_width + display_width = max_display_width + display_height = int(original_height * scale_factor) + + # Filter elements for this page and collect their bounding boxes + page_elements = [] + + for elem in elements: + elem_bboxes = [] + for bbox in elem.get("bbox", []): + if bbox.get("page_id", 0) == page_id: + coord = bbox.get("coord", []) + if len(coord) >= 4: + elem_bboxes.append(bbox) + + if elem_bboxes: + page_elements.append({"element": elem, "bboxes": elem_bboxes}) + + if not page_elements: + return f"

No elements found for page {page_id}

" + + header_info = f""" +
+ Page {page_id + 1}: {len(page_elements)} elements
+ Original size: {original_width}×{original_height}px | + Display size: {display_width}×{display_height}px | + Scale factor: {scale_factor:.3f}
+
+ """ + + # Generate unique container ID for this page + container_id = f"page_container_{page_id}_{id(self)}" + + # Create bounding box overlays using SCALED coordinates with hover functionality + overlays = [] + + for idx, item in enumerate(page_elements): + element = item["element"] + element_id = element.get("id", "N/A") + element_type = element.get("type", "unknown") + color = self._get_element_color(element_type) + + # Use the shared content renderer for tooltip + tooltip_content = self._render_element_content(element, for_tooltip=True) + + # Calculate dynamic tooltip width + tooltip_width = self._calculate_tooltip_width(element, display_width) + + # Tables should render as HTML, other content should be escaped + + for bbox_idx, bbox in enumerate(item["bboxes"]): + coord = bbox.get("coord", []) + if len(coord) >= 4: + x1, y1, x2, y2 = coord + + # Apply scaling to coordinates + scaled_x1 = x1 * scale_factor + scaled_y1 = y1 * scale_factor + scaled_x2 = x2 * scale_factor + scaled_y2 = y2 * scale_factor + + width = scaled_x2 - scaled_x1 + height = scaled_y2 - scaled_y1 + + # Skip invalid boxes + if width <= 0 or height <= 0: + continue + + # Position label above box when possible + label_top = -18 if scaled_y1 >= 18 else 2 + + # Unique ID for this bounding box + box_id = f"bbox_{page_id}_{idx}_{bbox_idx}" + + # Calculate tooltip position (prefer right side, but switch to left if needed) + tooltip_left = 10 + + overlay = f""" +
+
+ {element_type.upper()[:6]}#{element_id} +
+ +
+
+ {element_type.upper()} #{element_id} +
+
+ {tooltip_content} +
+
+
+ """ + overlays.append(overlay) + + # Pure CSS hover functionality (works in Databricks) + styles = f""" + + """ + + return f""" + {header_info} + {styles} +
+ Page {page_id + 1} + {"".join(overlays)} +
+ """ + + def _create_page_elements_list(self, page_id: int, elements: List[Dict]) -> str: + """Create a detailed list of elements for a specific page.""" + # Filter elements for this page + page_elements = [] + + for elem in elements: + elem_bboxes = [] + for bbox in elem.get("bbox", []): + if bbox.get("page_id", 0) == page_id: + elem_bboxes.append(bbox) + + if elem_bboxes: + page_elements.append(elem) + + if not page_elements: + return f"

No elements found for page {page_id + 1}

" + + html_parts = [] + + for element in page_elements: + element_id = element.get("id", "N/A") + element_type = element.get("type", "unknown") + color = self._get_element_color(element_type) + + # Get bounding box info for this page only + bbox_info = "No bbox" + bbox_list = element.get("bbox", []) + if bbox_list: + bbox_details = [] + for bbox in bbox_list: + if bbox.get("page_id", 0) == page_id: + coord = bbox.get("coord", []) + if len(coord) >= 4: + bbox_details.append( + f"[{coord[0]:.0f}, {coord[1]:.0f}, {coord[2]:.0f}, {coord[3]:.0f}]" + ) + bbox_info = "; ".join(bbox_details) if bbox_details else "Invalid bbox" + + # Use the shared content renderer for element list display + display_content = self._render_element_content(element, for_tooltip=False) + + element_html = f""" +
+
+

+ {element_type.upper().replace("_", " ")} (ID: {element_id}) +

+ + {bbox_info} + +
+
+ {display_content} +
+
+ """ + html_parts.append(element_html) + + return f""" +
+

📋 Page {page_id + 1} Elements ({len(page_elements)} items)

+ {"".join(html_parts)} +
+ """ + + def _create_summary( + self, document: Dict, metadata: Dict, selected_pages: Set[int], total_pages: int + ) -> str: + """Create a summary with page selection info.""" + elements = document.get("elements", []) + + # Count elements only on selected pages + selected_elements = [] + for elem in elements: + for bbox in elem.get("bbox", []): + if bbox.get("page_id", 0) in selected_pages: + selected_elements.append(elem) + break + + # Count by type (for selected pages) + type_counts = {} + for elem in selected_elements: + elem_type = elem.get("type", "unknown") + type_counts[elem_type] = type_counts.get(elem_type, 0) + 1 + + type_list = ", ".join([f"{t}: {c}" for t, c in type_counts.items()]) + + # Create page selection info + if len(selected_pages) == total_pages: + page_info = f"All {total_pages} pages" + else: + # Convert to 1-indexed for display + page_nums = sorted([p + 1 for p in selected_pages]) + if len(page_nums) <= 10: + page_info = f"Pages {', '.join(map(str, page_nums))} ({len(selected_pages)} of {total_pages})" + else: + page_info = f"{len(selected_pages)} of {total_pages} pages selected" + + return f""" +
+

📄 Document Summary

+

Displaying: {page_info}

+

Elements on selected pages: {len(selected_elements)}

+

Element Types: {type_list if type_list else "None"}

+

Document ID: {str(metadata.get("id", "N/A"))[:12]}...

+
+ """ + + def render_document( + self, parsed_result: Any, page_selection: Union[str, None] = None + ) -> None: + """Main render function with page selection support. + + Args: + parsed_result: The parsed document result + page_selection: Page selection string. Supported formats: + - "all" or None: Display all pages + - "3": Display only page 3 (1-indexed) + - "1-5": Display pages 1 through 5 (inclusive) + - "1,3,5": Display specific pages + - "1-3,7,10-12": Mixed format + """ + try: + # Convert to dict + if hasattr(parsed_result, "toPython"): + parsed_dict = parsed_result.toPython() + elif hasattr(parsed_result, "toJson"): + parsed_dict = json.loads(parsed_result.toJson()) + elif isinstance(parsed_result, dict): + parsed_dict = parsed_result + else: + display( + HTML( + f"

❌ Could not convert result. Type: {type(parsed_result)}

" + ) + ) + return + + # Extract components + document = parsed_dict.get("document", {}) + pages = document.get("pages", []) + elements = document.get("elements", []) + metadata = parsed_dict.get("metadata", {}) + + if not elements: + display( + HTML("

❌ No elements found in document

") + ) + return + + # Parse page selection + selected_pages = self._parse_page_selection(page_selection, len(pages)) + + # Display title + display(HTML("

🔍 AI Parse Document Results

")) + + # Display summary with page selection info + summary_html = self._create_summary( + document, metadata, selected_pages, len(pages) + ) + display(HTML(summary_html)) + + # Display color legend + legend_items = [] + for elem_type, color in self.element_colors.items(): + if elem_type != "default": + legend_items.append(f""" + + + {elem_type.replace("_", " ").title()} + + """) + + display( + HTML(f""" +
+ 🎨 Element Colors:
+ {"".join(legend_items)} +
+ """) + ) + + # Display annotated images with their corresponding elements (filtered by selection) + if pages: + display(HTML("

🖼️ Annotated Images & Elements

")) + + # Sort selected pages for display + sorted_selected = sorted(selected_pages) + + for page_idx in sorted_selected: + if page_idx < len(pages): + page = pages[page_idx] + + # Display the annotated image + annotated_html = self._create_annotated_image(page, elements) + display( + HTML(f"
{annotated_html}
") + ) + + # Display elements for this page immediately after the image + page_id = page.get("id", page_idx) + page_elements_html = self._create_page_elements_list( + page_id, elements + ) + display(HTML(page_elements_html)) + + except Exception as e: + display(HTML(f"

❌ Error: {str(e)}

")) + import traceback + + display(HTML(f"
{traceback.format_exc()}
")) + + +# Simple usage functions +def render_ai_parse_output(parsed_result, page_selection=None): + """Simple function to render ai_parse_document output with page selection. + + Args: + parsed_result: The parsed document result + page_selection: Optional page selection string. Examples: + - None or "all": Display all pages + - "3": Display only page 3 + - "1-5": Display pages 1 through 5 + - "1,3,5": Display specific pages + - "1-3,7,10-12": Mixed format + """ + renderer = DocumentRenderer() + renderer.render_document(parsed_result, page_selection) + + +# COMMAND ---------- + +# DBTITLE 1,Debug Visualization Results +for parsed_result in parsed_results: + render_ai_parse_output(parsed_result, page_selection) diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py b/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py new file mode 100644 index 00000000..297aad29 --- /dev/null +++ b/contrib/workflow_with_ai_parse_document/src/transformations/01_parse_documents.py @@ -0,0 +1,98 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Parse Documents using ai_parse_document +# MAGIC +# MAGIC This notebook uses Structured Streaming to incrementally parse PDFs and images using the ai_parse_document function. 
# COMMAND ----------

# Get parameters
# Each widget declares a notebook parameter with a default value.
dbutils.widgets.text("catalog", "main", "Catalog name")
dbutils.widgets.text("schema", "default", "Schema name")
dbutils.widgets.text(
    "source_volume_path", "/Volumes/main/default/source_documents", "Source volume path"
)
dbutils.widgets.text(
    "output_volume_path", "/Volumes/main/default/parsed_output", "Output volume path"
)
dbutils.widgets.text(
    "checkpoint_location",
    "/Volumes/main/default/checkpoints/parse_documents",
    "Checkpoint location",
)
dbutils.widgets.text("table_name", "parsed_documents_raw", "Output table name")

catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
source_volume_path = dbutils.widgets.get("source_volume_path")
output_volume_path = dbutils.widgets.get("output_volume_path")
checkpoint_location = dbutils.widgets.get("checkpoint_location")
table_name = dbutils.widgets.get("table_name")

# COMMAND ----------

# Set catalog and schema so the unqualified table name below resolves there
spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"USE SCHEMA {schema}")

# COMMAND ----------

from pyspark.sql.functions import col, current_timestamp, expr
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    BinaryType,
    TimestampType,
    LongType,
)

# Define schema for binary files (must match exact schema expected by binaryFile format)
binary_file_schema = StructType(
    [
        StructField("path", StringType(), False),
        StructField("modificationTime", TimestampType(), False),
        StructField("length", LongType(), False),
        StructField("content", BinaryType(), True),
    ]
)

# Read files using Structured Streaming; only PDFs and common image types
# anywhere below the source volume path are picked up.
files_df = (
    spark.readStream.format("binaryFile")
    .schema(binary_file_schema)
    .option("pathGlobFilter", "*.{pdf,jpg,jpeg,png}")
    .option("recursiveFileLookup", "true")
    .load(source_volume_path)
)

# Parse documents with ai_parse_document.
# Rows are spread over 8 partitions keyed by a hash of the file path before
# the parse call is applied.
parsed_df = (
    files_df.repartition(8, expr("crc32(path) % 8"))
    .withColumn(
        "parsed",
        expr(f"""
            ai_parse_document(
                content,
                map(
                    'version', '2.0',
                    'imageOutputPath', '{output_volume_path}',
                    'descriptionElementTypes', '*'
                )
            )
        """),
    )
    .withColumn("parsed_at", current_timestamp())
    .select("path", "parsed", "parsed_at")
)

# Write to Delta table with streaming
query = (
    parsed_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .option("delta.feature.variantType-preview", "supported")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(table_name)
)

# toTable() only starts the query. Block until the availableNow run has
# drained its backlog so the task does not finish (and downstream tasks do
# not start) early, and so a failed query raises here instead of passing
# silently.
query.awaitTermination()

# diff --git a/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py b/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py
# new file mode 100644
# index 00000000..29fa9097
# --- /dev/null
# +++ b/contrib/workflow_with_ai_parse_document/src/transformations/02_extract_text.py
# @@ -0,0 +1,73 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Extract Text from Parsed Documents
# MAGIC
# MAGIC This notebook uses Structured Streaming to extract clean text from parsed documents.
# COMMAND ----------

# Get parameters
# Each widget declares a notebook parameter with a default value.
dbutils.widgets.text("catalog", "main", "Catalog name")
dbutils.widgets.text("schema", "default", "Schema name")
dbutils.widgets.text(
    "checkpoint_location",
    "/Volumes/main/default/checkpoints/extract_text",
    "Checkpoint location",
)
dbutils.widgets.text("source_table_name", "parsed_documents_raw", "Source table name")
dbutils.widgets.text("table_name", "parsed_documents_text", "Output table name")

catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
checkpoint_location = dbutils.widgets.get("checkpoint_location")
source_table_name = dbutils.widgets.get("source_table_name")
table_name = dbutils.widgets.get("table_name")

# COMMAND ----------

# Set catalog and schema so the unqualified table names below resolve there
spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"USE SCHEMA {schema}")

# COMMAND ----------

from pyspark.sql.functions import col, concat_ws, expr, lit, when

# Read from source table using Structured Streaming
parsed_stream = spark.readStream.format("delta").table(source_table_name)

# Extract text from parsed documents.
# Rows that carry an error_status get NULL text. Otherwise the content of
# every page (version '1.0' results) or element (any other version) is
# joined with blank lines. The casts name the element type explicitly:
# a bare ARRAY is not a valid cast target.
text_df = (
    parsed_stream.withColumn(
        "text",
        when(
            expr("try_cast(parsed:error_status AS STRING)").isNotNull(), lit(None)
        ).otherwise(
            concat_ws(
                "\n\n",
                expr("""
                    transform(
                        CASE
                            WHEN try_cast(parsed:metadata:version AS STRING) = '1.0'
                            THEN try_cast(parsed:document:pages AS ARRAY<VARIANT>)
                            ELSE try_cast(parsed:document:elements AS ARRAY<VARIANT>)
                        END,
                        element -> try_cast(element:content AS STRING)
                    )
                """),
            )
        ),
    )
    .withColumn("error_status", expr("try_cast(parsed:error_status AS STRING)"))
    .select("path", "text", "error_status", "parsed_at")
)

# Write to Delta table with streaming
query = (
    text_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(table_name)
)

# toTable() only starts the query. Block until the availableNow run has
# drained its backlog so the task does not finish early, and so a failed
# query raises here instead of passing silently.
query.awaitTermination()

# diff --git
# a/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py b/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py
# new file mode 100644
# index 00000000..7320711c
# --- /dev/null
# +++ b/contrib/workflow_with_ai_parse_document/src/transformations/03_extract_structured_data.py
# @@ -0,0 +1,81 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Extract Structured Data using AI Query
# MAGIC
# MAGIC This notebook uses Structured Streaming to extract structured JSON from document text using ai_query.

# COMMAND ----------

# Get parameters
# Each widget declares a notebook parameter with a default value.
dbutils.widgets.text("catalog", "main", "Catalog name")
dbutils.widgets.text("schema", "default", "Schema name")
dbutils.widgets.text(
    "checkpoint_location",
    "/Volumes/main/default/checkpoints/extract_structured",
    "Checkpoint location",
)
dbutils.widgets.text("source_table_name", "parsed_documents_text", "Source table name")
dbutils.widgets.text("table_name", "parsed_documents_structured", "Output table name")

catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
checkpoint_location = dbutils.widgets.get("checkpoint_location")
source_table_name = dbutils.widgets.get("source_table_name")
table_name = dbutils.widgets.get("table_name")

# COMMAND ----------

# Set catalog and schema so the unqualified table names below resolve there
spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"USE SCHEMA {schema}")

# COMMAND ----------

from pyspark.sql.functions import col, concat, current_timestamp, expr, length, lit

# Read from source table using Structured Streaming.
# Only rows with text, no recorded error and more than 100 characters are
# passed on to the model.
text_stream = (
    spark.readStream.format("delta")
    .table(source_table_name)
    .filter(
        (col("text").isNotNull())
        & (col("error_status").isNull())
        & (length(col("text")) > 100)
    )
)

# Extract structured data using ai_query; the prompt and the document text
# are concatenated inside the SQL expression.
structured_df = (
    text_stream.withColumn(
        "extracted_json",
        expr("""
            ai_query(
                'databricks-claude-sonnet-4',
                concat(
                    'Extract key information from this document and return as JSON. ',
                    'Include: document_type, key_entities (names, organizations, locations), ',
                    'dates, amounts, and a brief summary (max 100 words). ',
                    'Document text: ',
                    text
                ),
                returnType => 'STRING',
                modelParameters => named_struct(
                    'max_tokens', 2000,
                    'temperature', 0.1
                )
            )
        """),
    )
    .withColumn("extraction_timestamp", current_timestamp())
    .select("path", "extracted_json", "parsed_at", "extraction_timestamp")
)

# Write to Delta table with streaming
query = (
    structured_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(table_name)
)

# toTable() only starts the query. Block until the availableNow run has
# drained its backlog so the task does not finish early, and so a failed
# query raises here instead of passing silently.
query.awaitTermination()
" in table_html: + table_html = table_html.replace("", f"") + if "" in table_html: + table_html = table_html.replace("", f"") + if "
or tags in first row + import re + + # Find first row (either in thead or tbody) + first_row_match = re.search( + r"]*>(.*?)