feat(connectors): add generic HTTP sink connector #2925
mlevkov wants to merge 56 commits into apache:master from
…nk impl
Add generic HTTP sink connector for delivering consumed messages to any HTTP endpoint (webhooks, REST APIs, serverless functions). This commit establishes the crate structure, config types, and stub trait implementation.
- HttpMethod enum (Get, Head, Post, Put, Patch, Delete) with Default=Post
- BatchMode enum (Individual, Ndjson, JsonArray, Raw) with Default=Individual
- HttpSinkConfig with 20 fields covering retry, TLS, batching, metadata
- HttpSink struct with Option<Client> (built in open(), not new())
- Stub Sink trait impl (open/consume/close) with TODO markers for Commit 2
- Document runtime consume() Result discard (upstream sink.rs:585 bug)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Full implementation of the HTTP sink connector's Sink trait:
- open(): Build reqwest::Client from config (timeout, TLS, pool size), optional health check with configurable HTTP method.
- consume(): Four batch modes: individual (partial delivery on failure), ndjson (newline-delimited), json_array (single array), raw (bytes). Metadata envelope wrapping with UUID-formatted u128 IDs, base64 for binary payloads (Raw/Proto/FlatBuffer). Configurable success status codes, checksum and origin timestamp inclusion.
- Retry: Exponential backoff with configurable multiplier and cap. Transient errors (429/500/502/503/504) and network errors retry; non-transient errors fail immediately. Respects Retry-After header on HTTP 429.
- close(): Log cumulative stats (requests, delivered, errors, retries).
- Config resolution: All Option fields resolved to concrete values in new() following MongoDB sink pattern. Duration strings parsed with humantime. UTF-8-safe response truncation in logs.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
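The backoff described in this commit can be sketched as follows. This is a std-only illustration, not the connector's code: the function name `compute_retry_delay` appears later in this PR, but the signature, the 1-based `attempt` convention, and the overflow handling here are assumptions.

```rust
use std::time::Duration;

/// Hypothetical sketch: delay before retry number `attempt` (1-based).
/// delay = base * multiplier^(attempt - 1), clamped to `max`.
fn compute_retry_delay(base: Duration, multiplier: f64, max: Duration, attempt: u32) -> Duration {
    // Clamp the exponent so the cast to i32 cannot wrap.
    let exponent = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
    let secs = base.as_secs_f64() * multiplier.powi(exponent);
    // Duration::from_secs_f64 panics on NaN, infinity, or negative input,
    // so anything non-finite or above the cap falls back to the cap.
    if !secs.is_finite() || secs >= max.as_secs_f64() {
        max
    } else {
        Duration::from_secs_f64(secs.max(0.0))
    }
}
```

With a 1s base, a multiplier of 2, and a 30s cap, the first three retries wait 1s, 2s, and 4s; later retries are held at 30s.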
Addresses all findings from 4-agent code review:
- Cap Retry-After to max_retry_delay, use reqwest::header::RETRY_AFTER
- Health check uses configured success_status_codes, applies custom headers
- NDJSON trailing newline for spec compliance
- Skip-and-continue on per-message serialization failure (ndjson/json_array)
- MAX_CONSECUTIVE_FAILURES=3 threshold in individual/raw modes
- Direct simd_json→serde_json structural conversion (ported from ES sink)
- Verbose consume() log downgraded to debug level
- Explicit error on response body read failure
- Empty URL validation with Error::InitError
- UUID format documented as non-RFC-4122
- Contradictory config warnings (Raw+metadata, GET/HEAD+batch)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
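The NDJSON trailing newline and the skip-and-continue behaviour from this commit can be illustrated together. The helper below is a hypothetical sketch: `build_ndjson_body` and its closure-based serializer are stand-ins chosen to keep the example std-only, not the connector's actual functions.

```rust
/// Hypothetical sketch: build an NDJSON body from a batch.
/// Every successfully serialized record is followed by '\n' (including the
/// last one); records that fail to serialize are skipped and counted.
fn build_ndjson_body<T, F>(messages: &[T], mut serialize: F) -> (Vec<u8>, usize)
where
    F: FnMut(&T) -> Result<String, String>,
{
    let mut body = Vec::new();
    let mut skipped = 0usize;
    for message in messages {
        match serialize(message) {
            Ok(line) => {
                body.extend_from_slice(line.as_bytes());
                body.push(b'\n');
            }
            // Skip-and-continue: one bad record does not fail the batch.
            Err(_) => skipped += 1,
        }
    }
    (body, skipped)
}
```

Returning the skip count alongside the body lets the caller log it and add it to its error counter.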
…CR round 2)
Round 2 double-review findings:
- CRITICAL: JSON array batch serialization error now logs batch size context
- HIGH: success_status_codes validated non-empty in open() (prevents retry storms)
- HIGH: Partial delivery logs separate HTTP failures vs serialization errors
- HIGH: saturating_sub prevents usize underflow in remaining-messages calc
- MEDIUM: Skip count logged on ndjson/json_array failure path (not just success)
- MEDIUM: payload_to_json documented as defensive (all current variants infallible)
- LOW: Raw/FlatBuffer match arms merged in payload_to_json
Deferred (documented, not bugs):
- Retry-After HTTP-date format (needs httpdate dependency, out of scope for v1)
- Payload::Proto raw mode semantic inconsistency (follows SDK try_into_vec behavior)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
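A rough sketch of how the consecutive-failure threshold and the saturating remaining-messages calculation might fit together. `MAX_CONSECUTIVE_FAILURES` and its value of 3 come from the commits above; `DeliveryStats`, `deliver_individually`, and the boolean `send` callback are hypothetical names for illustration.

```rust
const MAX_CONSECUTIVE_FAILURES: usize = 3;

#[derive(Debug, PartialEq, Eq)]
struct DeliveryStats {
    delivered: usize,
    errors: usize,
}

/// Hypothetical sketch: send `total` messages one by one and abort after
/// MAX_CONSECUTIVE_FAILURES failures in a row. Messages that were never
/// attempted are counted as errors.
fn deliver_individually<F>(total: usize, mut send: F) -> DeliveryStats
where
    F: FnMut(usize) -> bool,
{
    let (mut delivered, mut failed, mut consecutive) = (0usize, 0usize, 0usize);
    for index in 0..total {
        if send(index) {
            delivered += 1;
            consecutive = 0;
        } else {
            failed += 1;
            consecutive += 1;
            if consecutive >= MAX_CONSECUTIVE_FAILURES {
                // saturating_sub cannot underflow even if the counters drift.
                let not_attempted = total.saturating_sub(index + 1);
                return DeliveryStats { delivered, errors: failed + not_attempted };
            }
        }
    }
    DeliveryStats { delivered, errors: failed }
}
```

In this sketch `delivered + errors` always equals `total`, so nothing is silently dropped from the accounting.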
Example configuration with all plugin_config fields documented. Follows the MongoDB/PostgreSQL sink config.toml pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…known limitations
Follows MongoDB sink README structure: Try It, Quick Start, Configuration, Batch Modes, Retry Strategy, Example Configs, Known Limitations. Documents 3 deferred review findings and 2 runtime issues as known limitations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests cover:
- Config resolution (defaults, overrides, backoff clamp, invalid duration fallback)
- Duration parsing (valid strings, None fallback)
- HttpMethod serde (uppercase serialize/deserialize, invalid rejection)
- BatchMode serde (snake_case serialization)
- Content-type mapping for all 4 batch modes
- UUID formatting (zero, max, specific grouping)
- UTF-8-safe truncation (short, long, multibyte)
- Payload conversion (JSON, Text, Raw, FlatBuffer, Proto)
- Metadata envelope (with/without metadata, checksum, origin_timestamp)
- Retry delay computation (base, exponential backoff, max cap)
- Transient status classification (429/5xx vs 4xx)
- owned_value_to_serde_json (null, bool, int, f64, NaN, infinity, nested)
- TOML config deserialization (minimal, full, invalid method/batch_mode)
- open() validation (empty URL, invalid URL, empty success_status_codes, valid)
Adds toml as dev-dependency for config deserialization tests.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests:
- Add iggy_timestamp assertion to metadata envelope test
- Add negative assertions for absent checksum/origin_timestamp by default
- Strengthen multibyte truncation test with concrete expected value
- Add raw mode + include_metadata invariant test (47 tests total)
Docs:
- Fix README retry sequence (attempt 1 is retry_delay, not immediate)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
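For readers unfamiliar with why truncation needs a multibyte test, here is a minimal sketch of UTF-8-safe truncation. The name `truncate_utf8` and the byte-based limit are assumptions; the connector's helper may differ.

```rust
/// Hypothetical sketch: cut `text` to at most `max_bytes` bytes without
/// splitting a multibyte character. Slicing a &str at a non-boundary index
/// would panic, so the cut point is walked back to the nearest boundary.
fn truncate_utf8(text: &str, max_bytes: usize) -> &str {
    if text.len() <= max_bytes {
        return text;
    }
    let mut end = max_bytes;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}
```

For example, truncating "héllo" to 2 bytes yields "h", because the 2-byte "é" does not fit entirely within the limit.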
Add 6 end-to-end integration tests covering all batch modes and metadata behavior of the HTTP sink connector. Tests use WireMock container as a programmable HTTP endpoint and verify received requests via admin API.
Tests:
- individual_json_messages_delivered_as_separate_posts
- ndjson_messages_delivered_as_single_request
- json_array_messages_delivered_as_single_request
- raw_binary_messages_delivered_without_envelope
- metadata_disabled_sends_bare_payload
- individual_messages_have_sequential_offsets
Fixture variants: Individual, NDJSON, JsonArray, Raw, NoMetadata
Following MongoDB sink integration test patterns.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CRITICAL fixes:
- C1: SSRF prevention - URL scheme validation (http/https only) in open()
- C2: Header validation - reject invalid header names/values at init, not per-request
- C3: O(1) retry clones - send_with_retry takes bytes::Bytes instead of Vec<u8>
HIGH fixes:
- H1: Content-Type deduplication - filter user-supplied Content-Type in request_builder()
- H3: Skipped message accounting - abort path now records skipped messages in errors_count
TEST fixes:
- T1: Content-Type assertions use expect() instead of silent if-let skip
- T2: Exact count assertions (==) instead of >= that masks over-delivery
- T3: Offset test checks contiguous ordering, not absolute base-0 assumption
- T4: New test for consume() before open() returns InitError
DOCS fixes:
- D1: Disambiguate sink.rs:585 → runtime/src/sink.rs:585
- D2: send_individual doc mentions MAX_CONSECUTIVE_FAILURES abort behavior
9 new unit tests (47 → 56), all passing, zero clippy warnings.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
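The scheme check in C1 might look roughly like the sketch below. It is a std-only approximation: the connector presumably validates through a real URL parser, and `validate_url_scheme` with its string-based error type is a hypothetical name used only here.

```rust
/// Hypothetical sketch: accept only http:// and https:// URLs.
/// A real implementation would parse the URL properly; this only
/// inspects the scheme prefix.
fn validate_url_scheme(url: &str) -> Result<(), String> {
    let trimmed = url.trim();
    if trimmed.is_empty() {
        return Err("url must not be empty".to_string());
    }
    match trimmed.split_once("://") {
        Some((scheme, _)) => {
            let scheme = scheme.to_ascii_lowercase();
            if scheme == "http" || scheme == "https" {
                Ok(())
            } else {
                Err(format!("unsupported URL scheme '{}': only http/https allowed", scheme))
            }
        }
        None => Err(format!("url '{}' has no scheme", trimmed)),
    }
}
```

Rejecting schemes such as `file://` at `open()` time means a misconfigured or malicious URL fails at startup rather than on the first message.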
7 findings from 4-agent double-review:
- R2-1 (HIGH): WireMockRequest::header() now actually case-insensitive per RFC 7230
- R2-2 (HIGH): Offset test uses explicit unwrap_or_else instead of silent filter_map
- R2-3 (MEDIUM): URL parse error now includes the actual parse error message
- R2-4 (MEDIUM): Abort accounting uses saturating_sub + debug_assert for defensive safety
- R2-5 (MEDIUM): open() warns when user Content-Type header will be overridden by batch_mode
- R2-6 (MEDIUM): Batch modes (ndjson/json_array) now count all undelivered messages in errors_count
- R2-7 (LOW): Content-Type test improved with set-based assertion and documented limitation
Deferred (pre-existing, not regressions):
- parse_duration silent fallback (requires SDK contract change)
- Runtime discards consume() errors (upstream issue apache#2927)
- Retry-After HTTP-date format (nice-to-have)
- NaN/Infinity to null (documented, matches ES sink)
56 unit tests passing, zero clippy warnings.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
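R2-1 concerns case-insensitive header lookup in the test fixture. A minimal sketch of the idea, assuming headers are held as name/value string pairs (the real `WireMockRequest` layout is not shown in this conversation):

```rust
/// Hypothetical sketch: look up a header by name, ignoring ASCII case,
/// since HTTP header field names are case-insensitive.
fn header<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.as_str())
}
```

A test asserting on `content-type` then passes regardless of whether the server recorded `Content-Type` or `content-type`.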
…yment patterns
New sections:
- Use Cases: webhook delivery, REST API ingestion, serverless triggers, IoT relay, multi-service fan-out, observability pipeline
- Authentication: Bearer, API key, Basic auth, multi-header, limitations (no OAuth2 refresh, no SigV4, no mTLS)
- Deployment Patterns: single destination/multi-topic, multi-destination (one connector per destination), fan-out (same topic to multiple endpoints via separate consumer groups), Docker/container deployment, environment variable overrides for secrets
- Updated Known Limitations: added per-topic routing, OAuth2, env var expansion; linked upstream issues apache#2927 and apache#2928
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Configure reqwest client with tcp_keepalive(30s) and pool_idle_timeout(90s) to detect dead connections behind cloud load balancers and clean up stale idle connections. Add Performance Considerations section to README covering batch mode selection, memory implications, connection pooling, and retry impact. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add connector_multi_topic_stream seed function that creates one stream with two topics. Add HttpSinkMultiTopicFixture that subscribes to both topics via the STREAMS_0_TOPICS env var. The test sends messages to each topic and verifies all arrive at WireMock with correct iggy_topic metadata, demonstrating the multi-topic single-connector deployment pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Explain what "deploying multiple instances" means tactically — each instance is a separate OS process with its own config directory, not a config option within one process. Add a clear table showing which deployment patterns are achievable today vs. not, and annotate each deployment pattern section with its achievability status. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tails
Add links to runtime source code (sink.rs, sdk/src/sink.rs) explaining how the connector runtime spawns one task per topic, uses DashMap for plugin instance multiplexing, and calls consume() sequentially. Expand connection pooling section with reqwest client sharing semantics, TCP keep-alive rationale for cloud LB idle timeouts, and cross-process pool isolation.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…structure
Clarify that the connector does not require any particular message structure on input: it receives raw bytes from the Iggy runtime. The metadata envelope is added by the sink on the way out, not expected on the way in. Includes ASCII flow diagram, schema interpretation table, and guidance for publishing existing structs in any serialization format.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…lidation, docs
Address 15 findings from 4-agent code review (CR round 3):
CRITICAL:
- C1: Add errors_count for payload-size-exceeded in ndjson/json_array batch modes
HIGH:
- H1: Remove HTTP-sink-specific constants from shared harness (seeds.rs), create second topic inline in multi-topic integration test
- H2: Add errors_count for json_array whole-batch serialization failure
- H3: Replace fragile line-number references with function names in README
MEDIUM:
- M1: Prevent panic in compute_retry_delay on f64 overflow (extreme backoff)
- M2: Validate status codes in open(): reject codes outside 100-599
- M3: Fix retry math in README (3 attempts not 4, include timeout)
- M4: Fix GCP timeout comment (60-350s -> AWS ALB ~60s, GCP ~600s)
- M5: Remove specific RSS claim from README
- M6: Clarify FFI boundary in consume() error log and README
- M7: Warn on non-integer Retry-After header instead of silently ignoring
- M8: Remove unused dashmap/once_cell direct dependencies
- M9: Replace magic string match arms with constants in integration test
LOW:
- L1: Extract shared send_batch_body() helper from ndjson/json_array
- L2: Add last_success_timestamp to close() stats log
- L3: Add credential placeholder warning comment in config.toml
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…clarity
Change send_batch_body parameter from Vec<u8> to Bytes, which makes the zero-copy intent explicit and idiomatic. Callers wrap with Bytes::from() at the call site after payload size checks.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Address 6 findings from double-review round 4:
F1 (HIGH): Narrow status code validation from 100-599 to 200-599, rejecting HTTP 1xx
informational codes that are not valid terminal response codes.
F2 (HIGH): Warn on non-UTF-8 Retry-After header values instead of
silently dropping them via .to_str().ok().
F3 (HIGH): Add debug_assert!(count > 0) in send_batch_body() for
defense-in-depth against empty batch calls.
F4 (MEDIUM): Replace line number reference (runtime/src/sink.rs:585)
with function name (process_messages()) in consume() doc comment.
F5 (MEDIUM): Clarify README retry labels — "Initial request" + "Retry 1/2/3"
instead of ambiguous "Attempt 1/2/3".
F6 (MEDIUM): Warn in constructor when retry_delay > max_retry_delay,
since all delays will be silently capped.
New test: given_informational_status_code_should_fail_open (60 total).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
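The status-code validation after F1 can be sketched like this. `validate_success_status_codes` and its string error are hypothetical; the 200-599 range and the non-empty requirement come from the commits above, and the `HashSet` return mirrors the O(1) lookup mentioned elsewhere in this PR.

```rust
use std::collections::HashSet;

/// Hypothetical sketch: validate configured success codes at open() time.
/// The list must be non-empty and every code must be a terminal HTTP status
/// (200-599); 1xx informational codes are rejected.
fn validate_success_status_codes(codes: &[u16]) -> Result<HashSet<u16>, String> {
    if codes.is_empty() {
        return Err("success_status_codes must not be empty".to_string());
    }
    for &code in codes {
        if !(200..=599).contains(&code) {
            return Err(format!("invalid success status code {}: expected 200-599", code));
        }
    }
    Ok(codes.iter().copied().collect())
}
```

Failing fast here avoids the retry storm that an empty or nonsensical success set would otherwise cause, since every response would be treated as a failure.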
Apply rigorous test documentation standards to all 7 integration tests:
Module-level documentation (~130 lines):
- Connector architecture diagram (test code → runtime → sink → WireMock)
- Runtime model explanation (1 process = 1 config = 1 plugin, per-topic tasks)
- What each test validates (7-test summary)
- Full-stack infrastructure details (iggy-server, runtime, WireMock, fixtures)
- Fixture architecture and env var override pattern
- Running instructions with prerequisites
- Success criteria, known limitations, related documentation
- Test history with code review changes
Per-test documentation (40-65 lines each):
- Purpose, Behavior Under Test, Why This Matters
- Numbered Test Flow steps
- Key Validations with rationale
- Related Code with function names (not line numbers)
- Test History where applicable (multi-topic H1/M9 changes)
Inline commentary:
- Step comments explaining each phase of the test
- Assertion messages with expected vs actual context
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix missing HttpSinkMultiTopicFixture re-export in fixtures/mod.rs that caused E0432 + cascading E0282 type inference errors. Remove dead re-exports (HttpSinkWireMockContainer, WireMockRequest) from http/mod.rs. Add #[allow(dead_code)] to reset_requests() test utility. Apply rustfmt across lib.rs and http_sink.rs integration tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #2925 +/- ##
=============================================
- Coverage 71.74% 59.37% -12.38%
Complexity 943 943
=============================================
Files 1121 1123 +2
Lines 93800 94978 +1178
Branches 71124 72314 +1190
=============================================
- Hits 67301 56390 -10911
- Misses 23863 36068 +12205
+ Partials 2636 2520 -116
…s, multi-topic test
- typos: change "DELET" to "DELEET" in doc comment and tests (false positive)
- markdown: add `text` language to 3 fenced code blocks, fix table separator spacing
- rustfmt: apply CI-matching formatting to container.rs
- licenses: add iggy_connector_http_sink to DEPENDENCIES.md
- multi-topic test: add connector_multi_topic_stream seed so both topics exist before connector runtime starts (runtime health check requires all configured topics; it was timing out after 1000 retries in CI)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
hubcio left a comment
overall good direction, please fix the CI (markdown lint) and rest of the comments. looks pretty solid to me.
Thank you @spetz, excellent feedback across the board. Here's how I've grouped the 12 comments for implementation:
Grouped by theme
A. Constants & naming (straightforward)
B. Strongly typed structs + enum methods
C. Method signature cleanup
D. Design decisions (want to discuss)
Will work through A → B → C → D in order. Addressing #10 and #7 last since they need alignment.
Please note the limitations introduced by the shift to
1. The hand-rolled retry loop previously parsed integer
   Impact: If a downstream endpoint returns
2. Retry count no longer tracked as a connector-level metric
   The
3. The
These are acceptable trade-offs for the simplification (120 → 40 lines in
Aligns with Rust naming conventions (NDJSON is an acronym). Snake_case serialization now produces "nd_json" instead of "ndjson". Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Introduces FIELD_* and ENCODING_* constants for all JSON field names used in payload_to_json() and build_envelope(). Eliminates magic strings. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Uses strum_macros::Display for human-readable batch mode names in logs. Removes mode_name parameter from send_per_message() and batch_mode parameter from send_batch_body() — both now use self.batch_mode. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Content-type is intrinsic to the batch mode, not the sink. Simplifies call sites and test — no longer need to construct a full HttpSink to test content-type mapping. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
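Moving the mapping onto the enum might look like the sketch below. The variant names follow this PR; the concrete MIME strings are assumptions (the conversation does not list them), and the serde/strum derives of the real type are omitted to keep the example std-only.

```rust
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
enum BatchMode {
    #[default]
    Individual,
    NdJson,
    JsonArray,
    Raw,
}

impl BatchMode {
    /// Content-Type implied by the batch mode.
    /// NOTE: these MIME strings are illustrative assumptions.
    fn content_type(self) -> &'static str {
        match self {
            BatchMode::Individual | BatchMode::JsonArray => "application/json",
            BatchMode::NdJson => "application/x-ndjson",
            BatchMode::Raw => "application/octet-stream",
        }
    }
}
```

Because the method needs only the enum value, a unit test can call `BatchMode::NdJson.content_type()` directly without constructing a sink.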
Introduces MetadataEnvelope, IggyMetadata, EncodedPayload, and EncodedHeader structs. Rewrites build_envelope() and payload_to_json() to construct typed values instead of json! macros. FIELD_* constants moved to test module for assertion use. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replaces string literal field lookups in test assertions with named constants for consistency and single-source-of-truth field names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Removes 22 section separator comments. Moves parse_duration() below impl HttpSink. Moves test helper functions (given_*) after test functions (callers before callees). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds fn client() helper that returns the initialized client or an error. Removes client: &reqwest::Client parameter from 7 methods (send_with_retry, send_per_message, send_individual, send_batch_body, send_ndjson, send_json_array, send_raw). Each calls self.client() internally. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
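The accessor pattern described here can be sketched with stand-in types. `HttpClient` and `SinkError` below are placeholders for the real client and error types, and the error message is invented for illustration.

```rust
/// Placeholder for the real HTTP client type.
#[derive(Debug)]
struct HttpClient;

/// Placeholder for the SDK error type.
#[derive(Debug, PartialEq)]
enum SinkError {
    InitError(String),
}

struct HttpSink {
    /// None until open() has built the client.
    client: Option<HttpClient>,
}

impl HttpSink {
    /// Returns the initialized client, or an error if open() has not run.
    fn client(&self) -> Result<&HttpClient, SinkError> {
        self.client
            .as_ref()
            .ok_or_else(|| SinkError::InitError("HTTP client not initialized".to_string()))
    }
}
```

Send methods can then start with `let client = self.client()?;` instead of receiving the client as a parameter.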
spetz confirmed "simple form without dashes." Replaces uuid::Uuid::new_v8()
with format!("{:032x}"). Removes uuid crate dependency. IggyMetadata.iggy_id
stays String.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
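A small sketch of the format string named in this commit; the wrapper function `format_iggy_id` is a hypothetical name.

```rust
/// Formats a u128 message ID as 32 lowercase hex characters,
/// zero-padded, with no dashes.
fn format_iggy_id(id: u128) -> String {
    format!("{:032x}", id)
}
```

The output is always exactly 32 characters, since a u128 holds 128 bits and each hex digit encodes 4 bits.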
Replaces ~120 lines of manual retry loop with reqwest-middleware stack: TracingMiddleware for request spans, RetryTransientMiddleware with ExponentialBackoff for retry logic. Adds HttpSinkRetryStrategy that respects user-configured success_status_codes: codes in the success set are never retried (even 429). Logs warning when 429 + Retry-After header is present but middleware uses computed backoff.
Changes:
- client type: reqwest::Client → ClientWithMiddleware
- build_request returns reqwest_middleware::RequestBuilder
- retry_backoff_multiplier: f64 → u32 (matches runtime pattern)
- Removes: compute_retry_delay, parse_retry_after, is_transient_status
- Removes: retries_count field and stats
- send_with_retry shrinks from ~120 to ~40 lines
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
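The decision rule behind the custom strategy can be sketched without the middleware crates. `RetryDecision` and `classify_status` are hypothetical stand-ins, not the retry crate's own types; the transient set (429/500/502/503/504) is taken from the earlier commit in this PR.

```rust
use std::collections::HashSet;

/// Stand-in for the retry decision; not the middleware crate's own type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryDecision {
    Success,
    Transient,
    Fatal,
}

/// Hypothetical sketch: the user-configured success set is checked first,
/// so a code listed there is never retried, even 429.
fn classify_status(status: u16, success_codes: &HashSet<u16>) -> RetryDecision {
    if success_codes.contains(&status) {
        return RetryDecision::Success;
    }
    match status {
        429 | 500 | 502 | 503 | 504 => RetryDecision::Transient,
        _ => RetryDecision::Fatal,
    }
}
```

Checking the success set before the transient set is what makes the precedence explicit: configuration wins over the built-in defaults.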
CRITICAL fixes:
- build_envelope() now returns Result instead of unwrap_or(Null) —
serialization errors are propagated to callers' skip-and-count logic
instead of silently sending null payloads
- retry_delay > max_retry_delay now swaps values instead of panicking
in ExponentialBackoff::retry_bounds() assert
HIGH fixes:
- Retry-After warn message uses actual status code instead of
hardcoded "429" (503 with Retry-After was mislogged)
- Doc comment retry_backoff_multiplier default: 2.0 → 2 (matches u32)
- Middleware errors now logged at error! with {:#} format before
mapping to flat Error string (preserves retry count context)
- requests_sent renamed to send_attempts (middleware retries make
the counter track logical sends, not physical HTTP requests)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
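The swap described in the second CRITICAL fix can be sketched as a small normalization step run before the bounds are handed to the backoff builder. `normalize_retry_bounds` is a hypothetical name.

```rust
use std::time::Duration;

/// Hypothetical sketch: return (min, max) retry bounds in ascending order.
/// If the configured retry_delay exceeds max_retry_delay, the two values are
/// swapped rather than passed on to a builder that asserts min <= max.
fn normalize_retry_bounds(retry_delay: Duration, max_retry_delay: Duration) -> (Duration, Duration) {
    if retry_delay > max_retry_delay {
        (max_retry_delay, retry_delay)
    } else {
        (retry_delay, max_retry_delay)
    }
}
```

A misconfiguration such as `retry_delay = 60s` with `max_retry_delay = 5s` then yields bounds of 5s to 60s instead of a panic at startup.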
- batch_mode "ndjson" → "nd_json" throughout
- iggy_id UUID format → 32-char hex (no dashes)
- retry_backoff_multiplier: f64 2.0 → u32 2
- Retry strategy section: reqwest-middleware + custom strategy
- Retry-After: logged as warning, not used for backoff timing
- Connection pooling: reqwest::Client → ClientWithMiddleware
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Adds a generic HTTP sink connector that delivers consumed Iggy stream messages to any HTTP endpoint — webhooks, Lambda functions, REST APIs, or SaaS integrations.
- individual (one request per message), nd_json (newline-delimited), json_array (single array), raw (bytes)
- RetryTransientMiddleware with ExponentialBackoff and custom HttpSinkRetryStrategy that respects user-configured success codes
- MetadataEnvelope, IggyMetadata, EncodedPayload, EncodedHeader structs (no json! macros)
- ClientWithMiddleware wrapping reqwest with TCP keep-alive (30s), pool idle timeout (90s), configurable max connections
- danger_accept_invalid_certs for dev environments
- BatchMode variants have human-readable Display for logs
Files Changed
- sinks/http_sink/src/lib.rs
- sinks/http_sink/README.md
- sinks/http_sink/Cargo.toml
- sinks/http_sink/config.toml
- fixtures/http/container.rs
- fixtures/http/sink.rs
- http/http_sink.rs
Architecture
Code Review History
Round 5 (maintainer review — hubcio) key changes:
- .clone() on payloads in all send methods
- send_per_message() shared by individual/raw modes
- HeaderMap in open(): avoid per-request header parsing
- HashSet for success_status_codes: O(1) hot-path lookup
- iggy_headers in metadata envelope
- open()
- consume()
Round 8 (post spetz review, 12 items addressed in 10 commits + 1 remediation):
- BatchMode::Ndjson → NdJson, serializes as "nd_json"
- FIELD_*, ENCODING_*) replacing magic strings
- strum::Display on BatchMode, removed mode_name/batch_mode string params
- content_type() moved from HttpSink to BatchMode impl
- json! macros → strongly typed MetadataEnvelope/IggyMetadata/EncodedPayload/EncodedHeader
- client parameter threading removed, replaced by fn client() helper
- uuid crate removed)
- reqwest-middleware (RetryTransientMiddleware + HttpSinkRetryStrategy)
- retry_backoff_multiplier: f64 → u32 (matches ExponentialBackoff::base())
- retries_count field removed (retries transparent inside middleware)
Round 8 remediation (2 CRITICAL, 4 HIGH):
- build_envelope() returns Result: no more unwrap_or(Null) silent data loss
- retry_delay > max_retry_delay auto-swaps to prevent ExponentialBackoff panic
- {:#} for full retry context
- requests_sent → send_attempts (middleware retries make counter ambiguous)
- default: 2.0 → default: 2 for u32 backoff multiplier
Deferred (tracked in issues)
- Error::HttpRequestFailed: #2927
Test Plan
- individual_json_messages_delivered_as_separate_posts: per-message delivery + envelope
- ndjson_messages_delivered_as_single_request: NDJSON batch mode
- json_array_messages_delivered_as_single_request: JSON array batch mode
- raw_binary_messages_delivered_without_envelope: raw byte passthrough
- metadata_disabled_sends_bare_payload: include_metadata=false
- individual_messages_have_sequential_offsets: offset ordering integrity
- multi_topic_messages_delivered_with_correct_topic_metadata: 2 topics, metadata accuracy
- cargo clippy -p iggy_connector_http_sink -- -D warnings: 0 warnings
- cargo clippy -p integration -- -D warnings: 0 warnings
Related