Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/Runner.Common/ActionsRunServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.WebApi;

namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(ActionsRunServer))]
public interface IActionsRunServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);

Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);
}

public sealed class ActionsRunServer : RunnerService, IActionsRunServer
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;

public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
{
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
_hasConnection = true;
}

private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}

public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _taskAgentClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);
Comment thread
yaananth marked this conversation as resolved.

return jobMessage;
}
}
}
78 changes: 17 additions & 61 deletions src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Common
{
Expand All @@ -20,40 +21,17 @@ public interface IRunServer : IRunnerService
public sealed class RunServer : RunnerService, IRunServer
Comment thread
yaananth marked this conversation as resolved.
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;
private Uri requestUri;
private RawConnection _connection;
private RunServiceHttpClient _runServiceHttpClient;

public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
{
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
_hasConnection = true;
}

private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
Comment thread
yaananth marked this conversation as resolved.
{
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);

await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}
requestUri = serverUri;

// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
_connection = VssUtil.CreateRawConnection(new Uri(serverUri.Authority), credentials);
_runServiceHttpClient = await _connection.GetClientAsync<RunServiceHttpClient>();
_hasConnection = true;
}

private void CheckConnection()
Expand All @@ -67,37 +45,15 @@ private void CheckConnection()
public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _taskAgentClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);
return jobMessage;
}

private async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
var retryCount = 0;
while (true)
var jobMessage = RetryRequest<AgentJobRequestMessage>(
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken);
if (jobMessage == null)
{
retryCount++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
throw new TaskOrchestrationJobNotFoundException(id);
}

return jobMessage;
}

}
}
25 changes: 0 additions & 25 deletions src/Runner.Common/RunnerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,6 @@ public void SetConnectionTimeout(RunnerConnectionType connectionType, TimeSpan t
}
}

private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
Comment thread
yaananth marked this conversation as resolved.
{
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);

await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}

// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}

private void CheckConnection(RunnerConnectionType connectionType)
{
switch (connectionType)
Expand Down
62 changes: 60 additions & 2 deletions src/Runner.Common/RunnerService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Common
{
Expand All @@ -21,9 +27,9 @@ public abstract class RunnerService
protected IHostContext HostContext { get; private set; }
protected Tracing Trace { get; private set; }

public string TraceName
public string TraceName
{
get
get
{
return GetType().Name;
}
Expand All @@ -35,5 +41,57 @@ public virtual void Initialize(IHostContext hostContext)
Trace = HostContext.GetTrace(TraceName);
Trace.Entering();
}

protected async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);

await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}

// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}

protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
var retryCount = 0;
while (true)
{
retryCount++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
}
}
}
}
18 changes: 14 additions & 4 deletions src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -496,16 +496,26 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
else
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null;

// Create connection
var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials();

var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
var jobMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
Comment thread
yaananth marked this conversation as resolved.
{
var actionsRunServer = HostContext.CreateService<IActionsRunServer>();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be HostContext.GetService<> so the underline object will be reused.

@yaananth yaananth Oct 7, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌🏼 I didn't do this btw, this was his how it was before 🤷🏻‍♂️ I added new code doing the same
Do you want me to do a follow up PR with just this change or do we have any other PR's that we can bundle this with?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just a follow-up PR for this change only.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, thanks!

await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
else
{
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}

jobDispatcher.Run(jobMessage, runOnce);
jobDispatcher.Run(jobRequestMessage, runOnce);
if (runOnce)
{
Trace.Info("One time used runner received job message.");
Expand Down
4 changes: 3 additions & 1 deletion src/Runner.Listener/RunnerJobRequestRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ public sealed class RunnerJobRequestRef
public string Id { get; set; }
[DataMember(Name = "runner_request_id")]
public string RunnerRequestId { get; set; }
[DataMember(Name = "run_service_url")]
public string RunServiceUrl { get; set; }
Comment thread
yaananth marked this conversation as resolved.
}
}
}
47 changes: 46 additions & 1 deletion src/Runner.Sdk/Util/VssUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Net.Http.Headers;
using System.Runtime.InteropServices;
using System.Net;
using Sdk.WebApi.WebApi.RawClient;

namespace GitHub.Runner.Sdk
{
Expand All @@ -34,7 +35,11 @@ public static void InitializeVssClientSettings(List<ProductInfoHeaderValue> addi
}
}

public static VssConnection CreateConnection(Uri serverUri, VssCredentials credentials, IEnumerable<DelegatingHandler> additionalDelegatingHandler = null, TimeSpan? timeout = null)
public static VssConnection CreateConnection(
Uri serverUri,
VssCredentials credentials,
IEnumerable<DelegatingHandler> additionalDelegatingHandler = null,
TimeSpan? timeout = null)
{
VssClientHttpRequestSettings settings = VssClientHttpRequestSettings.Default.Clone();

Expand Down Expand Up @@ -75,6 +80,46 @@ public static VssConnection CreateConnection(Uri serverUri, VssCredentials crede
return connection;
}

public static RawConnection CreateRawConnection(
Comment thread
yaananth marked this conversation as resolved.
Uri serverUri,
VssCredentials credentials,
IEnumerable<DelegatingHandler> additionalDelegatingHandler = null,
TimeSpan? timeout = null)
{
RawClientHttpRequestSettings settings = RawClientHttpRequestSettings.Default.Clone();

int maxRetryRequest;
if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_HTTP_RETRY") ?? string.Empty, out maxRetryRequest))
{
maxRetryRequest = 3;
}

// make sure MaxRetryRequest in range [3, 10]
settings.MaxRetryRequest = Math.Min(Math.Max(maxRetryRequest, 3), 10);

if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_HTTP_TIMEOUT") ?? string.Empty, out int httpRequestTimeoutSeconds))
{
settings.SendTimeout = timeout ?? TimeSpan.FromSeconds(100);
}
else
{
// prefer environment variable
settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200));
}

// Remove Invariant from the list of accepted languages.
//
// The constructor of VssHttpRequestSettings (base class of VssClientHttpRequestSettings) adds the current
// UI culture to the list of accepted languages. The UI culture will be Invariant on OSX/Linux when the
// LANG environment variable is not set when the program starts. If Invariant is in the list of accepted
// languages, then "System.ArgumentException: The value cannot be null or empty." will be thrown when the
// settings are applied to an HttpRequestMessage.
settings.AcceptLanguages.Remove(CultureInfo.InvariantCulture);

RawConnection connection = new RawConnection(serverUri, new RawHttpMessageHandler(credentials.ToOAuthCredentials(), settings), additionalDelegatingHandler);
return connection;
}

public static VssCredentials GetVssCredential(ServiceEndpoint serviceEndpoint)
{
ArgUtil.NotNull(serviceEndpoint, nameof(serviceEndpoint));
Expand Down
Loading