Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ compose.yml
CLAUDE.md
.claude/
.venv/
build/
__pycache__/
*.egg-info/
*.pyc
.pytest_cache/
.ruff_cache/
/data
.idea
.idea
8 changes: 7 additions & 1 deletion app/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import shutil
from abc import ABC, abstractmethod
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -122,7 +123,12 @@ async def prepare(
f"{resolved_ref}:{job_ref}",
)

# 4. Create worktree (detached HEAD)
# 4. Create worktree (detached HEAD).
# Prune stale entries first, then remove any leftover directory so a
# retry after a failed/cancelled run doesn't hit "already exists".
await self._run_git_cmd("git", "-C", repo_dir, "worktree", "prune")
if Path(work_dir).exists():
shutil.rmtree(work_dir)
await self._run_git_cmd(
"git",
"-C",
Expand Down
69 changes: 52 additions & 17 deletions app/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import shutil
import signal
import sqlite3
import sys
from pathlib import Path
from typing import TYPE_CHECKING

from app.backends.base import CHUNK_SIZE, SNKMT_DB_FILENAME, ComputeBackend
from app.config import LocalConfig
from app.models import WorkflowFileInfo
from app.utils import (
build_windows_wrapper_script,
build_wrapper_script,
enforce_error_limit,
rename_with_cleanup,
Expand All @@ -31,12 +33,13 @@ class LocalBackend(ComputeBackend):

def __init__(self, config: LocalConfig) -> None:
self._config = config
self._scratch_dir_path = Path(config.scratch_dir).resolve()

def work_dir(self, job_id: str) -> str:
return f"{self._config.scratch_dir}/jobs/{job_id}"
return str(self._scratch_dir_path / "jobs" / job_id)

def _scratch_dir(self) -> str:
return self._config.scratch_dir
return str(self._scratch_dir_path)

async def _run_git_cmd(self, *args: str) -> str:
"""Run a git subprocess locally, returning stdout."""
Expand Down Expand Up @@ -66,7 +69,7 @@ async def prepare(
workflow: str,
git_ref: str | None = None,
) -> tuple[str, str | None, str | None]:
Path(self._config.scratch_dir).mkdir(parents=True, exist_ok=True)
self._scratch_dir_path.mkdir(parents=True, exist_ok=True)
return await super().prepare(job_id, workflow, git_ref)

async def setup(
Expand Down Expand Up @@ -97,23 +100,47 @@ async def launch(
)

wd = Path(work_dir)
script_path = wd / ".run.sh"
await asyncio.to_thread(script_path.write_text, wrapper_content)
await asyncio.to_thread(script_path.chmod, 0o755)

proc = await asyncio.create_subprocess_shell(
"nohup bash "
f"{shlex.quote(script_path.name)}"
" < /dev/null > /dev/null 2>&1 &",
cwd=work_dir,
)
await proc.wait()

pid_path = wd / ".pid"

async def _pid_exists() -> bool:
return await asyncio.to_thread(pid_path.exists)

if sys.platform == "win32":
wrapper_content = build_windows_wrapper_script(
self._config.pixi_path, snkmt_db_path, configfile, snakemake_args, env_vars
)
script_path = wd / ".run.ps1"
await asyncio.to_thread(script_path.write_text, wrapper_content, encoding="utf-8")

# cmd /c start /b detaches PowerShell immediately (equivalent to Unix &).
# PowerShell writes $PID to .pid so we can track and kill the process.
proc = await asyncio.create_subprocess_exec(
"cmd", "/c", "start", "/b", "powershell",
"-NonInteractive", "-NoProfile",
"-ExecutionPolicy", "Bypass",
"-File", str(script_path),
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
cwd=work_dir,
)
await proc.wait()
else:
wrapper_content = build_wrapper_script(
self._config.pixi_path, snkmt_db_path, configfile, snakemake_args, env_vars
)
script_path = wd / ".run.sh"
await asyncio.to_thread(script_path.write_text, wrapper_content)
await asyncio.to_thread(script_path.chmod, 0o755)

proc = await asyncio.create_subprocess_shell(
"nohup bash "
f"{shlex.quote(script_path.name)}"
" < /dev/null > /dev/null 2>&1 &",
cwd=work_dir,
)
await proc.wait()

await self._poll_until_pid_file(job_id, work_dir, _pid_exists)

async def monitor(
Expand Down Expand Up @@ -277,6 +304,13 @@ async def sync_snkmt_db(self, job_id: str, work_dir: str, local_path: Path) -> N
src = Path(work_dir).resolve() / SNKMT_DB_FILENAME
if not src.exists():
return
if src == local_path.resolve():
logger.debug(
"Skipping snkmt.db self-sync for job %s because source and destination are identical: %s",
job_id,
src,
)
return
local_path.parent.mkdir(parents=True, exist_ok=True)

def _backup() -> None:
Expand Down Expand Up @@ -311,8 +345,9 @@ async def cleanup(
try:
pid = int(pid_path.read_text().strip())
os.kill(pid, signal.SIGTERM)
await asyncio.sleep(5)
os.kill(pid, signal.SIGKILL)
if hasattr(signal, "SIGKILL"): # not defined on Windows
await asyncio.sleep(5)
os.kill(pid, signal.SIGKILL)
except ProcessLookupError as exc:
logger.debug("Process already gone for job %s: %s", job_id, exc)
except (ValueError, OSError) as exc:
Expand Down
39 changes: 39 additions & 0 deletions app/routes/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import asyncio
import logging
import mimetypes
import subprocess
import sys
import uuid
from pathlib import Path as FilePath
from pathlib import PurePosixPath
Expand Down Expand Up @@ -70,6 +72,30 @@ def _validate_output_path(path: str) -> str:
return normalized


def _reveal_path(path: FilePath) -> str:
"""Reveal a path in the local file manager, selecting the file when supported."""
if not path.exists():
raise HTTPException(status_code=404, detail="File not found")

if sys.platform == "darwin":
cmd = ["open", "-R", str(path)]
message = f"Revealed {path.name} in Finder"
elif sys.platform == "win32":
cmd = ["explorer", f"/select,{path}"]
message = f"Revealed {path.name} in Explorer"
else:
cmd = ["xdg-open", str(path.parent)]
message = f"Opened {path.parent}"

try:
subprocess.Popen(cmd) # noqa: S603
except OSError as exc:
raise HTTPException(
status_code=500, detail=f"Failed to reveal file: {exc}"
) from exc
return message


def _build_outputs_response(
files_raw: list[WorkflowFileInfo],
) -> JobOutputsResponse:
Expand Down Expand Up @@ -223,6 +249,19 @@ async def event_generator() -> AsyncGenerator[dict[str, str], None]:
return EventSourceResponse(event_generator())


@router.post("/jobs/{job_id}/logs/reveal")
async def reveal_log_file(
job_id: Annotated[str, Path()],
store: Annotated[JobStore, Depends(get_store)],
) -> dict[str, str]:
"""Reveal the persisted log file in the local file manager."""
require_job(store, job_id)
log_path = store.get_log_path(job_id)
if log_path is None:
raise HTTPException(status_code=500, detail="Persistence not configured")
return {"message": _reveal_path(log_path)}


@router.get("/jobs/{job_id}/outputs", response_model=JobOutputsResponse)
async def list_outputs(
job_id: Annotated[str, Path()],
Expand Down
2 changes: 1 addition & 1 deletion app/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def persist(self, record: JobRecord) -> None:
target = job_dir.joinpath("job.json")
tmp = target.with_suffix(".json.tmp")
tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
tmp.rename(target)
tmp.replace(target)

def flush_logs_to_disk(self, job_id: str) -> None:
"""Batch write unflushed log lines to disk."""
Expand Down
46 changes: 41 additions & 5 deletions app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,55 @@ def build_wrapper_script(
return f"""\
#!/bin/bash
{exports}echo $$ > .pid
SNKMT_ARGS=""
SNKMT_ARGS=()
if {pixi} run python -c "import snakemake_logger_plugin_snkmt" 2>/dev/null; then
SNKMT_ARGS="--logger snkmt --logger-snkmt-db {snkmt}"
SNKMT_ARGS=("--logger" "snkmt" "--logger-snkmt-db" {snkmt})
fi
{pixi} run snakemake $SNKMT_ARGS{configfile_arg}{extra_args} > .stdout.log 2>&1
{pixi} run snakemake "${{SNKMT_ARGS[@]}}"{configfile_arg}{extra_args} > .stdout.log 2>&1
echo $? > .exitcode
"""


def _ps_quote(s: str) -> str:
"""Single-quote a string for PowerShell (literal, no variable expansion)."""
return "'" + s.replace("'", "''") + "'"


def build_windows_wrapper_script(
pixi_path: str,
snkmt_db_path: str,
configfile: str | None,
snakemake_args: list[str] | None,
env_vars: dict[str, str] | None = None,
) -> str:
"""Build the .run.ps1 PowerShell wrapper script for a Snakemake workflow run on Windows."""
configfile_arg = f" --configfile {_ps_quote(configfile)}" if configfile else ""
extra_args = ""
if snakemake_args:
extra_args = " " + " ".join(_ps_quote(a) for a in snakemake_args)
pixi = _ps_quote(pixi_path)
snkmt = _ps_quote(snkmt_db_path)
env_lines = (
"\n".join(f"$env:{k} = {_ps_quote(v)}" for k, v in env_vars.items()) + "\n"
if env_vars
else ""
)
return f"""\
$PID | Out-File -FilePath '.pid' -NoNewline -Encoding ASCII
{env_lines}$snkmtArgs = @()
& {pixi} run python -c 'import snakemake_logger_plugin_snkmt' 2>$null
if ($LASTEXITCODE -eq 0) {{
$snkmtArgs = @('--logger', 'snkmt', '--logger-snkmt-db', {snkmt})
}}
& {pixi} run snakemake @snkmtArgs{configfile_arg}{extra_args} > '.stdout.log' 2>&1
$LASTEXITCODE | Out-File -FilePath '.exitcode' -NoNewline -Encoding ASCII
"""


def rename_with_cleanup(tmp: Path, dest: Path) -> None:
"""Rename tmp to dest, cleaning up tmp on failure."""
"""Atomically replace dest with tmp, cleaning up tmp on failure."""
try:
tmp.rename(dest)
tmp.replace(dest)
except OSError:
with contextlib.suppress(OSError):
tmp.unlink(missing_ok=True)
Expand Down
7 changes: 7 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
local:
scratch_dir: /Users/mikoding/Documents/mikoding/snakedispatch/data/scratch
pixi_path: /Users/mikoding/.pixi/bin/pixi
poll_interval: 5
default_snakemake_args: ["--cores", "1"]

DATA_DIR: /Users/mikoding/Documents/mikoding/snakedispatch/data
10 changes: 10 additions & 0 deletions data/demo-workflow/Snakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
rule all:
input:
"results/hello.txt"


rule write_hello:
output:
"results/hello.txt"
shell:
"mkdir -p results && echo 'hello from snakedispatch' > {output}"
8 changes: 8 additions & 0 deletions data/demo-workflow/pixi.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[workspace]
name = "snakedispatch-demo-workflow"
version = "0.1.0"
channels = ["conda-forge", "bioconda"]
platforms = ["osx-arm64"]

[dependencies]
snakemake = "*"
Loading