Skip to content

Commit d3542bc

Browse files
feat: Database_observability.mysql: support excluding schemas in all collectors (#5350)
### Brief description of Pull Request Propagate `exclude_schemas` configuration to all MySQL collectors and append the user-defined schemas to the default excluded schemas. Additionally, change the behaviour of the `explain_plans` collector to skip queries from excluded schemas instead of processing them. ### Pull Request Details <!-- Add a more detailed description of the Pull Request here, if needed. --> ### Issue(s) fixed by this Pull Request <!-- Uncomment the following line and fill in an issue number if you want a GitHub issue to be closed automatically when this PR gets merged. --> <!-- Fixes #issue_id --> ### Notes to the Reviewer <!-- Add any relevant notes for the reviewers and testers of this PR. --> ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [x] Tests updated - [ ] Config converters updated
1 parent a818d6e commit d3542bc

File tree

12 files changed

+269
-90
lines changed

12 files changed

+269
-90
lines changed

internal/component/database_observability/mysql/collector/constants.go

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package collector
2+
3+
import "strings"
4+
5+
var defaultExcludedSchemas = []string{"mysql", "performance_schema", "sys", "information_schema"}
6+
7+
var defaultExclusionClause = buildExclusionClause(defaultExcludedSchemas)
8+
9+
func buildExcludedSchemasClause(excludedSchemas []string) string {
10+
if len(excludedSchemas) == 0 {
11+
return defaultExclusionClause
12+
}
13+
14+
allSchemas := make([]string, 0, len(defaultExcludedSchemas)+len(excludedSchemas))
15+
allSchemas = append(allSchemas, defaultExcludedSchemas...)
16+
allSchemas = append(allSchemas, excludedSchemas...)
17+
18+
return buildExclusionClause(allSchemas)
19+
}
20+
21+
func buildExclusionClause(schemas []string) string {
22+
escaped := make([]string, len(schemas))
23+
for i, schema := range schemas {
24+
escaped[i] = escapeSQLString(schema)
25+
}
26+
return "(" + strings.Join(escaped, ", ") + ")"
27+
}
28+
29+
// escapeSQLString returns s wrapped in single quotes as a MySQL string
// literal that is safe to splice into a query built with string formatting.
//
// Two characters are escaped:
//   - backslashes are doubled, because under MySQL's default SQL mode a
//     backslash is an escape character inside string literals; left alone, a
//     value ending in `\` would escape the closing quote and let the rest of
//     the statement be read as part of the literal;
//   - single quotes are doubled, which is the standard SQL way to embed a
//     quote in a literal.
//
// Backslashes are handled first so that the quotes added by the second step
// are never preceded by an unescaped backslash.
//
// NOTE: if the server runs with NO_BACKSLASH_ESCAPES, a doubled backslash is
// read as two literal backslashes. The literal stays well-formed, but it will
// not match a schema whose name actually contains a backslash.
func escapeSQLString(s string) string {
	escaped := strings.ReplaceAll(s, `\`, `\\`)
	escaped = strings.ReplaceAll(escaped, "'", "''")
	return "'" + escaped + "'"
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package collector
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestBuildExcludedSchemasClause(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
userExcludedSchemas []string
13+
expected string
14+
}{
15+
{
16+
name: "nil user schemas returns default schemas",
17+
userExcludedSchemas: nil,
18+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema')",
19+
},
20+
{
21+
name: "empty user schemas returns default schemas",
22+
userExcludedSchemas: []string{},
23+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema')",
24+
},
25+
{
26+
name: "single user schema is appended to default schemas",
27+
userExcludedSchemas: []string{"my_schema"},
28+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'my_schema')",
29+
},
30+
{
31+
name: "multiple user schemas are appended to default schemas",
32+
userExcludedSchemas: []string{"schema1", "schema2", "schema3"},
33+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'schema1', 'schema2', 'schema3')",
34+
},
35+
{
36+
name: "schema with single quote is escaped to prevent SQL injection",
37+
userExcludedSchemas: []string{"test'schema"},
38+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'test''schema')",
39+
},
40+
{
41+
name: "schema with SQL injection attempt is escaped",
42+
userExcludedSchemas: []string{"'; DROP TABLE users; --"},
43+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema', '''; DROP TABLE users; --')",
44+
},
45+
{
46+
name: "schema with multiple single quotes is escaped",
47+
userExcludedSchemas: []string{"it's a test's schema"},
48+
expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'it''s a test''s schema')",
49+
},
50+
}
51+
52+
for _, tc := range tests {
53+
t.Run(tc.name, func(t *testing.T) {
54+
result := buildExcludedSchemasClause(tc.userExcludedSchemas)
55+
require.Equal(t, tc.expected, result)
56+
})
57+
}
58+
}

internal/component/database_observability/mysql/collector/explain_plans.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"encoding/json"
99
"fmt"
1010
"math"
11-
"slices"
1211
"strconv"
1312
"strings"
1413
"time"
@@ -42,7 +41,7 @@ const selectDigestsForExplainPlan = `
4241
WHERE LAST_SEEN > ?
4342
AND QUERY_SAMPLE_TEXT IS NOT NULL
4443
AND DIGEST IS NOT NULL
45-
AND SCHEMA_NAME NOT IN ` + EXCLUDED_SCHEMAS
44+
AND SCHEMA_NAME NOT IN %s`
4645

4746
const selectExplainPlanPrefix = `EXPLAIN FORMAT=JSON `
4847

@@ -520,7 +519,8 @@ func (c *ExplainPlans) Stop() {
520519
}
521520

522521
func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {
523-
rs, err := c.dbConnection.QueryContext(ctx, selectDigestsForExplainPlan, c.lastSeen)
522+
query := fmt.Sprintf(selectDigestsForExplainPlan, buildExcludedSchemasClause(c.excludeSchemas))
523+
rs, err := c.dbConnection.QueryContext(ctx, query, c.lastSeen)
524524
if err != nil {
525525
level.Error(c.logger).Log("msg", "failed to fetch digests for explain plans", "err", err)
526526
return err
@@ -536,23 +536,6 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {
536536
level.Error(c.logger).Log("msg", "failed to scan digest for explain plans", "err", err)
537537
return err
538538
}
539-
if slices.ContainsFunc(c.excludeSchemas, func(schema string) bool {
540-
return strings.EqualFold(schema, schemaName)
541-
}) {
542-
543-
err := c.sendExplainPlansOutput(
544-
schemaName,
545-
digest,
546-
generatedAt,
547-
database_observability.ExplainProcessingResultSkipped,
548-
"query belongs to excluded schema",
549-
nil,
550-
)
551-
if err != nil {
552-
level.Error(c.logger).Log("msg", "failed to send excluded schema skip explain plan output", "err", err)
553-
}
554-
continue
555-
}
556539

557540
qi := newQueryInfo(schemaName, digest, queryText)
558541
if _, ok := c.queryDenylist[qi.uniqueKey]; !ok {

internal/component/database_observability/mysql/collector/explain_plans_test.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,7 +1529,7 @@ func TestExplainPlans(t *testing.T) {
15291529

15301530
t.Run("uses argument value on first request", func(t *testing.T) {
15311531
nextSeen := lastSeen.Add(time.Second * 45)
1532-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1532+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
15331533
"schema_name",
15341534
"digest",
15351535
"query_text",
@@ -1551,7 +1551,7 @@ func TestExplainPlans(t *testing.T) {
15511551
})
15521552

15531553
t.Run("uses oldest last seen value on subsequent requests", func(t *testing.T) {
1554-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1554+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
15551555
"schema_name",
15561556
"digest",
15571557
"query_text",
@@ -1594,7 +1594,7 @@ func TestExplainPlans(t *testing.T) {
15941594

15951595
t.Run("skips truncated queries", func(t *testing.T) {
15961596
logBuffer.Reset()
1597-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1597+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
15981598
"schema_name",
15991599
"digest",
16001600
"query_sample_text",
@@ -1629,7 +1629,7 @@ func TestExplainPlans(t *testing.T) {
16291629
t.Run("skips non-select queries", func(t *testing.T) {
16301630
lokiClient.Clear()
16311631
logBuffer.Reset()
1632-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1632+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
16331633
"schema_name",
16341634
"digest",
16351635
"query_sample_text",
@@ -1678,7 +1678,7 @@ func TestExplainPlans(t *testing.T) {
16781678

16791679
t.Run("skips no row result", func(t *testing.T) {
16801680
logBuffer.Reset()
1681-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1681+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
16821682
"schema_name",
16831683
"digest",
16841684
"query_sample_text",
@@ -1710,7 +1710,7 @@ func TestExplainPlans(t *testing.T) {
17101710
t.Run("passes queries beginning in select", func(t *testing.T) {
17111711
lokiClient.Clear()
17121712
logBuffer.Reset()
1713-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1713+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
17141714
"schema_name",
17151715
"digest",
17161716
"query_sample_text",
@@ -1747,7 +1747,7 @@ func TestExplainPlans(t *testing.T) {
17471747
t.Run("passes queries beginning in with", func(t *testing.T) {
17481748
lokiClient.Clear()
17491749
logBuffer.Reset()
1750-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1750+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
17511751
"schema_name",
17521752
"digest",
17531753
"query_sample_text",
@@ -1809,7 +1809,7 @@ func TestQueryFailureDenylist(t *testing.T) {
18091809
})
18101810
require.NoError(t, err)
18111811

1812-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1812+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
18131813
"schema_name",
18141814
"digest",
18151815
"query_sample_text",
@@ -1841,7 +1841,7 @@ func TestQueryFailureDenylist(t *testing.T) {
18411841
lokiClient.Clear()
18421842
logBuffer.Reset()
18431843

1844-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1844+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
18451845
"schema_name",
18461846
"digest",
18471847
"query_sample_text",
@@ -1899,17 +1899,12 @@ func TestSchemaDenylist(t *testing.T) {
18991899
})
19001900
require.NoError(t, err)
19011901

1902-
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
1902+
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, buildExcludedSchemasClause([]string{"some_schema"}))).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
19031903
"schema_name",
19041904
"digest",
19051905
"query_sample_text",
19061906
"last_seen",
19071907
}).AddRow(
1908-
"some_schema",
1909-
"some_digest1",
1910-
"select * from some_table where id = 1",
1911-
lastSeen,
1912-
).AddRow(
19131908
"different_schema",
19141909
"some_digest2",
19151910
"select * from some_table where id = 2",

internal/component/database_observability/mysql/collector/query_details.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type QueryDetailsArguments struct {
3939
DB *sql.DB
4040
CollectInterval time.Duration
4141
StatementsLimit int
42+
ExcludeSchemas []string
4243
EntryHandler loki.EntryHandler
4344

4445
Logger log.Logger
@@ -48,6 +49,7 @@ type QueryDetails struct {
4849
dbConnection *sql.DB
4950
collectInterval time.Duration
5051
statementsLimit int
52+
excludeSchemas []string
5153
entryHandler loki.EntryHandler
5254
sqlParser parser.Parser
5355
normalizer *sqllexer.Normalizer
@@ -63,6 +65,7 @@ func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) {
6365
dbConnection: args.DB,
6466
collectInterval: args.CollectInterval,
6567
statementsLimit: args.StatementsLimit,
68+
excludeSchemas: args.ExcludeSchemas,
6669
entryHandler: args.EntryHandler,
6770
sqlParser: parser.NewTiDBSqlParser(),
6871
normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true)),
@@ -120,7 +123,7 @@ func (c *QueryDetails) Stop() {
120123
}
121124

122125
func (c *QueryDetails) tablesFromEventsStatements(ctx context.Context) error {
123-
query := fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, c.statementsLimit)
126+
query := fmt.Sprintf(selectQueryTablesSamples, buildExcludedSchemasClause(c.excludeSchemas), c.statementsLimit)
124127
rs, err := c.dbConnection.QueryContext(ctx, query)
125128
if err != nil {
126129
return fmt.Errorf("failed to fetch summary table samples: %w", err)

internal/component/database_observability/mysql/collector/query_details_test.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ func TestQueryTables(t *testing.T) {
378378
require.NoError(t, err)
379379
require.NotNil(t, collector)
380380

381-
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
381+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
382382
WillReturnRows(
383383
sqlmock.NewRows([]string{
384384
"digest",
@@ -439,15 +439,15 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
439439
require.NoError(t, err)
440440
require.NotNil(t, collector)
441441

442-
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
442+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
443443
WillReturnRows(
444444
sqlmock.NewRows([]string{
445445
"digest", // not enough columns
446446
}).AddRow(
447447
"abc123",
448448
))
449449

450-
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
450+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
451451
WillReturnRows(
452452
sqlmock.NewRows([]string{
453453
"digest",
@@ -505,7 +505,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
505505
require.NoError(t, err)
506506
require.NotNil(t, collector)
507507

508-
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
508+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
509509
WillReturnRows(
510510
sqlmock.NewRows([]string{
511511
"digest",
@@ -568,9 +568,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
568568
require.NoError(t, err)
569569
require.NotNil(t, collector)
570570

571-
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error"))
571+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error"))
572572

573-
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
573+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
574574
WillReturnRows(
575575
sqlmock.NewRows([]string{
576576
"digest",
@@ -609,3 +609,33 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
609609
require.Equal(t, `level="info" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line)
610610
})
611611
}
612+
613+
func TestQueryDetailsExcludeSchemas(t *testing.T) {
614+
defer goleak.VerifyNone(t)
615+
616+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
617+
require.NoError(t, err)
618+
defer db.Close()
619+
620+
lokiClient := loki.NewCollectingHandler()
621+
defer lokiClient.Stop()
622+
623+
c, err := NewQueryDetails(QueryDetailsArguments{
624+
DB: db,
625+
CollectInterval: time.Millisecond,
626+
StatementsLimit: 250,
627+
ExcludeSchemas: []string{"excluded_schema"},
628+
EntryHandler: lokiClient,
629+
Logger: log.NewLogfmtLogger(os.Stderr),
630+
})
631+
require.NoError(t, err)
632+
633+
// Verify the query uses the custom exclusion clause
634+
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, buildExcludedSchemasClause([]string{"excluded_schema"}), 250)).
635+
WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
636+
"digest", "digest_text", "schema_name", "query_sample_text",
637+
}))
638+
639+
c.tablesFromEventsStatements(t.Context())
640+
require.NoError(t, mock.ExpectationsWereMet())
641+
}

0 commit comments

Comments
 (0)