enhancement(amqp source): expose message headers and properties in events (#23368)#25494
Open
KimJS0328 wants to merge 1 commit into
Open
enhancement(amqp source): expose message headers and properties in events (#23368)#25494KimJS0328 wants to merge 1 commit into
KimJS0328 wants to merge 1 commit into
Conversation
Contributor
|
All contributors have signed the CLA ✍️ ✅ |
Author
|
I have read the CLA Document and I hereby sign the CLA |
…ents (vectordotdev#23368) * enhancement(amqp source): expose AMQP message headers and properties as part of emitted events. * fix(amqp source): message timestamp handling by interpreting timestamps as Unix seconds instead of milliseconds, resolving an incorrect timestamp parsing bug discovered during this work.
8e344af to
2da9768
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Exposes AMQP 0.9.1 message headers and
BasicPropertieson events emitted by theamqpsource, and fixes the message timestamp to be interpreted as Unix seconds (per the AMQP spec) instead of being misinterpreted.headers_key(default:headers) andproperties_key(default:properties). Both can be disabled by setting them to an empty value, following the same pattern as the existingrouting_key_field/exchange_key/offset_key.headersis populated from the user-definedFieldTablevia a recursiveAMQPValue→Valueconversion, preserving nested tables/arrays and supported scalar types.propertiesis populated with the well-known AMQPBasicPropertiesscalar fields (content_type,content_encoding,delivery_mode,priority,correlation_id,reply_to,expiration,message_id,timestamp,type,user_id,app_id,cluster_id).headersis intentionally omitted frompropertiesto avoid duplication with the top-levelheaderskey.BasicProperties::timestampand anyAMQPValue::Timestampvalues inside headers/properties) are now interpreted as Unix seconds and converted into properchrono::DateTime<Utc>values. The schema definitions for bothVectorandLegacylog namespaces are updated accordingly.Vector configuration
Example event emitted (Legacy namespace):
{ "exchange": "amq.rabbitmq.event", "headers": { "peer_addr": "12.345.67.89", "peer_port": 45842, "recv_bytes": 844, "route": "{}", "send_bytes": 150830, "sock_addr": "12.123.45.67", "sock_port": 25672, "timestamp_in_ms": 1779688712659 }, "message": "", "offset": 555, "properties": { "delivery_mode": 2, "timestamp": "2026-05-25T05:58:32Z" }, "routing": "node.node.stats", "source_type": "amqp", "timestamp": "2026-05-25T05:58:32Z" }How did you test this PR?
src/sources/amqp.rs:output_schema_definition_vector_namespace/output_schema_definition_legacy_namespaceupdated to cover the newheadersandpropertiesmetadata fields.amqp_field_table_to_value_preserves_supported_typescovers the recursiveFieldTable→Valueconversion including nested tables, arrays, strings/bytes, integers, booleans, and timestamps.basic_properties_to_value_includes_scalar_propertiescovers the scalar property extraction.amqp_timestamp_to_datetime_uses_unix_secondscovers the Unix-seconds timestamp parsing.amqp_source_consume_event_with_headers_and_propertiesadded in the existingamqp-integration-testsmodule — publishes a message with custom headers, content type, priority, and timestamp via RabbitMQ and asserts the resulting event shape.cargo runagainst a real RabbitMQ broker, consumed live events from a queue with the configuration shown above, and verified thatheaders,properties, and the correctedtimestampappear on the emitted events as expected (sample output included in the "Vector configuration" section).make fmtmake check-fmtmake check-clippymake check-markdownmake check-generated-docs./scripts/check_changelog_fragments.shChange Type
Is this a breaking change?
The new
headers_keyandproperties_keyoptions default toheadersandpropertiesrespectively, which adds new top-level fields to emitted events. This is consistent with how the existingrouting_key_field/exchange_key/offset_keyalready behave and follows the same opt-out pattern (setting the key to empty disables the field).The timestamp fix changes the AMQP message timestamp from a previously incorrect value into a correctly interpreted Unix-seconds
DateTime. Users relying on the prior (incorrect) behavior would see corrected timestamp values.Does this PR include user facing changes?
no-changeloglabel to this PR.Changelog fragments included:
changelog.d/23368_amqp_source_headers_properties.feature.mdchangelog.d/23368_amqp_source_timestamp.fix.mdReferences
Notes
@vectordotdev/vectorto reach out to us regarding this PR.pre-pushhook, please see this template.make fmtmake check-clippy(if there are failures it's possible some of them can be fixed withmake clippy-fix)make testgit merge origin masterandgit push.Cargo.lock), pleaserun
make build-licensesto regenerate the license inventory and commit the changes (if any). More details on the dd-rust-license-tool.