From 7685180402bd93c3dbbec9acdf4613ddeb510348 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 1 Jun 2024 15:42:41 +0200 Subject: [PATCH 1/2] poc: multi-threading --- frankenphp.c | 50 ++++++++++++++++++++++++++++++++++++----- frankenphp.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++-- frankenphp.h | 2 +- 3 files changed, 105 insertions(+), 9 deletions(-) diff --git a/frankenphp.c b/frankenphp.c index e2b7bfa58a..e10b0386d5 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -726,6 +726,11 @@ sapi_module_struct frankenphp_sapi_module = { STANDARD_SAPI_MODULE_PROPERTIES}; +struct thread_args { + int num_threads; + int num_non_worker_threads; +}; + static void *manager_thread(void *arg) { // SIGPIPE must be masked in non-Go threads: // https://pkg.go.dev/os/signal#hdr-Go_programs_that_use_cgo_or_SWIG @@ -738,7 +743,10 @@ static void *manager_thread(void *arg) { exit(EXIT_FAILURE); } - int num_threads = *((int *)arg); + struct thread_args args = *((struct thread_args*)arg); + + int num_threads = args.num_threads; + int num_non_worker_threads = args.num_non_worker_threads; free(arg); arg = NULL; @@ -770,6 +778,16 @@ static void *manager_thread(void *arg) { threadpool thpool = thpool_init(num_threads); + // handle case where all threads are non-worker threads + if(num_threads == num_non_worker_threads) { + for(int i = 0; i < num_threads; i++) { + thpool_add_work(thpool, go_fetch_and_execute, NULL); + } + + // todo: properly shutdown + return NULL; + } + uintptr_t rh; while ((rh = go_fetch_request())) { thpool_add_work(thpool, go_execute_script, (void *)rh); @@ -798,14 +816,17 @@ static void *manager_thread(void *arg) { return NULL; } -int frankenphp_init(int num_threads) { +int frankenphp_init(int num_threads, int non_worker_threads) { pthread_t thread; - int *num_threads_ptr = calloc(1, sizeof(int)); - *num_threads_ptr = num_threads; + struct thread_args args; + args.num_threads = num_threads; + args.num_non_worker_threads = non_worker_threads; + + struct thread_args* num_threads_ptr = calloc(1, sizeof(args)); + *num_threads_ptr = args; - if (pthread_create(&thread, NULL, *manager_thread, (void *)num_threads_ptr) != - 0) { + if (pthread_create(&thread, NULL, *manager_thread, (void *)num_threads_ptr) != 0) { go_shutdown(); return -1; @@ -815,9 +836,12 @@ int frankenphp_init(int num_threads) { } int frankenphp_request_startup() { + //go_log_s("here we go!"); if (php_request_startup() == SUCCESS) { + //go_log_s("success!"); return SUCCESS; } + //go_log_s("failure!"); frankenphp_server_context *ctx = SG(server_context); SG(server_context) = NULL; @@ -829,7 +853,13 @@ int frankenphp_request_startup() { return FAILURE; } +int req = 0; + int frankenphp_execute_script(char *file_name) { + char str[50]; + int rn = req++; + sprintf(str, "starting %d", rn); + //go_log_s(str); if (frankenphp_request_startup() == FAILURE) { free(file_name); file_name = NULL; @@ -837,6 +867,9 @@ int frankenphp_execute_script(char *file_name) { return FAILURE; } + sprintf(str, "started %d", rn); + //go_log_s(str); + int status = SUCCESS; zend_file_handle file_handle; @@ -859,6 +892,9 @@ int frankenphp_execute_script(char *file_name) { frankenphp_clean_server_context(); frankenphp_request_shutdown(); + sprintf(str, "finished %d", rn); + //go_log_s(str); + return status; } @@ -875,6 +911,8 @@ static char **cli_argv; // Bakken and Zeev Suraski static void cli_register_file_handles(bool no_close) /* {{{ */ { + go_log_s("registering file handles"); + php_stream *s_in, *s_out, *s_err; php_stream_context *sc_in = NULL, *sc_out = NULL, *sc_err = NULL; zend_constant ic, oc, ec; diff --git a/frankenphp.go b/frankenphp.go index 8df3ee4348..185b2e1f64 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -45,6 +45,8 @@ import ( "strconv" "strings" "sync" + "sync/atomic" + "time" "unsafe" "github.com/maypok86/otter" @@ -317,7 +319,7 @@ func Init(options ...Option) error { done = make(chan struct{}) requestChan = make(chan *http.Request) - if C.frankenphp_init(C.int(opt.numThreads)) != 0 { + if C.frankenphp_init(C.int(opt.numThreads), C.int(opt.numThreads-numWorkers)) != 0 { return MainThreadCreationError } @@ -439,6 +441,8 @@ func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) er return nil } +var counter int + // ServeHTTP executes a PHP script according to the given context. func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error { shutdownWG.Add(1) @@ -475,6 +479,10 @@ func go_fetch_request() C.uintptr_t { return 0 case r := <-requestChan: + count := counter + counter += 1 + logger.Warn("Fetched request", zap.Int("counter", count)) + r = r.WithContext(context.WithValue(r.Context(), "counter__", count)) h := cgo.NewHandle(r) r.Context().Value(handleKey).(*handleList).AddHandle(h) return C.uintptr_t(h) @@ -487,7 +495,55 @@ func maybeCloseContext(fc *FrankenPHPContext) { }) } -// go_execute_script Note: only called in cgi-mode +//export go_log_s +func go_log_s(cstr *C.char) { + // Convert C string to Go string + str := C.GoString(cstr) + + // Log (print) the string + logger.Warn(str) +} + +var conReqs atomic.Int32 + +//export go_fetch_and_execute +func go_fetch_and_execute(rh unsafe.Pointer) { + //logger.Warn("Starting fetch and execute") + + for { + select { + case <-done: + return + case r := <-requestChan: + conReqs.Add(1) + start := time.Now() + logger.Warn("Executing request", zap.Int("total", int(conReqs.Load()))) + fc, ok := FromContext(r.Context()) + handle := cgo.NewHandle(r) + r.Context().Value(handleKey).(*handleList).AddHandle(handle) + if !ok { + panic(InvalidRequestError) + } + + if err := updateServerContext(r, true, 0); err != nil { + panic(err) + } + + fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename)) + if fc.exitStatus < 0 { + panic(ScriptExecutionError) + } + + maybeCloseContext(fc) + r.Context().Value(handleKey).(*handleList).FreeAll() + conReqs.Add(-1) + + logger.Warn("finished execution", zap.Int("total", int(conReqs.Load())), zap.Duration("elapsed", time.Since(start))) + } + } +} + +// go_execute_script Note: only called in cgi-mode and the main worker request // //export go_execute_script func go_execute_script(rh unsafe.Pointer) { @@ -512,6 +568,8 @@ func go_execute_script(rh unsafe.Pointer) { if fc.exitStatus < 0 { panic(ScriptExecutionError) } + + logger.Warn("Handled request", zap.Any("counter", request.Context().Value("counter__"))) } //export go_ub_write diff --git a/frankenphp.h b/frankenphp.h index 8cb3761b09..7516f5a5ae 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -40,7 +40,7 @@ typedef struct frankenphp_config { } frankenphp_config; frankenphp_config frankenphp_get_config(); -int frankenphp_init(int num_threads); +int frankenphp_init(int num_threads, int non_worker_threads); int frankenphp_update_server_context( bool create, uintptr_t current_request, uintptr_t main_request, From 91a472992653bfd96f26ddc59417347022300fa1 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 1 Jun 2024 15:57:09 +0200 Subject: [PATCH 2/2] add more logging --- frankenphp.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/frankenphp.go b/frankenphp.go index 185b2e1f64..0636e98e9f 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -510,14 +510,16 @@ var conReqs atomic.Int32 func go_fetch_and_execute(rh unsafe.Pointer) { //logger.Warn("Starting fetch and execute") + start := time.Now() + for { select { case <-done: return case r := <-requestChan: conReqs.Add(1) - start := time.Now() - logger.Warn("Executing request", zap.Int("total", int(conReqs.Load()))) + logger.Warn("Executing request", zap.Int("total", int(conReqs.Load())), zap.Duration("elapsed", time.Since(start))) + start = time.Now() fc, ok := FromContext(r.Context()) handle := cgo.NewHandle(r) r.Context().Value(handleKey).(*handleList).AddHandle(handle) @@ -539,6 +541,7 @@ func go_fetch_and_execute(rh unsafe.Pointer) { conReqs.Add(-1) logger.Warn("finished execution", zap.Int("total", int(conReqs.Load())), zap.Duration("elapsed", time.Since(start))) + start = time.Now() } } }