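Scans a frontend repository (monorepos are supported), analyzes the repository structure in full, and then automatically generates an executable CLI tool.
Key highlights:
Confirm the following items with the user (if the message already contains one, use it directly instead of asking again):
1. Repository path: the local path of the frontend business repository; more than one is allowed. If the repository has not been cloned and the user provides a git URL, clone it for them.
2. CLI naming convention: ask whether the team or company has a CLI naming convention document; if so, ask for a link or a description.
If a convention document exists, read and understand it first, then use it to decide:
- the command verbs (list/get/create/update/delete)
If the user has no convention, use these general defaults:
- Verbs: list, get, create, update, delete, search
- Flags in kebab-case (--poi-id rather than --poiId)
- --format table switches the output to a table
3. CLI name: determine the command name from the convention or from what the user specifies (defaults to the repository name)
4. Output path: ask which directory the CLI project should be generated into (defaults to a location under the current working directory)
5. Language choice:
> Which language should the generated CLI use?
> - Python (recommended for non-developer users; typically available on macOS with little or no setup)
> - Node.js (recommended for frontend developers; TypeScript support, easy to maintain yourself)
6. Authentication method: Step 2.4 reads this from the repository automatically, so there is no need to ask here.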
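The default flag style above (--poi-id rather than --poiId) can be applied mechanically when turning request parameter names into CLI flags. The following is a minimal sketch; the helper name to_flag is hypothetical and not part of the generated project.

```python
import re


def to_flag(name: str) -> str:
    """Convert a camelCase or snake_case parameter name into a kebab-case flag."""
    # Insert a hyphen wherever a lowercase letter or digit is followed by an uppercase letter.
    hyphenated = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "-", name)
    return "--" + hyphenated.replace("_", "-").lower()


if __name__ == "__main__":
    for param in ("poiId", "page_size", "status"):
        print(param, "->", to_flag(param))
```

A team convention document, when one exists, takes precedence over this default.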
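Do not ask the user; read the repository yourself, scanning in the order below. Every step must actually be executed; none may be skipped.
> ⚠️ Core principle: the most important job of Step 2 is to build a complete "repository profile", not merely to find a list of endpoint files.
Before generating any code, all six of the following dimensions must be understood, without exception: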
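| Dimension | What must be understood |
| --- | --- |
| Directory conventions | Where are the endpoint files? The type files? The enums/constants? |
| API call conventions | Which HTTP wrapper does the project use? What is the function name? What does a call look like? |
| Domain & prefix | What is the baseURL? What are the domains per environment? What is the endpoint path prefix? |
| Enums & constants | What does every numeric enum mean? Status code mappings? Business constants? |
| Parameters & types | For each endpoint: request parameter fields, types, required/optional? Nested types? |
| Code comments | Comments in endpoint, type, and constant files are first-hand information on what parameters mean |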
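One way to enforce the "all six dimensions" rule is to keep the repository profile in a single structure and refuse to proceed while any part is empty. This is only a sketch: the class name RepoProfile and its field names are assumptions chosen to mirror the table rows.

```python
from dataclasses import dataclass, field, fields
from typing import Dict, List


@dataclass
class RepoProfile:
    """Hypothetical container for the six dimensions of the repository profile."""
    api_dirs: List[str] = field(default_factory=list)               # directory conventions
    http_wrapper: str = ""                                          # API call conventions
    base_urls: Dict[str, str] = field(default_factory=dict)         # domain & prefix
    enums: Dict[str, Dict[int, str]] = field(default_factory=dict)  # enums & constants
    param_types: Dict[str, dict] = field(default_factory=dict)      # parameters & types
    comments: Dict[str, str] = field(default_factory=dict)          # code comments


def missing_dimensions(profile: RepoProfile) -> List[str]:
    """Return the names of the dimensions that are still empty."""
    return [f.name for f in fields(profile) if not getattr(profile, f.name)]


if __name__ == "__main__":
    profile = RepoProfile(http_wrapper="request")
    print("Still missing:", missing_dimensions(profile))
```

Code generation would start only once missing_dimensions returns an empty list.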
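REPO_DIR=<local repository path>
# Read package.json
cat "$REPO_DIR/package.json"
# Check whether this is a monorepo
ls "$REPO_DIR/packages/" 2>/dev/null && echo "IS MONOREPO" || echo "SINGLE PACKAGE"
# List the full directory tree (4 levels deep)
find "$REPO_DIR" -maxdepth 4 -type d | grep -v node_modules | grep -v "\.git" | grep -v dist | sort
# Check for TypeScript and the package manager
ls "$REPO_DIR/tsconfig.json" 2>/dev/null && echo "TypeScript: yes" || echo "TypeScript: no"
ls "$REPO_DIR"/pnpm-lock.yaml "$REPO_DIR"/yarn.lock "$REPO_DIR"/package-lock.json 2>/dev/null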
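A packages/ directory is only one sign of a monorepo. The sketch below also checks the workspaces field of package.json (npm and Yarn), pnpm-workspace.yaml and lerna.json. Checking these extra files is an assumption added here, not something the steps above require, and detect_monorepo is a hypothetical helper name.

```python
import json
from pathlib import Path


def detect_monorepo(repo_dir: str) -> bool:
    """Best-effort monorepo detection based on common workspace markers."""
    root = Path(repo_dir)
    # pnpm and Lerna declare workspaces in dedicated files.
    if (root / "pnpm-workspace.yaml").exists() or (root / "lerna.json").exists():
        return True
    pkg = root / "package.json"
    if pkg.exists():
        try:
            data = json.loads(pkg.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            data = {}
        # npm and Yarn declare workspaces inside package.json.
        if isinstance(data, dict) and data.get("workspaces"):
            return True
    # Fall back to the directory check used by the shell command.
    return (root / "packages").is_dir()


if __name__ == "__main__":
    print("monorepo" if detect_monorepo(".") else "single package")
```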
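After reading the directory tree, you must answer:
Core strategy: sample first → generalize the call pattern → scan everything.
Based on the conclusions from 2.1, read 2~3 endpoint files directly and observe:
Once the HTTP wrapper function name has been identified, run a full scan: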
grep -rl "<wrapper function name>(" "$REPO_DIR" --include="*.ts" --include="*.js" \
  | grep -v node_modules | grep -v dist | sort
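The same full scan can be expressed in Python when the agent prefers structured results over grep output. This is a sketch under the assumption that the wrapper is called as a plain function or method, for example request( or http.get(; find_api_files and SKIP_DIRS are hypothetical names.

```python
import re
from pathlib import Path
from typing import List

SKIP_DIRS = {"node_modules", "dist", ".git"}


def find_api_files(repo_dir: str, wrapper: str) -> List[str]:
    """Return repo-relative paths of .ts/.js files that call the given wrapper."""
    root = Path(repo_dir)
    # Match "wrapper(" with optional whitespace before the parenthesis.
    pattern = re.compile(r"\b" + re.escape(wrapper) + r"\s*\(")
    hits = []
    for path in root.rglob("*"):
        if path.suffix not in {".ts", ".js"} or not path.is_file():
            continue
        rel = path.relative_to(root)
        # Skip dependency and build output directories.
        if SKIP_DIRS & set(rel.parts):
            continue
        if pattern.search(path.read_text(encoding="utf-8", errors="ignore")):
            hits.append(rel.as_posix())
    return sorted(hits)


if __name__ == "__main__":
    for name in find_api_files(".", "request"):
        print(name)
```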
Supplementary scans:
# OpenAPI / Swagger files
find "$REPO_DIR" \( -name "swagger.json" -o -name "openapi.json" \) | grep -v node_modules
# GraphQL (-E is required for the | alternation to work)
grep -rlE 'gql`|useQuery|useMutation' "$REPO_DIR" --include="*.ts" | grep -v node_modules
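This is the most error-prone step. An endpoint function's signature shows only the parameter names; the real field details live in the type files.
find "$REPO_DIR" \( -path "*/types/*.ts" -o -path "*/domain/*.ts" -o -path "*/models/*.ts" \) \
  | grep -v node_modules | grep -v "dist/" | sort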
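To illustrate what "field details" means for the type files found above, here is a rough line-based sketch that pulls field name, type, required/optional (the ?: marker) and trailing comment out of an interface body. It is not a TypeScript parser: it assumes one field per line and will miss multi-line or generic types containing commas. The function name parse_interface_fields is hypothetical.

```python
import re
from typing import Dict, List

# name, optional "?", type, optional trailing "// comment"
FIELD_RE = re.compile(r"^\s*(\w+)(\?)?\s*:\s*([^;,/]+?)\s*[;,]?\s*(?://\s*(.*))?$")


def parse_interface_fields(body: str) -> List[Dict[str, object]]:
    """Extract simple one-line field declarations from an interface body."""
    found = []
    for line in body.splitlines():
        match = FIELD_RE.match(line)
        if not match:
            continue  # braces, blank lines, and anything more complex are skipped
        name, optional, ts_type, comment = match.groups()
        found.append({
            "name": name,
            "type": ts_type,
            "required": optional is None,
            "comment": comment or "",
        })
    return found


if __name__ == "__main__":
    sample = "  poiId: number; // store ID\n  status?: number\n}"
    for item in parse_interface_fields(sample):
        print(item)
```

For anything beyond flat interfaces, reading the file directly remains the reliable approach.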
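For every endpoint, also read the corresponding type file and extract:
- whether each field is required or optional (?:)
find "$REPO_DIR" \( -name "request.ts" -o -name "http.ts" -o -name "axios.ts" \) \
  | grep -v node_modules | head -10
Must extract: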
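- the baseURL value for each environment

| Detected signature | Auth type | Recommended approach |
| --- | --- | --- |
| Authorization: Bearer | Bearer Token | auth login command |
| Cookie header | Browser Cookie | Automatic capture via CDP / manual paste |
| X-API-Key or similar | API Key | Config file or environment variable |
| No auth header | No authentication | Call directly |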
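The table can be read as a small decision procedure over the source text of the request wrapper. The sketch below is a heuristic only and its result should still be confirmed with the user. The function name classify_auth is hypothetical, and treating axios's withCredentials option as a cookie signal is an assumption added here.

```python
import re


def classify_auth(source: str) -> str:
    """Guess the auth type from the text of a request wrapper file."""
    lowered = source.lower()
    if "authorization" in lowered and "bearer" in lowered:
        return "bearer"
    if re.search(r"x-api-key|api[-_]?key", lowered):
        return "api-key"
    # Assumption: withCredentials usually means cookies are sent with requests.
    if "cookie" in lowered or "withcredentials" in lowered:
        return "cookie"
    return "none"


if __name__ == "__main__":
    print(classify_auth("headers: { Authorization: `Bearer ${token}` }"))
```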
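Option A: Bearer Token login
- The auth login command triggers the login flow
Option B: Cookie authentication
Option C: API Key configuration
- auth setup --api-key writes the key to the config file
- The <CLI_NAME>_API_KEY environment variable is read as a fallback
Option D: No authentication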
find "$REPO_DIR" -maxdepth 3 \( -name ".env" -o -name ".env.*" -o -name "config.ts" \) \
  | grep -v node_modules | xargs grep -hE "BASE_URL|API_URL|HOST|DOMAIN" 2>/dev/null
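When the matches come from .env-style files, the key/value pairs can be collected into a dictionary for later use in the per-environment base URLs. A minimal sketch, assuming simple KEY=value lines; parse_env_text is a hypothetical helper and the sample values are placeholders.

```python
import re
from typing import Dict

KEY_RE = re.compile(r"BASE_URL|API_URL|HOST|DOMAIN")


def parse_env_text(text: str) -> Dict[str, str]:
    """Collect URL- and host-related variables from .env-style text."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and lines that are not assignments.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if KEY_RE.search(key):
            values[key] = value.strip().strip("'\"")
    return values


if __name__ == "__main__":
    sample = "VITE_API_URL=https://api.example.com\nNODE_ENV=production\n"
    print(parse_env_text(sample))
```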
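This step is key to generating a high-quality CLI. Frontend business code makes heavy use of numeric enums.
// Activity status
export const ACTIVITY_STATUS = {
  10: 'Pending review', // waiting for operations review after submission
  20: 'In progress', // review passed, activity is live
  30: 'Ended', // activity expired or was closed manually
}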
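An object literal like the one above can be turned into a Python mapping with a line-based pass that keeps both the label and the trailing comment. This is a sketch for the simple "number: 'label', // comment" shape only; TypeScript enum declarations and computed keys would need separate handling. The name parse_numeric_enum is hypothetical.

```python
import re
from typing import Dict, Tuple

# number, quoted label, optional trailing "// comment"
ENTRY_RE = re.compile(r"^\s*(\d+)\s*:\s*(['\"])(.*?)\2\s*,?\s*(?://\s*(.*))?$")


def parse_numeric_enum(body: str) -> Dict[int, Tuple[str, str]]:
    """Map each numeric key to a (label, comment) pair."""
    result = {}
    for line in body.splitlines():
        match = ENTRY_RE.match(line)
        if match:
            result[int(match.group(1))] = (match.group(3), (match.group(4) or "").strip())
    return result


if __name__ == "__main__":
    sample = "  10: 'Pending review', // waiting for review\n  20: 'In progress',\n"
    print(parse_numeric_enum(sample))
```

The labels feed the --help text and the comments explain when each value applies.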
Must extract:
You must output the plan first and generate code only after the user has explicitly confirmed it.
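📊 Scan results
Repository: <repo-name>  Framework: React/Vue/Taro  Structure: monorepo / single package
Endpoint files: N files under <actual endpoint directory path>, M endpoints in total
Type files: K files under <actual type directory path> (all read)
Enum files: J files under <actual enum directory path> (all read)
Language: Python / Node.js (user's choice)
🔐 Authentication detection
Detected: <auth type>
Target system domain: <real domain read from the repository>
Custom headers: <header list read from request.ts>
Please confirm which authentication option to use:
[ ] Bearer Token login (auth login command)
[ ] Cookie authentication (automatic capture via CDP / manual paste)
[ ] API Key configuration
[ ] No authentication
📋 Command preview (with key parameters)
<cli-name> auth show # show authentication status
<cli-name> auth login # log in
<cli-name> <domain1> list # list query
...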
<output-dir>/<cli-name>/
├── setup.py
├── README.md
└── cli/
    ├── __init__.py
    └── <cli_name>/
        ├── __init__.py
        ├── __main__.py
        ├── main.py
        ├── commands/
        │   ├── __init__.py
        │   ├── auth.py
        │   └── <domain>.py
        └── utils/
            ├── __init__.py
            ├── auth.py
            ├── http_client.py
            ├── config.py
            ├── output.py
            └── enums.py
from setuptools import setup, find_packages

setup(
    name="<cli-name>",
    version="1.0.0",
    packages=find_packages(),
    install_requires=[
        "click>=8.0",
        "requests>=2.28",
        "rich>=13.0",
        "websocket-client>=1.0",  # provides the websocket module used by the CDP cookie fetch
    ],
    entry_points={
        "console_scripts": [
            "<cli-name>=cli.<cli_name>.main:cli"
        ]
    },
    python_requires=">=3.8",
)
"""
Generic authentication module (pluggable strategies)
Supports: Bearer Token / Cookie (CDP + cache) / API Key / no authentication
"""
import json, os, time, threading, urllib.request
from pathlib import Path
from typing import Optional, Dict

CACHE_PATH = Path.home() / ".config" / "<cli-name>" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "<cli-name>" / "config.json"
TARGET_DOMAIN = "<real domain read from the repository>"
CDP_PORT = int(os.environ.get("CDP_PORT", "9222"))
COOKIE_CACHE_TTL_MIN = 24 * 60
# Assumption: replace with the API key header name actually found in the repository's request wrapper.
API_KEY_HEADER = "X-API-Key"


class AuthResult:
    def __init__(self, auth_type: str, method: str, token: Optional[str] = None,
                 cookie: Optional[str] = None, api_key: Optional[str] = None):
        self.type = auth_type
        self.method = method
        self.token = token
        self.cookie = cookie
        self.api_key = api_key

    def get_headers(self) -> Dict[str, str]:
        if self.type == "bearer" and self.token:
            return {"Authorization": f"Bearer {self.token}"}
        if self.type == "cookie" and self.cookie:
            return {"Cookie": self.cookie}
        if self.type == "api-key" and self.api_key:
            return {API_KEY_HEADER: self.api_key}
        if self.type == "none":
            return {}
        raise RuntimeError(f"Invalid authentication result: type={self.type}")


def _load_config() -> dict:
    if not CONFIG_PATH.exists():
        return {}
    return json.loads(CONFIG_PATH.read_text())


def _save_config(data: dict):
    CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    current = _load_config()
    current.update(data)
    CONFIG_PATH.write_text(json.dumps(current, ensure_ascii=False, indent=2))


def _fetch_via_cdp(domain: str) -> str:
    """Fetch cookies through the Chrome DevTools Protocol."""
    import websocket  # provided by the websocket-client package

    url = f"http://127.0.0.1:{CDP_PORT}/json"
    with urllib.request.urlopen(url, timeout=5) as r:
        tabs = json.loads(r.read())
    # Prefer a tab already on the target domain, otherwise fall back to any page tab.
    tab = next((t for t in tabs if t.get("type") == "page" and domain in t.get("url", "")), None)
    if not tab:
        tab = next((t for t in tabs if t.get("type") == "page"), None)
    if not tab:
        raise RuntimeError("No active Chrome page found")
    ws_url = tab.get("webSocketDebuggerUrl")
    result, done = {}, threading.Event()

    def on_open(ws):
        ws.send(json.dumps({"id": 1, "method": "Network.getCookies",
                            "params": {"urls": [f"https://{domain}"]}}))

    def on_message(ws, msg):
        m = json.loads(msg)
        if m.get("id") == 1:
            result["cookies"] = m.get("result", {}).get("cookies", [])
            done.set()
            ws.close()

    ws = websocket.WebSocketApp(ws_url, on_open=on_open, on_message=on_message)
    threading.Thread(target=ws.run_forever, daemon=True).start()
    done.wait(timeout=10)
    cookies = result.get("cookies", [])
    if not cookies:
        raise RuntimeError(f"No cookies found for {domain}")
    return "; ".join(f"{c['name']}={c['value']}" for c in cookies)


def authenticate() -> AuthResult:
    """Single entry point for authentication."""
    config = _load_config()
    strategy = config.get("auth_strategy", "bearer")
    if strategy == "bearer":
        token = config.get("access_token")
        if not token:
            raise RuntimeError("No token found, please run: <cli-name> auth login")
        return AuthResult(auth_type="bearer", method="token-cache", token=token)
    elif strategy == "cookie":
        # Try CDP first
        try:
            cookie_str = _fetch_via_cdp(TARGET_DOMAIN)
            return AuthResult(auth_type="cookie", method="cdp", cookie=cookie_str)
        except Exception:
            pass
        # Fall back to the cache
        if CACHE_PATH.exists():
            cache = json.loads(CACHE_PATH.read_text())
            age_min = (time.time() - cache.get("updatedAt", 0)) / 60
            if age_min < COOKIE_CACHE_TTL_MIN:
                return AuthResult(auth_type="cookie", method="cache", cookie=cache["value"])
        raise RuntimeError("Cookie authentication failed, please run: <cli-name> auth refresh")
    elif strategy == "api-key":
        api_key = config.get("api_key") or os.environ.get("<CLI_NAME>_API_KEY")
        if not api_key:
            raise RuntimeError("No API Key found")
        method = "config" if config.get("api_key") else "env"
        return AuthResult(auth_type="api-key", method=method, api_key=api_key)
    elif strategy == "none":
        return AuthResult(auth_type="none", method="none")
    raise RuntimeError(f"Unknown strategy: {strategy}")


def get_auth_header() -> Dict[str, str]:
    return authenticate().get_headers()
"""HTTP request wrapper"""
import requests
from .auth import get_auth_header

BASE_URLS = {
    "prod": "https://<DOMAIN>",
    "staging": "https://<STAGING_DOMAIN>",
    "test": "https://<TEST_DOMAIN>",
}
EXTRA_HEADERS = {}  # read from the repository's request.ts
_session = None
_current_env = "prod"


def get_session(env="prod"):
    global _session, _current_env
    # Reuse the session only when it was built for the same environment.
    if _session and _current_env == env:
        return _session
    auth_headers = get_auth_header()
    _session = requests.Session()
    _session.headers.update({
        **auth_headers,
        "Content-Type": "application/json",
        **EXTRA_HEADERS,
    })
    _session.base_url = BASE_URLS.get(env, BASE_URLS["prod"])
    _current_env = env
    return _session


def get(path, params=None, env="prod"):
    s = get_session(env)
    r = s.get(s.base_url.rstrip("/") + "/" + path.lstrip("/"), params=params, timeout=30)
    return _check_response(r)


def post(path, data=None, env="prod"):
    s = get_session(env)
    r = s.post(s.base_url.rstrip("/") + "/" + path.lstrip("/"), json=data or {}, timeout=30)
    return _check_response(r)


def _check_response(r):
    if r.status_code == 401:
        raise RuntimeError("Authentication failed")
    if r.status_code == 404:
        raise RuntimeError(f"Endpoint not found: {r.url}")
    if not r.ok:
        raise RuntimeError(f"Request failed [{r.status_code}]: {r.text[:200]}")
    body = r.json()
    # A missing code, 0, and 200 are all treated as success.
    code = body.get("code")
    if code is not None and code != 0 and code != 200:
        raise RuntimeError(f"Business error [code={code}]: {body.get('message', '')}")
    return body
"""Configuration management"""
import json
from pathlib import Path

CONFIG_PATH = Path.home() / ".config" / "<cli-name>" / "config.json"


def load() -> dict:
    if not CONFIG_PATH.exists():
        return {}
    return json.loads(CONFIG_PATH.read_text())


def save(data: dict):
    CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    current = load()
    current.update(data)
    CONFIG_PATH.write_text(json.dumps(current, ensure_ascii=False, indent=2))
from rich.console import Console
from rich.table import Table
import json

console = Console()


def print_table(data: list, columns: list, title: str = None):
    if not data:
        console.print(f"[grey50]{title or ''} No data[/grey50]")
        return
    table = Table(title=title, show_header=True, header_style="bold cyan")
    for col in columns:
        table.add_column(col["label"], width=col.get("width"))
    for row in data:
        table.add_row(*[str(row.get(c["key"], "-")) for c in columns])
    console.print(table)


def print_json(data): print(json.dumps(data, ensure_ascii=False, indent=2))
def print_success(msg): console.print(f"[green]✅ {msg}[/green]")
def print_error(msg): console.print(f"[red]❌ {msg}[/red]")
"""Centralized enum management"""
def enum_help_text(mapping: dict) -> str:
    """Build the enum description shown in --help."""
    return '(' + ', '.join(f'{k}={v}' for k, v in mapping.items()) + ')'


def enrich_enum(obj: dict, field: str, mapping: dict) -> dict:
    """Add a translated <field>Desc entry to one output object."""
    val = obj.get(field)
    desc = mapping.get(val, str(val) if val is not None else '-')
    return {**obj, f'{field}Desc': desc}


def enrich_enum_list(items: list, mappings: list) -> list:
    """Add enum translations to every item, for each (field, mapping) pair."""
    result = []
    for item in items:
        for field, mapping in mappings:
            item = enrich_enum(item, field, mapping)
        result.append(item)
    return result


# ─── Enum mappings (filled in after scanning the repository) ─────────────────
STATUS_MAP = {
    # ⚠️ Enum values actually found in the repository
}
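To make the behavior of these helpers concrete, here is a self-contained usage sketch. The STATUS_MAP values below are hypothetical stand-ins for whatever the repository scan finds, and the two helpers are repeated in compact form so the snippet runs on its own.

```python
# Hypothetical mapping; real values come from the repository scan.
STATUS_MAP = {10: "Pending review", 20: "In progress", 30: "Ended"}


def enum_help_text(mapping: dict) -> str:
    """Build the enum description shown in --help."""
    return "(" + ", ".join(f"{k}={v}" for k, v in mapping.items()) + ")"


def enrich_enum(obj: dict, field: str, mapping: dict) -> dict:
    """Add a translated <field>Desc entry; unknown values fall back to their string form."""
    val = obj.get(field)
    desc = mapping.get(val, str(val) if val is not None else "-")
    return {**obj, f"{field}Desc": desc}


rows = [{"id": 1, "status": 20}, {"id": 2, "status": 99}, {"id": 3}]
enriched = [enrich_enum(row, "status", STATUS_MAP) for row in rows]

if __name__ == "__main__":
    print(enum_help_text(STATUS_MAP))
    for row in enriched:
        print(row)
```

Unknown codes are shown as-is rather than hidden, which keeps the table output honest when the backend adds a value the CLI does not know yet.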
import json
import time

import click

from ..utils.auth import (
    authenticate, _load_config, _save_config, _fetch_via_cdp,
    CACHE_PATH, CONFIG_PATH, TARGET_DOMAIN,
)


def _write_cookie_cache(cookie: str) -> None:
    """Store the cookie together with the time it was saved."""
    CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    CACHE_PATH.write_text(json.dumps({"value": cookie, "updatedAt": time.time()}))


@click.group()
def auth():
    """Authentication management"""
    pass


@auth.command()
def show():
    """Show authentication status"""
    config = _load_config()
    click.echo(f"  Strategy: {config.get('auth_strategy', 'not configured')}")
    click.echo(f"  Domain: {TARGET_DOMAIN}")


@auth.command()
@click.option("--token", prompt=True, hide_input=True)
def login(token):
    """Log in (Bearer Token)"""
    _save_config({"access_token": token})
    click.echo("✅ Token saved")


@auth.command("set-cookie")
@click.argument("cookie_str")
def set_cookie_cmd(cookie_str):
    """Save a manually pasted Cookie"""
    _write_cookie_cache(cookie_str)
    click.echo("✅ Cookie saved")


@auth.command()
@click.option("--strategy", type=click.Choice(["bearer", "cookie", "api-key", "none"]))
@click.option("--api-key")
def setup(strategy, api_key):
    """Configure the authentication method"""
    data = {}
    if strategy:
        data["auth_strategy"] = strategy
    if api_key:
        data["api_key"] = api_key
    _save_config(data)
    click.echo(f"✅ Auth strategy set to: {strategy or 'unchanged'}")


@auth.command()
def test():
    """Test authentication"""
    try:
        r = authenticate()
        click.echo(f"✅ Authentication succeeded ({r.type}/{r.method})")
    except Exception as e:
        click.echo(f"❌ Authentication failed: {e}", err=True)
        raise SystemExit(1)


@auth.command()
def refresh():
    """Refresh the Cookie"""
    try:
        _write_cookie_cache(_fetch_via_cdp(TARGET_DOMAIN))
        click.echo("✅ Cookie refreshed")
    except Exception as e:
        click.echo(f"❌ Refresh failed: {e}", err=True)
        raise SystemExit(1)


@auth.command()
def logout():
    """Clear stored credentials"""
    for p in [CONFIG_PATH, CACHE_PATH]:
        if p.exists():
            p.unlink()
    click.echo("✅ Credentials cleared")
"""Activity management commands"""
import click
from ..utils.http_client import get, post
from ..utils.output import print_table, print_json, print_error
from ..utils.enums import STATUS_MAP, enum_help_text, enrich_enum_list


@click.group("activity")
def activity():
    """Activity management"""
    pass


@activity.command("list")
@click.option("--status", type=int, help=f"Status {enum_help_text(STATUS_MAP)}")
@click.option("--page", default=1, type=int)
@click.option("--json", "as_json", is_flag=True)
@click.option("--env", default="prod", type=click.Choice(["prod", "staging", "test"]))
def list_(status, page, as_json, env):
    """List activities"""
    try:
        # Send only the filters the user actually provided.
        payload = {"page": page}
        if status is not None:
            payload["status"] = status
        res = post("/api/activity/list", data=payload, env=env)
        items = (res.get("data") or {}).get("list", [])
        if as_json:
            print_json(res)
        else:
            enriched = enrich_enum_list(items, [("status", STATUS_MAP)])
            print_table(enriched, [
                {"key": "id", "label": "ID", "width": 12},
                {"key": "name", "label": "Name", "width": 30},
                {"key": "statusDesc", "label": "Status", "width": 10},
            ])
    except Exception as e:
        print_error(str(e))
        raise SystemExit(1)
import click
from .commands.auth import auth
# from .commands.<domain> import <domain>


@click.group()
@click.version_option(version="1.0.0")
def cli():
    """<cli-name>: frontend API CLI tool"""
    pass


cli.add_command(auth)
# cli.add_command(<domain>)

if __name__ == "__main__":
    cli()
<output-dir>/<cli-name>/
├── package.json
├── tsconfig.json
├── bin/run
└── src/
    ├── index.ts
    ├── commands/
    │   ├── auth/
    │   └── <domain>/
    └── lib/
        ├── auth.ts
        ├── http.ts
        ├── config.ts
        ├── output.ts
        └── enums.ts
{
  "name": "<cli-name>",
  "version": "1.0.0",
  "bin": { "<cli-name>": "./bin/run" },
  "main": "dist/index.js",
  "scripts": {
    "build": "tsc -p tsconfig.json"
  },
  "dependencies": {
    "@oclif/core": "^3",
    "ws": "^8",
    "chalk": "^4",
    "cli-table3": "^0.6",
    "axios": "^1"
  },
  "devDependencies": {
    "@types/node": "^20",
    "@types/ws": "^8",
    "typescript": "^5"
  },
  "oclif": {
    "bin": "<cli-name>",
    "commands": "./dist/commands"
  }
}
#!/usr/bin/env node
require('../dist/index.js')
import * as fs from 'fs'
import * as path from 'path'
import * as os from 'os'
import * as http from 'http'
import WebSocket from 'ws'

const COOKIE_CACHE_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'auth.json')
const CONFIG_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'config.json')
const CDP_PORT = parseInt(process.env.CDP_PORT || '9222', 10)
const TARGET_DOMAIN = '<real domain read from the repository>'

export type AuthType = 'bearer' | 'cookie' | 'api-key'

export interface AuthResult {
  type: AuthType
  token?: string
  cookie?: string
  method: string
}

function loadConfig(): Record<string, unknown> {
  if (!fs.existsSync(CONFIG_PATH)) return {}
  return JSON.parse(fs.readFileSync(CONFIG_PATH, 'utf-8'))
}

export function saveConfig(data: Record<string, unknown>): void {
  fs.mkdirSync(path.dirname(CONFIG_PATH), { recursive: true })
  const current = loadConfig()
  fs.writeFileSync(CONFIG_PATH, JSON.stringify({ ...current, ...data }, null, 2))
}

async function fetchViaCdp(domain: string): Promise<string> {
  const tabs = await new Promise<any[]>((resolve, reject) => {
    http.get(`http://127.0.0.1:${CDP_PORT}/json`, res => {
      let data = ''
      res.on('data', chunk => data += chunk)
      res.on('end', () => {
        try { resolve(JSON.parse(data)) } catch (e) { reject(e) }
      })
    }).on('error', reject)
  })
  // Prefer a tab already on the target domain, otherwise fall back to any page tab.
  const tab = tabs.find(t => t.type === 'page' && t.url?.includes(domain)) || tabs.find(t => t.type === 'page')
  if (!tab?.webSocketDebuggerUrl) throw new Error('No active Chrome page found')
  return new Promise((resolve, reject) => {
    const ws = new WebSocket(tab.webSocketDebuggerUrl)
    const timer = setTimeout(() => { ws.close(); reject(new Error('CDP timed out')) }, 10000)
    // Without an error handler a failed connection would surface as an unhandled 'error' event.
    ws.on('error', err => { clearTimeout(timer); reject(err) })
    ws.on('open', () => ws.send(JSON.stringify({ id: 1, method: 'Network.getCookies', params: { urls: [`https://${domain}`] } })))
    ws.on('message', (data: Buffer) => {
      const msg = JSON.parse(data.toString())
      if (msg.id === 1) {
        clearTimeout(timer); ws.close()
        const cookies = msg.result?.cookies || []
        if (!cookies.length) reject(new Error(`No cookies found for ${domain}`))
        else resolve(cookies.map((c: any) => `${c.name}=${c.value}`).join('; '))
      }
    })
  })
}

export async function authenticate(): Promise<AuthResult> {
  const config = loadConfig()
  const strategy = (config.auth_strategy as string) || 'bearer'
  if (strategy === 'bearer') {
    const token = config.access_token as string
    if (!token) throw new Error('No token found, please run: <cli-name> auth login')
    return { type: 'bearer', token, method: 'cache' }
  }
  if (strategy === 'cookie') {
    try {
      const cookie = await fetchViaCdp(TARGET_DOMAIN)
      return { type: 'cookie', cookie, method: 'cdp' }
    } catch {}
    if (fs.existsSync(COOKIE_CACHE_PATH)) {
      const cache = JSON.parse(fs.readFileSync(COOKIE_CACHE_PATH, 'utf-8'))
      const ageMin = (Date.now() - cache.updatedAt) / 60000
      if (ageMin < 24 * 60) return { type: 'cookie', cookie: cache.value, method: 'cache' }
    }
    throw new Error('Cookie authentication failed, please run: <cli-name> auth refresh')
  }
  if (strategy === 'api-key') {
    // The key is carried in the token field; the HTTP layer decides which header to send it in.
    const apiKey = (config.api_key as string) || process.env['<CLI_NAME>_API_KEY']
    if (!apiKey) throw new Error('No API Key found')
    return { type: 'api-key', token: apiKey, method: 'config' }
  }
  throw new Error(`Unknown strategy: ${strategy}`)
}
import * as fs from 'fs'
import * as path from 'path'
import * as os from 'os'

const CONFIG_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'config.json')

export function loadConfig(): Record<string, unknown> {
  if (!fs.existsSync(CONFIG_PATH)) return {}
  return JSON.parse(fs.readFileSync(CONFIG_PATH, 'utf-8'))
}

export function saveConfig(data: Record<string, unknown>): void {
  fs.mkdirSync(path.dirname(CONFIG_PATH), { recursive: true })
  const current = loadConfig()
  fs.writeFileSync(CONFIG_PATH, JSON.stringify({ ...current, ...data }, null, 2))
}
import axios, { AxiosInstance } from 'axios'
import { authenticate } from './auth'

const BASE_URLS: Record<string, string> = {
  prod: 'https://<DOMAIN>',
  staging: 'https://<STAGING_DOMAIN>',
}

let _client: AxiosInstance | null = null
let _clientEnv: string | null = null

export async function getClient(env = 'prod'): Promise<AxiosInstance> {
  // Reuse the cached client only when it was built for the same environment.
  if (_client && _clientEnv === env) return _client
  const auth = await authenticate()
  const headers: Record<string, string> = { 'Content-Type': 'application/json' }
  if (auth.type === 'bearer' && auth.token) headers['Authorization'] = `Bearer ${auth.token}`
  else if (auth.type === 'cookie' && auth.cookie) headers['Cookie'] = auth.cookie
  // Assumption: replace X-API-Key with the header name found in the repository's request wrapper.
  else if (auth.type === 'api-key' && auth.token) headers['X-API-Key'] = auth.token
  const client = axios.create({ baseURL: BASE_URLS[env] ?? BASE_URLS.prod, timeout: 30000, headers })
  client.interceptors.response.use(
    res => {
      const code = res.data?.code
      if (code !== undefined && code !== 0 && code !== 200) throw new Error(`Business error: ${res.data?.message}`)
      return res.data
    },
    err => { if (err.response?.status === 401) throw new Error('Authentication failed'); throw err }
  )
  _client = client
  _clientEnv = env
  return client
}

export async function get(path: string, params?: any, env = 'prod') {
  const client = await getClient(env)
  return client.get(path, { params })
}

export async function post(path: string, data?: any, env = 'prod') {
  const client = await getClient(env)
  return client.post(path, data)
}
import Table from 'cli-table3'
import chalk from 'chalk'

export function printTable(data: Record<string, unknown>[], columns: { key: string; label: string; width?: number }[]): void {
  if (!data.length) { console.log(chalk.gray('No data')); return }
  // A null entry lets cli-table3 size that column automatically.
  const table = new Table({ head: columns.map(c => chalk.cyan(c.label)), colWidths: columns.map(c => c.width ?? null) })
  for (const row of data) table.push(columns.map(c => String(row[c.key] ?? '-')))
  console.log(table.toString())
}

export const printJson = (data: unknown) => console.log(JSON.stringify(data, null, 2))
export const printSuccess = (msg: string) => console.log(chalk.green(`✅ ${msg}`))
export const printError = (msg: string) => console.error(chalk.red(`❌ ${msg}`))
export function enumHelpText(map: Record<number | string, string>): string {
  return '(' + Object.entries(map).map(([k, v]) => `${k}=${v}`).join(', ') + ')'
}

export function enrichEnum<T extends Record<string, unknown>>(obj: T, field: string, map: Record<number | string, string>): T & Record<string, string> {
  const val = obj[field]
  const desc = val !== undefined && val !== null ? (map[val as number | string] ?? String(val)) : '-'
  return { ...obj, [`${field}Desc`]: desc } as T & Record<string, string>
}
import { Command, Flags } from '@oclif/core'
import { saveConfig } from '../../lib/config'
import { printSuccess } from '../../lib/output'

export default class AuthLogin extends Command {
  static description = 'Log in'
  static flags = { token: Flags.string({ description: 'Access Token', required: true }) }

  async run() {
    const { flags } = await this.parse(AuthLogin)
    saveConfig({ access_token: flags.token, auth_strategy: 'bearer' })
    printSuccess('Token saved')
  }
}
import { Command } from '@oclif/core'
import * as fs from 'fs'
import * as path from 'path'
import * as os from 'os'
import { printSuccess, printError } from '../../lib/output'

const COOKIE_CACHE_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'auth.json')
const TARGET_DOMAIN = '<real domain read from the repository>'

export default class AuthRefresh extends Command {
  static description = 'Refresh the Cookie'

  async run() {
    try {
      // CDP cookie-fetching logic goes here
      printSuccess('Cookie refreshed')
    } catch (e) {
      printError(String(e))
      this.exit(1)
    }
  }
}
cd <output-dir>/<cli-name>
pip install -e .
<cli-name> --help
<cli-name> auth show
cd <output-dir>/<cli-name>
npm install
npm run build
npm link
<cli-name> --help
<cli-name> auth show
✅ The CLI tool has been generated!
📦 Project path: <output-dir>/<cli-name>
🔧 Install commands:
cd <output-dir>/<cli-name>
pip install -e . # Python
# or
npm install && npm run build && npm link # Node.js
📋 Quick start:
<cli-name> --help
<cli-name> auth show
<cli-name> auth login
<cli-name> <domain> list --help
💡 Tips:
- If you use Cookie authentication, log in to the target system in Chrome first
- Start Chrome in debugging mode: open -a 'Google Chrome' --args --remote-debugging-port=9222
- Run <cli-name> auth refresh to fetch the Cookie
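Before running auth refresh, it can help to confirm that Chrome is actually listening on the debugging port. The sketch below queries the /json/version endpoint that Chrome exposes when started with --remote-debugging-port; the helper name cdp_available is hypothetical and not part of the generated CLI.

```python
import http.client
import json
import urllib.error
import urllib.request


def cdp_available(port: int = 9222, timeout: float = 2.0) -> bool:
    """Return True when a DevTools endpoint answers on the given local port."""
    url = f"http://127.0.0.1:{port}/json/version"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            info = json.loads(resp.read())
    except (urllib.error.URLError, http.client.HTTPException, OSError, ValueError):
        # Connection refused, timeout, or a non-JSON reply all mean "not available".
        return False
    return isinstance(info, dict) and "webSocketDebuggerUrl" in info


if __name__ == "__main__":
    if cdp_available():
        print("Chrome debugging port is reachable")
    else:
        print("Start Chrome with --remote-debugging-port=9222 first")
```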
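Check:
- auth show
- auth setup --strategy
- --env staging
Bearer Token: run <cli-name> auth login to log in again
Cookie:
- open -a 'Google Chrome' --args --remote-debugging-port=9222
- auth refresh
Make sure:
- every __init__.py file exists
- pip install -e . is run from the project root
Check:
- whether BASE_URLS in http_client.py is correct
- whether paths start with /
Make sure:
- Chrome was started with --remote-debugging-port=9222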