← 返回
未分类

智能工作流

Captain
未分类 community v1.0.0 1 版本 98437.5 Key: 无需
★ 0
Stars
📥 63
下载
💾 0
安装
1
版本
#latest

概述

Agent Workflow Orchestrator v2.0

AI Agent 原生工作流编排器


定位对比

特性Airflow/DataWorks虾评工作流引擎本技能
---------------------------------------------------
定位企业级调度平台通用任务编排AI Agent 工作流 + 数据处理
数据处理依赖Operator内置8种数据节点
执行模式串行/后台串行串行/并行/条件路由
AI能力自然语言生成DAG
技能调用支持OpenClaw技能
安全验证基础基础SQL白名单+路径校验+资源限制
部署需安装/配置云端零依赖+单文件运行
适用场景大规模ETL日常自动化数据处理流水线+AI Agent编排

独特价值

用户需求 → 自然语言 → AI生成DAG → 串行/并行/条件执行 → 技能调用 → 结果输出
                      ↓
              数据清洗 → 数据转换 → 数据聚合 → 可视化
                              ↓
                        [调用数据分析技能]

快速开始

1. 执行工作流

# 从JSON文件执行
python workflow_cli.py run workflow.json

# 并行模式执行
python workflow_cli.py run workflow.json --mode parallel

# 仅验证
python workflow_cli.py run workflow.json --validate-only

# 模拟运行
python workflow_cli.py run workflow.json --dry-run

2. 从自然语言生成

python workflow_cli.py generate \
  "从sales.csv读取数据,过滤status=completed,按category聚合求和,输出到summary.csv"

3. 生成可视化

python workflow_cli.py visualize workflow.json -o workflow.html

DAG 定义格式

{
  "workflow_id": "my_workflow",
  "name": "我的工作流",
  "version": "2.0.0",
  "execution_mode": "auto",
  "on_failure": "stop",
  
  "nodes": [
    {
      "id": "node_1",
      "type": "DataIn",
      "name": "读取数据",
      "config": {
        "path": "data/sales.csv",
        "format": "csv"
      }
    },
    {
      "id": "node_2",
      "type": "Filter",
      "name": "过滤数据",
      "depends_on": ["node_1"],
      "config": {
        "conditions": [
          {"field": "status", "operator": "==", "value": "completed"}
        ],
        "logic": "and"
      }
    },
    {
      "id": "node_3",
      "type": "DataOut",
      "name": "输出数据",
      "depends_on": ["node_2"],
      "config": {
        "path": "output/summary.csv"
      }
    }
  ],
  
  "edges": [
    {"from": "node_1", "to": "node_2"},
    {"from": "node_2", "to": "node_3"}
  ]
}

节点类型

类型说明配置示例
----------------------
DataIn数据输入{"path": "data.csv", "format": "csv"}
DataOut数据输出{"path": "output.csv", "format": "csv"}
Filter数据过滤{"conditions": [{"field": "status", "operator": "==", "value": "active"}]}
Join数据关联{"left_key": "id", "right_key": "id", "join_type": "left"}
Aggregate数据聚合{"group_by": ["category"], "aggregations": [{"func": "sum", "field": "amount"}]}
PythonPython脚本{"code": "output_data = input_data * 2"}
Branch条件分支{"condition": "result > 0", "true_branch": "node_a", "false_branch": "node_b"}
CLI外部命令{"command": "echo hello"}
Skill技能调用{"skill": "smart-data-analyzer"}

执行模式

1. 串行流水线 (sequential)

按拓扑排序顺序依次执行每个节点。

{
  "execution_mode": "sequential"
}

2. 并行协作 (parallel)

无依赖的节点并发执行。

{
  "execution_mode": "parallel"
}

3. 自动模式 (auto)

根据DAG结构自动选择最优执行方式:

  • 存在条件分支 → 条件路由
  • 存在并行机会 → 并行执行
  • 简单链式 → 串行执行
{
  "execution_mode": "auto"
}

条件路由

支持 if-else 分支,根据上游节点输出决定执行路径。

{
  "nodes": [
    {"id": "check", "type": "DataIn", "name": "数据检查"},
    {"id": "branch", "type": "Branch", "depends_on": ["check"]},
    {"id": "process_valid", "type": "Filter"},
    {"id": "process_invalid", "type": "Filter"}
  ],
  "edges": [
    {"from": "check", "to": "branch"},
    {"from": "branch", "to": "process_valid", "condition": true},
    {"from": "branch", "to": "process_invalid", "condition": false}
  ]
}

技能调用

支持调用 OpenClaw 平台的技能。

{
  "id": "analyze",
  "type": "Skill",
  "name": "数据分析",
  "skill": "smart-data-analyzer",
  "config": {
    "query": "SELECT * FROM sales"
  }
}

支持的技能

技能名说明
--------------
smart-data-analyzerDuckDB数据分析
echarts-vizECharts可视化
chartjs-reporterChart.js报表
excel-workbenchExcel工作台
duckdb-data-analysisDuckDB分析
mysql-analyticsMySQL分析
postgres-analyticsPostgreSQL分析
data-visualization数据可视化

安全机制

1. SQL白名单

# 允许的关键字
ALLOWED_KEYWORDS = {'SELECT', 'INSERT', 'UPDATE', 'CREATE', 'ALTER'}

# 禁止的关键字
FORBIDDEN_KEYWORDS = {'DROP', 'DELETE', 'TRUNCATE', 'EXEC', 'GRANT'}

2. 路径校验

禁止路径遍历攻击:

  • ../ 模式
  • URL编码的遍历
  • 符号链接到敏感目录

3. 资源限制

限制项默认值
----------------
最大执行时间3600秒
最大节点数100
最大文件大小100MB
最大内存512MB

API 使用

Python API

from dag_runner import DAGRunner, ExecutionMode
from node_executor import NodeExecutor

# 创建执行器
executor = NodeExecutor()

# 加载工作流
runner = DAGRunner(workflow=workflow, execution_mode=ExecutionMode.AUTO)

# 注册节点执行器
for node_type in executor.list_types():
    runner.register_executor(node_type, executor.execute)

# 执行
result = runner.run()

print(result['status'])
print(result['summary'])

AI生成

from ai_dag_generator import AIDAGGenerator

generator = AIDAGGenerator()
workflow = generator.generate("从sales.csv读取数据,过滤status=completed,按category聚合")

print(workflow)

可视化

from dag_visualizer import DAGVisualizer

visualizer = DAGVisualizer()
html = visualizer.render(workflow)

with open('workflow.html', 'w') as f:
    f.write(html)

示例工作流

数据ETL

{
  "workflow_id": "etl_pipeline",
  "name": "数据ETL流程",
  "execution_mode": "parallel",
  "nodes": [
    {
      "id": "load_sales",
      "type": "DataIn",
      "name": "读取销售数据",
      "config": {"path": "data/sales.csv"}
    },
    {
      "id": "load_products",
      "type": "DataIn",
      "name": "读取产品数据",
      "config": {"path": "data/products.csv"}
    },
    {
      "id": "join_data",
      "type": "Join",
      "name": "关联数据",
      "depends_on": ["load_sales", "load_products"],
      "config": {"left_key": "product_id", "right_key": "id", "join_type": "left"}
    },
    {
      "id": "aggregate",
      "type": "Aggregate",
      "name": "按产品聚合",
      "depends_on": ["join_data"],
      "config": {
        "group_by": ["product_name"],
        "aggregations": [
          {"field": "amount", "func": "sum", "alias": "total_sales"},
          {"field": "quantity", "func": "count", "alias": "order_count"}
        ]
      }
    },
    {
      "id": "output",
      "type": "DataOut",
      "name": "输出结果",
      "depends_on": ["aggregate"],
      "config": {"path": "output/summary.csv"}
    }
  ]
}

条件分支

{
  "workflow_id": "conditional_flow",
  "name": "条件分支流程",
  "execution_mode": "auto",
  "nodes": [
    {"id": "start", "type": "DataIn", "name": "读取数据"},
    {"id": "check", "type": "Branch", "depends_on": ["start"]},
    {"id": "path_a", "type": "Filter", "name": "路径A"},
    {"id": "path_b", "type": "Filter", "name": "路径B"},
    {"id": "merge", "type": "Join", "depends_on": ["path_a", "path_b"]}
  ],
  "edges": [
    {"from": "start", "to": "check"},
    {"from": "check", "to": "path_a", "condition": true},
    {"from": "check", "to": "path_b", "condition": false},
    {"from": "path_a", "to": "merge"},
    {"from": "path_b", "to": "merge"}
  ]
}

目录结构

agent-workflow-orchestrator/
├── SKILL.md                    # 本文档
├── scripts/
│   ├── __init__.py
│   ├── dag_runner.py           # DAG执行引擎
│   ├── node_executor.py        # 节点执行器
│   ├── skills_executor.py      # 技能调用器
│   ├── ai_dag_generator.py     # AI DAG生成
│   ├── lineage_tracker.py      # 数据血缘追踪
│   ├── dag_visualizer.py       # DAG可视化
│   ├── security_guard.py       # 安全守卫
│   ├── state_manager.py        # 状态管理
│   ├── config_exporter.py      # 配置导出
│   └── workflow_cli.py         # CLI入口
├── references/
│   ├── workflows/             # 工作流示例
│   └── node_templates/         # 节点模板
└── tests/
    ├── test_dag_runner.py
    ├── test_node_executor.py
    ├── test_skills_executor.py
    ├── test_security_guard.py
    └── test_dag_visualizer.py

版本信息

  • 版本: 2.0.0
  • 更新: 2026-04-27
  • 作者: captain_ai

版本历史

共 1 个版本

  • v1.0.0 Initial release 当前
    2026-05-26 23:25 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

suspicious
查看报告

🔗 相关推荐

飞书CLI一键安装助手

user_af28adda
★ 0 📥 1,018

智能数据可视化

user_af28adda
AI智能数据可视化;支持多种图表类型(柱状图、折线图、饼图、散点图、直方图、箱线图、小提琴图、面积图、热力图、平行坐标图、旭日图等),根据数据特征自动分析并推荐最佳图表组合,生成精美交互式HTML仪表板
★ 4 📥 847

企微CLI一键安装助手

user_af28adda
企业微信 CLI 一键安装助手。当用户提到「安装企业微信 CLI」「安装 wecom-cli」「企微 CLI 怎么安装」「一键安装企微」「配置企业微信机器人」「初始化企微 CLI」「wecom-cli 安装」「企业微信命令行工具安装」「we
★ 2 📥 1,513