diff --git a/.github/dependabot.yml b/.github/dependabot.yml index ae17f3da..e7e07693 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -1,12 +1,13 @@ version: 2 updates: -- package-ecosystem: github-actions - directory: "/" - schedule: - interval: daily - open-pull-requests-limit: 10 -- package-ecosystem: npm - directory: "/" - schedule: - interval: daily - open-pull-requests-limit: 10 + - package-ecosystem: github-actions + directory: / + schedule: + interval: daily + open-pull-requests-limit: 10 + + - package-ecosystem: npm + directory: / + schedule: + interval: daily + open-pull-requests-limit: 10 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0a878830..2e864050 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,4 @@ -name: ci +name: CI on: push: @@ -10,40 +10,43 @@ on: - 'docs/**' - '*.md' +permissions: + contents: read + jobs: dependency-review: name: Dependency Review if: github.event_name == 'pull_request' runs-on: ubuntu-latest - permissions: - contents: read steps: - - name: Check out repo - uses: actions/checkout@v3 + - name: Checkout repository + uses: actions/checkout@v4 with: persist-credentials: false - name: Dependency review - uses: actions/dependency-review-action@v2 + uses: actions/dependency-review-action@v4 test: runs-on: ${{ matrix.os }} - permissions: - contents: read strategy: matrix: - node-version: [14, 16, '*'] + node-version: [16, 18, 20] os: [ubuntu-latest, windows-latest, macOS-latest] + fail-fast: false steps: - - uses: actions/checkout@v3 + - name: Checkout repository + uses: actions/checkout@v4 with: persist-credentials: false - name: Use Node.js - uses: actions/setup-node@v3 + uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} check-latest: true + cache: npm + cache-dependency-path: package.json - name: Install run: | @@ -67,8 +70,6 @@ jobs: coverage: needs: test runs-on: ubuntu-latest - permissions: - contents: read steps: - name: Coveralls Finished uses: coverallsapp/github-action@master diff --git a/.github/workflows/sast.yml b/.github/workflows/codeql.yml similarity index 50% rename from .github/workflows/sast.yml rename to .github/workflows/codeql.yml index f618657f..0170f08c 100644 --- a/.github/workflows/sast.yml +++ b/.github/workflows/codeql.yml @@ -1,4 +1,4 @@ -name: sast +name: CodeQL on: push: @@ -11,19 +11,25 @@ jobs: name: Analyze runs-on: ubuntu-latest permissions: + actions: read contents: read security-events: write strategy: fail-fast: true matrix: - language: [ 'javascript' ] + language: [ 'javascript-typescript' ] steps: - - uses: actions/checkout@v3 + - name: Checkout repository + uses: actions/checkout@v4 with: persist-credentials: false - - uses: github/codeql-action/init@v2 + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} - - uses: github/codeql-action/analyze@v2 + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 + with: + category: "/language:${{ matrix.language }}" diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index d1b07822..21997102 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -1,10 +1,15 @@ -name: "Pull Request Labeler" +name: Pull Request Labeler + on: pull_request_target +permissions: + contents: read + pull-requests: write + jobs: label: runs-on: ubuntu-latest steps: - - uses: actions/labeler@main - with: - repo-token: "${{ secrets.GITHUB_TOKEN }}" + - uses: actions/labeler@v5 + with: + repo-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/README.md b/README.md index 2a7b7d23..f4771cca 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,10 @@ - + # Aedes ![ci](https://github.com/moscajs/aedes/workflows/ci/badge.svg) -[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](http://standardjs.com/) +[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](https://standardjs.com/) [![Maintenance](https://img.shields.io/badge/Maintained%3F-yes-green.svg)](https://github.com/moscajs/aedes/graphs/commit-activity) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/moscajs/aedes/pulls)\ -[![Total alerts](https://img.shields.io/lgtm/alerts/g/moscajs/aedes.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/moscajs/aedes/alerts/) -[![Language grade: JavaScript](https://img.shields.io/lgtm/grade/javascript/g/moscajs/aedes.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/moscajs/aedes/context:javascript) [![Coverage Status](https://coveralls.io/repos/moscajs/aedes/badge.svg?branch=main&service=github)](https://coveralls.io/github/moscajs/aedes?branch=main) [![Known Vulnerabilities](https://snyk.io/test/github/moscajs/aedes/badge.svg)](https://snyk.io/test/github/moscajs/aedes)\ ![node](https://img.shields.io/node/v/aedes) @@ -283,7 +281,7 @@ Want to contribute? Check our list of ## Security notice -Messages sent to the broker are considered _valid_ once they pass the [`authorizePublish`](https://github.com/moscajs/aedes/blob/main/docs/Aedes.md#handler-authorizepublish-client-packet-callback) callback. +Messages sent to the broker are considered _valid_ once they pass the [`authorizePublish`](./docs/Aedes.md#handler-authorizepublish-client-packet-callback) callback. In other terms, if permissions for the given client are revoked after the call completes, the message is still considered valid. In case you are sending time-sensitive messages, make sure to use QoS 0 or connect with a clean session. diff --git a/SECURITY.md b/SECURITY.md index 5d316923..6000d731 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -2,4 +2,4 @@ ## Reporting a Vulnerability -Please email daniel.sorridi+aedes@gmail.com; matteo.collina+aedes@gmail.com +Please report all vulnerabilities to [https://github.com/moscajs/aedes/security](https://github.com/moscajs/aedes/security). diff --git a/aedes.d.ts b/aedes.d.ts index 1574faf1..3d3fc7f5 100644 --- a/aedes.d.ts +++ b/aedes.d.ts @@ -1,8 +1,12 @@ import Aedes, { AedesOptions } from './types/instance' -export declare function createBroker (options?: AedesOptions): Aedes +export declare function createBroker(options?: AedesOptions): Aedes export * from './types/instance' export * from './types/packet' export * from './types/client' export default Aedes + +declare module 'aedes' { + export = Aedes +} diff --git a/aedes.js b/aedes.js index 78f43521..c54e6836 100644 --- a/aedes.js +++ b/aedes.js @@ -26,10 +26,12 @@ const defaultOptions = { authorizeSubscribe: defaultAuthorizeSubscribe, authorizeForward: defaultAuthorizeForward, published: defaultPublished, + wrapDeliveryFunc: null, trustProxy: false, trustedProxies: [], queueLimit: 42, - maxClientsIdLength: 23 + maxClientsIdLength: 23, + keepaliveLimit: 0 } function Aedes (opts) { @@ -47,29 +49,26 @@ function Aedes (opts) { this.counter = 0 this.queueLimit = opts.queueLimit this.connectTimeout = opts.connectTimeout + this.keepaliveLimit = opts.keepaliveLimit this.maxClientsIdLength = opts.maxClientsIdLength this.mq = opts.mq || mqemitter({ concurrency: opts.concurrency, matchEmptyLevels: true // [MQTT-4.7.1-3] }) - this.handle = function handle (conn, req) { - conn.setMaxListeners(opts.concurrency * 2) - // create a new Client instance for a new connection - // return, just to please standard - return new Client(that, conn, req) - } + this.concurrency = opts.concurrency this.persistence = opts.persistence || memory() this.persistence.broker = this this._parallel = parallel() this._series = series() this._enqueuers = reusify(DoEnqueues) - this.preConnect = opts.preConnect - this.authenticate = opts.authenticate - this.authorizePublish = opts.authorizePublish - this.authorizeSubscribe = opts.authorizeSubscribe + this._preConnect = opts.preConnect + this._authenticate = opts.authenticate + this._authorizePublish = opts.authorizePublish + this._authorizeSubscribe = opts.authorizeSubscribe this.authorizeForward = opts.authorizeForward this.published = opts.published + this._wrapDeliveryFunc = opts.wrapDeliveryFunc this.decodeProtocol = opts.decodeProtocol this.trustProxy = opts.trustProxy @@ -168,7 +167,13 @@ function Aedes (opts) { const clientId = packet.payload.toString() if (that.clients[clientId] && serverId !== that.id) { - that.clients[clientId].close(done) + if (that.clients[clientId].closed) { + // remove the client from the list if it is already closed + that.deleteClient(clientId) + done() + } else { + that.clients[clientId].close(done) + } } else { done() } @@ -181,6 +186,36 @@ function Aedes (opts) { util.inherits(Aedes, EventEmitter) +Aedes.prototype.handle = function handle (conn, req) { + conn.setMaxListeners(this.concurrency * 2) + // create a new Client instance for a new connection + // return, just to please standard + return new Client(this, conn, req) +} + +Aedes.prototype.preConnect = function preConnect (client, packet, callback) { + this._preConnect(client, packet, callback) +} + +Aedes.prototype.authenticate = function authenticate (client, username, password, callback) { + this._authenticate(client, username, password, callback) +} + +Aedes.prototype.authorizePublish = function authorizePublish (client, packet, callback) { + this._authorizePublish(client, packet, callback) +} + +Aedes.prototype.authorizeSubscribe = function authorizeSubscribe (client, sub, callback) { + this._authorizeSubscribe(client, sub, callback) +} + +Aedes.prototype.wrapDeliveryFunc = function wrapDeliveryFunc (client, func) { + if (this._wrapDeliveryFunc) { + return this._wrapDeliveryFunc(client, func) + } + return func +} + function storeRetained (packet, done) { if (packet.retain) { this.broker.persistence.storeRetained(packet, done) @@ -326,8 +361,7 @@ Aedes.prototype._finishRegisterClient = function (client) { } Aedes.prototype.unregisterClient = function (client) { - this.connectedClients-- - delete this.clients[client.id] + this.deleteClient(client.id) this.emit('clientDisconnect', client) this.publish({ topic: $SYS_PREFIX + this.id + '/disconnect/clients', @@ -335,6 +369,11 @@ Aedes.prototype.unregisterClient = function (client) { }, noop) } +Aedes.prototype.deleteClient = function (clientId) { + this.connectedClients-- + delete this.clients[clientId] +} + function closeClient (client, cb) { this.clients[client].close(cb) } diff --git a/docs/Aedes.md b/docs/Aedes.md index 67f15e7c..46bc4dc0 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -42,6 +42,7 @@ - `heartbeatInterval` `` an interval in millisconds at which server beats its health signal in `$SYS//heartbeat` topic. __Default__: `60000` - `id` `` aedes broker unique identifier. __Default__: `uuidv4()` - `connectTimeout` `` maximum waiting time in milliseconds waiting for a [`CONNECT`][CONNECT] packet. __Default__: `30000` + - `keepaliveLimit` `` maximum client keep alive time allowed, 0 means no limit. __Default__: `0` - Returns `` Create a new Aedes server. diff --git a/lib/client.js b/lib/client.js index 414d8e5f..5d2cbf10 100644 --- a/lib/client.js +++ b/lib/client.js @@ -5,11 +5,11 @@ const EventEmitter = require('events') const util = require('util') const eos = require('end-of-stream') const Packet = require('aedes-packet') -const write = require('./write') +const { write } = require('./write') const QoSPacket = require('./qos-packet') -const handleSubscribe = require('./handlers/subscribe') -const handleUnsubscribe = require('./handlers/unsubscribe') -const handle = require('./handlers') +const { handleSubscribe } = require('./handlers/subscribe') +const { handleUnsubscribe } = require('./handlers/unsubscribe') +const { handle } = require('./handlers') const { pipeline } = require('stream') const { through } = require('./utils') @@ -313,7 +313,14 @@ Client.prototype.close = function (done) { }, noop) } }) + } else if (will) { + // delete the persisted will even on clean disconnect https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349232 + that.broker.persistence.delWill({ + id: that.id, + brokerId: that.broker.id + }, noop) } + that.will = null // this function might be called twice that._will = null diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index a4c32d05..a18fd3cd 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -2,10 +2,10 @@ const retimer = require('retimer') const { pipeline } = require('stream') -const write = require('../write') +const { write } = require('../write') const QoSPacket = require('../qos-packet') const { through } = require('../utils') -const handleSubscribe = require('./subscribe') +const { handleSubscribe } = require('./subscribe') const uniqueId = require('hyperid')() function Connack (arg) { @@ -36,7 +36,8 @@ const errorMessages = [ 'identifier rejected', 'Server unavailable', 'bad user name or password', - 'not authorized' + 'not authorized', + 'keep alive limit exceeded' ] function handleConnect (client, packet, done) { @@ -66,6 +67,10 @@ function init (client, packet, done) { if (packet.protocolVersion === 3 && clientId.length > client.broker.maxClientsIdLength) { returnCode = 2 } + // check if the client keepalive is compatible with broker settings + if (client.broker.keepaliveLimit && (!packet.keepalive || packet.keepalive > client.broker.keepaliveLimit)) { + returnCode = 6 + } if (returnCode > 0) { const error = new Error(errorMessages[returnCode]) error.errorCode = returnCode @@ -264,4 +269,4 @@ function emptyQueueFilter (err, client, packet) { } } -module.exports = handleConnect +module.exports = { handleConnect } diff --git a/lib/handlers/index.js b/lib/handlers/index.js index a5dfaa8c..b611293a 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -1,13 +1,13 @@ 'use strict' -const handleConnect = require('./connect') -const handleSubscribe = require('./subscribe') -const handleUnsubscribe = require('./unsubscribe') -const handlePublish = require('./publish') -const handlePuback = require('./puback') -const handlePubrel = require('./pubrel') -const handlePubrec = require('./pubrec') -const handlePing = require('./ping') +const { handleConnect } = require('./connect') +const { handleSubscribe } = require('./subscribe') +const { handleUnsubscribe } = require('./unsubscribe') +const { handlePublish } = require('./publish') +const { handlePuback } = require('./puback') +const { handlePubrel } = require('./pubrel') +const { handlePubrec } = require('./pubrec') +const { handlePing } = require('./ping') function handle (client, packet, done) { if (packet.cmd === 'connect') { @@ -74,4 +74,4 @@ function finish (conn, packet, done) { done(error) } -module.exports = handle +module.exports = { handle } diff --git a/lib/handlers/ping.js b/lib/handlers/ping.js index a4c042cf..69b3dede 100644 --- a/lib/handlers/ping.js +++ b/lib/handlers/ping.js @@ -1,6 +1,6 @@ 'use strict' -const write = require('../write') +const { write } = require('../write') const pingResp = { cmd: 'pingresp' } @@ -10,4 +10,4 @@ function handlePing (client, packet, done) { write(client, pingResp, done) } -module.exports = handlePing +module.exports = { handlePing } diff --git a/lib/handlers/puback.js b/lib/handlers/puback.js index e4b419c9..83768612 100644 --- a/lib/handlers/puback.js +++ b/lib/handlers/puback.js @@ -8,4 +8,4 @@ function handlePuback (client, packet, done) { }) } -module.exports = handlePuback +module.exports = { handlePuback } diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index e30c9db6..5c3e1674 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -1,6 +1,6 @@ 'use strict' -const write = require('../write') +const { write } = require('../write') function PubAck (packet) { this.cmd = 'puback' @@ -62,4 +62,4 @@ function authorizePublish (packet, done) { this.broker.authorizePublish(this, packet, done) } -module.exports = handlePublish +module.exports = { handlePublish } diff --git a/lib/handlers/pubrec.js b/lib/handlers/pubrec.js index 5c914dd6..dc7a7f0c 100644 --- a/lib/handlers/pubrec.js +++ b/lib/handlers/pubrec.js @@ -1,6 +1,6 @@ 'use strict' -const write = require('../write') +const { write } = require('../write') function PubRel (packet) { this.cmd = 'pubrel' @@ -27,4 +27,4 @@ function handlePubrec (client, packet, done) { } } -module.exports = handlePubrec +module.exports = { handlePubrec } diff --git a/lib/handlers/pubrel.js b/lib/handlers/pubrel.js index 09dcc86d..672b6978 100644 --- a/lib/handlers/pubrel.js +++ b/lib/handlers/pubrel.js @@ -1,6 +1,6 @@ 'use strict' -const write = require('../write') +const { write } = require('../write') function ClientPacketStatus (client, packet) { this.client = client @@ -47,4 +47,4 @@ function pubrelDel (arg, done) { persistence.incomingDelPacket(this.client, arg.packet, done) } -module.exports = handlePubrel +module.exports = { handlePubrel } diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 24704274..86ccb13a 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,7 +4,7 @@ const fastfall = require('fastfall') const Packet = require('aedes-packet') const { through } = require('../utils') const { validateTopic, $SYS_PREFIX } = require('../utils') -const write = require('../write') +const { write } = require('../write') const subscribeTopicActions = fastfall([ authorize, @@ -155,6 +155,16 @@ function addSubs (sub, done) { func = blockDollarSignTopics(func) } + // OTel instrumentation hook: wrap delivery function for tracing + func = broker.wrapDeliveryFunc(client, func) + + if (client.closed || client.broker.closed) { + // a hack, sometimes client.close() or broker.close() happened + // before authenticate() comes back + // we don't continue subscription here + return + } + if (!client.subscriptions[topic]) { client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl) broker.subscribe(topic, func, done) @@ -245,4 +255,4 @@ function completeSubscribe (err) { function noop () { } -module.exports = handleSubscribe +module.exports = { handleSubscribe } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index e08c3176..b9cd7efd 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,6 +1,6 @@ 'use strict' -const write = require('../write') +const { write } = require('../write') const { validateTopic, $SYS_PREFIX } = require('../utils') function UnSubAck (packet) { @@ -101,4 +101,4 @@ function completeUnsubscribe (err) { function noop () { } -module.exports = handleUnsubscribe +module.exports = { handleUnsubscribe } diff --git a/lib/write.js b/lib/write.js index 716d81a3..a5d186c8 100644 --- a/lib/write.js +++ b/lib/write.js @@ -21,4 +21,4 @@ function write (client, packet, done) { setImmediate(done, error, client) } -module.exports = write +module.exports = { write } diff --git a/package.json b/package.json index 9799d4f4..74520c82 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.49.0", + "version": "0.51.3", "description": "Stream-based MQTT broker", "main": "aedes.js", "types": "aedes.d.ts", @@ -16,7 +16,7 @@ "test:typescript": "tsd", "unit": "tap -J test/*.js", "unit:report": "tap -J test/*.js --cov --coverage-report=html --coverage-report=cobertura | tee out.tap", - "license-checker": "license-checker --production --onlyAllow=\"MIT;ISC;BSD-3-Clause;BSD-2-Clause\"", + "license-checker": "license-checker --production --onlyAllow=\"MIT;ISC;BSD-3-Clause;BSD-2-Clause;0BSD\"", "release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics" }, "release-it": { @@ -89,48 +89,50 @@ } ], "license": "MIT", + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/aedes" + }, "bugs": { "url": "https://github.com/moscajs/aedes/issues" }, "homepage": "https://github.com/moscajs/aedes#readme", "engines": { - "node": ">=14" + "node": ">=16" }, "devDependencies": { - "@sinonjs/fake-timers": "^10.0.2", - "@types/node": "^18.14.2", - "@typescript-eslint/eslint-plugin": "^5.54.0", - "@typescript-eslint/parser": "^5.54.0", + "@sinonjs/fake-timers": "^11.2.2", + "@types/node": "^20.11.17", + "@typescript-eslint/eslint-plugin": "^7.0.1", + "@typescript-eslint/parser": "^7.0.1", "concat-stream": "^2.0.0", "duplexify": "^4.1.2", "license-checker": "^25.0.1", - "markdownlint-cli": "^0.33.0", - "mqtt": "^4.3.7", + "markdownlint-cli": "^0.41.0", + "mqtt": "^5.3.5", "mqtt-connection": "^4.1.0", "pre-commit": "^1.2.2", "proxyquire": "^2.1.3", - "release-it": "^15.6.1", + "release-it": "^17.0.5", "snazzy": "^9.0.0", - "standard": "^17.0.0", - "tap": "^16.3.4", - "tsd": "^0.28.0", - "typescript": "^5.0.2", + "standard": "^17.1.0", + "tap": "^16.3.10", + "tsd": "^0.31.0", + "typescript": "^5.3.3", "websocket-stream": "^5.5.2" }, "dependencies": { - "aedes-packet": "^2.3.1", - "aedes-persistence": "8.1.3", - "bulk-write-stream": "^2.0.1", + "aedes-packet": "^3.0.0", + "aedes-persistence": "^9.1.2", "end-of-stream": "^1.4.4", "fastfall": "^1.5.1", "fastparallel": "^2.4.1", "fastseries": "^2.0.0", - "hyperid": "^3.0.0", - "mqemitter": "^4.5.0", - "mqtt-packet": "^7.1.2", - "readable-stream": "^3.6.0", - "retimer": "^3.0.0", + "hyperid": "^3.2.0", + "mqemitter": "^6.0.0", + "mqtt-packet": "^9.0.0", + "retimer": "^4.0.0", "reusify": "^1.0.4", - "uuid": "^8.3.2" + "uuid": "^10.0.0" } } diff --git a/test/connect.js b/test/connect.js index d1607386..fb6de7f9 100644 --- a/test/connect.js +++ b/test/connect.js @@ -97,6 +97,40 @@ test('reject client requested for unsupported protocol version', function (t) { }) }) +test('reject clients that exceed the keepalive limit', function (t) { + t.plan(3) + + const broker = aedes({ + keepaliveLimit: 100 + }) + t.teardown(broker.close.bind(broker)) + + const s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + keepalive: 150 + }) + s.outStream.on('data', function (packet) { + console.log(packet) + t.same(packet, { + cmd: 'connack', + returnCode: 6, + length: 2, + qos: 0, + retain: false, + dup: false, + topic: null, + payload: null, + sessionPresent: false + }, 'unsuccessful connack, keep alive limit exceeded') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'keep alive limit exceeded') + t.equal(broker.connectedClients, 0) + }) +}) + // Guarded in mqtt-packet test('reject clients with no clientId running on MQTT 3.1.0', function (t) { t.plan(3) diff --git a/test/events.js b/test/events.js index 40191395..76ee9ad1 100644 --- a/test/events.js +++ b/test/events.js @@ -221,3 +221,22 @@ test('Test backpressure aedes published function', function (t) { }) }) }) + +test('clear closed clients when the same clientId is managed by another broker', function (t) { + t.plan(2) + + const clientId = 'closed-client' + const aedesBroker = aedes() + + // simulate a closed client on the broker + aedesBroker.clients[clientId] = { closed: true, broker: aedesBroker } + aedesBroker.connectedClients = 1 + + // simulate the creation of the same client on another broker of the cluster + aedesBroker.publish({ topic: '$SYS/anotherbroker/new/clients', payload: clientId }, () => { + t.equal(aedesBroker.clients[clientId], undefined) // check that the closed client was removed + t.equal(aedesBroker.connectedClients, 0) + }) + + t.teardown(aedesBroker.close.bind(aedesBroker)) +}) diff --git a/test/types/aedes.test-d.ts b/test/types/aedes.test-d.ts index abac7cae..cc145919 100644 --- a/test/types/aedes.test-d.ts +++ b/test/types/aedes.test-d.ts @@ -1,4 +1,3 @@ - import { IncomingMessage } from 'node:http' import { Socket } from 'node:net' import type { @@ -7,10 +6,13 @@ import type { Client, Connection } from '../../aedes' -import Aedes, { createBroker } from '../../aedes' +import Aedes, { AedesOptions, createBroker } from '../../aedes' import type { AedesPublishPacket, ConnackPacket, ConnectPacket, PingreqPacket, PublishPacket, PubrelPacket, Subscription, SubscribePacket, UnsubscribePacket } from '../../types/packet' import { expectType } from 'tsd' +// Test for createBroker function +expectType<(options?: AedesOptions) => Aedes>(createBroker) + // Aedes server let broker = createBroker() expectType(broker) @@ -21,6 +23,7 @@ broker = new Aedes({ heartbeatInterval: 60000, connectTimeout: 30000, maxClientsIdLength: 23, + keepaliveLimit: 0, preConnect: (client: Client, packet: ConnectPacket, callback) => { if (client.req) { callback(new Error('not websocket stream'), false) diff --git a/test/will.js b/test/will.js index d2893d28..368544f6 100644 --- a/test/will.js +++ b/test/will.js @@ -420,6 +420,29 @@ test('does not deliver will when client sends a DISCONNECT', function (t) { }) }) +test('deletes from persistence on DISCONNECT', function (t) { + t.plan(2) + + const opts = { + clientId: 'abcde' + } + const broker = aedes() + t.teardown(broker.close.bind(broker)) + + const s = noError(willConnect(setup(broker), opts, function () { + s.inStream.end({ + cmd: 'disconnect' + }) + }), t) + + s.broker.persistence.getWill({ + id: opts.clientId + }, function (err, packet) { + t.error(err, 'no error') + t.notOk(packet) + }) +}) + test('does not store multiple will with same clientid', function (t) { t.plan(4) diff --git a/types/instance.d.ts b/types/instance.d.ts index f912a95e..d84f9d5c 100644 --- a/types/instance.d.ts +++ b/types/instance.d.ts @@ -68,6 +68,9 @@ type PublishedHandler = ( callback: (error?: Error | null) => void ) => void; +export type DeliverFunc = (packet: AedesPublishPacket, callback: () => void) => void; +export type DeliverFuncWrapper = (client: Client, func: DeliverFunc) => DeliverFunc; + export interface AedesOptions { mq?: any; id?: string; @@ -75,6 +78,7 @@ export interface AedesOptions { concurrency?: number; heartbeatInterval?: number; connectTimeout?: number; + keepaliveLimit?: number; queueLimit?: number; maxClientsIdLength?: number; preConnect?: PreConnectHandler; @@ -83,6 +87,7 @@ export interface AedesOptions { authorizeSubscribe?: AuthorizeSubscribeHandler; authorizeForward?: AuthorizeForwardHandler; published?: PublishedHandler; + wrapDeliveryFunc?: DeliverFuncWrapper; } export default class Aedes extends EventEmitter { @@ -156,4 +161,5 @@ export default class Aedes extends EventEmitter { authorizeSubscribe: AuthorizeSubscribeHandler authorizeForward: AuthorizeForwardHandler published: PublishedHandler + wrapDeliveryFunc: (client: Client, func: DeliverFunc) => DeliverFunc }