Skip to content

Commit 3970c2a

Browse files
committed
refactor: relocate SaturationDetector interface and its mock to the framework interface package.
1 parent 077c9a1 commit 3970c2a

File tree

16 files changed

+52
-51
lines changed

16 files changed

+52
-51
lines changed

apix/config/v1alpha1/endpointpickerconfig_types.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ func (fg FeatureGates) String() string {
163163
return "{" + result + "}"
164164
}
165165

166-
167166
// DataLayerConfig contains the configuration of the DataLayer feature
168167
type DataLayerConfig struct {
169168
// +required

apix/config/v1alpha1/zz_generated.deepcopy.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/epp/runner/runner.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ const (
9393
// control layer.
9494
// DEPRECATION NOTICE - this env var will be removed in the next version as we switch to configuring the EPP using FeatureGates in the config file.
9595
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
96-
9796
)
9897

9998
var (
@@ -550,7 +549,6 @@ func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *conf
550549
// The plugins will be executed in topologically sorted order to ensure that data is produced before it is consumed.
551550
r.requestControlConfig.OrderPrepareDataPlugins(dag)
552551

553-
554552
r.parser = handlers.NewParser(cfg.ParserConfig)
555553
logger.Info("loaded configuration from file/text successfully")
556554

@@ -569,7 +567,6 @@ func applyDeprecatedEnvFeatureGate(envVar, featureName, featureGate string, rawC
569567
}
570568
}
571569

572-
573570
func (r *Runner) setupDataLayer(enableNewMetrics bool, cfg *datalayer.Config,
574571
epf datalayer.EndpointFactory, mgr ctrl.Manager) error {
575572
disallowedMetricsExtractor := ""

pkg/epp/config/loader/configloader_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,6 @@ func TestInstantiateAndConfigure(t *testing.T) {
526526
}
527527
}
528528

529-
530529
// --- Helpers & Mocks ---
531530

532531
func hasPluginType(handle fwkplugin.Handle, typeName string) bool {

pkg/epp/flowcontrol/benchmark/benchmark.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,13 @@ func (m benchMatrix) name() string {
120120

121121
// testDetector exposes an API to manually release downstream capacity during a test run.
122122
type testDetector interface {
123-
contracts.SaturationDetector
123+
flowcontrol.SaturationDetector
124124
Release()
125125
}
126126

127127
// benchDetector models target saturation based strictly on active request counts.
128128
type benchDetector struct {
129+
plugin.Plugin
129130
concurrencyLimit atomic.Int64
130131
// _ prevents false sharing between atomic counters on multicore CPU cache lines.
131132
_ [56]byte
@@ -159,7 +160,9 @@ func (d *benchDetector) Saturation(ctx context.Context, candidates []metrics.Pod
159160

160161
// alwaysSaturatedDetector simulates a permanently saturated downstream pool.
161162
// It is strictly used to evaluate garbage collection pathways and client abandonment scenarios.
162-
type alwaysSaturatedDetector struct{}
163+
type alwaysSaturatedDetector struct {
164+
plugin.Plugin
165+
}
163166

164167
// Release is a no-op for the permanently saturated mock.
165168
func (d *alwaysSaturatedDetector) Release() {}

pkg/epp/flowcontrol/contracts/dependencies.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,3 @@ type PodLocator interface {
3131
// Locate returns a list of pod metrics that match the criteria defined in the request metadata.
3232
Locate(ctx context.Context, requestMetadata map[string]any) []fwkdl.Endpoint
3333
}
34-
35-
// SaturationDetector defines the contract for a component that provides real-time load signals to the FlowController.
36-
type SaturationDetector interface {
37-
// Saturation returns the saturation level of the pool
38-
// - A value >= 1.0 indicates that the system is fully saturated.
39-
// - A value < 1.0 indicates the ratio of used capacity to total capacity.
40-
//
41-
// FlowController consumes this signal to make dispatch decisions:
42-
// - If Saturation() >= 1.0: Stop dispatching (enforce HoL blocking).
43-
// - If Saturation() < 1.0: Continue dispatching.
44-
Saturation(ctx context.Context, candidatePods []fwkdl.Endpoint) float64
45-
}

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,18 +108,6 @@ var _ contracts.RegistryShard = &MockRegistryShard{}
108108

109109
// --- Dependency Mocks ---
110110

111-
// MockSaturationDetector is a simple "stub-style" mock for testing.
112-
type MockSaturationDetector struct {
113-
SaturationFunc func(ctx context.Context, candidatePods []metrics.PodMetrics) float64
114-
}
115-
116-
func (m *MockSaturationDetector) Saturation(ctx context.Context, candidatePods []metrics.PodMetrics) float64 {
117-
if m.SaturationFunc != nil {
118-
return m.SaturationFunc(ctx, candidatePods)
119-
}
120-
return 0.0
121-
}
122-
123111
// MockPodLocator provides a mock implementation of the contracts.PodLocator interface.
124112
// It allows tests to control the exact set of pods returned for a given request.
125113
type MockPodLocator struct {

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type shardProcessor interface {
6262
type shardProcessorFactory func(
6363
ctx context.Context,
6464
shard contracts.RegistryShard,
65-
saturationDetector contracts.SaturationDetector,
65+
saturationDetector flowcontrol.SaturationDetector,
6666
podLocator contracts.PodLocator,
6767
clock clock.WithTicker,
6868
cleanupSweepInterval time.Duration,
@@ -98,7 +98,7 @@ type FlowController struct {
9898

9999
config *Config
100100
registry registryClient
101-
saturationDetector contracts.SaturationDetector
101+
saturationDetector flowcontrol.SaturationDetector
102102
podLocator contracts.PodLocator
103103
clock clock.WithTicker
104104
logger logr.Logger
@@ -131,7 +131,7 @@ func NewFlowController(
131131
poolName string,
132132
config *Config,
133133
registry contracts.FlowRegistry,
134-
sd contracts.SaturationDetector,
134+
sd flowcontrol.SaturationDetector,
135135
podLocator contracts.PodLocator,
136136
opts ...flowControllerOption,
137137
) (*FlowController, error) {
@@ -148,7 +148,7 @@ func NewFlowController(
148148
fc.shardProcessorFactory = func(
149149
ctx context.Context,
150150
shard contracts.RegistryShard,
151-
saturationDetector contracts.SaturationDetector,
151+
saturationDetector flowcontrol.SaturationDetector,
152152
podLocator contracts.PodLocator,
153153
clock clock.WithTicker,
154154
cleanupSweepInterval time.Duration,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func newUnitHarness(
115115
opt(harnessOpts)
116116
}
117117

118-
mockDetector := &mocks.MockSaturationDetector{}
118+
mockDetector := &frameworkmocks.MockSaturationDetector{}
119119
mockPodLocator := &mocks.MockPodLocator{}
120120

121121
mockProcessorFactory := &mockShardProcessorFactory{
@@ -154,7 +154,7 @@ func newUnitHarness(
154154
// validating the controller-processor interaction.
155155
func newIntegrationHarness(t *testing.T, ctx context.Context, cfg *Config, registry *mockRegistryClient) *testHarness {
156156
t.Helper()
157-
mockDetector := &mocks.MockSaturationDetector{}
157+
mockDetector := &frameworkmocks.MockSaturationDetector{}
158158
mockPodLocator := &mocks.MockPodLocator{}
159159

160160
// Align FakeClock with system time. See explanation in newUnitHarness.
@@ -276,7 +276,7 @@ type mockShardProcessorFactory struct {
276276
func (f *mockShardProcessorFactory) new(
277277
_ context.Context, // The factory does not use the lifecycle context; it's passed to the processor's Run method later.
278278
shard contracts.RegistryShard,
279-
_ contracts.SaturationDetector,
279+
_ flowcontrol.SaturationDetector,
280280
_ contracts.PodLocator,
281281
_ clock.WithTicker,
282282
_ time.Duration,
@@ -1140,7 +1140,7 @@ func TestFlowController_WorkerManagement(t *testing.T) {
11401140
h.fc.shardProcessorFactory = func(
11411141
ctx context.Context, // The context created by getOrStartWorker for the potential new processor.
11421142
shard contracts.RegistryShard,
1143-
_ contracts.SaturationDetector,
1143+
_ flowcontrol.SaturationDetector,
11441144
_ contracts.PodLocator,
11451145
_ clock.WithTicker,
11461146
_ time.Duration,

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ var ErrProcessorBusy = errors.New("shard processor is busy")
6767
type ShardProcessor struct {
6868
poolName string
6969
shard contracts.RegistryShard
70-
saturationDetector contracts.SaturationDetector
70+
saturationDetector flowcontrol.SaturationDetector
7171
podLocator contracts.PodLocator
7272
clock clock.WithTicker
7373
cleanupSweepInterval time.Duration
@@ -90,7 +90,7 @@ func NewShardProcessor(
9090
ctx context.Context,
9191
poolName string,
9292
shard contracts.RegistryShard,
93-
saturationDetector contracts.SaturationDetector,
93+
saturationDetector flowcontrol.SaturationDetector,
9494
podLocator contracts.PodLocator,
9595
clock clock.WithTicker,
9696
cleanupSweepInterval time.Duration,

0 commit comments

Comments
 (0)