Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 254 additions & 1 deletion crates/ironrdp-egfx/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,222 @@ pub struct QoeMetrics {
pub time_diff_dr: u16,
}

// ============================================================================
// QoE Statistics
// ============================================================================

/// Accumulated Quality of Experience statistics.
///
/// Tracks client-reported decode/render timing from [2.2.2.13] QoE Frame
/// Acknowledge PDUs and server-measured round-trip latency from frame
/// acknowledgments. Statistics are accumulated over the lifetime of the
/// EGFX channel.
///
/// Use [`GraphicsPipelineServer::qoe_snapshot()`] to query current values.
///
/// [2.2.2.13]: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-rdpegfx/40c1ada9-db39-407b-a760-fca4b3e9cc35
#[derive(Debug)]
struct QoeCollector {
/// Total QoE reports received from client.
total_reports: u64,

/// Most recent client decode+render time (microseconds).
latest_decode_render_us: u16,

/// Exponential moving average of decode+render time (microseconds).
avg_decode_render_us: f32,

/// Minimum decode+render time observed (microseconds).
min_decode_render_us: u16,

/// Maximum decode+render time observed (microseconds).
max_decode_render_us: u16,

/// Total round-trip latency samples (frame send to ack receive).
total_rtt_samples: u64,

/// Exponential moving average of round-trip latency (milliseconds).
avg_rtt_ms: f32,

/// Minimum round-trip latency observed (milliseconds).
min_rtt_ms: f32,

/// Maximum round-trip latency observed (milliseconds).
max_rtt_ms: f32,

/// Total encoded bytes across all acknowledged frames.
total_bytes_sent: u64,

/// Total frames acknowledged (for average frame size calculation).
total_frames_acked: u64,

/// Number of times a frame send was blocked by backpressure.
backpressure_count: u64,
}

/// EMA smoothing factor. Balances responsiveness to recent values against
/// stability. 0.1 means ~10-sample effective window.
const QOE_EMA_ALPHA: f32 = 0.1;

impl QoeCollector {
fn new() -> Self {
Self {
total_reports: 0,
latest_decode_render_us: 0,
avg_decode_render_us: 0.0,
min_decode_render_us: u16::MAX,
max_decode_render_us: 0,
total_rtt_samples: 0,
avg_rtt_ms: 0.0,
min_rtt_ms: f32::MAX,
max_rtt_ms: 0.0,
total_bytes_sent: 0,
total_frames_acked: 0,
backpressure_count: 0,
}
}

/// Record a QoE report from the client.
fn record_qoe(&mut self, metrics: &QoeMetrics) {
let dr = metrics.time_diff_dr;

self.latest_decode_render_us = dr;
self.min_decode_render_us = self.min_decode_render_us.min(dr);
self.max_decode_render_us = self.max_decode_render_us.max(dr);

if self.total_reports == 0 {
self.avg_decode_render_us = f32::from(dr);
} else {
self.avg_decode_render_us =
self.avg_decode_render_us * (1.0 - QOE_EMA_ALPHA) + f32::from(dr) * QOE_EMA_ALPHA;
}

self.total_reports += 1;
}

/// Record an acknowledged frame's size for bandwidth tracking.
#[expect(
clippy::as_conversions,
reason = "usize to u64 is lossless on all supported platforms (32/64-bit)"
)]
fn record_frame_ack(&mut self, size_bytes: usize) {
self.total_bytes_sent += size_bytes as u64;
self.total_frames_acked += 1;
}

/// Record a backpressure event (frame send blocked by full queue).
fn record_backpressure(&mut self) {
self.backpressure_count += 1;
}

/// Record a round-trip latency measurement from a frame acknowledgment.
fn record_rtt(&mut self, rtt: core::time::Duration) {
let rtt_ms = rtt.as_secs_f32() * 1000.0;

self.min_rtt_ms = self.min_rtt_ms.min(rtt_ms);
self.max_rtt_ms = self.max_rtt_ms.max(rtt_ms);

if self.total_rtt_samples == 0 {
self.avg_rtt_ms = rtt_ms;
} else {
self.avg_rtt_ms = self.avg_rtt_ms * (1.0 - QOE_EMA_ALPHA) + rtt_ms * QOE_EMA_ALPHA;
}

self.total_rtt_samples += 1;
}

/// Produce a point-in-time snapshot of accumulated statistics.
fn snapshot(&self) -> QoeSnapshot {
QoeSnapshot {
total_qoe_reports: self.total_reports,
latest_decode_render_us: self.latest_decode_render_us,
avg_decode_render_us: self.avg_decode_render_us,
min_decode_render_us: if self.total_reports == 0 {
0
} else {
self.min_decode_render_us
},
max_decode_render_us: self.max_decode_render_us,
total_rtt_samples: self.total_rtt_samples,
avg_rtt_ms: self.avg_rtt_ms,
min_rtt_ms: if self.total_rtt_samples == 0 {
0.0
} else {
self.min_rtt_ms
},
max_rtt_ms: self.max_rtt_ms,
total_bytes_sent: self.total_bytes_sent,
avg_frame_size_bytes: if self.total_frames_acked == 0 {
0
} else {
self.total_bytes_sent / self.total_frames_acked
},
backpressure_count: self.backpressure_count,
}
}

/// Reset all accumulated statistics.
fn clear(&mut self) {
*self = Self::new();
}
}

impl Default for QoeCollector {
fn default() -> Self {
Self::new()
}
}

/// Point-in-time snapshot of QoE statistics.
///
/// Returned by [`GraphicsPipelineServer::qoe_snapshot()`].
#[derive(Debug, Clone)]
pub struct QoeSnapshot {
/// Total QoE reports received from client.
pub total_qoe_reports: u64,

/// Most recent client decode+render time (microseconds).
pub latest_decode_render_us: u16,

/// Exponential moving average of decode+render time (microseconds).
pub avg_decode_render_us: f32,

/// Minimum decode+render time observed (microseconds).
pub min_decode_render_us: u16,

/// Maximum decode+render time observed (microseconds).
pub max_decode_render_us: u16,

/// Total round-trip latency samples.
pub total_rtt_samples: u64,

/// Exponential moving average of round-trip latency (milliseconds).
pub avg_rtt_ms: f32,

/// Minimum round-trip latency observed (milliseconds).
pub min_rtt_ms: f32,

/// Maximum round-trip latency observed (milliseconds).
pub max_rtt_ms: f32,

/// Total encoded bytes across all acknowledged frames.
pub total_bytes_sent: u64,

/// Average frame size in bytes (total_bytes_sent / frames acknowledged).
pub avg_frame_size_bytes: u64,

/// Number of times a frame send was blocked by backpressure.
///
/// High values indicate the client cannot keep up with the server's
/// frame rate. Useful for adaptive encoding decisions and thin client
/// detection.
pub backpressure_count: u64,
}

// ============================================================================
// Frame Tracking
// ============================================================================

/// Frame tracking for flow control
///
/// Implements the "Unacknowledged Frames ADM element" from MS-RDPEGFX.
Expand Down Expand Up @@ -630,6 +846,7 @@ pub struct GraphicsPipelineServer {

surfaces: Surfaces,
frames: FrameTracker,
qoe: QoeCollector,

output_width: u16,
output_height: u16,
Expand Down Expand Up @@ -660,6 +877,7 @@ impl GraphicsPipelineServer {
codec_caps: CodecCapabilities::default(),
surfaces: Surfaces::new(),
frames,
qoe: QoeCollector::new(),
output_width: 0,
output_height: 0,
reset_graphics_sent: false,
Expand Down Expand Up @@ -932,6 +1150,35 @@ impl GraphicsPipelineServer {
self.frames.set_max_in_flight(max);
}

// ========================================================================
// QoE Statistics
// ========================================================================

/// Get a snapshot of accumulated Quality of Experience statistics.
///
/// Returns `None` if no QoE reports have been received and no
/// round-trip latency samples have been measured.
///
/// QoE reports are sent by clients that support [2.2.2.13] QoE Frame
/// Acknowledge PDUs (V10.4+). Round-trip latency is measured for all
/// EGFX versions from frame send to acknowledgment.
///
/// [2.2.2.13]: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-rdpegfx/40c1ada9-db39-407b-a760-fca4b3e9cc35
#[must_use]
pub fn qoe_snapshot(&self) -> Option<QoeSnapshot> {
if self.qoe.total_reports == 0 && self.qoe.total_rtt_samples == 0 {
return None;
}
Some(self.qoe.snapshot())
}

/// Reset accumulated QoE statistics.
///
/// Useful when starting a new measurement window (e.g., after a resize).
pub fn reset_qoe(&mut self) {
self.qoe.clear();
}

// ========================================================================
// Frame Sending
// ========================================================================
Expand Down Expand Up @@ -999,6 +1246,7 @@ impl GraphicsPipelineServer {
return None;
}
if self.should_backpressure() {
self.qoe.record_backpressure();
return None;
}

Expand Down Expand Up @@ -1049,6 +1297,7 @@ impl GraphicsPipelineServer {
return None;
}
if self.should_backpressure() {
self.qoe.record_backpressure();
return None;
}

Expand Down Expand Up @@ -1188,7 +1437,10 @@ impl GraphicsPipelineServer {
let queue_depth = pdu.queue_depth.to_u32();

if let Some(info) = self.frames.acknowledge(pdu.frame_id, queue_depth) {
trace!(frame_id = pdu.frame_id, latency = ?info.sent_at.elapsed());
let rtt = info.sent_at.elapsed();
self.qoe.record_rtt(rtt);
self.qoe.record_frame_ack(info.size_bytes);
trace!(frame_id = pdu.frame_id, latency = ?rtt);
}

self.handler.on_frame_ack(pdu.frame_id, queue_depth);
Expand All @@ -1202,6 +1454,7 @@ impl GraphicsPipelineServer {
time_diff_dr: pdu.time_diff_dr,
};

self.qoe.record_qoe(&metrics);
self.handler.on_qoe_metrics(metrics);
}

Expand Down
Loading
Loading