Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flocks/plugins/skills/onesec-use/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: onesec-use
description: 用于处理 OneSEC 终端安全平台相关任务,适合通过API或者结合浏览器进行以下任务: 终端安全调查、威胁事件分析、终端告警检索、行为日志排查、IOC 查询、恶意文件分析、DNS 威胁排查、软件与终端资产查询、任务进度查看、审计日志分析、病毒扫描和常见终端处置场景。只要用户提到 OneSEC、微步 EDR等相关操纵需求时,必须先加载本 skill。本 skill 是 OneSEC 平台操作的唯一决策入口:在未阅读本 skill 并完成模式判断前,不要直接调用任何 `onesec_*` tool。
description: 用于处理 OneSEC/OneDNS 终端安全平台相关任务,适合通过API或者结合浏览器进行以下任务: 终端安全调查、威胁事件分析、终端告警检索、行为日志排查、IOC 查询、恶意文件分析、DNS 威胁排查、软件与终端资产查询、任务进度查看、审计日志分析、病毒扫描和常见终端处置场景。只要用户提到 OneSEC、微步 EDR等相关操纵需求时,必须先加载本 skill。本 skill 是 OneSEC 平台操作的唯一决策入口:在未阅读本 skill 并完成模式判断前,不要直接调用任何 `onesec_*` tool。
---

# OneSEC Use
Expand Down
25 changes: 11 additions & 14 deletions .flocks/plugins/skills/web2cli/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ uv run python .flocks/plugins/skills/web2cli/scripts/generate-spec.py \

- 目标站点与命令名
- 鉴权策略(如 `PUBLIC` / `COOKIE` / `HEADER`)
- 主请求的 method、endpoint、query/body 模板
- 主请求的 method、endpoint、query/body/payload 模板
- CLI 参数定义
- 固定输出列定义
- 验证材料初稿
Expand Down Expand Up @@ -314,16 +314,24 @@ uv run python .flocks/plugins/skills/web2cli/scripts/generate-cli.py \
--output "$CAPTURE_ROOT/${CAPTURE_NAME}_api.md"
```

### 10. CLI工具验证 和浏览器关闭
### 10. CLI工具验证与修改

根据生成的 CLI ,任意选择一个接口调用测试可用性
- CLI 工具可用性
- 认证状态可用性
- `verify.json` 的输出约束是否满足
- method、endpoint、query/body/payload 的一致性,必要时根据${CAPTURE_NAME}_api.json调整

推荐先查看 `"$CAPTURE_ROOT/${CAPTURE_NAME}_verify.json"`,再用生成的 CLI 以默认参数执行一次,确认固定输出列与认证状态都正确。

当验证完成,确保 CLI 可用后关闭浏览器或 Tab
### 11. CLI 工具集成到skill

将 CLI 按 `references/cli-in-skill.md` 集成为 skill;

### 12. summary并关闭浏览器 tab

1. 总结当前生成的 CLI 工具有哪些接口/能力
2. 确保 CLI 可用后关闭浏览器或 Tab

#### 关闭浏览器或 Tab

Expand All @@ -343,17 +351,6 @@ else:

必须保留用户原有的 tab 不受影响。

### 11. CLI 工具集成到skill

将 CLI 按 `references/cli-in-skill.md` 集成为 skill;

### 12. summary

总结当前生成的 CLI 工具有哪些能力,然后可提示用户下一步操作:

- 精简或修正 CLI
- 若仍需扩展能力或沉淀为 skill,回到步骤 11

## 故障处理

### Hook 注入报错
Expand Down
148 changes: 135 additions & 13 deletions .flocks/plugins/skills/web2cli/scripts/generate-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,9 @@ def generate_postman_collection_from_spec(spec: Dict[str, Any]) -> Dict[str, Any
def generate_python_cli_from_spec(spec: Dict[str, Any]) -> str:
"""Generate a fixed command CLI script from a web2cli spec."""
spec_json = json.dumps(spec, indent=2, ensure_ascii=False)
spec_json = re.sub(r'\btrue\b', 'True', spec_json)
spec_json = re.sub(r'\bfalse\b', 'False', spec_json)
spec_json = re.sub(r'\bnull\b', 'None', spec_json)
return '''#!/usr/bin/env python3
"""
Auto-generated Web2CLI command script.
Expand All @@ -720,6 +723,7 @@ def generate_python_cli_from_spec(spec: Dict[str, Any]) -> str:
import argparse
import csv
import json
from pathlib import Path
import re
import sys
from typing import Any, Dict, List
Expand Down Expand Up @@ -764,6 +768,25 @@ def _coerce_bool(value: str) -> bool:
raise argparse.ArgumentTypeError(f"invalid boolean value: {value}")


def _auth_header_dest(header_name: str) -> str:
normalized = re.sub(r"[^A-Za-z0-9]+", "_", str(header_name or "")).strip("_").lower()
return f"auth_header_{normalized or 'value'}"


def _manual_auth_rules() -> List[Dict[str, Any]]:
auth = SPEC.get("auth", {})
if not isinstance(auth, dict):
return []
rules = auth.get("requiredHeaders", [])
if not isinstance(rules, list):
return []
return [
rule
for rule in rules
if isinstance(rule, dict) and rule.get("source") == "manual" and rule.get("name")
]


def _type_name(value: Any) -> str:
if value is None:
return "null"
Expand Down Expand Up @@ -992,12 +1015,55 @@ def _extract_first(cls, value: Any, path: str) -> Any:
values = cls._extract_many(value, path)
return values[0] if values else None

def __init__(self, base_url: str = SPEC.get("baseUrl", ""), auth_state: str = "auth-state.json"):
@staticmethod
def _stringify_multipart_value(value: Any) -> str:
if value is None:
return ""
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
return str(value)

@classmethod
def _build_multipart_files(
cls,
body: Dict[str, Any],
file_fields: List[str],
) -> tuple[List[Any], List[Any]]:
files = []
opened_files = []
target_fields = {str(item) for item in file_fields if item}
for key, value in (body or {}).items():
if key in target_fields:
file_path = Path(str(value or ""))
if not str(value or "").strip():
raise SystemExit(f"missing required multipart file path: {key}")
try:
handle = file_path.open("rb")
except OSError as error:
raise SystemExit(f"failed to open multipart file for {key}: {error}") from error
opened_files.append(handle)
files.append((key, (file_path.name, handle)))
else:
files.append((key, (None, cls._stringify_multipart_value(value))))
return files, opened_files

def __init__(
self,
base_url: str = SPEC.get("baseUrl", ""),
auth_state: str = "auth-state.json",
manual_headers: Dict[str, str] | None = None,
):
self.base_url = (base_url or SPEC.get("baseUrl", "")).rstrip("/")
self.auth_state_path = auth_state
self.auth_state = _load_json(auth_state) if auth_state else {}
if not isinstance(self.auth_state, dict):
self.auth_state = {}
raw_manual_headers = manual_headers if isinstance(manual_headers, dict) else {}
self.manual_headers = {
str(key): str(value)
for key, value in raw_manual_headers.items()
if value not in (None, "")
}
self.session = requests.Session()
self._apply_auth_state()

Expand All @@ -1009,16 +1075,25 @@ def _apply_auth_state(self) -> None:
self.session.headers.update(headers)

if strategy == "HEADER":
missing_manual_headers = []
for rule in auth.get("requiredHeaders", []):
if not isinstance(rule, dict) or not rule.get("name"):
continue
source = rule.get("source")
if source == "cookie":
value = self._resolve_cookie_value(rule.get("key"))
elif source == "manual":
value = self.manual_headers.get(str(rule["name"]))
else:
value = self._resolve_header_value(self.auth_state, rule)
if value is not None:
self.session.headers[str(rule["name"])] = value
elif source == "manual":
missing_manual_headers.append(str(rule["name"]))
if missing_manual_headers:
raise SystemExit(
"missing required auth headers: " + ", ".join(sorted(missing_manual_headers))
)

def build_request(self, args: Dict[str, Any], entry: Dict[str, Any]) -> Dict[str, Any]:
operation = entry.get("operation", {})
Expand All @@ -1035,12 +1110,24 @@ def build_request(self, args: Dict[str, Any], entry: Dict[str, Any]) -> Dict[str
"params": query or None,
"json": None,
"data": None,
"files": None,
"opened_files": [],
"headers": headers or None,
}
if payload_mode == "json":
request_options["json"] = body or None
elif payload_mode == "form":
request_options["data"] = body or None
elif payload_mode == "multipart":
multipart_body = body if isinstance(body, dict) else {}
multipart_files, opened_files = self._build_multipart_files(
multipart_body,
operation.get("multipartFileFields", []),
)
headers.pop("Content-Type", None)
headers.pop("content-type", None)
request_options["files"] = multipart_files or None
request_options["opened_files"] = opened_files
elif payload_mode == "raw":
request_options["data"] = raw_body or None
if cookie_strategy in {"COOKIE", "HEADER"}:
Expand All @@ -1053,7 +1140,9 @@ def build_request(self, args: Dict[str, Any], entry: Dict[str, Any]) -> Dict[str
"params": request_options["params"],
"json": request_options["json"],
"data": request_options["data"],
"headers": request_options["headers"],
"files": request_options["files"],
"opened_files": request_options["opened_files"],
"headers": headers or None,
}

def _project_rows(self, payload: Any, entry: Dict[str, Any]) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -1084,16 +1173,28 @@ def _project_rows(self, payload: Any, entry: Dict[str, Any]) -> List[Dict[str, A
def run(self, args: Dict[str, Any], entry: Dict[str, Any] | None = None) -> List[Dict[str, Any]]:
operation_entry = entry or _operation_entries()[0]
request_options = self.build_request(args, operation_entry)
response = self.session.request(
request_options["method"],
request_options["url"],
params=request_options["params"],
json=request_options["json"],
data=request_options["data"],
headers=request_options["headers"],
)
response.raise_for_status()
return self._project_rows(response.json(), operation_entry)
request_kwargs = {
"params": request_options["params"],
"json": request_options["json"],
"data": request_options["data"],
"headers": request_options["headers"],
}
if request_options["files"] is not None:
request_kwargs["files"] = request_options["files"]
try:
response = self.session.request(
request_options["method"],
request_options["url"],
**request_kwargs,
)
response.raise_for_status()
return self._project_rows(response.json(), operation_entry)
finally:
for handle in request_options.get("opened_files", []):
try:
handle.close()
except OSError:
pass


def verify_rows(rows: List[Dict[str, Any]], verify_spec: Dict[str, Any]) -> List[str]:
Expand Down Expand Up @@ -1157,6 +1258,17 @@ def _add_output_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--verify-spec", help="Optional verify JSON path")


def _add_manual_auth_arguments(parser: argparse.ArgumentParser) -> None:
for rule in _manual_auth_rules():
header_name = str(rule["name"])
option_name = re.sub(r"[^a-z0-9]+", "-", header_name.lower()).strip("-")
parser.add_argument(
f"--auth-header-{option_name}",
dest=_auth_header_dest(header_name),
help=f"Value for required header {header_name}",
)


def _add_operation_arguments(parser: argparse.ArgumentParser, entry: Dict[str, Any]) -> None:
for arg in entry.get("args", []):
if not isinstance(arg, dict) or not arg.get("name"):
Expand Down Expand Up @@ -1187,6 +1299,7 @@ def build_parser() -> argparse.ArgumentParser:
default=(SPEC.get("auth", {}) or {}).get("stateFile", "auth-state.json"),
help="Path to auth state JSON",
)
_add_manual_auth_arguments(parser)
entries = _operation_entries()
if _uses_subcommands():
subparsers = parser.add_subparsers(dest="command", required=True)
Expand All @@ -1213,7 +1326,16 @@ def main() -> None:
for item in entry.get("args", [])
if isinstance(item, dict) and item.get("name")
}
client = APIClient(base_url=parsed.base_url, auth_state=parsed.auth_state)
manual_headers = {
str(rule["name"]): getattr(parsed, _auth_header_dest(str(rule["name"])))
for rule in _manual_auth_rules()
if getattr(parsed, _auth_header_dest(str(rule["name"])), None) not in (None, "")
}
client = APIClient(
base_url=parsed.base_url,
auth_state=parsed.auth_state,
manual_headers=manual_headers,
)
rows = client.run(runtime_args, entry)

if parsed.verify:
Expand Down
Loading