Skip to content

Commit 9c4b19b

Browse files
authored
Merge pull request #19 from BeshoyHindy/release/v0.1.0-alpha.5
Release v0.1.0-alpha.5
2 parents d4ba784 + 0bf4aab commit 9c4b19b

File tree

13 files changed

+185
-58
lines changed

13 files changed

+185
-58
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,3 +482,6 @@ $RECYCLE.BIN/
482482

483483
# Vim temporary swap files
484484
*.swp
485+
486+
# Temporary test projects
487+
_temp-*

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.1.0-alpha.5] - 2026-02-23
11+
12+
### Changed
13+
14+
- `BuildAwakeableId` rewritten with `System.Buffers.Text.Base64Url` — eliminates 3-4 intermediate string allocations per awakeable
15+
- Serialization hot path (`Run`, `SetState`, `Call`) uses `ArrayPool<byte>` rentals via `CopyToPooled` instead of `.ToArray()` — reduces GC pressure for typical 50-500 byte payloads
16+
- Replay journal entries use `RawMessage.DetachPayload()` ownership transfer — eliminates 1 `byte[]` copy per replayed entry
17+
- `CompletionManager` uses `CompletionSlot` discriminated union struct instead of `ConcurrentDictionary<int, object>` — eliminates boxing of `CompletionResult` on early completions
18+
- `ProtocolReader` single-segment fast paths for header parsing and payload copy — avoids `stackalloc` + `ReadOnlySequence.CopyTo` overhead on the common Kestrel path
19+
- `NegotiateVersion` uses explicit version substring checks instead of loop iteration
20+
21+
### Fixed
22+
23+
- NativeAotCounter sample port 9088 → 9086 to match CI integration test expectations
24+
- NativeAotSaga sample port 9089 → 9087 to match CI integration test expectations
25+
1026
## [0.1.0-alpha.4] - 2026-02-23
1127

1228
### Added

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
<!-- Shared NuGet package metadata -->
1212
<PropertyGroup>
13-
<Version>0.1.0-alpha.4</Version>
13+
<Version>0.1.0-alpha.5</Version>
1414
<Authors>Beshoy Hindy</Authors>
1515
<Company>Beshoy Hindy</Company>
1616
<Copyright>Copyright (c) 2026 Beshoy Hindy</Copyright>

samples/NativeAotCounter/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@
66
// Publish with: dotnet publish -c Release
77
await RestateHost
88
.CreateBuilder()
9-
.WithPort(9088)
9+
.WithPort(9086)
1010
.BuildAot(services => services.AddRestateGenerated(AppJsonContext.Default))
1111
.RunAsync();

samples/NativeAotSaga/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@
66
// Publish with: dotnet publish -c Release
77
await RestateHost
88
.CreateBuilder()
9-
.WithPort(9089)
9+
.WithPort(9087)
1010
.BuildAot(services => services.AddRestateGenerated(AppJsonContext.Default))
1111
.RunAsync();

src/Restate.Sdk/Hosting/RestateEndpointRouteBuilderExtensions.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ public static class RestateEndpointRouteBuilderExtensions
3737
if (acceptHeader.Contains("*/*", StringComparison.Ordinal))
3838
return SupportedContentTypes[^1];
3939

40-
// Find the highest version we support that the client also supports
41-
foreach (var supported in SupportedContentTypes)
42-
{
43-
if (acceptHeader.Contains(supported, StringComparison.OrdinalIgnoreCase))
44-
return supported;
45-
}
40+
// Check version-specific substrings directly (highest priority first)
41+
if (acceptHeader.Contains("endpointmanifest.v3", StringComparison.OrdinalIgnoreCase))
42+
return SupportedContentTypes[0];
43+
if (acceptHeader.Contains("endpointmanifest.v2", StringComparison.OrdinalIgnoreCase))
44+
return SupportedContentTypes[1];
45+
if (acceptHeader.Contains("endpointmanifest.v1", StringComparison.OrdinalIgnoreCase))
46+
return SupportedContentTypes[2];
4647

4748
return null; // No mutually supported version → 415
4849
}

src/Restate.Sdk/Internal/Journal/CompletionManager.cs

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,70 @@
22

33
namespace Restate.Sdk.Internal.Journal;
44

5+
/// <summary>
6+
/// Discriminated union that avoids boxing CompletionResult (a struct) when stored
7+
/// in ConcurrentDictionary. Holds either a TCS, a CompletionResult, or a TerminalException.
8+
/// </summary>
9+
internal readonly struct CompletionSlot
10+
{
11+
private readonly object? _ref;
12+
private readonly CompletionResult _result;
13+
public SlotKind Kind { get; }
14+
15+
public enum SlotKind : byte { Tcs, Result, Failure }
16+
17+
public CompletionSlot(TaskCompletionSource<CompletionResult> tcs)
18+
{
19+
_ref = tcs;
20+
_result = default;
21+
Kind = SlotKind.Tcs;
22+
}
23+
24+
public CompletionSlot(CompletionResult result)
25+
{
26+
_ref = null;
27+
_result = result;
28+
Kind = SlotKind.Result;
29+
}
30+
31+
public CompletionSlot(TerminalException ex)
32+
{
33+
_ref = ex;
34+
_result = default;
35+
Kind = SlotKind.Failure;
36+
}
37+
38+
public TaskCompletionSource<CompletionResult> Tcs => (TaskCompletionSource<CompletionResult>)_ref!;
39+
public CompletionResult Result => _result;
40+
public TerminalException Exception => (TerminalException)_ref!;
41+
}
42+
543
// ConcurrentDictionary is required here: the handler thread calls GetOrRegister while
644
// ProcessIncomingMessagesAsync (running on a separate Task) calls TryComplete/TryFail.
745
// Early completions are stored so notifications arriving before registration are not lost.
846
internal sealed class CompletionManager
947
{
10-
private readonly ConcurrentDictionary<int, object> _slots = new();
48+
private readonly ConcurrentDictionary<int, CompletionSlot> _slots = new();
1149

1250
public TaskCompletionSource<CompletionResult> Register(int entryIndex)
1351
{
1452
var tcs = new TaskCompletionSource<CompletionResult>(TaskCreationOptions.RunContinuationsAsynchronously);
15-
if (_slots.TryAdd(entryIndex, tcs))
53+
if (_slots.TryAdd(entryIndex, new CompletionSlot(tcs)))
1654
return tcs;
1755

1856
// Slot already occupied — either an early completion or a duplicate registration.
1957
if (_slots.TryRemove(entryIndex, out var slot))
2058
{
21-
if (slot is TaskCompletionSource<CompletionResult>)
59+
if (slot.Kind == CompletionSlot.SlotKind.Tcs)
2260
throw new InvalidOperationException($"Entry {entryIndex} already registered");
2361

2462
// Early completion arrived before registration — resolve the TCS immediately.
25-
if (slot is CompletionResult result)
26-
tcs.SetResult(result);
27-
else if (slot is TerminalException ex)
28-
tcs.SetException(ex);
63+
if (slot.Kind == CompletionSlot.SlotKind.Result)
64+
tcs.SetResult(slot.Result);
65+
else if (slot.Kind == CompletionSlot.SlotKind.Failure)
66+
tcs.SetException(slot.Exception);
2967

30-
_slots.TryAdd(entryIndex, tcs);
68+
_slots.TryAdd(entryIndex, new CompletionSlot(tcs));
3169
}
3270

3371
return tcs;
@@ -36,55 +74,56 @@ public TaskCompletionSource<CompletionResult> Register(int entryIndex)
3674
public TaskCompletionSource<CompletionResult> GetOrRegister(int entryIndex)
3775
{
3876
var slot = _slots.GetOrAdd(entryIndex,
39-
static _ => new TaskCompletionSource<CompletionResult>(TaskCreationOptions.RunContinuationsAsynchronously));
77+
static _ => new CompletionSlot(
78+
new TaskCompletionSource<CompletionResult>(TaskCreationOptions.RunContinuationsAsynchronously)));
4079

41-
if (slot is TaskCompletionSource<CompletionResult> tcs)
42-
return tcs;
80+
if (slot.Kind == CompletionSlot.SlotKind.Tcs)
81+
return slot.Tcs;
4382

4483
// An early completion arrived before we registered — create a pre-resolved TCS.
4584
var earlyTcs = new TaskCompletionSource<CompletionResult>(TaskCreationOptions.RunContinuationsAsynchronously);
46-
if (slot is CompletionResult result)
47-
earlyTcs.SetResult(result);
48-
else if (slot is TerminalException ex)
49-
earlyTcs.SetException(ex);
85+
if (slot.Kind == CompletionSlot.SlotKind.Result)
86+
earlyTcs.SetResult(slot.Result);
87+
else if (slot.Kind == CompletionSlot.SlotKind.Failure)
88+
earlyTcs.SetException(slot.Exception);
5089

5190
// Replace the stored value with the TCS (not strictly required, but keeps the dictionary clean).
52-
_slots.TryUpdate(entryIndex, earlyTcs, slot);
91+
_slots.TryUpdate(entryIndex, new CompletionSlot(earlyTcs), slot);
5392
return earlyTcs;
5493
}
5594

5695
public bool TryComplete(int entryIndex, CompletionResult result)
5796
{
5897
if (_slots.TryRemove(entryIndex, out var slot))
5998
{
60-
if (slot is TaskCompletionSource<CompletionResult> tcs)
61-
return tcs.TrySetResult(result);
99+
if (slot.Kind == CompletionSlot.SlotKind.Tcs)
100+
return slot.Tcs.TrySetResult(result);
62101
}
63102

64103
// No handler registered yet — store the result for later delivery.
65-
_slots.TryAdd(entryIndex, result);
104+
_slots.TryAdd(entryIndex, new CompletionSlot(result));
66105
return true;
67106
}
68107

69108
public bool TryFail(int entryIndex, ushort code, string message)
70109
{
71110
if (_slots.TryRemove(entryIndex, out var slot))
72111
{
73-
if (slot is TaskCompletionSource<CompletionResult> tcs)
74-
return tcs.TrySetException(new TerminalException(message, code));
112+
if (slot.Kind == CompletionSlot.SlotKind.Tcs)
113+
return slot.Tcs.TrySetException(new TerminalException(message, code));
75114
}
76115

77116
// No handler registered yet — store the failure for later delivery.
78-
_slots.TryAdd(entryIndex, new TerminalException(message, code));
117+
_slots.TryAdd(entryIndex, new CompletionSlot(new TerminalException(message, code)));
79118
return true;
80119
}
81120

82121
public void CancelAll()
83122
{
84123
foreach (var pair in _slots)
85124
{
86-
if (pair.Value is TaskCompletionSource<CompletionResult> tcs)
87-
tcs.TrySetCanceled();
125+
if (pair.Value.Kind == CompletionSlot.SlotKind.Tcs)
126+
pair.Value.Tcs.TrySetCanceled();
88127
}
89128

90129
_slots.Clear();

src/Restate.Sdk/Internal/Journal/InvocationJournal.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ internal sealed class InvocationJournal : IDisposable
77
private const int DefaultCapacity = 4;
88

99
private JournalEntry[] _entries;
10+
private List<byte[]>? _pooledBuffers;
1011

1112
public InvocationJournal()
1213
{
@@ -28,8 +29,23 @@ public JournalEntry this[int index]
2829
}
2930
}
3031

32+
/// <summary>
33+
/// Tracks a pooled buffer (detached from RawMessage) for batch return on Dispose.
34+
/// </summary>
35+
public void TrackPooledBuffer(byte[] buffer)
36+
{
37+
(_pooledBuffers ??= new List<byte[]>(8)).Add(buffer);
38+
}
39+
3140
public void Dispose()
3241
{
42+
if (_pooledBuffers is not null)
43+
{
44+
foreach (var buf in _pooledBuffers)
45+
ArrayPool<byte>.Shared.Return(buf);
46+
_pooledBuffers = null;
47+
}
48+
3349
if (_entries.Length > 0)
3450
{
3551
ArrayPool<JournalEntry>.Shared.Return(_entries, true);

src/Restate.Sdk/Internal/Protocol/ProtocolReader.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,18 @@ private bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out RawMessage m
7878
if (buffer.Length < MessageHeader.Size)
7979
return false;
8080

81-
Span<byte> headerBytes = stackalloc byte[MessageHeader.Size];
82-
buffer.Slice(0, MessageHeader.Size).CopyTo(headerBytes);
83-
_pendingHeader = MessageHeader.Read(headerBytes);
81+
// Fast path: single-segment buffer (common case with Kestrel)
82+
if (buffer.FirstSpan.Length >= MessageHeader.Size)
83+
{
84+
_pendingHeader = MessageHeader.Read(buffer.FirstSpan);
85+
}
86+
else
87+
{
88+
Span<byte> headerBytes = stackalloc byte[MessageHeader.Size];
89+
buffer.Slice(0, MessageHeader.Size).CopyTo(headerBytes);
90+
_pendingHeader = MessageHeader.Read(headerBytes);
91+
}
92+
8493
buffer = buffer.Slice(MessageHeader.Size);
8594
_state = DecoderState.WaitingPayload;
8695
}
@@ -99,7 +108,12 @@ private bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out RawMessage m
99108

100109
var payloadSlice = buffer.Slice(0, _pendingHeader.Length);
101110
var rented = ArrayPool<byte>.Shared.Rent((int)_pendingHeader.Length);
102-
payloadSlice.CopyTo(rented);
111+
112+
// Fast path: single-segment avoids ReadOnlySequence.CopyTo overhead
113+
if (payloadSlice.IsSingleSegment)
114+
payloadSlice.FirstSpan.CopyTo(rented);
115+
else
116+
payloadSlice.CopyTo(rented);
103117

104118
message = RawMessage.Create(_pendingHeader, rented, (int)_pendingHeader.Length);
105119
buffer = buffer.Slice(_pendingHeader.Length);

src/Restate.Sdk/Internal/Protocol/RawMessage.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@ public static RawMessage Create(MessageHeader header, byte[] rentedBuffer, int l
3535
return new RawMessage(header, rentedBuffer, length);
3636
}
3737

38+
/// <summary>
39+
/// Transfers ownership of the pooled payload buffer to the caller.
40+
/// After this call, Dispose() becomes a no-op — the caller is responsible
41+
/// for returning the buffer to ArrayPool when done.
42+
/// </summary>
43+
public (byte[] Buffer, ReadOnlyMemory<byte> Memory) DetachPayload()
44+
{
45+
var buf = _rentedBuffer!;
46+
var mem = buf.AsMemory(0, _payloadLength);
47+
_rentedBuffer = null;
48+
_payloadLength = 0;
49+
return (buf, mem);
50+
}
51+
3852
public void Dispose()
3953
{
4054
if (_rentedBuffer is not null)

0 commit comments

Comments
 (0)