etl-pipeline-generator

Generate automated ETL pipelines for transforming and loading data into graph databases or knowledge graphs.
yjkj999999
Uncategorized community v1.0.0 1 version 100000 Key: not required

Overview

ETL Pipeline Generator

Design automated Extract-Transform-Load pipelines for knowledge graph construction.

This skill generates structured ETL pipelines that move raw data from various sources into graph-ready datasets or knowledge graph storage systems. It orchestrates data ingestion, transformation, and loading steps so that messy input data becomes structured graph data.

Quick Start

Use When

  • Building automated data ingestion workflows
  • Designing Extract-Transform-Load pipelines
  • Orchestrating multi-step data transformations
  • Creating repeatable data pipeline architectures
  • Integrating multiple data sources
  • Automating knowledge graph population
  • Building data quality and validation workflows

Inputs

  • Data source specifications (type, location, format)
  • Transformation requirements and rules
  • Target system definitions (database, format)
  • Data quality requirements
  • Pipeline scheduling and monitoring rules
  • Error handling and recovery strategies

Outputs

  • Pipeline configuration (YAML)
  • Directed Acyclic Graph (DAG) definitions
  • Python/Scala/SQL implementation scripts
  • Data flow diagrams
  • Pipeline documentation
  • Monitoring and alerting configurations

Example

Simple Customer Data Pipeline:

name: customer_graph_ingestion
description: Ingest customer data into knowledge graph

extract:
  source_type: csv
  location: data/customers.csv
  format: csv

transform:
  operations:
    - normalize_entities
    - detect_relationships
    - validate_schema

load:
  target: neo4j
  database: customer_graph
  method: bulk_import

Generated Pipeline Flow:

Extract CSV → Normalize Entities → Detect Relationships → 
Validate Data → Load to Neo4j → Verify Load

ETL Pipeline Architecture

1. Extract Stage

Purpose: Retrieve raw data from external sources

Supported Sources:

  • CSV files
  • JSON files
  • REST APIs
  • Databases (SQL, NoSQL)
  • Text documents
  • Streaming sources (Kafka, Pub/Sub)
  • Data warehouses (Snowflake, BigQuery)

Configuration:

extract:
  source_type: csv|json|api|database|text|stream
  location: /path/to/file or endpoint
  format: format_specifier
  authentication: credentials (if needed)
  filtering: (optional) filter criteria
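
As a rough illustration of how the `source_type` field could select an extractor, here is a minimal sketch covering only `csv` and `json`. The function names and the `EXTRACTORS` registry are hypothetical, and the extractors take in-memory text instead of opening `location`, purely to keep the example self-contained.

```python
import csv
import io
import json

def extract_csv(text):
    """Parse CSV text into a list of dict rows keyed by the header."""
    return list(csv.DictReader(io.StringIO(text)))

def extract_json(text):
    """Parse JSON text; wrap a single object so callers always get a list."""
    data = json.loads(text)
    return data if isinstance(data, list) else [data]

# Hypothetical registry: maps the config's source_type to an extractor.
EXTRACTORS = {"csv": extract_csv, "json": extract_json}

def extract(config, text):
    """Pick an extractor based on config['source_type'] and run it."""
    source_type = config["source_type"]
    if source_type not in EXTRACTORS:
        raise ValueError(f"unsupported source_type: {source_type}")
    return EXTRACTORS[source_type](text)

rows = extract({"source_type": "csv"}, "id,name\n1,Alice\n2,Bob\n")
print(rows)
```

Adding another source under this design would mean registering one more function, leaving `extract` unchanged.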

2. Transform Stage

Purpose: Convert raw data into graph-ready structures

Transformation Operations:

  • Entity Detection - Identify entity boundaries
  • Relationship Inference - Discover relationships
  • Schema Mapping - Map to target schema
  • Data Normalization - Standardize values
  • Type Conversion - Convert data types
  • Deduplication - Remove duplicates
  • Enrichment - Add contextual data
  • Validation - Check data quality
  • Filtering - Remove invalid records

Example Transform:

transform:
  operations:
    - normalize_entity_names
    - infer_relationships_from_columns
    - convert_dates_to_iso8601
    - deduplicate_entities
    - validate_required_fields
    - enrich_with_external_data
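
One way to read the `operations` list is as an ordered series of function names applied to the records in turn. The sketch below assumes that interpretation and implements only two of the operations; the function bodies and the `OPERATIONS` registry are illustrative, not the skill's actual implementation.

```python
def normalize_entity_names(records):
    """Trim whitespace and title-case the 'name' field."""
    return [{**r, "name": r["name"].strip().title()} for r in records]

def deduplicate_entities(records):
    """Keep the first record seen for each 'name'."""
    seen, unique = set(), []
    for r in records:
        if r["name"] not in seen:
            seen.add(r["name"])
            unique.append(r)
    return unique

# Hypothetical registry keyed by the operation names used in the config.
OPERATIONS = {
    "normalize_entity_names": normalize_entity_names,
    "deduplicate_entities": deduplicate_entities,
}

def run_transform(config, records):
    """Apply each configured operation in the order it is listed."""
    for op_name in config["operations"]:
        records = OPERATIONS[op_name](records)
    return records

result = run_transform(
    {"operations": ["normalize_entity_names", "deduplicate_entities"]},
    [{"name": " alice "}, {"name": "ALICE"}, {"name": "bob"}],
)
print(result)
```

Order matters here: deduplicating before normalizing would treat " alice " and "ALICE" as different entities.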

3. Load Stage

Purpose: Load transformed data into target system

Supported Targets:

  • Neo4j - Property graph database
  • RDF Triple Stores - SPARQL endpoint
  • ArangoDB - Multi-model database
  • TigerGraph - Graph database
  • Property Graph Engines - Generic property graphs
  • CSV/JSON - File output

Configuration:

load:
  target: neo4j|rdf|arangodb|tigergraph|property_graph|file
  connection_params:
    host: localhost
    port: 7687
    database: graph_database
  method: bulk_import|streaming|batch
  batch_size: 1000
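
To show what `method: batch` with `batch_size: 1000` might mean in practice, the sketch below splits records into batches and pairs each with a parameterized Cypher upsert. It only plans the work and never connects to a database. The `Customer` label, the `id` key, and the helper names are assumptions made for the example.

```python
def batched(records, batch_size):
    """Yield consecutive slices of at most batch_size records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

# Assumed node label and key; a real pipeline would take these
# from its schema mapping.
UPSERT_QUERY = (
    "UNWIND $rows AS row "
    "MERGE (c:Customer {id: row.id}) "
    "SET c += row"
)

def plan_load(records, batch_size=1000):
    """Return one (query, parameters) pair per batch."""
    return [(UPSERT_QUERY, {"rows": batch})
            for batch in batched(records, batch_size)]

records = [{"id": i, "name": f"customer-{i}"} for i in range(2500)]
plan = plan_load(records, batch_size=1000)
print([len(params["rows"]) for _, params in plan])
```

Each pair could then be passed to a driver call such as the Neo4j Python driver's `session.run(query, parameters)`. Using `MERGE` on a stable key keeps reruns from creating duplicate nodes.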

Pipeline Stages Explained

Extract Stage Details

Data Source Connectors:

CSV Extractor
  - delimiter detection
  - header parsing
  - encoding handling
  - chunk processing

JSON Extractor
  - nested object handling
  - array flattening
  - schema inference
  - streaming JSON support

API Extractor
  - authentication handling
  - pagination support
  - rate limiting
  - response transformation

Database Extractor
  - SQL query execution
  - connection pooling
  - partition handling
  - transaction management
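
The CSV extractor features above (delimiter detection, header parsing, chunk processing) can be sketched with the standard library alone. This version works on in-memory text, and the function name and candidate delimiter set are assumptions; a file-based extractor would pass an open file handle instead.

```python
import csv
import io

def read_csv_chunks(text, chunk_size, candidates=",;\t|"):
    """Detect the delimiter, then yield lists of at most chunk_size rows."""
    # Sniff only a prefix so large inputs are not scanned twice.
    dialect = csv.Sniffer().sniff(text[:1024], delimiters=candidates)
    reader = csv.DictReader(io.StringIO(text), dialect=dialect)
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # emit the final, possibly shorter, chunk
        yield chunk

sample = "id;name\n1;Alice\n2;Bob\n3;Carol\n"
chunks = list(read_csv_chunks(sample, chunk_size=2))
print(chunks)
```

`csv.Sniffer` raises `csv.Error` when it cannot settle on a delimiter, so a production extractor would likely fall back to a configured default.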

Transform Stage Details

Data Processing Operations:

Normalization
  - case normalization
  - whitespace trimming
  - special character handling
  - URL encoding

Type Conversion
  - string → integer/float
  - string → datetime
  - string → boolean
  - safe type conversions
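
"Safe" conversion is taken here to mean that a bad value yields a default instead of raising. The helpers below are a sketch under that assumption; the names, the accepted boolean spellings, and the `%d/%m/%Y` input date format are all illustrative choices.

```python
from datetime import datetime

def to_int(value, default=None):
    """Convert a string to int, returning default if it is not numeric."""
    try:
        return int(value.strip())
    except (ValueError, AttributeError):
        return default

def to_bool(value, default=None):
    """Map common true/false spellings to bool, else return default."""
    text = str(value).strip().lower()
    if text in {"true", "yes", "1"}:
        return True
    if text in {"false", "no", "0"}:
        return False
    return default

def to_iso_date(value, fmt="%d/%m/%Y", default=None):
    """Parse a date string in the assumed format and emit ISO 8601."""
    try:
        return datetime.strptime(value.strip(), fmt).date().isoformat()
    except (ValueError, AttributeError):
        return default

print(to_int(" 42 "), to_int("n/a"), to_bool("Yes"), to_iso_date("09/04/2024"))
```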

Deduplication
  - exact match detection
  - fuzzy matching
  - identity resolution
  - merge strategies
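
Exact and fuzzy matching can be combined in a few lines with `difflib` from the standard library. The sketch below uses a "keep the first seen" merge strategy and a similarity threshold of 0.85; both are assumptions, and real identity resolution usually compares more than one field.

```python
from difflib import SequenceMatcher

def similarity(a, b):
    """Case-insensitive similarity ratio between 0.0 and 1.0."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def deduplicate(names, threshold=0.85):
    """Keep a name only if it is not similar to one already kept."""
    kept = []
    for name in names:
        if not any(similarity(name, k) >= threshold for k in kept):
            kept.append(name)
    return kept

unique = deduplicate(["Acme Corp", "ACME Corp", "Acme Corp.", "Globex"])
print(unique)
```

This compares every candidate against every kept name, so it scales quadratically; larger datasets typically need blocking or indexing before fuzzy comparison.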

Validation
  - required field checks
  - data type validation
  - range validation
  - pattern matching
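
The four validation checks can be illustrated on a single record. The field names (`id`, `email`, `age`), the age range, and the deliberately loose email pattern are assumptions for the example, not rules defined by the skill.

```python
import re

# Loose illustrative pattern: something@something.something
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def validate_record(record):
    """Return a list of problems; an empty list means the record is valid."""
    errors = []
    for field in ("id", "email"):          # required field checks
        if not record.get(field):
            errors.append(f"missing required field: {field}")
    age = record.get("age")
    if age is not None:
        if not isinstance(age, int):       # data type validation
            errors.append("age must be an integer")
        elif not 0 <= age <= 150:          # range validation
            errors.append("age out of range 0-150")
    email = record.get("email")
    if email and not EMAIL_PATTERN.match(email):  # pattern matching
        errors.append("email does not match expected pattern")
    return errors

print(validate_record({"id": "2", "email": "bad", "age": 200}))
```

Returning all problems at once, instead of raising on the first, makes it easier to report data quality metrics per record.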

Load Stage Details

Data Loading Methods:

Bulk Import
  - CSV nodes/edges files
  - Batch processing
  - High throughput
  - Rollback on failure (where the target's import tool supports it)

Streaming Load
  - Real-time ingestion
  - Event-based
  - Lower latency
  - Connection streaming

Batch Load
  - Scheduled runs
  - Configurable batch sizes
  - Progress tracking
  - Failure recovery
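
For the Bulk Import option, the "CSV nodes/edges files" can be produced with the standard `csv` module. The sketch below writes to strings and uses the header conventions of Neo4j's offline import tool (`:ID`, `:LABEL`, `:START_ID`, `:END_ID`, `:TYPE`); the input dict shapes and function names are assumptions.

```python
import csv
import io

def write_nodes_csv(nodes):
    """Serialize nodes with an ID column, one property, and a label."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["id:ID", "name", ":LABEL"])
    for node in nodes:
        writer.writerow([node["id"], node["name"], node["label"]])
    return buffer.getvalue()

def write_edges_csv(edges):
    """Serialize relationships as start ID, end ID, and type."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow([":START_ID", ":END_ID", ":TYPE"])
    for edge in edges:
        writer.writerow([edge["start"], edge["end"], edge["type"]])
    return buffer.getvalue()

nodes_csv = write_nodes_csv([
    {"id": "c1", "name": "Alice", "label": "Customer"},
    {"id": "c2", "name": "Bob", "label": "Customer"},
])
edges_csv = write_edges_csv([{"start": "c1", "end": "c2", "type": "REFERRED"}])
print(nodes_csv)
print(edges_csv)
```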

Pipeline Execution Modes

1. Synchronous Execution

Sequential execution of all stages
Immediate feedback on success/failure
Best for: Small to medium datasets

2. Asynchronous Execution

Non-blocking pipeline execution
Async progress monitoring
Best for: Large datasets, scheduled runs

3. Streaming Mode

Continuous data flow
Real-time processing
Best for: Live data feeds, event streams

4. Scheduled Execution

Cron-based or scheduled triggers
Repeatable, idempotent operations
Best for: Regular data refreshes
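
The difference between synchronous and streaming execution can be sketched with generators: each record flows through every stage before the next one is read, so no stage holds the full dataset. The stage functions and the list used as a sink are hypothetical stand-ins for a real source and target.

```python
def extract(lines):
    """Yield raw lines one at a time, as a stream source would."""
    for line in lines:
        yield line

def transform(records):
    """Lazily clean each record and drop empty ones."""
    for raw in records:
        name = raw.strip()
        if name:
            yield {"name": name.title()}

def load(records, sink):
    """Consume the stream, writing each record to the sink."""
    count = 0
    for record in records:
        sink.append(record)
        count += 1
    return count

sink = []
loaded = load(transform(extract(["alice\n", "\n", " bob \n"])), sink)
print(loaded, sink)
```

Nothing runs until `load` starts pulling records, which is what lets the same stages serve an unbounded feed.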

Pipeline Monitoring and Observability

Monitoring Capabilities

  • Step execution times
  • Data volume metrics (rows in/out)
  • Error tracking and logging
  • Data quality metrics
  • Performance profiling

Alerting

alerts:
  - condition: execution_time > 3600s
    action: notify_admin
  - condition: error_count > 10
    action: pause_pipeline
  - condition: data_quality_score < 0.9
    action: log_warning
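
The alert rules above could be evaluated against collected metrics roughly as follows. The condition grammar (`metric operator number`, with an optional trailing `s` for seconds) is inferred from the three examples and is an assumption, as are the function and variable names.

```python
import operator
import re

OPS = {">": operator.gt, "<": operator.lt,
       ">=": operator.ge, "<=": operator.le}
# metric name, comparison operator, numeric threshold, optional "s" suffix
CONDITION = re.compile(r"^(\w+)\s*(>=|<=|>|<)\s*([\d.]+)s?$")

def triggered_actions(alerts, metrics):
    """Return the actions whose conditions hold for the given metrics."""
    actions = []
    for alert in alerts:
        match = CONDITION.match(alert["condition"])
        if match is None:
            raise ValueError(f"cannot parse condition: {alert['condition']}")
        metric, op, threshold = match.groups()
        if OPS[op](metrics[metric], float(threshold)):
            actions.append(alert["action"])
    return actions

alerts = [
    {"condition": "execution_time > 3600s", "action": "notify_admin"},
    {"condition": "error_count > 10", "action": "pause_pipeline"},
    {"condition": "data_quality_score < 0.9", "action": "log_warning"},
]
metrics = {"execution_time": 330, "error_count": 12, "data_quality_score": 0.85}
actions = triggered_actions(alerts, metrics)
print(actions)
```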

Error Handling and Recovery

Error Handling Strategies

Fail Fast: Stop on first error
Fail Safe: Continue with logging
Retry: Exponential backoff retry
Skip: Skip failed records, continue
Dead Letter: Route to error queue
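
The Skip and Dead Letter strategies combine naturally: a failed record is set aside with its error and processing continues. This is a minimal sketch in which a plain list stands in for the error queue; `process_with_dead_letter` and `parse_amount` are hypothetical names.

```python
def process_with_dead_letter(records, transform):
    """Apply transform to each record, routing failures to a dead-letter list."""
    processed, dead_letters = [], []
    for record in records:
        try:
            processed.append(transform(record))
        except Exception as exc:
            # Skip the record but keep it, with the reason, for later review.
            dead_letters.append({"record": record, "error": str(exc)})
    return processed, dead_letters

def parse_amount(record):
    """Example transform that fails on non-numeric amounts."""
    return {**record, "amount": float(record["amount"])}

ok, failed = process_with_dead_letter(
    [{"amount": "10.5"}, {"amount": "oops"}, {"amount": "3"}],
    parse_amount,
)
print(ok)
print(failed)
```

Fail Fast would be the same loop without the `try`, letting the first exception stop the run.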

Recovery Mechanisms

Checkpoint/Resume: Save state, resume from checkpoint
Rollback: Undo on failure
Compensation: Execute cleanup actions
Retry with Backoff: Automatic retry logic
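
Retry with exponential backoff doubles the wait after each failed attempt. The sketch below assumes a base delay of one second and four attempts, and takes the sleep function as a parameter so the behavior can be observed without actually waiting; all names are illustrative.

```python
import time

def retry_with_backoff(func, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call func until it succeeds, waiting base_delay * 2**attempt between tries."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)

calls = {"count": 0}

def flaky_load():
    """Simulated load step that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("target unavailable")
    return "loaded"

delays = []  # record requested delays instead of sleeping
result = retry_with_backoff(flaky_load, sleep=delays.append)
print(result, delays)
```

In practice the retried exceptions are usually narrowed to transient ones, and a random jitter is often added to each delay.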

Output Formats

YAML Pipeline Configuration

name: pipeline_name
description: pipeline description

extract:
  source_type: csv
  location: data/file.csv

transform:
  operations:
    - normalize_entities
    - validate_data

load:
  target: neo4j
  database: my_graph

DAG (Directed Acyclic Graph)

Extract(csv) → Parse → Normalize → 
Validate → Deduplicate → Load(Neo4j) → Verify
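
The same DAG can be expressed as a dependency mapping and ordered with `graphlib` from the standard library (Python 3.9+). The lowercase step names are chosen for the example; each key maps a step to the set of steps it depends on.

```python
from graphlib import TopologicalSorter

# step -> set of steps that must finish first
dag = {
    "parse": {"extract"},
    "normalize": {"parse"},
    "validate": {"normalize"},
    "deduplicate": {"validate"},
    "load": {"deduplicate"},
    "verify": {"load"},
}

order = list(TopologicalSorter(dag).static_order())
print(" -> ".join(order))
```

If a configuration accidentally introduces a cycle, `static_order()` raises `graphlib.CycleError`, which makes this a cheap validity check before execution.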

Python Implementation

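def pipeline_execution(csv_path='data.csv'):
    """Run extract, transform, and load in order; return the verification result."""
    # The helpers called below stand for the stage functions the generated script defines.
    data = extract_from_csv(csv_path)   # Extract: read raw rows
    data = normalize_entities(data)     # Transform: standardize entity values
    data = validate_data(data)          # Transform: check data quality
    load_to_neo4j(data)                 # Load: write to the graph database
    return verify_load()                # Verify: confirm the load succeeded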

Execution Metrics

{
  "pipeline_id": "customer_ingestion_001",
  "start_time": "2024-04-09T10:00:00Z",
  "end_time": "2024-04-09T10:05:30Z",
  "duration_seconds": 330,
  "stages": [
    {"name": "extract", "duration": 45, "records": 10000},
    {"name": "transform", "duration": 120, "records": 9900},
    {"name": "load", "duration": 150, "records": 9900}
  ],
  "status": "SUCCESS"
}
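
A metrics document like the one above lends itself to simple derived figures. The sketch below recomputes the total duration from the timestamps and derives how many records were dropped between extract and load; the `summarize` function and its output keys are hypothetical.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

def summarize(metrics):
    """Derive duration and record-loss figures from an execution report."""
    start = datetime.strptime(metrics["start_time"], TIME_FORMAT)
    end = datetime.strptime(metrics["end_time"], TIME_FORMAT)
    stages = metrics["stages"]
    extracted, loaded = stages[0]["records"], stages[-1]["records"]
    return {
        "duration_seconds": int((end - start).total_seconds()),
        "stage_seconds": sum(s["duration"] for s in stages),
        "records_dropped": extracted - loaded,
        "drop_rate": (extracted - loaded) / extracted,
    }

report = {
    "start_time": "2024-04-09T10:00:00Z",
    "end_time": "2024-04-09T10:05:30Z",
    "stages": [
        {"name": "extract", "duration": 45, "records": 10000},
        {"name": "transform", "duration": 120, "records": 9900},
        {"name": "load", "duration": 150, "records": 9900},
    ],
}
summary = summarize(report)
print(summary)
```

For the sample report, the stage durations sum to 315 seconds against a 330 second total, so about 15 seconds fall outside the three stages (for example setup or verification).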

Execution Steps

  1. Validate Pipeline Configuration – Check all parameters
  2. Initialize Extractors – Set up data source connections
  3. Execute Extract – Retrieve raw data
  4. Initialize Transformers – Prepare transformation engines
  5. Execute Transform – Apply transformation operations
  6. Validate Transformed Data – Check data quality
  7. Initialize Loaders – Set up target connections
  8. Execute Load – Load data to target system
  9. Verify Load – Confirm successful loading
  10. Generate Metrics – Collect execution statistics
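
Step 1, configuration validation, could look something like the sketch below. The allowed `source_type` and `target` values come from the configuration templates earlier in this document; which keys are treated as required is an assumption, as is the `validate_config` name.

```python
# Assumed required keys per section.
REQUIRED = {"extract": ["source_type", "location"], "load": ["target"]}
SOURCE_TYPES = {"csv", "json", "api", "database", "text", "stream"}
TARGETS = {"neo4j", "rdf", "arangodb", "tigergraph", "property_graph", "file"}

def validate_config(config):
    """Return a list of problems; an empty list means the config passes."""
    problems = []
    for section, keys in REQUIRED.items():
        if section not in config:
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            if key not in config[section]:
                problems.append(f"missing {section}.{key}")
    source_type = config.get("extract", {}).get("source_type")
    if source_type is not None and source_type not in SOURCE_TYPES:
        problems.append(f"unknown source_type: {source_type}")
    target = config.get("load", {}).get("target")
    if target is not None and target not in TARGETS:
        problems.append(f"unknown target: {target}")
    return problems

good = {
    "extract": {"source_type": "csv", "location": "data/file.csv"},
    "load": {"target": "neo4j", "database": "my_graph"},
}
bad = {"extract": {"source_type": "xml"}}
print(validate_config(good))
print(validate_config(bad))
```

Running this check before any connection is opened keeps configuration mistakes from surfacing halfway through a load.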

Integration with Other Skills

The ETL Pipeline Generator orchestrates these skills:

  • API Ingestion Connectors → Extract from APIs
  • CSV Graph Loader Generator → Transform and load CSVs
  • JSON to Triples Converter → Transform JSON to RDF
  • Text Entity Relation Extractor → Extract from text
  • Graph Schema Validation → Validate loaded data
  • Graph Constraint Generator → Apply constraints

Recommended Libraries

  • Orchestration: Apache Airflow, Prefect, Dagster
  • ETL Processing: Apache Spark, Pandas, Polars
  • Data Validation: Great Expectations, Pandera
  • Graph Loaders: neo4j, rdflib, networkx
  • Scheduling: APScheduler, Luigi, Celery
  • Monitoring: Prometheus, Grafana, Datadog

Best Practices

Idempotency – Pipelines can be rerun safely

Data Validation – Validate at each stage

Error Handling – Graceful error management

Monitoring – Track pipeline execution

Documentation – Document data lineage

Testing – Test with sample data first

Version Control – Track pipeline changes

Scalability – Design for data growth

Security – Secure credentials and data

Performance – Optimize transformation logic

References

See pipeline-patterns.md for detailed ETL pipeline patterns and example-pipelines.md for complete real-world pipeline examples.


Version: 1.0.0

Version History

1 version in total

  • v1.0.0 Migrated from ClawHub and published (current)
    2026-06-07 11:09
