diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 37cd22c8a4..e75623fce4 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -32,7 +32,6 @@ jobs: DEFAULT_BRANCH: main GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} LINTER_RULES_PATH: / - FILTER_REGEX_EXCLUDE: '.*C-Thread-Pool/.*' MARKDOWN_CONFIG_FILE: .markdown-lint.yaml VALIDATE_CPP: false VALIDATE_JSCPD: false diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index b3f071df05..e9fc9a1f82 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -20,6 +20,7 @@ jobs: php-versions: ['8.2', '8.3'] env: GOEXPERIMENT: cgocheck2 + GOMAXPROCS: 10 steps: - uses: actions/checkout@v4 diff --git a/C-Thread-Pool/LICENSE b/C-Thread-Pool/LICENSE deleted file mode 100644 index bf4494c15b..0000000000 --- a/C-Thread-Pool/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2016 Johan Hanssen Seferidis - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/C-Thread-Pool/README.md b/C-Thread-Pool/README.md deleted file mode 100644 index dc710b7e0b..0000000000 --- a/C-Thread-Pool/README.md +++ /dev/null @@ -1,70 +0,0 @@ -[![GitHub Actions](https://github.com/Pithikos/C-Thread-Pool/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Pithikos/C-Thread-Pool/actions?query=workflow%3Atests+branch%3Amaster) - - -# C Thread Pool - -This is a minimal but advanced threadpool implementation. - - * ANCI C and POSIX compliant - * Pause/resume/wait as you like - * Simple easy-to-digest API - * Well tested - -The threadpool is under MIT license. Notice that this project took a considerable amount of work and sacrifice of my free time and the reason I give it for free (even for commercial use) is so when you become rich and wealthy you don't forget about us open-source creatures of the night. Cheers! - -If this project reduced your development time feel free to buy me a coffee. - -[![Donate](https://www.paypal.com/en_US/i/btn/x-click-but21.gif)](https://www.paypal.me/seferidis) - - -## Run an example - -The library is not precompiled so you have to compile it with your project. The thread pool -uses POSIX threads so if you compile with gcc on Linux you have to use the flag `-pthread` like this: - - gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example - - -Then run the executable like this: - - ./example - - -## Basic usage - -1. Include the header in your source file: `#include "thpool.h"` -2. Create a thread pool with number of threads you want: `threadpool thpool = thpool_init(4);` -3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, (void*)arg_p);` - -The workers(threads) will start their work automatically as fast as there is new work -in the pool. If you want to wait for all added work to be finished before continuing -you can use `thpool_wait(thpool);`. If you want to destroy the pool you can use -`thpool_destroy(thpool);`. - - -## API - -For a deeper look into the documentation check in the [thpool.h](https://github.com/Pithikos/C-Thread-Pool/blob/master/thpool.h) file. Below is a fast practical overview. - -| Function example | Description | -|---------------------------------|---------------------------------------------------------------------| -| ***thpool_init(4)*** | Will return a new threadpool with `4` threads. | -| ***thpool_add_work(thpool, (void*)function_p, (void*)arg_p)*** | Will add new work to the pool. Work is simply a function. You can pass a single argument to the function if you wish. If not, `NULL` should be passed. | -| ***thpool_wait(thpool)*** | Will wait for all jobs (both in queue and currently running) to finish. | -| ***thpool_destroy(thpool)*** | This will destroy the threadpool. If jobs are currently being executed, then it will wait for them to finish. | -| ***thpool_pause(thpool)*** | All threads in the threadpool will pause no matter if they are idle or executing work. | -| ***thpool_resume(thpool)*** | If the threadpool is paused, then all threads will resume from where they were. | -| ***thpool_num_threads_working(thpool)*** | Will return the number of currently working threads. | - - -## Contribution - -You are very welcome to contribute. If you have a new feature in mind, you can always open an issue on github describing it so you don't end up doing a lot of work that might not be eventually merged. Generally we are very open to contributions as long as they follow the below keypoints. - -* Try to keep the API as minimal as possible. That means if a feature or fix can be implemented without affecting the existing API but requires more development time, then we will opt to sacrifice development time. -* Solutions need to be POSIX compliant. The thread-pool is advertised as such so it makes sense that it actually is. -* For coding style simply try to stick to the conventions you find in the existing codebase. -* Tests: A new fix or feature should be covered by tests. If the existing tests are not sufficient, we expect an according test to follow with the pull request. -* Documentation: for a new feature please add documentation. For an API change the documentation has to be thorough and super easy to understand. - -If you wish to **get access as a collaborator** feel free to mention it in the issue https://github.com/Pithikos/C-Thread-Pool/issues/78 diff --git a/C-Thread-Pool/thpool.c b/C-Thread-Pool/thpool.c deleted file mode 100644 index 83885c3777..0000000000 --- a/C-Thread-Pool/thpool.c +++ /dev/null @@ -1,571 +0,0 @@ -/* ******************************** - * Author: Johan Hanssen Seferidis - * License: MIT - * Description: Library providing a threading pool where you can add - * work. For usage, check the thpool.h file or README.md - * - *//** @file thpool.h *//* - * - ********************************/ - -#if defined(__APPLE__) -#include -#else -#ifndef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 200809L -#endif -#ifndef _XOPEN_SOURCE -#define _XOPEN_SOURCE 500 -#endif -#endif -#include -#include -#include -#include -#include -#include -#include -#if defined(__linux__) -#include -#endif -#if defined(__FreeBSD__) || defined(__OpenBSD__) -#include -#endif - -#include "thpool.h" - -#ifdef THPOOL_DEBUG -#define THPOOL_DEBUG 1 -#else -#define THPOOL_DEBUG 0 -#endif - -#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) -#define err(str) fprintf(stderr, str) -#else -#define err(str) -#endif - -#ifndef THPOOL_THREAD_NAME -#define THPOOL_THREAD_NAME thpool -#endif - -#define STRINGIFY(x) #x -#define TOSTRING(x) STRINGIFY(x) - -static volatile int threads_keepalive; -static volatile int threads_on_hold; - - - -/* ========================== STRUCTURES ============================ */ - - -/* Binary semaphore */ -typedef struct bsem { - pthread_mutex_t mutex; - pthread_cond_t cond; - int v; -} bsem; - - -/* Job */ -typedef struct job{ - struct job* prev; /* pointer to previous job */ - void (*function)(void* arg); /* function pointer */ - void* arg; /* function's argument */ -} job; - - -/* Job queue */ -typedef struct jobqueue{ - pthread_mutex_t rwmutex; /* used for queue r/w access */ - job *front; /* pointer to front of queue */ - job *rear; /* pointer to rear of queue */ - bsem *has_jobs; /* flag as binary semaphore */ - int len; /* number of jobs in queue */ -} jobqueue; - - -/* Thread */ -typedef struct thread{ - int id; /* friendly id */ - pthread_t pthread; /* pointer to actual thread */ - struct thpool_* thpool_p; /* access to thpool */ -} thread; - - -/* Threadpool */ -typedef struct thpool_{ - thread** threads; /* pointer to threads */ - volatile int num_threads_alive; /* threads currently alive */ - volatile int num_threads_working; /* threads currently working */ - pthread_mutex_t thcount_lock; /* used for thread count etc */ - pthread_cond_t threads_all_idle; /* signal to thpool_wait */ - jobqueue jobqueue; /* job queue */ -} thpool_; - - - - - -/* ========================== PROTOTYPES ============================ */ - - -static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id); -static void* thread_do(struct thread* thread_p); -static void thread_hold(int sig_id); -static void thread_destroy(struct thread* thread_p); - -static int jobqueue_init(jobqueue* jobqueue_p); -static void jobqueue_clear(jobqueue* jobqueue_p); -static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); -static struct job* jobqueue_pull(jobqueue* jobqueue_p); -static void jobqueue_destroy(jobqueue* jobqueue_p); - -static void bsem_init(struct bsem *bsem_p, int value); -static void bsem_reset(struct bsem *bsem_p); -static void bsem_post(struct bsem *bsem_p); -static void bsem_post_all(struct bsem *bsem_p); -static void bsem_wait(struct bsem *bsem_p); - - - - - -/* ========================== THREADPOOL ============================ */ - - -/* Initialise thread pool */ -struct thpool_* thpool_init(int num_threads){ - - threads_on_hold = 0; - threads_keepalive = 1; - - if (num_threads < 0){ - num_threads = 0; - } - - /* Make new thread pool */ - thpool_* thpool_p; - thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); - if (thpool_p == NULL){ - err("thpool_init(): Could not allocate memory for thread pool\n"); - return NULL; - } - thpool_p->num_threads_alive = 0; - thpool_p->num_threads_working = 0; - - /* Initialise the job queue */ - if (jobqueue_init(&thpool_p->jobqueue) == -1){ - err("thpool_init(): Could not allocate memory for job queue\n"); - free(thpool_p); - return NULL; - } - - /* Make threads in pool */ - thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *)); - if (thpool_p->threads == NULL){ - err("thpool_init(): Could not allocate memory for threads\n"); - jobqueue_destroy(&thpool_p->jobqueue); - free(thpool_p); - return NULL; - } - - pthread_mutex_init(&(thpool_p->thcount_lock), NULL); - pthread_cond_init(&thpool_p->threads_all_idle, NULL); - - /* Thread init */ - int n; - for (n=0; nthreads[n], n); -#if THPOOL_DEBUG - printf("THPOOL_DEBUG: Created thread %d in pool \n", n); -#endif - } - - /* Wait for threads to initialize */ - while (thpool_p->num_threads_alive != num_threads) {} - - return thpool_p; -} - - -/* Add work to the thread pool */ -int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){ - job* newjob; - - newjob=(struct job*)malloc(sizeof(struct job)); - if (newjob==NULL){ - err("thpool_add_work(): Could not allocate memory for new job\n"); - return -1; - } - - /* add function and argument */ - newjob->function=function_p; - newjob->arg=arg_p; - - /* add job to queue */ - jobqueue_push(&thpool_p->jobqueue, newjob); - - return 0; -} - - -/* Wait until all jobs have finished */ -void thpool_wait(thpool_* thpool_p){ - pthread_mutex_lock(&thpool_p->thcount_lock); - while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { - pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); - } - pthread_mutex_unlock(&thpool_p->thcount_lock); -} - - -/* Destroy the threadpool */ -void thpool_destroy(thpool_* thpool_p){ - /* No need to destroy if it's NULL */ - if (thpool_p == NULL) return ; - - volatile int threads_total = thpool_p->num_threads_alive; - - /* End each thread 's infinite loop */ - threads_keepalive = 0; - - /* Give one second to kill idle threads */ - double TIMEOUT = 1.0; - time_t start, end; - double tpassed = 0.0; - time (&start); - while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue.has_jobs); - time (&end); - tpassed = difftime(end,start); - } - - /* Poll remaining threads */ - while (thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue.has_jobs); - sleep(1); - } - - /* Job queue cleanup */ - jobqueue_destroy(&thpool_p->jobqueue); - /* Deallocs */ - int n; - for (n=0; n < threads_total; n++){ - thread_destroy(thpool_p->threads[n]); - } - free(thpool_p->threads); - free(thpool_p); -} - - -/* Pause all threads in threadpool */ -void thpool_pause(thpool_* thpool_p) { - int n; - for (n=0; n < thpool_p->num_threads_alive; n++){ - pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); - } -} - - -/* Resume all threads in threadpool */ -void thpool_resume(thpool_* thpool_p) { - // resuming a single threadpool hasn't been - // implemented yet, meanwhile this suppresses - // the warnings - (void)thpool_p; - - threads_on_hold = 0; -} - - -int thpool_num_threads_working(thpool_* thpool_p){ - return thpool_p->num_threads_working; -} - - - - - -/* ============================ THREAD ============================== */ - - -/* Initialize a thread in the thread pool - * - * @param thread address to the pointer of the thread to be created - * @param id id to be given to the thread - * @return 0 on success, -1 otherwise. - */ -static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ - - *thread_p = (struct thread*)malloc(sizeof(struct thread)); - if (*thread_p == NULL){ - err("thread_init(): Could not allocate memory for thread\n"); - return -1; - } - - (*thread_p)->thpool_p = thpool_p; - (*thread_p)->id = id; - - pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p)); - pthread_detach((*thread_p)->pthread); - return 0; -} - - -/* Sets the calling thread on hold */ -static void thread_hold(int sig_id) { - (void)sig_id; - threads_on_hold = 1; - while (threads_on_hold){ - sleep(1); - } -} - - -/* What each thread is doing -* -* In principle this is an endless loop. The only time this loop gets interrupted is once -* thpool_destroy() is invoked or the program exits. -* -* @param thread thread that will run this function -* @return nothing -*/ -static void* thread_do(struct thread* thread_p){ - - /* Set thread name for profiling and debugging */ - char thread_name[16] = {0}; - - snprintf(thread_name, 16, TOSTRING(THPOOL_THREAD_NAME) "-%d", thread_p->id); - -#if defined(__linux__) - /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ - prctl(PR_SET_NAME, thread_name); -#elif defined(__APPLE__) && defined(__MACH__) - pthread_setname_np(thread_name); -#elif defined(__FreeBSD__) || defined(__OpenBSD__) - pthread_set_name_np(thread_p->pthread, thread_name); -#else - err("thread_do(): pthread_setname_np is not supported on this system"); -#endif - - /* Assure all threads have been created before starting serving */ - thpool_* thpool_p = thread_p->thpool_p; - - /* Register signal handler */ - struct sigaction act; - sigemptyset(&act.sa_mask); - act.sa_flags = SA_ONSTACK; - act.sa_handler = thread_hold; - if (sigaction(SIGUSR1, &act, NULL) == -1) { - err("thread_do(): cannot handle SIGUSR1"); - } - - /* Mark thread as alive (initialized) */ - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_alive += 1; - pthread_mutex_unlock(&thpool_p->thcount_lock); - - while(threads_keepalive){ - - bsem_wait(thpool_p->jobqueue.has_jobs); - - if (threads_keepalive){ - - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_working++; - pthread_mutex_unlock(&thpool_p->thcount_lock); - - /* Read job from queue and execute it */ - void (*func_buff)(void*); - void* arg_buff; - job* job_p = jobqueue_pull(&thpool_p->jobqueue); - if (job_p) { - func_buff = job_p->function; - arg_buff = job_p->arg; - func_buff(arg_buff); - free(job_p); - } - - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_working--; - if (!thpool_p->num_threads_working) { - pthread_cond_signal(&thpool_p->threads_all_idle); - } - pthread_mutex_unlock(&thpool_p->thcount_lock); - - } - } - pthread_mutex_lock(&thpool_p->thcount_lock); - thpool_p->num_threads_alive --; - pthread_mutex_unlock(&thpool_p->thcount_lock); - - return NULL; -} - - -/* Frees a thread */ -static void thread_destroy (thread* thread_p){ - free(thread_p); -} - - - - - -/* ============================ JOB QUEUE =========================== */ - - -/* Initialize queue */ -static int jobqueue_init(jobqueue* jobqueue_p){ - jobqueue_p->len = 0; - jobqueue_p->front = NULL; - jobqueue_p->rear = NULL; - - jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); - if (jobqueue_p->has_jobs == NULL){ - return -1; - } - - pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); - bsem_init(jobqueue_p->has_jobs, 0); - - return 0; -} - - -/* Clear the queue */ -static void jobqueue_clear(jobqueue* jobqueue_p){ - - while(jobqueue_p->len){ - free(jobqueue_pull(jobqueue_p)); - } - - jobqueue_p->front = NULL; - jobqueue_p->rear = NULL; - bsem_reset(jobqueue_p->has_jobs); - jobqueue_p->len = 0; - -} - - -/* Add (allocated) job to queue - */ -static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ - - pthread_mutex_lock(&jobqueue_p->rwmutex); - newjob->prev = NULL; - - switch(jobqueue_p->len){ - - case 0: /* if no jobs in queue */ - jobqueue_p->front = newjob; - jobqueue_p->rear = newjob; - break; - - default: /* if jobs in queue */ - jobqueue_p->rear->prev = newjob; - jobqueue_p->rear = newjob; - - } - jobqueue_p->len++; - - bsem_post(jobqueue_p->has_jobs); - pthread_mutex_unlock(&jobqueue_p->rwmutex); -} - - -/* Get first job from queue(removes it from queue) - * Notice: Caller MUST hold a mutex - */ -static struct job* jobqueue_pull(jobqueue* jobqueue_p){ - - pthread_mutex_lock(&jobqueue_p->rwmutex); - job* job_p = jobqueue_p->front; - - switch(jobqueue_p->len){ - - case 0: /* if no jobs in queue */ - break; - - case 1: /* if one job in queue */ - jobqueue_p->front = NULL; - jobqueue_p->rear = NULL; - jobqueue_p->len = 0; - break; - - default: /* if >1 jobs in queue */ - jobqueue_p->front = job_p->prev; - jobqueue_p->len--; - /* more than one job in queue -> post it */ - bsem_post(jobqueue_p->has_jobs); - - } - - pthread_mutex_unlock(&jobqueue_p->rwmutex); - return job_p; -} - - -/* Free all queue resources back to the system */ -static void jobqueue_destroy(jobqueue* jobqueue_p){ - jobqueue_clear(jobqueue_p); - free(jobqueue_p->has_jobs); -} - - - - - -/* ======================== SYNCHRONISATION ========================= */ - - -/* Init semaphore to 1 or 0 */ -static void bsem_init(bsem *bsem_p, int value) { - if (value < 0 || value > 1) { - err("bsem_init(): Binary semaphore can take only values 1 or 0"); - exit(1); - } - pthread_mutex_init(&(bsem_p->mutex), NULL); - pthread_cond_init(&(bsem_p->cond), NULL); - bsem_p->v = value; -} - - -/* Reset semaphore to 0 */ -static void bsem_reset(bsem *bsem_p) { - pthread_mutex_destroy(&(bsem_p->mutex)); - pthread_cond_destroy(&(bsem_p->cond)); - bsem_init(bsem_p, 0); -} - - -/* Post to at least one thread */ -static void bsem_post(bsem *bsem_p) { - pthread_mutex_lock(&bsem_p->mutex); - bsem_p->v = 1; - pthread_cond_signal(&bsem_p->cond); - pthread_mutex_unlock(&bsem_p->mutex); -} - - -/* Post to all threads */ -static void bsem_post_all(bsem *bsem_p) { - pthread_mutex_lock(&bsem_p->mutex); - bsem_p->v = 1; - pthread_cond_broadcast(&bsem_p->cond); - pthread_mutex_unlock(&bsem_p->mutex); -} - - -/* Wait on semaphore until semaphore has value 0 */ -static void bsem_wait(bsem* bsem_p) { - pthread_mutex_lock(&bsem_p->mutex); - while (bsem_p->v != 1) { - pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); - } - bsem_p->v = 0; - pthread_mutex_unlock(&bsem_p->mutex); -} diff --git a/C-Thread-Pool/thpool.h b/C-Thread-Pool/thpool.h deleted file mode 100644 index af3e68d165..0000000000 --- a/C-Thread-Pool/thpool.h +++ /dev/null @@ -1,187 +0,0 @@ -/********************************** - * @author Johan Hanssen Seferidis - * License: MIT - * - **********************************/ - -#ifndef _THPOOL_ -#define _THPOOL_ - -#ifdef __cplusplus -extern "C" { -#endif - -/* =================================== API ======================================= */ - - -typedef struct thpool_* threadpool; - - -/** - * @brief Initialize threadpool - * - * Initializes a threadpool. This function will not return until all - * threads have initialized successfully. - * - * @example - * - * .. - * threadpool thpool; //First we declare a threadpool - * thpool = thpool_init(4); //then we initialize it to 4 threads - * .. - * - * @param num_threads number of threads to be created in the threadpool - * @return threadpool created threadpool on success, - * NULL on error - */ -threadpool thpool_init(int num_threads); - - -/** - * @brief Add work to the job queue - * - * Takes an action and its argument and adds it to the threadpool's job queue. - * If you want to add to work a function with more than one arguments then - * a way to implement this is by passing a pointer to a structure. - * - * NOTICE: You have to cast both the function and argument to not get warnings. - * - * @example - * - * void print_num(int num){ - * printf("%d\n", num); - * } - * - * int main() { - * .. - * int a = 10; - * thpool_add_work(thpool, (void*)print_num, (void*)a); - * .. - * } - * - * @param threadpool threadpool to which the work will be added - * @param function_p pointer to function to add as work - * @param arg_p pointer to an argument - * @return 0 on success, -1 otherwise. - */ -int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p); - - -/** - * @brief Wait for all queued jobs to finish - * - * Will wait for all jobs - both queued and currently running to finish. - * Once the queue is empty and all work has completed, the calling thread - * (probably the main program) will continue. - * - * Smart polling is used in wait. The polling is initially 0 - meaning that - * there is virtually no polling at all. If after 1 seconds the threads - * haven't finished, the polling interval starts growing exponentially - * until it reaches max_secs seconds. Then it jumps down to a maximum polling - * interval assuming that heavy processing is being used in the threadpool. - * - * @example - * - * .. - * threadpool thpool = thpool_init(4); - * .. - * // Add a bunch of work - * .. - * thpool_wait(thpool); - * puts("All added work has finished"); - * .. - * - * @param threadpool the threadpool to wait for - * @return nothing - */ -void thpool_wait(threadpool); - - -/** - * @brief Pauses all threads immediately - * - * The threads will be paused no matter if they are idle or working. - * The threads return to their previous states once thpool_resume - * is called. - * - * While the thread is being paused, new work can be added. - * - * @example - * - * threadpool thpool = thpool_init(4); - * thpool_pause(thpool); - * .. - * // Add a bunch of work - * .. - * thpool_resume(thpool); // Let the threads start their magic - * - * @param threadpool the threadpool where the threads should be paused - * @return nothing - */ -void thpool_pause(threadpool); - - -/** - * @brief Unpauses all threads if they are paused - * - * @example - * .. - * thpool_pause(thpool); - * sleep(10); // Delay execution 10 seconds - * thpool_resume(thpool); - * .. - * - * @param threadpool the threadpool where the threads should be unpaused - * @return nothing - */ -void thpool_resume(threadpool); - - -/** - * @brief Destroy the threadpool - * - * This will wait for the currently active threads to finish and then 'kill' - * the whole threadpool to free up memory. - * - * @example - * int main() { - * threadpool thpool1 = thpool_init(2); - * threadpool thpool2 = thpool_init(2); - * .. - * thpool_destroy(thpool1); - * .. - * return 0; - * } - * - * @param threadpool the threadpool to destroy - * @return nothing - */ -void thpool_destroy(threadpool); - - -/** - * @brief Show currently working threads - * - * Working threads are the threads that are performing work (not idle). - * - * @example - * int main() { - * threadpool thpool1 = thpool_init(2); - * threadpool thpool2 = thpool_init(2); - * .. - * printf("Working threads: %d\n", thpool_num_threads_working(thpool1)); - * .. - * return 0; - * } - * - * @param threadpool the threadpool of interest - * @return integer number of threads working - */ -int thpool_num_threads_working(threadpool); - - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d3a43bdcf9..850229dd4a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -19,7 +19,6 @@ If docker version is lower than 23.0, build is failed by dockerignore [pattern i !testdata/*.php !testdata/*.txt +!caddy -+!C-Thread-Pool +!internal ``` diff --git a/Dockerfile b/Dockerfile index 6a0ae174a9..a4b48ddee6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -80,7 +80,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get WORKDIR /go/src/app COPY --link *.* ./ COPY --link caddy caddy -COPY --link C-Thread-Pool C-Thread-Pool COPY --link internal internal COPY --link testdata testdata diff --git a/alpine.Dockerfile b/alpine.Dockerfile index 33e99a0373..114d8ed1c8 100644 --- a/alpine.Dockerfile +++ b/alpine.Dockerfile @@ -78,7 +78,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get WORKDIR /go/src/app COPY --link *.* ./ COPY --link caddy caddy -COPY --link C-Thread-Pool C-Thread-Pool COPY --link internal internal COPY --link testdata testdata diff --git a/docs/cn/CONTRIBUTING.md b/docs/cn/CONTRIBUTING.md index 9fecf9593b..a7887983b4 100644 --- a/docs/cn/CONTRIBUTING.md +++ b/docs/cn/CONTRIBUTING.md @@ -19,7 +19,6 @@ docker run --cap-add=SYS_PTRACE --security-opt seccomp=unconfined -p 8080:8080 - !testdata/*.php !testdata/*.txt +!caddy -+!C-Thread-Pool +!internal ``` diff --git a/docs/fr/CONTRIBUTING.md b/docs/fr/CONTRIBUTING.md index ce478456e1..7f9973969b 100644 --- a/docs/fr/CONTRIBUTING.md +++ b/docs/fr/CONTRIBUTING.md @@ -19,7 +19,6 @@ Si la version de Docker est inférieure à 23.0, la construction échoue à caus !testdata/*.php !testdata/*.txt +!caddy -+!C-Thread-Pool +!internal ``` diff --git a/docs/tr/CONTRIBUTING.md b/docs/tr/CONTRIBUTING.md index 2149de4159..a7b0937658 100644 --- a/docs/tr/CONTRIBUTING.md +++ b/docs/tr/CONTRIBUTING.md @@ -19,7 +19,6 @@ Docker sürümü 23.0'dan düşükse, derleme dockerignore [pattern issue](https !testdata/*.php !testdata/*.txt +!caddy -+!C-Thread-Pool +!internal ``` diff --git a/frankenphp.c b/frankenphp.c index 088f6997a1..3ccd64c442 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -17,9 +18,11 @@ #include #include #include - -#include "C-Thread-Pool/thpool.c" -#include "C-Thread-Pool/thpool.h" +#if defined(__linux__) +#include +#elif defined(__FreeBSD__) || defined(__OpenBSD__) +#include +#endif #include "_cgo_export.h" #include "frankenphp_arginfo.h" @@ -453,12 +456,13 @@ int frankenphp_update_server_context( #endif /* todo: use a pool */ - ctx = (frankenphp_server_context *)calloc( - 1, sizeof(frankenphp_server_context)); + ctx = + (frankenphp_server_context *)malloc(sizeof(frankenphp_server_context)); if (ctx == NULL) { return FAILURE; } + ctx->worker_ready = false; ctx->cookie_data = NULL; ctx->finished = false; @@ -719,7 +723,36 @@ sapi_module_struct frankenphp_sapi_module = { STANDARD_SAPI_MODULE_PROPERTIES}; -static void *manager_thread(void *arg) { +/* Sets thread name for profiling and debugging. + * + * Adapted from https://github.com/Pithikos/C-Thread-Pool + * Copyright: Johan Hanssen Seferidis + * License: MIT + */ +static void set_thread_name(char *thread_name) { +#if defined(__linux__) + /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit + * declaration */ + prctl(PR_SET_NAME, thread_name); +#elif defined(__APPLE__) && defined(__MACH__) + pthread_setname_np(thread_name); +#elif defined(__FreeBSD__) || defined(__OpenBSD__) + pthread_set_name_np(pthread_self(), thread_name); +#endif +} + +static void *php_thread(void *arg) { + char thread_name[16] = {0}; + snprintf(thread_name, 16, "php-%" PRIxPTR, (uintptr_t)arg); + set_thread_name(thread_name); + + while (go_handle_request()) { + } + + return NULL; +} + +static void *php_main(void *arg) { /* * SIGPIPE must be masked in non-Go threads: * https://pkg.go.dev/os/signal#hdr-Go_programs_that_use_cgo_or_SWIG @@ -735,6 +768,8 @@ static void *manager_thread(void *arg) { intptr_t num_threads = (intptr_t)arg; + set_thread_name("php-main"); + #ifdef ZTS #if (PHP_VERSION_ID >= 80300) php_tsrm_startup_ex(num_threads); @@ -754,6 +789,10 @@ static void *manager_thread(void *arg) { frankenphp_sapi_module.ini_entries = HARDCODED_INI; #else frankenphp_sapi_module.ini_entries = malloc(sizeof(HARDCODED_INI)); + if (frankenphp_sapi_module.ini_entries == NULL) { + perror("malloc failed"); + exit(EXIT_FAILURE); + } memcpy(frankenphp_sapi_module.ini_entries, HARDCODED_INI, sizeof(HARDCODED_INI)); #endif @@ -761,17 +800,30 @@ static void *manager_thread(void *arg) { frankenphp_sapi_module.startup(&frankenphp_sapi_module); - threadpool thpool = thpool_init(num_threads); + pthread_t *threads = malloc(num_threads * sizeof(pthread_t)); + if (threads == NULL) { + perror("malloc failed"); + exit(EXIT_FAILURE); + } - uintptr_t rh; - while ((rh = go_fetch_request())) { - thpool_add_work(thpool, go_execute_script, (void *)rh); + for (uintptr_t i = 0; i < num_threads; i++) { + if (pthread_create(&(*(threads + i)), NULL, &php_thread, (void *)i) != 0) { + perror("failed to create PHP thread"); + free(threads); + exit(EXIT_FAILURE); + } } - /* channel closed, shutdown gracefully */ - thpool_wait(thpool); - thpool_destroy(thpool); + for (int i = 0; i < num_threads; i++) { + if (pthread_join((*(threads + i)), NULL) != 0) { + perror("failed to join PHP thread"); + free(threads); + exit(EXIT_FAILURE); + } + } + free(threads); + /* channel closed, shutdown gracefully */ frankenphp_sapi_module.shutdown(&frankenphp_sapi_module); sapi_shutdown(); @@ -794,8 +846,8 @@ static void *manager_thread(void *arg) { int frankenphp_init(int num_threads) { pthread_t thread; - if (pthread_create(&thread, NULL, *manager_thread, - (void *)(intptr_t)num_threads) != 0) { + if (pthread_create(&thread, NULL, &php_main, (void *)(intptr_t)num_threads) != + 0) { go_shutdown(); return -1; diff --git a/frankenphp.go b/frankenphp.go index 76a0469de8..709018cea8 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -5,10 +5,6 @@ // [FrankenPHP app server]: https://frankenphp.dev package frankenphp -//go:generate rm -Rf C-Thread-Pool/ -//go:generate git clone --depth=1 git@github.com:Pithikos/C-Thread-Pool.git -//go:generate rm -Rf C-Thread-Pool/.git C-Thread-Pool/.github C-Thread-Pool/docs C-Thread-Pool/tests C-Thread-Pool/example.c - // Use PHP includes corresponding to your PHP installation by running: // // export CGO_CFLAGS=$(php-config --includes) @@ -468,16 +464,36 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error return nil } -//export go_fetch_request -func go_fetch_request() C.uintptr_t { +//export go_handle_request +func go_handle_request() bool { select { case <-done: - return 0 + return false case r := <-requestChan: h := cgo.NewHandle(r) r.Context().Value(handleKey).(*handleList).AddHandle(h) - return C.uintptr_t(h) + + fc, ok := FromContext(r.Context()) + if !ok { + panic(InvalidRequestError) + } + defer func() { + maybeCloseContext(fc) + r.Context().Value(handleKey).(*handleList).FreeAll() + }() + + if err := updateServerContext(r, true, 0); err != nil { + panic(err) + } + + // scriptFilename is freed in frankenphp_execute_script() + fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename)) + if fc.exitStatus < 0 { + panic(ScriptExecutionError) + } + + return true } } @@ -487,33 +503,6 @@ func maybeCloseContext(fc *FrankenPHPContext) { }) } -// go_execute_script Note: only called in cgi-mode -// -//export go_execute_script -func go_execute_script(rh unsafe.Pointer) { - handle := cgo.Handle(rh) - - request := handle.Value().(*http.Request) - fc, ok := FromContext(request.Context()) - if !ok { - panic(InvalidRequestError) - } - defer func() { - maybeCloseContext(fc) - request.Context().Value(handleKey).(*handleList).FreeAll() - }() - - if err := updateServerContext(request, true, 0); err != nil { - panic(err) - } - - // scriptFilename is freed in frankenphp_execute_script() - fc.exitStatus = C.frankenphp_execute_script(C.CString(fc.scriptFilename)) - if fc.exitStatus < 0 { - panic(ScriptExecutionError) - } -} - //export go_ub_write func go_ub_write(rh C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) { r := cgo.Handle(rh).Value().(*http.Request) diff --git a/static-builder.Dockerfile b/static-builder.Dockerfile index 97812ae8f0..ad4949a067 100644 --- a/static-builder.Dockerfile +++ b/static-builder.Dockerfile @@ -83,7 +83,6 @@ RUN go mod graph | awk '{if ($1 !~ "@") print $2}' | xargs go get WORKDIR /go/src/app COPY *.* ./ COPY caddy caddy -COPY C-Thread-Pool C-Thread-Pool RUN --mount=type=secret,id=github-token GITHUB_TOKEN=$(cat /run/secrets/github-token) ./build-static.sh && \ rm -Rf dist/static-php-cli/source/*