From df885279a121d14b4945ca51b3ef4074ebf9c924 Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Tue, 21 Mar 2023 14:27:52 -0700 Subject: [PATCH 1/9] WIP --- src/Runner.Listener/BrokerMessageListener.cs | 217 +++++++++++++++++++ src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 61 ++++++ 2 files changed, 278 insertions(+) create mode 100644 src/Runner.Listener/BrokerMessageListener.cs create mode 100644 src/Sdk/WebApi/WebApi/BrokerHttpClient.cs diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs new file mode 100644 index 00000000000..3dc474d931b --- /dev/null +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -0,0 +1,217 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Runtime.InteropServices; +using System.Security.Cryptography; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using GitHub.DistributedTask.WebApi; +using GitHub.Runner.Common; +using GitHub.Runner.Listener.Configuration; +using GitHub.Runner.Sdk; +using GitHub.Services.Common; +using GitHub.Services.OAuth; + +namespace GitHub.Runner.Listener +{ + public sealed class BrokerMessageListener : RunnerService, IMessageListener + { + private long? _lastMessageId; + private RunnerSettings _settings; + private ITerminal _term; + private IRunnerServer _runnerServer; + private TaskAgentSession _session; + private TimeSpan _getNextMessageRetryInterval; + private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); + private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); + private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); + private readonly Dictionary _sessionCreationExceptionTracker = new(); + private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; + private CancellationTokenSource _getMessagesTokenSource; + + public override void Initialize(IHostContext hostContext) + { + base.Initialize(hostContext); + + _term = HostContext.GetService(); + } + + public async Task CreateSessionAsync(CancellationToken token) + { + return await Task.FromResult(true); + } + + public async Task DeleteSessionAsync() + { + await Task.CompletedTask; + } + + public void OnJobStatus(object sender, JobStatusEventArgs e) + { + if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW"))) + { + Trace.Info("Received job status event. JobState: {0}", e.Status); + runnerStatus = e.Status; + try + { + _getMessagesTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + Trace.Info("_getMessagesTokenSource is already disposed."); + } + } + } + + public async Task GetNextMessageAsync(CancellationToken token) + { + Trace.Entering(); + ArgUtil.NotNull(_settings, nameof(_settings)); + bool encounteringError = false; + int continuousError = 0; + string errorMessage = string.Empty; + Stopwatch heartbeat = new(); + heartbeat.Restart(); + while (true) + { + token.ThrowIfCancellationRequested(); + TaskAgentMessage message = null; + _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); + try + { + message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, + _session.SessionId, + _lastMessageId, + runnerStatus, + BuildConstants.RunnerPackage.Version, + _getMessagesTokenSource.Token); + + + if (message != null) + { + _lastMessageId = message.MessageId; + } + + if (encounteringError) //print the message once only if there was an error + { + _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); + encounteringError = false; + continuousError = 0; + } + } + catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) + { + Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); + continue; + } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + Trace.Info("Get next message has been cancelled."); + throw; + } + catch (TaskAgentAccessTokenExpiredException) + { + Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); + throw; + } + catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) + { + throw; + } + catch (Exception ex) + { + Trace.Error("Catch exception during get next message."); + Trace.Error(ex); + + // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. + if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) + { + Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); + } + else if (!IsGetNextMessageExceptionRetriable(ex)) + { + throw; + } + else + { + continuousError++; + //retry after a random backoff to avoid service throttling + //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. + if (continuousError <= 5) + { + // random backoff [15, 30] + _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); + } + else + { + // more aggressive backoff [30, 60] + _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); + } + + if (!encounteringError) + { + //print error only on the first consecutive error + _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); + encounteringError = true; + } + + Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); + await HostContext.Delay(_getNextMessageRetryInterval, token); + } + } + finally + { + _getMessagesTokenSource.Dispose(); + } + + if (message == null) + { + if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) + { + Trace.Info($"No message retrieved within last 30 minutes."); + heartbeat.Restart(); + } + else + { + Trace.Verbose($"No message retrieved"); + } + + continue; + } + + Trace.Info($"Message '{message.MessageId}' received"); + return message; + } + } + + public async Task DeleteMessageAsync(TaskAgentMessage message) + { + await Task.CompletedTask; + } + + private bool IsGetNextMessageExceptionRetriable(Exception ex) + { + if (ex is TaskAgentNotFoundException || + ex is TaskAgentPoolNotFoundException || + ex is TaskAgentSessionExpiredException || + ex is AccessDeniedException || + ex is VssUnauthorizedException) + { + Trace.Info($"Non-retriable exception: {ex.Message}"); + return false; + } + else + { + Trace.Info($"Retriable exception: {ex.Message}"); + return true; + } + } + + private GetMessageAsync(string status, string version) + { + + } + } +} diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs new file mode 100644 index 00000000000..6a7a5e9db49 --- /dev/null +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -0,0 +1,61 @@ +using System; +using System.IO; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Services.Results.Contracts; +using System.Net.Http.Formatting; +using Sdk.WebApi.WebApi; +using GitHub.DistributedTask.WebApi; + +namespace GitHub.Services.Results.Client +{ + public class BrokerHttpClient : RawHttpClientBase + { + public BrokerHttpClient( + Uri baseUrl, + HttpMessageHandler pipeline, + string token, + bool disposeHandler) + : base(baseUrl, pipeline, disposeHandler) + { + m_token = token; + m_brokerUrl = baseUrl; + m_formatter = new JsonMediaTypeFormatter(); + } + + public async Task GetMessagesAsync(CancellationToken cancellationToken) + { + var uri = new Uri(m_brokerUrl, Constants.Messages); + return await GetSignedURLResponse(uri, cancellationToken); + } + + // Get Sas URL calls + private async Task GetSignedURLResponse(Uri uri, CancellationToken cancellationToken) + { + using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, uri)) + { + requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); + requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); + + using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) + { + return await ReadJsonContentAsync(response, cancellationToken); + } + } + } + + private MediaTypeFormatter m_formatter; + private Uri m_brokerUrl; + private string m_token; + } + + // Constants specific to results + public static class Constants + { + + public static readonly string Messages = "messages"; + } + +} From af657acebc20e4448e4c524be6abcf205a390f5d Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Wed, 22 Mar 2023 12:55:15 -0700 Subject: [PATCH 2/9] WIP --- .vscode/launch.json | 8 +- src/Runner.Common/BrokerServer.cs | 56 ++++ src/Runner.Listener/BrokerMessageListener.cs | 285 ++++++++++--------- src/Runner.Listener/MessageListener.cs | 2 +- src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 80 +++--- 5 files changed, 256 insertions(+), 175 deletions(-) create mode 100644 src/Runner.Common/BrokerServer.cs diff --git a/.vscode/launch.json b/.vscode/launch.json index 3c5f5c694cd..270f1d6f5d0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -24,7 +24,10 @@ ], "cwd": "${workspaceFolder}/src", "console": "integratedTerminal", - "requireExactSource": false + "requireExactSource": false, + "env": { + "USE_BROKER_FLOW": "1" + } }, { "name": "Configure", @@ -54,5 +57,4 @@ "requireExactSource": false }, ], -} - +} \ No newline at end of file diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs new file mode 100644 index 00000000000..2846162109f --- /dev/null +++ b/src/Runner.Common/BrokerServer.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Actions.RunService.WebApi; +using GitHub.DistributedTask.Pipelines; +using GitHub.DistributedTask.WebApi; +using GitHub.Runner.Sdk; +using GitHub.Services.Common; +using Sdk.RSWebApi.Contracts; +using Sdk.WebApi.WebApi.RawClient; + +namespace GitHub.Runner.Common +{ + [ServiceLocator(Default = typeof(BrokerServer))] + public interface IBrokerServer : IRunnerService + { + Task ConnectAsync(Uri serverUrl, VssCredentials credentials); + + Task GetRunnerMessageAsync(CancellationToken token); + } + + public sealed class BrokerServer : RunnerService, IBrokerServer + { + private bool _hasConnection; + private Uri _brokerUri; + private RawConnection _connection; + private BrokerHttpClient _brokerHttpClient; + + public async Task ConnectAsync(Uri serverUri, VssCredentials credentials) + { + _brokerUri = serverUri; + + _connection = VssUtil.CreateRawConnection(serverUri, credentials); + _brokerHttpClient = await _connection.GetClientAsync(); + _hasConnection = true; + } + + private void CheckConnection() + { + if (!_hasConnection) + { + throw new InvalidOperationException($"SetConnection"); + } + } + + public Task GetRunnerMessageAsync(CancellationToken cancellationToken) + { + CheckConnection(); + var jobMessage = RetryRequest( + async () => await _brokerHttpClient.GetRunnerMessageAsync(cancellationToken), cancellationToken); + + return jobMessage; + } + } +} diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 3dc474d931b..35a37969e16 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -18,28 +18,33 @@ namespace GitHub.Runner.Listener { public sealed class BrokerMessageListener : RunnerService, IMessageListener { - private long? _lastMessageId; - private RunnerSettings _settings; - private ITerminal _term; - private IRunnerServer _runnerServer; - private TaskAgentSession _session; - private TimeSpan _getNextMessageRetryInterval; - private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); - private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); - private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); - private readonly Dictionary _sessionCreationExceptionTracker = new(); + // private long? _lastMessageId; + // private RunnerSettings _settings; + // private ITerminal _term; + // private IRunnerServer _runnerServer; + // private TaskAgentSession _session; + // private TimeSpan _getNextMessageRetryInterval; + // private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); + // private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); + // private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); + // private readonly Dictionary _sessionCreationExceptionTracker = new(); private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; + private IBrokerServer _brokerServer; public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); - _term = HostContext.GetService(); + // _term = HostContext.GetService(); + _brokerServer = HostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) { + var credMgr = HostContext.GetService(); + VssCredentials creds = credMgr.LoadCredentials(); + await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); return await Task.FromResult(true); } @@ -50,140 +55,150 @@ public async Task DeleteSessionAsync() public void OnJobStatus(object sender, JobStatusEventArgs e) { - if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW"))) + + Trace.Info("Received job status event. JobState: {0}", e.Status); + runnerStatus = e.Status; + try { - Trace.Info("Received job status event. JobState: {0}", e.Status); - runnerStatus = e.Status; - try - { - _getMessagesTokenSource?.Cancel(); - } - catch (ObjectDisposedException) - { - Trace.Info("_getMessagesTokenSource is already disposed."); - } + _getMessagesTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + Trace.Info("_getMessagesTokenSource is already disposed."); } + } public async Task GetNextMessageAsync(CancellationToken token) { - Trace.Entering(); - ArgUtil.NotNull(_settings, nameof(_settings)); - bool encounteringError = false; - int continuousError = 0; - string errorMessage = string.Empty; - Stopwatch heartbeat = new(); - heartbeat.Restart(); while (true) { - token.ThrowIfCancellationRequested(); - TaskAgentMessage message = null; _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - try - { - message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, - _session.SessionId, - _lastMessageId, - runnerStatus, - BuildConstants.RunnerPackage.Version, - _getMessagesTokenSource.Token); - - - if (message != null) - { - _lastMessageId = message.MessageId; - } - - if (encounteringError) //print the message once only if there was an error - { - _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); - encounteringError = false; - continuousError = 0; - } - } - catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) - { - Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); - continue; - } - catch (OperationCanceledException) when (token.IsCancellationRequested) - { - Trace.Info("Get next message has been cancelled."); - throw; - } - catch (TaskAgentAccessTokenExpiredException) - { - Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); - throw; - } - catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) - { - throw; - } - catch (Exception ex) - { - Trace.Error("Catch exception during get next message."); - Trace.Error(ex); - - // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. - if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) - { - Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); - } - else if (!IsGetNextMessageExceptionRetriable(ex)) - { - throw; - } - else - { - continuousError++; - //retry after a random backoff to avoid service throttling - //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. - if (continuousError <= 5) - { - // random backoff [15, 30] - _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); - } - else - { - // more aggressive backoff [30, 60] - _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); - } - - if (!encounteringError) - { - //print error only on the first consecutive error - _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); - encounteringError = true; - } - - Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); - await HostContext.Delay(_getNextMessageRetryInterval, token); - } - } - finally + var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token); + if (message != null) { - _getMessagesTokenSource.Dispose(); + return message; } - if (message == null) - { - if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) - { - Trace.Info($"No message retrieved within last 30 minutes."); - heartbeat.Restart(); - } - else - { - Trace.Verbose($"No message retrieved"); - } - - continue; - } - - Trace.Info($"Message '{message.MessageId}' received"); - return message; } + + // return message; + // Trace.Entering(); + // ArgUtil.NotNull(_settings, nameof(_settings)); + // bool encounteringError = false; + // int continuousError = 0; + // string errorMessage = string.Empty; + // Stopwatch heartbeat = new(); + // heartbeat.Restart(); + // while (true) + // { + // token.ThrowIfCancellationRequested(); + // TaskAgentMessage message = null; + // _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); + // try + // { + // message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, + // _session.SessionId, + // _lastMessageId, + // runnerStatus, + // BuildConstants.RunnerPackage.Version, + // _getMessagesTokenSource.Token); + + + // if (message != null) + // { + // _lastMessageId = message.MessageId; + // } + + // if (encounteringError) //print the message once only if there was an error + // { + // _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); + // encounteringError = false; + // continuousError = 0; + // } + // } + // catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) + // { + // Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); + // continue; + // } + // catch (OperationCanceledException) when (token.IsCancellationRequested) + // { + // Trace.Info("Get next message has been cancelled."); + // throw; + // } + // catch (TaskAgentAccessTokenExpiredException) + // { + // Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); + // throw; + // } + // catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) + // { + // throw; + // } + // catch (Exception ex) + // { + // Trace.Error("Catch exception during get next message."); + // Trace.Error(ex); + + // // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. + // if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) + // { + // Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); + // } + // else if (!IsGetNextMessageExceptionRetriable(ex)) + // { + // throw; + // } + // else + // { + // continuousError++; + // //retry after a random backoff to avoid service throttling + // //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. + // if (continuousError <= 5) + // { + // // random backoff [15, 30] + // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); + // } + // else + // { + // // more aggressive backoff [30, 60] + // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); + // } + + // if (!encounteringError) + // { + // //print error only on the first consecutive error + // _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); + // encounteringError = true; + // } + + // Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); + // await HostContext.Delay(_getNextMessageRetryInterval, token); + // } + // } + // finally + // { + // _getMessagesTokenSource.Dispose(); + // } + + // if (message == null) + // { + // if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) + // { + // Trace.Info($"No message retrieved within last 30 minutes."); + // heartbeat.Restart(); + // } + // else + // { + // Trace.Verbose($"No message retrieved"); + // } + + // continue; + // } + + // Trace.Info($"Message '{message.MessageId}' received"); + // return message; } public async Task DeleteMessageAsync(TaskAgentMessage message) @@ -209,9 +224,9 @@ ex is AccessDeniedException || } } - private GetMessageAsync(string status, string version) - { + // private GetMessageAsync(string status, string version) + // { - } + // } } } diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index cb8edb607fe..80c0227eae5 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -16,7 +16,7 @@ namespace GitHub.Runner.Listener { - [ServiceLocator(Default = typeof(MessageListener))] + [ServiceLocator(Default = typeof(BrokerMessageListener))] public interface IMessageListener : IRunnerService { Task CreateSessionAsync(CancellationToken token); diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index 6a7a5e9db49..40563b351bd 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -1,61 +1,69 @@ using System; -using System.IO; +using System.Collections.Generic; using System.Net.Http; -using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; -using GitHub.Services.Results.Contracts; -using System.Net.Http.Formatting; -using Sdk.WebApi.WebApi; +using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; +using GitHub.Services.Common; +using GitHub.Services.OAuth; +using GitHub.Services.WebApi; +using Sdk.RSWebApi.Contracts; +using Sdk.WebApi.WebApi; -namespace GitHub.Services.Results.Client +namespace GitHub.Actions.RunService.WebApi { public class BrokerHttpClient : RawHttpClientBase { public BrokerHttpClient( Uri baseUrl, - HttpMessageHandler pipeline, - string token, - bool disposeHandler) - : base(baseUrl, pipeline, disposeHandler) + VssOAuthCredential credentials) + : base(baseUrl, credentials) { - m_token = token; - m_brokerUrl = baseUrl; - m_formatter = new JsonMediaTypeFormatter(); } - public async Task GetMessagesAsync(CancellationToken cancellationToken) + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials, + RawClientHttpRequestSettings settings) + : base(baseUrl, credentials, settings) { - var uri = new Uri(m_brokerUrl, Constants.Messages); - return await GetSignedURLResponse(uri, cancellationToken); } - // Get Sas URL calls - private async Task GetSignedURLResponse(Uri uri, CancellationToken cancellationToken) + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials, + params DelegatingHandler[] handlers) + : base(baseUrl, credentials, handlers) { - using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, uri)) - { - requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); - requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); + } - using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) - { - return await ReadJsonContentAsync(response, cancellationToken); - } - } + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials, + RawClientHttpRequestSettings settings, + params DelegatingHandler[] handlers) + : base(baseUrl, credentials, settings, handlers) + { } - private MediaTypeFormatter m_formatter; - private Uri m_brokerUrl; - private string m_token; - } + public BrokerHttpClient( + Uri baseUrl, + HttpMessageHandler pipeline, + Boolean disposeHandler) + : base(baseUrl, pipeline, disposeHandler) + { + } - // Constants specific to results - public static class Constants - { + public Task GetRunnerMessageAsync( + CancellationToken cancellationToken = default) + { + var requestUri = new Uri(Client.BaseAddress, "message"); - public static readonly string Messages = "messages"; + return SendAsync( + new HttpMethod("GET"), + requestUri: requestUri, + cancellationToken: cancellationToken); + } } - } From ee19ca253e9d56dd9ac3fb66981bd480284d43ff Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Wed, 22 Mar 2023 15:20:31 -0700 Subject: [PATCH 3/9] WIP --- .vscode/launch.json | 5 ++++- src/Runner.Common/BrokerServer.cs | 6 ++--- src/Runner.Listener/BrokerMessageListener.cs | 23 +++++++++++++++++--- src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 17 ++++++++++++++- 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 270f1d6f5d0..887789e3bb4 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,7 +12,10 @@ ], "cwd": "${workspaceFolder}/src", "console": "integratedTerminal", - "requireExactSource": false + "requireExactSource": false, + "env": { + "USE_BROKER_FLOW": "1" + } }, { "name": "Run", diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index 2846162109f..bb0691771a0 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -17,7 +17,7 @@ public interface IBrokerServer : IRunnerService { Task ConnectAsync(Uri serverUrl, VssCredentials credentials); - Task GetRunnerMessageAsync(CancellationToken token); + Task GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version); } public sealed class BrokerServer : RunnerService, IBrokerServer @@ -44,11 +44,11 @@ private void CheckConnection() } } - public Task GetRunnerMessageAsync(CancellationToken cancellationToken) + public Task GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version) { CheckConnection(); var jobMessage = RetryRequest( - async () => await _brokerHttpClient.GetRunnerMessageAsync(cancellationToken), cancellationToken); + async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken); return jobMessage; } diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 35a37969e16..40a73dae0a6 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -12,6 +12,7 @@ using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Sdk; using GitHub.Services.Common; +using GitHub.Runner.Common.Util; using GitHub.Services.OAuth; namespace GitHub.Runner.Listener @@ -31,6 +32,7 @@ public sealed class BrokerMessageListener : RunnerService, IMessageListener private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private IBrokerServer _brokerServer; + private string lastRunnerRequestId; public override void Initialize(IHostContext hostContext) { @@ -74,12 +76,27 @@ public async Task GetNextMessageAsync(CancellationToken token) while (true) { _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token); - if (message != null) + var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); + + if (message == null) { - return message; + continue; } + if (MessageUtil.IsRunServiceJob(message.MessageType)) + { + var messageRef = StringUtil.ConvertFromJson(message.Body); + + if (messageRef.RunnerRequestId != lastRunnerRequestId) + { + lastRunnerRequestId = messageRef.RunnerRequestId; + return message; + } + } + else + { + return message; + } } // return message; diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index 40563b351bd..8f9b22e7541 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -56,13 +56,28 @@ public BrokerHttpClient( } public Task GetRunnerMessageAsync( - CancellationToken cancellationToken = default) + string runnerVersion, + TaskAgentStatus? status, + CancellationToken cancellationToken = default + ) { var requestUri = new Uri(Client.BaseAddress, "message"); + List> queryParams = new List>(); + + if (status != null) + { + queryParams.Add("status", status.Value.ToString()); + } + if (runnerVersion != null) + { + queryParams.Add("runnerVersion", runnerVersion); + } + return SendAsync( new HttpMethod("GET"), requestUri: requestUri, + queryParameters: queryParams, cancellationToken: cancellationToken); } } From a49a5bef65846a60a440576faaec5aa6117a4894 Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 23 Mar 2023 07:00:27 -0700 Subject: [PATCH 4/9] . --- src/Runner.Listener/BrokerMessageListener.cs | 257 ++++++++----------- 1 file changed, 112 insertions(+), 145 deletions(-) diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 40a73dae0a6..e4f64ae6a63 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -19,16 +19,9 @@ namespace GitHub.Runner.Listener { public sealed class BrokerMessageListener : RunnerService, IMessageListener { - // private long? _lastMessageId; - // private RunnerSettings _settings; - // private ITerminal _term; - // private IRunnerServer _runnerServer; - // private TaskAgentSession _session; - // private TimeSpan _getNextMessageRetryInterval; - // private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); - // private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); - // private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); - // private readonly Dictionary _sessionCreationExceptionTracker = new(); + private RunnerSettings _settings; + private ITerminal _term; + private TimeSpan _getNextMessageRetryInterval; private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private IBrokerServer _brokerServer; @@ -38,15 +31,13 @@ public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); - // _term = HostContext.GetService(); + _term = HostContext.GetService(); _brokerServer = HostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) { - var credMgr = HostContext.GetService(); - VssCredentials creds = credMgr.LoadCredentials(); - await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); + await RefreshBrokerConnection(); return await Task.FromResult(true); } @@ -73,149 +64,119 @@ public void OnJobStatus(object sender, JobStatusEventArgs e) public async Task GetNextMessageAsync(CancellationToken token) { + bool encounteringError = false; + int continuousError = 0; + Stopwatch heartbeat = new(); + heartbeat.Restart(); + while (true) { + TaskAgentMessage message = null; _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); + try + { + message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); - if (message == null) + if (message == null) + { + continue; + } + + if (MessageUtil.IsRunServiceJob(message.MessageType)) + { + var messageRef = StringUtil.ConvertFromJson(message.Body); + + if (messageRef.RunnerRequestId != lastRunnerRequestId) + { + lastRunnerRequestId = messageRef.RunnerRequestId; + return message; + } + } + else + { + return message; + } + } + catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) { + Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); continue; } - - if (MessageUtil.IsRunServiceJob(message.MessageType)) + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + Trace.Info("Get next message has been cancelled."); + throw; + } + catch (TaskAgentAccessTokenExpiredException) + { + Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); + throw; + } + catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) + { + throw; + } + catch (Exception ex) { - var messageRef = StringUtil.ConvertFromJson(message.Body); + Trace.Error("Catch exception during get next message."); + Trace.Error(ex); - if (messageRef.RunnerRequestId != lastRunnerRequestId) + if (!IsGetNextMessageExceptionRetriable(ex)) { - lastRunnerRequestId = messageRef.RunnerRequestId; - return message; + throw; + } + else + { + continuousError++; + //retry after a random backoff to avoid service throttling + //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. + if (continuousError <= 5) + { + // random backoff [15, 30] + _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); + } + else + { + // more aggressive backoff [30, 60] + _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); + } + + if (!encounteringError) + { + //print error only on the first consecutive error + _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); + encounteringError = true; + } + + // re-create VssConnection before next retry + await RefreshBrokerConnection(); + + Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); + await HostContext.Delay(_getNextMessageRetryInterval, token); } } - else + finally { - return message; + _getMessagesTokenSource.Dispose(); + } + + if (message == null) + { + if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) + { + Trace.Info($"No message retrieved within last 30 minutes."); + heartbeat.Restart(); + } + else + { + Trace.Verbose($"No message retrieved."); + } + + continue; } - } - // return message; - // Trace.Entering(); - // ArgUtil.NotNull(_settings, nameof(_settings)); - // bool encounteringError = false; - // int continuousError = 0; - // string errorMessage = string.Empty; - // Stopwatch heartbeat = new(); - // heartbeat.Restart(); - // while (true) - // { - // token.ThrowIfCancellationRequested(); - // TaskAgentMessage message = null; - // _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - // try - // { - // message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, - // _session.SessionId, - // _lastMessageId, - // runnerStatus, - // BuildConstants.RunnerPackage.Version, - // _getMessagesTokenSource.Token); - - - // if (message != null) - // { - // _lastMessageId = message.MessageId; - // } - - // if (encounteringError) //print the message once only if there was an error - // { - // _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); - // encounteringError = false; - // continuousError = 0; - // } - // } - // catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) - // { - // Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); - // continue; - // } - // catch (OperationCanceledException) when (token.IsCancellationRequested) - // { - // Trace.Info("Get next message has been cancelled."); - // throw; - // } - // catch (TaskAgentAccessTokenExpiredException) - // { - // Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); - // throw; - // } - // catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) - // { - // throw; - // } - // catch (Exception ex) - // { - // Trace.Error("Catch exception during get next message."); - // Trace.Error(ex); - - // // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. - // if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) - // { - // Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); - // } - // else if (!IsGetNextMessageExceptionRetriable(ex)) - // { - // throw; - // } - // else - // { - // continuousError++; - // //retry after a random backoff to avoid service throttling - // //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. - // if (continuousError <= 5) - // { - // // random backoff [15, 30] - // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); - // } - // else - // { - // // more aggressive backoff [30, 60] - // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); - // } - - // if (!encounteringError) - // { - // //print error only on the first consecutive error - // _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); - // encounteringError = true; - // } - - // Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); - // await HostContext.Delay(_getNextMessageRetryInterval, token); - // } - // } - // finally - // { - // _getMessagesTokenSource.Dispose(); - // } - - // if (message == null) - // { - // if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) - // { - // Trace.Info($"No message retrieved within last 30 minutes."); - // heartbeat.Restart(); - // } - // else - // { - // Trace.Verbose($"No message retrieved"); - // } - - // continue; - // } - - // Trace.Info($"Message '{message.MessageId}' received"); - // return message; + Trace.Info($"Message '{message.MessageId}' received."); + } } public async Task DeleteMessageAsync(TaskAgentMessage message) @@ -241,9 +202,15 @@ ex is AccessDeniedException || } } - // private GetMessageAsync(string status, string version) - // { + private async Task RefreshBrokerConnection() + { - // } + var configManager = HostContext.GetService(); + _settings = configManager.LoadSettings(); + + var credMgr = HostContext.GetService(); + VssCredentials creds = credMgr.LoadCredentials(); + await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); + } } } From 08479068fe709174196a47d452103c238ba6600a Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 23 Mar 2023 07:35:50 -0700 Subject: [PATCH 5/9] Cleanup --- src/Runner.Common/ConfigurationStore.cs | 3 +++ src/Runner.Listener/BrokerMessageListener.cs | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Runner.Common/ConfigurationStore.cs b/src/Runner.Common/ConfigurationStore.cs index 49c4229daf4..3dd5a803208 100644 --- a/src/Runner.Common/ConfigurationStore.cs +++ b/src/Runner.Common/ConfigurationStore.cs @@ -53,6 +53,9 @@ public sealed class RunnerSettings [DataMember(EmitDefaultValue = false)] public bool UseV2Flow { get; set; } + [DataMember(EmitDefaultValue = false)] + public string ServerUrlV2 { get; set; } + [IgnoreDataMember] public bool IsHostedServer { diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index e4f64ae6a63..ac73bae3a96 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -204,13 +204,12 @@ ex is AccessDeniedException || private async Task RefreshBrokerConnection() { - var configManager = HostContext.GetService(); _settings = configManager.LoadSettings(); var credMgr = HostContext.GetService(); VssCredentials creds = credMgr.LoadCredentials(); - await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); + await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds); } } } From c2307b3c9260c4a52b86f6c264f502f71d8596a3 Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 23 Mar 2023 08:02:21 -0700 Subject: [PATCH 6/9] . --- src/Runner.Listener/MessageListener.cs | 2 +- src/Runner.Listener/Runner.cs | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 80c0227eae5..cb8edb607fe 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -16,7 +16,7 @@ namespace GitHub.Runner.Listener { - [ServiceLocator(Default = typeof(BrokerMessageListener))] + [ServiceLocator(Default = typeof(MessageListener))] public interface IMessageListener : IRunnerService { Task CreateSessionAsync(CancellationToken token); diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index c727f1b3865..e95379ad385 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -339,13 +339,25 @@ private void CtrlCHandler(object sender, EventArgs e) } } + private IMessageListener GetMesageListener(RunnerSettings settings) + { + if (settings.UseV2Flow) + { + var brokerListener = new BrokerMessageListener(); + brokerListener.Initialize(HostContext); + return brokerListener; + } + + return HostContext.GetService(); + } + //create worker manager, create message listener and start listening to the queue private async Task RunAsync(RunnerSettings settings, bool runOnce = false) { try { Trace.Info(nameof(RunAsync)); - _listener = HostContext.GetService(); + _listener = GetMesageListener(settings); if (!await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken)) { return Constants.Runner.ReturnCode.TerminatedError; From 0989ee93d65f41fc5d4ac25db689c790e466e08a Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 23 Mar 2023 08:31:55 -0700 Subject: [PATCH 7/9] . --- src/Runner.Listener/BrokerMessageListener.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index ac73bae3a96..30324449141 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -48,7 +48,6 @@ public async Task DeleteSessionAsync() public void OnJobStatus(object sender, JobStatusEventArgs e) { - Trace.Info("Received job status event. JobState: {0}", e.Status); runnerStatus = e.Status; try @@ -59,7 +58,6 @@ public void OnJobStatus(object sender, JobStatusEventArgs e) { Trace.Info("_getMessagesTokenSource is already disposed."); } - } public async Task GetNextMessageAsync(CancellationToken token) From 0f8ed8f7fce58012897680f603823503f1a299fe Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 23 Mar 2023 11:10:35 -0700 Subject: [PATCH 8/9] Cleanup --- .vscode/launch.json | 13 ++++--------- src/Runner.Listener/BrokerMessageListener.cs | 16 +--------------- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 887789e3bb4..3c5f5c694cd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,10 +12,7 @@ ], "cwd": "${workspaceFolder}/src", "console": "integratedTerminal", - "requireExactSource": false, - "env": { - "USE_BROKER_FLOW": "1" - } + "requireExactSource": false }, { "name": "Run", @@ -27,10 +24,7 @@ ], "cwd": "${workspaceFolder}/src", "console": "integratedTerminal", - "requireExactSource": false, - "env": { - "USE_BROKER_FLOW": "1" - } + "requireExactSource": false }, { "name": "Configure", @@ -60,4 +54,5 @@ "requireExactSource": false }, ], -} \ No newline at end of file +} + diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 30324449141..c0c7516b8a7 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -25,7 +25,6 @@ public sealed class BrokerMessageListener : RunnerService, IMessageListener private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private IBrokerServer _brokerServer; - private string lastRunnerRequestId; public override void Initialize(IHostContext hostContext) { @@ -80,20 +79,7 @@ public async Task GetNextMessageAsync(CancellationToken token) continue; } - if (MessageUtil.IsRunServiceJob(message.MessageType)) - { - var messageRef = StringUtil.ConvertFromJson(message.Body); - - if (messageRef.RunnerRequestId != lastRunnerRequestId) - { - lastRunnerRequestId = messageRef.RunnerRequestId; - return message; - } - } - else - { - return message; - } + return message; } catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) { From 567c0f33bd154835fa6a28c6b98598080d009f27 Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Fri, 24 Mar 2023 10:21:13 -0700 Subject: [PATCH 9/9] PR feedback --- src/Runner.Listener/BrokerMessageListener.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index c0c7516b8a7..93259c8e56f 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -65,6 +65,7 @@ public async Task GetNextMessageAsync(CancellationToken token) int continuousError = 0; Stopwatch heartbeat = new(); heartbeat.Restart(); + var maxRetryCount = 10; while (true) { @@ -119,6 +120,10 @@ public async Task GetNextMessageAsync(CancellationToken token) // random backoff [15, 30] _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); } + else if (continuousError >= maxRetryCount) + { + throw; + } else { // more aggressive backoff [30, 60]