收到表格数据后,按固定流水线自动完成:加载概览 → 清洗 → 探索 → 分析 → 建议。每一步先诊断再处理,不跳过中间步骤。
LOAD → CLEAN → EXPLORE → ANALYZE → REPORT
各阶段不可跳过。上一阶段的输出是下一阶段的输入。
import pandas as pd
import numpy as np
# 自动检测文件类型加载
def load_data(path):
ext = path.suffix.lower() if hasattr(path, 'suffix') else path.split('.')[-1]
loaders = {
'.csv': lambda p: pd.read_csv(p, encoding='utf-8-sig'),
'.tsv': lambda p: pd.read_csv(p, sep='\t', encoding='utf-8-sig'),
'.xlsx': lambda p: pd.read_excel(p),
'.xls': lambda p: pd.read_excel(p),
'.json': lambda p: pd.read_json(p),
'.parquet': lambda p: pd.read_parquet(p),
}
return loaders.get(ext, pd.read_csv)(path)
加载后立即输出以下信息,不等待用户确认:
df = load_data(filepath)
print(f"Shape: {df.shape[0]} rows x {df.shape[1]} cols")
print(f"\nDtypes:\n{df.dtypes.value_counts()}")
print(f"\nMissing:\n{df.isnull().sum()[df.isnull().sum() > 0]}")
print(f"\nDuplicated rows: {df.duplicated().sum()}")
print(f"\nHead:\n{df.head()}")
print(f"\nDescribe:\n{df.describe(include='all')}")
def diagnose_missing(df):
missing = df.isnull().sum()
missing_pct = (missing / len(df) * 100).round(2)
report = pd.DataFrame({'count': missing, 'pct': missing_pct})
return report[report['count'] > 0].sort_values('count', ascending=False)
def handle_missing(df, report):
for col in report.index:
pct = report.loc[col, 'pct']
if pct > 50:
df.drop(columns=[col], inplace=True)
print(f"Dropped column '{col}' ({pct}% missing)")
elif pct > 5:
if df[col].dtype in ('float64', 'int64'):
df[col].fillna(df[col].median(), inplace=True)
print(f"Filled '{col}' with median")
else:
df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'UNKNOWN', inplace=True)
print(f"Filled '{col}' with mode")
else:
df.dropna(subset=[col], inplace=True)
print(f"Dropped {report.loc[col, 'count']} rows with missing '{col}'")
return df
n_dup = df.duplicated().sum()
if n_dup > 0:
df.drop_duplicates(inplace=True)
print(f"Removed {n_dup} duplicate rows")
def detect_outliers(df):
outliers = {}
for col in df.select_dtypes(include=np.number).columns:
Q1, Q3 = df[col].quantile(0.25), df[col].quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
mask = (df[col] < lower) | (df[col] > upper)
if mask.sum() > 0:
outliers[col] = {
'count': mask.sum(), 'pct': round(mask.sum() / len(df) * 100, 2),
'lower': lower, 'upper': upper
}
return outliers
# 处理:Winsorize(截尾)而非粗暴删除
from scipy.stats.mstats import winsorize
def handle_outliers(df, outliers, method='winsorize'):
for col in outliers:
limits = (0.01, 0.01) # 1% each tail
df[col] = winsorize(df[col].astype(float), limits=limits)
print(f"Winsorized '{col}' ({outliers[col]['count']} outliers)")
return df
def auto_convert_types(df):
for col in df.columns:
# 尝试转 datetime
if df[col].dtype == 'object':
try:
df[col] = pd.to_datetime(df[col], infer_datetime_format=True)
print(f"Converted '{col}' to datetime")
continue
except (ValueError, TypeError):
pass
# 尝试转 numeric
if df[col].dtype == 'object':
converted = pd.to_numeric(df[col], errors='coerce')
if converted.notna().sum() > df[col].notna().sum() * 0.8:
df[col] = converted
print(f"Converted '{col}' to numeric")
# 低基数 object → category
if df[col].dtype == 'object' and df[col].nunique() < df[col].notna().sum() * 0.1:
df[col] = df[col].astype('category')
print(f"Converted '{col}' to category ({df[col].nunique()} unique)")
return df
import matplotlib.pyplot as plt
import seaborn as sns
# 中文字体设置
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
def explore_plots(df, output_dir='./analysis_plots'):
import os
os.makedirs(output_dir, exist_ok=True)
numeric_cols = df.select_dtypes(include=np.number).columns
cat_cols = df.select_dtypes(include=['object', 'category']).columns
# 1. 缺失值热图(如有缺失)
if df.isnull().sum().sum() > 0:
fig, ax = plt.subplots(figsize=(10, max(4, len(df.columns) * 0.3)))
sns.heatmap(df.isnull(), cbar=False, yticklabels=False, ax=ax)
ax.set_title('Missing Values Heatmap')
fig.savefig(f'{output_dir}/missing_heatmap.png', dpi=100, bbox_inches='tight')
plt.close(fig)
# 2. 数值列分布直方图(限制最多 16 张,多列时分批)
for i, col in enumerate(numeric_cols[:16]):
fig, ax = plt.subplots(figsize=(6, 4))
df[col].hist(bins=30, ax=ax, edgecolor='white')
ax.set_title(f'Distribution: {col}')
fig.savefig(f'{output_dir}/hist_{col}.png', dpi=100, bbox_inches='tight')
plt.close(fig)
# 3. 数值列箱线图
if len(numeric_cols) > 0 and len(numeric_cols) <= 20:
fig, ax = plt.subplots(figsize=(max(6, len(numeric_cols) * 0.5), 4))
df[numeric_cols].boxplot(ax=ax, rot=45)
ax.set_title('Boxplot of Numeric Columns')
fig.savefig(f'{output_dir}/boxplot.png', dpi=100, bbox_inches='tight')
plt.close(fig)
# 4. 相关性热图
if len(numeric_cols) >= 2:
fig, ax = plt.subplots(figsize=(max(8, len(numeric_cols) * 0.6), max(6, len(numeric_cols) * 0.5)))
corr = df[numeric_cols].corr()
sns.heatmap(corr, annot=True, fmt='.2f', cmap='RdYlBu_r', center=0, ax=ax)
ax.set_title('Correlation Heatmap')
fig.savefig(f'{output_dir}/correlation_heatmap.png', dpi=100, bbox_inches='tight')
plt.close(fig)
# 5. 分类列柱状图(top 10 categories)
for col in cat_cols[:6]:
fig, ax = plt.subplots(figsize=(8, 4))
top = df[col].value_counts().nlargest(10)
top.plot.bar(ax=ax, edgecolor='white')
ax.set_title(f'Top 10: {col}')
ax.tick_params(axis='x', rotation=45)
fig.savefig(f'{output_dir}/bar_{col}.png', dpi=100, bbox_inches='tight')
plt.close(fig)
return output_dir
def statistical_analysis(df):
numeric_cols = df.select_dtypes(include=np.number).columns
# 描述性统计(含偏度、峰度)
desc = df[numeric_cols].describe()
desc.loc['skew'] = df[numeric_cols].skew()
desc.loc['kurtosis'] = df[numeric_cols].kurtosis()
print("=== Descriptive Statistics ===")
print(desc.round(2))
# 高偏度列提醒
for col in numeric_cols:
skew = desc.loc['skew', col]
if abs(skew) > 1:
print(f"'{col}' is highly skewed ({skew:.2f}), consider log/sqrt transform")
# 相关性报告(高相关对)
if len(numeric_cols) >= 2:
corr = df[numeric_cols].corr()
high_corr = []
for i in range(len(numeric_cols)):
for j in range(i + 1, len(numeric_cols)):
if abs(corr.iloc[i, j]) > 0.7:
high_corr.append((numeric_cols[i], numeric_cols[j], corr.iloc[i, j]))
if high_corr:
print("\n=== High Correlations (|r| > 0.7) ===")
for c1, c2, r in sorted(high_corr, key=lambda x: -abs(x[2])):
print(f" {c1} <-> {c2}: {r:.3f}")
# 分类列信息
cat_cols = df.select_dtypes(include=['object', 'category']).columns
for col in cat_cols:
n_unique = df[col].nunique()
top_val = df[col].value_counts().index[0]
top_pct = df[col].value_counts().iloc[0] / len(df) * 100
print(f"'{col}': {n_unique} unique, top='{top_val}' ({top_pct:.1f}%)")
最终输出结构化摘要,包含:
def quality_score(df_original, df_cleaned, outliers_report):
score = 100
n = len(df_original)
score -= (df_original.isnull().sum().sum() / (n * len(df_original.columns))) * 100
score -= (df_original.duplicated().sum() / n) * 100
score -= (sum(o['count'] for o in outliers_report.values()) / n) * 50
return max(0, min(100, round(score)))
| 情况 | 处理方式 |
|---|---|
| ------ | --------- |
| 缺失率 > 50% | 删除该列 |
| 缺失率 5%-50% | 数值列用中位数填充,分类列用众数填充 |
| 缺失率 < 5% | 删除对应行 |
| 异常值 < 5% 的列 | Winsorize 截尾 |
| 异常值 > 5% 的列 | 标注后保留,提醒用户 |
| 重复行 | 直接删除 |
| 相关性 > 0.95 的列对 | 建议删除其中一个 |
| 偏度 > 2 的列 | 建议 log/sqrt 变换 |
共 1 个版本