Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion examples/wgpu_room/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
video_renderer::VideoRenderer,
};
use egui::{CornerRadius, Stroke};
use livekit::{e2ee::EncryptionType, prelude::*, SimulateScenario};
use livekit::{e2ee::EncryptionType, prelude::*, track::VideoQuality, SimulateScenario};
use std::collections::HashMap;

/// The state of the application are saved on app exit and restored on app start.
Expand Down Expand Up @@ -266,6 +266,33 @@ impl LkApp {

ui.label(format!("{} - {:?}", publication.name(), publication.source()));

ui.horizontal(|ui| {
ui.label("Simulcasted - ");
let is_simulcasted = publication.simulcasted();
ui.label(if is_simulcasted { "Yes" } else { "No" });
if is_simulcasted {
ui.menu_button("Set Quality", |ui| {
let publication = publication.clone();
if ui.button("Low").clicked() {
let _ = self.service.send(AsyncCmd::SetVideoQuality {
publication,
quality: VideoQuality::Low,
});
} else if ui.button("Medium").clicked() {
let _ = self.service.send(AsyncCmd::SetVideoQuality {
publication,
quality: VideoQuality::Medium,
});
} else if ui.button("High").clicked() {
let _ = self.service.send(AsyncCmd::SetVideoQuality {
publication,
quality: VideoQuality::High,
});
}
});
}
});

ui.horizontal(|ui| {
if publication.is_muted() {
ui.colored_label(egui::Color32::DARK_GRAY, "Muted");
Expand Down
5 changes: 5 additions & 0 deletions examples/wgpu_room/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
use livekit::{
e2ee::{key_provider::*, E2eeOptions, EncryptionType},
prelude::*,
track::VideoQuality,
SimulateScenario,
};
use parking_lot::Mutex;
Expand All @@ -20,6 +21,7 @@ pub enum AsyncCmd {
ToggleSine,
SubscribeTrack { publication: RemoteTrackPublication },
UnsubscribeTrack { publication: RemoteTrackPublication },
SetVideoQuality { publication: RemoteTrackPublication, quality: VideoQuality },
E2eeKeyRatchet,
LogStats,
}
Expand Down Expand Up @@ -159,6 +161,9 @@ async fn service_task(inner: Arc<ServiceInner>, mut cmd_rx: mpsc::UnboundedRecei
AsyncCmd::UnsubscribeTrack { publication } => {
publication.set_subscribed(false);
}
AsyncCmd::SetVideoQuality { publication, quality } => {
publication.set_video_quality(quality);
}
AsyncCmd::E2eeKeyRatchet => {
if let Some(state) = running_state.as_ref() {
let e2ee_manager = state.room.e2ee_manager();
Expand Down
8 changes: 6 additions & 2 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ message FfiRequest {

StreamSendBytesRequest send_bytes = 67;

// NEXT_ID: 68
SetRemoteTrackPublicationQualityRequest set_remote_track_publication_quality = 68;

// NEXT_ID: 69
}
}

Expand Down Expand Up @@ -247,7 +249,9 @@ message FfiResponse {

StreamSendBytesResponse send_bytes = 66;

// NEXT_ID: 67
SetRemoteTrackPublicationQualityResponse set_remote_track_publication_quality = 67;

// NEXT_ID: 68
}
}

Expand Down
14 changes: 13 additions & 1 deletion livekit-ffi/protocol/track_publication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,19 @@ message UpdateRemoteTrackPublicationDimensionRequest {
required uint32 height = 3;
}

message UpdateRemoteTrackPublicationDimensionResponse {}
// Video quality for simulcasted tracks.
enum VideoQuality {
VIDEO_QUALITY_LOW = 0;
VIDEO_QUALITY_MEDIUM = 1;
VIDEO_QUALITY_HIGH = 2;
}

message UpdateRemoteTrackPublicationDimensionResponse {}

// For tracks that support simulcasting, adjust subscribed quality.
message SetRemoteTrackPublicationQualityRequest {
required uint64 track_publication_handle = 1;
required VideoQuality quality = 2;
}

message SetRemoteTrackPublicationQualityResponse {}
22 changes: 22 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,25 @@ fn on_update_remote_track_publication_dimension(
Ok(proto::UpdateRemoteTrackPublicationDimensionResponse {})
}

fn on_set_remote_track_publication_quality(
server: &'static FfiServer,
request: proto::SetRemoteTrackPublicationQualityRequest,
) -> FfiResult<proto::SetRemoteTrackPublicationQualityResponse> {
let ffi_publication =
server.retrieve_handle::<FfiPublication>(request.track_publication_handle)?;

let TrackPublication::Remote(publication) = &ffi_publication.publication else {
return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into()));
};
let quality = match request.quality() {
proto::VideoQuality::Low => livekit::track::VideoQuality::Low,
proto::VideoQuality::Medium => livekit::track::VideoQuality::Medium,
proto::VideoQuality::High => livekit::track::VideoQuality::High,
};
publication.set_video_quality(quality);
Ok(proto::SetRemoteTrackPublicationQualityResponse {})
}

fn on_set_local_metadata(
server: &'static FfiServer,
set_local_metadata: proto::SetLocalMetadataRequest,
Expand Down Expand Up @@ -1239,6 +1258,9 @@ pub fn handle_request(
Request::UpdateRemoteTrackPublicationDimension(req) => {
on_update_remote_track_publication_dimension(server, req)?.into()
}
Request::SetRemoteTrackPublicationQuality(req) => {
on_set_remote_track_publication_quality(server, req)?.into()
}
Request::SendStreamHeader(req) => on_send_stream_header(server, req)?.into(),
Request::SendStreamChunk(req) => on_send_stream_chunk(server, req)?.into(),
Request::SendStreamTrailer(req) => on_send_stream_trailer(server, req)?.into(),
Expand Down
33 changes: 32 additions & 1 deletion livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ use livekit_runtime::timeout;
use parking_lot::Mutex;

use super::{ConnectionQuality, ParticipantInner, ParticipantKind, TrackKind};
use crate::{prelude::*, rtc_engine::RtcEngine, track::TrackError};
use crate::{
prelude::*,
rtc_engine::RtcEngine,
track::{TrackError, VideoQuality},
};

const ADD_TRACK_TIMEOUT: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -417,6 +421,33 @@ impl RemoteParticipant {
});
}
});

publication.on_video_quality_changed({
let rtc_engine = self.inner.rtc_engine.clone();
move |publication, quality| {
let rtc_engine = rtc_engine.clone();
livekit_runtime::spawn(async move {
let tsid: String = publication.sid().into();
let quality = match quality {
VideoQuality::Low => proto::VideoQuality::Low,
VideoQuality::Medium => proto::VideoQuality::Medium,
VideoQuality::High => proto::VideoQuality::High,
}
.into();
let update_track_settings = proto::UpdateTrackSettings {
track_sids: vec![tsid.clone()],
quality,
..Default::default()
};

rtc_engine
.send_request(proto::signal_request::Message::TrackSetting(
update_track_settings,
))
.await
});
}
});
}

pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
Expand Down
29 changes: 28 additions & 1 deletion livekit/src/room/publication/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use livekit_protocol::{self as proto, AudioTrackFeature};
use parking_lot::{Mutex, RwLock};

use super::{PermissionStatus, SubscriptionStatus, TrackPublication, TrackPublicationInner};
use crate::{e2ee::EncryptionType, prelude::*};
use crate::{e2ee::EncryptionType, prelude::*, track::VideoQuality};

type SubscribedHandler = Box<dyn Fn(RemoteTrackPublication, RemoteTrack) + Send>;
type UnsubscribedHandler = Box<dyn Fn(RemoteTrackPublication, RemoteTrack) + Send>;
Expand All @@ -29,6 +29,7 @@ type PermissionStatusChangedHandler =
type SubscriptionUpdateNeededHandler = Box<dyn Fn(RemoteTrackPublication, bool) + Send>;
type EnabledStatusChangedHandler = Box<dyn Fn(RemoteTrackPublication, bool) + Send>;
type VideoDimensionsChangedHandler = Box<dyn Fn(RemoteTrackPublication, TrackDimension) + Send>;
type VideoQualityChangedHandler = Box<dyn Fn(RemoteTrackPublication, VideoQuality) + Send>;

#[derive(Default)]
struct RemoteEvents {
Expand All @@ -39,6 +40,7 @@ struct RemoteEvents {
subscription_update_needed: Mutex<Option<SubscriptionUpdateNeededHandler>>,
enabled_status_changed: Mutex<Option<EnabledStatusChangedHandler>>,
video_dimensions_changed: Mutex<Option<VideoDimensionsChangedHandler>>,
video_quality_changed: Mutex<Option<VideoQualityChangedHandler>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -224,6 +226,13 @@ impl RemoteTrackPublication {
*self.remote.events.video_dimensions_changed.lock() = Some(Box::new(f));
}

pub(crate) fn on_video_quality_changed(
&self,
f: impl Fn(RemoteTrackPublication, VideoQuality) + Send + 'static,
) {
*self.remote.events.video_quality_changed.lock() = Some(Box::new(f));
}

pub fn set_subscribed(&self, subscribed: bool) {
let old_subscription_state = self.subscription_status();
let old_permission_state = self.permission_status();
Expand Down Expand Up @@ -253,6 +262,24 @@ impl RemoteTrackPublication {
self.emit_permission_update(old_permission_state);
}

/// For tracks that support simulcasting, adjust subscribed quality.
///
/// This indicates the highest quality the client can accept. if network
/// bandwidth does not allow, server will automatically reduce quality to
/// optimize for uninterrupted video.
///
pub fn set_video_quality(&self, quality: VideoQuality) {
if !self.simulcasted() {
log::warn!("Cannot set video quality for a track that is not simulcasted");
return;
}
if let Some(video_quality_changed) =
self.remote.events.video_quality_changed.lock().as_ref()
{
video_quality_changed(self.clone(), quality)
}
}

pub fn set_enabled(&self, enabled: bool) {
if self.is_subscribed() && enabled != self.is_enabled() {
let track = self.track().unwrap();
Expand Down
8 changes: 8 additions & 0 deletions livekit/src/room/track/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ pub enum TrackSource {
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TrackDimension(pub u32, pub u32);

/// Video quality for simulcasted tracks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum VideoQuality {
Low,
Medium,
High,
}

macro_rules! track_dispatch {
([$($variant:ident),+]) => {
enum_dispatch!(
Expand Down