From fd776371cabf07ddc1b20ea8c607f1180758bfad Mon Sep 17 00:00:00 2001 From: mikoding Date: Sun, 24 May 2026 23:28:45 +0700 Subject: [PATCH 1/5] Set up desktop distribution: paths, gitignore, onboarding --- .gitignore | 3 +- app/backends/local.py | 7 +- config.yaml | 7 ++ data/demo-workflow/Snakefile | 10 ++ data/demo-workflow/pixi.toml | 8 ++ data/how-to-work.md | 227 +++++++++++++++++++++++++++++++++++ data/introduction.md | 176 +++++++++++++++++++++++++++ 7 files changed, 434 insertions(+), 4 deletions(-) create mode 100644 config.yaml create mode 100644 data/demo-workflow/Snakefile create mode 100644 data/demo-workflow/pixi.toml create mode 100644 data/how-to-work.md create mode 100644 data/introduction.md diff --git a/.gitignore b/.gitignore index d8dad13..249dd45 100644 --- a/.gitignore +++ b/.gitignore @@ -7,10 +7,11 @@ compose.yml CLAUDE.md .claude/ .venv/ +build/ __pycache__/ *.egg-info/ *.pyc .pytest_cache/ .ruff_cache/ /data -.idea \ No newline at end of file +.idea diff --git a/app/backends/local.py b/app/backends/local.py index ee951f9..2f110b8 100644 --- a/app/backends/local.py +++ b/app/backends/local.py @@ -31,12 +31,13 @@ class LocalBackend(ComputeBackend): def __init__(self, config: LocalConfig) -> None: self._config = config + self._scratch_dir_path = Path(config.scratch_dir).resolve() def work_dir(self, job_id: str) -> str: - return f"{self._config.scratch_dir}/jobs/{job_id}" + return str(self._scratch_dir_path / "jobs" / job_id) def _scratch_dir(self) -> str: - return self._config.scratch_dir + return str(self._scratch_dir_path) async def _run_git_cmd(self, *args: str) -> str: """Run a git subprocess locally, returning stdout.""" @@ -66,7 +67,7 @@ async def prepare( workflow: str, git_ref: str | None = None, ) -> tuple[str, str | None, str | None]: - Path(self._config.scratch_dir).mkdir(parents=True, exist_ok=True) + self._scratch_dir_path.mkdir(parents=True, exist_ok=True) return await super().prepare(job_id, workflow, git_ref) async def setup( diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..f7489c8 --- /dev/null +++ b/config.yaml @@ -0,0 +1,7 @@ +local: + scratch_dir: /Users/mikoding/Documents/mikoding/snakedispatch/data/scratch + pixi_path: /Users/mikoding/.pixi/bin/pixi + poll_interval: 5 + default_snakemake_args: ["--cores", "1"] + +DATA_DIR: /Users/mikoding/Documents/mikoding/snakedispatch/data diff --git a/data/demo-workflow/Snakefile b/data/demo-workflow/Snakefile new file mode 100644 index 0000000..b0b94e6 --- /dev/null +++ b/data/demo-workflow/Snakefile @@ -0,0 +1,10 @@ +rule all: + input: + "results/hello.txt" + + +rule write_hello: + output: + "results/hello.txt" + shell: + "mkdir -p results && echo 'hello from snakedispatch' > {output}" diff --git a/data/demo-workflow/pixi.toml b/data/demo-workflow/pixi.toml new file mode 100644 index 0000000..bd4fd4e --- /dev/null +++ b/data/demo-workflow/pixi.toml @@ -0,0 +1,8 @@ +[workspace] +name = "snakedispatch-demo-workflow" +version = "0.1.0" +channels = ["conda-forge", "bioconda"] +platforms = ["osx-arm64"] + +[dependencies] +snakemake = "*" diff --git a/data/how-to-work.md b/data/how-to-work.md new file mode 100644 index 0000000..a845c23 --- /dev/null +++ b/data/how-to-work.md @@ -0,0 +1,227 @@ +# How To Work In This Repo + +This is a practical second-pass guide for making changes safely and quickly. + +## What Kind Of Project This Is + +This repo is small and test-heavy. There is not much indirection. Most changes fall into one of these buckets: + +- API contract change +- job lifecycle change +- backend behavior change +- config/model validation change +- Snakemake metadata query change + +Because the codebase is compact, the fastest path is usually: + +1. find the route or behavior +2. trace into the store/task/backend layer +3. update tests close to that behavior + +## Local Development Expectations + +The repository uses Python packaging via `pyproject.toml` and locks dependencies with `uv.lock`. + +Relevant tooling from [pyproject.toml](/Users/mikoding/Documents/mikoding/snakedispatch/pyproject.toml:1): + +- Python `>=3.12` +- FastAPI +- Pydantic v2 +- `pytest` +- `pytest-asyncio` +- `httpx` +- `ruff` + +The container uses `uvicorn` to run the app, as shown in [Dockerfile](/Users/mikoding/Documents/mikoding/snakedispatch/Dockerfile:44). + +## Recommended First Commands + +If you are setting up locally outside Docker, the usual commands are: + +```bash +uv sync +pytest +ruff check . +``` + +If you want to run the service directly: + +```bash +uv run uvicorn app.main:app --reload +``` + +The docs UI will then be available at: + +- `/docs` +- `/redoc` + +If you want the containerized path, [README.md](/Users/mikoding/Documents/mikoding/snakedispatch/README.md:11) uses: + +```bash +cp compose.example.yml compose.yml +docker compose up -d +``` + +## Configuration Workflow + +The app expects one YAML config file at `./config.yaml` by default, controlled by `SNAKEDISPATCH_CONFIG`. + +Use one of: + +- [config/config.local.example.yaml](/Users/mikoding/Documents/mikoding/snakedispatch/config/config.local.example.yaml:1) +- [config/config.slurm_ssh.example.yaml](/Users/mikoding/Documents/mikoding/snakedispatch/config/config.slurm_ssh.example.yaml:1) + +For local work, prefer the `local` backend unless you are specifically changing remote execution behavior. + +## How To Trace A Change + +### If you are changing an API endpoint + +Start here: + +- [app/routes/jobs.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/jobs.py:98) +- [app/routes/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/snkmt.py:178) +- tests: + - [tests/test_api.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_api.py:1) + - [tests/test_snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_snkmt.py:1) + +Typical pattern: + +1. update request or response model in [app/models.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/models.py:1) if needed +2. update route logic +3. update tests first or immediately after + +### If you are changing job lifecycle or orchestration + +Start here: + +- [app/tasks.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/tasks.py:180) +- [app/store.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/store.py:31) +- tests: + - [tests/test_execute.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_execute.py:1) + - [tests/test_store.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_store.py:1) + +This is where status transitions, finalization, cleanup, cache restore/save, and sync behavior live. + +### If you are changing execution behavior + +Start here: + +- [app/backends/base.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/base.py:23) +- [app/backends/local.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/local.py:24) +- [app/backends/slurm_ssh.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/slurm_ssh.py:97) +- tests: + - [tests/test_base.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_base.py:1) + - [tests/test_local_backend.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_local_backend.py:1) + - [tests/test_slurm_ssh.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_slurm_ssh.py:1) + +Read `LocalBackend` first. It is simpler and reveals the shape of the interface. + +### If you are changing validation or config behavior + +Start here: + +- [app/config.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/config.py:1) +- [app/models.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/models.py:1) +- tests: + - [tests/test_models.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_models.py:1) + - parts of [tests/test_local_backend.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_local_backend.py:1) + +### If you are changing Snakemake metadata behavior + +Start here: + +- [app/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/snkmt.py:1) +- [app/routes/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/snkmt.py:178) +- tests: + - [tests/test_snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_snkmt.py:1) + +## Test Strategy + +The tests are one of the strongest parts of this repo. Use them as executable documentation. + +Suggested workflow: + +1. run the narrowest test file first +2. fix failures locally +3. run the broader relevant subset +4. finish with the full suite if your change is cross-cutting + +Examples: + +```bash +pytest tests/test_api.py +pytest tests/test_execute.py +pytest tests/test_local_backend.py +pytest +``` + +## How The Tests Are Built + +[tests/conftest.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/conftest.py:1) is worth reading early because it shows: + +- how `AppState` is injected +- how the backend is mocked +- how the sample `snkmt.db` is created +- how the ASGI app is exercised with `httpx` + +That fixture setup tells you what the team considers stable boundaries. + +## Common Safe Assumptions + +- Route handlers should stay thin. +- Shared runtime state should live in `AppState`. +- Job lifecycle transitions should go through `JobStore`. +- Backend-specific behavior should stay behind `ComputeBackend`. +- New functionality should come with tests near the touched layer. + +## Common Footguns + +- Do not bypass the store when mutating job state unless there is a strong reason. +- Do not mix backend-specific behavior into routes. +- Be careful with path validation for `configfile`, `extra_files`, `cache_dirs`, and output downloads. +- Remember that logs and `snkmt.db` are intentionally persisted for restart tolerance. +- The service is designed for exactly one configured backend, not many at once. +- `slurm_ssh` has an optional dependency on `asyncssh`; not every environment will have it installed. + +## Good First Debugging Path + +When something is wrong with a submitted job, inspect in this order: + +1. route request validation in [app/routes/jobs.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/jobs.py:98) +2. lifecycle transitions in [app/tasks.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/tasks.py:180) +3. persisted state and logs in [app/store.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/store.py:31) +4. backend launch and monitor logic in the relevant backend file +5. Snakemake metadata sync and query behavior if the issue is in workflow detail endpoints + +## If You Need To Add A New Backend + +The intended extension path is explicitly documented in [app/backends/base.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/base.py:23). + +You would need to: + +1. implement a new `ComputeBackend` subclass +2. add a config model in [app/config.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/config.py:1) +3. register the backend key in config loading +4. update [app/backends/__init__.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/__init__.py:1) +5. add backend-specific tests + +## A Good First Week Plan + +1. Read [app/main.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/main.py:26), [app/routes/jobs.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/jobs.py:98), and [app/tasks.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/tasks.py:180). +2. Run `pytest` and skim failures only if they happen. +3. Read `LocalBackend` end to end. +4. Read [tests/conftest.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/conftest.py:1) and [tests/test_api.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_api.py:1). +5. Pick one small bugfix or test-only change before attempting a backend-level feature. + +## Short Summary + +The fastest way to work effectively in this repo is to treat it as a compact service with strong boundaries: + +- routes define HTTP behavior +- tasks define execution flow +- store defines persisted state +- backends define environment-specific execution +- tests define expected behavior + +If you preserve those boundaries, most changes stay simple. diff --git a/data/introduction.md b/data/introduction.md new file mode 100644 index 0000000..cde2e32 --- /dev/null +++ b/data/introduction.md @@ -0,0 +1,176 @@ +# Codebase Introduction + +This repository is `snakedispatch`, a small FastAPI service that dispatches Snakemake workflows onto one compute backend and exposes job state over HTTP. + +At a high level: + +1. A client calls `POST /jobs` with a workflow source and optional execution settings. +2. The service creates a job record and starts an async background task. +3. The configured backend prepares the workflow, launches Snakemake, and monitors execution. +4. The service persists job metadata, streams logs, exposes outputs, and syncs Snakemake metadata from `snkmt.db`. + +## What The Service Does + +- Accepts workflow submissions from a Git URL or a local directory. +- Supports one compute backend per deployment: + - `local`: runs on the same machine. + - `slurm_ssh`: connects to a remote SLURM cluster over SSH. +- Stores job state on disk so jobs survive process restarts. +- Streams stdout logs over SSE. +- Exposes output files and Snakemake workflow metadata through REST endpoints. + +## Main Runtime Flow + +The app is wired in [app/main.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/main.py:26). + +- `lifespan()` loads config from YAML plus environment variables. +- It creates: + - a `JobStore` + - one `ComputeBackend` + - shared app state in `request.app.state.app` +- It restores persisted jobs from disk. +- It starts two background loops: + - `gc_loop()` for cleaning old terminal jobs + - `sync_job_data_loop()` for flushing logs and syncing `snkmt.db` + +## The Files That Matter Most + +Read these in order: + +1. [README.md](/Users/mikoding/Documents/mikoding/snakedispatch/README.md:1) +2. [app/main.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/main.py:26) +3. [app/routes/jobs.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/jobs.py:98) +4. [app/tasks.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/tasks.py:180) +5. [app/store.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/store.py:31) +6. [app/backends/base.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/base.py:23) +7. [app/backends/local.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/local.py:24) +8. [app/backends/slurm_ssh.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/slurm_ssh.py:97) +9. [app/routes/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/snkmt.py:178) +10. [app/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/snkmt.py:1) + +## Mental Model By Layer + +### API Layer + +[app/routes/jobs.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/jobs.py:98) is the main API surface. + +- `POST /jobs`: validate request, create job, spawn async execution +- `GET /jobs`: list jobs +- `GET /jobs/{job_id}`: get one job +- `GET /jobs/{job_id}/logs`: SSE log stream +- `GET /jobs/{job_id}/outputs`: list output files +- `GET /jobs/{job_id}/outputs/{path}`: download one output +- `POST /jobs/{job_id}/cancel`: cancel a running job +- `DELETE /jobs/{job_id}`: remove the job and trigger cleanup + +[app/routes/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/snkmt.py:178) exposes read-only workflow data from the Snakemake SQLite database. + +### Execution Layer + +[app/tasks.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/tasks.py:180) is the real execution pipeline. + +The sequence is: + +1. `backend.prepare()` +2. `store.mark_setup()` +3. `backend.setup()` +4. optional cache restore +5. `store.mark_running()` +6. `backend.launch()` +7. `backend.monitor()` +8. collect outputs and final status +9. flush logs and sync final Snakemake data + +This file is the best place to understand job lifecycle transitions. + +### Persistence Layer + +[app/store.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/store.py:31) manages in-memory and on-disk job state. + +Each job record tracks: + +- status +- timestamps +- exit code +- workflow source +- Git ref and SHA +- cached workflow file listing +- log lines +- optional Snakemake progress counters + +Persisted data lives under the configured `DATA_DIR`, typically in `jobs//`. + +### Backend Layer + +[app/backends/base.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/backends/base.py:82) defines the common backend protocol. + +Important idea: the base class already handles Git-backed workflow preparation using a bare repo plus worktrees. Backend implementations mainly differ in how they: + +- execute commands +- copy local workflows +- read files and logs +- clean up processes and directories +- sync `snkmt.db` + +`LocalBackend` is the easiest backend to understand first. `SlurmSSHBackend` adds SSH persistence, SFTP, remote launch, remote status probing, and SLURM-oriented operation. + +## Configuration Model + +[app/config.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/config.py:17) splits configuration into: + +- environment-driven app settings via `Settings` +- YAML-defined backend config via `load_config()` + +The YAML file must define exactly one backend key: + +- `local` +- `slurm_ssh` + +Examples: + +- [config/config.local.example.yaml](/Users/mikoding/Documents/mikoding/snakedispatch/config/config.local.example.yaml:1) +- [config/config.slurm_ssh.example.yaml](/Users/mikoding/Documents/mikoding/snakedispatch/config/config.slurm_ssh.example.yaml:1) + +## Domain-Specific Piece: `snkmt.db` + +The most project-specific part of the service is the Snakemake metadata database. + +- The Snakemake wrapper script tries to enable the `snakemake_logger_plugin_snkmt` plugin. +- That plugin writes workflow metadata to `snkmt.db`. +- The service periodically syncs that DB locally and queries it through [app/snkmt.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/snkmt.py:1). + +That is how the API can expose: + +- workflow counts +- rules +- jobs +- files per job +- rulegraph +- errors + +## Operational Behavior To Keep In Mind + +- This service is restart-tolerant because job metadata and logs are persisted. +- Log streaming uses SSE while a job is running. +- Output downloads are plain streamed HTTP responses after or during execution. +- Old terminal jobs are garbage-collected by age. +- Environment variables passed into workflows are allowlisted by config. +- Most edge-case behavior is covered by tests rather than large amounts of inline documentation. + +## Best Way To Learn The Code + +1. Read [tests/conftest.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/conftest.py:1) to see how the app is assembled in tests. +2. Read [tests/test_api.py](/Users/mikoding/Documents/mikoding/snakedispatch/tests/test_api.py:1) to understand expected endpoint behavior. +3. Follow one job from `POST /jobs` through [app/routes/jobs.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/routes/jobs.py:98), [app/tasks.py](/Users/mikoding/Documents/mikoding/snakedispatch/app/tasks.py:180), and the chosen backend. +4. Read `LocalBackend` before `SlurmSSHBackend`. + +## Short Summary + +This is not a large framework-heavy codebase. It is a compact service with four key concepts: + +- HTTP routes +- job execution orchestration +- backend abstraction +- persisted job state plus Snakemake metadata + +If you understand those four pieces, you will be productive quickly. From 9821b2f4673fe6235a2ded5c767f02734dafd30b Mon Sep 17 00:00:00 2001 From: mikoding Date: Sun, 24 May 2026 23:28:51 +0700 Subject: [PATCH 2/5] fix: prune stale worktree before git worktree add When a job directory already exists from a previous failed or cancelled run, git worktree add fails with exit 128 "already exists". Prune stale worktree refs and remove the directory beforehand so retries succeed. --- app/backends/base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/app/backends/base.py b/app/backends/base.py index 8c50f7d..01fd3bc 100644 --- a/app/backends/base.py +++ b/app/backends/base.py @@ -2,6 +2,7 @@ import asyncio import logging +import shutil from abc import ABC, abstractmethod from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING @@ -122,7 +123,12 @@ async def prepare( f"{resolved_ref}:{job_ref}", ) - # 4. Create worktree (detached HEAD) + # 4. Create worktree (detached HEAD). + # Prune stale entries first, then remove any leftover directory so a + # retry after a failed/cancelled run doesn't hit "already exists". + await self._run_git_cmd("git", "-C", repo_dir, "worktree", "prune") + if Path(work_dir).exists(): + shutil.rmtree(work_dir) await self._run_git_cmd( "git", "-C", From 45cb67b40556e8fb01e77a5d2bf6fcf0e4766ded Mon Sep 17 00:00:00 2001 From: mikoding Date: Sun, 24 May 2026 23:28:57 +0700 Subject: [PATCH 3/5] fix: use bash array for SNKMT_ARGS to handle paths with spaces Paths under ~/Library/Application Support contain a space. Storing the shlex-quoted path in a plain string variable and expanding it unquoted caused word-splitting, so Snakemake received a truncated path as a positional target. Switch to a bash array and use "${SNKMT_ARGS[@]}" to preserve the path as a single argument. --- app/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/utils.py b/app/utils.py index 16d768b..20177e4 100644 --- a/app/utils.py +++ b/app/utils.py @@ -50,11 +50,11 @@ def build_wrapper_script( return f"""\ #!/bin/bash {exports}echo $$ > .pid -SNKMT_ARGS="" +SNKMT_ARGS=() if {pixi} run python -c "import snakemake_logger_plugin_snkmt" 2>/dev/null; then - SNKMT_ARGS="--logger snkmt --logger-snkmt-db {snkmt}" + SNKMT_ARGS=("--logger" "snkmt" "--logger-snkmt-db" {snkmt}) fi -{pixi} run snakemake $SNKMT_ARGS{configfile_arg}{extra_args} > .stdout.log 2>&1 +{pixi} run snakemake "${{SNKMT_ARGS[@]}}"{configfile_arg}{extra_args} > .stdout.log 2>&1 echo $? > .exitcode """ From 66bf601ead646c683466dfad54b980246008f385 Mon Sep 17 00:00:00 2001 From: mikoding Date: Fri, 22 May 2026 17:30:09 +0700 Subject: [PATCH 4/5] Fix snakedispatch log handling and db sync --- app/backends/local.py | 7 +++++++ app/routes/jobs.py | 39 +++++++++++++++++++++++++++++++++++++ tests/test_api.py | 37 +++++++++++++++++++++++++++++++++++ tests/test_local_backend.py | 19 ++++++++++++++++++ 4 files changed, 102 insertions(+) diff --git a/app/backends/local.py b/app/backends/local.py index 2f110b8..2995359 100644 --- a/app/backends/local.py +++ b/app/backends/local.py @@ -278,6 +278,13 @@ async def sync_snkmt_db(self, job_id: str, work_dir: str, local_path: Path) -> N src = Path(work_dir).resolve() / SNKMT_DB_FILENAME if not src.exists(): return + if src == local_path.resolve(): + logger.debug( + "Skipping snkmt.db self-sync for job %s because source and destination are identical: %s", + job_id, + src, + ) + return local_path.parent.mkdir(parents=True, exist_ok=True) def _backup() -> None: diff --git a/app/routes/jobs.py b/app/routes/jobs.py index 4746b2c..379b666 100644 --- a/app/routes/jobs.py +++ b/app/routes/jobs.py @@ -3,6 +3,8 @@ import asyncio import logging import mimetypes +import subprocess +import sys import uuid from pathlib import Path as FilePath from pathlib import PurePosixPath @@ -70,6 +72,30 @@ def _validate_output_path(path: str) -> str: return normalized +def _reveal_path(path: FilePath) -> str: + """Reveal a path in the local file manager, selecting the file when supported.""" + if not path.exists(): + raise HTTPException(status_code=404, detail="File not found") + + if sys.platform == "darwin": + cmd = ["open", "-R", str(path)] + message = f"Revealed {path.name} in Finder" + elif sys.platform == "win32": + cmd = ["explorer", f"/select,{path}"] + message = f"Revealed {path.name} in Explorer" + else: + cmd = ["xdg-open", str(path.parent)] + message = f"Opened {path.parent}" + + try: + subprocess.Popen(cmd) # noqa: S603 + except OSError as exc: + raise HTTPException( + status_code=500, detail=f"Failed to reveal file: {exc}" + ) from exc + return message + + def _build_outputs_response( files_raw: list[WorkflowFileInfo], ) -> JobOutputsResponse: @@ -223,6 +249,19 @@ async def event_generator() -> AsyncGenerator[dict[str, str], None]: return EventSourceResponse(event_generator()) +@router.post("/jobs/{job_id}/logs/reveal") +async def reveal_log_file( + job_id: Annotated[str, Path()], + store: Annotated[JobStore, Depends(get_store)], +) -> dict[str, str]: + """Reveal the persisted log file in the local file manager.""" + require_job(store, job_id) + log_path = store.get_log_path(job_id) + if log_path is None: + raise HTTPException(status_code=500, detail="Persistence not configured") + return {"message": _reveal_path(log_path)} + + @router.get("/jobs/{job_id}/outputs", response_model=JobOutputsResponse) async def list_outputs( job_id: Annotated[str, Path()], diff --git a/tests/test_api.py b/tests/test_api.py index 127e8aa..1efa607 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -388,6 +388,43 @@ async def test_log_eviction_streams_available(self, async_client, store): assert new_offset == MAX_IN_MEMORY_LINES + 100 +class TestRevealLogFile: + async def test_reveals_persisted_log(self, async_client, store, monkeypatch): + job_id = "test-reveal-log" + store.create_job(job_id) + store.mark_finished(job_id, 0) + log_path = store.get_log_path(job_id) + assert log_path is not None + log_path.parent.mkdir(parents=True, exist_ok=True) + log_path.write_text("hello\n") + + calls: list[list[str]] = [] + + def fake_popen(cmd: list[str], *args, **kwargs): + calls.append(cmd) + + class _Proc: + pass + + return _Proc() + + monkeypatch.setattr("app.routes.jobs.subprocess.Popen", fake_popen) + monkeypatch.setattr("app.routes.jobs.sys.platform", "darwin") + + response = await async_client.post(f"/jobs/{job_id}/logs/reveal") + assert response.status_code == 200 + assert response.json()["message"] == f"Revealed {log_path.name} in Finder" + assert calls == [["open", "-R", str(log_path)]] + + async def test_missing_log_returns_404(self, async_client, store): + job_id = "test-reveal-missing" + store.create_job(job_id) + store.mark_finished(job_id, 0) + + response = await async_client.post(f"/jobs/{job_id}/logs/reveal") + assert response.status_code == 404 + + class TestHealth: async def test_returns_status(self, async_client, mock_backend): mock_backend.check_connectivity = AsyncMock(return_value=True) diff --git a/tests/test_local_backend.py b/tests/test_local_backend.py index b638805..2c1571e 100644 --- a/tests/test_local_backend.py +++ b/tests/test_local_backend.py @@ -381,6 +381,25 @@ async def test_noop_when_snkmt_db_missing(self, backend, tmp_path): assert not local_path.exists() + async def test_noop_when_source_and_destination_are_same(self, backend, tmp_path): + import sqlite3 + + work_dir = tmp_path / "scratch" / "jobs" / "job-self-sync" + work_dir.mkdir(parents=True) + src_db = work_dir / "snkmt.db" + conn = sqlite3.connect(str(src_db)) + conn.execute("CREATE TABLE workflows (id TEXT PRIMARY KEY)") + conn.execute("INSERT INTO workflows VALUES ('wf-1')") + conn.commit() + conn.close() + + await backend.sync_snkmt_db("job-self-sync", str(work_dir), src_db) + + conn2 = sqlite3.connect(str(src_db)) + rows = conn2.execute("SELECT id FROM workflows").fetchall() + conn2.close() + assert rows == [("wf-1",)] + class TestCheckConnectivity: async def test_returns_true(self, backend): From 555300d7238893eb8e07515903093fb5e168d2e6 Mon Sep 17 00:00:00 2001 From: mikoding Date: Tue, 26 May 2026 17:31:21 +0700 Subject: [PATCH 5/5] fix: Windows compatibility for atomic writes and job launch - Use Path.replace() instead of Path.rename() to fix [WinError 183] (rename fails on Windows if destination already exists) - Replace nohup/bash launcher with PowerShell wrapper on Windows; use 'cmd /c start /b powershell -File .run.ps1' to detach the process - Guard signal.SIGKILL in cleanup() (not defined on Windows) --- app/backends/local.py | 55 ++++++++++++++++++++++++++++++++----------- app/store.py | 2 +- app/utils.py | 40 +++++++++++++++++++++++++++++-- 3 files changed, 80 insertions(+), 17 deletions(-) diff --git a/app/backends/local.py b/app/backends/local.py index 2995359..8aca297 100644 --- a/app/backends/local.py +++ b/app/backends/local.py @@ -8,6 +8,7 @@ import shutil import signal import sqlite3 +import sys from pathlib import Path from typing import TYPE_CHECKING @@ -15,6 +16,7 @@ from app.config import LocalConfig from app.models import WorkflowFileInfo from app.utils import ( + build_windows_wrapper_script, build_wrapper_script, enforce_error_limit, rename_with_cleanup, @@ -98,23 +100,47 @@ async def launch( ) wd = Path(work_dir) - script_path = wd / ".run.sh" - await asyncio.to_thread(script_path.write_text, wrapper_content) - await asyncio.to_thread(script_path.chmod, 0o755) - - proc = await asyncio.create_subprocess_shell( - "nohup bash " - f"{shlex.quote(script_path.name)}" - " < /dev/null > /dev/null 2>&1 &", - cwd=work_dir, - ) - await proc.wait() - pid_path = wd / ".pid" async def _pid_exists() -> bool: return await asyncio.to_thread(pid_path.exists) + if sys.platform == "win32": + wrapper_content = build_windows_wrapper_script( + self._config.pixi_path, snkmt_db_path, configfile, snakemake_args, env_vars + ) + script_path = wd / ".run.ps1" + await asyncio.to_thread(script_path.write_text, wrapper_content, encoding="utf-8") + + # cmd /c start /b detaches PowerShell immediately (equivalent to Unix &). + # PowerShell writes $PID to .pid so we can track and kill the process. + proc = await asyncio.create_subprocess_exec( + "cmd", "/c", "start", "/b", "powershell", + "-NonInteractive", "-NoProfile", + "-ExecutionPolicy", "Bypass", + "-File", str(script_path), + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + cwd=work_dir, + ) + await proc.wait() + else: + wrapper_content = build_wrapper_script( + self._config.pixi_path, snkmt_db_path, configfile, snakemake_args, env_vars + ) + script_path = wd / ".run.sh" + await asyncio.to_thread(script_path.write_text, wrapper_content) + await asyncio.to_thread(script_path.chmod, 0o755) + + proc = await asyncio.create_subprocess_shell( + "nohup bash " + f"{shlex.quote(script_path.name)}" + " < /dev/null > /dev/null 2>&1 &", + cwd=work_dir, + ) + await proc.wait() + await self._poll_until_pid_file(job_id, work_dir, _pid_exists) async def monitor( @@ -319,8 +345,9 @@ async def cleanup( try: pid = int(pid_path.read_text().strip()) os.kill(pid, signal.SIGTERM) - await asyncio.sleep(5) - os.kill(pid, signal.SIGKILL) + if hasattr(signal, "SIGKILL"): # not defined on Windows + await asyncio.sleep(5) + os.kill(pid, signal.SIGKILL) except ProcessLookupError as exc: logger.debug("Process already gone for job %s: %s", job_id, exc) except (ValueError, OSError) as exc: diff --git a/app/store.py b/app/store.py index 2e7e312..5633999 100644 --- a/app/store.py +++ b/app/store.py @@ -101,7 +101,7 @@ def persist(self, record: JobRecord) -> None: target = job_dir.joinpath("job.json") tmp = target.with_suffix(".json.tmp") tmp.write_text(json.dumps(data, indent=2), encoding="utf-8") - tmp.rename(target) + tmp.replace(target) def flush_logs_to_disk(self, job_id: str) -> None: """Batch write unflushed log lines to disk.""" diff --git a/app/utils.py b/app/utils.py index 20177e4..5aaa8c9 100644 --- a/app/utils.py +++ b/app/utils.py @@ -59,10 +59,46 @@ def build_wrapper_script( """ +def _ps_quote(s: str) -> str: + """Single-quote a string for PowerShell (literal, no variable expansion).""" + return "'" + s.replace("'", "''") + "'" + + +def build_windows_wrapper_script( + pixi_path: str, + snkmt_db_path: str, + configfile: str | None, + snakemake_args: list[str] | None, + env_vars: dict[str, str] | None = None, +) -> str: + """Build the .run.ps1 PowerShell wrapper script for a Snakemake workflow run on Windows.""" + configfile_arg = f" --configfile {_ps_quote(configfile)}" if configfile else "" + extra_args = "" + if snakemake_args: + extra_args = " " + " ".join(_ps_quote(a) for a in snakemake_args) + pixi = _ps_quote(pixi_path) + snkmt = _ps_quote(snkmt_db_path) + env_lines = ( + "\n".join(f"$env:{k} = {_ps_quote(v)}" for k, v in env_vars.items()) + "\n" + if env_vars + else "" + ) + return f"""\ +$PID | Out-File -FilePath '.pid' -NoNewline -Encoding ASCII +{env_lines}$snkmtArgs = @() +& {pixi} run python -c 'import snakemake_logger_plugin_snkmt' 2>$null +if ($LASTEXITCODE -eq 0) {{ + $snkmtArgs = @('--logger', 'snkmt', '--logger-snkmt-db', {snkmt}) +}} +& {pixi} run snakemake @snkmtArgs{configfile_arg}{extra_args} > '.stdout.log' 2>&1 +$LASTEXITCODE | Out-File -FilePath '.exitcode' -NoNewline -Encoding ASCII +""" + + def rename_with_cleanup(tmp: Path, dest: Path) -> None: - """Rename tmp to dest, cleaning up tmp on failure.""" + """Atomically replace dest with tmp, cleaning up tmp on failure.""" try: - tmp.rename(dest) + tmp.replace(dest) except OSError: with contextlib.suppress(OSError): tmp.unlink(missing_ok=True)