
Pilot ETL Data Pipeline Setup

Deploy a five-stage ETL data pipeline with 5 agents. Use this skill when: 1. User wants to set up an ETL or data processing pipeline 2. User is configuring a...
Author: teoslayer
Source: clawhub, category: uncategorized, version: v1.0.0 (1 version), key: not required

Overview

ETL Data Pipeline Setup

Deploy 5 agents: ingest, transform, validate, load, and report.

Roles

| Role | Hostname | Skills | Purpose |
|------|----------|--------|---------|
| ingest | <prefix>-ingest | pilot-s3-bridge, pilot-database-bridge, pilot-task-chain, pilot-cron | Pulls raw data on schedule |
| transform | <prefix>-transform | pilot-task-router, pilot-stream-data, pilot-task-parallel | Parallel data processing |
| validate | <prefix>-validate | pilot-task-router, pilot-audit-log, pilot-alert, pilot-quarantine | Quality checks, quarantine |
| loader | <prefix>-loader | pilot-database-bridge, pilot-task-chain, pilot-receipt | Writes to target stores |
| reporter | <prefix>-reporter | pilot-webhook-bridge, pilot-metrics, pilot-slack-bridge, pilot-cron | Dashboards and reports |

Setup Procedure

Step 1: Ask the user which role this agent should take (ingest, transform, validate, loader, or reporter) and which hostname prefix to use.

Step 2: Install skills:

# ingest:
clawhub install pilot-s3-bridge pilot-database-bridge pilot-task-chain pilot-cron
# transform:
clawhub install pilot-task-router pilot-stream-data pilot-task-parallel
# validate:
clawhub install pilot-task-router pilot-audit-log pilot-alert pilot-quarantine
# loader:
clawhub install pilot-database-bridge pilot-task-chain pilot-receipt
# reporter:
clawhub install pilot-webhook-bridge pilot-metrics pilot-slack-bridge pilot-cron
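
The per-role install commands above can also be generated from one lookup table, which keeps the skill lists in a single place. The following is a minimal sketch, assuming Python 3.9+ is available on the agent host. `ROLE_SKILLS` and `install_command` are illustrative names, not part of clawhub, and the helper only builds the argument list; it does not run anything.

```python
# Skill lists copied from the install commands above.
# ROLE_SKILLS and install_command are hypothetical helper names.
ROLE_SKILLS = {
    "ingest": ["pilot-s3-bridge", "pilot-database-bridge", "pilot-task-chain", "pilot-cron"],
    "transform": ["pilot-task-router", "pilot-stream-data", "pilot-task-parallel"],
    "validate": ["pilot-task-router", "pilot-audit-log", "pilot-alert", "pilot-quarantine"],
    "loader": ["pilot-database-bridge", "pilot-task-chain", "pilot-receipt"],
    "reporter": ["pilot-webhook-bridge", "pilot-metrics", "pilot-slack-bridge", "pilot-cron"],
}


def install_command(role: str) -> list[str]:
    """Return the argv of the `clawhub install` call for one role."""
    if role not in ROLE_SKILLS:
        raise ValueError(f"unknown role {role!r}; expected one of {sorted(ROLE_SKILLS)}")
    return ["clawhub", "install", *ROLE_SKILLS[role]]


if __name__ == "__main__":
    # Print the command for the loader role instead of executing it.
    print(" ".join(install_command("loader")))
```

Rejecting unknown roles early avoids installing a partial skill set when the role name is mistyped (for example `load` instead of `loader`).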

Step 3: Set the hostname to <prefix>-<role> (for example <prefix>-ingest) and write the role's manifest to ~/.pilot/setups/etl-data-pipeline.json.
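
Writing the manifest amounts to substituting the chosen prefix into a role template and saving the result. The sketch below illustrates that with a trimmed copy of the ingest template from this document. `render_manifest` and `write_manifest` are hypothetical helper names, and the prefix check (lowercase letters, digits, hyphens) is an assumption about what makes a safe hostname, not a documented Pilot rule. Setting the hostname itself is not shown, because this document does not give the command for it.

```python
import json
import re
from pathlib import Path
from typing import Optional

# Trimmed copy of the ingest template (the "skills" map is omitted for brevity).
INGEST_TEMPLATE = {
    "setup": "etl-data-pipeline",
    "role": "ingest",
    "role_name": "Data Ingestion",
    "hostname": "<prefix>-ingest",
    "data_flows": [
        {"direction": "send", "peer": "<prefix>-transform", "port": 1001,
         "topic": "ingest-batch", "description": "Raw data batches"}
    ],
    "handshakes_needed": ["<prefix>-transform"],
}


def render_manifest(template: dict, prefix: str) -> dict:
    """Replace every "<prefix>" placeholder in the template with the prefix."""
    # Assumed rule: keep the prefix hostname-safe so the substitution
    # cannot break the JSON text or produce an odd hostname.
    if not re.fullmatch(r"[a-z0-9]([a-z0-9-]*[a-z0-9])?", prefix):
        raise ValueError(f"prefix {prefix!r} is not hostname-safe")
    return json.loads(json.dumps(template).replace("<prefix>", prefix))


def write_manifest(manifest: dict, home: Optional[Path] = None) -> Path:
    """Write the manifest to <home>/.pilot/setups/etl-data-pipeline.json."""
    base = home if home is not None else Path.home()
    path = base / ".pilot" / "setups" / "etl-data-pipeline.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
    return path
```

For example, `write_manifest(render_manifest(INGEST_TEMPLATE, "acme"))` would store a manifest whose hostname is `acme-ingest`; `acme` is a made-up prefix.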

Step 4: Handshake with the adjacent pipeline stages listed under handshakes_needed in the role's manifest.
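
In the manifest templates in this document, the handshake list of each role is exactly the set of peers named in its `data_flows`. A small check like the following sketch can confirm that a hand-edited manifest still has that property before any handshakes are attempted. `handshake_peers` is a hypothetical helper, and the handshake command itself is not shown because this document does not specify it.

```python
# Trimmed copy of the validate template (skills and descriptions omitted).
VALIDATE_MANIFEST = {
    "role": "validate",
    "hostname": "<prefix>-validate",
    "data_flows": [
        {"direction": "receive", "peer": "<prefix>-transform", "port": 1001, "topic": "transform-complete"},
        {"direction": "send", "peer": "<prefix>-loader", "port": 1001, "topic": "validation-passed"},
        {"direction": "send", "peer": "<prefix>-reporter", "port": 1002, "topic": "validation-metrics"},
    ],
    "handshakes_needed": ["<prefix>-transform", "<prefix>-loader", "<prefix>-reporter"],
}


def handshake_peers(manifest: dict) -> list[str]:
    """Peers named in data_flows, de-duplicated, in first-seen order."""
    seen: dict[str, None] = {}
    for flow in manifest.get("data_flows", []):
        seen.setdefault(flow["peer"], None)
    return list(seen)


if __name__ == "__main__":
    derived = handshake_peers(VALIDATE_MANIFEST)
    declared = VALIDATE_MANIFEST["handshakes_needed"]
    # A mismatch means a peer was added to one list but not the other.
    print("consistent" if derived == declared else f"mismatch: {derived} vs {declared}")
```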

Manifest Templates Per Role

ingest

{
  "setup": "etl-data-pipeline", "role": "ingest", "role_name": "Data Ingestion",
  "hostname": "<prefix>-ingest",
  "skills": {
    "pilot-s3-bridge": "Pull raw data from S3 buckets.",
    "pilot-database-bridge": "Extract from source databases.",
    "pilot-task-chain": "Chain ingestion steps sequentially.",
    "pilot-cron": "Schedule periodic data pulls."
  },
  "data_flows": [{ "direction": "send", "peer": "<prefix>-transform", "port": 1001, "topic": "ingest-batch", "description": "Raw data batches" }],
  "handshakes_needed": ["<prefix>-transform"]
}

transform

{
  "setup": "etl-data-pipeline", "role": "transform", "role_name": "Data Transformer",
  "hostname": "<prefix>-transform",
  "skills": {
    "pilot-task-router": "Accept transform tasks from ingest.",
    "pilot-stream-data": "Stream transformed records to validator.",
    "pilot-task-parallel": "Process data in parallel for throughput."
  },
  "data_flows": [
    { "direction": "receive", "peer": "<prefix>-ingest", "port": 1001, "topic": "ingest-batch", "description": "Raw data" },
    { "direction": "send", "peer": "<prefix>-validate", "port": 1001, "topic": "transform-complete", "description": "Transformed records" }
  ],
  "handshakes_needed": ["<prefix>-ingest", "<prefix>-validate"]
}

validate

{
  "setup": "etl-data-pipeline", "role": "validate", "role_name": "Data Validator",
  "hostname": "<prefix>-validate",
  "skills": {
    "pilot-task-router": "Accept validation tasks.",
    "pilot-audit-log": "Log validation results.",
    "pilot-alert": "Alert on high error rates.",
    "pilot-quarantine": "Quarantine invalid records."
  },
  "data_flows": [
    { "direction": "receive", "peer": "<prefix>-transform", "port": 1001, "topic": "transform-complete", "description": "Transformed records" },
    { "direction": "send", "peer": "<prefix>-loader", "port": 1001, "topic": "validation-passed", "description": "Validated records" },
    { "direction": "send", "peer": "<prefix>-reporter", "port": 1002, "topic": "validation-metrics", "description": "Error rates" }
  ],
  "handshakes_needed": ["<prefix>-transform", "<prefix>-loader", "<prefix>-reporter"]
}

loader

{
  "setup": "etl-data-pipeline", "role": "loader", "role_name": "Data Loader",
  "hostname": "<prefix>-loader",
  "skills": {
    "pilot-database-bridge": "Write validated data to target databases.",
    "pilot-task-chain": "Chain load steps.",
    "pilot-receipt": "Issue receipts for every load batch."
  },
  "data_flows": [
    { "direction": "receive", "peer": "<prefix>-validate", "port": 1001, "topic": "validation-passed", "description": "Validated records" },
    { "direction": "send", "peer": "<prefix>-reporter", "port": 1002, "topic": "load-receipt", "description": "Load receipts" }
  ],
  "handshakes_needed": ["<prefix>-validate", "<prefix>-reporter"]
}

reporter

{
  "setup": "etl-data-pipeline", "role": "reporter", "role_name": "Pipeline Reporter",
  "hostname": "<prefix>-reporter",
  "skills": {
    "pilot-webhook-bridge": "Forward pipeline alerts to external services.",
    "pilot-metrics": "Aggregate pipeline metrics.",
    "pilot-slack-bridge": "Post daily pipeline summaries to Slack.",
    "pilot-cron": "Schedule hourly/daily report generation."
  },
  "data_flows": [
    { "direction": "receive", "peer": "<prefix>-validate", "port": 1002, "topic": "validation-metrics", "description": "Error rates" },
    { "direction": "receive", "peer": "<prefix>-loader", "port": 1002, "topic": "load-receipt", "description": "Load receipts" }
  ],
  "handshakes_needed": ["<prefix>-validate", "<prefix>-loader"]
}
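
Because each flow is declared twice, once as `send` on the sender and once as `receive` on the receiver, a mismatch in port or topic between two manifests is easy to introduce. The sketch below checks that every declared flow has a mirror entry on its peer. The flows are copied from the templates above in a compact tuple form with the `<prefix>-` part of each hostname dropped. `FLOWS` and `unmatched_flows` are illustrative names, not part of any Pilot tooling.

```python
# (direction, peer role, port, topic) per role, copied from the templates above.
FLOWS = {
    "ingest": [("send", "transform", 1001, "ingest-batch")],
    "transform": [
        ("receive", "ingest", 1001, "ingest-batch"),
        ("send", "validate", 1001, "transform-complete"),
    ],
    "validate": [
        ("receive", "transform", 1001, "transform-complete"),
        ("send", "loader", 1001, "validation-passed"),
        ("send", "reporter", 1002, "validation-metrics"),
    ],
    "loader": [
        ("receive", "validate", 1001, "validation-passed"),
        ("send", "reporter", 1002, "load-receipt"),
    ],
    "reporter": [
        ("receive", "validate", 1002, "validation-metrics"),
        ("receive", "loader", 1002, "load-receipt"),
    ],
}


def unmatched_flows(flows: dict) -> list:
    """Return every flow whose peer does not declare the mirror entry."""
    problems = []
    for role, entries in flows.items():
        for direction, peer, port, topic in entries:
            opposite = "receive" if direction == "send" else "send"
            mirror = (opposite, role, port, topic)
            if mirror not in flows.get(peer, []):
                problems.append((role, direction, peer, port, topic))
    return problems


if __name__ == "__main__":
    # An empty list means all five templates agree with each other.
    print(unmatched_flows(FLOWS))
```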

Data Flows

  • ingest → transform : raw data batches (port 1001)
  • transform → validate : transformed records (port 1001)
  • validate → loader : validated records (port 1001)
  • loader → reporter : load receipts (port 1002)
  • validate → reporter : validation metrics (port 1002)
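
The flow list above forms a directed acyclic graph, so a stage order can be derived from it mechanically. The following sketch uses the standard-library `graphlib` module (Python 3.9+) to do that and to list the ports on which each stage receives data. `EDGES`, `stage_order`, and `inbound_ports` are illustrative names chosen for this example.

```python
from graphlib import TopologicalSorter

# (sender, receiver, port) copied from the flow list above.
EDGES = [
    ("ingest", "transform", 1001),
    ("transform", "validate", 1001),
    ("validate", "loader", 1001),
    ("loader", "reporter", 1002),
    ("validate", "reporter", 1002),
]


def stage_order(edges) -> list:
    """Order stages so that every sender comes before its receivers."""
    predecessors = {}
    for sender, receiver, _port in edges:
        predecessors.setdefault(sender, set())
        predecessors.setdefault(receiver, set()).add(sender)
    # static_order() raises graphlib.CycleError if the flows contain a loop.
    return list(TopologicalSorter(predecessors).static_order())


def inbound_ports(edges) -> dict:
    """Map each receiving stage to the sorted ports it receives on."""
    ports = {}
    for _sender, receiver, port in edges:
        ports.setdefault(receiver, set()).add(port)
    return {stage: sorted(p) for stage, p in ports.items()}


if __name__ == "__main__":
    print(stage_order(EDGES))
    print(inbound_ports(EDGES))
```

With these edges the order is fully determined: ingest, transform, validate, loader, reporter. Only the reporter receives on port 1002; the other receiving stages use port 1001.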

Workflow Example

# On ingest:
pilotctl --json send-file <prefix>-transform ./data/raw/orders.csv
pilotctl --json publish <prefix>-transform ingest-batch '{"source":"s3://data/orders","rows":50000}'
# On validate:
pilotctl --json publish <prefix>-loader validation-passed '{"batch_id":"B-1042","valid":49700,"quarantined":123}'
# On loader:
pilotctl --json publish <prefix>-reporter load-receipt '{"batch_id":"B-1042","rows_loaded":49700}'
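
When these commands are issued from a script rather than typed by hand, building the JSON payload programmatically avoids shell-quoting mistakes. The sketch below assembles the same `pilotctl --json publish <peer> <topic> <payload>` form used above. `publish_argv` is a hypothetical helper and `etl` is a made-up prefix; the function only builds the argument list and does not invoke pilotctl.

```python
import json
import shlex


def publish_argv(peer: str, topic: str, payload: dict) -> list[str]:
    """Build the argv for a `pilotctl --json publish` call, as used above."""
    # Compact separators reproduce the payload style of the examples above.
    body = json.dumps(payload, separators=(",", ":"))
    return ["pilotctl", "--json", "publish", peer, topic, body]


if __name__ == "__main__":
    argv = publish_argv(
        "etl-reporter",  # hypothetical hostname: prefix "etl" plus "-reporter"
        "load-receipt",
        {"batch_id": "B-1042", "rows_loaded": 49700},
    )
    # shlex.join shows the command with shell-safe quoting for copy and paste.
    print(shlex.join(argv))
```

Passing the list directly to `subprocess.run(argv, check=True)` would skip the shell entirely, so the payload needs no quoting at all.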

Dependencies

Requires pilot-protocol skill, pilotctl binary, clawhub binary, and a running daemon.
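
A quick pre-flight check can confirm that the two binaries named above are on `PATH` before starting the setup. This is a minimal sketch using only the Python standard library. `missing_binaries` is a hypothetical helper, and it does not verify the pilot-protocol skill or the daemon, since this document does not describe how to query them.

```python
import shutil
from typing import Callable, Iterable, Optional

# Binaries named in the Dependencies section.
REQUIRED_BINARIES = ("pilotctl", "clawhub")


def missing_binaries(
    names: Iterable[str] = REQUIRED_BINARIES,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> list[str]:
    """Return the names that cannot be found on PATH."""
    return [name for name in names if which(name) is None]


if __name__ == "__main__":
    missing = missing_binaries()
    if missing:
        print("Missing from PATH:", ", ".join(missing))
    else:
        print("pilotctl and clawhub found on PATH")
```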

Version History

1 version in total

  • v1.0.0 (current)
    2026-05-07 16:49, security status: safe
