Skip to content

Latest commit

 

History

History
223 lines (163 loc) · 9.24 KB

File metadata and controls

223 lines (163 loc) · 9.24 KB

JOX Pipeline

Pipeline ETL automatizado que extrai dados estruturados de boletins diários em PDF usando parsing via LLM, orquestrado no Databricks com Arquitetura Medallion.

Pipeline em execução


Visão Geral

O JOX Pipeline ingere um boletim agropecuário diário (PDF), converte para Markdown, extrai notícias e cotações estruturadas via LLM (Claude Haiku 4.5) e persiste os resultados em Delta Tables no Databricks Unity Catalog — com uma camada Gold tipada pronta para consumo em BI.

Todo o pipeline roda em um único Databricks Notebook, agendado diariamente via Databricks Workflows.

PDF (web) → Markdown (Docling) → Extração LLM (Claude Haiku) → Delta Tables → Camada Gold (BI)

Arquitetura e Decisões de Design

Arquitetura Medallion

Camada Formato Finalidade
Raw PDF + Markdown em Unity Catalog Volumes Dados-fonte imutáveis, trilha de auditoria completa
Silver Delta Tables (noticias, cotacoes) Dados estruturados com histórico append e idempotência
Gold Delta Tables (schema tipado) Colunas tipadas (DATE, STRING), schema limpo para ferramentas de BI (Power BI, DAX)

Por que um Notebook Único?

As iterações iniciais usavam 6 notebooks separados orquestrados via dbutils.notebook.run(). Isso causava:

  • restartPython() limpando o estado dos módulos entre notebooks
  • Mensagens de erro sendo engolidas nas fronteiras entre notebooks
  • Gerenciamento complexo de dependências entre cells

A abordagem consolidada em notebook único (5 sprints sequenciais) resolveu tudo isso mantendo cada sprint claramente delimitado com headers Markdown.

Por que Pydantic v2 para Validação de Schema?

A LLM retorna JSON livre. Os modelos Pydantic v2 (NewsItem, QuoteItem, ExtractionPayload) validam cada campo no momento da extração — capturando datas malformadas, campos ausentes e incompatibilidades de tipo antes dos dados chegarem à camada Delta. Um field_validator garante conformidade com datas ISO-8601.

Por que valor_rs é STRING e não DECIMAL?

Cotações frequentemente chegam como faixas (ex: "129,00/132,00"). Converter para numérico perderia essa informação. Analistas tratam o parsing em DAX/Power BI onde as regras de negócio para faixas já estão definidas.

Instalação Lazy do Docling

O Docling (PDF → Markdown) inclui PyTorch, que pode causar OOM em clusters leves. O pipeline instala sob demanda apenas antes do Sprint 2, enquanto dependências mais leves (requests, pydantic, openai) são instaladas no início.


Stack Tecnológico

Componente Tecnologia
Orquestração Databricks Notebooks + Workflows
PDF → Markdown Docling (OCR opcional)
Extração LLM Claude Haiku 4.5 via Databricks Serving Endpoint (API compatível OpenAI)
Validação de Schema Pydantic v2 (structured output, auto-correção de JSON)
Armazenamento Delta Tables no Unity Catalog / Volumes
Configuração YAML com dataclasses imutáveis (frozen)
Logging PipelineLogger customizado com métodos narrativos (section, step, detail)

Estrutura do Projeto

jox-llm-etl/
├── config/
│   └── config.yaml              # Configuração do pipeline (paths, LLM, Delta, Gold)
├── notebooks/
│   └── pipeline_jox.py          # Notebook principal do Databricks (5 sprints)
├── src/
│   ├── config/
│   │   └── settings.py          # Loader de config via dataclasses imutáveis
│   ├── domain/
│   │   ├── schemas.py           # Modelos Pydantic v2 (NewsItem, QuoteItem, ExtractionPayload)
│   │   ├── errors.py            # Tipos de erro do domínio
│   │   └── results.py           # Result pattern (ExtractionResult, WriteResult)
│   ├── downloader/
│   │   └── pdf_downloader.py    # Downloader HTTP com lógica de retry
│   ├── infrastructure/
│   │   ├── llm_adapter.py       # Adapter LLM compatível OpenAI + system prompt + reparo de JSON
│   │   ├── delta_writer.py      # Writer para Delta Tables com check de idempotência
│   │   ├── docling_adapter.py   # Wrapper de conversão Docling PDF→MD
│   │   └── markdown_reader.py   # Leitor de arquivos Markdown
│   ├── logging/
│   │   ├── logger.py            # PipelineLogger com métodos narrativos
│   │   └── formatters.py        # Formatter plain (sem ANSI — compatível Databricks)
│   ├── pipeline/
│   │   ├── ingestion_pipeline.py    # Sprint 1: Download PDF
│   │   ├── markdown_pipeline.py     # Sprint 2: Conversão PDF → Markdown
│   │   ├── extraction_pipeline.py   # Sprint 3: Extração via LLM
│   │   └── output_pipeline.py       # Sprint 4: Persistência Delta
│   ├── services/
│   │   └── path_resolver.py     # Resolução de paths baseada em data para Volumes
│   └── storage/
│       ├── file_storage.py      # Abstração de I/O de arquivos
│       └── markdown_storage.py  # Storage específico para Markdown
├── run_pipeline.py              # Exemplo de configuração Databricks Jobs API
├── requirements.txt             # Referência de dependências Python
└── .gitignore

Sprints do Pipeline

O notebook executa 5 sprints sequenciais:

Sprint 1 — Download PDF

Baixa o PDF do boletim diário da URL fonte e salva em um Unity Catalog Volume. Inclui idempotência: pula se o arquivo do dia já existir (a menos que force_overwrite=true).

Sprint 2 — Conversão para Markdown

Instala o Docling sob demanda e converte o PDF para Markdown. Preserva estruturas de tabela para parsing downstream pela LLM.

Sprint 3 — Extração via LLM

Envia o Markdown para o Claude Haiku 4.5 via Databricks Serving Endpoint com um system prompt detalhado. A resposta é parseada como JSON, validada contra modelos Pydantic, e se a validação falhar, um loop de auto-correção reenvia o erro para a LLM corrigir (até 3 tentativas).

Entidades extraídas:

  • NewsItem — data, categoria (SUÍNOS, BOVINOS, FRANGO VIVO...), texto completo
  • QuoteItem — data, grupo, produto, unidade, preço (string raw), status/tendência

Sprint 4 — Persistência em Delta Tables

Grava os DataFrames validados em Delta Tables com modo append e check de idempotência (evita linhas duplicadas para a mesma data).

Sprint 5 — Camada Gold

Cria tabelas Gold tipadas via CREATE OR REPLACE TABLE ... AS SELECT com casts explícitos (DATE, STRING). Essas tabelas são o contrato para dashboards de BI.


Configuração

Todas as configurações ficam em config/config.yaml. Seções principais:

# Fonte de dados
source:
  url: "https://jox.com.br/boletim/diario.pdf"

# Paths no Unity Catalog
storage:
  pdf_base_path: "/Volumes/<catalog>/jox/raw/pdf"
  md_base_path: "/Volumes/<catalog>/jox/raw/md"

# Delta Tables (Silver)
delta_output:
  catalog: "my_catalog"
  schema: "jox"

# LLM (Databricks Serving Endpoint)
llm:
  model: "databricks-claude-haiku-4-5"
  databricks_host: "https://adb-<workspace-id>.<region>.azuredatabricks.net"
  databricks_token_scope: "your-scope"
  databricks_token_key: "your-token-key"

# Camada Gold
gold_output:
  catalog: "my_catalog"
  schema: "gold_jox"

Gerenciamento de Secrets

O token da LLM é obtido em runtime via dbutils.secrets.get(). Nenhuma credencial é armazenada no código ou nos arquivos de configuração.


Deploy

Pré-requisitos

  • Workspace Databricks com Unity Catalog habilitado
  • Serving endpoint com Claude Haiku 4.5 (ou qualquer modelo compatível OpenAI)
  • Secret scope no Databricks com o token da API
  • Cluster com Python 3.10+

Passo a Passo

  1. Importe o repositório no Workspace do Databricks via Git Folder
  2. Atualize config/config.yaml com seu catálogo, schema, host e secret scope
  3. Crie um secret no Databricks:
    databricks secrets put-secret --scope your-scope --key your-token-key
  4. Execute notebooks/pipeline_jox a partir do Workspace
  5. (Opcional) Agende como job diário via Databricks Workflows — veja run_pipeline.py para exemplo de configuração via Jobs API

Schema de Saída

Silver — <catalog>.jox.noticias

Coluna Tipo Descrição
data STRING Data do boletim (formato ISO)
tipo STRING Categoria da notícia
noticia STRING Texto completo da notícia

Silver — <catalog>.jox.cotacoes

Coluna Tipo Descrição
data STRING Data do boletim (formato ISO)
grupo STRING Grupo do produto
produto STRING Nome do produto
unid STRING Unidade de medida
valor_rs STRING Preço em R$ (pode conter faixas)
stat STRING Indicador de tendência

Gold — Tabelas tipadas com cast para DATE, mesmas colunas, sem metadados internos.


Licença

Este projeto é disponibilizado como referência de portfólio. Sinta-se à vontade para usar como inspiração para seus próprios pipelines.