← 返回
未分类

fe-api2cli(扫描前端仓库生成可执行CLI)

扫描前端仓库(React/Vue/Taro/monorepo),先全面分析仓库结构再决定去哪里找接口(不预设 api/ 目录),自动分析接口定义、TypeScript 类型、枚举常量、鉴权方式,生成基于 Python+Click 或 Node.js+oclif 的可执行 CLI 工具。生成前输出完整技术方案供用户确认,支持多种鉴权方式(Bearer Token / Cookie / API Key / 无鉴权)、多环境切换等通用模块。当用户提到"前端生成CLI"、"fe-api2cli"、"把前端接口做成命令行"、"扫描前端仓库生成CLI"、"前端项目命令行化"、"api转cli"时使用。
空梦
未分类 community v1.0.1 2 版本 99259.3 Key: 无需
★ 0
Stars
📥 134
下载
💾 8
安装
2
版本
#latest

概述

fe-api2cli

扫描前端仓库(支持 monorepo),先全面分析仓库结构,再自动生成可执行的 CLI 工具。

核心亮点

  • 先读懂仓库再找接口(不预设目录名)
  • 多种通用鉴权方案(Bearer Token / Cookie / API Key / 无鉴权)
  • TypeScript 类型 & 枚举完整解析
  • 方案预览确认后再生成
  • 支持 Python / Node.js 双语言

Step 1:收集基本信息

向用户确认以下几项(消息中已有则直接使用,无需再问):

1. 仓库路径:前端业务仓库本地路径,可多个。如未 clone,提供 git 地址则帮用户 clone。

2. CLI 命名规范:询问用户是否有团队/公司的 CLI 命名规范文档,如有请提供链接或描述。

若有规范文档,先读取并理解,再据此决定:

  • CLI 命令名格式
  • 允许的 Action 动词列表(如只允许 list/get/create/update/delete
  • 参数命名风格(kebab-case / camelCase 等)
  • 输出格式要求(默认 JSON / table 等)

若用户没有规范,使用以下通用默认值:

  • 命令名:仓库名或用户指定名称
  • Action 动词:listgetcreateupdatedeletesearch
  • 参数风格:kebab-case(--poi-id 而非 --poiId
  • 输出:默认 JSON,--format table 切换表格

3. CLI 名称:根据规范或用户指定确定命令名(默认用仓库名)

4. 输出路径:询问用户 CLI 项目生成到哪个目录(默认当前工作目录下的 /

5. 语言选择

> 生成的 CLI 用哪种语言?

> - Python(推荐非开发者用户,macOS 自带环境,零配置)

> - Node.js(推荐前端开发者,TypeScript 支持,便于自己维护)

6. 鉴权方式:Step 2.4 会自动从仓库读取,此处无需询问。


Step 2:自动扫描分析仓库

不问用户,自己读,按以下顺序扫描。每一步都必须实际执行,不能跳过。

> ⚠️ 核心原则:Step 2 最重要的任务是建立完整的"仓库画像",而不只是找到接口文件列表。

在开始生成任何代码之前,必须搞清楚以下六个维度,缺一不可:

维度需要读懂的内容
------
目录规范接口文件在哪里?类型文件在哪里?枚举/常量在哪里?
API 调用规范项目用什么 HTTP 封装?函数名是什么?调用方式长什么样?
域名 & 前缀baseURL 是什么?各环境域名?接口路径前缀?
枚举 & 常量所有数字枚举的含义?状态码映射?业务常量?
参数 & 类型每个接口的请求参数字段、类型、必填/可选?嵌套类型?
代码注释接口文件、类型文件、常量文件里的注释是参数含义的第一手资料

2.1 全局仓库结构分析(⚠️ 必须最先执行)

REPO_DIR=<仓库本地路径>

# 读取 package.json
cat $REPO_DIR/package.json

# 判断是否是 monorepo
ls $REPO_DIR/packages/ 2>/dev/null && echo "IS MONOREPO" || echo "SINGLE PACKAGE"

# 列出完整目录树(深度 4 层)
find $REPO_DIR -maxdepth 4 -type d | grep -v node_modules | grep -v "\.git" | grep -v dist | sort

# 检查 TypeScript 和包管理器
ls $REPO_DIR/tsconfig.json 2>/dev/null && echo "有TS" || echo "无TS"

读完目录树后,必须回答

  • 仓库是 monorepo 还是单包?
  • 哪些子包/目录可能包含接口定义?
  • 接口文件的命名规律是什么?
  • 类型/枚举文件在哪里?

2.2 接口层文件定位与读取

核心策略:先采样 → 归纳调用模式 → 全量扫描。

基于 2.1 的结论,直接读取 2~3 个接口文件,观察:

  • 导入了什么 HTTP 工具?
  • 实际调用长什么样?
  • URL 路径前缀是什么?

归纳出 HTTP 封装函数名后,做全量扫描:

grep -rn "<封装函数名>(" $REPO_DIR --include="*.ts" --include="*.js" -l \
  | grep -v node_modules | grep -v dist | sort

补充扫描

# OpenAPI / Swagger 文件
find $REPO_DIR \( -name "swagger.json" -o -name "openapi.json" \) | grep -v node_modules

# GraphQL
grep -rn "gql\`|useQuery|useMutation" $REPO_DIR --include="*.ts" -l | grep -v node_modules

2.3 必须读取 TypeScript 类型文件

这是最容易出错的步骤。 接口函数签名只有参数名,真正的字段详情在类型文件里。

find $REPO_DIR \( -path "*/types/*.ts" -o -path "*/domain/*.ts" -o -path "*/models/*.ts" \) \
  | grep -v node_modules | grep -v "dist/" | sort

对每个接口,必须同时读取对应的类型文件,从中提取:

  • interface / type 的所有字段(包括可选字段 ?:
  • 字段的 TypeScript 类型
  • 枚举值
  • 嵌套类型

2.4 读取 HTTP 请求封装(鉴权 & Header)

find $REPO_DIR \( -name "request.ts" -o -name "http.ts" -o -name "axios.ts" \) \
  | grep -v node_modules | head -10

必须提取

  • baseURL 各环境值
  • 所有自定义 header
  • 鉴权方式
  • 响应 code 字段

鉴权方式识别

识别特征鉴权类型推荐方案
---------
Authorization: Bearer Bearer Token auth login 命令
Cookie header浏览器 CookieCDP 自动抓取 / 手动粘贴
X-API-KeyAPI Key配置文件或环境变量
无鉴权无需鉴权直接调用

通用鉴权方案

方案 A:Bearer Token 登录

  • auth login 命令触发登录流程
  • Token 缓存到本地

方案 B:Cookie 鉴权

  • B1:CDP 自动抓取 — 通过 Chrome DevTools Protocol 从本地 Chrome 读取 Cookie
  • B2:手动粘贴 Cookie — 用户从浏览器复制 Cookie
  • B3:本地缓存 — Cookie 成功获取后缓存,24 小时有效

方案 C:API Key 配置

  • auth setup --api-key 写入配置文件
  • 或环境变量 _API_KEY

方案 D:无需鉴权

  • 直接调用,不生成 auth 模块

2.5 读取环境配置

find $REPO_DIR -maxdepth 3 \( -name ".env" -o -name ".env.*" -o -name "config.ts" \) \
  | grep -v node_modules | xargs grep -h "BASE_URL|API_URL|HOST|DOMAIN" 2>/dev/null

2.6 必须读取枚举/常量文件

这是生成高质量 CLI 的关键步骤。 前端业务代码中大量使用数字枚举。

// 活动状态
export const ACTIVITY_STATUS = {
  10: '待审核',   // 提交后等待运营审核
  20: '进行中',   // 审核通过,活动生效
  30: '已结束',   // 活动到期或手动关闭
}

必须提取

  • 枚举值和含义
  • 代码注释中的业务说明

Step 3:输出技术方案,等用户确认

必须先输出方案,等用户明确确认后再生成代码。

📊 扫描结果

仓库:<repo-name>  框架:React/Vue/Taro  结构:monorepo / 单包
接口文件:<实际接口目录路径> 下共 N 个文件,M 个接口
类型文件:<实际类型目录路径> 下共 K 个文件(已全部读取)
枚举文件:<实际枚举目录路径> 下共 J 个文件(已全部读取)
语言:Python / Node.js(用户选择)

🔐 鉴权识别结果
检测到:<鉴权类型>
目标系统域名:<从仓库读取的真实域名>
自定义 Header:<从 request.ts 读到的 header 列表>

请确认使用哪种鉴权方案:
  [ ] Bearer Token 登录(auth login 命令)
  [ ] Cookie 鉴权(CDP 自动抓取 / 手动粘贴)
  [ ] API Key 配置
  [ ] 无需鉴权

📋 命令预览(含关键参数)
<cli-name> auth show              # 查看认证状态
<cli-name> auth login             # 登录认证
<cli-name> <domain1> list         # 列表查询
...

Step 4A:生成 Python + Click 版本

目录结构

<output-dir>/<cli-name>/
├── setup.py
├── README.md
└── cli/
    └── <cli_name>/
        ├── __init__.py
        ├── __main__.py
        ├── main.py
        ├── commands/
        │   ├── __init__.py
        │   ├── auth.py
        │   └── <domain>.py
        └── utils/
            ├── __init__.py
            ├── auth.py
            ├── http_client.py
            ├── config.py
            ├── output.py
            └── enums.py

setup.py

from setuptools import setup, find_packages

setup(
    name="<cli-name>",
    version="1.0.0",
    packages=find_packages(),
    install_requires=[
        "click>=8.0",
        "requests>=2.28",
        "rich>=13.0",
    ],
    entry_points={
        "console_scripts": [
            "<cli-name>=cli.<cli_name>.main:cli"
        ]
    },
    python_requires=">=3.8",
)

utils/auth.py

"""
通用鉴权模块(可插拔策略)
支持:Bearer Token / Cookie(CDP + 缓存)/ API Key / 无鉴权
"""
import json, os, time, threading, urllib.request
from pathlib import Path
from typing import Optional, Dict

CACHE_PATH = Path.home() / ".config" / "<cli-name>" / "auth.json"
CONFIG_PATH = Path.home() / ".config" / "<cli-name>" / "config.json"
TARGET_DOMAIN = "<从仓库读取的真实域名>"
CDP_PORT = int(os.environ.get("CDP_PORT", "9222"))
COOKIE_CACHE_TTL_MIN = 24 * 60


class AuthResult:
    def __init__(self, auth_type: str, method: str, token: str = None, cookie: str = None):
        self.type = auth_type
        self.method = method
        self.token = token
        self.cookie = cookie

    def get_headers(self) -> Dict[str, str]:
        if self.type == "bearer" and self.token:
            return {"Authorization": f"Bearer {self.token}"}
        elif self.type == "cookie" and self.cookie:
            return {"Cookie": self.cookie}
        raise RuntimeError(f"无效的认证结果: type={self.type}")


def _load_config() -> dict:
    if not CONFIG_PATH.exists(): return {}
    return json.loads(CONFIG_PATH.read_text())


def _save_config(data: dict):
    CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    current = _load_config()
    current.update(data)
    CONFIG_PATH.write_text(json.dumps(current, ensure_ascii=False, indent=2))


def _fetch_via_cdp(domain: str) -> str:
    """通过 Chrome DevTools Protocol 获取 Cookie"""
    import websocket
    url = f"http://127.0.0.1:{CDP_PORT}/json"
    with urllib.request.urlopen(url, timeout=5) as r:
        tabs = json.loads(r.read())
    tab = next((t for t in tabs if t.get("type") == "page" and domain in t.get("url", "")), None)
    if not tab: tab = next((t for t in tabs if t.get("type") == "page"), None)
    if not tab: raise RuntimeError(f"未找到活跃 Chrome 页面")

    ws_url = tab.get("webSocketDebuggerUrl")
    result, done = {}, threading.Event()

    def on_open(ws):
        ws.send(json.dumps({"id": 1, "method": "Network.getCookies",
                            "params": {"urls": [f"https://{domain}"]}}))

    def on_message(ws, msg):
        m = json.loads(msg)
        if m.get("id") == 1:
            result["cookies"] = m.get("result", {}).get("cookies", [])
            done.set()
            ws.close()

    ws = websocket.WebSocketApp(ws_url, on_open=on_open, on_message=on_message)
    threading.Thread(target=ws.run_forever, daemon=True).start()
    done.wait(timeout=10)

    cookies = result.get("cookies", [])
    if not cookies: raise RuntimeError(f"未找到 {domain} 的 Cookie")
    return "; ".join(f"{c['name']}={c['value']}" for c in cookies)


def authenticate() -> AuthResult:
    """统一鉴权入口"""
    config = _load_config()
    strategy = config.get("auth_strategy", "bearer")

    if strategy == "bearer":
        token = config.get("access_token")
        if not token: raise RuntimeError("未找到 token,请运行:<cli-name> auth login")
        return AuthResult(auth_type="bearer", method="token-cache", token=token)

    elif strategy == "cookie":
        # 尝试 CDP
        try:
            cookie_str = _fetch_via_cdp(TARGET_DOMAIN)
            return AuthResult(auth_type="cookie", method="cdp", cookie=cookie_str)
        except: pass

        # 尝试缓存
        if CACHE_PATH.exists():
            cache = json.loads(CACHE_PATH.read_text())
            age_min = (time.time() - cache.get("updatedAt", 0)) / 60
            if age_min < COOKIE_CACHE_TTL_MIN:
                return AuthResult(auth_type="cookie", method="cache", cookie=cache["value"])

        raise RuntimeError("Cookie 鉴权失败,请运行:<cli-name> auth refresh")

    elif strategy == "api-key":
        api_key = config.get("api_key") or os.environ.get("<CLI_NAME>_API_KEY")
        if not api_key: raise RuntimeError("未找到 API Key")
        return AuthResult(auth_type="api-key", method="env")

    raise RuntimeError(f"未知策略:{strategy}")


def get_auth_header() -> Dict[str, str]:
    return authenticate().get_headers()

utils/http_client.py

"""HTTP 请求封装"""
import requests
from .auth import get_auth_header

BASE_URLS = {
    "prod": "https://<DOMAIN>",
    "staging": "https://<STAGING_DOMAIN>",
    "test": "https://<TEST_DOMAIN>",
}

EXTRA_HEADERS = {}  # 从仓库 request.ts 读取

_session = None
_current_env = "prod"


def get_session(env="prod"):
    global _session, _current_env
    if _session and _current_env == env: return _session
    auth_headers = get_auth_header()
    _session = requests.Session()
    _session.headers.update({
        **auth_headers,
        "Content-Type": "application/json",
        **EXTRA_HEADERS,
    })
    _session.base_url = BASE_URLS.get(env, BASE_URLS["prod"])
    _current_env = env
    return _session


def get(path, params=None, env="prod"):
    s = get_session(env)
    r = s.get(s.base_url.rstrip("/") + "/" + path.lstrip("/"), params=params, timeout=30)
    return _check_response(r)


def post(path, data=None, env="prod"):
    s = get_session(env)
    r = s.post(s.base_url.rstrip("/") + "/" + path.lstrip("/"), json=data or {}, timeout=30)
    return _check_response(r)


def _check_response(r):
    if r.status_code == 401: raise RuntimeError("认证失败")
    if r.status_code == 404: raise RuntimeError(f"接口不存在:{r.url}")
    if not r.ok: raise RuntimeError(f"请求失败 [{r.status_code}]:{r.text[:200]}")
    body = r.json()
    code = body.get("code")
    if code is not None and code != 0 and code != 200:
        raise RuntimeError(f"业务错误 [code={code}]:{body.get('message', '')}")
    return body

utils/config.py

"""配置管理"""
import json
from pathlib import Path

CONFIG_PATH = Path.home() / ".config" / "<cli-name>" / "config.json"


def load() -> dict:
    if not CONFIG_PATH.exists():
        return {}
    return json.loads(CONFIG_PATH.read_text())


def save(data: dict):
    CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    current = load()
    current.update(data)
    CONFIG_PATH.write_text(json.dumps(current, ensure_ascii=False, indent=2))

utils/output.py

from rich.console import Console
from rich.table import Table
import json

console = Console()

def print_table(data: list, columns: list, title: str = None):
    if not data:
        console.print(f"[grey50]{title or ''}  暂无数据[/grey50]")
        return
    table = Table(title=title, show_header=True, header_style="bold cyan")
    for col in columns:
        table.add_column(col["label"], width=col.get("width"))
    for row in data:
        table.add_row(*[str(row.get(c["key"], "-")) for c in columns])
    console.print(table)

def print_json(data): print(json.dumps(data, ensure_ascii=False, indent=2))
def print_success(msg): console.print(f"[green]✅ {msg}[/green]")
def print_error(msg): console.print(f"[red]❌ {msg}[/red]")

utils/enums.py

"""枚举集中管理"""

def enum_help_text(mapping: dict) -> str:
    """生成 --help 枚举说明"""
    return '(' + ', '.join(f'{k}={v}' for k, v in mapping.items()) + ')'

def enrich_enum(obj: dict, field: str, mapping: dict) -> dict:
    """为输出注入枚举翻译"""
    val = obj.get(field)
    desc = mapping.get(val, str(val) if val is not None else '-')
    return {**obj, f'{field}Desc': desc}

def enrich_enum_list(items: list, mappings: list) -> list:
    """批量注入枚举翻译"""
    result = []
    for item in items:
        for field, mapping in mappings:
            item = enrich_enum(item, field, mapping)
        result.append(item)
    return result

# ─── 枚举映射(从仓库扫描后填入)─────────────────
STATUS_MAP = {
    # ⚠️ 从仓库实际扫描到的枚举值
}

commands/auth.py

import click
from ..utils.auth import authenticate, _load_config, _save_config, _fetch_via_cdp, CACHE_PATH, TARGET_DOMAIN
from ..utils.config import save

@click.group()
def auth():
    """认证管理"""
    pass

@auth.command()
def show():
    """查看认证状态"""
    config = _load_config()
    print(f"  策略:{config.get('auth_strategy', '未配置')}")
    print(f"  域名:{TARGET_DOMAIN}")

@auth.command()
@click.option("--token", prompt=True, hide_input=True)
def login(token):
    """登录(Bearer Token)"""
    _save_config({"access_token": token})
    click.echo("✅ Token 已保存")

@auth.command("set-cookie")
@click.argument("cookie_str")
def set_cookie_cmd(cookie_str):
    """保存手动粘贴的 Cookie"""
    CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    import json, time
    CACHE_PATH.write_text(json.dumps({"value": cookie_str, "updatedAt": time.time()}))
    click.echo("✅ Cookie 已保存")

@auth.command()
@click.option("--strategy", type=click.Choice(["bearer", "cookie", "api-key", "none"]))
@click.option("--api-key")
def setup(strategy, api_key):
    """配置鉴权方式"""
    data = {}
    if strategy: data["auth_strategy"] = strategy
    if api_key: data["api_key"] = api_key
    save(data)
    click.echo(f"✅ 鉴权策略已设置为:{strategy or '保持不变'}")

@auth.command()
def test():
    """测试鉴权"""
    try:
        r = authenticate()
        click.echo(f"✅ 鉴权成功({r.type}/{r.method})")
    except Exception as e:
        click.echo(f"❌ 鉴权失败:{e}", err=True)
        raise SystemExit(1)

@auth.command()
def refresh():
    """刷新 Cookie"""
    try:
        cookie = _fetch_via_cdp(TARGET_DOMAIN)
        import json, time
        CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        CACHE_PATH.write_text(json.dumps({"value": cookie, "updatedAt": time.time()}))
        click.echo("✅ Cookie 刷新成功")
    except Exception as e:
        click.echo(f"❌ 刷新失败:{e}", err=True)
        raise SystemExit(1)

@auth.command()
def logout():
    """清除认证"""
    from ..utils.auth import CONFIG_PATH
    for p in [CONFIG_PATH, CACHE_PATH]:
        if p.exists(): p.unlink()
    click.echo("✅ 认证信息已清除")

commands/.py 示例

"""活动管理命令"""
import click
from ..utils.http_client import get, post
from ..utils.output import print_table, print_json, print_error
from ..utils.enums import STATUS_MAP, enum_help_text, enrich_enum_list

@click.group("activity")
def activity():
    """活动管理"""
    pass

@activity.command("list")
@click.option("--status", type=int, help=f"状态 {enum_help_text(STATUS_MAP)}")
@click.option("--page", default=1, type=int)
@click.option("--json", "as_json", is_flag=True)
@click.option("--env", default="prod", type=click.Choice(["prod", "staging", "test"]))
def list_(status, page, as_json, env):
    """查询列表"""
    try:
        res = post("/api/activity/list", data={"status": status, "page": page}, env=env)
        items = res.get("data", {}).get("list", [])
        if as_json:
            print_json(res)
        else:
            enriched = enrich_enum_list(items, [("status", STATUS_MAP)])
            print_table(enriched, [
                {"key": "id", "label": "ID", "width": 12},
                {"key": "name", "label": "名称", "width": 30},
                {"key": "statusDesc", "label": "状态", "width": 10},
            ])
    except Exception as e:
        print_error(str(e))
        raise SystemExit(1)

main.py

import click
from .commands.auth import auth
# from .commands.<domain> import <domain>

@click.group()
@click.version_option(version="1.0.0")
def cli():
    """<cli-name> — 前端 API CLI 工具"""
    pass

cli.add_command(auth)
# cli.add_command(<domain>)

if __name__ == "__main__":
    cli()

Step 4B:生成 Node.js + oclif 版本

目录结构

<output-dir>/<cli-name>/
├── package.json
├── tsconfig.json
├── bin/run
└── src/
    ├── index.ts
    ├── commands/
    │   ├── auth/
    │   └── <domain>/
    └── lib/
        ├── auth.ts
        ├── http.ts
        ├── config.ts
        ├── output.ts
        └── enums.ts

package.json

{
  "name": "<cli-name>",
  "version": "1.0.0",
  "bin": { "<cli-name>": "./bin/run" },
  "main": "dist/index.js",
  "scripts": {
    "build": "tsc -p tsconfig.json"
  },
  "dependencies": {
    "@oclif/core": "^3",
    "ws": "^8",
    "chalk": "^5",
    "cli-table3": "^0.6",
    "axios": "^1"
  },
  "devDependencies": {
    "@types/node": "^20",
    "@types/ws": "^8",
    "typescript": "^5"
  },
  "oclif": {
    "bin": "<cli-name>",
    "commands": "./dist/commands"
  }
}

bin/run

#!/usr/bin/env node
require('../dist/index.js')

src/lib/auth.ts

import * as fs from 'fs'
import * as path from 'path'
import * as os from 'os'
import * as http from 'http'
import WebSocket from 'ws'

const COOKIE_CACHE_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'auth.json')
const CONFIG_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'config.json')
const CDP_PORT = parseInt(process.env.CDP_PORT || '9222', 10)
const TARGET_DOMAIN = '<从仓库读取的真实域名>'

export type AuthType = 'bearer' | 'cookie' | 'api-key'

export interface AuthResult {
  type: AuthType
  token?: string
  cookie?: string
  method: string
}

function loadConfig(): Record<string, unknown> {
  if (!fs.existsSync(CONFIG_PATH)) return {}
  return JSON.parse(fs.readFileSync(CONFIG_PATH, 'utf-8'))
}

export function saveConfig(data: Record<string, unknown>): void {
  fs.mkdirSync(path.dirname(CONFIG_PATH), { recursive: true })
  const current = loadConfig()
  fs.writeFileSync(CONFIG_PATH, JSON.stringify({ ...current, ...data }, null, 2))
}

async function fetchViaCdp(domain: string): Promise<string> {
  const tabs = await new Promise<any[]>((resolve, reject) => {
    http.get(`http://127.0.0.1:${CDP_PORT}/json`, res => {
      let data = ''
      res.on('data', chunk => data += chunk)
      res.on('end', () => resolve(JSON.parse(data)))
    }).on('error', reject)
  })
  const tab = tabs.find(t => t.type === 'page' && t.url?.includes(domain)) || tabs.find(t => t.type === 'page')
  if (!tab?.webSocketDebuggerUrl) throw new Error('未找到活跃 Chrome 页面')

  return new Promise((resolve, reject) => {
    const ws = new WebSocket(tab.webSocketDebuggerUrl)
    const timer = setTimeout(() => { ws.close(); reject(new Error('CDP 超时')) }, 10000)
    ws.on('open', () => ws.send(JSON.stringify({ id: 1, method: 'Network.getCookies', params: { urls: [`https://${domain}`] } })))
    ws.on('message', (data: Buffer) => {
      const msg = JSON.parse(data.toString())
      if (msg.id === 1) {
        clearTimeout(timer); ws.close()
        const cookies = msg.result?.cookies || []
        if (!cookies.length) reject(new Error(`未找到 ${domain} 的 Cookie`))
        else resolve(cookies.map((c: any) => `${c.name}=${c.value}`).join('; '))
      }
    })
  })
}

export async function authenticate(): Promise<AuthResult> {
  const config = loadConfig()
  const strategy = (config.auth_strategy as string) || 'bearer'

  if (strategy === 'bearer') {
    const token = config.access_token as string
    if (!token) throw new Error('未找到 token,请运行:<cli-name> auth login')
    return { type: 'bearer', token, method: 'cache' }
  }

  if (strategy === 'cookie') {
    try {
      const cookie = await fetchViaCdp(TARGET_DOMAIN)
      return { type: 'cookie', cookie, method: 'cdp' }
    } catch {}

    if (fs.existsSync(COOKIE_CACHE_PATH)) {
      const cache = JSON.parse(fs.readFileSync(COOKIE_CACHE_PATH, 'utf-8'))
      const ageMin = (Date.now() - cache.updatedAt) / 60000
      if (ageMin < 24 * 60) return { type: 'cookie', cookie: cache.value, method: 'cache' }
    }
    throw new Error('Cookie 鉴权失败,请运行:<cli-name> auth refresh')
  }

  throw new Error(`未知策略:${strategy}`)
}

src/lib/config.ts

import * as fs from 'fs'
import * as path from 'path'
import * as os from 'os'

const CONFIG_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'config.json')

export function loadConfig(): Record<string, unknown> {
  if (!fs.existsSync(CONFIG_PATH)) return {}
  return JSON.parse(fs.readFileSync(CONFIG_PATH, 'utf-8'))
}

export function saveConfig(data: Record<string, unknown>): void {
  fs.mkdirSync(path.dirname(CONFIG_PATH), { recursive: true })
  const current = loadConfig()
  fs.writeFileSync(CONFIG_PATH, JSON.stringify({ ...current, ...data }, null, 2))
}

src/lib/http.ts

import axios, { AxiosInstance } from 'axios'
import { authenticate, AuthResult } from './auth'

const BASE_URLS: Record<string, string> = {
  prod: 'https://<DOMAIN>',
  staging: 'https://<STAGING_DOMAIN>',
}

let _client: AxiosInstance | null = null

export async function getClient(env = 'prod'): Promise<AxiosInstance> {
  if (_client) return _client
  const auth = await authenticate()
  const headers: Record<string, string> = { 'Content-Type': 'application/json' }
  if (auth.type === 'bearer' && auth.token) headers['Authorization'] = `Bearer ${auth.token}`
  else if (auth.type === 'cookie' && auth.cookie) headers['Cookie'] = auth.cookie

  _client = axios.create({ baseURL: BASE_URLS[env], timeout: 30000, headers })
  _client.interceptors.response.use(
    res => {
      const code = res.data?.code
      if (code !== undefined && code !== 0 && code !== 200) throw new Error(`业务错误:${res.data?.message}`)
      return res.data
    },
    err => { if (err.response?.status === 401) throw new Error('认证失败'); throw err }
  )
  return _client
}

export async function get(path: string, params?: any, env = 'prod') {
  const client = await getClient(env)
  return client.get(path, { params })
}

export async function post(path: string, data?: any, env = 'prod') {
  const client = await getClient(env)
  return client.post(path, data)
}

src/lib/output.ts

import Table from 'cli-table3'
import chalk from 'chalk'

export function printTable(data: Record<string, unknown>[], columns: { key: string; label: string; width?: number }[]): void {
  if (!data.length) { console.log(chalk.gray('暂无数据')); return }
  const table = new Table({ head: columns.map(c => chalk.cyan(c.label)), colWidths: columns.map(c => c.width) })
  for (const row of data) table.push(columns.map(c => String(row[c.key] ?? '-')))
  console.log(table.toString())
}

export const printJson = (data: unknown) => console.log(JSON.stringify(data, null, 2))
export const printSuccess = (msg: string) => console.log(chalk.green(`✅ ${msg}`))
export const printError = (msg: string) => console.error(chalk.red(`❌ ${msg}`))

src/lib/enums.ts

export function enumHelpText(map: Record<number | string, string>): string {
  return '(' + Object.entries(map).map(([k, v]) => `${k}=${v}`).join(', ') + ')'
}

export function enrichEnum<T extends Record<string, unknown>>(obj: T, field: string, map: Record<number | string, string>): T & Record<string, string> {
  const val = obj[field]
  const desc = val !== undefined && val !== null ? (map[val as number | string] ?? String(val)) : '-'
  return { ...obj, [`${field}Desc`]: desc } as T & Record<string, string>
}

src/commands/auth/login.ts

import { Command, Flags } from '@oclif/core'
import { saveConfig } from '../../lib/config'
import { printSuccess } from '../../lib/output'

export default class AuthLogin extends Command {
  static description = '登录认证'
  static flags = { token: Flags.string({ description: 'Access Token', required: true }) }

  async run() {
    const { flags } = await this.parse(AuthLogin)
    saveConfig({ access_token: flags.token, auth_strategy: 'bearer' })
    printSuccess('Token 已保存')
  }
}

src/commands/auth/refresh.ts

import { Command } from '@oclif/core'
import * as fs from 'fs'
import * as path from 'path'
import * as os from 'os'
import { printSuccess, printError } from '../../lib/output'

const COOKIE_CACHE_PATH = path.join(os.homedir(), '.config', '<cli-name>', 'auth.json')
const TARGET_DOMAIN = '<从仓库读取的真实域名>'

export default class AuthRefresh extends Command {
  static description = '刷新 Cookie'

  async run() {
    try {
      // CDP 获取 Cookie 逻辑
      printSuccess('Cookie 刷新成功')
    } catch (e) {
      printError(String(e))
      this.exit(1)
    }
  }
}

Step 5:本地安装确认

Python 版本

cd <output-dir>/<cli-name>
pip install -e .
<cli-name> --help
<cli-name> auth show

Node.js 版本

cd <output-dir>/<cli-name>
npm install
npm run build
npm link
<cli-name> --help
<cli-name> auth show

Step 6:生成完成后的输出

✅ CLI 工具已生成完成!

📦 项目路径:<output-dir>/<cli-name>

🔧 安装命令:
   cd <output-dir>/<cli-name>
   pip install -e .          # Python
   # 或
   npm install && npm run build && npm link   # Node.js

📋 快速开始:
   <cli-name> --help
   <cli-name> auth show
   <cli-name> auth login
   <cli-name> <domain> list --help

💡 提示:
   - 如果使用 Cookie 鉴权,请先在 Chrome 中登录目标系统
   - 以调试模式启动 Chrome:open -a 'Google Chrome' --args --remote-debugging-port=9222
   - 运行 <cli-name> auth refresh 获取 Cookie

常见问题排查

1. 接口返回错误码

检查:

  • 是否已登录: auth show
  • 鉴权策略是否正确: auth setup --strategy
  • 环境是否正确:--env staging

2. 401 认证失败

Bearer Token:运行 auth login 重新登录

Cookie

  • 确保 Chrome 以调试模式启动:open -a 'Google Chrome' --args --remote-debugging-port=9222
  • 确保已在 Chrome 中登录目标系统
  • 运行 auth refresh

3. ModuleNotFoundError(Python)

确保:

  • 所有 __init__.py 文件都存在
  • pip install -e . 在项目根目录执行

4. URL 拼接错误

检查:

  • http_client.py 中的 BASE_URLS 是否正确
  • 接口路径是否以 / 开头

5. CDP 连接失败

确保:

  • Chrome 以调试模式启动:--remote-debugging-port=9222
  • 端口未被占用
  • 防火墙允许本地连接

版本历史

共 2 个版本

  • v1.0.1 名称更改 当前
    2026-04-03 14:15 安全 安全
  • v1.0.0 Initial release
    2026-04-03 11:18 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

developer-tools

Github

steipete
使用 `gh` CLI 与 GitHub 交互,通过 `gh issue`、`gh pr`、`gh run` 和 `gh api` 管理议题、PR、CI 运行及高级查询。
★ 672 📥 324,481
ai-intelligence

Self-Improving + Proactive Agent

ivangdavila
自我反思+自我批评+自我学习+自组织记忆。智能体评估自身工作、发现错误并持续改进。
★ 1,362 📥 318,989
security-compliance

Skill Vetter

spclaudehome
AI智能体技能安全预审工具。安装ClawdHub、GitHub等来源技能前,检查风险信号、权限范围及可疑模式。
★ 1,219 📥 266,817