Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,32 @@ const ConnectorsStatusContent: FunctionComponent<ConnectorsStatusContentProps> =

const queues = connectorsStateData.rabbitMQMetrics?.queues ?? [];

const toSafeNumber = (value: unknown): number => {
const nValue = Number(value);
return Number.isFinite(nValue) ? nValue : 0;
};

// Build a map of connectorId -> total messages in one pass over all queues
const queueMessagesByConnector = useMemo(() => {
const map = new Map<string, number>();
for (const queue of queues) {
if (!queue?.name) continue;
const messages = toSafeNumber(queue.messages);
// Match queue names like "<prefix>push_<connectorId>" or "<prefix>listen_<connectorId>"
let idx = queue.name.indexOf('push_');
if (idx === -1) idx = queue.name.indexOf('listen_');
if (idx === -1) continue;
const connectorId = queue.name.substring(queue.name.indexOf('_', idx) + 1);
if (!connectorId) continue;
map.set(connectorId, (map.get(connectorId) ?? 0) + messages);
}
return map;
}, [queues]);

const connectorsWithMessages = filteredConnectors?.map((connector) => {
const queueName = connector.connector_type === 'INTERNAL_ENRICHMENT'
? `listen_${connector.id}`
: `push_${connector.id}`;
const queue = queues.find((o) => o?.name?.includes(queueName));
const messagesCount = queue ? queue.messages : 0;
const messagesCount = queueMessagesByConnector.get(connector.id) ?? 0;
const connectorTriggerStatus = getConnectorTriggerStatus(connector as unknown as Connector);
return {
...connector,
messages: messagesCount,
connectorTriggerStatus,
};
return { ...connector, messages: messagesCount, connectorTriggerStatus };
}) || [];

const sortedConnectors = connectorsWithMessages.sort((a, b) => {
Expand Down
24 changes: 18 additions & 6 deletions opencti-platform/opencti-graphql/src/database/rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,25 @@ export const purgeConnectorQueues = async (connector) => {
export const getConnectorQueueDetails = async (connectorId) => {
try {
const httpClient = await amqpHttpClient();
const pathRabbit = `/api/queues${isEmptyField(VHOST_PATH) ? '/%2F' : VHOST_PATH}/${RABBITMQ_PUSH_QUEUE_PREFIX}${connectorId}`;

const queueDetailResponse = await httpClient.get(pathRabbit).then((response) => response.data);
logApp.debug('Rabbit HTTP API response', { queueDetailResponse });
const vhostPath = isEmptyField(VHOST_PATH) ? '/%2F' : VHOST_PATH;
const pathPushQueue = `/api/queues${vhostPath}/${RABBITMQ_PUSH_QUEUE_PREFIX}${connectorId}`;
const pathListenQueue = `/api/queues${vhostPath}/${RABBITMQ_LISTEN_QUEUE_PREFIX}${connectorId}`;

// Fetch both push and listen queue details in parallel
const [pushResult, listenResult] = await Promise.all([
httpClient.get(pathPushQueue).then((response) => response.data).catch(() => null),
httpClient.get(pathListenQueue).then((response) => response.data).catch(() => null),
]);

const pushMessages = pushResult?.messages || 0;
const pushSize = pushResult?.message_bytes || 0;
const listenMessages = listenResult?.messages || 0;
const listenSize = listenResult?.message_bytes || 0;

logApp.debug('Rabbit HTTP API response', { pushResult, listenResult });
return {
messages_number: queueDetailResponse.messages || 0,
messages_size: queueDetailResponse.message_bytes || 0,
messages_number: pushMessages + listenMessages,
messages_size: pushSize + listenSize,
};
} catch (e) {
// For managed connector, the queue is available only after the connector is started.
Expand Down