This repository was archived by the owner on Jun 6, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 550
Expand file tree
/
Copy pathindex.js
More file actions
77 lines (71 loc) · 2.62 KB
/
index.js
File metadata and controls
77 lines (71 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
require('module-alias/register');
require('dotenv').config();
const fetch = require('node-fetch');
const AsyncLock = require('async-lock');
const { default: PQueue } = require('p-queue');
require('@dbc/common/init');
const logger = require('@dbc/common/logger');
const { getFrameworkInformer } = require('@dbc/common/k8s');
const { alwaysRetryDecorator } = require('@dbc/common/util');
const config = require('@dbc/watcher/framework/config');
// Here, we use AsyncLock to control the concurrency of frameworks with the same name;
// e.g. If framework A has event1, event2, and event3, we use AsyncLock
// to ensure they will be delivered to write-merger in order.
// In the same time, we use PQueue to control the concurrency of frameworks with different names;
// e.g. If there are framework 1 ~ framework 30000, only some of them can be processed concurrently.
const lock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER });
const queue = new PQueue({ concurrency: config.maxRpcConcurrency });
async function synchronizeFramework(eventType, apiObject) {
const res = await fetch(
`${config.writeMergerUrl}/api/v1/watchEvents/${eventType}`,
{
method: 'POST',
body: JSON.stringify(apiObject),
headers: { 'Content-Type': 'application/json' },
timeout: config.writeMergerConnectionTimeoutSecond * 1000,
},
);
if (!res.ok) {
throw new Error(`Request returns a ${res.status} error.`);
}
}
const eventHandler = (eventType, apiObject) => {
/*
event uid-based lock + always retry
*/
const receivedTs = new Date().getTime();
const state =
apiObject.status && apiObject.status.state
? apiObject.status.state
: 'Unknown';
logger.info(
`Event type=${eventType} receivedTs=${receivedTs} framework=${apiObject.metadata.name} state=${state} received.`,
);
lock.acquire(apiObject.metadata.name, () => {
return queue.add(
alwaysRetryDecorator(
() => synchronizeFramework(eventType, apiObject),
`Sync to write merger type=${eventType} receivedTs=${receivedTs} framework=${apiObject.metadata.name} state=${state}`,
),
);
});
};
const informer = getFrameworkInformer();
informer.on('add', apiObject => {
eventHandler('ADDED', apiObject);
});
informer.on('update', apiObject => {
eventHandler('MODIFED', apiObject);
});
informer.on('delete', apiObject => {
eventHandler('DELETED', apiObject);
});
informer.on('error', err => {
// If any error happens, the process should exit, and let Kubernetes restart it.
logger.error(err, function() {
process.exit(1);
});
});
informer.start();