Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/pending-track-queue
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "race condition with pending tracks"
3 changes: 3 additions & 0 deletions lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ class Timeouts {
final Duration connection;
final Duration debounce;
final Duration publish;
final Duration subscribe;
final Duration peerConnection;
final Duration iceRestart;

const Timeouts({
required this.connection,
required this.debounce,
required this.publish,
required this.subscribe,
required this.peerConnection,
required this.iceRestart,
});
Expand All @@ -31,6 +33,7 @@ class Timeouts {
connection: Duration(seconds: 10),
debounce: Duration(milliseconds: 100),
publish: Duration(seconds: 10),
subscribe: Duration(seconds: 10),
peerConnection: Duration(seconds: 10),
iceRestart: Duration(seconds: 10),
);
Expand Down
3 changes: 2 additions & 1 deletion lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
String? url;
String? token;

late ConnectOptions connectOptions;
ConnectOptions connectOptions;
RoomOptions roomOptions;
FastConnectOptions? fastConnectOptions;

Expand Down Expand Up @@ -188,6 +188,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

Engine({
required this.connectOptions,
required this.roomOptions,
SignalClient? signalClient,
PeerConnectionCreate? peerConnectionCreate,
Expand Down
153 changes: 153 additions & 0 deletions lib/src/core/pending_track_queue.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
import 'package:meta/meta.dart';

import '../events.dart';
import '../logger.dart';
import '../types/other.dart';

typedef PendingTrackSubscriber = Future<bool> Function(PendingTrack entry);
typedef TrackExceptionEmitter = void Function(TrackSubscriptionExceptionEvent event);

/// Helper that queues subscriber tracks when participant metadata isn't ready yet.
@internal
class PendingTrackQueue {
final int maxSize;
Duration ttl;
final TrackExceptionEmitter emitException;

// keyed by participant sid
final Map<String, List<PendingTrack>> _pending = {};

PendingTrackQueue({
required this.ttl,
required this.emitException,
this.maxSize = 100,
});

void updateTtl(Duration ttl) {
this.ttl = ttl;
}

void clear() {
_pending.clear();
}

void enqueue({
required rtc.MediaStreamTrack track,
required rtc.MediaStream stream,
required rtc.RTCRtpReceiver? receiver,
required String participantSid,
required String trackSid,
required ConnectionState connectionState,
}) {
// If we're already disconnected, drop immediately.
if (connectionState == ConnectionState.disconnected) {
final event = TrackSubscriptionExceptionEvent(
participant: null,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
logger.warning('Dropping pending track while disconnected trackSid:$trackSid participantSid:$participantSid');
emitException(event);
return;
}

_removeExpired();

final totalPending = _pending.values.fold<int>(0, (sum, list) => sum + list.length);
if (totalPending >= maxSize) {
final event = TrackSubscriptionExceptionEvent(
participant: null,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
logger.severe('Pending track queue full, dropping trackSid:$trackSid participantSid:$participantSid');
emitException(event);
return;
}

final expiresAt = DateTime.now().add(ttl);
logger.fine('Queueing pending trackSid:$trackSid participantSid:$participantSid until metadata is ready');
final entry = PendingTrack(
track: track,
stream: stream,
receiver: receiver,
participantSid: participantSid,
trackSid: trackSid,
expiresAt: expiresAt,
);
final list = _pending.putIfAbsent(participantSid, () => []);
list.add(entry);
}

@internal
Future<void> flush({
required bool isConnected,
String? participantSid,
required PendingTrackSubscriber subscriber,
}) async {
_removeExpired();
if (!isConnected) return;

final Iterable<PendingTrack> source = participantSid != null
? List<PendingTrack>.from(_pending[participantSid] ?? const [])
: _pending.values.expand((e) => e).toList();

for (final item in source) {
final success = await subscriber(item);
if (success) {
_pending[item.participantSid]?.remove(item);
}
}
}

void _removeExpired() {
final now = DateTime.now();
_pending.forEach((sid, list) {
final expired = list.where((p) => p.expiresAt.isBefore(now)).toList();
for (final item in expired) {
list.remove(item);
final event = TrackSubscriptionExceptionEvent(
participant: null,
sid: item.trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
logger.warning('Pending track expired waiting for participant metadata: $event');
emitException(event);
}
});
}
}

@internal
class PendingTrack {
final rtc.MediaStreamTrack track;
final rtc.MediaStream stream;
final rtc.RTCRtpReceiver? receiver;
final String participantSid;
final String trackSid;
final DateTime expiresAt;

PendingTrack({
required this.track,
required this.stream,
required this.receiver,
required this.participantSid,
required this.trackSid,
required this.expiresAt,
});
}
59 changes: 54 additions & 5 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import '../types/rpc.dart';
import '../types/transcription_segment.dart';
import '../utils.dart' show unpackStreamId;
import 'engine.dart';
import 'pending_track_queue.dart';

/// Room is the primary construct for LiveKit conferences. It contains a
/// group of [Participant]s, each publishing and subscribing to [Track]s.
Expand Down Expand Up @@ -135,6 +136,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
@internal
late final PreConnectAudioBuffer preConnectAudioBuffer;

// Pending subscriber tracks keyed by participantSid, for tracks arriving before metadata or before the room connected.
late final PendingTrackQueue _pendingTrackQueue;

// for testing
@internal
Map<String, RpcRequestHandler> get rpcHandlers => _rpcHandlers;
Expand All @@ -152,6 +156,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
Engine? engine,
}) : engine = engine ??
Engine(
connectOptions: connectOptions,
roomOptions: roomOptions,
) {
//
Expand All @@ -161,11 +166,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
_signalListener = this.engine.signalClient.createListener();
_setUpSignalListeners();

_pendingTrackQueue = PendingTrackQueue(
ttl: this.engine.connectOptions.timeouts.subscribe,
emitException: (event) => events.emit(event),
);

// Any event emitted will trigger ChangeNotifier
events.listen((event) {
logger.finer('[RoomEvent] $event, will notifyListeners()');
notifyListeners();
});
// Keep a connected flush as a fallback in case tracks arrive pre-connected but before metadata.
events.on<RoomConnectedEvent>((event) => _flushPendingTracks());

_setupRpcListeners();

Expand Down Expand Up @@ -232,6 +244,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}) async {
var roomOptions = this.roomOptions;
connectOptions ??= ConnectOptions();
_pendingTrackQueue.updateTtl(connectOptions.timeouts.subscribe);
// ignore: deprecated_member_use_from_same_package
if ((roomOptions.encryption != null || roomOptions.e2eeOptions != null) && engine.e2eeManager == null) {
if (!lkPlatformSupportsE2EE()) {
Expand Down Expand Up @@ -596,12 +609,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
reason: TrackSubscribeFailReason.invalidServerResponse,
);
}
if (participant == null) {
throw TrackSubscriptionExceptionEvent(
participant: participant,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,

final shouldDefer = connectionState != ConnectionState.connected || participant == null;
if (shouldDefer) {
_pendingTrackQueue.enqueue(
track: event.track,
stream: event.stream,
receiver: event.receiver,
participantSid: participantSid,
trackSid: trackSid,
connectionState: connectionState,
);
return;
}
await participant.addSubscribedMediaTrack(
event.track,
Expand Down Expand Up @@ -678,6 +697,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

_remoteParticipants[result.participant.identity] = result.participant;
_sidToIdentity[result.participant.sid] = result.participant.identity;
await _flushPendingTracks(participant: result.participant);
return result;
}

Expand Down Expand Up @@ -722,10 +742,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}
}
_sidToIdentity[info.sid] = info.identity;
await _flushPendingTracks(participant: result.participant);
} else {
final wasUpdated = await result.participant.updateFromInfo(info);
if (wasUpdated) {
_sidToIdentity[info.sid] = info.identity;
await _flushPendingTracks(participant: result.participant);
}
}
}
Expand Down Expand Up @@ -760,6 +782,32 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
}

Future<void> _flushPendingTracks({RemoteParticipant? participant}) => _pendingTrackQueue.flush(
isConnected: connectionState == ConnectionState.connected,
participantSid: participant?.sid,
subscriber: (pending) async {
final target = participant ?? _getRemoteParticipantBySid(pending.participantSid);
if (target == null) return false;
try {
await target.addSubscribedMediaTrack(
pending.track,
pending.stream,
pending.trackSid,
receiver: pending.receiver,
audioOutputOptions: roomOptions.defaultAudioOutputOptions,
);
return true;
} on TrackSubscriptionExceptionEvent catch (event) {
logger.severe('Track subscription failed during flush: ${event}');
events.emit(event);
return true;
} catch (exception) {
logger.warning('Unknown exception during pending track flush: ${exception}');
return false;
}
},
);

// from data channel
// updates are sent only when there's a change to speaker ordering
void _onEngineActiveSpeakersUpdateEvent(List<lk_models.SpeakerInfo> speakers) {
Expand Down Expand Up @@ -941,6 +989,7 @@ extension RoomPrivateMethods on Room {
}
_remoteParticipants.clear();
_sidToIdentity.clear();
_pendingTrackQueue.clear();

// clean up LocalParticipant
await localParticipant?.unpublishAllTracks();
Expand Down
Loading