From 99f644aa96a10c30bb58229b2155ae6984c4cc68 Mon Sep 17 00:00:00 2001 From: Greg Lamberson Date: Sun, 15 Mar 2026 02:54:03 -0500 Subject: [PATCH] feat(egfx): add QoE statistics accumulation on GraphicsPipelineServer The server currently processes QoE Frame Acknowledge PDUs and frame acknowledgments but discards all performance data after forwarding to handler callbacks. This adds a QoeCollector that automatically accumulates client decode/render timing and frame round-trip latency, exposed via qoe_snapshot() for health monitoring consumers. QoeCollector tracks: - Client decode+render time from QoE PDUs (EMA, min/max, latest) - Frame round-trip latency from FrameInfo.sent_at correlation (EMA, min/max) New public API: - QoeSnapshot struct (Clone + Debug) - GraphicsPipelineServer::qoe_snapshot() -> Option - GraphicsPipelineServer::reset_qoe() No changes to GraphicsPipelineHandler trait. Zero breaking changes. Part of #1158 (Section 11: Session Health Monitoring) --- crates/ironrdp-egfx/src/server.rs | 255 +++++++++++++++++- .../tests/egfx/server.rs | 122 +++++++++ 2 files changed, 376 insertions(+), 1 deletion(-) diff --git a/crates/ironrdp-egfx/src/server.rs b/crates/ironrdp-egfx/src/server.rs index 5e23b9464..81e4185f3 100644 --- a/crates/ironrdp-egfx/src/server.rs +++ b/crates/ironrdp-egfx/src/server.rs @@ -253,6 +253,222 @@ pub struct QoeMetrics { pub time_diff_dr: u16, } +// ============================================================================ +// QoE Statistics +// ============================================================================ + +/// Accumulated Quality of Experience statistics. +/// +/// Tracks client-reported decode/render timing from [2.2.2.13] QoE Frame +/// Acknowledge PDUs and server-measured round-trip latency from frame +/// acknowledgments. Statistics are accumulated over the lifetime of the +/// EGFX channel. +/// +/// Use [`GraphicsPipelineServer::qoe_snapshot()`] to query current values. +/// +/// [2.2.2.13]: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-rdpegfx/40c1ada9-db39-407b-a760-fca4b3e9cc35 +#[derive(Debug)] +struct QoeCollector { + /// Total QoE reports received from client. + total_reports: u64, + + /// Most recent client decode+render time (microseconds). + latest_decode_render_us: u16, + + /// Exponential moving average of decode+render time (microseconds). + avg_decode_render_us: f32, + + /// Minimum decode+render time observed (microseconds). + min_decode_render_us: u16, + + /// Maximum decode+render time observed (microseconds). + max_decode_render_us: u16, + + /// Total round-trip latency samples (frame send to ack receive). + total_rtt_samples: u64, + + /// Exponential moving average of round-trip latency (milliseconds). + avg_rtt_ms: f32, + + /// Minimum round-trip latency observed (milliseconds). + min_rtt_ms: f32, + + /// Maximum round-trip latency observed (milliseconds). + max_rtt_ms: f32, + + /// Total encoded bytes across all acknowledged frames. + total_bytes_sent: u64, + + /// Total frames acknowledged (for average frame size calculation). + total_frames_acked: u64, + + /// Number of times a frame send was blocked by backpressure. + backpressure_count: u64, +} + +/// EMA smoothing factor. Balances responsiveness to recent values against +/// stability. 0.1 means ~10-sample effective window. +const QOE_EMA_ALPHA: f32 = 0.1; + +impl QoeCollector { + fn new() -> Self { + Self { + total_reports: 0, + latest_decode_render_us: 0, + avg_decode_render_us: 0.0, + min_decode_render_us: u16::MAX, + max_decode_render_us: 0, + total_rtt_samples: 0, + avg_rtt_ms: 0.0, + min_rtt_ms: f32::MAX, + max_rtt_ms: 0.0, + total_bytes_sent: 0, + total_frames_acked: 0, + backpressure_count: 0, + } + } + + /// Record a QoE report from the client. + fn record_qoe(&mut self, metrics: &QoeMetrics) { + let dr = metrics.time_diff_dr; + + self.latest_decode_render_us = dr; + self.min_decode_render_us = self.min_decode_render_us.min(dr); + self.max_decode_render_us = self.max_decode_render_us.max(dr); + + if self.total_reports == 0 { + self.avg_decode_render_us = f32::from(dr); + } else { + self.avg_decode_render_us = + self.avg_decode_render_us * (1.0 - QOE_EMA_ALPHA) + f32::from(dr) * QOE_EMA_ALPHA; + } + + self.total_reports += 1; + } + + /// Record an acknowledged frame's size for bandwidth tracking. + #[expect( + clippy::as_conversions, + reason = "usize to u64 is lossless on all supported platforms (32/64-bit)" + )] + fn record_frame_ack(&mut self, size_bytes: usize) { + self.total_bytes_sent += size_bytes as u64; + self.total_frames_acked += 1; + } + + /// Record a backpressure event (frame send blocked by full queue). + fn record_backpressure(&mut self) { + self.backpressure_count += 1; + } + + /// Record a round-trip latency measurement from a frame acknowledgment. + fn record_rtt(&mut self, rtt: core::time::Duration) { + let rtt_ms = rtt.as_secs_f32() * 1000.0; + + self.min_rtt_ms = self.min_rtt_ms.min(rtt_ms); + self.max_rtt_ms = self.max_rtt_ms.max(rtt_ms); + + if self.total_rtt_samples == 0 { + self.avg_rtt_ms = rtt_ms; + } else { + self.avg_rtt_ms = self.avg_rtt_ms * (1.0 - QOE_EMA_ALPHA) + rtt_ms * QOE_EMA_ALPHA; + } + + self.total_rtt_samples += 1; + } + + /// Produce a point-in-time snapshot of accumulated statistics. + fn snapshot(&self) -> QoeSnapshot { + QoeSnapshot { + total_qoe_reports: self.total_reports, + latest_decode_render_us: self.latest_decode_render_us, + avg_decode_render_us: self.avg_decode_render_us, + min_decode_render_us: if self.total_reports == 0 { + 0 + } else { + self.min_decode_render_us + }, + max_decode_render_us: self.max_decode_render_us, + total_rtt_samples: self.total_rtt_samples, + avg_rtt_ms: self.avg_rtt_ms, + min_rtt_ms: if self.total_rtt_samples == 0 { + 0.0 + } else { + self.min_rtt_ms + }, + max_rtt_ms: self.max_rtt_ms, + total_bytes_sent: self.total_bytes_sent, + avg_frame_size_bytes: if self.total_frames_acked == 0 { + 0 + } else { + self.total_bytes_sent / self.total_frames_acked + }, + backpressure_count: self.backpressure_count, + } + } + + /// Reset all accumulated statistics. + fn clear(&mut self) { + *self = Self::new(); + } +} + +impl Default for QoeCollector { + fn default() -> Self { + Self::new() + } +} + +/// Point-in-time snapshot of QoE statistics. +/// +/// Returned by [`GraphicsPipelineServer::qoe_snapshot()`]. +#[derive(Debug, Clone)] +pub struct QoeSnapshot { + /// Total QoE reports received from client. + pub total_qoe_reports: u64, + + /// Most recent client decode+render time (microseconds). + pub latest_decode_render_us: u16, + + /// Exponential moving average of decode+render time (microseconds). + pub avg_decode_render_us: f32, + + /// Minimum decode+render time observed (microseconds). + pub min_decode_render_us: u16, + + /// Maximum decode+render time observed (microseconds). + pub max_decode_render_us: u16, + + /// Total round-trip latency samples. + pub total_rtt_samples: u64, + + /// Exponential moving average of round-trip latency (milliseconds). + pub avg_rtt_ms: f32, + + /// Minimum round-trip latency observed (milliseconds). + pub min_rtt_ms: f32, + + /// Maximum round-trip latency observed (milliseconds). + pub max_rtt_ms: f32, + + /// Total encoded bytes across all acknowledged frames. + pub total_bytes_sent: u64, + + /// Average frame size in bytes (total_bytes_sent / frames acknowledged). + pub avg_frame_size_bytes: u64, + + /// Number of times a frame send was blocked by backpressure. + /// + /// High values indicate the client cannot keep up with the server's + /// frame rate. Useful for adaptive encoding decisions and thin client + /// detection. + pub backpressure_count: u64, +} + +// ============================================================================ +// Frame Tracking +// ============================================================================ + /// Frame tracking for flow control /// /// Implements the "Unacknowledged Frames ADM element" from MS-RDPEGFX. @@ -630,6 +846,7 @@ pub struct GraphicsPipelineServer { surfaces: Surfaces, frames: FrameTracker, + qoe: QoeCollector, output_width: u16, output_height: u16, @@ -660,6 +877,7 @@ impl GraphicsPipelineServer { codec_caps: CodecCapabilities::default(), surfaces: Surfaces::new(), frames, + qoe: QoeCollector::new(), output_width: 0, output_height: 0, reset_graphics_sent: false, @@ -932,6 +1150,35 @@ impl GraphicsPipelineServer { self.frames.set_max_in_flight(max); } + // ======================================================================== + // QoE Statistics + // ======================================================================== + + /// Get a snapshot of accumulated Quality of Experience statistics. + /// + /// Returns `None` if no QoE reports have been received and no + /// round-trip latency samples have been measured. + /// + /// QoE reports are sent by clients that support [2.2.2.13] QoE Frame + /// Acknowledge PDUs (V10.4+). Round-trip latency is measured for all + /// EGFX versions from frame send to acknowledgment. + /// + /// [2.2.2.13]: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-rdpegfx/40c1ada9-db39-407b-a760-fca4b3e9cc35 + #[must_use] + pub fn qoe_snapshot(&self) -> Option { + if self.qoe.total_reports == 0 && self.qoe.total_rtt_samples == 0 { + return None; + } + Some(self.qoe.snapshot()) + } + + /// Reset accumulated QoE statistics. + /// + /// Useful when starting a new measurement window (e.g., after a resize). + pub fn reset_qoe(&mut self) { + self.qoe.clear(); + } + // ======================================================================== // Frame Sending // ======================================================================== @@ -999,6 +1246,7 @@ impl GraphicsPipelineServer { return None; } if self.should_backpressure() { + self.qoe.record_backpressure(); return None; } @@ -1049,6 +1297,7 @@ impl GraphicsPipelineServer { return None; } if self.should_backpressure() { + self.qoe.record_backpressure(); return None; } @@ -1188,7 +1437,10 @@ impl GraphicsPipelineServer { let queue_depth = pdu.queue_depth.to_u32(); if let Some(info) = self.frames.acknowledge(pdu.frame_id, queue_depth) { - trace!(frame_id = pdu.frame_id, latency = ?info.sent_at.elapsed()); + let rtt = info.sent_at.elapsed(); + self.qoe.record_rtt(rtt); + self.qoe.record_frame_ack(info.size_bytes); + trace!(frame_id = pdu.frame_id, latency = ?rtt); } self.handler.on_frame_ack(pdu.frame_id, queue_depth); @@ -1202,6 +1454,7 @@ impl GraphicsPipelineServer { time_diff_dr: pdu.time_diff_dr, }; + self.qoe.record_qoe(&metrics); self.handler.on_qoe_metrics(metrics); } diff --git a/crates/ironrdp-testsuite-core/tests/egfx/server.rs b/crates/ironrdp-testsuite-core/tests/egfx/server.rs index ee0a45fab..420dd681d 100644 --- a/crates/ironrdp-testsuite-core/tests/egfx/server.rs +++ b/crates/ironrdp-testsuite-core/tests/egfx/server.rs @@ -248,3 +248,125 @@ fn test_frame_flow_control() { let frame3 = server.send_avc420_frame(surface_id, &h264_data, ®ions, 33); assert!(frame3.is_none()); } + +// ============================================================================ +// QoE Statistics Tests +// ============================================================================ + +#[test] +fn test_qoe_snapshot_none_before_data() { + let handler = Box::new(TestHandler::new()); + let server = GraphicsPipelineServer::new(handler); + + // No QoE reports yet. + assert!(server.qoe_snapshot().is_none()); +} + +#[test] +fn test_qoe_snapshot_after_frame_ack() { + use ironrdp_egfx::pdu::{FrameAcknowledgePdu, QueueDepth}; + + let handler = Box::new(TestHandler::new()); + let mut server = GraphicsPipelineServer::new(handler); + + // Negotiate capabilities. + let client_caps_pdu = GfxPdu::CapabilitiesAdvertise(CapabilitiesAdvertisePdu(vec![CapabilitySet::V8_1 { + flags: CapabilitiesV81Flags::AVC420_ENABLED, + }])); + let payload = encode_pdu(&client_caps_pdu); + let _output = server.process(0, &payload).expect("process failed"); + + // Create surface and send a frame. + let surface_id = server.create_surface(1920, 1080).unwrap(); + server.drain_output(); + + let h264_data = vec![0x00, 0x00, 0x00, 0x01, 0x67]; + let regions = vec![Avc420Region::full_frame(1920, 1080, 22)]; + let frame_id = server.send_avc420_frame(surface_id, &h264_data, ®ions, 0); + assert!(frame_id.is_some()); + + // Simulate client frame acknowledgment. + let ack_pdu = GfxPdu::FrameAcknowledge(FrameAcknowledgePdu { + frame_id: frame_id.unwrap(), + queue_depth: QueueDepth::AvailableBytes(1), + total_frames_decoded: 1, + }); + let ack_payload = encode_pdu(&ack_pdu); + let _output = server.process(0, &ack_payload).expect("process failed"); + + // QoE snapshot should now have RTT data (no QoE reports, but RTT from ack). + let snapshot = server.qoe_snapshot(); + assert!(snapshot.is_some()); + + let snap = snapshot.unwrap(); + assert_eq!(snap.total_rtt_samples, 1); + // RTT should be some small value (frame was just sent). + assert!(snap.avg_rtt_ms < 1000.0); + // No QoE reports yet. + assert_eq!(snap.total_qoe_reports, 0); +} + +#[test] +fn test_qoe_snapshot_after_qoe_report() { + use ironrdp_egfx::pdu::QoeFrameAcknowledgePdu; + + let handler = Box::new(TestHandler::new()); + let mut server = GraphicsPipelineServer::new(handler); + + // Negotiate capabilities (V10 for QoE support). + let client_caps_pdu = GfxPdu::CapabilitiesAdvertise(CapabilitiesAdvertisePdu(vec![CapabilitySet::V10 { + flags: CapabilitiesV10Flags::SMALL_CACHE, + }])); + let payload = encode_pdu(&client_caps_pdu); + let _output = server.process(0, &payload).expect("process failed"); + + // Simulate QoE report. + let qoe_pdu = GfxPdu::QoeFrameAcknowledge(QoeFrameAcknowledgePdu { + frame_id: 0, + timestamp: 12345, + time_diff_se: 100, + time_diff_dr: 4500, + }); + let qoe_payload = encode_pdu(&qoe_pdu); + let _output = server.process(0, &qoe_payload).expect("process failed"); + + let snapshot = server.qoe_snapshot(); + assert!(snapshot.is_some()); + + let snap = snapshot.unwrap(); + assert_eq!(snap.total_qoe_reports, 1); + assert_eq!(snap.latest_decode_render_us, 4500); + assert!((snap.avg_decode_render_us - 4500.0).abs() < 0.1); + assert_eq!(snap.min_decode_render_us, 4500); + assert_eq!(snap.max_decode_render_us, 4500); +} + +#[test] +fn test_qoe_reset() { + use ironrdp_egfx::pdu::QoeFrameAcknowledgePdu; + + let handler = Box::new(TestHandler::new()); + let mut server = GraphicsPipelineServer::new(handler); + + // Negotiate. + let client_caps_pdu = GfxPdu::CapabilitiesAdvertise(CapabilitiesAdvertisePdu(vec![CapabilitySet::V10 { + flags: CapabilitiesV10Flags::SMALL_CACHE, + }])); + let payload = encode_pdu(&client_caps_pdu); + let _output = server.process(0, &payload).expect("process failed"); + + // Add a QoE report. + let qoe_pdu = GfxPdu::QoeFrameAcknowledge(QoeFrameAcknowledgePdu { + frame_id: 0, + timestamp: 1000, + time_diff_se: 50, + time_diff_dr: 3000, + }); + let qoe_payload = encode_pdu(&qoe_pdu); + let _output = server.process(0, &qoe_payload).expect("process failed"); + assert!(server.qoe_snapshot().is_some()); + + // Reset clears all statistics. + server.reset_qoe(); + assert!(server.qoe_snapshot().is_none()); +}