← 返回
未分类 中文

Task Automation Workflows

Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled tasks.
使用脚本、工作流和定时任务自动化重复性任务,创建高效的文件操作、数据处理、API调用和定时任务自动化...
engsathiago
未分类 clawhub v1.0.0 1 版本 99904.7 Key: 无需
★ 0
Stars
📥 1,048
下载
💾 0
安装
1
版本
#automation#batch#cron#latest#scheduling#scripts#workflow

概述

Task Automation

Turn repetitive work into automated workflows. Save time, reduce errors, scale operations.

Automation Types

1. File Operations

Batch Rename:

import os
import re

def batch_rename(directory, pattern, replacement):
    """Rename entries in ``directory`` whose names match ``pattern``.

    Args:
        directory: Path of the directory whose entries are inspected
            (non-recursive; whatever ``os.listdir`` returns).
        pattern: Regular expression (string or compiled). An entry is
            selected when the pattern matches at the start of its name.
        replacement: Replacement passed to ``re.sub`` to build the new name.

    Entries whose computed name is unchanged are left alone, and a rename is
    skipped (with a message) when the target name already exists, so that no
    existing file is silently overwritten.
    """
    # Compile once instead of re-parsing the pattern for every entry.
    compiled = re.compile(pattern)

    for filename in os.listdir(directory):
        if not compiled.match(filename):
            continue

        new_name = compiled.sub(replacement, filename)
        if new_name == filename:
            # Nothing to do; do not report a rename that did not happen.
            continue

        source = os.path.join(directory, filename)
        target = os.path.join(directory, new_name)

        if os.path.lexists(target):
            # On case-insensitive filesystems a case-only rename "collides"
            # with itself; that is safe and must still be allowed.
            try:
                same_entry = os.path.samefile(source, target)
            except OSError:
                same_entry = False
            if not same_entry:
                print(f"Skipped: {filename} -> {new_name} (target already exists)")
                continue

        os.rename(source, target)
        print(f"Renamed: {filename} -> {new_name}")

Batch Convert:

from PIL import Image
import os

def convert_images(input_dir, output_dir, format='webp'):
    """Convert every PNG/JPEG image in ``input_dir`` to ``format``.

    Args:
        input_dir: Directory scanned (non-recursively) for files ending in
            ``.png``, ``.jpg`` or ``.jpeg`` (case-insensitive).
        output_dir: Directory that receives the converted files; created if
            it does not exist.
        format: Target format, used both as the output file extension and
            (upper-cased) as the Pillow save format.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Pillow registers the JPEG writer under "JPEG"; "JPG" is not a valid
    # save format, so accept the common alias here.
    save_format = format.upper()
    if save_format == 'JPG':
        save_format = 'JPEG'

    for filename in os.listdir(input_dir):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            name = os.path.splitext(filename)[0]
            # Use the image as a context manager so the underlying file
            # handle is released even if saving fails.
            with Image.open(os.path.join(input_dir, filename)) as img:
                img.save(os.path.join(output_dir, f"{name}.{format}"), save_format)
            print(f"Converted: {filename}")

Organize Files:

import os
import shutil

def organize_by_type(directory):
    """Sort the regular files of ``directory`` into per-category subfolders.

    A file is moved into ``images``, ``documents``, ``videos``, ``audio`` or
    ``code`` according to its lower-cased extension. Files with any other
    extension, and anything that is not a regular file, stay where they are.
    Category folders are created on demand inside ``directory``.
    """
    categories = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
        'documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],
        'videos': ['.mp4', '.mov', '.avi', '.mkv'],
        'audio': ['.mp3', '.wav', '.flac'],
        'code': ['.py', '.js', '.ts', '.go', '.rs'],
    }

    # Flatten to extension -> folder; setdefault keeps the first category
    # that lists an extension, matching a first-match scan of the table.
    folder_for_extension = {}
    for category, suffixes in categories.items():
        for suffix in suffixes:
            folder_for_extension.setdefault(suffix, category)

    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if not os.path.isfile(filepath):
            continue

        suffix = os.path.splitext(filename)[1].lower()
        folder = folder_for_extension.get(suffix)
        if folder is None:
            continue

        target = os.path.join(directory, folder)
        os.makedirs(target, exist_ok=True)
        shutil.move(filepath, os.path.join(target, filename))
        print(f"Moved {filename} to {folder}/")

2. Data Processing

Batch Transform:

import pandas as pd

def process_csv_batch(input_dir, output_file, transform_func):
    """Transform every CSV in ``input_dir`` and write them as one CSV.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.csv`` files.
        output_file: Destination passed to ``DataFrame.to_csv``.
        transform_func: Callable applied to each loaded DataFrame; its
            return value is what gets concatenated.

    Raises:
        ValueError: If ``input_dir`` contains no CSV file to process.

    Files are processed in sorted name order so the combined output is
    reproducible regardless of the directory listing order.
    """
    # If the output lives inside input_dir, a rerun must not feed the
    # previous output back in as an input.
    output_path = None
    if isinstance(output_file, (str, os.PathLike)):
        output_path = os.path.abspath(os.fspath(output_file))

    csv_paths = []
    for filename in sorted(os.listdir(input_dir)):
        if not filename.endswith('.csv'):
            continue
        path = os.path.join(input_dir, filename)
        if output_path is not None and os.path.abspath(path) == output_path:
            continue
        csv_paths.append(path)

    if not csv_paths:
        # pd.concat([]) fails with an unhelpful message; say what is wrong.
        raise ValueError(f"No CSV files found in {input_dir!r}")

    dfs = [transform_func(pd.read_csv(path)) for path in csv_paths]

    combined = pd.concat(dfs, ignore_index=True)
    combined.to_csv(output_file, index=False)
    print(f"Processed {len(dfs)} files into {output_file}")

Data Pipeline:

def create_pipeline(steps):
    """Compose ``steps`` into a single callable.

    Args:
        steps: Iterable of one-argument callables.

    Returns:
        A function that feeds its argument through every step in order,
        passing each step's return value to the next, and returns the final
        value. ``steps`` is iterated afresh on every call.
    """
    def pipeline(data):
        current = data
        for stage in steps:
            current = stage(current)
        return current

    return pipeline

# Example usage: stages run top to bottom, each receiving the previous
# stage's output (presumably a pandas DataFrame with 'value' and 'date'
# columns - confirm against the caller's data).
pipeline = create_pipeline([
    lambda frame: frame.dropna(),
    lambda frame: frame.drop_duplicates(),
    lambda frame: frame[frame['value'] > 0],
    lambda frame: frame.sort_values('date'),
])

clean_data = pipeline(raw_data)

3. API Operations

Rate-Limited API Client:

import time
from functools import wraps

def rate_limit(calls_per_second=2):
    """Build a decorator that spaces calls at least ``1/calls_per_second`` apart.

    Args:
        calls_per_second: Maximum call rate; must be greater than zero.

    Returns:
        A decorator. The wrapped function sleeps, if necessary, before each
        call so that consecutive calls start no closer together than the
        minimum interval. The very first call never sleeps.

    Raises:
        ValueError: If ``calls_per_second`` is not positive.

    Note:
        The timestamp of the last call is stored per ``rate_limit(...)``
        invocation, so every function decorated with the same returned
        decorator object shares one limit.
    """
    if calls_per_second <= 0:
        raise ValueError("calls_per_second must be greater than zero")

    min_interval = 1.0 / calls_per_second
    # One-element list so the nested wrapper can rebind the value.
    # None means "no call has happened yet".
    last_call = [None]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if last_call[0] is not None:
                # monotonic() is immune to wall-clock adjustments, which
                # could otherwise produce negative or huge elapsed times.
                remaining = min_interval - (time.monotonic() - last_call[0])
                if remaining > 0:
                    time.sleep(remaining)
            last_call[0] = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(2)  # 2 calls per second
def api_call(endpoint, data, timeout=30):
    """POST ``data`` as JSON to ``endpoint`` and return the response.

    Args:
        endpoint: URL to post to.
        data: JSON-serialisable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the HTTP call can block forever on an unresponsive host.

    NOTE(review): ``requests`` is not imported in any snippet shown here -
    confirm it is imported where this function lives.
    """
    return requests.post(endpoint, json=data, timeout=timeout)

Batch API Calls:

def batch_api_calls(items, endpoint, batch_size=100, timeout=30):
    """POST ``items`` to ``endpoint`` in fixed-size batches.

    Args:
        items: Sequence of JSON-serialisable items (must support ``len`` and
            slicing).
        endpoint: URL each batch is posted to as ``{'items': batch}``.
        batch_size: Maximum number of items per request; at least 1.
        timeout: Seconds to wait for each response before giving up.

    Returns:
        A list built by extending with the JSON body of every response that
        returned HTTP 200. Failed batches are reported on stdout and skipped.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        # A negative step would make range() empty and silently drop all items.
        raise ValueError("batch_size must be at least 1")

    results = []

    for batch_number, start in enumerate(range(0, len(items), batch_size)):
        if batch_number:
            # Rate limiting: pause between batches only, so there is no
            # pointless one-second wait after the final batch.
            time.sleep(1)

        batch = items[start:start + batch_size]
        response = requests.post(endpoint, json={'items': batch}, timeout=timeout)

        if response.status_code == 200:
            results.extend(response.json())
        else:
            print(f"Batch {batch_number} failed: {response.status_code}")

    return results

Retry with Backoff:

import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1, exceptions=(Exception,)):
    """Call ``func`` until it succeeds, backing off exponentially between tries.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Total number of attempts (not additional retries);
            must be at least 1.
        base_delay: Base delay in seconds; attempt ``k`` (0-based) waits
            ``base_delay * 2**k`` plus up to one second of random jitter.
        exceptions: Exception class or tuple of classes that trigger a retry.
            Anything else propagates immediately.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: The last exception raised by ``func`` once all attempts
            are exhausted.
    """
    if max_retries < 1:
        # Otherwise the loop body never runs and None is returned without
        # func ever being called.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return func()
        except exceptions as e:
            if attempt == max_retries - 1:
                raise

            # Jitter spreads out retries from clients that failed together.
            delay = base_delay * (2 ** attempt) + random.random()
            print(f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s: {e}")
            time.sleep(delay)

4. Scheduled Tasks

Cron Jobs:

# Field order: minute  hour  day-of-month  month  day-of-week  command
# ("*" matches every value of that field).

# Every hour (at minute 0)
0 * * * * /path/to/script.sh

# Every day at 9 AM (09:00)
0 9 * * * /path/to/script.sh

# Every Monday at 9 AM (day-of-week 1)
0 9 * * 1 /path/to/script.sh

# Every hour on weekdays (day-of-week 1-5, i.e. Monday through Friday)
0 * * * 1-5 /path/to/script.sh

Python Scheduler:

import schedule
import time

def job():
    """Print a message announcing that the scheduled task is running."""
    print("Running scheduled task...")

# Register the same job on four schedules with the third-party `schedule`
# library: every 10 minutes, hourly, daily at 09:00 and on Mondays.
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("09:00").do(job)
schedule.every().monday.do(job)

# Poll forever. With a 60-second sleep between polls, a due job can start up
# to about a minute late.
while True:
    schedule.run_pending()
    time.sleep(60)

OpenClaw Cron:

openclaw cron add \
  --name "Daily Report" \
  --schedule "0 9 * * *" \
  --task "Generate daily report and send to slack"

Workflow Patterns

Sequential Pipeline

def sequential_workflow(steps):
    """Execute workflow steps one after another and report each outcome.

    Args:
        steps: Iterable of dicts. Each needs an ``'action'`` callable and may
            carry ``'params'`` (keyword arguments for the action) and
            ``'stop_on_error'`` (defaults to True).

    Returns:
        A list with one record per executed step, in order:
        ``{'step', 'status': 'success', 'result'}`` or
        ``{'step', 'status': 'error', 'error'}``. Execution stops after the
        first failing step unless that step sets ``stop_on_error`` to a
        falsy value.
    """
    outcomes = []

    for position, spec in enumerate(steps):
        try:
            value = spec['action'](**spec.get('params', {}))
            record = {'step': position, 'status': 'success', 'result': value}
            outcomes.append(record)
        except Exception as exc:
            outcomes.append({'step': position, 'status': 'error', 'error': str(exc)})
            if spec.get('stop_on_error', True):
                break

    return outcomes

Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_workflow(tasks, max_workers=5):
    """Run every task on a thread pool and collect one record per task.

    Args:
        tasks: Iterable of dicts with a ``'name'``, an ``'action'`` callable
            and optional ``'params'`` keyword arguments.
        max_workers: Size of the thread pool.

    Returns:
        A list of ``{'task', 'status': 'success', 'result'}`` or
        ``{'task', 'status': 'error', 'error'}`` dicts, in completion order
        (not submission order).
    """
    outcomes = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the task that produced it.
        submitted = {}
        for spec in tasks:
            handle = pool.submit(spec['action'], **spec.get('params', {}))
            submitted[handle] = spec

        for finished in as_completed(submitted):
            spec = submitted[finished]
            try:
                value = finished.result()
                outcomes.append({'task': spec['name'], 'status': 'success', 'result': value})
            except Exception as exc:
                outcomes.append({'task': spec['name'], 'status': 'error', 'error': str(exc)})

    return outcomes

Conditional Workflow

def conditional_workflow(steps):
    """Run steps whose optional condition holds, sharing results via a context.

    Args:
        steps: Iterable of dicts with a ``'name'``, an ``'action'`` callable,
            optional ``'params'`` keyword arguments and an optional
            ``'condition'`` callable that receives the context dict.

    Returns:
        The context dict, mapping each executed step's name to its result.
        Every action is called with its params plus ``context=<the dict>``.
        A step whose condition returns a falsy value is skipped with a
        printed notice.
    """
    context = {}

    for step in steps:
        # A step without a 'condition' key always runs.
        blocked = 'condition' in step and not step['condition'](context)
        if blocked:
            print(f"Skipping {step['name']}: condition not met")
            continue

        context[step['name']] = step['action'](**step.get('params', {}), context=context)

    return context

Error Handling

Graceful Degradation

def robust_operation(data, fallback=None):
    """Run the primary operation, degrading gracefully when it fails.

    Returns the result of ``primary_operation(data)`` when it succeeds. On
    ``SpecificError`` it returns ``fallback_operation(data)`` if ``fallback``
    is truthy, otherwise ``None``. On any other exception it returns
    ``fallback`` itself.

    NOTE(review): ``fallback`` acts as an on/off flag in one branch and as
    the return value in the other - confirm which meaning is intended.
    """
    try:
        return primary_operation(data)
    except SpecificError as e:
        print(f"Primary failed: {e}, trying fallback")
        if fallback:
            return fallback_operation(data)
        return None
    except Exception as e:
        print(f"All options failed: {e}")
        return fallback

Error Notification

def notify_on_error(func, notify_func=None):
    """Wrap a function so that any exception is reported before re-raising.

    Supports two call styles:

    * ``notify_on_error(func, notify_func)`` returns the wrapped ``func``.
    * ``@notify_on_error(notify_func)`` is the decorator form; the single
      argument is taken to be the notifier and a decorator is returned.
      With the two-argument-only signature this form raised ``TypeError``.

    Args:
        func: The function to wrap, or the notifier in decorator form.
        notify_func: Callable receiving the message
            ``"Error in <name>: <exception>"``.

    Returns:
        The wrapped function, or a decorator in single-argument form. The
        wrapper re-raises the original exception after notifying.
    """
    if notify_func is None:
        # Decorator-factory form: the only argument is the notifier.
        notifier = func

        def decorator(target):
            return notify_on_error(target, notifier)

        return decorator

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            notify_func(f"Error in {func.__name__}: {e}")
            raise
    return wrapper

# NOTE(review): this decorator form passes only the notifier to
# notify_on_error - confirm that it accepts single-argument use.
@notify_on_error(send_slack_message)
def important_operation():
    """Placeholder for an operation whose failures should be reported."""
    # A comment alone is not a valid function body; the docstring above
    # makes this placeholder syntactically complete.
    # ...

Monitoring

Progress Tracking

from tqdm import tqdm

def process_with_progress(items):
    """Apply ``process`` to each item while showing a tqdm progress bar.

    Returns the list of ``process(item)`` results in input order.
    """
    return [process(item) for item in tqdm(items, desc="Processing")]

Logging

import logging

# Configure the root logger once at import time: INFO and above are written
# to the file 'automation.log' (relative to the current working directory),
# each record formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='automation.log'
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

def logged_operation(data):
    """Run ``process(data)`` with start, completion and failure log entries.

    Logs the item count at INFO before processing and the result count at
    INFO afterwards. On any exception, logs the error message at ERROR and
    re-raises it unchanged.

    Both ``data`` and the value returned by ``process`` must support
    ``len()``; ``process`` is defined outside this snippet.
    """
    logger.info(f"Starting operation with {len(data)} items")
    try:
        result = process(data)
        logger.info(f"Operation completed: {len(result)} results")
        return result
    except Exception as e:
        # Only the message is logged here, not the traceback; the exception
        # is re-raised for the caller to handle.
        logger.error(f"Operation failed: {e}")
        raise

Best Practices

1. Start Simple

Manual → Script → Scheduled → Monitored

Don't over-engineer. Start with a manual process, then automate.

2. Make Idempotent

def safe_operation(data):
    """Process ``data`` at most once; repeat calls return the stored result.

    If ``already_processed(data)`` is true, returns
    ``get_cached_result(data)``. Otherwise runs ``process(data)``, calls
    ``mark_processed(data)`` and returns the fresh result.

    NOTE(review): the result is never stored in this function - presumably
    ``process`` or ``mark_processed`` populates the cache; confirm.
    """
    if not already_processed(data):
        # First time: do the work, then record that it has been done.
        result = process(data)
        mark_processed(data)
        return result

    # Already done earlier: reuse the stored result.
    return get_cached_result(data)

3. Add Checkpoints

def long_running_workflow(data, workflow_steps=None,
                          checkpoint_file="workflow_checkpoint.json"):
    """Run a chain of steps, saving a resumable checkpoint after each one.

    Args:
        data: Initial input for the first step. Ignored when a checkpoint
            exists, in which case the checkpointed data is used instead.
        workflow_steps: Sequence of one-argument callables. When omitted,
            the module-level ``steps`` sequence is used.
        checkpoint_file: Path of the JSON checkpoint. Intermediate results
            must therefore be JSON-serialisable.

    Returns:
        The output of the final step, or the input/checkpoint data unchanged
        if there is nothing left to run.

    Each step receives the previous step's output, so a resumed run computes
    the same result as an uninterrupted one.
    """
    import json
    import os

    # `steps` here resolves to the module-level name when no explicit
    # sequence is supplied.
    active_steps = steps if workflow_steps is None else workflow_steps

    # Load checkpoint if exists
    start_from = 0
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            state = json.load(f)
        start_from = state['step']
        data = state['data']

    # Process with checkpoints
    temp_file = f"{checkpoint_file}.tmp"
    for i, step in enumerate(active_steps[start_from:], start=start_from):
        data = step(data)

        # Write to a temp file then swap it in, so a crash mid-write cannot
        # leave a truncated checkpoint behind.
        with open(temp_file, 'w') as f:
            json.dump({'step': i + 1, 'data': data}, f)
        os.replace(temp_file, checkpoint_file)

    # Clean up (there is no checkpoint if no step ran and none was loaded).
    if os.path.exists(checkpoint_file):
        os.remove(checkpoint_file)
    return data

4. Test Thoroughly

def test_automation():
    """Dry-run the automation on mock data and check the reported outcome."""
    mock_input = create_mock_data()

    # Dry run: exercise the automation without its real side effects.
    outcome = automation(mock_input, dry_run=True)

    # The run must report success and produce the expected number of outputs.
    assert outcome['status'] == 'success'
    assert len(outcome['output']) == expected_count

    print("All tests passed!")

5. Document Everything

def automated_task(config):
    """
    Process daily sales data and generate report.

    NOTE(review): this is a documentation template. The body below is not
    implemented, so the function currently returns None rather than the
    dict described under Returns.

    Args:
        config: Dict with keys:
            - input_dir: Directory with CSV files
            - output_file: Path for output report
            - notify: Email to notify on completion

    Returns:
        Dict with keys:
            - status: 'success' or 'error'
            - records_processed: Number of records
            - output_file: Path to generated report

    Example:
        result = automated_task({
            'input_dir': '/data/sales',
            'output_file': '/reports/daily.csv',
            'notify': 'team@company.com'
        })
    """
    # Implementation goes here (intentionally left out of this example).

Common Use Cases

Daily Report Automation

def daily_report():
    """Fetch, process, render, distribute and archive the daily report.

    Outline only: every helper called here is defined outside this snippet.
    Nothing is returned, and no errors are handled, so a failure in any
    stage aborts the remaining ones.
    """
    # 1. Fetch data
    data = fetch_from_sources()
    
    # 2. Process
    processed = process_data(data)
    
    # 3. Generate report
    report = generate_report(processed)
    
    # 4. Distribute (email first, then Slack)
    send_email(report)
    upload_to_slack(report)
    
    # 5. Archive
    archive_report(report)

Data Synchronization

def sync_data():
    """Apply every change made since the last sync, then record the sync.

    Outline only: the helpers are defined outside this snippet.

    NOTE(review): the sync time is updated only after all changes are
    applied, so a failure part-way leaves it untouched and the next run
    presumably re-fetches the same changes - confirm ``apply_change`` is
    safe to repeat.
    """
    # 1. Get last sync state
    last_sync = get_last_sync_time()
    
    # 2. Fetch changes
    changes = fetch_changes_since(last_sync)
    
    # 3. Apply changes, one at a time, in the order returned
    for change in changes:
        apply_change(change)
    
    # 4. Update sync state
    update_sync_time()

Cleanup Automation

def cleanup():
    """Run the four housekeeping steps in a fixed order.

    Outline only: the helpers are defined outside this snippet. No errors
    are handled, so a failure in one step skips the steps after it.
    """
    # 1. Remove old files (helper is called with days=30)
    remove_old_files(days=30)
    
    # 2. Clear temp directories
    clear_temp_dirs()
    
    # 3. Archive old logs
    archive_logs()
    
    # 4. Optimize database
    optimize_database()

版本历史

共 1 个版本

  • v1.0.0 当前
    2026-05-01 01:48 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

ai-intelligence

OpenViking Setup

engsathiago
为 OpenClaw 智能体配置 OpenViking 上下文数据库。OpenViking 是专为 AI 智能体设计的开源文件系统上下文数据库。
★ 1 📥 786

Data Extractor

engsathiago
从非结构化来源提取结构化数据。将JSON、CSV、日志及混合格式解析为整洁可用的数据,支持处理异常数据和嵌套结构。
★ 0 📥 746
productivity

Agent Monetization Strategies

engsathiago
AI代理创收策略与框架,涵盖Soul.Markets、ClawHub高级技能、服务变现及被动收入模式。
★ 0 📥 721