44 changes: 40 additions & 4 deletions README.md
@@ -2,7 +2,7 @@

`batchor` grew out of a recurring problem in academic research: running large datasets through LLMs in batch — reliably, reproducibly, and without reinventing the same glue code across every project. The patterns that kept emerging (durable state, typed results, safe resume after failure) were extracted into this library so they do not have to be rebuilt each time.

`batchor` is a durable OpenAI Batch runner for Python teams that want:
`batchor` is a durable provider Batch runner for Python teams that want:

- typed Pydantic results
- resumable durable runs
@@ -12,7 +12,7 @@
- library-first run controls
- a small operator CLI for CSV and JSONL jobs

It is intentionally narrow today: OpenAI-first, SQLite-first, and library-first.
It is intentionally narrow today: OpenAI-first for the CLI, SQLite-first for local durability, and library-first for provider selection beyond the default OpenAI path.

## What problem it solves

@@ -37,6 +37,7 @@ Most OpenAI Batch examples stop at "upload a JSONL file and poll until it finish
Built-in implementations:

- `OpenAIProviderConfig` + `OpenAIBatchProvider`
- `GeminiProviderConfig` + `GeminiBatchProvider` for text-only Gemini Batch jobs in the Python API
- `SQLiteStorage`
- `PostgresStorage` as an opt-in durable control-plane backend
- `MemoryStateStore`
@@ -52,6 +53,8 @@ Important constraints:
- the CLI supports file-backed inputs only
- users still own selecting and ordering input files or partitions
- the built-in CLI uses SQLite durability only
- the built-in CLI is OpenAI-only today; Gemini is exposed through the Python API
- Gemini support is text-only for now and does not build multimodal requests
- structured-output rehydration requires an importable module-level Pydantic model
- raw output artifacts are retained by default and must be exported before raw pruning
- pause/resume/cancel and incremental terminal-result APIs are library-first today
@@ -88,6 +91,7 @@ graph LR

subgraph providers["providers/"]
OpenAI["OpenAIBatchProvider"]
Gemini["GeminiBatchProvider"]
end

subgraph sources["sources/"]
@@ -107,6 +111,7 @@ graph LR
User -->|"start() / run_and_wait()"| BatchRunner
BatchRunner --> Run
BatchRunner --> OpenAI
BatchRunner --> Gemini
BatchRunner --> SQLite
BatchRunner --> LocalFS
Files -->|"BatchItem stream"| BatchRunner
@@ -142,6 +147,12 @@ Operational semantics for resume, run control, and artifact retention live in
pip install batchor
```

For Gemini Batch support, install the optional extra:

```bash
pip install "batchor[gemini]"
```

## Repo Agent Setup

This repo now includes local AI-agent scaffolding so a contributor agent can pick up repo conventions without extra global setup:
@@ -163,8 +174,8 @@ Supported Python versions:

For Python API usage, auth resolution is:

1. explicit `OpenAIProviderConfig(api_key=...)`
2. ambient `OPENAI_API_KEY`
1. explicit provider config credentials such as `OpenAIProviderConfig(api_key=...)` or `GeminiProviderConfig(api_key=...)`
2. ambient provider environment variables, currently `OPENAI_API_KEY` or `GEMINI_API_KEY`

The Python library does not auto-load `.env`.

@@ -193,6 +204,29 @@ run = runner.run_and_wait(
print(run.results()[0].output_text)
```

### Gemini text job

```python
from batchor import BatchItem, BatchJob, BatchRunner, GeminiProviderConfig, PromptParts


runner = BatchRunner(storage="memory")
run = runner.run_and_wait(
    BatchJob(
        items=[BatchItem(item_id="row1", payload="Summarize this text")],
        build_prompt=lambda item: PromptParts(prompt=item.payload),
        provider_config=GeminiProviderConfig(
            model="gemini-2.5-flash",
            api_key="YOUR_GEMINI_API_KEY",
        ),
    )
)

print(run.results()[0].output_text)
```

Gemini support currently builds text-only `GenerateContent` batch requests. It uses Gemini JSONL `key` values internally while keeping `batchor`'s durable item and attempt tracking unchanged.

### Structured output

```python
@@ -244,6 +278,8 @@ Structured-output models are validated up front against the OpenAI strict-schema

If you need a field to be optional in Python, model it as nullable in the schema shape OpenAI accepts rather than relying on omitted required fields.

The same `structured_output=` API is available with `GeminiProviderConfig`; batchor sends the schema through Gemini `generation_config.response_json_schema` and validates the returned JSON text with the same Pydantic model.

### Rehydrate a durable run

```python
10 changes: 8 additions & 2 deletions docs/design_docs/ARCHITECTURE.md
@@ -58,6 +58,7 @@ graph TB
subgraph providers["providers/"]
BatchProvider["BatchProvider (ABC)"]
OpenAIProvider["OpenAIBatchProvider"]
GeminiProvider["GeminiBatchProvider"]
ProviderRegistry
end

@@ -91,6 +92,7 @@ graph TB
MemoryStateStore -.->|implements| StateStore
BatchRunner -->|submits/polls| BatchProvider
OpenAIProvider -.->|implements| BatchProvider
GeminiProvider -.->|implements| BatchProvider
BatchRunner -->|stores artifacts| ArtifactStore
LocalArtifactStore -.->|implements| ArtifactStore
BatchRunner -->|creates via| ProviderRegistry
@@ -120,6 +122,10 @@ The public runtime model centers on four types:
one logical source, while callers remain responsible for selecting and ordering
the child sources up front.

Provider adaptation is intentionally concentrated behind `BatchProvider`.
The runtime stores one durable internal custom identifier per item attempt, while each provider maps that identifier to its own request JSONL field. OpenAI uses `custom_id`; Gemini uses `key`.
Provider hooks also own response-text extraction so structured-output parsing can stay generic across provider payload shapes.

## Main user-facing flow

The normal public flow is:
@@ -136,7 +142,7 @@ Internally that expands to:
3. Claim a bounded submission window from pending items.
4. Build or replay request JSONL rows.
5. Persist request artifacts before upload.
6. Submit one or more OpenAI batch files.
6. Submit one or more provider batch files.
7. Poll active batches.
8. Download output/error files.
9. Parse terminal item results back into the state store.
@@ -190,7 +196,7 @@ sequenceDiagram
BatchRunner->>Provider: upload_input_file(local_path)
Provider-->>BatchRunner: remote_file_id
BatchRunner->>Provider: create_batch(remote_file_id)
Provider-->>BatchRunner: BatchRemoteRecord (status=validating)
Provider-->>BatchRunner: BatchRemoteRecord (status=submitted/validating)
BatchRunner->>StateStore: register_batch()
BatchRunner->>StateStore: mark_items_submitted()
end
109 changes: 109 additions & 0 deletions docs/design_docs/GEMINI_BATCHING.md
@@ -0,0 +1,109 @@
# Gemini Batching

This document describes the Gemini-specific behavior inside `batchor`.

Status: implemented for text-only Python API jobs. Multimodal request construction is intentionally out of scope for now.

## Current behavior

Gemini support is provided by:

- `GeminiProviderConfig`
- `GeminiBatchProvider`
- the default `ProviderRegistry`

The provider is available through the Python API after installing the optional SDK extra:

```bash
pip install "batchor[gemini]"
```

The CLI remains OpenAI-only today.

## Request construction

The built-in provider converts each prepared item into one Gemini Batch JSONL row:

```json
{"key":"row1:a1","request":{"contents":[{"parts":[{"text":"Summarize this text"}]}]}}
```

Important details:

- `key` is the provider-facing correlation identifier
- the runtime still stores the same durable attempt identifier internally as `custom_id`
- `PromptParts.prompt` becomes a text part under `request.contents`
- `PromptParts.system_prompt` becomes `request.system_instruction`
- `GeminiProviderConfig.generation_config` is copied into each request when provided
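
The mapping above can be sketched as a small standalone function. This is a minimal illustration rather than `batchor`'s actual implementation: `build_gemini_row` is a hypothetical name, and wrapping the system prompt as a single text part is an assumption about the request shape.

```python
import json
from typing import Any, Dict, Optional


def build_gemini_row(
    key: str,
    prompt: str,
    system_prompt: Optional[str] = None,
    generation_config: Optional[Dict[str, Any]] = None,
) -> str:
    """Serialize one item attempt as a single Gemini Batch JSONL line."""
    # The prompt becomes one text part under request.contents.
    request: Dict[str, Any] = {"contents": [{"parts": [{"text": prompt}]}]}
    if system_prompt:
        # Assumed shape: the system prompt is wrapped as a single text part.
        request["system_instruction"] = {"parts": [{"text": system_prompt}]}
    if generation_config:
        # Copy so later changes to the shared config do not leak into this row.
        request["generation_config"] = dict(generation_config)
    return json.dumps({"key": key, "request": request}, separators=(",", ":"))


print(build_gemini_row("row1:a1", "Summarize this text"))
```

Called with only a key and a prompt, the sketch produces the same row shape as the JSONL example above.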

Structured-output jobs add:

```json
{
  "generation_config": {
    "response_mime_type": "application/json",
    "response_json_schema": {"type": "object"}
  }
}
```

The schema is generated from the same `structured_output=` Pydantic model used by OpenAI jobs and is validated locally before submission.
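
As a rough sketch of how those two fields could be layered onto a user-supplied `generation_config`, the hypothetical helper below merges them without mutating the input. The helper name and the hand-written schema are illustrative assumptions; with Pydantic v2 the schema dictionary could instead come from a model's `model_json_schema()` method.

```python
from typing import Any, Dict, Optional


def with_structured_output(
    generation_config: Optional[Dict[str, Any]],
    json_schema: Dict[str, Any],
) -> Dict[str, Any]:
    """Return a new generation_config that requests schema-shaped JSON."""
    merged = dict(generation_config or {})  # never mutate the caller's dict
    merged["response_mime_type"] = "application/json"
    merged["response_json_schema"] = json_schema
    return merged


# A hand-written schema keeps this sketch dependency-free.
schema = {
    "type": "object",
    "properties": {"summary": {"type": "string"}},
    "required": ["summary"],
}
config = with_structured_output({"temperature": 0.2}, schema)
print(config)
```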

## Authentication

Authentication resolution is:

1. explicit `GeminiProviderConfig(api_key=...)`
2. ambient `GEMINI_API_KEY`

The provider builds the Google GenAI client lazily so importing `batchor` or constructing the default provider registry does not require the optional `google-genai` dependency.
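
A minimal sketch of that resolution order and of lazy client construction is shown below. The function names are hypothetical and the error message is illustrative; only `genai.Client(api_key=...)` refers to the real `google-genai` SDK, and it is imported inside the function so that defining it does not require the optional dependency.

```python
import os
from typing import Any, Mapping, Optional


def resolve_gemini_api_key(
    explicit_key: Optional[str] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Prefer the explicit config key, then fall back to GEMINI_API_KEY."""
    env = os.environ if environ is None else environ
    key = explicit_key or env.get("GEMINI_API_KEY")
    if not key:
        raise RuntimeError("No Gemini API key: pass api_key or set GEMINI_API_KEY")
    return key


def make_client(api_key: str) -> Any:
    """Build the SDK client on first use instead of at import time."""
    # Deferred import: only needed once a Gemini job actually runs.
    from google import genai

    return genai.Client(api_key=api_key)


print(resolve_gemini_api_key("explicit-key", {"GEMINI_API_KEY": "ambient-key"}))
```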

## Batch lifecycle

The provider follows the Google Gemini Batch/File API flow:

1. upload the prepared JSONL file through the File API with `mime_type="jsonl"`
2. create a batch with `client.batches.create(model=..., src=..., config={"display_name": ...})`
3. poll with `client.batches.get(name=batch_id)`
4. treat `JOB_STATE_SUCCEEDED` and `JOB_STATE_PARTIALLY_SUCCEEDED` as completed
5. read the result file name from `dest.file_name`
6. download the result file through `client.files.download(file=...)`
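
The six steps can be sketched against an injected client object, which also makes the flow easy to exercise with a fake client. This is an illustrative outline, not the provider's code: `run_gemini_batch` and the polling interval are assumptions, and passing the upload config as a plain dictionary is assumed to be accepted by the `google-genai` SDK.

```python
import time
from typing import Any

COMPLETED = {"JOB_STATE_SUCCEEDED", "JOB_STATE_PARTIALLY_SUCCEEDED"}
FAILED = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_EXPIRED"}


def run_gemini_batch(
    client: Any,
    jsonl_path: str,
    model: str,
    display_name: str,
    poll_seconds: float = 30.0,
) -> bytes:
    """Upload a request file, create a batch, poll it, and return the raw results."""
    uploaded = client.files.upload(
        file=jsonl_path,
        config={"display_name": display_name, "mime_type": "jsonl"},
    )
    job = client.batches.create(
        model=model, src=uploaded.name, config={"display_name": display_name}
    )
    while True:
        job = client.batches.get(name=job.name)
        # Accept either an enum-like state (with .name) or a plain string.
        state = getattr(job.state, "name", str(job.state))
        if state in COMPLETED:
            return client.files.download(file=job.dest.file_name)
        if state in FAILED:
            raise RuntimeError(f"Gemini batch {job.name} ended in {state}")
        time.sleep(poll_seconds)
```

A real run would pass a `genai.Client` instance; tests can pass any object that exposes the same `files` and `batches` methods.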

Terminal failure states map to batchor's generic terminal batch statuses:

- `JOB_STATE_FAILED` -> `failed`
- `JOB_STATE_CANCELLED` -> `cancelled`
- `JOB_STATE_EXPIRED` -> `expired`

Unknown or active states are normalized to `submitted`.
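
The normalization rule can be written as a lookup with a default. The sketch below is illustrative: `normalize_gemini_state` is a hypothetical name, and the `completed` status string is an assumption, since this document only says those states are treated as completed.

```python
# Terminal Gemini job states and the generic status each one maps to.
_TERMINAL_STATUS = {
    "JOB_STATE_SUCCEEDED": "completed",  # assumed status name
    "JOB_STATE_PARTIALLY_SUCCEEDED": "completed",  # assumed status name
    "JOB_STATE_FAILED": "failed",
    "JOB_STATE_CANCELLED": "cancelled",
    "JOB_STATE_EXPIRED": "expired",
}


def normalize_gemini_state(state: str) -> str:
    """Map a Gemini job state to a generic status; anything else is still in flight."""
    return _TERMINAL_STATUS.get(state, "submitted")


for state in ("JOB_STATE_RUNNING", "JOB_STATE_EXPIRED", "SOMETHING_NEW"):
    print(state, "->", normalize_gemini_state(state))
```

Defaulting unknown states to `submitted` keeps the runner polling rather than failing if the provider introduces a new non-terminal state.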

## Response parsing

Gemini output rows are split by `key`.

Rows with a provider `error` field, or without a JSON-object `response`, are treated as item errors.
Successful rows are expected to include Gemini response payloads such as:

```json
{"key":"row1:a1","response":{"candidates":[{"content":{"parts":[{"text":"done"}]}}]}}
```

Text extraction checks `response.text` first, then concatenates `response.candidates[].content.parts[].text`.
Structured-output jobs then parse the extracted text as JSON and validate it with the requested Pydantic model.
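
A minimal sketch of that parsing order follows. The function names and the tuple return shape are hypothetical, and the error message for a missing response is illustrative; the field names come from the row example above.

```python
import json
from typing import Any, Dict, Optional, Tuple


def extract_text(response: Dict[str, Any]) -> str:
    """Prefer response.text, else join every candidate part's text."""
    text = response.get("text")
    if isinstance(text, str) and text:
        return text
    chunks = []
    for candidate in response.get("candidates") or []:
        content = candidate.get("content") or {}
        for part in content.get("parts") or []:
            if isinstance(part.get("text"), str):
                chunks.append(part["text"])
    return "".join(chunks)


def parse_output_line(line: str) -> Tuple[str, Optional[str], Optional[Any]]:
    """Return (key, text, error) for one Gemini output JSONL line."""
    row = json.loads(line)
    key = row["key"]
    error = row.get("error")
    if error is not None:
        return key, None, error
    response = row.get("response")
    if not isinstance(response, dict):
        return key, None, "missing response object"
    return key, extract_text(response), None


line = '{"key":"row1:a1","response":{"candidates":[{"content":{"parts":[{"text":"done"}]}}]}}'
print(parse_output_line(line))
```

For a structured-output job, the extracted text would then be passed to `json.loads` and validated against the requested model.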

## Durable replay

Request artifacts preserve the provider-specific JSONL row. On retry or fresh-process resume, the runtime reloads the persisted row and asks the provider to replace only the correlation identifier. That means Gemini retries preserve the original `request` payload while updating `key` from, for example, `row1:a1` to `row1:a2`.
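
The replay step can be illustrated with a few lines of JSON handling. The `replay_row` helper and the provider-name keys are hypothetical; the field names `custom_id` and `key` are the ones described in these docs.

```python
import json

# Request-row field that carries the correlation identifier, per provider.
CORRELATION_FIELD = {"openai": "custom_id", "gemini": "key"}


def replay_row(persisted_line: str, provider: str, new_attempt_id: str) -> str:
    """Reuse a persisted request row, changing only its correlation identifier."""
    row = json.loads(persisted_line)
    row[CORRELATION_FIELD[provider]] = new_attempt_id
    return json.dumps(row, separators=(",", ":"))


original = '{"key":"row1:a1","request":{"contents":[{"parts":[{"text":"Summarize this text"}]}]}}'
print(replay_row(original, "gemini", "row1:a2"))
```

Because only one top-level field is rewritten, the retried request is the same as the original apart from the attempt suffix.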

## Current limits

- Gemini support is Python API only.
- Gemini request construction is text-only.
- Multimodal File API references are not generated by `batchor` yet.
- Gemini-specific enqueue-limit controls are not implemented yet; generic request-count and request-file-size chunking still apply.
- Live Gemini smoke tests are manual/TBD; automated coverage uses fake Gemini clients.

## References

- [Gemini Batch API](https://ai.google.dev/gemini-api/docs/batch-mode)
- [Gemini structured output](https://ai.google.dev/gemini-api/docs/structured-output)
5 changes: 3 additions & 2 deletions docs/design_docs/OPENAI_BATCHING.md
@@ -2,10 +2,12 @@

This document describes the OpenAI-specific behavior inside `batchor`.

`batchor` is not a generic batch abstraction with many first-party providers yet. The OpenAI path is the primary implemented path, so a lot of runtime behavior is defined around OpenAI Batch semantics.
OpenAI remains the default and most feature-complete provider path. Gemini has its own design note in [`GEMINI_BATCHING.md`](GEMINI_BATCHING.md), while this page focuses only on OpenAI Batch semantics.

## Current behavior

The Python API and CLI both support OpenAI. The CLI is OpenAI-only today.

## Request construction

The built-in provider converts each prepared item into one OpenAI Batch JSONL request row.
@@ -174,7 +176,6 @@ Provider-side remote batch cancellation is not implemented in v1.

## Current limits

- only the built-in OpenAI Batch provider path is implemented
- the docs do not yet provide a full capability matrix across all OpenAI endpoint features
- artifact storage is still local-filesystem-only

3 changes: 2 additions & 1 deletion docs/design_docs/ROADMAP.md
@@ -8,11 +8,12 @@ This file tracks important work that should be explicit in the extracted reposit
- extend resumable ingestion beyond the built-in CSV/JSONL sources
- add automated retention windows on top of the explicit export/prune lifecycle
- add more input adapters beyond CSV and JSONL
- expose non-OpenAI providers through CLI workflows once provider-specific auth and flags are stable
- add CLI structured-output workflows if the importability story can stay durable and predictable

## Longer-Term Ideas

- additional providers beyond OpenAI
- additional provider coverage beyond OpenAI and text-only Gemini
- artifact store abstraction
- partial-result streaming APIs
- richer CLI or operator workflow beyond the current file-backed text-job surface
5 changes: 3 additions & 2 deletions docs/design_docs/STORAGE_AND_RUNS.md
@@ -72,7 +72,7 @@ Postgres is also implemented for shared control-plane state when callers explici

In-memory storage exists for tests and short-lived local runs.

The SQLite/OpenAI path is covered by the default smoke test. Postgres storage compatibility is validated in a dedicated storage-contract CI job and requires `BATCHOR_TEST_POSTGRES_DSN` for equivalent local coverage.
The SQLite/OpenAI path is covered by the default smoke test. Gemini provider wiring is covered with fake-client integration tests. Postgres storage compatibility is validated in a dedicated storage-contract CI job and requires `BATCHOR_TEST_POSTGRES_DSN` for equivalent local coverage.

## Storage responsibilities

@@ -164,7 +164,7 @@ Successful rehydration depends on:

Fresh-process resume also requeues any `queued_local` items back to `pending` before submission resumes.

Resume compatibility intentionally ignores non-persisted secret fields such as provider API keys.
Resume compatibility intentionally ignores non-persisted secret fields such as provider API keys. Rehydrated OpenAI runs need `OPENAI_API_KEY` when no explicit in-memory config is supplied; rehydrated Gemini runs need `GEMINI_API_KEY`.

For deterministic-source resume, the caller must also reuse the same `run_id` and provide the same source identity/fingerprint.
For composite sources, that includes the same ordered child identities; changing the child order or swapping one file changes the logical source identity.
@@ -177,6 +177,7 @@ Built-in deterministic sources currently include:
- `ParquetItemSource`

Once an item has a durable request artifact pointer, `batchor` prunes large inline request-building fields from the control-plane store and relies on the artifact for later retries.
Provider-specific request rows are preserved in those artifacts. During replay, OpenAI updates `custom_id`; Gemini updates `key`; both still map back to the same internal durable attempt identifier.

## Artifact lifecycle

2 changes: 2 additions & 0 deletions docs/doc-map.md
@@ -19,6 +19,7 @@ This page explains what each document is for so readers do not have to guess whi
| `design_docs/BOUNDARY_AND_PHILOSOPHY.md` | Ownership boundary between `batchor`, storage/artifacts, and user pipelines. |
| `design_docs/ARCHITECTURE.md` | Canonical runtime diagrams, package structure, main flows, and extension seams. |
| `design_docs/OPENAI_BATCHING.md` | OpenAI request construction, token budgeting, splitting, and batch polling behavior. |
| `design_docs/GEMINI_BATCHING.md` | Gemini text-only request construction, batch polling, response parsing, and current limits. |
| `design_docs/STORAGE_AND_RUNS.md` | Durable `Run` lifecycle, rehydration, checkpoints, control state, artifact retention, and operator semantics. |
| `design_docs/STORAGE_MIGRATIONS.md` | SQLite schema-versioning and migration guidance. |
| `design_docs/ROADMAP.md` | Intentionally unimplemented areas and planned work. |
@@ -38,3 +39,4 @@ This page explains what each document is for so readers do not have to guess whi
- Library-first run control now includes `pause`, `resume`, and drain-style `cancel`.
- Incremental terminal-result reads/exports are documented in the Python API and storage docs.
- Raw output/error artifact retention is now configurable per run through `ArtifactPolicy`.
- Gemini text-only Batch support is available through the Python API and default provider registry.
17 changes: 17 additions & 0 deletions docs/getting-started/installation.md
@@ -10,6 +10,17 @@ Supported Python versions:

- `3.12`
- `3.13`
- `3.14`

## Optional provider extras

OpenAI support is installed by default. Gemini support uses Google's optional SDK dependency:

```bash
pip install "batchor[gemini]"
```

The Gemini provider is currently Python API only and text-only. The CLI remains OpenAI-only.

## What gets installed

@@ -18,6 +29,7 @@ The package includes:
- the Python library
- the `batchor` CLI
- the built-in OpenAI provider integration
- the built-in Gemini provider integration when `batchor[gemini]` is installed
- SQLite and Postgres storage implementations

It does not provision external infrastructure for you. If you use Postgres or a shared artifact root, you still manage those resources yourself.
@@ -29,6 +41,11 @@ For Python API usage, authentication resolution is:
1. `OpenAIProviderConfig(api_key=...)`
2. `OPENAI_API_KEY`

For Gemini Python API usage, authentication resolution is:

1. `GeminiProviderConfig(api_key=...)`
2. `GEMINI_API_KEY`

The Python library does not auto-load `.env`.

The CLI loads a local `.env` as a convenience for operator usage, then resolves `OPENAI_API_KEY`.