Skip to content
Merged
27 changes: 27 additions & 0 deletions .chloggen/trace-prapagation-postgres.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/postgresql

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add trace propagation support for postgresql

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44868]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
22 changes: 21 additions & 1 deletion receiver/postgresqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
)

const lagMetricsInSecondsFeatureGateID = "postgresqlreceiver.preciselagmetrics"
const (
lagMetricsInSecondsFeatureGateID = "postgresqlreceiver.preciselagmetrics"
querySampleTraceContextKey = "_otel_trace_context"
)

var preciseLagMetricsFg = featuregate.GlobalRegistry().MustRegister(
lagMetricsInSecondsFeatureGateID,
Expand Down Expand Up @@ -881,13 +886,15 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, new
errs := make([]error, 0)
finalAttributes := make([]map[string]any, 0)
dbPrefix := "postgresql."
propagator := propagation.TraceContext{}
for _, row := range rows {
if row["query"] == "<insufficient privilege>" {
logger.Warn("skipping query sample due to insufficient privileges")
errs = append(errs, errors.New("skipping query sample due to insufficient privileges"))
continue
}
currentAttributes := make(map[string]any)
var traceCtx context.Context
simpleColumns := []string{
"client_hostname",
"query_start",
Expand All @@ -900,6 +907,19 @@ func (c *postgreSQLClient) getQuerySamples(ctx context.Context, limit int64, new

for _, col := range simpleColumns {
currentAttributes[dbPrefix+col] = row[col]
if col == "application_name" && row[col] != "" {
ctxFromQuery := propagator.Extract(context.Background(), propagation.MapCarrier{
"traceparent": row[col],
})

if trace.SpanContextFromContext(ctxFromQuery).IsValid() {
traceCtx = ctxFromQuery
}
}
}

if traceCtx != nil {
currentAttributes[querySampleTraceContextKey] = traceCtx
}

clientPort := int64(0)
Expand Down
2 changes: 1 addition & 1 deletion receiver/postgresqlreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
go.opentelemetry.io/collector/receiver/receivertest v0.141.1-0.20251210054218-8f51a1792add
go.opentelemetry.io/collector/scraper v0.141.1-0.20251210054218-8f51a1792add
go.opentelemetry.io/collector/scraper/scraperhelper v0.141.1-0.20251210054218-8f51a1792add
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
Expand Down Expand Up @@ -109,7 +110,6 @@ require (
go.opentelemetry.io/collector/receiver/receiverhelper v0.141.1-0.20251210054218-8f51a1792add // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.141.1-0.20251210054218-8f51a1792add // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
Expand Down
8 changes: 7 additions & 1 deletion receiver/postgresqlreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,13 @@ func (p *postgreSQLScraper) collectQuerySamples(ctx context.Context, dbClient cl
return
}
for _, atts := range attributes {
p.lb.RecordDbServerQuerySampleEvent(context.Background(),
logCtx := context.Background()
if ctxFromQuery, ok := atts[querySampleTraceContextKey]; ok {
if ctx, ok := ctxFromQuery.(context.Context); ok {
logCtx = ctx
}
}
p.lb.RecordDbServerQuerySampleEvent(logCtx,
timestamp,
metadata.AttributeDbSystemNamePostgresql,
atts["db.namespace"].(string),
Expand Down
45 changes: 45 additions & 0 deletions receiver/postgresqlreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,51 @@ func TestScrapeQuerySample(t *testing.T) {
assert.NoError(t, errs)
}

func TestScrapeQuerySampleWithTraceparent(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Databases = []string{}
cfg.Events.DbServerQuerySample.Enabled = true
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)

defer db.Close()

factory := mockSimpleClientFactory{
db: db,
}

settings := receivertest.NewNopSettings(metadata.Type)
logger, err := zap.NewProduction()
require.NoError(t, err)
settings.TelemetrySettings = component.TelemetrySettings{
Logger: logger,
}

scraper := newPostgreSQLScraper(settings, cfg, factory, newCache(1), newTTLCache[string](1, time.Second))
scraper.newestQueryTimestamp = 123440.111

traceparent := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
mock.ExpectQuery(expectedScrapeSampleQuery).WillReturnRows(sqlmock.NewRows(
[]string{"datname", "usename", "client_addr", "client_hostname", "client_port", "query_start", "wait_event_type", "wait_event", "query_id", "pid", "application_name", "_query_start_timestamp", "state", "query", "duration_ms"},
).FromCSVString(fmt.Sprintf("postgres,otelu,11.4.5.14,otel,114514,2025-02-12T16:37:54.843+08:00,,,123131231231,1450,%s,123445.123,idle,select * from pg_stat_activity where id = 32,1.2", traceparent)))
actualLogs, err := scraper.scrapeQuerySamples(t.Context(), 30)
require.NoError(t, err)

require.Equal(t, 1, actualLogs.ResourceLogs().Len())
rl := actualLogs.ResourceLogs().At(0)
require.Equal(t, 1, rl.ScopeLogs().Len())
sl := rl.ScopeLogs().At(0)
require.Equal(t, 1, sl.LogRecords().Len())
lr := sl.LogRecords().At(0)

require.Equal(t, "4bf92f3577b34da6a3ce929d0e0e4736", lr.TraceID().String())
require.Equal(t, "00f067aa0ba902b7", lr.SpanID().String())

applicationName, ok := lr.Attributes().Get("postgresql.application_name")
require.True(t, ok)
require.Equal(t, traceparent, applicationName.Str())
}

//go:embed testdata/scraper/top-query/expectedSql.sql
var expectedScrapeTopQuery string

Expand Down
Loading