Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/Runner.Common/BrokerServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Common
{
/// <summary>
/// Runner service contract for the broker: establish a connection, then request runner messages.
/// <see cref="BrokerServer"/> is registered as the default implementation through the ServiceLocator attribute.
/// </summary>
[ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService
{
/// <summary>Connects to the broker located at <paramref name="serverUrl"/> using <paramref name="credentials"/>.</summary>
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);

/// <summary>
/// Requests a runner message from the broker, passing along the runner <paramref name="status"/> and <paramref name="version"/>.
/// </summary>
/// <param name="token">Cancels the request.</param>
/// <param name="status">Status value sent with the request.</param>
/// <param name="version">Runner version string sent with the request.</param>
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
}

/// <summary>
/// Default <see cref="IBrokerServer"/> implementation. Holds a single raw connection to the broker and the
/// <see cref="BrokerHttpClient"/> obtained from it.
/// </summary>
public sealed class BrokerServer : RunnerService, IBrokerServer
{
    // True once ConnectAsync has completed successfully at least once.
    private bool _hasConnection;
    private Uri _brokerUri;
    private RawConnection _connection;
    private BrokerHttpClient _brokerHttpClient;

    /// <summary>
    /// Creates a raw connection to <paramref name="serverUri"/> and resolves the broker HTTP client from it.
    /// Calling this again replaces the previous connection and client.
    /// </summary>
    /// <param name="serverUri">Base address of the broker.</param>
    /// <param name="credentials">Credentials used to build the connection.</param>
    public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
    {
        _brokerUri = serverUri;

        _connection = VssUtil.CreateRawConnection(serverUri, credentials);
        _brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();

        // Only flag the connection as usable after the client has been resolved.
        _hasConnection = true;
    }

    /// <summary>Throws when no connection has been established yet.</summary>
    /// <exception cref="InvalidOperationException">ConnectAsync has not completed successfully.</exception>
    private void CheckConnection()
    {
        if (!_hasConnection)
        {
            // Name the method the caller actually has to invoke; this class has no "SetConnection".
            throw new InvalidOperationException(
                $"{nameof(BrokerServer)} is not connected. Call {nameof(ConnectAsync)} before requesting messages.");
        }
    }

    /// <summary>
    /// Requests a runner message through the broker HTTP client, wrapped in RetryRequest.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to both the HTTP call and RetryRequest.</param>
    /// <param name="status">Runner status sent with the request.</param>
    /// <param name="version">Runner version sent with the request.</param>
    /// <exception cref="InvalidOperationException">Thrown synchronously when ConnectAsync has not been called.</exception>
    public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
    {
        CheckConnection();

        // RetryRequest is not defined in this class; presumably inherited from RunnerService - confirm its retry policy there.
        return RetryRequest<TaskAgentMessage>(
            async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
    }
}
}
3 changes: 3 additions & 0 deletions src/Runner.Common/ConfigurationStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public sealed class RunnerSettings
[DataMember(EmitDefaultValue = false)]
public bool UseV2Flow { get; set; }

[DataMember(EmitDefaultValue = false)]
public string ServerUrlV2 { get; set; }

[IgnoreDataMember]
public bool IsHostedServer
{
Expand Down
204 changes: 204 additions & 0 deletions src/Runner.Listener/BrokerMessageListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth;

namespace GitHub.Runner.Listener
{
public sealed class BrokerMessageListener : RunnerService, IMessageListener
{
private RunnerSettings _settings;
private ITerminal _term;
private TimeSpan _getNextMessageRetryInterval;
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer;

/// <summary>
/// Runs the base initialization, then resolves the terminal and broker server services from the host context.
/// </summary>
/// <param name="hostContext">Host context handed to the base class.</param>
public override void Initialize(IHostContext hostContext)
{
// Base initialization runs first; the lookups below go through the HostContext member rather than the parameter.
base.Initialize(hostContext);

_term = HostContext.GetService<ITerminal>();
_brokerServer = HostContext.GetService<IBrokerServer>();
}

/// <summary>
/// Connects to the broker by refreshing the broker connection. No separate session object is created here.
/// </summary>
/// <param name="token">Not used by this implementation.</param>
/// <returns>Always <c>true</c> once the connection refresh completes; failures surface as exceptions.</returns>
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{
    await RefreshBrokerConnection();

    // Already inside an async method, so the value can be returned directly.
    return true;
}

/// <summary>No-op: this listener has nothing to tear down, so an already completed task is returned.</summary>
public Task DeleteSessionAsync() => Task.CompletedTask;

/// <summary>
/// Records the new job status and cancels the in-flight message request, if any.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Carries the new status.</param>
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
    Trace.Info("Received job status event. JobState: {0}", e.Status);
    runnerStatus = e.Status;

    // Read the field once; it is reassigned by the polling loop.
    var inFlightPoll = _getMessagesTokenSource;
    if (inFlightPoll == null)
    {
        return;
    }

    try
    {
        inFlightPoll.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // The polling loop disposes its token source after each attempt, so losing this race is expected.
        Trace.Info("_getMessagesTokenSource is already disposed.");
    }
}

/// <summary>
/// Polls the broker in a loop and returns the first non-null message.
/// </summary>
/// <remarks>
/// Each iteration creates a token source linked to <paramref name="token"/>, issues one request with the current
/// runnerStatus and the runner package version, and disposes the token source in the finally block.
/// Outcomes per iteration:
///  - non-null message: returned to the caller;
///  - null message: the loop polls again immediately;
///  - cancellation of the linked source only (OnJobStatus cancels it): the loop polls again;
///  - cancellation of the caller token: the exception is rethrown;
///  - TaskAgentAccessTokenExpiredException, or AccessDeniedException wrapping InvalidTaskAgentVersionException: rethrown;
///  - any other exception: rethrown when IsGetNextMessageExceptionRetriable returns false; otherwise the error
///    counter is incremented, a random backoff is chosen ([15, 30] seconds for errors 1-5, [30, 60] seconds for
///    errors 6-9, rethrow on error 10), the first error is written to the terminal, the broker connection is
///    refreshed, and the method waits for the backoff before looping.
/// NOTE(review): encounteringError and continuousError are never reset inside this method, so a null (successful)
/// poll between two failures does not restart the count - confirm that maxRetryCount is meant to bound total
/// rather than consecutive errors.
/// NOTE(review): message is only ever non-null on the path that returns from inside the try block, so the code
/// after the finally block always takes the "message == null" branch and the final "received" trace looks
/// unreachable - confirm and remove or restructure.
/// NOTE(review): the lines between the second backoff comment and the assignment that follows it are review-thread
/// text captured from the pull request page, not code.
/// </remarks>
/// <param name="token">Caller cancellation token; cancelling it aborts the poll and the backoff delay.</param>
/// <returns>The first non-null message returned by the broker.</returns>
public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
// True once the first error of this call has been printed to the terminal.
bool encounteringError = false;
// Number of retriable errors seen during this call.
int continuousError = 0;
// Used to emit an Info-level trace at most once per 30 minutes.
Stopwatch heartbeat = new();
heartbeat.Restart();
// Error count at which a retriable exception is rethrown instead of retried.
var maxRetryCount = 10;

while (true)
{
TaskAgentMessage message = null;
// Linked source: OnJobStatus can cancel this request without cancelling the caller token.
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version);

if (message == null)
{
// The finally block still runs and disposes the token source before the next iteration.
continue;
}

return message;
}
// Only the linked source was cancelled: poll again so the request carries the updated status.
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
// The caller asked to stop.
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
throw;
}
catch (TaskAgentAccessTokenExpiredException)
{
Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
throw;
}
// Rethrown as-is so it does not reach the generic handler below.
catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
{
throw;
}
catch (Exception ex)
{
Trace.Error("Catch exception during get next message.");
Trace.Error(ex);

if (!IsGetNextMessageExceptionRetriable(ex))
{
throw;
}
else
{
continuousError++;
//retry after a random backoff to avoid service throttling
//in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
if (continuousError <= 5)
{
// random backoff [15, 30]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
}
// Give up once the error count reaches maxRetryCount.
else if (continuousError >= maxRetryCount)
{
throw;
}
else
{
// more aggressive backoff [30, 60]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we'll retry forever in case of retriable exceptions?
Nit: Regarding comment, It's less aggressive right? since we're increasing the retry intervals.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I think so. This is copied behavior from the existing Message listener

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it could be read as "more aggressively backing off by increasing the time" 😄

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could have some maximum of continuousError where I abort if it reaches that

_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
}

if (!encounteringError)
{
//print error only on the first consecutive error
_term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
encounteringError = true;
}

// re-create VssConnection before next retry
await RefreshBrokerConnection();

Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token);
}
}
finally
{
_getMessagesTokenSource.Dispose();
}

if (message == null)
{
if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
{
Trace.Info($"No message retrieved within last 30 minutes.");
heartbeat.Restart();
}
else
{
Trace.Verbose($"No message retrieved.");
}

continue;
}

Trace.Info($"Message '{message.MessageId}' received.");
}
}

/// <summary>No-op: nothing is deleted for <paramref name="message"/>; an already completed task is returned.</summary>
/// <param name="message">Ignored.</param>
public Task DeleteMessageAsync(TaskAgentMessage message) => Task.CompletedTask;

/// <summary>
/// Classifies an exception raised while polling for messages and traces the decision.
/// </summary>
/// <param name="ex">The exception caught by the polling loop.</param>
/// <returns>
/// <c>false</c> for agent-not-found, pool-not-found, session-expired, access-denied and unauthorized errors;
/// <c>true</c> for everything else.
/// </returns>
private bool IsGetNextMessageExceptionRetriable(Exception ex)
{
    bool isFatal = ex is TaskAgentNotFoundException
        or TaskAgentPoolNotFoundException
        or TaskAgentSessionExpiredException
        or AccessDeniedException
        or VssUnauthorizedException;

    if (isFatal)
    {
        Trace.Info($"Non-retriable exception: {ex.Message}");
        return false;
    }

    Trace.Info($"Retriable exception: {ex.Message}");
    return true;
}

/// <summary>
/// Reloads the runner settings and credentials, then (re)connects the broker server to the ServerUrlV2 address.
/// </summary>
private async Task RefreshBrokerConnection()
{
    // Settings are reloaded on every refresh and cached in _settings.
    _settings = HostContext.GetService<IConfigurationManager>().LoadSettings();

    VssCredentials brokerCredentials = HostContext.GetService<ICredentialManager>().LoadCredentials();
    var brokerAddress = new Uri(_settings.ServerUrlV2);

    await _brokerServer.ConnectAsync(brokerAddress, brokerCredentials);
}
}
}
14 changes: 13 additions & 1 deletion src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,25 @@ private void CtrlCHandler(object sender, EventArgs e)
}
}

/// <summary>
/// Picks the message listener for this run: a freshly initialized <see cref="BrokerMessageListener"/> when
/// <c>UseV2Flow</c> is set, otherwise the <see cref="IMessageListener"/> registered with the host context.
/// </summary>
/// <param name="settings">Runner settings carrying the UseV2Flow switch.</param>
private IMessageListener GetMesageListener(RunnerSettings settings)
{
    if (!settings.UseV2Flow)
    {
        return HostContext.GetService<IMessageListener>();
    }

    // The broker listener is constructed directly here, so it has to be initialized by hand.
    var v2Listener = new BrokerMessageListener();
    v2Listener.Initialize(HostContext);
    return v2Listener;
}

//create worker manager, create message listener and start listening to the queue
private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
{
try
{
Trace.Info(nameof(RunAsync));
_listener = HostContext.GetService<IMessageListener>();
_listener = GetMesageListener(settings);
if (!await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken))
{
return Constants.Runner.ReturnCode.TerminatedError;
Expand Down
84 changes: 84 additions & 0 deletions src/Sdk/WebApi/WebApi/BrokerHttpClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi;

namespace GitHub.Actions.RunService.WebApi
{
/// <summary>
/// HTTP client for the broker service. Every constructor forwards its arguments unchanged to
/// <see cref="RawHttpClientBase"/>.
/// </summary>
public class BrokerHttpClient : RawHttpClientBase
{
    /// <summary>Creates a client from a base URL and OAuth credentials.</summary>
    public BrokerHttpClient(Uri baseUrl, VssOAuthCredential credentials)
        : base(baseUrl, credentials)
    {
    }

    /// <summary>Creates a client from a base URL, OAuth credentials and request settings.</summary>
    public BrokerHttpClient(Uri baseUrl, VssOAuthCredential credentials, RawClientHttpRequestSettings settings)
        : base(baseUrl, credentials, settings)
    {
    }

    /// <summary>Creates a client from a base URL, OAuth credentials and delegating handlers.</summary>
    public BrokerHttpClient(Uri baseUrl, VssOAuthCredential credentials, params DelegatingHandler[] handlers)
        : base(baseUrl, credentials, handlers)
    {
    }

    /// <summary>Creates a client from a base URL, OAuth credentials, request settings and delegating handlers.</summary>
    public BrokerHttpClient(
        Uri baseUrl,
        VssOAuthCredential credentials,
        RawClientHttpRequestSettings settings,
        params DelegatingHandler[] handlers)
        : base(baseUrl, credentials, settings, handlers)
    {
    }

    /// <summary>Creates a client on top of an existing message handler pipeline.</summary>
    public BrokerHttpClient(Uri baseUrl, HttpMessageHandler pipeline, Boolean disposeHandler)
        : base(baseUrl, pipeline, disposeHandler)
    {
    }

    /// <summary>
    /// Sends a GET request to the "message" path resolved against the client base address.
    /// </summary>
    /// <param name="runnerVersion">Sent as the "runnerVersion" query parameter when not null.</param>
    /// <param name="status">Sent as the "status" query parameter when it has a value.</param>
    /// <param name="cancellationToken">Forwarded to the underlying send call.</param>
    /// <returns>The task produced by the base SendAsync call.</returns>
    public Task<TaskAgentMessage> GetRunnerMessageAsync(
        string runnerVersion,
        TaskAgentStatus? status,
        CancellationToken cancellationToken = default)
    {
        var messageUri = new Uri(Client.BaseAddress, "message");

        // Optional values are left out of the query string entirely rather than sent empty.
        var query = new List<KeyValuePair<string, string>>();
        if (status.HasValue)
        {
            query.Add("status", status.Value.ToString());
        }
        if (runnerVersion is not null)
        {
            query.Add("runnerVersion", runnerVersion);
        }

        return SendAsync<TaskAgentMessage>(
            HttpMethod.Get,
            requestUri: messageUri,
            queryParameters: query,
            cancellationToken: cancellationToken);
    }
}
}