diff --git a/.github/workflows/flow_schema_migrations.yml b/.github/workflows/flow_schema_migrations.yml deleted file mode 100644 index 32caa62571..0000000000 --- a/.github/workflows/flow_schema_migrations.yml +++ /dev/null @@ -1,62 +0,0 @@ -name: flow pipeline schema migrations -on: - push: - paths: - - "telemetry/flow-enricher/clickhouse/**" - branches: [ main, 'hotfix/**' ] - pull_request: - paths: - - "telemetry/flow-enricher/clickhouse/**" - -jobs: - validate: - name: schema:validate - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - go-version-file: go.mod - cache: true - - name: install goose - run: go install github.com/pressly/goose/v3/cmd/goose@latest - - name: validate - working-directory: "telemetry/flow-enricher/clickhouse/" - run: goose validate - status: - name: schema:status - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - go-version-file: go.mod - cache: true - - name: install goose - run: go install github.com/pressly/goose/v3/cmd/goose@latest - - name: status - working-directory: "telemetry/flow-enricher/clickhouse/" - run: goose status - env: - GOOSE_DRIVER: "clickhouse" - GOOSE_DBSTRING: ${{ secrets.GOOSE_DBSTRING }} - migrate: - name: schema:migrate - needs: [ validate, status ] - runs-on: ubuntu-latest - environment: testnet - if: github.ref == 'refs/heads/main' - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - go-version-file: go.mod - cache: true - - name: install goose - run: go install github.com/pressly/goose/v3/cmd/goose@latest - - name: migrate - working-directory: "telemetry/flow-enricher/clickhouse/" - run: goose up - env: - GOOSE_DRIVER: "clickhouse" - GOOSE_DBSTRING: ${{ secrets.GOOSE_DBSTRING }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c248370bc..5ebd4b4fbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file. - Tools - Add `IsRetryableFunc` field to `RetryOptions` for configurable retry criteria in the Solana JSON-RPC client; add `"rate limited"` string match and RPC code `-32429` to the default implementation - Telemetry + - Add shared `telemetry/migrations` package with goose-based ClickHouse schema migrations for all telemetry services; add `CLICKHOUSE_RUN_MIGRATIONS` env var to flow-enricher and gnmi-writer for on-startup schema migration ([#3460](https://github.com/malbeclabs/doublezero/pull/3460)) - Add optional TLS support to state-ingest server via `--tls-cert-file` and `--tls-key-file` flags; when set, the server listens on both HTTP (`:8080`) and HTTPS (`:8443`) simultaneously - Remove `--additional-child-probes` CLI flag from telemetry-agent; child geoprobe discovery now relies entirely on the onchain Geolocation program - Add BGP status submitter: on each tick, reads BGP socket state from the device namespace, maps each activated user to their tunnel peer IP, and submits `SetUserBGPStatus` onchain; supports a configurable down grace period and periodic keepalive refresh; enabled via `--bgp-status-enable` with `--bgp-status-interval`, `--bgp-status-refresh-interval`, and `--bgp-status-down-grace-period` flags diff --git a/telemetry/flow-enricher/cmd/flow-enricher/main.go b/telemetry/flow-enricher/cmd/flow-enricher/main.go index ab3911fcc6..3d54c13da6 100644 --- a/telemetry/flow-enricher/cmd/flow-enricher/main.go +++ b/telemetry/flow-enricher/cmd/flow-enricher/main.go @@ -15,6 +15,7 @@ import ( "github.com/malbeclabs/doublezero/config" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" enricher "github.com/malbeclabs/doublezero/telemetry/flow-enricher/internal/flow-enricher" + "github.com/malbeclabs/doublezero/telemetry/migrations" "github.com/malbeclabs/doublezero/tools/solana/pkg/rpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -59,6 +60,21 @@ func main() { if *stdoutOutput { chWriter = enricher.NewStdoutWriter() } else { + if os.Getenv("CLICKHOUSE_RUN_MIGRATIONS") == "true" { + if err := migrations.RunMigrations( + os.Getenv("CLICKHOUSE_ADDR"), + getEnvOrDefault("CLICKHOUSE_DB", "default"), + os.Getenv("CLICKHOUSE_USER"), + os.Getenv("CLICKHOUSE_PASS"), + os.Getenv("CLICKHOUSE_TLS_DISABLED") != "true", + logger, + ); err != nil { + logger.Error("error running clickhouse migrations", "error", err) + os.Exit(1) + } + logger.Info("clickhouse migrations applied") + } + chOpts := []enricher.ClickhouseOption{} if os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" { chOpts = append(chOpts, enricher.WithTLSDisabled(true)) diff --git a/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go b/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go index 7767d519f7..d3de6b13c5 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go +++ b/telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/malbeclabs/doublezero/telemetry/migrations" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -34,7 +35,7 @@ var ( chUser = "enricher" chPassword = "clickhouse" chDbname = "default" - chTable = "flows_integration" + chTable = "flows" rpBroker string rpUser = "enricher" @@ -53,13 +54,35 @@ func setupClickhouseContainer(ctx context.Context) error { clickhouse.WithUsername(chUser), clickhouse.WithPassword(chPassword), clickhouse.WithDatabase(chDbname), - clickhouse.WithInitScripts( - filepath.Join("fixtures", "create_table_device_ifindex.sql"), - filepath.Join("fixtures", "insert_device_ifindex.sql"), - filepath.Join("fixtures", "create_table_flows.sql"), - ), ) - return err + if err != nil { + return err + } + + chConn, err := clickhouseCtr.ConnectionHost(ctx) + if err != nil { + return err + } + + if err = migrations.RunMigrations(chConn, chDbname, chUser, chPassword, false, logger); err != nil { + return fmt.Errorf("error running clickhouse migrations: %w", err) + } + + db, err := migrations.NewDB(chConn, chDbname, chUser, chPassword, false) + if err != nil { + return err + } + defer db.Close() + + insertSQL, err := os.ReadFile(filepath.Join("fixtures", "insert_device_ifindex.sql")) + if err != nil { + return fmt.Errorf("error reading insert_device_ifindex.sql: %w", err) + } + if _, err = db.ExecContext(ctx, string(insertSQL)); err != nil { + return fmt.Errorf("error seeding device_ifindex: %w", err) + } + + return nil } func setupRedpandaContainer(ctx context.Context) error { @@ -192,7 +215,7 @@ func TestFlowEnrichment(t *testing.T) { WithClickhouseDB(chDbname), WithClickhouseUser(chUser), WithClickhousePassword(chPassword), - WithClickhouseTable("flows_integration"), + WithClickhouseTable(chTable), WithTLSDisabled(true), WithClickhouseLogger(logger), WithClickhouseMetrics(NewClickhouseMetrics(reg)), @@ -297,7 +320,7 @@ func TestFlowEnrichment(t *testing.T) { payload := readPcap(t, "./fixtures/sflow_ingress_user_traffic.pcap") f := &flow.FlowSample{ - ReceiveTimestamp: ×tamppb.Timestamp{Seconds: 1625243456, Nanos: 0}, + ReceiveTimestamp: timestamppb.Now(), FlowPayload: payload, } diff --git a/telemetry/flow-enricher/internal/flow-enricher/fixtures/create_table_flows.sql b/telemetry/flow-enricher/internal/flow-enricher/fixtures/create_table_flows.sql deleted file mode 100644 index edf73253ff..0000000000 --- a/telemetry/flow-enricher/internal/flow-enricher/fixtures/create_table_flows.sql +++ /dev/null @@ -1,65 +0,0 @@ -CREATE TABLE default.flows_integration -( - `as_path` Array(String) CODEC(ZSTD(1)), - `bgp_communities` Array(String) CODEC(ZSTD(1)), - `bgp_next_hop` String, - `bytes` UInt64 CODEC(Delta, LZ4), - `dst_addr` String, - `dst_as` UInt32, - `dst_mac` String, - `dst_net` String, - `dst_port` UInt16, - `dst_vlan` UInt16, - `etype` LowCardinality(String), - `forwarding_status` UInt8, - `fragment_id` UInt32, - `fragment_offset` UInt16, - `icmp_code` UInt8, - `icmp_type` UInt8, - `in_if` Int64, - `in_ifname` LowCardinality(String), - `ip_flags` UInt8, - `ip_tos` UInt8, - `ip_ttl` UInt8, - `ipv6_flow_label` UInt32, - `ipv6_routing_header_addresses` Array(String) CODEC(ZSTD(1)), - `ipv6_routing_header_seg_left` UInt8, - `layer_size` Array(Int64) CODEC(ZSTD(1)), - `layer_stack` Array(String) CODEC(ZSTD(1)), - `mpls_ip` Array(String) CODEC(ZSTD(1)), - `mpls_label` Array(String) CODEC(ZSTD(1)), - `mpls_ttl` Array(String) CODEC(ZSTD(1)), - `next_hop` String, - `next_hop_as` UInt32, - `observation_domain_id` UInt32, - `observation_point_id` UInt32, - `out_if` Int64, - `out_ifname` LowCardinality(String), - `packets` UInt32 CODEC(Delta, LZ4), - `proto` LowCardinality(String), - `sampler_address` LowCardinality(String), - `sampling_rate` UInt32, - `sequence_num` UInt32 CODEC(Delta, LZ4), - `src_addr` String, - `src_as` UInt32, - `src_mac` String, - `src_net` String, - `src_port` UInt16, - `src_vlan` UInt16, - `tcp_flags` UInt8, - `time_flow_end_ns` DateTime64(9) CODEC(DoubleDelta, LZ4), - `time_flow_start_ns` DateTime64(9) CODEC(DoubleDelta, LZ4), - `time_received_ns` DateTime64(9) CODEC(DoubleDelta, LZ4), - `type` LowCardinality(String), - `vlan_id` UInt16, - `src_device_code` LowCardinality(String), - `dst_device_code` LowCardinality(String), - `src_location` LowCardinality(String), - `dst_location` LowCardinality(String), - `src_exchange` LowCardinality(String), - `dst_exchange` LowCardinality(String) -) -ENGINE = MergeTree -PARTITION BY toYYYYMM(time_received_ns) -ORDER BY (time_received_ns, sampler_address, etype, proto, in_ifname, out_ifname, src_as, dst_as, sampling_rate) -SETTINGS index_granularity = 8192; diff --git a/telemetry/gnmi-writer/clickhouse/bgp_neighbors.sql b/telemetry/gnmi-writer/clickhouse/bgp_neighbors.sql deleted file mode 100644 index e014d711cc..0000000000 --- a/telemetry/gnmi-writer/clickhouse/bgp_neighbors.sql +++ /dev/null @@ -1,33 +0,0 @@ --- BGP Neighbors Table --- Stores BGP neighbor records from gNMI telemetry - -CREATE TABLE IF NOT EXISTS bgp_neighbors ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - network_instance LowCardinality(String), - neighbor_address String, - description String, - peer_as UInt32, - local_as UInt32, - peer_type LowCardinality(String), - session_state LowCardinality(String), - established_transitions UInt64, - last_established Int64, - messages_received_update UInt64, - messages_sent_update UInt64 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, network_instance, neighbor_address, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS bgp_neighbors_latest AS -SELECT * -FROM bgp_neighbors -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM bgp_neighbors - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/clickhouse/interface_ifindex.sql b/telemetry/gnmi-writer/clickhouse/interface_ifindex.sql deleted file mode 100644 index 209d7a317a..0000000000 --- a/telemetry/gnmi-writer/clickhouse/interface_ifindex.sql +++ /dev/null @@ -1,24 +0,0 @@ --- Interface Ifindex Table --- Stores interface ifindex mappings from gNMI telemetry - -CREATE TABLE IF NOT EXISTS interface_ifindex ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - interface_name String, - ifindex UInt32 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, interface_name, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS interface_ifindex_latest AS -SELECT * -FROM interface_ifindex -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM interface_ifindex - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/clickhouse/interface_state.sql b/telemetry/gnmi-writer/clickhouse/interface_state.sql deleted file mode 100644 index 9a465ee3cb..0000000000 --- a/telemetry/gnmi-writer/clickhouse/interface_state.sql +++ /dev/null @@ -1,37 +0,0 @@ --- Interface State Table --- Stores interface state and counters from gNMI telemetry - -CREATE TABLE IF NOT EXISTS interface_state ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - interface_name String, - admin_status LowCardinality(String), - oper_status LowCardinality(String), - ifindex UInt32, - mtu UInt16, - last_change Int64, - carrier_transitions UInt64, - in_octets UInt64, - out_octets UInt64, - in_pkts UInt64, - out_pkts UInt64, - in_errors UInt64, - out_errors UInt64, - in_discards UInt64, - out_discards UInt64 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, interface_name, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS interface_state_latest AS -SELECT * -FROM interface_state -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM interface_state - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/clickhouse/isis_adjacencies.sql b/telemetry/gnmi-writer/clickhouse/isis_adjacencies.sql deleted file mode 100644 index 80f061c6ef..0000000000 --- a/telemetry/gnmi-writer/clickhouse/isis_adjacencies.sql +++ /dev/null @@ -1,33 +0,0 @@ --- ISIS Adjacency State Table --- Stores ISIS adjacency records from gNMI telemetry - -CREATE TABLE IF NOT EXISTS isis_adjacencies ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - interface_id String, - level UInt8, - system_id String, - adjacency_state LowCardinality(String), - neighbor_ipv4 String, - neighbor_ipv6 String, - neighbor_circuit_type LowCardinality(String), - area_address String, - up_timestamp Int64, - local_circuit_id UInt32, - neighbor_circuit_id UInt32 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, interface_id, level, system_id, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS isis_adjacencies_latest AS -SELECT * -FROM isis_adjacencies -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM isis_adjacencies - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/clickhouse/system_state.sql b/telemetry/gnmi-writer/clickhouse/system_state.sql deleted file mode 100644 index a877b4168b..0000000000 --- a/telemetry/gnmi-writer/clickhouse/system_state.sql +++ /dev/null @@ -1,29 +0,0 @@ --- System State Table --- Stores system state records from gNMI telemetry (hostname, memory, CPU) - -CREATE TABLE IF NOT EXISTS system_state ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - hostname String, - mem_total UInt64, - mem_used UInt64, - mem_free UInt64, - cpu_user Float64, - cpu_system Float64, - cpu_idle Float64 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS system_state_latest AS -SELECT * -FROM system_state -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM system_state - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/clickhouse/transceiver_state.sql b/telemetry/gnmi-writer/clickhouse/transceiver_state.sql deleted file mode 100644 index 22c7a75d0d..0000000000 --- a/telemetry/gnmi-writer/clickhouse/transceiver_state.sql +++ /dev/null @@ -1,27 +0,0 @@ --- Transceiver Optical Power Table --- Stores optical transceiver channel state from gNMI telemetry - -CREATE TABLE IF NOT EXISTS transceiver_state ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - interface_name String, - channel_index UInt16, - input_power Float64, - output_power Float64, - laser_bias_current Float64 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, interface_name, channel_index, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS transceiver_state_latest AS -SELECT * -FROM transceiver_state -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM transceiver_state - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/clickhouse/transceiver_thresholds.sql b/telemetry/gnmi-writer/clickhouse/transceiver_thresholds.sql deleted file mode 100644 index db5db680f3..0000000000 --- a/telemetry/gnmi-writer/clickhouse/transceiver_thresholds.sql +++ /dev/null @@ -1,34 +0,0 @@ --- Transceiver Thresholds Table --- Stores transceiver alarm thresholds from gNMI telemetry - -CREATE TABLE IF NOT EXISTS transceiver_thresholds ( - timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), - device_pubkey LowCardinality(String), - interface_name String, - severity LowCardinality(String), - input_power_lower Float64, - input_power_upper Float64, - output_power_lower Float64, - output_power_upper Float64, - laser_bias_current_lower Float64, - laser_bias_current_upper Float64, - module_temperature_lower Float64, - module_temperature_upper Float64, - supply_voltage_lower Float64, - supply_voltage_upper Float64 -) -ENGINE = MergeTree() -PARTITION BY toYYYYMM(timestamp) -ORDER BY (device_pubkey, interface_name, severity, timestamp) -TTL toDateTime(timestamp) + INTERVAL 30 DAY -SETTINGS index_granularity = 8192; - --- View for latest snapshot per device (returns complete state from most recent timestamp) -CREATE VIEW IF NOT EXISTS transceiver_thresholds_latest AS -SELECT * -FROM transceiver_thresholds -WHERE (device_pubkey, timestamp) IN ( - SELECT device_pubkey, max(timestamp) - FROM transceiver_thresholds - GROUP BY device_pubkey -); diff --git a/telemetry/gnmi-writer/cmd/gnmi-writer/main.go b/telemetry/gnmi-writer/cmd/gnmi-writer/main.go index 904322f775..c3cd32b85c 100644 --- a/telemetry/gnmi-writer/cmd/gnmi-writer/main.go +++ b/telemetry/gnmi-writer/cmd/gnmi-writer/main.go @@ -15,6 +15,7 @@ import ( "github.com/lmittmann/tint" "github.com/malbeclabs/doublezero/telemetry/gnmi-writer/internal/gnmi" + "github.com/malbeclabs/doublezero/telemetry/migrations" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" flag "github.com/spf13/pflag" @@ -102,6 +103,12 @@ func run() error { case "stdout": writer = gnmi.NewStdoutRecordWriter() case "clickhouse": + if cfg.ClickhouseRunMigrations { + if err = migrations.RunMigrations(cfg.ClickhouseAddr, cfg.ClickhouseDB, cfg.ClickhouseUser, cfg.ClickhousePassword, !cfg.ClickhouseTLSDisabled, log); err != nil { + return fmt.Errorf("clickhouse migrations: %w", err) + } + log.Info("clickhouse migrations applied") + } chMetrics := gnmi.NewClickhouseMetrics(prometheus.DefaultRegisterer) writer, err = gnmi.NewClickhouseRecordWriter( gnmi.WithClickhouseAddr(cfg.ClickhouseAddr), @@ -246,11 +253,12 @@ type Config struct { KafkaTLSDisabled bool // ClickHouse configuration - ClickhouseAddr string - ClickhouseDB string - ClickhouseUser string - ClickhousePassword string - ClickhouseTLSDisabled bool + ClickhouseAddr string + ClickhouseDB string + ClickhouseUser string + ClickhousePassword string + ClickhouseTLSDisabled bool + ClickhouseRunMigrations bool } func getenv(key, def string) string { @@ -288,6 +296,7 @@ func loadConfig() (Config, error) { flag.StringVar(&cfg.ClickhouseUser, "clickhouse-user", getenv("CLICKHOUSE_USER", "default"), "clickhouse username (env: CLICKHOUSE_USER)") flag.StringVar(&cfg.ClickhousePassword, "clickhouse-password", getenv("CLICKHOUSE_PASS", ""), "clickhouse password (env: CLICKHOUSE_PASS)") flag.BoolVar(&cfg.ClickhouseTLSDisabled, "clickhouse-tls-disabled", getenv("CLICKHOUSE_TLS_DISABLED", "") == "true", "disable TLS for clickhouse (env: CLICKHOUSE_TLS_DISABLED)") + flag.BoolVar(&cfg.ClickhouseRunMigrations, "clickhouse-run-migrations", getenv("CLICKHOUSE_RUN_MIGRATIONS", "") == "true", "run clickhouse migrations on startup (env: CLICKHOUSE_RUN_MIGRATIONS)") flag.Parse() diff --git a/telemetry/gnmi-writer/internal/gnmi/processor_integration_test.go b/telemetry/gnmi-writer/internal/gnmi/processor_integration_test.go index e1f67285fc..61c5dc2d45 100644 --- a/telemetry/gnmi-writer/internal/gnmi/processor_integration_test.go +++ b/telemetry/gnmi-writer/internal/gnmi/processor_integration_test.go @@ -23,6 +23,8 @@ import ( "github.com/twmb/franz-go/pkg/sasl/scram" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" + + "github.com/malbeclabs/doublezero/telemetry/migrations" ) const ( @@ -62,23 +64,13 @@ func newTestHarness(t *testing.T) *testHarness { logger: logger, } - // Setup ClickHouse with production schemas - schemaDir := filepath.Join("..", "..", "clickhouse") + // Setup ClickHouse and apply migrations. var err error h.clickhouseCtr, err = clickhouse.Run(ctx, "clickhouse/clickhouse-server:23.3.8.21-alpine", clickhouse.WithUsername(chUser), clickhouse.WithPassword(chPassword), clickhouse.WithDatabase(chDbname), - clickhouse.WithInitScripts( - filepath.Join(schemaDir, "isis_adjacencies.sql"), - filepath.Join(schemaDir, "system_state.sql"), - filepath.Join(schemaDir, "bgp_neighbors.sql"), - filepath.Join(schemaDir, "interface_ifindex.sql"), - filepath.Join(schemaDir, "transceiver_state.sql"), - filepath.Join(schemaDir, "transceiver_thresholds.sql"), - filepath.Join(schemaDir, "interface_state.sql"), - ), ) require.NoError(t, err, "error setting up clickhouse container") testcontainers.CleanupContainer(t, h.clickhouseCtr) @@ -86,6 +78,9 @@ func newTestHarness(t *testing.T) *testHarness { h.chConn, err = h.clickhouseCtr.ConnectionHost(ctx) require.NoError(t, err, "error getting clickhouse connection") + err = migrations.RunMigrations(h.chConn, chDbname, chUser, chPassword, false, logger) + require.NoError(t, err, "error running clickhouse migrations") + // Setup Redpanda with retry logic for flaky container registry const maxAttempts = 5 var lastErr error diff --git a/telemetry/flow-enricher/clickhouse/20250303200212_flows_init.sql b/telemetry/migrations/20250303200212_flows_init.sql similarity index 78% rename from telemetry/flow-enricher/clickhouse/20250303200212_flows_init.sql rename to telemetry/migrations/20250303200212_flows_init.sql index 19af7bec9b..3a5b800f73 100644 --- a/telemetry/flow-enricher/clickhouse/20250303200212_flows_init.sql +++ b/telemetry/migrations/20250303200212_flows_init.sql @@ -60,9 +60,19 @@ CREATE TABLE IF NOT EXISTS default.flows `_offset` Int64, `_topic` String, `_header_keys` Array(String), - `_header_values` Array(String) + `_header_values` Array(String), + `src_device_code` String, + `dst_device_code` String, + `src_location` String, + `dst_location` String, + `src_exchange` String, + `dst_exchange` String ) -ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') +-- MergeTree is used intentionally. ClickHouse Cloud automatically promotes MergeTree +-- (and its variants) to the equivalent SharedMergeTree engine transparently, so +-- specifying SharedMergeTree explicitly is unnecessary and breaks single-node deployments. +-- See: https://clickhouse.com/docs/cloud/reference/shared-merge-tree +ENGINE = MergeTree() PARTITION BY toYYYYMM(time_received_ns) ORDER BY (time_received_ns, sampler_address, etype, proto, in_ifname, out_ifname, src_as, dst_as, sampling_rate) TTL toDateTime(time_received_ns) + toIntervalMonth(1) diff --git a/telemetry/migrations/20250303200213_gnmi_init.sql b/telemetry/migrations/20250303200213_gnmi_init.sql new file mode 100644 index 0000000000..59b7fa05ce --- /dev/null +++ b/telemetry/migrations/20250303200213_gnmi_init.sql @@ -0,0 +1,270 @@ +-- +goose Up + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS bgp_neighbors ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + network_instance LowCardinality(String), + neighbor_address String, + description String, + peer_as UInt32, + local_as UInt32, + peer_type LowCardinality(String), + session_state LowCardinality(String), + established_transitions UInt64, + last_established Int64, + messages_received_update UInt64, + messages_sent_update UInt64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, network_instance, neighbor_address, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS bgp_neighbors_latest AS +SELECT * +FROM bgp_neighbors +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM bgp_neighbors + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS interface_ifindex ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + interface_name String, + ifindex UInt32 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, interface_name, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS interface_ifindex_latest AS +SELECT * +FROM interface_ifindex +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM interface_ifindex + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS interface_state ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + interface_name String, + admin_status LowCardinality(String), + oper_status LowCardinality(String), + ifindex UInt32, + mtu UInt16, + last_change Int64, + carrier_transitions UInt64, + in_octets UInt64, + out_octets UInt64, + in_pkts UInt64, + out_pkts UInt64, + in_errors UInt64, + out_errors UInt64, + in_discards UInt64, + out_discards UInt64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, interface_name, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS interface_state_latest AS +SELECT * +FROM interface_state +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM interface_state + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS isis_adjacencies ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + interface_id String, + level UInt8, + system_id String, + adjacency_state LowCardinality(String), + neighbor_ipv4 String, + neighbor_ipv6 String, + neighbor_circuit_type LowCardinality(String), + area_address String, + up_timestamp Int64, + local_circuit_id UInt32, + neighbor_circuit_id UInt32 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, interface_id, level, system_id, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS isis_adjacencies_latest AS +SELECT * +FROM isis_adjacencies +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM isis_adjacencies + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS system_state ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + hostname String, + mem_total UInt64, + mem_used UInt64, + mem_free UInt64, + cpu_user Float64, + cpu_system Float64, + cpu_idle Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS system_state_latest AS +SELECT * +FROM system_state +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM system_state + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS transceiver_state ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + interface_name String, + channel_index UInt16, + input_power Float64, + output_power Float64, + laser_bias_current Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, interface_name, channel_index, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS transceiver_state_latest AS +SELECT * +FROM transceiver_state +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM transceiver_state + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS transceiver_thresholds ( + timestamp DateTime64(9) CODEC(DoubleDelta, ZSTD(1)), + device_pubkey LowCardinality(String), + interface_name String, + severity LowCardinality(String), + input_power_lower Float64, + input_power_upper Float64, + output_power_lower Float64, + output_power_upper Float64, + laser_bias_current_lower Float64, + laser_bias_current_upper Float64, + module_temperature_lower Float64, + module_temperature_upper Float64, + supply_voltage_lower Float64, + supply_voltage_upper Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (device_pubkey, interface_name, severity, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY +SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE VIEW IF NOT EXISTS transceiver_thresholds_latest AS +SELECT * +FROM transceiver_thresholds +WHERE (device_pubkey, timestamp) IN ( + SELECT device_pubkey, max(timestamp) + FROM transceiver_thresholds + GROUP BY device_pubkey +); +-- +goose StatementEnd + +-- +goose Down + +-- +goose StatementBegin +DROP VIEW IF EXISTS transceiver_thresholds_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS transceiver_thresholds; +-- +goose StatementEnd +-- +goose StatementBegin +DROP VIEW IF EXISTS transceiver_state_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS transceiver_state; +-- +goose StatementEnd +-- +goose StatementBegin +DROP VIEW IF EXISTS system_state_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS system_state; +-- +goose StatementEnd +-- +goose StatementBegin +DROP VIEW IF EXISTS isis_adjacencies_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS isis_adjacencies; +-- +goose StatementEnd +-- +goose StatementBegin +DROP VIEW IF EXISTS interface_state_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS interface_state; +-- +goose StatementEnd +-- +goose StatementBegin +DROP VIEW IF EXISTS interface_ifindex_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS interface_ifindex; +-- +goose StatementEnd +-- +goose StatementBegin +DROP VIEW IF EXISTS bgp_neighbors_latest; +-- +goose StatementEnd +-- +goose StatementBegin +DROP TABLE IF EXISTS bgp_neighbors; +-- +goose StatementEnd diff --git a/telemetry/flow-enricher/internal/flow-enricher/fixtures/create_table_device_ifindex.sql b/telemetry/migrations/20250303200214_device_ifindex_init.sql similarity index 54% rename from telemetry/flow-enricher/internal/flow-enricher/fixtures/create_table_device_ifindex.sql rename to telemetry/migrations/20250303200214_device_ifindex_init.sql index eafdca490a..ea83ba5fd8 100644 --- a/telemetry/flow-enricher/internal/flow-enricher/fixtures/create_table_device_ifindex.sql +++ b/telemetry/migrations/20250303200214_device_ifindex_init.sql @@ -1,4 +1,6 @@ -CREATE TABLE default.device_ifindex +-- +goose Up +-- +goose StatementBegin +CREATE TABLE IF NOT EXISTS default.device_ifindex ( `pubkey` String, `ifindex` UInt64, @@ -10,3 +12,9 @@ ENGINE = ReplacingMergeTree(timestamp) PRIMARY KEY (ipv4_address, ifindex) ORDER BY (ipv4_address, ifindex) SETTINGS index_granularity = 8192; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS default.device_ifindex; +-- +goose StatementEnd diff --git a/telemetry/migrations/embed.go b/telemetry/migrations/embed.go new file mode 100644 index 0000000000..25b15ea175 --- /dev/null +++ b/telemetry/migrations/embed.go @@ -0,0 +1,7 @@ +// Package migrations embeds ClickHouse migration SQL files for use with goose. +package migrations + +import "embed" + +//go:embed [0-9]*.sql +var FS embed.FS diff --git a/telemetry/migrations/migrations.go b/telemetry/migrations/migrations.go new file mode 100644 index 0000000000..4a20282f42 --- /dev/null +++ b/telemetry/migrations/migrations.go @@ -0,0 +1,50 @@ +package migrations + +import ( + "context" + "crypto/tls" + "database/sql" + "fmt" + "log/slog" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/pressly/goose/v3" +) + +// RunMigrations applies pending goose migrations against ClickHouse. +func RunMigrations(addr, database, username, password string, secure bool, log *slog.Logger) error { + db, err := NewDB(addr, database, username, password, secure) + if err != nil { + return err + } + defer func() { _ = db.Close() }() + + provider, err := goose.NewProvider(goose.DialectClickHouse, db, FS, goose.WithSlog(log)) + if err != nil { + return fmt.Errorf("goose provider: %w", err) + } + if _, err = provider.Up(context.Background()); err != nil { + return fmt.Errorf("goose up: %w", err) + } + return nil +} + +// NewDB opens a ClickHouse database connection for use in tests or custom migration scenarios. +func NewDB(addr, database, username, password string, secure bool) (*sql.DB, error) { + opts := &clickhouse.Options{ + Addr: []string{addr}, + Auth: clickhouse.Auth{ + Database: database, + Username: username, + Password: password, + }, + } + if secure { + opts.TLS = &tls.Config{} + } + db := clickhouse.OpenDB(opts) + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("clickhouse ping: %w", err) + } + return db, nil +} diff --git a/telemetry/migrations/setup.sql b/telemetry/migrations/setup.sql new file mode 100644 index 0000000000..b956a2d22b --- /dev/null +++ b/telemetry/migrations/setup.sql @@ -0,0 +1,14 @@ +-- One-time manual setup: create databases and writer users before running migrations. +-- Run this as a ClickHouse admin before deploying any telemetry services. + +CREATE DATABASE IF NOT EXISTS telemetry_mainnet_beta; +CREATE USER IF NOT EXISTS telemetry_mainnet_beta IDENTIFIED BY 'changeme'; +GRANT SELECT, INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE ON telemetry_mainnet_beta.* TO telemetry_mainnet_beta; + +CREATE DATABASE IF NOT EXISTS telemetry_testnet; +CREATE USER IF NOT EXISTS telemetry_testnet IDENTIFIED BY 'changeme'; +GRANT SELECT, INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE ON telemetry_testnet.* TO telemetry_testnet; + +CREATE DATABASE IF NOT EXISTS telemetry_devnet; +CREATE USER IF NOT EXISTS telemetry_devnet IDENTIFIED BY 'changeme'; +GRANT SELECT, INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE ON telemetry_devnet.* TO telemetry_devnet;