Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package database_observability

import (
"encoding/base64"
"encoding/json"
"fmt"
"strings"

"github.com/go-logfmt/logfmt"
"github.com/grafana/alloy/internal/component/common/loki"
)

type ExplainPlanOutputOperation string

const (
Expand Down Expand Up @@ -35,6 +45,14 @@ const (
ExplainPlanJoinAlgorithmNestedLoop ExplainPlanJoinAlgorithm = "nested_loop"
)

type ExplainProcessingResult string

const (
ExplainProcessingResultSuccess ExplainProcessingResult = "success"
ExplainProcessingResultError ExplainProcessingResult = "error"
ExplainProcessingResultSkipped ExplainProcessingResult = "skipped"
)

type ExplainReservedWordMetadata struct {
ExemptionPrefixes *[]string
}
Expand Down Expand Up @@ -121,6 +139,9 @@ type ExplainPlanMetadataInfo struct {
DatabaseVersion string `json:"databaseVersion"`
QueryIdentifier string `json:"queryIdentifier"`
GeneratedAt string `json:"generatedAt"`

ProcessingResult ExplainProcessingResult `json:"processingResult"`
ProcessingResultReason string `json:"processingResultReason"`
}

type ExplainPlanNode struct {
Expand All @@ -143,3 +164,28 @@ type ExplainPlanNodeDetails struct {
SortKeys []string `json:"sortKeys,omitempty"`
Warning *string `json:"warning,omitempty"`
}

func ExtractExplainPlanOutputFromLogMsg(lokiEntry loki.Entry) (ExplainPlanOutput, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is used only for testing, maybe could be moved to a shared _test.go file, or we could add a comment that says it's here just for testing.

var explainPlanOutput ExplainPlanOutput
var explainPlanOutputString string
decoder := logfmt.NewDecoder(strings.NewReader(lokiEntry.Line))
for decoder.ScanRecord() {
for decoder.ScanKeyval() {
if string(decoder.Key()) == "explain_plan_output" {
explainPlanOutputString = string(decoder.Value())
break
}
}
}
if decoder.Err() != nil {
return explainPlanOutput, fmt.Errorf("failed to decode logfmt: %v", decoder.Err())
}
base64Decoded, err := base64.StdEncoding.DecodeString(explainPlanOutputString)
if err != nil {
return explainPlanOutput, fmt.Errorf("failed to decode base64 explain plan output: %v", err)
}
if err := json.Unmarshal(base64Decoded, &explainPlanOutput); err != nil {
return explainPlanOutput, fmt.Errorf("failed to unmarshal explain plan output: %v", err)
}
return explainPlanOutput, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,47 @@ const selectDigestsForExplainPlan = `

const selectExplainPlanPrefix = `EXPLAIN FORMAT=JSON `

func sendPremptedExplainPlansOutput(schemaName string, dbVersion string, digest string, generatedAt string, entryHandler loki.EntryHandler, result database_observability.ExplainProcessingResult, reason string) error {
output := &database_observability.ExplainPlanOutput{
Metadata: database_observability.ExplainPlanMetadataInfo{
DatabaseEngine: "MySQL",
DatabaseVersion: dbVersion,
QueryIdentifier: digest,
GeneratedAt: generatedAt,
ProcessingResult: result,
ProcessingResultReason: reason,
},
}

explainPlanOutputJSON, err := json.Marshal(output)
if err != nil {
return fmt.Errorf("failed to marshal explain plan output: %w", err)
}

logMessage := fmt.Sprintf(
`schema="%s" digest="%s" explain_plan_output="%s"`,
schemaName,
digest,
base64.StdEncoding.EncodeToString(explainPlanOutputJSON),
)

entryHandler.Chan() <- database_observability.BuildLokiEntry(
logging.LevelInfo,
OP_EXPLAIN_PLAN_OUTPUT,
logMessage,
)

return nil
}

func newExplainPlansOutput(logger log.Logger, dbVersion string, digest string, explainJson []byte, generatedAt string) (*database_observability.ExplainPlanOutput, error) {
output := &database_observability.ExplainPlanOutput{
Metadata: database_observability.ExplainPlanMetadataInfo{
DatabaseEngine: "MySQL",
DatabaseVersion: dbVersion,
QueryIdentifier: digest,
GeneratedAt: generatedAt,
DatabaseEngine: "MySQL",
DatabaseVersion: dbVersion,
QueryIdentifier: digest,
GeneratedAt: generatedAt,
ProcessingResult: database_observability.ExplainProcessingResultSuccess,
},
}

Expand Down Expand Up @@ -517,12 +551,22 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {
return strings.EqualFold(schema, schemaName)
}) {

err := sendPremptedExplainPlansOutput(schemaName, c.dbVersion, digest, time.Now().Format(time.RFC3339), c.entryHandler, database_observability.ExplainProcessingResultSkipped, "query belongs to excluded schema")
if err != nil {
level.Error(c.logger).Log("msg", "failed to send excluded schema skip explain plan output", "err", err)
}
continue
}

qi := newQueryInfo(schemaName, digest, queryText)
if _, ok := c.queryDenylist[qi.uniqueKey]; !ok {
c.queryCache[qi.uniqueKey] = qi
} else {
err := sendPremptedExplainPlansOutput(schemaName, c.dbVersion, digest, time.Now().Format(time.RFC3339), c.entryHandler, database_observability.ExplainProcessingResultSkipped, "query denylisted")
if err != nil {
level.Error(c.logger).Log("msg", "failed to send denylisted query skip explain plan output", "err", err)
}
continue
}
if ls.After(c.lastSeen) {
c.lastSeen = ls
Expand Down Expand Up @@ -559,25 +603,34 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {
if *nonRecoverableFailureOccurred {
qi.failureCount++
c.queryDenylist[qi.uniqueKey] = qi
level.Debug(c.logger).Log("msg", "query denylisted", "digest", qi.digest)
}
delete(c.queryCache, qi.uniqueKey)
processedCount++
}(&nonRecoverableFailureOccurred)

if strings.HasSuffix(qi.queryText, "...") {
level.Debug(logger).Log("msg", "skipping truncated query")
err := sendPremptedExplainPlansOutput(qi.schemaName, c.dbVersion, qi.digest, time.Now().Format(time.RFC3339), c.entryHandler, database_observability.ExplainProcessingResultSkipped, "query is truncated")
if err != nil {
level.Error(c.logger).Log("msg", "failed to send truncated query skip explain plan output", "err", err)
}
continue
}

containsReservedWord, err := database_observability.ContainsReservedKeywords(qi.queryText, database_observability.ExplainReservedWordDenyList, sqllexer.DBMSMySQL)
if err != nil {
level.Error(logger).Log("msg", "failed to check for reserved keywords", "err", err)
err := sendPremptedExplainPlansOutput(qi.schemaName, c.dbVersion, qi.digest, time.Now().Format(time.RFC3339), c.entryHandler, database_observability.ExplainProcessingResultError, fmt.Sprintf("failed to check for reserved keywords: %s", err.Error()))
if err != nil {
level.Error(c.logger).Log("msg", "failed to send reserved keyword check error explain plan output", "err", err)
}
continue
}

if containsReservedWord {
level.Debug(logger).Log("msg", "skipping query containing reserved word")
err := sendPremptedExplainPlansOutput(qi.schemaName, c.dbVersion, qi.digest, time.Now().Format(time.RFC3339), c.entryHandler, database_observability.ExplainProcessingResultSkipped, "query contains reserved word")
if err != nil {
level.Error(c.logger).Log("msg", "failed to send reserved keyword check error explain plan output", "err", err)
}
continue
}

Expand Down Expand Up @@ -649,8 +702,6 @@ func (c *ExplainPlans) fetchExplainPlans(ctx context.Context) error {
OP_EXPLAIN_PLAN_OUTPUT,
logMessage,
)
// TODO: Add context to logging when errors occur so the original node can be found.
// I.E. query_block->nested_loop[1]->table etc..
}

return nil
Expand Down
Loading