Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
10418d8
fix(realtime): gracefully recover from ECONNRESET errors when sending…
ericallam Oct 17, 2025
fb2d548
Add support for multiple writers to a single stream by removing the E…
ericallam Oct 17, 2025
4bdda7a
Make the MetadataStream client more robust to failure and add tests
ericallam Oct 17, 2025
278aad2
Make the stream client more resilient and robust, including implement…
ericallam Oct 18, 2025
de51c26
Add some more streaming examples and markdown streaming
ericallam Oct 18, 2025
17ffdf3
s2 WIP
ericallam Oct 20, 2025
500bf15
Added realtimeStreams column to TaskRun to replace using metadata for…
ericallam Oct 21, 2025
f56ad0c
Write to s2 from the client instead of the server
ericallam Oct 21, 2025
bbdbe71
WIP
ericallam Oct 22, 2025
d6ef9be
Add env var
ericallam Oct 22, 2025
dccd43f
Loads more stuff
ericallam Oct 23, 2025
c084c1a
The stream.read() span now works better when specifying a startIndex
ericallam Oct 23, 2025
d6af637
WIP
ericallam Oct 23, 2025
33887cc
Configure the waitUntil timeout via an env var
ericallam Oct 23, 2025
a4ab169
Return stream parts from SSE class
ericallam Oct 23, 2025
fceefd9
Adds new streams icon
samejr Oct 23, 2025
5f4de7f
Layout improvements to streams inspector
samejr Oct 23, 2025
3ff8dc8
Improve layout of streams inspector
samejr Oct 23, 2025
3516897
Remove tabs if only Overview is shown
samejr Oct 23, 2025
16c17e0
Added compact view for streams and sticky copy button
ericallam Oct 23, 2025
12ddd4b
Add AI SDK demo
ericallam Oct 23, 2025
8c1e384
experiment_throttle is now just throttle
ericallam Oct 23, 2025
908dd1c
Show textwrapping, copy and modal buttons on the Properties code blocks
samejr Oct 24, 2025
d299a38
Use simplr library for plurals and flex wrap the heading info nicely
samejr Oct 24, 2025
1f00978
Moves toggle button functionality into the header with new icons
samejr Oct 24, 2025
9f837b5
Make sure the scroll view knows if it’s at the top or bottom even if …
samejr Oct 24, 2025
2303ec2
Improve loading states
samejr Oct 24, 2025
ad0954f
Add divide between heads and inspector content
samejr Oct 24, 2025
77cb137
Remove 0 padding
samejr Oct 24, 2025
6c30fda
Adds nice behaviour if a long stream key is used
samejr Oct 24, 2025
4253f84
Disable the header buttons if the content is loading
samejr Oct 24, 2025
8b3b8a5
Fixed failing test
ericallam Oct 25, 2025
6bd2354
Improved X-Resume-From-Chunk header parsing
ericallam Oct 25, 2025
92e85d8
Unify inactivity timeout threshold
ericallam Oct 25, 2025
c0f7612
If v2 streams is requested, throw an error if S2 env vars are not set
ericallam Oct 25, 2025
0326f5f
Use implicit radix arg when calling parseInt
ericallam Oct 25, 2025
83823e8
Consistent API client creation
ericallam Oct 25, 2025
81ea750
Normalize the stream source to an async iterable before passing to th…
ericallam Oct 25, 2025
3fc6e96
Refactor the metadata streams stuff to be better
ericallam Oct 25, 2025
a8110c4
properly abort streams when the waitUntil timeout occurs
ericallam Oct 25, 2025
178b490
fix the new configurable waitUntil timeout
ericallam Oct 25, 2025
234bd93
prevent memory leaks by cleaning up responses and requests
ericallam Oct 25, 2025
fd40ab7
Fix timer leak
ericallam Oct 25, 2025
4c76837
use server provided options for the s2 writer
ericallam Oct 25, 2025
f0d5e42
s2 stream writer now handles abort signals
ericallam Oct 25, 2025
581135c
Fixed core tests
ericallam Oct 25, 2025
5dcf303
No need to use keys and a Map for stream management
ericallam Oct 25, 2025
9286e9b
Fixed runStream tests
ericallam Oct 25, 2025
9e5460d
Implement TRIGGER_V2_REALTIME_STREAMS env var
ericallam Oct 25, 2025
bd18134
New streams API (sync pipe with default)
ericallam Nov 2, 2025
57db0a6
Fixed useRealtimeStream issue because of overloading
ericallam Nov 2, 2025
f80e4c0
Prevent initializing a realtime stream on a completed run
ericallam Nov 2, 2025
d4d7f01
Add streams.append
ericallam Nov 2, 2025
3cc9ba6
Nicer API and update the s2 writer to use the new append session
ericallam Nov 5, 2025
5a4deac
Use a readable stream directly
ericallam Nov 5, 2025
396f325
Implement s2 chunk deduping
ericallam Nov 5, 2025
d4e1c98
Fix tests
ericallam Nov 8, 2025
37eb03a
couple of fixes
ericallam Nov 8, 2025
e979f52
Improve stream dashboard perf with virtualization
ericallam Nov 10, 2025
a0c3963
Fix streams.append for s2, and a bunch of other little things, plus t…
ericallam Nov 10, 2025
b5db525
close the controller when signal aborts
ericallam Nov 10, 2025
967f418
Fix waitForStreams async executor
ericallam Nov 10, 2025
557b48a
change info logs to debug
ericallam Nov 10, 2025
1626195
fix streams test
ericallam Nov 10, 2025
211d804
Update changeset to be a minor update
ericallam Nov 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Write to s2 from the client instead of the server
  • Loading branch information
ericallam committed Nov 11, 2025
commit f56ad0c7000f359db5d0bdc3a53c9759633d53fe
2 changes: 1 addition & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ const EnvironmentSchema = z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
REALTIME_STREAMS_S2_RESUME_TTL_SECONDS: z.coerce.number().int().default(86400),
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
})
.and(GithubAppEnvSchema)
.and(S2EnvSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { $replica, prisma } from "~/db.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import {
createActionApiRoute,
Expand Down Expand Up @@ -53,26 +54,58 @@ const { action } = createActionApiRoute(
return new Response("Target not found", { status: 404 });
}

// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = request.headers.get("X-Stream-Version") || "v1";

if (!request.body) {
return new Response("No body provided", { status: 400 });
}
if (request.method === "PUT") {
// This is the "create" endpoint
const updatedRun = await prisma.taskRun.update({
where: {
friendlyId: targetId,
runtimeEnvironmentId: authentication.environment.id,
},
data: {
realtimeStreams: {
push: params.streamId,
},
},
select: {
realtimeStreamsVersion: true,
},
});

const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
updatedRun.realtimeStreamsVersion
);

const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId);

return realtimeStream.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
return json(
{
version: updatedRun.realtimeStreamsVersion,
},
{ status: 202, headers: responseHeaders }
);
} else {
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = request.headers.get("X-Stream-Version") || "v1";

if (!request.body) {
return new Response("No body provided", { status: 400 });
}

const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;

const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);

return realtimeStream.ingestData(
request.body,
targetId,
params.streamId,
clientId,
resumeFromChunkNumber
);
}
}
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds
}

async initializeStream(
runId: string,
streamId: string
): Promise<{ responseHeaders?: Record<string, string> }> {
return {};
}

async streamResponse(
request: Request,
runId: string,
Expand Down
Loading