diff --git a/apps/server/src/git/Layers/GitCore.test.ts b/apps/server/src/git/Layers/GitCore.test.ts index 665c4b138f9..b857144f684 100644 --- a/apps/server/src/git/Layers/GitCore.test.ts +++ b/apps/server/src/git/Layers/GitCore.test.ts @@ -1,4 +1,4 @@ -import { existsSync } from "node:fs"; +import { existsSync, writeFileSync } from "node:fs"; import path from "node:path"; import * as NodeServices from "@effect/platform-node/NodeServices"; @@ -976,6 +976,244 @@ it.layer(TestLayer)("git integration", (it) => { }), ); + it.effect("removes newly leaked tmp_pack files after a failed upstream refresh", () => + Effect.gen(function* () { + const tmp = yield* makeTmpDir(); + const gitCommonDir = path.join(tmp, ".git"); + const packDir = path.join(gitCommonDir, "objects", "pack"); + yield* makeDirectory(packDir); + + const existingTmpPack = path.join(packDir, "tmp_pack_existing"); + const leakedTmpPack = path.join(packDir, "tmp_pack_new"); + const preservedPack = path.join(packDir, "pack-keep.pack"); + yield* writeTextFile(existingTmpPack, "existing temporary pack\n"); + yield* writeTextFile(preservedPack, "keep this pack file\n"); + + const ok = (stdout = "") => + Effect.succeed({ + code: 0, + stdout, + stderr: "", + stdoutTruncated: false, + stderrTruncated: false, + }); + + const core = yield* makeIsolatedGitCore((input) => { + if ( + input.args[0] === "rev-parse" && + input.args[1] === "--abbrev-ref" && + input.args[2] === "--symbolic-full-name" && + input.args[3] === "@{upstream}" + ) { + return ok("origin/main\n"); + } + if (input.args[0] === "remote") { + return ok("origin\n"); + } + if (input.args[0] === "rev-parse" && input.args[1] === "--git-common-dir") { + return ok(`${gitCommonDir}\n`); + } + if (input.args[0] === "--git-dir" && input.args[2] === "fetch") { + return Effect.gen(function* () { + yield* Effect.sync(() => { + writeFileSync(leakedTmpPack, "leaked temporary pack\n"); + }); + return yield* new GitCommandError({ + operation: input.operation, + command: `git ${input.args.join(" ")}`, + cwd: input.cwd, + detail: "simulated fetch timeout", + }); + }); + } + if (input.operation === "GitCore.statusDetails.status") { + return ok("# branch.head main\n# branch.upstream origin/main\n# branch.ab +0 -0\n"); + } + if ( + input.operation === "GitCore.statusDetails.unstagedNumstat" || + input.operation === "GitCore.statusDetails.stagedNumstat" + ) { + return ok(); + } + if (input.operation === "GitCore.statusDetails.defaultRef") { + return ok("refs/remotes/origin/main\n"); + } + return Effect.fail( + new GitCommandError({ + operation: input.operation, + command: `git ${input.args.join(" ")}`, + cwd: input.cwd, + detail: "Unexpected git command in tmp_pack cleanup test.", + }), + ); + }); + + const status = yield* core.statusDetails(tmp); + expect(status.branch).toBe("main"); + expect(existsSync(leakedTmpPack)).toBe(false); + expect(existsSync(existingTmpPack)).toBe(true); + expect(existsSync(preservedPack)).toBe(true); + }), + ); + + it.effect("skips tmp_pack cleanup while another fetch lock is active", () => + Effect.gen(function* () { + const tmp = yield* makeTmpDir(); + const gitCommonDir = path.join(tmp, ".git"); + const packDir = path.join(gitCommonDir, "objects", "pack"); + const fetchHeadLock = path.join(gitCommonDir, "FETCH_HEAD.lock"); + yield* makeDirectory(packDir); + + const leakedTmpPack = path.join(packDir, "tmp_pack_new"); + yield* writeTextFile(fetchHeadLock, "locked\n"); + + const ok = (stdout = "") => + Effect.succeed({ + code: 0, + stdout, + stderr: "", + stdoutTruncated: false, + stderrTruncated: false, + }); + + const core = yield* makeIsolatedGitCore((input) => { + if ( + input.args[0] === "rev-parse" && + input.args[1] === "--abbrev-ref" && + input.args[2] === "--symbolic-full-name" && + input.args[3] === "@{upstream}" + ) { + return ok("origin/main\n"); + } + if (input.args[0] === "remote") { + return ok("origin\n"); + } + if (input.args[0] === "rev-parse" && input.args[1] === "--git-common-dir") { + return ok(`${gitCommonDir}\n`); + } + if (input.args[0] === "--git-dir" && input.args[2] === "fetch") { + return Effect.gen(function* () { + yield* Effect.sync(() => { + writeFileSync(leakedTmpPack, "leaked temporary pack\n"); + }); + return yield* new GitCommandError({ + operation: input.operation, + command: `git ${input.args.join(" ")}`, + cwd: input.cwd, + detail: "simulated fetch timeout", + }); + }); + } + if (input.operation === "GitCore.statusDetails.status") { + return ok("# branch.head main\n# branch.upstream origin/main\n# branch.ab +0 -0\n"); + } + if ( + input.operation === "GitCore.statusDetails.unstagedNumstat" || + input.operation === "GitCore.statusDetails.stagedNumstat" + ) { + return ok(); + } + if (input.operation === "GitCore.statusDetails.defaultRef") { + return ok("refs/remotes/origin/main\n"); + } + return Effect.fail( + new GitCommandError({ + operation: input.operation, + command: `git ${input.args.join(" ")}`, + cwd: input.cwd, + detail: "Unexpected git command in concurrent fetch cleanup test.", + }), + ); + }); + + const status = yield* core.statusDetails(tmp); + expect(status.branch).toBe("main"); + expect(existsSync(leakedTmpPack)).toBe(true); + }), + ); + + it.effect("skips tmp_pack cleanup while a linked worktree fetch lock is active", () => + Effect.gen(function* () { + const tmp = yield* makeTmpDir(); + const gitCommonDir = path.join(tmp, ".git"); + const packDir = path.join(gitCommonDir, "objects", "pack"); + const linkedWorktreeFetchHeadLock = path.join( + gitCommonDir, + "worktrees", + "feature", + "FETCH_HEAD.lock", + ); + yield* makeDirectory(packDir); + yield* makeDirectory(path.dirname(linkedWorktreeFetchHeadLock)); + + const leakedTmpPack = path.join(packDir, "tmp_pack_new"); + yield* writeTextFile(linkedWorktreeFetchHeadLock, "locked\n"); + + const ok = (stdout = "") => + Effect.succeed({ + code: 0, + stdout, + stderr: "", + stdoutTruncated: false, + stderrTruncated: false, + }); + + const core = yield* makeIsolatedGitCore((input) => { + if ( + input.args[0] === "rev-parse" && + input.args[1] === "--abbrev-ref" && + input.args[2] === "--symbolic-full-name" && + input.args[3] === "@{upstream}" + ) { + return ok("origin/main\n"); + } + if (input.args[0] === "remote") { + return ok("origin\n"); + } + if (input.args[0] === "rev-parse" && input.args[1] === "--git-common-dir") { + return ok(`${gitCommonDir}\n`); + } + if (input.args[0] === "--git-dir" && input.args[2] === "fetch") { + return Effect.gen(function* () { + yield* Effect.sync(() => { + writeFileSync(leakedTmpPack, "leaked temporary pack\n"); + }); + return yield* new GitCommandError({ + operation: input.operation, + command: `git ${input.args.join(" ")}`, + cwd: input.cwd, + detail: "simulated fetch timeout", + }); + }); + } + if (input.operation === "GitCore.statusDetails.status") { + return ok("# branch.head main\n# branch.upstream origin/main\n# branch.ab +0 -0\n"); + } + if ( + input.operation === "GitCore.statusDetails.unstagedNumstat" || + input.operation === "GitCore.statusDetails.stagedNumstat" + ) { + return ok(); + } + if (input.operation === "GitCore.statusDetails.defaultRef") { + return ok("refs/remotes/origin/main\n"); + } + return Effect.fail( + new GitCommandError({ + operation: input.operation, + command: `git ${input.args.join(" ")}`, + cwd: input.cwd, + detail: "Unexpected git command in linked worktree fetch cleanup test.", + }), + ); + }); + + const status = yield* core.statusDetails(tmp); + expect(status.branch).toBe("main"); + expect(existsSync(leakedTmpPack)).toBe(true); + }), + ); + it.effect("throws when branch does not exist", () => Effect.gen(function* () { const tmp = yield* makeTmpDir(); diff --git a/apps/server/src/git/Layers/GitCore.ts b/apps/server/src/git/Layers/GitCore.ts index 3e9df316f1e..4b7ebb81266 100644 --- a/apps/server/src/git/Layers/GitCore.ts +++ b/apps/server/src/git/Layers/GitCore.ts @@ -58,6 +58,7 @@ const STATUS_UPSTREAM_REFRESH_INTERVAL = Duration.seconds(15); const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5); const STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN = Duration.seconds(5); const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048; +const TEMPORARY_PACK_FILE_PREFIX = "tmp_pack_"; const DEFAULT_BASE_BRANCH_CANDIDATES = ["main", "master"] as const; const GIT_LIST_BRANCHES_DEFAULT_LIMIT = 100; const NON_REPOSITORY_STATUS_DETAILS = Object.freeze({ @@ -923,15 +924,96 @@ export const makeGitCore = Effect.fn("makeGitCore")(function* (options?: { ): Effect.Effect => { const fetchCwd = path.basename(gitCommonDir) === ".git" ? path.dirname(gitCommonDir) : gitCommonDir; - return executeGit( - "GitCore.fetchRemoteForStatus", - fetchCwd, - ["--git-dir", gitCommonDir, "fetch", "--quiet", "--no-tags", remoteName], - { - allowNonZeroExit: true, - timeoutMs: Duration.toMillis(STATUS_UPSTREAM_REFRESH_TIMEOUT), - }, - ).pipe(Effect.asVoid); + + const readTemporaryPackFiles = Effect.fn("readTemporaryPackFiles")(function* () { + const packDir = path.join(gitCommonDir, "objects", "pack"); + const entries = yield* fileSystem + .readDirectory(packDir, { recursive: false }) + .pipe(Effect.orElseSucceed(() => [] as Array)); + return new Set( + entries.filter((entry) => entry.startsWith(TEMPORARY_PACK_FILE_PREFIX)), + ) as ReadonlySet; + }); + + const listFetchHeadLockPaths = Effect.fn("listFetchHeadLockPaths")(function* () { + const worktreesDir = path.join(gitCommonDir, "worktrees"); + const worktreeEntries = yield* fileSystem + .readDirectory(worktreesDir, { recursive: false }) + .pipe(Effect.orElseSucceed(() => [] as Array)); + return [ + path.join(gitCommonDir, "FETCH_HEAD.lock"), + ...worktreeEntries.map((entry) => path.join(worktreesDir, entry, "FETCH_HEAD.lock")), + ]; + }); + + const hasConcurrentFetchLock = Effect.fn("hasConcurrentFetchLock")(function* () { + const fetchHeadLockPaths = yield* listFetchHeadLockPaths(); + for (const fetchHeadLockPath of fetchHeadLockPaths) { + if (yield* fileSystem.exists(fetchHeadLockPath).pipe(Effect.orElseSucceed(() => false))) { + return true; + } + } + return false; + }); + + const removeLeakedTemporaryPackFiles = Effect.fn("removeLeakedTemporaryPackFiles")(function* ( + temporaryPackFilesBeforeFetch: ReadonlySet, + ) { + if (yield* hasConcurrentFetchLock()) { + yield* Effect.logWarning( + "skipped leaked temporary git pack cleanup while another fetch is active", + { + gitCommonDir, + }, + ); + return; + } + + const packDir = path.join(gitCommonDir, "objects", "pack"); + const entries = yield* fileSystem + .readDirectory(packDir, { recursive: false }) + .pipe(Effect.orElseSucceed(() => [] as Array)); + const leakedPackFiles = entries.filter( + (entry) => + entry.startsWith(TEMPORARY_PACK_FILE_PREFIX) && !temporaryPackFilesBeforeFetch.has(entry), + ); + if (leakedPackFiles.length === 0) { + return; + } + + yield* Effect.logWarning("removed leaked temporary git pack files after failed refresh", { + gitCommonDir, + leakedPackFiles, + }); + + yield* Effect.forEach( + leakedPackFiles, + (entry) => + fileSystem.remove(path.join(packDir, entry), { force: true }).pipe( + Effect.catch((error) => + Effect.logWarning("failed to remove leaked temporary git pack file", { + gitCommonDir, + packFile: entry, + error: error.message, + }), + ), + ), + { concurrency: "unbounded" }, + ); + }); + + return Effect.gen(function* () { + const temporaryPackFilesBeforeFetch = yield* readTemporaryPackFiles(); + yield* executeGit( + "GitCore.fetchRemoteForStatus", + fetchCwd, + ["--git-dir", gitCommonDir, "fetch", "--quiet", "--no-tags", remoteName], + { + timeoutMs: Duration.toMillis(STATUS_UPSTREAM_REFRESH_TIMEOUT), + fallbackErrorMessage: "git fetch failed", + }, + ).pipe(Effect.tapError(() => removeLeakedTemporaryPackFiles(temporaryPackFilesBeforeFetch))); + }).pipe(Effect.asVoid); }; const resolveGitCommonDir = Effect.fn("resolveGitCommonDir")(function* (cwd: string) {