Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ pub fn shard_template(
read_channel_size,
ring_buffer_size,
log_level,
flags,
} = shard;

// We hard-code that recovery logs always have prefix "recovery".
Expand Down Expand Up @@ -453,6 +454,14 @@ pub fn shard_template(
}
}

for (name, value) in flags {
labels = labels::set_value(
labels,
&format!("{}{}", labels::FLAG_PREFIX, name.as_str()),
value,
);
}

consumer::ShardSpec {
id: shard_id_prefix.to_string(),
disable: *disable,
Expand Down
1 change: 1 addition & 0 deletions crates/labels/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub const BUILD: &str = "estuary.dev/build";
pub const COLLECTION: &str = "estuary.dev/collection";
pub const CORDON: &str = "estuary.dev/cordon";
pub const FIELD_PREFIX: &str = "estuary.dev/field/";
pub const FLAG_PREFIX: &str = "estuary.dev/flag/";
pub const KEY_BEGIN: &str = "estuary.dev/key-begin";
pub const KEY_BEGIN_MIN: &str = "00000000";
pub const KEY_END: &str = "estuary.dev/key-end";
Expand Down
47 changes: 41 additions & 6 deletions crates/labels/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub fn encode_labeling(mut set: LabelSet, labeling: &ops::ShardLabeling) -> Labe
set = set_value(set, crate::TASK_NAME, &labeling.task_name);
set = set_value(set, crate::TASK_TYPE, labeling.task_type().as_str_name());

for (name, value) in &labeling.flags {
set = set_value(set, &format!("{}{name}", crate::FLAG_PREFIX), value);
}

set = set_value(set, crate::LOGS_JOURNAL, &labeling.logs_journal);
set = set_value(set, crate::STATS_JOURNAL, &labeling.stats_journal);

Expand Down Expand Up @@ -70,6 +74,13 @@ pub fn decode_labeling(set: &LabelSet) -> Result<ops::ShardLabeling, Error> {
let logs_journal = maybe_one(set, crate::LOGS_JOURNAL)?.to_string();
let stats_journal = maybe_one(set, crate::STATS_JOURNAL)?.to_string();

let mut flags = std::collections::BTreeMap::new();
for label in &set.labels {
if let Some(name) = label.name.strip_prefix(crate::FLAG_PREFIX) {
flags.insert(name.to_string(), label.value.clone());
}
}

if !split_source.is_empty() && !split_target.is_empty() {
return Err(Error::SplitSourceAndTarget(
split_source.to_string(),
Expand All @@ -88,6 +99,7 @@ pub fn decode_labeling(set: &LabelSet) -> Result<ops::ShardLabeling, Error> {
task_type,
logs_journal,
stats_journal,
flags,
})
}

Expand Down Expand Up @@ -162,19 +174,32 @@ mod test {
split_target: "split/target".to_string(),
task_name: "task/name".to_string(),
task_type: ops::TaskType::Derivation as i32,
flags: [
("buffer-size".to_string(), "1024".to_string()),
("enable-new-thing".to_string(), "true".to_string()),
]
.into(),
logs_journal: "logs/journal".to_string(),
stats_journal: "stats/journal".to_string(),
};

let set = encode_labeling(LabelSet::default(), &labeling);

insta::assert_json_snapshot!(set, @r###"
insta::assert_json_snapshot!(set, @r#"
{
"labels": [
{
"name": "estuary.dev/build",
"value": "a-build"
},
{
"name": "estuary.dev/flag/buffer-size",
"value": "1024"
},
{
"name": "estuary.dev/flag/enable-new-thing",
"value": "true"
},
{
"name": "estuary.dev/hostname",
"value": "a.hostname"
Expand Down Expand Up @@ -225,7 +250,7 @@ mod test {
}
]
}
"###);
"#);

let id = format!("base/shard/id/{}", id_suffix(&set).unwrap());
assert_eq!(id, "base/shard/id/00000100-00000000");
Expand All @@ -242,6 +267,8 @@ mod test {
// All labels except SPLIT_TARGET set.
let model = build_set([
(crate::BUILD, "a-build"),
("estuary.dev/flag/buffer-size", "1024"),
("estuary.dev/flag/enable-new-thing", "true"),
(crate::HOSTNAME, "a.hostname"),
(crate::KEY_BEGIN, "00000001"),
(crate::KEY_END, "00000002"),
Expand All @@ -257,9 +284,13 @@ mod test {

insta::assert_json_snapshot!(
case(model.clone()),
@r###"
@r#"
{
"build": "a-build",
"flags": {
"buffer-size": "1024",
"enable-new-thing": "true"
},
"hostname": "a.hostname",
"logLevel": "info",
"logsJournal": "logs/journal",
Expand All @@ -274,7 +305,7 @@ mod test {
"taskName": "the/task",
"taskType": "capture"
}
"###
"#
);

// Optional labels removed & split target instead of source.
Expand All @@ -294,15 +325,19 @@ mod test {
set = crate::add_value(set, crate::SPLIT_TARGET, "split/target");

insta::assert_json_snapshot!(case(set),
@r###"
@r#"
{
"build": "a-build",
"flags": {
"buffer-size": "1024",
"enable-new-thing": "true"
},
"logLevel": "info",
"splitTarget": "split/target",
"taskName": "the/task",
"taskType": "capture"
}
"###
"#
);

// Expected label is missing.
Expand Down
12 changes: 11 additions & 1 deletion crates/models/src/shards.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::time::Duration;
use validator::Validate;

/// A ShardTemplate configures how shards process a catalog task.
#[derive(Serialize, Deserialize, Debug, Default, JsonSchema, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Default, JsonSchema, Clone, PartialEq, Validate)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
#[schemars(example = ShardTemplate::example())]
pub struct ShardTemplate {
Expand Down Expand Up @@ -77,6 +79,12 @@ pub struct ShardTemplate {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(with = "String")]
pub log_level: Option<String>,
/// # Flags are string-valued feature flags.
/// Flag names and values must be valid tokens (Unicode letters, numbers, '-', '_', '.').
/// Each flag produces a shard label `estuary.dev/flag/<name>`.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
#[validate(nested)]
pub flags: BTreeMap<super::Token, super::Token>,
}

impl ShardTemplate {
Expand All @@ -96,6 +104,7 @@ impl ShardTemplate {
ring_buffer_size: o4,
read_channel_size: o5,
log_level: o6,
flags,
} = self;

!disable
Expand All @@ -105,5 +114,6 @@ impl ShardTemplate {
&& o4.is_none()
&& o5.is_none()
&& o6.is_none()
&& flags.is_empty()
}
}
42 changes: 24 additions & 18 deletions crates/proto-flow/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/// ShardLabeling is a parsed and validated representation of the Flow
/// labels which are attached to Gazette ShardSpecs, that are understood
/// by the Flow runtime and influence its behavior with respect to the shard.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShardLabeling {
/// Catalog build identifier which the task uses.
#[prost(string, tag = "1")]
Expand Down Expand Up @@ -34,6 +34,12 @@ pub struct ShardLabeling {
/// Journal to which task stats are directed.
#[prost(string, tag = "11")]
pub stats_journal: ::prost::alloc::string::String,
/// Flags are arbitrary string-valued feature flags set on the task's shard template.
#[prost(btree_map = "string, string", tag = "12")]
pub flags: ::prost::alloc::collections::BTreeMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
}
/// Common `shard` sub-document logged by Stats and Log.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
Expand Down Expand Up @@ -183,23 +189,6 @@ pub mod stats {
#[prost(uint64, tag = "2")]
pub bytes_total: u64,
}
/// MaterializeBinding represents stats for a single binding of a materialization task.
///
/// Every field except `bytes_behind` is an optional sub-message and may be `None`.
/// The struct is `Copy`, so it can be passed by value.
///
/// NOTE(review): this has the shape of prost-generated code; if so, comments
/// added here belong in the originating `.proto` file — confirm before hand-editing.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct MaterializeBinding {
/// Optional `DocsAndBytes` counts, protobuf field 1.
/// NOTE(review): presumably documents loaded from the endpoint — TODO confirm.
#[prost(message, optional, tag = "1")]
pub left: ::core::option::Option<DocsAndBytes>,
/// Optional `DocsAndBytes` counts, protobuf field 2.
/// NOTE(review): presumably documents read from the source collection — TODO confirm.
#[prost(message, optional, tag = "2")]
pub right: ::core::option::Option<DocsAndBytes>,
/// Optional `DocsAndBytes` counts, protobuf field 3.
/// NOTE(review): presumably documents written out by this binding — TODO confirm.
#[prost(message, optional, tag = "3")]
pub out: ::core::option::Option<DocsAndBytes>,
/// The most recent publish timestamp from the source documents that were
/// read by this binding.
#[prost(message, optional, tag = "4")]
pub last_source_published_at: ::core::option::Option<::pbjson_types::Timestamp>,
/// Total bytes behind across all source journals of this binding.
#[prost(uint64, tag = "5")]
pub bytes_behind: u64,
}
/// CaptureBinding represents stats for a single binding of a capture task.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CaptureBinding {
Expand Down Expand Up @@ -248,6 +237,23 @@ pub mod stats {
pub bytes_behind: u64,
}
}
/// MaterializeBinding represents stats for a single binding of a materialization task.
///
/// Every field except `bytes_behind` is an optional sub-message and may be `None`.
/// The struct is `Copy`, so it can be passed by value.
///
/// NOTE(review): this has the shape of prost-generated code; if so, comments
/// added here belong in the originating `.proto` file — confirm before hand-editing.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct MaterializeBinding {
/// Optional `DocsAndBytes` counts, protobuf field 1.
/// NOTE(review): presumably documents loaded from the endpoint — TODO confirm.
#[prost(message, optional, tag = "1")]
pub left: ::core::option::Option<DocsAndBytes>,
/// Optional `DocsAndBytes` counts, protobuf field 2.
/// NOTE(review): presumably documents read from the source collection — TODO confirm.
#[prost(message, optional, tag = "2")]
pub right: ::core::option::Option<DocsAndBytes>,
/// Optional `DocsAndBytes` counts, protobuf field 3.
/// NOTE(review): presumably documents written out by this binding — TODO confirm.
#[prost(message, optional, tag = "3")]
pub out: ::core::option::Option<DocsAndBytes>,
/// The most recent publish timestamp from the source documents that were
/// read by this binding.
#[prost(message, optional, tag = "4")]
pub last_source_published_at: ::core::option::Option<::pbjson_types::Timestamp>,
/// Total bytes behind across all source journals of this binding.
#[prost(uint64, tag = "5")]
pub bytes_behind: u64,
}
/// Interval metrics are emitted at regular intervals.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Interval {
Expand Down
19 changes: 19 additions & 0 deletions crates/proto-flow/src/ops.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ impl serde::Serialize for ShardLabeling {
if !self.stats_journal.is_empty() {
len += 1;
}
if !self.flags.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("ops.ShardLabeling", len)?;
if !self.build.is_empty() {
struct_ser.serialize_field("build", &self.build)?;
Expand Down Expand Up @@ -446,6 +449,9 @@ impl serde::Serialize for ShardLabeling {
if !self.stats_journal.is_empty() {
struct_ser.serialize_field("statsJournal", &self.stats_journal)?;
}
if !self.flags.is_empty() {
struct_ser.serialize_field("flags", &self.flags)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -473,6 +479,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling {
"logsJournal",
"stats_journal",
"statsJournal",
"flags",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -487,6 +494,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling {
TaskType,
LogsJournal,
StatsJournal,
Flags,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -518,6 +526,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling {
"taskType" | "task_type" => Ok(GeneratedField::TaskType),
"logsJournal" | "logs_journal" => Ok(GeneratedField::LogsJournal),
"statsJournal" | "stats_journal" => Ok(GeneratedField::StatsJournal),
"flags" => Ok(GeneratedField::Flags),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -547,6 +556,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling {
let mut task_type__ = None;
let mut logs_journal__ = None;
let mut stats_journal__ = None;
let mut flags__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Build => {
Expand Down Expand Up @@ -609,6 +619,14 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling {
}
stats_journal__ = Some(map_.next_value()?);
}
GeneratedField::Flags => {
if flags__.is_some() {
return Err(serde::de::Error::duplicate_field("flags"));
}
flags__ = Some(
map_.next_value::<std::collections::BTreeMap<_, _>>()?
);
}
}
}
Ok(ShardLabeling {
Expand All @@ -622,6 +640,7 @@ impl<'de> serde::Deserialize<'de> for ShardLabeling {
task_type: task_type__.unwrap_or_default(),
logs_journal: logs_journal__.unwrap_or_default(),
stats_journal: stats_journal__.unwrap_or_default(),
flags: flags__.unwrap_or_default(),
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ fn ex_shard_labeling() -> ops::ShardLabeling {
split_target: String::new(),
task_name: "the/task/name".to_string(),
task_type: ops::TaskType::Derivation as i32,
flags: [
("buffer-size".to_string(), "1024".to_string()),
("enable-new-thing".to_string(), "true".to_string()),
]
.into(),
logs_journal: "ops/logs/one=capture/two=the%2Ftask%2Fname".to_string(),
stats_journal: "ops/stats/one=capture/two=the%2Ftask%2Fname".to_string(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ expression: json_test(msg)
"taskName": "the/task/name",
"taskType": "derivation",
"logsJournal": "ops/logs/one=capture/two=the%2Ftask%2Fname",
"statsJournal": "ops/stats/one=capture/two=the%2Ftask%2Fname"
"statsJournal": "ops/stats/one=capture/two=the%2Ftask%2Fname",
"flags": {
"buffer-size": "1024",
"enable-new-thing": "true"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,8 @@ expression: proto_test(msg)
|74686525 32467461 736b2532 466e616d| the%2Ftask%2Fnam 00000070
|655a2b6f 70732f73 74617473 2f6f6e65| eZ+ops/stats/one 00000080
|3d636170 74757265 2f74776f 3d746865| =capture/two=the 00000090
|25324674 61736b25 32466e61 6d65| %2Ftask%2Fname 000000a0
000000ae
|25324674 61736b25 32466e61 6d656213| %2Ftask%2Fnameb. 000000a0
|0a0b6275 66666572 2d73697a 65120431| ..buffer-size..1 000000b0
|30323462 180a1065 6e61626c 652d6e65| 024b...enable-ne 000000c0
|772d7468 696e6712 04747275 65| w-thing..true 000000d0
000000dd
3 changes: 3 additions & 0 deletions crates/proto-gazette/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ pub struct ConsumerSpec {
/// Maximum number of assigned Shards.
#[prost(uint32, tag = "2")]
pub shard_limit: u32,
/// When true, this consumer has been signaled to exit.
#[prost(bool, tag = "3")]
pub exiting: bool,
}
/// ReplicaStatus is the status of a ShardSpec assigned to a ConsumerSpec.
/// It serves as an allocator AssignmentValue. ReplicaStatus is reduced by taking
Expand Down
Loading
Loading