> 从元数据采集到合规审计,一站式数据治理自动化平台——"让数据治理从文档驱动变成代码驱动"。
┌──────────────────────────┐
│ 数据治理驾驶舱 │
└──────────┬───────────────┘
│
┌───────────────────────┼───────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌───────────┐ ┌──────────┐
│ 元数据 │ │ 数据质量 │ │ 合规安全 │
│ 管理 │ │ 管理 │ │ 管理 │
└────┬────┘ └─────┬─────┘ └────┬─────┘
│ │ │
▼ ▼ ▼
血缘图谱 六维质量规则 敏感数据识别
数据字典 质量报告 自动脱敏
影响分析 SLA监控 审计报告
# 一键部署治理平台
docker-compose up -d
# 自动扫描数据库元数据
python scripts/collect_metadata.py --source postgresql
# 生成血缘图谱
python scripts/generate_lineage.py
# 识别敏感数据
python scripts/classify_sensitive.py --threshold 0.8
# 生成合规审计报告
python scripts/generate_audit_report.py --output reports/
# scripts/collect_metadata.py
"""自动采集 PostgreSQL / MySQL / ClickHouse / Hive 的元数据"""
def collect_table_metadata(db_type, host, port, db_name):
"""
采集维度:
- 表信息(名称、schema、owner、行数、大小、创建时间)
- 列信息(名称、类型、是否可空、默认值、注释)
- 索引信息(名称、类型、列)
- 分区信息(分区键、分区类型、分区数量)
"""
metadata = {
'database': db_name,
'collection_time': datetime.now().isoformat(),
'tables': []
}
for table in get_table_list():
table_meta = {
'table_name': table.name,
'schema': table.schema,
'owner': table.owner,
'row_count': table.row_count,
'size_bytes': table.size_bytes,
'created_at': table.created_at,
'last_modified': table.last_modified,
'columns': [],
'indexes': [],
}
for col in get_columns(table):
table_meta['columns'].append({
'name': col.name,
'type': col.type,
'nullable': col.nullable,
'default_value': col.default,
'comment': col.comment,
})
metadata['tables'].append(table_meta)
return metadata
# scripts/generate_lineage.py
"""通过 SQL 解析推导表级血缘关系"""
import sqlparse
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, DML
def parse_lineage(sql_text):
"""从 SQL 语句提取源表→目标表的血缘关系"""
parsed = sqlparse.parse(sql_text)[0]
sources = set()
target = None
# 解析 CREATE TABLE AS / INSERT INTO / SELECT FROM
for token in parsed.tokens:
if token.ttype is DML:
# 找到 FROM / JOIN 后的表名
sources = extract_table_names(parsed, after='FROM')
sources.update(extract_table_names(parsed, after='JOIN'))
if token.match(Keyword, 'INTO'):
target = extract_first_table(parsed, after='INTO')
return {
'target': target,
'sources': list(sources),
'lineage_type': 'direct',
'sql_preview': sql_text[:200]
}
# quality_rules.yaml
rules:
- id: R001
name: "订单ID完整性"
dimension: completeness
target: "public.orders.order_id"
check: "not_null"
threshold: 0.999
severity: critical
- id: R002
name: "订单ID唯一性"
dimension: uniqueness
target: "public.orders.order_id"
check: "unique"
threshold: 0.9999
severity: critical
- id: R003
name: "订单金额有效性"
dimension: validity
target: "public.orders.amount"
check: "between"
params: {min: 0, max: 1000000}
threshold: 0.995
severity: high
- id: R004
name: "订单状态一致性"
dimension: consistency
target: "public.orders.status"
check: "in_set"
params: {values: ["pending", "completed", "shipped", "cancelled"]}
threshold: 1.0
severity: high
- id: R005
name: "数据时效性"
dimension: timeliness
target: "public.orders.created_at"
check: "freshness"
params: {max_delay_hours: 2}
threshold: 0.95
severity: high
- id: R006
name: "用户ID引用完整性"
dimension: referential
target: "public.orders.user_id"
check: "exist_in"
params: {foreign_table: "public.users", foreign_column: "user_id"}
threshold: 0.99
severity: critical
# scripts/run_quality_checks.py
"""根据YAML规则定义执行质量检查,生成质量报告"""
def run_quality_check(rule):
check_type = rule['check']
if check_type == 'not_null':
sql = f"SELECT COUNT(*) as total, COUNT({rule['target']}) as non_null FROM {table}"
elif check_type == 'unique':
sql = f"SELECT COUNT(*) as total, COUNT(DISTINCT {col}) as unique_cnt FROM {table}"
elif check_type == 'between':
sql = f"SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE {col} BETWEEN {min} AND {max}) as valid FROM {table}"
elif check_type == 'freshness':
sql = f"SELECT MAX({col}) as latest FROM {table}"
hours_delay = (datetime.now() - latest).total_seconds() / 3600
passed = hours_delay <= max_delay_hours
# 质量评分
score = passed_count / total * 100
status = '✅ PASS' if score >= rule['threshold'] * 100 else '❌ FAIL'
return {'rule': rule['name'], 'dimension': rule['dimension'],
'score': round(score, 2), 'status': status}
# scripts/classify_sensitive.py
"""基于正则+规则引擎,自动识别敏感数据字段"""
SENSITIVE_PATTERNS = {
'phone': {
'level': 'L3',
'regex': r'^1[3-9]\d{9}$',
'law_basis': '《个人信息保护法》第28条'
},
'email': {
'level': 'L3',
'regex': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'law_basis': '《个人信息保护法》第28条'
},
'id_card': {
'level': 'L4',
'regex': r'^\d{17}[\dXx]$',
'law_basis': '《个人信息保护法》第28条 —— 敏感个人信息'
},
'bank_card': {
'level': 'L4',
'regex': r'^\d{16,19}$',
'law_basis': '《个人信息保护法》第28条 —— 金融账户'
},
'address': {
'level': 'L3',
'regex': r'(省|市|区|县|镇|村|路|街|号|栋|单元|室)',
'law_basis': '《个人信息保护法》第4条'
},
'password_hash': {
'level': 'L5',
'regex': r'password|passwd|pwd|secret|token',
'law_basis': '《数据安全法》第27条 —— 核心数据'
},
}
def classify_table(table_name, columns):
"""对表的所有列进行分类分级"""
results = []
for col in columns:
col_data_for_classify = col['name'] + (' ' + col['comment'] if col.get('comment') else '')
matched = []
for label, pattern in SENSITIVE_PATTERNS.items():
if re.search(pattern['regex'], col_data_for_classify, re.IGNORECASE):
matched.append({
'type': label,
'level': pattern['level'],
'law': pattern['law_basis']
})
if matched:
results.append({
'column': col['name'],
'classifications': matched,
'max_level': max(m['level'] for m in matched)
})
return results
# scripts/auto_mask.py
"""根据分类级别自动应用脱敏策略"""
MASK_STRATEGIES = {
'L3': 'partial_mask', # 138****1234
'L4': 'full_mask', # ****
'L5': 'hash_or_drop', # SHA256 或不采集
}
def apply_mask(column_name, classification_level, value):
strategy = MASK_STRATEGIES.get(classification_level, 'none')
if strategy == 'partial_mask':
# 手机号:保留前3后4
if len(str(value)) == 11:
return f"{str(value)[:3]}****{str(value)[-4:]}"
# 邮箱:保留首字符和域名
if '@' in str(value):
name, domain = str(value).split('@')
return f"{name[0]}***@{domain}"
return str(value)[:2] + '***' + str(value)[-2:]
elif strategy == 'full_mask':
return '****'
elif strategy == 'hash_or_drop':
return hashlib.sha256(str(value).encode()).hexdigest()[:16]
return value
# scripts/generate_audit_report.py
"""一键生成符合《数据安全法》《个人信息保护法》的审计报告"""
def generate_report():
report = {
'report_title': '数据治理合规审计报告',
'generate_time': datetime.now().isoformat(),
'legal_framework': ['《数据安全法》', '《个人信息保护法》', '《网络数据安全管理条例》'],
'sections': []
}
# 1. 数据资产盘点
report['sections'].append({
'name': '数据资产盘点',
'metrics': {
'total_tables': get_table_count(),
'total_columns': get_column_count(),
'classified_tables': get_classified_count(),
'coverage_rate': f"{get_classified_count() / get_table_count() * 100:.1f}%"
}
})
# 2. 分类分级统计
report['sections'].append({
'name': '数据分类分级',
'level_distribution': get_level_distribution(), # {L3: 45, L4: 12, L5: 3}
'unclassified_count': get_unclassified_count(),
})
# 3. 质量检查结果
report['sections'].append({
'name': '数据质量',
'total_checks': total_checks,
'passed': passed_checks,
'failed': failed_checks,
'pass_rate': f"{passed_checks/total_checks*100:.1f}%",
'critical_failures': critical_fails,
})
# 4. 合规风险项
report['sections'].append({
'name': '合规风险评估',
'risks': identify_risks(), # 如:未脱敏L4字段、缺少访问日志
})
return report
| DAMA 知识领域 | 本技能覆盖 | 实现方式 |
|---|---|---|
| :--- | :---: | :--- |
| 数据架构 | ✅ | 元数据采集 + 血缘图谱 |
| 数据建模与设计 | ✅ | 数据字典自动生成 |
| 数据存储与操作 | ⚠️ | 存储统计 + 增长趋势 |
| 数据安全 | ✅ | 敏感数据识别 + 分级 + 脱敏 |
| 数据集成与互操作 | ⚠️ | 血缘追踪覆盖集成链路 |
| 文档与内容管理 | ✅ | 自动生成数据字典 |
| 参考数据与主数据 | ⚠️ | 引用完整性检查 |
| 数据仓库与BI | ✅ | 质量规则覆盖DW层 |
| 元数据管理 | ✅ | 全自动采集 + 血缘推导 |
| 数据质量 | ✅ | 六维规则 + 质量评分 |
┌──────────────────────────────────────┐
│ Data Governance Autopilot │
│ │
│ ┌────────┐ ┌────────┐ ┌─────────┐ │
│ │元数据 │ │质量 │ │合规 │ │
│ │采集器 │ │检查器 │ │扫描器 │ │
│ └────┬───┘ └───┬────┘ └────┬────┘ │
│ │ │ │ │
│ └────┬────┴───────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ PostgreSQL │ 治理元数据库 │
│ └─────────────┘ │
│ │ │
│ ┌──────▼────────┐ │
│ │ 治理仪表盘 │ Grafana │
│ └──────────────┘ │
└──────────────────────────────────────┘
| 问题 | 解决 |
|---|---|
| :--- | :--- |
| 元数据采集慢(千表级) | 增量采集 --incremental,仅扫描 last_modified 有变化的表 |
| 分类准确率低 | 调整 --threshold 阈值,或手动标注后反馈训练 |
| 质量规则太多难维护 | 按 severity: critical 优先,非关键规则降级 |
| 脱敏后数据不可用 | L3 部分脱敏保留分析价值,L4/L5 用 Tokenization 替代 Masking |
共 1 个版本