diff --git a/skills/agent-risk-firewall/.claude-plugin/plugin.json b/skills/agent-risk-firewall/.claude-plugin/plugin.json new file mode 100644 index 000000000..ca82547a8 --- /dev/null +++ b/skills/agent-risk-firewall/.claude-plugin/plugin.json @@ -0,0 +1,17 @@ +{ + "name": "agent-risk-firewall", + "description": "Pre-trade risk firewall for Agentic Wallet swaps on X Layer and Solana", + "version": "1.2.0", + "author": { + "name": "Agent Risk Firewall Contributors" + }, + "license": "MIT", + "keywords": [ + "security", + "risk", + "agentic-wallet", + "xlayer", + "solana", + "trading" + ] +} diff --git a/skills/agent-risk-firewall/.gitignore b/skills/agent-risk-firewall/.gitignore new file mode 100644 index 000000000..0121a7f7f --- /dev/null +++ b/skills/agent-risk-firewall/.gitignore @@ -0,0 +1,10 @@ +.pytest_cache/ +__pycache__/ +*.py[cod] +*.egg-info/ +build/ +dist/ +.venv/ +venv/ +.env +.env.* diff --git a/skills/agent-risk-firewall/LICENSE b/skills/agent-risk-firewall/LICENSE new file mode 100644 index 000000000..d06d3ab7f --- /dev/null +++ b/skills/agent-risk-firewall/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Agent Risk Firewall Contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/skills/agent-risk-firewall/README.md b/skills/agent-risk-firewall/README.md new file mode 100644 index 000000000..f6e169384 --- /dev/null +++ b/skills/agent-risk-firewall/README.md @@ -0,0 +1,197 @@ +# Agent Risk Firewall + +Pre-trade risk firewall for OKX Agentic Wallet swaps on X Layer and Solana. + +This directory contains the OKX Plugin Store package for `agent-risk-firewall`: a Skill + Python CLI that helps an AI agent evaluate a proposed swap, token trade, or approval before asking the user to sign. + +The plugin returns a deterministic JSON verdict: + +- `allow`: continue the normal signing flow if the user already requested it +- `warn`: show the warning reasons and require explicit confirmation +- `block`: cancel the operation and do not request a signature + +## Safety Boundary + +Agent Risk Firewall does not sign transactions, broadcast transactions, execute swaps, revoke approvals, or handle private keys. It only evaluates intent, quote, token context, and optional transaction context. + +If OKX OnchainOS scan or simulation data is unavailable, the plugin treats verification as incomplete. In `balanced`, `competition`, and `degen-small-size`, incomplete verification returns at least `warn`; in `strict`, it returns `block`. + +## What It Checks + +- X Layer and Solana address/chain compatibility +- Token risk from OKX OnchainOS token scan +- Transaction risk from OKX OnchainOS tx-scan +- Simulation failure or revert from OKX OnchainOS gateway simulation +- Slippage and price impact thresholds +- Per-trade and wallet-exposure caps +- Low liquidity when liquidity data is available +- Optional external evidence from GoPlus, Birdeye, and RootData +- Approval spender and unlimited allowance risk +- Competition context: active status, join status, supported chains, participation thresholds, and token-pair eligibility +- Audit trail fields: `decisionId`, `policyVersion`, `evidenceHash` + +## Commands + +```bash +agent-risk-firewall check --input request.json --format json +agent-risk-firewall policy --profile balanced +agent-risk-firewall policy --profile strict +agent-risk-firewall policy --profile competition +agent-risk-firewall policy --profile degen-small-size +agent-risk-firewall self-test +``` + +Use `--input -` to read JSON from stdin. + +## Input Contract + +```json +{ + "chain": "xlayer", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6 + }, + "tokenOut": { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "TOKEN", + "decimals": 18 + }, + "amountIn": "100", + "amountInUsd": 100, + "quote": { + "expectedOut": "12345", + "slippagePct": 1, + "priceImpactPct": 0.8, + "route": ["OKX DEX"], + "venue": "okx-dex" + }, + "tx": { + "from": "0x0000000000000000000000000000000000000000", + "to": "0x0000000000000000000000000000000000000003", + "data": "0x", + "value": "0" + }, + "approval": { + "spender": "0x0000000000000000000000000000000000000004", + "spenderType": "contract", + "isUnlimited": false + }, + "externalEvidence": { + "goplus": {}, + "birdeye": {}, + "rootdata": {} + }, + "competition": { + "activityName": "Selected Agentic Trading competition", + "active": true, + "joined": true, + "supportedChains": ["xlayer", "solana"], + "primaryChain": "xlayer", + "minParticipationUsd": 25, + "minLeaderboardUsd": 100, + "minWalletBalanceUsd": 100, + "eligibleTokenTradeRequired": true, + "disallowedPairClasses": ["stable-stable", "stable-native", "native-native", "native-wrapped-native"] + }, + "policyProfile": "competition" +} +``` + +Supported values: + +- `chain`: `xlayer`, `solana` +- `operation`: `buy`, `sell`, `swap`, `approval` +- `policyProfile`: `balanced`, `strict`, `competition`, `degen-small-size` + +## Policy Profiles + +| Profile | Use case | Key behavior | +| --- | --- | --- | +| `balanced` | Default retail agentic trading guardrail | Blocks critical risk and warns on elevated risk. | +| `strict` | High-safety mode | Blocks unavailable scans, token `HIGH`, EOA spenders, and tighter slippage/price-impact limits. | +| `competition` | OKX Agentic Trading style workflows | Requires competition preflight context, blocks inactive or unsupported-chain trades, warns if not joined or below thresholds, and blocks stablecoin/native-only pairs. | +| `degen-small-size` | Small meme-token experiments | Higher slippage tolerance with a hard 25 USD trade cap. | + +## Competition Mode Enhancer + +Use `policyProfile: "competition"` when an agent is trading for an OKX Agentic Trading competition. The agent should fetch competition detail and user-status first, then pass the normalized result as `competition`. + +Competition-specific outcomes: + +| Signal | Result | +| --- | --- | +| Missing competition context | `warn` | +| Competition inactive or ended | `block` | +| Requested chain not supported by the competition context | `block` | +| Wallet not joined or join status unknown | `warn` | +| Trade below participation, leaderboard, or minimum balance thresholds | `warn` | +| Stablecoin/native-only pair such as USDC-USDT, OKB-USDT, SOL-WSOL | `block` | +| Real token trade with active, joined, supported competition context | may `allow` if all other checks pass | + +Internal competition IDs may be used in tool-to-tool context, but should not be rendered in user-facing messages. + +## Balanced Policy + +| Signal | Result | +| --- | --- | +| Token `CRITICAL` on buy/swap | `block` | +| Token `CRITICAL` on sell | `warn` | +| Token `HIGH` or `MEDIUM` | `warn` | +| OKX tx-scan `block` | `block` | +| OKX tx-scan `warn` | `warn` | +| Simulation revert/failure | `block` | +| Scan or simulation timeout/unavailable | `warn` | +| Slippage `2%` to `5%` | `warn` | +| Slippage above `5%` | `block` | +| Price impact `3%` to `8%` | `warn` | +| Price impact above `8%` | `block` | +| Trade above `250 USD` | `block` | +| Trade above `10%` of wallet value, when wallet value is provided | `block` | + +## Local Test + +From the repository root: + +```powershell +$env:PYTHONDONTWRITEBYTECODE = "1" +python -m pytest .\skills\agent-risk-firewall\tests -q -p no:cacheprovider +& "$env:USERPROFILE\.local\bin\plugin-store.exe" lint .\skills\agent-risk-firewall +``` + +Expected results: + +```text +tests passed +Plugin 'agent-risk-firewall' passed all checks +``` + +## Agent Integration Pattern + +```text +1. Agent receives a natural-language trade intent. +2. Agent gets quote or unsigned transaction context from OKX OnchainOS. +3. Agent runs agent-risk-firewall check. +4. If allow, continue only if the user requested execution. +5. If warn, show reasons and require explicit confirmation. +6. If block, cancel and do not ask the user to sign. +``` + +Compatible strategy pattern: + +```text +xlayer-alpha-hunter -> unsigned swap -> agent-risk-firewall -> execute only if allowed +smart-tradex -> quote/tx context -> agent-risk-firewall -> warn/block gate +otto-alpha-sniper -> external evidence + tx context -> agent-risk-firewall -> final confirmation +``` + +## Disclaimer + +This plugin is a defensive guardrail, not a guarantee of safety. On-chain data, scan results, and simulations can be incomplete, stale, or wrong. Trading and DeFi activity can cause loss of funds. Use dry-run mode first and start with small amounts. + +## License + +MIT diff --git a/skills/agent-risk-firewall/SKILL.md b/skills/agent-risk-firewall/SKILL.md new file mode 100644 index 000000000..cfdd1d09a --- /dev/null +++ b/skills/agent-risk-firewall/SKILL.md @@ -0,0 +1,271 @@ +--- +name: agent-risk-firewall +description: "Pre-trade risk firewall for Agentic Wallet swaps on X Layer and Solana" +version: "1.2.0" +author: "Agent Risk Firewall Contributors" +tags: + - security + - risk + - agentic-wallet + - xlayer + - solana + - trading +--- + +# Agent Risk Firewall + +## Overview + +Agent Risk Firewall is a pre-trade guardian for Agentic Wallet workflows. Use it before any X Layer or Solana swap, token buy, token sell, or approval when an agent has quote or transaction context and is about to ask the user to sign. + +The firewall does not sign transactions, broadcast transactions, revoke approvals, or execute swaps. It normalizes the proposed operation, calls OKX OnchainOS security and simulation commands when available, accepts optional external evidence from other plugins, applies a deterministic policy profile, and returns `allow`, `warn`, or `block`. + +## Pre-flight Checks + +Before using this skill, ensure: + +1. Python 3.8+ is available. +2. The `agent-risk-firewall` command is installed from this plugin. +3. For live OKX checks, install OnchainOS skills with `npx skills add okx/onchainos-skills`. +4. For production usage, configure personal OKX credentials through the OnchainOS environment variables. Never commit `.env` files or API keys. + +If `onchainos` is not installed or a scan fails, treat that as incomplete verification. In `balanced`, `competition`, and `degen-small-size`, unavailable scan data returns at least `warn`. In `strict`, unavailable scan data returns `block`. + +## Commands + +### Check a proposed trade or approval + +```bash +agent-risk-firewall check --input request.json --format json +``` + +**When to use**: Run this before any X Layer or Solana swap, token buy, token sell, or approval when the agent has quote or transaction context and is about to request a signature. + +**Output**: JSON containing `verdict`, `riskScore`, `requiresUserConfirmation`, `reasons`, normalized `evidence`, `audit`, and `safeNextStep`. + +**Example**: + +```bash +agent-risk-firewall check --input request.json --format json +``` + +Use `--input -` to read JSON from stdin. + +Required input shape: + +```json +{ + "chain": "xlayer", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6 + }, + "tokenOut": { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "TOKEN", + "decimals": 18 + }, + "amountIn": "100", + "amountInUsd": 100, + "walletValueUsd": 1000, + "quote": { + "expectedOut": "12345", + "slippagePct": 1, + "priceImpactPct": 0.8, + "route": ["OKX DEX"], + "venue": "okx-dex" + }, + "tx": { + "from": "0x0000000000000000000000000000000000000000", + "to": "0x0000000000000000000000000000000000000003", + "data": "0x", + "value": "0" + }, + "approval": { + "spender": "0x0000000000000000000000000000000000000004", + "spenderType": "contract", + "isUnlimited": false + }, + "externalEvidence": { + "goplus": {}, + "birdeye": {}, + "rootdata": {} + }, + "competition": { + "activityName": "Selected Agentic Trading competition", + "active": true, + "joined": true, + "supportedChains": ["xlayer", "solana"], + "primaryChain": "xlayer", + "minParticipationUsd": 25, + "minLeaderboardUsd": 100, + "minWalletBalanceUsd": 100, + "eligibleTokenTradeRequired": true, + "disallowedPairClasses": ["stable-stable", "stable-native", "native-native", "native-wrapped-native"] + }, + "policyProfile": "competition" +} +``` + +Output: + +```json +{ + "verdict": "warn", + "riskScore": 60, + "requiresUserConfirmation": true, + "reasons": [ + { + "code": "TOKEN_HIGH", + "severity": "warn", + "message": "Target token has HIGH risk." + } + ], + "evidence": {}, + "audit": { + "decisionId": "arf_0123456789abcdef", + "policyProfile": "balanced", + "policyVersion": "1.2.0", + "evidenceHash": "64-character-sha256" + }, + "safeNextStep": "Show the warning reasons and ask the user for explicit confirmation before signing." +} +``` + +Agent behavior contract: + +- `allow`: the agent may continue the normal signing or broadcast flow only if the user already requested execution. +- `warn`: the agent must show the warning reasons, quote details, and `audit.decisionId`, then require explicit user confirmation before continuing. +- `block`: the agent must stop. It must not ask the user to sign, must not broadcast, and must recommend canceling or revising the operation. + +## Policy Profiles + +Use `policyProfile` to tune the firewall for the agent workflow: + +| Profile | Use case | Key behavior | +|---|---|---| +| `balanced` | Default retail agentic trading guardrail | Blocks critical signals, warns on elevated slippage/price impact, allows normal safe trades. | +| `strict` | High-safety mode for larger or less trusted workflows | Blocks unavailable scans, token `HIGH`, EOA spenders, and tighter slippage/price impact thresholds. | +| `competition` | OKX Agentic Trading style workflows | Requires competition preflight context, warns when registration or thresholds are incomplete, blocks inactive or unsupported-chain trades, and blocks stablecoin/native-only pairs. | +| `degen-small-size` | Small-size meme/token exploration | Allows higher slippage/price impact but caps trade size at 25 USD and 3% wallet exposure. | + +## Competition Mode Enhancer + +For OKX Agentic Trading competition workflows, run the firewall with `policyProfile: "competition"` after the agent has fetched competition detail and user-status. + +The agent should pass a `competition` object with: + +- `active`: whether the competition is active. +- `joined`: whether the wallet has registered. +- `supportedChains`: normalized chains that count for the competition. Until OKX exposes a backend multi-chain field, agents should treat Solana plus the competition primary chain as supported. +- `minParticipationUsd`, `minLeaderboardUsd`, `minWalletBalanceUsd`: thresholds parsed from competition rules when available. +- `eligibleTokenTradeRequired`: `true` when stablecoin/native-only trades should not count as eligible competition trades. +- `disallowedPairClasses`: pair classes such as `stable-stable`, `stable-native`, `native-native`, and `native-wrapped-native`. + +Competition verdict behavior: + +| Signal | Result | +|---|---| +| Missing competition context | `warn` | +| Competition inactive or ended | `block` | +| Requested chain not in `supportedChains` | `block` | +| Wallet not joined, or join status missing | `warn` | +| Trade amount or wallet value below competition thresholds | `warn` | +| Stablecoin/native-only pair | `block` | + +Internal competition IDs can exist in tool context, but do not show them in user-facing messages. + +## External Evidence + +Other plugins can pass optional evidence into `externalEvidence` without this firewall calling third-party APIs directly: + +- `goplus`: token/address security fields such as `riskLevel`, `is_honeypot`, `is_blacklisted`, `buy_tax`, `sell_tax`. +- `birdeye`: liquidity and holder distribution fields such as `liquidityUsd`, `top10HolderPercent`. +- `rootdata`: project intelligence fields such as `riskLevel`, `tags`, `labels`, or `riskTags`. + +Critical external evidence can upgrade a result to `block`; high or incomplete evidence usually upgrades to `warn`. + +## Approval-Specific Checks + +For `operation: "approval"`, include an `approval` object when available: + +```json +{ + "spender": "0x0000000000000000000000000000000000000004", + "spenderType": "eoa", + "isUnlimited": true, + "allowedSpenders": [], + "blockedSpenders": [] +} +``` + +The firewall checks for missing spender, spender address mismatch, explicitly blocked spender, spender not in a provided allowlist, EOA spender, and unlimited allowance. + +## Compatibility Examples + +Trading strategy plugins can use this firewall as a pre-sign middleware: + +```text +xlayer-alpha-hunter -> onchainos swap swap -> agent-risk-firewall check -> user confirmation or cancel -> onchainos swap execute +smart-tradex -> quote/unsigned tx -> agent-risk-firewall check -> allow/warn/block gate +otto-alpha-sniper -> externalEvidence + tx context -> agent-risk-firewall check -> final confirmation +``` + +The strategy plugin keeps its alpha logic. Agent Risk Firewall owns the pre-sign risk gate. + +### Show the active policy + +```bash +agent-risk-firewall policy --profile balanced +``` + +**When to use**: Run this when the user asks what thresholds the firewall applies, or before integrating the firewall into another trading skill. + +**Output**: JSON describing supported chains, max trade size, wallet exposure cap, slippage thresholds, price impact thresholds, and low-liquidity threshold. + +**Example**: + +```bash +agent-risk-firewall policy --profile balanced +agent-risk-firewall policy --profile strict +agent-risk-firewall policy --profile competition +agent-risk-firewall policy --profile degen-small-size +``` + +### Run a local self-test + +```bash +agent-risk-firewall self-test +``` + +**When to use**: Run this after installation or before submitting a PR to verify that the CLI, policy engine, and JSON renderer work without live assets or external scans. + +**Output**: JSON with `status: pass` or `status: fail` plus three fixture verdicts for allow, warn, and block cases. + +**Example**: + +```bash +agent-risk-firewall self-test +``` + +## Error Handling + +| Error | Cause | Resolution | +|-------|-------|------------| +| `INVALID_JSON` | Input is not valid JSON | Fix the input file or stdin payload. | +| `INVALID_INPUT` | Required fields are missing or malformed | Add `chain`, `operation`, `walletAddress`, token context, and quote/tx details. | +| `ONCHAINOS_UNAVAILABLE` | The `onchainos` CLI is not installed | Install `okx/onchainos-skills`, then retry. | +| `SCAN_TIMEOUT` | A live OKX scan timed out | Retry once; if still unavailable, treat the result as incomplete verification. | +| `UNSUPPORTED_CHAIN` | Chain is outside MVP scope | Use `xlayer` or `solana`. | + +## Security Notices + +- This plugin never asks for, stores, or handles private keys or seed phrases. +- This plugin never signs or broadcasts transactions. +- This plugin is a guardrail, not a guarantee of safety. Onchain data, external APIs, and simulations can be incomplete or stale. +- Do not override a `block` verdict. A `block` means the agent must cancel the proposed operation. +- `warn` requires explicit user confirmation before signing. +- Trading and DeFi activity can cause loss of funds. Use dry-run and small amounts first. diff --git a/skills/agent-risk-firewall/SUMMARY.md b/skills/agent-risk-firewall/SUMMARY.md new file mode 100644 index 000000000..ea2ce6531 --- /dev/null +++ b/skills/agent-risk-firewall/SUMMARY.md @@ -0,0 +1,36 @@ +# Agent Risk Firewall + +## Overview + +Agent Risk Firewall is a pre-trade guardrail for Agentic Wallet swaps on X Layer and Solana. It receives a proposed intent, quote, transaction context, approval context, and optional external evidence, runs available OKX OnchainOS checks, applies a deterministic policy profile, and returns `allow`, `warn`, or `block`. + +Core operations: + +- Check proposed swaps, token buys, token sells, and approvals before signing +- Normalize OKX security scan, transaction scan, token report, and simulation evidence +- Aggregate optional GoPlus, Birdeye, and RootData evidence supplied by other plugins +- Check approval spender and unlimited allowance risk +- Apply Competition Mode Enhancer checks for Agentic Trading workflows: active status, join status, supported chains, thresholds, and eligible token pairs +- Return audit fields: `decisionId`, `policyVersion`, and `evidenceHash` +- Return deterministic `allow`, `warn`, or `block` verdicts for agent workflows + +Tags: `security` `risk` `agentic-wallet` `xlayer` `solana` `trading` + +## Prerequisites + +- No IP restrictions enforced by this plugin +- Supported chains: X Layer and Solana +- Supported operations: swaps, token buys, token sells, and approvals +- Python 3.8+ +- Optional for live checks: `npx skills add okx/onchainos-skills` +- Optional for production reliability: personal OKX OnchainOS API credentials configured in the environment + +## Quick Start + +1. **Prepare a request**: Build a JSON payload with `chain`, `operation`, wallet address, token context, quote details, and optional transaction context. +2. **Run the firewall**: Execute `agent-risk-firewall check --input request.json --format json` before asking the user to sign. +3. **Apply the verdict**: Continue on `allow`, ask explicit confirmation on `warn`, and cancel the operation on `block`. +4. **Pick a policy profile**: Use `balanced`, `strict`, `competition`, or `degen-small-size` depending on the workflow. For `competition`, fetch OKX competition detail and user-status first and pass the normalized `competition` context. +5. **Inspect policy or installation**: Use `agent-risk-firewall policy --profile balanced` to view thresholds and `agent-risk-firewall self-test` to verify local installation. + +The plugin does not sign, broadcast, trade, revoke approvals, or handle private keys. diff --git a/skills/agent-risk-firewall/plugin.yaml b/skills/agent-risk-firewall/plugin.yaml new file mode 100644 index 000000000..348997a9e --- /dev/null +++ b/skills/agent-risk-firewall/plugin.yaml @@ -0,0 +1,30 @@ +schema_version: 1 +name: agent-risk-firewall +version: "1.2.0" +description: "Pre-trade risk firewall for Agentic Wallet swaps on X Layer and Solana" +author: + name: "Agent Risk Firewall Contributors" + github: "maixuancanh" +license: MIT +category: security +tags: + - security + - risk + - agentic-wallet + - xlayer + - solana + - trading + +components: + skill: + dir: "." + +build: + lang: python + source_repo: maixuancanh/agent-risk-firewall + source_commit: "21956e4cd1eb5f2a6c7b4b1b70b896f92deee93c" + binary_name: agent-risk-firewall + main: "src/agent_risk_firewall/cli.py" + +api_calls: + - web3.okx.com diff --git a/skills/agent-risk-firewall/pyproject.toml b/skills/agent-risk-firewall/pyproject.toml new file mode 100644 index 000000000..c6d4cc37c --- /dev/null +++ b/skills/agent-risk-firewall/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "agent-risk-firewall" +version = "1.2.0" +description = "Pre-trade risk firewall for Agentic Wallet swaps on X Layer and Solana" +requires-python = ">=3.8" +license = { text = "MIT" } +authors = [ + { name = "Agent Risk Firewall Contributors" } +] +keywords = ["security", "risk", "agentic-wallet", "xlayer", "solana", "trading"] +dependencies = [] + +[project.scripts] +agent-risk-firewall = "agent_risk_firewall.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src"] diff --git a/skills/agent-risk-firewall/scripts/clean.ps1 b/skills/agent-risk-firewall/scripts/clean.ps1 new file mode 100644 index 000000000..e29470f8b --- /dev/null +++ b/skills/agent-risk-firewall/scripts/clean.ps1 @@ -0,0 +1,25 @@ +$ErrorActionPreference = "Stop" + +$root = (Resolve-Path (Join-Path $PSScriptRoot "..")).Path +$targets = @() + +$pytestCache = Join-Path $root ".pytest_cache" +if (Test-Path $pytestCache) { + $targets += (Resolve-Path $pytestCache).Path +} + +Get-ChildItem -Path $root -Recurse -Force -Directory -Filter "__pycache__" | ForEach-Object { + $targets += $_.FullName +} + +foreach ($target in $targets) { + if (-not $target.StartsWith($root, [System.StringComparison]::OrdinalIgnoreCase)) { + throw "Refusing to remove outside plugin root: $target" + } +} + +foreach ($target in $targets) { + Remove-Item -Recurse -Force -LiteralPath $target +} + +Write-Output "Removed $($targets.Count) cache directories under $root" diff --git a/skills/agent-risk-firewall/scripts/clean.py b/skills/agent-risk-firewall/scripts/clean.py new file mode 100644 index 000000000..c9f4a9606 --- /dev/null +++ b/skills/agent-risk-firewall/scripts/clean.py @@ -0,0 +1,28 @@ +from pathlib import Path +import shutil + + +ROOT = Path(__file__).resolve().parents[1] + + +def main() -> int: + targets = [] + pytest_cache = ROOT / ".pytest_cache" + if pytest_cache.exists(): + targets.append(pytest_cache) + targets.extend(ROOT.rglob("__pycache__")) + + removed = 0 + for target in targets: + resolved = target.resolve() + if ROOT.resolve() not in (resolved, *resolved.parents): + raise RuntimeError(f"Refusing to remove outside plugin root: {resolved}") + shutil.rmtree(resolved, ignore_errors=True) + removed += 1 + + print(f"Removed {removed} cache directories under {ROOT}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/agent-risk-firewall/setup.py b/skills/agent-risk-firewall/setup.py new file mode 100644 index 000000000..606849326 --- /dev/null +++ b/skills/agent-risk-firewall/setup.py @@ -0,0 +1,3 @@ +from setuptools import setup + +setup() diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/__init__.py b/skills/agent-risk-firewall/src/agent_risk_firewall/__init__.py new file mode 100644 index 000000000..c68196d1c --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/__init__.py @@ -0,0 +1 @@ +__version__ = "1.2.0" diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/__main__.py b/skills/agent-risk-firewall/src/agent_risk_firewall/__main__.py new file mode 100644 index 000000000..a049ad7ae --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/adapters.py b/skills/agent-risk-firewall/src/agent_risk_firewall/adapters.py new file mode 100644 index 000000000..35590a87c --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/adapters.py @@ -0,0 +1,174 @@ +import json +import os +import shutil +import subprocess +from typing import Any, Dict, List, Optional + +from .models import target_token + + +class OnchainOSAdapter: + def __init__(self, timeout_seconds: int = 20): + self.timeout_seconds = timeout_seconds + self.binary = shutil.which("onchainos") + + def is_available(self) -> bool: + if os.environ.get("AGENT_RISK_FIREWALL_DISABLE_ONCHAINOS") == "1": + return False + return bool(self.binary) + + def collect(self, context: Dict[str, Any]) -> Dict[str, Any]: + evidence: Dict[str, Any] = {} + token = target_token(context) + token_address = token.get("address") if isinstance(token, dict) else None + tx = context.get("tx") or {} + + if token_address: + evidence["tokenScan"] = self.token_scan(context["chain"], token_address) + evidence["tokenReport"] = self.token_report(context["chain"], token_address) + else: + evidence["tokenScan"] = {"status": "skipped", "reason": "No target token address."} + + if self._has_tx_context(tx): + evidence["txScan"] = self.tx_scan(context) + evidence["simulation"] = self.simulate(context) + else: + evidence["txScan"] = {"status": "skipped", "reason": "No transaction context."} + evidence["simulation"] = {"status": "skipped", "reason": "No transaction context."} + + return evidence + + def token_scan(self, chain: str, address: str) -> Dict[str, Any]: + chain_id = _chain_id(chain) + return self._run_json(["security", "token-scan", "--tokens", f"{chain_id}:{address}"]) + + def token_report(self, chain: str, address: str) -> Dict[str, Any]: + return self._run_json(["token", "liquidity", "--chain", chain, "--address", address]) + + def tx_scan(self, context: Dict[str, Any]) -> Dict[str, Any]: + tx = context.get("tx") or {} + args = ["security", "tx-scan", "--chain", context["chain"]] + self._append_tx_scan_args(args, context["chain"], tx) + return self._run_json(args) + + def simulate(self, context: Dict[str, Any]) -> Dict[str, Any]: + tx = context.get("tx") or {} + args = ["gateway", "simulate", "--chain", context["chain"]] + self._append_simulate_args(args, tx) + return self._run_json(args) + + def _run_json(self, args: List[str]) -> Dict[str, Any]: + if not self.is_available(): + return { + "status": "unavailable", + "code": "ONCHAINOS_UNAVAILABLE", + "message": "The onchainos CLI is not installed or is disabled.", + "command": ["onchainos"] + args, + } + + command = [self.binary or "onchainos"] + args + try: + completed = subprocess.run( + command, + check=False, + capture_output=True, + text=True, + timeout=self.timeout_seconds, + ) + except subprocess.TimeoutExpired: + return { + "status": "timeout", + "code": "SCAN_TIMEOUT", + "message": "OnchainOS command timed out.", + "command": ["onchainos"] + args, + } + except OSError as exc: + return { + "status": "error", + "code": "ONCHAINOS_ERROR", + "message": str(exc), + "command": ["onchainos"] + args, + } + + parsed = _parse_output(completed.stdout) + if completed.returncode != 0: + return { + "status": "error", + "code": "ONCHAINOS_COMMAND_FAILED", + "returnCode": completed.returncode, + "message": (completed.stderr or completed.stdout or "").strip(), + "command": ["onchainos"] + args, + "data": parsed, + } + + return { + "status": "ok", + "command": ["onchainos"] + args, + "data": parsed, + } + + @staticmethod + def _has_tx_context(tx: Dict[str, Any]) -> bool: + return bool(tx.get("to") or tx.get("data") or tx.get("signaturePayload") or tx.get("signedTx")) + + @staticmethod + def _append_tx_scan_args(args: List[str], chain: str, tx: Dict[str, Any]) -> None: + if chain == "solana": + mapping = ( + ("from", "--from"), + ("encoding", "--encoding"), + ("signaturePayload", "--transactions"), + ) + else: + mapping = ( + ("from", "--from"), + ("to", "--to"), + ("data", "--data"), + ("value", "--value"), + ("gas", "--gas"), + ("gasPrice", "--gas-price"), + ) + for source, flag in mapping: + value = tx.get(source) + if value is not None and value != "": + args.extend([flag, str(value)]) + + @staticmethod + def _append_simulate_args(args: List[str], tx: Dict[str, Any]) -> None: + mapping = ( + ("from", "--from"), + ("to", "--to"), + ("data", "--data"), + ("value", "--amount"), + ) + for source, flag in mapping: + value = tx.get(source) + if value is not None and value != "": + args.extend([flag, str(value)]) + + +def _chain_id(chain: str) -> str: + if chain == "xlayer": + return "196" + if chain == "solana": + return "501" + return chain + + +def _parse_output(output: str) -> Any: + text = (output or "").strip() + if not text: + return {} + try: + return json.loads(text) + except json.JSONDecodeError: + pass + + start = text.find("{") + end = text.rfind("}") + if start != -1 and end != -1 and end > start: + try: + return json.loads(text[start : end + 1]) + except json.JSONDecodeError: + pass + return {"raw": text} diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/cli.py b/skills/agent-risk-firewall/src/agent_risk_firewall/cli.py new file mode 100644 index 000000000..62e59ad3d --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/cli.py @@ -0,0 +1,154 @@ +import argparse +import sys +from typing import Any, Dict, Optional + +from .adapters import OnchainOSAdapter +from .models import InputError, read_input, validate_check_input +from .policy import evaluate, get_policy +from .render import dumps_json, error_payload + + +def main(argv: Optional[list] = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + if args.command == "check": + return _cmd_check(args) + if args.command == "policy": + return _cmd_policy(args) + if args.command == "self-test": + return _cmd_self_test() + + parser.print_help() + return 2 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="agent-risk-firewall", + description="Pre-trade risk firewall for Agentic Wallet swaps on X Layer and Solana.", + ) + subparsers = parser.add_subparsers(dest="command") + + check = subparsers.add_parser("check", help="Evaluate a proposed trade or approval.") + check.add_argument("--input", required=True, help="Path to input JSON, or '-' for stdin.") + check.add_argument("--format", default="json", choices=["json"], help="Output format.") + + policy = subparsers.add_parser("policy", help="Print the active policy.") + policy.add_argument( + "--profile", + default="balanced", + help="Policy profile: balanced, strict, competition, or degen-small-size.", + ) + + subparsers.add_parser("self-test", help="Run local policy checks without live assets.") + return parser + + +def _cmd_check(args: argparse.Namespace) -> int: + try: + payload = read_input(args.input) + context, validation_findings = validate_check_input(payload) + except OSError as exc: + print(dumps_json(error_payload("INPUT_READ_FAILED", "Could not read input.", [str(exc)]))) + return 2 + except InputError as exc: + print(dumps_json(error_payload(exc.code, exc.message, exc.details))) + return 2 + + adapter = OnchainOSAdapter() + evidence = adapter.collect(context) + result = evaluate(context, evidence, validation_findings, context.get("policyProfile", "balanced")) + print(dumps_json(result)) + return 0 + + +def _cmd_policy(args: argparse.Namespace) -> int: + print(dumps_json(get_policy(args.profile))) + return 0 + + +def _cmd_self_test() -> int: + cases = [_fixture_allow(), _fixture_warn(), _fixture_block()] + results = [] + for name, context, evidence in cases: + result = evaluate(context, evidence, [], "balanced") + results.append({"case": name, "verdict": result["verdict"], "riskScore": result["riskScore"]}) + + passed = ( + results[0]["verdict"] == "allow" + and results[1]["verdict"] == "warn" + and results[2]["verdict"] == "block" + ) + print(dumps_json({"status": "pass" if passed else "fail", "results": results})) + return 0 if passed else 1 + + +def _base_context() -> Dict[str, Any]: + return { + "chain": "xlayer", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6, + }, + "tokenOut": { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "TOKEN", + "decimals": 18, + }, + "amountIn": "10", + "amountInUsd": None, + "walletValueUsd": None, + "quote": {"slippagePct": 0.5, "priceImpactPct": 0.4}, + "tx": {}, + "policyProfile": "balanced", + } + + +def _fixture_allow(): + return ( + "allow-low-risk", + _base_context(), + { + "tokenScan": {"status": "ok", "data": {"riskLevel": "LOW"}}, + "tokenReport": {"status": "ok", "data": {"liquidityUsd": 500000}}, + "txScan": {"status": "skipped"}, + "simulation": {"status": "skipped"}, + }, + ) + + +def _fixture_warn(): + context = _base_context() + context["quote"] = {"slippagePct": 4, "priceImpactPct": 1} + return ( + "warn-slippage", + context, + { + "tokenScan": {"status": "ok", "data": {"riskLevel": "LOW"}}, + "tokenReport": {"status": "ok", "data": {"liquidityUsd": 500000}}, + "txScan": {"status": "skipped"}, + "simulation": {"status": "skipped"}, + }, + ) + + +def _fixture_block(): + context = _base_context() + return ( + "block-critical-token", + context, + { + "tokenScan": {"status": "ok", "data": {"riskLevel": "CRITICAL"}}, + "tokenReport": {"status": "ok", "data": {"liquidityUsd": 500000}}, + "txScan": {"status": "skipped"}, + "simulation": {"status": "skipped"}, + }, + ) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/models.py b/skills/agent-risk-firewall/src/agent_risk_firewall/models.py new file mode 100644 index 000000000..5b01d9fda --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/models.py @@ -0,0 +1,203 @@ +import json +import re +import sys +from decimal import Decimal, InvalidOperation +from typing import Any, Dict, List, Optional, Tuple + + +SUPPORTED_CHAINS = {"xlayer", "solana"} +SUPPORTED_OPERATIONS = {"buy", "sell", "swap", "approval"} +EVM_ADDRESS_RE = re.compile(r"^0x[a-fA-F0-9]{40}$") +SOLANA_ADDRESS_RE = re.compile(r"^[1-9A-HJ-NP-Za-km-z]{32,44}$") + + +class InputError(Exception): + def __init__(self, code: str, message: str, details: Optional[List[str]] = None): + super().__init__(message) + self.code = code + self.message = message + self.details = details or [] + + +def read_input(path: str) -> Dict[str, Any]: + if path == "-": + raw = sys.stdin.read() + else: + with open(path, "r", encoding="utf-8") as handle: + raw = handle.read() + try: + payload = json.loads(raw) + except json.JSONDecodeError as exc: + raise InputError("INVALID_JSON", "Input is not valid JSON.", [str(exc)]) + if not isinstance(payload, dict): + raise InputError("INVALID_INPUT", "Input JSON must be an object.") + return payload + + +def normalize_chain(value: Any) -> str: + text = str(value or "").strip().lower().replace("_", "-") + aliases = { + "x-layer": "xlayer", + "x layer": "xlayer", + "196": "xlayer", + "sol": "solana", + "501": "solana", + } + return aliases.get(text, text) + + +def is_evm_address(value: Any) -> bool: + return isinstance(value, str) and bool(EVM_ADDRESS_RE.match(value)) + + +def is_solana_address(value: Any) -> bool: + return isinstance(value, str) and bool(SOLANA_ADDRESS_RE.match(value)) + + +def address_matches_chain(chain: str, value: Any) -> bool: + if not value: + return False + if chain == "xlayer": + return is_evm_address(value) + if chain == "solana": + return is_solana_address(value) + return False + + +def optional_decimal(value: Any) -> Optional[Decimal]: + if value is None or value == "": + return None + try: + return Decimal(str(value)) + except (InvalidOperation, ValueError): + return None + + +def _normalize_token(value: Any) -> Optional[Dict[str, Any]]: + if value is None: + return None + if not isinstance(value, dict): + return {"raw": value} + token = dict(value) + if "symbol" in token and isinstance(token["symbol"], str): + token["symbol"] = token["symbol"].strip() + return token + + +def target_token(context: Dict[str, Any]) -> Optional[Dict[str, Any]]: + operation = context.get("operation") + if operation in ("buy", "swap"): + return context.get("tokenOut") or context.get("tokenIn") + if operation in ("sell", "approval"): + return context.get("tokenIn") or context.get("tokenOut") + return context.get("tokenOut") or context.get("tokenIn") + + +def validate_check_input(payload: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + fatal: List[str] = [] + findings: List[Dict[str, Any]] = [] + + chain = normalize_chain(payload.get("chain")) + if not chain: + fatal.append("Missing required field: chain.") + elif chain not in SUPPORTED_CHAINS: + fatal.append("Unsupported chain. MVP supports only xlayer and solana.") + + operation = str(payload.get("operation") or "").strip().lower() + if not operation: + fatal.append("Missing required field: operation.") + elif operation not in SUPPORTED_OPERATIONS: + fatal.append("Unsupported operation. Use buy, sell, swap, or approval.") + + wallet_address = payload.get("walletAddress") + if not wallet_address: + fatal.append("Missing required field: walletAddress.") + + token_in = _normalize_token(payload.get("tokenIn")) + token_out = _normalize_token(payload.get("tokenOut")) + if token_in is None and token_out is None: + fatal.append("At least one of tokenIn or tokenOut is required.") + + quote = payload.get("quote") if isinstance(payload.get("quote"), dict) else {} + tx = payload.get("tx") if isinstance(payload.get("tx"), dict) else {} + approval = payload.get("approval") if isinstance(payload.get("approval"), dict) else {} + external_evidence = payload.get("externalEvidence") if isinstance(payload.get("externalEvidence"), dict) else {} + competition = payload.get("competition") if isinstance(payload.get("competition"), dict) else {} + + context: Dict[str, Any] = { + "chain": chain, + "operation": operation, + "walletAddress": wallet_address, + "tokenIn": token_in, + "tokenOut": token_out, + "amountIn": payload.get("amountIn"), + "amountInUsd": optional_decimal(payload.get("amountInUsd")), + "walletValueUsd": optional_decimal(payload.get("walletValueUsd")), + "quote": quote, + "tx": tx, + "approval": approval, + "externalEvidence": external_evidence, + "competition": competition, + "policyProfile": str(payload.get("policyProfile") or "balanced").strip().lower(), + } + + if fatal: + raise InputError("INVALID_INPUT", "Input failed required-field validation.", fatal) + + if not address_matches_chain(chain, wallet_address): + findings.append( + { + "code": "ADDRESS_CHAIN_MISMATCH", + "severity": "block", + "score": 95, + "message": "Wallet address format does not match the requested chain.", + } + ) + + for field_name, token in (("tokenIn", token_in), ("tokenOut", token_out)): + if not token or not isinstance(token, dict): + continue + address = token.get("address") + if address and not address_matches_chain(chain, address): + findings.append( + { + "code": "ADDRESS_CHAIN_MISMATCH", + "severity": "block", + "score": 95, + "message": "%s address format does not match the requested chain." % field_name, + } + ) + + tx_from = tx.get("from") + tx_to = tx.get("to") + if tx_from and not address_matches_chain(chain, tx_from): + findings.append( + { + "code": "ADDRESS_CHAIN_MISMATCH", + "severity": "block", + "score": 95, + "message": "Transaction from address format does not match the requested chain.", + } + ) + if tx_to and chain == "xlayer" and not address_matches_chain(chain, tx_to): + findings.append( + { + "code": "ADDRESS_CHAIN_MISMATCH", + "severity": "block", + "score": 95, + "message": "Transaction recipient address format does not match X Layer.", + } + ) + + spender = approval.get("spender") or tx.get("spender") + if spender and chain == "xlayer" and not address_matches_chain(chain, spender): + findings.append( + { + "code": "ADDRESS_CHAIN_MISMATCH", + "severity": "block", + "score": 95, + "message": "Approval spender address format does not match X Layer.", + } + ) + + return context, findings diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/policy.py b/skills/agent-risk-firewall/src/agent_risk_firewall/policy.py new file mode 100644 index 000000000..03d51a8b7 --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/policy.py @@ -0,0 +1,1176 @@ +import hashlib +import json +from decimal import Decimal +from typing import Any, Dict, Iterable, List, Optional + +from .models import target_token + + +VERDICT_PRIORITY = {"allow": 0, "warn": 1, "block": 2} +POLICY_VERSION = "1.2.0" +MAX_UINT256 = Decimal(2) ** 256 - Decimal(1) + + +BALANCED_POLICY = { + "profile": "balanced", + "policyVersion": POLICY_VERSION, + "supportedChains": ["xlayer", "solana"], + "maxTradeUsd": 250, + "maxWalletPct": 10, + "slippageWarnPct": 2, + "slippageBlockPct": 5, + "priceImpactWarnPct": 3, + "priceImpactBlockPct": 8, + "lowLiquidityUsd": 10000, + "tokenHighSeverity": "warn", + "tokenMediumSeverity": "warn", + "scanUnavailableSeverity": "warn", + "txScanUnavailableSeverity": "warn", + "simulationUnavailableSeverity": "warn", + "unlimitedApprovalSeverity": "warn", + "spenderEoaSeverity": "warn", + "unknownSpenderSeverity": "warn", + "highTaxWarnPct": 25, + "highTaxBlockPct": 50, + "topHolderWarnPct": 70, + "topHolderBlockPct": 90, + "disallowStableNativeOnlyPair": False, +} + +STRICT_POLICY = dict( + BALANCED_POLICY, + profile="strict", + maxTradeUsd=100, + maxWalletPct=5, + slippageWarnPct=1, + slippageBlockPct=3, + priceImpactWarnPct=2, + priceImpactBlockPct=5, + lowLiquidityUsd=50000, + tokenHighSeverity="block", + scanUnavailableSeverity="block", + txScanUnavailableSeverity="block", + simulationUnavailableSeverity="block", + spenderEoaSeverity="block", + unknownSpenderSeverity="block", + highTaxWarnPct=15, + highTaxBlockPct=35, + topHolderWarnPct=50, + topHolderBlockPct=75, +) + +COMPETITION_POLICY = dict( + BALANCED_POLICY, + profile="competition", + maxTradeUsd=100, + maxWalletPct=10, + slippageWarnPct=1.5, + slippageBlockPct=4, + priceImpactWarnPct=2.5, + priceImpactBlockPct=6, + lowLiquidityUsd=25000, + disallowStableNativeOnlyPair=True, + requireCompetitionContext=True, + disallowedPairClasses=[ + "stable-stable", + "stable-native", + "stable-wrapped-native", + "native-native", + "native-wrapped-native", + "wrapped-native-wrapped-native", + ], +) + +DEGEN_SMALL_SIZE_POLICY = dict( + BALANCED_POLICY, + profile="degen-small-size", + maxTradeUsd=25, + maxWalletPct=3, + slippageWarnPct=5, + slippageBlockPct=12, + priceImpactWarnPct=8, + priceImpactBlockPct=20, + lowLiquidityUsd=2000, + highTaxWarnPct=35, + highTaxBlockPct=65, +) + +POLICIES = { + "balanced": BALANCED_POLICY, + "strict": STRICT_POLICY, + "competition": COMPETITION_POLICY, + "degen-small-size": DEGEN_SMALL_SIZE_POLICY, +} + + +def get_policy(profile: str = "balanced") -> Dict[str, Any]: + normalized = str(profile or "balanced").strip().lower() + if normalized not in POLICIES: + return dict(BALANCED_POLICY, requestedProfile=normalized, warning="Unknown profile; using balanced.") + return dict(POLICIES[normalized]) + + +def evaluate( + context: Dict[str, Any], + evidence: Dict[str, Any], + validation_findings: Optional[List[Dict[str, Any]]] = None, + profile: str = "balanced", +) -> Dict[str, Any]: + policy = get_policy(profile) + evidence_for_output = dict(evidence) + if context.get("externalEvidence"): + evidence_for_output["externalEvidence"] = context.get("externalEvidence") + + reasons: List[Dict[str, Any]] = [] + for finding in validation_findings or []: + _add_reason(reasons, finding) + + _evaluate_profile_rules(context, policy, reasons) + _evaluate_amount_caps(context, policy, reasons) + _evaluate_quote(context, policy, reasons) + _evaluate_token_scan(context, evidence.get("tokenScan") or {}, policy, reasons) + _evaluate_token_report(evidence.get("tokenReport") or {}, policy, reasons) + _evaluate_tx_scan(context, evidence.get("txScan") or {}, policy, reasons) + _evaluate_simulation(context, evidence.get("simulation") or {}, policy, reasons) + _evaluate_approval(context, policy, reasons) + _evaluate_external_evidence(context, policy, reasons) + + verdict = _highest_verdict(reasons) + risk_score = _risk_score(reasons, verdict) + return { + "verdict": verdict, + "riskScore": risk_score, + "requiresUserConfirmation": verdict == "warn", + "reasons": _public_reasons(reasons), + "evidence": evidence_for_output, + "audit": _audit_trail(context, evidence_for_output, policy, verdict, reasons), + "safeNextStep": _safe_next_step(verdict), + } + + +def _evaluate_profile_rules(context: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + if policy.get("profile") == "competition": + _evaluate_competition_context(context, policy, reasons) + if not policy.get("disallowStableNativeOnlyPair"): + return + token_in = context.get("tokenIn") or {} + token_out = context.get("tokenOut") or {} + pair_class = _pair_class(token_in, token_out) + disallowed = _disallowed_pair_classes(context, policy) + if pair_class in disallowed: + _add_reason( + reasons, + { + "code": "COMPETITION_PAIR_NOT_ELIGIBLE", + "severity": "block", + "score": 87, + "message": "Competition profile blocks stablecoin/native-only pairs; use a real token trade.", + }, + ) + + +def _evaluate_competition_context(context: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + competition = context.get("competition") or {} + if not competition: + if policy.get("requireCompetitionContext"): + _add_reason( + reasons, + { + "code": "COMPETITION_CONTEXT_MISSING", + "severity": "warn", + "score": 56, + "message": "Competition profile needs competition detail and user-status context before trading.", + }, + ) + return + + active = _competition_active(competition) + if active is False: + _add_reason( + reasons, + { + "code": "COMPETITION_INACTIVE", + "severity": "block", + "score": 90, + "message": "The selected competition is not active.", + }, + ) + elif active is None: + _add_reason( + reasons, + { + "code": "COMPETITION_STATUS_UNKNOWN", + "severity": "warn", + "score": 57, + "message": "Competition active status is missing; fetch competition detail before trading.", + }, + ) + + joined = _competition_joined(competition) + if joined is False: + _add_reason( + reasons, + { + "code": "COMPETITION_NOT_JOINED", + "severity": "warn", + "score": 59, + "message": "Wallet is not registered for the selected competition; trade may not count.", + }, + ) + elif joined is None: + _add_reason( + reasons, + { + "code": "COMPETITION_JOIN_STATUS_UNKNOWN", + "severity": "warn", + "score": 56, + "message": "Competition join status is missing; fetch user-status before trading.", + }, + ) + + supported_chains = _competition_supported_chains(competition) + chain = str(context.get("chain") or "").strip().lower() + if not supported_chains: + _add_reason( + reasons, + { + "code": "COMPETITION_CHAIN_CONTEXT_UNKNOWN", + "severity": "warn", + "score": 56, + "message": "Competition supported-chain context is missing; fetch competition detail before trading.", + }, + ) + elif chain not in supported_chains: + _add_reason( + reasons, + { + "code": "COMPETITION_CHAIN_UNSUPPORTED", + "severity": "block", + "score": 89, + "message": "Requested chain is not listed as supported by the competition context.", + }, + ) + + token_in = context.get("tokenIn") or {} + token_out = context.get("tokenOut") or {} + pair_class = _pair_class(token_in, token_out) + if ( + _truthy(competition.get("eligibleTokenTradeRequired")) + and not _pair_has_real_token(token_in, token_out) + and pair_class not in _disallowed_pair_classes(context, policy) + ): + _add_reason( + reasons, + { + "code": "COMPETITION_PAIR_NOT_ELIGIBLE", + "severity": "block", + "score": 87, + "message": "Competition requires a token trade; stablecoin/native-only pairs are not enough.", + }, + ) + + amount_usd = context.get("amountInUsd") + if isinstance(amount_usd, Decimal): + participation_min = _competition_decimal( + competition, + ["minParticipationUsd", "minimumParticipationUsd", "qualifyingVolumeUsd", "minTradeUsd"], + ) + leaderboard_min = _competition_decimal( + competition, + ["minLeaderboardUsd", "minimumLeaderboardUsd", "rankQualifyingVolumeUsd"], + ) + if participation_min is not None and amount_usd < participation_min: + _add_reason( + reasons, + { + "code": "COMPETITION_VOLUME_BELOW_PARTICIPATION_MIN", + "severity": "warn", + "score": 55, + "message": "Trade amount is below the competition participation threshold.", + }, + ) + if leaderboard_min is not None and amount_usd < leaderboard_min: + _add_reason( + reasons, + { + "code": "COMPETITION_VOLUME_BELOW_LEADERBOARD_MIN", + "severity": "warn", + "score": 54, + "message": "Trade amount may be below the competition leaderboard threshold.", + }, + ) + + min_wallet_balance = _competition_decimal(competition, ["minWalletBalanceUsd", "minimumWalletBalanceUsd"]) + wallet_value_usd = context.get("walletValueUsd") or _competition_decimal(competition, ["walletBalanceUsd", "walletValueUsd"]) + if min_wallet_balance is not None and isinstance(wallet_value_usd, Decimal) and wallet_value_usd < min_wallet_balance: + _add_reason( + reasons, + { + "code": "COMPETITION_BALANCE_BELOW_MIN", + "severity": "warn", + "score": 55, + "message": "Wallet value appears below the competition minimum balance threshold.", + }, + ) + + +def _evaluate_amount_caps(context: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + amount_usd = context.get("amountInUsd") + wallet_value_usd = context.get("walletValueUsd") + if isinstance(amount_usd, Decimal): + if amount_usd > Decimal(str(policy["maxTradeUsd"])): + _add_reason( + reasons, + { + "code": "TRADE_CAP_EXCEEDED", + "severity": "block", + "score": 88, + "message": "Trade amount exceeds the active policy per-trade cap.", + }, + ) + if isinstance(wallet_value_usd, Decimal) and wallet_value_usd > 0: + pct = (amount_usd / wallet_value_usd) * Decimal("100") + if pct > Decimal(str(policy["maxWalletPct"])): + _add_reason( + reasons, + { + "code": "WALLET_EXPOSURE_HIGH", + "severity": "block", + "score": 86, + "message": "Trade amount exceeds 10% of wallet value.", + }, + ) + + +def _evaluate_quote(context: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + quote = context.get("quote") or {} + slippage = _to_decimal(quote.get("slippagePct")) + price_impact = _to_decimal(quote.get("priceImpactPct")) + + if slippage is not None: + if slippage > Decimal(str(policy["slippageBlockPct"])): + _add_reason( + reasons, + { + "code": "SLIPPAGE_HIGH", + "severity": "block", + "score": 84, + "message": "Quoted slippage is above the active policy block threshold.", + }, + ) + elif slippage >= Decimal(str(policy["slippageWarnPct"])): + _add_reason( + reasons, + { + "code": "SLIPPAGE_ELEVATED", + "severity": "warn", + "score": 58, + "message": "Quoted slippage is elevated.", + }, + ) + + if price_impact is not None: + if price_impact > Decimal(str(policy["priceImpactBlockPct"])): + _add_reason( + reasons, + { + "code": "PRICE_IMPACT_HIGH", + "severity": "block", + "score": 85, + "message": "Quoted price impact is above the active policy block threshold.", + }, + ) + elif price_impact >= Decimal(str(policy["priceImpactWarnPct"])): + _add_reason( + reasons, + { + "code": "PRICE_IMPACT_ELEVATED", + "severity": "warn", + "score": 57, + "message": "Quoted price impact is elevated.", + }, + ) + + +def _evaluate_token_scan( + context: Dict[str, Any], + token_scan: Dict[str, Any], + policy: Dict[str, Any], + reasons: List[Dict[str, Any]], +) -> None: + token = target_token(context) + token_needs_scan = bool(isinstance(token, dict) and token.get("address")) + status = token_scan.get("status") + data = token_scan.get("data") if isinstance(token_scan.get("data"), dict) else token_scan + risk_level = str(_dig(data, ["riskLevel", "risk_level", "level"]) or "").upper() + operation = context.get("operation") + + if token_needs_scan and status in ("unavailable", "timeout", "error"): + severity = str(policy.get("scanUnavailableSeverity", "warn")) + _add_reason( + reasons, + { + "code": "SCAN_UNAVAILABLE" if status != "timeout" else "SCAN_TIMEOUT", + "severity": severity, + "score": 62 if severity == "warn" else 82, + "message": "Token security scan did not complete; verification is incomplete.", + }, + ) + return + + if not risk_level: + return + + if risk_level == "CRITICAL": + if operation == "sell": + _add_reason( + reasons, + { + "code": "TOKEN_CRITICAL_SELL", + "severity": "warn", + "score": 70, + "message": "Target token has CRITICAL risk; selling may be an exit but requires confirmation.", + }, + ) + else: + _add_reason( + reasons, + { + "code": "TOKEN_CRITICAL", + "severity": "block", + "score": 96, + "message": "Target token has CRITICAL risk.", + }, + ) + elif risk_level == "HIGH": + severity = str(policy.get("tokenHighSeverity", "warn")) + _add_reason( + reasons, + { + "code": "TOKEN_HIGH", + "severity": severity, + "score": 68 if severity == "warn" else 86, + "message": "Target token has HIGH risk.", + }, + ) + elif risk_level == "MEDIUM": + severity = str(policy.get("tokenMediumSeverity", "warn")) + _add_reason( + reasons, + { + "code": "TOKEN_MEDIUM", + "severity": severity, + "score": 55 if severity == "warn" else 82, + "message": "Target token has MEDIUM risk.", + }, + ) + + +def _evaluate_token_report(token_report: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + if token_report.get("status") != "ok": + return + liquidity = _find_numeric(token_report.get("data"), ("liquidityUsd", "liquidityUSD", "liquidity", "totalLiquidity")) + if liquidity is not None and liquidity < Decimal(str(policy["lowLiquidityUsd"])): + _add_reason( + reasons, + { + "code": "LIQUIDITY_LOW", + "severity": "warn", + "score": 56, + "message": "Token liquidity appears low for a retail swap.", + }, + ) + + +def _evaluate_tx_scan( + context: Dict[str, Any], + tx_scan: Dict[str, Any], + policy: Dict[str, Any], + reasons: List[Dict[str, Any]], +) -> None: + if not _has_tx(context): + return + status = tx_scan.get("status") + data = tx_scan.get("data") if isinstance(tx_scan.get("data"), dict) else tx_scan + action = str(_dig(data, ["action", "riskAction", "verdict"]) or "").lower() + + if status in ("unavailable", "timeout", "error"): + severity = str(policy.get("txScanUnavailableSeverity", "warn")) + _add_reason( + reasons, + { + "code": "TX_SCAN_UNAVAILABLE" if status != "timeout" else "TX_SCAN_TIMEOUT", + "severity": severity, + "score": 64 if severity == "warn" else 84, + "message": "Transaction security scan did not complete; verification is incomplete.", + }, + ) + return + + if action == "block": + _add_reason( + reasons, + { + "code": "TX_SCAN_BLOCK", + "severity": "block", + "score": 98, + "message": "OKX transaction scan returned a block action.", + }, + ) + elif action == "warn": + _add_reason( + reasons, + { + "code": "TX_SCAN_WARN", + "severity": "warn", + "score": 72, + "message": "OKX transaction scan returned a warning.", + }, + ) + + +def _evaluate_simulation( + context: Dict[str, Any], + simulation: Dict[str, Any], + policy: Dict[str, Any], + reasons: List[Dict[str, Any]], +) -> None: + if not _has_tx(context): + return + status = simulation.get("status") + data = simulation.get("data") if isinstance(simulation.get("data"), dict) else simulation + + if status in ("unavailable", "timeout", "error"): + severity = str(policy.get("simulationUnavailableSeverity", "warn")) + _add_reason( + reasons, + { + "code": "SIMULATION_UNAVAILABLE" if status != "timeout" else "SIMULATION_TIMEOUT", + "severity": severity, + "score": 63 if severity == "warn" else 83, + "message": "Transaction simulation did not complete; verification is incomplete.", + }, + ) + return + + if _simulation_reverted(data): + _add_reason( + reasons, + { + "code": "SIMULATION_REVERT", + "severity": "block", + "score": 94, + "message": "Transaction simulation indicates the transaction may revert or fail.", + }, + ) + + +def _evaluate_approval(context: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + if context.get("operation") != "approval": + return + + approval = context.get("approval") or {} + tx = context.get("tx") or {} + spender = approval.get("spender") or tx.get("spender") + spender_type = str(approval.get("spenderType") or tx.get("spenderType") or "").strip().lower() + amount = approval.get("amount") or approval.get("allowance") or tx.get("allowance") or tx.get("amount") + allowlist = approval.get("allowedSpenders") or approval.get("allowlist") or [] + denylist = approval.get("blockedSpenders") or approval.get("denylist") or [] + + if not spender: + _add_reason( + reasons, + { + "code": "APPROVAL_SPENDER_MISSING", + "severity": str(policy.get("unknownSpenderSeverity", "warn")), + "score": 73, + "message": "Approval spender is missing or could not be identified.", + }, + ) + else: + spender_lc = str(spender).lower() + if _contains_address(denylist, spender_lc): + _add_reason( + reasons, + { + "code": "APPROVAL_SPENDER_BLOCKED", + "severity": "block", + "score": 97, + "message": "Approval spender is explicitly blocked by policy input.", + }, + ) + if allowlist and not _contains_address(allowlist, spender_lc): + _add_reason( + reasons, + { + "code": "APPROVAL_SPENDER_NOT_ALLOWLISTED", + "severity": "warn", + "score": 74, + "message": "Approval spender is not in the provided allowlist.", + }, + ) + + if spender_type in ("eoa", "externally-owned", "externally_owned", "wallet"): + severity = str(policy.get("spenderEoaSeverity", "warn")) + _add_reason( + reasons, + { + "code": "APPROVAL_SPENDER_EOA", + "severity": severity, + "score": 76 if severity == "warn" else 90, + "message": "Approval spender appears to be an externally owned account.", + }, + ) + + if _is_unlimited_approval(approval, amount): + severity = str(policy.get("unlimitedApprovalSeverity", "warn")) + _add_reason( + reasons, + { + "code": "APPROVAL_UNLIMITED", + "severity": severity, + "score": 75 if severity == "warn" else 89, + "message": "Approval grants an unlimited or effectively unlimited allowance.", + }, + ) + + +def _evaluate_external_evidence(context: Dict[str, Any], policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + external = context.get("externalEvidence") or {} + if not isinstance(external, dict): + return + _evaluate_goplus_evidence(external.get("goplus") or {}, policy, reasons) + _evaluate_birdeye_evidence(external.get("birdeye") or {}, policy, reasons) + _evaluate_rootdata_evidence(external.get("rootdata") or {}, reasons) + + +def _evaluate_goplus_evidence(goplus: Any, policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + if not isinstance(goplus, dict): + return + risk_level = str(_dig(goplus, ["riskLevel", "risk_level", "level"]) or "").upper() + if risk_level == "CRITICAL": + _add_reason( + reasons, + { + "code": "EXTERNAL_GOPLUS_CRITICAL", + "severity": "block", + "score": 96, + "message": "GoPlus external evidence reports CRITICAL token risk.", + }, + ) + elif risk_level == "HIGH": + severity = str(policy.get("tokenHighSeverity", "warn")) + _add_reason( + reasons, + { + "code": "EXTERNAL_GOPLUS_HIGH", + "severity": severity, + "score": 69 if severity == "warn" else 86, + "message": "GoPlus external evidence reports HIGH token risk.", + }, + ) + + if any(_truthy(_dig(goplus, [key])) for key in ("is_honeypot", "isHoneypot", "honeypot")): + _add_reason( + reasons, + { + "code": "EXTERNAL_GOPLUS_HONEYPOT", + "severity": "block", + "score": 98, + "message": "GoPlus external evidence indicates honeypot risk.", + }, + ) + if any(_truthy(_dig(goplus, [key])) for key in ("is_blacklisted", "blacklisted", "is_malicious", "malicious")): + _add_reason( + reasons, + { + "code": "EXTERNAL_GOPLUS_BLOCKLISTED", + "severity": "block", + "score": 98, + "message": "GoPlus external evidence indicates a malicious or blocklisted token/address.", + }, + ) + + buy_tax = _find_numeric(goplus, ("buy_tax", "buyTax", "buyTaxes")) + sell_tax = _find_numeric(goplus, ("sell_tax", "sellTax", "sellTaxes")) + max_tax = max([tax for tax in (buy_tax, sell_tax) if tax is not None], default=None) + if max_tax is not None: + if max_tax > Decimal(str(policy["highTaxBlockPct"])): + _add_reason( + reasons, + { + "code": "EXTERNAL_TAX_HIGH", + "severity": "block", + "score": 88, + "message": "External evidence reports token tax above the active policy block threshold.", + }, + ) + elif max_tax >= Decimal(str(policy["highTaxWarnPct"])): + _add_reason( + reasons, + { + "code": "EXTERNAL_TAX_ELEVATED", + "severity": "warn", + "score": 67, + "message": "External evidence reports elevated token tax.", + }, + ) + + +def _evaluate_birdeye_evidence(birdeye: Any, policy: Dict[str, Any], reasons: List[Dict[str, Any]]) -> None: + if not isinstance(birdeye, dict): + return + liquidity = _find_numeric(birdeye, ("liquidityUsd", "liquidityUSD", "liquidity", "totalLiquidity")) + if liquidity is not None and liquidity < Decimal(str(policy["lowLiquidityUsd"])): + _add_reason( + reasons, + { + "code": "EXTERNAL_LIQUIDITY_LOW", + "severity": "warn", + "score": 57, + "message": "External Birdeye evidence reports low liquidity.", + }, + ) + + top_holder_pct = _find_numeric( + birdeye, + ("top10HolderPercent", "top10HolderPct", "topHolderPercent", "holderConcentrationPct"), + ) + if top_holder_pct is not None: + if top_holder_pct > Decimal(str(policy["topHolderBlockPct"])): + _add_reason( + reasons, + { + "code": "EXTERNAL_HOLDER_CONCENTRATION_HIGH", + "severity": "block", + "score": 87, + "message": "External evidence reports very high holder concentration.", + }, + ) + elif top_holder_pct >= Decimal(str(policy["topHolderWarnPct"])): + _add_reason( + reasons, + { + "code": "EXTERNAL_HOLDER_CONCENTRATION_ELEVATED", + "severity": "warn", + "score": 66, + "message": "External evidence reports elevated holder concentration.", + }, + ) + + +def _evaluate_rootdata_evidence(rootdata: Any, reasons: List[Dict[str, Any]]) -> None: + if not isinstance(rootdata, dict): + return + risk_level = str(_dig(rootdata, ["riskLevel", "risk_level", "level"]) or "").upper() + tags = _lowered_values(_dig(rootdata, ["tags", "labels", "riskTags"]) or []) + bad_tags = {"scam", "rug", "rugpull", "malicious", "phishing", "exploit", "hack"} + if risk_level == "CRITICAL" or any(tag in bad_tags for tag in tags): + _add_reason( + reasons, + { + "code": "EXTERNAL_ROOTDATA_CRITICAL", + "severity": "block", + "score": 92, + "message": "RootData external evidence reports critical project risk.", + }, + ) + elif risk_level == "HIGH": + _add_reason( + reasons, + { + "code": "EXTERNAL_ROOTDATA_HIGH", + "severity": "warn", + "score": 67, + "message": "RootData external evidence reports high project risk.", + }, + ) + + +def _simulation_reverted(data: Any) -> bool: + if isinstance(data, dict): + explicit = _dig(data, ["success", "succeeded", "isSuccess"]) + if explicit is False: + return True + status = str(_dig(data, ["status", "state", "result"]) or "").lower() + if status in ("failed", "fail", "reverted", "revert", "error"): + return True + if _dig(data, ["revertReason", "revert_reason", "error", "errorMessage"]): + return True + for value in data.values(): + if _simulation_reverted(value): + return True + elif isinstance(data, list): + return any(_simulation_reverted(item) for item in data) + elif isinstance(data, str): + lowered = data.lower() + return "revert" in lowered or "execution failed" in lowered + return False + + +def _has_tx(context: Dict[str, Any]) -> bool: + tx = context.get("tx") or {} + return bool(tx.get("to") or tx.get("data") or tx.get("signedTx") or tx.get("signaturePayload")) + + +def _highest_verdict(reasons: Iterable[Dict[str, Any]]) -> str: + verdict = "allow" + for reason in reasons: + severity = reason.get("severity", "allow") + if VERDICT_PRIORITY.get(severity, 0) > VERDICT_PRIORITY[verdict]: + verdict = severity + return verdict + + +def _risk_score(reasons: List[Dict[str, Any]], verdict: str) -> int: + if not reasons: + return 5 + score = max(int(reason.get("score", 0)) for reason in reasons) + if verdict == "block": + return max(score, 80) + if verdict == "warn": + return max(score, 50) + return min(score, 20) + + +def _safe_next_step(verdict: str) -> str: + if verdict == "block": + return "Cancel the operation. Do not ask the user to sign or broadcast this transaction." + if verdict == "warn": + return "Show the warning reasons and ask the user for explicit confirmation before signing." + return "Proceed with the normal signing or broadcast flow if the user already requested it." + + +def _public_reasons(reasons: List[Dict[str, Any]]) -> List[Dict[str, str]]: + return [ + { + "code": str(reason.get("code")), + "severity": str(reason.get("severity")), + "message": str(reason.get("message")), + } + for reason in reasons + ] + + +def _add_reason(reasons: List[Dict[str, Any]], reason: Dict[str, Any]) -> None: + code = reason.get("code") + if any(existing.get("code") == code and existing.get("message") == reason.get("message") for existing in reasons): + return + reasons.append(reason) + + +def _audit_trail( + context: Dict[str, Any], + evidence: Dict[str, Any], + policy: Dict[str, Any], + verdict: str, + reasons: List[Dict[str, Any]], +) -> Dict[str, str]: + evidence_hash = _sha256(_stable_json(evidence)) + decision_payload = { + "context": _audit_context(context), + "evidenceHash": evidence_hash, + "policyProfile": policy.get("profile"), + "policyVersion": policy.get("policyVersion", POLICY_VERSION), + "verdict": verdict, + "reasonCodes": [str(reason.get("code")) for reason in reasons], + } + return { + "decisionId": "arf_" + _sha256(_stable_json(decision_payload))[:16], + "policyProfile": str(policy.get("profile")), + "policyVersion": str(policy.get("policyVersion", POLICY_VERSION)), + "evidenceHash": evidence_hash, + } + + +def _audit_context(context: Dict[str, Any]) -> Dict[str, Any]: + return { + "chain": context.get("chain"), + "operation": context.get("operation"), + "walletAddress": context.get("walletAddress"), + "tokenIn": context.get("tokenIn"), + "tokenOut": context.get("tokenOut"), + "amountIn": context.get("amountIn"), + "amountInUsd": context.get("amountInUsd"), + "walletValueUsd": context.get("walletValueUsd"), + "quote": context.get("quote"), + "tx": context.get("tx"), + "approval": context.get("approval"), + "externalEvidence": context.get("externalEvidence"), + "competition": context.get("competition"), + } + + +def _stable_json(payload: Any) -> str: + return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False, default=str) + + +def _sha256(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest() + + +def _is_unlimited_approval(approval: Dict[str, Any], amount: Any) -> bool: + explicit = approval.get("isUnlimited") or approval.get("unlimited") or approval.get("isUnlimitedApproval") + if _truthy(explicit): + return True + if isinstance(amount, str) and amount.strip().lower() in ("unlimited", "infinite", "max", "max_uint256"): + return True + numeric = _to_decimal(amount) + if numeric is None: + return False + return numeric >= MAX_UINT256 + + +def _contains_address(values: Any, target_lower: str) -> bool: + if isinstance(values, str): + values = [values] + if not isinstance(values, list): + return False + return any(str(value).lower() == target_lower for value in values) + + +def _competition_active(competition: Dict[str, Any]) -> Optional[bool]: + if "active" in competition: + return _optional_bool(competition.get("active")) + status = competition.get("activityStatus", competition.get("status")) + if status is None: + return None + text = str(status).strip().lower() + if text in ("3", "active", "live", "running", "open"): + return True + if text in ("4", "ended", "closed", "inactive", "expired", "finished"): + return False + return None + + +def _competition_joined(competition: Dict[str, Any]) -> Optional[bool]: + if "joined" in competition: + return _optional_bool(competition.get("joined")) + user_status = competition.get("userStatus") if isinstance(competition.get("userStatus"), dict) else competition + if "joinStatus" in user_status: + return str(user_status.get("joinStatus")).strip() == "1" + return None + + +def _competition_supported_chains(competition: Dict[str, Any]) -> List[str]: + supported: List[str] = [] + raw_supported = competition.get("supportedChains") + if isinstance(raw_supported, str): + raw_supported = [raw_supported] + if isinstance(raw_supported, list): + supported.extend(_normalize_chain_alias(value) for value in raw_supported) + + primary = ( + competition.get("primaryChain") + or competition.get("chain") + or competition.get("chainName") + or competition.get("chainId") + or competition.get("chainIndex") + ) + normalized_primary = _normalize_chain_alias(primary) + if normalized_primary: + supported.append(normalized_primary) + # OKX Agentic Trading competitions currently count both Solana and the backend primary chain. + supported.append("solana") + + return _unique_chains(supported) + + +def _competition_decimal(competition: Dict[str, Any], keys: Iterable[str]) -> Optional[Decimal]: + for key in keys: + if key in competition: + value = _to_decimal(competition.get(key)) + if value is not None: + return value + return None + + +def _normalize_chain_alias(value: Any) -> str: + text = str(value or "").strip().lower().replace("_", "-") + aliases = { + "x-layer": "xlayer", + "x layer": "xlayer", + "xlayer": "xlayer", + "x layer mainnet": "xlayer", + "196": "xlayer", + "sol": "solana", + "solana": "solana", + "501": "solana", + } + return aliases.get(text, text) + + +def _unique_chains(values: Iterable[str]) -> List[str]: + seen = set() + output = [] + for value in values: + if not value or value in seen: + continue + seen.add(value) + output.append(value) + return output + + +def _optional_bool(value: Any) -> Optional[bool]: + if isinstance(value, bool): + return value + if value is None: + return None + text = str(value).strip().lower() + if text in ("1", "true", "yes", "y", "active", "joined"): + return True + if text in ("0", "false", "no", "n", "inactive", "ended", "closed"): + return False + return None + + +def _disallowed_pair_classes(context: Dict[str, Any], policy: Dict[str, Any]) -> List[str]: + competition = context.get("competition") if isinstance(context.get("competition"), dict) else {} + raw = competition.get("disallowedPairClasses") or policy.get("disallowedPairClasses") or [] + if isinstance(raw, str): + raw = [raw] + if not isinstance(raw, list): + return [] + return [str(value).strip().lower() for value in raw if str(value).strip()] + + +def _pair_class(token_a: Dict[str, Any], token_b: Dict[str, Any]) -> str: + left = _token_class(token_a) + right = _token_class(token_b) + if left == "unknown" or right == "unknown": + return "unknown" + ordered = sorted([left, right], key=lambda value: _pair_sort_order().get(value, 99)) + return ordered[0] + "-" + ordered[1] + + +def _pair_sort_order() -> Dict[str, int]: + return { + "stable": 0, + "native": 1, + "wrapped-native": 2, + "token": 3, + "unknown": 4, + } + + +def _pair_has_real_token(token_a: Dict[str, Any], token_b: Dict[str, Any]) -> bool: + return _token_class(token_a) == "token" or _token_class(token_b) == "token" + + +def _token_class(token: Dict[str, Any]) -> str: + if not isinstance(token, dict): + return "unknown" + symbol = str(token.get("symbol") or "").strip().upper() + address = str(token.get("address") or "").strip() + address_lower = address.lower() + native_addresses = { + "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", + "11111111111111111111111111111111", + } + wrapped_native_addresses = { + "So11111111111111111111111111111111111111112", + } + stable_symbols = { + "USDC", + "USDT", + "DAI", + "USDE", + "USDS", + "USDG", + "PYUSD", + "FDUSD", + "TUSD", + "FRAX", + "XLAYER_USDT", + "USD0", + } + native_symbols = {"OKB", "SOL", "ETH", "BNB", "MATIC"} + wrapped_native_symbols = {"WOKB", "WSOL", "WETH", "WBNB", "WMATIC"} + if address_lower in native_addresses: + return "native" + if address in wrapped_native_addresses: + return "wrapped-native" + if symbol in stable_symbols: + return "stable" + if symbol in native_symbols: + return "native" + if symbol in wrapped_native_symbols: + return "wrapped-native" + if symbol or address: + return "token" + return "unknown" + + +def _is_stable_or_native(token: Dict[str, Any]) -> bool: + symbol = str(token.get("symbol") or "").strip().upper() + address = str(token.get("address") or "").strip() + native_addresses = { + "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", + "11111111111111111111111111111111", + "So11111111111111111111111111111111111111112", + } + stable_symbols = { + "USDC", + "USDT", + "DAI", + "USDE", + "USDS", + "USDG", + "PYUSD", + "FDUSD", + "TUSD", + "FRAX", + "XLAYER_USDT", + "USDT0", + } + native_symbols = {"OKB", "WOKB", "SOL", "WSOL", "ETH", "WETH"} + return address in native_addresses or symbol in stable_symbols or symbol in native_symbols + + +def _truthy(value: Any) -> bool: + if isinstance(value, bool): + return value + if value is None: + return False + return str(value).strip().lower() in ("1", "true", "yes", "y", "risk", "risky", "high", "critical") + + +def _lowered_values(values: Any) -> List[str]: + if isinstance(values, str): + return [values.strip().lower()] + if isinstance(values, list): + return [str(value).strip().lower() for value in values] + return [] + + +def _to_decimal(value: Any) -> Optional[Decimal]: + if value is None or value == "": + return None + try: + return Decimal(str(value)) + except Exception: + return None + + +def _dig(data: Any, keys: Iterable[str]) -> Any: + if isinstance(data, dict): + for key in keys: + if key in data: + return data[key] + for value in data.values(): + found = _dig(value, keys) + if found is not None: + return found + elif isinstance(data, list): + for value in data: + found = _dig(value, keys) + if found is not None: + return found + return None + + +def _find_numeric(data: Any, keys: Iterable[str]) -> Optional[Decimal]: + found = _dig(data, keys) + if found is None: + return None + return _to_decimal(found) diff --git a/skills/agent-risk-firewall/src/agent_risk_firewall/render.py b/skills/agent-risk-firewall/src/agent_risk_firewall/render.py new file mode 100644 index 000000000..6a50f305f --- /dev/null +++ b/skills/agent-risk-firewall/src/agent_risk_firewall/render.py @@ -0,0 +1,23 @@ +import json +from decimal import Decimal +from typing import Any, Dict + + +def dumps_json(payload: Dict[str, Any]) -> str: + return json.dumps(payload, indent=2, ensure_ascii=False, default=_json_default) + + +def error_payload(code: str, message: str, details: Any = None) -> Dict[str, Any]: + return { + "error": { + "code": code, + "message": message, + "details": details or [], + } + } + + +def _json_default(value: Any) -> Any: + if isinstance(value, Decimal): + return str(value) + return str(value) diff --git a/skills/agent-risk-firewall/tests/conftest.py b/skills/agent-risk-firewall/tests/conftest.py new file mode 100644 index 000000000..d3772985d --- /dev/null +++ b/skills/agent-risk-firewall/tests/conftest.py @@ -0,0 +1,8 @@ +import sys +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) diff --git a/skills/agent-risk-firewall/tests/fixtures/simulation_revert.json b/skills/agent-risk-firewall/tests/fixtures/simulation_revert.json new file mode 100644 index 000000000..61a681c33 --- /dev/null +++ b/skills/agent-risk-firewall/tests/fixtures/simulation_revert.json @@ -0,0 +1,52 @@ +{ + "tokenScan": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "chainId": "196", + "tokenAddress": "0x0000000000000000000000000000000000000002", + "riskLevel": "LOW" + } + ] + } + }, + "tokenReport": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "liquidityUsd": "250000" + } + ] + } + }, + "txScan": { + "status": "ok", + "data": { + "ok": true, + "data": { + "action": "", + "riskItemDetail": [], + "simulator": { + "gasLimit": 21000, + "revertReason": null + } + } + } + }, + "simulation": { + "status": "ok", + "data": { + "ok": true, + "data": { + "simulator": { + "success": false, + "revertReason": "execution reverted: TRANSFER_FAILED" + } + } + } + } +} diff --git a/skills/agent-risk-firewall/tests/fixtures/token_critical.json b/skills/agent-risk-firewall/tests/fixtures/token_critical.json new file mode 100644 index 000000000..33b0b01c5 --- /dev/null +++ b/skills/agent-risk-firewall/tests/fixtures/token_critical.json @@ -0,0 +1,37 @@ +{ + "tokenScan": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "chainId": "196", + "tokenAddress": "0x0000000000000000000000000000000000000002", + "riskLevel": "CRITICAL", + "isHoneypot": true, + "isAirdropScam": false + } + ] + } + }, + "tokenReport": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "liquidityUsd": "250000", + "protocolName": "FixtureDEX" + } + ] + } + }, + "txScan": { + "status": "skipped", + "reason": "No transaction context." + }, + "simulation": { + "status": "skipped", + "reason": "No transaction context." + } +} diff --git a/skills/agent-risk-firewall/tests/fixtures/token_high.json b/skills/agent-risk-firewall/tests/fixtures/token_high.json new file mode 100644 index 000000000..3fcc3c829 --- /dev/null +++ b/skills/agent-risk-firewall/tests/fixtures/token_high.json @@ -0,0 +1,37 @@ +{ + "tokenScan": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "chainId": "196", + "tokenAddress": "0x0000000000000000000000000000000000000002", + "riskLevel": "HIGH", + "isHoneypot": false, + "isLowLiquidity": false + } + ] + } + }, + "tokenReport": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "liquidityUsd": "250000", + "protocolName": "FixtureDEX" + } + ] + } + }, + "txScan": { + "status": "skipped", + "reason": "No transaction context." + }, + "simulation": { + "status": "skipped", + "reason": "No transaction context." + } +} diff --git a/skills/agent-risk-firewall/tests/fixtures/tx_scan_block.json b/skills/agent-risk-firewall/tests/fixtures/tx_scan_block.json new file mode 100644 index 000000000..9a1d062f8 --- /dev/null +++ b/skills/agent-risk-firewall/tests/fixtures/tx_scan_block.json @@ -0,0 +1,55 @@ +{ + "tokenScan": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "chainId": "196", + "tokenAddress": "0x0000000000000000000000000000000000000002", + "riskLevel": "LOW" + } + ] + } + }, + "tokenReport": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "liquidityUsd": "250000" + } + ] + } + }, + "txScan": { + "status": "ok", + "data": { + "ok": true, + "data": { + "action": "block", + "riskItemDetail": [ + { + "name": "black_tag", + "action": "block" + } + ], + "simulator": { + "gasLimit": 21000, + "revertReason": null + } + } + } + }, + "simulation": { + "status": "ok", + "data": { + "ok": true, + "data": { + "success": true, + "status": "success" + } + } + } +} diff --git a/skills/agent-risk-firewall/tests/fixtures/tx_scan_warn.json b/skills/agent-risk-firewall/tests/fixtures/tx_scan_warn.json new file mode 100644 index 000000000..999187565 --- /dev/null +++ b/skills/agent-risk-firewall/tests/fixtures/tx_scan_warn.json @@ -0,0 +1,55 @@ +{ + "tokenScan": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "chainId": "196", + "tokenAddress": "0x0000000000000000000000000000000000000002", + "riskLevel": "LOW" + } + ] + } + }, + "tokenReport": { + "status": "ok", + "data": { + "ok": true, + "data": [ + { + "liquidityUsd": "250000" + } + ] + } + }, + "txScan": { + "status": "ok", + "data": { + "ok": true, + "data": { + "action": "warn", + "riskItemDetail": [ + { + "name": "approve_eoa", + "action": "warn" + } + ], + "simulator": { + "gasLimit": 21000, + "revertReason": null + } + } + } + }, + "simulation": { + "status": "ok", + "data": { + "ok": true, + "data": { + "success": true, + "status": "success" + } + } + } +} diff --git a/skills/agent-risk-firewall/tests/test_cli.py b/skills/agent-risk-firewall/tests/test_cli.py new file mode 100644 index 000000000..80ed9ad7d --- /dev/null +++ b/skills/agent-risk-firewall/tests/test_cli.py @@ -0,0 +1,85 @@ +import json +import os +import subprocess +import sys +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] + + +def run_cli(args, input_text=None): + env = os.environ.copy() + env["PYTHONPATH"] = str(ROOT / "src") + env["AGENT_RISK_FIREWALL_DISABLE_ONCHAINOS"] = "1" + return subprocess.run( + [sys.executable, "-m", "agent_risk_firewall.cli"] + args, + input=input_text, + capture_output=True, + text=True, + cwd=str(ROOT), + env=env, + check=False, + ) + + +def valid_payload(): + return { + "chain": "xlayer", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6, + }, + "tokenOut": { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "TOKEN", + "decimals": 18, + }, + "amountIn": "10", + "amountInUsd": 10, + "quote": {"expectedOut": "100", "slippagePct": 1, "priceImpactPct": 1}, + } + + +def test_check_reads_file_and_outputs_json(tmp_path): + request = tmp_path / "request.json" + request.write_text(json.dumps(valid_payload()), encoding="utf-8") + completed = run_cli(["check", "--input", str(request), "--format", "json"]) + assert completed.returncode == 0 + payload = json.loads(completed.stdout) + assert payload["verdict"] == "warn" + assert "riskScore" in payload + assert "reasons" in payload + assert any(reason["code"] == "SCAN_UNAVAILABLE" for reason in payload["reasons"]) + + +def test_check_reads_stdin(): + completed = run_cli(["check", "--input", "-", "--format", "json"], json.dumps(valid_payload())) + assert completed.returncode == 0 + payload = json.loads(completed.stdout) + assert payload["verdict"] == "warn" + + +def test_malformed_input_returns_error_json(): + completed = run_cli(["check", "--input", "-", "--format", "json"], "{bad json") + assert completed.returncode != 0 + payload = json.loads(completed.stdout) + assert payload["error"]["code"] == "INVALID_JSON" + + +def test_policy_command_outputs_balanced_profile(): + completed = run_cli(["policy", "--profile", "balanced"]) + assert completed.returncode == 0 + payload = json.loads(completed.stdout) + assert payload["profile"] == "balanced" + assert payload["maxTradeUsd"] == 250 + + +def test_self_test_passes(): + completed = run_cli(["self-test"]) + assert completed.returncode == 0 + payload = json.loads(completed.stdout) + assert payload["status"] == "pass" diff --git a/skills/agent-risk-firewall/tests/test_golden_fixtures.py b/skills/agent-risk-firewall/tests/test_golden_fixtures.py new file mode 100644 index 000000000..f55dd01a5 --- /dev/null +++ b/skills/agent-risk-firewall/tests/test_golden_fixtures.py @@ -0,0 +1,83 @@ +import json +from pathlib import Path + +import pytest + +from agent_risk_firewall.models import validate_check_input +from agent_risk_firewall.policy import evaluate + + +FIXTURES = Path(__file__).resolve().parent / "fixtures" + + +def _load_fixture(name): + with open(FIXTURES / name, "r", encoding="utf-8") as handle: + return json.load(handle) + + +def _payload(operation="buy", with_tx=False): + payload = { + "chain": "xlayer", + "operation": operation, + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6, + }, + "tokenOut": { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "RISK", + "decimals": 18, + }, + "amountIn": "10", + "amountInUsd": "10", + "quote": { + "expectedOut": "100", + "slippagePct": 0.5, + "priceImpactPct": 0.5, + "route": "fixture-route", + "venue": "FixtureDEX", + }, + "policyProfile": "balanced", + } + if with_tx: + payload["tx"] = { + "from": "0x0000000000000000000000000000000000000000", + "to": "0x0000000000000000000000000000000000000003", + "data": "0x095ea7b3", + "value": "0", + } + return payload + + +@pytest.mark.parametrize( + "fixture_name,operation,with_tx,expected_verdict,expected_code,requires_confirmation", + [ + ("token_high.json", "buy", False, "warn", "TOKEN_HIGH", True), + ("token_critical.json", "buy", False, "block", "TOKEN_CRITICAL", False), + ("token_critical.json", "sell", False, "warn", "TOKEN_CRITICAL_SELL", True), + ("tx_scan_warn.json", "swap", True, "warn", "TX_SCAN_WARN", True), + ("tx_scan_block.json", "swap", True, "block", "TX_SCAN_BLOCK", False), + ("simulation_revert.json", "swap", True, "block", "SIMULATION_REVERT", False), + ], +) +def test_golden_risk_fixtures( + fixture_name, + operation, + with_tx, + expected_verdict, + expected_code, + requires_confirmation, +): + context, findings = validate_check_input(_payload(operation=operation, with_tx=with_tx)) + result = evaluate(context, _load_fixture(fixture_name), findings) + + assert result["verdict"] == expected_verdict + assert result["requiresUserConfirmation"] is requires_confirmation + assert any(reason["code"] == expected_code for reason in result["reasons"]) + + if expected_verdict == "block": + assert "Do not ask the user to sign or broadcast" in result["safeNextStep"] + elif expected_verdict == "warn": + assert "ask the user for explicit confirmation" in result["safeNextStep"] diff --git a/skills/agent-risk-firewall/tests/test_policy.py b/skills/agent-risk-firewall/tests/test_policy.py new file mode 100644 index 000000000..505b0ab79 --- /dev/null +++ b/skills/agent-risk-firewall/tests/test_policy.py @@ -0,0 +1,136 @@ +from agent_risk_firewall.models import validate_check_input +from agent_risk_firewall.policy import evaluate + + +def context(operation="buy", slippage=0.5, price_impact=0.5, amount_usd=10): + payload = { + "chain": "xlayer", + "operation": operation, + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6, + }, + "tokenOut": { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "RISK", + "decimals": 18, + }, + "amountIn": "10", + "amountInUsd": amount_usd, + "quote": {"slippagePct": slippage, "priceImpactPct": price_impact}, + } + normalized, findings = validate_check_input(payload) + return normalized, findings + + +def evidence(token_risk="LOW", tx_action=None, simulation_data=None): + tx_scan = {"status": "skipped"} + if tx_action: + tx_scan = {"status": "ok", "data": {"action": tx_action}} + simulation = {"status": "skipped"} + if simulation_data is not None: + simulation = {"status": "ok", "data": simulation_data} + return { + "tokenScan": {"status": "ok", "data": {"riskLevel": token_risk}}, + "tokenReport": {"status": "ok", "data": {"liquidityUsd": 500000}}, + "txScan": tx_scan, + "simulation": simulation, + } + + +def test_critical_token_buy_blocks(): + ctx, findings = context("buy") + result = evaluate(ctx, evidence("CRITICAL"), findings) + assert result["verdict"] == "block" + assert any(reason["code"] == "TOKEN_CRITICAL" for reason in result["reasons"]) + + +def test_critical_token_sell_warns(): + ctx, findings = context("sell") + result = evaluate(ctx, evidence("CRITICAL"), findings) + assert result["verdict"] == "warn" + assert any(reason["code"] == "TOKEN_CRITICAL_SELL" for reason in result["reasons"]) + + +def test_tx_scan_block_wins(): + payload = { + "chain": "xlayer", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": {"address": "0x0000000000000000000000000000000000000001"}, + "tokenOut": {"address": "0x0000000000000000000000000000000000000002"}, + "quote": {"slippagePct": 0.1, "priceImpactPct": 0.1}, + "tx": { + "from": "0x0000000000000000000000000000000000000000", + "to": "0x0000000000000000000000000000000000000003", + "data": "0x", + }, + } + ctx, findings = validate_check_input(payload) + result = evaluate(ctx, evidence("LOW", tx_action="block"), findings) + assert result["verdict"] == "block" + assert any(reason["code"] == "TX_SCAN_BLOCK" for reason in result["reasons"]) + + +def test_simulation_revert_blocks(): + payload = { + "chain": "xlayer", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": {"address": "0x0000000000000000000000000000000000000001"}, + "tokenOut": {"address": "0x0000000000000000000000000000000000000002"}, + "quote": {"slippagePct": 0.1, "priceImpactPct": 0.1}, + "tx": { + "from": "0x0000000000000000000000000000000000000000", + "to": "0x0000000000000000000000000000000000000003", + "data": "0x", + }, + } + ctx, findings = validate_check_input(payload) + ev = evidence("LOW", simulation_data={"success": False, "revertReason": "transfer failed"}) + result = evaluate(ctx, ev, findings) + assert result["verdict"] == "block" + assert any(reason["code"] == "SIMULATION_REVERT" for reason in result["reasons"]) + + +def test_slippage_thresholds(): + ctx_warn, findings_warn = context("swap", slippage=4) + result_warn = evaluate(ctx_warn, evidence("LOW"), findings_warn) + assert result_warn["verdict"] == "warn" + assert any(reason["code"] == "SLIPPAGE_ELEVATED" for reason in result_warn["reasons"]) + + ctx_block, findings_block = context("swap", slippage=6) + result_block = evaluate(ctx_block, evidence("LOW"), findings_block) + assert result_block["verdict"] == "block" + assert any(reason["code"] == "SLIPPAGE_HIGH" for reason in result_block["reasons"]) + + +def test_scan_timeout_warns_not_allow(): + ctx, findings = context("swap") + ev = { + "tokenScan": {"status": "timeout"}, + "tokenReport": {"status": "skipped"}, + "txScan": {"status": "skipped"}, + "simulation": {"status": "skipped"}, + } + result = evaluate(ctx, ev, findings) + assert result["verdict"] == "warn" + assert result["verdict"] != "allow" + assert any(reason["code"] == "SCAN_TIMEOUT" for reason in result["reasons"]) + + +def test_solana_evm_address_mismatch_blocks(): + payload = { + "chain": "solana", + "operation": "swap", + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": {"address": "So11111111111111111111111111111111111111112"}, + "tokenOut": {"address": "11111111111111111111111111111111"}, + "quote": {"slippagePct": 0.1, "priceImpactPct": 0.1}, + } + ctx, findings = validate_check_input(payload) + result = evaluate(ctx, evidence("LOW"), findings) + assert result["verdict"] == "block" + assert any(reason["code"] == "ADDRESS_CHAIN_MISMATCH" for reason in result["reasons"]) diff --git a/skills/agent-risk-firewall/tests/test_policy_improvements.py b/skills/agent-risk-firewall/tests/test_policy_improvements.py new file mode 100644 index 000000000..af058a5a4 --- /dev/null +++ b/skills/agent-risk-firewall/tests/test_policy_improvements.py @@ -0,0 +1,265 @@ +from agent_risk_firewall.models import validate_check_input +from agent_risk_firewall.policy import evaluate, get_policy + + +def payload( + operation="swap", + chain="xlayer", + token_in=None, + token_out=None, + quote=None, + amount_usd=10, + wallet_value_usd=None, + tx=None, + approval=None, + external_evidence=None, + competition=None, + profile="balanced", +): + base = { + "chain": chain, + "operation": operation, + "walletAddress": "0x0000000000000000000000000000000000000000", + "tokenIn": token_in + or { + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6, + }, + "tokenOut": token_out + or { + "address": "0x0000000000000000000000000000000000000002", + "symbol": "RISK", + "decimals": 18, + }, + "amountIn": "10", + "amountInUsd": amount_usd, + "quote": quote or {"slippagePct": 0.5, "priceImpactPct": 0.5}, + "policyProfile": profile, + } + if wallet_value_usd is not None: + base["walletValueUsd"] = wallet_value_usd + if tx is not None: + base["tx"] = tx + if approval is not None: + base["approval"] = approval + if external_evidence is not None: + base["externalEvidence"] = external_evidence + if competition is not None: + base["competition"] = competition + return base + + +def safe_evidence(): + return { + "tokenScan": {"status": "ok", "data": {"riskLevel": "LOW"}}, + "tokenReport": {"status": "ok", "data": {"liquidityUsd": 500000}}, + "txScan": {"status": "skipped"}, + "simulation": {"status": "skipped"}, + } + + +def run(payload_data, evidence=None): + ctx, findings = validate_check_input(payload_data) + return evaluate(ctx, evidence or safe_evidence(), findings, ctx["policyProfile"]) + + +def test_policy_profiles_are_available(): + assert get_policy("strict")["maxTradeUsd"] == 100 + assert get_policy("competition")["disallowStableNativeOnlyPair"] is True + assert get_policy("degen-small-size")["slippageBlockPct"] == 12 + + +def test_strict_profile_blocks_scan_unavailable(): + ctx, findings = validate_check_input(payload(profile="strict")) + result = evaluate( + ctx, + { + "tokenScan": {"status": "unavailable"}, + "tokenReport": {"status": "skipped"}, + "txScan": {"status": "skipped"}, + "simulation": {"status": "skipped"}, + }, + findings, + "strict", + ) + assert result["verdict"] == "block" + assert any(reason["code"] == "SCAN_UNAVAILABLE" for reason in result["reasons"]) + + +def test_competition_profile_blocks_stable_native_only_pair(): + result = run( + payload( + profile="competition", + token_in={ + "address": "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", + "symbol": "OKB", + "decimals": 18, + }, + token_out={ + "address": "0x779ded0c9e1022225f8e0630b35a9b54be713736", + "symbol": "USDT", + "decimals": 6, + }, + ) + ) + assert result["verdict"] == "block" + assert any(reason["code"] == "COMPETITION_PAIR_NOT_ELIGIBLE" for reason in result["reasons"]) + + +def test_competition_profile_warns_without_competition_context(): + result = run(payload(profile="competition")) + + assert result["verdict"] == "warn" + assert any(reason["code"] == "COMPETITION_CONTEXT_MISSING" for reason in result["reasons"]) + + +def test_competition_profile_blocks_inactive_competition(): + result = run(payload(profile="competition", competition={"active": False, "joined": True, "supportedChains": ["xlayer"]})) + + assert result["verdict"] == "block" + assert any(reason["code"] == "COMPETITION_INACTIVE" for reason in result["reasons"]) + + +def test_competition_profile_blocks_unsupported_chain(): + result = run(payload(profile="competition", competition={"active": True, "joined": True, "supportedChains": ["solana"]})) + + assert result["verdict"] == "block" + assert any(reason["code"] == "COMPETITION_CHAIN_UNSUPPORTED" for reason in result["reasons"]) + + +def test_competition_profile_warns_when_not_joined(): + result = run(payload(profile="competition", competition={"active": True, "joined": False, "supportedChains": ["xlayer", "solana"]})) + + assert result["verdict"] == "warn" + assert any(reason["code"] == "COMPETITION_NOT_JOINED" for reason in result["reasons"]) + + +def test_competition_profile_warns_below_competition_thresholds(): + result = run( + payload( + profile="competition", + amount_usd=10, + wallet_value_usd=150, + competition={ + "active": True, + "joined": True, + "chainName": "X Layer", + "minParticipationUsd": 25, + "minLeaderboardUsd": 100, + "minWalletBalanceUsd": 200, + "eligibleTokenTradeRequired": True, + }, + ) + ) + + assert result["verdict"] == "warn" + codes = {reason["code"] for reason in result["reasons"]} + assert "COMPETITION_VOLUME_BELOW_PARTICIPATION_MIN" in codes + assert "COMPETITION_VOLUME_BELOW_LEADERBOARD_MIN" in codes + assert "COMPETITION_BALANCE_BELOW_MIN" in codes + + +def test_competition_profile_allows_real_token_trade_with_context(): + result = run( + payload( + profile="competition", + amount_usd=10, + competition={ + "active": True, + "joined": True, + "chainName": "X Layer", + "minParticipationUsd": 5, + "eligibleTokenTradeRequired": True, + }, + ) + ) + + assert result["verdict"] == "allow" + assert not any(reason["code"].startswith("COMPETITION_") for reason in result["reasons"]) + + +def test_degen_small_size_allows_higher_slippage_but_blocks_large_trade(): + result = run(payload(profile="degen-small-size", quote={"slippagePct": 6, "priceImpactPct": 1})) + assert result["verdict"] == "warn" + assert any(reason["code"] == "SLIPPAGE_ELEVATED" for reason in result["reasons"]) + + large = run(payload(profile="degen-small-size", amount_usd=26)) + assert large["verdict"] == "block" + assert any(reason["code"] == "TRADE_CAP_EXCEEDED" for reason in large["reasons"]) + + +def test_external_goplus_honeypot_blocks_and_is_preserved_in_evidence(): + result = run( + payload( + external_evidence={ + "goplus": { + "riskLevel": "HIGH", + "is_honeypot": "1", + "buy_tax": "10", + "sell_tax": "10", + } + } + ) + ) + assert result["verdict"] == "block" + assert result["evidence"]["externalEvidence"]["goplus"]["is_honeypot"] == "1" + assert any(reason["code"] == "EXTERNAL_GOPLUS_HONEYPOT" for reason in result["reasons"]) + + +def test_external_birdeye_holder_concentration_warns(): + result = run(payload(external_evidence={"birdeye": {"top10HolderPercent": 75, "liquidityUsd": 200000}})) + assert result["verdict"] == "warn" + assert any(reason["code"] == "EXTERNAL_HOLDER_CONCENTRATION_ELEVATED" for reason in result["reasons"]) + + +def test_external_rootdata_scam_tag_blocks(): + result = run(payload(external_evidence={"rootdata": {"tags": ["scam"]}})) + assert result["verdict"] == "block" + assert any(reason["code"] == "EXTERNAL_ROOTDATA_CRITICAL" for reason in result["reasons"]) + + +def test_approval_unlimited_eoa_warns_with_balanced_policy(): + result = run( + payload( + operation="approval", + token_in={ + "address": "0x0000000000000000000000000000000000000001", + "symbol": "USDC", + "decimals": 6, + }, + token_out=None, + approval={ + "spender": "0x0000000000000000000000000000000000000004", + "spenderType": "eoa", + "isUnlimited": True, + }, + ) + ) + assert result["verdict"] == "warn" + assert any(reason["code"] == "APPROVAL_UNLIMITED" for reason in result["reasons"]) + assert any(reason["code"] == "APPROVAL_SPENDER_EOA" for reason in result["reasons"]) + + +def test_approval_blocked_spender_blocks(): + result = run( + payload( + operation="approval", + approval={ + "spender": "0x0000000000000000000000000000000000000004", + "blockedSpenders": ["0x0000000000000000000000000000000000000004"], + }, + ) + ) + assert result["verdict"] == "block" + assert any(reason["code"] == "APPROVAL_SPENDER_BLOCKED" for reason in result["reasons"]) + + +def test_audit_trail_is_deterministic_and_contains_hashes(): + request = payload() + first = run(request) + second = run(request) + assert first["audit"] == second["audit"] + assert first["audit"]["decisionId"].startswith("arf_") + assert first["audit"]["policyVersion"] == "1.2.0" + assert len(first["audit"]["evidenceHash"]) == 64