Skip to content

Commit 1423135

Browse files
committed
feat(sending): Support 'Idempotency-Key' header for /submit message requests and 'X-EE-Idempotency-Key' SMTP header to avoid sending duplicate emails
1 parent b9d504e commit 1423135

File tree

6 files changed

+238
-11
lines changed

6 files changed

+238
-11
lines changed

lib/db.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ const hSetNewScript = fs.readFileSync(pathlib.join(__dirname, 'lua/h-set-new.lua
128128
const hIncrbyExistsScript = fs.readFileSync(pathlib.join(__dirname, 'lua/h-incrby-exists.lua'), 'utf-8');
129129
const eeListAddScript = fs.readFileSync(pathlib.join(__dirname, 'lua/ee-list-add.lua'), 'utf-8');
130130
const eeListRemoveScript = fs.readFileSync(pathlib.join(__dirname, 'lua/ee-list-remove.lua'), 'utf-8');
131+
const eeGetIdempotencyScript = fs.readFileSync(pathlib.join(__dirname, 'lua/ee-get-idempotency.lua'), 'utf-8');
131132

132133
redis.defineCommand('zExpunge', {
133134
numberOfKeys: 2,
@@ -204,6 +205,11 @@ redis.defineCommand('eeListRemove', {
204205
lua: eeListRemoveScript
205206
});
206207

208+
redis.defineCommand('eeGetIdempotency', {
209+
numberOfKeys: 1,
210+
lua: eeGetIdempotencyScript
211+
});
212+
207213
module.exports.redis = redis;
208214
module.exports.notifyQueue = notifyQueue;
209215
module.exports.submitQueue = submitQueue;

lib/email-client/base-client.js

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const { parentPort } = require('worker_threads');
3+
const { parentPort, threadId: workerThreadId } = require('worker_threads');
44
const crypto = require('crypto');
55
const logger = require('../logger');
66
const { webhooks: Webhooks } = require('../webhooks');
@@ -43,7 +43,8 @@ const {
4343
readEnvValue,
4444
emitChangeEvent,
4545
filterEmptyObjectValues,
46-
resolveCredentials
46+
resolveCredentials,
47+
getDateBuckets
4748
} = require('../tools');
4849

4950
const {
@@ -84,6 +85,8 @@ async function metricsMeta(meta, logger, key, method, ...args) {
8485
}
8586
}
8687

88+
const pendingIdempotencyOperations = new Map();
89+
8790
class BaseClient {
8891
constructor(account, options) {
8992
this.account = account;
@@ -619,7 +622,136 @@ class BaseClient {
619622
return oauthCredentials;
620623
}
621624

625+
async checkIdempotencyKey(objName, idempotencyKey) {
626+
if (!idempotencyKey) {
627+
return null;
628+
}
629+
630+
const idempotencyKeyName = objName ? `${objName}/${idempotencyKey}` : idempotencyKey;
631+
632+
// check last 24-48 hours, so probably will return 2 keys, at rare cases 1
633+
const { bucketKeys } = getDateBuckets(1 * 24 * 3600);
634+
635+
let idempotencyResultStr = await this.redis.eeGetIdempotency(
636+
`${REDIS_PREFIX}idempotency:bucket:`,
637+
idempotencyKeyName,
638+
this.runIndex,
639+
workerThreadId,
640+
bucketKeys.join(',')
641+
);
642+
643+
let idempotencyData;
644+
try {
645+
idempotencyData = JSON.parse(idempotencyResultStr);
646+
idempotencyData.idempotencyKey = idempotencyKey;
647+
idempotencyData.idempotencyKeyName = idempotencyKeyName;
648+
} catch (err) {
649+
this.logger.error({ msg: 'Failed to parse idempotency data', idempotencyKey, cachedValue: idempotencyResultStr, err });
650+
}
651+
652+
if (idempotencyData?.status === 'new') {
653+
if (pendingIdempotencyOperations.has(idempotencyKeyName)) {
654+
let error = new Error('Cancelling pending operation');
655+
for (let promise of pendingIdempotencyOperations.get(idempotencyKeyName)) {
656+
promise.reject(error);
657+
}
658+
}
659+
pendingIdempotencyOperations.set(idempotencyKeyName, []);
660+
}
661+
662+
// use existing response
663+
switch (idempotencyData.status) {
664+
case 'completed':
665+
idempotencyData.returnValue = Object.assign({}, idempotencyData.result, {
666+
idempotency: { key: idempotencyData.idempotencyKey, status: 'HIT' }
667+
});
668+
break;
669+
case 'pending': {
670+
let queueResult = await new Promise((resolve, reject) => {
671+
pendingIdempotencyOperations.get(idempotencyData.idempotencyKeyName).push({ resolve, reject });
672+
});
673+
idempotencyData.returnValue = Object.assign({}, queueResult, { idempotency: { key: idempotencyData.idempotencyKey, status: 'HIT' } });
674+
break;
675+
}
676+
}
677+
678+
return idempotencyData || null;
679+
}
680+
681+
async updateIdempotencyData(idempotencyData, result) {
682+
if (idempotencyData?.bucketKey && idempotencyData?.idempotencyKeyName) {
683+
// update status and result
684+
try {
685+
await this.redis.hset(
686+
idempotencyData?.bucketKey,
687+
idempotencyData.idempotencyKeyName,
688+
JSON.stringify({
689+
status: 'completed',
690+
runIndex: idempotencyData.runIndex,
691+
threadId: idempotencyData.threadId,
692+
result
693+
})
694+
);
695+
} catch (err) {
696+
this.logger.error({
697+
msg: 'Failed to update idempotency data',
698+
idempotencyKey: idempotencyData.idempotencyKey,
699+
err
700+
});
701+
}
702+
703+
for (let promise of pendingIdempotencyOperations.get(idempotencyData.idempotencyKeyName)) {
704+
promise.resolve(result);
705+
}
706+
}
707+
}
708+
709+
async clearIdempotencyData(idempotencyData, error) {
710+
if (idempotencyData?.status === 'new' && idempotencyData?.bucketKey && idempotencyData?.idempotencyKeyName) {
711+
// delete failed attempt information
712+
try {
713+
await this.redis.hdel(idempotencyData?.bucketKey, idempotencyData.idempotencyKeyName);
714+
} catch (err) {
715+
this.logger.error({
716+
msg: 'Failed to clear idempotency data',
717+
idempotencyKey: idempotencyData.idempotencyKey,
718+
err
719+
});
720+
}
721+
722+
for (let promise of pendingIdempotencyOperations.get(idempotencyData.idempotencyKeyName)) {
723+
promise.reject(error);
724+
}
725+
}
726+
}
727+
622728
async queueMessage(data, meta, connectionOptions) {
729+
let idempotencyData;
730+
731+
if (meta.idempotencyKey) {
732+
idempotencyData = await this.checkIdempotencyKey(`mq/${this.account}`, meta.idempotencyKey);
733+
if (idempotencyData?.returnValue) {
734+
return idempotencyData?.returnValue;
735+
}
736+
}
737+
738+
let queueResult;
739+
try {
740+
queueResult = await this.queueMessageHandler(data, meta, connectionOptions);
741+
await this.updateIdempotencyData(idempotencyData, queueResult);
742+
} catch (err) {
743+
await this.clearIdempotencyData(idempotencyData, err);
744+
throw err;
745+
}
746+
747+
if (idempotencyData?.status === 'new') {
748+
return Object.assign({}, queueResult, { idempotency: { key: idempotencyData.idempotencyKey, status: 'MISS' } });
749+
}
750+
751+
return queueResult;
752+
}
753+
754+
async queueMessageHandler(data, meta, connectionOptions) {
623755
let accountData = await this.accountObject.loadAccountData();
624756

625757
let gatewayData;

lib/lua/ee-get-idempotency.lua

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
2+
local bucketKeyPrefix = KEYS[1];
3+
local idempotencyKey = ARGV[1];
4+
5+
-- Use run index and thread ID check to determine if previously pending job has been terminated
6+
-- EmailEngine assigns a specific thread to process all actions for the same user
7+
local runIndex = tonumber(ARGV[2]);
8+
local threadId = tonumber(ARGV[3]);
9+
10+
local buckets = ARGV[4];
11+
12+
local lastBucket = nil;
13+
14+
for bucket in string.gmatch(buckets, "([^,]+)") do
15+
local bucketKey = bucketKeyPrefix .. bucket;
16+
if redis.call("HEXISTS", bucketKey, idempotencyKey) == 1 then
17+
local existingValue = redis.call("HGET", bucketKeyPrefix .. bucket, idempotencyKey);
18+
19+
local parsedValue = cjson.decode(existingValue);
20+
local existingStatus = parsedValue["status"];
21+
local existingRunIndex = parsedValue["runIndex"];
22+
local existingThreadId = parsedValue["threadId"];
23+
24+
if existingStatus == "pending" and (existingRunIndex < runIndex or existingThreadId ~= threadId) then
25+
-- found match but ignore it
26+
redis.log( redis.LOG_NOTICE, "EE: Ignoring pending task with old run index: " .. existingValue .. " Current run index: " .. tostring(runIndex).. " Current thread ID: " .. tostring(threadId));
27+
else
28+
parsedValue['bucketKey'] = bucketKey;
29+
return existingValue;
30+
end
31+
32+
end
33+
34+
lastBucket = bucket;
35+
end
36+
37+
-- No idempotency key found, create a new entry to the newest bucket
38+
local bucketKey = bucketKeyPrefix .. lastBucket;
39+
40+
local newValue = {
41+
['status'] = 'pending',
42+
['runIndex'] = runIndex,
43+
['threadId'] = threadId
44+
};
45+
46+
redis.call("HSET", bucketKey, idempotencyKey, cjson.encode(newValue));
47+
redis.call("EXPIRE", bucketKey, 24 * 3600);
48+
49+
-- return with a "new" status
50+
newValue['status'] = 'new';
51+
newValue['bucketKey'] = bucketKey;
52+
return cjson.encode(newValue)

lib/tools.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -531,13 +531,11 @@ module.exports = {
531531
return entry;
532532
},
533533

534-
async getCounterValues(redis, seconds) {
535-
seconds = Number(seconds) || 3600;
536-
534+
getDateBuckets(seconds) {
537535
let now = new Date();
538536
let startTime = new Date(now.getTime() - seconds * 1000);
539537

540-
let hashKeys = [];
538+
let bucketKeys = [];
541539

542540
// find out all the date buckets we need to check for
543541
let endDateStr = `${now
@@ -550,18 +548,26 @@ module.exports = {
550548
let startTimeStr = `${startTime
551549
.toISOString()
552550
// bucket includes 1 minute
553-
.substr(0, 16)
551+
.substring(0, 16)
554552
.replace(/[^0-9]+/g, '')}`;
555553

556554
while (dateStr < endDateStr) {
557555
dateStr = `${hashTime
558556
.toISOString()
559557
.substr(0, 10)
560558
.replace(/[^0-9]+/g, '')}`;
561-
hashKeys.push(dateStr);
559+
bucketKeys.push(dateStr);
562560
hashTime = new Date(hashTime.getTime() + 24 * 3600 * 1000);
563561
}
564562

563+
return { bucketKeys, startTimeStr };
564+
},
565+
566+
async getCounterValues(redis, seconds) {
567+
seconds = Number(seconds) || 3600;
568+
569+
const { bucketKeys: hashKeys, startTimeStr } = module.exports.getDateBuckets(seconds);
570+
565571
// list potential counter keys
566572
let statUpdateKeys = await redis.smembers(`${REDIS_PREFIX}stats:keys`);
567573

workers/api.js

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5272,7 +5272,10 @@ const init = async () => {
52725272
});
52735273

52745274
try {
5275-
return await accountObject.queueMessage(request.payload, { source: 'api' });
5275+
return await accountObject.queueMessage(request.payload, {
5276+
source: 'api',
5277+
idempotencyKey: request.headers['idempotency-key']
5278+
});
52765279
} catch (err) {
52775280
request.logger.error({ msg: 'API request failed', err });
52785281
if (Boom.isBoom(err)) {
@@ -5318,6 +5321,24 @@ const init = async () => {
53185321
account: accountIdSchema.required()
53195322
}),
53205323

5324+
headers: Joi.object({
5325+
'x-ee-timeout': Joi.number()
5326+
.integer()
5327+
.min(0)
5328+
.max(2 * 3600 * 1000)
5329+
.optional()
5330+
.description(`Override the \`EENGINE_TIMEOUT\` environment variable for a single API request (in milliseconds)`)
5331+
.label('X-EE-Timeout'),
5332+
'idempotency-key': Joi.string()
5333+
.min(0)
5334+
.max(1024)
5335+
.optional()
5336+
.description(
5337+
'A unique identifier provided by the client to ensure that repeated requests with the same key are processed only once. The value should be unique per operation and can be up to 1024 characters in length.'
5338+
)
5339+
.label('Idempotency-Key')
5340+
}).unknown(),
5341+
53215342
payload: Joi.object({
53225343
reference: messageReferenceSchema,
53235344

workers/smtp.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,12 @@ function processMessage(stream, session, meta) {
222222
if (requestedAccount) {
223223
meta.requestedAccount = requestedAccount;
224224
}
225+
226+
let idempotencyKey = headers.getFirst('x-ee-idempotency-key');
227+
headers.remove('x-ee-idempotency-key');
228+
if (idempotencyKey) {
229+
meta.idempotencyKey = idempotencyKey;
230+
}
225231
});
226232

227233
stream.once('error', err => joiner.emit('error', err));
@@ -372,7 +378,10 @@ async function init() {
372378
};
373379

374380
accountObject
375-
.queueMessage(payload, { source: 'smtp' })
381+
.queueMessage(payload, {
382+
source: 'smtp',
383+
idempotencyKey: messageMeta.idempotencyKey
384+
})
376385
.then(res => {
377386
// queued for later
378387
metrics(logger, 'events', 'inc', {
@@ -384,7 +393,8 @@ async function init() {
384393
account: session.user,
385394
messageId: res.messageId,
386395
sendAt: res.sendAt,
387-
queueId: res.queueId
396+
queueId: res.queueId,
397+
idempotency: res.idempotency
388398
});
389399

390400
return callback(null, `Message queued for delivery as ${res.queueId} (${new Date(res.sendAt).toISOString()})`);

0 commit comments

Comments
 (0)