diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9bd3e4b..2c8b145 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -392,7 +392,7 @@ model IssueSyncRun { syncedCount Int @default(0) errorMessage String? notes String? @db.Text - syncType String @default("scheduled") // "scheduled" | "manual" | "pre-claim" + syncType String @default("scheduled") // "scheduled" | "manual" | "automation" startedAt DateTime @default(now()) completedAt DateTime? diff --git a/src/app/api/automation/sync/route.ts b/src/app/api/automation/sync/route.ts index f8ba69d..ea60055 100644 --- a/src/app/api/automation/sync/route.ts +++ b/src/app/api/automation/sync/route.ts @@ -12,6 +12,7 @@ import { } from "@/lib/github"; import { getTrackedRepos } from "@/lib/config"; import { authorizeRequest } from "@/lib/auth"; +import { acquireLock, releaseLock } from "@/lib/sync-lock"; async function syncRepo(repoFullName: string) { const parts = repoFullName.split("/"); @@ -334,24 +335,39 @@ export async function POST(request: Request) { const body = await request.json().catch(() => ({})); const repoFullName = body.repo || body.fullName; - if (repoFullName) { - const result = await syncRepo(repoFullName); - if (result.success) { - return NextResponse.json({ success: true, syncRunId: result.syncRunId }); - } else { - return NextResponse.json({ error: result.error }, { status: 500 }); - } + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("automation"); + if (!lockResult.locked) { + return NextResponse.json( + { error: "A sync is already running. Try again later.", locked: true }, + { status: 409 }, + ); } - const repos = await getTrackedRepos(); - const results = []; - for (const repo of repos) { - results.push({ repo, result: await syncRepo(repo) }); - } + const { runId } = lockResult; - return NextResponse.json({ - synced: results.filter((r) => r.result.success).length, - failed: results.filter((r) => !r.result.success).length, - results, - }); + try { + if (repoFullName) { + const result = await syncRepo(repoFullName); + if (result.success) { + return NextResponse.json({ success: true, syncRunId: result.syncRunId }); + } else { + return NextResponse.json({ error: result.error }, { status: 500 }); + } + } + + const repos = await getTrackedRepos(); + const results = []; + for (const repo of repos) { + results.push({ repo, result: await syncRepo(repo) }); + } + + return NextResponse.json({ + synced: results.filter((r) => r.result.success).length, + failed: results.filter((r) => !r.result.success).length, + results, + }); + } finally { + await releaseLock(runId).catch(() => {}); + } } diff --git a/src/app/api/issues/untriaged.test.ts b/src/app/api/issues/untriaged.test.ts new file mode 100644 index 0000000..8472768 --- /dev/null +++ b/src/app/api/issues/untriaged.test.ts @@ -0,0 +1,240 @@ +// @vitest-environment node +import { describe, expect, it, beforeEach, vi } from "vitest"; + +// Use dynamic doMock to avoid hoisting issues entirely. +vi.doMock("@/lib/prisma", () => ({ + prisma: { + issue: { + findMany: vi.fn(), + }, + }, +})); + +vi.doMock("@/lib/agent-queue", () => ({ + isRenovateIssue: vi.fn((issue: { title: string; labels: string[] }) => { + const title = issue.title.toLowerCase(); + return title.includes("dependency dashboard") || title.includes("renovate dashboard"); + }), +})); + +vi.doMock("@/types", () => ({ + STATUS_LABELS: ["status/backlog", "status/ready", "status/in-progress", "status/in-review", "status/done"], + PRIORITY_LABELS: ["priority/p0", "priority/p1", "priority/p2", "priority/p3"], + AGENT_PREFIX: "agent/", + OWNER_PREFIX: "owner/", + PROJECT_PREFIX: "project/", + BOARD_COLUMNS: [ + { id: "status/backlog" }, + { id: "status/ready" }, + { id: "status/in-progress" }, + { id: "status/in-review" }, + { id: "status/done" }, + ], + VALID_LANES: ["normal", "escalated", "backlog"], + VALID_CONFIDENCE: ["high", "medium", "low"], + isAgentLabel: (label: string) => label.startsWith("agent/"), + isOwnerLabel: (label: string) => label.startsWith("owner/"), + getStatusFromLabels: (_labels: string[]) => null, + getAgentFromLabels: (_labels: string[]) => null, + getOwnerFromLabels: (_labels: string[]) => null, + getPriorityFromLabels: (_labels: string[]) => null, +})); + +// Dynamic imports after doMock setup. +const { GET } = await import("./untriaged/route"); +const { prisma } = await import("@/lib/prisma"); + +const mockFindMany = vi.mocked(prisma.issue.findMany); + +// Minimal mock issue — only fields accessed by the route are needed. +type MockIssue = { + id: string; + number: number; + title: string; + url: string; + labels: string[]; + state: string; + createdAt: Date; + updatedAt: Date; + repository: { fullName: string }; +}; + +const makeIssue = (overrides: Partial = {}) => ({ + id: overrides.id ?? "issue_1", + number: overrides.number ?? 1, + title: overrides.title ?? "Test issue", + url: overrides.url ?? "https://github.com/test/repo/issues/1", + labels: overrides.labels ?? [], + state: overrides.state ?? "open", + createdAt: overrides.createdAt ?? new Date("2026-01-01"), + updatedAt: overrides.updatedAt ?? new Date("2026-01-01"), + repository: { fullName: overrides.repository?.fullName ?? "test/repo" }, +}) satisfies MockIssue; + +describe("GET /api/issues/untriaged", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("returns only issues with no status/* label", async () => { + const issues = [ + makeIssue({ number: 1, labels: ["bug", "priority/p1"] }), // untriaged + makeIssue({ number: 2, labels: ["status/ready", "priority/p1"] }), // has status — excluded + makeIssue({ number: 3, labels: ["priority/p2"] }), // untriaged + makeIssue({ number: 4, labels: ["status/backlog"] }), // has status — excluded + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(response.status).toBe(200); + expect(body).toHaveLength(2); + expect(body.map((i: { number: number }) => i.number)).toEqual([1, 3]); + }); + + it("excludes closed issues", async () => { + const issues = [makeIssue({ number: 1, labels: ["bug"], state: "open" })]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body).toHaveLength(1); + expect(body[0].number).toBe(1); + }); + + it("excludes issues with any status label", async () => { + const issues = [ + makeIssue({ number: 1, labels: ["status/done"] }), + makeIssue({ number: 2, labels: ["status/in-progress"] }), + makeIssue({ number: 3, labels: ["status/in-review"] }), + makeIssue({ number: 4, labels: [] }), // truly unlabelled — should be included + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body).toHaveLength(1); + expect(body[0].number).toBe(4); + }); + + it("respects limit parameter (default 50)", async () => { + const issues = Array.from({ length: 100 }, (_, i) => makeIssue({ number: i + 1, labels: ["bug"] })); + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body).toHaveLength(50); // default limit + }); + + it("respects custom limit parameter", async () => { + const issues = Array.from({ length: 100 }, (_, i) => makeIssue({ number: i + 1, labels: ["bug"] })); + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged?limit=10")); + const body = await response.json(); + + expect(body).toHaveLength(10); + }); + + it("caps limit at 200", async () => { + const issues = Array.from({ length: 500 }, (_, i) => makeIssue({ number: i + 1, labels: ["bug"] })); + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged?limit=999")); + const body = await response.json(); + + expect(body).toHaveLength(200); // hard cap + }); + + it("filters by repo when specified", async () => { + const issues = [ + makeIssue({ number: 1, labels: ["bug"], repository: { fullName: "test/repo" } }), + makeIssue({ number: 2, labels: ["bug"], repository: { fullName: "other/repo" } }), + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged?repo=test/repo")); + const body = await response.json(); + + expect(body).toHaveLength(1); + expect(body[0].number).toBe(1); + }); + + it("excludes Renovate issues by default", async () => { + const issues = [ + makeIssue({ number: 1, title: "Dependency Dashboard", labels: ["bug"] }), + makeIssue({ number: 2, title: "Fix critical bug", labels: ["bug"] }), + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body).toHaveLength(1); + expect(body[0].number).toBe(2); + }); + + it("includes Renovate issues when excludeRenovate=false", async () => { + const issues = [ + makeIssue({ number: 1, title: "Dependency Dashboard", labels: ["bug"] }), + makeIssue({ number: 2, title: "Fix critical bug", labels: ["bug"] }), + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged?excludeRenovate=false")); + const body = await response.json(); + + expect(body).toHaveLength(2); + }); + + it("returns empty array when no untriaged issues exist", async () => { + const issues = [ + makeIssue({ number: 1, labels: ["status/ready"] }), + makeIssue({ number: 2, labels: ["status/backlog"] }), + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body).toEqual([]); + }); + + it("returns correct issue shape with all fields", async () => { + const expectedIssue = makeIssue({ number: 42, title: "Untriaged bug", labels: ["bug"] }); + mockFindMany.mockResolvedValue([expectedIssue] as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body).toHaveLength(1); + const issue = body[0]; + expect(issue).toHaveProperty("id"); + expect(issue).toHaveProperty("number", 42); + expect(issue).toHaveProperty("title", "Untriaged bug"); + expect(issue).toHaveProperty("url"); + expect(issue).toHaveProperty("labels"); + expect(issue).toHaveProperty("state", "open"); + expect(issue).toHaveProperty("repository", { fullName: "test/repo" }); + }); + + it("orders by updatedAt descending", async () => { + const baseDate = new Date("2026-01-01"); + // Pre-sort descending since the route's orderBy is mocked out. + const issues = [ + makeIssue({ number: 3, labels: ["bug"], updatedAt: new Date(baseDate.getTime() + 2000) }), + makeIssue({ number: 1, labels: ["bug"], updatedAt: new Date(baseDate.getTime() + 1000) }), + makeIssue({ number: 2, labels: ["bug"], updatedAt: baseDate }), + ]; + mockFindMany.mockResolvedValue(issues as never); + + const response = await GET(new Request("http://localhost/api/issues/untriaged")); + const body = await response.json(); + + expect(body.map((i: { number: number }) => i.number)).toEqual([3, 1, 2]); + }); +}); diff --git a/src/app/api/issues/untriaged/route.ts b/src/app/api/issues/untriaged/route.ts new file mode 100644 index 0000000..916d418 --- /dev/null +++ b/src/app/api/issues/untriaged/route.ts @@ -0,0 +1,97 @@ +import { NextResponse } from "next/server"; +import { prisma } from "@/lib/prisma"; +import { STATUS_LABELS } from "@/types"; +import { isRenovateIssue } from "@/lib/agent-queue"; + +/** + * GET /api/issues/untriaged + * + * Returns open issues with no `status/*` label — an intake view for grooming. + * These issues are excluded from normal worker queues but need to be surfaced + * so they can be classified into status/backlog, status/ready, or closed. + * + * Query parameters: + * limit — max results (default 50, bounded per run) + * repo — filter by repository fullName + * excludeRenovate — skip Renovate/dashboard noise (default true) + */ + +interface UntriagedIssue { + id: string; + number: number; + title: string; + url: string; + labels: string[]; + state: string; + createdAt: Date; + updatedAt: Date; + repository: { fullName: string }; +} + +export async function GET(request: Request) { + try { + const { searchParams } = new URL(request.url); + const limit = Math.min( + parseInt(searchParams.get("limit") ?? "50", 10), + 200, // hard cap to prevent runaway queries + ); + const repoFilter = searchParams.get("repo"); + const excludeRenovate = searchParams.get("excludeRenovate") !== "false"; + + // Fetch all open issues from enabled repos + let where: Record = { + state: "open", + repository: { enabled: true }, + }; + + if (repoFilter) { + where = { + ...where, + repository: { enabled: true, fullName: repoFilter }, + }; + } + + const issues: UntriagedIssue[] = await prisma.issue.findMany({ + where, + select: { + id: true, + number: true, + title: true, + url: true, + labels: true, + state: true, + createdAt: true, + updatedAt: true, + repository: { select: { fullName: true } }, + }, + orderBy: { updatedAt: "desc" }, + }); + + // Filter to only issues with no status/* label (untriaged) + const untriaged = issues.filter((issue) => { + for (const label of issue.labels) { + if ((STATUS_LABELS as string[]).includes(label)) return false; + } + return true; + }); + + // Filter by repo if specified + let result = untriaged; + if (repoFilter) { + result = result.filter((issue) => issue.repository.fullName === repoFilter); + } + + // Optionally exclude Renovate/dashboard noise + if (excludeRenovate) { + result = result.filter((issue) => !isRenovateIssue(issue)); + } + + // Bound results per run + result = result.slice(0, limit); + + return NextResponse.json(result); + } catch (error) { + console.error("Failed to fetch untriaged issues:", error); + return NextResponse.json({ error: "Failed to fetch untriaged issues" }, { status: 500 }); + } +} diff --git a/src/app/api/pr-followup/sync/route.ts b/src/app/api/pr-followup/sync/route.ts index 4c7fc79..17a1d17 100644 --- a/src/app/api/pr-followup/sync/route.ts +++ b/src/app/api/pr-followup/sync/route.ts @@ -1,5 +1,6 @@ -import { NextResponse } from "next/server"; +import { NextRequest, NextResponse } from "next/server"; import { prisma, asPrFixQueueClient } from "@/lib/prisma"; +import { authorizeRequest } from "@/lib/auth"; import { getTrackedRepos } from "@/lib/config"; import { processPrFollowupEvents, isAllowedBotAuthor } from "@/lib/pr-followup-ingestion"; @@ -85,7 +86,12 @@ async function fetchWithGithub(url: string, token: string): Promise { return res.json(); } -export async function POST() { +export async function POST(request: NextRequest) { + const auth = await authorizeRequest(request); + if (!auth.authorized) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + try { const token = process.env.GITHUB_TOKEN; if (!token) { diff --git a/src/app/api/pr-followup/webhook/route.ts b/src/app/api/pr-followup/webhook/route.ts index ff09d41..f588f35 100644 --- a/src/app/api/pr-followup/webhook/route.ts +++ b/src/app/api/pr-followup/webhook/route.ts @@ -1,5 +1,6 @@ import { NextResponse } from "next/server"; import { createHmac, timingSafeEqual } from "node:crypto"; +import { authorizeRequest } from "@/lib/auth"; import { prisma, asPrFixQueueClient } from "@/lib/prisma"; import { processPrFollowupEvents, PrFollowupEvent } from "@/lib/pr-followup-ingestion"; @@ -25,11 +26,22 @@ function extractLinkedIssueFromPr(pr: Record): number | null { * - pull_request (merge_state_status changes, etc.) * * Signature verification: validates X-Hub-Signature-256 using HMAC-SHA256 - * with the WEBHOOK_SECRET environment variable. Set this to your GitHub - * webhook secret in deployment configuration. If not set, verification is - * skipped (e.g. when behind an API gateway that handles auth). + * with the WEBHOOK_SECRET environment variable. + * + * Default behavior is fail-closed: if WEBHOOK_SECRET is not configured, + * requests are rejected unless WEBHOOK_GATEWAY_MODE is explicitly set to "true", + * which indicates the endpoint is behind a gateway that performs its own + * authentication and signature verification. */ +/** Is webhook signature verification enabled (fail-closed default)? */ +function isSignatureVerificationEnabled(): boolean { + const secret = process.env.WEBHOOK_SECRET; + if (secret) return true; + // Gateway mode: caller explicitly opts out of local signature verification + return process.env.WEBHOOK_GATEWAY_MODE === "true"; +} + function verifyWebhookSignature(secret: string, payload: Buffer, signature: string): boolean { if (!signature.startsWith("sha256=")) return false; const expected = signature.slice(9); @@ -179,18 +191,27 @@ export async function POST(request: Request) { return NextResponse.json({ error: "Missing x-github-event header" }, { status: 400 }); } + // Authenticate the request (Bearer token, Basic Auth, or OIDC session) + const auth = await authorizeRequest(request); + if (!auth.authorized) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + // Read raw body once for signature verification and parsing const rawBody = await request.arrayBuffer(); const payload = Buffer.from(rawBody); - // Validate webhook signature if WEBHOOK_SECRET is configured - const webhookSecret = process.env.WEBHOOK_SECRET; - if (webhookSecret) { + // Webhook signature verification: fail-closed by default. + // If WEBHOOK_SECRET is set, always verify. If not set, only skip when + // WEBHOOK_GATEWAY_MODE=true (explicit opt-out for gateway deployments). + const sigVerificationEnabled = isSignatureVerificationEnabled(); + if (sigVerificationEnabled) { + const webhookSecret = process.env.WEBHOOK_SECRET; const signature = request.headers.get("x-hub-signature-256"); if (!signature) { return NextResponse.json({ error: "Missing x-hub-signature-256 header" }, { status: 401 }); } - if (!verifyWebhookSignature(webhookSecret, payload, signature)) { + if (!verifyWebhookSignature(webhookSecret!, payload, signature)) { return NextResponse.json({ error: "Invalid webhook signature" }, { status: 401 }); } } diff --git a/src/app/api/routes.test.ts b/src/app/api/routes.test.ts index d3606c6..bb81357 100644 --- a/src/app/api/routes.test.ts +++ b/src/app/api/routes.test.ts @@ -12,6 +12,7 @@ describe("API route smoke checks", () => { "src/app/api/issues/move/route.ts", "src/app/api/issues/claim/route.ts", "src/app/api/issues/unclaim/route.ts", + "src/app/api/issues/untriaged/route.ts", "src/app/api/automation/repos/[...repo]/route.ts", "src/app/api/health/route.ts", "src/app/api/agents/[agentName]/queue/route.ts", diff --git a/src/app/api/sync/route.ts b/src/app/api/sync/route.ts index df672f6..b8dcd5c 100644 --- a/src/app/api/sync/route.ts +++ b/src/app/api/sync/route.ts @@ -4,6 +4,7 @@ import { fetchIssues } from "@/lib/github"; import { getSyncRepos, parseExcludedLabels } from "@/lib/config"; import { syncIssuesForRepos, mergeLabels } from "@/lib/issue-sync"; import { authorizeRequest } from "@/lib/auth"; +import { acquireLock, releaseLock } from "@/lib/sync-lock"; export async function POST(request: NextRequest) { if (!(await authorizeRequest(request)).authorized) { @@ -34,35 +35,61 @@ export async function POST(request: NextRequest) { } } - const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); - const result = await syncIssuesForRepos(repos, fetchIssues, { - findIssue(repositoryId, number) { - return prisma.issue.findUnique({ - where: { repositoryId_number: { repositoryId, number } }, - }); - }, - async updateIssue(id, data) { - // Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet. - // This prevents a race condition where the claim endpoint adds an agent label to both - // Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data. - const existing = await prisma.issue.findUnique({ - where: { id }, - select: { labels: true }, - }); + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("manual"); + if (!lockResult.locked) { + return NextResponse.json( + { error: "A sync is already running. Try again later.", locked: true }, + { status: 409 }, + ); + } + + const { runId } = lockResult; + + try { + const excludedLabels = parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); + const result = await syncIssuesForRepos(repos, fetchIssues, { + findIssue(repositoryId, number) { + return prisma.issue.findUnique({ + where: { repositoryId_number: { repositoryId, number } }, + }); + }, + async updateIssue(id, data) { + // Preserve agent/* labels from Prisma in case GitHub hasn't propagated the claim yet. + // This prevents a race condition where the claim endpoint adds an agent label to both + // Prisma and GitHub, but a concurrent sync overwrites Prisma with stale GitHub data. + const existing = await prisma.issue.findUnique({ + where: { id }, + select: { labels: true }, + }); - if (existing && existing.labels.length > 0) { - // Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub - data.labels = mergeLabels(data.labels, existing.labels); - } + if (existing && existing.labels.length > 0) { + // Merge: use GitHub labels as base, add any agent/* labels from Prisma that aren't on GitHub + data.labels = mergeLabels(data.labels, existing.labels); + } - await prisma.issue.update({ where: { id }, data }); - }, - async createIssue(repositoryId, data) { - await prisma.issue.create({ data: { ...data, repositoryId } }); - }, - }, excludedLabels); + await prisma.issue.update({ where: { id }, data }); + }, + async createIssue(repositoryId, data) { + await prisma.issue.create({ data: { ...data, repositoryId } }); + }, + }, excludedLabels); - return NextResponse.json(result); + // Update the sync run record + await prisma.issueSyncRun.updateMany({ + where: { id: runId, status: "running" }, + data: { + status: "completed", + completedAt: new Date(), + reposFetched: result.repos ?? 0, + syncedCount: result.syncedCount ?? 0, + }, + }); + + return NextResponse.json(result); + } finally { + await releaseLock(runId).catch(() => {}); + } } catch (error) { console.error("Sync failed:", error); return NextResponse.json({ error: "Sync failed" }, { status: 500 }); diff --git a/src/app/api/sync/scheduled/route.ts b/src/app/api/sync/scheduled/route.ts index 50495b1..d99de8e 100644 --- a/src/app/api/sync/scheduled/route.ts +++ b/src/app/api/sync/scheduled/route.ts @@ -4,59 +4,7 @@ import { fetchIssues, fetchIssue, fetchRepo, fetchWorkflows, fetchRecentRunsAllW import { getSyncRepos, getTrackedRepos, parseExcludedLabels } from "@/lib/config"; import { syncIssuesForRepos, reconcileClosedIssues, SyncResponse, ClosedIssueReconcileResponse } from "@/lib/issue-sync"; import { authorizeRequest } from "@/lib/auth"; - -// --------------------------------------------------------------------------- -// Lock acquisition — DB-backed single-row guard to prevent overlapping runs. -// Uses upsert on a single "global" row; the first writer wins. -// --------------------------------------------------------------------------- - -async function acquireLock(): Promise<{ locked: true; runId: string } | { locked: false }> { - const existing = await prisma.syncLock.findUnique({ where: { id: "global" } }); - - if (existing && existing.syncRunId) { - const maxAgeMs = 30 * 60 * 1000; - const age = Date.now() - existing.acquiredAt.getTime(); - if (age < maxAgeMs) { - return { locked: false }; - } - // Stale lock — clear it and proceed - await prisma.syncLock.delete({ where: { id: "global" } }); - } - - try { - const runId = await prisma.$transaction(async (tx) => { - // Double-check inside the transaction for race safety - const stillExisting = await tx.syncLock.findUnique({ where: { id: "global" } }); - if (stillExisting && stillExisting.syncRunId) { - throw new Error("already_locked"); - } - - const run = await tx.issueSyncRun.create({ - data: { status: "running", syncType: "scheduled", startedAt: new Date() }, - }); - - await tx.syncLock.create({ - data: { id: "global", syncRunId: run.id, acquiredAt: new Date() }, - }); - - return run.id; - }); - - return { locked: true, runId }; - } catch (err) { - if (err instanceof Error && err.message === "already_locked") { - return { locked: false }; - } - // Re-throw unexpected errors - throw err; - } -} - -async function releaseLock(runId: string): Promise { - await prisma.syncLock.deleteMany({ - where: { id: "global", syncRunId: runId }, - }); -} +import { acquireLock, releaseLock } from "@/lib/sync-lock"; // --------------------------------------------------------------------------- // Route handlers @@ -83,11 +31,11 @@ export async function POST(request: Request) { const syncIssues = options.issues !== false; // default true const syncAutomation = options.automation === true; // default false - // Acquire DB lock to prevent overlapping runs - const lockResult = await acquireLock(); + // Acquire shared DB lock to prevent overlapping runs across all sync types + const lockResult = await acquireLock("scheduled"); if (!lockResult.locked) { return NextResponse.json( - { error: "A scheduled sync is already running. Try again later.", locked: true }, + { error: "A sync is already running. Try again later.", locked: true }, { status: 409 }, ); } diff --git a/src/lib/sync-lock.ts b/src/lib/sync-lock.ts new file mode 100644 index 0000000..fa6d36e --- /dev/null +++ b/src/lib/sync-lock.ts @@ -0,0 +1,88 @@ +/** + * Shared sync-locking module. + * + * Provides a DB-backed single-row lock (syncLock table) that all sync + * entry-points share to prevent overlapping concurrent runs across: + * - Scheduled sync (`/api/sync/scheduled`) + * - Manual issue sync (`/api/sync`) + * - Automation sync (`/api/automation/sync`) + * + * Lock semantics: + * - First writer wins; subsequent writers get a 409 Conflict. + * - Stale locks (>30 min) are automatically cleared. + * - Lock is released on successful completion or failure (via try/finally). + */ + +import { prisma } from "@/lib/prisma"; + +const LOCK_ID = "global" as const; +const MAX_AGE_MS = 30 * 60 * 1000; // 30 minutes + +export interface AcquiredLock { + locked: true; + runId: string; +} + +export interface LockConflict { + locked: false; +} + +/** + * Attempt to acquire the global sync lock. + * + * Returns `{ locked: true, runId }` on success or `{ locked: false }` when + * another run already holds (or has a non-stale) lock. + * + * Creates an IssueSyncRun record so we can track which sync type acquired it. + */ +export async function acquireLock( + syncType: "scheduled" | "manual" | "automation", +): Promise { + try { + const existing = await prisma.syncLock.findUnique({ where: { id: LOCK_ID } }); + + if (existing && existing.syncRunId) { + const age = Date.now() - existing.acquiredAt.getTime(); + if (age < MAX_AGE_MS) { + return { locked: false }; + } + // Stale lock — clear it and proceed + await prisma.syncLock.delete({ where: { id: LOCK_ID } }); + } + + const runId = await prisma.$transaction(async (tx) => { + // Double-check inside the transaction for race safety + const stillExisting = await tx.syncLock.findUnique({ where: { id: LOCK_ID } }); + if (stillExisting && stillExisting.syncRunId) { + throw new Error("already_locked"); + } + + const run = await tx.issueSyncRun.create({ + data: { status: "running", syncType, startedAt: new Date() }, + }); + + await tx.syncLock.create({ + data: { id: LOCK_ID, syncRunId: run.id, acquiredAt: new Date() }, + }); + + return run.id; + }); + + return { locked: true, runId }; + } catch (err) { + if (err instanceof Error && err.message === "already_locked") { + return { locked: false }; + } + throw err; + } +} + +/** + * Release the global sync lock for the given run. + * Uses a conditional delete to avoid releasing another run's lock. + */ +export async function releaseLock(runId: string): Promise { + await prisma.syncLock.deleteMany({ + where: { id: LOCK_ID, syncRunId: runId }, + }); +}