Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 89 additions & 57 deletions src/commands/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as path from "node:path";
import { Command } from "commander";
import { createFileSystem, createStore } from "../lib/context";
import { DEFAULT_IGNORE_PATTERNS } from "../lib/file";
import { acquireLock, isLocked, releaseLock } from "../lib/lock";
import {
createIndexingSpinner,
formatDryRunSummary,
Expand All @@ -19,11 +20,41 @@ export async function startWatch(options: {
dryRun: boolean;
}): Promise<void> {
let refreshInterval: NodeJS.Timeout | undefined;
const watchRoot = process.cwd();

const skipSync = isLocked(watchRoot);
if (skipSync) {
console.log(
"Another watch process is already syncing this directory. Skipping initial sync.",
);
}

const hasLock = acquireLock(watchRoot);
if (!hasLock && !skipSync) {
console.log(
"Another watch process is already syncing this directory. Skipping initial sync.",
);
}

const cleanup = (): void => {
if (hasLock) {
releaseLock(watchRoot);
}
};

process.on("exit", cleanup);
process.on("SIGINT", () => {
cleanup();
process.exit(0);
});
process.on("SIGTERM", () => {
cleanup();
process.exit(0);
});

try {
const store = await createStore();

// Refresh JWT token every 5 minutes (before 15-minute expiration)
if (!options.dryRun) {
const REFRESH_INTERVAL = 5 * 60 * 1000;
refreshInterval = setInterval(async () => {
Expand All @@ -36,73 +67,73 @@ export async function startWatch(options: {
);
}
}, REFRESH_INTERVAL);
// Allow process to exit even if interval is active (fs.watch keeps it alive anyway)
refreshInterval.unref();
}

const fileSystem = createFileSystem({
ignorePatterns: [...DEFAULT_IGNORE_PATTERNS],
});
const watchRoot = process.cwd();
console.debug("Watching for file changes in", watchRoot);

const { spinner, onProgress } = createIndexingSpinner(watchRoot);
try {
if (hasLock) {
const { spinner, onProgress } = createIndexingSpinner(watchRoot);
try {
await store.retrieve(options.store);
} catch {
await store.create({
name: options.store,
description:
"mgrep store - Mixedbreads multimodal multilingual magic search",
});
}
const result = await initialSync(
store,
fileSystem,
options.store,
watchRoot,
options.dryRun,
onProgress,
);
const deletedInfo =
result.deleted > 0 ? ` • deleted ${result.deleted}` : "";
const errorsInfo =
result.errors > 0 ? ` • errors ${result.errors}` : "";
if (result.errors > 0) {
spinner.warn(
`Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}${errorsInfo}`,
);
console.error(
`\n⚠️ ${result.errors} file(s) failed to upload. Run with DEBUG=mgrep* for more details.`,
);
} else {
spinner.succeed(
`Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}`,
);
}
if (options.dryRun) {
console.log(
formatDryRunSummary(result, {
actionDescription: "found",
includeTotal: true,
}),
);
return;
}
} catch (e) {
if (e instanceof QuotaExceededError) {
spinner.fail("Quota exceeded");
console.error(
"\n❌ Free tier quota exceeded. You've reached the monthly limit of 2,000,000 store tokens.",
);
console.error(
" Upgrade your plan at https://platform.mixedbread.com to continue syncing.\n",
try {
await store.retrieve(options.store);
} catch {
await store.create({
name: options.store,
description:
"mgrep store - Mixedbreads multimodal multilingual magic search",
});
}
const result = await initialSync(
store,
fileSystem,
options.store,
watchRoot,
options.dryRun,
onProgress,
);
process.exit(1);
const deletedInfo =
result.deleted > 0 ? ` • deleted ${result.deleted}` : "";
const errorsInfo =
result.errors > 0 ? ` • errors ${result.errors}` : "";
if (result.errors > 0) {
spinner.warn(
`Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}${errorsInfo}`,
);
console.error(
`\n⚠️ ${result.errors} file(s) failed to upload. Run with DEBUG=mgrep* for more details.`,
);
} else {
spinner.succeed(
`Initial sync complete (${result.processed}/${result.total}) • uploaded ${result.uploaded}${deletedInfo}`,
);
}
if (options.dryRun) {
console.log(
formatDryRunSummary(result, {
actionDescription: "found",
includeTotal: true,
}),
);
return;
}
} catch (e) {
if (e instanceof QuotaExceededError) {
spinner.fail("Quota exceeded");
console.error(
"\n❌ Free tier quota exceeded. You've reached the monthly limit of 2,000,000 store tokens.",
);
console.error(
" Upgrade your plan at https://platform.mixedbread.com to continue syncing.\n",
);
process.exit(1);
}
spinner.fail("Initial upload failed");
throw e;
}
spinner.fail("Initial upload failed");
throw e;
}

console.log("Watching for file changes in", watchRoot);
Expand Down Expand Up @@ -141,6 +172,7 @@ export async function startWatch(options: {
if (refreshInterval) {
clearInterval(refreshInterval);
}
cleanup();
const message = error instanceof Error ? error.message : "Unknown error";
console.error("Failed to start watcher:", message);
process.exitCode = 1;
Expand Down
98 changes: 98 additions & 0 deletions src/lib/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import * as crypto from "node:crypto";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Lock files live in the platform temp directory rather than a hard-coded
// "/tmp", which does not exist on every platform (e.g. Windows).
const LOCK_DIR = os.tmpdir();
const LOCK_PREFIX = "mgrep-watch-lock-";

/**
 * Builds the lock file path for the directory being watched.
 *
 * The directory is resolved to an absolute path and hashed with SHA-256, so
 * equivalent spellings of the same directory map to the same lock file and
 * the file name has a fixed length no matter how deep the directory is.
 * (Encoding the whole path into the name can exceed the file name length
 * limit for deeply nested directories.)
 *
 * @param watchDir - Directory being watched; may be relative.
 * @returns Absolute path of the lock file for that directory.
 */
function getLockFilePath(watchDir: string): string {
  const normalizedPath = path.resolve(watchDir);
  const hash = crypto
    .createHash("sha256")
    .update(normalizedPath)
    .digest("hex");
  return path.join(LOCK_DIR, `${LOCK_PREFIX}${hash}.lock`);
}

/**
 * Attempts to acquire the watch lock for the given directory.
 *
 * The lock is a file holding the owner's PID, created atomically with the
 * exclusive `wx` flag. If the file already exists, its owner is inspected:
 * a live owner means the lock is held; a dead owner or an unreadable PID
 * means the file is stale, so it is removed and creation is retried once.
 *
 * @param watchDir - Directory being watched.
 * @returns `true` if this process now owns the lock, `false` if another
 *   process holds it (or won the race to take over a stale lock).
 * @throws Any file system error other than the expected EEXIST/ENOENT races.
 */
export function acquireLock(watchDir: string): boolean {
  const lockFile = getLockFilePath(watchDir);

  // A lock file without a valid PID may belong to a process that has created
  // the file but not written its PID yet. Only reclaim such a file once it is
  // older than this, so a lock that is still being written is not stolen.
  const unreadableLockGraceMs = 2000;

  const codeOf = (err: unknown): unknown =>
    err instanceof Error && "code" in err ? err.code : undefined;

  // At most two attempts: the second runs only after a stale file was removed
  // (or vanished on its own).
  for (let attempt = 0; attempt < 2; attempt++) {
    try {
      fs.writeFileSync(lockFile, process.pid.toString(), { flag: "wx" });
      return true;
    } catch (err) {
      if (codeOf(err) !== "EEXIST") {
        throw err;
      }
    }

    let pid: number;
    let ageMs: number;
    try {
      pid = Number.parseInt(fs.readFileSync(lockFile, "utf-8").trim(), 10);
      ageMs = Date.now() - fs.statSync(lockFile).mtimeMs;
    } catch (err) {
      if (codeOf(err) === "ENOENT") {
        // The owner released the lock between our create attempt and the read.
        continue;
      }
      throw err;
    }

    if (Number.isInteger(pid) && pid > 0) {
      try {
        // Signal 0 performs the existence check without delivering a signal.
        process.kill(pid, 0);
        return false;
      } catch (err) {
        if (codeOf(err) === "EPERM") {
          // The process exists but belongs to another user: still a live owner.
          return false;
        }
        // Any other failure: the owner is gone, fall through to stale cleanup.
      }
    } else if (ageMs < unreadableLockGraceMs) {
      // Empty or garbled content in a fresh file: assume it is mid-write.
      return false;
    }
    // Otherwise the content is corrupt and old: treat it as stale instead of
    // letting it block lock acquisition forever.

    try {
      fs.unlinkSync(lockFile);
    } catch (err) {
      // Another process may have removed the same stale file first; that is
      // fine, the retry below decides who gets the lock.
      if (codeOf(err) !== "ENOENT") {
        throw err;
      }
    }
  }

  return false;
}

/**
 * Releases the watch lock for the given directory.
 *
 * The lock file is deleted only when the PID stored in it matches the
 * current process, so a lock owned by another process is never removed.
 * This is best-effort cleanup: any file system error is swallowed.
 *
 * @param watchDir - Directory whose lock should be released.
 */
export function releaseLock(watchDir: string): void {
  const lockPath = getLockFilePath(watchDir);

  try {
    if (!fs.existsSync(lockPath)) {
      return;
    }

    const rawOwner = fs.readFileSync(lockPath, "utf-8").trim();
    const ownerPid = Number.parseInt(rawOwner, 10);
    if (ownerPid !== process.pid) {
      return;
    }

    fs.unlinkSync(lockPath);
  } catch {
    // Cleanup must never fail the caller; ignore any error here.
  }
}

/**
 * Checks whether a running process holds the lock for the given directory.
 *
 * @param watchDir - Directory being watched.
 * @returns `true` if a lock file exists and the PID stored in it belongs to
 *   a running process; `false` if there is no lock file, it cannot be read,
 *   its content is not a valid PID, or the owning process is gone.
 */
export function isLocked(watchDir: string): boolean {
  const lockFile = getLockFilePath(watchDir);

  let pid: number;
  try {
    pid = Number.parseInt(fs.readFileSync(lockFile, "utf-8").trim(), 10);
  } catch {
    // Missing or unreadable lock file: nothing holds the lock.
    return false;
  }

  // Zero and negative values address process groups rather than one process,
  // so they can never identify a lock owner.
  if (!Number.isInteger(pid) || pid <= 0) {
    return false;
  }

  try {
    // Signal 0 performs the existence check without delivering a signal.
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM means the process exists but we may not signal it (it belongs to
    // another user), so the lock is still held. Anything else means the
    // owner is gone.
    return err instanceof Error && "code" in err && err.code === "EPERM";
  }
}
Loading