-
Notifications
You must be signed in to change notification settings - Fork 456
Expand file tree
/
Copy paththreadtaskworker.go
More file actions
280 lines (234 loc) · 6.74 KB
/
threadtaskworker.go
File metadata and controls
280 lines (234 loc) · 6.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package frankenphp
// #include "frankenphp.h"
// #include <php_variables.h>
import "C"
import (
"errors"
"github.com/dunglas/frankenphp/internal/fastabs"
"sync"
"unsafe"
)
type taskWorker struct {
threads []*phpThread
threadMutex sync.RWMutex
fileName string
taskChan chan *pendingTask
name string
num int
env PreparedEnv
}
// representation of a thread that handles tasks directly assigned by go or via frankenphp_send_request()
// can also just execute a script in a loop
// implements the threadHandler interface
type taskWorkerThread struct {
thread *phpThread
taskWorker *taskWorker
dummyContext *frankenPHPContext
currentTask *pendingTask
}
var taskWorkers []*taskWorker
// EXPERIMENTAL: a task dispatched to a task worker
type pendingTask struct {
message any // the argument passed to frankenphp_send_request() or the return value of frankenphp_handle_request()
done sync.RWMutex
callback func() // optional callback for direct execution (tests)
}
func initTaskWorkers(opts []workerOpt) error {
taskWorkers = make([]*taskWorker, 0, len(opts))
ready := sync.WaitGroup{}
for _, opt := range opts {
fileName, err := fastabs.FastAbs(opt.fileName)
if err != nil {
return err
}
if opt.maxQueueLen <= 0 {
opt.maxQueueLen = 10000 // default queue len, TODO: unlimited?
}
tw := &taskWorker{
threads: make([]*phpThread, 0, opt.num),
fileName: fileName,
taskChan: make(chan *pendingTask, opt.maxQueueLen),
name: opt.name,
num: opt.num,
env: opt.env,
}
taskWorkers = append(taskWorkers, tw)
// start the actual PHP threads
ready.Add(tw.num)
for i := 0; i < tw.num; i++ {
thread := getInactivePHPThread()
convertToTaskWorkerThread(thread, tw)
go func(thread *phpThread) {
thread.state.waitFor(stateReady)
ready.Done()
}(thread)
}
}
ready.Wait()
return nil
}
func drainTaskWorkers() {
for _, tw := range taskWorkers {
tw.drainQueue()
}
}
func convertToTaskWorkerThread(thread *phpThread, tw *taskWorker) *taskWorkerThread {
handler := &taskWorkerThread{
thread: thread,
taskWorker: tw,
}
thread.setHandler(handler)
return handler
}
func (handler *taskWorkerThread) beforeScriptExecution() string {
thread := handler.thread
switch thread.state.get() {
case stateTransitionRequested:
handler.taskWorker.detach(thread)
return thread.transitionToNewHandler()
case stateBooting, stateTransitionComplete:
tw := handler.taskWorker
tw.threadMutex.Lock()
tw.threads = append(tw.threads, thread)
tw.threadMutex.Unlock()
thread.state.set(stateReady)
thread.updateContext(false, true)
return handler.setupWorkerScript()
case stateReady:
return handler.setupWorkerScript()
case stateRestarting:
thread.state.set(stateYielding)
thread.state.waitFor(stateReady, stateShuttingDown)
return handler.beforeScriptExecution()
case stateShuttingDown:
handler.taskWorker.detach(thread)
// signal to stop
return ""
}
panic("unexpected state: " + thread.state.name())
}
func (handler *taskWorkerThread) setupWorkerScript() string {
fc, err := newDummyContext(handler.taskWorker.fileName, WithRequestPreparedEnv(handler.taskWorker.env))
if err != nil {
panic(err)
}
handler.dummyContext = fc
clearSandboxedEnv(handler.thread)
return handler.taskWorker.fileName
}
func (handler *taskWorkerThread) afterScriptExecution(int) {
// restart the script
}
func (handler *taskWorkerThread) getRequestContext() *frankenPHPContext {
return handler.dummyContext
}
func (handler *taskWorkerThread) name() string {
return "Task Worker PHP Thread - " + handler.taskWorker.fileName
}
func (tw *taskWorker) detach(thread *phpThread) {
tw.threadMutex.Lock()
defer tw.threadMutex.Unlock()
for i, t := range tw.threads {
if t == thread {
tw.threads = append(tw.threads[:i], tw.threads[i+1:]...)
return
}
}
}
// make sure all tasks are done by re-queuing them until the channel is empty
func (tw *taskWorker) drainQueue() {
for {
select {
case pt := <-tw.taskChan:
tw.taskChan <- pt
pt.done.RLock() // wait for completion
default:
return
}
}
}
func (tw *taskWorker) dispatch(t *pendingTask) error {
t.done.Lock()
select {
case tw.taskChan <- t:
return nil
default:
return errors.New("Task worker queue is full, cannot dispatch task: " + tw.name)
}
}
func getTaskWorkerByName(name string) *taskWorker {
for _, w := range taskWorkers {
if w.name == name {
return w
}
}
return nil
}
//export go_frankenphp_worker_handle_task
func go_frankenphp_worker_handle_task(threadIndex C.uintptr_t) *C.zval {
thread := phpThreads[threadIndex]
handler, _ := thread.handler.(*taskWorkerThread)
thread.Unpin()
thread.state.markAsWaiting(true)
select {
case task := <-handler.taskWorker.taskChan:
handler.currentTask = task
thread.state.markAsWaiting(false)
// if the task has a callback, execute it (see types_test.go)
if task.callback != nil {
task.callback()
go_frankenphp_finish_task(threadIndex, nil)
return go_frankenphp_worker_handle_task(threadIndex)
}
zval := phpValue(task.message)
task.message = nil // free memory
thread.Pin(unsafe.Pointer(zval)) // TODO: refactor types.go so no pinning is required
return zval
case <-handler.thread.drainChan:
thread.state.markAsWaiting(false)
// send an empty task to drain the thread
return nil
}
}
//export go_frankenphp_finish_task
func go_frankenphp_finish_task(threadIndex C.uintptr_t, zv *C.zval) {
thread := phpThreads[threadIndex]
handler, ok := thread.handler.(*taskWorkerThread)
if !ok {
panic("thread is not a task thread: " + thread.handler.name())
}
if zv != nil {
result, err := goValue[any](zv)
if err != nil {
panic("failed to convert go_frankenphp_finish_task() return value: " + err.Error())
}
handler.currentTask.message = result
}
handler.currentTask.done.Unlock()
handler.currentTask = nil
}
//export go_frankenphp_send_request
func go_frankenphp_send_request(threadIndex C.uintptr_t, zv *C.zval, name *C.char, nameLen C.size_t) *C.char {
if zv == nil {
return phpThreads[threadIndex].pinCString("Task argument cannot be null")
}
var tw *taskWorker
if nameLen != 0 {
tw = getTaskWorkerByName(C.GoStringN(name, C.int(nameLen)))
} else if len(taskWorkers) != 0 {
tw = taskWorkers[0]
}
if tw == nil {
return phpThreads[threadIndex].pinCString("No worker found to handle this task: " + C.GoStringN(name, C.int(nameLen)))
}
// convert the argument of frankenphp_send_request() to a Go value
goArg, err := goValue[any](zv)
if err != nil {
return phpThreads[threadIndex].pinCString("Failed to convert frankenphp_send_request() argument: " + err.Error())
}
err = tw.dispatch(&pendingTask{message: goArg})
if err != nil {
return phpThreads[threadIndex].pinCString(err.Error())
}
return nil
}