Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You can use the following blocks with `database_observability.mysql`:
| [`explain_plans`][explain_plans] | Configure the explain plans collector. | no |
| [`locks`][locks] | Configure the locks collector. | no |
| [`query_samples`][query_samples] | Configure the query samples collector. | no |
| [`health_check`][health_check] | Configure the health check collector. | no |

The > symbol indicates deeper levels of nesting.
For example, `cloud_provider` > `aws` refers to an `aws` block defined inside a `cloud_provider` block.
Expand All @@ -75,6 +76,7 @@ For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an
[locks]: #locks
[query_samples]: #query_samples
[setup_actors]: #setup_actors
[health_check]: #health_check

### `cloud_provider`

Expand Down Expand Up @@ -146,6 +148,13 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu
| `collect_interval` | `duration` | How frequently to check if `setup_actors` are configured correctly. | `"1h"` | no |


### `health_check`

| Name | Type | Description | Default | Required |
| -------------------------- | ---------- | ---------------------------------------------------------------------- | ------- | -------- |
| `collect_interval` | `duration` | How frequently to run health checks. | `"1h"` | no |


## Example

```alloy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package collector

import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/go-kit/log"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/build"
"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

// Identifiers used by the health check collector.
const (
// HealthCheckCollector is the collector's name: it is returned by
// HealthCheck.Name and attached to the collector's logger under the
// "collector" key.
HealthCheckCollector = "health_check"
// OP_HEALTH_STATUS is the operation name passed to BuildLokiEntry for every
// health check entry this collector emits.
OP_HEALTH_STATUS = "health_status"
)

// HealthCheckArguments carries the dependencies and settings that
// NewHealthCheck copies into a HealthCheck.
type HealthCheckArguments struct {
// DB is the connection handed to each health check function.
DB *sql.DB
// CollectInterval is the period of the ticker that re-runs the checks.
CollectInterval time.Duration
// EntryHandler provides the channel that health check entries are sent on.
EntryHandler loki.EntryHandler

// Logger is the base logger; NewHealthCheck adds a "collector" key to it.
Logger log.Logger
}

// HealthCheck is a collector that periodically runs a fixed set of health
// checks against a database connection and emits one log entry per check
// that completes without an error.
type HealthCheck struct {
// dbConnection is passed to every check function.
dbConnection *sql.DB
// collectInterval is the ticker period between check rounds.
collectInterval time.Duration
// entryHandler supplies the channel that result entries are sent on.
entryHandler loki.EntryHandler
// logger is the collector-scoped logger built in NewHealthCheck.
logger log.Logger

// running is set to true by Start and back to false when the background
// goroutine exits; Stopped reports its negation.
running *atomic.Bool
// ctx and cancel are created in Start; Stop calls cancel to end the loop.
ctx context.Context
cancel context.CancelFunc
}

// NewHealthCheck builds a HealthCheck from args. The supplied logger is
// decorated with a "collector" key set to HealthCheckCollector, and the
// running flag starts out false. The returned error is always nil.
func NewHealthCheck(args HealthCheckArguments) (*HealthCheck, error) {
	collectorLogger := log.With(args.Logger, "collector", HealthCheckCollector)

	return &HealthCheck{
		dbConnection:    args.DB,
		collectInterval: args.CollectInterval,
		entryHandler:    args.EntryHandler,
		logger:          collectorLogger,
		running:         &atomic.Bool{},
	}, nil
}

// Name returns the collector's identifier, HealthCheckCollector.
func (c *HealthCheck) Name() string {
	const collectorName = HealthCheckCollector
	return collectorName
}

// Start launches the background collection goroutine and returns immediately.
//
// The goroutine runs all health checks once right away and then once per
// collectInterval tick, until the context derived from ctx is cancelled
// (by the caller or through Stop). When the goroutine exits it stops its
// ticker, cancels the derived context and clears the running flag.
// Start always returns nil.
func (c *HealthCheck) Start(ctx context.Context) error {
	level.Debug(c.logger).Log("msg", "collector started")

	c.running.Store(true)
	ctx, cancel := context.WithCancel(ctx)
	c.ctx = ctx
	c.cancel = cancel

	go func() {
		defer func() {
			c.Stop()
			c.running.Store(false)
		}()

		ticker := time.NewTicker(c.collectInterval)
		// Release the ticker's resources once the loop is left; without this
		// the ticker keeps firing after the collector has stopped.
		defer ticker.Stop()

		for {
			c.fetchHealthChecks(c.ctx)
			select {
			case <-c.ctx.Done():
				return
			case <-ticker.C:
				// continue loop
			}
		}
	}()

	return nil
}

// Stopped reports whether the collector is not running, i.e. the running
// flag is currently false.
func (c *HealthCheck) Stopped() bool {
	isRunning := c.running.Load()
	return !isRunning
}

// Stop cancels the context created by Start, which makes the collection
// loop exit. It does nothing when Start has not set a cancel function yet.
// Stop should be kept idempotent.
func (c *HealthCheck) Stop() {
	if c.cancel == nil {
		return
	}
	c.cancel()
}

// healthCheckResult is the outcome of a single health check function.
type healthCheckResult struct {
// name identifies the check in the emitted entry and in error logs.
name string
// result is true when the check passed.
result bool
// value holds optional detail, such as a version string or the list of
// missing privileges.
value string
// err is set when the check itself could not be carried out; such results
// are logged by fetchHealthChecks and no entry is emitted for them.
err error
}

// fetchHealthChecks runs every health check once, in a fixed order, against
// the collector's database connection.
//
// A check that returns an error is logged at error level and produces no
// entry. Every other check produces one info-level entry with the operation
// OP_HEALTH_STATUS and a message of the form
// check="<name>" result="<bool>" value="<value>".
//
// Sending an entry waits for a receiver, so the send is raced against
// ctx.Done(): if ctx is cancelled while waiting, the remaining checks are
// skipped and the function returns instead of blocking forever.
func (c *HealthCheck) fetchHealthChecks(ctx context.Context) {
	checks := []func(context.Context, *sql.DB) healthCheckResult{
		checkAlloyVersion,
		checkRequiredGrants,
		checkEventsStatementsDigestHasRows,
	}

	for _, checkFn := range checks {
		result := checkFn(ctx, c.dbConnection)
		if result.err != nil {
			level.Error(c.logger).Log("msg", "health check failed", "check", result.name, "err", result.err)
			continue
		}

		msg := fmt.Sprintf(`check="%s" result="%v" value="%s"`, result.name, result.result, result.value)
		entry := database_observability.BuildLokiEntry(
			logging.LevelInfo,
			OP_HEALTH_STATUS,
			msg,
		)

		select {
		case c.entryHandler.Chan() <- entry:
		case <-ctx.Done():
			// Shutting down: do not wait on a receiver that may be gone.
			return
		}
	}
}

// checkAlloyVersion reports the running Alloy version.
//
// It never fails and does not touch ctx or db: the result is always true
// and the value is build.Version.
func checkAlloyVersion(ctx context.Context, db *sql.DB) healthCheckResult {
	return healthCheckResult{
		name:   "AlloyVersion",
		result: true,
		value:  build.Version,
	}
}

// checkRequiredGrants verifies that the connected account holds the
// PROCESS, REPLICATION CLIENT, SELECT and SHOW VIEW privileges on the
// global (*.*) scope, as reported by SHOW GRANTS.
//
// The result is true when all four are granted. Otherwise the result is
// false and value is "missing: " followed by the missing privileges,
// comma-separated, always in the order listed above. A grant of ALL or
// ALL PRIVILEGES on *.* satisfies every required privilege. err is set when
// the query, a row scan, or row iteration fails.
func checkRequiredGrants(ctx context.Context, db *sql.DB) healthCheckResult {
	r := healthCheckResult{name: "RequiredGrantsPresent"}

	// A slice (not a map) keeps the reported order of missing privileges stable.
	required := []string{"PROCESS", "REPLICATION CLIENT", "SELECT", "SHOW VIEW"}
	granted := make(map[string]bool, len(required))

	rows, err := db.QueryContext(ctx, "SHOW GRANTS")
	if err != nil {
		r.err = fmt.Errorf("SHOW GRANTS: %w", err)
		return r
	}
	defer rows.Close()

	for rows.Next() {
		var grantLine string
		if err := rows.Scan(&grantLine); err != nil {
			r.err = fmt.Errorf("scan SHOW GRANTS: %w", err)
			return r
		}

		privs := parseGlobalGrantPrivileges(strings.ToUpper(grantLine))
		if len(privs) == 0 {
			continue
		}
		hasAll := privs["ALL PRIVILEGES"] || privs["ALL"]
		for _, p := range required {
			if hasAll || privs[p] {
				granted[p] = true
			}
		}
	}
	if err := rows.Err(); err != nil {
		r.err = fmt.Errorf("iterate SHOW GRANTS: %w", err)
		return r
	}

	var missing []string
	for _, p := range required {
		if !granted[p] {
			missing = append(missing, p)
		}
	}

	r.result = len(missing) == 0
	if !r.result {
		r.value = "missing: " + strings.Join(missing, ",")
	}
	return r
}

// parseGlobalGrantPrivileges returns the set of privilege names listed in one
// upper-cased SHOW GRANTS line, provided the line grants on the *.* scope.
// It returns nil for lines without an ON clause or with any other scope.
//
// Only the text between "GRANT " and the first " ON " is inspected, so
// account or host names that happen to contain a privilege name are ignored.
func parseGlobalGrantPrivileges(upperGrantLine string) map[string]bool {
	line := strings.TrimPrefix(strings.TrimSpace(upperGrantLine), "GRANT ")

	const onClause = " ON "
	idx := strings.Index(line, onClause)
	if idx < 0 || !strings.HasPrefix(line[idx+len(onClause):], "*.*") {
		return nil
	}

	privs := make(map[string]bool)
	for _, p := range strings.Split(line[:idx], ",") {
		if p = strings.TrimSpace(p); p != "" {
			privs[p] = true
		}
	}
	return privs
}

// checkEventsStatementsDigestHasRows checks that
// performance_schema.events_statements_summary_by_digest is not empty.
//
// It counts the table's rows: the result is true when the count is non-zero
// and false when it is zero. If the query or scan fails, err carries that
// error unchanged and the result stays false.
func checkEventsStatementsDigestHasRows(ctx context.Context, db *sql.DB) healthCheckResult {
	const countQuery = `SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest`

	var digestRows int64
	scanErr := db.QueryRowContext(ctx, countQuery).Scan(&digestRows)
	if scanErr != nil {
		return healthCheckResult{name: "PerformanceSchemaHasRows", err: scanErr}
	}

	return healthCheckResult{
		name:   "PerformanceSchemaHasRows",
		result: digestRows != 0,
	}
}
Loading
Loading