Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationTo
{
CheckConnection();
return RetryRequest<AgentJobRequestMessage>(
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken);
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken,
shouldRetry: ex => ex is not TaskOrchestrationJobAlreadyAcquiredException);
}

public Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary<String, VariableValue> outputs, IList<StepResult> stepResults, CancellationToken cancellationToken)
Expand Down
9 changes: 5 additions & 4 deletions src/Runner.Common/RunnerService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Runner.Sdk;
Expand Down Expand Up @@ -80,10 +80,11 @@ async Task<Unit> wrappedFunc()
}
await RetryRequest<Unit>(wrappedFunc, cancellationToken, maxRetryAttemptsCount);
}

protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
int maxRetryAttemptsCount = 5,
Func<Exception, bool> shouldRetry = null
)
{
var retryCount = 0;
Expand All @@ -96,7 +97,7 @@ protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
catch (Exception ex) when (retryCount < maxRetryAttemptsCount && (shouldRetry == null || shouldRetry(ex)))
{
Trace.Error("Catch exception during request");
Trace.Error(ex);
Expand Down
12 changes: 11 additions & 1 deletion src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,17 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
{
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
try
{
jobRequestMessage =
await runServer.GetJobMessageAsync(messageRef.RunnerRequestId,
messageQueueLoopTokenSource.Token);
}
catch (TaskOrchestrationJobAlreadyAcquiredException)
{
Trace.Info("Job is already acquired, skip this message.");
continue;
}
}

jobDispatcher.Run(jobRequestMessage, runOnce);
Expand Down
22 changes: 21 additions & 1 deletion src/Sdk/DTWebApi/WebApi/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Runtime.Serialization;
using GitHub.Services.Common;

Expand Down Expand Up @@ -1519,6 +1519,26 @@ private TaskOrchestrationJobNotFoundException(SerializationInfo info, StreamingC
}
}

[Serializable]
[ExceptionMapping("0.0", "3.0", "TaskOrchestrationJobAlreadyAcquiredException", "GitHub.DistributedTask.WebApi.TaskOrchestrationJobAlreadyAcquiredException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")]
public sealed class TaskOrchestrationJobAlreadyAcquiredException : DistributedTaskException
{
public TaskOrchestrationJobAlreadyAcquiredException(String message)
: base(message)
{
}

public TaskOrchestrationJobAlreadyAcquiredException(String message, Exception innerException)
: base(message, innerException)
{
}

private TaskOrchestrationJobAlreadyAcquiredException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}

[Serializable]
[ExceptionMapping("0.0", "3.0", "TaskOrchestrationPlanSecurityException", "GitHub.DistributedTask.WebApi.TaskOrchestrationPlanSecurityException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")]
public sealed class TaskOrchestrationPlanSecurityException : DistributedTaskException
Expand Down
2 changes: 2 additions & 0 deletions src/Sdk/RSWebApi/RunServiceHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public async Task<AgentJobRequestMessage> GetJobMessageAsync(
{
case HttpStatusCode.NotFound:
throw new TaskOrchestrationJobNotFoundException($"Job message not found: {messageId}");
case HttpStatusCode.Conflict:
throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired: {messageId}");
default:
throw new Exception($"Failed to get job message: {result.Error}");
}
Expand Down