Skip to content

Commit 3acb8ea

Browse files
e5lclaude
andauthored
KTOR-8938 Inherit server coroutine context in WebSocket session (#5426)
* KTOR-8938 Add failing test for WebSocket context inheritance Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Inherit server coroutine context in WebSocket session The WebSocket session's coroutineContext was built solely from the engine dispatcher, dropping custom CoroutineContext elements from the server/call scope. Use call.coroutineContext as the base so user-provided elements are preserved. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Fix import ordering in WebSocketEngineSuite Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Preserve original Job parent in WebSocket context fix Strip the call's Job before merging its context, and restore the original Job from the engine's processing coroutine. This ensures custom context elements are inherited without changing the WebSocket session's Job parent-child relationship. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Use call coroutine context including Job for WebSocket session Use call.coroutineContext directly (with its Job) so that the WebSocket session is a proper child of the server's coroutine scope. This ensures cancellation propagates correctly from server to WebSocket sessions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Add test for WebSocket session cancellation on server stop Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Make server-stop cancellation test open, skip on Tomcat Tomcat's servlet-based WebSocket upgrade creates an upgradeJob with no parent connection to the server scope, so server stop does not propagate cancellation to WebSocket sessions. This is a pre-existing Tomcat limitation unrelated to this fix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Fix WebSocket cancellation on server stop for servlet engines Pass the servlet's SupervisorJob as parent for the upgrade handler's Job, ensuring cancellation propagates when the servlet is destroyed. Use onCancelling=true for WebConnection cleanup to break the deadlock between blocked IO reads and Job completion. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * KTOR-8938 Skip server-stop cancellation test on JS/WASM server.stop() calls runBlocking internally via runBlockingBridge, which throws on JS and WASM platforms. Skip the test on those targets with a runtime platform check. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * KTOR-8938 Narrow @OptIn(InternalCoroutinesApi::class) scope to init method Move the annotation from class-level to the specific method that uses invokeOnCompletion(onCancelling = true), in both servlet and servlet-jakarta upgrade handlers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * KTOR-8938 Use stopSuspend instead of stop in WebSocket test - Replace server.stop() with server.stopSuspend() as suggested by reviewer - Remove JS/WASM platform check since stopSuspend doesn't use runBlocking - Remove open modifier from test function (no longer needed for overrides) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent cec7d38 commit 3acb8ea

6 files changed

Lines changed: 70 additions & 5 deletions

File tree

ktor-server/ktor-server-plugins/ktor-server-websockets/common/src/io/ktor/server/websocket/WebSocketUpgrade.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public class WebSocketUpgrade(
9595
output,
9696
plugin.maxFrameSize,
9797
plugin.masking,
98-
coroutineContext = engineContext + (coroutineContext[Job] ?: EmptyCoroutineContext),
98+
coroutineContext = call.coroutineContext + engineContext,
9999
channelsConfig = plugin.channelsConfig
100100
)
101101

ktor-server/ktor-server-servlet-jakarta/jvm/src/io/ktor/server/servlet/jakarta/AsyncServlet.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ public open class AsyncServletApplicationResponse(
125125
(call.request as AsyncServletApplicationRequest).upgraded()
126126
completed = true
127127

128-
servletUpgradeImpl.performUpgrade(upgrade, servletRequest, servletResponse, engineContext, userContext)
128+
val servletJob = coroutineContext[Job]?.parent
129+
val upgradeEngineContext = if (servletJob != null) engineContext + servletJob else engineContext
130+
servletUpgradeImpl.performUpgrade(upgrade, servletRequest, servletResponse, upgradeEngineContext, userContext)
129131
}
130132

131133
@UseHttp2Push

ktor-server/ktor-server-servlet-jakarta/jvm/src/io/ktor/server/servlet/jakarta/ServletUpgrade.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,14 @@ public class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope {
8484

8585
override val coroutineContext: CoroutineContext get() = upgradeJob
8686

87+
@OptIn(InternalCoroutinesApi::class)
8788
override fun init(webConnection: WebConnection?) {
8889
if (webConnection == null) {
8990
throw IllegalArgumentException("Upgrade processing requires WebConnection instance")
9091
}
9192

9293
upgradeJob = Job(up.engineContext[Job])
93-
upgradeJob.invokeOnCompletion {
94+
upgradeJob.invokeOnCompletion(onCancelling = true) {
9495
webConnection.close()
9596
}
9697

ktor-server/ktor-server-servlet/jvm/src/io/ktor/server/servlet/AsyncServlet.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ public open class AsyncServletApplicationResponse(
125125
(call.request as AsyncServletApplicationRequest).upgraded()
126126
completed = true
127127

128-
servletUpgradeImpl.performUpgrade(upgrade, servletRequest, servletResponse, engineContext, userContext)
128+
val servletJob = coroutineContext[Job]?.parent
129+
val upgradeEngineContext = if (servletJob != null) engineContext + servletJob else engineContext
130+
servletUpgradeImpl.performUpgrade(upgrade, servletRequest, servletResponse, upgradeEngineContext, userContext)
129131
}
130132

131133
@UseHttp2Push

ktor-server/ktor-server-servlet/jvm/src/io/ktor/server/servlet/ServletUpgrade.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,14 @@ public class ServletUpgradeHandler : HttpUpgradeHandler, CoroutineScope {
8484

8585
override val coroutineContext: CoroutineContext get() = upgradeJob
8686

87+
@OptIn(InternalCoroutinesApi::class)
8788
override fun init(webConnection: WebConnection?) {
8889
if (webConnection == null) {
8990
throw IllegalArgumentException("Upgrade processing requires WebConnection instance")
9091
}
9192

9293
upgradeJob = Job(up.engineContext[Job])
93-
upgradeJob.invokeOnCompletion {
94+
upgradeJob.invokeOnCompletion(onCancelling = true) {
9495
webConnection.close()
9596
}
9697

ktor-server/ktor-server-test-suites/common/src/io/ktor/server/testing/suites/WebSocketEngineSuite.kt

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import kotlinx.coroutines.*
2121
import kotlinx.coroutines.CancellationException
2222
import kotlinx.coroutines.channels.*
2323
import kotlinx.io.*
24+
import kotlin.coroutines.*
2425
import kotlin.random.*
2526
import kotlin.test.*
2627
import kotlin.time.Duration.Companion.milliseconds
@@ -653,6 +654,62 @@ abstract class WebSocketEngineSuite<TEngine : ApplicationEngine, TConfiguration
653654
checkFrame(second.await())
654655
}
655656

657+
@Test
658+
fun testWebSocketSessionInheritsServerCoroutineContext() = runTest {
659+
val result = CompletableDeferred<Boolean>()
660+
661+
val customElement = object : AbstractCoroutineContextElement(CustomTestElement) {}
662+
663+
createAndStartServer(parent = customElement) {
664+
webSocket("/") {
665+
val hasElement = coroutineContext[CustomTestElement] != null
666+
result.complete(hasElement)
667+
}
668+
}
669+
670+
useSocket {
671+
negotiateHttpWebSocket()
672+
673+
output.apply {
674+
// close frame with code 1000
675+
writeHex("0x88 0x02 0x03 0xe8")
676+
flush()
677+
}
678+
679+
assertCloseFrame()
680+
}
681+
682+
assertTrue(result.await(), "WebSocket session should inherit custom coroutine context elements from server")
683+
}
684+
685+
@Test
686+
fun testWebSocketSessionCancelledOnServerStop() = runTest {
687+
val sessionStarted = CompletableDeferred<Unit>()
688+
val sessionCancelled = CompletableDeferred<Unit>()
689+
690+
createAndStartServer {
691+
webSocket("/") {
692+
sessionStarted.complete(Unit)
693+
try {
694+
incoming.consumeEach {}
695+
} finally {
696+
sessionCancelled.complete(Unit)
697+
}
698+
}
699+
}
700+
701+
useSocket {
702+
negotiateHttpWebSocket()
703+
704+
sessionStarted.await()
705+
server!!.stopSuspend(0, 0)
706+
707+
withTimeout(5000) {
708+
sessionCancelled.await()
709+
}
710+
}
711+
}
712+
656713
@Test
657714
fun testCorruptFrameWithBadOpcode() = runTest {
658715
createAndStartServer {
@@ -830,6 +887,8 @@ internal suspend fun ByteWriteChannel.writeFrameTest(frame: Frame, masking: Bool
830887

831888
internal fun Boolean.flagAt(at: Int) = if (this) 1 shl at else 0
832889

890+
private object CustomTestElement : CoroutineContext.Key<AbstractCoroutineContextElement>
891+
833892
private fun Source.mask(maskKey: Int): Source = withMemory(4) { maskMemory ->
834893
maskMemory.storeIntAt(0, maskKey)
835894
buildPacket {

0 commit comments

Comments
 (0)