← 返回
未分类

Data Pipeline

Lightweight ETL pipeline orchestrator with lifecycle hooks, bundle execution, stage retry, timeout control, and built-in transformers. 26 tests, 100% pass rate.
轻量级 ETL流程编排器,支持生命周期钩子、捆绑执行、阶段重试、超时控制及内置转换器,26 项测试全部通过。
pagoda111king
未分类 clawhub v1.1.0 1 版本 100000 Key: 无需
★ 0
Stars
📥 490
下载
💾 3
安装
1
版本
#latest

概述

data-pipeline · 数据处理管线引擎

> 可组合的数据转换、验证和分析管线。像搭积木一样处理数据。


何时使用

当用户提到:数据清洗、数据转换、ETL、数据验证、数据分组、数据聚合、管道处理、批量数据处理、数组处理

快速开始

const { Pipeline, Transformers, Validators, PipelineFactory } = require('data-pipeline/src/pipeline');

// 创建一个清洗管线
const pipeline = new Pipeline();
pipeline
  .addStage('filter', Transformers.filter(x => x.age >= 18))
  .addStage('pick', Transformers.pick(['name', 'email']))
  .addStage('sort', Transformers.sort('name', 'asc'));

const result = await pipeline.run(users);

核心 API

Pipeline

const pipeline = new Pipeline({ strict: true, context: { key: 'value' } });

// 添加阶段
pipeline.addStage(name, asyncFn, { retryCount: 0, retryDelay: 100, timeout: 30000 });
pipeline.addStages([{ name, fn, options }]);

// 阶段管理
pipeline.insertBefore(target, name, fn, options);
pipeline.insertAfter(target, name, fn, options);
pipeline.removeStage(name);
pipeline.toggleStage(name, enabled);

// 执行
const result = await pipeline.execute(data);  // 返回 { data, metadata }
const data = await pipeline.run(data);         // 只返回数据

// 指标
const metrics = pipeline.getMetrics();
pipeline.resetMetrics();

内置转换器

转换器说明示例
--------------------
filter(fn)过滤Transformers.filter(x => x.active)
map(fn)映射Transformers.map(x => x.name)
reduce(fn, init)归约Transformers.reduce((a,b) => a+b, 0)
groupBy(key)分组Transformers.groupBy('dept')
sort(key, order)排序Transformers.sort('age', 'desc')
dedup(key)去重Transformers.dedup('id')
flatten(depth)扁平化Transformers.flatten(2)
paginate(page, size)分页Transformers.paginate(1, 10)
limit(n)限制Transformers.limit(5)
pick(fields)选择字段Transformers.pick(['name', 'age'])
rename(map)重命名Transformers.rename({old: 'new'})
merge(key, ...sources)合并Transformers.merge('id', extras)

验证器

const schema = {
  name: { required: true, type: 'string', minLength: 1 },
  age: { type: 'number', min: 0, max: 150 },
  email: { pattern: /^[^\s@]+@[^\s@]+\.[^\s@]+$/ },
  role: { enum: ['admin', 'user'] },
  password: { validate: (v) => v.length >= 8 ? true : 'Too short' }
};

const validator = Validators.schema(schema);
const result = validator(data);
// { valid: boolean, errors: [...], totalItems, validItems }

工厂函数

// ETL 管线
const etl = PipelineFactory.createETL(extract, transforms, load);

// 数据清洗管线
const cleaner = PipelineFactory.createCleaner(schema, { defaultField: 'value' });

// 数据分析管线
const analyzer = PipelineFactory.createAnalyzer('groupKey', {
  avgVal: vals => vals.reduce((a,b) => a+b, 0) / vals.length,
  maxVal: vals => Math.max(...vals)
});

使用场景

  1. 数据清洗:验证 → 去重 → 填充默认值 → 修剪字符串
  2. ETL 流程:提取 → 转换(map/filter/reduce)→ 加载
  3. 数据分析:分组 → 聚合 → 排序 → 分页
  4. 数据验证:批量验证对象数组,返回详细错误报告
  5. API 数据处理:合并多个数据源 → 重命名字段 → 选择输出字段

错误处理

try {
  const result = await pipeline.execute(data);
} catch (err) {
  if (err instanceof PipelineError) {
    console.log('Failed at:', err.failedStage);
    console.log('Partial data:', err.lastData);
    console.log('Stage results:', err.stageResults);
  }
}

性能指标

const metrics = pipeline.getMetrics();
// {
//   pipeline: { totalRuns, totalErrors, avgTime },
//   stages: [{ name, calls, errors, avgTime }, ...]
// }

版本历史

共 1 个版本

  • v1.1.0 当前
    2026-05-07 06:40 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

Meta Skill Weaver

pagoda111king
在编排复杂多步骤任务时使用此技能。提供第一性原理任务分解、EventBus 事件系统、多技能协作...
★ 0 📥 584

Context Booster

pagoda111king
在管理长上下文对话和任务时使用此技能。提供智能压缩、关键信息提取、上下文增强、记忆检索等功能。
★ 0 📥 569

First Principle Analyzer

pagoda111king
在运用第一性原理分析复杂问题时使用此技能。它提供结构化的七阶段分析框架,包括假设识别与挑战。
★ 0 📥 586