diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9bd3e4b..2c8b145 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -392,7 +392,7 @@ model IssueSyncRun { syncedCount Int @default(0) errorMessage String? notes String? @db.Text - syncType String @default("scheduled") // "scheduled" | "manual" | "pre-claim" + syncType String @default("scheduled") // "scheduled" | "manual" | "automation" startedAt DateTime @default(now()) completedAt DateTime? diff --git a/src/app/api/automation/sync/route.test.ts b/src/app/api/automation/sync/route.test.ts index 37b6e0c..f64125e 100644 --- a/src/app/api/automation/sync/route.test.ts +++ b/src/app/api/automation/sync/route.test.ts @@ -15,6 +15,59 @@ vi.mock("@/lib/config", () => ({ parseExcludedLabels: vi.fn().mockReturnValue([]), })); +const { mocks } = vi.hoisted(() => ({ + mocks: { + syncLockFindUnique: vi.fn().mockResolvedValue(null), + syncLockDelete: vi.fn().mockResolvedValue(undefined), + syncLockDeleteMany: vi.fn().mockResolvedValue({ count: 0 }), + transactionFn: vi.fn(async (fn: any) => { + const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + return runId; + }), + upsert: vi.fn().mockResolvedValue({ id: "ar-1", fullName: "org/repo" }), + findMany: vi.fn().mockResolvedValue([]), + create: vi.fn().mockResolvedValue(undefined), + }, +})); + +vi.mock("@/lib/prisma", () => ({ + prisma: { + syncLock: { + findUnique: mocks.syncLockFindUnique, + delete: mocks.syncLockDelete, + deleteMany: mocks.syncLockDeleteMany, + }, + $transaction: mocks.transactionFn, + automationRepo: { + upsert: mocks.upsert, + }, + githubWorkflow: { + upsert: mocks.upsert, + findMany: mocks.findMany, + create: mocks.create, + }, + githubWorkflowRun: { + upsert: mocks.upsert, + findUnique: vi.fn().mockResolvedValue(null), + }, + githubWorkflowJob: { + upsert: mocks.upsert, + }, + githubRelease: { + upsert: mocks.upsert, + }, + githubPullRequest: { + upsert: mocks.upsert, + }, + githubPackage: { + upsert: mocks.upsert, + }, + automationEvent: { + create: mocks.create, + }, + }, +})); + import { POST } from "./route"; describe("POST /api/automation/sync — auth", () => { diff --git a/src/app/api/automation/sync/route.ts b/src/app/api/automation/sync/route.ts index f8ba69d..ea60055 100644 --- a/src/app/api/automation/sync/route.ts +++ b/src/app/api/automation/sync/route.ts @@ -12,6 +12,7 @@ import { } from "@/lib/github"; import { getTrackedRepos } from "@/lib/config"; import { authorizeRequest } from "@/lib/auth"; +import { acquireLock, releaseLock } from "@/lib/sync-lock"; async function syncRepo(repoFullName: string) { const parts = repoFullName.split("/"); @@ -334,24 +335,39 @@ export async function POST(request: Request) { const body = await request.json().catch(() => ({})); const repoFullName = body.repo || body.fullName; - if (repoFullName) { - const result = await syncRepo(repoFullName); - if (result.success) { - return NextResponse.json({ success: true, syncRunId: result.syncRunId }); - } else { - return NextResponse.json({ error: result.error }, { status: 500 }); - } + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("automation"); + if (!lockResult.locked) { + return NextResponse.json( + { error: "A sync is already running. Try again later.", locked: true }, + { status: 409 }, + ); } - const repos = await getTrackedRepos(); - const results = []; - for (const repo of repos) { - results.push({ repo, result: await syncRepo(repo) }); - } + const { runId } = lockResult; - return NextResponse.json({ - synced: results.filter((r) => r.result.success).length, - failed: results.filter((r) => !r.result.success).length, - results, - }); + try { + if (repoFullName) { + const result = await syncRepo(repoFullName); + if (result.success) { + return NextResponse.json({ success: true, syncRunId: result.syncRunId }); + } else { + return NextResponse.json({ error: result.error }, { status: 500 }); + } + } + + const repos = await getTrackedRepos(); + const results = []; + for (const repo of repos) { + results.push({ repo, result: await syncRepo(repo) }); + } + + return NextResponse.json({ + synced: results.filter((r) => r.result.success).length, + failed: results.filter((r) => !r.result.success).length, + results, + }); + } finally { + await releaseLock(runId).catch(() => {}); + } } diff --git a/src/app/api/sync/route.test.ts b/src/app/api/sync/route.test.ts index a943a25..35e9c29 100644 --- a/src/app/api/sync/route.test.ts +++ b/src/app/api/sync/route.test.ts @@ -17,6 +17,14 @@ const { mocks } = vi.hoisted(() => ({ update: vi.fn().mockResolvedValue(undefined), create: vi.fn().mockResolvedValue({ id: "issue-1" }), auth: vi.fn(), + syncLockFindUnique: vi.fn().mockResolvedValue(null), + syncLockDelete: vi.fn().mockResolvedValue(undefined), + syncLockDeleteMany: vi.fn().mockResolvedValue({ count: 0 }), + issueSyncRunUpdateMany: vi.fn().mockResolvedValue({ count: 1 }), + transactionFn: vi.fn(async (fn: any) => { + const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + return runId; + }), }, })); @@ -31,6 +39,15 @@ vi.mock("@/lib/prisma", () => ({ update: mocks.update, create: mocks.create, }, + syncLock: { + findUnique: mocks.syncLockFindUnique, + delete: mocks.syncLockDelete, + deleteMany: mocks.syncLockDeleteMany, + }, + issueSyncRun: { + updateMany: mocks.issueSyncRunUpdateMany, + }, + $transaction: mocks.transactionFn, }, })); diff --git a/src/app/api/sync/route.ts b/src/app/api/sync/route.ts index df672f6..b8dcd5c 100644 --- a/src/app/api/sync/route.ts +++ b/src/app/api/sync/route.ts @@ -4,6 +4,7 @@ import { fetchIssues } from "@/lib/github"; import { getSyncRepos, parseExcludedLabels } from "@/lib/config"; import { syncIssuesForRepos, mergeLabels } from "@/lib/issue-sync"; import { authorizeRequest } from "@/lib/auth"; +import { acquireLock, releaseLock } from "@/lib/sync-lock"; export async function POST(request: NextRequest) { if (!(await authorizeRequest(request)).authorized) { @@ -34,35 +35,61 @@ export async function POST(request: NextRequest) { } } - const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); - const result = await syncIssuesForRepos(repos, fetchIssues, { - findIssue(repositoryId, number) { - return prisma.issue.findUnique({ - where: { repositoryId_number: { repositoryId, number } }, - }); - }, - async updateIssue(id, data) { - // Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet. - // This prevents a race condition where the claim endpoint adds an agent label to both - // Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data. - const existing = await prisma.issue.findUnique({ - where: { id }, - select: { labels: true }, - }); + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("manual"); + if (!lockResult.locked) { + return NextResponse.json( + { error: "A sync is already running. Try again later.", locked: true }, + { status: 409 }, + ); + } + + const { runId } = lockResult; + + try { + const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); + const result = await syncIssuesForRepos(repos, fetchIssues, { + findIssue(repositoryId, number) { + return prisma.issue.findUnique({ + where: { repositoryId_number: { repositoryId, number } }, + }); + }, + async updateIssue(id, data) { + // Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet. + // This prevents a race condition where the claim endpoint adds an agent label to both + // Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data. + const existing = await prisma.issue.findUnique({ + where: { id }, + select: { labels: true }, + }); - if (existing && existing.labels.length > 0) { - // Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub - data.labels = mergeLabels(data.labels, existing.labels); - } + if (existing && existing.labels.length > 0) { + // Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub + data.labels = mergeLabels(data.labels, existing.labels); + } - await prisma.issue.update({ where: { id }, data }); - }, - async createIssue(repositoryId, data) { - await prisma.issue.create({ data: { ...data, repositoryId } }); - }, - }, excludedLabels); + await prisma.issue.update({ where: { id }, data }); + }, + async createIssue(repositoryId, data) { + await prisma.issue.create({ data: { ...data, repositoryId } }); + }, + }, excludedLabels); - return NextResponse.json(result); + // Update the sync run record + await prisma.issueSyncRun.updateMany({ + where: { id: runId, status: "running" }, + data: { + status: "completed", + completedAt: new Date(), + reposFetched: result.repos ?? 0, + syncedCount: result.syncedCount ?? 0, + }, + }); + + return NextResponse.json(result); + } finally { + await releaseLock(runId).catch(() => {}); + } } catch (error) { console.error("Sync failed:", error); return NextResponse.json({ error: "Sync failed" }, { status: 500 }); diff --git a/src/app/api/sync/scheduled/route.ts b/src/app/api/sync/scheduled/route.ts index 50495b1..d99de8e 100644 --- a/src/app/api/sync/scheduled/route.ts +++ b/src/app/api/sync/scheduled/route.ts @@ -4,59 +4,7 @@ import { fetchIssues, fetchIssue, fetchRepo, fetchWorkflows, fetchRecentRunsAllW import { getSyncRepos, getTrackedRepos, parseExcludedLabels } from "@/lib/config"; import { syncIssuesForRepos, reconcileClosedIssues, SyncResponse, ClosedIssueReconcileResponse } from "@/lib/issue-sync"; import { authorizeRequest } from "@/lib/auth"; - -// --------------------------------------------------------------------------- -// Lock acquisition — DB-backed single-row guard to prevent overlapping runs. -// Uses upsert on a single "global" row; the first writer wins. -// --------------------------------------------------------------------------- - -async function acquireLock(): Promise<{ locked: true; runId: string } | { locked: false }> { - const existing = await prisma.syncLock.findUnique({ where: { id: "global" } }); - - if (existing && existing.syncRunId) { - const maxAgeMs = 30 * 60 * 1000; - const age = Date.now() - existing.acquiredAt.getTime(); - if (age < maxAgeMs) { - return { locked: false }; - } - // Stale lock — clear it and proceed - await prisma.syncLock.delete({ where: { id: "global" } }); - } - - try { - const runId = await prisma.$transaction(async (tx) => { - // Double-check inside the transaction for race safety - const stillExisting = await tx.syncLock.findUnique({ where: { id: "global" } }); - if (stillExisting && stillExisting.syncRunId) { - throw new Error("already_locked"); - } - - const run = await tx.issueSyncRun.create({ - data: { status: "running", syncType: "scheduled", startedAt: new Date() }, - }); - - await tx.syncLock.create({ - data: { id: "global", syncRunId: run.id, acquiredAt: new Date() }, - }); - - return run.id; - }); - - return { locked: true, runId }; - } catch (err) { - if (err instanceof Error && err.message === "already_locked") { - return { locked: false }; - } - // Re-throw unexpected errors - throw err; - } -} - -async function releaseLock(runId: string): Promise { - await prisma.syncLock.deleteMany({ - where: { id: "global", syncRunId: runId }, - }); -} +import { acquireLock, releaseLock } from "@/lib/sync-lock"; // --------------------------------------------------------------------------- // Route handlers @@ -83,11 +31,11 @@ export async function POST(request: Request) { const syncIssues = options.issues !== false; // default true const syncAutomation = options.automation === true; // default false - // Acquire DB lock to prevent overlapping runs - const lockResult = await acquireLock(); + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("scheduled"); if (!lockResult.locked) { return NextResponse.json( - { error: "A scheduled sync is already running. Try again later.", locked: true }, + { error: "A sync is already running. Try again later.", locked: true }, { status: 409 }, ); } diff --git a/src/lib/sync-lock.ts b/src/lib/sync-lock.ts new file mode 100644 index 0000000..fa6d36e --- /dev/null +++ b/src/lib/sync-lock.ts @@ -0,0 +1,88 @@ +/** + * Shared sync-locking module. + * + * Provides a DB-backed single-row lock (syncLock table) that all sync + * entry-points share to prevent overlapping concurrent runs across: + * - Scheduled sync (`/api/sync/scheduled`) + * - Manual issue sync (`/api/sync`) + * - Automation sync (`/api/automation/sync`) + * + * Lock semantics: + * - First writer wins; subsequent writers get a 409 Conflict. + * - Stale locks (>30 min) are automatically cleared. + * - Lock is released on successful completion or failure (via try/finally). + */ + +import { prisma } from "@/lib/prisma"; + +const LOCK_ID = "global" as const; +const MAX_AGE_MS = 30 * 60 * 1000; // 30 minutes + +export interface AcquiredLock { + locked: true; + runId: string; +} + +export interface LockConflict { + locked: false; +} + +/** + * Attempt to acquire the global sync lock. + * + * Returns `{ locked: true, runId }` on success or `{ locked: false }` when + * another run already holds (or has a non-stale) lock. + * + * Creates an IssueSyncRun record so we can track which sync type acquired it. + */ +export async function acquireLock( + syncType: "scheduled" | "manual" | "automation", +): Promise { + try { + const existing = await prisma.syncLock.findUnique({ where: { id: LOCK_ID } }); + + if (existing && existing.syncRunId) { + const age = Date.now() - existing.acquiredAt.getTime(); + if (age < MAX_AGE_MS) { + return { locked: false }; + } + // Stale lock — clear it and proceed + await prisma.syncLock.delete({ where: { id: LOCK_ID } }); + } + + const runId = await prisma.$transaction(async (tx) => { + // Double-check inside the transaction for race safety + const stillExisting = await tx.syncLock.findUnique({ where: { id: LOCK_ID } }); + if (stillExisting && stillExisting.syncRunId) { + throw new Error("already_locked"); + } + + const run = await tx.issueSyncRun.create({ + data: { status: "running", syncType, startedAt: new Date() }, + }); + + await tx.syncLock.create({ + data: { id: LOCK_ID, syncRunId: run.id, acquiredAt: new Date() }, + }); + + return run.id; + }); + + return { locked: true, runId }; + } catch (err) { + if (err instanceof Error && err.message === "already_locked") { + return { locked: false }; + } + throw err; + } +} + +/** + * Release the global sync lock for the given run. + * Uses a conditional delete to avoid releasing another run's lock. + */ +export async function releaseLock(runId: string): Promise { + await prisma.syncLock.deleteMany({ + where: { id: LOCK_ID, syncRunId: runId }, + }); +}