diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 37cd22c8a4..e75623fce4 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -32,7 +32,6 @@ jobs: DEFAULT_BRANCH: main GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} LINTER_RULES_PATH: / - FILTER_REGEX_EXCLUDE: '.*C-Thread-Pool/.*' MARKDOWN_CONFIG_FILE: .markdown-lint.yaml VALIDATE_CPP: false VALIDATE_JSCPD: false diff --git a/C-Thread-Pool/LICENSE b/C-Thread-Pool/LICENSE deleted file mode 100644 index bf4494c15b..0000000000 --- a/C-Thread-Pool/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2016 Johan Hanssen Seferidis - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/C-Thread-Pool/README.md b/C-Thread-Pool/README.md deleted file mode 100644 index dc710b7e0b..0000000000 --- a/C-Thread-Pool/README.md +++ /dev/null @@ -1,70 +0,0 @@ -[![GitHub Actions](https://github.com/Pithikos/C-Thread-Pool/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Pithikos/C-Thread-Pool/actions?query=workflow%3Atests+branch%3Amaster) - - -# C Thread Pool - -This is a minimal but advanced threadpool implementation. - - * ANCI C and POSIX compliant - * Pause/resume/wait as you like - * Simple easy-to-digest API - * Well tested - -The threadpool is under MIT license. Notice that this project took a considerable amount of work and sacrifice of my free time and the reason I give it for free (even for commercial use) is so when you become rich and wealthy you don't forget about us open-source creatures of the night. Cheers! - -If this project reduced your development time feel free to buy me a coffee. - -[![Donate](https://www.paypal.com/en_US/i/btn/x-click-but21.gif)](https://www.paypal.me/seferidis) - - -## Run an example - -The library is not precompiled so you have to compile it with your project. The thread pool -uses POSIX threads so if you compile with gcc on Linux you have to use the flag `-pthread` like this: - - gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example - - -Then run the executable like this: - - ./example - - -## Basic usage - -1. Include the header in your source file: `#include "thpool.h"` -2. Create a thread pool with number of threads you want: `threadpool thpool = thpool_init(4);` -3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, (void*)arg_p);` - -The workers(threads) will start their work automatically as fast as there is new work -in the pool. If you want to wait for all added work to be finished before continuing -you can use `thpool_wait(thpool);`. If you want to destroy the pool you can use -`thpool_destroy(thpool);`. - - -## API - -For a deeper look into the documentation check in the [thpool.h](https://github.com/Pithikos/C-Thread-Pool/blob/master/thpool.h) file. Below is a fast practical overview. - -| Function example | Description | -|---------------------------------|---------------------------------------------------------------------| -| ***thpool_init(4)*** | Will return a new threadpool with `4` threads. | -| ***thpool_add_work(thpool, (void*)function_p, (void*)arg_p)*** | Will add new work to the pool. Work is simply a function. You can pass a single argument to the function if you wish. If not, `NULL` should be passed. | -| ***thpool_wait(thpool)*** | Will wait for all jobs (both in queue and currently running) to finish. | -| ***thpool_destroy(thpool)*** | This will destroy the threadpool. If jobs are currently being executed, then it will wait for them to finish. | -| ***thpool_pause(thpool)*** | All threads in the threadpool will pause no matter if they are idle or executing work. | -| ***thpool_resume(thpool)*** | If the threadpool is paused, then all threads will resume from where they were. | -| ***thpool_num_threads_working(thpool)*** | Will return the number of currently working threads. | - - -## Contribution - -You are very welcome to contribute. If you have a new feature in mind, you can always open an issue on github describing it so you don't end up doing a lot of work that might not be eventually merged. Generally we are very open to contributions as long as they follow the below keypoints. - -* Try to keep the API as minimal as possible. That means if a feature or fix can be implemented without affecting the existing API but requires more development time, then we will opt to sacrifice development time. -* Solutions need to be POSIX compliant. The thread-pool is advertised as such so it makes sense that it actually is. -* For coding style simply try to stick to the conventions you find in the existing codebase. -* Tests: A new fix or feature should be covered by tests. If the existing tests are not sufficient, we expect an according test to follow with the pull request. -* Documentation: for a new feature please add documentation. For an API change the documentation has to be thorough and super easy to understand. - -If you wish to **get access as a collaborator** feel free to mention it in the issue https://github.com/Pithikos/C-Thread-Pool/issues/78 diff --git a/C-Thread-Pool/thpool.c b/C-Thread-Pool/thpool.c deleted file mode 100644 index 83885c3777..0000000000 --- a/C-Thread-Pool/thpool.c +++ /dev/null @@ -1,571 +0,0 @@ -/* ******************************** - * Author: Johan Hanssen Seferidis - * License: MIT - * Description: Library providing a threading pool where you can add - * work. For usage, check the thpool.h file or README.md - * - *//** @file thpool.h *//* - * - ********************************/ - -#if defined(__APPLE__) -#include -#else -#ifndef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 200809L -#endif -#ifndef _XOPEN_SOURCE -#define _XOPEN_SOURCE 500 -#endif -#endif -#include -#include -#include -#include -#include -#include -#include -#if defined(__linux__) -#include -#endif -#if defined(__FreeBSD__) || defined(__OpenBSD__) -#include -#endif - -#include "thpool.h" - -#ifdef THPOOL_DEBUG -#define THPOOL_DEBUG 1 -#else -#define THPOOL_DEBUG 0 -#endif - -#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) -#define err(str) fprintf(stderr, str) -#else -#define err(str) -#endif - -#ifndef THPOOL_THREAD_NAME -#define THPOOL_THREAD_NAME thpool -#endif - -#define STRINGIFY(x) #x -#define TOSTRING(x) STRINGIFY(x) - -static volatile int threads_keepalive; -static volatile int threads_on_hold; - - - -/* ========================== STRUCTURES ============================ */ - - -/* Binary semaphore */ -typedef struct bsem { - pthread_mutex_t mutex; - pthread_cond_t cond; - int v; -} bsem; - - -/* Job */ -typedef struct job{ - struct job* prev; /* pointer to previous job */ - void (*function)(void* arg); /* function pointer */ - void* arg; /* function's argument */ -} job; - - -/* Job queue */ -typedef struct jobqueue{ - pthread_mutex_t rwmutex; /* used for queue r/w access */ - job *front; /* pointer to front of queue */ - job *rear; /* pointer to rear of queue */ - bsem *has_jobs; /* flag as binary semaphore */ - int len; /* number of jobs in queue */ -} jobqueue; - - -/* Thread */ -typedef struct thread{ - int id; /* friendly id */ - pthread_t pthread; /* pointer to actual thread */ - struct thpool_* thpool_p; /* access to thpool */ -} thread; - - -/* Threadpool */ -typedef struct thpool_{ - thread** threads; /* pointer to threads */ - volatile int num_threads_alive; /* threads currently alive */ - volatile int num_threads_working; /* threads currently working */ - pthread_mutex_t thcount_lock; /* used for thread count etc */ - pthread_cond_t threads_all_idle; /* signal to thpool_wait */ - jobqueue jobqueue; /* job queue */ -} thpool_; - - - - - -/* ========================== PROTOTYPES ============================ */ - - -static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id); -static void* thread_do(struct thread* thread_p); -static void thread_hold(int sig_id); -static void thread_destroy(struct thread* thread_p); - -static int jobqueue_init(jobqueue* jobqueue_p); -static void jobqueue_clear(jobqueue* jobqueue_p); -static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); -static struct job* jobqueue_pull(jobqueue* jobqueue_p); -static void jobqueue_destroy(jobqueue* jobqueue_p); - -static void bsem_init(struct bsem *bsem_p, int value); -static void bsem_reset(struct bsem *bsem_p); -static void bsem_post(struct bsem *bsem_p); -static void bsem_post_all(struct bsem *bsem_p); -static void bsem_wait(struct bsem *bsem_p); - - - - - -/* ========================== THREADPOOL ============================ */ - - -/* Initialise thread pool */ -struct thpool_* thpool_init(int num_threads){ - - threads_on_hold = 0; - threads_keepalive = 1; - - if (num_threads < 0){ - num_threads = 0; - } - - /* Make new thread pool */ - thpool_* thpool_p; - thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); - if (thpool_p == NULL){ - err("thpool_init(): Could not allocate memory for thread pool\n"); - return NULL; - } - thpool_p->num_threads_alive = 0; - thpool_p->num_threads_working = 0; - - /* Initialise the job queue */ - if (jobqueue_init(&thpool_p->jobqueue) == -1){ - err("thpool_init(): Could not allocate memory for job queue\n"); - free(thpool_p); - return NULL; - } - - /* Make threads in pool */ - thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *)); - if (thpool_p->threads == NULL){ - err("thpool_init(): Could not allocate memory for threads\n"); - jobqueue_destroy(&thpool_p->jobqueue); - free(thpool_p); - return NULL; - } - - pthread_mutex_init(&(thpool_p->thcount_lock), NULL); - pthread_cond_init(&thpool_p->threads_all_idle, NULL); - - /* Thread init */ - int n; - for (n=0; nthreads[n], n); -#if THPOOL_DEBUG - printf("THPOOL_DEBUG: Created thread %d in pool \n", n); -#endif - } - - /* Wait for threads to initialize */ - while (thpool_p->num_threads_alive != num_threads) {} - - return thpool_p; -} - - -/* Add work to the thread pool */ -int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){ - job* newjob; - - newjob=(struct job*)malloc(sizeof(struct job)); - if (newjob==NULL){ - err("thpool_add_work(): Could not allocate memory for new job\n"); - return -1; - } - - /* add function and argument */ - newjob->function=function_p; - newjob->arg=arg_p; - - /* add job to queue */ - jobqueue_push(&thpool_p->jobqueue, newjob); - - return 0; -} - - -/* Wait until all jobs have finished */ -void thpool_wait(thpool_* thpool_p){ - pthread_mutex_lock(&thpool_p->thcount_lock); - while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { - pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); - } - pthread_mutex_unlock(&thpool_p->thcount_lock); -} - - -/* Destroy the threadpool */ -void thpool_destroy(thpool_* thpool_p){ - /* No need to destroy if it's NULL */ - if (thpool_p == NULL) return ; - - volatile int threads_total = thpool_p->num_threads_alive; - - /* End each thread 's infinite loop */ - threads_keepalive = 0; - - /* Give one second to kill idle threads */ - double TIMEOUT = 1.0; - time_t start, end; - double tpassed = 0.0; - time (&start); - while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue.has_jobs); - time (&end); - tpassed = difftime(end,start); - } - - /* Poll remaining threads */ - while (thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue.has_jobs); - sleep(1); - } - - /* Job queue cleanup */ - jobqueue_destroy(&thpool_p->jobqueue); - /* Deallocs */ - int n; - for (n=0; n < threads_total; n++){ - thread_destroy(thpool_p->threads[n]); - } - free(thpool_p->threads); - free(thpool_p); -} - - -/* Pause all threads in threadpool */ -void thpool_pause(thpool_* thpool_p) { - int n; - for (n=0; n < thpool_p->num_threads_alive; n++){ - pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); - } -} - - -/* Resume all threads in threadpool */ -void thpool_resume(thpool_* thpool_p) { - // resuming a single threadpool hasn't been - // implemented yet, meanwhile this suppresses - // the warnings - (void)thpool_p; - - threads_on_hold = 0; -} - - -int thpool_num_threads_working(thpool_* thpool_p){ - return thpool_p->num_threads_working; -} - - - - - -/* ============================ THREAD ============================== */ - - -/* Initialize a thread in the thread pool - * - * @param thread address to the pointer of the thread to be created - * @param id id to be given to the thread - * @return 0 on success, -1 otherwise. - */ -static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ - - *thread_p = (struct thread*)malloc(sizeof(struct thread)); - if (*thread_p == NULL){ - err("thread_init(): Could not allocate memory for thread\n"); - return -1; - } - - (*thread_p)->thpool_p = thpool_p; - (*thread_p)->id = id; - - pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p)); - pthread_detach((*thread_p)->pthread); - return 0; -} - - -/* Sets the calling thread on hold */ -static void thread_hold(int sig_id) { - (void)sig_id; - threads_on_hold = 1; - while (threads_on_hold){ - sleep(1); - } -} - - -/* What each thread is doing -* -* In principle this is an endless loop. The only time this loop gets interrupted is once -* thpool_destroy() is invoked or the program exits. -* -* @param thread thread that will run this function -* @return nothing -*/ -static void* thread_do(struct thread* thread_p){ - - /* Set thread name for profiling and debugging */ - char thread_name[16] = {0}; - - snprintf(thread_name, 16, TOSTRING(THPOOL_THREAD_NAME) "-%d", thread_p->id); - -#if defined(__linux__) - /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ - prctl(PR_SET_NAME, thread_name); -#elif defined(__APPLE__) && defined(__MACH__) - pthread_setname_np(thread_name); -#elif defined(__FreeBSD__) || defined(__OpenBSD__) - pthread_set_name_np(thread_p->pthread, thread_name); -#else - err("thread_do(): pthread_setname_np is not supported on this system"); -#endif - - /* Assure all threads have been created before starting serving */ - thpool_* thpool_p = thread_p->thpool_p; - - /* Register signal handler */ - struct sigaction act; - sigemptyset(&act.sa_mask); - act.sa_flags = SA_ONSTACK; - act.sa_handler = thread_hold; - if (sigaction(SIGUSR1, &act, NULL) == -1) { - err("thread_do(): cannot handle SIGUSR1"); - } - - /* Mark thread as alive (initialized) */ - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_alive += 1; - pthread_mutex_unlock(&thpool_p->thcount_lock); - - while(threads_keepalive){ - - bsem_wait(thpool_p->jobqueue.has_jobs); - - if (threads_keepalive){ - - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_working++; - pthread_mutex_unlock(&thpool_p->thcount_lock); - - /* Read job from queue and execute it */ - void (*func_buff)(void*); - void* arg_buff; - job* job_p = jobqueue_pull(&thpool_p->jobqueue); - if (job_p) { - func_buff = job_p->function; - arg_buff = job_p->arg; - func_buff(arg_buff); - free(job_p); - } - - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_working--; - if (!thpool_p->num_threads_working) { - pthread_cond_signal(&thpool_p->threads_all_idle); - } - pthread_mutex_unlock(&thpool_p->thcount_lock); - - } - } - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_alive --; - pthread_mutex_unlock(&thpool_p->thcount_lock); - - return NULL; -} - - -/* Frees a thread */ -static void thread_destroy (thread* thread_p){ - free(thread_p); -} - - - - - -/* ============================ JOB QUEUE =========================== */ - - -/* Initialize queue */ -static int jobqueue_init(jobqueue* jobqueue_p){ - jobqueue_p->len = 0; - jobqueue_p->front = NULL; - jobqueue_p->rear = NULL; - - jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); - if (jobqueue_p->has_jobs == NULL){ - return -1; - } - - pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); - bsem_init(jobqueue_p->has_jobs, 0); - - return 0; -} - - -/* Clear the queue */ -static void jobqueue_clear(jobqueue* jobqueue_p){ - - while(jobqueue_p->len){ - free(jobqueue_pull(jobqueue_p)); - } - - jobqueue_p->front = NULL; - jobqueue_p->rear = NULL; - bsem_reset(jobqueue_p->has_jobs); - jobqueue_p->len = 0; - -} - - -/* Add (allocated) job to queue - */ -static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ - - pthread_mutex_lock(&jobqueue_p->rwmutex); - newjob->prev = NULL; - - switch(jobqueue_p->len){ - - case 0: /* if no jobs in queue */ - jobqueue_p->front = newjob; - jobqueue_p->rear = newjob; - break; - - default: /* if jobs in queue */ - jobqueue_p->rear->prev = newjob; - jobqueue_p->rear = newjob; - - } - jobqueue_p->len++; - - bsem_post(jobqueue_p->has_jobs); - pthread_mutex_unlock(&jobqueue_p->rwmutex); -} - - -/* Get first job from queue(removes it from queue) - * Notice: Caller MUST hold a mutex - */ -static struct job* jobqueue_pull(jobqueue* jobqueue_p){ - - pthread_mutex_lock(&jobqueue_p->rwmutex); - job* job_p = jobqueue_p->front; - - switch(jobqueue_p->len){ - - case 0: /* if no jobs in queue */ - break; - - case 1: /* if one job in queue */ - jobqueue_p->front = NULL; - jobqueue_p->rear = NULL; - jobqueue_p->len = 0; - break; - - default: /* if >1 jobs in queue */ - jobqueue_p->front = job_p->prev; - jobqueue_p->len--; - /* more than one job in queue -> post it */ - bsem_post(jobqueue_p->has_jobs); - - } - - pthread_mutex_unlock(&jobqueue_p->rwmutex); - return job_p; -} - - -/* Free all queue resources back to the system */ -static void jobqueue_destroy(jobqueue* jobqueue_p){ - jobqueue_clear(jobqueue_p); - free(jobqueue_p->has_jobs); -} - - - - - -/* ======================== SYNCHRONISATION ========================= */ - - -/* Init semaphore to 1 or 0 */ -static void bsem_init(bsem *bsem_p, int value) { - if (value < 0 || value > 1) { - err("bsem_init(): Binary semaphore can take only values 1 or 0"); - exit(1); - } - pthread_mutex_init(&(bsem_p->mutex), NULL); - pthread_cond_init(&(bsem_p->cond), NULL); - bsem_p->v = value; -} - - -/* Reset semaphore to 0 */ -static void bsem_reset(bsem *bsem_p) { - pthread_mutex_destroy(&(bsem_p->mutex)); - pthread_cond_destroy(&(bsem_p->cond)); - bsem_init(bsem_p, 0); -} - - -/* Post to at least one thread */ -static void bsem_post(bsem *bsem_p) { - pthread_mutex_lock(&bsem_p->mutex); - bsem_p->v = 1; - pthread_cond_signal(&bsem_p->cond); - pthread_mutex_unlock(&bsem_p->mutex); -} - - -/* Post to all threads */ -static void bsem_post_all(bsem *bsem_p) { - pthread_mutex_lock(&bsem_p->mutex); - bsem_p->v = 1; - pthread_cond_broadcast(&bsem_p->cond); - pthread_mutex_unlock(&bsem_p->mutex); -} - - -/* Wait on semaphore until semaphore has value 0 */ -static void bsem_wait(bsem* bsem_p) { - pthread_mutex_lock(&bsem_p->mutex); - while (bsem_p->v != 1) { - pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); - } - bsem_p->v = 0; - pthread_mutex_unlock(&bsem_p->mutex); -} diff --git a/C-Thread-Pool/thpool.h b/C-Thread-Pool/thpool.h deleted file mode 100644 index af3e68d165..0000000000 --- a/C-Thread-Pool/thpool.h +++ /dev/null @@ -1,187 +0,0 @@ -/********************************** - * @author Johan Hanssen Seferidis - * License: MIT - * - **********************************/ - -#ifndef _THPOOL_ -#define _THPOOL_ - -#ifdef __cplusplus -extern "C" { -#endif - -/* =================================== API ======================================= */ - - -typedef struct thpool_* threadpool; - - -/** - * @brief Initialize threadpool - * - * Initializes a threadpool. This function will not return until all - * threads have initialized successfully. - * - * @example - * - * .. - * threadpool thpool; //First we declare a threadpool - * thpool = thpool_init(4); //then we initialize it to 4 threads - * .. - * - * @param num_threads number of threads to be created in the threadpool - * @return threadpool created threadpool on success, - * NULL on error - */ -threadpool thpool_init(int num_threads); - - -/** - * @brief Add work to the job queue - * - * Takes an action and its argument and adds it to the threadpool's job queue. - * If you want to add to work a function with more than one arguments then - * a way to implement this is by passing a pointer to a structure. - * - * NOTICE: You have to cast both the function and argument to not get warnings. - * - * @example - * - * void print_num(int num){ - * printf("%d\n", num); - * } - * - * int main() { - * .. - * int a = 10; - * thpool_add_work(thpool, (void*)print_num, (void*)a); - * .. - * } - * - * @param threadpool threadpool to which the work will be added - * @param function_p pointer to function to add as work - * @param arg_p pointer to an argument - * @return 0 on success, -1 otherwise. - */ -int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p); - - -/** - * @brief Wait for all queued jobs to finish - * - * Will wait for all jobs - both queued and currently running to finish. - * Once the queue is empty and all work has completed, the calling thread - * (probably the main program) will continue. - * - * Smart polling is used in wait. The polling is initially 0 - meaning that - * there is virtually no polling at all. If after 1 seconds the threads - * haven't finished, the polling interval starts growing exponentially - * until it reaches max_secs seconds. Then it jumps down to a maximum polling - * interval assuming that heavy processing is being used in the threadpool. - * - * @example - * - * .. - * threadpool thpool = thpool_init(4); - * .. - * // Add a bunch of work - * .. - * thpool_wait(thpool); - * puts("All added work has finished"); - * .. - * - * @param threadpool the threadpool to wait for - * @return nothing - */ -void thpool_wait(threadpool); - - -/** - * @brief Pauses all threads immediately - * - * The threads will be paused no matter if they are idle or working. - * The threads return to their previous states once thpool_resume - * is called. - * - * While the thread is being paused, new work can be added. - * - * @example - * - * threadpool thpool = thpool_init(4); - * thpool_pause(thpool); - * .. - * // Add a bunch of work - * .. - * thpool_resume(thpool); // Let the threads start their magic - * - * @param threadpool the threadpool where the threads should be paused - * @return nothing - */ -void thpool_pause(threadpool); - - -/** - * @brief Unpauses all threads if they are paused - * - * @example - * .. - * thpool_pause(thpool); - * sleep(10); // Delay execution 10 seconds - * thpool_resume(thpool); - * .. - * - * @param threadpool the threadpool where the threads should be unpaused - * @return nothing - */ -void thpool_resume(threadpool); - - -/** - * @brief Destroy the threadpool - * - * This will wait for the currently active threads to finish and then 'kill' - * the whole threadpool to free up memory. - * - * @example - * int main() { - * threadpool thpool1 = thpool_init(2); - * threadpool thpool2 = thpool_init(2); - * .. - * thpool_destroy(thpool1); - * .. - * return 0; - * } - * - * @param threadpool the threadpool to destroy - * @return nothing - */ -void thpool_destroy(threadpool); - - -/** - * @brief Show currently working threads - * - * Working threads are the threads that are performing work (not idle). - * - * @example - * int main() { - * threadpool thpool1 = thpool_init(2); - * threadpool thpool2 = thpool_init(2); - * .. - * printf("Working threads: %d\n", thpool_num_threads_working(thpool1)); - * .. - * return 0; - * } - * - * @param threadpool the threadpool of interest - * @return integer number of threads working - */ -int thpool_num_threads_working(threadpool); - - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/Dockerfile b/Dockerfile index 26bcf3786d..f50b30360d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -80,7 +80,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get WORKDIR /go/src/app COPY --link *.* ./ COPY --link caddy caddy -COPY --link C-Thread-Pool C-Thread-Pool COPY --link internal internal COPY --link testdata testdata diff --git a/alpine.Dockerfile b/alpine.Dockerfile index 00addd5983..d7abaccd68 100644 --- a/alpine.Dockerfile +++ b/alpine.Dockerfile @@ -78,7 +78,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get WORKDIR /go/src/app COPY --link *.* ./ COPY --link caddy caddy -COPY --link C-Thread-Pool C-Thread-Pool COPY --link internal internal COPY --link testdata testdata diff --git a/frankenphp.c b/frankenphp.c index e2b7bfa58a..686d4c91d5 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -18,9 +18,6 @@ #include #include -#include "C-Thread-Pool/thpool.c" -#include "C-Thread-Pool/thpool.h" - #include "_cgo_export.h" #include "frankenphp_arginfo.h" @@ -726,7 +723,7 @@ sapi_module_struct frankenphp_sapi_module = { STANDARD_SAPI_MODULE_PROPERTIES}; -static void *manager_thread(void *arg) { +void frankenphp_prepare_thread() { // SIGPIPE must be masked in non-Go threads: // https://pkg.go.dev/os/signal#hdr-Go_programs_that_use_cgo_or_SWIG sigset_t set; @@ -737,10 +734,9 @@ static void *manager_thread(void *arg) { perror("failed to block SIGPIPE"); exit(EXIT_FAILURE); } +} - int num_threads = *((int *)arg); - free(arg); - arg = NULL; +void frankenphp_prepare_sapi(int num_threads) { #ifdef ZTS #if (PHP_VERSION_ID >= 80300) @@ -767,18 +763,9 @@ static void *manager_thread(void *arg) { #endif frankenphp_sapi_module.startup(&frankenphp_sapi_module); +} - threadpool thpool = thpool_init(num_threads); - - uintptr_t rh; - while ((rh = go_fetch_request())) { - thpool_add_work(thpool, go_execute_script, (void *)rh); - } - - /* channel closed, shutdown gracefully */ - thpool_wait(thpool); - thpool_destroy(thpool); - +void frankenphp_shutdown_sapi() { frankenphp_sapi_module.shutdown(&frankenphp_sapi_module); sapi_shutdown(); @@ -792,26 +779,6 @@ static void *manager_thread(void *arg) { frankenphp_sapi_module.ini_entries = NULL; } #endif - - go_shutdown(); - - return NULL; -} - -int frankenphp_init(int num_threads) { - pthread_t thread; - - int *num_threads_ptr = calloc(1, sizeof(int)); - *num_threads_ptr = num_threads; - - if (pthread_create(&thread, NULL, *manager_thread, (void *)num_threads_ptr) != - 0) { - go_shutdown(); - - return -1; - } - - return pthread_detach(thread); } int frankenphp_request_startup() { diff --git a/frankenphp.go b/frankenphp.go index 8df3ee4348..a26317380c 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -5,10 +5,6 @@ // [FrankenPHP app server]: https://frankenphp.dev package frankenphp -//go:generate rm -Rf C-Thread-Pool/ -//go:generate git clone --depth=1 git@github.com:Pithikos/C-Thread-Pool.git -//go:generate rm -Rf C-Thread-Pool/.git C-Thread-Pool/.github C-Thread-Pool/docs C-Thread-Pool/tests C-Thread-Pool/example.c - // Use PHP includes corresponding to your PHP installation by running: // // export CGO_CFLAGS=$(php-config --includes) @@ -17,7 +13,7 @@ package frankenphp // We also set these flags for hardening: https://github.com/docker-library/php/blob/master/8.2/bookworm/zts/Dockerfile#L57-L59 // #cgo darwin pkg-config: libxml-2.0 -// #cgo CFLAGS: -Wall -Werror -fstack-protector-strong -fpic -fpie -O2 -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 +// #cgo CFLAGS: -Werror -fstack-protector-strong -fpic -fpie -O2 -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 // #cgo CFLAGS: -I/usr/local/include/php -I/usr/local/include/php/main -I/usr/local/include/php/TSRM -I/usr/local/include/php/Zend -I/usr/local/include/php/ext -I/usr/local/include/php/ext/date/lib // #cgo CFLAGS: -DTHREAD_NAME=frankenphp // #cgo linux CFLAGS: -D_GNU_SOURCE @@ -317,9 +313,7 @@ func Init(options ...Option) error { done = make(chan struct{}) requestChan = make(chan *http.Request) - if C.frankenphp_init(C.int(opt.numThreads)) != 0 { - return MainThreadCreationError - } + go startMainThreads(opt.numThreads) if err := initWorkers(opt.workers); err != nil { return err @@ -333,6 +327,34 @@ func Init(options ...Option) error { return nil } +func startMainThreads(numThreads int) bool { + // prevent sharing this thread with other go funcs + runtime.LockOSThread() + // note: we do NOT unlock the thread so that Go will not try to reuse it when this func completes + + var threads sync.WaitGroup + + C.frankenphp_prepare_thread() + C.frankenphp_prepare_sapi(C.int(numThreads)) + + for i := 0; i < numThreads; i++ { + threads.Add(1) + go func() { + runtime.LockOSThread() + defer threads.Done() + + C.frankenphp_prepare_thread() + go_fetch_and_execute() + }() + } + + threads.Wait() + C.frankenphp_shutdown_sapi() + go_shutdown() + + return true +} + // Shutdown stops the workers and the PHP runtime. func Shutdown() { stopWorkers() @@ -468,16 +490,34 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error return nil } -//export go_fetch_request -func go_fetch_request() C.uintptr_t { - select { - case <-done: - return 0 +//export go_fetch_and_execute +func go_fetch_and_execute() { + for { + select { + case <-done: + return + case r := <-requestChan: + h := cgo.NewHandle(r) + r.Context().Value(handleKey).(*handleList).AddHandle(h) + + fc, ok := FromContext(r.Context()) + if !ok { + panic(InvalidRequestError) + } + + if err := updateServerContext(r, true, 0); err != nil { + panic(err) + } - case r := <-requestChan: - h := cgo.NewHandle(r) - r.Context().Value(handleKey).(*handleList).AddHandle(h) - return C.uintptr_t(h) + // scriptFilename is freed in frankenphp_execute_script() + fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename)) + if fc.exitStatus < 0 { + panic(ScriptExecutionError) + } + + maybeCloseContext(fc) + r.Context().Value(handleKey).(*handleList).FreeAll() + } } } @@ -487,33 +527,6 @@ func maybeCloseContext(fc *FrankenPHPContext) { }) } -// go_execute_script Note: only called in cgi-mode -// -//export go_execute_script -func go_execute_script(rh unsafe.Pointer) { - handle := cgo.Handle(rh) - - request := handle.Value().(*http.Request) - fc, ok := FromContext(request.Context()) - if !ok { - panic(InvalidRequestError) - } - defer func() { - maybeCloseContext(fc) - request.Context().Value(handleKey).(*handleList).FreeAll() - }() - - if err := updateServerContext(request, true, 0); err != nil { - panic(err) - } - - // scriptFilename is freed in frankenphp_execute_script() - fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename)) - if fc.exitStatus < 0 { - panic(ScriptExecutionError) - } -} - //export go_ub_write func go_ub_write(rh C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) { r := cgo.Handle(rh).Value().(*http.Request) diff --git a/frankenphp.h b/frankenphp.h index 8cb3761b09..b0f1e64361 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -40,7 +40,9 @@ typedef struct frankenphp_config { } frankenphp_config; frankenphp_config frankenphp_get_config(); -int frankenphp_init(int num_threads); +void frankenphp_prepare_thread(); +void frankenphp_prepare_sapi(int num_threads); +void frankenphp_shutdown_sapi(); int frankenphp_update_server_context( bool create, uintptr_t current_request, uintptr_t main_request, diff --git a/static-builder.Dockerfile b/static-builder.Dockerfile index 97812ae8f0..ad4949a067 100644 --- a/static-builder.Dockerfile +++ b/static-builder.Dockerfile @@ -83,7 +83,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get WORKDIR /go/src/app COPY *.* ./ COPY caddy caddy -COPY C-Thread-Pool C-Thread-Pool RUN --mount=type=secret,id=github-token GITHUB_TOKEN=$(cat /run/secrets/github-token) ./build-static.sh && \ rm -Rf dist/static-php-cli/source/*