Skip to content

euvhmac/jox-llm-etl

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

JOX Pipeline

Pipeline ETL automatizado que extrai dados estruturados de boletins diários em PDF usando parsing via LLM, orquestrado no Databricks com Arquitetura Medallion.

Pipeline em execução


Visão Geral

O JOX Pipeline ingere um boletim agropecuário diário (PDF), converte para Markdown, extrai notícias e cotações estruturadas via LLM (Claude Haiku 4.5) e persiste os resultados em Delta Tables no Databricks Unity Catalog — com uma camada Gold tipada pronta para consumo em BI.

Todo o pipeline roda em um único Databricks Notebook, agendado diariamente via Databricks Workflows.

PDF (web) → Markdown (Docling) → Extração LLM (Claude Haiku) → Delta Tables → Camada Gold (BI)

Arquitetura e Decisões de Design

Arquitetura Medallion

Camada Formato Finalidade
Raw PDF + Markdown em Unity Catalog Volumes Dados-fonte imutáveis, trilha de auditoria completa
Silver Delta Tables (noticias, cotacoes) Dados estruturados com histórico append e idempotência
Gold Delta Tables (schema tipado) Colunas tipadas (DATE, STRING), schema limpo para ferramentas de BI (Power BI, DAX)

Por que um Notebook Único?

As iterações iniciais usavam 6 notebooks separados orquestrados via dbutils.notebook.run(). Isso causava:

  • restartPython() limpando o estado dos módulos entre notebooks
  • Mensagens de erro sendo engolidas nas fronteiras entre notebooks
  • Gerenciamento complexo de dependências entre cells

A abordagem consolidada em notebook único (5 sprints sequenciais) resolveu tudo isso mantendo cada sprint claramente delimitado com headers Markdown.

Por que Pydantic v2 para Validação de Schema?

A LLM retorna JSON livre. Os modelos Pydantic v2 (NewsItem, QuoteItem, ExtractionPayload) validam cada campo no momento da extração — capturando datas malformadas, campos ausentes e incompatibilidades de tipo antes dos dados chegarem à camada Delta. Um field_validator garante conformidade com datas ISO-8601.

Por que valor_rs é STRING e não DECIMAL?

Cotações frequentemente chegam como faixas (ex: "129,00/132,00"). Converter para numérico perderia essa informação. Analistas tratam o parsing em DAX/Power BI onde as regras de negócio para faixas já estão definidas.

Instalação Lazy do Docling

O Docling (PDF → Markdown) inclui PyTorch, que pode causar OOM em clusters leves. O pipeline instala sob demanda apenas antes do Sprint 2, enquanto dependências mais leves (requests, pydantic, openai) são instaladas no início.


Stack Tecnológico

Componente Tecnologia
Orquestração Databricks Notebooks + Workflows
PDF → Markdown Docling (OCR opcional)
Extração LLM Claude Haiku 4.5 via Databricks Serving Endpoint (API compatível OpenAI)
Validação de Schema Pydantic v2 (structured output, auto-correção de JSON)
Armazenamento Delta Tables no Unity Catalog / Volumes
Configuração YAML com dataclasses imutáveis (frozen)
Logging PipelineLogger customizado com métodos narrativos (section, step, detail)

Estrutura do Projeto

jox-llm-etl/
├── config/
│   └── config.yaml              # Configuração do pipeline (paths, LLM, Delta, Gold)
├── notebooks/
│   └── pipeline_jox.py          # Notebook principal do Databricks (5 sprints)
├── src/
│   ├── config/
│   │   └── settings.py          # Loader de config via dataclasses imutáveis
│   ├── domain/
│   │   ├── schemas.py           # Modelos Pydantic v2 (NewsItem, QuoteItem, ExtractionPayload)
│   │   ├── errors.py            # Tipos de erro do domínio
│   │   └── results.py           # Result pattern (ExtractionResult, WriteResult)
│   ├── downloader/
│   │   └── pdf_downloader.py    # Downloader HTTP com lógica de retry
│   ├── infrastructure/
│   │   ├── llm_adapter.py       # Adapter LLM compatível OpenAI + system prompt + reparo de JSON
│   │   ├── delta_writer.py      # Writer para Delta Tables com check de idempotência
│   │   ├── docling_adapter.py   # Wrapper de conversão Docling PDF→MD
│   │   └── markdown_reader.py   # Leitor de arquivos Markdown
│   ├── logging/
│   │   ├── logger.py            # PipelineLogger com métodos narrativos
│   │   └── formatters.py        # Formatter plain (sem ANSI — compatível Databricks)
│   ├── pipeline/
│   │   ├── ingestion_pipeline.py    # Sprint 1: Download PDF
│   │   ├── markdown_pipeline.py     # Sprint 2: Conversão PDF → Markdown
│   │   ├── extraction_pipeline.py   # Sprint 3: Extração via LLM
│   │   └── output_pipeline.py       # Sprint 4: Persistência Delta
│   ├── services/
│   │   └── path_resolver.py     # Resolução de paths baseada em data para Volumes
│   └── storage/
│       ├── file_storage.py      # Abstração de I/O de arquivos
│       └── markdown_storage.py  # Storage específico para Markdown
├── run_pipeline.py              # Exemplo de configuração Databricks Jobs API
├── requirements.txt             # Referência de dependências Python
└── .gitignore

Sprints do Pipeline

O notebook executa 5 sprints sequenciais:

Sprint 1 — Download PDF

Baixa o PDF do boletim diário da URL fonte e salva em um Unity Catalog Volume. Inclui idempotência: pula se o arquivo do dia já existir (a menos que force_overwrite=true).

Sprint 2 — Conversão para Markdown

Instala o Docling sob demanda e converte o PDF para Markdown. Preserva estruturas de tabela para parsing downstream pela LLM.

Sprint 3 — Extração via LLM

Envia o Markdown para o Claude Haiku 4.5 via Databricks Serving Endpoint com um system prompt detalhado. A resposta é parseada como JSON, validada contra modelos Pydantic, e se a validação falhar, um loop de auto-correção reenvia o erro para a LLM corrigir (até 3 tentativas).

Entidades extraídas:

  • NewsItem — data, categoria (SUÍNOS, BOVINOS, FRANGO VIVO...), texto completo
  • QuoteItem — data, grupo, produto, unidade, preço (string raw), status/tendência

Sprint 4 — Persistência em Delta Tables

Grava os DataFrames validados em Delta Tables com modo append e check de idempotência (evita linhas duplicadas para a mesma data).

Sprint 5 — Camada Gold

Cria tabelas Gold tipadas via CREATE OR REPLACE TABLE ... AS SELECT com casts explícitos (DATE, STRING). Essas tabelas são o contrato para dashboards de BI.


Configuração

Todas as configurações ficam em config/config.yaml. Seções principais:

# Fonte de dados
source:
  url: "https://jox.com.br/boletim/diario.pdf"

# Paths no Unity Catalog
storage:
  pdf_base_path: "/Volumes/<catalog>/jox/raw/pdf"
  md_base_path: "/Volumes/<catalog>/jox/raw/md"

# Delta Tables (Silver)
delta_output:
  catalog: "my_catalog"
  schema: "jox"

# LLM (Databricks Serving Endpoint)
llm:
  model: "databricks-claude-haiku-4-5"
  databricks_host: "https://adb-<workspace-id>.<region>.azuredatabricks.net"
  databricks_token_scope: "your-scope"
  databricks_token_key: "your-token-key"

# Camada Gold
gold_output:
  catalog: "my_catalog"
  schema: "gold_jox"

Gerenciamento de Secrets

O token da LLM é obtido em runtime via dbutils.secrets.get(). Nenhuma credencial é armazenada no código ou nos arquivos de configuração.


Deploy

Pré-requisitos

  • Workspace Databricks com Unity Catalog habilitado
  • Serving endpoint com Claude Haiku 4.5 (ou qualquer modelo compatível OpenAI)
  • Secret scope no Databricks com o token da API
  • Cluster com Python 3.10+

Passo a Passo

  1. Importe o repositório no Workspace do Databricks via Git Folder
  2. Atualize config/config.yaml com seu catálogo, schema, host e secret scope
  3. Crie um secret no Databricks:
    databricks secrets put-secret --scope your-scope --key your-token-key
  4. Execute notebooks/pipeline_jox a partir do Workspace
  5. (Opcional) Agende como job diário via Databricks Workflows — veja run_pipeline.py para exemplo de configuração via Jobs API

Schema de Saída

Silver — <catalog>.jox.noticias

Coluna Tipo Descrição
data STRING Data do boletim (formato ISO)
tipo STRING Categoria da notícia
noticia STRING Texto completo da notícia

Silver — <catalog>.jox.cotacoes

Coluna Tipo Descrição
data STRING Data do boletim (formato ISO)
grupo STRING Grupo do produto
produto STRING Nome do produto
unid STRING Unidade de medida
valor_rs STRING Preço em R$ (pode conter faixas)
stat STRING Indicador de tendência

Gold — Tabelas tipadas com cast para DATE, mesmas colunas, sem metadados internos.


Licença

Este projeto é disponibilizado como referência de portfólio. Sinta-se à vontade para usar como inspiração para seus próprios pipelines.

About

Pipeline ETL automatizado que extrai dados estruturados de boletins agropecuários diários em PDF via LLM (Claude Haiku) com Arquitetura Medallion no Databricks Unity Catalog.

Resources

Stars

Watchers

Forks

Contributors

Languages