Skip to content

Commit 9c00cd7

Browse files
authored
feat: pool-wide saturation computation + expose as gauge (kubernetes-sigs#2343)
* refactor: pool-wide saturation metrics + prometheus Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com> * remove redundant field Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com> * address review comments Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com> * rebase issue Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com> * fix rebase Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com> --------- Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
1 parent eb017be commit 9c00cd7

File tree

8 files changed

+82
-11
lines changed

8 files changed

+82
-11
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
341341
}
342342
fc, err := fccontroller.NewFlowController(
343343
ctx,
344+
opts.PoolName,
344345
eppConfig.FlowControlConfig.Controller,
345346
registry, saturationDetector,
346347
locator,

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ type flowControllerOption func(*FlowController)
128128
// The provided context governs the lifecycle of the controller and all its workers.
129129
func NewFlowController(
130130
ctx context.Context,
131+
poolName string,
131132
config *Config,
132133
registry contracts.FlowRegistry,
133134
sd contracts.SaturationDetector,
@@ -156,6 +157,7 @@ func NewFlowController(
156157
) shardProcessor {
157158
return internal.NewShardProcessor(
158159
ctx,
160+
poolName,
159161
shard,
160162
saturationDetector,
161163
podLocator,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func newUnitHarness(
132132
withClock(harnessOpts.clock),
133133
withShardProcessorFactory(mockProcessorFactory.new),
134134
}
135-
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, fcOpts...)
135+
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockPodLocator, fcOpts...)
136136
require.NoError(t, err, "failed to create FlowController for unit test harness")
137137

138138
h := &testHarness{
@@ -167,7 +167,7 @@ func newIntegrationHarness(t *testing.T, ctx context.Context, cfg *Config, regis
167167
withRegistryClient(registry),
168168
withClock(mockClock),
169169
}
170-
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, opts...)
170+
fc, err := NewFlowController(ctx, "test-pool", cfg, registry, mockDetector, mockPodLocator, opts...)
171171
require.NoError(t, err, "failed to create FlowController for integration test harness")
172172

173173
h := &testHarness{

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ var ErrProcessorBusy = errors.New("shard processor is busy")
6565
// loop is the sole writer for all state-mutating operations. This makes complex transactions (like capacity checks)
6666
// inherently atomic without coarse-grained locks.
6767
type ShardProcessor struct {
68+
poolName string
6869
shard contracts.RegistryShard
6970
saturationDetector contracts.SaturationDetector
7071
podLocator contracts.PodLocator
@@ -87,6 +88,7 @@ type ShardProcessor struct {
8788
// NewShardProcessor creates a new ShardProcessor instance.
8889
func NewShardProcessor(
8990
ctx context.Context,
91+
poolName string,
9092
shard contracts.RegistryShard,
9193
saturationDetector contracts.SaturationDetector,
9294
podLocator contracts.PodLocator,
@@ -97,6 +99,7 @@ func NewShardProcessor(
9799
) *ShardProcessor {
98100
return &ShardProcessor{
99101
shard: shard,
102+
poolName: poolName,
100103
saturationDetector: saturationDetector,
101104
podLocator: podLocator,
102105
clock: clock,
@@ -311,6 +314,20 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
311314
metrics.RecordFlowControlDispatchCycleDuration(time.Since(dispatchCycleStart))
312315
}()
313316

317+
pool := sp.podLocator.Locate(ctx, nil)
318+
saturation := sp.saturationDetector.Saturation(ctx, pool)
319+
320+
// Record pool saturation metric
321+
metrics.RecordFlowControlPoolSaturation(sp.poolName, saturation)
322+
323+
// --- Viability Check (Pool-Wide Saturation) ---
324+
if saturation >= 1.0 {
325+
sp.logger.V(logutil.DEBUG).Info("Pool is saturated; enforcing HoL blocking.",
326+
"poolName", sp.poolName)
327+
// Short-circuit
328+
return false
329+
}
330+
314331
for _, priority := range sp.shard.AllOrderedPriorityLevels() {
315332
originalBand, err := sp.shard.PriorityBandAccessor(priority)
316333
if err != nil {
@@ -328,16 +345,7 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
328345
continue
329346
}
330347

331-
// --- Viability Check (Saturation/HoL Blocking) ---
332348
req := item.OriginalRequest()
333-
candidates := sp.podLocator.Locate(ctx, req.GetMetadata())
334-
if sp.saturationDetector.Saturation(ctx, candidates) >= 1.0 {
335-
sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.",
336-
"flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName())
337-
// Stop the dispatch cycle entirely to respect strict policy decision and prevent priority inversion where
338-
// lower-priority work might exacerbate the saturation affecting high-priority work.
339-
return false
340-
}
341349

342350
// --- Dispatch ---
343351
if err := sp.dispatchItem(item); err != nil {

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
120120

121121
h.processor = NewShardProcessor(
122122
h.ctx,
123+
"test-pool",
123124
h,
124125
h.saturationDetector,
125126
h.podLocator,
@@ -546,6 +547,7 @@ func TestShardProcessor(t *testing.T) {
546547
// A successful test is one that completes without panicking.
547548
time.Sleep(50 * time.Millisecond)
548549
})
550+
549551
})
550552

551553
t.Run("Unit", func(t *testing.T) {

pkg/epp/metrics/metrics.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,15 @@ var (
431431
},
432432
append([]string{"fairness_id", "priority", "inference_pool"}, modelLabels...),
433433
)
434+
435+
flowControlPoolSaturation = prometheus.NewGaugeVec(
436+
prometheus.GaugeOpts{
437+
Subsystem: inferenceExtension,
438+
Name: "flow_control_pool_saturation",
439+
Help: metricsutil.HelpMsgWithStability("Current saturation level of the inference pool (0.0 = empty, 1.0 = fully saturated).", compbasemetrics.ALPHA),
440+
},
441+
[]string{"inference_pool"},
442+
)
434443
)
435444

436445
// --- Inference Model Rewrite Metrics ---
@@ -487,6 +496,7 @@ func Register(customCollectors ...prometheus.Collector) {
487496
metrics.Registry.MustRegister(flowControlDispatchCycleDuration)
488497
metrics.Registry.MustRegister(flowControlQueueSize)
489498
metrics.Registry.MustRegister(flowControlQueueBytes)
499+
metrics.Registry.MustRegister(flowControlPoolSaturation)
490500
metrics.Registry.MustRegister(flowControlRequestEnqueueDuration)
491501
metrics.Registry.MustRegister(inferenceModelRewriteDecisionsTotal)
492502
for _, collector := range customCollectors {
@@ -535,6 +545,7 @@ func Reset() {
535545
flowControlRequestQueueDuration.Reset()
536546
flowControlQueueSize.Reset()
537547
flowControlQueueBytes.Reset()
548+
flowControlPoolSaturation.Reset()
538549
flowControlRequestEnqueueDuration.Reset()
539550
inferenceModelRewriteDecisionsTotal.Reset()
540551
}
@@ -849,6 +860,11 @@ func SubFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, ta
849860
flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Sub(float64(bytes))
850861
}
851862

863+
// RecordFlowControlPoolSaturation records the current saturation level for an inference pool.
864+
func RecordFlowControlPoolSaturation(inferencePool string, saturation float64) {
865+
flowControlPoolSaturation.WithLabelValues(inferencePool).Set(saturation)
866+
}
867+
852868
// SetTTFTSLOThreshold sets the TTFT SLO threshold for a model.
853869
// This allows dynamic threshold management and makes the threshold visible in metrics.
854870
func SetTTFTSLOThreshold(modelName, targetModelName string, threshold float64) {

pkg/epp/metrics/metrics_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,47 @@ func TestFlowControlQueueBytesMetric(t *testing.T) {
11201120
require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0")
11211121
}
11221122

1123+
func TestFlowControlPoolSaturationMetric(t *testing.T) {
1124+
Reset()
1125+
1126+
const pool = "test-pool"
1127+
1128+
// Set saturation to 0.5
1129+
RecordFlowControlPoolSaturation(pool, 0.5)
1130+
val, err := testutil.GetGaugeMetricValue(flowControlPoolSaturation.WithLabelValues(pool))
1131+
require.NoError(t, err, "Failed to get gauge value for pool saturation")
1132+
require.Equal(t, 0.5, val, "Gauge value should be 0.5")
1133+
1134+
// Update saturation to 1.0 (fully saturated)
1135+
RecordFlowControlPoolSaturation(pool, 1.0)
1136+
val, err = testutil.GetGaugeMetricValue(flowControlPoolSaturation.WithLabelValues(pool))
1137+
require.NoError(t, err, "Failed to get gauge value after update")
1138+
require.Equal(t, 1.0, val, "Gauge value should be 1.0 after update")
1139+
1140+
// Update saturation to 0.0 (empty)
1141+
RecordFlowControlPoolSaturation(pool, 0.0)
1142+
val, err = testutil.GetGaugeMetricValue(flowControlPoolSaturation.WithLabelValues(pool))
1143+
require.NoError(t, err, "Failed to get gauge value for empty pool")
1144+
require.Equal(t, 0.0, val, "Gauge value should be 0.0 for empty pool")
1145+
1146+
// Multiple pools
1147+
RecordFlowControlPoolSaturation("pool-a", 0.3)
1148+
RecordFlowControlPoolSaturation("pool-b", 0.7)
1149+
1150+
valA, err := testutil.GetGaugeMetricValue(flowControlPoolSaturation.WithLabelValues("pool-a"))
1151+
require.NoError(t, err, "Failed to get gauge value for pool-a")
1152+
require.Equal(t, 0.3, valA, "Gauge value should be 0.3 for pool-a")
1153+
1154+
valB, err := testutil.GetGaugeMetricValue(flowControlPoolSaturation.WithLabelValues("pool-b"))
1155+
require.NoError(t, err, "Failed to get gauge value for pool-b")
1156+
require.Equal(t, 0.7, valB, "Gauge value should be 0.7 for pool-b")
1157+
1158+
// Non-existent pool
1159+
val, err = testutil.GetGaugeMetricValue(flowControlPoolSaturation.WithLabelValues("non-existent"))
1160+
require.NoError(t, err, "Failed to get gauge value for non-existent pool")
1161+
require.Equal(t, 0.0, val, "Gauge value for non-existent pool should be 0")
1162+
}
1163+
11231164
func TestInferenceModelRewriteDecisionsTotalMetric(t *testing.T) {
11241165
Reset()
11251166

site-src/guides/metrics-and-observability.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ These metrics provide insights into the [Flow Control layer](flow-control.md) wi
6464
| inference_extension_flow_control_queue_bytes | Gauge | The current size in bytes of all requests being actively managed by the Flow Control layer. This includes requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
6565
| inference_extension_flow_control_dispatch_cycle_duration_seconds | Histogram | The time taken for each dispatch cycle in the Flow Control layer. | | ALPHA |
6666
| inference_extension_flow_control_request_enqueue_duration_seconds | Gauge | The time taken to enqueue requests by the EPP Flow Control layer. | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `outcome`=&lt;QueueOutcome&gt; | ALPHA |
67+
| inference_extension_flow_control_pool_saturation | Gauge | Current saturation level of the inference pool (0.0 = empty, 1.0 = fully saturated). | `inference_pool`=&lt;pool-name&gt; | ALPHA |
6768

6869

6970
## Scrape Metrics & Pprof profiles

0 commit comments

Comments
 (0)