版本: v1.0
创建日期: 2026-03-26
作者: 象腿 (main agent)
用途: 根据routing规则自动调度specialist agents
Agent Dispatch是main agent的核心协调skill,负责:
def match_route(intent):
    """Pick the first routing rule whose pattern matches the user's input.

    Walks ``routing_rules`` in list order and tests each rule's ``pattern``
    with ``re.search`` against the text stored under ``intent["user_input"]``.
    The first matching rule wins; later rules are not consulted.

    Args:
        intent: dict describing the request. The text to match is read from
            its ``"user_input"`` key.

    Returns:
        dict: {
            "target": "coder|danao|writer|engineer|manager|self",
            "priority": 1-99,
            "confidence": 0.0-1.0
        }
        For a matched rule, ``target`` and ``priority`` are copied from the
        rule and ``confidence`` is ``calculate_confidence(intent, rule)``.
        If no rule matches, or the intent carries no input text, the
        fallback ``{"target": "self", "priority": 99, "confidence": 0.5}``
        is returned.
    """
    fallback = {"target": "self", "priority": 99, "confidence": 0.5}

    # Without input text no pattern can match, so use the default route
    # instead of raising KeyError/TypeError on an incomplete intent.
    text = intent.get("user_input")
    if text is None:
        return fallback

    for rule in routing_rules:
        if re.search(rule["pattern"], text):
            return {
                "target": rule["target"],
                "priority": rule["priority"],
                "confidence": calculate_confidence(intent, rule),
            }

    # No rule matched: default to handling the request ourselves.
    return fallback
# Coder Agent
coder:
  runtime: "acp"
  agentId: "codex"
  mode: "run"
  timeout: 300  # presumably seconds — TODO confirm unit

# Danao Agent
danao:
  runtime: "subagent"
  agentId: "danao"
  mode: "session"
  timeout: 600  # presumably seconds — TODO confirm unit

# Writer Agent
writer:
  runtime: "subagent"
  agentId: "writer"
  mode: "session"
  timeout: 600  # presumably seconds — TODO confirm unit

# Engineer Agent
engineer:
  runtime: "subagent"
  agentId: "engineer"
  mode: "session"
  timeout: 600  # presumably seconds — TODO confirm unit

# Manager Agent
manager:
  runtime: "subagent"
  agentId: "manager"
  mode: "session"
  timeout: 300  # presumably seconds — TODO confirm unit
def parallel_dispatch(tasks):
    """Run up to two independent dispatch tasks concurrently.

    Args:
        tasks: sequence of dicts, each with ``"route"``, ``"intent"`` and
            ``"user_input"`` keys, e.g.
            [
                {"route": route1, "intent": intent1, "user_input": input1},
                {"route": route2, "intent": intent2, "user_input": input2}
            ]
            The three values are passed positionally to ``dispatch_task``.
            Only the first ``MAX_PARALLEL`` entries are executed; any
            further entries are dropped, not queued.

    Returns:
        list: one ``dispatch_task`` result per executed task, in the same
        order as the tasks were supplied (``[result1, result2]``). An empty
        input yields an empty list.

    Raises:
        Any exception raised by ``dispatch_task`` propagates out of
        ``Future.result()``.
    """
    MAX_PARALLEL = 2

    # Cap the batch size at the parallelism limit.
    batch = tasks[:MAX_PARALLEL]
    if not batch:
        # Nothing to run: skip creating a thread pool.
        return []

    with ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
        futures = [
            executor.submit(
                dispatch_task, task["route"], task["intent"], task["user_input"]
            )
            for task in batch
        ]
        # Collect in submission order so results[i] belongs to tasks[i];
        # completion order would make the pairing nondeterministic.
        return [future.result() for future in futures]
def log_dispatch(route, intent, result):
    """Build a DISPATCH log record and hand it to ``write_log``.

    The record is a dict holding the current local timestamp (ISO 8601), the
    fixed type ``"DISPATCH"``, the route's target/priority/confidence, and
    the result's success flag, execution time and retry count.

    Args:
        route: dict with ``"target"``, ``"priority"`` and ``"confidence"``.
        intent: accepted for interface symmetry; not read by this function.
        result: dict with ``"success"`` and a ``"metadata"`` dict containing
            ``"execution_time"`` and ``"retry_count"``.

    Example of the intended log line (the actual rendering is up to
    ``write_log``, which is not shown here):
        [2026-03-26 12:00:00] [DISPATCH] target=coder, priority=1, confidence=0.95, success=true, time=3.45s
    """
    record = {"timestamp": datetime.now().isoformat(), "type": "DISPATCH"}

    # Routing fields are copied over under the same names.
    for field in ("target", "priority", "confidence"):
        record[field] = route[field]

    record["success"] = result["success"]

    # Execution details live one level down, under "metadata".
    metadata = result["metadata"]
    record["execution_time"] = metadata["execution_time"]
    record["retry_count"] = metadata["retry_count"]

    write_log(record)
# End-to-end usage example: intent identification -> route matching ->
# dispatch -> result integration.

# User input
user_input = "写一篇关于AI工具推荐的小红书文案"
# Identify the intent
intent = identify_intent(user_input)
# => {"task_type": "writing", "complexity": "medium"}
# NOTE(review): match_route looks up "user_input" on the intent, which this
# sample shape omits — confirm the real shape returned by identify_intent.
# Match a route
route = match_route(intent)
# => {"target": "writer", "priority": 3, "confidence": 0.92}
# Dispatch the task
result = dispatch_task(route, intent, user_input)
# Integrate the results
response = integrate_results(result)
# => "【Writer】已处理完成(耗时15.23秒)\n\n小红书文案已生成..."
#    (sample output, roughly: "[Writer] finished (took 15.23 s) ... copy generated")
Skill版本: v1.0
最后更新: 2026-03-26
维护者: 象腿 (main agent)
共 1 个版本