Skip to content

Commit 2feff80

Browse files
authored
Mark pending channel messages sent on RESP_CODE_SENT (#186)
* Mark pending channel message sent on RESP_CODE_SENT * Disambiguate RESP_CODE_SENT handling for direct vs channel * Handle channel sent feedback when firmware returns RESP_CODE_OK * Correlate channel OK ACKs and queue reaction channel sends
1 parent 51d70ce commit 2feff80

File tree

3 files changed

+175
-15
lines changed

3 files changed

+175
-15
lines changed

lib/connector/meshcore_connector.dart

Lines changed: 158 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ class MeshCoreConnector extends ChangeNotifier {
114114
final List<Channel> _channels = [];
115115
final Map<String, List<Message>> _conversations = {};
116116
final Map<int, List<ChannelMessage>> _channelMessages = {};
117+
final List<String> _pendingChannelSentQueue = [];
118+
final List<_PendingCommandAck> _pendingGenericAckQueue = [];
119+
static const String _reactionSendQueuePrefix = '__reaction_send__';
120+
int _reactionSendQueueSequence = 0;
117121
final Set<String> _loadedConversationKeys = {};
118122
final Map<int, Set<String>> _processedChannelReactions =
119123
{}; // channelIndex -> Set of "targetHash_emoji"
@@ -988,14 +992,21 @@ class MeshCoreConnector extends ChangeNotifier {
988992
_isSyncingChannels = false;
989993
_channelSyncInFlight = false;
990994
_hasLoadedChannels = false;
995+
_pendingChannelSentQueue.clear();
996+
_pendingGenericAckQueue.clear();
997+
_reactionSendQueueSequence = 0;
991998

992999
_setState(MeshCoreConnectionState.disconnected);
9931000
if (!manual) {
9941001
_scheduleReconnect();
9951002
}
9961003
}
9971004

998-
Future<void> sendFrame(Uint8List data) async {
1005+
Future<void> sendFrame(
1006+
Uint8List data, {
1007+
String? channelSendQueueId,
1008+
bool expectsGenericAck = false,
1009+
}) async {
9991010
if (!isConnected || _rxCharacteristic == null) {
10001011
throw Exception("Not connected to a MeshCore device");
10011012
}
@@ -1014,6 +1025,11 @@ class MeshCoreConnector extends ChangeNotifier {
10141025
data.toList(),
10151026
withoutResponse: canWriteWithoutResponse,
10161027
);
1028+
_trackPendingGenericAck(
1029+
data,
1030+
channelSendQueueId: channelSendQueueId,
1031+
expectsGenericAck: expectsGenericAck,
1032+
);
10171033
}
10181034

10191035
Future<void> requestBatteryStatus({bool force = false}) async {
@@ -1369,7 +1385,13 @@ class MeshCoreConnector extends ChangeNotifier {
13691385
notifyListeners();
13701386

13711387
// Send the reaction to the device (don't add as a visible message)
1372-
await sendFrame(buildSendChannelTextMsgFrame(channel.index, text));
1388+
final reactionQueueId = _nextReactionSendQueueId();
1389+
_pendingChannelSentQueue.add(reactionQueueId);
1390+
await sendFrame(
1391+
buildSendChannelTextMsgFrame(channel.index, text),
1392+
channelSendQueueId: reactionQueueId,
1393+
expectsGenericAck: true,
1394+
);
13731395
return;
13741396
}
13751397

@@ -1379,6 +1401,7 @@ class MeshCoreConnector extends ChangeNotifier {
13791401
channel.index,
13801402
);
13811403
_addChannelMessage(channel.index, message);
1404+
_pendingChannelSentQueue.add(message.messageId);
13821405
notifyListeners();
13831406

13841407
final trimmed = text.trim();
@@ -1388,7 +1411,11 @@ class MeshCoreConnector extends ChangeNotifier {
13881411
(isChannelSmazEnabled(channel.index) && !isStructuredPayload)
13891412
? Smaz.encodeIfSmaller(text)
13901413
: text;
1391-
await sendFrame(buildSendChannelTextMsgFrame(channel.index, outboundText));
1414+
await sendFrame(
1415+
buildSendChannelTextMsgFrame(channel.index, outboundText),
1416+
channelSendQueueId: message.messageId,
1417+
expectsGenericAck: true,
1418+
);
13921419
}
13931420

13941421
Future<void> removeContact(Contact contact) async {
@@ -1735,6 +1762,9 @@ class MeshCoreConnector extends ChangeNotifier {
17351762
debugPrint('RX frame: code=$code len=${frame.length}');
17361763

17371764
switch (code) {
1765+
case respCodeOk:
1766+
_handleOk();
1767+
break;
17381768
case respCodeDeviceInfo:
17391769
_handleDeviceInfo(frame);
17401770
break;
@@ -1829,6 +1859,17 @@ class MeshCoreConnector extends ChangeNotifier {
18291859
'Firmware responded with error code: $errCode',
18301860
tag: 'Protocol',
18311861
);
1862+
1863+
if (_pendingGenericAckQueue.isEmpty) {
1864+
return;
1865+
}
1866+
1867+
final failedAck = _pendingGenericAckQueue.removeAt(0);
1868+
if (failedAck.commandCode != cmdSendChannelTxtMsg ||
1869+
failedAck.channelSendQueueId == null) {
1870+
return;
1871+
}
1872+
_pendingChannelSentQueue.remove(failedAck.channelSendQueueId);
18321873
}
18331874

18341875
void _handlePathUpdated(Uint8List frame) {
@@ -2611,8 +2652,22 @@ class MeshCoreConnector extends ChangeNotifier {
26112652
return;
26122653
}
26132654

2614-
if (_retryService != null) {
2615-
_retryService!.updateMessageFromSent(ackHash, timeoutMs);
2655+
final retryService = _retryService;
2656+
if (retryService != null &&
2657+
retryService.updateMessageFromSent(
2658+
ackHash,
2659+
timeoutMs,
2660+
allowQueueFallback: false,
2661+
)) {
2662+
return;
2663+
}
2664+
2665+
if (_markNextPendingChannelMessageSent()) {
2666+
return;
2667+
}
2668+
2669+
if (retryService != null) {
2670+
retryService.updateMessageFromSent(ackHash, timeoutMs);
26162671
}
26172672
} else {
26182673
// Fallback to old behavior
@@ -2629,6 +2684,64 @@ class MeshCoreConnector extends ChangeNotifier {
26292684
}
26302685
}
26312686

2687+
bool _markNextPendingChannelMessageSent() {
2688+
while (_pendingChannelSentQueue.isNotEmpty) {
2689+
final queuedMessageId = _pendingChannelSentQueue.removeAt(0);
2690+
if (_isReactionSendQueueId(queuedMessageId)) {
2691+
return true;
2692+
}
2693+
if (_markPendingChannelMessageSentById(queuedMessageId)) {
2694+
return true;
2695+
}
2696+
}
2697+
return false;
2698+
}
2699+
2700+
bool _markPendingChannelMessageSentById(String messageId) {
2701+
for (final entry in _channelMessages.entries) {
2702+
final channelMessages = entry.value;
2703+
for (int i = channelMessages.length - 1; i >= 0; i--) {
2704+
final message = channelMessages[i];
2705+
if (message.messageId != messageId) {
2706+
continue;
2707+
}
2708+
if (!message.isOutgoing ||
2709+
message.status != ChannelMessageStatus.pending) {
2710+
return false;
2711+
}
2712+
channelMessages[i] = message.copyWith(
2713+
status: ChannelMessageStatus.sent,
2714+
);
2715+
_pendingChannelSentQueue.remove(messageId);
2716+
unawaited(
2717+
_channelMessageStore.saveChannelMessages(entry.key, channelMessages),
2718+
);
2719+
notifyListeners();
2720+
return true;
2721+
}
2722+
}
2723+
return false;
2724+
}
2725+
2726+
void _handleOk() {
2727+
if (_pendingGenericAckQueue.isEmpty) {
2728+
return;
2729+
}
2730+
2731+
final pendingAck = _pendingGenericAckQueue.removeAt(0);
2732+
if (pendingAck.commandCode != cmdSendChannelTxtMsg ||
2733+
pendingAck.channelSendQueueId == null) {
2734+
return;
2735+
}
2736+
2737+
final queueId = pendingAck.channelSendQueueId!;
2738+
_pendingChannelSentQueue.remove(queueId);
2739+
if (_isReactionSendQueueId(queueId)) {
2740+
return;
2741+
}
2742+
_markPendingChannelMessageSentById(queueId);
2743+
}
2744+
26322745
void _handleSendConfirmed(Uint8List frame) {
26332746
// Frame format from C++:
26342747
// [0] = PUSH_CODE_SEND_CONFIRMED
@@ -3207,18 +3320,22 @@ class MeshCoreConnector extends ChangeNotifier {
32073320
mergedPathBytes.length,
32083321
);
32093322
final newRepeatCount = existing.repeatCount + 1;
3323+
final promotedFromPending =
3324+
newRepeatCount == 1 &&
3325+
existing.status == ChannelMessageStatus.pending;
32103326
messages[existingIndex] = existing.copyWith(
32113327
repeatCount: newRepeatCount,
32123328
pathLength: mergedPathLength,
32133329
pathBytes: mergedPathBytes,
32143330
pathVariants: mergedPathVariants,
32153331
// Mark as sent when first repeat is heard
3216-
status:
3217-
newRepeatCount == 1 &&
3218-
existing.status == ChannelMessageStatus.pending
3332+
status: promotedFromPending
32193333
? ChannelMessageStatus.sent
32203334
: existing.status,
32213335
);
3336+
if (promotedFromPending) {
3337+
_pendingChannelSentQueue.remove(existing.messageId);
3338+
}
32223339
} else {
32233340
messages.add(processedMessage);
32243341
}
@@ -3391,11 +3508,37 @@ class MeshCoreConnector extends ChangeNotifier {
33913508
_queuedMessageSyncInFlight = false;
33923509
_isSyncingChannels = false;
33933510
_channelSyncInFlight = false;
3511+
_pendingChannelSentQueue.clear();
3512+
_pendingGenericAckQueue.clear();
3513+
_reactionSendQueueSequence = 0;
33943514

33953515
_setState(MeshCoreConnectionState.disconnected);
33963516
_scheduleReconnect();
33973517
}
33983518

3519+
void _trackPendingGenericAck(
3520+
Uint8List data, {
3521+
String? channelSendQueueId,
3522+
required bool expectsGenericAck,
3523+
}) {
3524+
if (!expectsGenericAck || data.isEmpty) return;
3525+
_pendingGenericAckQueue.add(
3526+
_PendingCommandAck(
3527+
commandCode: data[0],
3528+
channelSendQueueId: channelSendQueueId,
3529+
),
3530+
);
3531+
}
3532+
3533+
String _nextReactionSendQueueId() {
3534+
_reactionSendQueueSequence++;
3535+
return '$_reactionSendQueuePrefix$_reactionSendQueueSequence';
3536+
}
3537+
3538+
bool _isReactionSendQueueId(String queueId) {
3539+
return queueId.startsWith(_reactionSendQueuePrefix);
3540+
}
3541+
33993542
Map<String, String> _parseKeyValueString(String input) {
34003543
final result = <String, String>{};
34013544

@@ -3691,3 +3834,10 @@ class _RepeaterAckContext {
36913834
required this.messageBytes,
36923835
});
36933836
}
3837+
3838+
class _PendingCommandAck {
3839+
final int commandCode;
3840+
final String? channelSendQueueId;
3841+
3842+
_PendingCommandAck({required this.commandCode, this.channelSendQueueId});
3843+
}

lib/screens/contacts_screen.dart

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,17 @@ class _ContactsScreenState extends State<ContactsScreen>
183183
final connector = Provider.of<MeshCoreConnector>(context, listen: false);
184184
final exportContactFrame = buildExportContactFrame(pubKey);
185185
_pendingOperations.add(ContactOperationType.export);
186-
await connector.sendFrame(exportContactFrame);
186+
await connector.sendFrame(exportContactFrame, expectsGenericAck: true);
187187
}
188188

189189
Future<void> _contactZeroHop(Uint8List pubKey) async {
190190
final connector = Provider.of<MeshCoreConnector>(context, listen: false);
191191
final exportContactZeroHopFrame = buildZeroHopContact(pubKey);
192192
_pendingOperations.add(ContactOperationType.zeroHopShare);
193-
await connector.sendFrame(exportContactZeroHopFrame);
193+
await connector.sendFrame(
194+
exportContactZeroHopFrame,
195+
expectsGenericAck: true,
196+
);
194197
}
195198

196199
Future<void> _contactImport() async {
@@ -217,7 +220,7 @@ class _ContactsScreenState extends State<ContactsScreen>
217220
try {
218221
final importContactFrame = buildImportContactFrame(hexString);
219222
_pendingOperations.add(ContactOperationType.import);
220-
await connector.sendFrame(importContactFrame);
223+
await connector.sendFrame(importContactFrame, expectsGenericAck: true);
221224
} catch (e) {
222225
if (mounted) {
223226
ScaffoldMessenger.of(context).showSnackBar(

lib/services/message_retry_service.dart

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,11 @@ class MessageRetryService extends ChangeNotifier {
234234
}
235235
}
236236

237-
void updateMessageFromSent(Uint8List ackHash, int timeoutMs) {
237+
bool updateMessageFromSent(
238+
Uint8List ackHash,
239+
int timeoutMs, {
240+
bool allowQueueFallback = true,
241+
}) {
238242
final ackHashHex = ackHash
239243
.map((b) => b.toRadixString(16).padLeft(2, '0'))
240244
.join();
@@ -277,7 +281,7 @@ class MessageRetryService extends ChangeNotifier {
277281
}
278282

279283
// FALLBACK: Old queue-based matching (for messages sent before hash computation was added)
280-
if (messageId == null) {
284+
if (messageId == null && allowQueueFallback) {
281285
_debugLogService?.warn(
282286
'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue',
283287
tag: 'AckHash',
@@ -320,7 +324,7 @@ class MessageRetryService extends ChangeNotifier {
320324

321325
if (messageId == null || contact == null) {
322326
debugPrint('No pending message found for ACK hash: $ackHashHex');
323-
return;
327+
return false;
324328
}
325329

326330
// Store the mapping for future lookups (e.g., when ACK arrives)
@@ -339,7 +343,7 @@ class MessageRetryService extends ChangeNotifier {
339343
'Message $messageId no longer pending for ACK hash: $ackHashHex',
340344
);
341345
_ackHashToMessageId.remove(ackHashHex);
342-
return;
346+
return false;
343347
}
344348

345349
// Add this ACK hash to the list of expected ACKs for this message (for history)
@@ -389,8 +393,11 @@ class MessageRetryService extends ChangeNotifier {
389393

390394
_startTimeoutTimer(messageId, actualTimeout);
391395
debugPrint('Updated message $messageId with ACK hash: $ackHashHex');
396+
return true;
392397
}
393398

399+
bool get hasPendingMessages => _pendingMessages.isNotEmpty;
400+
394401
void _startTimeoutTimer(String messageId, int timeoutMs) {
395402
_timeoutTimers[messageId]?.cancel();
396403
_timeoutTimers[messageId] = Timer(Duration(milliseconds: timeoutMs), () {

0 commit comments

Comments
 (0)