Skip to content
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ Main (unreleased)

- Added `send_traceparent` option for `tracing` config to enable traceparent header propagation. (@MyDigitalLife)

- (_Public Preview_) Additions to `database_observability.mysql` and `database_observability.postgres` components:
- `explain_plans`
    - always send an explain plan log message for each query, even for skipped or errored queries. (@rgeyer)

### Bugfixes


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ const (
ExplainPlanJoinAlgorithmNestedLoop ExplainPlanJoinAlgorithm = "nested_loop"
)

// ExplainProcessingResult describes the outcome of processing an explain plan
// for a single query. It is recorded in ExplainPlanMetadataInfo so that a log
// message is emitted for every query, including those that were skipped or
// failed.
type ExplainProcessingResult string

const (
	// ExplainProcessingResultSuccess indicates an explain plan was generated.
	ExplainProcessingResultSuccess ExplainProcessingResult = "success"
	// ExplainProcessingResultError indicates explain plan generation failed.
	ExplainProcessingResultError ExplainProcessingResult = "error"
	// ExplainProcessingResultSkipped indicates the query was intentionally not
	// explained (e.g. denylisted, truncated, excluded schema, reserved word).
	ExplainProcessingResultSkipped ExplainProcessingResult = "skipped"
)

type ExplainReservedWordMetadata struct {
ExemptionPrefixes *[]string
}
Expand Down Expand Up @@ -121,6 +129,9 @@ type ExplainPlanMetadataInfo struct {
DatabaseVersion string `json:"databaseVersion"`
QueryIdentifier string `json:"queryIdentifier"`
GeneratedAt string `json:"generatedAt"`

ProcessingResult ExplainProcessingResult `json:"processingResult"`
ProcessingResultReason string `json:"processingResultReason"`
}

type ExplainPlanNode struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package database_observability

import (
"encoding/base64"
"encoding/json"
"fmt"
"strings"

"github.com/go-logfmt/logfmt"
"github.com/grafana/alloy/internal/component/common/loki"
)

// ExtractExplainPlanOutputFromLogMsg extracts the explain plan output from a log message. It is only used for testing by both mysql and postgres explain plan collectors.
// ExtractExplainPlanOutputFromLogMsg extracts the explain plan output from a
// log message. It is only used for testing by both mysql and postgres explain
// plan collectors.
//
// The entry line is expected to be logfmt containing an `explain_plan_output`
// key whose value is base64-encoded JSON for an ExplainPlanOutput. On any
// failure the zero-value ExplainPlanOutput is returned alongside the error.
func ExtractExplainPlanOutputFromLogMsg(lokiEntry loki.Entry) (ExplainPlanOutput, error) {
	var explainPlanOutput ExplainPlanOutput
	var explainPlanOutputString string
	decoder := logfmt.NewDecoder(strings.NewReader(lokiEntry.Line))
	for decoder.ScanRecord() {
		for decoder.ScanKeyval() {
			if string(decoder.Key()) == "explain_plan_output" {
				explainPlanOutputString = string(decoder.Value())
				break
			}
		}
	}
	if decoder.Err() != nil {
		// Wrap with %w so callers can unwrap the underlying logfmt error.
		return explainPlanOutput, fmt.Errorf("failed to decode logfmt: %w", decoder.Err())
	}
	// Fail fast with a descriptive error if the key was never present;
	// otherwise json.Unmarshal of empty bytes yields a confusing
	// "unexpected end of JSON input".
	if explainPlanOutputString == "" {
		return explainPlanOutput, fmt.Errorf("explain_plan_output key not found in log message")
	}
	base64Decoded, err := base64.StdEncoding.DecodeString(explainPlanOutputString)
	if err != nil {
		return explainPlanOutput, fmt.Errorf("failed to decode base64 explain plan output: %w", err)
	}
	if err := json.Unmarshal(base64Decoded, &explainPlanOutput); err != nil {
		return explainPlanOutput, fmt.Errorf("failed to unmarshal explain plan output: %w", err)
	}
	return explainPlanOutput, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,14 @@ const selectDigestsForExplainPlan = `

const selectExplainPlanPrefix = `EXPLAIN FORMAT=JSON `

func newExplainPlansOutput(logger log.Logger, dbVersion string, digest string, explainJson []byte, generatedAt string) (*database_observability.ExplainPlanOutput, error) {
output := &database_observability.ExplainPlanOutput{
Metadata: database_observability.ExplainPlanMetadataInfo{
DatabaseEngine: "MySQL",
DatabaseVersion: dbVersion,
QueryIdentifier: digest,
GeneratedAt: generatedAt,
},
}

func newExplainPlansOutput(logger log.Logger, explainJson []byte) (*database_observability.ExplainPlanNode, error) {
qblock, _, _, err := jsonparser.Get(explainJson, "query_block")
if err != nil {
return output, fmt.Errorf("failed to get query block: %w", err)
return nil, fmt.Errorf("failed to get query block: %w", err)
}

planNode, err := parseTopLevelPlanNode(logger, qblock)
output.Plan = planNode
if err != nil {
return output, err
}

return output, nil
return &planNode, err
}

func parseTopLevelPlanNode(logger log.Logger, topLevelPlanNode []byte) (database_observability.ExplainPlanNode, error) {
Expand Down Expand Up @@ -452,6 +438,42 @@ func NewExplainPlans(args ExplainPlansArguments) (*ExplainPlans, error) {
}, nil
}

// sendExplainPlansOutput wraps the given plan (which may be nil for skipped or
// errored queries) in an ExplainPlanOutput envelope together with its
// processing metadata, and emits it as a single loki log entry. The JSON
// payload is base64-encoded into the logfmt `explain_plan_output` field.
func (c *ExplainPlans) sendExplainPlansOutput(schemaName string, digest string, generatedAt string, result database_observability.ExplainProcessingResult, reason string, plan *database_observability.ExplainPlanNode) error {
	meta := database_observability.ExplainPlanMetadataInfo{
		DatabaseEngine:         "MySQL",
		DatabaseVersion:        c.dbVersion,
		QueryIdentifier:        digest,
		GeneratedAt:            generatedAt,
		ProcessingResult:       result,
		ProcessingResultReason: reason,
	}

	output := &database_observability.ExplainPlanOutput{Metadata: meta}
	if plan != nil {
		output.Plan = *plan
	}

	payload, err := json.Marshal(output)
	if err != nil {
		return fmt.Errorf("failed to marshal explain plan output: %w", err)
	}

	encoded := base64.StdEncoding.EncodeToString(payload)
	c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
		logging.LevelInfo,
		OP_EXPLAIN_PLAN_OUTPUT,
		fmt.Sprintf(`schema="%s" digest="%s" explain_plan_output="%s"`, schemaName, digest, encoded),
	)

	return nil
}

// Name returns the name of this collector (ExplainPlansCollector).
func (c *ExplainPlans) Name() string {
	return ExplainPlansCollector
}
Expand Down Expand Up @@ -507,6 +529,7 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {

// Populate cache
for rs.Next() {
generatedAt := time.Now().Format(time.RFC3339)
var schemaName, digest, queryText string
var ls time.Time
if err = rs.Scan(&schemaName, &digest, &queryText, &ls); err != nil {
Expand All @@ -517,12 +540,36 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {
return strings.EqualFold(schema, schemaName)
}) {

err := c.sendExplainPlansOutput(
schemaName,
digest,
generatedAt,
database_observability.ExplainProcessingResultSkipped,
"query belongs to excluded schema",
nil,
)
if err != nil {
level.Error(c.logger).Log("msg", "failed to send excluded schema skip explain plan output", "err", err)
}
continue
}

qi := newQueryInfo(schemaName, digest, queryText)
if _, ok := c.queryDenylist[qi.uniqueKey]; !ok {
c.queryCache[qi.uniqueKey] = qi
} else {
err := c.sendExplainPlansOutput(
schemaName,
digest,
generatedAt,
database_observability.ExplainProcessingResultSkipped,
"query denylisted",
nil,
)
if err != nil {
level.Error(c.logger).Log("msg", "failed to send denylisted query skip explain plan output", "err", err)
}
continue
}
if ls.After(c.lastSeen) {
c.lastSeen = ls
Expand All @@ -549,6 +596,7 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {

processedCount := 0
for _, qi := range c.queryCache {
generatedAt := time.Now().Format(time.RFC3339)
nonRecoverableFailureOccurred := false
if processedCount >= c.currentBatchSize {
break
Expand All @@ -559,25 +607,55 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {
if *nonRecoverableFailureOccurred {
qi.failureCount++
c.queryDenylist[qi.uniqueKey] = qi
level.Debug(c.logger).Log("msg", "query denylisted", "digest", qi.digest)
}
delete(c.queryCache, qi.uniqueKey)
processedCount++
}(&nonRecoverableFailureOccurred)

if strings.HasSuffix(qi.queryText, "...") {
level.Debug(logger).Log("msg", "skipping truncated query")
err := c.sendExplainPlansOutput(
qi.schemaName,
qi.digest,
generatedAt,
database_observability.ExplainProcessingResultSkipped,
"query is truncated",
nil,
)
if err != nil {
level.Error(c.logger).Log("msg", "failed to send truncated query skip explain plan output", "err", err)
}
continue
}

containsReservedWord, err := database_observability.ContainsReservedKeywords(qi.queryText, database_observability.ExplainReservedWordDenyList, sqllexer.DBMSMySQL)
if err != nil {
level.Error(logger).Log("msg", "failed to check for reserved keywords", "err", err)
err := c.sendExplainPlansOutput(
qi.schemaName,
qi.digest,
generatedAt,
database_observability.ExplainProcessingResultError,
fmt.Sprintf("failed to check for reserved keywords: %s", err.Error()),
nil,
)
if err != nil {
level.Error(c.logger).Log("msg", "failed to send reserved keyword check error explain plan output", "err", err)
}
continue
}

if containsReservedWord {
level.Debug(logger).Log("msg", "skipping query containing reserved word")
err := c.sendExplainPlansOutput(
qi.schemaName,
qi.digest,
generatedAt,
database_observability.ExplainProcessingResultSkipped,
"query contains reserved word",
nil,
)
if err != nil {
level.Error(c.logger).Log("msg", "failed to send reserved keyword check error explain plan output", "err", err)
}
continue
}

Expand Down Expand Up @@ -617,9 +695,7 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {
level.Debug(logger).Log("msg", "db native explain plan",
"db_native_explain_plan", base64.StdEncoding.EncodeToString(redactedByteExplainPlanJSON))

generatedAt := time.Now().Format(time.RFC3339)

explainPlanOutput, genErr := newExplainPlansOutput(logger, c.dbVersion, qi.digest, byteExplainPlanJSON, generatedAt)
explainPlanOutput, genErr := newExplainPlansOutput(logger, byteExplainPlanJSON)
explainPlanOutputJSON, err := json.Marshal(explainPlanOutput)
if err != nil {
level.Error(logger).Log("msg", "failed to marshal explain plan output", "err", err)
Expand All @@ -637,20 +713,16 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {
continue
}

logMessage := fmt.Sprintf(
`schema="%s" digest="%s" explain_plan_output="%s"`,
if err := c.sendExplainPlansOutput(
qi.schemaName,
qi.digest,
base64.StdEncoding.EncodeToString(explainPlanOutputJSON),
)

c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
logging.LevelInfo,
OP_EXPLAIN_PLAN_OUTPUT,
logMessage,
)
// TODO: Add context to logging when errors occur so the original node can be found.
// I.E. query_block->nested_loop[1]->table etc..
generatedAt,
database_observability.ExplainProcessingResultSuccess,
"",
explainPlanOutput,
); err != nil {
level.Error(c.logger).Log("msg", "failed to send explain plan output", "err", err)
}
}

return nil
Expand Down
Loading
Loading