Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added helm-releases/parseable-enterprise-2.6.7.tgz
Binary file not shown.
179 changes: 102 additions & 77 deletions index.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl AlertStateEntry {

// Create a sorted view without mutating the original
let mut sorted_states = self.states.clone();
sorted_states.sort_by(|a, b| a.last_updated_at.cmp(&b.last_updated_at));
sorted_states.sort_by_key(|a| a.last_updated_at);

for transition in &sorted_states {
match transition.state {
Expand Down
4 changes: 2 additions & 2 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1459,8 +1459,8 @@ impl AlertManagerTrait for Alerts {
// let alerts = self.alerts.read().await;
let mut tags = if let Some(alerts) = self.alerts.read().await.get(tenant) {
alerts
.iter()
.filter_map(|(_, alert)| alert.get_tags().as_ref())
.values()
.filter_map(|alert| alert.get_tags().as_ref())
.flat_map(|t| t.iter().cloned())
.collect::<Vec<String>>()
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,15 @@ pub async fn get_stats_date(
let events_ingested = EVENTS_INGESTED_DATE
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
.get();
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
.get();
let storage_size = EVENTS_STORAGE_SIZE_DATE
.get_metric_with_label_values(&storage_size_labels)
.unwrap()
.get() as u64;
.get();

let stats = Stats {
events: events_ingested,
Expand Down
43 changes: 20 additions & 23 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,29 +129,26 @@ impl Metrics {
for sample in samples {
if let PromValue::Gauge(val) = sample.value {
match sample.metric.as_str() {
"parseable_events_ingested_date" => {
"parseable_events_ingested_date"
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
events_ingested = val as u64;
}
&& sample.labels.get("date").expect("date is present") == date =>
{
events_ingested = val as u64;
}
"parseable_events_ingested_size_date" => {
"parseable_events_ingested_size_date"
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
ingestion_size = val as u64;
}
&& sample.labels.get("date").expect("date is present") == date =>
{
ingestion_size = val as u64;
}
"parseable_events_storage_size_date" => {
"parseable_events_storage_size_date"
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
storage_size = val as u64;
}
&& sample.labels.get("date").expect("date is present") == date =>
{
storage_size = val as u64;
}
_ => {}
}
Expand Down Expand Up @@ -198,15 +195,15 @@ impl Metrics {
prom_dress.parseable_storage_size.data += val;
}
}
"parseable_lifetime_events_storage_size" => {
if sample.labels.get("type").expect("type is present") == "data" {
prom_dress.parseable_lifetime_storage_size.data += val;
}
"parseable_lifetime_events_storage_size"
if sample.labels.get("type").expect("type is present") == "data" =>
{
prom_dress.parseable_lifetime_storage_size.data += val;
}
"parseable_deleted_events_storage_size" => {
if sample.labels.get("type").expect("type is present") == "data" {
prom_dress.parseable_deleted_storage_size.data += val;
}
"parseable_deleted_events_storage_size"
if sample.labels.get("type").expect("type is present") == "data" =>
{
prom_dress.parseable_deleted_storage_size.data += val;
}
_ => {}
}
Expand Down
92 changes: 30 additions & 62 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,26 @@ use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
use crate::storage::{ObjectStorageProvider, ObjectStoreFormat};
use crate::utils::time::TimeRange;

/// Boxed record-batch stream used as the streaming half of query results.
type BoxedBatchStream = Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<RecordBatch, datafusion::error::DataFusionError>,
> + Send,
>,
>,
>,
>,
>,
>;

/// Result type returned by query execution: either collected batches or a streaming adapter, plus field names.
type QueryResult = Result<(Either<Vec<RecordBatch>, BoxedBatchStream>, Vec<String>), ExecuteError>;

// pub static QUERY_SESSION: Lazy<SessionContext> =
// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

Expand Down Expand Up @@ -133,37 +153,7 @@ impl InMemorySessionContext {

/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
pub async fn execute(
query: Query,
is_streaming: bool,
tenant_id: &Option<String>,
) -> Result<
(
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
> {
pub async fn execute(query: Query, is_streaming: bool, tenant_id: &Option<String>) -> QueryResult {
let id = tenant_id.clone();
QUERY_RUNTIME
.spawn(async move { query.execute(is_streaming, &id).await })
Expand Down Expand Up @@ -272,37 +262,15 @@ impl Query {
/// this function returns the result of the query
/// if streaming is true, it returns a stream
/// if streaming is false, it returns a vector of record batches
pub async fn execute(
&self,
is_streaming: bool,
tenant_id: &Option<String>,
) -> Result<
(
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
> {
#[tracing::instrument(
name = "datafusion.execute",
skip(self, is_streaming, tenant_id),
fields(
db.system.name = "datafusion",
db.operation.name = "SELECT",
)
)]
pub async fn execute(&self, is_streaming: bool, tenant_id: &Option<String>) -> QueryResult {
let df = QUERY_SESSION
.get_ctx()
.execute_logical_plan(self.final_logical_plan(tenant_id))
Expand Down
8 changes: 4 additions & 4 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,10 +759,10 @@ fn is_overlapping_query(
for filter in time_filters {
match filter {
PartialTimeFilter::Low(Bound::Excluded(time))
| PartialTimeFilter::Low(Bound::Included(time)) => {
if time < &first_entry_lower_bound.naive_utc() {
return true;
}
| PartialTimeFilter::Low(Bound::Included(time))
if time < &first_entry_lower_bound.naive_utc() =>
{
return true;
}
_ => {}
}
Expand Down
Loading