diff --git a/crates/assemble/src/lib.rs b/crates/assemble/src/lib.rs index f9324b081c7..4d254dbbeab 100644 --- a/crates/assemble/src/lib.rs +++ b/crates/assemble/src/lib.rs @@ -379,6 +379,7 @@ pub fn shard_template( read_channel_size, ring_buffer_size, log_level, + flags, } = shard; // We hard-code that recovery logs always have prefix "recovery". @@ -453,6 +454,14 @@ pub fn shard_template( } } + for (name, value) in flags { + labels = labels::set_value( + labels, + &format!("{}{}", labels::FLAG_PREFIX, name.as_str()), + value, + ); + } + consumer::ShardSpec { id: shard_id_prefix.to_string(), disable: *disable, diff --git a/crates/labels/src/lib.rs b/crates/labels/src/lib.rs index 0db9437b5ed..535f421fb67 100644 --- a/crates/labels/src/lib.rs +++ b/crates/labels/src/lib.rs @@ -9,6 +9,7 @@ pub const BUILD: &str = "estuary.dev/build"; pub const COLLECTION: &str = "estuary.dev/collection"; pub const CORDON: &str = "estuary.dev/cordon"; pub const FIELD_PREFIX: &str = "estuary.dev/field/"; +pub const FLAG_PREFIX: &str = "estuary.dev/flag/"; pub const KEY_BEGIN: &str = "estuary.dev/key-begin"; pub const KEY_BEGIN_MIN: &str = "00000000"; pub const KEY_END: &str = "estuary.dev/key-end"; diff --git a/crates/labels/src/shard.rs b/crates/labels/src/shard.rs index 8488421a6b5..c167ac12d78 100644 --- a/crates/labels/src/shard.rs +++ b/crates/labels/src/shard.rs @@ -25,6 +25,10 @@ pub fn encode_labeling(mut set: LabelSet, labeling: &ops::ShardLabeling) -> Labe set = set_value(set, crate::TASK_NAME, &labeling.task_name); set = set_value(set, crate::TASK_TYPE, labeling.task_type().as_str_name()); + for (name, value) in &labeling.flags { + set = set_value(set, &format!("{}{name}", crate::FLAG_PREFIX), value); + } + set = set_value(set, crate::LOGS_JOURNAL, &labeling.logs_journal); set = set_value(set, crate::STATS_JOURNAL, &labeling.stats_journal); @@ -70,6 +74,13 @@ pub fn decode_labeling(set: &LabelSet) -> Result { let logs_journal = maybe_one(set, crate::LOGS_JOURNAL)?.to_string(); let stats_journal = maybe_one(set, crate::STATS_JOURNAL)?.to_string(); + let mut flags = std::collections::BTreeMap::new(); + for label in &set.labels { + if let Some(name) = label.name.strip_prefix(crate::FLAG_PREFIX) { + flags.insert(name.to_string(), label.value.clone()); + } + } + if !split_source.is_empty() && !split_target.is_empty() { return Err(Error::SplitSourceAndTarget( split_source.to_string(), @@ -88,6 +99,7 @@ pub fn decode_labeling(set: &LabelSet) -> Result { task_type, logs_journal, stats_journal, + flags, }) } @@ -162,19 +174,32 @@ mod test { split_target: "split/target".to_string(), task_name: "task/name".to_string(), task_type: ops::TaskType::Derivation as i32, + flags: [ + ("buffer-size".to_string(), "1024".to_string()), + ("enable-new-thing".to_string(), "true".to_string()), + ] + .into(), logs_journal: "logs/journal".to_string(), stats_journal: "stats/journal".to_string(), }; let set = encode_labeling(LabelSet::default(), &labeling); - insta::assert_json_snapshot!(set, @r###" + insta::assert_json_snapshot!(set, @r#" { "labels": [ { "name": "estuary.dev/build", "value": "a-build" }, + { + "name": "estuary.dev/flag/buffer-size", + "value": "1024" + }, + { + "name": "estuary.dev/flag/enable-new-thing", + "value": "true" + }, { "name": "estuary.dev/hostname", "value": "a.hostname" @@ -225,7 +250,7 @@ mod test { } ] } - "###); + "#); let id = format!("base/shard/id/{}", id_suffix(&set).unwrap()); assert_eq!(id, "base/shard/id/00000100-00000000"); @@ -242,6 +267,8 @@ mod test { // All labels except SPLIT_TARGET set. let model = build_set([ (crate::BUILD, "a-build"), + ("estuary.dev/flag/buffer-size", "1024"), + ("estuary.dev/flag/enable-new-thing", "true"), (crate::HOSTNAME, "a.hostname"), (crate::KEY_BEGIN, "00000001"), (crate::KEY_END, "00000002"), @@ -257,9 +284,13 @@ mod test { insta::assert_json_snapshot!( case(model.clone()), - @r###" + @r#" { "build": "a-build", + "flags": { + "buffer-size": "1024", + "enable-new-thing": "true" + }, "hostname": "a.hostname", "logLevel": "info", "logsJournal": "logs/journal", @@ -274,7 +305,7 @@ mod test { "taskName": "the/task", "taskType": "capture" } - "### + "# ); // Optional labels removed & split target instead of source. @@ -294,15 +325,19 @@ mod test { set = crate::add_value(set, crate::SPLIT_TARGET, "split/target"); insta::assert_json_snapshot!(case(set), - @r###" + @r#" { "build": "a-build", + "flags": { + "buffer-size": "1024", + "enable-new-thing": "true" + }, "logLevel": "info", "splitTarget": "split/target", "taskName": "the/task", "taskType": "capture" } - "### + "# ); // Expected label is missing. diff --git a/crates/models/src/shards.rs b/crates/models/src/shards.rs index 32ebdb3d31e..4671ab9618b 100644 --- a/crates/models/src/shards.rs +++ b/crates/models/src/shards.rs @@ -1,9 +1,11 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use std::time::Duration; +use validator::Validate; /// A ShardTemplate configures how shards process a catalog task. -#[derive(Serialize, Deserialize, Debug, Default, JsonSchema, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Default, JsonSchema, Clone, PartialEq, Validate)] #[serde(deny_unknown_fields, rename_all = "camelCase")] #[schemars(example = ShardTemplate::example())] pub struct ShardTemplate { @@ -77,6 +79,12 @@ pub struct ShardTemplate { #[serde(default, skip_serializing_if = "Option::is_none")] #[schemars(with = "String")] pub log_level: Option, + /// # Flags are string-valued feature flags. + /// Flag names must be valid tokens (Unicode letters, numbers, '-', '_', '.'). + /// Each flag produces a shard label `estuary.dev/flag/`. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + #[validate(nested)] + pub flags: BTreeMap, } impl ShardTemplate { @@ -96,6 +104,7 @@ impl ShardTemplate { ring_buffer_size: o4, read_channel_size: o5, log_level: o6, + flags, } = self; !disable @@ -105,5 +114,6 @@ impl ShardTemplate { && o4.is_none() && o5.is_none() && o6.is_none() + && flags.is_empty() } } diff --git a/crates/proto-flow/src/ops.rs b/crates/proto-flow/src/ops.rs index 4db839c63cc..57116332c6d 100644 --- a/crates/proto-flow/src/ops.rs +++ b/crates/proto-flow/src/ops.rs @@ -2,7 +2,7 @@ /// ShardLabeling is a parsed and validated representation of the Flow /// labels which are attached to Gazette ShardSpecs, that are understood /// by the Flow runtime and influence its behavior with respect to the shard. -#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ShardLabeling { /// Catalog build identifier which the task uses. #[prost(string, tag = "1")] @@ -34,6 +34,12 @@ pub struct ShardLabeling { /// Journal to which task stats are directed. #[prost(string, tag = "11")] pub stats_journal: ::prost::alloc::string::String, + /// Flags are arbitrary string-valued feature flags set on the task's shard template. + #[prost(btree_map = "string, string", tag = "12")] + pub flags: ::prost::alloc::collections::BTreeMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, } /// Common `shard` sub-document logged by Stats and Log. #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -183,23 +189,6 @@ pub mod stats { #[prost(uint64, tag = "2")] pub bytes_total: u64, } - /// MaterializeBinding represents stats for a single binding of a materialization task. - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] - pub struct MaterializeBinding { - #[prost(message, optional, tag = "1")] - pub left: ::core::option::Option, - #[prost(message, optional, tag = "2")] - pub right: ::core::option::Option, - #[prost(message, optional, tag = "3")] - pub out: ::core::option::Option, - /// The most recent publish timestamp from the source documents that were - /// read by this binding. - #[prost(message, optional, tag = "4")] - pub last_source_published_at: ::core::option::Option<::pbjson_types::Timestamp>, - /// Total bytes behind across all source journals of this binding. - #[prost(uint64, tag = "5")] - pub bytes_behind: u64, - } /// CaptureBinding represents stats for a single binding of a capture task. #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct CaptureBinding { @@ -248,6 +237,23 @@ pub mod stats { pub bytes_behind: u64, } } + /// MaterializeBinding represents stats for a single binding of a materialization task. + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] + pub struct MaterializeBinding { + #[prost(message, optional, tag = "1")] + pub left: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub right: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub out: ::core::option::Option, + /// The most recent publish timestamp from the source documents that were + /// read by this binding. + #[prost(message, optional, tag = "4")] + pub last_source_published_at: ::core::option::Option<::pbjson_types::Timestamp>, + /// Total bytes behind across all source journals of this binding. + #[prost(uint64, tag = "5")] + pub bytes_behind: u64, + } /// Interval metrics are emitted at regular intervals. #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Interval { diff --git a/crates/proto-flow/src/ops.serde.rs b/crates/proto-flow/src/ops.serde.rs index e90a714a9e3..b18f3cffd54 100644 --- a/crates/proto-flow/src/ops.serde.rs +++ b/crates/proto-flow/src/ops.serde.rs @@ -411,6 +411,9 @@ impl serde::Serialize for ShardLabeling { if !self.stats_journal.is_empty() { len += 1; } + if !self.flags.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("ops.ShardLabeling", len)?; if !self.build.is_empty() { struct_ser.serialize_field("build", &self.build)?; @@ -446,6 +449,9 @@ impl serde::Serialize for ShardLabeling { if !self.stats_journal.is_empty() { struct_ser.serialize_field("statsJournal", &self.stats_journal)?; } + if !self.flags.is_empty() { + struct_ser.serialize_field("flags", &self.flags)?; + } struct_ser.end() } } @@ -473,6 +479,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling { "logsJournal", "stats_journal", "statsJournal", + "flags", ]; #[allow(clippy::enum_variant_names)] @@ -487,6 +494,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling { TaskType, LogsJournal, StatsJournal, + Flags, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -518,6 +526,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling { "taskType" | "task_type" => Ok(GeneratedField::TaskType), "logsJournal" | "logs_journal" => Ok(GeneratedField::LogsJournal), "statsJournal" | "stats_journal" => Ok(GeneratedField::StatsJournal), + "flags" => Ok(GeneratedField::Flags), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -547,6 +556,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling { let mut task_type__ = None; let mut logs_journal__ = None; let mut stats_journal__ = None; + let mut flags__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Build => { @@ -609,6 +619,14 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling { } stats_journal__ = Some(map_.next_value()?); } + GeneratedField::Flags => { + if flags__.is_some() { + return Err(serde::de::Error::duplicate_field("flags")); + } + flags__ = Some( + map_.next_value::>()? + ); + } } } Ok(ShardLabeling { @@ -622,6 +640,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling { task_type: task_type__.unwrap_or_default(), logs_journal: logs_journal__.unwrap_or_default(), stats_journal: stats_journal__.unwrap_or_default(), + flags: flags__.unwrap_or_default(), }) } } diff --git a/crates/proto-flow/tests/regression.rs b/crates/proto-flow/tests/regression.rs index b2127933917..1250e678177 100644 --- a/crates/proto-flow/tests/regression.rs +++ b/crates/proto-flow/tests/regression.rs @@ -715,6 +715,11 @@ fn ex_shard_labeling() -> ops::ShardLabeling { split_target: String::new(), task_name: "the/task/name".to_string(), task_type: ops::TaskType::Derivation as i32, + flags: [ + ("buffer-size".to_string(), "1024".to_string()), + ("enable-new-thing".to_string(), "true".to_string()), + ] + .into(), logs_journal: "ops/logs/one=capture/two=the%2Ftask%2Fname".to_string(), stats_journal: "ops/stats/one=capture/two=the%2Ftask%2Fname".to_string(), } diff --git a/crates/proto-flow/tests/snapshots/regression__shard_labeling_json.snap b/crates/proto-flow/tests/snapshots/regression__shard_labeling_json.snap index 2d9b1e89990..bb2140bf9d9 100644 --- a/crates/proto-flow/tests/snapshots/regression__shard_labeling_json.snap +++ b/crates/proto-flow/tests/snapshots/regression__shard_labeling_json.snap @@ -16,5 +16,9 @@ expression: json_test(msg) "taskName": "the/task/name", "taskType": "derivation", "logsJournal": "ops/logs/one=capture/two=the%2Ftask%2Fname", - "statsJournal": "ops/stats/one=capture/two=the%2Ftask%2Fname" + "statsJournal": "ops/stats/one=capture/two=the%2Ftask%2Fname", + "flags": { + "buffer-size": "1024", + "enable-new-thing": "true" + } } diff --git a/crates/proto-flow/tests/snapshots/regression__shard_labeling_proto.snap b/crates/proto-flow/tests/snapshots/regression__shard_labeling_proto.snap index 39a77ecc3ae..34a3cff9084 100644 --- a/crates/proto-flow/tests/snapshots/regression__shard_labeling_proto.snap +++ b/crates/proto-flow/tests/snapshots/regression__shard_labeling_proto.snap @@ -12,5 +12,8 @@ expression: proto_test(msg) |74686525 32467461 736b2532 466e616d| the%2Ftask%2Fnam 00000070 |655a2b6f 70732f73 74617473 2f6f6e65| eZ+ops/stats/one 00000080 |3d636170 74757265 2f74776f 3d746865| =capture/two=the 00000090 -|25324674 61736b25 32466e61 6d65| %2Ftask%2Fname 000000a0 - 000000ae +|25324674 61736b25 32466e61 6d656213| %2Ftask%2Fnameb. 000000a0 +|0a0b6275 66666572 2d73697a 65120431| ..buffer-size..1 000000b0 +|30323462 180a1065 6e61626c 652d6e65| 024b...enable-ne 000000c0 +|772d7468 696e6712 04747275 65| w-thing..true 000000d0 + 000000dd diff --git a/crates/proto-gazette/src/consumer.rs b/crates/proto-gazette/src/consumer.rs index 5c314a07f52..db94df2b08f 100644 --- a/crates/proto-gazette/src/consumer.rs +++ b/crates/proto-gazette/src/consumer.rs @@ -172,6 +172,9 @@ pub struct ConsumerSpec { /// Maximum number of assigned Shards. #[prost(uint32, tag = "2")] pub shard_limit: u32, + /// When true, this consumer has been signaled to exit. + #[prost(bool, tag = "3")] + pub exiting: bool, } /// ReplicaStatus is the status of a ShardSpec assigned to a ConsumerSpec. /// It serves as an allocator AssignmentValue. ReplicaStatus is reduced by taking diff --git a/crates/proto-gazette/src/consumer.serde.rs b/crates/proto-gazette/src/consumer.serde.rs index 0628bba7707..3810f4dcc66 100644 --- a/crates/proto-gazette/src/consumer.serde.rs +++ b/crates/proto-gazette/src/consumer.serde.rs @@ -861,6 +861,9 @@ impl serde::Serialize for ConsumerSpec { if self.shard_limit != 0 { len += 1; } + if self.exiting { + len += 1; + } let mut struct_ser = serializer.serialize_struct("consumer.ConsumerSpec", len)?; if let Some(v) = self.process_spec.as_ref() { struct_ser.serialize_field("processSpec", v)?; @@ -868,6 +871,9 @@ impl serde::Serialize for ConsumerSpec { if self.shard_limit != 0 { struct_ser.serialize_field("shardLimit", &self.shard_limit)?; } + if self.exiting { + struct_ser.serialize_field("exiting", &self.exiting)?; + } struct_ser.end() } } @@ -882,12 +888,14 @@ impl<'de> serde::Deserialize<'de> for ConsumerSpec { "processSpec", "shard_limit", "shardLimit", + "exiting", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { ProcessSpec, ShardLimit, + Exiting, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -911,6 +919,7 @@ impl<'de> serde::Deserialize<'de> for ConsumerSpec { match value { "processSpec" | "process_spec" => Ok(GeneratedField::ProcessSpec), "shardLimit" | "shard_limit" => Ok(GeneratedField::ShardLimit), + "exiting" => Ok(GeneratedField::Exiting), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -932,6 +941,7 @@ impl<'de> serde::Deserialize<'de> for ConsumerSpec { { let mut process_spec__ = None; let mut shard_limit__ = None; + let mut exiting__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::ProcessSpec => { @@ -948,11 +958,18 @@ impl<'de> serde::Deserialize<'de> for ConsumerSpec { Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::Exiting => { + if exiting__.is_some() { + return Err(serde::de::Error::duplicate_field("exiting")); + } + exiting__ = Some(map_.next_value()?); + } } } Ok(ConsumerSpec { process_spec: process_spec__, shard_limit: shard_limit__.unwrap_or_default(), + exiting: exiting__.unwrap_or_default(), }) } } diff --git a/crates/proto-gazette/src/protocol.rs b/crates/proto-gazette/src/protocol.rs index a4f15217eda..609f9740dad 100644 --- a/crates/proto-gazette/src/protocol.rs +++ b/crates/proto-gazette/src/protocol.rs @@ -288,6 +288,9 @@ pub struct BrokerSpec { /// Maximum number of assigned Journal replicas. #[prost(uint32, tag = "2")] pub journal_limit: u32, + /// When true, this broker has been signaled to exit. + #[prost(bool, tag = "3")] + pub exiting: bool, } /// Fragment is a content-addressed description of a contiguous Journal span, /// defined by the \[begin, end) offset range covered by the Fragment and the diff --git a/crates/proto-gazette/src/protocol.serde.rs b/crates/proto-gazette/src/protocol.serde.rs index 16bd1897cdd..c3377ff54b1 100644 --- a/crates/proto-gazette/src/protocol.serde.rs +++ b/crates/proto-gazette/src/protocol.serde.rs @@ -867,6 +867,9 @@ impl serde::Serialize for BrokerSpec { if self.journal_limit != 0 { len += 1; } + if self.exiting { + len += 1; + } let mut struct_ser = serializer.serialize_struct("protocol.BrokerSpec", len)?; if let Some(v) = self.process_spec.as_ref() { struct_ser.serialize_field("processSpec", v)?; @@ -874,6 +877,9 @@ impl serde::Serialize for BrokerSpec { if self.journal_limit != 0 { struct_ser.serialize_field("journalLimit", &self.journal_limit)?; } + if self.exiting { + struct_ser.serialize_field("exiting", &self.exiting)?; + } struct_ser.end() } } @@ -888,12 +894,14 @@ impl<'de> serde::Deserialize<'de> for BrokerSpec { "processSpec", "journal_limit", "journalLimit", + "exiting", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { ProcessSpec, JournalLimit, + Exiting, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -917,6 +925,7 @@ impl<'de> serde::Deserialize<'de> for BrokerSpec { match value { "processSpec" | "process_spec" => Ok(GeneratedField::ProcessSpec), "journalLimit" | "journal_limit" => Ok(GeneratedField::JournalLimit), + "exiting" => Ok(GeneratedField::Exiting), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -938,6 +947,7 @@ impl<'de> serde::Deserialize<'de> for BrokerSpec { { let mut process_spec__ = None; let mut journal_limit__ = None; + let mut exiting__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::ProcessSpec => { @@ -954,11 +964,18 @@ impl<'de> serde::Deserialize<'de> for BrokerSpec { Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::Exiting => { + if exiting__.is_some() { + return Err(serde::de::Error::duplicate_field("exiting")); + } + exiting__ = Some(map_.next_value()?); + } } } Ok(BrokerSpec { process_spec: process_spec__, journal_limit: journal_limit__.unwrap_or_default(), + exiting: exiting__.unwrap_or_default(), }) } } diff --git a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap index bfd44fb9166..eac386f3ce9 100644 --- a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap +++ b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap @@ -1217,6 +1217,17 @@ expression: "&schema" "title": "Disable processing of the task's shards.", "type": "boolean" }, + "flags": { + "title": "Flags are string-valued feature flags.", + "description": "Flag names must be valid tokens (Unicode letters, numbers, '-', '_', '.').\nEach flag produces a shard label `estuary.dev/flag/`.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^[\\p{Letter}\\p{Number}\\-_\\.]+$": { + "$ref": "#/$defs/Token" + } + } + }, "hotStandbys": { "title": "Number of hot standbys to keep for each task shard.", "description": "Hot standbys of a shard actively replicate the shard's state to another\nmachine, and are able to be quickly promoted to take over processing for\nthe shard should its current primary fail.\nIf not set, then no hot standbys are maintained.\nEXPERIMENTAL: this field MAY be removed.", @@ -1622,6 +1633,14 @@ expression: "&schema" "documents" ] }, + "Token": { + "description": "Token is Unicode letters, numbers, '-', '_', or '.'.", + "type": "string", + "examples": [ + "token" + ], + "pattern": "^[\\p{Letter}\\p{Number}\\-_\\.]+$" + }, "Transform": { "description": "Transform names are Unicode letters, numbers, '-', '_', or '.'.", "type": "string", diff --git a/crates/validation/tests/snapshots/field_selection_conflicts__abort.snap b/crates/validation/tests/snapshots/field_selection_conflicts__abort.snap index 7928d964ba4..df07388cd60 100644 --- a/crates/validation/tests/snapshots/field_selection_conflicts__abort.snap +++ b/crates/validation/tests/snapshots/field_selection_conflicts__abort.snap @@ -63,6 +63,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/field_selection_conflicts__backfill_binding.snap b/crates/validation/tests/snapshots/field_selection_conflicts__backfill_binding.snap index e9bee4d6fd6..b7e50599058 100644 --- a/crates/validation/tests/snapshots/field_selection_conflicts__backfill_binding.snap +++ b/crates/validation/tests/snapshots/field_selection_conflicts__backfill_binding.snap @@ -63,6 +63,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/field_selection_conflicts__disable_binding.snap b/crates/validation/tests/snapshots/field_selection_conflicts__disable_binding.snap index 6a1720386f5..f80d59b92ac 100644 --- a/crates/validation/tests/snapshots/field_selection_conflicts__disable_binding.snap +++ b/crates/validation/tests/snapshots/field_selection_conflicts__disable_binding.snap @@ -50,6 +50,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/field_selection_conflicts__disable_task.snap b/crates/validation/tests/snapshots/field_selection_conflicts__disable_task.snap index 99f8ae64ec4..f5781eeb2d7 100644 --- a/crates/validation/tests/snapshots/field_selection_conflicts__disable_task.snap +++ b/crates/validation/tests/snapshots/field_selection_conflicts__disable_task.snap @@ -50,6 +50,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding.snap b/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding.snap index cda851caa5f..1b08263a25e 100644 --- a/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding.snap +++ b/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding.snap @@ -60,6 +60,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding_with_reset.snap b/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding_with_reset.snap index c14783a1a8d..3462382b4a3 100644 --- a/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding_with_reset.snap +++ b/crates/validation/tests/snapshots/group_by_conflicts__backfill_binding_with_reset.snap @@ -60,6 +60,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/group_by_conflicts__disable_binding.snap b/crates/validation/tests/snapshots/group_by_conflicts__disable_binding.snap index 0195b7c11ea..aaede62927f 100644 --- a/crates/validation/tests/snapshots/group_by_conflicts__disable_binding.snap +++ b/crates/validation/tests/snapshots/group_by_conflicts__disable_binding.snap @@ -50,6 +50,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/group_by_conflicts__disable_task.snap b/crates/validation/tests/snapshots/group_by_conflicts__disable_task.snap index 765bdbe8652..13fe2cc33b5 100644 --- a/crates/validation/tests/snapshots/group_by_conflicts__disable_task.snap +++ b/crates/validation/tests/snapshots/group_by_conflicts__disable_task.snap @@ -50,6 +50,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/group_by_conflicts__noop_with_delta_binding.snap b/crates/validation/tests/snapshots/group_by_conflicts__noop_with_delta_binding.snap index 11982e6bf87..60e0231a92b 100644 --- a/crates/validation/tests/snapshots/group_by_conflicts__noop_with_delta_binding.snap +++ b/crates/validation/tests/snapshots/group_by_conflicts__noop_with_delta_binding.snap @@ -60,6 +60,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/group_by_conflicts__noop_with_manual_group_by.snap b/crates/validation/tests/snapshots/group_by_conflicts__noop_with_manual_group_by.snap index f84e4a51272..f6c3eb2aa88 100644 --- a/crates/validation/tests/snapshots/group_by_conflicts__noop_with_manual_group_by.snap +++ b/crates/validation/tests/snapshots/group_by_conflicts__noop_with_manual_group_by.snap @@ -65,6 +65,7 @@ expression: "(selections, errors)" ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/crates/validation/tests/snapshots/transition_tests__group_by_migration.snap b/crates/validation/tests/snapshots/transition_tests__group_by_migration.snap index 5ee732b5004..676f8e8928c 100644 --- a/crates/validation/tests/snapshots/transition_tests__group_by_migration.snap +++ b/crates/validation/tests/snapshots/transition_tests__group_by_migration.snap @@ -66,6 +66,7 @@ expression: "(&outcome.built_materializations[0].model,\n&outcome.built_material ring_buffer_size: None, read_channel_size: None, log_level: None, + flags: {}, }, expect_pub_id: None, delete: false, diff --git a/flow.schema.json b/flow.schema.json index fcf74725894..a9fd733b7f8 100644 --- a/flow.schema.json +++ b/flow.schema.json @@ -1213,6 +1213,17 @@ "title": "Disable processing of the task's shards.", "type": "boolean" }, + "flags": { + "title": "Flags are string-valued feature flags.", + "description": "Flag names must be valid tokens (Unicode letters, numbers, '-', '_', '.').\nEach flag produces a shard label `estuary.dev/flag/`.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^[\\p{Letter}\\p{Number}\\-_\\.]+$": { + "type": "string" + } + } + }, "hotStandbys": { "title": "Number of hot standbys to keep for each task shard.", "description": "Hot standbys of a shard actively replicate the shard's state to another\nmachine, and are able to be quickly promoted to take over processing for\nthe shard should its current primary fail.\nIf not set, then no hot standbys are maintained.\nEXPERIMENTAL: this field MAY be removed.", diff --git a/go/labels/labels.go b/go/labels/labels.go index 2275f9b3d69..f483f14d3aa 100644 --- a/go/labels/labels.go +++ b/go/labels/labels.go @@ -22,6 +22,8 @@ const ( // Field is a logical partition of the Collection that's implemented by this // journal. FieldPrefix = "estuary.dev/field/" + // Flag is a named feature flag which controls low-level task behavior. + FlagPrefix = "estuary.dev/flag/" // KeyBegin is a hexadecimal encoding of the beginning key range (inclusive) // managed by this journal or shard, in an order-preserving packed []byte embedding. KeyBegin = "estuary.dev/key-begin" diff --git a/go/labels/shard.go b/go/labels/shard.go index 9439bf9036d..6c9dec5dcda 100644 --- a/go/labels/shard.go +++ b/go/labels/shard.go @@ -2,6 +2,7 @@ package labels import ( "fmt" + "strings" pf "github.com/estuary/flow/go/protocols/flow" "github.com/estuary/flow/go/protocols/ops" @@ -65,6 +66,13 @@ func ParseShardLabels(set pf.LabelSet) (ops.ShardLabeling, error) { return out, fmt.Errorf("invalid stats journal: %w", out.StatsJournal.Validate()) } + out.Flags = make(map[string]string) + for _, l := range set.Labels { + if name, ok := strings.CutPrefix(l.Name, FlagPrefix); ok { + out.Flags[name] = l.Value + } + } + if out.SplitSource != "" && out.SplitTarget != "" { return out, fmt.Errorf( "both split-source %q and split-target %q are set but shouldn't be", diff --git a/go/labels/shard_test.go b/go/labels/shard_test.go index 18f9eadc8a9..ea80f7f8d3b 100644 --- a/go/labels/shard_test.go +++ b/go/labels/shard_test.go @@ -12,6 +12,8 @@ import ( func TestParsingShardLabels(t *testing.T) { var set = pb.MustLabelSet( Build, "a-build", + FlagPrefix+"buffer-size", "1024", + FlagPrefix+"enable-new-thing", "true", LogLevel, "debug", KeyBegin, "aaaaaaaa", KeyEnd, "bbbbbbbb", @@ -48,6 +50,10 @@ func TestParsingShardLabels(t *testing.T) { TaskType: ops.TaskType_capture, LogsJournal: "logs/journal", StatsJournal: "stats/journal", + Flags: map[string]string{ + "buffer-size": "1024", + "enable-new-thing": "true", + }, }, out) // Case: logs & stats journals are ommitted and use legacy behavior. diff --git a/go/protocols/ops/ops.pb.go b/go/protocols/ops/ops.pb.go index ffe23231fd2..44c301b9a3c 100644 --- a/go/protocols/ops/ops.pb.go +++ b/go/protocols/ops/ops.pb.go @@ -121,10 +121,12 @@ type ShardLabeling struct { // Journal to which task logs are directed. LogsJournal go_gazette_dev_core_broker_protocol.Journal `protobuf:"bytes,10,opt,name=logs_journal,json=logsJournal,proto3,casttype=go.gazette.dev/core/broker/protocol.Journal" json:"logs_journal,omitempty"` // Journal to which task stats are directed. - StatsJournal go_gazette_dev_core_broker_protocol.Journal `protobuf:"bytes,11,opt,name=stats_journal,json=statsJournal,proto3,casttype=go.gazette.dev/core/broker/protocol.Journal" json:"stats_journal,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + StatsJournal go_gazette_dev_core_broker_protocol.Journal `protobuf:"bytes,11,opt,name=stats_journal,json=statsJournal,proto3,casttype=go.gazette.dev/core/broker/protocol.Journal" json:"stats_journal,omitempty"` + // Flags are arbitrary string-valued feature flags set on the task's shard template. + Flags map[string]string `protobuf:"bytes,12,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ShardLabeling) Reset() { *m = ShardLabeling{} } @@ -652,6 +654,7 @@ func init() { proto.RegisterEnum("ops.TaskType", TaskType_name, TaskType_value) proto.RegisterEnum("ops.Log_Level", Log_Level_name, Log_Level_value) proto.RegisterType((*ShardLabeling)(nil), "ops.ShardLabeling") + proto.RegisterMapType((map[string]string)(nil), "ops.ShardLabeling.FlagsEntry") proto.RegisterType((*ShardRef)(nil), "ops.ShardRef") proto.RegisterType((*Meta)(nil), "ops.Meta") proto.RegisterType((*Log)(nil), "ops.Log") @@ -671,89 +674,91 @@ func init() { func init() { proto.RegisterFile("go/protocols/ops/ops.proto", fileDescriptor_37de94a5cb9d0036) } var fileDescriptor_37de94a5cb9d0036 = []byte{ - // 1302 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x57, 0xdf, 0x8e, 0xdb, 0xc4, - 0x17, 0x5e, 0x27, 0x76, 0x36, 0x39, 0x4e, 0x76, 0xd3, 0x69, 0x7f, 0x3f, 0x4c, 0xda, 0xee, 0x6e, - 0x03, 0x48, 0xdb, 0x2e, 0x38, 0x6a, 0x2a, 0x24, 0x54, 0x09, 0x89, 0xa6, 0xa5, 0x12, 0xd5, 0x6e, - 0x55, 0xbc, 0xe1, 0xa6, 0x12, 0xb2, 0x26, 0xf6, 0xc4, 0xeb, 0xc6, 0x99, 0xb1, 0x3c, 0xe3, 0x6d, - 0xd3, 0x47, 0xe0, 0x96, 0x1b, 0xae, 0x10, 0xcf, 0xc0, 0x53, 0x54, 0x5c, 0x20, 0x2e, 0xb8, 0x44, - 0x45, 0x94, 0xb7, 0xe0, 0x0a, 0xcd, 0x8c, 0x9d, 0xb8, 0xdd, 0xa5, 0xff, 0x84, 0xc4, 0xc5, 0xae, - 0x66, 0xce, 0xf9, 0xe6, 0xcc, 0x99, 0xf3, 0x7d, 0x67, 0xc6, 0x81, 0x5e, 0xc4, 0x06, 0x69, 0xc6, - 0x04, 0x0b, 0x58, 0xc2, 0x07, 0x2c, 0x55, 0x7f, 0xae, 0xb2, 0xa0, 0x3a, 0x4b, 0x79, 0xef, 0xc2, - 0x73, 0x80, 0x69, 0xc2, 0x1e, 0xaa, 0x7f, 0x1a, 0xd2, 0x3b, 0x17, 0xb1, 0x88, 0xa9, 0xe1, 0x40, - 0x8e, 0x0a, 0xeb, 0x76, 0xc4, 0x58, 0x94, 0x10, 0xbd, 0x6e, 0x92, 0x4f, 0x07, 0x22, 0x9e, 0x13, - 0x2e, 0xf0, 0x3c, 0xd5, 0x80, 0xfe, 0xaf, 0x75, 0xe8, 0x1c, 0x1e, 0xe1, 0x2c, 0xdc, 0xc7, 0x13, - 0x92, 0xc4, 0x34, 0x42, 0xe7, 0xc0, 0x9a, 0xe4, 0x71, 0x12, 0x3a, 0xc6, 0x8e, 0xb1, 0xdb, 0xf2, - 0xf4, 0x04, 0xf5, 0xa0, 0x79, 0xc4, 0xb8, 0xa0, 0x78, 0x4e, 0x9c, 0x9a, 0x72, 0x2c, 0xe7, 0x68, - 0x0f, 0x5a, 0x09, 0x8b, 0xfc, 0x84, 0x1c, 0x93, 0xc4, 0xa9, 0xef, 0x18, 0xbb, 0x1b, 0xc3, 0x0d, - 0x57, 0x26, 0xbf, 0xcf, 0x22, 0x77, 0x5f, 0x5a, 0xbd, 0x66, 0xc2, 0x22, 0x35, 0x42, 0x7b, 0x60, - 0x65, 0x98, 0x46, 0xc4, 0xb1, 0x76, 0x8c, 0x5d, 0x7b, 0xb8, 0xe9, 0xaa, 0x33, 0x78, 0xd2, 0x74, - 0x98, 0x92, 0x60, 0x64, 0x3e, 0x79, 0xba, 0xbd, 0xe6, 0x69, 0x0c, 0xba, 0x04, 0x6d, 0x9e, 0x26, - 0xb1, 0xf0, 0x39, 0xcb, 0xb3, 0x80, 0x38, 0x0d, 0xb5, 0xb3, 0xad, 0x6c, 0x87, 0xca, 0xb4, 0x82, - 0x08, 0x9c, 0x45, 0x44, 0x38, 0xeb, 0x15, 0xc8, 0x58, 0x99, 0xd0, 0x79, 0x68, 0x09, 0xcc, 0x67, - 0xbe, 0x4a, 0xbe, 0xa9, 0x93, 0x97, 0x86, 0xbb, 0x32, 0xf9, 0x2b, 0x85, 0x53, 0x2c, 0x52, 0xe2, - 0xb4, 0x54, 0xf2, 0x1d, 0x95, 0xfc, 0x18, 0xf3, 0xd9, 0x78, 0x91, 0x12, 0x8d, 0x95, 0x23, 0xe4, - 0x41, 0x3b, 0x61, 0x11, 0xf7, 0x1f, 0xb0, 0x3c, 0xa3, 0x38, 0x71, 0x40, 0xc6, 0x1a, 0x0d, 0xfe, - 0x7a, 0xba, 0xbd, 0x17, 0x31, 0x37, 0xc2, 0x8f, 0x89, 0x10, 0xc4, 0x0d, 0xc9, 0xf1, 0x20, 0x60, - 0x19, 0x19, 0x4c, 0x32, 0x36, 0x23, 0xd9, 0x92, 0x33, 0xf7, 0x8e, 0x5e, 0xe6, 0xd9, 0x32, 0x48, - 0x31, 0x41, 0x63, 0xe8, 0x70, 0x81, 0xc5, 0x2a, 0xa8, 0xfd, 0x76, 0x41, 0xdb, 0x2a, 0x4a, 0x31, - 0xeb, 0x7f, 0x67, 0x40, 0x53, 0xd1, 0xea, 0x91, 0x29, 0xba, 0x04, 0xe6, 0x2c, 0xa6, 0x9a, 0xd0, - 0x13, 0xa7, 0x53, 0x2e, 0x84, 0xc0, 0xac, 0x50, 0xab, 0xc6, 0xb2, 0x6c, 0x33, 0xb2, 0xf0, 0x27, - 0x24, 0x8a, 0xa9, 0xa2, 0xb5, 0xe5, 0x35, 0x67, 0x64, 0x31, 0x92, 0x73, 0xd4, 0x87, 0x4e, 0xe6, - 0x07, 0x09, 0x0b, 0x66, 0x05, 0xc0, 0xd4, 0x75, 0xcf, 0x6e, 0x4a, 0x9b, 0xc6, 0x2c, 0x95, 0x64, - 0x55, 0x94, 0xd4, 0xef, 0x81, 0x79, 0x40, 0x04, 0x96, 0x5b, 0xe6, 0x79, 0x5c, 0xca, 0x4c, 0x8d, - 0xfb, 0x3f, 0xd7, 0xa1, 0xbe, 0xcf, 0x22, 0xb4, 0x05, 0xe6, 0x9c, 0x08, 0xac, 0x7c, 0xf6, 0xb0, - 0xa5, 0x32, 0x96, 0x8b, 0x3c, 0xcb, 0x97, 0x76, 0xf4, 0x1e, 0x58, 0x5c, 0x9e, 0x4e, 0xe5, 0x6b, - 0x17, 0x47, 0x2a, 0xcf, 0xeb, 0x69, 0x1f, 0xba, 0x0a, 0xad, 0xa5, 0xda, 0x55, 0xfe, 0xf6, 0xb0, - 0xe7, 0xea, 0x7e, 0x70, 0xcb, 0x7e, 0x70, 0xc7, 0x25, 0xc2, 0xab, 0x09, 0x8e, 0xde, 0x07, 0x4b, - 0xab, 0xd8, 0x3c, 0x55, 0xc5, 0xda, 0x89, 0x1c, 0x58, 0x9f, 0x13, 0xce, 0x71, 0x21, 0xe2, 0x96, - 0x57, 0x4e, 0xd1, 0x7d, 0xd8, 0x9c, 0xc6, 0x24, 0x09, 0xb9, 0xff, 0x80, 0x33, 0xea, 0xcf, 0x71, - 0xea, 0x34, 0x76, 0xea, 0xbb, 0xf6, 0xf0, 0xfc, 0x32, 0xd2, 0x6d, 0xe5, 0xbf, 0xc3, 0x19, 0x3d, - 0xc0, 0xe9, 0xe7, 0x54, 0x64, 0x8b, 0xd1, 0x85, 0x6f, 0x7e, 0xdf, 0x76, 0x08, 0x0d, 0x58, 0x18, - 0xd3, 0x68, 0x20, 0x57, 0xba, 0x1e, 0x7e, 0x78, 0xa0, 0x63, 0x7a, 0x0d, 0x1d, 0x11, 0x6d, 0x81, - 0xc5, 0x53, 0x4c, 0xb9, 0xb3, 0xae, 0x22, 0x36, 0xcb, 0x88, 0x9e, 0x36, 0xf7, 0x3e, 0x03, 0x74, - 0x32, 0x36, 0xea, 0x42, 0x7d, 0x46, 0x16, 0x45, 0x91, 0xe5, 0x50, 0xb2, 0x72, 0x8c, 0x93, 0x5c, - 0x73, 0xdd, 0xf6, 0xf4, 0xe4, 0x7a, 0xed, 0x13, 0xa3, 0xff, 0x25, 0x58, 0xba, 0x47, 0xcf, 0xc2, - 0x66, 0x4e, 0x43, 0x32, 0x8d, 0x29, 0x09, 0x75, 0x5b, 0x77, 0xd7, 0x50, 0x0b, 0x2c, 0x92, 0x65, - 0x2c, 0xeb, 0x1a, 0xa8, 0x09, 0xe6, 0x43, 0x9c, 0xd1, 0x6e, 0x4d, 0x8e, 0x62, 0x3a, 0x65, 0xdd, - 0xba, 0x74, 0x87, 0x64, 0x92, 0x47, 0x5d, 0x53, 0x0e, 0x45, 0x86, 0x03, 0xd2, 0xb5, 0xfa, 0xbf, - 0x75, 0xc0, 0x3a, 0x94, 0xc2, 0xfc, 0xcf, 0x28, 0xfd, 0x10, 0x10, 0x4b, 0x09, 0xf5, 0x39, 0x09, - 0x18, 0x0d, 0xb9, 0x2f, 0x98, 0xc0, 0x9a, 0x5f, 0xc3, 0xeb, 0x4a, 0xcf, 0xa1, 0x76, 0x8c, 0xa5, - 0x5d, 0x5d, 0x15, 0x8f, 0xa8, 0x1f, 0xb0, 0x9c, 0x0a, 0x45, 0x6e, 0xc7, 0x6b, 0x8a, 0x47, 0xf4, - 0xa6, 0x9c, 0xa3, 0xab, 0xb0, 0x1e, 0xe0, 0x54, 0xe4, 0x19, 0x29, 0x58, 0x7d, 0x47, 0x27, 0x29, - 0xcf, 0xe7, 0xde, 0xd4, 0x1e, 0x55, 0x75, 0xaf, 0xc4, 0xa1, 0xcb, 0xd0, 0x08, 0x49, 0x16, 0x1f, - 0x13, 0x75, 0x2f, 0xd9, 0xc3, 0x33, 0x95, 0x15, 0xb7, 0x94, 0xc3, 0x2b, 0x00, 0xe8, 0x53, 0xb0, - 0xe7, 0x58, 0x90, 0x2c, 0xc6, 0x49, 0xfc, 0x58, 0xde, 0x53, 0x2b, 0xdd, 0x68, 0xfc, 0xc1, 0xca, - 0xab, 0x77, 0xa9, 0xe2, 0xd1, 0x00, 0x9a, 0x31, 0x15, 0x24, 0x3b, 0xc6, 0x89, 0xba, 0xc6, 0xec, - 0xe1, 0xd9, 0xca, 0xda, 0x2f, 0x0a, 0x97, 0xb7, 0x04, 0xf5, 0xee, 0x42, 0xfb, 0x16, 0x0b, 0xf8, - 0x0d, 0x1a, 0x8e, 0x16, 0x82, 0x70, 0x74, 0x11, 0x20, 0x64, 0x41, 0x59, 0x20, 0x49, 0x93, 0xe9, - 0xb5, 0xa4, 0x45, 0x57, 0x66, 0x1b, 0xec, 0x89, 0xc4, 0x15, 0xfe, 0x9a, 0xf2, 0x83, 0x32, 0x29, - 0x40, 0xef, 0x47, 0x03, 0x36, 0x8a, 0x22, 0x8c, 0x62, 0x2a, 0xa5, 0x8c, 0x3e, 0x02, 0x2b, 0x8b, - 0xa3, 0x23, 0x51, 0x90, 0x5e, 0x2d, 0x57, 0x75, 0x6b, 0x4f, 0xa3, 0xd0, 0x65, 0xa8, 0xb3, 0x5c, - 0x14, 0x02, 0xf8, 0x47, 0xb0, 0xc4, 0xa0, 0xdb, 0x70, 0x26, 0xc1, 0x5c, 0xf8, 0x69, 0x3e, 0x49, - 0x62, 0x7e, 0x44, 0x42, 0x1f, 0x8b, 0xd7, 0x10, 0xc4, 0xa6, 0x5c, 0x74, 0xaf, 0x5c, 0x73, 0x43, - 0xf4, 0xbe, 0x82, 0x76, 0x95, 0xb8, 0x53, 0xda, 0x65, 0x50, 0x6d, 0x17, 0x7b, 0xf8, 0xee, 0x49, - 0xca, 0x8b, 0xd3, 0x56, 0x3a, 0xa9, 0xf7, 0xbd, 0x09, 0x0d, 0x4d, 0x2f, 0xba, 0x01, 0x20, 0x32, - 0x4c, 0xf9, 0x94, 0x65, 0x73, 0xee, 0x18, 0x8a, 0xd5, 0x4b, 0x27, 0x54, 0xe0, 0x8e, 0x97, 0x18, - 0xcd, 0x6d, 0x65, 0x11, 0xfa, 0x18, 0x5a, 0xcb, 0x73, 0xbe, 0xaa, 0x3a, 0x2b, 0x64, 0x59, 0xce, - 0xfa, 0xdb, 0x96, 0xd3, 0x7c, 0xf3, 0x72, 0xfe, 0x64, 0x40, 0x6b, 0x79, 0x12, 0xf4, 0x7f, 0x68, - 0x14, 0xef, 0xb6, 0xae, 0x67, 0x31, 0x93, 0xb2, 0x88, 0x69, 0xfa, 0x6a, 0xa6, 0x35, 0x0a, 0x1d, - 0x82, 0xa3, 0x92, 0xd3, 0xab, 0xdf, 0x94, 0xf2, 0xff, 0xc9, 0xb5, 0xfa, 0x5b, 0xa1, 0x92, 0xa9, - 0xfc, 0x6c, 0xd0, 0x72, 0x9e, 0x90, 0x23, 0xf9, 0x36, 0x9a, 0x4a, 0xcf, 0x5a, 0xe2, 0x23, 0x65, - 0xea, 0xdd, 0x87, 0xcd, 0x17, 0x58, 0x39, 0x45, 0x1e, 0x57, 0x9f, 0x97, 0xc7, 0xf9, 0x97, 0x30, - 0x5b, 0x15, 0xc8, 0xb7, 0x35, 0x40, 0x95, 0x7e, 0x2e, 0x1b, 0x66, 0x0f, 0xcc, 0x84, 0x4c, 0x5f, - 0xd9, 0x2f, 0x0a, 0xb4, 0xea, 0xae, 0xda, 0x9b, 0x74, 0xd7, 0xeb, 0xc8, 0xe1, 0x65, 0x15, 0x37, - 0xff, 0xad, 0x8a, 0x5b, 0x27, 0x2b, 0xfe, 0x35, 0x74, 0x5f, 0xbc, 0xe4, 0x4e, 0x29, 0xf9, 0xb5, - 0xe7, 0x4b, 0x7e, 0xf1, 0xf4, 0x2b, 0xf2, 0x94, 0xae, 0xbc, 0x07, 0xcd, 0xf2, 0x1e, 0x44, 0x1f, - 0xc0, 0x46, 0x9e, 0xca, 0xb7, 0xa4, 0x7c, 0x18, 0xd4, 0x0e, 0x1d, 0xaf, 0xa3, 0xad, 0xc5, 0xa3, - 0x20, 0x2f, 0xc5, 0x5c, 0xbe, 0xc2, 0x7e, 0x86, 0x85, 0xde, 0xb0, 0xe6, 0xb5, 0x94, 0xc5, 0xc3, - 0x82, 0x5c, 0xb9, 0x0b, 0xcd, 0xf2, 0x43, 0x0a, 0x75, 0xa1, 0x1d, 0xd3, 0x63, 0x9c, 0xc4, 0xa1, - 0xfa, 0x96, 0xec, 0xae, 0x21, 0x7b, 0xf9, 0x5e, 0x74, 0x0d, 0xb4, 0x01, 0xa0, 0x2e, 0x7a, 0x2c, - 0x62, 0x26, 0x5f, 0xce, 0xb3, 0xb0, 0xb9, 0xba, 0xbe, 0xb5, 0xb1, 0x3e, 0xba, 0xfe, 0xe4, 0x8f, - 0xad, 0xb5, 0x27, 0xcf, 0xb6, 0x8c, 0x5f, 0x9e, 0x6d, 0x19, 0x3f, 0xfc, 0xb9, 0x65, 0xdc, 0xdf, - 0x8d, 0x62, 0x71, 0x94, 0x4f, 0xdc, 0x80, 0xcd, 0x07, 0x84, 0x8b, 0x1c, 0x67, 0x0b, 0xfd, 0xe9, - 0xff, 0xe2, 0xaf, 0x85, 0x49, 0x43, 0x4d, 0xaf, 0xfd, 0x1d, 0x00, 0x00, 0xff, 0xff, 0xe0, 0xa8, - 0x19, 0xc5, 0x48, 0x0c, 0x00, 0x00, + // 1335 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x57, 0xc1, 0x6e, 0xdb, 0x46, + 0x13, 0x36, 0x25, 0x52, 0x96, 0x86, 0x92, 0xad, 0x6c, 0xf2, 0xff, 0x3f, 0x7f, 0x25, 0xb1, 0x1d, + 0xb7, 0x05, 0x9c, 0xb8, 0xa5, 0x10, 0x05, 0x05, 0x82, 0x00, 0x05, 0x1a, 0x25, 0x0d, 0xd0, 0xc0, + 0x0e, 0x52, 0x5a, 0xbd, 0x04, 0x28, 0x88, 0x15, 0xb9, 0xa2, 0x19, 0x51, 0xbb, 0x04, 0x77, 0xe9, + 0x44, 0x79, 0x84, 0x5e, 0x7b, 0xe9, 0xa9, 0xe8, 0x33, 0xf4, 0x15, 0x7a, 0x09, 0x7a, 0x28, 0xfa, + 0x00, 0x45, 0x8a, 0xa6, 0x6f, 0xd1, 0x53, 0xb1, 0xbb, 0xa4, 0x44, 0xc7, 0x6e, 0x9c, 0x04, 0x05, + 0x7a, 0xb0, 0xb1, 0x3b, 0xf3, 0xcd, 0xec, 0xec, 0xcc, 0x37, 0xb3, 0x14, 0xf4, 0x22, 0xd6, 0x4f, + 0x33, 0x26, 0x58, 0xc0, 0x12, 0xde, 0x67, 0xa9, 0xfa, 0x73, 0x95, 0x04, 0xd5, 0x59, 0xca, 0x7b, + 0x97, 0x8e, 0x01, 0x26, 0x09, 0x7b, 0xa2, 0xfe, 0x69, 0x48, 0xef, 0x42, 0xc4, 0x22, 0xa6, 0x96, + 0x7d, 0xb9, 0x2a, 0xa4, 0x9b, 0x11, 0x63, 0x51, 0x42, 0xb4, 0xdd, 0x38, 0x9f, 0xf4, 0x45, 0x3c, + 0x23, 0x5c, 0xe0, 0x59, 0xaa, 0x01, 0xdb, 0x3f, 0x9a, 0xd0, 0x39, 0x38, 0xc4, 0x59, 0xb8, 0x87, + 0xc7, 0x24, 0x89, 0x69, 0x84, 0x2e, 0x80, 0x35, 0xce, 0xe3, 0x24, 0x74, 0x8c, 0x2d, 0x63, 0xa7, + 0xe5, 0xe9, 0x0d, 0xea, 0x41, 0xf3, 0x90, 0x71, 0x41, 0xf1, 0x8c, 0x38, 0x35, 0xa5, 0x58, 0xec, + 0xd1, 0x2e, 0xb4, 0x12, 0x16, 0xf9, 0x09, 0x39, 0x22, 0x89, 0x53, 0xdf, 0x32, 0x76, 0xd6, 0x06, + 0x6b, 0xae, 0x0c, 0x7e, 0x8f, 0x45, 0xee, 0x9e, 0x94, 0x7a, 0xcd, 0x84, 0x45, 0x6a, 0x85, 0x76, + 0xc1, 0xca, 0x30, 0x8d, 0x88, 0x63, 0x6d, 0x19, 0x3b, 0xf6, 0x60, 0xdd, 0x55, 0x77, 0xf0, 0xa4, + 0xe8, 0x20, 0x25, 0xc1, 0xd0, 0x7c, 0xfe, 0x62, 0x73, 0xc5, 0xd3, 0x18, 0x74, 0x05, 0xda, 0x3c, + 0x4d, 0x62, 0xe1, 0x73, 0x96, 0x67, 0x01, 0x71, 0x1a, 0xea, 0x64, 0x5b, 0xc9, 0x0e, 0x94, 0x68, + 0x09, 0x11, 0x38, 0x8b, 0x88, 0x70, 0x56, 0x2b, 0x90, 0x91, 0x12, 0xa1, 0x8b, 0xd0, 0x12, 0x98, + 0x4f, 0x7d, 0x15, 0x7c, 0x53, 0x07, 0x2f, 0x05, 0x0f, 0x64, 0xf0, 0xd7, 0x0a, 0xa5, 0x98, 0xa7, + 0xc4, 0x69, 0xa9, 0xe0, 0x3b, 0x2a, 0xf8, 0x11, 0xe6, 0xd3, 0xd1, 0x3c, 0x25, 0x1a, 0x2b, 0x57, + 0xc8, 0x83, 0x76, 0xc2, 0x22, 0xee, 0x3f, 0x66, 0x79, 0x46, 0x71, 0xe2, 0x80, 0xf4, 0x35, 0xec, + 0xff, 0xf9, 0x62, 0x73, 0x37, 0x62, 0x6e, 0x84, 0x9f, 0x11, 0x21, 0x88, 0x1b, 0x92, 0xa3, 0x7e, + 0xc0, 0x32, 0xd2, 0x1f, 0x67, 0x6c, 0x4a, 0xb2, 0x45, 0xcd, 0xdc, 0xfb, 0xda, 0xcc, 0xb3, 0xa5, + 0x93, 0x62, 0x83, 0x46, 0xd0, 0xe1, 0x02, 0x8b, 0xa5, 0x53, 0xfb, 0xdd, 0x9c, 0xb6, 0x95, 0x97, + 0xd2, 0xeb, 0x0d, 0xb0, 0x26, 0x09, 0x8e, 0xb8, 0xd3, 0xde, 0xaa, 0xef, 0xd8, 0x83, 0xcb, 0xea, + 0x46, 0xc7, 0xea, 0xec, 0xde, 0x93, 0xfa, 0xcf, 0xa8, 0xc8, 0xe6, 0x9e, 0xc6, 0xf6, 0x6e, 0x02, + 0x2c, 0x85, 0xa8, 0x0b, 0xf5, 0x29, 0x99, 0x17, 0x2c, 0x90, 0x4b, 0xc9, 0x8c, 0x23, 0x9c, 0xe4, + 0x25, 0x01, 0xf4, 0xe6, 0x56, 0xed, 0xa6, 0xb1, 0xfd, 0xad, 0x01, 0x4d, 0xe5, 0xdd, 0x23, 0x13, + 0x74, 0x05, 0xcc, 0x69, 0x4c, 0x35, 0x7f, 0x4e, 0x24, 0x53, 0xa9, 0x10, 0x02, 0xb3, 0xc2, 0x24, + 0xb5, 0x96, 0x55, 0x9a, 0x92, 0xb9, 0x3f, 0x26, 0x51, 0x4c, 0x15, 0x8b, 0x5a, 0x5e, 0x73, 0x4a, + 0xe6, 0x43, 0xb9, 0x47, 0xdb, 0xd0, 0xc9, 0xfc, 0x20, 0x61, 0xc1, 0xb4, 0x00, 0x98, 0xba, 0xcc, + 0xd9, 0x1d, 0x29, 0xd3, 0x98, 0x05, 0x71, 0xad, 0x0a, 0x71, 0xb7, 0x7b, 0x60, 0xee, 0x13, 0x81, + 0xe5, 0x91, 0x79, 0x1e, 0x97, 0xac, 0x56, 0xeb, 0xed, 0x9f, 0xeb, 0x50, 0xdf, 0x63, 0x11, 0xda, + 0x00, 0x73, 0x46, 0x04, 0x56, 0x3a, 0x7b, 0xd0, 0x52, 0x11, 0x4b, 0x23, 0xcf, 0xf2, 0xa5, 0x1c, + 0xbd, 0x07, 0x16, 0x97, 0xb7, 0x53, 0xf1, 0xda, 0xc5, 0x95, 0xca, 0xfb, 0x7a, 0x5a, 0x87, 0xae, + 0x43, 0x6b, 0xd1, 0x5c, 0x2a, 0x7e, 0x7b, 0xd0, 0x73, 0x75, 0xfb, 0xb9, 0x65, 0xfb, 0xb9, 0xa3, + 0x12, 0xe1, 0xd5, 0x04, 0x47, 0xef, 0x83, 0xa5, 0x9b, 0xc6, 0x3c, 0xb5, 0x69, 0xb4, 0x12, 0x39, + 0xb0, 0x3a, 0x23, 0x9c, 0xe3, 0xa2, 0x67, 0x5a, 0x5e, 0xb9, 0x45, 0x8f, 0x60, 0x7d, 0x12, 0x93, + 0x24, 0xe4, 0xfe, 0x63, 0xce, 0xa8, 0x3f, 0xc3, 0xa9, 0xd3, 0x50, 0xf5, 0xbe, 0xb8, 0xf0, 0x74, + 0x4f, 0xe9, 0xef, 0x73, 0x46, 0xf7, 0x71, 0xaa, 0x0a, 0x3b, 0xbc, 0xf4, 0xf5, 0x6f, 0x9b, 0x0e, + 0xa1, 0x01, 0x0b, 0x63, 0x1a, 0xf5, 0xa5, 0xa5, 0xeb, 0xe1, 0x27, 0xfb, 0xda, 0xa7, 0xd7, 0xd0, + 0x1e, 0xd1, 0x06, 0x58, 0x3c, 0xc5, 0x94, 0x3b, 0xab, 0xca, 0x63, 0xb3, 0xf4, 0xe8, 0x69, 0x71, + 0xef, 0x53, 0x40, 0x27, 0x7d, 0x9f, 0x45, 0x9a, 0x76, 0x95, 0x34, 0x5f, 0x80, 0xa5, 0x47, 0xc2, + 0x79, 0x58, 0xcf, 0x69, 0x48, 0x26, 0x31, 0x25, 0xa1, 0x9e, 0x22, 0xdd, 0x15, 0xd4, 0x02, 0x8b, + 0x64, 0x19, 0xcb, 0xba, 0x06, 0x6a, 0x82, 0xf9, 0x04, 0x67, 0xb4, 0x5b, 0x93, 0xab, 0x98, 0x4e, + 0x58, 0xb7, 0x2e, 0xd5, 0x21, 0x19, 0xe7, 0x51, 0xd7, 0x94, 0x4b, 0x91, 0xe1, 0x80, 0x74, 0xad, + 0xed, 0x5f, 0x3b, 0x60, 0x1d, 0xc8, 0x3e, 0xf8, 0xd7, 0x4a, 0xfa, 0x21, 0x20, 0x96, 0x12, 0xea, + 0x73, 0x12, 0x30, 0x1a, 0x72, 0x5f, 0x30, 0x81, 0x75, 0x7d, 0x0d, 0xaf, 0x2b, 0x35, 0x07, 0x5a, + 0x31, 0x92, 0x72, 0x35, 0x99, 0x9e, 0x52, 0x3f, 0x60, 0x39, 0x15, 0xaa, 0xb8, 0x1d, 0xaf, 0x29, + 0x9e, 0xd2, 0x3b, 0x72, 0x8f, 0xae, 0xc3, 0x6a, 0x80, 0x53, 0x91, 0x67, 0xa4, 0xa8, 0xea, 0xff, + 0x74, 0x90, 0xf2, 0x7e, 0xee, 0x1d, 0xad, 0xd1, 0xfd, 0x5b, 0xe2, 0xd0, 0x55, 0x68, 0x84, 0x24, + 0x8b, 0x8f, 0x88, 0x1a, 0x83, 0xf6, 0xe0, 0x5c, 0xc5, 0xe2, 0xae, 0x52, 0x78, 0x05, 0x00, 0x7d, + 0x02, 0xf6, 0x0c, 0x0b, 0x92, 0xc5, 0x38, 0x89, 0x9f, 0xc9, 0xb1, 0xb8, 0xe4, 0x8d, 0xc6, 0xef, + 0x2f, 0xb5, 0xfa, 0x94, 0x2a, 0x1e, 0xf5, 0xa1, 0x19, 0x53, 0x41, 0xb2, 0x23, 0x9c, 0xa8, 0xa9, + 0x69, 0x0f, 0xce, 0x57, 0x6c, 0x3f, 0x2f, 0x54, 0xde, 0x02, 0xd4, 0x7b, 0x00, 0xed, 0xbb, 0x2c, + 0xe0, 0xb7, 0x69, 0x38, 0x9c, 0x0b, 0xc2, 0xd1, 0x65, 0x80, 0x90, 0x05, 0x65, 0x82, 0x64, 0x99, + 0x4c, 0xaf, 0x25, 0x25, 0x3a, 0x33, 0x9b, 0x60, 0x8f, 0x25, 0xae, 0xd0, 0xd7, 0x94, 0x1e, 0x94, + 0x48, 0x01, 0x7a, 0x3f, 0x18, 0xb0, 0x56, 0x24, 0x61, 0x18, 0x53, 0x49, 0x65, 0xf4, 0x11, 0x58, + 0x59, 0x1c, 0x1d, 0x8a, 0xa2, 0xe8, 0xd5, 0x74, 0x55, 0x8f, 0xf6, 0x34, 0x0a, 0x5d, 0x85, 0x3a, + 0xcb, 0x45, 0x41, 0x80, 0xbf, 0x05, 0x4b, 0x0c, 0xba, 0x07, 0xe7, 0x12, 0xcc, 0x85, 0x9f, 0xe6, + 0xe3, 0x24, 0xe6, 0x87, 0x24, 0xf4, 0xb1, 0x78, 0x03, 0x42, 0xac, 0x4b, 0xa3, 0x87, 0xa5, 0xcd, + 0x6d, 0xd1, 0xfb, 0x12, 0xda, 0xd5, 0xc2, 0x9d, 0xd2, 0x2e, 0xfd, 0x6a, 0xbb, 0xd8, 0x83, 0xff, + 0x9f, 0x2c, 0x79, 0x71, 0xdb, 0x4a, 0x27, 0xf5, 0xbe, 0x33, 0xa1, 0xa1, 0xcb, 0x8b, 0x6e, 0x03, + 0x88, 0x0c, 0x53, 0x3e, 0x61, 0xd9, 0x8c, 0x3b, 0x86, 0xaa, 0xea, 0x95, 0x13, 0x2c, 0x70, 0x47, + 0x0b, 0x8c, 0xae, 0x6d, 0xc5, 0x08, 0x7d, 0x0c, 0xad, 0xc5, 0x3d, 0xcf, 0xca, 0xce, 0x12, 0x59, + 0xa6, 0xb3, 0xfe, 0xae, 0xe9, 0x34, 0xdf, 0x3e, 0x9d, 0x3f, 0x19, 0xd0, 0x5a, 0xdc, 0x04, 0xfd, + 0x17, 0x1a, 0xc5, 0x67, 0x82, 0xce, 0x67, 0xb1, 0x93, 0xb4, 0x88, 0x69, 0x7a, 0x76, 0xa5, 0x35, + 0x0a, 0x1d, 0x80, 0xa3, 0x82, 0xd3, 0xd6, 0x6f, 0x5b, 0xf2, 0xff, 0x48, 0x5b, 0xfd, 0x69, 0x52, + 0x89, 0x54, 0x7e, 0xa5, 0x68, 0x3a, 0x8f, 0xc9, 0xa1, 0x7c, 0x1b, 0x4d, 0xc5, 0x67, 0x4d, 0xf1, + 0xa1, 0x12, 0xf5, 0x1e, 0xc1, 0xfa, 0x2b, 0x55, 0x39, 0x85, 0x1e, 0xd7, 0x8f, 0xd3, 0xe3, 0xe2, + 0x6b, 0x2a, 0x5b, 0x25, 0xc8, 0x37, 0x35, 0x40, 0x95, 0x7e, 0x2e, 0x1b, 0x66, 0x17, 0xcc, 0x84, + 0x4c, 0xce, 0xec, 0x17, 0x05, 0x5a, 0x76, 0x57, 0xed, 0x6d, 0xba, 0xeb, 0x4d, 0xe8, 0xf0, 0xba, + 0x8c, 0x9b, 0xff, 0x54, 0xc6, 0xad, 0x93, 0x19, 0xff, 0x0a, 0xba, 0xaf, 0x0e, 0xb9, 0x53, 0x52, + 0x7e, 0xe3, 0x78, 0xca, 0x2f, 0x9f, 0x3e, 0x22, 0x4f, 0xe9, 0xca, 0x87, 0xd0, 0x2c, 0xe7, 0x20, + 0xfa, 0x00, 0xd6, 0xf2, 0x54, 0xbe, 0x25, 0xe5, 0xc3, 0xa0, 0x4e, 0xe8, 0x78, 0x1d, 0x2d, 0x2d, + 0x1e, 0x05, 0x39, 0x14, 0x73, 0xf9, 0x0a, 0xfb, 0x19, 0x16, 0xfa, 0xc0, 0x9a, 0xd7, 0x52, 0x12, + 0x0f, 0x0b, 0x72, 0xed, 0x01, 0x34, 0xcb, 0x0f, 0x29, 0xd4, 0x85, 0x76, 0x4c, 0x8f, 0x70, 0x12, + 0x87, 0xea, 0xd3, 0xb5, 0xbb, 0x82, 0xec, 0xc5, 0x7b, 0xd1, 0x35, 0xd0, 0x1a, 0x80, 0x1a, 0xf4, + 0x58, 0xc4, 0x4c, 0xbe, 0x9c, 0xe7, 0x61, 0x7d, 0x39, 0xbe, 0xb5, 0xb0, 0x3e, 0xbc, 0xf5, 0xfc, + 0xf7, 0x8d, 0x95, 0xe7, 0x2f, 0x37, 0x8c, 0x5f, 0x5e, 0x6e, 0x18, 0xdf, 0xff, 0xb1, 0x61, 0x3c, + 0xda, 0x89, 0x62, 0x71, 0x98, 0x8f, 0xdd, 0x80, 0xcd, 0xfa, 0x84, 0x8b, 0x1c, 0x67, 0x73, 0xfd, + 0x4b, 0xe3, 0xd5, 0x1f, 0x27, 0xe3, 0x86, 0xda, 0xde, 0xf8, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x3e, + 0xe5, 0x31, 0x21, 0xb7, 0x0c, 0x00, 0x00, } func (m *ShardLabeling) Marshal() (dAtA []byte, err error) { @@ -780,6 +785,25 @@ func (m *ShardLabeling) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Flags) > 0 { + for k := range m.Flags { + v := m.Flags[k] + baseI := i + i -= len(v) + copy(dAtA[i:], v) + i = encodeVarintOps(dAtA, i, uint64(len(v))) + i-- + dAtA[i] = 0x12 + i -= len(k) + copy(dAtA[i:], k) + i = encodeVarintOps(dAtA, i, uint64(len(k))) + i-- + dAtA[i] = 0xa + i = encodeVarintOps(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x62 + } + } if len(m.StatsJournal) > 0 { i -= len(m.StatsJournal) copy(dAtA[i:], m.StatsJournal) @@ -1629,6 +1653,14 @@ func (m *ShardLabeling) ProtoSize() (n int) { if l > 0 { n += 1 + l + sovOps(uint64(l)) } + if len(m.Flags) > 0 { + for k, v := range m.Flags { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovOps(uint64(len(k))) + 1 + len(v) + sovOps(uint64(len(v))) + n += mapEntrySize + 1 + sovOps(uint64(mapEntrySize)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -2280,6 +2312,133 @@ func (m *ShardLabeling) Unmarshal(dAtA []byte) error { } m.StatsJournal = go_gazette_dev_core_broker_protocol.Journal(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Flags", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthOps + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthOps + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Flags == nil { + m.Flags = make(map[string]string) + } + var mapkey string + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthOps + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthOps + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowOps + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthOps + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthOps + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipOps(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthOps + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Flags[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipOps(dAtA[iNdEx:]) diff --git a/go/protocols/ops/ops.proto b/go/protocols/ops/ops.proto index c748b8ec550..1f3fe704aea 100644 --- a/go/protocols/ops/ops.proto +++ b/go/protocols/ops/ops.proto @@ -46,6 +46,8 @@ message ShardLabeling { // Journal to which task stats are directed. string stats_journal = 11 [ (gogoproto.casttype) = "go.gazette.dev/core/broker/protocol.Journal" ]; + // Flags are arbitrary string-valued feature flags set on the task's shard template. + map flags = 12; } // Common `shard` sub-document logged by Stats and Log.