[backend] Stream consumers should only return valid and ongoing consumers (#14816)#14818
[backend] Stream consumers should only return valid and ongoing consumers (#14816)#14818richard-julien merged 2 commits intomasterfrom
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #14818 +/- ##
==========================================
+ Coverage 32.35% 32.37% +0.01%
==========================================
Files 3111 3111
Lines 211873 211875 +2
Branches 38394 38415 +21
==========================================
+ Hits 68554 68594 +40
+ Misses 143319 143281 -38
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR aims to ensure the backend only returns valid, ongoing stream consumers when listing consumers for a live stream (to avoid drawer errors and inflated consumer counts), and adds unit coverage around getConsumersForCollection.
Changes:
- Add unit tests for
getConsumersForCollectioncovering multiple Redis/pipeline scenarios and required fields. - Filter Redis consumer results to only include entries that have both
userIdandconnectedAt.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| opencti-platform/opencti-graphql/src/graphql/streamConsumerRegistry.ts | Adds filtering so only consumers with required fields (userId, connectedAt) are returned. |
| opencti-platform/opencti-graphql/tests/01-unit/utils/streamConsumerRegistry-test.ts | Introduces unit tests for getConsumersForCollection covering valid/invalid hashes and pipeline edge cases. |
| it('should clean up stale entries from the sorted set', async () => { | ||
| mockClient.zrangebyscore.mockResolvedValue([]); | ||
|
|
||
| await getConsumersForCollection('collection-cleanup'); | ||
|
|
||
| // zrangebyscore should be called with staleCutoff and +inf | ||
| expect(mockClient.zrangebyscore).toHaveBeenCalledWith( | ||
| expect.stringContaining('collection:collection-cleanup'), | ||
| expect.any(Number), | ||
| '+inf', | ||
| ); | ||
| }); |
There was a problem hiding this comment.
This test name suggests stale sorted-set entries are cleaned up, but the assertions only verify the zrangebyscore call arguments (and the implementation returns early before calling zremrangebyscore when there are no recent connections). Either rename the test to reflect what it verifies, or assert cleanup behavior with a non-empty zrangebyscore result.
| if (hash.userId && hash.connectedAt) { // only push if we have a userId and connectedAt (indicates a valid and still ongoing consumer) | ||
| results.push({ | ||
| connectionId: hash.connectionId || connectionIds[i], | ||
| collectionId: hash.collectionId || collectionId, | ||
| userId: hash.userId, | ||
| userEmail: hash.userEmail || '', | ||
| connectedAt: hash.connectedAt, | ||
| lastEventId: hash.lastEventId || '', |
There was a problem hiding this comment.
The filter only checks that hash.connectedAt is truthy, but the issue/PR description requires a valid connectedAt date. If connectedAt contains an invalid string (or 'undefined'), this consumer will still be returned and can still break the drawer when the UI tries to parse it. Consider validating with Date.parse(hash.connectedAt) (or a shared date parsing helper) and only including consumers when the parsed value is finite, optionally also normalizing to an ISO string.
|
|
||
| expect(result).toHaveLength(0); | ||
| }); | ||
|
|
There was a problem hiding this comment.
Tests cover missing connectedAt, but they don’t cover an invalid/garbage connectedAt value (e.g. not-a-date) which is the scenario that can still cause parsing errors in the consumer drawer. Add a unit test that returns a hash with an invalid connectedAt and assert it’s filtered out (and update the implementation accordingly).
| it('should skip consumers with invalid connectedAt value', async () => { | |
| mockClient.zrangebyscore.mockResolvedValue(['conn-1']); | |
| const invalidHash: Record<string, string> = { | |
| connectionId: 'conn-1', | |
| collectionId: 'collection-1', | |
| userId: 'user-1', | |
| userEmail: 'user@test.com', | |
| // connectedAt is present but not a valid date string | |
| connectedAt: 'not-a-date', | |
| lastEventId: '', | |
| deliveryRate: '0', | |
| processingRate: '0', | |
| resolutionRate: '0', | |
| lastUpdate: '1741257600000', | |
| }; | |
| mockPipeline.exec.mockResolvedValue([[null, invalidHash]]); | |
| const result = await getConsumersForCollection('collection-1'); | |
| expect(result).toHaveLength(0); | |
| }); |
Proposed changes
Related issues
#14816