SRE 话题文档:Apache Kafka 集群运维
本文档面向生产环境,涵盖 Kafka 集群架构部署、性能调优、监控告警、故障排查等核心运维场景。
1. 生产环境部署架构
1.1 架构图(ASCII)
┌─────────────────────────────────────────────────────────────────────────────┐
│ Kafka 生产集群架构 │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐
│ Client Apps │
│ (Producers/ │
│ Consumers) │
└────────┬────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Broker-01 │ │ Broker-02 │ │ Broker-03 │
│ (Leader) │ │ (Follower) │ │ (Follower) │
│ │ │ │ │ │
│ ID: 1 │ │ ID: 2 │ │ ID: 3 │
│ Port: 9092 │ │ Port: 9092 │ │ Port: 9092 │
│ JMX: 9999 │ │ JMX: 9999 │ │ JMX: 9999 │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└──────────────────┼──────────────────┘
│
┌───────┴───────┐
│ ZooKeeper │
│ Ensemble │
│ (3 or 5 nodes)│
└───────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ 监控 & 管理组件 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Prometheus │ │ Grafana │ │ Kafka Export│ │ CMAK/UI │ │
│ │ (监控) │ │ (可视化) │ │ (指标采集) │ │ (管理界面) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
1.2 Kubernetes 部署配置
# kafka-cluster.yaml - deployed via the Strimzi Operator
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      # NOTE(review): an unencrypted, unauthenticated loadbalancer listener is
      # reachable from outside the cluster -- enable tls/authentication for
      # anything but a lab environment.
      - name: external
        port: 9094
        type: loadbalancer
        tls: false
    config:
      # Topic defaults
      num.partitions: 3
      default.replication.factor: 3
      min.insync.replicas: 2
      # Log retention
      log.retention.hours: 168
      log.retention.bytes: 1073741824
      log.segment.bytes: 1073741824
      # Network tuning
      socket.send.buffer.bytes: 102400
      socket.receive.buffer.bytes: 102400
      socket.request.max.bytes: 104857600
      # Thread pools
      num.network.threads: 8
      num.io.threads: 8
      background.threads: 10
      # NOTE: GC/JVM options are NOT broker properties and must not appear in
      # this `config` map (the previous `java.gc.log.options` entry was
      # invalid); they are configured under `jvmOptions` below.
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 500Gi
          class: fast-ssd
          deleteClaim: false
    resources:
      requests:
        memory: "8Gi"
        cpu: "2"
      limits:
        memory: "16Gi"
        cpu: "4"
    jvmOptions:
      # Heap deliberately below the memory request: Kafka relies on the OS
      # page cache, and very large G1 heaps risk long pauses (see the JVM
      # tuning guidance: keep heap <= ~6GB).
      -Xms: "6g"
      -Xmx: "6g"
      # Strimzi's -XX map takes string values.
      -XX:
        UseG1GC: "true"
        MaxGCPauseMillis: "20"
        InitiatingHeapOccupancyPercent: "35"
      # GC logging is enabled through Strimzi, not via broker config.
      gcLoggingEnabled: true
    livenessProbe:
      initialDelaySeconds: 60
      timeoutSeconds: 10
    readinessProbe:
      initialDelaySeconds: 60
      timeoutSeconds: 10
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      class: standard
    resources:
      requests:
        memory: "2Gi"
        cpu: "1"
      limits:
        memory: "4Gi"
        cpu: "2"
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
# Example KafkaTopic
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000    # 7 days
    segment.bytes: 1073741824  # 1GiB
    cleanup.policy: delete
    min.insync.replicas: 2
    compression.type: lz4
---
# Example KafkaUser
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: app-producer
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Producer permissions on orders-topic
      - resource:
          type: topic
          name: orders-topic
        operations:
          - Write
          - Describe
      # NOTE(review): this second entry grants Read on the same topic to a
      # user named "app-producer" -- looks unintended for a pure producer;
      # confirm it is needed or drop it.
      - resource:
          type: topic
          name: orders-topic
        host: "*"
        operations:
          - Read
          - Describe
1.3 Docker Compose 部署(开发/测试)
# docker-compose.yml - Kafka development/test environment
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 10s
      timeout: 5s
      retries: 5
  # Each broker exposes two listeners:
  #   PLAINTEXT      (kafka-N:9092)     - traffic inside the compose network
  #   PLAINTEXT_HOST (localhost:2909N)  - clients on the host machine
  # Advertising only kafka-N:9092 while publishing a different host port
  # (the previous setup) breaks host clients: the broker redirects them to a
  # hostname the host cannot resolve.
  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-1
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_HEAP_OPTS: "-Xms2g -Xmx2g"
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka-1-data:/var/lib/kafka/data
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 30s
      timeout: 10s
      retries: 5
  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-2
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "29093:29093"
      - "10000:9999"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_HEAP_OPTS: "-Xms2g -Xmx2g"
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka-2-data:/var/lib/kafka/data
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 30s
      timeout: 10s
      retries: 5
  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-3
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "29094:29094"
      - "10001:9999"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://localhost:29094
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_HEAP_OPTS: "-Xms2g -Xmx2g"
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka-3-data:/var/lib/kafka/data
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 30s
      timeout: 10s
      retries: 5
  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    container_name: kafka-exporter
    ports:
      - "9308:9308"
    environment:
      # Internal listener - unchanged container-network addresses.
      KAFKA_SERVER: kafka-1:9092,kafka-2:9092,kafka-3:9092
    networks:
      - kafka-net
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - kafka-net
    depends_on:
      - kafka-1
volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
networks:
  kafka-net:
    driver: bridge
2. 关键配置参数
2.1 Broker 核心配置
# ==============================================================================
# Kafka broker configuration - server.properties
# ==============================================================================
# ------------------------------------------------------------------------------
# Basics
# ------------------------------------------------------------------------------
# Unique broker id (must be unique within the cluster)
broker.id=1
# Listeners
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://broker-01:9092,SSL://broker-01:9093
# ZooKeeper connection (the /kafka chroot namespaces the tree)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
# Log directories (multiple directories spread I/O across disks)
log.dirs=/data/kafka/logs-1,/data/kafka/logs-2
# ------------------------------------------------------------------------------
# Topic defaults
# ------------------------------------------------------------------------------
# Default partition count for new topics
num.partitions=3
# Default replication factor
default.replication.factor=3
# Minimum in-sync replicas (with acks=all guards durability)
min.insync.replicas=2
# Auto topic creation (keep disabled in production)
auto.create.topics.enable=false
# ------------------------------------------------------------------------------
# Log management
# ------------------------------------------------------------------------------
# Retention time (hours) - 7 days
log.retention.hours=168
# Retention size in bytes, applied PER PARTITION.
# NOTE(review): 1GiB here equals log.segment.bytes below, so at most about
# one rolled segment is retained per partition -- confirm this is intended.
log.retention.bytes=1073741824
# Segment size (1GiB)
log.segment.bytes=1073741824
# How often the retention checker runs
log.retention.check.interval.ms=300000
# Cleanup policy (delete/compact)
log.cleanup.policy=delete
# Log compaction workers (used by compacted topics)
log.cleaner.enable=true
log.cleaner.threads=2
log.cleaner.dedupe.buffer.size=134217728
# ------------------------------------------------------------------------------
# Network
# ------------------------------------------------------------------------------
# Socket send buffer
socket.send.buffer.bytes=102400
# Socket receive buffer
socket.receive.buffer.bytes=102400
# Max bytes accepted in a single request
socket.request.max.bytes=104857600
# Network threads
num.network.threads=8
# I/O threads
num.io.threads=8
# Background threads
background.threads=10
# ------------------------------------------------------------------------------
# Replication
# ------------------------------------------------------------------------------
# Replica fetcher threads
num.replica.fetchers=4
# Max wait for a fetch response
replica.fetch.wait.max.ms=500
# Min bytes per fetch
replica.fetch.min.bytes=1
# Max bytes per fetch
replica.fetch.max.bytes=1048576
# NOTE: replica.lag.max.messages was removed in Kafka 0.9 and is invalid on
# Kafka 3.x; ISR membership is governed solely by replica.lag.time.max.ms.
# Max replica lag time before eviction from the ISR (ms)
replica.lag.time.max.ms=30000
# ------------------------------------------------------------------------------
# Log flushing
# ------------------------------------------------------------------------------
# Kafka's durability comes from replication, not fsync frequency; forcing
# frequent flushes (the values previously set here) hurts throughput.
# Leave the broker defaults (OS page-cache flushing) unless there is a
# specific, measured reason not to -- see the tuning checklist in section 7.2.
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
#log.flush.scheduler.interval.ms=3000
# ------------------------------------------------------------------------------
# Consumer groups
# ------------------------------------------------------------------------------
# Delay before the first rebalance of a new group
group.initial.rebalance.delay.ms=3000
# Session timeout bounds accepted from consumers
group.min.session.timeout.ms=6000
group.max.session.timeout.ms=300000
# ------------------------------------------------------------------------------
# Transactions
# ------------------------------------------------------------------------------
# Transaction state topic replication factor
transaction.state.log.replication.factor=3
# Transaction state topic min ISR
transaction.state.log.min.isr=2
# Max allowed transaction timeout
transaction.max.timeout.ms=900000
# ------------------------------------------------------------------------------
# Security (SSL/SASL)
# ------------------------------------------------------------------------------
# SSL stores
# NOTE(review): never commit real passwords in this file -- inject them via a
# secret store or Kafka config providers.
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=changeit
# SASL
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
# NOTE(review): PLAIN sends credentials in clear unless wrapped in TLS;
# prefer SCRAM-SHA-256 for inter-broker authentication.
sasl.mechanism.inter.broker.protocol=PLAIN
# ACLs (ZooKeeper-mode authorizer)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
2.2 生产者配置
# ==============================================================================
# Kafka Producer 配置
# ==============================================================================
# Bootstrap Servers
bootstrap.servers=broker-01:9092,broker-02:9092,broker-03:9092
# Key 序列化器
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Value 序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# ------------------------------------------------------------------------------
# 可靠性配置
# ------------------------------------------------------------------------------
# ACK 配置(0/1/all 或 -1)
acks=all
# 重试次数
retries=2147483647
# 重试间隔
retry.backoff.ms=100
# 幂等性(防止重复)
enable.idempotence=true
# 事务配置
transactional.id=tx-producer-001
# ------------------------------------------------------------------------------
# 批处理配置
# ------------------------------------------------------------------------------
# 批次大小(字节)
batch.size=16384
# 批次等待时间(毫秒)
linger.ms=10
# 缓冲区大小
buffer.memory=33554432
# 最大请求大小
max.request.size=1048576
# ------------------------------------------------------------------------------
# 压缩配置
# ------------------------------------------------------------------------------
# 压缩类型(none/gzip/snappy/lz4/zstd)
compression.type=lz4
# ------------------------------------------------------------------------------
# 性能配置
# ------------------------------------------------------------------------------
# 最大阻塞时间
max.block.ms=60000
# 请求超时
request.timeout.ms=30000
# 交付超时
delivery.timeout.ms=120000
# 连接超时
connections.max.idle.ms=540000
# 元数据刷新
metadata.max.age.ms=300000
# ------------------------------------------------------------------------------
# 安全配置
# ------------------------------------------------------------------------------
# 安全协议
security.protocol=SASL_SSL
# SASL 机制
sasl.mechanism=SCRAM-SHA-256
# SASL JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="password";
# SSL 配置
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=changeit
2.3 消费者配置
# ==============================================================================
# Kafka Consumer 配置
# ==============================================================================
# Bootstrap Servers
bootstrap.servers=broker-01:9092,broker-02:9092,broker-03:9092
# Key 反序列化器
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Value 反序列化器
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# ------------------------------------------------------------------------------
# 消费组配置
# ------------------------------------------------------------------------------
# 消费组 ID
group.id=consumer-group-001
# 自动提交
enable.auto.commit=false
# 自动提交间隔
auto.commit.interval.ms=5000
# 初始偏移量(earliest/latest)
auto.offset.reset=earliest
# 会话超时
session.timeout.ms=30000
# 心跳间隔
heartbeat.interval.ms=10000
# 最大轮询间隔
max.poll.interval.ms=300000
# 最大轮询记录数
max.poll.records=500
# ------------------------------------------------------------------------------
# 性能配置
# ------------------------------------------------------------------------------
# 获取最小字节数
fetch.min.bytes=1
# 获取最大字节数
fetch.max.bytes=52428800
# 单分区获取最大字节数
max.partition.fetch.bytes=1048576
# 获取等待时间
fetch.max.wait.ms=500
# ------------------------------------------------------------------------------
# 隔离级别
# ------------------------------------------------------------------------------
# 隔离级别(read_uncommitted/read_committed)
isolation.level=read_committed
# ------------------------------------------------------------------------------
# 安全配置
# ------------------------------------------------------------------------------
# 安全协议
security.protocol=SASL_SSL
# SASL 机制
sasl.mechanism=SCRAM-SHA-256
# SASL JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="password";
3. 性能调优
3.1 JVM 调优
# ==============================================================================
# Kafka JVM configuration
# Applied at broker startup, e.g.: export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
# ==============================================================================
# Fixed-size heap (Xms == Xmx) avoids resize pauses; sized at 6g to match
# guideline 1 below (a previous 8g value contradicted that guideline).
KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-XX:ParallelGCThreads=8 \
-XX:ConcGCThreads=2 \
-XX:+UseStringDeduplication \
-Djava.awt.headless=true"
# G1 GC logging (JDK 11+ unified logging)
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=/var/log/kafka/gc.log:time,tags:filecount=10,filesize=100M"
# JMX monitoring
# NOTE(review): authentication and SSL are disabled here -- acceptable only on
# a trusted management network; never expose port 9999 publicly.
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.rmi.port=9999 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=broker-01"
# ==============================================================================
# JVM tuning guidelines
# ==============================================================================
# 1. Keep the heap at or below ~6GB (very large G1 heaps risk long pauses).
# 2. Use G1 GC rather than CMS.
# 3. Set MaxGCPauseMillis to 20-50ms.
# 4. Set InitiatingHeapOccupancyPercent to 35-45.
# 5. Leave the remaining system memory to the OS page cache.
# 6. Heap ~= 25%-50% of system memory, capped by guideline 1.
3.2 操作系统调优
# ==============================================================================
# 文件描述符
# ==============================================================================
# Kafka 需要大量文件描述符(每个分区多个文件)
# 查看:ulimit -n
# 建议:至少 100000
# 临时设置
ulimit -n 100000
# 永久设置 - /etc/security/limits.conf
kafka soft nofile 100000
kafka hard nofile 100000
# ==============================================================================
# 虚拟内存
# ==============================================================================
# 降低 Swap 使用
sysctl -w vm.swappiness=1
# 脏页比例
sysctl -w vm.dirty_ratio=80
sysctl -w vm.dirty_background_ratio=5
# ==============================================================================
# 文件系统
# ==============================================================================
# 推荐:XFS(性能优于 ext4)
# 挂载选项:noatime,nodiratime
# 挂载示例
mount -t xfs -o noatime,nodiratime /dev/sdb1 /data/kafka
# /etc/fstab
/dev/sdb1 /data/kafka xfs noatime,nodiratime 0 0
# ==============================================================================
# 网络
# ==============================================================================
sysctl -w net.core.wmem_default=131072
sysctl -w net.core.rmem_default=131072
sysctl -w net.core.wmem_max=2097152
sysctl -w net.core.rmem_max=2097152
sysctl -w net.ipv4.tcp_wmem='4096 65536 2097152'
sysctl -w net.ipv4.tcp_rmem='4096 65536 2097152'
# ==============================================================================
# 磁盘 I/O 调度
# ==============================================================================
# SSD/NVMe:none 或 mq-deadline
echo none > /sys/block/nvme0n1/queue/scheduler
3.3 磁盘性能测试
# ==============================================================================
# 磁盘性能测试
# ==============================================================================
# 顺序写入测试(Kafka 主要是顺序写)
fio --name=seqwrite \
--ioengine=libaio \
--iodepth=32 \
--numjobs=4 \
--bs=1M \
--size=10G \
--rw=write \
--direct=1 \
--filename=/data/kafka/testfile \
--group_reporting
# 顺序读取测试
fio --name=seqread \
--ioengine=libaio \
--iodepth=32 \
--numjobs=4 \
--bs=1M \
--size=10G \
--rw=read \
--direct=1 \
--filename=/data/kafka/testfile \
--group_reporting
# 随机读取测试(消费者可能随机读)
fio --name=randread \
--ioengine=libaio \
--iodepth=16 \
--numjobs=4 \
--bs=4k \
--size=10G \
--rw=randread \
--direct=1 \
--filename=/data/kafka/testfile \
--group_reporting
# Kafka 性能测试工具
# 生产者测试
kafka-producer-perf-test.sh \
--topic test-perf \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=broker-01:9092
# 消费者测试
kafka-consumer-perf-test.sh \
--topic test-perf \
--messages 1000000 \
--bootstrap-server broker-01:9092
4. 监控与告警
4.1 Kafka Exporter 部署
# kafka-exporter-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-exporter
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: kafka-exporter
template:
metadata:
labels:
app: kafka-exporter
spec:
containers:
- name: kafka-exporter
image: danielqsj/kafka-exporter:latest
args:
- --kafka.server=broker-01:9092
- --kafka.server=broker-02:9092
- --kafka.server=broker-03:9092
- --web.listen-address=:9308
- --web.telemetry-path=/metrics
- --log.level=info
- --group.filter=".*"
- --topic.filter=".*"
ports:
- containerPort: 9308
name: metrics
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
---
apiVersion: v1
kind: Service
metadata:
name: kafka-exporter
namespace: monitoring
spec:
ports:
- port: 9308
targetPort: 9308
name: metrics
selector:
app: kafka-exporter
4.2 Prometheus 配置
# prometheus-kafka.yml
scrape_configs:
- job_name: 'kafka-exporter'
static_configs:
- targets: ['kafka-exporter:9308']
labels:
cluster: 'production'
- job_name: 'kafka-jmx'
static_configs:
- targets:
- 'broker-01:9999'
- 'broker-02:9999'
- 'broker-03:9999'
labels:
cluster: 'production'
4.3 告警规则
# kafka-alerts.yml
groups:
- name: kafka-alerts
rules:
# Broker 宕机
- alert: KafkaBrokerDown
expr: kafka_brokers < 3
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Broker 宕机"
description: "Kafka 集群 Broker 数量 {{ $value }},少于预期的 3 个"
# 副本不足
- alert: KafkaUnderReplicatedPartitions
expr: kafka_topic_partition_under_replicated_partition > 0
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 分区副本不足"
description: "Topic {{ $labels.topic }} 分区 {{ $labels.partition }} 副本不足"
# 消息积压
- alert: KafkaConsumerLag
expr: kafka_consumergroup_lag > 100000
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 消费者组积压"
description: "消费者组 {{ $labels.consumergroup }} Topic {{ $labels.topic }} 积压 {{ $value }} 条消息"
# 严重消息积压
- alert: KafkaConsumerLagCritical
expr: kafka_consumergroup_lag > 1000000
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka 消费者组严重积压"
description: "消费者组 {{ $labels.consumergroup }} Topic {{ $labels.topic }} 积压 {{ $value }} 条消息,需立即处理"
# 生产者发送失败
- alert: KafkaProducerRecordSendRate
expr: rate(kafka_producer_record_send_total[5m]) == 0
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 生产者发送速率为零"
description: "生产者 {{ $labels.client_id }} 发送速率为零"
# 离线分区
- alert: KafkaOfflinePartitions
expr: kafka_controller_offline_partitions_count > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka 存在离线分区"
description: "有 {{ $value }} 个分区处于离线状态"
# 日志目录空间不足
- alert: KafkaLogDirectorySpaceLow
expr: (kafka_log_log_directory_size_bytes / kafka_log_log_directory_capacity_bytes) * 100 > 85
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 日志目录空间不足"
description: "日志目录 {{ $labels.log_directory }} 使用率 {{ $value | printf \"%.1f\" }}%"
# 请求处理时间过长
- alert: KafkaRequestLatencyHigh
expr: histogram_quantile(0.95, rate(kafka_network_request_latency_seconds_bucket[5m])) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 请求延迟过高"
description: "Broker {{ $labels.instance }} 95 分位延迟 {{ $value | printf \"%.2f\" }}s"
# 消费者组成员不稳定
- alert: KafkaConsumerGroupMembersLow
expr: kafka_consumergroup_members < 2
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 消费者组成员过少"
description: "消费者组 {{ $labels.consumergroup }} 只有 {{ $value }} 个成员"
4.4 Grafana 面板 JSON
{
"dashboard": {
"title": "Kafka Cluster Monitoring",
"uid": "kafka-cluster",
"panels": [
{
"title": "Brokers Overview",
"type": "stat",
"gridPos": {"x": 0, "y": 0, "w": 4, "h": 4},
"targets": [
{"expr": "kafka_brokers", "legendFormat": "Active Brokers"}
]
},
{
"title": "Topics Count",
"type": "stat",
"gridPos": {"x": 4, "y": 0, "w": 4, "h": 4},
"targets": [
{"expr": "count(kafka_topic_partition_current_offset)", "legendFormat": "Topics"}
]
},
{
"title": "Under Replicated Partitions",
"type": "stat",
"gridPos": {"x": 8, "y": 0, "w": 4, "h": 4},
"targets": [
{"expr": "sum(kafka_topic_partition_under_replicated_partition)", "legendFormat": "Count"}
],
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": 0},
{"color": "red", "value": 1}
]
}
}
}
},
{
"title": "Offline Partitions",
"type": "stat",
"gridPos": {"x": 12, "y": 0, "w": 4, "h": 4},
"targets": [
{"expr": "kafka_controller_offline_partitions_count", "legendFormat": "Offline"}
],
"fieldConfig": {
"defaults": {
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": 0},
{"color": "red", "value": 1}
]
}
}
}
},
{
"title": "Messages In/Out Rate",
"type": "graph",
"gridPos": {"x": 0, "y": 4, "w": 12, "h": 8},
"targets": [
{
"expr": "sum(rate(kafka_topic_partition_current_offset[5m])) by (topic)",
"legendFormat": "{{ topic }} In"
},
{
"expr": "sum(rate(kafka_consumergroup_current_offset[5m])) by (topic)",
"legendFormat": "{{ topic }} Out"
}
]
},
{
"title": "Consumer Group Lag",
"type": "graph",
"gridPos": {"x": 12, "y": 4, "w": 12, "h": 8},
"targets": [
{
"expr": "kafka_consumergroup_lag",
"legendFormat": "{{ consumergroup }} - {{ topic }}"
}
]
},
{
"title": "Bytes In/Out Rate",
"type": "graph",
"gridPos": {"x": 0, "y": 12, "w": 12, "h": 8},
"targets": [
{
"expr": "rate(kafka_server_brokertopicmetrics_bytesin_total[5m]) / 1024 / 1024",
"legendFormat": "{{ instance }} In MB/s"
},
{
"expr": "rate(kafka_server_brokertopicmetrics_bytesout_total[5m]) / 1024 / 1024",
"legendFormat": "{{ instance }} Out MB/s"
}
]
},
{
"title": "Request Latency",
"type": "graph",
"gridPos": {"x": 12, "y": 12, "w": 12, "h": 8},
"targets": [
{
"expr": "histogram_quantile(0.50, rate(kafka_network_request_latency_seconds_bucket[5m]))",
"legendFormat": "{{ instance }} p50"
},
{
"expr": "histogram_quantile(0.95, rate(kafka_network_request_latency_seconds_bucket[5m]))",
"legendFormat": "{{ instance }} p95"
},
{
"expr": "histogram_quantile(0.99, rate(kafka_network_request_latency_seconds_bucket[5m]))",
"legendFormat": "{{ instance }} p99"
}
]
}
]
}
}
5. 常用运维命令
5.1 Topic 管理
# ==============================================================================
# Topic 管理命令
# ==============================================================================
# 创建 Topic
kafka-topics.sh --create \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
# 查看 Topic 列表
kafka-topics.sh --list \
--bootstrap-server broker-01:9092
# 查看 Topic 详情
kafka-topics.sh --describe \
--bootstrap-server broker-01:9092 \
--topic my-topic
# 修改 Topic 配置
kafka-configs.sh --alter \
--bootstrap-server broker-01:9092 \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=259200000
# 增加 Topic 分区(只能增加,不能减少)
kafka-topics.sh --alter \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--partitions 24
# 删除 Topic(需开启 delete.topic.enable=true)
kafka-topics.sh --delete \
--bootstrap-server broker-01:9092 \
--topic my-topic
# 查看 Topic 配置
kafka-configs.sh --describe \
--bootstrap-server broker-01:9092 \
--entity-type topics \
--entity-name my-topic
5.2 消费者组管理
# ==============================================================================
# 消费者组管理命令
# ==============================================================================
# 查看消费者组列表
kafka-consumer-groups.sh --list \
--bootstrap-server broker-01:9092
# 查看消费者组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server broker-01:9092 \
--group my-consumer-group
# 查看消费者组状态
kafka-consumer-groups.sh --describe \
--bootstrap-server broker-01:9092 \
--group my-consumer-group \
--state
# 查看消费者组成员
kafka-consumer-groups.sh --describe \
--bootstrap-server broker-01:9092 \
--group my-consumer-group \
--members \
--verbose
# 重置消费者组偏移量(需停止消费者)
# 重置到最早
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server broker-01:9092 \
--group my-consumer-group \
--topic my-topic \
--to-earliest \
--execute
# 重置到最新
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server broker-01:9092 \
--group my-consumer-group \
--topic my-topic \
--to-latest \
--execute
# 重置到指定时间
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server broker-01:9092 \
--group my-consumer-group \
--topic my-topic \
--to-datetime 2024-01-01T00:00:00.000 \
--execute
# 删除消费者组(需停止消费者)
kafka-consumer-groups.sh --delete \
--bootstrap-server broker-01:9092 \
--group my-consumer-group
5.3 消息生产与消费
# ==============================================================================
# 消息生产与消费命令
# ==============================================================================
# 控制台生产者
kafka-console-producer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic
# 控制台生产者(带 Key)
kafka-console-producer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--property parse.key=true \
--property key.separator=:
# 控制台消费者(从最新开始)
kafka-console-consumer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic
# 控制台消费者(从最早开始)
kafka-console-consumer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--from-beginning
# 控制台消费者(显示 Key 和时间戳)
kafka-console-consumer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--property print.key=true \
--property print.timestamp=true \
--property key.separator=, \
--from-beginning
# 指定分区消费
kafka-console-consumer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--partition 0 \
--offset 100 \
--max-messages 10
# 消费者组消费
kafka-console-consumer.sh \
--bootstrap-server broker-01:9092 \
--topic my-topic \
--group test-consumer-group \
--from-beginning
# Count messages (sum of the latest offsets across partitions)
# NOTE: `kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list` is the
# legacy form; Kafka 3.x ships kafka-get-offsets.sh with --bootstrap-server.
kafka-get-offsets.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --time -1
# Consume and count (rough check; exits after 5s of inactivity)
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --from-beginning \
  --timeout-ms 5000 | wc -l
5.4 集群管理
# ==============================================================================
# Cluster management commands
# ==============================================================================
# Show the cluster ID
kafka-cluster.sh cluster-id \
  --bootstrap-server broker-01:9092
# List brokers (via per-broker supported API versions)
kafka-broker-api-versions.sh \
  --bootstrap-server broker-01:9092
# Describe the metadata quorum
# NOTE: only meaningful on KRaft-mode clusters; on a ZooKeeper-based cluster
# (as deployed in this document) this command does not apply.
kafka-metadata-quorum.sh describe \
  --bootstrap-server broker-01:9092
# Partition reassignment
# 1) generate a candidate plan
kafka-reassign-partitions.sh --generate \
  --bootstrap-server broker-01:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3"
# 2) execute the plan
kafka-reassign-partitions.sh --execute \
  --bootstrap-server broker-01:9092 \
  --reassignment-json-file move.json
# 3) verify progress / completion
kafka-reassign-partitions.sh --verify \
  --bootstrap-server broker-01:9092 \
  --reassignment-json-file move.json
# Preferred-leader election
# NOTE: kafka-preferred-replica-election.sh was removed in Kafka 3.0;
# use kafka-leader-election.sh instead.
kafka-leader-election.sh \
  --bootstrap-server broker-01:9092 \
  --election-type PREFERRED \
  --all-topic-partitions
# Inspect log directories
kafka-log-dirs.sh --describe \
  --bootstrap-server broker-01:9092 \
  --broker-list 1,2,3
# Delete records below a given offset (per offsets.json)
kafka-delete-records.sh \
  --bootstrap-server broker-01:9092 \
  --offset-json-file offsets.json
5.5 ACL 安全管理
# ==============================================================================
# ACL 安全管理命令
# ==============================================================================
# 创建用户(SCRAM)
kafka-configs.sh --alter \
--bootstrap-server broker-01:9092 \
--entity-type users \
--entity-name producer-user \
--add-config 'SCRAM-SHA-256=[password=producer-password]'
# 查看用户配置
kafka-configs.sh --describe \
--bootstrap-server broker-01:9092 \
--entity-type users \
--entity-name producer-user
# 添加 ACL - 生产者权限
kafka-acls.sh --add \
--bootstrap-server broker-01:9092 \
--allow-principal User:producer-user \
--operation Write \
--operation Describe \
--topic my-topic
# 添加 ACL - 消费者权限
kafka-acls.sh --add \
--bootstrap-server broker-01:9092 \
--allow-principal User:consumer-user \
--operation Read \
--operation Describe \
--topic my-topic \
--group my-consumer-group
# 查看 ACL 列表
kafka-acls.sh --list \
--bootstrap-server broker-01:9092
# 删除 ACL
kafka-acls.sh --remove \
--bootstrap-server broker-01:9092 \
--allow-principal User:producer-user \
--operation Write \
--topic my-topic
# 超级用户配置(在 server.properties)
# super.users=User:admin;User:system
6. 故障排查案例
6.1 案例1:消费者组积压
# 场景:消费者组 lag 持续增长,无法追上生产速度
# 步骤1:确认积压情况
kafka-consumer-groups.sh --describe \
--bootstrap-server broker-01:9092 \
--group my-consumer-group
# 步骤2:检查消费者状态
kafka-consumer-groups.sh --describe \
--bootstrap-server broker-01:9092 \
--group my-consumer-group \
--members \
--verbose
# 步骤3:分析原因
# - 消费者数量少于分区数?
# - 消费者处理逻辑慢?
# - 消费者频繁 rebalance?
# - 消费者 GC 频繁?
# 解决方案:
# 1. 增加消费者实例(最多等于分区数)
# 2. 优化消费者处理逻辑
# 3. 调整 max.poll.records 和 max.poll.interval.ms
# 4. 增加消费者线程
# 步骤4:监控消费速率
kafka-consumer-perf-test.sh \
--topic my-topic \
--messages 10000 \
--bootstrap-server broker-01:9092 \
--group test-perf
6.2 案例2:副本同步延迟
# Scenario: under-replicated partitions alert
# Step 1: list the under-replicated partitions
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --under-replicated-partitions
# Step 2: inspect the ISR list of the affected topic
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --topic my-topic
# Step 3: check broker health
# - any broker down?
# - network healthy?
# - disk I/O healthy?
# Step 4: check replica fetch lag
# JMX metric: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
# Step 5: tune if the fetchers are the bottleneck
# - increase replica.fetch.max.bytes
# - increase num.replica.fetchers
# Step 6: trigger a preferred-leader election
# NOTE: kafka-preferred-replica-election.sh was removed in Kafka 3.0;
# use kafka-leader-election.sh instead.
kafka-leader-election.sh \
  --bootstrap-server broker-01:9092 \
  --election-type PREFERRED \
  --all-topic-partitions
6.3 案例3:Broker 宕机恢复
# Scenario: a broker went down and must be brought back
# Step 1: identify the dead broker (it will be missing from the list)
kafka-broker-api-versions.sh \
  --bootstrap-server broker-01:9092
# Step 2: check the current leader distribution
# (the describe output prints "Leader: 1" with a space after the colon)
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 | grep "Leader: 1"
# Step 3: restart the broker
# via systemd:
systemctl start kafka
# or directly with the startup script:
kafka-server-start.sh -daemon /etc/kafka/server.properties
# Step 4: watch recovery progress
# JMX metric: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
# Step 5: confirm partitions are fully replicated again
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --topic my-topic
# Step 6: move leaders back (preferred-leader election)
# NOTE: kafka-preferred-replica-election.sh was removed in Kafka 3.0;
# use kafka-leader-election.sh instead.
kafka-leader-election.sh \
  --bootstrap-server broker-01:9092 \
  --election-type PREFERRED \
  --all-topic-partitions
6.4 案例4:磁盘空间不足
# 场景:Kafka 日志目录磁盘使用率超过 90%
# 步骤1:查看磁盘使用
df -h /data/kafka
du -sh /data/kafka/*
# 步骤2:查看 Topic 大小
kafka-log-dirs.sh --describe \
--bootstrap-server broker-01:9092 \
--broker-list 1,2,3
# 步骤3:调整日志保留策略
kafka-configs.sh --alter \
--bootstrap-server broker-01:9092 \
--entity-type topics \
--entity-name large-topic \
--add-config retention.ms=86400000 # 1天
# Step 4: manually delete old log segments (LAST RESORT - high risk)
# WARNING:
#   - stop the broker first, then delete a segment's .log, .index and
#     .timeindex files TOGETHER -- removing only the .log file leaves the
#     partition directory inconsistent
#   - only remove already-rolled (old) segments, never the active segment
#   - prefer lowering retention (step 3) and letting Kafka clean up itself
systemctl stop kafka
rm /data/kafka/logs-1/my-topic-0/00000000000000000000.{log,index,timeindex}
systemctl start kafka
# 步骤5:迁移分区到新磁盘
# 准备迁移计划 move.json
kafka-reassign-partitions.sh --execute \
--bootstrap-server broker-01:9092 \
--reassignment-json-file move.json
# 步骤6:长期解决方案
# - 增加磁盘容量
# - 添加新 Broker
# - 调整全局保留策略
7. 最佳实践
7.1 Topic 设计原则
| 原则 | 说明 |
| --- | --- |
| 分区数 | 根据吞吐量计算,建议不超过单 Broker 4000 分区 |
| 副本因子 | 生产环境建议 3,重要数据建议 3+ |
| min.insync.replicas | 建议 2,保障数据可靠性 |
| 保留策略 | 根据业务需求,建议 7-30 天 |
| 压缩 | 网络带宽受限时开启,推荐 lz4 或 zstd |
7.2 性能调优清单
| 参数 | 推荐值 | 说明 |
| --- | --- | --- |
| batch.size | 16384-65536 | 批次大小,影响吞吐 |
| linger.ms | 10-100 | 等待时间,平衡延迟与吞吐 |
| compression.type | lz4/zstd | 压缩算法,节省带宽 |
| buffer.memory | 33554432 | 生产者缓冲区 |
| num.io.threads | 8+ | Broker I/O 线程 |
| num.network.threads | 8+ | Broker 网络线程 |
| log.flush.interval.* | 保持默认 | 持久性依赖副本机制,不建议频繁强制刷盘 |
7.3 监控指标优先级
| 优先级 | 指标 | 告警阈值 |
| --- | --- | --- |
| P0 | Brokers 在线数 | < 预期值 |
| P0 | 离线分区数 | > 0 |
| P0 | 消息积压 | > 100万 |
| P1 | 副本不足分区数 | > 0 |
| P1 | 请求延迟 P99 | > 500ms |
| P1 | 磁盘使用率 | > 85% |
| P2 | 生产/消费速率 | 异常下降 |
| P2 | 消费者组成员数 | 异常变化 |
8. 参考资料
文档版本: 1.0
更新日期: 2024-01-15
适用环境: Apache Kafka 3.x,Confluent Platform 7.x