Skip to content

Commit d39eeb6

Browse files
authored
Revert ExecutionComplete early scheduling optimization (#1929)
* Revert "Early scheduling (#1904)". This reverts commit 85c279a.
* Revert Early Scheduling to fix the Build Panics
1 parent b4c8216 commit d39eeb6

File tree

11 files changed

+27
-347
lines changed

11 files changed

+27
-347
lines changed

nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ service WorkerApi {
5757

5858
/// Informs the scheduler about the result of an execution request.
5959
rpc ExecutionResponse(ExecuteResult) returns (google.protobuf.Empty);
60-
61-
/// Notify the scheduler that an execution request is in the upload phase
62-
/// and therefore a new execution may be scheduled.
63-
rpc ExecutionComplete(ExecuteComplete) returns (google.protobuf.Empty);
6460
}
6561

6662
/// Request object for keep alive requests.
@@ -127,21 +123,6 @@ message ExecuteResult {
127123
reserved 9; // NextId.
128124
}
129125

130-
/// A notification that an ExecutionRequest is in the upload phase.
131-
message ExecuteComplete {
132-
/// ID of the worker making the request.
133-
string worker_id = 1;
134-
135-
/// The `instance_name` this task was initially assigned to. This is set by the client
136-
/// that initially sent the job as part of the BRE protocol.
137-
string instance_name = 2;
138-
139-
/// The operation ID that was executed.
140-
string operation_id = 3;
141-
142-
reserved 4; // NextId.
143-
}
144-
145126
/// Result sent back from the server when a node connects.
146127
message ConnectionResult {
147128
/// The internal ID given to the newly connected node.

nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,6 @@ pub mod execute_result {
8686
InternalError(super::super::super::super::super::super::google::rpc::Status),
8787
}
8888
}
89-
/// / A notification that an ExecutionRequest is in the upload phase.
90-
#[derive(Clone, PartialEq, ::prost::Message)]
91-
pub struct ExecuteComplete {
92-
/// / ID of the worker making the request.
93-
#[prost(string, tag = "1")]
94-
pub worker_id: ::prost::alloc::string::String,
95-
/// / The `instance_name` this task was initially assigned to. This is set by the client
96-
/// / that initially sent the job as part of the BRE protocol.
97-
#[prost(string, tag = "2")]
98-
pub instance_name: ::prost::alloc::string::String,
99-
/// / The operation ID that was executed.
100-
#[prost(string, tag = "3")]
101-
pub operation_id: ::prost::alloc::string::String,
102-
}
10389
/// / Result sent back from the server when a node connects.
10490
#[derive(Clone, PartialEq, ::prost::Message)]
10591
pub struct ConnectionResult {
@@ -402,34 +388,6 @@ pub mod worker_api_client {
402388
);
403389
self.inner.unary(req, path, codec).await
404390
}
405-
/// / Notify the scheduler that an execution request is in the upload phase
406-
/// / and therefore a new execution may be scheduled.
407-
pub async fn execution_complete(
408-
&mut self,
409-
request: impl tonic::IntoRequest<super::ExecuteComplete>,
410-
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
411-
self.inner
412-
.ready()
413-
.await
414-
.map_err(|e| {
415-
tonic::Status::unknown(
416-
format!("Service was not ready: {}", e.into()),
417-
)
418-
})?;
419-
let codec = tonic::codec::ProstCodec::default();
420-
let path = http::uri::PathAndQuery::from_static(
421-
"/com.github.trace_machina.nativelink.remote_execution.WorkerApi/ExecutionComplete",
422-
);
423-
let mut req = request.into_request();
424-
req.extensions_mut()
425-
.insert(
426-
GrpcMethod::new(
427-
"com.github.trace_machina.nativelink.remote_execution.WorkerApi",
428-
"ExecutionComplete",
429-
),
430-
);
431-
self.inner.unary(req, path, codec).await
432-
}
433391
}
434392
}
435393
/// Generated server implementations.
@@ -491,12 +449,6 @@ pub mod worker_api_server {
491449
&self,
492450
request: tonic::Request<super::ExecuteResult>,
493451
) -> std::result::Result<tonic::Response<()>, tonic::Status>;
494-
/// / Notify the scheduler that an execution request is in the upload phase
495-
/// / and therefore a new execution may be scheduled.
496-
async fn execution_complete(
497-
&self,
498-
request: tonic::Request<super::ExecuteComplete>,
499-
) -> std::result::Result<tonic::Response<()>, tonic::Status>;
500452
}
501453
/// / This API describes how schedulers communicate with Worker nodes.
502454
/// /
@@ -760,51 +712,6 @@ pub mod worker_api_server {
760712
};
761713
Box::pin(fut)
762714
}
763-
"/com.github.trace_machina.nativelink.remote_execution.WorkerApi/ExecutionComplete" => {
764-
#[allow(non_camel_case_types)]
765-
struct ExecutionCompleteSvc<T: WorkerApi>(pub Arc<T>);
766-
impl<
767-
T: WorkerApi,
768-
> tonic::server::UnaryService<super::ExecuteComplete>
769-
for ExecutionCompleteSvc<T> {
770-
type Response = ();
771-
type Future = BoxFuture<
772-
tonic::Response<Self::Response>,
773-
tonic::Status,
774-
>;
775-
fn call(
776-
&mut self,
777-
request: tonic::Request<super::ExecuteComplete>,
778-
) -> Self::Future {
779-
let inner = Arc::clone(&self.0);
780-
let fut = async move {
781-
<T as WorkerApi>::execution_complete(&inner, request).await
782-
};
783-
Box::pin(fut)
784-
}
785-
}
786-
let accept_compression_encodings = self.accept_compression_encodings;
787-
let send_compression_encodings = self.send_compression_encodings;
788-
let max_decoding_message_size = self.max_decoding_message_size;
789-
let max_encoding_message_size = self.max_encoding_message_size;
790-
let inner = self.inner.clone();
791-
let fut = async move {
792-
let method = ExecutionCompleteSvc(inner);
793-
let codec = tonic::codec::ProstCodec::default();
794-
let mut grpc = tonic::server::Grpc::new(codec)
795-
.apply_compression_config(
796-
accept_compression_encodings,
797-
send_compression_encodings,
798-
)
799-
.apply_max_message_size_config(
800-
max_decoding_message_size,
801-
max_encoding_message_size,
802-
);
803-
let res = grpc.unary(method, req).await;
804-
Ok(res)
805-
};
806-
Box::pin(fut)
807-
}
808715
_ => {
809716
Box::pin(async move {
810717
let mut response = http::Response::new(

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 6 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use core::ops::{Deref, DerefMut};
16-
use std::collections::{HashMap, HashSet};
1716
use std::sync::Arc;
1817

1918
use async_lock::Mutex;
@@ -82,10 +81,6 @@ struct ApiWorkerSchedulerImpl {
8281
#[metric(group = "workers")]
8382
workers: Workers,
8483

85-
/// A set of operations that have notified completion, but are still
86-
/// uploading results.
87-
pending_results: HashMap<WorkerId, HashSet<OperationId>>,
88-
8984
/// The worker state manager.
9085
#[metric(group = "worker_state_manager")]
9186
worker_state_manager: Arc<dyn WorkerStateManager>,
@@ -173,7 +168,6 @@ impl ApiWorkerSchedulerImpl {
173168
/// running.
174169
fn remove_worker(&mut self, worker_id: &WorkerId) -> Option<Worker> {
175170
let result = self.workers.pop(worker_id);
176-
self.pending_results.remove(worker_id);
177171
self.worker_change_notify.notify_one();
178172
result
179173
}
@@ -247,17 +241,13 @@ impl ApiWorkerSchedulerImpl {
247241
})?;
248242

249243
// Ensure the worker is supposed to be running the operation.
250-
let pending_completion = !worker.running_action_infos.contains_key(operation_id);
251-
if pending_completion
252-
&& !self
253-
.pending_results
254-
.get(worker_id)
255-
.is_some_and(|pending_operations| pending_operations.contains(operation_id))
256-
{
257-
return Err(make_err!(
244+
if !worker.running_action_infos.contains_key(operation_id) {
245+
let err = make_err!(
258246
Code::Internal,
259247
"Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
260-
));
248+
);
249+
return Result::<(), _>::Err(err.clone())
250+
.merge(self.immediate_evict_worker(worker_id, err, false).await);
261251
}
262252

263253
let (is_finished, due_to_backpressure) = match &update {
@@ -293,32 +283,6 @@ impl ApiWorkerSchedulerImpl {
293283
return Ok(());
294284
}
295285

296-
if pending_completion {
297-
// This is absolutely always true, but pattern match anyway.
298-
if let Some(pending_operations) = self.pending_results.get_mut(worker_id) {
299-
pending_operations.remove(operation_id);
300-
if pending_operations.is_empty() {
301-
self.pending_results.remove(worker_id);
302-
}
303-
return Ok(());
304-
}
305-
}
306-
307-
Self::complete_worker_action(
308-
worker,
309-
operation_id,
310-
due_to_backpressure,
311-
&self.worker_change_notify,
312-
)
313-
.await
314-
}
315-
316-
async fn complete_worker_action(
317-
worker: &mut Worker,
318-
operation_id: &OperationId,
319-
due_to_backpressure: bool,
320-
notify: &Notify,
321-
) -> Result<(), Error> {
322286
// Clear this action from the current worker if finished.
323287
let complete_action_res = {
324288
let was_paused = !worker.can_accept_work();
@@ -333,43 +297,11 @@ impl ApiWorkerSchedulerImpl {
333297
complete_action_res
334298
};
335299

336-
notify.notify_one();
300+
self.worker_change_notify.notify_one();
337301

338302
complete_action_res
339303
}
340304

341-
async fn notify_complete(
342-
&mut self,
343-
worker_id: &WorkerId,
344-
operation_id: &OperationId,
345-
) -> Result<(), Error> {
346-
let worker = self.workers.get_mut(worker_id).err_tip(|| {
347-
format!("Worker {worker_id} does not exist in SimpleScheduler::notify_complete")
348-
})?;
349-
350-
// Ensure the worker is supposed to be running the operation.
351-
if !worker.running_action_infos.contains_key(operation_id) {
352-
let err = make_err!(
353-
Code::Internal,
354-
"Operation {operation_id} should not be running on worker {worker_id} in SimpleScheduler::update_action"
355-
);
356-
return Err(err);
357-
}
358-
359-
match self.pending_results.entry(worker_id.clone()) {
360-
std::collections::hash_map::Entry::Occupied(mut occupied_entry) => {
361-
occupied_entry.get_mut().insert(operation_id.clone());
362-
}
363-
std::collections::hash_map::Entry::Vacant(vacant_entry) => {
364-
vacant_entry
365-
.insert(HashSet::new())
366-
.insert(operation_id.clone());
367-
}
368-
}
369-
370-
Self::complete_worker_action(worker, operation_id, false, &self.worker_change_notify).await
371-
}
372-
373305
/// Notifies the specified worker to run the given action and handles errors by evicting
374306
/// the worker if the notification fails.
375307
async fn worker_notify_run_action(
@@ -476,7 +408,6 @@ impl ApiWorkerScheduler {
476408
Arc::new(Self {
477409
inner: Mutex::new(ApiWorkerSchedulerImpl {
478410
workers: Workers(LruCache::unbounded()),
479-
pending_results: HashMap::new(),
480411
worker_state_manager: worker_state_manager.clone(),
481412
allocation_strategy,
482413
worker_change_notify,
@@ -591,15 +522,6 @@ impl WorkerScheduler for ApiWorkerScheduler {
591522
inner.update_action(worker_id, operation_id, update).await
592523
}
593524

594-
async fn notify_complete(
595-
&self,
596-
worker_id: &WorkerId,
597-
operation_id: &OperationId,
598-
) -> Result<(), Error> {
599-
let mut inner = self.inner.lock().await;
600-
inner.notify_complete(worker_id, operation_id).await
601-
}
602-
603525
async fn worker_keep_alive_received(
604526
&self,
605527
worker_id: &WorkerId,

nativelink-scheduler/src/simple_scheduler.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -518,16 +518,6 @@ impl WorkerScheduler for SimpleScheduler {
518518
.await
519519
}
520520

521-
async fn notify_complete(
522-
&self,
523-
worker_id: &WorkerId,
524-
operation_id: &OperationId,
525-
) -> Result<(), Error> {
526-
self.worker_scheduler
527-
.notify_complete(worker_id, operation_id)
528-
.await
529-
}
530-
531521
async fn worker_keep_alive_received(
532522
&self,
533523
worker_id: &WorkerId,

nativelink-scheduler/src/worker_scheduler.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,6 @@ pub trait WorkerScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static
4040
update: UpdateOperationType,
4141
) -> Result<(), Error>;
4242

43-
/// Notify that the operation has completed execution, but not uploaded yet.
44-
async fn notify_complete(
45-
&self,
46-
worker_id: &WorkerId,
47-
operation_id: &OperationId,
48-
) -> Result<(), Error>;
49-
5043
/// Event for when the keep alive message was received from the worker.
5144
async fn worker_keep_alive_received(
5245
&self,

nativelink-service/src/worker_api_server.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::
2727
WorkerApi, WorkerApiServer as Server,
2828
};
2929
use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
30-
execute_result, ConnectWorkerRequest, ExecuteComplete, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker
30+
execute_result, ConnectWorkerRequest, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker
3131
};
3232
use nativelink_scheduler::worker::Worker;
3333
use nativelink_scheduler::worker_scheduler::WorkerScheduler;
@@ -258,19 +258,6 @@ impl WorkerApiServer {
258258
}
259259
Ok(Response::new(()))
260260
}
261-
262-
async fn inner_execution_complete(
263-
&self,
264-
execute_complete: ExecuteComplete,
265-
) -> Result<Response<()>, Error> {
266-
let worker_id: WorkerId = execute_complete.worker_id.into();
267-
let operation_id = OperationId::from(execute_complete.operation_id);
268-
self.scheduler
269-
.notify_complete(&worker_id, &operation_id)
270-
.await
271-
.err_tip(|| format!("Failed to operation {operation_id:?}"))?;
272-
Ok(Response::new(()))
273-
}
274261
}
275262

276263
#[tonic::async_trait]
@@ -344,20 +331,4 @@ impl WorkerApi for WorkerApiServer {
344331
.await
345332
.map_err(Into::into)
346333
}
347-
348-
#[instrument(
349-
err,
350-
ret(level = Level::DEBUG),
351-
level = Level::ERROR,
352-
skip_all,
353-
fields(request = ?grpc_request.get_ref())
354-
)]
355-
async fn execution_complete(
356-
&self,
357-
grpc_request: Request<ExecuteComplete>,
358-
) -> Result<Response<()>, Status> {
359-
self.inner_execution_complete(grpc_request.into_inner())
360-
.await
361-
.map_err(Into::into)
362-
}
363334
}

0 commit comments

Comments
 (0)