diff --git a/examples/.github-webhook-server.yaml b/examples/.github-webhook-server.yaml index 6611628d5..1eee145e0 100644 --- a/examples/.github-webhook-server.yaml +++ b/examples/.github-webhook-server.yaml @@ -171,3 +171,11 @@ test-oracle: - "tests/**/*.py" triggers: - approved + +# Security Checks (overrides global config) +security-checks: + mandatory: true + suspicious-paths: + - ".github/workflows/" + - ".github/actions/" + committer-identity-check: true diff --git a/examples/config.yaml b/examples/config.yaml index 2be36d720..c8529020e 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -136,6 +136,21 @@ ai-features: enabled: true timeout-minutes: 10 # Timeout in minutes for AI CLI (default: 10) +# Security Checks configuration +# Detects potentially malicious PR patterns like modifications to +# security-sensitive paths or mismatched committer identities. +security-checks: + mandatory: true # true (default) = blocks can-be-merged. false = advisory only + suspicious-paths: + - ".claude/" + - ".vscode/" + - ".cursor/" + - ".devcontainer/" + - ".pi/" + - ".github/workflows/" + - ".github/actions/" + committer-identity-check: true + repositories: my-repository: name: my-org/my-repository @@ -286,3 +301,10 @@ repositories: - "tests/**/*.py" triggers: - approved + + # Security Checks (overrides global) + # security-checks: + # suspicious-paths: + # - ".github/workflows/" + # - ".github/actions/" + # committer-identity-check: true diff --git a/webhook_server/config/schema.yaml b/webhook_server/config/schema.yaml index 33b292876..7d0dd7157 100644 --- a/webhook_server/config/schema.yaml +++ b/webhook_server/config/schema.yaml @@ -68,6 +68,34 @@ $defs: - ai-provider - ai-model additionalProperties: false + security-checks: + type: object + description: | + Security checks for pull requests. Detects potentially malicious PR patterns + such as modifications to security-sensitive paths or mismatched committer identities. + properties: + mandatory: + type: boolean + description: | + When true (default), security checks block can-be-merged. + When false, security checks are advisory only (run but don't block merge). + default: true + suspicious-paths: + type: array + description: | + List of path prefixes considered security-sensitive. + PRs modifying files under these paths will fail the security-suspicious-paths check run + and have auto-merge blocked. + Default: [".claude/", ".vscode/", ".cursor/", ".devcontainer/", ".pi/", ".github/workflows/", ".github/actions/"] + items: + type: string + committer-identity-check: + type: boolean + description: | + When enabled, compares the PR author (parent committer) against the last commit's + committer. Fails the security-committer-identity check run if they differ. + default: true + additionalProperties: false type: object properties: log-level: @@ -213,6 +241,8 @@ properties: additionalProperties: false ai-features: $ref: '#/$defs/ai-features' + security-checks: + $ref: '#/$defs/security-checks' labels: type: object description: Configure which labels are enabled and their colors @@ -631,6 +661,8 @@ properties: additionalProperties: false ai-features: $ref: '#/$defs/ai-features' + security-checks: + $ref: '#/$defs/security-checks' custom-check-runs: type: array description: | diff --git a/webhook_server/libs/github_api.py b/webhook_server/libs/github_api.py index b7e6d94e0..5db4d3a9a 100644 --- a/webhook_server/libs/github_api.py +++ b/webhook_server/libs/github_api.py @@ -35,6 +35,7 @@ CAN_BE_MERGED_STR, CONFIGURABLE_LABEL_CATEGORIES, CONVENTIONAL_TITLE_STR, + DEFAULT_SUSPICIOUS_PATHS, OTHER_MAIN_BRANCH, PRE_COMMIT_STR, PYTHON_MODULE_INSTALL_STR, @@ -662,7 +663,7 @@ async def process(self) -> Any: self.last_commit = await self._get_last_commit(pull_request=pull_request) self.parent_committer = pull_request.user.login - self.last_committer = getattr(self.last_commit.committer, "login", self.parent_committer) + self.last_committer = getattr(self.last_commit.committer, "login", "unknown") # Store PR SHAs: prefer webhook payload (avoids race condition with live API) # For pull_request events, base.sha and head.sha are guaranteed by GitHub webhook spec. @@ -953,6 +954,33 @@ def _repo_data_from_config(self, repository_config: dict[str, Any]) -> None: self.ai_features: dict[str, Any] | None = self.config.get_value( value="ai-features", return_on_none=None, extra_dict=repository_config ) + _security_checks: dict[str, Any] | None = self.config.get_value( + value="security-checks", return_on_none=None, extra_dict=repository_config + ) + _security_config = _security_checks if isinstance(_security_checks, dict) else {} + _suspicious_paths = _security_config.get("suspicious-paths", DEFAULT_SUSPICIOUS_PATHS) + self.security_suspicious_paths: list[str] = ( + [str(p).strip() for p in _suspicious_paths if isinstance(p, (str, int, float)) and str(p).strip()] + if isinstance(_suspicious_paths, list) + else DEFAULT_SUSPICIOUS_PATHS + ) + _committer_check_raw = _security_config.get("committer-identity-check", True) + if not isinstance(_committer_check_raw, bool): + self.logger.warning( + f"{self.log_prefix} security-checks.committer-identity-check must be boolean, " + f"got {type(_committer_check_raw).__name__}. Defaulting to true." + ) + _committer_check_raw = True + self.security_committer_identity_check: bool = _committer_check_raw + _mandatory_raw = _security_config.get("mandatory", True) + if not isinstance(_mandatory_raw, bool): + self.logger.warning( + f"{self.log_prefix} security-checks.mandatory must be boolean, got {type(_mandatory_raw).__name__}. " + "Defaulting to true." + ) + _mandatory_raw = True + self.security_mandatory: bool = _mandatory_raw + _auto_merge_prs = self.config.get_value( value="set-auto-merge-prs", return_on_none=[], extra_dict=repository_config ) diff --git a/webhook_server/libs/handlers/check_run_handler.py b/webhook_server/libs/handlers/check_run_handler.py index 2f6f0b6ed..6d6eb597a 100644 --- a/webhook_server/libs/handlers/check_run_handler.py +++ b/webhook_server/libs/handlers/check_run_handler.py @@ -18,6 +18,8 @@ IN_PROGRESS_STR, PYTHON_MODULE_INSTALL_STR, QUEUED_STR, + SECURITY_COMMITTER_IDENTITY_STR, + SECURITY_SUSPICIOUS_PATHS_STR, SUCCESS_STR, TOX_STR, VERIFIED_LABEL_STR, @@ -427,6 +429,14 @@ async def all_required_status_checks(self, pull_request: PullRequest) -> list[st check_name = custom_check["name"] all_required_status_checks.append(check_name) + # Add mandatory security checks + if self.github_webhook.security_mandatory: + if self.github_webhook.security_suspicious_paths: + all_required_status_checks.append(SECURITY_SUSPICIOUS_PATHS_STR) + + if self.github_webhook.security_committer_identity_check: + all_required_status_checks.append(SECURITY_COMMITTER_IDENTITY_STR) + # Use ordered deduplication to combine branch and config checks without duplicates _all_required_status_checks = list(dict.fromkeys(branch_required_status_checks + all_required_status_checks)) self.logger.debug(f"{self.log_prefix} All required status checks: {_all_required_status_checks}") diff --git a/webhook_server/libs/handlers/issue_comment_handler.py b/webhook_server/libs/handlers/issue_comment_handler.py index 1222ac3b6..5e9a870c4 100644 --- a/webhook_server/libs/handlers/issue_comment_handler.py +++ b/webhook_server/libs/handlers/issue_comment_handler.py @@ -9,7 +9,7 @@ from github.PullRequest import PullRequest from github.Repository import Repository -from webhook_server.libs.handlers.check_run_handler import CheckRunHandler +from webhook_server.libs.handlers.check_run_handler import CheckRunHandler, CheckRunOutput from webhook_server.libs.handlers.labels_handler import LabelsHandler from webhook_server.libs.handlers.owners_files_handler import OwnersFileHandler from webhook_server.libs.handlers.pull_request_handler import PullRequestHandler @@ -30,9 +30,12 @@ COMMAND_REGENERATE_WELCOME_STR, COMMAND_REPROCESS_STR, COMMAND_RETEST_STR, + COMMAND_SECURITY_OVERRIDE_STR, COMMAND_TEST_ORACLE_STR, HOLD_LABEL_STR, REACTIONS, + SECURITY_COMMITTER_IDENTITY_STR, + SECURITY_SUSPICIOUS_PATHS_STR, USER_LABELS_DICT, VERIFIED_LABEL_STR, WIP_STR, @@ -171,6 +174,7 @@ async def user_commands( COMMAND_ADD_ALLOWED_USER_STR, COMMAND_REGENERATE_WELCOME_STR, COMMAND_TEST_ORACLE_STR, + COMMAND_SECURITY_OVERRIDE_STR, ] command_and_args: list[str] = command.split(" ", 1) @@ -362,6 +366,64 @@ async def user_commands( pull_request.create_issue_comment, msg, logger=self.logger, log_prefix=self.log_prefix ) + elif _command == COMMAND_SECURITY_OVERRIDE_STR: + maintainers = await self.owners_file_handler.get_all_repository_maintainers() + if reviewed_user not in maintainers: + msg = "Only maintainers can use `/security-override`" + self.logger.debug(f"{self.log_prefix} {msg}") + await github_api_call( + pull_request.create_issue_comment, body=msg, logger=self.logger, log_prefix=self.log_prefix + ) + return + + if remove: + # Re-run security checks to re-evaluate + await self.runner_handler.run_security_suspicious_paths() + await self.runner_handler.run_security_committer_identity() + self.logger.info(f"{self.log_prefix} Security checks re-run by {reviewed_user}") + await github_api_call( + pull_request.create_issue_comment, + body=f"Security override removed by @{reviewed_user}. Security checks re-run.", + logger=self.logger, + log_prefix=self.log_prefix, + ) + else: + # Set security check runs to success (override) + if ( + not self.github_webhook.security_suspicious_paths + and not self.github_webhook.security_committer_identity_check + ): + msg = "No security checks are enabled — nothing to override." + self.logger.debug(f"{self.log_prefix} {msg}") + await github_api_call( + pull_request.create_issue_comment, body=msg, logger=self.logger, log_prefix=self.log_prefix + ) + return + + override_output: CheckRunOutput = { + "title": f"Overridden by maintainer @{reviewed_user}", + "summary": "Security check overridden by maintainer", + "text": ( + f"This security check was overridden by maintainer @{reviewed_user}.\n\n" + "Use `/security-override cancel` to re-run security checks." + ), + } + if self.github_webhook.security_suspicious_paths: + await self.check_run_handler.set_check_success( + name=SECURITY_SUSPICIOUS_PATHS_STR, output=override_output + ) + if self.github_webhook.security_committer_identity_check: + await self.check_run_handler.set_check_success( + name=SECURITY_COMMITTER_IDENTITY_STR, output=override_output + ) + self.logger.info(f"{self.log_prefix} Security override applied by {reviewed_user}") + await github_api_call( + pull_request.create_issue_comment, + body=f"Security checks overridden by @{reviewed_user}. Security check runs set to pass.", + logger=self.logger, + log_prefix=self.log_prefix, + ) + elif _command == WIP_STR: if not await self.owners_file_handler.is_user_valid_to_run_commands( pull_request=pull_request, reviewed_user=reviewed_user diff --git a/webhook_server/libs/handlers/pull_request_handler.py b/webhook_server/libs/handlers/pull_request_handler.py index b7c77a4d6..a511123a5 100644 --- a/webhook_server/libs/handlers/pull_request_handler.py +++ b/webhook_server/libs/handlers/pull_request_handler.py @@ -37,6 +37,9 @@ NEEDS_REBASE_LABEL_STR, PRE_COMMIT_STR, PYTHON_MODULE_INSTALL_STR, + SECURITY_COMMITTER_IDENTITY_STR, + SECURITY_SUSPICIOUS_PATHS_STR, + SUCCESS_STR, TOX_STR, USER_LABELS_DICT, VERIFIED_LABEL_STR, @@ -519,6 +522,7 @@ def _prepare_welcome_comment(self) -> str: {self._prepare_available_labels_section} {self._prepare_ai_features_welcome_section}\ +{self._prepare_security_checks_welcome_section}\ ### 💡 Tips @@ -635,6 +639,10 @@ def _prepare_pr_status_commands_section(self) -> str: ) commands.append("* `/regenerate-welcome` - Regenerate this welcome message") + if self.github_webhook.security_mandatory: + commands.append("* `/security-override` - Set security check runs to pass (maintainers only)") + commands.append("* `/security-override cancel` - Re-run security checks") + return "\n".join(commands) @property @@ -738,6 +746,38 @@ def _build_ai_features_section(self) -> str: {features_list} + +""" + + @property + def _prepare_security_checks_welcome_section(self) -> str: + """Prepare the Security Checks section for the welcome comment.""" + checks: list[str] = [] + + if self.github_webhook.security_suspicious_paths: + paths_str = ", ".join(f"`{p}`" for p in self.github_webhook.security_suspicious_paths) + checks.append(f"* **Suspicious Path Detection**: Monitors paths: {paths_str}") + + if self.github_webhook.security_committer_identity_check: + checks.append("* **Committer Identity Check**: Verifies last committer matches PR author") + + if not checks: + return "" + + mandatory_note = ( + "* **Mandatory**: Security checks block merge (use `/security-override` to bypass — maintainers only)" + if self.github_webhook.security_mandatory + else "* **Advisory**: Security checks are informational only (do not block merge)" + ) + checks.append(mandatory_note) + + checks_list = "\n".join(checks) + return f""" +
+Security Checks + +{checks_list} +
""" @@ -1109,6 +1149,13 @@ async def process_opened_or_synchronize_pull_request( if self.github_webhook.conventional_title: setup_tasks.append(self.check_run_handler.set_check_queued(name=CONVENTIONAL_TITLE_STR)) + # Queue security check runs + if self.github_webhook.security_suspicious_paths: + setup_tasks.append(self.check_run_handler.set_check_queued(name=SECURITY_SUSPICIOUS_PATHS_STR)) + + if self.github_webhook.security_committer_identity_check: + setup_tasks.append(self.check_run_handler.set_check_queued(name=SECURITY_COMMITTER_IDENTITY_STR)) + # Queue custom check runs (same as built-in checks) # Note: custom checks are validated in GithubWebhook._validate_custom_check_runs() # so name is guaranteed to exist @@ -1140,6 +1187,13 @@ async def process_opened_or_synchronize_pull_request( if self.github_webhook.conventional_title: ci_tasks.append(self.runner_handler.run_conventional_title_check(pull_request=pull_request)) + # Launch security check runs + if self.github_webhook.security_suspicious_paths: + ci_tasks.append(self.runner_handler.run_security_suspicious_paths()) + + if self.github_webhook.security_committer_identity_check: + ci_tasks.append(self.runner_handler.run_security_committer_identity()) + # Launch custom check runs (same as built-in checks) for custom_check in self.github_webhook.custom_check_runs: ci_tasks.append( @@ -1205,6 +1259,76 @@ async def set_pull_request_automerge(self, pull_request: PullRequest) -> None: self.logger.debug(f"{self.log_prefix} auto_merge: {auto_merge}, branch: {pull_request.base.ref}") + # Security override: block auto-merge if PR modifies suspicious paths + # Also disable already-enabled auto-merge (e.g., PR gained suspicious paths after auto-merge was set) + if (auto_merge or pull_request.raw_data.get("auto_merge")) and self.github_webhook.security_suspicious_paths: + changed_files = self.owners_file_handler.changed_files + suspicious_matches = [ + f + for f in changed_files + if any(f.startswith(prefix) for prefix in self.github_webhook.security_suspicious_paths) + ] + if suspicious_matches: + # Check if security check run was already overridden (set to success by maintainer) + _check_runs = await github_api_call( + lambda: list(self.github_webhook.last_commit.get_check_runs()), + logger=self.logger, + log_prefix=self.log_prefix, + ) + # Find the latest check run for security-suspicious-paths + security_path_runs = [cr for cr in _check_runs if cr.name == SECURITY_SUSPICIOUS_PATHS_STR] + # Check runs are returned newest-first by GitHub API + security_check_passed = bool(security_path_runs) and security_path_runs[0].conclusion == SUCCESS_STR + + if not security_check_passed: + auto_merge = False + files_list = ", ".join(f"`{f}`" for f in suspicious_matches) + self.logger.info( + f"{self.log_prefix} Auto-merge blocked: " + f"PR modifies security-sensitive paths: {suspicious_matches}" + ) + + # Avoid posting duplicate comments on every synchronize event + existing_comments = await github_api_call( + lambda: list(pull_request.get_issue_comments()), + logger=self.logger, + log_prefix=self.log_prefix, + ) + already_commented = any( + c.body.startswith("Auto-merge blocked: PR modifies security-sensitive paths:") + and c.user.login in self.github_webhook.auto_verified_and_merged_users + for c in existing_comments + ) + + if not already_commented: + await github_api_call( + pull_request.create_issue_comment, + f"Auto-merge blocked: PR modifies security-sensitive paths: {files_list}", + logger=self.logger, + log_prefix=self.log_prefix, + ) + + # Disable already-enabled auto-merge on the PR + if pull_request.raw_data.get("auto_merge"): + try: + self.logger.info( + f"{self.log_prefix} Suspicious paths detected, disabling existing auto-merge" + ) + await github_api_call( + pull_request.disable_automerge, logger=self.logger, log_prefix=self.log_prefix + ) + except asyncio.CancelledError: + raise + except Exception: + self.logger.exception( + f"{self.log_prefix} Failed to disable auto-merge for suspicious paths PR" + ) + else: + self.logger.info( + f"{self.log_prefix} Suspicious paths detected but security check already passed " + f"(maintainer override), allowing auto-merge" + ) + if auto_merge: # AI-resolved cherry-picks should NEVER be auto-merged labels = await github_api_call( diff --git a/webhook_server/libs/handlers/runner_handler.py b/webhook_server/libs/handlers/runner_handler.py index 9f22f2453..1a03435c4 100644 --- a/webhook_server/libs/handlers/runner_handler.py +++ b/webhook_server/libs/handlers/runner_handler.py @@ -28,6 +28,8 @@ PRE_COMMIT_STR, PREK_STR, PYTHON_MODULE_INSTALL_STR, + SECURITY_COMMITTER_IDENTITY_STR, + SECURITY_SUSPICIOUS_PATHS_STR, TOX_STR, ) from webhook_server.utils.github_repository_settings import get_repository_github_app_token @@ -311,6 +313,136 @@ async def run_pre_commit(self, pull_request: PullRequest) -> None: check_config = CheckConfig(name=PRE_COMMIT_STR, command=cmd, title="Pre-Commit") await self.run_check(pull_request=pull_request, check_config=check_config) + async def run_security_suspicious_paths(self) -> None: + """Check if PR modifies security-sensitive paths. + + Fails the check run if any changed files match the configured suspicious path prefixes. + Uses changed_files from owners_file_handler (already computed during PR processing). + """ + suspicious_paths = self.github_webhook.security_suspicious_paths + if not suspicious_paths: + self.logger.debug(f"{self.log_prefix} No suspicious paths configured, skipping security check") + return + + await self.check_run_handler.set_check_in_progress(name=SECURITY_SUSPICIOUS_PATHS_STR) + + try: + changed_files = self.owners_file_handler.changed_files + matched_files = [f for f in changed_files if any(f.startswith(prefix) for prefix in suspicious_paths)] + + if matched_files: + files_list = "\n".join(f"- `{f}`" for f in matched_files) + output: CheckRunOutput = { + "title": "\u274c Security: Suspicious Paths Detected", + "summary": f"{len(matched_files)} file(s) modify security-sensitive paths", + "text": ( + f"## Suspicious Path Detection\n\n" + f"This PR modifies files in security-sensitive locations:\n\n" + f"{files_list}\n\n" + f"**Configured suspicious path prefixes:**\n" + + "\n".join(f"- `{p}`" for p in suspicious_paths) + + "\n\n" + "These paths control development tooling, CI/CD workflows, or IDE configurations " + "and require careful review to prevent supply-chain attacks." + ), + } + self.logger.warning(f"{self.log_prefix} PR modifies suspicious paths: {matched_files}") + await self.check_run_handler.set_check_failure(name=SECURITY_SUSPICIOUS_PATHS_STR, output=output) + else: + output = { + "title": "Security: Suspicious Paths", + "summary": "No security-sensitive paths modified", + "text": ( + "No changed files match the configured suspicious path prefixes.\n\n" + "**Configured prefixes:**\n" + "\n".join(f"- `{p}`" for p in suspicious_paths) + ), + } + await self.check_run_handler.set_check_success(name=SECURITY_SUSPICIOUS_PATHS_STR, output=output) + except asyncio.CancelledError: + raise + except Exception: + self.logger.exception(f"{self.log_prefix} Security suspicious paths check failed") + await self.check_run_handler.set_check_failure( + name=SECURITY_SUSPICIOUS_PATHS_STR, + output={"title": "Security check error", "summary": "Unexpected error during check", "text": None}, + ) + + async def run_security_committer_identity(self) -> None: + """Check if the last committer matches the PR author. + + Fails the check run if the last commit's committer differs from the PR author + (parent_committer), which may indicate a commit was authored by someone unexpected. + """ + if not self.github_webhook.security_committer_identity_check: + self.logger.debug(f"{self.log_prefix} Committer identity check disabled, skipping") + return + + await self.check_run_handler.set_check_in_progress(name=SECURITY_COMMITTER_IDENTITY_STR) + + try: + parent_committer = self.github_webhook.parent_committer + last_committer = self.github_webhook.last_committer + + if last_committer == "unknown": + output: CheckRunOutput = { + "title": "\u274c Security: Committer Identity Unknown", + "summary": "Committer identity could not be verified", + "text": ( + "## Committer Identity Check\n\n" + f"**PR author:** `{parent_committer}`\n" + "**Last commit committer:** unknown\n\n" + "Committer identity could not be verified \u2014 " + "last commit has no associated GitHub user.\n\n" + "This may indicate:\n" + "- A commit was made with a local Git identity not linked to a GitHub account\n" + "- The committer's email is not verified on GitHub\n\n" + "Please verify the commit authorship before merging." + ), + } + self.logger.warning( + f"{self.log_prefix} Committer identity unknown: " + f"PR author={parent_committer}, last committer has no GitHub user" + ) + await self.check_run_handler.set_check_failure(name=SECURITY_COMMITTER_IDENTITY_STR, output=output) + elif last_committer != parent_committer: + output = { + "title": "\u274c Security: Committer Identity Mismatch", + "summary": f"Last committer '{last_committer}' differs from PR author '{parent_committer}'", + "text": ( + f"## Committer Identity Check\n\n" + f"**PR author:** `{parent_committer}`\n" + f"**Last commit committer:** `{last_committer}`\n\n" + f"The last commit in this PR was made by a different user than the PR author. " + f"This may indicate:\n" + f"- An unauthorized commit was pushed to the PR branch\n" + f"- A bot or automation tool committed with unexpected credentials\n" + f"- A legitimate co-author contribution (review carefully)\n\n" + f"Please verify this is expected before merging." + ), + } + self.logger.warning( + f"{self.log_prefix} Committer identity mismatch: " + f"PR author={parent_committer}, last committer={last_committer}" + ) + await self.check_run_handler.set_check_failure(name=SECURITY_COMMITTER_IDENTITY_STR, output=output) + else: + output = { + "title": "Security: Committer Identity", + "summary": "Committer identity verified", + "text": ( + f"The last commit committer (`{last_committer}`) matches the PR author (`{parent_committer}`)." + ), + } + await self.check_run_handler.set_check_success(name=SECURITY_COMMITTER_IDENTITY_STR, output=output) + except asyncio.CancelledError: + raise + except Exception: + self.logger.exception(f"{self.log_prefix} Security committer identity check failed") + await self.check_run_handler.set_check_failure( + name=SECURITY_COMMITTER_IDENTITY_STR, + output={"title": "Security check error", "summary": "Unexpected error during check", "text": None}, + ) + def _build_oci_annotations( self, pull_request: PullRequest | None = None, diff --git a/webhook_server/tests/test_check_run_handler.py b/webhook_server/tests/test_check_run_handler.py index ed113ef90..a724ac0d4 100644 --- a/webhook_server/tests/test_check_run_handler.py +++ b/webhook_server/tests/test_check_run_handler.py @@ -48,6 +48,9 @@ def mock_github_webhook(self) -> Mock: mock_webhook.container_repository_password = "test-pass" # pragma: allowlist secret mock_webhook.ctx = None mock_webhook.custom_check_runs = [] + mock_webhook.security_suspicious_paths = [] + mock_webhook.security_committer_identity_check = False + mock_webhook.security_mandatory = False return mock_webhook @pytest.fixture diff --git a/webhook_server/tests/test_custom_check_runs.py b/webhook_server/tests/test_custom_check_runs.py index c116ff28f..124ed2233 100644 --- a/webhook_server/tests/test_custom_check_runs.py +++ b/webhook_server/tests/test_custom_check_runs.py @@ -168,6 +168,9 @@ def mock_github_webhook(self) -> Mock: {"name": "lint", "command": "uv tool run --from ruff ruff check"}, {"name": "security-scan", "command": "uv tool run --from bandit bandit -r ."}, ] + mock_webhook.security_suspicious_paths = [] + mock_webhook.security_committer_identity_check = False + mock_webhook.security_mandatory = False return mock_webhook @pytest.fixture @@ -288,6 +291,9 @@ def mock_github_webhook_with_mixed_mandatory(self) -> Mock: {"name": "mandatory-check-2", "command": "echo test3", "mandatory": True}, {"name": "default-mandatory-check", "command": "echo test4"}, # No mandatory field = default to true ] + mock_webhook.security_suspicious_paths = [] + mock_webhook.security_committer_identity_check = False + mock_webhook.security_mandatory = False return mock_webhook @pytest.mark.asyncio @@ -1023,6 +1029,9 @@ def mock_github_webhook(self, tmp_path: Path) -> Mock: mock_webhook.clone_repo_dir = str(tmp_path / "test-repo") mock_webhook.mask_sensitive = True mock_webhook.custom_check_runs = [] + mock_webhook.security_suspicious_paths = [] + mock_webhook.security_committer_identity_check = False + mock_webhook.security_mandatory = False return mock_webhook @pytest.mark.asyncio diff --git a/webhook_server/tests/test_pull_request_handler.py b/webhook_server/tests/test_pull_request_handler.py index 0f2263d02..4a7642a18 100644 --- a/webhook_server/tests/test_pull_request_handler.py +++ b/webhook_server/tests/test_pull_request_handler.py @@ -94,6 +94,10 @@ def mock_github_webhook(self) -> Mock: mock_webhook.custom_check_runs = [] mock_webhook.ai_features = None mock_webhook.required_conversation_resolution = False + mock_webhook.security_suspicious_paths = [] + mock_webhook.security_committer_identity_check = True + mock_webhook.security_mandatory = True + mock_webhook.last_committer = "test-user" mock_webhook.config = Mock() mock_webhook.config.get_value = Mock(return_value=None) return mock_webhook @@ -136,6 +140,8 @@ def pull_request_handler(self, mock_github_webhook: Mock, mock_owners_file_handl handler.runner_handler.run_conventional_title_check = AsyncMock() handler.runner_handler.run_build_container = AsyncMock() handler.runner_handler.run_install_python_module = AsyncMock() + handler.runner_handler.run_security_suspicious_paths = AsyncMock() + handler.runner_handler.run_security_committer_identity = AsyncMock() handler.runner_handler.run_podman_command = AsyncMock(return_value=(0, "", "")) handler.runner_handler.cherry_pick = AsyncMock() @@ -2636,6 +2642,8 @@ async def test_process_opened_setup_task_failure( patch.object(pull_request_handler.runner_handler, "run_install_python_module", new=AsyncMock()), patch.object(pull_request_handler.runner_handler, "run_build_container", new=AsyncMock()), patch.object(pull_request_handler.runner_handler, "run_conventional_title_check", new=AsyncMock()), + patch.object(pull_request_handler.runner_handler, "run_security_suspicious_paths", new=AsyncMock()), + patch.object(pull_request_handler.runner_handler, "run_security_committer_identity", new=AsyncMock()), ): await pull_request_handler.process_opened_or_synchronize_pull_request(mock_pull_request) @@ -2664,6 +2672,8 @@ async def test_process_opened_ci_task_failure( patch.object(pull_request_handler.runner_handler, "run_pre_commit", new=AsyncMock()), patch.object(pull_request_handler.runner_handler, "run_install_python_module", new=AsyncMock()), patch.object(pull_request_handler.runner_handler, "run_build_container", new=AsyncMock()), + patch.object(pull_request_handler.runner_handler, "run_security_suspicious_paths", new=AsyncMock()), + patch.object(pull_request_handler.runner_handler, "run_security_committer_identity", new=AsyncMock()), ): await pull_request_handler.process_opened_or_synchronize_pull_request(mock_pull_request) diff --git a/webhook_server/tests/test_security_checks.py b/webhook_server/tests/test_security_checks.py new file mode 100644 index 000000000..5099e77e8 --- /dev/null +++ b/webhook_server/tests/test_security_checks.py @@ -0,0 +1,770 @@ +"""Tests for PR security checks. + +Tests cover: +1. Suspicious path detection check run (runner_handler) +2. Committer identity check run (runner_handler) +3. Auto-merge override for suspicious paths (pull_request_handler) +""" + +from collections.abc import Generator +from typing import Any +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from webhook_server.libs.github_api import GithubWebhook +from webhook_server.libs.handlers.check_run_handler import CheckRunHandler +from webhook_server.libs.handlers.issue_comment_handler import IssueCommentHandler +from webhook_server.libs.handlers.pull_request_handler import PullRequestHandler +from webhook_server.libs.handlers.runner_handler import RunnerHandler +from webhook_server.utils.constants import ( + BUILTIN_CHECK_NAMES, + COMMAND_SECURITY_OVERRIDE_STR, + DEFAULT_SUSPICIOUS_PATHS, + SECURITY_COMMITTER_IDENTITY_STR, + SECURITY_SUSPICIOUS_PATHS_STR, +) + +TEST_GITHUB_TOKEN = "ghp_testtoken123" # pragma: allowlist secret + + +class TestSecuritySuspiciousPaths: + """Test suite for suspicious path detection check run.""" + + @pytest.fixture + def mock_github_webhook(self) -> Mock: + mock_webhook = Mock() + mock_webhook.hook_data = {"action": "opened"} + mock_webhook.logger = Mock() + mock_webhook.log_prefix = "[TEST]" + mock_webhook.repository = Mock() + mock_webhook.token = TEST_GITHUB_TOKEN + mock_webhook.clone_repo_dir = "/tmp/test-repo" + mock_webhook.tox = {} + mock_webhook.tox_python_version = "" + mock_webhook.tox_args = "" + mock_webhook.pre_commit = False + mock_webhook.build_and_push_container = {} + mock_webhook.pypi = {} + mock_webhook.conventional_title = "" + mock_webhook.ctx = None + mock_webhook.custom_check_runs = [] + mock_webhook.ai_features = None + mock_webhook.config = Mock() + mock_webhook.config.get_value = Mock(return_value=None) + mock_webhook.security_suspicious_paths = DEFAULT_SUSPICIOUS_PATHS + mock_webhook.security_committer_identity_check = True + mock_webhook.parent_committer = "test-user" + mock_webhook.last_committer = "test-user" + return mock_webhook + + @pytest.fixture + def mock_owners_file_handler(self) -> Mock: + mock_handler = Mock() + mock_handler.is_user_valid_to_run_commands = AsyncMock(return_value=True) + mock_handler.changed_files = [] + return mock_handler + + @pytest.fixture + def runner_handler(self, mock_github_webhook: Mock, mock_owners_file_handler: Mock) -> RunnerHandler: + return RunnerHandler(mock_github_webhook, mock_owners_file_handler) + + @pytest.fixture(autouse=True) + def patch_check_run_text(self) -> Generator[None]: + with patch( + "webhook_server.libs.handlers.check_run_handler.CheckRunHandler.get_check_run_text", + return_value="dummy output", + ): + yield + + @pytest.mark.asyncio + async def test_suspicious_paths_no_match( + self, runner_handler: RunnerHandler, mock_owners_file_handler: Mock + ) -> None: + """Check passes when no changed files match suspicious paths.""" + mock_owners_file_handler.changed_files = ["src/main.py", "tests/test_main.py", "README.md"] + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()) as mock_progress: + with patch.object(runner_handler.check_run_handler, "set_check_success", new=AsyncMock()) as mock_success: + await runner_handler.run_security_suspicious_paths() + + mock_progress.assert_called_once_with(name=SECURITY_SUSPICIOUS_PATHS_STR) + mock_success.assert_called_once() + call_args = mock_success.call_args + assert call_args.kwargs["name"] == SECURITY_SUSPICIOUS_PATHS_STR + assert "No security-sensitive paths modified" in call_args.kwargs["output"]["summary"] + + @pytest.mark.asyncio + async def test_suspicious_paths_match_single( + self, runner_handler: RunnerHandler, mock_owners_file_handler: Mock + ) -> None: + """Check fails when a file matches a suspicious path prefix.""" + mock_owners_file_handler.changed_files = ["src/main.py", ".github/workflows/ci.yml"] + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()): + with patch.object(runner_handler.check_run_handler, "set_check_failure", new=AsyncMock()) as mock_failure: + await runner_handler.run_security_suspicious_paths() + + mock_failure.assert_called_once() + call_args = mock_failure.call_args + assert call_args.kwargs["name"] == SECURITY_SUSPICIOUS_PATHS_STR + assert "1 file(s)" in call_args.kwargs["output"]["summary"] + assert ".github/workflows/ci.yml" in call_args.kwargs["output"]["text"] + + @pytest.mark.asyncio + async def test_suspicious_paths_match_multiple( + self, runner_handler: RunnerHandler, mock_owners_file_handler: Mock + ) -> None: + """Check fails listing all matched files when multiple suspicious paths are found.""" + mock_owners_file_handler.changed_files = [ + ".claude/settings.json", + ".vscode/extensions.json", + "src/app.py", + ".github/workflows/deploy.yml", + ] + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()): + with patch.object(runner_handler.check_run_handler, "set_check_failure", new=AsyncMock()) as mock_failure: + await runner_handler.run_security_suspicious_paths() + + mock_failure.assert_called_once() + call_args = mock_failure.call_args + assert call_args.kwargs["name"] == SECURITY_SUSPICIOUS_PATHS_STR + assert "3 file(s)" in call_args.kwargs["output"]["summary"] + output_text = call_args.kwargs["output"]["text"] + assert ".claude/settings.json" in output_text + assert ".vscode/extensions.json" in output_text + assert ".github/workflows/deploy.yml" in output_text + # Safe file should not be listed + assert "src/app.py" not in output_text + + @pytest.mark.asyncio + async def test_suspicious_paths_custom_config( + self, runner_handler: RunnerHandler, mock_owners_file_handler: Mock + ) -> None: + """Check uses custom suspicious paths from configuration.""" + runner_handler.github_webhook.security_suspicious_paths = ["custom/sensitive/", "secret/"] + mock_owners_file_handler.changed_files = ["custom/sensitive/config.yaml", "src/main.py"] + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()): + with patch.object(runner_handler.check_run_handler, "set_check_failure", new=AsyncMock()) as mock_failure: + await runner_handler.run_security_suspicious_paths() + + mock_failure.assert_called_once() + call_args = mock_failure.call_args + assert "custom/sensitive/config.yaml" in call_args.kwargs["output"]["text"] + + @pytest.mark.asyncio + async def test_suspicious_paths_empty_config(self, runner_handler: RunnerHandler) -> None: + """Check is skipped when suspicious paths list is empty.""" + runner_handler.github_webhook.security_suspicious_paths = [] + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()) as mock_progress: + await runner_handler.run_security_suspicious_paths() + mock_progress.assert_not_called() + + @pytest.mark.asyncio + async def test_suspicious_paths_all_default_prefixes( + self, runner_handler: RunnerHandler, mock_owners_file_handler: Mock + ) -> None: + """Verify all default suspicious path prefixes are detected.""" + mock_owners_file_handler.changed_files = [ + ".claude/config.json", + ".vscode/settings.json", + ".cursor/rules", + ".devcontainer/devcontainer.json", + ".pi/config.yaml", + ".github/workflows/ci.yml", + ".github/actions/custom/action.yml", + ] + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()): + with patch.object(runner_handler.check_run_handler, "set_check_failure", new=AsyncMock()) as mock_failure: + await runner_handler.run_security_suspicious_paths() + + mock_failure.assert_called_once() + call_args = mock_failure.call_args + assert "7 file(s)" in call_args.kwargs["output"]["summary"] + + +class TestSecurityCommitterIdentity: + """Test suite for committer identity check run.""" + + @pytest.fixture + def mock_github_webhook(self) -> Mock: + mock_webhook = Mock() + mock_webhook.hook_data = {"action": "opened"} + mock_webhook.logger = Mock() + mock_webhook.log_prefix = "[TEST]" + mock_webhook.repository = Mock() + mock_webhook.token = TEST_GITHUB_TOKEN + mock_webhook.clone_repo_dir = "/tmp/test-repo" + mock_webhook.tox = {} + mock_webhook.tox_python_version = "" + mock_webhook.tox_args = "" + mock_webhook.pre_commit = False + mock_webhook.build_and_push_container = {} + mock_webhook.pypi = {} + mock_webhook.conventional_title = "" + mock_webhook.ctx = None + mock_webhook.custom_check_runs = [] + mock_webhook.ai_features = None + mock_webhook.config = Mock() + mock_webhook.config.get_value = Mock(return_value=None) + mock_webhook.security_suspicious_paths = DEFAULT_SUSPICIOUS_PATHS + mock_webhook.security_committer_identity_check = True + mock_webhook.parent_committer = "test-user" + mock_webhook.last_committer = "test-user" + return mock_webhook + + @pytest.fixture + def mock_owners_file_handler(self) -> Mock: + mock_handler = Mock() + mock_handler.is_user_valid_to_run_commands = AsyncMock(return_value=True) + return mock_handler + + @pytest.fixture + def runner_handler(self, mock_github_webhook: Mock, mock_owners_file_handler: Mock) -> RunnerHandler: + return RunnerHandler(mock_github_webhook, mock_owners_file_handler) + + @pytest.fixture(autouse=True) + def patch_check_run_text(self) -> Generator[None]: + with patch( + "webhook_server.libs.handlers.check_run_handler.CheckRunHandler.get_check_run_text", + return_value="dummy output", + ): + yield + + @pytest.mark.asyncio + async def test_committer_identity_match(self, runner_handler: RunnerHandler) -> None: + """Check passes when last committer matches PR author.""" + runner_handler.github_webhook.parent_committer = "test-user" + runner_handler.github_webhook.last_committer = "test-user" + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()) as mock_progress: + with patch.object(runner_handler.check_run_handler, "set_check_success", new=AsyncMock()) as mock_success: + await runner_handler.run_security_committer_identity() + + mock_progress.assert_called_once_with(name=SECURITY_COMMITTER_IDENTITY_STR) + mock_success.assert_called_once() + call_args = mock_success.call_args + assert call_args.kwargs["name"] == SECURITY_COMMITTER_IDENTITY_STR + assert "Committer identity verified" in call_args.kwargs["output"]["summary"] + + @pytest.mark.asyncio + async def test_committer_identity_mismatch(self, runner_handler: RunnerHandler) -> None: + """Check fails when last committer differs from PR author.""" + runner_handler.github_webhook.parent_committer = "legit-user" + runner_handler.github_webhook.last_committer = "suspicious-bot" + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()): + with patch.object(runner_handler.check_run_handler, "set_check_failure", new=AsyncMock()) as mock_failure: + await runner_handler.run_security_committer_identity() + + mock_failure.assert_called_once() + call_args = mock_failure.call_args + assert call_args.kwargs["name"] == SECURITY_COMMITTER_IDENTITY_STR + output = call_args.kwargs["output"] + assert "suspicious-bot" in output["summary"] + assert "legit-user" in output["summary"] + assert "suspicious-bot" in output["text"] + assert "legit-user" in output["text"] + + @pytest.mark.asyncio + async def test_committer_identity_unknown(self, runner_handler: RunnerHandler) -> None: + """Check fails when last committer is unknown (no GitHub user).""" + runner_handler.github_webhook.parent_committer = "legit-user" + runner_handler.github_webhook.last_committer = "unknown" + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()): + with patch.object(runner_handler.check_run_handler, "set_check_failure", new=AsyncMock()) as mock_failure: + await runner_handler.run_security_committer_identity() + + mock_failure.assert_called_once() + call_args = mock_failure.call_args + assert call_args.kwargs["name"] == SECURITY_COMMITTER_IDENTITY_STR + output = call_args.kwargs["output"] + assert "could not be verified" in output["summary"] + assert "no associated GitHub user" in output["text"] + + @pytest.mark.asyncio + async def test_committer_identity_check_disabled(self, runner_handler: RunnerHandler) -> None: + """Check is skipped when committer-identity-check is disabled.""" + runner_handler.github_webhook.security_committer_identity_check = False + + with patch.object(runner_handler.check_run_handler, "set_check_in_progress", new=AsyncMock()) as mock_progress: + await runner_handler.run_security_committer_identity() + mock_progress.assert_not_called() + + +class TestAutoMergeSecurityOverride: + """Test auto-merge is blocked when PR modifies suspicious paths.""" + + @pytest.fixture + def mock_github_webhook(self) -> Mock: + mock_webhook = Mock() + mock_webhook.hook_data = {"action": "opened"} + mock_webhook.logger = Mock() + mock_webhook.log_prefix = "[TEST]" + mock_webhook.repository = Mock() + mock_webhook.repository_full_name = "test-org/test-repo" + mock_webhook.token = TEST_GITHUB_TOKEN + mock_webhook.parent_committer = "auto-merge-user" + mock_webhook.auto_verified_and_merged_users = ["auto-merge-user"] + mock_webhook.set_auto_merge_prs = [] + mock_webhook.security_suspicious_paths = DEFAULT_SUSPICIOUS_PATHS + mock_webhook.security_committer_identity_check = True + mock_webhook.security_mandatory = True + mock_webhook.last_commit = Mock() + mock_webhook.ctx = None + mock_webhook.enabled_labels = None + mock_webhook.custom_check_runs = [] + mock_webhook.ai_features = None + mock_webhook.config = Mock() + mock_webhook.config.get_value = Mock(return_value=None) + mock_webhook.tox = False + mock_webhook.pre_commit = False + mock_webhook.pypi = False + mock_webhook.build_and_push_container = False + mock_webhook.conventional_title = False + mock_webhook.create_issue_for_new_pr = True + mock_webhook.verified_job = True + mock_webhook.can_be_merged_required_labels = [] + mock_webhook.minimum_lgtm = 1 + mock_webhook.auto_verify_cherry_picked_prs = True + mock_webhook.cherry_pick_assign_to_pr_author = True + mock_webhook.required_conversation_resolution = False + mock_webhook.issue_url_for_welcome_msg = "" + return mock_webhook + + @pytest.fixture + def mock_owners_file_handler(self) -> Mock: + mock_handler = Mock() + mock_handler.is_user_valid_to_run_commands = AsyncMock(return_value=True) + mock_handler.changed_files = [] + mock_handler.all_pull_request_approvers = ["approver1"] + mock_handler.all_pull_request_reviewers = ["reviewer1"] + mock_handler.all_repository_approvers_and_reviewers = {} + return mock_handler + + @pytest.fixture + def mock_pull_request(self) -> Mock: + mock_pr = Mock() + mock_pr.number = 123 + mock_pr.title = "Test PR" + mock_pr.base.ref = "main" + mock_pr.head.ref = "feature-branch" + mock_pr.head.sha = "abc123" + mock_pr.user.login = "auto-merge-user" + mock_pr.labels = [] + mock_pr.raw_data = {} + mock_pr.create_issue_comment = Mock() + mock_pr.enable_automerge = Mock() + return mock_pr + + @pytest.mark.asyncio + async def test_automerge_blocked_by_suspicious_paths( + self, + mock_github_webhook: Mock, + mock_owners_file_handler: Mock, + mock_pull_request: Mock, + ) -> None: + """Auto-merge is blocked and comment posted when PR modifies suspicious paths.""" + + mock_owners_file_handler.changed_files = [".github/workflows/ci.yml", "src/main.py"] + handler = PullRequestHandler(mock_github_webhook, mock_owners_file_handler) + + with patch( + "webhook_server.libs.handlers.pull_request_handler.github_api_call", new=AsyncMock() + ) as mock_api_call: + await handler.set_pull_request_automerge(pull_request=mock_pull_request) + + # Should have called github_api_call for labels fetch + blocking comment + comment_calls = [ + c + for c in mock_api_call.call_args_list + if len(c.args) > 1 and isinstance(c.args[1], str) and "Auto-merge blocked" in c.args[1] + ] + assert len(comment_calls) == 1 + assert ".github/workflows/ci.yml" in comment_calls[0].args[1] + + @pytest.mark.asyncio + async def test_automerge_allowed_without_suspicious_paths( + self, + mock_github_webhook: Mock, + mock_owners_file_handler: Mock, + mock_pull_request: Mock, + ) -> None: + """Auto-merge proceeds when no suspicious paths are modified.""" + + mock_owners_file_handler.changed_files = ["src/main.py", "tests/test_main.py"] + handler = PullRequestHandler(mock_github_webhook, mock_owners_file_handler) + + # github_api_call is called for labels (returns empty list) and enable_automerge + async def mock_api_side_effect(func: Any, *args: Any, **kwargs: Any) -> Any: + # When called with a lambda (labels fetch), return empty list + if callable(func) and not args: + return func() + # Otherwise (enable_automerge), just return None + return None + + with patch( + "webhook_server.libs.handlers.pull_request_handler.github_api_call", new=AsyncMock(return_value=[]) + ) as mock_api_call: + await handler.set_pull_request_automerge(pull_request=mock_pull_request) + + # Should have called github_api_call but NOT for blocking comment + for call in mock_api_call.call_args_list: + if len(call.args) > 1 and isinstance(call.args[1], str): + assert "Auto-merge blocked" not in call.args[1] + + @pytest.mark.asyncio + async def test_automerge_not_blocked_when_security_paths_empty( + self, + mock_github_webhook: Mock, + mock_owners_file_handler: Mock, + mock_pull_request: Mock, + ) -> None: + """Auto-merge is not blocked when suspicious paths config is empty.""" + + mock_github_webhook.security_suspicious_paths = [] + mock_owners_file_handler.changed_files = [".github/workflows/ci.yml"] + handler = PullRequestHandler(mock_github_webhook, mock_owners_file_handler) + + with patch( + "webhook_server.libs.handlers.pull_request_handler.github_api_call", new=AsyncMock(return_value=[]) + ) as mock_api_call: + await handler.set_pull_request_automerge(pull_request=mock_pull_request) + + # Should NOT have posted a blocking comment + for call in mock_api_call.call_args_list: + if len(call.args) > 1 and isinstance(call.args[1], str): + assert "Auto-merge blocked" not in call.args[1] + + @pytest.mark.asyncio + async def test_automerge_disabled_when_already_enabled_and_suspicious_paths( + self, + mock_github_webhook: Mock, + mock_owners_file_handler: Mock, + mock_pull_request: Mock, + ) -> None: + """Already-enabled auto-merge is disabled when PR gains suspicious paths on synchronize.""" + # User is NOT in auto-merge list, but PR already has auto-merge enabled + mock_github_webhook.auto_verified_and_merged_users = [] + mock_github_webhook.set_auto_merge_prs = [] + mock_owners_file_handler.changed_files = [".github/workflows/ci.yml", "src/main.py"] + mock_pull_request.raw_data = {"auto_merge": {"merge_method": "squash"}} + handler = PullRequestHandler(mock_github_webhook, mock_owners_file_handler) + + with patch( + "webhook_server.libs.handlers.pull_request_handler.github_api_call", + new=AsyncMock(), + ) as mock_api_call: + await handler.set_pull_request_automerge(pull_request=mock_pull_request) + + # Should have posted blocking comment AND called disable_automerge + comment_calls = [ + c + for c in mock_api_call.call_args_list + if len(c.args) > 1 and isinstance(c.args[1], str) and "Auto-merge blocked" in c.args[1] + ] + assert len(comment_calls) == 1 + + disable_calls = [ + c + for c in mock_api_call.call_args_list + if len(c.args) > 0 and c.args[0] == mock_pull_request.disable_automerge + ] + assert len(disable_calls) == 1 + + +class TestSecurityCheckConstants: + """Test that security check constants are properly defined.""" + + def test_security_constants_values(self) -> None: + assert SECURITY_SUSPICIOUS_PATHS_STR == "security-suspicious-paths" + assert SECURITY_COMMITTER_IDENTITY_STR == "security-committer-identity" + + def test_default_suspicious_paths(self) -> None: + assert ".claude/" in DEFAULT_SUSPICIOUS_PATHS + assert ".vscode/" in DEFAULT_SUSPICIOUS_PATHS + assert ".cursor/" in DEFAULT_SUSPICIOUS_PATHS + assert ".devcontainer/" in DEFAULT_SUSPICIOUS_PATHS + assert ".pi/" in DEFAULT_SUSPICIOUS_PATHS + assert ".github/workflows/" in DEFAULT_SUSPICIOUS_PATHS + assert ".github/actions/" in DEFAULT_SUSPICIOUS_PATHS + + def test_security_checks_in_builtin_check_names(self) -> None: + assert SECURITY_SUSPICIOUS_PATHS_STR in BUILTIN_CHECK_NAMES + assert SECURITY_COMMITTER_IDENTITY_STR in BUILTIN_CHECK_NAMES + + def test_security_override_constants(self) -> None: + assert COMMAND_SECURITY_OVERRIDE_STR == "security-override" + + +class TestSecurityConfigSanitization: + """Test config sanitization through the production code path.""" + + @pytest.fixture + def mock_webhook_for_config(self) -> Mock: + """Create a minimal mock GithubWebhook for _repo_data_from_config testing.""" + + mock = Mock() + mock.logger = Mock() + mock.log_prefix = "[TEST]" + mock.config = Mock() + return mock + + def test_non_string_suspicious_paths_sanitized(self, mock_webhook_for_config: Mock) -> None: + """Non-string items in suspicious-paths are converted to strings.""" + + security_config: dict[str, Any] = { + "suspicious-paths": [".github/workflows/", 123, 4.5, "", " ", ".vscode/"], + "committer-identity-check": True, + } + mock_webhook_for_config.config.get_value = Mock( + side_effect=lambda value, **kwargs: security_config if value == "security-checks" else None + ) + + GithubWebhook._repo_data_from_config(mock_webhook_for_config, repository_config={}) + assert mock_webhook_for_config.security_suspicious_paths == [".github/workflows/", "123", "4.5", ".vscode/"] + + def test_non_list_suspicious_paths_uses_defaults(self, mock_webhook_for_config: Mock) -> None: + """Non-list suspicious-paths falls back to defaults.""" + + security_config: dict[str, Any] = { + "suspicious-paths": "not-a-list", + "committer-identity-check": True, + } + mock_webhook_for_config.config.get_value = Mock( + side_effect=lambda value, **kwargs: security_config if value == "security-checks" else None + ) + + GithubWebhook._repo_data_from_config(mock_webhook_for_config, repository_config={}) + assert mock_webhook_for_config.security_suspicious_paths == DEFAULT_SUSPICIOUS_PATHS + + +class TestSecurityRequiredStatusChecks: + """Test security checks in all_required_status_checks.""" + + @pytest.fixture + def mock_github_webhook(self) -> Mock: + mock_webhook = Mock() + mock_webhook.hook_data = {"action": "opened"} + mock_webhook.logger = Mock() + mock_webhook.log_prefix = "[TEST]" + mock_webhook.repository = Mock() + mock_webhook.token = TEST_GITHUB_TOKEN + mock_webhook.tox = {} + mock_webhook.verified_job = True + mock_webhook.build_and_push_container = {} + mock_webhook.pypi = {} + mock_webhook.conventional_title = "" + mock_webhook.custom_check_runs = [] + mock_webhook.security_suspicious_paths = DEFAULT_SUSPICIOUS_PATHS + mock_webhook.security_committer_identity_check = True + mock_webhook.security_mandatory = True + mock_webhook.last_commit = Mock() + mock_webhook.last_commit.sha = "abc123" + mock_webhook.ctx = None + mock_webhook.config = Mock() + mock_webhook.config.get_value = Mock(return_value=None) + return mock_webhook + + @pytest.fixture + def mock_owners_file_handler(self) -> Mock: + mock_handler = Mock() + return mock_handler + + @pytest.fixture + def check_run_handler(self, mock_github_webhook: Mock, mock_owners_file_handler: Mock) -> CheckRunHandler: + return CheckRunHandler(mock_github_webhook, mock_owners_file_handler) + + @pytest.fixture + def mock_pull_request(self) -> Mock: + mock_pr = Mock() + mock_pr.number = 123 + mock_pr.base.ref = "main" + mock_pr.labels = [] # No security-override label + return mock_pr + + @pytest.fixture(autouse=True) + def patch_check_run_text(self) -> Generator[None]: + with patch( + "webhook_server.libs.handlers.check_run_handler.CheckRunHandler.get_check_run_text", + return_value="dummy output", + ): + yield + + @pytest.mark.asyncio + async def test_security_checks_required_when_mandatory( + self, check_run_handler: CheckRunHandler, mock_pull_request: Mock + ) -> None: + """Security checks are in required checks when mandatory=true.""" + with patch.object( + check_run_handler, + "get_branch_required_status_checks", + new=AsyncMock(return_value=[]), + ): + checks = await check_run_handler.all_required_status_checks(pull_request=mock_pull_request) + assert SECURITY_SUSPICIOUS_PATHS_STR in checks + assert SECURITY_COMMITTER_IDENTITY_STR in checks + + @pytest.mark.asyncio + async def test_security_checks_not_required_when_not_mandatory( + self, check_run_handler: CheckRunHandler, mock_pull_request: Mock + ) -> None: + """Security checks are NOT in required checks when mandatory=false.""" + check_run_handler.github_webhook.security_mandatory = False + + with patch.object( + check_run_handler, + "get_branch_required_status_checks", + new=AsyncMock(return_value=[]), + ): + checks = await check_run_handler.all_required_status_checks(pull_request=mock_pull_request) + assert SECURITY_SUSPICIOUS_PATHS_STR not in checks + assert SECURITY_COMMITTER_IDENTITY_STR not in checks + + @pytest.mark.asyncio + async def test_security_checks_partial_config( + self, check_run_handler: CheckRunHandler, mock_pull_request: Mock + ) -> None: + """Only configured security checks are added to required list.""" + check_run_handler.github_webhook.security_suspicious_paths = [] + check_run_handler.github_webhook.security_committer_identity_check = True + + with patch.object( + check_run_handler, + "get_branch_required_status_checks", + new=AsyncMock(return_value=[]), + ): + checks = await check_run_handler.all_required_status_checks(pull_request=mock_pull_request) + assert SECURITY_SUSPICIOUS_PATHS_STR not in checks + assert SECURITY_COMMITTER_IDENTITY_STR in checks + + +class TestSecurityOverrideCommand: + """Test /security-override command handling.""" + + @pytest.fixture + def mock_github_webhook(self) -> Mock: + mock_webhook = Mock() + mock_webhook.hook_data = {"action": "created"} + mock_webhook.logger = Mock() + mock_webhook.log_prefix = "[TEST]" + mock_webhook.repository = Mock() + mock_webhook.repository_full_name = "test-org/test-repo" + mock_webhook.token = TEST_GITHUB_TOKEN + mock_webhook.security_mandatory = True + mock_webhook.security_suspicious_paths = DEFAULT_SUSPICIOUS_PATHS + mock_webhook.security_committer_identity_check = True + mock_webhook.ctx = None + mock_webhook.config = Mock() + mock_webhook.config.get_value = Mock(return_value=None) + return mock_webhook + + @pytest.fixture + def mock_pull_request(self) -> Mock: + mock_pr = Mock() + mock_pr.number = 123 + mock_pr.labels = [] + mock_pr.create_issue_comment = Mock() + return mock_pr + + @pytest.mark.asyncio + async def test_security_override_by_maintainer(self, mock_github_webhook: Mock, mock_pull_request: Mock) -> None: + """Maintainers can set security check runs to success.""" + + mock_owners = Mock() + mock_owners.get_all_repository_maintainers = AsyncMock(return_value=["maintainer-user"]) + mock_owners.all_repository_approvers = ["approver1"] + mock_owners.is_user_valid_to_run_commands = AsyncMock(return_value=True) + + handler = IssueCommentHandler(mock_github_webhook, mock_owners) + + with ( + patch.object(handler.check_run_handler, "set_check_success", new=AsyncMock()) as mock_success, + patch.object(handler, "create_comment_reaction", new=AsyncMock()), + patch( + "webhook_server.libs.handlers.issue_comment_handler.github_api_call", + new=AsyncMock(), + ), + ): + await handler.user_commands( + pull_request=mock_pull_request, + command=COMMAND_SECURITY_OVERRIDE_STR, + reviewed_user="maintainer-user", + issue_comment_id=1, + is_draft=False, + ) + + # Both security check runs should be set to success + success_names = [c.kwargs["name"] for c in mock_success.call_args_list] + assert SECURITY_SUSPICIOUS_PATHS_STR in success_names + assert SECURITY_COMMITTER_IDENTITY_STR in success_names + + @pytest.mark.asyncio + async def test_security_override_rejected_for_non_maintainer( + self, mock_github_webhook: Mock, mock_pull_request: Mock + ) -> None: + """Non-maintainers cannot use /security-override.""" + + mock_owners = Mock() + mock_owners.get_all_repository_maintainers = AsyncMock(return_value=["maintainer-user"]) + mock_owners.all_repository_approvers = ["approver1"] + mock_owners.is_user_valid_to_run_commands = AsyncMock(return_value=True) + + handler = IssueCommentHandler(mock_github_webhook, mock_owners) + + with ( + patch.object(handler.check_run_handler, "set_check_success", new=AsyncMock()) as mock_success, + patch.object(handler, "create_comment_reaction", new=AsyncMock()), + patch( + "webhook_server.libs.handlers.issue_comment_handler.github_api_call", + new=AsyncMock(), + ), + ): + await handler.user_commands( + pull_request=mock_pull_request, + command=COMMAND_SECURITY_OVERRIDE_STR, + reviewed_user="random-user", + issue_comment_id=1, + is_draft=False, + ) + + # Check runs should NOT be set to success + mock_success.assert_not_called() + + @pytest.mark.asyncio + async def test_security_override_cancel(self, mock_github_webhook: Mock, mock_pull_request: Mock) -> None: + """/security-override cancel re-runs security checks.""" + + mock_owners = Mock() + mock_owners.get_all_repository_maintainers = AsyncMock(return_value=["maintainer-user"]) + mock_owners.all_repository_approvers = ["approver1"] + mock_owners.is_user_valid_to_run_commands = AsyncMock(return_value=True) + + handler = IssueCommentHandler(mock_github_webhook, mock_owners) + + with ( + patch.object(handler.runner_handler, "run_security_suspicious_paths", new=AsyncMock()) as mock_run_paths, + patch.object( + handler.runner_handler, "run_security_committer_identity", new=AsyncMock() + ) as mock_run_identity, + patch.object(handler, "create_comment_reaction", new=AsyncMock()), + patch( + "webhook_server.libs.handlers.issue_comment_handler.github_api_call", + new=AsyncMock(), + ), + ): + await handler.user_commands( + pull_request=mock_pull_request, + command=f"{COMMAND_SECURITY_OVERRIDE_STR} cancel", + reviewed_user="maintainer-user", + issue_comment_id=1, + is_draft=False, + ) + + mock_run_paths.assert_called_once() + mock_run_identity.assert_called_once() diff --git a/webhook_server/utils/constants.py b/webhook_server/utils/constants.py index 765665de4..e1bc4aa4c 100644 --- a/webhook_server/utils/constants.py +++ b/webhook_server/utils/constants.py @@ -16,6 +16,9 @@ BUILD_CONTAINER_STR: str = "build-container" PYTHON_MODULE_INSTALL_STR: str = "python-module-install" CONVENTIONAL_TITLE_STR: str = "conventional-title" +SECURITY_SUSPICIOUS_PATHS_STR: str = "security-suspicious-paths" +SECURITY_COMMITTER_IDENTITY_STR: str = "security-committer-identity" +COMMAND_SECURITY_OVERRIDE_STR: str = "security-override" WIP_STR: str = "wip" LGTM_STR: str = "lgtm" APPROVE_STR: str = "approve" @@ -140,8 +143,20 @@ PYTHON_MODULE_INSTALL_STR, CONVENTIONAL_TITLE_STR, CAN_BE_MERGED_STR, + SECURITY_SUSPICIOUS_PATHS_STR, + SECURITY_COMMITTER_IDENTITY_STR, }) +DEFAULT_SUSPICIOUS_PATHS: list[str] = [ + ".claude/", + ".vscode/", + ".cursor/", + ".devcontainer/", + ".pi/", + ".github/workflows/", + ".github/actions/", +] + class REACTIONS: ok: str = "+1" diff --git a/webhook_server/utils/helpers.py b/webhook_server/utils/helpers.py index dc881e9a7..f3fdac108 100644 --- a/webhook_server/utils/helpers.py +++ b/webhook_server/utils/helpers.py @@ -506,7 +506,7 @@ def get_api_with_highest_rate_limit(config: Config, repository_name: str = "") - _rate_limit = _api.get_rate_limit() log_rate_limit(rate_limit=_rate_limit, api_user=_api_user) - logger.info(f"API user {_api_user} selected (single token configured)") + logger.info(f"API user {_api_user} selected (single API configured)") return _api, _token, _api_user for _api, _token in apis_and_tokens: