From 502beaad11c3dde0e61e4f48283c4877dfacd826 Mon Sep 17 00:00:00 2001 From: chenbei Date: Wed, 20 May 2026 16:05:42 +0800 Subject: [PATCH 1/2] add spacy model default config --- tools/security_audit/default.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/security_audit/default.yaml b/tools/security_audit/default.yaml index c78918b..1b47c5d 100644 --- a/tools/security_audit/default.yaml +++ b/tools/security_audit/default.yaml @@ -23,6 +23,10 @@ checkers: # - DPOLabelFlipLLMJudge # - name: JailbreakLLMJudge # enabled: false + - name: PIINERDetector + enabled: false + params: + language: en - name: JailbreakClassifier enabled: false params: From 5cb02421033edec74a97a00839b2dbb727042598 Mon Sep 17 00:00:00 2001 From: chenbei Date: Wed, 20 May 2026 17:13:16 +0800 Subject: [PATCH 2/2] add resource-tier checker preflight --- tools/security_audit/config.py | 81 ++++++++--------- tools/security_audit/policy.py | 160 ++++++++++++++++++++++++--------- tools/security_audit/tool.py | 75 +++++++++++----- 3 files changed, 208 insertions(+), 108 deletions(-) diff --git a/tools/security_audit/config.py b/tools/security_audit/config.py index 62474d7..44c6908 100644 --- a/tools/security_audit/config.py +++ b/tools/security_audit/config.py @@ -1,40 +1,41 @@ -from typing import Dict, List, Optional - -from pydantic import BaseModel - - -class LLMConfig(BaseModel): - """LLM service config for LLM-as-Judge checkers.""" - model: str = "" - api_key: str = "" - api_url: str = "" - temperature: float = 0.0 - max_tokens: int = 2048 - - -class CheckerConfig(BaseModel): - """Config for a single checker.""" - name: str - enabled: bool = True - params: Dict = {} - - -class ExecutorConfig(BaseModel): - """Executor engine config.""" - max_workers: int = 4 - batch_size: int = 100 - start_index: int = 0 - end_index: int = -1 # -1 means process all - - -class AuditConfig(BaseModel): - """Top-level audit config.""" - task_name: str = "security_audit" - output_path: str = "outputs/" - log_level: str = "INFO" - - executor: ExecutorConfig = ExecutorConfig() - llm: Optional[LLMConfig] = None # llm model name (llm-based checkers required) - models: Dict[str, str] = {} # model name or path (model-based checkers required) - checkers: List[CheckerConfig] = [] - checker_tags: List[str] = [] +from typing import Dict, List, Optional + +from pydantic import BaseModel + + +class LLMConfig(BaseModel): + """LLM service config for LLM-as-Judge checkers.""" + model: str = "" + api_key: str = "" + api_url: str = "" + temperature: float = 0.0 + max_tokens: int = 2048 + + +class CheckerConfig(BaseModel): + """Config for a single checker.""" + name: str + enabled: bool = True + params: Dict = {} + selection_source: str = "config" # explicit | config | auto + + +class ExecutorConfig(BaseModel): + """Executor engine config.""" + max_workers: int = 4 + batch_size: int = 100 + start_index: int = 0 + end_index: int = -1 # -1 means process all + + +class AuditConfig(BaseModel): + """Top-level audit config.""" + task_name: str = "security_audit" + output_path: str = "outputs/" + log_level: str = "INFO" + + executor: ExecutorConfig = ExecutorConfig() + llm: Optional[LLMConfig] = None # llm model name (llm-based checkers required) + models: Dict[str, str] = {} # model name or path (model-based checkers required) + checkers: List[CheckerConfig] = [] + checker_tags: List[str] = [] diff --git a/tools/security_audit/policy.py b/tools/security_audit/policy.py index c7683d9..ebedd64 100644 --- a/tools/security_audit/policy.py +++ b/tools/security_audit/policy.py @@ -8,18 +8,64 @@ from .config import CheckerConfig -_MODEL_PATH_KEYS = { - "HarmfulContentClassifier": "harmful_content_classifier", - "JailbreakClassifier": "jailbreak_classifier", - "PromptInjectionClassifier": "prompt_injection_classifier", - "BiasClassifier": "bias_classifier", +_RULE_BASED_CHECKERS = [ + "PIIRule", + "SecretRule", + "ToxicityKeywordRule", + "HarmfulKeywordRule", + "BiasKeywordRule", +] + +_LLM_JUDGE_CHECKERS = [ + "HarmfulContentLLMJudge", + "BiasLLMJudge", + "ToxicityLLMJudge", + "PIILLMJudge", + "SycophancyLLMJudge", + "PromptInjectionLLMJudge", + "JailbreakLLMJudge", + "FactualInconsistancyLLMJudge", + "SelfContradictionLLMJudge", + "InstructionMismatchLLMJudge", + "DPOLabelFlipLLMJudge", +] + +_STANDARD_MODEL_CHECKERS = [ + "PIINERDetector", +] + +_HEAVY_MODEL_CHECKERS = [ + "HarmfulContentClassifier", + "ToxicityClassifier", + "BiasClassifier", + "JailbreakClassifier", + "PromptInjectionClassifier", + "GraCeFulBackdoorDefender", +] + +_HEAVY_CHECKERS = set(_HEAVY_MODEL_CHECKERS) + +_RESOURCE_TIER_ORDER = {"light": 0, "standard": 1, "full": 2} + +_CHECKER_MIN_RESOURCE_TIERS = { + **{name: "light" for name in _RULE_BASED_CHECKERS}, + **{name: "standard" for name in _LLM_JUDGE_CHECKERS}, + **{name: "standard" for name in _STANDARD_MODEL_CHECKERS}, + **{name: "full" for name in _HEAVY_MODEL_CHECKERS}, +} + +_LOCAL_MODEL_PATH_CHECKERS = { + "HarmfulContentClassifier", + "JailbreakClassifier", + "PromptInjectionClassifier", + "BiasClassifier", } + def validate_selected_checkers( *, checker_configs: list[CheckerConfig], - tool_defaults: dict[str, Any], runtime_policy: RuntimePolicy, context_config: dict[str, Any], ) -> list[PreflightIssue]: @@ -27,16 +73,19 @@ def validate_selected_checkers( for checker_config in checker_configs: if not checker_config.enabled: continue - issues.extend(validate_checker_network_availability( + resource_issues = validate_checker_resource_tier_availability( checker_config=checker_config, - tool_defaults=tool_defaults, runtime_policy=runtime_policy, - context_config=context_config, - )) - issues.extend(validate_checker_resource_tier_availability( + ) + issues.extend(resource_issues) + if not checker_config.enabled: + continue + if any(issue.level == "error" for issue in resource_issues): + continue + issues.extend(validate_checker_network_availability( checker_config=checker_config, - tool_defaults=tool_defaults, runtime_policy=runtime_policy, + context_config=context_config, )) return issues @@ -44,7 +93,6 @@ def validate_selected_checkers( def validate_checker_network_availability( *, checker_config: CheckerConfig, - tool_defaults: dict[str, Any], runtime_policy: RuntimePolicy, context_config: dict[str, Any], ) -> list[PreflightIssue]: @@ -66,9 +114,8 @@ def validate_checker_network_availability( ), )] - if name in _MODEL_PATH_KEYS: - model_key = _MODEL_PATH_KEYS[name] - model_path = _resolve_model_path(checker_config, tool_defaults, model_key) + if name in _LOCAL_MODEL_PATH_CHECKERS: + model_path = _resolve_model_path(checker_config) if not model_path: return [PreflightIssue( level="error", @@ -76,8 +123,8 @@ def validate_checker_network_availability( checker_name=name, message=( "Offline model-based checker requires a local model path. " - f"Set tool_defaults.security_audit.models.{model_key} or pass " - "checker params.model_name_or_path." + "Set checker params.model_name_or_path in " + "tools/security_audit/default.yaml." ), )] if not _path_exists(model_path): @@ -118,27 +165,60 @@ def validate_checker_network_availability( def validate_checker_resource_tier_availability( *, checker_config: CheckerConfig, - tool_defaults: dict[str, Any], runtime_policy: RuntimePolicy, ) -> list[PreflightIssue]: - # TODO: (resource_tier) Intern-owned implementation. Keep this flexible: - # define checker min_tier metadata and default checker pools for - # light/standard/full. Keep provenance simple for now: this layer validates - # the final selected checkers rather than tracking whether they came from - # user text, generated DSL, or config defaults. - return [] + + name = checker_config.name + required_tier = _CHECKER_MIN_RESOURCE_TIERS.get(name) + if required_tier is None: + return [] + + current_tier = runtime_policy.resource_tier + current_rank = _RESOURCE_TIER_ORDER.get(current_tier) + required_rank = _RESOURCE_TIER_ORDER[required_tier] + if current_rank is None or current_rank >= required_rank: + return [] + + source = getattr(checker_config, "selection_source", "config") + if source != "explicit": + checker_config.enabled = False + return [PreflightIssue( + level="warning", + code="checker_filtered_by_resource_tier", + checker_name=name, + message=( + f"Checker `{name}` requires deployment.resource_tier >= {required_tier!r}, " + f"but current resource_tier is {current_tier!r}; " + f"it was disabled from the {source} checker selection." + ), + )] + + return [PreflightIssue( + level="error", + code="checker_resource_tier_too_low", + checker_name=name, + message=( + f"Checker `{name}` requires deployment.resource_tier >= {required_tier!r}, " + f"but current resource_tier is {current_tier!r}." + ), + )] + def resolve_default_checkers_for_resource_tier(resource_tier: str) -> list[str]: - # TODO: (resource_tier) Replace this placeholder with light/standard/full - # default checker sets and, later, funnel-routing strategy selection. - return [ - "PIIRule", - "SecretRule", - "ToxicityKeywordRule", - "HarmfulKeywordRule", - "BiasKeywordRule", - ] + normalized = (resource_tier or "light").strip().lower() + if normalized == "standard": + return [ + *_STANDARD_MODEL_CHECKERS, + *_LLM_JUDGE_CHECKERS, + ] + if normalized == "full": + return [ + *_STANDARD_MODEL_CHECKERS, + *_LLM_JUDGE_CHECKERS, + *_HEAVY_MODEL_CHECKERS, + ] + return list(_RULE_BASED_CHECKERS) def _has_local_llm_config(context_config: dict[str, Any]) -> bool: @@ -150,17 +230,9 @@ def _has_local_llm_config(context_config: dict[str, Any]) -> bool: ) -def _resolve_model_path( - checker_config: CheckerConfig, - tool_defaults: dict[str, Any], - model_key: str, -) -> str | None: +def _resolve_model_path(checker_config: CheckerConfig) -> str | None: explicit = checker_config.params.get("model_name_or_path") - if explicit: - return str(explicit) - models = tool_defaults.get("models") if isinstance(tool_defaults.get("models"), dict) else {} - value = models.get(model_key) - return str(value) if value else None + return str(explicit) if explicit else None def _path_exists(value: str) -> bool: diff --git a/tools/security_audit/tool.py b/tools/security_audit/tool.py index e4effde..0e8e4ea 100644 --- a/tools/security_audit/tool.py +++ b/tools/security_audit/tool.py @@ -16,7 +16,7 @@ from .config import AuditConfig, CheckerConfig, ExecutorConfig, LLMConfig from .executor import Executor from .loader import load_samples -from .policy import validate_selected_checkers +from .policy import resolve_default_checkers_for_resource_tier, validate_selected_checkers _DEFAULT_RISK_WEIGHTS = { @@ -44,29 +44,53 @@ def _get_tool_defaults(context_config: dict) -> dict: return security_defaults if isinstance(security_defaults, dict) else {} -def _resolve_checker_configs(kwargs: dict, tool_defaults: dict) -> list[CheckerConfig]: +def _load_default_checker_configs(tool_defaults: dict) -> list[CheckerConfig]: + raw = tool_defaults.get("checkers") + if not isinstance(raw, list): + return [] + + configs: list[CheckerConfig] = [] + for item in raw: + if isinstance(item, str): + configs.append(CheckerConfig(name=item, selection_source="config")) + elif isinstance(item, dict) and "name" in item: + configs.append(CheckerConfig(**{**item, "selection_source": "config"})) + return configs + + +def _resolve_checker_configs(kwargs: dict, tool_defaults: dict, resource_tier: str) -> list[CheckerConfig]: """Resolve CheckerConfig list. Priority: - 1. ``checkers`` list in tool_defaults (from default.yaml). - 2. Built-in fallback: [PIIRule]. + 1. Explicit ``checker_names`` from tool call. These force-enable the + named checkers while inheriting same-name params from default.yaml. + 2. Enabled ``checkers`` list in tool_defaults (from default.yaml). + 3. Resource-tier default checkers. """ - names = kwargs.get("checker_names") - if names: # non-empty list → explicit override - return [CheckerConfig(name=n) for n in names] + default_configs = _load_default_checker_configs(tool_defaults) + defaults_by_name = {config.name: config for config in default_configs} - raw = tool_defaults.get("checkers") - if raw and isinstance(raw, list): + names = kwargs.get("checker_names") + if names: configs = [] - for item in raw: - if isinstance(item, str): - configs.append(CheckerConfig(name=item)) - elif isinstance(item, dict) and "name" in item: - configs.append(CheckerConfig(**item)) - if configs: - return configs - - return [CheckerConfig(name="PIIRule")] + for name in names: + default_config = defaults_by_name.get(name) + params = dict(default_config.params) if default_config else {} + configs.append(CheckerConfig( + name=name, + enabled=True, + params=params, + selection_source="explicit", + )) + return configs + + if default_configs: + return default_configs + + return [ + CheckerConfig(name=name, selection_source="auto") + for name in resolve_default_checkers_for_resource_tier(resource_tier) + ] def _calc_security_score(risk_distribution: dict, risk_weights: dict) -> float: @@ -137,30 +161,33 @@ def run(self, context: ToolContext, **kwargs: Any) -> dict[str, Any]: max_workers: int = kwargs.get("max_workers", 4) tool_defaults = _get_tool_defaults(context.config) - checker_configs = _resolve_checker_configs(kwargs, tool_defaults) - checker_names = [c.name for c in checker_configs if c.enabled] runtime_policy = build_runtime_policy(context.config) - + checker_configs = _resolve_checker_configs( + kwargs, + tool_defaults, + runtime_policy.resource_tier, + ) handle_preflight_issues( validate_selected_checkers( checker_configs=checker_configs, - tool_defaults=tool_defaults, runtime_policy=runtime_policy, context_config=context.config, ), strict=runtime_policy.strict_preflight, logger=context.logger, ) + checker_names = [c.name for c in checker_configs if c.enabled] if kwargs.get("checker_names"): context.log(f"SecurityAuditTool: using user-specified checkers: {checker_names}") - if tool_defaults.get("checkers"): + elif tool_defaults.get("checkers"): context.log( f"SecurityAuditTool: using checkers from default.yaml: {checker_names}. " ) else: context.log( - f"SecurityAuditTool: no checkers configured, using built-in default: {checker_names}" + f"SecurityAuditTool: no checkers configured, using " + f"{runtime_policy.resource_tier} resource-tier defaults: {checker_names}" ) context.log(f"SecurityAuditTool: {len(data)} records, checkers={checker_names}")