Skip to content

Commit 9093f12

Browse files
committed
apiserver: refactor WithWaitGroup handler
1 parent 03b78f3 commit 9093f12

File tree

6 files changed

+81
-59
lines changed

6 files changed

+81
-59
lines changed

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,10 @@ type Config struct {
156156

157157
// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
158158
BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
159-
// HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown.
160-
HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
159+
// NonLongRunningRequestWaitGroup allows you to wait for all chain
160+
// handlers associated with non long-running requests
161+
// to complete while the server is shuting down.
162+
NonLongRunningRequestWaitGroup *utilwaitgroup.SafeWaitGroup
161163
// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
162164
// always reported
163165
DiscoveryAddresses discovery.Addresses
@@ -349,26 +351,26 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
349351
lifecycleSignals := newLifecycleSignals()
350352

351353
return &Config{
352-
Serializer: codecs,
353-
BuildHandlerChainFunc: DefaultBuildHandlerChain,
354-
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
355-
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
356-
DisabledPostStartHooks: sets.NewString(),
357-
PostStartHooks: map[string]PostStartHookConfigEntry{},
358-
HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
359-
ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
360-
LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
361-
EnableIndex: true,
362-
EnableDiscovery: true,
363-
EnableProfiling: true,
364-
DebugSocketPath: "",
365-
EnableMetrics: true,
366-
MaxRequestsInFlight: 400,
367-
MaxMutatingRequestsInFlight: 200,
368-
RequestTimeout: time.Duration(60) * time.Second,
369-
MinRequestTimeout: 1800,
370-
LivezGracePeriod: time.Duration(0),
371-
ShutdownDelayDuration: time.Duration(0),
354+
Serializer: codecs,
355+
BuildHandlerChainFunc: DefaultBuildHandlerChain,
356+
NonLongRunningRequestWaitGroup: new(utilwaitgroup.SafeWaitGroup),
357+
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
358+
DisabledPostStartHooks: sets.NewString(),
359+
PostStartHooks: map[string]PostStartHookConfigEntry{},
360+
HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
361+
ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
362+
LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
363+
EnableIndex: true,
364+
EnableDiscovery: true,
365+
EnableProfiling: true,
366+
DebugSocketPath: "",
367+
EnableMetrics: true,
368+
MaxRequestsInFlight: 400,
369+
MaxMutatingRequestsInFlight: 200,
370+
RequestTimeout: time.Duration(60) * time.Second,
371+
MinRequestTimeout: 1800,
372+
LivezGracePeriod: time.Duration(0),
373+
ShutdownDelayDuration: time.Duration(0),
372374
// 1.5MB is the default client request size in bytes
373375
// the etcd server should accept. See
374376
// https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
@@ -641,18 +643,18 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
641643
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
642644

643645
s := &GenericAPIServer{
644-
discoveryAddresses: c.DiscoveryAddresses,
645-
LoopbackClientConfig: c.LoopbackClientConfig,
646-
legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
647-
admissionControl: c.AdmissionControl,
648-
Serializer: c.Serializer,
649-
AuditBackend: c.AuditBackend,
650-
Authorizer: c.Authorization.Authorizer,
651-
delegationTarget: delegationTarget,
652-
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
653-
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
654-
Handler: apiServerHandler,
655-
UnprotectedDebugSocket: debugSocket,
646+
discoveryAddresses: c.DiscoveryAddresses,
647+
LoopbackClientConfig: c.LoopbackClientConfig,
648+
legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
649+
admissionControl: c.AdmissionControl,
650+
Serializer: c.Serializer,
651+
AuditBackend: c.AuditBackend,
652+
Authorizer: c.Authorization.Authorizer,
653+
delegationTarget: delegationTarget,
654+
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
655+
NonLongRunningRequestWaitGroup: c.NonLongRunningRequestWaitGroup,
656+
Handler: apiServerHandler,
657+
UnprotectedDebugSocket: debugSocket,
656658

657659
listedPathProvider: apiServerHandler,
658660

@@ -887,7 +889,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
887889

888890
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
889891
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
890-
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
892+
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
891893
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
892894
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
893895
}

staging/src/k8s.io/apiserver/pkg/server/config_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,12 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
298298
AuditPolicyRuleEvaluator: policy.NewFakePolicyRuleEvaluator(auditinternal.LevelMetadata, nil),
299299

300300
// avoid nil panics
301-
HandlerChainWaitGroup: &waitgroup.SafeWaitGroup{},
302-
RequestInfoResolver: &request.RequestInfoFactory{},
303-
RequestTimeout: 10 * time.Second,
304-
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
305-
lifecycleSignals: newLifecycleSignals(),
306-
TracerProvider: tracing.NewNoopTracerProvider(),
301+
NonLongRunningRequestWaitGroup: &waitgroup.SafeWaitGroup{},
302+
RequestInfoResolver: &request.RequestInfoFactory{},
303+
RequestTimeout: 10 * time.Second,
304+
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
305+
lifecycleSignals: newLifecycleSignals(),
306+
TracerProvider: tracing.NewNoopTracerProvider(),
307307
}
308308

309309
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,34 @@ import (
2424
"k8s.io/api/core/v1"
2525
apierrors "k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/runtime"
27-
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
2827
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
2928
apirequest "k8s.io/apiserver/pkg/endpoints/request"
3029
"k8s.io/client-go/kubernetes/scheme"
3130
)
3231

32+
// RequestWaitGroup helps with the accounting of request(s) that are in
33+
// flight: the caller is expected to invoke Add(1) before executing the
34+
// request handler and then invoke Done() when the handler finishes.
35+
// NOTE: implementations must ensure that it is thread-safe
36+
// when invoked from multiple goroutines.
37+
type RequestWaitGroup interface {
38+
// Add adds delta, which may be negative, similar to sync.WaitGroup.
39+
// If Add with a positive delta happens after Wait, it will return error,
40+
// which prevent unsafe Add.
41+
Add(delta int) error
42+
43+
// Done decrements the WaitGroup counter.
44+
Done()
45+
}
46+
3347
// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
34-
func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler {
48+
func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg RequestWaitGroup) http.Handler {
3549
// NOTE: both WithWaitGroup and WithRetryAfter must use the same exact isRequestExemptFunc 'isRequestExemptFromRetryAfter,
3650
// otherwise SafeWaitGroup might wait indefinitely and will prevent the server from shutting down gracefully.
3751
return withWaitGroup(handler, longRunning, wg, isRequestExemptFromRetryAfter)
3852
}
3953

40-
func withWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup, isRequestExemptFn isRequestExemptFunc) http.Handler {
54+
func withWaitGroup(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, wg RequestWaitGroup, isRequestExemptFn isRequestExemptFunc) http.Handler {
4155
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
4256
ctx := req.Context()
4357
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@@ -64,16 +78,20 @@ func withWaitGroup(handler http.Handler, longRunning apirequest.LongRunningReque
6478

6579
// When apiserver is shutting down, signal clients to retry
6680
// There is a good chance the client hit a different server, so a tight retry is good for client responsiveness.
67-
w.Header().Add("Retry-After", "1")
68-
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
69-
w.Header().Set("X-Content-Type-Options", "nosniff")
70-
statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status()
71-
w.WriteHeader(int(statusErr.Code))
72-
fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr))
81+
waitGroupWriteRetryAfterToResponse(w)
7382
return
7483
}
7584

7685
defer wg.Done()
7786
handler.ServeHTTP(w, req)
7887
})
7988
}
89+
90+
func waitGroupWriteRetryAfterToResponse(w http.ResponseWriter) {
91+
w.Header().Add("Retry-After", "1")
92+
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
93+
w.Header().Set("X-Content-Type-Options", "nosniff")
94+
statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status()
95+
w.WriteHeader(int(statusErr.Code))
96+
fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr))
97+
}

staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,10 @@ type GenericAPIServer struct {
217217
// delegationTarget is the next delegate in the chain. This is never nil.
218218
delegationTarget DelegationTarget
219219

220-
// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
221-
HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
220+
// NonLongRunningRequestWaitGroup allows you to wait for all chain
221+
// handlers associated with non long-running requests
222+
// to complete while the server is shuting down.
223+
NonLongRunningRequestWaitGroup *utilwaitgroup.SafeWaitGroup
222224

223225
// ShutdownDelayDuration allows to block shutdown for some time, e.g. until endpoints pointing to this API server
224226
// have converged on all node. During this time, the API server keeps serving, /healthz will return 200,
@@ -452,7 +454,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
452454
// | | | | |
453455
// | | ---------------| |
454456
// | | | |
455-
// | | (HandlerChainWaitGroup::Wait) |
457+
// | | (NonLongRunningRequestWaitGroup::Wait) |
456458
// | | | |
457459
// | | InFlightRequestsDrained (drainedCh) |
458460
// | | | |
@@ -582,7 +584,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
582584
<-notAcceptingNewRequestCh.Signaled()
583585

584586
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
585-
// once HandlerChainWaitGroup.Wait is invoked, the apiserver is
587+
// once NonLongRunningRequestWaitGroup.Wait is invoked, the apiserver is
586588
// expected to reject any incoming request with a {503, Retry-After}
587589
// response via the WithWaitGroup filter. On the contrary, we observe
588590
// that incoming request(s) get a 'connection refused' error, this is
@@ -594,7 +596,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
594596
// 'Server.Shutdown' will be invoked only after in-flight requests
595597
// have been drained.
596598
// TODO: can we consolidate these two modes of graceful termination?
597-
s.HandlerChainWaitGroup.Wait()
599+
s.NonLongRunningRequestWaitGroup.Wait()
598600
}()
599601

600602
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")

staging/src/k8s.io/apiserver/pkg/server/genericapiserver_graceful_termination_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func newSignalInterceptingTestStep() *signalInterceptingTestStep {
144144
// | |
145145
// | |-------------------------------------------------|
146146
// | | |
147-
// | close(stopHttpServerCh) HandlerChainWaitGroup.Wait()
147+
// | close(stopHttpServerCh) NonLongRunningRequestWaitGroup.Wait()
148148
// | | |
149149
// | server.Shutdown(timeout=60s) |
150150
// | | |
@@ -357,7 +357,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
357357
// | |
358358
// | (NotAcceptingNewRequest)
359359
// | |
360-
// | HandlerChainWaitGroup.Wait()
360+
// | NonLongRunningRequestWaitGroup.Wait()
361361
// | |
362362
// | (InFlightRequestsDrained)
363363
// | |

staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ func TestGracefulShutdown(t *testing.T) {
602602
wg.Add(1)
603603

604604
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
605-
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.HandlerChainWaitGroup)
605+
handler := genericfilters.WithWaitGroup(apiHandler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
606606
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
607607
return handler
608608
}
@@ -666,7 +666,7 @@ func TestGracefulShutdown(t *testing.T) {
666666
}
667667

668668
// wait for wait group handler finish
669-
s.HandlerChainWaitGroup.Wait()
669+
s.NonLongRunningRequestWaitGroup.Wait()
670670
<-stoppedCh
671671

672672
// check server all handlers finished.

0 commit comments

Comments
 (0)