From f5d62c4ea445e6c34fc4d1b79de1f0bf2fa87407 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Thu, 7 May 2026 14:23:51 -0400 Subject: [PATCH] Claude: GCP Connection, holds Service Account Key --- Cargo.lock | 92 ++++++++++++++++-- Cargo.toml | 1 + deny.toml | 18 +++- .../src/catalog/builtin_table_updates.rs | 1 + src/adapter/src/catalog/state.rs | 1 + src/adapter/src/coord/ddl.rs | 2 + src/sql-lexer/src/keywords.txt | 2 + src/sql-parser/src/ast/defs/ddl.rs | 8 ++ src/sql-parser/src/parser.rs | 19 ++-- src/sql-parser/tests/testdata/ddl | 7 ++ src/sql/src/plan.rs | 3 + src/sql/src/plan/statement/ddl.rs | 1 + src/sql/src/plan/statement/ddl/connection.rs | 10 ++ src/storage-types/Cargo.toml | 1 + src/storage-types/src/connections.rs | 10 ++ src/storage-types/src/connections/gcp.rs | 97 +++++++++++++++++++ 16 files changed, 259 insertions(+), 14 deletions(-) create mode 100644 src/storage-types/src/connections/gcp.rs diff --git a/Cargo.lock b/Cargo.lock index d456d1edecbaf..819f4e4352d24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2162,6 +2162,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -3670,6 +3680,32 @@ dependencies = [ "slab", ] +[[package]] +name = "gcp_auth" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2b3d0b409a042a380111af38136310839af8ac1a0917fb6e84515ed1e4bf3ee" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "http 1.4.0", + "http-body-util", + "hyper 1.9.0", + "hyper-rustls", + "hyper-util", + "ring", + "rustls-pki-types", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "tracing-futures", + "url", +] + [[package]] name = "generator" version = "0.7.5" @@ -4183,6 +4219,7 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -5478,10 +5515,10 @@ dependencies = [ "mz-ore", "mz-sql-parser", "open", - "openssl-probe", + "openssl-probe 0.1.6", "reqwest", "rpassword", - "security-framework", + "security-framework 2.10.0", "semver", "serde", "serde-aux", @@ -8242,6 +8279,7 @@ dependencies = [ "dec", "derivative", "differential-dataflow", + "gcp_auth", "hex", "http 1.4.0", "iceberg", @@ -8545,10 +8583,10 @@ dependencies = [ "libc", "log", "openssl", - "openssl-probe", + "openssl-probe 0.1.6", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.10.0", "security-framework-sys", "tempfile", ] @@ -8954,6 +8992,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "openssl-src" version = "300.3.1+3.3.1" @@ -10789,12 +10833,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" dependencies = [ "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe 0.2.1", + "rustls-pki-types", + "schannel", + "security-framework 3.7.0", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -10940,7 +10997,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" dependencies = [ "bitflags 1.3.2", - "core-foundation", + "core-foundation 0.9.3", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" +dependencies = [ + "bitflags 2.11.0", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -11746,7 +11816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ "bitflags 2.11.0", - "core-foundation", + "core-foundation 0.9.3", "system-configuration-sys", ] @@ -12554,6 +12624,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 53a8df6cee545..1e5679419899a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -346,6 +346,7 @@ futures = "0.3.32" futures-core = "0.3.31" futures-task = "0.3.31" futures-util = "0.3.31" +gcp_auth = "0.12.6" glob = "0.3.3" globset = "0.4.18" governor = "0.10.1" diff --git a/deny.toml b/deny.toml index 44e2eab541bcb..a36382191b6a4 100644 --- a/deny.toml +++ b/deny.toml @@ -130,6 +130,19 @@ skip = [ { name = "hashbrown", version = "0.16.1" }, # Used by dynfmt; iceberg/typetag pulls in v0.4. { name = "erased-serde", version = "0.3.26" }, + # gcp_auth → hyper-rustls → rustls-native-certs pulls newer versions + # while native-tls still pulls older versions. + { name = "core-foundation", version = "0.10.1" }, + { name = "security-framework", version = "3.7.0" }, + { name = "openssl-probe", version = "0.2.1" }, + # reqsign (via iceberg-storage-opendal / opendal) pins older deps + # than the workspace. + { name = "jsonwebtoken", version = "9.3.1" }, + { name = "quick-xml", version = "0.37.5" }, + # aws-lc-rs (via jsonwebtoken 10) and ring pull different `untrusted`. + { name = "untrusted", version = "0.7.1" }, + # Held back by lazy_static 1.4.0 (used by num-bigint-dig). + { name = "spin", version = "0.5.2" }, ] [[bans.deny]] @@ -206,9 +219,11 @@ wrappers = [ ] # We prefer the system's native TLS or OpenSSL to Rustls, since they are more -# mature and more widely used. +# mature and more widely used. `gcp_auth` only ships with rustls-based TLS, +# so allow it through. [[bans.deny]] name = "rustls" +wrappers = ["hyper-rustls", "tokio-rustls"] # once_cell is going to be added to std, and doesn't use macros # Unfortunately, its heavily used, so we have lots of exceptions. @@ -219,6 +234,7 @@ wrappers = [ "findshlibs", "launchdarkly-server-sdk", "launchdarkly-server-sdk-evaluation", + "num-bigint-dig", "prometheus", "rayon-core", "sharded-slab", diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 4c9b4e065903c..cfa477c1f1ece 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -1039,6 +1039,7 @@ impl CatalogState { updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff)); } ConnectionDetails::Csr(_) + | ConnectionDetails::Gcp(_) | ConnectionDetails::Postgres(_) | ConnectionDetails::MySql(_) | ConnectionDetails::SqlServer(_) diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index e32543b731747..22f37c4d7ad0d 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -2777,6 +2777,7 @@ impl ConnectionResolver for CatalogState { Ssh(conn) => Ssh(conn), Aws(conn) => Aws(conn), AwsPrivatelink(conn) => AwsPrivatelink(conn), + Gcp(conn) => Gcp(conn), MySql(conn) => MySql(conn.into_inline_connection(self)), SqlServer(conn) => SqlServer(conn.into_inline_connection(self)), IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)), diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index dfd1cc0dc72e1..bec4c8248b0cc 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1135,6 +1135,7 @@ impl Coordinator { ConnectionDetails::Csr(_) | ConnectionDetails::Ssh { .. } | ConnectionDetails::Aws(_) + | ConnectionDetails::Gcp(_) | ConnectionDetails::IcebergCatalog(_) => {} }, CatalogItem::Table(_) => { @@ -1311,6 +1312,7 @@ impl Coordinator { ConnectionDetails::Csr(_) | ConnectionDetails::Ssh { .. } | ConnectionDetails::Aws(_) + | ConnectionDetails::Gcp(_) | ConnectionDetails::IcebergCatalog(_) => {} } } diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index b5832107b0644..873b3c833feff 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -25,6 +25,7 @@ Abort Access +Account Action Add Added @@ -200,6 +201,7 @@ Full Fullname Function Fusion +Gcp Generator Grant Greatest diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 4beaf51fc602f..4984f6eddfbfb 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -791,6 +791,7 @@ pub enum ConnectionOptionName { Scope, SecretAccessKey, SecurityProtocol, + ServiceAccountKey, ServiceName, SshTunnel, SslCertificate, @@ -834,6 +835,7 @@ impl AstDisplay for ConnectionOptionName { ConnectionOptionName::Scope => "SCOPE", ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL", ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY", + ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY", ConnectionOptionName::ServiceName => "SERVICE NAME", ConnectionOptionName::SshTunnel => "SSH TUNNEL", ConnectionOptionName::SslCertificate => "SSL CERTIFICATE", @@ -883,6 +885,7 @@ impl WithOptionName for ConnectionOptionName { | ConnectionOptionName::Scope | ConnectionOptionName::SecurityProtocol | ConnectionOptionName::SecretAccessKey + | ConnectionOptionName::ServiceAccountKey | ConnectionOptionName::ServiceName | ConnectionOptionName::SshTunnel | ConnectionOptionName::SslCertificate @@ -911,6 +914,7 @@ impl_display_t!(ConnectionOption); pub enum CreateConnectionType { Aws, AwsPrivatelink, + Gcp, Kafka, Csr, Postgres, @@ -928,6 +932,7 @@ impl CreateConnectionType { Self::Postgres => "postgres", Self::Aws => "aws", Self::AwsPrivatelink => "aws-privatelink", + Self::Gcp => "gcp", Self::Ssh => "ssh-tunnel", Self::MySql => "mysql", Self::SqlServer => "sql-server", @@ -954,6 +959,9 @@ impl AstDisplay for CreateConnectionType { Self::AwsPrivatelink => { f.write_str("AWS PRIVATELINK"); } + Self::Gcp => { + f.write_str("GCP"); + } Self::Ssh => { f.write_str("SSH TUNNEL"); } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 6232ebb693453..e76d4bbe2e8be 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2456,9 +2456,9 @@ impl<'a> Parser<'a> { TO => true, _ => unreachable!(), }; - let connection_type = match self - .expect_one_of_keywords(&[AWS, KAFKA, CONFLUENT, POSTGRES, SSH, SQL, MYSQL, ICEBERG])? - { + let connection_type = match self.expect_one_of_keywords(&[ + AWS, GCP, KAFKA, CONFLUENT, POSTGRES, SSH, SQL, MYSQL, ICEBERG, + ])? { AWS => { if self.parse_keyword(PRIVATELINK) { CreateConnectionType::AwsPrivatelink @@ -2466,6 +2466,7 @@ impl<'a> Parser<'a> { CreateConnectionType::Aws } } + GCP => CreateConnectionType::Gcp, KAFKA => CreateConnectionType::Kafka, CONFLUENT => { self.expect_keywords(&[SCHEMA, REGISTRY])?; @@ -2873,10 +2874,14 @@ impl<'a> Parser<'a> { self.expect_keywords(&[ACCESS, KEY])?; ConnectionOptionName::SecretAccessKey } - SERVICE => { - self.expect_keyword(NAME)?; - ConnectionOptionName::ServiceName - } + SERVICE => match self.expect_one_of_keywords(&[ACCOUNT, NAME])? { + ACCOUNT => { + self.expect_keyword(KEY)?; + ConnectionOptionName::ServiceAccountKey + } + NAME => ConnectionOptionName::ServiceName, + _ => unreachable!(), + }, SESSION => { self.expect_keyword(TOKEN)?; ConnectionOptionName::SessionToken diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 4a98459b0ffd1..e5bc38f3656dd 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -561,6 +561,13 @@ CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (SERVICE NAME = 'com.amazon => CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("privatelinkconn")]), connection_type: AwsPrivatelink, if_not_exists: false, values: [ConnectionOption { name: ServiceName, value: Some(Value(String("com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc"))) }, ConnectionOption { name: AvailabilityZones, value: Some(Sequence([Value(String("use1-az1")), Value(String("use1-az4"))])) }], with_options: [] }) +parse-statement +CREATE CONNECTION gcpconn TO GCP (SERVICE ACCOUNT KEY = SECRET keyfile) +---- +CREATE CONNECTION gcpconn TO GCP (SERVICE ACCOUNT KEY = SECRET keyfile) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("gcpconn")]), connection_type: Gcp, if_not_exists: false, values: [ConnectionOption { name: ServiceAccountKey, value: Some(Secret(Name(UnresolvedItemName([Ident("keyfile")])))) }], with_options: [] }) + parse-statement CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc', AVAILABILITY ZONES ('use1-az1', 'use1-az4')) WITH (VALIDATE = FALSE) ---- diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 6a885020b7486..a99d4535b3af7 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -57,6 +57,7 @@ use mz_sql_parser::ast::{ }; use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::AwsConnection; +use mz_storage_types::connections::gcp::GcpConnection; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::{ AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection, @@ -1676,6 +1677,7 @@ pub enum ConnectionDetails { }, Aws(AwsConnection), AwsPrivatelink(AwsPrivatelinkConnection), + Gcp(GcpConnection), MySql(MySqlConnection), SqlServer(SqlServerConnectionDetails), IcebergCatalog(IcebergCatalogConnection), @@ -1698,6 +1700,7 @@ impl ConnectionDetails { ConnectionDetails::AwsPrivatelink(c) => { mz_storage_types::connections::Connection::AwsPrivatelink(c.clone()) } + ConnectionDetails::Gcp(c) => mz_storage_types::connections::Connection::Gcp(c.clone()), ConnectionDetails::MySql(c) => { mz_storage_types::connections::Connection::MySql(c.clone()) } diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index af3aab6d4e517..b226547b47908 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -7017,6 +7017,7 @@ pub fn plan_alter_connection( let connection_type = match connection { Connection::Aws(_) => CreateConnectionType::Aws, Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink, + Connection::Gcp(_) => CreateConnectionType::Gcp, Connection::Kafka(_) => CreateConnectionType::Kafka, Connection::Csr(_) => CreateConnectionType::Csr, Connection::Postgres(_) => CreateConnectionType::Postgres, diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index 81825849f7247..e55271857cc8a 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -29,6 +29,7 @@ use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::{ AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials, }; +use mz_storage_types::connections::gcp::GcpConnection; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::string_or_secret::StringOrSecret; use mz_storage_types::connections::{ @@ -72,6 +73,7 @@ generate_extracted_config!( (Scope, String), (SecretAccessKey, with_options::Secret), (SecurityProtocol, String), + (ServiceAccountKey, with_options::Secret), (ServiceName, String), (SshTunnel, with_options::Object), (SslCertificate, StringOrSecret), @@ -115,6 +117,7 @@ pub(super) fn validate_options_per_connection_type( ] .as_slice(), CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName], + CreateConnectionType::Gcp => &[ServiceAccountKey], CreateConnectionType::Csr => &[ AwsPrivatelink, Password, @@ -312,6 +315,13 @@ impl ConnectionOptionExtracted { } ConnectionDetails::AwsPrivatelink(connection) } + CreateConnectionType::Gcp => { + let credentials_json = self + .service_account_key + .ok_or_else(|| sql_err!("SERVICE ACCOUNT KEY option is required"))? + .into(); + ConnectionDetails::Gcp(GcpConnection { credentials_json }) + } CreateConnectionType::Kafka => { let (tls, sasl) = plan_kafka_security(scx, &self)?; let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?; diff --git a/src/storage-types/Cargo.toml b/src/storage-types/Cargo.toml index f8de6fa6657fe..156a7f35ff93a 100644 --- a/src/storage-types/Cargo.toml +++ b/src/storage-types/Cargo.toml @@ -28,6 +28,7 @@ columnation.workspace = true dec.workspace = true derivative.workspace = true differential-dataflow.workspace = true +gcp_auth.workspace = true hex.workspace = true http.workspace = true itertools.workspace = true diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 31badd1be6c76..a32572311076b 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -80,6 +80,7 @@ use crate::dyncfgs::{ use crate::errors::{ContextCreationError, CsrConnectError}; pub mod aws; +pub mod gcp; pub mod inline; pub mod string_or_secret; @@ -355,6 +356,7 @@ pub enum Connection { Ssh(SshConnection), Aws(AwsConnection), AwsPrivatelink(AwsPrivatelinkConnection), + Gcp(gcp::GcpConnection), MySql(MySqlConnection), SqlServer(SqlServerConnectionDetails), IcebergCatalog(IcebergCatalogConnection), @@ -371,6 +373,7 @@ impl IntoInlineConnection Connection::Ssh(ssh) => Connection::Ssh(ssh), Connection::Aws(aws) => Connection::Aws(aws), Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl), + Connection::Gcp(gcp) => Connection::Gcp(gcp), Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)), Connection::SqlServer(sql_server) => { Connection::SqlServer(sql_server.into_inline_connection(r)) @@ -392,6 +395,7 @@ impl Connection { Connection::Ssh(conn) => conn.validate_by_default(), Connection::Aws(conn) => conn.validate_by_default(), Connection::AwsPrivatelink(conn) => conn.validate_by_default(), + Connection::Gcp(conn) => conn.validate_by_default(), Connection::MySql(conn) => conn.validate_by_default(), Connection::SqlServer(conn) => conn.validate_by_default(), Connection::IcebergCatalog(conn) => conn.validate_by_default(), @@ -415,6 +419,7 @@ impl Connection { Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?, Connection::Aws(conn) => conn.validate(id, storage_configuration).await?, Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?, + Connection::Gcp(conn) => conn.validate(id, storage_configuration).await?, Connection::MySql(conn) => { conn.validate(id, storage_configuration).await?; } @@ -494,6 +499,8 @@ pub enum ConnectionValidationError { SqlServer(#[from] SqlServerConnectionValidationError), #[error(transparent)] Aws(#[from] AwsConnectionValidationError), + #[error(transparent)] + Gcp(#[from] gcp::GcpConnectionValidationError), #[error("{}", .0.display_with_causes())] Other(#[from] anyhow::Error), } @@ -506,6 +513,7 @@ impl ConnectionValidationError { ConnectionValidationError::MySql(e) => e.detail(), ConnectionValidationError::SqlServer(e) => e.detail(), ConnectionValidationError::Aws(e) => e.detail(), + ConnectionValidationError::Gcp(e) => e.detail(), ConnectionValidationError::Other(_) => None, } } @@ -517,6 +525,7 @@ impl ConnectionValidationError { ConnectionValidationError::MySql(e) => e.hint(), ConnectionValidationError::SqlServer(e) => e.hint(), ConnectionValidationError::Aws(e) => e.hint(), + ConnectionValidationError::Gcp(e) => e.hint(), ConnectionValidationError::Other(_) => None, } } @@ -527,6 +536,7 @@ impl AlterCompatible for Connection { match (self, other) { (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o), (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o), + (Self::Gcp(s), Self::Gcp(o)) => s.alter_compatible(id, o), (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o), (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o), (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o), diff --git a/src/storage-types/src/connections/gcp.rs b/src/storage-types/src/connections/gcp.rs new file mode 100644 index 0000000000000..65a6bf0bef4fa --- /dev/null +++ b/src/storage-types/src/connections/gcp.rs @@ -0,0 +1,97 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! GCP configuration for sources and sinks. + +use gcp_auth::{CustomServiceAccount, TokenProvider}; +use mz_ore::error::ErrorExt; +use mz_repr::{CatalogItemId, GlobalId}; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; + +use crate::AlterCompatible; +use crate::configuration::StorageConfiguration; +use crate::controller::AlterError; + +/// Scope used when probing the credentials during validation. Picked because +/// every service-account key is allowed to mint tokens for it, so a successful +/// response confirms the key is well-formed and accepted by Google's token +/// endpoint without requiring any specific IAM grants on the service account. +const VALIDATION_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform"; + +/// GCP connection configuration. +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub struct GcpConnection { + /// Secret containing a GCP service-account key in JSON format, + /// as produced by `gcloud iam service-accounts keys create`. + pub credentials_json: CatalogItemId, +} + +impl AlterCompatible for GcpConnection { + fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> { + // Every element of the GCP connection is configurable. + Ok(()) + } +} + +impl GcpConnection { + /// Validates this connection by reading the service-account key out of the + /// secrets store, parsing it, and exchanging it for an OAuth2 access token + /// at Google's token endpoint. + pub(crate) async fn validate( + &self, + _id: CatalogItemId, + storage_configuration: &StorageConfiguration, + ) -> Result<(), GcpConnectionValidationError> { + let json = storage_configuration + .connection_context + .secrets_reader + .read_string(self.credentials_json) + .await + .map_err(GcpConnectionValidationError::SecretRead)?; + let service_account = CustomServiceAccount::from_json(&json) + .map_err(GcpConnectionValidationError::ParseKey)?; + service_account + .token(&[VALIDATION_SCOPE]) + .await + .map_err(GcpConnectionValidationError::FetchToken)?; + Ok(()) + } + + pub(crate) fn validate_by_default(&self) -> bool { + false + } +} + +/// An error returned by `GcpConnection::validate`. +#[derive(thiserror::Error, Debug)] +pub enum GcpConnectionValidationError { + #[error("failed to read service-account key from secret store: {}", .0.display_with_causes())] + SecretRead(#[source] anyhow::Error), + #[error("failed to parse service-account key JSON: {}", .0.display_with_causes())] + ParseKey(#[source] gcp_auth::Error), + #[error("failed to obtain access token from Google: {}", .0.display_with_causes())] + FetchToken(#[source] gcp_auth::Error), +} + +impl GcpConnectionValidationError { + pub fn detail(&self) -> Option { + None + } + + pub fn hint(&self) -> Option { + match self { + GcpConnectionValidationError::ParseKey(_) => Some( + "The secret must hold the JSON output of `gcloud iam service-accounts keys create`." + .into(), + ), + _ => None, + } + } +}