← 返回
未分类

数据清洗与分析

Use when receiving tabular data (CSV, Excel, JSON, SQL results) that needs cleaning, exploration, or analysis — handles missing values, outliers, duplicates, type conversion, statistical summary, visualization, and correlation analysis automatically
Use when receiving tabular data (CSV, Excel, JSON, SQL results) that needs cleaning, exploration, or analysis — handles missing values, outliers, duplicates, type conversion, statistical summary, visualization, and correlation analysis automatically
user_d9bb652a
未分类 community v1.0.0 1 版本 99230.8 Key: 无需
★ 0
Stars
📥 129
下载
💾 0
安装
1
版本
#latest

概述

Data Cleaning & Analysis

Overview

收到表格数据后,按固定流水线自动完成:加载概览 → 清洗 → 探索 → 分析 → 建议。每一步先诊断再处理,不跳过中间步骤。

Pipeline

LOAD → CLEAN → EXPLORE → ANALYZE → REPORT

各阶段不可跳过。上一阶段的输出是下一阶段的输入。

Stage 1: LOAD — 加载与概览

import pandas as pd
import numpy as np

# 自动检测文件类型加载
def load_data(path):
    ext = path.suffix.lower() if hasattr(path, 'suffix') else path.split('.')[-1]
    loaders = {
        '.csv': lambda p: pd.read_csv(p, encoding='utf-8-sig'),
        '.tsv': lambda p: pd.read_csv(p, sep='\t', encoding='utf-8-sig'),
        '.xlsx': lambda p: pd.read_excel(p),
        '.xls': lambda p: pd.read_excel(p),
        '.json': lambda p: pd.read_json(p),
        '.parquet': lambda p: pd.read_parquet(p),
    }
    return loaders.get(ext, pd.read_csv)(path)

加载后立即输出以下信息,不等待用户确认:

df = load_data(filepath)
print(f"Shape: {df.shape[0]} rows x {df.shape[1]} cols")
print(f"\nDtypes:\n{df.dtypes.value_counts()}")
print(f"\nMissing:\n{df.isnull().sum()[df.isnull().sum() > 0]}")
print(f"\nDuplicated rows: {df.duplicated().sum()}")
print(f"\nHead:\n{df.head()}")
print(f"\nDescribe:\n{df.describe(include='all')}")

Stage 2: CLEAN — 数据清洗

2.1 缺失值处理

def diagnose_missing(df):
    missing = df.isnull().sum()
    missing_pct = (missing / len(df) * 100).round(2)
    report = pd.DataFrame({'count': missing, 'pct': missing_pct})
    return report[report['count'] > 0].sort_values('count', ascending=False)

def handle_missing(df, report):
    for col in report.index:
        pct = report.loc[col, 'pct']
        if pct > 50:
            df.drop(columns=[col], inplace=True)
            print(f"Dropped column '{col}' ({pct}% missing)")
        elif pct > 5:
            if df[col].dtype in ('float64', 'int64'):
                df[col].fillna(df[col].median(), inplace=True)
                print(f"Filled '{col}' with median")
            else:
                df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'UNKNOWN', inplace=True)
                print(f"Filled '{col}' with mode")
        else:
            df.dropna(subset=[col], inplace=True)
            print(f"Dropped {report.loc[col, 'count']} rows with missing '{col}'")
    return df

2.2 重复值

n_dup = df.duplicated().sum()
if n_dup > 0:
    df.drop_duplicates(inplace=True)
    print(f"Removed {n_dup} duplicate rows")

2.3 异常值(IQR 方法)

def detect_outliers(df):
    outliers = {}
    for col in df.select_dtypes(include=np.number).columns:
        Q1, Q3 = df[col].quantile(0.25), df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
        mask = (df[col] < lower) | (df[col] > upper)
        if mask.sum() > 0:
            outliers[col] = {
                'count': mask.sum(), 'pct': round(mask.sum() / len(df) * 100, 2),
                'lower': lower, 'upper': upper
            }
    return outliers

# 处理:Winsorize(截尾)而非粗暴删除
from scipy.stats.mstats import winsorize

def handle_outliers(df, outliers, method='winsorize'):
    for col in outliers:
        limits = (0.01, 0.01)  # 1% each tail
        df[col] = winsorize(df[col].astype(float), limits=limits)
        print(f"Winsorized '{col}' ({outliers[col]['count']} outliers)")
    return df

2.4 数据类型转换

def auto_convert_types(df):
    for col in df.columns:
        # 尝试转 datetime
        if df[col].dtype == 'object':
            try:
                df[col] = pd.to_datetime(df[col], infer_datetime_format=True)
                print(f"Converted '{col}' to datetime")
                continue
            except (ValueError, TypeError):
                pass
        # 尝试转 numeric
        if df[col].dtype == 'object':
            converted = pd.to_numeric(df[col], errors='coerce')
            if converted.notna().sum() > df[col].notna().sum() * 0.8:
                df[col] = converted
                print(f"Converted '{col}' to numeric")
        # 低基数 object → category
        if df[col].dtype == 'object' and df[col].nunique() < df[col].notna().sum() * 0.1:
            df[col] = df[col].astype('category')
            print(f"Converted '{col}' to category ({df[col].nunique()} unique)")
    return df

Stage 3: EXPLORE — 可视化探索

import matplotlib.pyplot as plt
import seaborn as sns

# 中文字体设置
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

def explore_plots(df, output_dir='./analysis_plots'):
    import os
    os.makedirs(output_dir, exist_ok=True)
    numeric_cols = df.select_dtypes(include=np.number).columns
    cat_cols = df.select_dtypes(include=['object', 'category']).columns

    # 1. 缺失值热图(如有缺失)
    if df.isnull().sum().sum() > 0:
        fig, ax = plt.subplots(figsize=(10, max(4, len(df.columns) * 0.3)))
        sns.heatmap(df.isnull(), cbar=False, yticklabels=False, ax=ax)
        ax.set_title('Missing Values Heatmap')
        fig.savefig(f'{output_dir}/missing_heatmap.png', dpi=100, bbox_inches='tight')
        plt.close(fig)

    # 2. 数值列分布直方图(限制最多 16 张,多列时分批)
    for i, col in enumerate(numeric_cols[:16]):
        fig, ax = plt.subplots(figsize=(6, 4))
        df[col].hist(bins=30, ax=ax, edgecolor='white')
        ax.set_title(f'Distribution: {col}')
        fig.savefig(f'{output_dir}/hist_{col}.png', dpi=100, bbox_inches='tight')
        plt.close(fig)

    # 3. 数值列箱线图
    if len(numeric_cols) > 0 and len(numeric_cols) <= 20:
        fig, ax = plt.subplots(figsize=(max(6, len(numeric_cols) * 0.5), 4))
        df[numeric_cols].boxplot(ax=ax, rot=45)
        ax.set_title('Boxplot of Numeric Columns')
        fig.savefig(f'{output_dir}/boxplot.png', dpi=100, bbox_inches='tight')
        plt.close(fig)

    # 4. 相关性热图
    if len(numeric_cols) >= 2:
        fig, ax = plt.subplots(figsize=(max(8, len(numeric_cols) * 0.6), max(6, len(numeric_cols) * 0.5)))
        corr = df[numeric_cols].corr()
        sns.heatmap(corr, annot=True, fmt='.2f', cmap='RdYlBu_r', center=0, ax=ax)
        ax.set_title('Correlation Heatmap')
        fig.savefig(f'{output_dir}/correlation_heatmap.png', dpi=100, bbox_inches='tight')
        plt.close(fig)

    # 5. 分类列柱状图(top 10 categories)
    for col in cat_cols[:6]:
        fig, ax = plt.subplots(figsize=(8, 4))
        top = df[col].value_counts().nlargest(10)
        top.plot.bar(ax=ax, edgecolor='white')
        ax.set_title(f'Top 10: {col}')
        ax.tick_params(axis='x', rotation=45)
        fig.savefig(f'{output_dir}/bar_{col}.png', dpi=100, bbox_inches='tight')
        plt.close(fig)

    return output_dir

Stage 4: ANALYZE — 统计分析

def statistical_analysis(df):
    numeric_cols = df.select_dtypes(include=np.number).columns

    # 描述性统计(含偏度、峰度)
    desc = df[numeric_cols].describe()
    desc.loc['skew'] = df[numeric_cols].skew()
    desc.loc['kurtosis'] = df[numeric_cols].kurtosis()
    print("=== Descriptive Statistics ===")
    print(desc.round(2))

    # 高偏度列提醒
    for col in numeric_cols:
        skew = desc.loc['skew', col]
        if abs(skew) > 1:
            print(f"'{col}' is highly skewed ({skew:.2f}), consider log/sqrt transform")

    # 相关性报告(高相关对)
    if len(numeric_cols) >= 2:
        corr = df[numeric_cols].corr()
        high_corr = []
        for i in range(len(numeric_cols)):
            for j in range(i + 1, len(numeric_cols)):
                if abs(corr.iloc[i, j]) > 0.7:
                    high_corr.append((numeric_cols[i], numeric_cols[j], corr.iloc[i, j]))
        if high_corr:
            print("\n=== High Correlations (|r| > 0.7) ===")
            for c1, c2, r in sorted(high_corr, key=lambda x: -abs(x[2])):
                print(f"  {c1} <-> {c2}: {r:.3f}")

    # 分类列信息
    cat_cols = df.select_dtypes(include=['object', 'category']).columns
    for col in cat_cols:
        n_unique = df[col].nunique()
        top_val = df[col].value_counts().index[0]
        top_pct = df[col].value_counts().iloc[0] / len(df) * 100
        print(f"'{col}': {n_unique} unique, top='{top_val}' ({top_pct:.1f}%)")

Stage 5: REPORT — 汇总与建议

最终输出结构化摘要,包含:

  • 数据质量评分(0-100):基于缺失率、重复率、异常值率的加权扣分
  • 已执行的清洗操作列表
  • 数据关键发现(偏度、高相关、基数信息)
  • 特征工程建议(需要 log 变换的列、需要编码的列、可能的交互特征)
def quality_score(df_original, df_cleaned, outliers_report):
    score = 100
    n = len(df_original)
    score -= (df_original.isnull().sum().sum() / (n * len(df_original.columns))) * 100
    score -= (df_original.duplicated().sum() / n) * 100
    score -= (sum(o['count'] for o in outliers_report.values()) / n) * 50
    return max(0, min(100, round(score)))

Decision Rules

情况处理方式
---------------
缺失率 > 50%删除该列
缺失率 5%-50%数值列用中位数填充,分类列用众数填充
缺失率 < 5%删除对应行
异常值 < 5% 的列Winsorize 截尾
异常值 > 5% 的列标注后保留,提醒用户
重复行直接删除
相关性 > 0.95 的列对建议删除其中一个
偏度 > 2 的列建议 log/sqrt 变换

Common Mistakes

  • 跳过概览直接清洗: 必须先了解数据全貌再动手
  • 用均值填充偏态列: 偏态列用中位数,正态列用均值
  • IQR 范围之外全删除: 实际业务中的极端值可能是关键信号
  • 把测试集和训练集分开清洗: 合并清洗或分别用相同的统计量
  • 不检查编码问题: CSV 可能是 GBK/GB2312 而非 UTF-8

版本历史

共 1 个版本

  • v1.0.0 Initial release 当前
    2026-05-18 10:57 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

developer-tools

Github

steipete
使用 `gh` CLI 与 GitHub 交互,通过 `gh issue`、`gh pr`、`gh run` 和 `gh api` 管理议题、PR、CI 运行及高级查询。
★ 672 📥 324,437
ai-intelligence

Self-Improving + Proactive Agent

ivangdavila
自我反思+自我批评+自我学习+自组织记忆。智能体评估自身工作、发现错误并持续改进。
★ 1,362 📥 318,920
security-compliance

Skill Vetter

spclaudehome
AI智能体技能安全预审工具。安装ClawdHub、GitHub等来源技能前,检查风险信号、权限范围及可疑模式。
★ 1,219 📥 266,762