Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 80 additions & 14 deletions review/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,40 +247,106 @@ async function reviewAndFix(ctx: WorkforceCtx, pr: Pr): Promise<void> {
// message so the channel stays a single conversation per PR.
const READY_ANNOUNCED_TAG = 'pr-reviewer:ready-announced';

async function announceReadyOnce(ctx: WorkforceCtx, pr: Pr): Promise<void> {
export async function announceReadyOnce(ctx: WorkforceCtx, pr: Pr, client?: SlackThreadClient): Promise<void> {
const channel = input(ctx, 'SLACK_CHANNEL');
if (!channel) return;
if (pr.headSha && (await alreadyAnnouncedReady(ctx, pr))) return;
const reservation = pr.headSha ? await reserveReadyAnnouncement(ctx, pr) : undefined;
if (pr.headSha && !reservation) return;
const who = `<https://github.com/${pr.author}|@${pr.author}>`; // the PR opener
await postSlackPrUpdate(
ctx,
pr,
`:white_check_mark: ${who} — PR #${pr.number} in *${pr.owner}/${pr.repo}* is ready for your review: ${pr.url}`
);
try {
await postSlackPrUpdate(
ctx,
pr,
`:white_check_mark: ${who} — PR #${pr.number} in *${pr.owner}/${pr.repo}* is ready for your review: ${pr.url}`,
client
);
} catch (error) {
if (pr.headSha && reservation && 'id' in reservation && typeof reservation.id === 'string') {
await forgetReadyAnnouncementReservation(ctx, pr, reservation.id, 'failed');
}
throw error;
}
if (pr.headSha) await rememberReadyAnnounced(ctx, pr);
}

function readyAnnouncedTags(pr: Pr): string[] {
return [READY_ANNOUNCED_TAG, `pr:${pr.owner}/${pr.repo}#${pr.number}`];
return [
READY_ANNOUNCED_TAG,
`pr:${pr.owner}/${pr.repo}#${pr.number}`,
...(pr.headSha ? [`head:${pr.headSha}`] : []),
];
}

async function alreadyAnnouncedReady(ctx: WorkforceCtx, pr: Pr): Promise<boolean> {
return (await readyAnnouncementItems(ctx, pr, 'announced')).length > 0;
}

async function reserveReadyAnnouncement(ctx: WorkforceCtx, pr: Pr): Promise<{ id: string } | {} | undefined> {
if (await alreadyAnnouncedReady(ctx, pr)) return undefined;
const saved = await rememberReadyAnnouncementReservation(ctx, pr);
if (!saved?.id) return {};

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Ready-announcement dedupe is bypassed when reservation save returns no id, allowing duplicate Slack pings for the same head SHA.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At review/agent.ts, line 287:

<comment>Ready-announcement dedupe is bypassed when reservation save returns no id, allowing duplicate Slack pings for the same head SHA.</comment>

<file context>
@@ -247,40 +247,106 @@ async function reviewAndFix(ctx: WorkforceCtx, pr: Pr): Promise<void> {
+async function reserveReadyAnnouncement(ctx: WorkforceCtx, pr: Pr): Promise<{ id: string } | {} | undefined> {
+  if (await alreadyAnnouncedReady(ctx, pr)) return undefined;
+  const saved = await rememberReadyAnnouncementReservation(ctx, pr);
+  if (!saved?.id) return {};
+  const [winner] = await readyAnnouncementItems(ctx, pr, 'reservation');
+  if (!winner || winner.id === saved.id) return saved;
</file context>
Suggested change
if (!saved?.id) return {};
if (!saved?.id) return undefined;

const [winner] = await readyAnnouncementItems(ctx, pr, 'reservation');
if (!winner || winner.id === saved.id) return saved;
await forgetReadyAnnouncementReservation(ctx, pr, saved.id, 'cancelled');
return undefined;
}

async function readyAnnouncementItems(ctx: WorkforceCtx, pr: Pr, kind: 'announced' | 'reservation') {
const items = await ctx.memory.recall(`pr-reviewer ready announced for ${pr.owner}/${pr.repo}#${pr.number}`, {
scope: 'workspace',
tags: readyAnnouncedTags(pr),
limit: 5,
limit: 100,
});
return items.some((item) => {
const parsed = items.flatMap((item) => {
try {
return (JSON.parse(item.content) as { headSha?: string }).headSha === pr.headSha;
const content = JSON.parse(item.content) as { headSha?: string; kind?: string; reservationId?: string };
return content.headSha === pr.headSha ? [{ item, content }] : [];
} catch {
return false;
return [];
}
});
const inactiveReservationIds = new Set(
parsed
.filter(({ content }) => content.kind === 'failed' || content.kind === 'cancelled')
.map(({ content }) => content.reservationId)
.filter((id): id is string => typeof id === 'string' && id.length > 0)
);
return parsed
.filter(({ item, content }) => {
try {
return content.kind === kind && !inactiveReservationIds.has(item.id);
} catch {
return false;
}
})
.map(({ item }) => item)
.sort((a, b) => a.createdAt.localeCompare(b.createdAt) || a.id.localeCompare(b.id));
}

async function rememberReadyAnnounced(ctx: WorkforceCtx, pr: Pr): Promise<{ id: string } | void> {
return await saveReadyAnnouncementMarker(ctx, pr, 'announced');
}

async function rememberReadyAnnouncementReservation(ctx: WorkforceCtx, pr: Pr): Promise<{ id: string } | void> {
return await saveReadyAnnouncementMarker(ctx, pr, 'reservation');
}

async function rememberReadyAnnounced(ctx: WorkforceCtx, pr: Pr): Promise<void> {
await ctx.memory.save(JSON.stringify({ headSha: pr.headSha }), {
async function forgetReadyAnnouncementReservation(
ctx: WorkforceCtx,
pr: Pr,
reservationId: string,
kind: 'failed' | 'cancelled'
): Promise<{ id: string } | void> {
return await saveReadyAnnouncementMarker(ctx, pr, kind, reservationId);
}

async function saveReadyAnnouncementMarker(
ctx: WorkforceCtx,
pr: Pr,
kind: 'announced' | 'reservation' | 'failed' | 'cancelled',
reservationId?: string
): Promise<{ id: string } | void> {
return await ctx.memory.save(JSON.stringify({ headSha: pr.headSha, kind, ...(reservationId ? { reservationId } : {}) }), {
scope: 'workspace',
tags: readyAnnouncedTags(pr),
});
Expand Down
126 changes: 126 additions & 0 deletions tests/review-agent.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import test from 'node:test';
import { parseIntegrations } from '@agentworkforce/persona-kit';

import {
announceReadyOnce,
labelNames,
postSlackPrUpdate,
prReadyStateAllowsHumanReview,
Expand Down Expand Up @@ -362,3 +363,128 @@ test('postSlackPrUpdate starts one channel message per PR and threads later upda
{ kind: 'reply', channel: 'C123', threadTs: '1710000000.123456', text: 'merged' },
]);
});

test('announceReadyOnce posts once for the same head sha', async () => {
const memory = [];
const ctx = readyAnnouncementTestCtx(memory);
const calls = [];
const slack = readyAnnouncementSlack(calls);
const pr = readyAnnouncementPr();

await announceReadyOnce(ctx, pr, slack);
await announceReadyOnce(ctx, pr, slack);

assert.equal(readyAnnouncementMarkers(memory, 'reservation').length, 1);
assert.equal(readyAnnouncementMarkers(memory, 'announced').length, 1);
assert.equal(calls.length, 1);
assert.match(calls[0].text, /ready for your review/);
});

test('announceReadyOnce chooses one marker when same-head runs overlap', async () => {
const memory = [];
let saves = 0;
let releaseSaves;
const bothSaved = new Promise((resolve) => {
releaseSaves = resolve;
});
const ctx = readyAnnouncementTestCtx(memory, {
async afterSave() {
saves += 1;
if (saves === 2) releaseSaves();
await bothSaved;
},
});
const calls = [];
const slack = readyAnnouncementSlack(calls);
const pr = readyAnnouncementPr();

await Promise.all([
announceReadyOnce(ctx, pr, slack),
announceReadyOnce(ctx, pr, slack),
]);

assert.equal(readyAnnouncementMarkers(memory, 'reservation').length, 2);
assert.equal(readyAnnouncementMarkers(memory, 'announced').length, 1);
assert.equal(calls.length, 1);
});

test('announceReadyOnce retries when the winning Slack post fails before announcement is saved', async () => {
const memory = [];
const ctx = readyAnnouncementTestCtx(memory);
const calls = [];
const pr = readyAnnouncementPr();
const failingSlack = {
async post() {
throw new Error('slack unavailable');
},
async reply() {
throw new Error('should not reply');
},
};

await assert.rejects(() => announceReadyOnce(ctx, pr, failingSlack), /slack unavailable/);
assert.equal(readyAnnouncementMarkers(memory, 'announced').length, 0);

await announceReadyOnce(ctx, pr, readyAnnouncementSlack(calls));

assert.equal(calls.length, 1);
assert.equal(readyAnnouncementMarkers(memory, 'announced').length, 1);
});

function readyAnnouncementMarkers(memory, kind) {
return memory.filter((item) => {
if (!item.tags.includes('pr-reviewer:ready-announced')) return false;
return JSON.parse(item.content).kind === kind;
});
}

function readyAnnouncementTestCtx(memory, hooks = {}) {
return {
persona: {
inputSpecs: { SLACK_CHANNEL: { env: '__TEST_SLACK_CHANNEL__' } },
inputs: { SLACK_CHANNEL: 'C123' },
},
memory: {
async recall(_query, opts) {
return memory.filter((item) => opts.tags.every((tag) => item.tags.includes(tag)));
},
async save(content, opts) {
const id = `memory-${memory.length + 1}`;
memory.push({
id,
content,
tags: opts.tags,
scope: opts.scope,
createdAt: new Date(memory.length).toISOString(),
});
await hooks.afterSave?.();
return { id };
},
},
log() {},
};
}

function readyAnnouncementSlack(calls) {
return {
async post(channel, text) {
calls.push({ kind: 'post', channel, text });
return { channel, ts: '1710000000.123456' };
},
async reply(channel, threadTs, text) {
calls.push({ kind: 'reply', channel, threadTs, text });
return { channel, ts: '1710000001.123456' };
},
};
}

function readyAnnouncementPr() {
return {
owner: 'AgentWorkforce',
repo: 'agents',
number: 50,
url: 'https://github.com/AgentWorkforce/agents/pull/50',
author: 'khaliqgant',
headSha: '9b1ecb4022bf574885b50376db65a827ddedce3b',
};
}