← 返回
未分类 中文

Python Flow Engine

Lightweight Python flow orchestration library. Use when building data processing pipelines, workflow automation, ETL tasks, or any multi-step computation tha...
轻量级 Python 流程编排库。用于构建数据处理管道、工作流自动化、ETL 任务或任何多步骤计算...
dnaxxx-hub
未分类 clawhub v1.0.0 1 版本 100000 Key: 无需
★ 0
Stars
📥 308
下载
💾 1
安装
1
版本
#flow#latest#orchestration#pipeline#python#workflow

概述

Python Flow Engine

Lightweight Python flow orchestration — no external dependencies. Build pipelines with serial (>>), parallel (//), and conditional (|) node execution.

Core API

Node

from pipeline import Node

def my_fn(context, data):
    # context: shared dict for the pipeline run
    # data: input from previous node
    return modified_data

node = Node(my_fn, name="step1", timeout=10, retry=1)

Pipeline

pipe = Pipeline()
pipe.add_node(node)
pipe.run(start_node, input_data)

Connection Operators

OperatorModeBehavior
--------------------------
a >> bSeriala runs, then b runs with a's output
a // bParallela and b run concurrently (bidirectional)
`a \b`Conditionalb runs based on a's output (use connect)

For conditional connections, use connect():

pipe.connect(node_a, node_b, "conditional",
    condition=lambda ctx, data: ctx.get("is_valid", False))

Or use Node.condition parameter:

node_b = Node(fn, condition=lambda ctx, data: data > 0)

Visualization

print(pipe.visualize())  # outputs Mermaid.js flowchart

Error Handling

  • retry: Node(fn, retry=2) retries up to 3 total attempts
  • timeout: Node(fn, timeout=5) raises RuntimeError if execution exceeds 5s

Available Scripts

  • scripts/pipeline.py — Core library (importable, no dependencies)
  • scripts/pipeline_demo.py — End-to-end demo (6 scenarios)

Quick Start

from pipeline import Node, Pipeline

def double(ctx, x): return x * 2
def add_one(ctx, x): return x + 1

pipe = Pipeline()
n1 = Node(double, name="double")
n2 = Node(add_one, name="add_one")
pipe.add_node(n1)
pipe.add_node(n2)
n1 >> n2

result = pipe.run(n1, 5)  # 5*2+1 = 11
print(result)  # 11
print(pipe.visualize())

Run the full demo:

python scripts/pipeline_demo.py

版本历史

共 1 个版本

  • v1.0.0 当前
    2026-05-29 21:25 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

DOCX Editor

dnaxxx-hub
读取并生成结构、样式正确且跨平台兼容的 Word 文档。
★ 0 📥 230

Windows Embedding Setup

dnaxxx-hub
在 Windows 上为 OpenClaw 配置本地 embedding / 本地记忆检索时使用。适用于:下载并接入 `nomic-embed-text-v1.5.Q8_0.gguf`、把 `memorySearch.provider` 改
★ 0 📥 198

File Cleaner ZH

dnaxxx-hub
文件整理器,自动按类型分类
★ 0 📥 194