[C++] Core SDK implementation and build (split 3/6 from #415) #420
zlata-stefanovic-db wants to merge 11 commits
## Summary

Core C++ SDK: the public headers (`include/`), implementation (`src/`), the Rust C FFI build glue (`cmake/`), the CMake build (library target, install / `find_package` export, sanitizer option), the `Makefile`, `.clang-format`, and the C++ CI (`ci-cpp.yml` + `push.yml` path filter). Builds the library. Part of the #415 split (4/6).

### Merge order

Off `main`. **Tests (5/6) and examples (6/6) are stacked on this PR** and merge after it. The `add_subdirectory(tests)`/`(examples)` wiring is intentionally not here yet; it arrives with those PRs. Draft until the stack is reviewed.

Split from #415.

Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
Warn against combining the fire-and-forget _nowait ingest APIs with a custom HeadersProvider: detached background tasks are not drained by close() or the destructor, so they can call back into the provider after the Stream releases it. Re-enable LeakSanitizer in the ASan CI job (was detect_leaks=0, which hid all leaks) with a narrow suppression file covering only the intentional once_cell/tokio runtime globals, so real wrapper leaks stay visible. Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
- close(): keep the handle alive on a failed close so get_unacked_records() and retry still work; free only on success
- ingest_*_records: reject empty batches instead of returning the FFI -2 sentinel as an offset; nowait batch variants no-op on empty
- headers callback: signal a non-null error on OOM instead of failing open; reject keys/values containing embedded NUL bytes

Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>

- Sdk::create(): route through the builder so the user-agent reports zerobus-sdk-cpp instead of the Rust default
- callback_max_wait_time_ms: leave the FFI default in place on nullopt instead of forcing None
- SdkBuilder: type the handle as CZerobusSdkBuilder* instead of void*
- add missing <cstddef>/<utility> includes to public headers

Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>

- installed CMake config recreates the zerobus_ffi target the export references, fixing external link failures
- FFI custom command depends on Rust sources so edits trigger a rebuild
- gate tests/examples options on existing subdirs; fail configure on version.hpp drift
- narrow LSan suppressions to lazy-init/runtime construction
- drop the redundant use_local_sdk patch from C++ CI

Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
    void ingest_json_records_nowait(const std::vector<std::string>& records);

    /// Block until the record at `offset` has been acknowledged by the server.
    void wait_for_offset(std::int64_t offset);
Not a blocker for this PR, just something to think about and see if it makes sense and is worth doing as a follow-up.
The C++ SDK has the pull-based ack model (ingest → offset → wait_for_offset / flush) but no push-based AckCallback like Python and Java have.
For Go this gap is fine - goroutines are userspace-scheduled by the Go runtime, multiplexed onto a small number of OS threads. Blocking a goroutine on wait_for_offset just suspends it and yields its OS thread to another goroutine; no kernel involvement. You can have thousands of goroutines blocked on ack tracking with negligible cost.
On mainstream platforms a C++ std::thread maps 1:1 to a kernel-scheduled OS thread. Blocking one on wait_for_offset parks that OS thread, doing nothing, until the server ack arrives. Scaling this across many concurrent streams might get expensive fast.
The idiomatic C++ solution for "notify me when something happens" is a callback, not a blocked thread. With AckCallback, the Rust tokio runtime fires the callback from its own thread pool when the ack arrives - the application thread never blocks waiting for acks and no extra OS threads are needed.
For now the pull model is functional. I suggest looking into this as a follow-up to see whether it makes sense to implement it (adding AckCallback to the C FFI). Some implementation issues to think about:
- Language boundaries
- Callback object lifetime
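To make the suggestion above more concrete, here is a minimal sketch of what a push-based ack registry could look like on the C++ side. Everything in it is hypothetical: `AckRegistry`, `on_ack`, and `acknowledge` are illustrative names, not part of the SDK or the C FFI, and it assumes acks are cumulative (acknowledging offset N implies every offset up to N). It deliberately sidesteps the two hard parts listed above, the language boundary and callback lifetime.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch: not part of the Zerobus SDK or its C FFI.
class AckRegistry {
 public:
  using Callback = std::function<void(std::int64_t)>;

  // Register `cb` to run once `offset` is acknowledged.
  // If that offset is already acknowledged, run `cb` right away.
  void on_ack(std::int64_t offset, Callback cb) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (offset > last_acked_) {
        pending_.emplace(offset, std::move(cb));
        return;
      }
    }
    cb(offset);  // invoked outside the lock
  }

  // Called by whichever thread learns about the ack. Assumes cumulative
  // acks: acknowledging `offset` covers every pending offset <= `offset`.
  void acknowledge(std::int64_t offset) {
    std::vector<std::pair<std::int64_t, Callback>> ready;
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (offset <= last_acked_) return;
      last_acked_ = offset;
      auto end = pending_.upper_bound(offset);
      for (auto it = pending_.begin(); it != end; ++it)
        ready.emplace_back(it->first, std::move(it->second));
      pending_.erase(pending_.begin(), end);
    }
    // Run callbacks outside the lock so they may call back into the registry.
    for (auto& [off, cb] : ready) cb(off);
  }

 private:
  std::mutex mu_;
  std::int64_t last_acked_ = -1;  // -1: nothing acknowledged yet
  std::multimap<std::int64_t, Callback> pending_;
};
```

In a real design `acknowledge` would presumably be driven from the Rust runtime's threads through the FFI, so callbacks would have to be thread-safe and short, and the registry would have to outlive any in-flight callback.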
    /// Ingest a batch of JSON records. Returns the offset of the last record.
    /// Throws `ZerobusException` if `records` is empty.
    std::int64_t ingest_json_records(const std::vector<std::string>& records);
What do other SDKs do when `records` is empty? Do they also throw an exception?
Most of them don't. Changing this for consistency is a good idea.
Just made this change, thank you! @irinatomic-db
`ingest_proto_records` / `ingest_json_records` threw `ZerobusException` on an empty batch. No other SDK treats an empty batch as an error: the Rust core returns `Ok(None)`, the FFI returns its `-2` sentinel with a success result, and the Go wrapper maps that to `-1` with no error. Return `-1` (a no-op) for an empty batch instead of throwing, bringing C++ in line with the other SDKs. `-1` is unambiguous since real offsets are non-negative. Update the header docs accordingly. The `_nowait` batch variants already no-op on empty, so they are unchanged. Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
The empty-batch no-op comments in stream.cpp/stream.hpp exceeded the 80-column limit, failing the clang-format CI check. Reflow them; no code change. Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
    /// argument-validation errors are reported (as exceptions). Ingestion errors
    /// are silently dropped. The stream must outlive the background work.
    ///
    /// WARNING - do not combine the `_nowait` APIs with a custom
    /// `HeadersProvider`. A fire-and-forget task is detached: neither `close()`
    /// nor the destructor drains it, and a task that still needs fresh headers
    /// may call back into the provider after the `Stream` (and the `shared_ptr`
    /// keeping the provider alive) is destroyed - a use-after-free. The FFI
    /// exposes no way to drain these tasks, so there is no safe ordering. With a
    /// `HeadersProvider`, use only the blocking ingest variants, which complete
    /// before they return.
    void ingest_proto_record_nowait(const std::uint8_t* data, std::size_t len);
The doc warning is correct but insufficient - it relies on the caller reading and following the warning. The Rust background task is detached; neither close() nor the destructor drains it, so a task that needs fresh headers can call back through a raw pointer after provider_ is destroyed. This should be enforced in code, not just documented.
Maybe something like:

    void Stream::ingest_proto_record_nowait(...) {
      if (provider_ != nullptr)
        throw ZerobusException("_nowait APIs cannot be used with a custom HeadersProvider", false);
      ...
    }
Agree with this. I discussed it with Danilo already — it’s a real issue, will do this C++ mitigation now.
The reason I didn’t do the full fix in this PR is that the root issue is at the C FFI contract, not just in C++ wrapper logic. For this beta PR we kept the warning-only mitigation to avoid cross-SDK FFI changes right now, but we should enforce it in code.
I’ll file a follow-up for _nowait + custom HeadersProvider enforcement (starting with C++ guard, then proper FFI-safe lifecycle fix).
Made the mitigation now
    std::int64_t Stream::ingest_proto_record(const std::uint8_t* data,
                                             std::size_t len) {
      detail::ResultGuard guard;
      std::int64_t offset =
          zerobus_stream_ingest_proto_record(handle_, data, len, guard.ptr());
The FFI returns -2 for an empty batch and -1 for errors, alongside CResult.success. C++ relies entirely on throw_if_error() and hands the raw offset straight to the caller. If the FFI ever returned a negative sentinel with success == true, the caller would get -2 as a real offset and pass it to wait_for_offset(-2). Go defends against this explicitly (if offset == -2 { return -1, nil } then if offset < 0 { ... error }). Worth adding a post-call guard:

    guard.throw_if_error();
    if (offset < 0)
      throw ZerobusException("unexpected negative offset from FFI", false);
    return offset;

Addressing this as well, thank you!
Throw when _nowait APIs are used with a custom HeadersProvider to prevent callback lifetime UAF risk. Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
The blocking ingest_* methods returned the FFI's raw int64 straight to the caller after throw_if_error(). The FFI overloads that return value with negative sentinels (-1 error, -2 empty batch) separately from CResult.success, so a negative value arriving with success set would be handed back as a real offset and could reach wait_for_offset(-2). Add a checked_offset() helper that throws ZerobusException on a negative offset, applied to ingest_proto_record, ingest_json_record, ingest_proto_records, and ingest_json_records. The batch methods still short-circuit the empty case to -1 before the FFI call, so that path never hits the guard. Mirrors the explicit offset < 0 check Go already performs. Signed-off-by: Zlata Stefanovic <zlata.stefanovic@databricks.com>
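The offset handling described in the last two commit messages can be sketched in isolation as follows. This is an illustration under stated assumptions, not the code in `stream.cpp`: `checked_offset` is named in the commit message but its signature here is a guess, while `OffsetError`, `kNoOffset`, `fake_ffi_ingest`, and `ingest_records` are hypothetical stand-ins for `ZerobusException`, the `-1` no-op value, the FFI batch call, and the batch ingest methods.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for ZerobusException.
struct OffsetError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Value returned for an empty batch; real offsets are non-negative.
constexpr std::int64_t kNoOffset = -1;

// Reject any negative value that arrives alongside a successful result,
// so a sentinel can never be handed back as a real offset.
inline std::int64_t checked_offset(std::int64_t offset) {
  if (offset < 0) throw OffsetError("unexpected negative offset from FFI");
  return offset;
}

// Hypothetical fake of the FFI batch call: returns -2 for an empty batch,
// otherwise the offset of the last record in the batch.
inline std::int64_t fake_ffi_ingest(const std::vector<std::string>& records,
                                    std::int64_t first_offset) {
  if (records.empty()) return -2;
  return first_offset + static_cast<std::int64_t>(records.size()) - 1;
}

// Batch ingest: short-circuit the empty case before the FFI call, then
// guard whatever the FFI returns.
inline std::int64_t ingest_records(const std::vector<std::string>& records,
                                   std::int64_t first_offset) {
  if (records.empty()) return kNoOffset;  // no-op; the FFI is never called
  return checked_offset(fake_ffi_ingest(records, first_offset));
}
```

With this shape, a caller that gets `-1` knows nothing was ingested, and `-2` can only surface as an exception rather than as an argument to `wait_for_offset`.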
## Summary

The core of the C++ SDK: a C++17, header + static-library wrapper over the Zerobus C FFI (`rust/ffi`), which in turn wraps the Rust core. It gives C++ callers the same gRPC streaming / OAuth / recovery / ingestion engine as the other SDKs, behind an idiomatic, exception-based, RAII API. This PR contains the public API, its implementation, the build, and CI - but not the tests or examples (those are stacked on top; see Merge order). Arrow Flight ingestion is not part of this PR; it is peeled into its own PR in the split.

## What's in this PR

**Public API** - `cpp/include/zerobus/` (the surface; opaque, forward-declared FFI handles only, so `zerobus.h` never leaks into consumers):

- `Sdk` / `SdkBuilder` + `TableProperties` - connection factory and stream creation.
- `Stream` - proto/JSON ingestion: single, batched, and fire-and-forget (`*_nowait`), plus `flush`, `wait_for_offset`, `get_unacked_records`, `close`.
- `ProtoSchema` - build a descriptor + encode records straight from Unity Catalog table metadata (no `.proto` / `protoc`).
- `HeadersProvider` - custom auth headers.
- `ZerobusException` (`is_retryable()`), `StreamOptions`, `UnackedRecord`, `version()`, and the `zerobus.hpp` umbrella header.

**Implementation** - `cpp/src/` (the only place that includes `zerobus.h`):

- `sdk.cpp`, `stream.cpp`, `proto_schema.cpp` forward to the C FFI; `headers_callback.cpp` is the `extern "C"` trampoline.
- `src/detail/` internals: `ResultGuard` (`CResult` -> `ZerobusException`, always freeing the C error string), the `StreamOptions` -> C config conversion, and the trampoline declaration.
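As a rough illustration of the `ResultGuard` idea (turn a failed C result into an exception and always free the C error string), consider the sketch below. It is built on assumptions: `SketchCResult` is a guessed layout rather than the real `CResult`, `FfiError` stands in for `ZerobusException`, `fake_ffi_fail` is a fake FFI call, and the error string is released with `std::free` where the real code would call whatever free function the FFI exports.

```cpp
#include <cstdlib>
#include <cstring>
#include <stdexcept>
#include <string>

// Hypothetical mirror of the FFI result struct; the real layout may differ.
struct SketchCResult {
  bool success = true;
  bool is_retryable = false;
  char* error_message = nullptr;  // owned by the caller after the call
};

// Hypothetical stand-in for ZerobusException.
class FfiError : public std::runtime_error {
 public:
  FfiError(const std::string& msg, bool retryable)
      : std::runtime_error(msg), retryable_(retryable) {}
  bool is_retryable() const noexcept { return retryable_; }

 private:
  bool retryable_;
};

class ResultGuard {
 public:
  ResultGuard() = default;
  ResultGuard(const ResultGuard&) = delete;
  ResultGuard& operator=(const ResultGuard&) = delete;
  ~ResultGuard() { release(); }  // frees the string on every exit path

  // Pointer handed to the C call as its out-parameter.
  SketchCResult* ptr() noexcept { return &result_; }

  // Copy the message, free the C string, then throw.
  void throw_if_error() {
    if (result_.success) return;
    std::string msg =
        result_.error_message ? result_.error_message : "unknown FFI error";
    bool retryable = result_.is_retryable;
    release();
    throw FfiError(msg, retryable);
  }

 private:
  void release() noexcept {
    std::free(result_.error_message);  // assumption: malloc-allocated here
    result_.error_message = nullptr;
  }

  SketchCResult result_;
};

// Fake FFI call that reports a retryable failure, for demonstration only.
inline void fake_ffi_fail(SketchCResult* out, const char* msg) {
  out->success = false;
  out->is_retryable = true;
  out->error_message = static_cast<char*>(std::malloc(std::strlen(msg) + 1));
  if (out->error_message) std::strcpy(out->error_message, msg);
}
```

Copying the message into a `std::string` before freeing keeps the exception independent of C-side memory, and the destructor covers the path where the caller never checks the result.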
**Build & CI:**

- `CMakeLists.txt` - library target (`zerobus::zerobus`), the `install` / `find_package(zerobus)` export with the FFI archive bundled, and the `ZEROBUS_SANITIZE` option (off by default).
- `cmake/BuildRustFfi.cmake` builds `libzerobus_ffi` from local Rust source by default, or links a prebuilt lib via `-DZEROBUS_FFI_LIBRARY=`.
- `Makefile` (`build` / `test` / `lint` / `fmt`, `SANITIZE=` pass-through), `.clang-format`.
- `ci-cpp.yml` (fmt + test + Address[…] `push.yml`'s path filter.

## Design notes

- […] free their handle exactly once (the source is nulled on move). Errors are thrown, never returned.
- […] buffers / pointer arrays; the batch helpers build only the small pointer/length arrays the C entry points need.
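The "free the handle exactly once, null the source on move" rule from the design notes can be sketched as a small move-only wrapper. The names are hypothetical: `FakeHandle` and `fake_handle_free` stand in for an opaque FFI handle and its free function, and `Handle` is an illustrative wrapper, not the SDK's `Sdk` or `Stream` class.

```cpp
#include <utility>

// Hypothetical opaque handle plus its free function; the counter exists
// only so the sketch can show how many times a free happened.
struct FakeHandle {
  int* free_count;
};

inline void fake_handle_free(FakeHandle* h) {
  ++*h->free_count;
  delete h;
}

// Move-only owner: copying is disabled, and a move nulls the source so
// only one object ever frees a given handle.
class Handle {
 public:
  explicit Handle(FakeHandle* raw) noexcept : raw_(raw) {}
  ~Handle() { reset(); }

  Handle(const Handle&) = delete;
  Handle& operator=(const Handle&) = delete;

  Handle(Handle&& other) noexcept
      : raw_(std::exchange(other.raw_, nullptr)) {}

  Handle& operator=(Handle&& other) noexcept {
    if (this != &other) {
      reset();  // free what we currently own
      raw_ = std::exchange(other.raw_, nullptr);
    }
    return *this;
  }

  bool valid() const noexcept { return raw_ != nullptr; }

 private:
  void reset() noexcept {
    if (raw_ != nullptr) {
      fake_handle_free(raw_);
      raw_ = nullptr;
    }
  }

  FakeHandle* raw_ = nullptr;
};
```

A moved-from `Handle` reports `valid() == false` and its destructor does nothing, which is what makes returning such objects from factory functions safe.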
## Merge order

Off `main`. Tests and examples are stacked on this PR and merge after it. The `add_subdirectory(tests)` / `(examples)` wiring is intentionally omitted here and arrives with those PRs, so this branch configures and builds the library cleanly on its own.

Part of the #415 split.

## Test plan

- `make build` - configures and builds the library (FFI from source).
- `make lint` - clang-format check + `-Wall -Wextra`.
- `find_package(zerobus)` and link `zerobus::zerobus` from a `cmake --install` tree.