From bd1f1322fa81b625f2f92378cdaf9b12825b9198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Sat, 13 Sep 2025 13:57:37 +0200 Subject: [PATCH 1/3] Simplify --- threadFramework.go | 61 +++++++++++++++++----------------------------- worker.go | 2 +- 2 files changed, 24 insertions(+), 39 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index 6a3315a6f7..ff6032d5b0 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -60,49 +60,34 @@ func RegisterExternalWorker(worker WorkerExtension) { // startExternalWorkerPipe creates a pipe from an external worker to the main worker. func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { - go func() { - defer func() { - if r := recover(); r != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r)) - } - }() + for { + rq := externalWorker.ProvideRequest() - for { - var rq *WorkerRequest - func() { - defer func() { - if r := recover(); r != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r)) - rq = nil - } - }() - rq = externalWorker.ProvideRequest() - }() + if rq == nil || rq.Request == nil { + logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) + continue + } - if rq == nil || rq.Request == nil { - logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) - continue - } + r := rq.Request + fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) + if err != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) + continue + } - r := rq.Request - fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) - if err != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) - continue - } + if fc, ok := fromContext(fr.Context()); ok { + fc.responseWriter = rq.Response - if fc, ok := fromContext(fr.Context()); ok { - fc.responseWriter = rq.Response + // Queue the request and wait for completion if Done channel was provided + logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name)) - // Queue the request and wait for completion if Done channel was provided - w.requestChan <- fc - if rq.Done != nil { - go func() { - <-fc.done - close(rq.Done) - }() - } + w.requestChan <- fc + if rq.Done != nil { + go func() { + <-fc.done + close(rq.Done) + }() } } - }() + } } diff --git a/worker.go b/worker.go index 39417e6570..dcc07444d9 100644 --- a/worker.go +++ b/worker.go @@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error { // create a pipe from the external worker to the main worker // note: this is locked to the initial thread size the external worker requested if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil { - startExternalWorkerPipe(w, workerThread.externalWorker, thread) + go startExternalWorkerPipe(w, workerThread.externalWorker, thread) } workersReady.Done() }() From db84f5e30519114e300c4a0e33201feb3dc50942 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Sat, 13 Sep 2025 14:11:15 +0200 Subject: [PATCH 2/3] remove unused variable --- threadFramework.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/threadFramework.go b/threadFramework.go index ff6032d5b0..2e37151b78 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -59,7 +59,7 @@ func RegisterExternalWorker(worker WorkerExtension) { } // startExternalWorkerPipe creates a pipe from an external worker to the main worker. -func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { +func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension) { for { rq := externalWorker.ProvideRequest() From b3861b5247a37b75552e192462b37d3efb8ac14f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Sat, 13 Sep 2025 14:15:56 +0200 Subject: [PATCH 3/3] log thread index --- threadFramework.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index 2e37151b78..07badf8bce 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -59,19 +59,19 @@ func RegisterExternalWorker(worker WorkerExtension) { } // startExternalWorkerPipe creates a pipe from an external worker to the main worker. -func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension) { +func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { for { rq := externalWorker.ProvideRequest() if rq == nil || rq.Request == nil { - logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) + logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) continue } r := rq.Request fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) if err != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) + logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err)) continue } @@ -79,7 +79,7 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension) { fc.responseWriter = rq.Response // Queue the request and wait for completion if Done channel was provided - logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name)) + logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) w.requestChan <- fc if rq.Done != nil {