Skip to content

Commit 7aee5e1

Browse files
committed
fix: release undersized groups for individual processing
- When a settled group has fewer unprocessed events than minGroupSize, release those events for individual processing instead of keeping them stuck in the aggregator forever - Added debug logging to OpenSea dedupe layer to show exactly which events are being filtered and why - Updated getReadyGroups to return both groups and released individuals - processEventsWithAggregator now includes released individuals in processableEvents
1 parent 4bfc47f commit 7aee5e1

File tree

4 files changed

+47
-25
lines changed

4 files changed

+47
-25
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "opensea-activity-bot",
3-
"version": "3.5.9",
3+
"version": "3.5.10",
44
"description": "A bot that shares new OpenSea events for a collection to Discord and Twitter.",
55
"author": "Ryan Ghods <ryan@ryanio.com>",
66
"license": "MIT",

src/opensea.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,19 @@ const processEventFilters = (
471471
for (const event of processed) {
472472
const key = canonicalEventKeyFor(event);
473473
if (eventStateStore.hasKey(key)) {
474+
logger.debug(
475+
`[Dedupe] Skipping already-seen event: ${event.event_type} token=${event.nft?.identifier ?? "?"} key=${key}`
476+
);
474477
continue;
475478
}
476479
deduped.push(event);
477480
newKeys.push(key);
478481
}
479482

483+
if (preDedup > 0 && deduped.length === 0) {
484+
logger.debug(`[Dedupe] All ${preDedup} events were filtered as duplicates`);
485+
}
486+
480487
eventStateStore.markProcessed(newKeys);
481488
processed = deduped;
482489
stats.deduped = preDedup - deduped.length;

src/utils/event-grouping.ts

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,11 @@ export class EventGroupManager {
125125
}
126126
}
127127

128-
// Get ready event groups
129-
getReadyGroups(): Array<{ tx: string; events: OpenSeaAssetEvent[] }> {
128+
// Get ready event groups and any individual events released from undersized groups
129+
getReadyGroups(): {
130+
groups: Array<{ tx: string; events: OpenSeaAssetEvent[] }>;
131+
releasedIndividuals: OpenSeaAssetEvent[];
132+
} {
130133
// Log pending groups state before checking
131134
if (this.actorAgg.size > 0) {
132135
const now = Date.now();
@@ -163,7 +166,8 @@ export class EventGroupManager {
163166
private processSettledGroup(
164167
key: string,
165168
agg: { events: OpenSeaAssetEvent[]; rawCount: number },
166-
out: Array<{ tx: string; events: OpenSeaAssetEvent[] }>
169+
out: Array<{ tx: string; events: OpenSeaAssetEvent[] }>,
170+
releasedIndividuals: OpenSeaAssetEvent[]
167171
): void {
168172
const unprocessed = this.filterUnprocessedEvents(agg.events, key);
169173

@@ -182,11 +186,13 @@ export class EventGroupManager {
182186
}
183187

184188
if (unprocessed.length > 0) {
185-
// Events exist but below minGroupSize - keep for individual processing
186-
logger.warn(
187-
`[EventGroup] Group ${key} has ${unprocessed.length} unprocessed events ` +
188-
`(below minGroupSize=${this.minGroupSize}), keeping for individual processing`
189+
// Events exist but below minGroupSize - release for individual processing
190+
logger.info(
191+
`[EventGroup] 📤 Releasing ${unprocessed.length} event(s) from group ${key} for individual processing ` +
192+
`(below minGroupSize=${this.minGroupSize})`
189193
);
194+
releasedIndividuals.push(...unprocessed);
195+
this.actorAgg.delete(key);
190196
return;
191197
}
192198

@@ -197,11 +203,12 @@ export class EventGroupManager {
197203
this.actorAgg.delete(key);
198204
}
199205

200-
private collectReadyActorGroups(): Array<{
201-
tx: string;
202-
events: OpenSeaAssetEvent[];
203-
}> {
204-
const out: Array<{ tx: string; events: OpenSeaAssetEvent[] }> = [];
206+
private collectReadyActorGroups(): {
207+
groups: Array<{ tx: string; events: OpenSeaAssetEvent[] }>;
208+
releasedIndividuals: OpenSeaAssetEvent[];
209+
} {
210+
const groups: Array<{ tx: string; events: OpenSeaAssetEvent[] }> = [];
211+
const releasedIndividuals: OpenSeaAssetEvent[] = [];
205212
const now = Date.now();
206213
this.pruneStaleActorGroups(now);
207214

@@ -211,10 +218,10 @@ export class EventGroupManager {
211218
const meetsMinSize = agg.rawCount >= this.minGroupSize;
212219

213220
if (meetsMinSize && isSettled) {
214-
this.processSettledGroup(key, agg, out);
221+
this.processSettledGroup(key, agg, groups, releasedIndividuals);
215222
}
216223
}
217-
return out;
224+
return { groups, releasedIndividuals };
218225
}
219226

220227
private actorKeyForEvent(event: OpenSeaAssetEvent): string | undefined {
@@ -684,17 +691,23 @@ export const processEventsWithAggregator = (
684691
}
685692

686693
// Always flush ready groups (even if no new events)
687-
const readyGroups = groupManager.getReadyGroups();
694+
// Also get any individual events released from undersized groups
695+
const { groups: readyGroups, releasedIndividuals } =
696+
groupManager.getReadyGroups();
688697

689698
// Filter processable events (only if we have new events)
690699
const { processableEvents, skippedDupes, skippedPending } =
691700
events.length > 0
692701
? groupManager.filterProcessableEvents(events)
693702
: { processableEvents: [], skippedDupes: 0, skippedPending: 0 };
694703

704+
// Add released individuals to processable events
705+
// These are events from settled groups that didn't meet minGroupSize
706+
const allProcessable = [...processableEvents, ...releasedIndividuals];
707+
695708
return {
696709
readyGroups,
697-
processableEvents,
710+
processableEvents: allProcessable,
698711
skippedDupes,
699712
skippedPending,
700713
};

test/utils/event-grouping.test.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -397,17 +397,17 @@ describe("eventGrouping-utils", () => {
397397
groupManager.addEvents([mockEvent1, mockEvent2]);
398398

399399
// Initially no ready groups
400-
let readyGroups = groupManager.getReadyGroups();
401-
expect(readyGroups).toHaveLength(0);
400+
let result = groupManager.getReadyGroups();
401+
expect(result.groups).toHaveLength(0);
402402

403403
// Wait for settle time (config.settleMs is 1000, so wait 1100 to be safe)
404404
const SETTLE_BUFFER_MS = 1100;
405405
jest.advanceTimersByTime(SETTLE_BUFFER_MS);
406406

407-
readyGroups = groupManager.getReadyGroups();
408-
expect(readyGroups).toHaveLength(1);
409-
expect(readyGroups[0].tx).toBe("actor:purchase:0xbuyer1");
410-
expect(readyGroups[0].events).toHaveLength(2);
407+
result = groupManager.getReadyGroups();
408+
expect(result.groups).toHaveLength(1);
409+
expect(result.groups[0].tx).toBe("actor:purchase:0xbuyer1");
410+
expect(result.groups[0].events).toHaveLength(2);
411411
});
412412

413413
it("does not flush a group if duplicates reduce below min size", () => {
@@ -419,8 +419,10 @@ describe("eventGrouping-utils", () => {
419419

420420
const SETTLE_BUFFER_MS = 1100;
421421
jest.advanceTimersByTime(SETTLE_BUFFER_MS);
422-
const readyGroups = groupManager.getReadyGroups();
423-
expect(readyGroups.length).toBe(0);
422+
const result = groupManager.getReadyGroups();
423+
// No groups returned (below minGroupSize), but the single event should be released
424+
expect(result.groups.length).toBe(0);
425+
expect(result.releasedIndividuals.length).toBe(1);
424426
});
425427
});
426428

0 commit comments

Comments
 (0)