Make SDK ingestion docs agent-friendly #387
Co-authored-by: Isaac
Signed-off-by: elenagaljak-db <elena.galjak@databricks.com>
teodordelibasic-db left a comment
Public doc examples in record_types.rs and arrow_stream.rs still end an ingest_* call with an immediate stream.wait_for_offset(offset).await?;.
Per location, the line to change and the fix:
- rust/sdk/src/record_types.rs:59 (ProtoBytes example): we can drop the trailing stream.wait_for_offset(offset).await?; in favor of stream.flush().await?;, or add a one-line note that per-record waiting is for low-volume confirmation only.
- rust/sdk/src/record_types.rs:83 (JsonString example): same change.
- rust/sdk/src/record_types.rs:108 (ProtoMessage example): same change.
- rust/sdk/src/record_types.rs:141 (JsonValue example): same change.
- rust/sdk/src/arrow_stream.rs:190 (ZerobusArrowStream type-level example): the example ingests one batch then waits on its offset; lead with the loop-then-flush() pattern instead, matching lib.rs, and keep the single-batch wait only as the labeled low-volume case.
- rust/sdk/src/arrow_stream.rs:1221 (ingest_batch method example): same; replace the immediate wait_for_offset(offset) with flush() once at the end, or label it as the confirm-a-specific-batch case.
| 6. **Enable Recovery** - Always set `Recovery: true` in production environments | ||
| 7. **Use Batch Ingestion** - For high throughput, ingest many records before calling `Flush()` | ||
| 8. **Monitor Errors** - Log and alert on non-retryable errors | ||
| 9. **Use Protocol Buffers for Production** - More efficient than JSON for high-volume scenarios |
Duplicate number 9 in the list below; this line should be followed by 10/11/12.
| } | ||
| log.Printf("Batch queued with offset: %d", batchOffset) | ||
| // ... ingest more batches ... | ||
| stream.Flush() // wait for everything at the end |
Other Flush() and WaitForOffset() calls in this README check for errors; this lone one does not. We can change to something like:
    if err := stream.Flush(); err != nil {
        log.Fatal(err)
    }
| **Idiomatic flow:** | ||
|
|
||
| ```python | ||
| for record in records: |
The Quick Start above this is async (await ... ; asyncio.run(main())), so maybe we should change this section to use async as well.
    for record in records:
        await stream.ingest_record_offset(record)
    await stream.flush()
| **Confirming a specific record** (waiting on the last offset confirms all prior records): | ||
|
|
||
| ```python | ||
| for record in records: |
ditto:
    for record in records:
        offset = await stream.ingest_record_offset(record)
    await stream.wait_for_offset(offset)  # confirm the run before continuing
| /// // Wait for both to be acknowledged | ||
| /// await stream.waitForOffset(offset2); | ||
| /// // High-throughput pattern: ingest in a loop, wait once at the end. | ||
| /// let lastOffset; |
If records is empty, lastOffset is undefined and waitForOffset(undefined) runs instead of being a
no-op. Inconsistent with the neighboring ingestRecordsOffset() example (line 486), which correctly
guards with a null sentinel. Either prefer await stream.flush(); here, or guard:
    let lastOffset: bigint | null = null;
    for (const record of records) lastOffset = await stream.ingestRecordOffset(record);
    if (lastOffset !== null) await stream.waitForOffset(lastOffset);
| /// | ||
| /// ```typescript | ||
| /// const offsets = []; | ||
| /// let lastOffset; |
| stream.wait_for_offset(offset).await?; | ||
| } | ||
| // Returns Some(offset) for non-empty batches, None for empty batches. | ||
| // Queue many batches this way; flush() once when done. |
Since the comment says "flush() once when done", we can add a trailing stream.flush().await?; so the fragment is self-contained.
| ```typescript | ||
| // High-throughput pattern: send many, wait once | ||
| // Idiomatic flow: ingest in a loop, then flush once | ||
| const offset1 = await stream.ingestRecordOffset(record1); // Resolves immediately |
offset1 is dead. We can use a small loop with a single lastOffset (matching the JSDoc in src/lib.rs) or
drop the first binding.
| ## Client code patterns (performance) | ||
|
|
||
| When writing or reviewing client/example code, follow the idiomatic async flow. | ||
| `IngestRecordOffset()` (and `IngestRecordsOffset()` / `IngestBatch()`) return as |
IngestBatch is a method on ZerobusArrowStream (go/arrow_stream.go:168), not on ZerobusStream where IngestRecordOffset/IngestRecordsOffset live. This reads as if all three are on the same type. We should maybe drop IngestBatch() from the list or note it belongs to the Arrow stream. Flagged by LLM so probably useful.
| 5. **Use Protocol Buffers for production**: Protocol Buffers (the default) provides better performance and schema validation. Use JSON only when you need schema flexibility or for quick prototyping. | ||
| 6. **Store credentials securely**: Use environment variables, never hardcode credentials | ||
| 7. **Use batch ingestion**: For high-throughput scenarios, use `ingestRecordsOffset()` instead of individual `ingestRecordOffset()` calls | ||
| 8. **Ingest in a loop, then `flush()`**: `ingestRecordOffset()` / `ingestRecordsOffset()` resolve as soon as the record is queued; the SDK sends and tracks acknowledgment in the background. Confirm durability with a single `flush()` (once for a bounded batch, or periodically for a long-running stream). Each ingest returns an offset, and `waitForOffset(offset)` confirms a specific record when you need it (acks are ordered, so the last offset confirms the whole run). Just avoid calling `waitForOffset()` after every record in a tight loop, since that limits throughput to one record per round-trip. |
This repeats the same explanation as the "Acknowledgments and throughput" blockquote at line 185. We can shorten #8 to a cross-reference ("See Acknowledgments and throughput above"). Separately, other SDKs give this its own ### Acknowledgments and throughput heading (e.g. python/README.md), whereas TS uses a long > blockquote - promoting it to a heading would match the rest of the set.
One thing I have noticed in the READMEs is that a JSON example is always given first. We should put proto first and add a disclaimer about throughput to the JSON example.

I don't agree that it should be first, since JSON is easier to set up for a quick test and it's always labeled as the quick start. We can emphasize more that it's better to use proto/arrow for production.
Ideally, once we add support for dynamic protobuf etc., it should be as easy to set up as JSON.
What
Reworks the ingestion documentation across all five SDKs (Rust, Python, TypeScript, Java, Go) plus the root docs so that clients, including AI coding assistants, write performant code by default.

Why
Users (and agents) were producing slow clients that call the wait-for-acknowledgment method (wait_for_offset/waitForOffset/WaitForOffset, or .join() on a per-record future) after every ingest. Because ingestion is asynchronous and pipelined, waiting per record forces a full server round-trip before the next record is sent, limiting throughput to ~one record per round-trip. The docs were partly to blame: every SDK led with the per-record-wait example, and the high-throughput pattern was buried far below.

Changes
- [...] flush() once, as the first example in every README. Per-record waiting is presented as a legitimate tool for low-volume "confirm this specific record" cases, not the default.
- [...] ///, Python docstrings (sync + async), TypeScript JSDoc, Java Javadoc, Go godoc, on ingest_*, wait_for_offset/flush and equivalents.
- [...] flush() once.
- [...] ingestRecord().join()-in-a-loop onto ingestRecordOffset() + wait-once.
- CLAUDE.md (root + per-SDK) gain a "client code patterns" / performance rule for in-repo agents.
- NEXT_CHANGELOG.md updated in each SDK under Documentation.

Notes
- [...] wait_for_offset_internal waits for last_received_offset >= target).
- [...] flush()/last-offset approach (no invented APIs).

This pull request and its description were written by Isaac.
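
To make the round-trip argument under Why concrete, here is a minimal Python simulation. It is a sketch under stated assumptions, not the SDK: FakeStream is a hypothetical stand-in whose method names only mirror the ones discussed in this PR (ingest_record_offset, wait_for_offset, flush), the 10 ms round-trip is invented, and acknowledgments are simply delivered in order, one round-trip after each record is queued.

```python
import asyncio
import time


class FakeStream:
    """Hypothetical stand-in for an SDK stream (NOT the real SDK API).

    Each record is acknowledged, in order, one simulated round-trip after it
    was queued.
    """

    def __init__(self, round_trip: float) -> None:
        self._round_trip = round_trip
        self._next_offset = 0
        self._acked = -1  # highest acknowledged offset so far
        self._pending: asyncio.Queue = asyncio.Queue()
        self._cond = asyncio.Condition()
        # Background task playing the role of the server sending acks.
        self._worker = asyncio.create_task(self._ack_loop())

    async def _ack_loop(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            offset, due = await self._pending.get()
            await asyncio.sleep(max(0.0, due - loop.time()))
            async with self._cond:
                self._acked = offset
                self._cond.notify_all()

    async def ingest_record_offset(self, record: bytes) -> int:
        """Queue a record and return its offset without waiting for the ack."""
        offset = self._next_offset
        self._next_offset += 1
        due = asyncio.get_running_loop().time() + self._round_trip
        self._pending.put_nowait((offset, due))
        return offset

    async def wait_for_offset(self, offset: int) -> None:
        """Wait until every record up to and including `offset` is acked."""
        async with self._cond:
            await self._cond.wait_for(lambda: self._acked >= offset)

    async def flush(self) -> None:
        """Wait for everything queued so far; a no-op if nothing was queued."""
        if self._next_offset > 0:
            await self.wait_for_offset(self._next_offset - 1)

    async def close(self) -> None:
        self._worker.cancel()
        try:
            await self._worker
        except asyncio.CancelledError:
            pass


async def time_per_record_wait(records, round_trip: float) -> float:
    stream = FakeStream(round_trip)
    start = time.perf_counter()
    for record in records:
        offset = await stream.ingest_record_offset(record)
        await stream.wait_for_offset(offset)  # one round-trip per record
    elapsed = time.perf_counter() - start
    await stream.close()
    return elapsed


async def time_loop_then_flush(records, round_trip: float) -> float:
    stream = FakeStream(round_trip)
    start = time.perf_counter()
    for record in records:
        await stream.ingest_record_offset(record)  # returns once queued
    await stream.flush()  # single wait at the end
    elapsed = time.perf_counter() - start
    await stream.close()
    return elapsed


async def compare(n: int = 20, round_trip: float = 0.01):
    records = [b"x"] * n
    slow = await time_per_record_wait(records, round_trip)
    fast = await time_loop_then_flush(records, round_trip)
    return slow, fast


if __name__ == "__main__":
    slow, fast = asyncio.run(compare())
    print(f"per-record wait: {slow:.3f}s, loop then flush: {fast:.3f}s")
```

In this model the per-record variant pays roughly one round-trip per record, while the loop-then-flush variant pays roughly one round-trip in total. Exact timings depend on the machine, and a real stream adds batching, backpressure and network effects that the sketch ignores.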