AI Agent-Native Workflow Orchestrator
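| Feature | Airflow/DataWorks | 虾评 workflow engine | This skill |
|---|---|---|---|
| Positioning | Enterprise scheduling platform | General-purpose task orchestration | AI Agent workflows + data processing |
| Data processing | Relies on Operators | None | 8 built-in data node types |
| Execution modes | Sequential/background | Sequential | Sequential/parallel/conditional routing |
| AI capability | None | None | DAG generation from natural language |
| Skill invocation | None | Yes | Supports OpenClaw skills |
| Security validation | Basic | Basic | SQL whitelist + path validation + resource limits |
| Deployment | Requires installation/configuration | Cloud | Zero dependencies + single-file execution |
| Use cases | Large-scale ETL | Everyday automation | Data-processing pipelines + AI Agent orchestration |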
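User request → natural language → AI-generated DAG → sequential/parallel/conditional execution → skill invocation → result output
↓
Data cleaning → data transformation → data aggregation → visualization
↓
[Invoke data analysis skill]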
# Run a workflow from a JSON file
python workflow_cli.py run workflow.json
# Run in parallel mode
python workflow_cli.py run workflow.json --mode parallel
# Validate only
python workflow_cli.py run workflow.json --validate-only
# Dry run
python workflow_cli.py run workflow.json --dry-run
python workflow_cli.py generate \
  "Read data from sales.csv, filter status=completed, aggregate sums by category, and write to summary.csv"
python workflow_cli.py visualize workflow.json -o workflow.html
{
  "workflow_id": "my_workflow",
  "name": "My workflow",
  "version": "2.0.0",
  "execution_mode": "auto",
  "on_failure": "stop",
  "nodes": [
    {
      "id": "node_1",
      "type": "DataIn",
      "name": "Read data",
      "config": {
        "path": "data/sales.csv",
        "format": "csv"
      }
    },
    {
      "id": "node_2",
      "type": "Filter",
      "name": "Filter data",
      "depends_on": ["node_1"],
      "config": {
        "conditions": [
          {"field": "status", "operator": "==", "value": "completed"}
        ],
        "logic": "and"
      }
    },
    {
      "id": "node_3",
      "type": "DataOut",
      "name": "Write data",
      "depends_on": ["node_2"],
      "config": {
        "path": "output/summary.csv"
      }
    }
  ],
  "edges": [
    {"from": "node_1", "to": "node_2"},
    {"from": "node_2", "to": "node_3"}
  ]
}
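The workflow definition above states each dependency twice, once in `depends_on` and once in `edges`. The document does not say which one the engine treats as authoritative, so the following is only a minimal sketch of a consistency check between the two. The helper names `edges_from_depends_on` and `declared_edges` are hypothetical and are not part of the skill's scripts.

```python
from __future__ import annotations


def edges_from_depends_on(workflow: dict) -> set[tuple[str, str]]:
    """Collect the (upstream, downstream) pairs implied by each node's depends_on."""
    return {
        (upstream, node["id"])
        for node in workflow.get("nodes", [])
        for upstream in node.get("depends_on", [])
    }


def declared_edges(workflow: dict) -> set[tuple[str, str]]:
    """Collect the (from, to) pairs listed explicitly under "edges"."""
    return {(edge["from"], edge["to"]) for edge in workflow.get("edges", [])}


# Trimmed copy of the example workflow: only the fields the check needs.
workflow = {
    "nodes": [
        {"id": "node_1", "type": "DataIn"},
        {"id": "node_2", "type": "Filter", "depends_on": ["node_1"]},
        {"id": "node_3", "type": "DataOut", "depends_on": ["node_2"]},
    ],
    "edges": [
        {"from": "node_1", "to": "node_2"},
        {"from": "node_2", "to": "node_3"},
    ],
}

# Symmetric difference: pairs present in one declaration but not the other.
mismatch = edges_from_depends_on(workflow) ^ declared_edges(workflow)
print(mismatch or "depends_on and edges agree")
```

A non-empty `mismatch` would point at a dependency declared in only one of the two places.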
| Type | Description | Example config |
|---|---|---|
| DataIn | Data input | {"path": "data.csv", "format": "csv"} |
| DataOut | Data output | {"path": "output.csv", "format": "csv"} |
| Filter | Data filtering | {"conditions": [{"field": "status", "operator": "==", "value": "active"}]} |
| Join | Data join | {"left_key": "id", "right_key": "id", "join_type": "left"} |
| Aggregate | Data aggregation | {"group_by": ["category"], "aggregations": [{"func": "sum", "field": "amount"}]} |
| Python | Python script | {"code": "output_data = input_data * 2"} |
| Branch | Conditional branch | {"condition": "result > 0", "true_branch": "node_a", "false_branch": "node_b"} |
| CLI | External command | {"command": "echo hello"} |
| Skill | Skill invocation | {"skill": "smart-data-analyzer"} |
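To make the Filter config concrete, here is a rough sketch of how `conditions` and `logic` could be evaluated against a list of row dictionaries. It is an illustration, not the code in `node_executor.py`: the operator table is an assumption (the document only shows `==`), as are the `"or"` value for `logic` and the function names.

```python
import operator

# Assumed operator set; only "==" appears in the document.
OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def matches(row, conditions, logic="and"):
    """Evaluate every condition against one row and combine with and/or."""
    results = [
        OPERATORS[cond["operator"]](row.get(cond["field"]), cond["value"])
        for cond in conditions
    ]
    return all(results) if logic == "and" else any(results)


def filter_rows(rows, config):
    """Keep the rows that satisfy the Filter node's config."""
    logic = config.get("logic", "and")
    return [row for row in rows if matches(row, config["conditions"], logic)]


rows = [
    {"status": "completed", "amount": 10},
    {"status": "pending", "amount": 5},
]
config = {
    "conditions": [{"field": "status", "operator": "==", "value": "completed"}],
    "logic": "and",
}
kept = filter_rows(rows, config)
print(kept)
```

Ordering comparisons on a field that is missing from a row would raise a `TypeError` in this sketch; a real node would need to decide how to treat missing values.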
Runs the nodes one at a time, in topological order.
{
  "execution_mode": "sequential"
}
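As an illustration of what "topological order" means for a workflow, the sketch below derives an execution order from the nodes' `depends_on` lists with the standard library's `graphlib` (Python 3.9+). It does not reproduce `dag_runner.py`; the function name `execution_order` is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(nodes):
    """Return node ids ordered so that every node comes after its dependencies."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {node["id"]: set(node.get("depends_on", [])) for node in nodes}
    return list(TopologicalSorter(graph).static_order())


nodes = [
    {"id": "node_3", "depends_on": ["node_2"]},
    {"id": "node_1"},
    {"id": "node_2", "depends_on": ["node_1"]},
]
order = execution_order(nodes)
print(order)

# A dependency cycle is not a valid DAG; graphlib reports it as CycleError.
try:
    execution_order([
        {"id": "a", "depends_on": ["b"]},
        {"id": "b", "depends_on": ["a"]},
    ])
except CycleError:
    print("cycle detected")
```

For a linear chain like this one the order is unique; when several nodes are independent, any order that respects the dependencies is valid.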
Nodes that do not depend on one another run concurrently.
{
  "execution_mode": "parallel"
}
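One way to picture parallel mode is batch by batch: all nodes whose dependencies are finished run together, then the next batch becomes ready. The sketch below does this with `graphlib` and a thread pool. It is an assumption about the scheduling strategy, not a description of the actual engine, and `run_parallel` and `run_node` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter


def run_parallel(nodes, run_node, max_workers=4):
    """Run nodes in dependency order, executing each ready batch concurrently.

    Returns the list of batches in the order they were executed.
    """
    graph = {node["id"]: set(node.get("depends_on", [])) for node in nodes}
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorted(sorter.get_ready())  # nodes whose dependencies are done
            list(pool.map(run_node, ready))     # wait for the whole batch to finish
            sorter.done(*ready)                 # unlock the downstream nodes
            batches.append(ready)
    return batches


nodes = [
    {"id": "load_sales"},
    {"id": "load_products"},
    {"id": "join_data", "depends_on": ["load_sales", "load_products"]},
    {"id": "aggregate", "depends_on": ["join_data"]},
    {"id": "output", "depends_on": ["aggregate"]},
]
batches = run_parallel(nodes, run_node=lambda node_id: node_id)
print(batches)
```

Waiting for each batch to finish keeps the sketch short; a scheduler that starts a node as soon as its own dependencies complete would extract more concurrency.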
Automatically selects the best execution strategy based on the DAG structure:
{
  "execution_mode": "auto"
}
Supports if-else branching: the output of the upstream node determines which path is executed.
{
  "nodes": [
    {"id": "check", "type": "DataIn", "name": "Data check"},
    {"id": "branch", "type": "Branch", "depends_on": ["check"]},
    {"id": "process_valid", "type": "Filter"},
    {"id": "process_invalid", "type": "Filter"}
  ],
  "edges": [
    {"from": "check", "to": "branch"},
    {"from": "branch", "to": "process_valid", "condition": true},
    {"from": "branch", "to": "process_invalid", "condition": false}
  ]
}
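The routing rule expressed by the `condition` field on edges can be sketched as a small lookup: after a Branch node produces a boolean, only the outgoing edges whose `condition` matches are followed, while unconditional edges are always followed. This is an interpretation of the JSON above rather than the engine's code, and `next_nodes` is a hypothetical helper.

```python
def next_nodes(edges, node_id, branch_result=None):
    """Return the downstream node ids to run after node_id.

    Edges without a "condition" key are always followed; conditional edges
    are followed only when their condition equals branch_result.
    """
    selected = []
    for edge in edges:
        if edge["from"] != node_id:
            continue
        if "condition" not in edge or edge["condition"] == branch_result:
            selected.append(edge["to"])
    return selected


# Same edges as the JSON example, written as Python literals.
edges = [
    {"from": "check", "to": "branch"},
    {"from": "branch", "to": "process_valid", "condition": True},
    {"from": "branch", "to": "process_invalid", "condition": False},
]

print(next_nodes(edges, "check"))          # unconditional edge
print(next_nodes(edges, "branch", True))   # branch evaluated to true
print(next_nodes(edges, "branch", False))  # branch evaluated to false
```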
Supports invoking skills on the OpenClaw platform.
{
  "id": "analyze",
  "type": "Skill",
  "name": "Data analysis",
  "skill": "smart-data-analyzer",
  "config": {
    "query": "SELECT * FROM sales"
  }
}
| Skill name | Description |
|---|---|
| smart-data-analyzer | DuckDB data analysis |
| echarts-viz | ECharts visualization |
| chartjs-reporter | Chart.js reports |
| excel-workbench | Excel workbench |
| duckdb-data-analysis | DuckDB analysis |
| mysql-analytics | MySQL analytics |
| postgres-analytics | PostgreSQL analytics |
| data-visualization | Data visualization |
# Allowed keywords
ALLOWED_KEYWORDS = {'SELECT', 'INSERT', 'UPDATE', 'CREATE', 'ALTER'}
# Forbidden keywords
FORBIDDEN_KEYWORDS = {'DROP', 'DELETE', 'TRUNCATE', 'EXEC', 'GRANT'}
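A keyword whitelist like the one above could be applied roughly as follows. This is a simplified sketch, not the logic in `security_guard.py`: it tokenizes the statement with a regular expression, rejects any forbidden keyword anywhere in the text, and requires the first keyword to be an allowed one. The function name `check_sql` and the return shape are assumptions.

```python
import re

ALLOWED_KEYWORDS = {'SELECT', 'INSERT', 'UPDATE', 'CREATE', 'ALTER'}
FORBIDDEN_KEYWORDS = {'DROP', 'DELETE', 'TRUNCATE', 'EXEC', 'GRANT'}


def check_sql(sql):
    """Return (ok, reason) for a SQL string under the keyword rules above."""
    # Word tokens only; an identifier such as delete_flag stays one token
    # and therefore does not match the DELETE keyword.
    tokens = re.findall(r"[A-Za-z_]+", sql.upper())
    if not tokens:
        return False, "empty statement"
    forbidden = FORBIDDEN_KEYWORDS.intersection(tokens)
    if forbidden:
        return False, "forbidden keyword: " + ", ".join(sorted(forbidden))
    if tokens[0] not in ALLOWED_KEYWORDS:
        return False, "statement does not start with an allowed keyword"
    return True, "ok"


print(check_sql("SELECT * FROM sales"))
print(check_sql("DROP TABLE sales"))
print(check_sql("SELECT 1; DELETE FROM sales"))
```

Token scanning is deliberately conservative: a forbidden word inside a string literal is also rejected. A production check would more likely rely on a real SQL parser.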
Path traversal attacks are blocked:
- ../ patterns

| Limit | Default |
|---|---|
| Maximum execution time | 3600 seconds |
| Maximum number of nodes | 100 |
| Maximum file size | 100MB |
| Maximum memory | 512MB |
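For the path-traversal rule stated before the limits table, a common approach is to resolve the requested path against a base directory and confirm that the result is still inside it. The sketch below shows that idea with `pathlib`; whether the skill validates paths this way is an assumption, and `is_safe_path` and the `workspace` directory are hypothetical.

```python
from pathlib import Path


def is_safe_path(user_path, base_dir):
    """Return True if user_path, resolved against base_dir, stays inside base_dir."""
    base = Path(base_dir).resolve()
    # Joining with an absolute user_path discards base, so absolute paths
    # outside the base directory are rejected as well.
    target = (base / user_path).resolve()
    return target == base or base in target.parents


print(is_safe_path("data/sales.csv", "workspace"))     # stays inside
print(is_safe_path("../secrets.txt", "workspace"))     # escapes via ../
print(is_safe_path("data/../../etc/passwd", "workspace"))
```

Resolving before comparing handles `../` segments hidden in the middle of a path, which a plain substring search for `../` on the raw string would handle less precisely.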
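import json

from dag_runner import DAGRunner, ExecutionMode
from node_executor import NodeExecutor

# Create the node executor
executor = NodeExecutor()

# Load the workflow definition (file name taken from the CLI examples)
with open('workflow.json', encoding='utf-8') as f:
    workflow = json.load(f)
runner = DAGRunner(workflow=workflow, execution_mode=ExecutionMode.AUTO)

# Register an executor for each node type
for node_type in executor.list_types():
    runner.register_executor(node_type, executor.execute)

# Run the workflow
result = runner.run()
print(result['status'])
print(result['summary'])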
from ai_dag_generator import AIDAGGenerator

generator = AIDAGGenerator()
workflow = generator.generate("Read data from sales.csv, filter status=completed, aggregate by category")
print(workflow)
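from dag_visualizer import DAGVisualizer

visualizer = DAGVisualizer()
# `workflow` is a workflow definition loaded or generated as in the snippets above
html = visualizer.render(workflow)
with open('workflow.html', 'w', encoding='utf-8') as f:
    f.write(html)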
{
  "workflow_id": "etl_pipeline",
  "name": "Data ETL pipeline",
  "execution_mode": "parallel",
  "nodes": [
    {
      "id": "load_sales",
      "type": "DataIn",
      "name": "Read sales data",
      "config": {"path": "data/sales.csv"}
    },
    {
      "id": "load_products",
      "type": "DataIn",
      "name": "Read product data",
      "config": {"path": "data/products.csv"}
    },
    {
      "id": "join_data",
      "type": "Join",
      "name": "Join data",
      "depends_on": ["load_sales", "load_products"],
      "config": {"left_key": "product_id", "right_key": "id", "join_type": "left"}
    },
    {
      "id": "aggregate",
      "type": "Aggregate",
      "name": "Aggregate by product",
      "depends_on": ["join_data"],
      "config": {
        "group_by": ["product_name"],
        "aggregations": [
          {"field": "amount", "func": "sum", "alias": "total_sales"},
          {"field": "quantity", "func": "count", "alias": "order_count"}
        ]
      }
    },
    {
      "id": "output",
      "type": "DataOut",
      "name": "Write results",
      "depends_on": ["aggregate"],
      "config": {"path": "output/summary.csv"}
    }
  ]
}
{
  "workflow_id": "conditional_flow",
  "name": "Conditional branch flow",
  "execution_mode": "auto",
  "nodes": [
    {"id": "start", "type": "DataIn", "name": "Read data"},
    {"id": "check", "type": "Branch", "depends_on": ["start"]},
    {"id": "path_a", "type": "Filter", "name": "Path A"},
    {"id": "path_b", "type": "Filter", "name": "Path B"},
    {"id": "merge", "type": "Join", "depends_on": ["path_a", "path_b"]}
  ],
  "edges": [
    {"from": "start", "to": "check"},
    {"from": "check", "to": "path_a", "condition": true},
    {"from": "check", "to": "path_b", "condition": false},
    {"from": "path_a", "to": "merge"},
    {"from": "path_b", "to": "merge"}
  ]
}
agent-workflow-orchestrator/
├── SKILL.md                      # This document
├── scripts/
│   ├── __init__.py
│   ├── dag_runner.py             # DAG execution engine
│   ├── node_executor.py          # Node executor
│   ├── skills_executor.py        # Skill invoker
│   ├── ai_dag_generator.py       # AI DAG generation
│   ├── lineage_tracker.py        # Data lineage tracking
│   ├── dag_visualizer.py         # DAG visualization
│   ├── security_guard.py         # Security guard
│   ├── state_manager.py          # State management
│   ├── config_exporter.py        # Config export
│   └── workflow_cli.py           # CLI entry point
├── references/
│   ├── workflows/                # Example workflows
│   └── node_templates/           # Node templates
└── tests/
    ├── test_dag_runner.py
    ├── test_node_executor.py
    ├── test_skills_executor.py
    ├── test_security_guard.py
    └── test_dag_visualizer.py