From 1012dd89ab1f1d207f5185c88a2926b700ce8fb9 Mon Sep 17 00:00:00 2001 From: Saffron Worker Date: Mon, 8 Jun 2026 12:06:53 -0600 Subject: [PATCH 1/2] feat(sync): share sync locking across all sync entry-points Extract DB-backed lock from scheduled sync into shared module (src/lib/sync-lock.ts) and apply it to manual issue sync and automation sync endpoints. This prevents overlapping concurrent runs across all three sync types (scheduled, manual, automation), addressing race conditions from browser refreshes, cron overlap, or repeated clicks. - New src/lib/sync-lock.ts: acquireLock(syncType) / releaseLock(runId) with stale lock cleanup (>30 min) and transactional double-check - src/app/api/sync/route.ts: acquire lock before issue sync, return 409 if a sync is already running - src/app/api/automation/sync/route.ts: acquire lock before automation sync (single repo or batch), return 409 if locked - src/app/api/sync/scheduled/route.ts: replace inline lock with shared module import, update error message to be sync-type-agnostic - prisma/schema.prisma: update syncType comment to include 'automation' --- prisma/schema.prisma | 2 +- src/app/api/automation/sync/route.ts | 50 ++++++++++------ src/app/api/sync/route.ts | 79 +++++++++++++++++-------- src/app/api/sync/scheduled/route.ts | 60 ++----------------- src/lib/sync-lock.ts | 88 ++++++++++++++++++++++++++++ 5 files changed, 179 insertions(+), 100 deletions(-) create mode 100644 src/lib/sync-lock.ts diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9bd3e4b..2c8b145 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -392,7 +392,7 @@ model IssueSyncRun { syncedCount Int @default(0) errorMessage String? notes String? @db.Text - syncType String @default("scheduled") // "scheduled" | "manual" | "pre-claim" + syncType String @default("scheduled") // "scheduled" | "manual" | "automation" startedAt DateTime @default(now()) completedAt DateTime? diff --git a/src/app/api/automation/sync/route.ts b/src/app/api/automation/sync/route.ts index f8ba69d..ea60055 100644 --- a/src/app/api/automation/sync/route.ts +++ b/src/app/api/automation/sync/route.ts @@ -12,6 +12,7 @@ import { } from "@/lib/github"; import { getTrackedRepos } from "@/lib/config"; import { authorizeRequest } from "@/lib/auth"; +import { acquireLock, releaseLock } from "@/lib/sync-lock"; async function syncRepo(repoFullName: string) { const parts = repoFullName.split("/"); @@ -334,24 +335,39 @@ export async function POST(request: Request) { const body = await request.json().catch(() => ({})); const repoFullName = body.repo || body.fullName; - if (repoFullName) { - const result = await syncRepo(repoFullName); - if (result.success) { - return NextResponse.json({ success: true, syncRunId: result.syncRunId }); - } else { - return NextResponse.json({ error: result.error }, { status: 500 }); - } + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("automation"); + if (!lockResult.locked) { + return NextResponse.json( + { error: "A sync is already running. Try again later.", locked: true }, + { status: 409 }, + ); } - const repos = await getTrackedRepos(); - const results = []; - for (const repo of repos) { - results.push({ repo, result: await syncRepo(repo) }); - } + const { runId } = lockResult; - return NextResponse.json({ - synced: results.filter((r) => r.result.success).length, - failed: results.filter((r) => !r.result.success).length, - results, - }); + try { + if (repoFullName) { + const result = await syncRepo(repoFullName); + if (result.success) { + return NextResponse.json({ success: true, syncRunId: result.syncRunId }); + } else { + return NextResponse.json({ error: result.error }, { status: 500 }); + } + } + + const repos = await getTrackedRepos(); + const results = []; + for (const repo of repos) { + results.push({ repo, result: await syncRepo(repo) }); + } + + return NextResponse.json({ + synced: results.filter((r) => r.result.success).length, + failed: results.filter((r) => !r.result.success).length, + results, + }); + } finally { + await releaseLock(runId).catch(() => {}); + } } diff --git a/src/app/api/sync/route.ts b/src/app/api/sync/route.ts index df672f6..b8dcd5c 100644 --- a/src/app/api/sync/route.ts +++ b/src/app/api/sync/route.ts @@ -4,6 +4,7 @@ import { fetchIssues } from "@/lib/github"; import { getSyncRepos, parseExcludedLabels } from "@/lib/config"; import { syncIssuesForRepos, mergeLabels } from "@/lib/issue-sync"; import { authorizeRequest } from "@/lib/auth"; +import { acquireLock, releaseLock } from "@/lib/sync-lock"; export async function POST(request: NextRequest) { if (!(await authorizeRequest(request)).authorized) { @@ -34,35 +35,61 @@ export async function POST(request: NextRequest) { } } - const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); - const result = await syncIssuesForRepos(repos, fetchIssues, { - findIssue(repositoryId, number) { - return prisma.issue.findUnique({ - where: { repositoryId_number: { repositoryId, number } }, - }); - }, - async updateIssue(id, data) { - // Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet. - // This prevents a race condition where the claim endpoint adds an agent label to both - // Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data. - const existing = await prisma.issue.findUnique({ - where: { id }, - select: { labels: true }, - }); + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("manual"); + if (!lockResult.locked) { + return NextResponse.json( + { error: "A sync is already running. Try again later.", locked: true }, + { status: 409 }, + ); + } + + const { runId } = lockResult; + + try { + const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); + const result = await syncIssuesForRepos(repos, fetchIssues, { + findIssue(repositoryId, number) { + return prisma.issue.findUnique({ + where: { repositoryId_number: { repositoryId, number } }, + }); + }, + async updateIssue(id, data) { + // Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet. + // This prevents a race condition where the claim endpoint adds an agent label to both + // Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data. + const existing = await prisma.issue.findUnique({ + where: { id }, + select: { labels: true }, + }); - if (existing && existing.labels.length > 0) { - // Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub - data.labels = mergeLabels(data.labels, existing.labels); - } + if (existing && existing.labels.length > 0) { + // Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub + data.labels = mergeLabels(data.labels, existing.labels); + } - await prisma.issue.update({ where: { id }, data }); - }, - async createIssue(repositoryId, data) { - await prisma.issue.create({ data: { ...data, repositoryId } }); - }, - }, excludedLabels); + await prisma.issue.update({ where: { id }, data }); + }, + async createIssue(repositoryId, data) { + await prisma.issue.create({ data: { ...data, repositoryId } }); + }, + }, excludedLabels); - return NextResponse.json(result); + // Update the sync run record + await prisma.issueSyncRun.updateMany({ + where: { id: runId, status: "running" }, + data: { + status: "completed", + completedAt: new Date(), + reposFetched: result.repos ?? 0, + syncedCount: result.syncedCount ?? 0, + }, + }); + + return NextResponse.json(result); + } finally { + await releaseLock(runId).catch(() => {}); + } } catch (error) { console.error("Sync failed:", error); return NextResponse.json({ error: "Sync failed" }, { status: 500 }); diff --git a/src/app/api/sync/scheduled/route.ts b/src/app/api/sync/scheduled/route.ts index 50495b1..d99de8e 100644 --- a/src/app/api/sync/scheduled/route.ts +++ b/src/app/api/sync/scheduled/route.ts @@ -4,59 +4,7 @@ import { fetchIssues, fetchIssue, fetchRepo, fetchWorkflows, fetchRecentRunsAllW import { getSyncRepos, getTrackedRepos, parseExcludedLabels } from "@/lib/config"; import { syncIssuesForRepos, reconcileClosedIssues, SyncResponse, ClosedIssueReconcileResponse } from "@/lib/issue-sync"; import { authorizeRequest } from "@/lib/auth"; - -// --------------------------------------------------------------------------- -// Lock acquisition — DB-backed single-row guard to prevent overlapping runs. -// Uses upsert on a single "global" row; the first writer wins. -// --------------------------------------------------------------------------- - -async function acquireLock(): Promise<{ locked: true; runId: string } | { locked: false }> { - const existing = await prisma.syncLock.findUnique({ where: { id: "global" } }); - - if (existing && existing.syncRunId) { - const maxAgeMs = 30 * 60 * 1000; - const age = Date.now() - existing.acquiredAt.getTime(); - if (age < maxAgeMs) { - return { locked: false }; - } - // Stale lock — clear it and proceed - await prisma.syncLock.delete({ where: { id: "global" } }); - } - - try { - const runId = await prisma.$transaction(async (tx) => { - // Double-check inside the transaction for race safety - const stillExisting = await tx.syncLock.findUnique({ where: { id: "global" } }); - if (stillExisting && stillExisting.syncRunId) { - throw new Error("already_locked"); - } - - const run = await tx.issueSyncRun.create({ - data: { status: "running", syncType: "scheduled", startedAt: new Date() }, - }); - - await tx.syncLock.create({ - data: { id: "global", syncRunId: run.id, acquiredAt: new Date() }, - }); - - return run.id; - }); - - return { locked: true, runId }; - } catch (err) { - if (err instanceof Error && err.message === "already_locked") { - return { locked: false }; - } - // Re-throw unexpected errors - throw err; - } -} - -async function releaseLock(runId: string): Promise { - await prisma.syncLock.deleteMany({ - where: { id: "global", syncRunId: runId }, - }); -} +import { acquireLock, releaseLock } from "@/lib/sync-lock"; // --------------------------------------------------------------------------- // Route handlers @@ -83,11 +31,11 @@ export async function POST(request: Request) { const syncIssues = options.issues !== false; // default true const syncAutomation = options.automation === true; // default false - // Acquire DB lock to prevent overlapping runs - const lockResult = await acquireLock(); + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("scheduled"); if (!lockResult.locked) { return NextResponse.json( - { error: "A scheduled sync is already running. Try again later.", locked: true }, + { error: "A sync is already running. Try again later.", locked: true }, { status: 409 }, ); } diff --git a/src/lib/sync-lock.ts b/src/lib/sync-lock.ts new file mode 100644 index 0000000..fa6d36e --- /dev/null +++ b/src/lib/sync-lock.ts @@ -0,0 +1,88 @@ +/** + * Shared sync-locking module. + * + * Provides a DB-backed single-row lock (syncLock table) that all sync + * entry-points share to prevent overlapping concurrent runs across: + * - Scheduled sync (`/api/sync/scheduled`) + * - Manual issue sync (`/api/sync`) + * - Automation sync (`/api/automation/sync`) + * + * Lock semantics: + * - First writer wins; subsequent writers get a 409 Conflict. + * - Stale locks (>30 min) are automatically cleared. + * - Lock is released on successful completion or failure (via try/finally). + */ + +import { prisma } from "@/lib/prisma"; + +const LOCK_ID = "global" as const; +const MAX_AGE_MS = 30 * 60 * 1000; // 30 minutes + +export interface AcquiredLock { + locked: true; + runId: string; +} + +export interface LockConflict { + locked: false; +} + +/** + * Attempt to acquire the global sync lock. + * + * Returns `{ locked: true, runId }` on success or `{ locked: false }` when + * another run already holds (or has a non-stale) lock. + * + * Creates an IssueSyncRun record so we can track which sync type acquired it. + */ +export async function acquireLock( + syncType: "scheduled" | "manual" | "automation", +): Promise { + try { + const existing = await prisma.syncLock.findUnique({ where: { id: LOCK_ID } }); + + if (existing && existing.syncRunId) { + const age = Date.now() - existing.acquiredAt.getTime(); + if (age < MAX_AGE_MS) { + return { locked: false }; + } + // Stale lock — clear it and proceed + await prisma.syncLock.delete({ where: { id: LOCK_ID } }); + } + + const runId = await prisma.$transaction(async (tx) => { + // Double-check inside the transaction for race safety + const stillExisting = await tx.syncLock.findUnique({ where: { id: LOCK_ID } }); + if (stillExisting && stillExisting.syncRunId) { + throw new Error("already_locked"); + } + + const run = await tx.issueSyncRun.create({ + data: { status: "running", syncType, startedAt: new Date() }, + }); + + await tx.syncLock.create({ + data: { id: LOCK_ID, syncRunId: run.id, acquiredAt: new Date() }, + }); + + return run.id; + }); + + return { locked: true, runId }; + } catch (err) { + if (err instanceof Error && err.message === "already_locked") { + return { locked: false }; + } + throw err; + } +} + +/** + * Release the global sync lock for the given run. + * Uses a conditional delete to avoid releasing another run's lock. + */ +export async function releaseLock(runId: string): Promise { + await prisma.syncLock.deleteMany({ + where: { id: LOCK_ID, syncRunId: runId }, + }); +} From 8033b1e5e610b2998c1173d6255c1f2b03f98429 Mon Sep 17 00:00:00 2001 From: Saffron Normal Worker Date: Mon, 8 Jun 2026 12:42:32 -0600 Subject: [PATCH 2/2] fix(tests): mock syncLock and issueSyncRun in sync route tests The manual sync and automation sync routes now use acquireLock() which requires prisma.syncLock and prisma.. Add these mocks to the test fixtures so the tests pass without a real database. --- src/app/api/automation/sync/route.test.ts | 53 +++++++++++++++++++++++ src/app/api/sync/route.test.ts | 17 ++++++++ 2 files changed, 70 insertions(+) diff --git a/src/app/api/automation/sync/route.test.ts b/src/app/api/automation/sync/route.test.ts index 37b6e0c..f64125e 100644 --- a/src/app/api/automation/sync/route.test.ts +++ b/src/app/api/automation/sync/route.test.ts @@ -15,6 +15,59 @@ vi.mock("@/lib/config", () => ({ parseExcludedLabels: vi.fn().mockReturnValue([]), })); +const { mocks } = vi.hoisted(() => ({ + mocks: { + syncLockFindUnique: vi.fn().mockResolvedValue(null), + syncLockDelete: vi.fn().mockResolvedValue(undefined), + syncLockDeleteMany: vi.fn().mockResolvedValue({ count: 0 }), + transactionFn: vi.fn(async (fn: any) => { + const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + return runId; + }), + upsert: vi.fn().mockResolvedValue({ id: "ar-1", fullName: "org/repo" }), + findMany: vi.fn().mockResolvedValue([]), + create: vi.fn().mockResolvedValue(undefined), + }, +})); + +vi.mock("@/lib/prisma", () => ({ + prisma: { + syncLock: { + findUnique: mocks.syncLockFindUnique, + delete: mocks.syncLockDelete, + deleteMany: mocks.syncLockDeleteMany, + }, + $transaction: mocks.transactionFn, + automationRepo: { + upsert: mocks.upsert, + }, + githubWorkflow: { + upsert: mocks.upsert, + findMany: mocks.findMany, + create: mocks.create, + }, + githubWorkflowRun: { + upsert: mocks.upsert, + findUnique: vi.fn().mockResolvedValue(null), + }, + githubWorkflowJob: { + upsert: mocks.upsert, + }, + githubRelease: { + upsert: mocks.upsert, + }, + githubPullRequest: { + upsert: mocks.upsert, + }, + githubPackage: { + upsert: mocks.upsert, + }, + automationEvent: { + create: mocks.create, + }, + }, +})); + import { POST } from "./route"; describe("POST /api/automation/sync — auth", () => { diff --git a/src/app/api/sync/route.test.ts b/src/app/api/sync/route.test.ts index a943a25..35e9c29 100644 --- a/src/app/api/sync/route.test.ts +++ b/src/app/api/sync/route.test.ts @@ -17,6 +17,14 @@ const { mocks } = vi.hoisted(() => ({ update: vi.fn().mockResolvedValue(undefined), create: vi.fn().mockResolvedValue({ id: "issue-1" }), auth: vi.fn(), + syncLockFindUnique: vi.fn().mockResolvedValue(null), + syncLockDelete: vi.fn().mockResolvedValue(undefined), + syncLockDeleteMany: vi.fn().mockResolvedValue({ count: 0 }), + issueSyncRunUpdateMany: vi.fn().mockResolvedValue({ count: 1 }), + transactionFn: vi.fn(async (fn: any) => { + const runId = `run-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + return runId; + }), }, })); @@ -31,6 +39,15 @@ vi.mock("@/lib/prisma", () => ({ update: mocks.update, create: mocks.create, }, + syncLock: { + findUnique: mocks.syncLockFindUnique, + delete: mocks.syncLockDelete, + deleteMany: mocks.syncLockDeleteMany, + }, + issueSyncRun: { + updateMany: mocks.issueSyncRunUpdateMany, + }, + $transaction: mocks.transactionFn, }, }));