From c66d8014060dfd1d49fc4cba79fa560a35494fd2 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Thu, 27 Apr 2023 12:22:58 -0400 Subject: [PATCH 1/3] handle conflict errors from run service --- src/Runner.Common/RunServer.cs | 3 +- src/Runner.Common/RunnerService.cs | 5 +- src/Runner.Listener/Runner.cs | 118 ++++++++++++++++------- src/Sdk/DTWebApi/WebApi/Exceptions.cs | 20 ++++ src/Sdk/RSWebApi/RunServiceHttpClient.cs | 2 + 5 files changed, 111 insertions(+), 37 deletions(-) diff --git a/src/Runner.Common/RunServer.cs b/src/Runner.Common/RunServer.cs index 4f344c0b070..3cc53f3d894 100644 --- a/src/Runner.Common/RunServer.cs +++ b/src/Runner.Common/RunServer.cs @@ -52,7 +52,8 @@ public Task GetJobMessageAsync(string id, CancellationTo { CheckConnection(); return RetryRequest( - async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken); + async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken, + shouldRetry: ex => ex is not TaskOrchestrationJobAlreadyAcquiredException); } public Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary outputs, IList stepResults, CancellationToken cancellationToken) diff --git a/src/Runner.Common/RunnerService.cs b/src/Runner.Common/RunnerService.cs index a0cb96a0ecf..f3165d7a1fa 100644 --- a/src/Runner.Common/RunnerService.cs +++ b/src/Runner.Common/RunnerService.cs @@ -83,7 +83,8 @@ async Task wrappedFunc() protected async Task RetryRequest(Func> func, CancellationToken cancellationToken, - int maxRetryAttemptsCount = 5 + int maxRetryAttemptsCount = 5, + Func shouldRetry = null ) { var retryCount = 0; @@ -96,7 +97,7 @@ protected async Task RetryRequest(Func> func, return await func(); } // TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122 - catch (Exception ex) when (retryCount < maxRetryAttemptsCount) + catch (Exception ex) when (retryCount < maxRetryAttemptsCount && (shouldRetry == null || shouldRetry(ex))) { Trace.Error("Catch exception during request"); Trace.Error(ex); diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 865c596a91e..8d399f5083a 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -370,7 +370,8 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) _term.WriteLine($"{DateTime.UtcNow:u}: Listening for Jobs"); IJobDispatcher jobDispatcher = null; - CancellationTokenSource messageQueueLoopTokenSource = CancellationTokenSource.CreateLinkedTokenSource(HostContext.RunnerShutdownToken); + CancellationTokenSource messageQueueLoopTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(HostContext.RunnerShutdownToken); // Should we try to cleanup ephemeral runners bool runOnceJobCompleted = false; @@ -394,17 +395,20 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) bool skipMessageDeletion = false; try { - Task getNextMessage = _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token); + Task getNextMessage = + _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token); if (autoUpdateInProgress) { - Trace.Verbose("Auto update task running at backend, waiting for getNextMessage or selfUpdateTask to finish."); + Trace.Verbose( + "Auto update task running at backend, waiting for getNextMessage or selfUpdateTask to finish."); Task completeTask = await Task.WhenAny(getNextMessage, selfUpdateTask); if (completeTask == selfUpdateTask) { autoUpdateInProgress = false; if (await selfUpdateTask) { - Trace.Info("Auto update task finished at backend, an runner update is ready to apply exit the current runner instance."); + Trace.Info( + "Auto update task finished at backend, an runner update is ready to apply exit the current runner instance."); Trace.Info("Stop message queue looping."); messageQueueLoopTokenSource.Cancel(); try @@ -427,19 +431,23 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) } else { - Trace.Info("Auto update task finished at backend, there is no available runner update needs to apply, continue message queue looping."); + Trace.Info( + "Auto update task finished at backend, there is no available runner update needs to apply, continue message queue looping."); } } } if (runOnceJobReceived) { - Trace.Verbose("One time used runner has start running its job, waiting for getNextMessage or the job to finish."); - Task completeTask = await Task.WhenAny(getNextMessage, jobDispatcher.RunOnceJobCompleted.Task); + Trace.Verbose( + "One time used runner has start running its job, waiting for getNextMessage or the job to finish."); + Task completeTask = await Task.WhenAny(getNextMessage, + jobDispatcher.RunOnceJobCompleted.Task); if (completeTask == jobDispatcher.RunOnceJobCompleted.Task) { runOnceJobCompleted = true; - Trace.Info("Job has finished at backend, the runner will exit since it is running under onetime use mode."); + Trace.Info( + "Job has finished at backend, the runner will exit since it is running under onetime use mode."); Trace.Info("Stop message queue looping."); messageQueueLoopTokenSource.Cancel(); try @@ -457,53 +465,73 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) message = await getNextMessage; //get next message HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}"); - if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase) || - string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, + StringComparison.OrdinalIgnoreCase) || + string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, + StringComparison.OrdinalIgnoreCase)) { if (autoUpdateInProgress == false) { autoUpdateInProgress = true; AgentRefreshMessage runnerUpdateMessage = null; - if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, + StringComparison.OrdinalIgnoreCase)) { runnerUpdateMessage = JsonUtility.FromString(message.Body); } else { - var brokerRunnerUpdateMessage = JsonUtility.FromString(message.Body); - runnerUpdateMessage = new AgentRefreshMessage(brokerRunnerUpdateMessage.RunnerId, brokerRunnerUpdateMessage.TargetVersion, TimeSpan.FromSeconds(brokerRunnerUpdateMessage.TimeoutInSeconds)); + var brokerRunnerUpdateMessage = + JsonUtility.FromString(message.Body); + runnerUpdateMessage = new AgentRefreshMessage( + brokerRunnerUpdateMessage.RunnerId, brokerRunnerUpdateMessage.TargetVersion, + TimeSpan.FromSeconds(brokerRunnerUpdateMessage.TimeoutInSeconds)); } #if DEBUG // Can mock the update for testing - if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE"))) + if (StringUtil.ConvertToBoolean( + Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE"))) { // The mock_update_messages.json file should be an object with keys being the current version and values being the targeted mock version object // Example: { "2.283.2": {"targetVersion":"2.284.1"}, "2.284.1": {"targetVersion":"2.285.0"}} - var mockUpdatesPath = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Root), "mock_update_messages.json"); + var mockUpdatesPath = + Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Root), + "mock_update_messages.json"); if (File.Exists(mockUpdatesPath)) { - var mockUpdateMessages = JsonUtility.FromString>(File.ReadAllText(mockUpdatesPath)); + var mockUpdateMessages = + JsonUtility.FromString>( + File.ReadAllText(mockUpdatesPath)); if (mockUpdateMessages.ContainsKey(BuildConstants.RunnerPackage.Version)) { - var mockTargetVersion = mockUpdateMessages[BuildConstants.RunnerPackage.Version].TargetVersion; - _term.WriteLine($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); - Trace.Info($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); - runnerUpdateMessage = new AgentRefreshMessage(runnerUpdateMessage.AgentId, mockTargetVersion, runnerUpdateMessage.Timeout); + var mockTargetVersion = + mockUpdateMessages[BuildConstants.RunnerPackage.Version] + .TargetVersion; + _term.WriteLine( + $"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); + Trace.Info( + $"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); + runnerUpdateMessage = + new AgentRefreshMessage(runnerUpdateMessage.AgentId, + mockTargetVersion, runnerUpdateMessage.Timeout); } } } #endif var selfUpdater = HostContext.GetService(); - selfUpdateTask = selfUpdater.SelfUpdate(runnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken); + selfUpdateTask = selfUpdater.SelfUpdate(runnerUpdateMessage, jobDispatcher, false, + HostContext.RunnerShutdownToken); Trace.Info("Refresh message received, kick-off selfupdate background process."); } else { - Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); + Trace.Info( + "Refresh message received, skip autoupdate since a previous autoupdate is already running."); } } - else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, + StringComparison.OrdinalIgnoreCase)) { if (autoUpdateInProgress || runOnceJobReceived) { @@ -512,8 +540,10 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) } else { - Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); - var jobMessage = StringUtil.ConvertFromJson(message.Body); + Trace.Info( + $"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); + var jobMessage = + StringUtil.ConvertFromJson(message.Body); jobDispatcher.Run(jobMessage, runOnce); if (runOnce) { @@ -543,13 +573,25 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) { var actionsRunServer = HostContext.CreateService(); await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); - jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); + jobRequestMessage = + await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, + messageQueueLoopTokenSource.Token); } else { var runServer = HostContext.CreateService(); await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); - jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); + try + { + jobRequestMessage = + await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, + messageQueueLoopTokenSource.Token); + } + catch (TaskOrchestrationJobAlreadyAcquiredException) + { + Trace.Info("Job is already acquired, skip this message."); + continue; + } } jobDispatcher.Run(jobRequestMessage, runOnce); @@ -560,7 +602,8 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) } } } - else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, + StringComparison.OrdinalIgnoreCase)) { var cancelJobMessage = JsonUtility.FromString(message.Body); bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage); @@ -568,20 +611,26 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) if (skipMessageDeletion) { - Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'."); + Trace.Info( + $"Skip message deletion for cancellation message '{message.MessageId}'."); } } - else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(message.MessageType, + Pipelines.HostedRunnerShutdownMessage.MessageType, + StringComparison.OrdinalIgnoreCase)) { - var HostedRunnerShutdownMessage = JsonUtility.FromString(message.Body); + var HostedRunnerShutdownMessage = + JsonUtility.FromString(message.Body); skipMessageDeletion = true; skipSessionDeletion = true; - Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); + Trace.Info( + $"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); return Constants.Runner.ReturnCode.Success; } else { - Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}."); + Trace.Error( + $"Received message {message.MessageId} with unsupported message type {message.MessageType}."); } } finally @@ -594,7 +643,8 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) } catch (Exception ex) { - Trace.Error($"Catch exception during delete message from message queue. message id: {message.MessageId}"); + Trace.Error( + $"Catch exception during delete message from message queue. message id: {message.MessageId}"); Trace.Error(ex); } finally diff --git a/src/Sdk/DTWebApi/WebApi/Exceptions.cs b/src/Sdk/DTWebApi/WebApi/Exceptions.cs index 3690bbe6bb0..92cea8d5962 100644 --- a/src/Sdk/DTWebApi/WebApi/Exceptions.cs +++ b/src/Sdk/DTWebApi/WebApi/Exceptions.cs @@ -1518,6 +1518,26 @@ private TaskOrchestrationJobNotFoundException(SerializationInfo info, StreamingC { } } + + [Serializable] + [ExceptionMapping("0.0", "3.0", "TaskOrchestrationJobAlreadyAcquiredException", "GitHub.DistributedTask.WebApi.TaskOrchestrationJobAlreadyAcquiredException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")] + public sealed class TaskOrchestrationJobAlreadyAcquiredException : DistributedTaskException + { + public TaskOrchestrationJobAlreadyAcquiredException(String message) + : base(message) + { + } + + public TaskOrchestrationJobAlreadyAcquiredException(String message, Exception innerException) + : base(message, innerException) + { + } + + private TaskOrchestrationJobAlreadyAcquiredException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } [Serializable] [ExceptionMapping("0.0", "3.0", "TaskOrchestrationPlanSecurityException", "GitHub.DistributedTask.WebApi.TaskOrchestrationPlanSecurityException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")] diff --git a/src/Sdk/RSWebApi/RunServiceHttpClient.cs b/src/Sdk/RSWebApi/RunServiceHttpClient.cs index 65259c5ba22..8ddd1672b04 100644 --- a/src/Sdk/RSWebApi/RunServiceHttpClient.cs +++ b/src/Sdk/RSWebApi/RunServiceHttpClient.cs @@ -86,6 +86,8 @@ public async Task GetJobMessageAsync( { case HttpStatusCode.NotFound: throw new TaskOrchestrationJobNotFoundException($"Job message not found: {messageId}"); + case HttpStatusCode.Conflict: + throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired: {messageId}"); default: throw new Exception($"Failed to get job message: {result.Error}"); } From e8883e467d0c5f7cb7f9ab480dcacbe03cfe1204 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Thu, 27 Apr 2023 12:32:08 -0400 Subject: [PATCH 2/3] nit: formatting --- src/Runner.Common/RunnerService.cs | 4 ++-- src/Runner.Listener/Runner.cs | 2 +- src/Sdk/DTWebApi/WebApi/Exceptions.cs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Runner.Common/RunnerService.cs b/src/Runner.Common/RunnerService.cs index f3165d7a1fa..4694af1ce2e 100644 --- a/src/Runner.Common/RunnerService.cs +++ b/src/Runner.Common/RunnerService.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading; using System.Threading.Tasks; using GitHub.Runner.Sdk; @@ -80,7 +80,7 @@ async Task wrappedFunc() } await RetryRequest(wrappedFunc, cancellationToken, maxRetryAttemptsCount); } - + protected async Task RetryRequest(Func> func, CancellationToken cancellationToken, int maxRetryAttemptsCount = 5, diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 8d399f5083a..b9e071370f9 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -585,7 +585,7 @@ await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, { jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, - messageQueueLoopTokenSource.Token); + messageQueueLoopTokenSource.Token); } catch (TaskOrchestrationJobAlreadyAcquiredException) { diff --git a/src/Sdk/DTWebApi/WebApi/Exceptions.cs b/src/Sdk/DTWebApi/WebApi/Exceptions.cs index 92cea8d5962..aed702b7e5c 100644 --- a/src/Sdk/DTWebApi/WebApi/Exceptions.cs +++ b/src/Sdk/DTWebApi/WebApi/Exceptions.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Runtime.Serialization; using GitHub.Services.Common; @@ -1518,7 +1518,7 @@ private TaskOrchestrationJobNotFoundException(SerializationInfo info, StreamingC { } } - + [Serializable] [ExceptionMapping("0.0", "3.0", "TaskOrchestrationJobAlreadyAcquiredException", "GitHub.DistributedTask.WebApi.TaskOrchestrationJobAlreadyAcquiredException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")] public sealed class TaskOrchestrationJobAlreadyAcquiredException : DistributedTaskException From f56b2f318bb054cf4c37bd3cbd6392d094ee0fb6 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Thu, 27 Apr 2023 12:43:18 -0400 Subject: [PATCH 3/3] fix formatting --- src/Runner.Listener/Runner.cs | 110 +++++++++++----------------------- 1 file changed, 35 insertions(+), 75 deletions(-) diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index b9e071370f9..e71ba9cb5b0 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -370,8 +370,7 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) _term.WriteLine($"{DateTime.UtcNow:u}: Listening for Jobs"); IJobDispatcher jobDispatcher = null; - CancellationTokenSource messageQueueLoopTokenSource = - CancellationTokenSource.CreateLinkedTokenSource(HostContext.RunnerShutdownToken); + CancellationTokenSource messageQueueLoopTokenSource = CancellationTokenSource.CreateLinkedTokenSource(HostContext.RunnerShutdownToken); // Should we try to cleanup ephemeral runners bool runOnceJobCompleted = false; @@ -395,20 +394,17 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) bool skipMessageDeletion = false; try { - Task getNextMessage = - _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token); + Task getNextMessage = _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token); if (autoUpdateInProgress) { - Trace.Verbose( - "Auto update task running at backend, waiting for getNextMessage or selfUpdateTask to finish."); + Trace.Verbose("Auto update task running at backend, waiting for getNextMessage or selfUpdateTask to finish."); Task completeTask = await Task.WhenAny(getNextMessage, selfUpdateTask); if (completeTask == selfUpdateTask) { autoUpdateInProgress = false; if (await selfUpdateTask) { - Trace.Info( - "Auto update task finished at backend, an runner update is ready to apply exit the current runner instance."); + Trace.Info("Auto update task finished at backend, an runner update is ready to apply exit the current runner instance."); Trace.Info("Stop message queue looping."); messageQueueLoopTokenSource.Cancel(); try @@ -431,23 +427,19 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) } else { - Trace.Info( - "Auto update task finished at backend, there is no available runner update needs to apply, continue message queue looping."); + Trace.Info("Auto update task finished at backend, there is no available runner update needs to apply, continue message queue looping."); } } } if (runOnceJobReceived) { - Trace.Verbose( - "One time used runner has start running its job, waiting for getNextMessage or the job to finish."); - Task completeTask = await Task.WhenAny(getNextMessage, - jobDispatcher.RunOnceJobCompleted.Task); + Trace.Verbose("One time used runner has start running its job, waiting for getNextMessage or the job to finish."); + Task completeTask = await Task.WhenAny(getNextMessage, jobDispatcher.RunOnceJobCompleted.Task); if (completeTask == jobDispatcher.RunOnceJobCompleted.Task) { runOnceJobCompleted = true; - Trace.Info( - "Job has finished at backend, the runner will exit since it is running under onetime use mode."); + Trace.Info("Job has finished at backend, the runner will exit since it is running under onetime use mode."); Trace.Info("Stop message queue looping."); messageQueueLoopTokenSource.Cancel(); try @@ -465,73 +457,53 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) message = await getNextMessage; //get next message HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}"); - if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, - StringComparison.OrdinalIgnoreCase) || - string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, - StringComparison.OrdinalIgnoreCase)) + if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase) || + string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) { if (autoUpdateInProgress == false) { autoUpdateInProgress = true; AgentRefreshMessage runnerUpdateMessage = null; - if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, - StringComparison.OrdinalIgnoreCase)) + if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) { runnerUpdateMessage = JsonUtility.FromString(message.Body); } else { - var brokerRunnerUpdateMessage = - JsonUtility.FromString(message.Body); - runnerUpdateMessage = new AgentRefreshMessage( - brokerRunnerUpdateMessage.RunnerId, brokerRunnerUpdateMessage.TargetVersion, - TimeSpan.FromSeconds(brokerRunnerUpdateMessage.TimeoutInSeconds)); + var brokerRunnerUpdateMessage = JsonUtility.FromString(message.Body); + runnerUpdateMessage = new AgentRefreshMessage(brokerRunnerUpdateMessage.RunnerId, brokerRunnerUpdateMessage.TargetVersion, TimeSpan.FromSeconds(brokerRunnerUpdateMessage.TimeoutInSeconds)); } #if DEBUG // Can mock the update for testing - if (StringUtil.ConvertToBoolean( - Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE"))) + if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE"))) { // The mock_update_messages.json file should be an object with keys being the current version and values being the targeted mock version object // Example: { "2.283.2": {"targetVersion":"2.284.1"}, "2.284.1": {"targetVersion":"2.285.0"}} - var mockUpdatesPath = - Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Root), - "mock_update_messages.json"); + var mockUpdatesPath = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Root), "mock_update_messages.json"); if (File.Exists(mockUpdatesPath)) { - var mockUpdateMessages = - JsonUtility.FromString>( - File.ReadAllText(mockUpdatesPath)); + var mockUpdateMessages = JsonUtility.FromString>(File.ReadAllText(mockUpdatesPath)); if (mockUpdateMessages.ContainsKey(BuildConstants.RunnerPackage.Version)) { - var mockTargetVersion = - mockUpdateMessages[BuildConstants.RunnerPackage.Version] - .TargetVersion; - _term.WriteLine( - $"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); - Trace.Info( - $"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); - runnerUpdateMessage = - new AgentRefreshMessage(runnerUpdateMessage.AgentId, - mockTargetVersion, runnerUpdateMessage.Timeout); + var mockTargetVersion = mockUpdateMessages[BuildConstants.RunnerPackage.Version].TargetVersion; + _term.WriteLine($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); + Trace.Info($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); + runnerUpdateMessage = new AgentRefreshMessage(runnerUpdateMessage.AgentId, mockTargetVersion, runnerUpdateMessage.Timeout); } } } #endif var selfUpdater = HostContext.GetService(); - selfUpdateTask = selfUpdater.SelfUpdate(runnerUpdateMessage, jobDispatcher, false, - HostContext.RunnerShutdownToken); + selfUpdateTask = selfUpdater.SelfUpdate(runnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken); Trace.Info("Refresh message received, kick-off selfupdate background process."); } else { - Trace.Info( - "Refresh message received, skip autoupdate since a previous autoupdate is already running."); + Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); } } - else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, - StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase)) { if (autoUpdateInProgress || runOnceJobReceived) { @@ -540,10 +512,8 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) } else { - Trace.Info( - $"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); - var jobMessage = - StringUtil.ConvertFromJson(message.Body); + Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); + var jobMessage = StringUtil.ConvertFromJson(message.Body); jobDispatcher.Run(jobMessage, runOnce); if (runOnce) { @@ -573,9 +543,7 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) { var actionsRunServer = HostContext.CreateService(); await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); - jobRequestMessage = - await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, - messageQueueLoopTokenSource.Token); + jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); } else { @@ -584,8 +552,8 @@ await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, try { jobRequestMessage = - await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, - messageQueueLoopTokenSource.Token); + await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, + messageQueueLoopTokenSource.Token); } catch (TaskOrchestrationJobAlreadyAcquiredException) { @@ -602,8 +570,7 @@ await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, } } } - else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, - StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase)) { var cancelJobMessage = JsonUtility.FromString(message.Body); bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage); @@ -611,26 +578,20 @@ await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, if (skipMessageDeletion) { - Trace.Info( - $"Skip message deletion for cancellation message '{message.MessageId}'."); + Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'."); } } - else if (string.Equals(message.MessageType, - Pipelines.HostedRunnerShutdownMessage.MessageType, - StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase)) { - var HostedRunnerShutdownMessage = - JsonUtility.FromString(message.Body); + var HostedRunnerShutdownMessage = JsonUtility.FromString(message.Body); skipMessageDeletion = true; skipSessionDeletion = true; - Trace.Info( - $"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); + Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); return Constants.Runner.ReturnCode.Success; } else { - Trace.Error( - $"Received message {message.MessageId} with unsupported message type {message.MessageType}."); + Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}."); } } finally @@ -643,8 +604,7 @@ await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, } catch (Exception ex) { - Trace.Error( - $"Catch exception during delete message from message queue. message id: {message.MessageId}"); + Trace.Error($"Catch exception during delete message from message queue. message id: {message.MessageId}"); Trace.Error(ex); } finally