Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 239 additions & 1 deletion apps/server/src/git/Layers/GitCore.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { existsSync } from "node:fs";
import { existsSync, writeFileSync } from "node:fs";
import path from "node:path";

import * as NodeServices from "@effect/platform-node/NodeServices";
Expand Down Expand Up @@ -976,6 +976,244 @@ it.layer(TestLayer)("git integration", (it) => {
}),
);

it.effect("removes newly leaked tmp_pack files after a failed upstream refresh", () =>
Effect.gen(function* () {
const tmp = yield* makeTmpDir();
const gitCommonDir = path.join(tmp, ".git");
const packDir = path.join(gitCommonDir, "objects", "pack");
yield* makeDirectory(packDir);

const existingTmpPack = path.join(packDir, "tmp_pack_existing");
const leakedTmpPack = path.join(packDir, "tmp_pack_new");
const preservedPack = path.join(packDir, "pack-keep.pack");
yield* writeTextFile(existingTmpPack, "existing temporary pack\n");
yield* writeTextFile(preservedPack, "keep this pack file\n");

const ok = (stdout = "") =>
Effect.succeed({
code: 0,
stdout,
stderr: "",
stdoutTruncated: false,
stderrTruncated: false,
});

const core = yield* makeIsolatedGitCore((input) => {
if (
input.args[0] === "rev-parse" &&
input.args[1] === "--abbrev-ref" &&
input.args[2] === "--symbolic-full-name" &&
input.args[3] === "@{upstream}"
) {
return ok("origin/main\n");
}
if (input.args[0] === "remote") {
return ok("origin\n");
}
if (input.args[0] === "rev-parse" && input.args[1] === "--git-common-dir") {
return ok(`${gitCommonDir}\n`);
}
if (input.args[0] === "--git-dir" && input.args[2] === "fetch") {
return Effect.gen(function* () {
yield* Effect.sync(() => {
writeFileSync(leakedTmpPack, "leaked temporary pack\n");
});
return yield* new GitCommandError({
operation: input.operation,
command: `git ${input.args.join(" ")}`,
cwd: input.cwd,
detail: "simulated fetch timeout",
});
});
}
if (input.operation === "GitCore.statusDetails.status") {
return ok("# branch.head main\n# branch.upstream origin/main\n# branch.ab +0 -0\n");
}
if (
input.operation === "GitCore.statusDetails.unstagedNumstat" ||
input.operation === "GitCore.statusDetails.stagedNumstat"
) {
return ok();
}
if (input.operation === "GitCore.statusDetails.defaultRef") {
return ok("refs/remotes/origin/main\n");
}
return Effect.fail(
new GitCommandError({
operation: input.operation,
command: `git ${input.args.join(" ")}`,
cwd: input.cwd,
detail: "Unexpected git command in tmp_pack cleanup test.",
}),
);
});

const status = yield* core.statusDetails(tmp);
expect(status.branch).toBe("main");
expect(existsSync(leakedTmpPack)).toBe(false);
expect(existsSync(existingTmpPack)).toBe(true);
expect(existsSync(preservedPack)).toBe(true);
}),
);

it.effect("skips tmp_pack cleanup while another fetch lock is active", () =>
Effect.gen(function* () {
const tmp = yield* makeTmpDir();
const gitCommonDir = path.join(tmp, ".git");
const packDir = path.join(gitCommonDir, "objects", "pack");
const fetchHeadLock = path.join(gitCommonDir, "FETCH_HEAD.lock");
yield* makeDirectory(packDir);

const leakedTmpPack = path.join(packDir, "tmp_pack_new");
yield* writeTextFile(fetchHeadLock, "locked\n");

const ok = (stdout = "") =>
Effect.succeed({
code: 0,
stdout,
stderr: "",
stdoutTruncated: false,
stderrTruncated: false,
});

const core = yield* makeIsolatedGitCore((input) => {
if (
input.args[0] === "rev-parse" &&
input.args[1] === "--abbrev-ref" &&
input.args[2] === "--symbolic-full-name" &&
input.args[3] === "@{upstream}"
) {
return ok("origin/main\n");
}
if (input.args[0] === "remote") {
return ok("origin\n");
}
if (input.args[0] === "rev-parse" && input.args[1] === "--git-common-dir") {
return ok(`${gitCommonDir}\n`);
}
if (input.args[0] === "--git-dir" && input.args[2] === "fetch") {
return Effect.gen(function* () {
yield* Effect.sync(() => {
writeFileSync(leakedTmpPack, "leaked temporary pack\n");
});
return yield* new GitCommandError({
operation: input.operation,
command: `git ${input.args.join(" ")}`,
cwd: input.cwd,
detail: "simulated fetch timeout",
});
});
}
if (input.operation === "GitCore.statusDetails.status") {
return ok("# branch.head main\n# branch.upstream origin/main\n# branch.ab +0 -0\n");
}
if (
input.operation === "GitCore.statusDetails.unstagedNumstat" ||
input.operation === "GitCore.statusDetails.stagedNumstat"
) {
return ok();
}
if (input.operation === "GitCore.statusDetails.defaultRef") {
return ok("refs/remotes/origin/main\n");
}
return Effect.fail(
new GitCommandError({
operation: input.operation,
command: `git ${input.args.join(" ")}`,
cwd: input.cwd,
detail: "Unexpected git command in concurrent fetch cleanup test.",
}),
);
});

const status = yield* core.statusDetails(tmp);
expect(status.branch).toBe("main");
expect(existsSync(leakedTmpPack)).toBe(true);
}),
);

it.effect("skips tmp_pack cleanup while a linked worktree fetch lock is active", () =>
Effect.gen(function* () {
const tmp = yield* makeTmpDir();
const gitCommonDir = path.join(tmp, ".git");
const packDir = path.join(gitCommonDir, "objects", "pack");
const linkedWorktreeFetchHeadLock = path.join(
gitCommonDir,
"worktrees",
"feature",
"FETCH_HEAD.lock",
);
yield* makeDirectory(packDir);
yield* makeDirectory(path.dirname(linkedWorktreeFetchHeadLock));

const leakedTmpPack = path.join(packDir, "tmp_pack_new");
yield* writeTextFile(linkedWorktreeFetchHeadLock, "locked\n");

const ok = (stdout = "") =>
Effect.succeed({
code: 0,
stdout,
stderr: "",
stdoutTruncated: false,
stderrTruncated: false,
});

const core = yield* makeIsolatedGitCore((input) => {
if (
input.args[0] === "rev-parse" &&
input.args[1] === "--abbrev-ref" &&
input.args[2] === "--symbolic-full-name" &&
input.args[3] === "@{upstream}"
) {
return ok("origin/main\n");
}
if (input.args[0] === "remote") {
return ok("origin\n");
}
if (input.args[0] === "rev-parse" && input.args[1] === "--git-common-dir") {
return ok(`${gitCommonDir}\n`);
}
if (input.args[0] === "--git-dir" && input.args[2] === "fetch") {
return Effect.gen(function* () {
yield* Effect.sync(() => {
writeFileSync(leakedTmpPack, "leaked temporary pack\n");
});
return yield* new GitCommandError({
operation: input.operation,
command: `git ${input.args.join(" ")}`,
cwd: input.cwd,
detail: "simulated fetch timeout",
});
});
}
if (input.operation === "GitCore.statusDetails.status") {
return ok("# branch.head main\n# branch.upstream origin/main\n# branch.ab +0 -0\n");
}
if (
input.operation === "GitCore.statusDetails.unstagedNumstat" ||
input.operation === "GitCore.statusDetails.stagedNumstat"
) {
return ok();
}
if (input.operation === "GitCore.statusDetails.defaultRef") {
return ok("refs/remotes/origin/main\n");
}
return Effect.fail(
new GitCommandError({
operation: input.operation,
command: `git ${input.args.join(" ")}`,
cwd: input.cwd,
detail: "Unexpected git command in linked worktree fetch cleanup test.",
}),
);
});

const status = yield* core.statusDetails(tmp);
expect(status.branch).toBe("main");
expect(existsSync(leakedTmpPack)).toBe(true);
}),
);

it.effect("throws when branch does not exist", () =>
Effect.gen(function* () {
const tmp = yield* makeTmpDir();
Expand Down
100 changes: 91 additions & 9 deletions apps/server/src/git/Layers/GitCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const STATUS_UPSTREAM_REFRESH_INTERVAL = Duration.seconds(15);
const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5);
const STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN = Duration.seconds(5);
const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048;
const TEMPORARY_PACK_FILE_PREFIX = "tmp_pack_";
const DEFAULT_BASE_BRANCH_CANDIDATES = ["main", "master"] as const;
const GIT_LIST_BRANCHES_DEFAULT_LIMIT = 100;
const NON_REPOSITORY_STATUS_DETAILS = Object.freeze<GitStatusDetails>({
Expand Down Expand Up @@ -923,15 +924,96 @@ export const makeGitCore = Effect.fn("makeGitCore")(function* (options?: {
): Effect.Effect<void, GitCommandError> => {
const fetchCwd =
path.basename(gitCommonDir) === ".git" ? path.dirname(gitCommonDir) : gitCommonDir;
return executeGit(
"GitCore.fetchRemoteForStatus",
fetchCwd,
["--git-dir", gitCommonDir, "fetch", "--quiet", "--no-tags", remoteName],
{
allowNonZeroExit: true,
timeoutMs: Duration.toMillis(STATUS_UPSTREAM_REFRESH_TIMEOUT),
},
).pipe(Effect.asVoid);

const readTemporaryPackFiles = Effect.fn("readTemporaryPackFiles")(function* () {
const packDir = path.join(gitCommonDir, "objects", "pack");
const entries = yield* fileSystem
.readDirectory(packDir, { recursive: false })
.pipe(Effect.orElseSucceed(() => [] as Array<string>));
return new Set(
entries.filter((entry) => entry.startsWith(TEMPORARY_PACK_FILE_PREFIX)),
) as ReadonlySet<string>;
});

const listFetchHeadLockPaths = Effect.fn("listFetchHeadLockPaths")(function* () {
const worktreesDir = path.join(gitCommonDir, "worktrees");
const worktreeEntries = yield* fileSystem
.readDirectory(worktreesDir, { recursive: false })
.pipe(Effect.orElseSucceed(() => [] as Array<string>));
return [
path.join(gitCommonDir, "FETCH_HEAD.lock"),
...worktreeEntries.map((entry) => path.join(worktreesDir, entry, "FETCH_HEAD.lock")),
];
});

const hasConcurrentFetchLock = Effect.fn("hasConcurrentFetchLock")(function* () {
const fetchHeadLockPaths = yield* listFetchHeadLockPaths();
for (const fetchHeadLockPath of fetchHeadLockPaths) {
if (yield* fileSystem.exists(fetchHeadLockPath).pipe(Effect.orElseSucceed(() => false))) {
return true;
}
}
return false;
});

const removeLeakedTemporaryPackFiles = Effect.fn("removeLeakedTemporaryPackFiles")(function* (
temporaryPackFilesBeforeFetch: ReadonlySet<string>,
) {
if (yield* hasConcurrentFetchLock()) {
yield* Effect.logWarning(
"skipped leaked temporary git pack cleanup while another fetch is active",
{
gitCommonDir,
},
);
return;
}

const packDir = path.join(gitCommonDir, "objects", "pack");
const entries = yield* fileSystem
.readDirectory(packDir, { recursive: false })
.pipe(Effect.orElseSucceed(() => [] as Array<string>));
const leakedPackFiles = entries.filter(
(entry) =>
entry.startsWith(TEMPORARY_PACK_FILE_PREFIX) && !temporaryPackFilesBeforeFetch.has(entry),
);
if (leakedPackFiles.length === 0) {
return;
}

yield* Effect.logWarning("removed leaked temporary git pack files after failed refresh", {
gitCommonDir,
leakedPackFiles,
});

yield* Effect.forEach(
leakedPackFiles,
(entry) =>
fileSystem.remove(path.join(packDir, entry), { force: true }).pipe(
Effect.catch((error) =>
Effect.logWarning("failed to remove leaked temporary git pack file", {
gitCommonDir,
packFile: entry,
error: error.message,
}),
),
),
{ concurrency: "unbounded" },
);
});

return Effect.gen(function* () {
const temporaryPackFilesBeforeFetch = yield* readTemporaryPackFiles();
yield* executeGit(
"GitCore.fetchRemoteForStatus",
fetchCwd,
["--git-dir", gitCommonDir, "fetch", "--quiet", "--no-tags", remoteName],
{
timeoutMs: Duration.toMillis(STATUS_UPSTREAM_REFRESH_TIMEOUT),
fallbackErrorMessage: "git fetch failed",
},
).pipe(Effect.tapError(() => removeLeakedTemporaryPackFiles(temporaryPackFilesBeforeFetch)));
}).pipe(Effect.asVoid);
};

const resolveGitCommonDir = Effect.fn("resolveGitCommonDir")(function* (cwd: string) {
Expand Down
Loading