Skip to content

Ox streaming client and server transport#182

Open
kubinio123 wants to merge 3 commits into
152-enrich-server-implementationfrom
151-ox-streaming-client-transport
Open

Ox streaming client and server transport#182
kubinio123 wants to merge 3 commits into
152-enrich-server-implementationfrom
151-ox-streaming-client-transport

Conversation

@kubinio123

Copy link
Copy Markdown
Contributor

DONE:

  • ox client and server transport

@kubinio123 kubinio123 marked this pull request as ready for review June 26, 2026 10:24
@kubinio123 kubinio123 requested review from adamjankaczmarek and adamw and removed request for adamjankaczmarek June 26, 2026 10:24
@adamw

adamw commented Jun 26, 2026

Copy link
Copy Markdown
Member

Review

(Reviewed against the PR's base branch 152-enrich-server-implementation, not master.)

Nice addition — direct-style Ox client/server transports mirroring the ZIO ones, with good doc/example coverage. One thing worth calling out as a positive: OxServerHttpTransport.eventStream scopes its producer fork inside Flow.usingEmit { supervised { … } }, so a client disconnect tears the fork down — it doesn't have the detached-forkDaemon leak the ZIO streaming HTTP transport has. A few findings, most-severe first:

1. GET SSE listener reconnect is unguarded — a failed reconnect crashes the client's supervised scope

client-streaming/client-ox/.../OxClientHttpTransport.scala (getListenerLoop / openGetSseStream)

The background GET SSE listener exists to resiliently reconnect after the connection is cut, but only the drainSse call is wrapped in try/catch. The reconnect attempt itself — openGetSseStreamwithLastEvent.send(backend) — is not. On a transient failure that backend call throws (SttpClientException.ConnectException etc.), and since the loop runs in a supervised forkDiscard, the exception propagates out and tears down the enclosing supervised scope (at best, it silently kills the listener and all further server-pushed notifications).

Failure scenario: the server restarts (or a brief network blip) while a bidirectional Ox client is idle → the listener's reconnect send throws → the user's whole supervised block fails, rather than retrying. The streaming integration test doesn't catch this because it restores the proxy before the listener reconnects, so the reconnect send always succeeds. Wrapping the openGetSseStream call (or its send) in the same retry/catch as drainSse would fix it.

2. Reconnect backoff never escalates

client-streaming/client-ox/.../OxClientHttpTransport.scala (getListenerLoop)

attempt = 0 is reset on every successful openGetSseStream, so reconnectDelay(attempt) is always reconnectDelay(1) = 100ms. A server (or proxy) that accepts the GET stream and then immediately closes it produces a tight ~100ms reconnect loop instead of the intended exponential backoff. The attempt counter only ever reaches 1 before being reset.

3. Stale pinned dependency in example (minor)

examples/src/main/scala/examples/both/serverAndClient.scala

//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.13.19 pins tapir 1.13.19 while the project uses tapirV = "1.13.23". Harmless today since the "verify examples compile using Scala CLI" CI step is still commented out, but it'll drift further once that step is re-enabled.

Findings 1–2 are in the Ox HTTP client listener; the server transports and stdio client look solid.

🤖 Generated with Claude Code

@adamw

adamw commented Jun 26, 2026

Copy link
Copy Markdown
Member

Follow-up on findings #1 and #2, after checking the Ox docs — they can be addressed together, and #1 is more severe than I first stated.

Per Ox's structured-concurrency semantics, in a supervised scope any fork that throws (daemon forks like forkDiscard included) ends the whole scope and re-throws, interrupting sibling forks. So the unguarded reconnect in getListenerLoopopenGetSseStreamwithLastEvent.send(backend), which is outside the try/catch that only wraps drainSse — doesn't just stop the SSE listener on a transient failure: it tears down the caller's supervised block and takes the whole session down with it. (So scratch my "at best silently kills the listener" hedge — it definitively propagates.)

The idiomatic fix folds both findings together. The hand-rolled while + attempt counter + reconnectDelay is reimplementing ox.resilience.retry with a Schedule, and the attempt = 0 reset on every successful open is why the backoff never escalates (#2). Replacing it with something like:

retry(Schedule.exponentialBackoff(100.millis).jitter().maxInterval(30.seconds)):
  openGetSseStream(lastEventId.get())
    .foreach(drainSse(_, id => lastEventId.set(Some(id))))

(terminating on closing, with the connect and drain both inside the retried block) closes the unguarded-reconnect hole and restores real exponential backoff in one go.

For contrast, the server side (OxServerHttpTransport) already follows the Ox "short-lived, per-request scope" guidance with Flow.usingEmit { supervised { … } }, so its producer fork is torn down on client disconnect — no change needed there.

🤖 Generated with Claude Code

uri: Uri,
protocolVersion: ProtocolVersion,
timeout: FiniteDuration,
scope: Ox,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks fishy. If this needs to be run within a scope, use using Ox. But maybe it doesn't need to run any forks in the parent scope (perferred ;) )

protocolVersion: ProtocolVersion,
timeout: FiniteDuration,
scope: Ox,
sessionId: AtomicReference[Option[String]],

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a lot of atomics here. Maybe they could be somehow extract to an actor?


class OxMcpClientHttpIntegrationSpec extends McpClientStreamingHttpIntegrationSpec[Identity, SyncBackend] with SyncToFuture:

override def usingBackend[A](use: SyncBackend => Identity[A]): Identity[A] =

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Identity[A] is just A

Comment thread docs/client/examples.md
supervised:
val backend = DefaultSyncBackend()
val transport = OxClientHttpTransport(backend, uri"http://localhost:8080/mcp")
val client = McpClient.bidirectional[Identity](

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doi you need the type param here? it should be inferred basing on the transport

}
```

## Bidirectional client over an Ox streaming transport

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually uyou only need to modify the docs in docs/, these get updated on release

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants