Skip to content

Commit 22e6af4

Browse files
committed
chore: improve Twitter queue reliability, logging, and tests
1 parent d1cb9d3 commit 22e6af4

File tree

5 files changed

+328
-11
lines changed

5 files changed

+328
-11
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "opensea-activity-bot",
3-
"version": "3.4.8",
3+
"version": "3.5.0",
44
"description": "A bot that shares new OpenSea events for a collection to Discord and Twitter.",
55
"author": "Ryan Ghods <ryan@ryanio.com>",
66
"license": "MIT",

src/platforms/twitter.ts

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ const groupConfig = getDefaultEventGroupConfig("TWITTER");
8686
const groupManager = new EventGroupManager(groupConfig);
8787

8888
// Generic async queue for tweeting
89+
logger.debug(
90+
`${logStart} Queue config: delay=${PER_TWEET_DELAY_MS}ms backoffBase=${BACKOFF_BASE_MS}ms backoffMax=${BACKOFF_MAX_MS}ms timeout=${PROCESSING_TIMEOUT_MS}ms`
91+
);
92+
8993
const tweetQueue = new AsyncQueue<TweetQueueItem>({
9094
perItemDelayMs: PER_TWEET_DELAY_MS,
9195
backoffBaseMs: BACKOFF_BASE_MS,
@@ -133,12 +137,20 @@ const tweetQueue = new AsyncQueue<TweetQueueItem>({
133137
const dayRemaining = rateLimit?.day?.remaining;
134138
const dayReset = rateLimit?.day?.reset;
135139
if (dayRemaining === 0 && typeof dayReset === "number") {
136-
return {
140+
const classification = {
137141
type: "rate_limit",
138142
pauseUntilMs: (dayReset as number) * MS_PER_SECOND,
139143
} as const;
144+
logger.debug(
145+
`${logStart} classifyError: code=${errCode} remaining=${dayRemaining} reset=${dayReset} type=${classification.type}`
146+
);
147+
return classification;
140148
}
141-
return { type: "transient" } as const;
149+
const classification = { type: "transient" } as const;
150+
logger.debug(
151+
`${logStart} classifyError: code=${errCode} remaining=${dayRemaining} type=${classification.type}`
152+
);
153+
return classification;
142154
}
143155
const status =
144156
(error as { data?: { status?: number }; status?: number })?.data
@@ -149,16 +161,45 @@ const tweetQueue = new AsyncQueue<TweetQueueItem>({
149161
status === 0 ||
150162
(error as { name?: string })?.name === "FetchError"
151163
) {
152-
return { type: "transient" } as const;
164+
const classification = { type: "transient" } as const;
165+
logger.debug(
166+
`${logStart} classifyError: status=${status ?? "unknown"} type=${classification.type}`
167+
);
168+
return classification;
153169
}
154-
return { type: "fatal" } as const;
170+
const classification = { type: "fatal" } as const;
171+
logger.debug(
172+
`${logStart} classifyError: status=${status ?? "unknown"} type=${classification.type}`
173+
);
174+
return classification;
155175
},
156176
});
157177

158178
const keyForQueueItem = (item: TweetQueueItem): string => {
159179
const ev = item.event;
160180
if (isGroupedEvent(ev)) {
181+
// For transaction-based groups, we want a stable key per tx hash so that
182+
// repeated polling doesn't enqueue duplicate work for the same on-chain tx.
161183
const tx = ev.txHash ?? txHashFor(ev.events?.[0]) ?? "unknown";
184+
185+
if (typeof tx === "string" && tx.startsWith("actor:")) {
186+
// Actor-based groups reuse a synthetic "actor:<kind>:<address>" tx hash.
187+
// If we keyed only on this value, we'd permanently suppress future groups
188+
// for the same actor after the first tweet.
189+
//
190+
// To allow multiple distinct actor groups over time while still deduping
191+
// true duplicates, incorporate the time window of the grouped events
192+
// into the queue key. This keeps keys stable for the same group across
193+
// retries, but different once new events arrive.
194+
const firstTs = ev.events[0]?.event_timestamp;
195+
const lastTs = ev.events.at(-1)?.event_timestamp ?? firstTs;
196+
const window =
197+
firstTs !== undefined && lastTs !== undefined
198+
? `${firstTs}-${lastTs}`
199+
: "unknown-window";
200+
return `group:${tx}|${window}`;
201+
}
202+
162203
return `group:${tx}`;
163204
}
164205
return eventKeyFor(item.event as OpenSeaAssetEvent);
@@ -561,6 +602,18 @@ const enqueueIndividualEvents = (
561602
for (const event of processableEvents) {
562603
tweetQueue.enqueue({ event });
563604
}
605+
606+
if (processableEvents.length > 0) {
607+
const MAX_SAMPLE = 5;
608+
const sampleKeys = processableEvents
609+
.slice(0, MAX_SAMPLE)
610+
.map((e) => eventKeyFor(e));
611+
logger.debug(
612+
`${logStart} Enqueued ${processableEvents.length} single event(s) for tweeting, sampleKeys=[${sampleKeys.join(
613+
", "
614+
)}] queue=${tweetQueue.size()}`
615+
);
616+
}
564617
};
565618

566619
const logProcessingSummary = (

src/utils/queue.ts

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ export class AsyncQueue<T> {
6363
}
6464

6565
enqueue(item: T) {
66-
this.list.push({ item, attempts: 0 });
66+
const workItem: WorkItem<T> = { item, attempts: 0 };
67+
this.list.push(workItem);
68+
if (this.options.debug) {
69+
const key = this.options.keyFor(item);
70+
logger.debug(`[Queue] Enqueued item: ${key} size=${this.list.length}`);
71+
}
6772

6873
// Auto-start the queue if it's idle
6974
if (!this.isProcessing) {
@@ -169,8 +174,14 @@ export class AsyncQueue<T> {
169174
private async pauseIfNeeded(now: number) {
170175
if (this.pauseUntilMs > now) {
171176
const waitMs = this.pauseUntilMs - now;
177+
if (this.options.debug) {
178+
logger.debug(`[Queue] Pause in effect for ${waitMs}ms`);
179+
}
172180
await timeout(waitMs);
173181
this.pauseUntilMs = 0;
182+
if (this.options.debug) {
183+
logger.debug("[Queue] Pause complete, resuming processing");
184+
}
174185
}
175186
}
176187

@@ -186,7 +197,8 @@ export class AsyncQueue<T> {
186197
const key = this.options.keyFor(next.item);
187198
const timeoutMs = this.getProcessingTimeoutMs();
188199

189-
logger.info(`[Queue] Processing item: ${key}`);
200+
const attemptNumber = next.attempts + 1;
201+
logger.info(`[Queue] Processing item: ${key} (attempt ${attemptNumber})`);
190202
const startTime = Date.now();
191203

192204
try {
@@ -197,7 +209,9 @@ export class AsyncQueue<T> {
197209
`Processing timed out after ${timeoutMs}ms for item: ${key}`
198210
);
199211
const durationMs = Date.now() - startTime;
200-
logger.info(`[Queue] Completed item: ${key} (${durationMs}ms)`);
212+
logger.info(
213+
`[Queue] Completed item: ${key} (${durationMs}ms, attempt ${attemptNumber})`
214+
);
201215
return true;
202216
} catch (error: unknown) {
203217
const durationMs = Date.now() - startTime;
@@ -225,14 +239,19 @@ export class AsyncQueue<T> {
225239
this.options.backoffBaseMs
226240
);
227241
this.pauseUntilMs = Date.now() + waitMs;
228-
logger.warn(`[Queue] Rate limited, pausing for ${waitMs}ms`);
242+
logger.warn(
243+
`[Queue] Rate limited for item: ${key}, pausing for ${waitMs}ms`
244+
);
245+
if (this.options.debug) {
246+
logger.debug("[Queue] Rate limit error details:", error);
247+
}
229248
return false; // retry same item after pause
230249
}
231250
if (classification.type === "transient") {
232251
next.attempts += 1;
233252
const waitMs = this.calcBackoffMs(next.attempts);
234253
logger.warn(
235-
`[Queue] Transient error (attempt ${next.attempts}), backing off for ${waitMs}ms`
254+
`[Queue] Transient error for item: ${key} (attempt ${next.attempts}), backing off for ${waitMs}ms`
236255
);
237256
if (this.options.debug) {
238257
logger.debug("[Queue] Error details:", error);
@@ -242,7 +261,7 @@ export class AsyncQueue<T> {
242261
}
243262
// fatal
244263
logger.error(
245-
`[Queue] Fatal error processing item (${durationMs}ms), dropping:`,
264+
`[Queue] Fatal error processing item ${key} (${durationMs}ms), dropping:`,
246265
error
247266
);
248267
this.list.shift();

test/platforms/twitter.test.ts

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import fs from "node:fs";
22
import path from "node:path";
33
import { jest } from "@jest/globals";
4+
import { createMintBatch, TEST_MINTER_1 } from "../helpers";
45

56
// Mock env
67
process.env.TWITTER_EVENTS = "sale,listing,offer,transfer,burn";
@@ -352,6 +353,191 @@ describe("twitter flows", () => {
352353
)
353354
).toBe(true);
354355
});
356+
357+
it("retries on 429 rate limit errors from Twitter and eventually tweets", async () => {
358+
const originalEventsEnv = process.env.TWITTER_EVENTS;
359+
try {
360+
process.env.TWITTER_EVENTS = "mint";
361+
process.env.TWITTER_QUEUE_DELAY_MS = "0";
362+
process.env.TWITTER_BACKOFF_BASE_MS = "1";
363+
process.env.TWITTER_BACKOFF_MAX_MS = "5";
364+
365+
const baseTimestamp = 3_000_000_000;
366+
const [mintEvent] = createMintBatch(1, TEST_MINTER_1, baseTimestamp);
367+
368+
const m = require("twitter-api-v2") as {
369+
__mockReadWrite: { v2: { tweet: jest.Mock } };
370+
};
371+
const tweetMock = m.__mockReadWrite.v2.tweet as jest.Mock;
372+
373+
let firstCall = true;
374+
tweetMock.mockImplementation(() => {
375+
if (firstCall) {
376+
firstCall = false;
377+
const error = {
378+
code: 429,
379+
rateLimit: {
380+
day: {
381+
remaining: 0,
382+
reset: Math.floor(Date.now() / 1000),
383+
},
384+
},
385+
};
386+
throw error;
387+
}
388+
return Promise.resolve({ data: { id: "1", text: "ok" } });
389+
});
390+
391+
const { tweetEvents } = await import("../../src/platforms/twitter");
392+
tweetEvents([
393+
mintEvent,
394+
] as unknown as import("../../src/types").OpenSeaAssetEvent[]);
395+
396+
await jest.runAllTimersAsync();
397+
expect(tweetMock).toHaveBeenCalledTimes(2);
398+
} finally {
399+
process.env.TWITTER_EVENTS = originalEventsEnv;
400+
}
401+
});
402+
403+
it("retries transient 5xx errors from Twitter and eventually tweets", async () => {
404+
const originalEventsEnv = process.env.TWITTER_EVENTS;
405+
try {
406+
process.env.TWITTER_EVENTS = "mint";
407+
process.env.TWITTER_QUEUE_DELAY_MS = "0";
408+
process.env.TWITTER_BACKOFF_BASE_MS = "1";
409+
process.env.TWITTER_BACKOFF_MAX_MS = "5";
410+
411+
const baseTimestamp = 3_100_000_000;
412+
const [mintEvent] = createMintBatch(1, TEST_MINTER_1, baseTimestamp);
413+
414+
const m = require("twitter-api-v2") as {
415+
__mockReadWrite: { v2: { tweet: jest.Mock } };
416+
};
417+
const tweetMock = m.__mockReadWrite.v2.tweet as jest.Mock;
418+
419+
let attempts = 0;
420+
tweetMock.mockImplementation(() => {
421+
attempts += 1;
422+
if (attempts < 3) {
423+
const error = { status: 503 };
424+
throw error;
425+
}
426+
return Promise.resolve({ data: { id: "1", text: "ok" } });
427+
});
428+
429+
const { tweetEvents } = await import("../../src/platforms/twitter");
430+
tweetEvents([
431+
mintEvent,
432+
] as unknown as import("../../src/types").OpenSeaAssetEvent[]);
433+
434+
await jest.runAllTimersAsync();
435+
expect(tweetMock).toHaveBeenCalledTimes(3);
436+
} finally {
437+
process.env.TWITTER_EVENTS = originalEventsEnv;
438+
}
439+
});
440+
441+
it("drops fatal 4xx errors from Twitter without infinite retries", async () => {
442+
const originalEventsEnv = process.env.TWITTER_EVENTS;
443+
try {
444+
process.env.TWITTER_EVENTS = "mint";
445+
process.env.TWITTER_QUEUE_DELAY_MS = "0";
446+
process.env.TWITTER_BACKOFF_BASE_MS = "1";
447+
process.env.TWITTER_BACKOFF_MAX_MS = "5";
448+
449+
const baseTimestamp = 3_200_000_000;
450+
const [mintEvent] = createMintBatch(1, TEST_MINTER_1, baseTimestamp);
451+
452+
const m = require("twitter-api-v2") as {
453+
__mockReadWrite: { v2: { tweet: jest.Mock } };
454+
};
455+
const tweetMock = m.__mockReadWrite.v2.tweet as jest.Mock;
456+
457+
tweetMock.mockImplementation(() => {
458+
const error = { status: 400 };
459+
throw error;
460+
});
461+
462+
const { tweetEvents } = await import("../../src/platforms/twitter");
463+
tweetEvents([
464+
mintEvent,
465+
] as unknown as import("../../src/types").OpenSeaAssetEvent[]);
466+
467+
await jest.runAllTimersAsync();
468+
expect(tweetMock).toHaveBeenCalledTimes(1);
469+
} finally {
470+
process.env.TWITTER_EVENTS = originalEventsEnv;
471+
}
472+
});
473+
474+
it("tweets single mint events when below group size threshold", async () => {
475+
const originalEventsEnv = process.env.TWITTER_EVENTS;
476+
try {
477+
process.env.TWITTER_EVENTS = "mint";
478+
process.env.TWITTER_EVENT_GROUP_SETTLE_MS = "60000";
479+
process.env.TWITTER_EVENT_GROUP_MIN_GROUP_SIZE = "3";
480+
process.env.TWITTER_QUEUE_DELAY_MS = "0";
481+
482+
const baseTimestamp = 1_234_567_890;
483+
const mintEvents = createMintBatch(2, TEST_MINTER_1, baseTimestamp);
484+
485+
const { tweetEvents } = await import("../../src/platforms/twitter");
486+
tweetEvents(mintEvents);
487+
488+
const m = require("twitter-api-v2") as {
489+
__mockReadWrite: { v2: { tweet: jest.Mock } };
490+
};
491+
492+
await jest.runAllTimersAsync();
493+
const calls = (m.__mockReadWrite.v2.tweet as jest.Mock).mock.calls;
494+
495+
// Below minGroupSize, mints should be tweeted individually
496+
expect(calls.length).toBe(2);
497+
} finally {
498+
process.env.TWITTER_EVENTS = originalEventsEnv;
499+
}
500+
});
501+
502+
it("tweets multiple actor-based mint groups for the same minter over time", async () => {
503+
const originalEventsEnv = process.env.TWITTER_EVENTS;
504+
try {
505+
process.env.TWITTER_EVENTS = "mint";
506+
process.env.TWITTER_EVENT_GROUP_SETTLE_MS = "0";
507+
process.env.TWITTER_EVENT_GROUP_MIN_GROUP_SIZE = "2";
508+
process.env.TWITTER_QUEUE_DELAY_MS = "0";
509+
510+
const baseTimestamp = 2_000_000_000;
511+
const firstBatch = createMintBatch(2, TEST_MINTER_1, baseTimestamp);
512+
const secondBatch = createMintBatch(
513+
2,
514+
TEST_MINTER_1,
515+
baseTimestamp + 100
516+
);
517+
518+
const { tweetEvents } = await import("../../src/platforms/twitter");
519+
520+
// First group for this minter
521+
tweetEvents(firstBatch);
522+
const m = require("twitter-api-v2") as {
523+
__mockReadWrite: { v2: { tweet: jest.Mock } };
524+
};
525+
await jest.runAllTimersAsync();
526+
let calls = (m.__mockReadWrite.v2.tweet as jest.Mock).mock.calls;
527+
expect(calls.length).toBe(1);
528+
529+
// Second independent group for the same minter should also tweet
530+
tweetEvents(secondBatch);
531+
await jest.runAllTimersAsync();
532+
calls = (m.__mockReadWrite.v2.tweet as jest.Mock).mock.calls;
533+
534+
// Previously this would stay at 1 due to actor-based queue key dedupe.
535+
// With the updated keying (including timestamp window), we expect 2.
536+
expect(calls.length).toBe(2);
537+
} finally {
538+
process.env.TWITTER_EVENTS = originalEventsEnv;
539+
}
540+
});
355541
});
356542

357543
// Add basic tests for matchesSelection mint/burn classification

0 commit comments

Comments
 (0)