Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 53 additions & 35 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ size_t AsyncWebSocketMessage::ack(size_t len, uint32_t time) {
if (_sent >= _WSbuffer->size() && _acked >= _ack) {
_status = WS_MSG_SENT;
}
async_ws_log_v("opcode: %" PRIu8 ", acked: %u/%u, left: %u/%u, status: %d", _opcode, _acked, _ack, len - pending, len, static_cast<int>(_status));
return len - pending;
const size_t remaining = len - pending;
async_ws_log_v("ACK[%" PRIu8 "] %u/%u (acked: %u/%u) => %" PRIu8, _opcode, _sent, _WSbuffer->size(), _acked, _ack, static_cast<uint8_t>(_status));
return remaining;
}

size_t AsyncWebSocketMessage::send(AsyncClient *client) {
Expand All @@ -182,20 +183,22 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) {
}

if (_status != WS_MSG_SENDING) {
async_ws_log_v("C[%" PRIu16 "] Wrong status: got: %d, expected: %d", client->remotePort(), static_cast<int>(_status), static_cast<int>(WS_MSG_SENDING));
async_ws_log_v("SEND[%" PRIu8 "] => [%" PRIu16 "] WS_MSG_SENDING != %" PRIu8, _opcode, client->remotePort(), static_cast<uint8_t>(_status));
return 0;
}

if (_sent == _WSbuffer->size()) {
if (_acked == _ack) {
_status = WS_MSG_SENT;
}
async_ws_log_v("C[%" PRIu16 "] Already sent: %u/%u", client->remotePort(), _sent, _WSbuffer->size());
async_ws_log_v("SEND[%" PRIu8 "] => [%" PRIu16 "] WS_MSG_SENT %u/%u (acked: %u/%u)", _opcode, client->remotePort(), _sent, _WSbuffer->size(), _acked, _ack);
return 0;
}
if (_sent > _WSbuffer->size()) {
_status = WS_MSG_ERROR;
async_ws_log_v("C[%" PRIu16 "] Error, sent more: %u/%u", client->remotePort(), _sent, _WSbuffer->size());
async_ws_log_v(
"SEND[%" PRIu8 "] => [%" PRIu16 "] WS_MSG_ERROR %u/%u (acked: %u/%u)", _opcode, client->remotePort(), _sent, _WSbuffer->size(), _acked, _ack
);
return 0;
}

Expand All @@ -204,7 +207,7 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) {

// not enough space in lwip buffer ?
if (!window) {
async_ws_log_v("C[%" PRIu16 "] No space left to send more data: acked: %u, sent: %u, remaining: %u", client->remotePort(), _acked, _sent, toSend);
async_ws_log_v("SEND[%" PRIu8 "] => [%" PRIu16 "] NO_SPACE %u", _opcode, client->remotePort(), toSend);
return 0;
}

Expand All @@ -224,7 +227,9 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) {
_ack -= (toSend - sent);
}

async_ws_log_v("C[%" PRIu16 "] sent: %u/%u, final: %d, acked: %u/%u", client->remotePort(), _sent, _WSbuffer->size(), final, _acked, _ack);
async_ws_log_v(
"SEND[%" PRIu8 "] => [%" PRIu16 "] WS_MSG_SENDING %u/%u (acked: %u/%u)", _opcode, client->remotePort(), _sent, _WSbuffer->size(), _acked, _ack
);
return sent;
}

Expand Down Expand Up @@ -304,6 +309,8 @@ void AsyncWebSocketClient::_clearQueue() {
void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
_lastMessageTime = millis();

async_ws_log_v("[%s][%" PRIu32 "] START ACK(%u, %" PRIu32 ") Q:%u", _server->url(), _clientId, len, time, _messageQueue.size());

#ifdef ESP32
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif
Expand All @@ -315,6 +322,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
if (_status == WS_DISCONNECTING && head.opcode() == WS_DISCONNECT) {
_controlQueue.pop_front();
_status = WS_DISCONNECTED;
async_ws_log_v("[%s][%" PRIu32 "] ACK WS_DISCONNECTED", _server->url(), _clientId);
if (_client) {
#ifdef ESP32
/*
Expand Down Expand Up @@ -343,6 +351,8 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {

_clearQueue();

async_ws_log_v("[%s][%" PRIu32 "] END ACK(%u, %" PRIu32 ") Q:%u", _server->url(), _clientId, len, time, _messageQueue.size());

_runQueue();
}

Expand Down Expand Up @@ -386,7 +396,7 @@ void AsyncWebSocketClient::_runQueue() {
continue;
}
if (space > (size_t)(ctrl.len() - 1)) {
async_ws_log_v("WS[%" PRIu32 "] Sending control frame: %" PRIu8 ", len: %" PRIu8, _clientId, ctrl.opcode(), ctrl.len());
async_ws_log_v("[%s][%" PRIu32 "] SEND CTRL %" PRIu8, _server->url(), _clientId, ctrl.opcode());
ctrl.send(_client);
space = webSocketSendFrameWindow(_client);
}
Expand All @@ -398,22 +408,23 @@ void AsyncWebSocketClient::_runQueue() {
for (auto &msg : _messageQueue) {
if (msg._remainingBytesToSend()) {
async_ws_log_v(
"WS[%" PRIu32 "] Send message fragment: %u/%u, acked: %u/%u", _clientId, msg._remainingBytesToSend(), msg._sent + msg._remainingBytesToSend(),
msg._acked, msg._ack
"[%s][%" PRIu32 "][%" PRIu8 "] SEND %u/%u (acked: %u/%u)", _server->url(), _clientId, msg._opcode, msg._sent, msg._WSbuffer->size(), msg._acked,
msg._ack
);

// will use all the remaining space, or all the remaining bytes to send, whichever is smaller
msg.send(_client);
space = webSocketSendFrameWindow(_client);

// If we haven't finished sending this message, we must stop here to preserve WebSocket ordering.
// We can only pipeline subsequent messages if the current one is fully passed to TCP buffer.
if (msg._remainingBytesToSend()) {
async_ws_log_v("[%s][%" PRIu32 "][%" PRIu8 "] NO_SPACE", _server->url(), _clientId, msg._opcode);
break;
}
}

// not enough space for another message
if (!space) {
} else if (!space) {
// not enough space for another message
async_ws_log_v("[%s][%" PRIu32 "] NO_SPACE", _server->url(), _clientId);
break;
}
}
Expand Down Expand Up @@ -452,6 +463,7 @@ bool AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
#endif

_controlQueue.emplace_back(opcode, data, len, mask);
async_ws_log_v("[%s][%" PRIu32 "] QUEUE CTRL (%u) << %" PRIu8, _server->url(), _clientId, _controlQueue.size(), opcode);

if (_client && _client->canSend()) {
_runQueue();
Expand Down Expand Up @@ -485,16 +497,17 @@ bool AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint
_client->close();
}

async_ws_log_w("Too many messages queued: closing connection");
async_ws_log_w("[%s][%" PRIu32 "] Too many messages queued: closing connection", _server->url(), _clientId);

} else {
async_ws_log_w("Too many messages queued: discarding new message");
async_ws_log_w("[%s][%" PRIu32 "] Too many messages queued: discarding new message", _server->url(), _clientId);
}

return false;
}

_messageQueue.emplace_back(buffer, opcode, mask);
async_ws_log_v("[%s][%" PRIu32 "] QUEUE MSG (%u/%u) << %" PRIu8, _server->url(), _clientId, _messageQueue.size(), WS_MAX_QUEUED_MESSAGES, opcode);

if (_client && _client->canSend()) {
_runQueue();
Expand All @@ -508,6 +521,8 @@ void AsyncWebSocketClient::close(uint16_t code, const char *message) {
return;
}

async_ws_log_w("[%s][%" PRIu32 "] CLOSE", _server->url(), _clientId);

_status = WS_DISCONNECTING;

if (code) {
Expand Down Expand Up @@ -541,21 +556,20 @@ bool AsyncWebSocketClient::ping(const uint8_t *data, size_t len) {
return _status == WS_CONNECTED && _queueControl(WS_PING, data, len);
}

void AsyncWebSocketClient::_onError(int8_t) {
// Serial.println("onErr");
void AsyncWebSocketClient::_onError(int8_t err) {
async_ws_log_v("[%s][%" PRIu32 "] ERROR %" PRIi8, _server->url(), _clientId, static_cast<int8_t>(err));
}

void AsyncWebSocketClient::_onTimeout(uint32_t time) {
if (!_client) {
return;
}
// Serial.println("onTime");
(void)time;
async_ws_log_v("[%s][%" PRIu32 "] TIMEOUT %" PRIu32, _server->url(), _clientId, time);
_client->close();
}

void AsyncWebSocketClient::_onDisconnect() {
// Serial.println("onDis");
async_ws_log_v("[%s][%" PRIu32 "] DISCONNECT", _server->url(), _clientId);
_client = nullptr;
_server->_handleDisconnect(this);
}
Expand All @@ -566,7 +580,7 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {

while (plen > 0) {
async_ws_log_v(
"WS[%" PRIu32 "] _onData: plen: %" PRIu32 ", _pstate: %" PRIu8 ", _status: %" PRIu8, _clientId, plen, _pstate, static_cast<uint8_t>(_status)
"[%s][%" PRIu32 "] DATA plen: %" PRIu32 ", _pstate: %" PRIu8 ", _status: %" PRIu8, _server->url(), _clientId, plen, _pstate, static_cast<uint8_t>(_status)
);

if (_pstate == STATE_FRAME_START) {
Expand Down Expand Up @@ -595,8 +609,8 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
}

async_ws_log_v(
"WS[%" PRIu32 "] _pinfo: index: %" PRIu64 ", final: %" PRIu8 ", opcode: %" PRIu8 ", masked: %" PRIu8 ", len: %" PRIu64, _clientId, _pinfo.index,
_pinfo.final, _pinfo.opcode, _pinfo.masked, _pinfo.len
"[%s][%" PRIu32 "] DATA _pinfo: index: %" PRIu64 ", final: %" PRIu8 ", opcode: %" PRIu8 ", masked: %" PRIu8 ", len: %" PRIu64, _server->url(), _clientId,
_pinfo.index, _pinfo.final, _pinfo.opcode, _pinfo.masked, _pinfo.len
);

// Handle fragmented mask data - Safari may split the 4-byte mask across multiple packets
Expand All @@ -608,17 +622,17 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
if (plen == 0) {
// Safari close frame edge case: masked bit set but no mask data
if (_pinfo.opcode == WS_DISCONNECT) {
async_ws_log_v("WS[%" PRIu32 "] close frame with incomplete mask, treating as unmasked", _clientId);
async_ws_log_v("[%s][%" PRIu32 "] DATA close frame with incomplete mask, treating as unmasked", _server->url(), _clientId);
_pinfo.masked = 0;
_pinfo.index = 0;
_pinfo.len = 0;
_pstate = STATE_FRAME_START;
break;
}

//wait for more data
// wait for more data
_pstate = STATE_FRAME_MASK;
async_ws_log_v("WS[%" PRIu32 "] waiting for more mask data: read: %" PRIu8 "/4", _clientId, _pinfo.masked - 1);
async_ws_log_v("[%s][%" PRIu32 "] DATA waiting for more mask data: read: %" PRIu8 "/4", _server->url(), _clientId, _pinfo.masked - 1);
return;
}

Expand All @@ -634,7 +648,7 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {

// restore masked to 1 for backward compatibility
if (_pinfo.masked >= 5) {
async_ws_log_v("WS[%" PRIu32 "] mask read complete", _clientId);
async_ws_log_v("[%s][%" PRIu32 "] DATA mask read complete", _server->url(), _clientId);
_pinfo.masked = 1;
}

Expand Down Expand Up @@ -663,7 +677,7 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {

if (datalen > 0) {
async_ws_log_v(
"WS[%" PRIu32 "] processing next fragment of %s frame %" PRIu32 ", index: %" PRIu64 ", len: %" PRIu32 "", _clientId,
"[%s][%" PRIu32 "] DATA processing next fragment of %s frame %" PRIu32 ", index: %" PRIu64 ", len: %" PRIu32 "", _server->url(), _clientId,
(_pinfo.message_opcode == WS_TEXT) ? "text" : "binary", _pinfo.num, _pinfo.index, (uint32_t)datalen
);
_handleDataEvent(data, datalen, datalen == plen); // datalen == plen means that we are processing the last part of the current TCP packet
Expand All @@ -676,7 +690,7 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
_pstate = STATE_FRAME_START;

if (_pinfo.opcode == WS_DISCONNECT) {
async_ws_log_v("WS[%" PRIu32 "] processing disconnect", _clientId);
async_ws_log_v("[%s][%" PRIu32 "] DATA WS_DISCONNECT", _server->url(), _clientId);

if (datalen) {
uint16_t reasonCode = (uint16_t)(data[0] << 8) + data[1];
Expand All @@ -699,19 +713,19 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
}

} else if (_pinfo.opcode == WS_PING) {
async_ws_log_v("WS[%" PRIu32 "] processing ping", _clientId);
async_ws_log_v("[%s][%" PRIu32 "] DATA PING", _server->url(), _clientId);
_server->_handleEvent(this, WS_EVT_PING, NULL, NULL, 0);
_queueControl(WS_PONG, data, datalen);

} else if (_pinfo.opcode == WS_PONG) {
async_ws_log_v("WS[%" PRIu32 "] processing pong", _clientId);
async_ws_log_v("[%s][%" PRIu32 "] DATA PONG", _server->url(), _clientId);
if (datalen != AWSC_PING_PAYLOAD_LEN || memcmp(AWSC_PING_PAYLOAD, data, AWSC_PING_PAYLOAD_LEN) != 0) {
_server->_handleEvent(this, WS_EVT_PONG, NULL, NULL, 0);
}

} else if (_pinfo.opcode < WS_DISCONNECT) { // continuation or text/binary frame
async_ws_log_v(
"WS[%" PRIu32 "] processing final fragment of %s frame %" PRIu32 ", index: %" PRIu64 ", len: %" PRIu32 "", _clientId,
"[%s][%" PRIu32 "] DATA processing final fragment of %s frame %" PRIu32 ", index: %" PRIu64 ", len: %" PRIu32 "", _server->url(), _clientId,
(_pinfo.message_opcode == WS_TEXT) ? "text" : "binary", _pinfo.num, _pinfo.index, (uint32_t)datalen
);

Expand All @@ -728,7 +742,9 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
// unexpected frame error, close connection
_pstate = STATE_FRAME_START;

async_ws_log_v("frame error: len: %u, index: %llu, total: %llu\n", datalen, _pinfo.index, _pinfo.len);
async_ws_log_v(
"[%s][%" PRIu32 "] DATA frame error: len: %u, index: %" PRIu64 ", total: %" PRIu64 "\n", _server->url(), _clientId, datalen, _pinfo.index, _pinfo.len
);

_status = WS_DISCONNECTING;
if (_client) {
Expand Down Expand Up @@ -1038,7 +1054,9 @@ void AsyncWebSocket::closeAll(uint16_t code, const char *message) {
}

void AsyncWebSocket::cleanupClients(uint16_t maxClients) {
if (count() > maxClients) {
const size_t c = count();
if (c > maxClients) {
async_ws_log_v("[%s] CLEANUP %" PRIu32 " (%u/%" PRIu16 ")", _url.c_str(), _clients.front().id(), c, maxClients);
_clients.front().close();
}

Expand Down