← 返回
未分类

智能数据治理

智能数据治理 / 数据治理自动化 / 数据治理全流程 / 数据血缘管理 / 元数据管理 / 数据质量 / 元数据自动采集 / 数据质量六维规则 / 数据分类分级 / 敏感数据识别 / AI数据脱敏 / 合规审计 / 数据安全法 / 数据治理框架 / DAMA / 数据资产盘点
从元数据自动采集到合规审计,一站式数据治理自动化平台——"让数据治理从文档驱动变成代码驱动"。覆盖数据分类分级、敏感数据识别与脱敏、六维数据质量规则、血缘追踪、DAMA治理框架映射,附带一键部署环境。
庄子十八代技师
未分类 community v1.0.0 1 版本 100000 Key: 无需
★ 0
Stars
📥 31
下载
💾 0
安装
1
版本
#latest

概述

智能数据治理

> 从元数据采集到合规审计,一站式数据治理自动化平台——"让数据治理从文档驱动变成代码驱动"。

一、数据治理全景图

                  ┌──────────────────────────┐
                  │     数据治理驾驶舱        │
                  └──────────┬───────────────┘
                             │
     ┌───────────────────────┼───────────────────────┐
     │                       │                       │
     ▼                       ▼                       ▼
┌─────────┐           ┌───────────┐           ┌──────────┐
│ 元数据   │           │  数据质量  │           │ 合规安全  │
│ 管理    │           │  管理      │           │ 管理      │
└────┬────┘           └─────┬─────┘           └────┬─────┘
     │                      │                      │
     ▼                      ▼                      ▼
 血缘图谱              六维质量规则           敏感数据识别
 数据字典              质量报告              自动脱敏
 影响分析              SLA监控              审计报告

二、快速开始

# 一键部署治理平台
docker-compose up -d

# 自动扫描数据库元数据
python scripts/collect_metadata.py --source postgresql

# 生成血缘图谱
python scripts/generate_lineage.py

# 识别敏感数据
python scripts/classify_sensitive.py --threshold 0.8

# 生成合规审计报告
python scripts/generate_audit_report.py --output reports/

三、元数据自动采集

3.1 多源元数据采集器

# scripts/collect_metadata.py
"""自动采集 PostgreSQL / MySQL / ClickHouse / Hive 的元数据"""

def collect_table_metadata(db_type, host, port, db_name):
    """
    采集维度:
    - 表信息(名称、schema、owner、行数、大小、创建时间)
    - 列信息(名称、类型、是否可空、默认值、注释)
    - 索引信息(名称、类型、列)
    - 分区信息(分区键、分区类型、分区数量)
    """
    metadata = {
        'database': db_name,
        'collection_time': datetime.now().isoformat(),
        'tables': []
    }

    for table in get_table_list():
        table_meta = {
            'table_name': table.name,
            'schema': table.schema,
            'owner': table.owner,
            'row_count': table.row_count,
            'size_bytes': table.size_bytes,
            'created_at': table.created_at,
            'last_modified': table.last_modified,
            'columns': [],
            'indexes': [],
        }

        for col in get_columns(table):
            table_meta['columns'].append({
                'name': col.name,
                'type': col.type,
                'nullable': col.nullable,
                'default_value': col.default,
                'comment': col.comment,
            })

        metadata['tables'].append(table_meta)

    return metadata

3.2 血缘自动推导

# scripts/generate_lineage.py
"""通过 SQL 解析推导表级血缘关系"""

import sqlparse
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, DML

def parse_lineage(sql_text):
    """从 SQL 语句提取源表→目标表的血缘关系"""
    parsed = sqlparse.parse(sql_text)[0]

    sources = set()
    target = None

    # 解析 CREATE TABLE AS / INSERT INTO / SELECT FROM
    for token in parsed.tokens:
        if token.ttype is DML:
            # 找到 FROM / JOIN 后的表名
            sources = extract_table_names(parsed, after='FROM')
            sources.update(extract_table_names(parsed, after='JOIN'))

        if token.match(Keyword, 'INTO'):
            target = extract_first_table(parsed, after='INTO')

    return {
        'target': target,
        'sources': list(sources),
        'lineage_type': 'direct',
        'sql_preview': sql_text[:200]
    }

四、数据质量六维规则

4.1 规则定义(YAML 驱动)

# quality_rules.yaml
rules:
  - id: R001
    name: "订单ID完整性"
    dimension: completeness
    target: "public.orders.order_id"
    check: "not_null"
    threshold: 0.999
    severity: critical

  - id: R002
    name: "订单ID唯一性"
    dimension: uniqueness
    target: "public.orders.order_id"
    check: "unique"
    threshold: 0.9999
    severity: critical

  - id: R003
    name: "订单金额有效性"
    dimension: validity
    target: "public.orders.amount"
    check: "between"
    params: {min: 0, max: 1000000}
    threshold: 0.995
    severity: high

  - id: R004
    name: "订单状态一致性"
    dimension: consistency
    target: "public.orders.status"
    check: "in_set"
    params: {values: ["pending", "completed", "shipped", "cancelled"]}
    threshold: 1.0
    severity: high

  - id: R005
    name: "数据时效性"
    dimension: timeliness
    target: "public.orders.created_at"
    check: "freshness"
    params: {max_delay_hours: 2}
    threshold: 0.95
    severity: high

  - id: R006
    name: "用户ID引用完整性"
    dimension: referential
    target: "public.orders.user_id"
    check: "exist_in"
    params: {foreign_table: "public.users", foreign_column: "user_id"}
    threshold: 0.99
    severity: critical

4.2 质量检查引擎

# scripts/run_quality_checks.py
"""根据YAML规则定义执行质量检查,生成质量报告"""

def run_quality_check(rule):
    check_type = rule['check']

    if check_type == 'not_null':
        sql = f"SELECT COUNT(*) as total, COUNT({rule['target']}) as non_null FROM {table}"
    elif check_type == 'unique':
        sql = f"SELECT COUNT(*) as total, COUNT(DISTINCT {col}) as unique_cnt FROM {table}"
    elif check_type == 'between':
        sql = f"SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE {col} BETWEEN {min} AND {max}) as valid FROM {table}"
    elif check_type == 'freshness':
        sql = f"SELECT MAX({col}) as latest FROM {table}"
        hours_delay = (datetime.now() - latest).total_seconds() / 3600
        passed = hours_delay <= max_delay_hours

    # 质量评分
    score = passed_count / total * 100
    status = '✅ PASS' if score >= rule['threshold'] * 100 else '❌ FAIL'

    return {'rule': rule['name'], 'dimension': rule['dimension'],
            'score': round(score, 2), 'status': status}

五、AI 敏感数据识别

5.1 分类分级引擎

# scripts/classify_sensitive.py
"""基于正则+规则引擎,自动识别敏感数据字段"""

SENSITIVE_PATTERNS = {
    'phone': {
        'level': 'L3',
        'regex': r'^1[3-9]\d{9}$',
        'law_basis': '《个人信息保护法》第28条'
    },
    'email': {
        'level': 'L3',
        'regex': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
        'law_basis': '《个人信息保护法》第28条'
    },
    'id_card': {
        'level': 'L4',
        'regex': r'^\d{17}[\dXx]$',
        'law_basis': '《个人信息保护法》第28条 —— 敏感个人信息'
    },
    'bank_card': {
        'level': 'L4',
        'regex': r'^\d{16,19}$',
        'law_basis': '《个人信息保护法》第28条 —— 金融账户'
    },
    'address': {
        'level': 'L3',
        'regex': r'(省|市|区|县|镇|村|路|街|号|栋|单元|室)',
        'law_basis': '《个人信息保护法》第4条'
    },
    'password_hash': {
        'level': 'L5',
        'regex': r'password|passwd|pwd|secret|token',
        'law_basis': '《数据安全法》第27条 —— 核心数据'
    },
}

def classify_table(table_name, columns):
    """对表的所有列进行分类分级"""
    results = []
    for col in columns:
        col_data_for_classify = col['name'] + (' ' + col['comment'] if col.get('comment') else '')
        matched = []
        for label, pattern in SENSITIVE_PATTERNS.items():
            if re.search(pattern['regex'], col_data_for_classify, re.IGNORECASE):
                matched.append({
                    'type': label,
                    'level': pattern['level'],
                    'law': pattern['law_basis']
                })
        if matched:
            results.append({
                'column': col['name'],
                'classifications': matched,
                'max_level': max(m['level'] for m in matched)
            })
    return results

5.2 自动脱敏策略

# scripts/auto_mask.py
"""根据分类级别自动应用脱敏策略"""

MASK_STRATEGIES = {
    'L3': 'partial_mask',     # 138****1234
    'L4': 'full_mask',        # ****
    'L5': 'hash_or_drop',     # SHA256 或不采集
}

def apply_mask(column_name, classification_level, value):
    strategy = MASK_STRATEGIES.get(classification_level, 'none')

    if strategy == 'partial_mask':
        # 手机号:保留前3后4
        if len(str(value)) == 11:
            return f"{str(value)[:3]}****{str(value)[-4:]}"
        # 邮箱:保留首字符和域名
        if '@' in str(value):
            name, domain = str(value).split('@')
            return f"{name[0]}***@{domain}"
        return str(value)[:2] + '***' + str(value)[-2:]

    elif strategy == 'full_mask':
        return '****'

    elif strategy == 'hash_or_drop':
        return hashlib.sha256(str(value).encode()).hexdigest()[:16]

    return value

六、合规审计报告

# scripts/generate_audit_report.py
"""一键生成符合《数据安全法》《个人信息保护法》的审计报告"""

def generate_report():
    report = {
        'report_title': '数据治理合规审计报告',
        'generate_time': datetime.now().isoformat(),
        'legal_framework': ['《数据安全法》', '《个人信息保护法》', '《网络数据安全管理条例》'],
        'sections': []
    }

    # 1. 数据资产盘点
    report['sections'].append({
        'name': '数据资产盘点',
        'metrics': {
            'total_tables': get_table_count(),
            'total_columns': get_column_count(),
            'classified_tables': get_classified_count(),
            'coverage_rate': f"{get_classified_count() / get_table_count() * 100:.1f}%"
        }
    })

    # 2. 分类分级统计
    report['sections'].append({
        'name': '数据分类分级',
        'level_distribution': get_level_distribution(),  # {L3: 45, L4: 12, L5: 3}
        'unclassified_count': get_unclassified_count(),
    })

    # 3. 质量检查结果
    report['sections'].append({
        'name': '数据质量',
        'total_checks': total_checks,
        'passed': passed_checks,
        'failed': failed_checks,
        'pass_rate': f"{passed_checks/total_checks*100:.1f}%",
        'critical_failures': critical_fails,
    })

    # 4. 合规风险项
    report['sections'].append({
        'name': '合规风险评估',
        'risks': identify_risks(),  # 如:未脱敏L4字段、缺少访问日志
    })

    return report

七、DAMA 治理框架映射

DAMA 知识领域本技能覆盖实现方式
:---:---::---
数据架构元数据采集 + 血缘图谱
数据建模与设计数据字典自动生成
数据存储与操作⚠️存储统计 + 增长趋势
数据安全敏感数据识别 + 分级 + 脱敏
数据集成与互操作⚠️血缘追踪覆盖集成链路
文档与内容管理自动生成数据字典
参考数据与主数据⚠️引用完整性检查
数据仓库与BI质量规则覆盖DW层
元数据管理全自动采集 + 血缘推导
数据质量六维规则 + 质量评分

八、部署架构

┌──────────────────────────────────────┐
│         Data Governance Autopilot    │
│                                      │
│  ┌────────┐ ┌────────┐ ┌─────────┐  │
│  │元数据   │ │质量     │ │合规      │  │
│  │采集器   │ │检查器   │ │扫描器    │  │
│  └────┬───┘ └───┬────┘ └────┬────┘  │
│       │         │           │       │
│       └────┬────┴───────────┘       │
│            ▼                        │
│     ┌─────────────┐                 │
│     │ PostgreSQL  │  治理元数据库    │
│     └─────────────┘                 │
│            │                        │
│     ┌──────▼────────┐               │
│     │  治理仪表盘    │  Grafana     │
│     └──────────────┘               │
└──────────────────────────────────────┘

九、常见问题

问题解决
:---:---
元数据采集慢(千表级)增量采集 --incremental,仅扫描 last_modified 有变化的表
分类准确率低调整 --threshold 阈值,或手动标注后反馈训练
质量规则太多难维护severity: critical 优先,非关键规则降级
脱敏后数据不可用L3 部分脱敏保留分析价值,L4/L5 用 Tokenization 替代 Masking

版本历史

共 1 个版本

  • v1.0.0 首批发布:① SKILL.md(元数据自动采集+六维质量规则+AI敏感数据识别+合规审计报告);② Docker Compose一键部署(PostgreSQL+MySQL+Grafana+治理工具);③ setup_governance.sh自动初始化脚本;④ 参考文档三份:数据质量六维规则体系、数据合规法律框架与落地指南、元数据管理最佳实践 当前
    2026-06-01 15:09 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

data-analysis

AdMapix

fly0pants
AdMapix 原始数据层,提供广告创意、应用、排名、下载/收入及市场元数据。返回 AdMapix API 的结构化 JSON;调用方...
★ 296 📥 139,158
data-analysis

Tavily 搜索

jacky1n7
通过 Tavily API 进行网页搜索(Brave 替代方案)。当用户要求搜索网页、查找来源或链接,且 Brave 网页搜索不可用时使用。
★ 272 📥 100,137
ai-agent

RAG 知识库搭建

user_69009747
企业级 RAG 知识库从零搭建全流程。文档分块策略(固定/语义/结构化三大方案 + 小2大高级优化)、2026 主流嵌入模型选型(中文 BGE / 多语言 BGE-M3 / 英文 OpenAI,含 MTEB 榜单)、完整 Pipeline(
★ 0 📥 115