Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ model IssueSyncRun {
syncedCount Int @default(0)
errorMessage String?
notes String? @db.Text
syncType String @default("scheduled") // "scheduled" | "manual" | "pre-claim"
syncType String @default("scheduled") // "scheduled" | "manual" | "automation"
startedAt DateTime @default(now())
completedAt DateTime?

Expand Down
53 changes: 53 additions & 0 deletions src/app/api/automation/sync/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,59 @@ vi.mock("@/lib/config", () => ({
parseExcludedLabels: vi.fn().mockReturnValue([]),
}));

const { mocks } = vi.hoisted(() => ({
mocks: {
syncLockFindUnique: vi.fn().mockResolvedValue(null),
syncLockDelete: vi.fn().mockResolvedValue(undefined),
syncLockDeleteMany: vi.fn().mockResolvedValue({ count: 0 }),
transactionFn: vi.fn(async (fn: any) => {
const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
return runId;
}),
upsert: vi.fn().mockResolvedValue({ id: "ar-1", fullName: "org/repo" }),
findMany: vi.fn().mockResolvedValue([]),
create: vi.fn().mockResolvedValue(undefined),
},
}));

vi.mock("@/lib/prisma", () => ({
prisma: {
syncLock: {
findUnique: mocks.syncLockFindUnique,
delete: mocks.syncLockDelete,
deleteMany: mocks.syncLockDeleteMany,
},
$transaction: mocks.transactionFn,
automationRepo: {
upsert: mocks.upsert,
},
githubWorkflow: {
upsert: mocks.upsert,
findMany: mocks.findMany,
create: mocks.create,
},
githubWorkflowRun: {
upsert: mocks.upsert,
findUnique: vi.fn().mockResolvedValue(null),
},
githubWorkflowJob: {
upsert: mocks.upsert,
},
githubRelease: {
upsert: mocks.upsert,
},
githubPullRequest: {
upsert: mocks.upsert,
},
githubPackage: {
upsert: mocks.upsert,
},
automationEvent: {
create: mocks.create,
},
},
}));

import { POST } from "./route";

describe("POST /api/automation/sync — auth", () => {
Expand Down
50 changes: 33 additions & 17 deletions src/app/api/automation/sync/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from "@/lib/github";
import { getTrackedRepos } from "@/lib/config";
import { authorizeRequest } from "@/lib/auth";
import { acquireLock, releaseLock } from "@/lib/sync-lock";

async function syncRepo(repoFullName: string) {
const parts = repoFullName.split("/");
Expand Down Expand Up @@ -334,24 +335,39 @@ export async function POST(request: Request) {
const body = await request.json().catch(() => ({}));
const repoFullName = body.repo || body.fullName;

if (repoFullName) {
const result = await syncRepo(repoFullName);
if (result.success) {
return NextResponse.json({ success: true, syncRunId: result.syncRunId });
} else {
return NextResponse.json({ error: result.error }, { status: 500 });
}
// Acquire shared DB lock to prevent overlapping runs across all sync types
const lockResult = await acquireLock("automation");
if (!lockResult.locked) {
return NextResponse.json(
{ error: "A sync is already running. Try again later.", locked: true },
{ status: 409 },
);
}

const repos = await getTrackedRepos();
const results = [];
for (const repo of repos) {
results.push({ repo, result: await syncRepo(repo) });
}
const { runId } = lockResult;

return NextResponse.json({
synced: results.filter((r) => r.result.success).length,
failed: results.filter((r) => !r.result.success).length,
results,
});
try {
if (repoFullName) {
const result = await syncRepo(repoFullName);
if (result.success) {
return NextResponse.json({ success: true, syncRunId: result.syncRunId });
} else {
return NextResponse.json({ error: result.error }, { status: 500 });
}
}

const repos = await getTrackedRepos();
const results = [];
for (const repo of repos) {
results.push({ repo, result: await syncRepo(repo) });
}

return NextResponse.json({
synced: results.filter((r) => r.result.success).length,
failed: results.filter((r) => !r.result.success).length,
results,
});
} finally {
await releaseLock(runId).catch(() => {});
}
}
17 changes: 17 additions & 0 deletions src/app/api/sync/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ const { mocks } = vi.hoisted(() => ({
update: vi.fn().mockResolvedValue(undefined),
create: vi.fn().mockResolvedValue({ id: "issue-1" }),
auth: vi.fn(),
syncLockFindUnique: vi.fn().mockResolvedValue(null),
syncLockDelete: vi.fn().mockResolvedValue(undefined),
syncLockDeleteMany: vi.fn().mockResolvedValue({ count: 0 }),
issueSyncRunUpdateMany: vi.fn().mockResolvedValue({ count: 1 }),
transactionFn: vi.fn(async (fn: any) => {
const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
return runId;
}),
},
}));

Expand All @@ -31,6 +39,15 @@ vi.mock("@/lib/prisma", () => ({
update: mocks.update,
create: mocks.create,
},
syncLock: {
findUnique: mocks.syncLockFindUnique,
delete: mocks.syncLockDelete,
deleteMany: mocks.syncLockDeleteMany,
},
issueSyncRun: {
updateMany: mocks.issueSyncRunUpdateMany,
},
$transaction: mocks.transactionFn,
},
}));

Expand Down
79 changes: 53 additions & 26 deletions src/app/api/sync/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { fetchIssues } from "@/lib/github";
import { getSyncRepos, parseExcludedLabels } from "@/lib/config";
import { syncIssuesForRepos, mergeLabels } from "@/lib/issue-sync";
import { authorizeRequest } from "@/lib/auth";
import { acquireLock, releaseLock } from "@/lib/sync-lock";

export async function POST(request: NextRequest) {
if (!(await authorizeRequest(request)).authorized) {
Expand Down Expand Up @@ -34,35 +35,61 @@ export async function POST(request: NextRequest) {
}
}

const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS);
const result = await syncIssuesForRepos(repos, fetchIssues, {
findIssue(repositoryId, number) {
return prisma.issue.findUnique({
where: { repositoryId_number: { repositoryId, number } },
});
},
async updateIssue(id, data) {
// Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet.
// This prevents a race condition where the claim endpoint adds an agent label to both
// Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data.
const existing = await prisma.issue.findUnique({
where: { id },
select: { labels: true },
});
// Acquire shared DB lock to prevent overlapping runs across all sync types
const lockResult = await acquireLock("manual");
if (!lockResult.locked) {
return NextResponse.json(
{ error: "A sync is already running. Try again later.", locked: true },
{ status: 409 },
);
}

const { runId } = lockResult;

try {
const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS);
const result = await syncIssuesForRepos(repos, fetchIssues, {
findIssue(repositoryId, number) {
return prisma.issue.findUnique({
where: { repositoryId_number: { repositoryId, number } },
});
},
async updateIssue(id, data) {
// Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet.
// This prevents a race condition where the claim endpoint adds an agent label to both
// Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data.
const existing = await prisma.issue.findUnique({
where: { id },
select: { labels: true },
});

if (existing && existing.labels.length > 0) {
// Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub
data.labels = mergeLabels(data.labels, existing.labels);
}
if (existing && existing.labels.length > 0) {
// Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub
data.labels = mergeLabels(data.labels, existing.labels);
}

await prisma.issue.update({ where: { id }, data });
},
async createIssue(repositoryId, data) {
await prisma.issue.create({ data: { ...data, repositoryId } });
},
}, excludedLabels);
await prisma.issue.update({ where: { id }, data });
},
async createIssue(repositoryId, data) {
await prisma.issue.create({ data: { ...data, repositoryId } });
},
}, excludedLabels);

return NextResponse.json(result);
// Update the sync run record
await prisma.issueSyncRun.updateMany({
where: { id: runId, status: "running" },
data: {
status: "completed",
completedAt: new Date(),
reposFetched: result.repos ?? 0,
syncedCount: result.syncedCount ?? 0,
},
});

return NextResponse.json(result);
} finally {
await releaseLock(runId).catch(() => {});
}
} catch (error) {
console.error("Sync failed:", error);
return NextResponse.json({ error: "Sync failed" }, { status: 500 });
Expand Down
60 changes: 4 additions & 56 deletions src/app/api/sync/scheduled/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,7 @@ import { fetchIssues, fetchIssue, fetchRepo, fetchWorkflows, fetchRecentRunsAllW
import { getSyncRepos, getTrackedRepos, parseExcludedLabels } from "@/lib/config";
import { syncIssuesForRepos, reconcileClosedIssues, SyncResponse, ClosedIssueReconcileResponse } from "@/lib/issue-sync";
import { authorizeRequest } from "@/lib/auth";

// ---------------------------------------------------------------------------
// Lock acquisition — DB-backed single-row guard to prevent overlapping runs.
// Uses upsert on a single "global" row; the first writer wins.
// ---------------------------------------------------------------------------

async function acquireLock(): Promise<{ locked: true; runId: string } | { locked: false }> {
const existing = await prisma.syncLock.findUnique({ where: { id: "global" } });

if (existing && existing.syncRunId) {
const maxAgeMs = 30 * 60 * 1000;
const age = Date.now() - existing.acquiredAt.getTime();
if (age < maxAgeMs) {
return { locked: false };
}
// Stale lock — clear it and proceed
await prisma.syncLock.delete({ where: { id: "global" } });
}

try {
const runId = await prisma.$transaction(async (tx) => {
// Double-check inside the transaction for race safety
const stillExisting = await tx.syncLock.findUnique({ where: { id: "global" } });
if (stillExisting && stillExisting.syncRunId) {
throw new Error("already_locked");
}

const run = await tx.issueSyncRun.create({
data: { status: "running", syncType: "scheduled", startedAt: new Date() },
});

await tx.syncLock.create({
data: { id: "global", syncRunId: run.id, acquiredAt: new Date() },
});

return run.id;
});

return { locked: true, runId };
} catch (err) {
if (err instanceof Error && err.message === "already_locked") {
return { locked: false };
}
// Re-throw unexpected errors
throw err;
}
}

async function releaseLock(runId: string): Promise<void> {
await prisma.syncLock.deleteMany({
where: { id: "global", syncRunId: runId },
});
}
import { acquireLock, releaseLock } from "@/lib/sync-lock";

// ---------------------------------------------------------------------------
// Route handlers
Expand All @@ -83,11 +31,11 @@ export async function POST(request: Request) {
const syncIssues = options.issues !== false; // default true
const syncAutomation = options.automation === true; // default false

// Acquire DB lock to prevent overlapping runs
const lockResult = await acquireLock();
// Acquire shared DB lock to prevent overlapping runs across all sync types
const lockResult = await acquireLock("scheduled");
if (!lockResult.locked) {
return NextResponse.json(
{ error: "A scheduled sync is already running. Try again later.", locked: true },
{ error: "A sync is already running. Try again later.", locked: true },
{ status: 409 },
);
}
Expand Down
Loading