# Proposal: HTTP Source Connector (Webhook Gateway)

## Summary

An HTTP source connector that embeds an HTTP server inside the Source plugin, accepts incoming webhook POST requests, and produces messages to Iggy topics. This is the inverse of the HTTP sink connector (#2925) and completes the HTTP integration pair. Key features:
## Motivation

### Gap in the Connector Ecosystem

All three existing source connectors are poll-based — they reach out to external systems:
**Missing:** A push-based source that accepts incoming HTTP requests. Webhook ingestion is the dominant pattern for receiving real-time events — SaaS integrations (GitHub, Stripe, Slack), IoT devices, CI/CD callbacks, inter-service events. Without this, every Iggy user needing webhook → topic routing must build a standalone HTTP server + Iggy producer, duplicating connection management, error handling, backpressure, and graceful shutdown logic.

## Architecture: Push-to-Pull Bridge

The `Source` trait is inherently pull-based (the runtime repeatedly calls `poll()` to drain messages).

### Three Concurrent Tasks
## SDK Compatibility Analysis

The Source SDK was designed for poll-based sources but is fully compatible with a server-style source:
**Critical constraint:** Background tasks must not clone the

## Endpoint Registry (Event-Sourced)

### Why Not Just Static Config?

Static TOML configuration cannot register or revoke endpoints at runtime; changing the endpoint set would require a connector restart.
### Design

The connector maintains a live endpoint registry backed by a dedicated Iggy config topic. On startup, it replays the topic from offset 0 to reconstruct state, then subscribes for live updates. Endpoint types:
Ephemeral endpoints use a cryptographically random 32-char hex ID. The URL itself acts as a bearer token (128 bits of entropy, the same model as Fluvio's webhook gateway and Slack webhook URLs). Each endpoint maps to a target topic with optional per-endpoint HMAC auth.

Config topic events:

```json
{"action": "register", "endpoint_id": "a3f8c2e1...", "topic": "github_events",
 "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256", ...}
{"action": "revoke", "endpoint_id": "a3f8c2e1...", "reason": "compromised"}
{"action": "update", "endpoint_id": "a3f8c2e1...", "auth_secret": "new_secret"}
```

**Graceful fallback:** If the config topic doesn't exist, the connector operates in static-only mode (named topics from TOML, global bearer token auth). The config topic is opt-in.

### Multi-Instance Coordination

All instances consume the same config topic. Since events are ordered, all instances converge to the same registry state.

**Consistency model:** eventual (typically <100ms propagation). Acceptable for webhook endpoints — the same model used by DNS and distributed caches.

## Lock-Free Hot Path

The HTTP request handling path must be as fast as possible — target <100μs from TCP accept to HTTP 200. Why these data structures:
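As a self-contained illustration, the hot path might look like the sketch below. It uses std stand-ins (`RwLock` for the registry snapshot, a bounded `sync_channel` for the buffer) where the actual design calls for `arc_swap::ArcSwap` and `crossbeam::queue::ArrayQueue` — so the std version is *not* lock-free, it only shows the control flow. The `Endpoint` and `HotPath` types, the constructor, and the numeric status codes are illustrative, not the connector's real API:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::{Arc, RwLock};

// Illustrative types; the real connector resolves endpoints from the
// event-sourced registry and enqueues into a crossbeam ArrayQueue.
struct Endpoint {
    topic: String, // target topic for this webhook endpoint
}

struct QueuedMessage {
    topic: String,    // already resolved from the registry
    payload: Vec<u8>, // raw HTTP body bytes
}

struct HotPath {
    // Read-mostly registry snapshot (ArcSwap in the real design).
    registry: Arc<RwLock<HashMap<String, Arc<Endpoint>>>>,
    // Bounded buffer between HTTP handlers and poll() (ArrayQueue in the real design).
    queue: SyncSender<QueuedMessage>,
}

impl HotPath {
    /// Build a HotPath with an empty registry and a buffer of `capacity` messages.
    fn new(capacity: usize) -> (Self, Receiver<QueuedMessage>) {
        let (tx, rx) = sync_channel(capacity);
        let hp = HotPath {
            registry: Arc::new(RwLock::new(HashMap::new())),
            queue: tx,
        };
        (hp, rx)
    }

    /// Returns an HTTP-style status: 200 queued, 404 unknown/revoked
    /// endpoint, 429 buffer full (shed load), 503 shutting down.
    fn handle_request(&self, endpoint_id: &str, body: Vec<u8>) -> u16 {
        let endpoint = match self.registry.read().unwrap().get(endpoint_id) {
            Some(ep) => Arc::clone(ep),
            None => return 404, // unknown or revoked endpoint
        };
        let msg = QueuedMessage {
            topic: endpoint.topic.clone(),
            payload: body,
        };
        match self.queue.try_send(msg) {
            Ok(()) => 200,                             // queued locally (async ack)
            Err(TrySendError::Full(_)) => 429,         // never block: shed load
            Err(TrySendError::Disconnected(_)) => 503, // poll side gone
        }
    }
}
```

The property that survives the substitution: the handler never blocks on a full buffer — it returns 429 and lets the webhook sender retry, which matches the at-most-once acknowledgment model described later.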
No mutex anywhere between HTTP request arrival and queue enqueue.

### poll() Implementation

```rust
async fn poll(&self) -> Result<ProducedMessages, Error> {
    loop {
        // Drain up to max_batch_size messages from the lock-free queue.
        let mut batch = Vec::with_capacity(self.max_batch_size);
        while let Some(msg) = self.queue.pop() {
            batch.push(msg.into_produced_message());
            if batch.len() >= self.max_batch_size { break; }
        }
        if !batch.is_empty() {
            return Ok(ProducedMessages { schema: Schema::Raw, messages: batch, state: None });
        }
        // Queue empty: wait for a notification, a timeout, or shutdown.
        tokio::select! {
            _ = self.notify.notified() => continue,
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
            _ = self.cancel_token.cancelled() => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
        }
    }
}
```

## Configuration

```toml
type = "source"
key = "http"
enabled = true
name = "webhook_gateway"
path = "target/release/libiggy_connector_http_source"

# Source connectors use StreamProducerConfig (singular `topic`).
# Multi-topic routing handled internally by the connector (see Blocker 1).
[[streams]]
stream = "webhooks"
topic = "ingest"
schema = "raw"
batch_length = 100
linger_time = "5ms"

[plugin_config]
bind_address = "0.0.0.0"
port = 9090
max_body_size_bytes = 1048576 # 1MB
buffer_capacity = 10000
max_batch_size = 500

# Config topic (optional — enables endpoint registry + hot reload)
config_topic_stream = "platform"
config_topic_name = "http-source-config"
iggy_connection_string = "tcp://127.0.0.1:8090"

# Global auth (for /topics/{name} endpoints)
auth_bearer_token = "global-webhook-secret"

# Instance identity (for multi-instance setups)
instance_id = "webhook-01"

# HTTP metadata forwarding as Iggy message headers
include_http_metadata = true
forward_headers = ["X-Request-ID", "X-Correlation-ID"]

health_check_enabled = true
```

## Request/Response Contract

### Named Topic

### Ephemeral Endpoint

### Error Responses
### Health

## HMAC Validation

Per-endpoint HMAC validation over raw request body bytes (never re-serialized JSON):
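As a sketch of the verification step — assuming the HMAC-SHA256 digest itself is computed over the raw body with the RustCrypto `hmac`/`sha2` crates — the header parsing and constant-time comparison (the parts that are easy to get wrong) might look like this. `signature_matches`, `hex_decode`, and `constant_time_eq` are hypothetical helpers, not existing connector APIs:

```rust
/// Decode a hex string (e.g. the digest portion of "sha256=<hex>") into bytes.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    if !s.is_ascii() || s.len() % 2 != 0 {
        return None;
    }
    (0..s.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
        .collect()
}

/// Compare two byte slices in constant time to avoid timing side channels.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Fold XOR differences over the whole length; never short-circuit.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Verify a signature header (e.g. X-Hub-Signature-256) against the digest
/// we computed locally. `prefix` is the endpoint's configured `hmac_prefix`,
/// e.g. "sha256=".
fn signature_matches(header: &str, prefix: &str, expected_digest: &[u8]) -> bool {
    let hex = match header.strip_prefix(prefix) {
        Some(h) => h,
        None => return false, // malformed or missing prefix
    };
    match hex_decode(hex) {
        Some(got) => constant_time_eq(&got, expected_digest),
        None => false,
    }
}
```

A naive `==` on the decoded bytes would short-circuit at the first mismatch and leak timing information; folding XORs over the full length keeps comparison time independent of where the signatures diverge.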
Using the config-topic registration event:

```json
{"action": "register", "endpoint_id": "...", "topic": "github_events",
 "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256",
 "hmac_prefix": "sha256=", "auth_secret": "whsec_..."}
```

**Defense in depth:** the URL is unguessable (128-bit) AND the HMAC signature must be valid. Either can be rotated independently.

## Async Acknowledgment

Respond before producing to Iggy: delivery is at-most-once from the caller's perspective. 200 = "queued locally," not "persisted to Iggy." Webhook senders implement retry-on-timeout, providing at-least-once from the caller's side.

## Comparison: HTTP Sink vs HTTP Source
## Architectural Questions for the Team
## Scope

### Included
### Not Included
### RBAC: Deliberate Non-Goal

We considered and explicitly rejected building RBAC (Role-Based Access Control) into the connector. The connector performs authentication (verifying the caller is who they claim to be) but delegates authorization (whether the caller is allowed to perform an action) to the layers above and below it. The connector sits between two systems that already have their own access control. Adding a third authorization layer in the middle creates configuration complexity (three places to manage permissions with no clear source of truth), consistency headaches (permissions across layers diverge), and scope creep (role definitions, permission grants, group management — an entire authorization system that would dwarf the connector). How authorization is already handled at each layer:
**Key insight:** ephemeral endpoints with per-endpoint auth are functionally equivalent to per-resource authorization. Each integration gets its own endpoint with its own secret. Revoking the endpoint (via the config topic) is equivalent to revoking access. No role hierarchy needed.

**What would change this:** if Iggy develops a first-party connector authorization framework (e.g., connector-level ACLs managed by the server), the HTTP source should integrate with it rather than maintaining its own.

## Estimated Size

~3600-4650 lines across

## Pre-Implementation Review Results

We ran 5 specialized review agents against this design before any code was written. 25 findings (7 CRITICAL, 18 HIGH). 17 were fixed in the design, 4 were resolved with detailed implementation notes, 3 are architectural blockers requiring team input, and 1 is an inherited runtime limitation.

### Architectural Blockers (Need Team Input)

#### Blocker 1: SDK Single-Topic Routing
We see four options:
We lean toward (b) for initial implementation, with (a) as a follow-up SDK enhancement. What does the team prefer?

#### Blocker 2: Shutdown Data Loss

The runtime's `SourceManager::stop_connector` calls `cleanup_sender` (removing the flume channel) before calling `iggy_source_close`, so messages drained by `poll()` during shutdown hit a dead callback. Options:
Would a PR to adjust the shutdown order in `SourceManager::stop_connector` be welcome? (This affects all source connectors, not just HTTP.)

#### Blocker 3: Config Consumer Failure

If the config consumer's Iggy connection drops, the HTTP server continues with a frozen registry — revoked endpoints stay active. We've added health signaling (degraded status, optional rejection). Is this failure mode acceptable?

### Key Design Corrections From Review

These have already been incorporated into the design above:
## Additional Type Definitions

`QueuedMessage` (the hot-path bridge between the HTTP handler and `poll()`):

```rust
struct QueuedMessage {
    topic: String,                          // Target topic, already resolved
    payload: Vec<u8>,                       // Raw HTTP body bytes
    http_metadata: HashMap<String, String>, // Pre-filtered forward headers
    received_at: std::time::Instant,        // For queue-time latency metrics
}
```

`EndpointId` (validated newtype, 128 bits of entropy):

```rust
struct EndpointId(String); // Exactly 32 lowercase hex chars, validated at construction
```

`ConfigEvent` (serde-tagged, compile-time field validation):

```rust
#[derive(Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ConfigEvent {
    Register(RegisterEndpoint),
    Revoke(RevokeEndpoint),
    Update(UpdateEndpoint),
    Config(ConfigOverride), // Allowlisted keys with range validation
}
```

Looking forward to your feedback on the design, especially the architectural questions above. Happy to discuss any aspect in detail.
Hi team,
Following up on the HTTP sink connector (#2925), we'd like to propose its inverse — an HTTP source connector that acts as a webhook gateway. It embeds an HTTP server inside the Source plugin, accepts incoming POST requests, and produces messages to Iggy topics.
The connector ecosystem has 3 sources today, all poll-based (random, PostgreSQL, Elasticsearch). There's no push-based source for accepting inbound webhooks — the most common pattern for receiving real-time events from SaaS integrations (GitHub, Stripe, Slack), IoT devices, CI/CD pipelines, and inter-service communication.
Pre-Implementation Review
We ran 5 specialized review agents against this design before writing any code. 25 findings (7 CRITICAL, 18 HIGH). 21 have been corrected in the design. 3 are architectural blockers that require team input:
Blocker 1: SDK Single-Topic Routing
`ProducedMessage` has no topic field. The runtime creates one `IggyProducer` for the last `[[streams]]` entry. Multi-topic routing (different webhook endpoints → different topics) is the connector's primary value proposition, but it cannot be expressed through the current SDK contract.

We lean toward using the connector's own `IggyClient` to produce directly (bypassing the forwarding loop), with an SDK extension as a follow-up. What does the team prefer?

Blocker 2: Shutdown Data Loss
`SourceManager::stop_connector` calls `cleanup_sender` (removes the flume channel) BEFORE calling `iggy_source_close`. Messages drained by `poll()` during shutdown hit a dead callback. Would a PR to adjust the shutdown order be welcome? (This affects all source connectors, not just HTTP.)
The connector creates its own `IggyClient` to consume a config topic (event-sourced endpoint registry). If that connection drops, the HTTP server continues with a frozen registry — revoked endpoints stay active. We've added health signaling (degraded status, optional rejection). Is this failure mode acceptable?

Design Highlights
- Push-to-pull bridge: HTTP handlers enqueue into a lock-free buffer (`crossbeam::queue::ArrayQueue`) → `poll()` drains → Iggy
- Ephemeral endpoints (`/e/{random_id}`) — individually revocable, with optional per-endpoint HMAC validation
- `ArcSwap` for registry, `ArrayQueue` + `Notify` for buffer — no mutex between TCP accept and HTTP 200
- Graceful shutdown via a `CancellationToken` tree

Full design document with all corrections, type definitions, and review findings is in the comment below.
Looking forward to your feedback, especially on the three blockers.