feat(connectors): implement InfluxDB Sink and Source #2933
ryerraguntla wants to merge 60 commits into apache:master from
Conversation
Add example TOML configuration files for InfluxDB source and sink connectors under core/connectors/runtime/example_config/connectors/influx. Each file includes connector metadata (type, key, name, path, plugin format) and stream/plugin_config sections. The sink config defines connection (url, org, bucket, token), measurement/precision, payload and metadata/tag options, batching, retries and timeouts. The source config provides connection and token, a Flux query template using $cursor and $limit, polling, batch/cursor settings, initial offset, and retry/timeout settings. These serve as runtime examples for connector development and testing.
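As a rough sketch of what such a sink config could look like (key names and layout are reconstructed from the description above, not copied from the shipped example files; the org/bucket/token values and stream names are placeholders):

```toml
# Illustrative InfluxDB sink connector config — structure is an assumption
# based on the summary above, not the exact shipped file.

# Connector metadata
type = "sink"
key = "influxdb"
name = "InfluxDB sink"
path = "<BASE_DIR>/target/release/libiggy_connector_influxdb_sink"

[[streams]]
stream = "example_stream"
topics = ["example_topic"]

[plugin_config]
# Connection
url = "http://localhost:8086"
org = "<ORG>"
bucket = "<BUCKET>"
token = "<TOKEN>"

# Write behavior (defaults per the PR description)
measurement = "iggy_messages"
precision = "us"
max_retries = 3
timeout = "30s"
```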
Improve resilience of InfluxDB source and sink connectors. Adds exponential backoff with ±20% jitter for open/poll/write retries, honours Retry-After on HTTP 429, and caps backoff delays. Introduces a simple consecutive-failure circuit breaker (threshold + cool-down) to avoid hammering unavailable InfluxDB instances and new config fields (max_open_retries, open_retry_max_delay, circuit_breaker_threshold, circuit_breaker_cool_down). Sink now propagates batch write errors (preventing silent data loss) and resets/records circuit state on success/failure. Consolidates helper functions (duration parsing, backoff, jitter, retry-after parsing, escaping), minor logging improvements, and type fixes. Adds integration test scaffolding for InfluxDB (docker-compose, sink/source tests) and updates connector Cargo.toml entries (rand/workspace and small feature tweaks in integration Cargo.toml).
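A minimal std-only sketch of the backoff and circuit-breaker scheme described above. The names (`backoff_delay`, `CircuitBreaker`) and the pre-sampled jitter parameter are illustrative, not the connector's actual API — the real code draws the ±20% jitter from a RNG (the `rand` crate added in this commit):

```rust
use std::time::Duration;

/// Exponential backoff with a cap and jitter. `attempt` is zero-based;
/// `jitter` is a pre-sampled factor in [-0.2, 0.2] (a real implementation
/// would draw it from a RNG per retry).
fn backoff_delay(base: Duration, max: Duration, attempt: u32, jitter: f64) -> Duration {
    let exp = base.as_millis().saturating_mul(1u128 << attempt.min(20)) as f64;
    let capped = exp.min(max.as_millis() as f64);
    Duration::from_millis((capped * (1.0 + jitter)).max(0.0) as u64)
}

/// Consecutive-failure circuit breaker: opens at `threshold` failures in a
/// row and stays open until `cool_down` elapses (then a half-open probe).
struct CircuitBreaker {
    failures: u32,
    threshold: u32,
    opened_at: Option<std::time::Instant>,
    cool_down: Duration,
}

impl CircuitBreaker {
    fn new(threshold: u32, cool_down: Duration) -> Self {
        Self { failures: 0, threshold, opened_at: None, cool_down }
    }
    fn record_failure(&mut self) {
        self.failures += 1;
        if self.failures >= self.threshold {
            self.opened_at = Some(std::time::Instant::now());
        }
    }
    fn record_success(&mut self) {
        self.failures = 0;
        self.opened_at = None;
    }
    /// Open (blocking requests) while the cool-down has not elapsed.
    fn is_open(&self) -> bool {
        matches!(self.opened_at, Some(t) if t.elapsed() < self.cool_down)
    }
}
```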
Add dedicated InfluxDB source and sink TOML fixtures for integration tests (core/integration/tests/connectors/influxdb_source.toml and influxdb_sink.toml) with default plugin settings. Also normalize trailing newlines in existing sink/source.toml files and update core/integration/tests/connectors/fixtures/mod.rs to (re-)export the InfluxDb sink and source fixtures so they are available to tests.
…us brace escaping
Change InfluxDB connector defaults to nanosecond precision and increase sink default timeout to 30s. Updated DEFAULT_PRECISION and DEFAULT_TIMEOUT in the sink implementation, adjusted example connector TOML files and test runtime configs to set precision = "ns", and updated integration tests (influxdb_source.rs) to fix formatting, polling behavior, and assertions. These changes standardize precision handling and give the sink more time for network operations.
Insert a small dummy comment in the file header and consolidate two separate `pub use influxdb::...` lines into a single grouped import (`pub use influxdb::{InfluxDbSinkFixture, InfluxDbSourceFixture};`). No functional changes—just minor cleanup for clarity.
Change example and test configs to use microsecond precision (precision = "us") instead of nanoseconds. Update InfluxDbSink formatting to emit numeric fields without the integer suffix (`i`) and adjust timestamp handling/comment for clarity. Comment out the source crate's dependency on the influxdb sink and remove the corresponding entry from Cargo.lock to avoid the circular/implicit dependency. These changes align line protocol output and configs with the chosen precision and simplify crate dependencies.
Adjust InfluxDB sink/source behavior and tests: treat message.timestamp as microseconds, provide fallback to current time when timestamp is unset, and add per-message offset to ensure unique timestamps to avoid deduplication. Change default precision handling (config -> ns, sink default constant -> us) and update timestamp conversion logic to operate on microseconds. Fix source polling to avoid unconditional sleep and to sleep when circuit breaker is open. Improve test fixtures: use ns precision for write URL, refine CSV query row counting logic, update sink test queries to check the offset field, and make tests use dynamic timestamps (Utc::now) instead of fixed constants. Added logging/debugging to help diagnose InfluxDB responses and sink point timestamps.
Remove the unused ONE_DAY_MICROS constant and change test message timestamp increments from one day to 1000 microseconds (1ms). This makes generated TestMessage timestamps closely spaced for integration tests and avoids large time deltas that were previously used.
Codecov Report
❌ Patch coverage is … Additional details and impacted files
@@ Coverage Diff @@
## master #2933 +/- ##
============================================
+ Coverage 71.74% 72.03% +0.28%
Complexity 930 930
============================================
Files 1121 1124 +3
Lines 93777 95182 +1405
Branches 71125 72541 +1416
============================================
+ Hits 67284 68564 +1280
- Misses 23856 23920 +64
- Partials 2637 2698 +61
@ryerraguntla you said that precommit hooks were run, yet clippy is failing. how is that possible? ;)
hubcio left a comment
please make sure that all checks in CI pass. after that, we can review.
hubcio left a comment
I found 4 more nits; once you fix these we can merge.
core/integration/tests/connectors/fixtures/influxdb/container.rs
core/connectors/runtime/example_config/connectors/influxdb_source.toml
Introduce a separate per-query retry backoff cap and related config handling: add DEFAULT_RETRY_MAX_DELAY, a query_retry_max_delay field cached in open(), and a retry_max_delay config option (parsed once in open()). Use the cached value for per-query retries instead of re-parsing/open-backoff settings. Also add a warning for unrecognized payload_format values. Example connector configs were updated/moved (flattened path, placeholder token/org/bucket, and <BASE_DIR> in path), and a noisy debug eprintln in the InfluxDB test fixture was removed.
ryerraguntla left a comment
Committed the last set of nits. Once this goes through with clean prechecks, I will add some more test cases to cover different data-format messages.
Add comprehensive InfluxDB integration tests and extend test fixtures to support multiple payload formats and metadata/precision options. Introduces two large test suites (influxdb_sink_formats.rs, influxdb_source_formats.rs) plus several connector config TOMLs for text/base64/raw and no-metadata cases. Update fixtures to expose new env var constants, add per-fixture option structs and typed fixture variants (text, base64, raw, no-metadata, ns-precision), and wire those into the fixtures re-exports. Add base64 dependency to Cargo.toml/Cargo.lock required by the new tests. These changes increase coverage of payload_format branches, metadata permutations, precision handling, escaping, scalar parsing and cursor behavior.
ryerraguntla left a comment
Fixed the nits and added test cases to increase code coverage.
Introduce a reqwest-middleware based retry layer for InfluxDB connectors and wire it into source/sink crates. Added InfluxDbRetryMiddleware, is_transient_status and build_retry_client in the SDK retry module to handle 429/5xx, Retry-After, exponential backoff and jitter. Refactored InfluxDB sink and source to: build a raw reqwest::Client for startup connectivity checks (with their own retry bounds), then wrap it with ClientWithMiddleware for per-operation retries; removed duplicated hand-rolled retry loops. Updated Cargo.toml entries to include http/reqwest-middleware and added unit tests for the new middleware, connector behaviors, parsing and circuit breaker logic.
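The middleware's transient-status check and the integer-seconds form of Retry-After can be sketched with std only. This is an assumption about shape, not the SDK's actual signatures: the real `is_transient_status` most likely takes a `reqwest::StatusCode` rather than a bare `u16`, and the RFC 7231 HTTP-date branch (which the middleware also honours) is omitted because it needs a date parser:

```rust
use std::time::Duration;

/// Sketch of the transient-status predicate: retry on 429 and 5xx only;
/// other 4xx responses are permanent and should not be retried.
fn is_transient_status(status: u16) -> bool {
    status == 429 || (500..600).contains(&status)
}

/// Parse the integer-seconds form of a Retry-After header value.
/// The HTTP-date form is elided from this sketch.
fn parse_retry_after_secs(value: &str) -> Option<Duration> {
    value.trim().parse::<u64>().ok().map(Duration::from_secs)
}
```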
ryerraguntla left a comment
Improved the code coverage and refactored code as suggested.
Escape literal newline (\n) and carriage-return (\r) bytes in InfluxDB line-protocol outputs (measurement names, tag keys/values, and quoted string fields) to avoid splitting/corrupting line-protocol batches. Add unit tests to cover the new escape branches and an integration test for text payloads containing newlines. For the InfluxDB source, add a persisted cursor_row_count (with serde default) and update polling logic: query_with_params now accepts an already_seen count and inflates the limit to compensate, poll_messages skips already-delivered rows at the current cursor, returns rows_at_max_cursor, and state updates accumulate/reset cursor_row_count accordingly. Also add a small sleep when the circuit breaker is open and new unit tests for limit inflation.
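A minimal sketch of the escaping this commit describes (function names are illustrative; the connector's helpers may differ). Tag keys/values escape line-protocol separators plus the newly handled newline/carriage-return bytes; quoted string fields escape backslashes and double quotes:

```rust
/// Escape a tag key or value for InfluxDB line protocol: commas, equals
/// signs and spaces get a backslash, and literal newline/carriage-return
/// bytes become \n / \r so one value cannot split the batch into bogus lines.
fn escape_tag(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            ',' | '=' | ' ' => { out.push('\\'); out.push(c); }
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            _ => out.push(c),
        }
    }
    out
}

/// Escape a quoted string field value: backslash and double quote,
/// with the same newline handling.
fn escape_field_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' | '"' => { out.push('\\'); out.push(c); }
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            _ => out.push(c),
        }
    }
    out
}
```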
ryerraguntla left a comment
Completed the requested changes.
A quick question - what about v3? There's a dedicated crate based on HTTP, could we also support it?
yes we can. There are significant improvements and differences in V3. A different PR in the next few weeks will be helpful.
core/connectors/runtime/example_config/connectors/influxdb_source.toml
Rename InfluxDbRetryMiddleware -> HttpRetryMiddleware and add a log_prefix so the retry middleware can be reused across connectors. Introduce Error::PermanentHttpError to represent non-transient HTTP (4xx) errors and use it in InfluxDB source/sink code so those errors do not count toward the circuit breaker. Switch connector token fields to secrecy::SecretString with iggy_common::serde_secret::serialize_secret and use expose_secret() when sending. Improve parse_duration to warn on invalid user-supplied values and add a test to cover default/no-value behavior. Relax CSV header detection to require only `_time` (to support Flux aggregation results) and add a comment to the example config noting that precision is a write-side setting. Add iggy_common and secrecy to connector Cargo.toml files.
…ntla/iggy into feat/influxdb-connector
ryerraguntla left a comment
All suggestions taken care of
hubcio left a comment
once you fix these I think we can merge.
Introduce simd-json for faster JSON (added to influxdb_sink Cargo.toml) and rename Payload::try_as_bytes to try_to_bytes with docs clarifying allocation semantics. Add shared connectivity primitives to the SDK: ConnectivityConfig, check_connectivity, and check_connectivity_with_retry (centralizing health-check + retry logic). Cache payload_format in InfluxDB sink and source to avoid per-message allocations, and implement a fast-path simd_json serialisation for Payload::Json in the sink (falling back to bytes->serde_json validation for other variants). Replace duplicated health-check/retry implementations in the InfluxDB sink/source with the SDK helpers and update call sites accordingly. Many new tests were added/updated in the influxdb sink to cover payload handling, timestamps, metadata, URL building, and circuit-breaker behavior.
ryerraguntla left a comment
Implemented the suggestions.




Adds a native InfluxDB connector for Apache Iggy resolving #2700.
New crates:
iggy_connector_influxdb_sink — consumes Iggy messages and writes them to InfluxDB 2.x via line protocol (/api/v2/write)
iggy_connector_influxdb_source — polls InfluxDB Flux query results and produces messages to Iggy topics
Features:
configurable write precision (ns, us, ms, s)
Closes #2700
What changed?
Implemented the following features, building on the existing connector framework.
InfluxDB Sink Connector
Writes Iggy stream messages to InfluxDB v2 using the line protocol write API.
Data writing — Messages are serialised as line protocol with correct escaping for measurements, tag values, and string fields. Payloads can be written as JSON (validated and escaped), UTF-8 text, or Base64-encoded raw bytes. Timestamps are converted from Iggy's microsecond epoch to the configured InfluxDB precision (ns/us/ms/s, default µs). An offset-based nanosecond blend prevents silent deduplication when multiple messages in the same batch share the same microsecond timestamp. If a producer sets timestamp=0, the connector falls back to SystemTime::now() to avoid Year-1970 points.
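The timestamp handling above can be sketched as follows. The function name, the zero-timestamp fallback shape, and the exact offset scheme are illustrative (the connector blends the offset in nanoseconds before conversion; here it is simply added after, which keeps the same uniqueness property at the target precision):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Convert Iggy's microsecond epoch timestamp to the configured precision,
/// then add a small per-message offset so that messages in one batch sharing
/// the same microsecond do not deduplicate each other in InfluxDB.
fn convert_timestamp(timestamp_us: u64, precision: &str, index_in_batch: u64) -> u64 {
    // Fall back to "now" when the producer set timestamp = 0, to avoid
    // writing points at the 1970 epoch.
    let ts = if timestamp_us == 0 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64
    } else {
        timestamp_us
    };
    let base = match precision {
        "ns" => ts * 1_000,
        "ms" => ts / 1_000,
        "s" => ts / 1_000_000,
        _ => ts, // "us" (the default) and anything unrecognized
    };
    base + index_in_batch
}
```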
Resilience — Writes are retried up to max_retries (default 3) on HTTP 429 and 5xx responses using exponential backoff with ±20% jitter. The Retry-After header is honoured in both integer-seconds and RFC 7231 HTTP-date formats before falling back to its own backoff. A circuit breaker opens after circuit_breaker_threshold (default 5) consecutive batch failures and holds for circuit_breaker_cool_down (default 30s) before a half-open probe. Batch errors are captured and propagated to the runtime after processing all remaining sub-batches, preventing silent data loss.
Startup — open() retries the InfluxDB /health endpoint up to max_open_retries (default 10) with capped exponential backoff, so the connector recovers from transient InfluxDB restarts without manual intervention.
Metadata — Stream name, topic, and partition can be written as InfluxDB tags; message checksum and origin timestamp as fields. Each is individually togglable. The measurement name is configurable (default iggy_messages). Verbose logging mode promotes per-batch diagnostics from debug! to info! without code changes.
InfluxDB Source Connector
Polls InfluxDB v2 via the Flux query API and publishes results to Iggy topics as structured JSON messages.
Incremental polling — Tracks max(_time) from each Flux response as a cursor, advanced by 1 ns before storing to prevent boundary-point re-delivery. The cursor and batch limit are substituted into the $cursor and $limit placeholders in the user-supplied Flux query, supporting arbitrary range, filter, pivot, and aggregation chains. The cursor and row count are persisted via ConnectorState so polling resumes exactly where it left off after a restart.
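The placeholder substitution can be sketched in a few lines. The `$cursor` / `$limit` placeholder names come from the PR description; the function name is illustrative:

```rust
/// Substitute the persisted cursor and the batch limit into the
/// user-supplied Flux query template before sending it to /api/v2/query.
fn render_flux_query(template: &str, cursor: &str, limit: usize) -> String {
    template
        .replace("$cursor", cursor)
        .replace("$limit", &limit.to_string())
}
```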
CSV parsing — Handles InfluxDB annotated CSV output: skips #group, #datatype, and #default annotation rows, detects header rows by field names, and correctly handles blank-line separators between multi-table results.
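A simplified sketch of that row classification, assuming the relaxed `_time`-based header detection mentioned later in this PR. The real parser inspects the parsed header columns rather than the raw line, so treat this as an approximation:

```rust
/// Decide whether a line of InfluxDB annotated CSV is a data row.
/// Annotation rows (#group, #datatype, #default) start with '#', a blank
/// line separates result tables, and the first row containing `_time`
/// after a separator is taken as the header.
fn is_data_row(line: &str, header_seen: &mut bool) -> bool {
    let line = line.trim_end_matches('\r');
    if line.is_empty() {
        *header_seen = false; // table boundary: expect a fresh header next
        return false;
    }
    if line.starts_with('#') {
        return false; // annotation row
    }
    if !*header_seen {
        if line.contains("_time") {
            *header_seen = true; // header row, detected by the `_time` column
        }
        return false;
    }
    true
}
```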
Message format — Each row is published as a structured JSON envelope {measurement, field, timestamp, value, row} by default. When payload_column is set, the raw field value is extracted and emitted in the configured format (JSON, Text, or Raw/Base64).
Resilience — Same exponential backoff, jitter, Retry-After parsing, and circuit breaker as the sink. open() retries the /health endpoint before declaring ready. close() drops the reqwest client to release all connection pool resources.
Configuration — cursor_field (default _time) and initial_offset allow operators to replay from any point in the bucket. poll_interval, batch_size, and timeout are all tunable. Verbose logging toggles per-poll diagnostics between debug! and info!.