-
Notifications
You must be signed in to change notification settings - Fork 462
Expand file tree
/
Copy pathphpthread.go
More file actions
296 lines (247 loc) · 8.56 KB
/
Copy pathphpthread.go
File metadata and controls
296 lines (247 loc) · 8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
package frankenphp
// #cgo nocallback frankenphp_new_php_thread
// #include "frankenphp.h"
import "C"
import (
"context"
"log/slog"
"runtime"
"sync"
"sync/atomic"
"unsafe"
"github.com/dunglas/frankenphp/internal/state"
)
// phpThread is the Go-side representation of the actual underlying PHP thread.
// It is identified by its index in the phpThreads slice.
type phpThread struct {
// Embedded pinner: pinString pins Go string data through it before handing
// the pointer to C, and the script/shutdown callbacks release it via Unpin.
runtime.Pinner
// threadIndex is this thread's index in the phpThreads slice; the same value
// is passed to C when the thread is started.
threadIndex int
// requestChan carries contextHolder values; newPHPThread creates it unbuffered.
// NOTE(review): senders and receivers live outside this file.
requestChan chan contextHolder
// drainChan is closed by shutdown and setHandler and then replaced with a
// fresh channel, so every close acts as a one-shot signal.
drainChan chan struct{}
// handlerMu is write-locked by boot and setHandler while they replace handler
// and drainChan, and read-locked by name.
handlerMu sync.RWMutex
// handler is the currently installed threadHandler; it stays nil until boot
// installs the first one.
handler threadHandler
// contextMu is not used in this file.
// NOTE(review): presumably guards per-request context state — confirm against callers.
contextMu sync.RWMutex
// state tracks the lifecycle state of the thread (see internal/state).
state *state.ThreadState
// requestCount is not touched in this file.
// NOTE(review): presumably counts handled requests — confirm against callers.
requestCount atomic.Int64
// forceKill holds &EG() pointers captured on the PHP thread itself.
// forceKillMu pairs with go_frankenphp_clear_force_kill_slot's write
// lock so a concurrent kill never dereferences pointers freed by
// ts_free_thread.
forceKillMu sync.RWMutex
forceKill C.force_kill_slot
}
// threadHandler defines how the callbacks from the C thread should be handled.
// A phpThread holds one handler at a time; setHandler swaps it.
type threadHandler interface {
// name returns the label that phpThread.name reports for this handler.
name() string
// beforeScriptExecution returns the name of the PHP script to execute next.
// Returning an empty string makes the C callback return nil, which shuts the
// thread down.
beforeScriptExecution() string
// afterScriptExecution receives the exit status of the script that just ran.
// The C callback panics on a negative status before this hook is reached.
afterScriptExecution(exitStatus int)
// context returns the context.Context exposed through phpThread.context.
context() context.Context
// frankenPHPContext returns the *frankenPHPContext exposed through
// phpThread.frankenPHPContext.
frankenPHPContext() *frankenPHPContext
// drain is a hook called by drainWorkerThreads right before drainChan is
// closed. Handlers that need to wake up a thread parked in a blocking C
// call (e.g. by closing a stop pipe) plug their signal in here. All
// current handlers are no-ops; this is the seam later handler types use
// without having to modify drainWorkerThreads.
drain()
}
// newPHPThread allocates the Go-side bookkeeping for the PHP thread with the
// given index. The thread gets an unbuffered request channel and a fresh
// thread state; no handler or drain channel is installed until boot runs.
func newPHPThread(threadIndex int) *phpThread {
	thread := new(phpThread)
	thread.threadIndex = threadIndex
	thread.requestChan = make(chan contextHolder)
	thread.state = state.NewThreadState()

	return thread
}
// boot starts the underlying PHP thread and blocks until it reports
// state.Inactive.
//
// It panics when the thread is in neither state.Reserved nor
// state.BootRequested, or when the C thread cannot be created.
func (thread *phpThread) boot() {
	// only a reserved (or boot-requested) thread may move on to booting
	canBoot := thread.state.CompareAndSwap(state.Reserved, state.Booting) ||
		thread.state.CompareAndSwap(state.BootRequested, state.Booting)
	if !canBoot {
		panic("thread is not in reserved state: " + thread.state.Name())
	}

	// every thread starts out inactive, with a fresh drain channel
	thread.handlerMu.Lock()
	thread.drainChan = make(chan struct{})
	thread.handler = &inactiveThread{thread: thread}
	thread.handlerMu.Unlock()

	// start the actual posix thread - TODO: try this with go threads instead
	if started := C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)); !started {
		panic("unable to create thread")
	}

	thread.state.WaitFor(state.Inactive)
}
// reboot asks the underlying C thread to restart.
//
// The request is only accepted while the thread is in state.Ready; in any
// other state nothing happens and false is returned. On success the state is
// moved to state.Rebooting, the replacement C thread is started asynchronously
// once state.RebootReady is reached, and true is returned.
func (thread *phpThread) reboot() bool {
	if !thread.state.CompareAndSwap(state.Ready, state.Rebooting) {
		return false // thread is not ready to reboot
	}

	go thread.restartWhenRebootReady()

	return true
}

// forceReboot forces the underlying C thread to restart.
//
// It reboots whenever the state change to state.ForceRebooting is accepted,
// i.e. unless the thread is already shutting down or done, in which case it
// returns false. On success the replacement C thread is started asynchronously
// once state.RebootReady is reached, and true is returned.
func (thread *phpThread) forceReboot() bool {
	if !thread.state.RequestSafeStateChange(state.ForceRebooting) {
		// thread already shutting down or done
		return false
	}

	go thread.restartWhenRebootReady()

	return true
}

// restartWhenRebootReady blocks until the thread reaches state.RebootReady and
// then starts a fresh C thread for the same thread index. It panics if the C
// thread cannot be created. reboot and forceReboot run it in its own goroutine
// so that neither of them blocks the caller.
func (thread *phpThread) restartWhenRebootReady() {
	thread.state.WaitFor(state.RebootReady)

	if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
		panic("unable to create thread")
	}
}
// shutdown stops the underlying PHP thread and blocks until it has reached
// state.Done, then puts it back into state.Reserved so it can be booted again.
// If the change to state.ShuttingDown is refused, the thread is treated as
// already shutting down: a Done state is moved to Reserved and the call returns
// without waiting.
func (thread *phpThread) shutdown() {
if !thread.state.RequestSafeStateChange(state.ShuttingDown) {
// thread is already shutting down, prefer the stable reserved state over done
// (the CompareAndSwap result is deliberately ignored: best effort only)
_ = thread.state.CompareAndSwap(state.Done, state.Reserved)
return
}
// Closing drainChan is the stop signal for whoever receives on it.
// NOTE(review): the receivers are outside this file.
close(thread.drainChan)
// Arm force-kill after the grace period to wake any thread stuck in
// a blocking syscall (sleep, blocking I/O). The wait remains
// unbounded - on platforms where force-kill cannot interrupt the
// syscall (macOS, Windows non-alertable Sleep) the thread will exit
// when the syscall completes naturally; the operator's orchestrator
// is responsible for any harder timeout.
if !thread.state.WaitForStateWithTimeout(shutDownGracePeriod, state.Done) {
globalLogger.LogAttrs(
globalCtx,
slog.LevelWarn,
"force-killing thread on shutdown timeout",
slog.String("name", thread.name()),
slog.String("state", thread.state.Name()),
slog.String("timeout", shutDownGracePeriod.String()),
)
thread.sendKillSignal()
// no timeout here: wait for as long as the thread needs to finish
thread.state.WaitFor(state.Done)
}
// replace the closed channel so that a later shutdown or setHandler has a
// fresh channel to close
thread.drainChan = make(chan struct{})
// threads go back to the reserved state from which they can be booted again
thread.state.Set(state.Reserved)
}
// setHandler changes the thread handler safely.
// It must be called from outside the PHP thread: it blocks until the PHP thread
// itself reports state.TransitionInProgress (see transitionToNewHandler).
// The call does nothing when the change to state.TransitionRequested is refused.
func (thread *phpThread) setHandler(handler threadHandler) {
// handlerMu stays write-locked for the whole hand-over
thread.handlerMu.Lock()
defer thread.handlerMu.Unlock()
if !thread.state.RequestSafeStateChange(state.TransitionRequested) {
// no state change allowed == shutdown or done
return
}
// NOTE(review): closing drainChan presumably wakes the current handler so the
// PHP thread can reach transitionToNewHandler — receivers are outside this file.
close(thread.drainChan)
// wait until the PHP thread has parked itself in transitionToNewHandler
thread.state.WaitFor(state.TransitionInProgress)
// the PHP thread is now waiting for TransitionComplete, so the handler and
// the drain channel can be replaced before it is released
thread.handler = handler
thread.drainChan = make(chan struct{})
// release the PHP thread; it continues with the new handler
thread.state.Set(state.TransitionComplete)
}
// transitionToNewHandler is the PHP-thread side of a handler swap that
// setHandler triggers. It announces state.TransitionInProgress, waits until
// state.TransitionComplete is set, and then returns the result of the new
// handler's beforeScriptExecution.
func (thread *phpThread) transitionToNewHandler() string {
	thread.state.Set(state.TransitionInProgress)
	thread.state.WaitFor(state.TransitionComplete)

	// by now the replacement handler is installed: let it pick the next script
	next := thread.handler

	return next.beforeScriptExecution()
}
// frankenPHPContext returns the *frankenPHPContext of the handler currently
// installed on this thread. The handler must not be nil.
func (thread *phpThread) frankenPHPContext() *frankenPHPContext {
	current := thread.handler

	return current.frankenPHPContext()
}
// context returns the context of the handler currently installed on this
// thread, falling back to globalCtx while no handler is installed.
func (thread *phpThread) context() context.Context {
	if current := thread.handler; current != nil {
		return current.context()
	}

	// handler can be nil when using opcache.preload
	return globalCtx
}
// name returns the name of the handler currently installed on this thread, or
// "unknown" while no handler is installed. The handler is read and queried
// under the handlerMu read lock.
func (thread *phpThread) name() string {
	thread.handlerMu.RLock()
	defer thread.handlerMu.RUnlock()

	if current := thread.handler; current != nil {
		return current.name()
	}

	return "unknown"
}
// sendKillSignal sends a kill signal to PHP (ZTS compatible).
// Make sure to only call this if PHP is actively handling a request.
// The force-kill slot is read under the forceKillMu read lock for the whole
// duration of the C call.
func (thread *phpThread) sendKillSignal() {
	thread.forceKillMu.RLock()
	defer thread.forceKillMu.RUnlock()

	C.frankenphp_force_kill_thread(thread.forceKill)
}
// pinString pins the bytes of s and returns them as a *C.char.
// The result is not null-terminated: PHP's zend_string may contain null-bytes.
// It returns nil when the string has no backing data pointer.
func (thread *phpThread) pinString(s string) *C.char {
	if data := unsafe.StringData(s); data != nil {
		thread.Pin(data)

		return (*C.char)(unsafe.Pointer(data))
	}

	return nil
}
// pinCString pins a copy of s with a trailing null byte appended, because
// C strings must be null-terminated.
func (thread *phpThread) pinCString(s string) *C.char {
	terminated := s + "\x00"

	return thread.pinString(terminated)
}
// updateContext forwards isWorker to C's frankenphp_update_local_thread_context.
// It does not read any state from the receiver.
func (*phpThread) updateContext(isWorker bool) {
	workerFlag := C.bool(isWorker)
	C.frankenphp_update_local_thread_context(workerFlag)
}
// go_frankenphp_before_script_execution asks the handler of the given thread
// for the next script. It returns the pinned, null-terminated name of the PHP
// script that should be executed, or nil when the handler returns no script
// name, which means the thread should shut down.
//
//export go_frankenphp_before_script_execution
func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char {
	thread := phpThreads[threadIndex]

	if script := thread.handler.beforeScriptExecution(); script != "" {
		return thread.pinCString(script)
	}

	// if no scriptName is passed, shut down
	return nil
}
// go_frankenphp_after_script_execution reports the exit status of a finished
// script to the handler of the given thread and then releases the memory
// pinned during that execution. It panics with ErrScriptExecution on a
// negative exit status.
//
//export go_frankenphp_after_script_execution
func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C.int) {
	thread := phpThreads[threadIndex]

	status := int(exitStatus)
	if status < 0 {
		panic(ErrScriptExecution)
	}

	thread.handler.afterScriptExecution(status)

	// unpin all memory used during script execution
	thread.Unpin()
}
// go_frankenphp_store_force_kill_slot registers slot as the force-kill slot of
// the given thread, under the forceKillMu write lock.
//
//export go_frankenphp_store_force_kill_slot
func go_frankenphp_store_force_kill_slot(threadIndex C.uintptr_t, slot C.force_kill_slot) {
	thread := phpThreads[threadIndex]

	thread.forceKillMu.Lock()
	defer thread.forceKillMu.Unlock()

	// Release any prior slot's OS resource (Windows HANDLE) before
	// overwriting; a phpThread can reboot and re-register.
	C.frankenphp_release_thread_for_kill(thread.forceKill)
	thread.forceKill = slot
}
// go_frankenphp_clear_force_kill_slot releases and zeroes the force-kill slot
// of the given thread.
//
// Called from C before ts_free_thread on both exit paths. Zeroing
// the slot under the write lock guarantees any concurrent kill
// either completed before we got the lock or sees a zero slot.
//
//export go_frankenphp_clear_force_kill_slot
func go_frankenphp_clear_force_kill_slot(threadIndex C.uintptr_t) {
	thread := phpThreads[threadIndex]

	thread.forceKillMu.Lock()
	defer thread.forceKillMu.Unlock()

	C.frankenphp_release_thread_for_kill(thread.forceKill)

	var empty C.force_kill_slot
	thread.forceKill = empty
}
// go_frankenphp_on_thread_shutdown releases everything the given thread still
// has pinned and publishes the state that follows its shutdown:
// state.Rebooting becomes state.RebootReady, state.ForceRebooting becomes
// state.YieldingForReboot, and any other state becomes state.Done.
//
//export go_frankenphp_on_thread_shutdown
func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) {
	thread := phpThreads[threadIndex]
	thread.Unpin()

	switch current := thread.state.Get(); current {
	case state.ForceRebooting:
		thread.state.Set(state.YieldingForReboot)
	case state.Rebooting:
		thread.state.Set(state.RebootReady)
	default:
		thread.state.Set(state.Done)
	}
}