Deploy 5 agents: ingest, transform, validate, load, and report.
| Role | Hostname | Skills | Purpose |
|---|---|---|---|
| ------ | ---------- | -------- | --------- |
| ingest | | pilot-s3-bridge, pilot-database-bridge, pilot-task-chain, pilot-cron | Pulls raw data on schedule |
| transform | | pilot-task-router, pilot-stream-data, pilot-task-parallel | Parallel data processing |
| validate | | pilot-task-router, pilot-audit-log, pilot-alert, pilot-quarantine | Quality checks, quarantine |
| loader | | pilot-database-bridge, pilot-task-chain, pilot-receipt | Writes to target stores |
| reporter | | pilot-webhook-bridge, pilot-metrics, pilot-slack-bridge, pilot-cron | Dashboards and reports |
Step 1: Ask the user which role and prefix.
Step 2: Install skills:
# ingest:
clawhub install pilot-s3-bridge pilot-database-bridge pilot-task-chain pilot-cron
# transform:
clawhub install pilot-task-router pilot-stream-data pilot-task-parallel
# validate:
clawhub install pilot-task-router pilot-audit-log pilot-alert pilot-quarantine
# loader:
clawhub install pilot-database-bridge pilot-task-chain pilot-receipt
# reporter:
clawhub install pilot-webhook-bridge pilot-metrics pilot-slack-bridge pilot-cron
Step 3: Set hostname and write manifest to ~/.pilot/setups/etl-data-pipeline.json.
Step 4: Handshake with adjacent pipeline stages.
{
"setup": "etl-data-pipeline", "role": "ingest", "role_name": "Data Ingestion",
"hostname": "<prefix>-ingest",
"skills": {
"pilot-s3-bridge": "Pull raw data from S3 buckets.",
"pilot-database-bridge": "Extract from source databases.",
"pilot-task-chain": "Chain ingestion steps sequentially.",
"pilot-cron": "Schedule periodic data pulls."
},
"data_flows": [{ "direction": "send", "peer": "<prefix>-transform", "port": 1001, "topic": "ingest-batch", "description": "Raw data batches" }],
"handshakes_needed": ["<prefix>-transform"]
}
{
"setup": "etl-data-pipeline", "role": "transform", "role_name": "Data Transformer",
"hostname": "<prefix>-transform",
"skills": {
"pilot-task-router": "Accept transform tasks from ingest.",
"pilot-stream-data": "Stream transformed records to validator.",
"pilot-task-parallel": "Process data in parallel for throughput."
},
"data_flows": [
{ "direction": "receive", "peer": "<prefix>-ingest", "port": 1001, "topic": "ingest-batch", "description": "Raw data" },
{ "direction": "send", "peer": "<prefix>-validate", "port": 1001, "topic": "transform-complete", "description": "Transformed records" }
],
"handshakes_needed": ["<prefix>-ingest", "<prefix>-validate"]
}
{
"setup": "etl-data-pipeline", "role": "validate", "role_name": "Data Validator",
"hostname": "<prefix>-validate",
"skills": {
"pilot-task-router": "Accept validation tasks.",
"pilot-audit-log": "Log validation results.",
"pilot-alert": "Alert on high error rates.",
"pilot-quarantine": "Quarantine invalid records."
},
"data_flows": [
{ "direction": "receive", "peer": "<prefix>-transform", "port": 1001, "topic": "transform-complete", "description": "Transformed records" },
{ "direction": "send", "peer": "<prefix>-loader", "port": 1001, "topic": "validation-passed", "description": "Validated records" },
{ "direction": "send", "peer": "<prefix>-reporter", "port": 1002, "topic": "validation-metrics", "description": "Error rates" }
],
"handshakes_needed": ["<prefix>-transform", "<prefix>-loader", "<prefix>-reporter"]
}
{
"setup": "etl-data-pipeline", "role": "loader", "role_name": "Data Loader",
"hostname": "<prefix>-loader",
"skills": {
"pilot-database-bridge": "Write validated data to target databases.",
"pilot-task-chain": "Chain load steps.",
"pilot-receipt": "Issue receipts for every load batch."
},
"data_flows": [
{ "direction": "receive", "peer": "<prefix>-validate", "port": 1001, "topic": "validation-passed", "description": "Validated records" },
{ "direction": "send", "peer": "<prefix>-reporter", "port": 1002, "topic": "load-receipt", "description": "Load receipts" }
],
"handshakes_needed": ["<prefix>-validate", "<prefix>-reporter"]
}
{
"setup": "etl-data-pipeline", "role": "reporter", "role_name": "Pipeline Reporter",
"hostname": "<prefix>-reporter",
"skills": {
"pilot-webhook-bridge": "Forward pipeline alerts to external services.",
"pilot-metrics": "Aggregate pipeline metrics.",
"pilot-slack-bridge": "Post daily pipeline summaries to Slack.",
"pilot-cron": "Schedule hourly/daily report generation."
},
"data_flows": [
{ "direction": "receive", "peer": "<prefix>-validate", "port": 1002, "topic": "validation-metrics", "description": "Error rates" },
{ "direction": "receive", "peer": "<prefix>-loader", "port": 1002, "topic": "load-receipt", "description": "Load receipts" }
],
"handshakes_needed": ["<prefix>-validate", "<prefix>-loader"]
}
ingest → transform : raw data batches (port 1001)transform → validate : transformed records (port 1001)validate → loader : validated records (port 1001)loader → reporter : load receipts (port 1002)validate → reporter : validation metrics (port 1002)# On ingest:
pilotctl --json send-file <prefix>-transform ./data/raw/orders.csv
pilotctl --json publish <prefix>-transform ingest-batch '{"source":"s3://data/orders","rows":50000}'
# On validate:
pilotctl --json publish <prefix>-loader validation-passed '{"batch_id":"B-1042","valid":49700,"quarantined":123}'
# On loader:
pilotctl --json publish <prefix>-reporter load-receipt '{"batch_id":"B-1042","rows_loaded":49700}'
Requires pilot-protocol skill, pilotctl binary, clawhub binary, and a running daemon.
共 1 个版本