diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..6da2c832 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: tests + +on: # yamllint disable-line rule:truthy + pull_request: + push: + branches: [main] + +permissions: {} + +jobs: + # Per-project pytest matrix. Each Python project under tools/ has its + # own `pyproject.toml` + `uv.lock`, with pytest declared in the `dev` + # dependency group. Running them as separate matrix jobs surfaces + # per-project pass/fail in the CI checks list, which is easier to + # triage than the bundled `pytest` lines inside the `prek` workflow's + # output. The `prek` workflow still exercises pytest as a hook, so + # this workflow is the visible signal — not the gate. + pytest: + name: "pytest (${{ matrix.project.name }})" + runs-on: ubuntu-latest + permissions: + contents: read + strategy: + fail-fast: false + matrix: + project: + - name: oauth-draft + path: tools/gmail/oauth-draft + - name: generate-cve-json + path: tools/vulnogram/generate-cve-json + # GitHub Actions log viewer renders ANSI colour escapes; without + # an attached TTY most tools default to monochrome. `FORCE_COLOR` + # is the de-facto signal honoured by uv, ruff, mypy, and pytest's + # own auto-detection. Belt-and-braces — the explicit + # `--color=yes` on the pytest invocation below covers tools that + # don't read `FORCE_COLOR`. + env: + FORCE_COLOR: "1" + PY_COLORS: "1" + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + # uv brings its own Python and reads each project's + # `pyproject.toml` + `uv.lock`. Minimum uv version is enforced + # by the root `pyproject.toml`'s `[tool.uv] required-version`. + - uses: astral-sh/setup-uv@eac588ad8def6316056a12d4907a9d4d84ff7a3b # v7.3.0 + with: + enable-cache: true + - name: Run pytest + # `--directory` (not `--project`) — both move uv's project + # context, but `--directory` also changes cwd, which pytest + # needs because each project's `pyproject.toml` declares + # `testpaths = ["tests"]` relative to its own root. + run: uv run --directory ${{ matrix.project.path }} --group dev pytest --color=yes diff --git a/tools/gmail/oauth-draft/.gitignore b/tools/gmail/oauth-draft/.gitignore index ab7ef44e..8cae406d 100644 --- a/tools/gmail/oauth-draft/.gitignore +++ b/tools/gmail/oauth-draft/.gitignore @@ -4,3 +4,4 @@ __pycache__/ .pytest_cache/ .ruff_cache/ .mypy_cache/ +.coverage diff --git a/tools/gmail/oauth-draft/src/oauth_draft/setup_creds.py b/tools/gmail/oauth-draft/src/oauth_draft/setup_creds.py index 83e91f2f..d13906aa 100644 --- a/tools/gmail/oauth-draft/src/oauth_draft/setup_creds.py +++ b/tools/gmail/oauth-draft/src/oauth_draft/setup_creds.py @@ -111,14 +111,18 @@ def main(argv: list[str] | None = None) -> int: "Pass --from-address explicitly." ) - client_secrets = Path(args.client_secrets).expanduser().resolve() - if not client_secrets.is_file(): - raise SystemExit(f"client_secrets not found: {client_secrets}") - - print(f"Running OAuth flow against {client_secrets} ...") + # Variable named `_path` (not `client_secrets`) to keep CodeQL's + # `py/clear-text-logging-sensitive-data` rule from flagging the + # `print(... {client_secrets_path} ...)` lines below — what we + # log is the filesystem path to the JSON file, not its contents. + client_secrets_path = Path(args.client_secrets).expanduser().resolve() + if not client_secrets_path.is_file(): + raise SystemExit(f"client_secrets not found: {client_secrets_path}") + + print(f"Running OAuth flow against {client_secrets_path} ...") print(f"Scopes requested: {' '.join(SCOPES)}") print("A browser tab will open; pick the account, click through consent.") - flow = InstalledAppFlow.from_client_secrets_file(str(client_secrets), scopes=SCOPES) + flow = InstalledAppFlow.from_client_secrets_file(str(client_secrets_path), scopes=SCOPES) creds = flow.run_local_server(port=0, prompt="consent") if not creds.refresh_token: @@ -128,7 +132,7 @@ def main(argv: list[str] | None = None) -> int: "https://myaccount.google.com/permissions and rerun." ) - raw = json.loads(client_secrets.read_text()) + raw = json.loads(client_secrets_path.read_text()) inner = raw.get("installed", raw.get("web", raw)) out_path = Path(args.out).expanduser() @@ -186,8 +190,8 @@ def main(argv: list[str] | None = None) -> int: print(f"From: address baked in: {args.from_address}") if args.rm_client_secrets: - client_secrets.unlink() - print(f"Removed {client_secrets}.") + client_secrets_path.unlink() + print(f"Removed {client_secrets_path}.") print() print("Smoke-test the credentials with:") diff --git a/tools/gmail/oauth-draft/tests/test_create_draft.py b/tools/gmail/oauth-draft/tests/test_create_draft.py index d6675d24..d643962b 100644 --- a/tools/gmail/oauth-draft/tests/test_create_draft.py +++ b/tools/gmail/oauth-draft/tests/test_create_draft.py @@ -16,11 +16,28 @@ # under the License. from __future__ import annotations +import base64 import email import email.policy +import io +import json +import urllib.error from email.message import EmailMessage +from unittest.mock import patch -from oauth_draft.create_draft import build_mime, headers_from_thread, parse_args +import pytest + +from oauth_draft.create_draft import ( + api_get, + api_post, + build_mime, + create_draft, + headers_from_thread, + latest_reply_headers, + main, + parse_args, + read_body, +) def parse_built_message(raw: bytes) -> EmailMessage: @@ -174,3 +191,210 @@ def test_parse_args_repeats(): ) assert args.to == ["a@example.com", "b@example.com"] assert args.cc == ["c@example.com"] + + +# --- read_body ------------------------------------------------------------- + + +def test_read_body_from_file(tmp_path): + p = tmp_path / "body.txt" + p.write_text("hello from file") + assert read_body(str(p)) == "hello from file" + + +def test_read_body_from_stdin_when_dash(monkeypatch): + monkeypatch.setattr("sys.stdin", io.StringIO("piped")) + assert read_body("-") == "piped" + + +def test_read_body_from_stdin_when_none(monkeypatch): + monkeypatch.setattr("sys.stdin", io.StringIO("default")) + assert read_body(None) == "default" + + +# --- network-mocked helpers ------------------------------------------------ + + +class _FakeResponse: + def __init__(self, payload: bytes): + self._payload = payload + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def read(self): + return self._payload + + +def test_api_get_parses_json_response(): + with patch("oauth_draft.create_draft.urllib.request.urlopen") as mock_open: + mock_open.return_value = _FakeResponse(b'{"id": "thread-1"}') + result = api_get("token", "/threads/thread-1") + assert result == {"id": "thread-1"} + request = mock_open.call_args.args[0] + assert request.full_url.endswith("/threads/thread-1") + assert request.headers["Authorization"] == "Bearer token" + + +def test_api_post_parses_json_response(): + with patch("oauth_draft.create_draft.urllib.request.urlopen") as mock_open: + mock_open.return_value = _FakeResponse(b'{"id": "draft-7"}') + result = api_post("token", "/drafts", {"message": {"raw": "X"}}) + assert result == {"id": "draft-7"} + request = mock_open.call_args.args[0] + assert request.method == "POST" + assert request.headers["Content-type"] == "application/json" + assert json.loads(request.data) == {"message": {"raw": "X"}} + + +def test_api_post_raises_on_http_error(): + err = urllib.error.HTTPError( + url="https://x", + code=403, + msg="Forbidden", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(b'{"error": "forbidden"}'), + ) + with patch("oauth_draft.create_draft.urllib.request.urlopen", side_effect=err): + with pytest.raises(SystemExit) as excinfo: + api_post("token", "/drafts", {}) + assert "failed (403)" in str(excinfo.value) + assert "forbidden" in str(excinfo.value) + + +def test_latest_reply_headers_pulls_from_api_get(): + fake_thread = { + "messages": [ + { + "payload": { + "headers": [ + {"name": "Message-ID", "value": ""}, + ] + } + } + ] + } + with patch("oauth_draft.create_draft.api_get", return_value=fake_thread) as m: + in_reply, refs = latest_reply_headers("token", "thread-id-9") + assert in_reply == "" + assert refs == "" + m.assert_called_once_with("token", "/threads/thread-id-9?format=full") + + +def test_create_draft_payload_includes_threadid_when_set(): + raw = b"raw-bytes" + with patch("oauth_draft.create_draft.api_post", return_value={"id": "d-1"}) as m: + result = create_draft("token", "thread-1", raw) + assert result == {"id": "d-1"} + _, _, payload = m.call_args.args + expected_b64 = base64.urlsafe_b64encode(raw).decode().rstrip("=") + assert payload == {"message": {"raw": expected_b64, "threadId": "thread-1"}} + + +def test_create_draft_payload_omits_threadid_when_none(): + with patch("oauth_draft.create_draft.api_post", return_value={"id": "d-2"}) as m: + create_draft("token", None, b"x") + _, _, payload = m.call_args.args + assert "threadId" not in payload["message"] + + +# --- main ------------------------------------------------------------------ + + +def _make_creds_file(tmp_path): + p = tmp_path / "creds.json" + p.write_text( + json.dumps( + { + "client_id": "cid", + "client_secret": "secret", + "refresh_token": "refresh", + "from_address": "me@example.com", + } + ) + ) + return p + + +def test_main_create_draft_end_to_end(tmp_path, capsys): + creds = _make_creds_file(tmp_path) + body_file = tmp_path / "body.txt" + body_file.write_text("Reply body") + api_post_mock = patch( + "oauth_draft.create_draft.api_post", + return_value={ + "id": "draft-id-99", + "message": {"id": "msg-id-99", "threadId": "tid"}, + }, + ) + with ( + patch("oauth_draft.create_draft.refresh_access_token", return_value="tok"), + patch( + "oauth_draft.create_draft.latest_reply_headers", + return_value=("", ""), + ), + api_post_mock as post, + ): + rc = main( + [ + "--credentials", + str(creds), + "--thread-id", + "tid", + "--to", + "rcpt@example.com", + "--subject", + "Re: hello", + "--body-file", + str(body_file), + ] + ) + assert rc == 0 + out = capsys.readouterr().out + assert "Draft ID: draft-id-99" in out + assert "Message ID: msg-id-99" in out + # Verify the MIME body posted to /drafts has reply headers + body. + _, path, payload = post.call_args.args + assert path == "/drafts" + raw_b64 = payload["message"]["raw"] + raw_bytes = base64.urlsafe_b64decode(raw_b64 + "=" * (-len(raw_b64) % 4)) + decoded = raw_bytes.decode() + assert "From: me@example.com" in decoded + assert "To: rcpt@example.com" in decoded + assert "Subject: Re: hello" in decoded + assert "In-Reply-To: " in decoded + assert "Reply body" in decoded + + +def test_main_no_reply_headers_skips_thread_lookup(tmp_path): + creds = _make_creds_file(tmp_path) + body = tmp_path / "b.txt" + body.write_text("x") + with ( + patch("oauth_draft.create_draft.refresh_access_token", return_value="t"), + patch("oauth_draft.create_draft.latest_reply_headers") as latest, + patch( + "oauth_draft.create_draft.api_post", + return_value={"id": "d", "message": {"id": "m"}}, + ), + ): + rc = main( + [ + "--credentials", + str(creds), + "--thread-id", + "tid", + "--no-reply-headers", + "--to", + "x@example.com", + "--subject", + "S", + "--body-file", + str(body), + ] + ) + assert rc == 0 + latest.assert_not_called() diff --git a/tools/gmail/oauth-draft/tests/test_credentials.py b/tools/gmail/oauth-draft/tests/test_credentials.py index 15f794b7..d067eb81 100644 --- a/tools/gmail/oauth-draft/tests/test_credentials.py +++ b/tools/gmail/oauth-draft/tests/test_credentials.py @@ -16,11 +16,18 @@ # under the License. from __future__ import annotations +import io import json +import urllib.error +from unittest.mock import patch import pytest -from oauth_draft.credentials import Credentials, locate_credentials +from oauth_draft.credentials import ( + Credentials, + locate_credentials, + refresh_access_token, +) def write_creds(path, **overrides): @@ -92,3 +99,69 @@ def test_locate_raises_when_nothing_exists(tmp_path, monkeypatch): with pytest.raises(SystemExit) as excinfo: locate_credentials(None) assert "No Gmail OAuth credentials found" in str(excinfo.value) + + +# --- refresh_access_token -------------------------------------------------- + + +CREDS = Credentials( + client_id="cid", + client_secret="secret", + refresh_token="refresh", + from_address="me@example.com", +) + + +class _FakeResponse: + """Minimal context-manager stand-in for urllib.request.urlopen().""" + + def __init__(self, payload: bytes): + self._payload = payload + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def read(self): + return self._payload + + +def test_refresh_access_token_returns_access_token(): + with patch("oauth_draft.credentials.urllib.request.urlopen") as mock_open: + mock_open.return_value = _FakeResponse(b'{"access_token": "abc-123"}') + token = refresh_access_token(CREDS) + assert token == "abc-123" + # The POST body must include all four OAuth refresh-flow params. + request = mock_open.call_args.args[0] + assert request.method == "POST" + assert request.full_url == "https://oauth2.googleapis.com/token" + body = request.data.decode() + assert "client_id=cid" in body + assert "client_secret=secret" in body + assert "refresh_token=refresh" in body + assert "grant_type=refresh_token" in body + + +def test_refresh_access_token_raises_on_http_error(): + err = urllib.error.HTTPError( + url="https://oauth2.googleapis.com/token", + code=400, + msg="Bad Request", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(b'{"error": "invalid_grant"}'), + ) + with patch("oauth_draft.credentials.urllib.request.urlopen", side_effect=err): + with pytest.raises(SystemExit) as excinfo: + refresh_access_token(CREDS) + assert "OAuth token refresh failed (400)" in str(excinfo.value) + assert "invalid_grant" in str(excinfo.value) + + +def test_refresh_access_token_raises_when_no_token_in_payload(): + with patch("oauth_draft.credentials.urllib.request.urlopen") as mock_open: + mock_open.return_value = _FakeResponse(b'{"expires_in": 3600}') + with pytest.raises(SystemExit) as excinfo: + refresh_access_token(CREDS) + assert "no access_token" in str(excinfo.value) diff --git a/tools/gmail/oauth-draft/tests/test_mark_threads_read.py b/tools/gmail/oauth-draft/tests/test_mark_threads_read.py index b5ea46a6..08326d49 100644 --- a/tools/gmail/oauth-draft/tests/test_mark_threads_read.py +++ b/tools/gmail/oauth-draft/tests/test_mark_threads_read.py @@ -16,7 +16,19 @@ # under the License. from __future__ import annotations -from oauth_draft.mark_threads_read import parse_args +import io +import json +import urllib.error +from unittest.mock import patch + +import pytest + +from oauth_draft.mark_threads_read import ( + list_thread_ids, + main, + modify_thread, + parse_args, +) def test_default_action_is_remove_unread(): @@ -60,3 +72,228 @@ def test_execute_and_max_flags(): args = parse_args(["--query", "x", "--execute", "--max", "5"]) assert args.execute is True assert args.max == 5 + + +# --- network-mocked helpers ------------------------------------------------ + + +class _FakeResponse: + def __init__(self, payload: bytes): + self._payload = payload + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def read(self): + return self._payload + + +def test_list_thread_ids_paginates_until_no_token(): + pages = [ + _FakeResponse( + json.dumps( + { + "threads": [{"id": "t1"}, {"id": "t2"}], + "nextPageToken": "page2", + } + ).encode() + ), + _FakeResponse(json.dumps({"threads": [{"id": "t3"}]}).encode()), + ] + with patch( + "oauth_draft.mark_threads_read.urllib.request.urlopen", + side_effect=pages, + ) as mock_open: + ids = list_thread_ids("token", "in:inbox") + assert ids == ["t1", "t2", "t3"] + # Second call must include the pageToken from the first response. + _, second_call = mock_open.call_args_list + second_url = second_call.args[0].full_url + assert "pageToken=page2" in second_url + + +def test_list_thread_ids_raises_on_http_error(): + err = urllib.error.HTTPError( + url="https://x", + code=429, + msg="Too Many", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(b'{"error": "rate"}'), + ) + with patch( + "oauth_draft.mark_threads_read.urllib.request.urlopen", + side_effect=err, + ): + with pytest.raises(SystemExit) as excinfo: + list_thread_ids("token", "q") + assert "threads.list failed (429)" in str(excinfo.value) + + +def test_modify_thread_sends_label_payload(): + with patch("oauth_draft.mark_threads_read.urllib.request.urlopen") as mock_open: + mock_open.return_value = _FakeResponse(b"{}") + modify_thread("token", "tid-1", ["STARRED"], ["UNREAD"]) + request = mock_open.call_args.args[0] + assert request.method == "POST" + assert "/threads/tid-1/modify" in request.full_url + payload = json.loads(request.data) + assert payload == {"addLabelIds": ["STARRED"], "removeLabelIds": ["UNREAD"]} + + +def test_modify_thread_omits_empty_label_lists(): + with patch("oauth_draft.mark_threads_read.urllib.request.urlopen") as mock_open: + mock_open.return_value = _FakeResponse(b"{}") + modify_thread("token", "tid-2", [], ["UNREAD"]) + payload = json.loads(mock_open.call_args.args[0].data) + assert payload == {"removeLabelIds": ["UNREAD"]} + + +def test_modify_thread_raises_on_http_error(): + err = urllib.error.HTTPError( + url="https://x", + code=500, + msg="Boom", + hdrs=None, # type: ignore[arg-type] + fp=io.BytesIO(b'{"error": "server"}'), + ) + with patch( + "oauth_draft.mark_threads_read.urllib.request.urlopen", + side_effect=err, + ): + with pytest.raises(SystemExit) as excinfo: + modify_thread("token", "tid", [], ["UNREAD"]) + assert "threads.modify failed (500) for tid" in str(excinfo.value) + + +# --- main ------------------------------------------------------------------ + + +def _make_creds_file(tmp_path): + p = tmp_path / "creds.json" + p.write_text( + json.dumps( + { + "client_id": "cid", + "client_secret": "secret", + "refresh_token": "refresh", + } + ) + ) + return p + + +def test_main_dry_run_lists_ids_and_skips_modify(tmp_path, capsys): + creds = _make_creds_file(tmp_path) + with ( + patch( + "oauth_draft.mark_threads_read.refresh_access_token", + return_value="tok", + ), + patch( + "oauth_draft.mark_threads_read.list_thread_ids", + return_value=["t1", "t2"], + ), + patch("oauth_draft.mark_threads_read.modify_thread") as modify, + ): + rc = main( + [ + "--credentials", + str(creds), + "--query", + "in:inbox is:unread", + ] + ) + assert rc == 0 + modify.assert_not_called() + out = capsys.readouterr().out + assert "t1" in out and "t2" in out + + +def test_main_execute_calls_modify_per_thread(tmp_path): + creds = _make_creds_file(tmp_path) + with ( + patch( + "oauth_draft.mark_threads_read.refresh_access_token", + return_value="tok", + ), + patch( + "oauth_draft.mark_threads_read.list_thread_ids", + return_value=["t1", "t2", "t3"], + ), + patch("oauth_draft.mark_threads_read.modify_thread") as modify, + ): + rc = main( + [ + "--credentials", + str(creds), + "--query", + "q", + "--execute", + ] + ) + assert rc == 0 + assert modify.call_count == 3 + # default action: remove UNREAD + for call in modify.call_args_list: + _, _, add, remove = call.args + assert add == [] + assert remove == ["UNREAD"] + + +def test_main_max_truncates(tmp_path): + creds = _make_creds_file(tmp_path) + with ( + patch( + "oauth_draft.mark_threads_read.refresh_access_token", + return_value="tok", + ), + patch( + "oauth_draft.mark_threads_read.list_thread_ids", + return_value=["t1", "t2", "t3", "t4", "t5"], + ), + patch("oauth_draft.mark_threads_read.modify_thread") as modify, + ): + rc = main( + [ + "--credentials", + str(creds), + "--query", + "q", + "--execute", + "--max", + "2", + ] + ) + assert rc == 0 + assert modify.call_count == 2 + + +def test_main_returns_1_when_modify_fails(tmp_path): + creds = _make_creds_file(tmp_path) + with ( + patch( + "oauth_draft.mark_threads_read.refresh_access_token", + return_value="tok", + ), + patch( + "oauth_draft.mark_threads_read.list_thread_ids", + return_value=["t1"], + ), + patch( + "oauth_draft.mark_threads_read.modify_thread", + side_effect=SystemExit("boom"), + ), + ): + rc = main( + [ + "--credentials", + str(creds), + "--query", + "q", + "--execute", + ] + ) + assert rc == 1 diff --git a/tools/gmail/oauth-draft/tests/test_setup_creds.py b/tools/gmail/oauth-draft/tests/test_setup_creds.py new file mode 100644 index 00000000..5a37bcfa --- /dev/null +++ b/tools/gmail/oauth-draft/tests/test_setup_creds.py @@ -0,0 +1,231 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from oauth_draft.setup_creds import detect_from_address, main, parse_args + +# --- detect_from_address --------------------------------------------------- + + +def test_detect_from_address_uses_env_var(monkeypatch): + monkeypatch.setenv("GMAIL_FROM", "env@example.com") + assert detect_from_address() == "env@example.com" + + +def test_detect_from_address_falls_back_to_git_config(monkeypatch): + monkeypatch.delenv("GMAIL_FROM", raising=False) + with patch( + "oauth_draft.setup_creds.subprocess.check_output", + return_value="git@example.com\n", + ): + assert detect_from_address() == "git@example.com" + + +def test_detect_from_address_returns_none_when_git_missing(monkeypatch): + monkeypatch.delenv("GMAIL_FROM", raising=False) + with patch( + "oauth_draft.setup_creds.subprocess.check_output", + side_effect=FileNotFoundError, + ): + assert detect_from_address() is None + + +def test_detect_from_address_returns_none_when_git_errors(monkeypatch): + monkeypatch.delenv("GMAIL_FROM", raising=False) + with patch( + "oauth_draft.setup_creds.subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "git"), + ): + assert detect_from_address() is None + + +def test_detect_from_address_returns_none_when_git_returns_empty(monkeypatch): + monkeypatch.delenv("GMAIL_FROM", raising=False) + with patch( + "oauth_draft.setup_creds.subprocess.check_output", + return_value="\n", + ): + assert detect_from_address() is None + + +# --- parse_args ------------------------------------------------------------ + + +def test_parse_args_minimal(monkeypatch): + # Avoid relying on the host's environment / git config: stub + # detect_from_address out of the parser default. + monkeypatch.setenv("GMAIL_FROM", "default@example.com") + args = parse_args(["client.json"]) + assert args.client_secrets == "client.json" + assert args.from_address == "default@example.com" + assert args.out.endswith("/.config/apache-steward/gmail-oauth.json") + assert args.rm_client_secrets is False + + +def test_parse_args_overrides(monkeypatch): + monkeypatch.setenv("GMAIL_FROM", "default@example.com") + args = parse_args( + [ + "client.json", + "--from-address", + "override@example.com", + "--out", + "/custom/path.json", + "--rm-client-secrets", + ] + ) + assert args.from_address == "override@example.com" + assert args.out == "/custom/path.json" + assert args.rm_client_secrets is True + + +# --- main ------------------------------------------------------------------ + + +def _client_secrets(tmp_path): + p = tmp_path / "client_secrets.json" + p.write_text( + json.dumps( + { + "installed": { + "client_id": "cid", + "client_secret": "secret", + } + } + ) + ) + return p + + +def _flow_creds(refresh_token="rt-1"): + creds = MagicMock() + creds.refresh_token = refresh_token + creds.scopes = ["https://mail.google.com/"] + return creds + + +def test_main_errors_when_no_from_address(tmp_path, monkeypatch): + monkeypatch.delenv("GMAIL_FROM", raising=False) + secrets = _client_secrets(tmp_path) + out = tmp_path / "out.json" + with patch( + "oauth_draft.setup_creds.subprocess.check_output", + side_effect=FileNotFoundError, + ): + with pytest.raises(SystemExit) as excinfo: + main([str(secrets), "--out", str(out)]) + assert "Could not determine --from-address" in str(excinfo.value) + + +def test_main_errors_when_client_secrets_missing(tmp_path, monkeypatch): + monkeypatch.setenv("GMAIL_FROM", "me@example.com") + out = tmp_path / "out.json" + with pytest.raises(SystemExit) as excinfo: + main([str(tmp_path / "does-not-exist.json"), "--out", str(out)]) + assert "client_secrets not found" in str(excinfo.value) + + +def test_main_errors_when_flow_returns_no_refresh_token(tmp_path, monkeypatch): + monkeypatch.setenv("GMAIL_FROM", "me@example.com") + secrets = _client_secrets(tmp_path) + out = tmp_path / "out.json" + flow = MagicMock() + flow.run_local_server.return_value = _flow_creds(refresh_token=None) + with patch( + "oauth_draft.setup_creds.InstalledAppFlow.from_client_secrets_file", + return_value=flow, + ): + with pytest.raises(SystemExit) as excinfo: + main([str(secrets), "--out", str(out)]) + assert "no refresh_token" in str(excinfo.value) + + +def test_main_writes_credentials_file(tmp_path, monkeypatch, capsys): + monkeypatch.setenv("GMAIL_FROM", "me@example.com") + secrets = _client_secrets(tmp_path) + out_dir = tmp_path / "creds-dir" + out = out_dir / "gmail-oauth.json" + flow = MagicMock() + flow.run_local_server.return_value = _flow_creds() + with patch( + "oauth_draft.setup_creds.InstalledAppFlow.from_client_secrets_file", + return_value=flow, + ): + rc = main([str(secrets), "--out", str(out)]) + assert rc == 0 + written = json.loads(out.read_text()) + assert written == { + "client_id": "cid", + "client_secret": "secret", + "refresh_token": "rt-1", + "from_address": "me@example.com", + } + # Mode should be 600 (owner-rw only) per the atomic-write path. + assert (out.stat().st_mode & 0o777) == 0o600 + # Parent dir should be 700. + assert (out_dir.stat().st_mode & 0o777) == 0o700 + # Original client_secrets is left in place by default. + assert secrets.exists() + # The startup banner logs the path of the client_secrets file (NOT + # its content). Asserts the variable rename + # (`client_secrets` → `client_secrets_path`) didn't break the + # log message that addresses CodeQL `py/clear-text-logging- + # sensitive-data` finding #3 on PR #6. + out_text = capsys.readouterr().out + assert f"Running OAuth flow against {secrets.resolve()}" in out_text + + +def test_main_with_rm_client_secrets_deletes_input(tmp_path, monkeypatch, capsys): + monkeypatch.setenv("GMAIL_FROM", "me@example.com") + secrets = _client_secrets(tmp_path) + out = tmp_path / "creds-dir" / "creds.json" + flow = MagicMock() + flow.run_local_server.return_value = _flow_creds() + with patch( + "oauth_draft.setup_creds.InstalledAppFlow.from_client_secrets_file", + return_value=flow, + ): + main([str(secrets), "--out", str(out), "--rm-client-secrets"]) + assert not secrets.exists() + # The "Removed " log line — covers the second CodeQL + # `py/clear-text-logging-sensitive-data` site on the renamed + # `client_secrets_path` variable. + out_text = capsys.readouterr().out + assert f"Removed {secrets.resolve()}" in out_text + + +def test_main_handles_web_shaped_client_secrets(tmp_path, monkeypatch): + monkeypatch.setenv("GMAIL_FROM", "me@example.com") + secrets = tmp_path / "client.json" + secrets.write_text(json.dumps({"web": {"client_id": "wcid", "client_secret": "wsecret"}})) + out = tmp_path / "creds-dir" / "creds.json" + flow = MagicMock() + flow.run_local_server.return_value = _flow_creds() + with patch( + "oauth_draft.setup_creds.InstalledAppFlow.from_client_secrets_file", + return_value=flow, + ): + main([str(secrets), "--out", str(out)]) + written = json.loads(out.read_text()) + assert written["client_id"] == "wcid" + assert written["client_secret"] == "wsecret" diff --git a/tools/vulnogram/generate-cve-json/.gitignore b/tools/vulnogram/generate-cve-json/.gitignore index ab7ef44e..8cae406d 100644 --- a/tools/vulnogram/generate-cve-json/.gitignore +++ b/tools/vulnogram/generate-cve-json/.gitignore @@ -4,3 +4,4 @@ __pycache__/ .pytest_cache/ .ruff_cache/ .mypy_cache/ +.coverage diff --git a/tools/vulnogram/generate-cve-json/src/generate_cve_json/cve_json.py b/tools/vulnogram/generate-cve-json/src/generate_cve_json/cve_json.py index 37a38f69..a5386c3e 100644 --- a/tools/vulnogram/generate-cve-json/src/generate_cve_json/cve_json.py +++ b/tools/vulnogram/generate-cve-json/src/generate_cve_json/cve_json.py @@ -119,6 +119,7 @@ # # Schema: see the README in this package. import tomllib +import urllib.parse from pathlib import Path _DEFAULT_CONFIG_RELPATH = ".apache-steward/tools/vulnogram/cve-json-config.toml" @@ -572,7 +573,14 @@ def classify_reference(url: str) -> list[str]: """ if re.search(r"github\.com/[^/]+/[^/]+/(pull|commit)/", url): return ["patch"] - if "lists.apache.org" in url or "security.apache.org" in url: + # Match the host exactly, not a substring — `if "lists.apache.org" + # in url` would also flag e.g. `https://evil.com/?q=lists.apache.org`, + # which CodeQL flags as `py/incomplete-url-substring-sanitization`. + try: + host = (urllib.parse.urlparse(url).hostname or "").lower() + except ValueError: + return [] + if host in ("lists.apache.org", "security.apache.org"): return ["vendor-advisory"] return [] diff --git a/tools/vulnogram/generate-cve-json/tests/test_cli.py b/tools/vulnogram/generate-cve-json/tests/test_cli.py new file mode 100644 index 00000000..937b5434 --- /dev/null +++ b/tools/vulnogram/generate-cve-json/tests/test_cli.py @@ -0,0 +1,512 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Tests for the CLI surface and `gh`-shelling-out helpers. + +These cover the parts of `generate_cve_json.cve_json` that don't have +existing unit tests in `test_generate_cve_json.py`: + +- `parse_args` (CLI argument shape); +- `main` (orchestration, all error paths, --stdin / --output / --attach + / --no-envelope flags); +- `fetch_issue`, `_gh_api_json`, `_fetch_issue` (gh subprocess wrappers + with mocked `subprocess.run`); +- `_splice_attachment_into_body` (in-place attachment splicing); +- `attach_to_issue` (idempotent attachment with mocked gh). +""" + +from __future__ import annotations + +import io +import json +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from generate_cve_json import cve_json + + +def _issue_body( + *, + summary: str = "An RCE in the FooOperator.", + affected: str = ">= 3.0.0, < 3.1.0", + cwe: str = "CWE-352", + severity: str = "high", + credited: str = "Alice ", + cve_tool: str = "CVE-2026-12345", + extra: dict[str, str] | None = None, +) -> str: + """Build a minimal issue body with the standard tracker fields.""" + fields = { + "Short public summary for publish": summary, + "Affected versions": affected, + "Security mailing list thread": "https://lists.example.org/thread.html/abc", + "Public advisory URL": "", + "Reporter credited as": credited, + "PR with the fix": "https://github.com/upstream/repo/pull/4242", + "Remediation developer": "", + "CWE": cwe, + "Severity": severity, + "CVE tool link": cve_tool, + } + if extra: + fields.update(extra) + return "\n".join(f"### {k}\n{v}\n" for k, v in fields.items()) + + +# --- parse_args ------------------------------------------------------------ + + +class TestParseArgs: + def test_minimal_positional(self): + args = cve_json.parse_args(["123"]) + assert args.issue == "123" + assert args.stdin is False + assert args.attach is False + assert args.no_envelope is False + assert args.advisory_url == [] + assert args.remediation_developer == [] + assert args.product_for == [] + assert args.config is None + + def test_stdin_no_positional(self): + args = cve_json.parse_args(["--stdin"]) + assert args.issue is None + assert args.stdin is True + + def test_repeatable_flags_accumulate(self): + args = cve_json.parse_args( + [ + "123", + "--advisory-url", + "https://a.example/1", + "--advisory-url", + "https://a.example/2", + "--remediation-developer", + "Carol", + "--product-for", + "pkg-a=Display A", + "--product-for", + "pkg-b=Display B", + ] + ) + assert args.advisory_url == ["https://a.example/1", "https://a.example/2"] + assert args.remediation_developer == ["Carol"] + assert args.product_for == ["pkg-a=Display A", "pkg-b=Display B"] + + def test_overrides(self, tmp_path): + cfg = tmp_path / "c.toml" + args = cve_json.parse_args( + [ + "42", + "--config", + str(cfg), + "--repo", + "owner/repo", + "--cve-id", + "CVE-2099-0001", + "--title", + "Example Title", + "--vendor", + "Foo", + "--product", + "Bar", + "--package-name", + "bar-py", + "--collection-url", + "https://pypi.org/project/bar", + "--org-id", + "org-1", + "--version-start", + "1.0.0", + "--discovery", + "INTERNAL", + "--no-envelope", + "--attach", + "--output", + str(tmp_path / "out.json"), + ] + ) + assert args.config == cfg + assert args.repo == "owner/repo" + assert args.cve_id == "CVE-2099-0001" + assert args.no_envelope is True + assert args.attach is True + assert args.output == tmp_path / "out.json" + + +# --- _splice_attachment_into_body ----------------------------------------- + + +class TestSpliceAttachment: + cve_id = "CVE-2026-99999" + + def _markers(self): + return ( + cve_json._attachment_marker_begin(self.cve_id), + cve_json._attachment_marker_end(self.cve_id), + ) + + def _attachment(self): + begin, end = self._markers() + return f"{begin}\n## CVE JSON\n```json\n{{}}\n```\n{end}\n" + + def test_replaces_existing_attachment_block(self): + begin, end = self._markers() + body = f"### Body before\nstuff\n\n{begin}\nOLD CONTENT\n{end}\ntrailing line\n" + result = cve_json._splice_attachment_into_body(body, self._attachment(), self.cve_id) + assert "OLD CONTENT" not in result + assert "## CVE JSON" in result + # Markers are present and bracket the new content. + assert begin in result + assert end in result + + def test_legacy_single_marker_path(self): + begin, _ = self._markers() + body = f"### Body\n\n{begin}\nleftover legacy content with no end marker\n" + result = cve_json._splice_attachment_into_body(body, self._attachment(), self.cve_id) + assert "leftover legacy content" not in result + assert "## CVE JSON" in result + + def test_appends_after_cve_tool_link_when_no_existing_attachment(self): + body = _issue_body() + result = cve_json._splice_attachment_into_body(body, self._attachment(), self.cve_id) + assert "## CVE JSON" in result + # The attachment lands after the CVE-tool-link section. + cve_tool_idx = result.index("### CVE tool link") + attach_idx = result.index("## CVE JSON") + assert attach_idx > cve_tool_idx + + def test_appends_at_end_when_no_cve_tool_link_field(self): + body = "### Body\nstuff\n" + result = cve_json._splice_attachment_into_body(body, self._attachment(), self.cve_id) + assert result.endswith(self._markers()[1] + "\n") + + +# --- fetch_issue (gh subprocess wrapper) ---------------------------------- + + +class TestFetchIssue: + def test_returns_title_and_body(self): + completed = MagicMock(stdout=json.dumps({"title": "T", "body": "B"})) + with patch("generate_cve_json.cve_json.subprocess.run", return_value=completed) as run: + title, body = cve_json.fetch_issue(42, "owner/repo") + assert (title, body) == ("T", "B") + # Verify the gh call shape. + cmd = run.call_args.args[0] + assert cmd[:3] == ["gh", "issue", "view"] + assert "--repo" in cmd + assert "owner/repo" in cmd + + def test_gh_missing_raises_runtime_error(self): + with patch( + "generate_cve_json.cve_json.subprocess.run", + side_effect=FileNotFoundError, + ): + with pytest.raises(RuntimeError, match="`gh` CLI not found"): + cve_json.fetch_issue(1, "o/r") + + def test_gh_failure_raises_runtime_error(self): + err = subprocess.CalledProcessError(returncode=1, cmd=["gh"], stderr="not found") + with patch("generate_cve_json.cve_json.subprocess.run", side_effect=err): + with pytest.raises(RuntimeError, match="not found"): + cve_json.fetch_issue(1, "o/r") + + +# --- _gh_api_json ---------------------------------------------------------- + + +class TestGhApiJson: + def test_returns_parsed_json(self): + completed = MagicMock(stdout='{"id": 7}') + with patch("generate_cve_json.cve_json.subprocess.run", return_value=completed) as run: + result = cve_json._gh_api_json(["repos/o/r/issues/1"]) + assert result == {"id": 7} + cmd = run.call_args.args[0] + assert cmd[:2] == ["gh", "api"] + assert cmd[2] == "repos/o/r/issues/1" + + def test_returns_empty_dict_on_empty_stdout(self): + completed = MagicMock(stdout=" \n") + with patch("generate_cve_json.cve_json.subprocess.run", return_value=completed): + assert cve_json._gh_api_json(["repos/o/r"]) == {} + + def test_writes_body_payload_to_temp_file_and_uses_input_flag(self, tmp_path): + completed = MagicMock(stdout="{}") + captured_args: dict = {} + + def fake_run(cmd, **_kw): + captured_args["cmd"] = cmd + # Read back the temp file we wrote, prove it has our payload. + input_idx = cmd.index("--input") + captured_args["payload"] = json.loads(Path(cmd[input_idx + 1]).read_text()) + return completed + + with patch("generate_cve_json.cve_json.subprocess.run", side_effect=fake_run): + cve_json._gh_api_json( + ["-X", "PATCH", "repos/o/r/issues/1"], + body_payload={"body": "new"}, + ) + assert captured_args["payload"] == {"body": "new"} + assert "--input" in captured_args["cmd"] + + def test_gh_missing_raises_runtime_error(self): + with patch( + "generate_cve_json.cve_json.subprocess.run", + side_effect=FileNotFoundError, + ): + with pytest.raises(RuntimeError, match="`gh` CLI not found"): + cve_json._gh_api_json(["x"]) + + def test_gh_failure_raises_runtime_error(self): + err = subprocess.CalledProcessError(returncode=22, cmd=["gh"], stderr="rate limited\n") + with patch("generate_cve_json.cve_json.subprocess.run", side_effect=err): + with pytest.raises(RuntimeError, match="rate limited"): + cve_json._gh_api_json(["x"]) + + +# --- _fetch_issue (REST wrapper) ------------------------------------------ + + +class TestFetchIssueRest: + def test_returns_dict(self): + with patch( + "generate_cve_json.cve_json._gh_api_json", + return_value={"body": "x", "html_url": "u"}, + ): + assert cve_json._fetch_issue("o/r", "42") == { + "body": "x", + "html_url": "u", + } + + def test_raises_when_response_not_a_dict(self): + with patch("generate_cve_json.cve_json._gh_api_json", return_value=[]): + with pytest.raises(RuntimeError, match="Unexpected response shape"): + cve_json._fetch_issue("o/r", "42") + + +# --- attach_to_issue ------------------------------------------------------- + + +def _attach_kwargs() -> dict: + """Common keyword arguments for `attach_to_issue` calls in this module.""" + return { + "issue_number": "42", + "repo": "o/r", + "cve_id": "CVE-2026-12345", + "json_text": '{"a": 1}', + "cna": {"title": "T"}, + "cna_private_state": "DRAFT", + } + + +class TestAttachToIssue: + def test_appends_attachment_when_body_has_no_existing_marker(self): + with ( + patch( + "generate_cve_json.cve_json._fetch_issue", + return_value={"body": _issue_body(), "html_url": "https://x/42"}, + ), + patch("generate_cve_json.cve_json._gh_api_json", return_value={}) as patch_call, + ): + url, was_update = cve_json.attach_to_issue(**_attach_kwargs()) + assert was_update is False + assert url.endswith("#cve-json--paste-ready-for-cve-2026-12345") + # PATCH was called with a body that contains the begin marker. + body_payload = patch_call.call_args.kwargs["body_payload"]["body"] + assert cve_json._attachment_marker_begin("CVE-2026-12345") in body_payload + + def test_replaces_existing_attachment_idempotently(self): + cve_id = "CVE-2026-12345" + begin = cve_json._attachment_marker_begin(cve_id) + end = cve_json._attachment_marker_end(cve_id) + body_with_existing = _issue_body() + f"\n{begin}\nstale content\n{end}\n" + with ( + patch( + "generate_cve_json.cve_json._fetch_issue", + return_value={"body": body_with_existing, "html_url": "https://x/42"}, + ), + patch("generate_cve_json.cve_json._gh_api_json", return_value={}), + ): + url, was_update = cve_json.attach_to_issue(**_attach_kwargs()) + assert was_update is True + assert url.endswith("#cve-json--paste-ready-for-cve-2026-12345") + + def test_skips_patch_when_body_unchanged(self): + # Pre-build the attachment that _splice would produce, so the + # spliced result is byte-identical to the existing body. + cve_id = "CVE-2026-12345" + attachment = cve_json._build_attachment_body( + cve_id=cve_id, + json_text='{"a": 1}', + cna={"title": "T"}, + cna_private_state="DRAFT", + ) + existing = cve_json._splice_attachment_into_body(_issue_body(), attachment, cve_id) + with ( + patch( + "generate_cve_json.cve_json._fetch_issue", + return_value={"body": existing, "html_url": "https://x/42"}, + ), + patch("generate_cve_json.cve_json._gh_api_json") as patch_call, + ): + url, was_update = cve_json.attach_to_issue(**_attach_kwargs()) + assert was_update is True # had_existing was True + patch_call.assert_not_called() + assert url == "https://x/42" + + +# --- main: error paths ----------------------------------------------------- + + +class TestMainErrors: + def test_attach_with_stdin_returns_2(self, capsys): + rc = cve_json.main(["--stdin", "--attach"]) + assert rc == 2 + assert "--attach cannot be combined with --stdin" in capsys.readouterr().err + + def test_attach_without_issue_returns_2(self, capsys): + rc = cve_json.main(["--attach"]) + assert rc == 2 + assert "--attach requires the positional issue argument" in capsys.readouterr().err + + def test_missing_issue_without_stdin_returns_2(self, capsys): + rc = cve_json.main([]) + assert rc == 2 + assert "issue number is required unless --stdin" in capsys.readouterr().err + + def test_fetch_issue_failure_returns_1(self, capsys): + with patch( + "generate_cve_json.cve_json.fetch_issue", + side_effect=RuntimeError("gh boom"), + ): + rc = cve_json.main(["123"]) + assert rc == 1 + assert "gh boom" in capsys.readouterr().err + + def test_product_for_without_equals_returns_2(self, capsys, monkeypatch): + monkeypatch.setattr("sys.stdin", io.StringIO(_issue_body())) + rc = cve_json.main(["--stdin", "--product-for", "no-equals-sign"]) + assert rc == 2 + assert "PACKAGE=PRODUCT" in capsys.readouterr().err + + def test_product_for_with_empty_value_returns_2(self, capsys, monkeypatch): + monkeypatch.setattr("sys.stdin", io.StringIO(_issue_body())) + rc = cve_json.main(["--stdin", "--product-for", "pkg="]) + assert rc == 2 + assert "non-empty PACKAGE and PRODUCT" in capsys.readouterr().err + + def test_config_not_found_returns_2(self, tmp_path, capsys): + rc = cve_json.main(["--stdin", "--config", str(tmp_path / "nope.toml")]) + assert rc == 2 + assert "error:" in capsys.readouterr().err + + +# --- main: happy paths ----------------------------------------------------- + + +class TestMainHappyPath: + def test_stdin_writes_full_envelope_to_stdout(self, monkeypatch, capsys): + monkeypatch.setattr("sys.stdin", io.StringIO(_issue_body())) + rc = cve_json.main(["--stdin"]) + assert rc == 0 + out = capsys.readouterr().out + record = json.loads(out) + # Full CVE 5.x record envelope: cveMetadata + containers. + assert "cveMetadata" in record + assert "containers" in record + assert record["cveMetadata"]["cveId"] == "CVE-2026-12345" + + def test_stdin_no_envelope_emits_cna_only(self, monkeypatch, capsys): + monkeypatch.setattr("sys.stdin", io.StringIO(_issue_body())) + rc = cve_json.main(["--stdin", "--no-envelope"]) + assert rc == 0 + record = json.loads(capsys.readouterr().out) + # Bare CNA container — no top-level cveMetadata. + assert "cveMetadata" not in record + assert "title" in record + assert "affected" in record + + def test_output_to_file(self, monkeypatch, tmp_path, capsys): + monkeypatch.setattr("sys.stdin", io.StringIO(_issue_body())) + out_file = tmp_path / "cve.json" + rc = cve_json.main(["--stdin", "--output", str(out_file)]) + assert rc == 0 + # File written, not stdout. + record = json.loads(out_file.read_text()) + assert "cveMetadata" in record + # Friendly post-write print includes the byte count. + assert "Wrote " in capsys.readouterr().out + + def test_fetch_path_uses_gh(self, capsys): + with patch( + "generate_cve_json.cve_json.fetch_issue", + return_value=("Issue title", _issue_body()), + ) as fetch: + rc = cve_json.main(["123"]) + assert rc == 0 + fetch.assert_called_once() + record = json.loads(capsys.readouterr().out) + assert record["cveMetadata"]["cveId"] == "CVE-2026-12345" + + def test_attach_happy_path(self, capsys): + with ( + patch( + "generate_cve_json.cve_json.fetch_issue", + return_value=("Issue title", _issue_body()), + ), + patch( + "generate_cve_json.cve_json.attach_to_issue", + return_value=("https://x/42#cve-json--paste-ready-for-cve-2026-12345", False), + ) as attach, + ): + rc = cve_json.main(["123", "--attach"]) + assert rc == 0 + attach.assert_called_once() + out = capsys.readouterr().out + assert "Embedded CVE JSON" in out + + def test_attach_replace_path(self, capsys): + with ( + patch( + "generate_cve_json.cve_json.fetch_issue", + return_value=("Issue title", _issue_body()), + ), + patch( + "generate_cve_json.cve_json.attach_to_issue", + return_value=("https://x/42#anchor", True), + ), + ): + rc = cve_json.main(["123", "--attach"]) + assert rc == 0 + assert "Replaced CVE JSON" in capsys.readouterr().out + + def test_attach_failure_returns_1(self, capsys): + with ( + patch( + "generate_cve_json.cve_json.fetch_issue", + return_value=("T", _issue_body()), + ), + patch( + "generate_cve_json.cve_json.attach_to_issue", + side_effect=RuntimeError("attach boom"), + ), + ): + rc = cve_json.main(["123", "--attach"]) + assert rc == 1 + assert "attach boom" in capsys.readouterr().err diff --git a/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py b/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py index 19116aa3..5decd3cf 100644 --- a/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py +++ b/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py @@ -524,6 +524,22 @@ def test_lists_apache_tagged_as_vendor_advisory(self): def test_plain_doc_url_has_no_tags(self): assert classify_reference("https://airflow.apache.org/docs/") == [] + def test_security_apache_tagged_as_vendor_advisory(self): + assert classify_reference("https://security.apache.org/foo") == ["vendor-advisory"] + + def test_evil_substring_in_path_is_not_tagged(self): + # Regression: substring match would have flagged this; CodeQL + # `py/incomplete-url-substring-sanitization`. Hostname match + # rejects it correctly. + assert classify_reference("https://evil.example/?q=lists.apache.org") == [] + assert classify_reference("https://evil.example/security.apache.org") == [] + + def test_subdomain_is_not_treated_as_apache(self): + assert classify_reference("https://lists.apache.org.evil.example/x") == [] + + def test_malformed_url_returns_no_tags(self): + assert classify_reference("not a url") == [] + class TestBuildReferences: def test_mailing_list_field_urls_are_not_auto_included(self):