Kafka 流处理专家
> 百万消息秒级吞吐,精确一次绝不丢失——从零搭建生产级 Kafka 实时数据管道。
一、Kafka 全景图
┌─────────────────────────────────┐
│ Kafka 生态全景 │
└────────────┬────────────────────┘
│
┌──────────┬──────────┬──────┴──────┬──────────┬──────────┐
▼ ▼ ▼ ▼ ▼ ▼
核心引擎 Kafka Kafka Kafka Mirror Schema
(Broker) Streams Connect Producer/ Maker 2 Registry
流处理 数据集成 Consumer 跨集群 Schema
DSL/ Source/ API 复制 管理
Processor Sink
二、快速开始
# 一键部署 Kafka 集群 + 监控
docker-compose up -d
# 创建 Topic
python scripts/create_topic.py --name orders --partitions 6 --replication 3
# 运行生产者压测
python scripts/producer_benchmark.py --topic orders --messages 1000000
# 运行消费者压测
python scripts/consumer_benchmark.py --topic orders --group order-consumer
# 启动 Kafka Streams 实时处理
python scripts/streams_demo.py
三、核心概念速查
3.1 架构模型
┌──────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ P0(Leader) │ │ P0(Follower)│ │ P1(Leader) │ │
│ │ P1(Follower)│ │ P2(Leader) │ │ P2(Follower)│ │
│ │ P2(Follower)│ │ P1(Follower)│ │ P0(Follower)│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Topic: orders (3 partitions, replication factor = 3) │
└──────────────────────────────────────────────────────────┘
▲ │
│ ▼
┌─────┴──────┐ ┌──────────┐
│ Producer │ │ Consumer │
│ 写入消息 │ │ Group │
└────────────┘ └──────────┘
3.2 关键参数速查
| 参数 | 推荐值 | 说明 |
| :--- | :--- | :--- |
num.partitions | CPU核数 × 2 | 分区数决定并行度 |
replication.factor | 3 | 生产环境至少3副本 |
min.insync.replicas | 2 | 至少2个副本确认才返回ack |
acks | all / -1 | 生产环境必须all |
enable.idempotence | true | 开启幂等生产者 |
compression.type | lz4 / zstd | 推荐lz4(速度)或zstd(压缩比) |
retention.ms | 604800000 (7天) | 按业务需求调整 |
segment.bytes | 1073741824 (1GB) | 日志段大小 |
四、生产者实战
4.1 幂等生产者 + 事务
# scripts/idempotent_producer.py
"""幂等生产者 + 事务:精确一次语义"""
from kafka import KafkaProducer
import json
import time
class ExactlyOnceProducer:
    """Idempotent, transactional producer for order events.

    Exactly-once building blocks:
    1. enable_idempotence=True -> idempotent writes (exactly-once per partition).
    2. transactional_id -> the 'orders' and 'order_audit' writes made by one
       send_order_transaction() call are committed or aborted together
       (exactly-once across partitions/topics).

    NOTE: consumers only get this atomic view when they read with
    isolation_level='read_committed'.

    Args:
        bootstrap_servers: Kafka bootstrap address.
        transactional_id: transactional id registered with the broker. Each live
            producer instance needs its own id; a newer instance with the same
            id fences the older one.
    """

    def __init__(self, bootstrap_servers: str = "localhost:9092",
                 transactional_id: str = 'order-producer-01'):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Idempotence
            enable_idempotence=True,
            acks='all',
            max_in_flight_requests_per_connection=5,
            retries=3,
            # Transactions
            transactional_id=transactional_id,
            # Throughput
            compression_type='lz4',
            linger_ms=5,
            batch_size=32768,  # 32KB
            # Serialization
            key_serializer=self._serialize_key,
            value_serializer=self._serialize_value,
        )
        # A transactional producer must register its transactional_id with the
        # transaction coordinator before the first begin_transaction(); without
        # this call begin_transaction() fails.
        self.producer.init_transactions()

    @staticmethod
    def _serialize_key(key):
        """Encode a str key as UTF-8; None stays None (an empty string is kept)."""
        return key.encode('utf-8') if key is not None else None

    @staticmethod
    def _serialize_value(value) -> bytes:
        """Encode a JSON-serializable value as UTF-8 JSON bytes."""
        return json.dumps(value).encode('utf-8')

    def send_order_transaction(self, order: dict):
        """Write an order and its audit event in a single transaction.

        Args:
            order: dict with at least 'order_id'; sent unchanged to 'orders'.

        Raises:
            Whatever the send/commit raised, after the transaction was aborted.
        """
        self.producer.begin_transaction()
        try:
            # Main order message
            self.producer.send(
                'orders',
                key=str(order['order_id']),
                value=order
            )
            # Matching audit event in the same transaction
            self.producer.send(
                'order_audit',
                key=str(order['order_id']),
                value={
                    'order_id': order['order_id'],
                    'event': 'ORDER_CREATED',
                    'timestamp': int(time.time() * 1000)
                }
            )
            self.producer.commit_transaction()
            print(f" 事务提交: order_id={order['order_id']}")
        except Exception as e:
            # Discard both writes so read_committed consumers never see half of them.
            self.producer.abort_transaction()
            print(f" 事务回滚: {e}")
            raise

    def close(self):
        """Close the underlying KafkaProducer."""
        self.producer.close()
# Demo: write three orders transactionally to 'orders' + 'order_audit'.
producer = ExactlyOnceProducer()
orders = [
    {"order_id": 1001, "amount": 299.00, "user_id": "U001", "status": "created"},
    {"order_id": 1002, "amount": 599.00, "user_id": "U002", "status": "created"},
    {"order_id": 1003, "amount": 199.00, "user_id": "U003", "status": "created"},
]
print("=== 事务性生产者演示 ===")
try:
    for order in orders:
        producer.send_order_transaction(order)
finally:
    # send_order_transaction() re-raises after a rollback; still release the
    # producer's connections and buffers.
    producer.close()
print(f"\n {len(orders)}条订单已事务性写入 orders + order_audit 两个Topic")
4.2 分区策略
# scripts/partition_strategy.py
"""分区策略对比:默认/Key哈希/自定义"""
class PartitionStrategies:
    """Three partitioner strategies using the kafka-python partitioner signature.

    Every strategy takes (key, all_partitions, available): all_partitions is the
    list of the topic's partition ids; available is the subset that currently
    has a leader.

    1. Default: return None and let the client's built-in partitioner choose.
    2. Key hash: the same key always maps to the same partition (per-key order).
    3. Custom: business routing. Keys starting with "VIP_" go to the first
       partition; all other keys are spread over the remaining partitions.

    Keys may be str (as in the demo) or the serialized bytes a producer passes.
    Hashing uses zlib.crc32, which is stable across processes. It is NOT the
    murmur2 hash of Kafka's default partitioner, so the chosen partitions do not
    match what other Kafka clients would pick for the same key.
    """

    @staticmethod
    def _key_bytes(key) -> bytes:
        """Return *key* as bytes (str keys are UTF-8 encoded)."""
        if isinstance(key, (bytes, bytearray)):
            return bytes(key)
        return str(key).encode('utf-8')

    @staticmethod
    def _stable_hash(key) -> int:
        """Process-independent, non-negative hash of *key*.

        The built-in hash() of str/bytes is randomized per interpreter process
        (PYTHONHASHSEED). It would send one key to different partitions after
        each restart and break per-key ordering.
        """
        import zlib
        return zlib.crc32(PartitionStrategies._key_bytes(key))

    @staticmethod
    def _null_key_partition(all_partitions, available):
        """Partition for key-less messages: first available one, else first known."""
        return (available or all_partitions)[0]

    @staticmethod
    def default_partition(key, all_partitions, available):
        """Return None so the client's built-in partitioner decides."""
        return None

    @staticmethod
    def key_hash_partition(key, all_partitions, available):
        """Key hash: messages with the same key (e.g. user_id) share a partition."""
        if key is None:
            return PartitionStrategies._null_key_partition(all_partitions, available)
        index = PartitionStrategies._stable_hash(key) % len(all_partitions)
        return all_partitions[index]

    @staticmethod
    def custom_partition(key, all_partitions, available):
        """Custom routing: VIP keys -> first partition, others -> remaining ones.

        With a single partition, every key goes to that partition.
        """
        if key is None:
            return PartitionStrategies._null_key_partition(all_partitions, available)
        # Key format assumed: "VIP_U001" or "U002"
        if PartitionStrategies._key_bytes(key).startswith(b"VIP_") or len(all_partitions) < 2:
            return all_partitions[0]  # dedicated VIP partition (id 0 for ids 0..N-1)
        regular = all_partitions[1:]
        return regular[PartitionStrategies._stable_hash(key) % len(regular)]
# Demo: compare the three strategies on a few keys over 6 partitions.
strategies = PartitionStrategies()
partitions = list(range(6))  # 6 partitions
print("=== 分区策略演示 ===\n")
test_keys = ["VIP_U001", "U002", "U003", "VIP_U004", "U005"]
print(f"{'Key':<12s} {'默认':<8s} {'Key哈希':<8s} {'自定义':<8s}")
print("-" * 40)
for k in test_keys:
    # Evaluate the strategies in a fixed order: default, key hash, custom.
    by_default, by_hash, by_rule = (
        strategy(k, partitions, partitions)
        for strategy in (
            strategies.default_partition,
            strategies.key_hash_partition,
            strategies.custom_partition,
        )
    )
    print(f"{k:<12s} {str(by_default):<8s} {by_hash:<8d} {by_rule:<8d}")
print("\n 自定义策略:VIP用户 → 分区0(隔离+优先处理)")
print(" Key哈希策略:同用户 → 同分区(保证顺序)")
五、消费者实战
5.1 消费者组 + 再均衡
# scripts/consumer_group.py
"""消费者组:自动负载均衡 + 再均衡监听"""
from kafka import KafkaConsumer
import json
class OrderConsumer:
    """Consumer-group member that reads the 'orders' topic.

    Consumer-group mechanics:
    - members sharing a group_id split the topic's partitions among themselves;
    - a member joining or leaving triggers a rebalance;
    - offsets are committed manually, only after process_order() returned.

    Committing after processing gives at-least-once delivery, not exactly-once.
    If the process dies between process_order() and commit(), the message is
    delivered again after restart, so process_order() must be idempotent.
    """

    def __init__(self, group_id: str = "order-processor"):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers='localhost:9092',
            group_id=group_id,
            # Offsets are committed explicitly in consume_with_manual_commit().
            enable_auto_commit=False,
            # Used only when the group has no committed offset (or it is out of
            # range): start from the earliest offset still retained.
            auto_offset_reset='earliest',
            # Max records returned by one poll.
            max_poll_records=500,
            # Max gap between polls before the member is considered stuck and a
            # rebalance starts.
            max_poll_interval_ms=300000,  # 5 minutes
            # Session timeout
            session_timeout_ms=30000,
            # Deserialization
            key_deserializer=self._deserialize_key,
            value_deserializer=self._deserialize_value,
        )

    @staticmethod
    def _deserialize_key(raw):
        """Decode a UTF-8 key; a missing (None) key stays None."""
        return raw.decode('utf-8') if raw is not None else None

    @staticmethod
    def _deserialize_value(raw):
        """Decode a UTF-8 JSON value."""
        return json.loads(raw.decode('utf-8'))

    def consume_with_manual_commit(self):
        """Process messages forever, committing after each successful process_order().

        An exception from process_order() ends the loop before commit(), so
        the failed message is redelivered later.
        """
        print(f" 消费者组: {self.consumer.config['group_id']}")
        # NOTE(review): before the first poll the group join has usually not
        # completed, so this may print an empty set.
        print(f" 分配分区: {self.consumer.assignment()}")
        for message in self.consumer:
            order = message.value
            # Business processing
            self.process_order(order)
            # Commit only after successful processing.
            self.consumer.commit()
            print(f" 已处理+提交: order_id={order['order_id']}, "
                  f"partition={message.partition}, offset={message.offset}")

    def process_order(self, order: dict):
        """Business-logic placeholder (e.g. write to a DB, call downstream services)."""
        pass

    def close(self):
        """Close the underlying KafkaConsumer."""
        self.consumer.close()
# Demo: explain how to start several group members (text only; no Kafka
# connection is made). The partition counts assume a 3-partition 'orders' topic.
print("=== 消费者组演示 ===")
print("""
启动方式(多终端模拟多消费者):
python consumer_group.py # 消费者1 → 分配分区0,1,2
python consumer_group.py # 消费者2 → 自动再均衡,一个分2个分区、另一个分1个分区
python consumer_group.py # 消费者3 → 再均衡,各分1个分区
同组消费者自动负载均衡,无需手动分配!
""")
5.2 死信队列
# scripts/dead_letter_queue.py
"""死信队列:处理失败的消息不阻塞主流程"""
class TransientError(Exception):
    """Recoverable processing failure (e.g. a downstream timeout); worth retrying."""


class FatalError(Exception):
    """Unrecoverable processing failure (e.g. a malformed payload); retrying cannot help."""


class DeadLetterQueue:
    """Dead-letter-queue pattern: failed messages never block the main flow.

    1. The main consumer processes a message via process().
    2. On TransientError the message is re-published to 'orders.retry' with an
       incremented 'retry_count' header, at most max_retries times.
    3. After that, or on FatalError, it is written to 'orders.dlq' for alerting
       or manual handling.

    Args:
        dlq_producer: object with a kafka-python style
            ``send(topic, key=..., value=..., headers=...)`` used for 'orders.dlq'.
            Its value serializer must accept the dict built in send_to_dlq().
        retry_producer: same, for 'orders.retry'; defaults to dlq_producer.
        handler: callable(message) with the business logic, called by process().
            Alternatively subclass and override process().
    """

    RETRY_HEADER = 'retry_count'

    def __init__(self, dlq_producer=None, retry_producer=None, handler=None):
        self.max_retries = 3
        # Increasing delays intended before retry 1/2/3. Not applied by this
        # class; presumably meant for whoever consumes 'orders.retry'.
        self.retry_backoff_ms = [1000, 5000, 30000]
        self.dlq_producer = dlq_producer
        self.retry_producer = retry_producer if retry_producer is not None else dlq_producer
        self.handler = handler

    @classmethod
    def _retry_count_from_headers(cls, headers) -> int:
        """Read the retry counter from record headers (0 when absent or invalid).

        kafka-python exposes headers as a list of (str, bytes) tuples; a plain
        dict is accepted too. Values may be bytes, str or int.
        """
        if not headers:
            return 0
        if isinstance(headers, dict):
            raw = headers.get(cls.RETRY_HEADER, 0)
        else:
            raw = next((value for name, value in headers if name == cls.RETRY_HEADER), 0)
        if isinstance(raw, (bytes, bytearray)):
            raw = raw.decode('utf-8', errors='replace')
        try:
            return int(raw)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _require_producer(producer, name):
        """Return *producer*, or raise a clear error when it was not configured."""
        if producer is None:
            raise RuntimeError(f"DeadLetterQueue.{name} is not configured")
        return producer

    def process(self, message):
        """Run the business logic for *message* (delegates to ``handler``)."""
        if self.handler is None:
            raise NotImplementedError("pass handler=... or override DeadLetterQueue.process")
        return self.handler(message)

    def consume_with_dlq(self, message):
        """Process one record and route failures.

        Returns:
            "SUCCESS" when processing succeeded (the caller should commit the offset),
            "RETRY" when the record was re-published to 'orders.retry',
            "DEAD_LETTER" when it was written to 'orders.dlq'.

        Exceptions other than TransientError/FatalError propagate unchanged.
        """
        retry_count = self._retry_count_from_headers(getattr(message, 'headers', None))
        try:
            self.process(message)
        except TransientError as e:
            if retry_count < self.max_retries:
                # Re-publish with an incremented retry counter.
                self.send_to_retry_topic(message, retry_count + 1)
                return "RETRY"
            # Retries exhausted -> dead-letter queue.
            self.send_to_dlq(message, str(e))
            return "DEAD_LETTER"
        except FatalError as e:
            # Fatal error -> straight to the dead-letter queue.
            self.send_to_dlq(message, str(e))
            return "DEAD_LETTER"
        return "SUCCESS"

    def send_to_dlq(self, message, error_reason):
        """Write *message* plus its origin and the failure reason to 'orders.dlq'."""
        import time  # not imported at the top of this script

        producer = self._require_producer(self.dlq_producer, 'dlq_producer')
        dlq_message = {
            "original_topic": message.topic,
            "original_partition": message.partition,
            "original_offset": message.offset,
            # NOTE(review): key/value are copied as-is; if they are raw bytes the
            # producer's serializer must be able to handle them.
            "original_key": message.key,
            "original_value": message.value,
            "error": error_reason,
            "dead_letter_time": int(time.time() * 1000)
        }
        producer.send('orders.dlq', value=dlq_message)
        print(f" → 死信: offset={message.offset}, reason={error_reason}")

    def send_to_retry_topic(self, message, retry_count):
        """Re-publish *message* to 'orders.retry' with the given retry counter header."""
        producer = self._require_producer(self.retry_producer, 'retry_producer')
        headers = [(self.RETRY_HEADER, str(retry_count).encode())]
        producer.send(
            'orders.retry',
            key=message.key,
            value=message.value,
            headers=headers
        )
# Demo: print the DLQ message flow and topic topology (text only; no Kafka
# connection is made here).
print("=== 死信队列模式 ===")
print("""
消息处理流程:
orders → [消费] → 成功 → commit
→ 瞬时失败 → orders.retry (最多3次)
→ 3次后仍失败 → orders.dlq (人工介入)
→ 致命错误 → orders.dlq (直接死信)
Topic 拓扑:
orders ← 主业务Topic
orders.retry ← 重试Topic(消费者自动消费重试)
orders.dlq ← 死信Topic(监控告警+人工处理)
""")
六、Kafka Streams 流处理
# scripts/streams_demo.py
"""Kafka Streams 实时流处理:订单实时聚合"""
# 使用 Faust(Python 流处理库,借鉴 Kafka Streams 的 Agent/Table 模型,但并非 Kafka Streams 本身,也不与其 API 兼容)
import faust
# Faust application hosting the topics, tables, agent and timer below.
app = faust.App(
    'order-stream-processor',         # application id
    topic_partitions=6,               # partition count for topics Faust creates
    store='memory://',                # tables kept in process memory
    broker='kafka://localhost:9092',
)
# Data model
class Order(faust.Record, serializer='json'):
    """One order event on the 'orders' topic, (de)serialized as JSON."""
    order_id: int
    user_id: str  # also the table key and the downstream message key
    amount: float
    status: str
# Topics
orders_topic = app.topic('orders', value_type=Order)
orders_by_user_topic = app.topic('orders_by_user', value_type=dict)
# State: per-user order totals in 60-second tumbling windows.
# key_index=True keeps an index of the table's keys. Faust needs it to iterate a
# windowed table (keys()/items()/values()). report_top_users() calls
# user_total_amount.items(), which fails on a windowed table created without it.
user_total_amount = app.Table(
    'user_total_amount',
    default=float,
    partitions=6,
).tumbling(60.0, key_index=True)  # 60-second tumbling window
# Stream processing: maintain each user's order total in real time.
@app.agent(orders_topic)
async def process_orders(orders):
    """Consume Order records from orders_topic, update per-user totals and forward them.

    For each order: add its amount to user_total_amount[user_id], print an
    alert when amount > 10000, and send a summary dict keyed by user_id to
    orders_by_user_topic.
    """
    async for order in orders:
        # Accumulate the user's amount in the windowed table.
        user_total_amount[order.user_id] += order.amount
        # Large-order alert (amount > 10000).
        if order.amount > 10000:
            print(f" ⚠ 大额订单告警: user={order.user_id}, amount={order.amount}")
        # Emit to the downstream topic.
        await orders_by_user_topic.send(
            key=order.user_id,
            value={
                'user_id': order.user_id,
                'order_id': order.order_id,
                'amount': order.amount,
                # NOTE(review): user_total_amount is a tumbling-windowed table, so
                # this is presumably the total for the current window rather than
                # an all-time total; confirm against Faust's WindowSet.value().
                'total_so_far': user_total_amount[order.user_id].value()
            }
        )
# Periodic Top-N report.
@app.timer(interval=10.0)
async def report_top_users():
    """Every 10 seconds, print the five users with the largest totals in user_total_amount."""
    ranked = sorted(user_total_amount.items(), key=lambda entry: entry[1], reverse=True)
    print(f"\n === Top 5 用户消费金额 ===")
    for user_id, total in ranked[:5]:
        print(f" {user_id}: ¥{total:.2f}")


if __name__ == '__main__':
    # Hand control to Faust's command-line entry point.
    app.main()
七、Kafka Connect 数据集成
# scripts/connect_configs.yaml
# Kafka Connect pipeline: MySQL -> Kafka -> Elasticsearch.
# Each entry holds the connector "name" plus the keys that go into the
# "config" object of the Kafka Connect REST request.

# Source connector: MySQL CDC (Debezium 2.x) -> Kafka
mysql_source:
  name: mysql-source-orders
  connector.class: io.debezium.connector.mysql.MySqlConnector
  database.hostname: mysql
  database.port: 3306
  database.user: debezium
  # Demo credential only; in production load secrets via a Connect ConfigProvider.
  database.password: dbz123
  # Replication-client id: must differ from the MySQL server's own server_id
  # (often 1) and from every other replica/connector on that server.
  database.server.id: 184054
  # Debezium 2.x replaced database.server.name with topic.prefix. Change topics
  # are named <prefix>.<database>.<table>, i.e. mysql.sales.orders here.
  topic.prefix: mysql
  database.include.list: sales
  table.include.list: sales.orders
  # Required schema-history topic (Debezium 1.x names: database.history.kafka.*).
  # NOTE(review): broker address is an assumption - point it at your cluster.
  schema.history.internal.kafka.bootstrap.servers: kafka:9092
  schema.history.internal.kafka.topic: schema-history.sales
  transforms: unwrap
  transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
  # Keep the tombstone emitted after a DELETE (this SMT drops it by default) so
  # the sink's behavior.on.null.values=delete can remove the document.
  transforms.unwrap.drop.tombstones: false

# Sink connector: Kafka -> Elasticsearch
elasticsearch_sink:
  name: elasticsearch-sink-orders
  connector.class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  topics: mysql.sales.orders
  connection.url: http://elasticsearch:9200
  # The record key becomes the document _id (needed for upserts and deletes).
  # It must be a primitive, but Debezium keys are structs of the primary-key
  # columns, so extract the key field first.
  # NOTE(review): assumes the primary-key column is order_id - adjust.
  key.ignore: false
  transforms: extractKey
  transforms.extractKey.type: org.apache.kafka.connect.transforms.ExtractField$Key
  transforms.extractKey.field: order_id
  schema.ignore: true
  # NOTE(review): mapping types are gone in Elasticsearch 7+ / newer connector
  # versions; drop this key if your connector version rejects it.
  type.name: _doc
  behavior.on.null.values: delete
八、监控与运维
8.1 关键指标
| 指标 | 含义 | 告警阈值 |
| :--- | :--- | :--- |
UnderReplicatedPartitions | 副本不足的分区数 | > 0 |
OfflinePartitionsCount | 离线分区数 | > 0 |
ActiveControllerCount | 活跃Controller数 | ≠ 1 |
BytesInPerSec | 入站吞吐量 | 接近带宽上限 |
BytesOutPerSec | 出站吞吐量 | 接近带宽上限 |
MessagesInPerSec | 每秒消息数 | 突增>200% |
RequestQueueSize | 请求队列长度 | > 500 |
ConsumerLag | 消费者滞后量(非 Broker 指标:取消费者端 records-lag-max,或用 kafka-consumer-groups.sh / 8.2 脚本计算) | > 10000 |
8.2 消费者 Lag 监控
# scripts/monitor_consumer_lag.py
"""消费者 Lag 监控脚本"""
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.admin import ConsumerGroupDescription
class LagMonitor:
    """Report per-partition lag of a consumer group.

    lag = log-end offset of the partition - offset committed by the group.

    Args:
        bootstrap_servers: Kafka bootstrap address.
        lag_threshold: partitions whose lag exceeds this are flagged as backlogged.
    """

    def __init__(self, bootstrap_servers: str = "localhost:9092", lag_threshold: int = 10000):
        self.bootstrap_servers = bootstrap_servers
        self.lag_threshold = lag_threshold
        self.admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    def _fetch_end_offsets(self, partitions) -> dict:
        """Return {TopicPartition: log-end offset} for *partitions* in one request.

        Uses the public KafkaConsumer.end_offsets() API. The consumer has no
        group_id, so it neither joins the monitored group nor commits anything.
        """
        partitions = list(partitions)
        if not partitions:
            return {}
        consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
        try:
            return consumer.end_offsets(partitions)
        finally:
            consumer.close()

    def _build_report(self, committed: dict, end_offsets: dict) -> dict:
        """Combine {tp: committed offset} and {tp: end offset} into the lag report.

        Keys are "topic:partition", ordered by topic then partition. A negative
        committed offset (nothing committed yet) counts the log from offset 0 as
        lag; this is an upper bound because log-start truncation is ignored.
        """
        report = {}
        for tp in sorted(committed, key=lambda p: (p.topic, p.partition)):
            consumer_offset = committed[tp]
            baseline = max(consumer_offset, 0)
            end_offset = end_offsets.get(tp, baseline)
            # Clamp: the committed offset can briefly be ahead of a stale end offset.
            lag = max(end_offset - baseline, 0)
            report[f"{tp.topic}:{tp.partition}"] = {
                "consumer_offset": consumer_offset,
                "end_offset": end_offset,
                "lag": lag,
                "status": "⚠ 积压" if lag > self.lag_threshold else "✅ 正常"
            }
        return report

    def get_consumer_lag(self, group_id: str) -> dict:
        """Return the lag report of *group_id* (see _build_report for the layout)."""
        committed = {
            tp: meta.offset
            for tp, meta in self.admin.list_consumer_group_offsets(group_id).items()
        }
        return self._build_report(committed, self._fetch_end_offsets(committed))

    def print_lag_report(self, group_id: str):
        """Print the lag report of *group_id* as a table, followed by the total lag."""
        report = self.get_consumer_lag(group_id)
        total_lag = sum(r['lag'] for r in report.values())
        print(f"\n=== 消费者组 Lag 报告: {group_id} ===")
        print(f"{'分区':<25s} {'消费Offset':>12s} {'最新Offset':>12s} {'Lag':>10s} {'状态'}")
        print("-" * 75)
        for tp, info in report.items():
            print(f"{tp:<25s} {info['consumer_offset']:>12d} "
                  f"{info['end_offset']:>12d} {info['lag']:>10d} {info['status']}")
        print(f"\n 总Lag: {total_lag}")

    def close(self):
        """Close the admin client connection."""
        self.admin.close()
# Demo: print the lag report of the "order-processor" group on a local broker.
monitor = LagMonitor(bootstrap_servers="localhost:9092")
monitor.print_lag_report(group_id="order-processor")
九、性能调优 Checklist
生产者调优:
□ acks=all(不丢消息)
□ enable.idempotence=true(幂等)
□ compression.type=lz4(压缩)
□ linger.ms=5-10(批量发送)
□ batch.size=32768-65536(批次大小)
□ buffer.memory=33554432(缓冲区32MB)
Broker调优:
□ num.network.threads=CPU核数
□ num.io.threads=CPU核数×2
□ num.partitions=CPU核数×2
□ log.flush.interval.messages 保持默认(依赖 OS 页缓存刷盘 + 副本机制保证持久性,官方不建议强制 fsync)
□ socket.send.buffer.bytes=102400
□ socket.receive.buffer.bytes=102400
消费者调优:
□ fetch.min.bytes=1024(最小拉取量)
□ fetch.max.wait.ms=500(最大等待)
□ max.partition.fetch.bytes=1048576(1MB)
□ max.poll.records=500(单次拉取条数)
□ enable.auto.commit=false(手动提交)
OS调优:
□ vm.swappiness=1(减少Swap)
□ 文件系统:XFS(推荐)
□ 磁盘:SSD(推荐;Kafka 以顺序读写为主,HDD 也可用,SSD 延迟更稳定)
□ 网络:10GbE(推荐)
十、常见问题排查
| 问题 | 原因 | 解决方案 |
| :--- | :--- | :--- |
| 消息丢失 | acks=0/1 | 改为 acks=all + min.insync.replicas=2 |
| 消息重复 | 生产者重试+消费者未去重 | 幂等生产者 + 消费者幂等处理 |
| 消费延迟大 | 消费者处理慢 | 增加消费者实例 / 增加分区 |
| 分区不均衡 | Key分布不均 | 自定义分区器 / 增加分区数 |
| Leader切换频繁 | Broker不稳定 | 检查网络/磁盘/GC |
| 磁盘满 | retention设置过大 | 调整retention.bytes / retention.ms |