Ballista Executor report plan/operators metrics to Ballista Scheduler #124
Conversation
Please help review my PR.
andygrove
left a comment
This looks like a great improvement!
thinkharderdev
left a comment
Thanks @mingmwang, this is great! I had one nit and one more substantive comment.
println!(
    "=== [{}/{}/{}] Stage finished, physical plan with metrics ===\n{}\n",
    job_id,
    stage_id,
    partition,
    DisplayableExecutionPlan::with_full_metrics(stage_plan.as_ref()).indent()
);
There was a problem hiding this comment.
I'm not sure we should be printing to stdout here. If we want to print something it should be controlled by the log level (i.e. people can turn it off if it's too noisy)
There was a problem hiding this comment.
I will change it to use info! here. However, println! is also used in many other places, such as ExecutorMetricsCollector and execution_graph.resolve_shuffles().
task_statuses: vec![None; num_tasks],
output_link,
resolved,
stage_metrics: None,
There was a problem hiding this comment.
I'm not sure about storing the raw metrics in the ExecutionGraph. It could potentially be a lot of data (e.g., if you had a ParquetExec across many files, you could have a very big set of metrics), and we have to read/write the graph to the state store a lot.
I think it might be worth introducing a MetricStore trait where we can make the storage (or non-storage as the case may be) of metrics extensible.
There was a problem hiding this comment.
@thinkharderdev Actually, the stage_metrics stored here are not the raw metrics. They are combined/aggregated across all the tasks of that stage, which is why I named the field stage_metrics rather than task metrics.
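To illustrate what "combined/aggregated across all the tasks" can mean, here is a minimal sketch that folds per-task counters into one stage-level map by summing values with the same name. The `TaskMetrics` alias and the `merge_into_stage` and `aggregate_stage` functions are hypothetical names invented for this illustration. The PR itself works with DataFusion's metric types, so this only shows the general shape of the idea.

```rust
use std::collections::BTreeMap;

/// Hypothetical representation of one task's metrics: metric name -> value.
pub type TaskMetrics = BTreeMap<String, u64>;

/// Fold one task's metrics into the running stage-level totals.
/// Metrics with the same name are summed; new names are inserted.
pub fn merge_into_stage(stage: &mut TaskMetrics, task: &TaskMetrics) {
    for (name, value) in task {
        *stage.entry(name.clone()).or_insert(0) += *value;
    }
}

/// Aggregate all task metrics of a stage into a single map.
/// The result grows with the number of distinct metric names,
/// not with the number of tasks.
pub fn aggregate_stage(tasks: &[TaskMetrics]) -> TaskMetrics {
    let mut stage = TaskMetrics::new();
    for task in tasks {
        merge_into_stage(&mut stage, task);
    }
    stage
}
```

The point of the sketch is the size argument made in the comment above: after aggregation the stored value is bounded by the number of distinct metrics per stage, however many tasks the stage has.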
There was a problem hiding this comment.
Right, that helps but I still think that storing the metrics in the ExecutionGraph is not the right way to go. Even aggregated we're adding more data to a value that has to be read/written/decoded/encoded a lot. The other problem I think is that ideally we want to discard the full ExecutionGraph soon after the job is completed to prevent unbounded growth in the state store. But metrics may be something that we wish to preserve for longer (or even indefinitely). In addition, I think this may very well be an area where extensibility could be important. The ExecutionGraph is an internal implementation detail, but users may have an interest in storing and analyzing metrics and may wish to "export" them to a different system of their choosing (RDBMS, etc). Having an interface that allows for that would be helpful.
There was a problem hiding this comment.
> Even aggregated we're adding more data to a value that has to be read/written/decoded/encoded a lot
I agree with you that the ExecutionGraph is not the perfect place to store metrics. But since there is no in-memory state store now, and I do not want to add another key space to sled/etcd to store the metrics, I just reuse the existing ExecutionGraph. I think it is not that heavy compared to the existing task_statuses array if we have thousands of tasks.
/// Status of each already scheduled task. If status is None, the partition has not yet been scheduled
pub(crate) task_statuses: Vec<Option<task_status::Status>>,
Ideally those metrics should be stored in memory, because currently they are only used for display purposes.
For long-term preservation, both the task status and the metrics should be persisted elsewhere.
There was a problem hiding this comment.
I think we should still have different stores: an in-memory state store, a persisted state store (sled/etcd), and a long-term persisted store (RDBMS, object store).
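The MetricStore idea raised in this thread could be sketched roughly as follows. Everything in the block is hypothetical: neither the `MetricStore` trait, the `OperatorMetric` struct, nor the two implementations exist in Ballista as far as this conversation shows. They only illustrate how storage (or non-storage) of metrics could be made pluggable and kept separate from the ExecutionGraph.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical aggregated metric for one operator of a stage.
#[derive(Debug, Clone, PartialEq)]
pub struct OperatorMetric {
    pub operator: String,
    pub name: String,
    pub value: u64,
}

/// Hypothetical extension point: where stage metrics go is decided by
/// the implementation, not by the ExecutionGraph.
pub trait MetricStore: Send + Sync {
    fn put_stage_metrics(&self, job_id: &str, stage_id: usize, metrics: Vec<OperatorMetric>);
    fn get_stage_metrics(&self, job_id: &str, stage_id: usize) -> Option<Vec<OperatorMetric>>;
}

/// In-memory store, suitable when metrics are only needed for display.
#[derive(Default)]
pub struct InMemoryMetricStore {
    inner: Mutex<HashMap<(String, usize), Vec<OperatorMetric>>>,
}

impl MetricStore for InMemoryMetricStore {
    fn put_stage_metrics(&self, job_id: &str, stage_id: usize, metrics: Vec<OperatorMetric>) {
        let mut map = self.inner.lock().unwrap();
        map.insert((job_id.to_string(), stage_id), metrics);
    }

    fn get_stage_metrics(&self, job_id: &str, stage_id: usize) -> Option<Vec<OperatorMetric>> {
        let map = self.inner.lock().unwrap();
        map.get(&(job_id.to_string(), stage_id)).cloned()
    }
}

/// "Non-storage" implementation: metrics are dropped on arrival.
pub struct NoopMetricStore;

impl MetricStore for NoopMetricStore {
    fn put_stage_metrics(&self, _job_id: &str, _stage_id: usize, _metrics: Vec<OperatorMetric>) {}

    fn get_stage_metrics(&self, _job_id: &str, _stage_id: usize) -> Option<Vec<OperatorMetric>> {
        None
    }
}
```

Under this sketch, a sled/etcd-backed or RDBMS-backed store would be another implementation of the same trait, which is one way to get the three tiers listed above without touching the ExecutionGraph encoding.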
message NamedGauge {
  string name = 1;
  uint64 value = 2;
There was a problem hiding this comment.
In most metric systems, a gauge can be negative, so int64 would be a better choice here.
There was a problem hiding this comment.
Good point. But in the DataFusion metrics code, Gauge is backed by an AtomicUsize:
/// A gauge is the simplest metrics type. It just returns a value.
/// For example, you can easily expose current memory consumption with a gauge.
///
/// Note `clone`ing gauge update the same underlying metrics
#[derive(Debug, Clone)]
pub struct Gauge {
/// value of the metric gauge
value: std::sync::Arc<AtomicUsize>,
}
There was a problem hiding this comment.
@andygrove @alamb
Can we align the two sides?
A gauge should use a signed integer to store its value.
apache/datafusion#1682
There was a problem hiding this comment.
I would prefer to handle the type alignment in another PR.
There was a problem hiding this comment.
Changing Gauge in DataFusion to use AtomicI64 rather than AtomicUsize would make sense to me.
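The type mismatch discussed in this thread can be made concrete with a small standard-library sketch. The function names below are hypothetical and are not part of DataFusion or Ballista. The sketch shows that a `usize`-backed gauge maps onto a protobuf `uint64` without loss on 32-bit and 64-bit targets, that mapping it onto `int64` needs a checked conversion, and that only a signed backing type can represent a negative reading.

```rust
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

/// Convert a usize-backed gauge to the uint64 used in the proto message.
/// Assumes usize is at most 64 bits wide, which holds on common targets.
pub fn gauge_to_proto_u64(gauge: &AtomicUsize) -> u64 {
    gauge.load(Ordering::Relaxed) as u64
}

/// Convert a usize-backed gauge to int64. Returns None when the value
/// does not fit, instead of silently wrapping to a negative number.
pub fn gauge_to_proto_i64(gauge: &AtomicUsize) -> Option<i64> {
    i64::try_from(gauge.load(Ordering::Relaxed)).ok()
}

/// Apply a signed delta to an i64-backed gauge and return the new value.
/// This is what a signed gauge allows and an unsigned one does not.
pub fn adjust_signed_gauge(gauge: &AtomicI64, delta: i64) -> i64 {
    gauge.fetch_add(delta, Ordering::Relaxed) + delta
}
```

If the gauge type changes on the DataFusion side in a later PR, as suggested above, the proto field and the conversion would need to change together so that both ends agree on signedness.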
* Remove Keyspace::QueuedJobs
* Fix UT
* Fix cargo clippy for rust 1.63

Co-authored-by: yangzhong <yangzhong@ebay.com>
Are there any other comments or concerns to be resolved before merging this PR? I haven't been following Ballista closely, so I don't know who has been merging the PRs recently.
@andygrove @thinkharderdev
Which issue does this PR close?
Partially closes #116.
There are still some pending (nice-to-have) changes that need to be made to the DataFusion code base.
This PR covers the following parts:
rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?