SRE 话题文档:Apache Kafka 集群运维

本文档面向生产环境,涵盖 Kafka 集群架构部署、性能调优、监控告警、故障排查等核心运维场景。


1. 生产环境部署架构

1.1 架构图(ASCII)

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Kafka 生产集群架构                                    │
└─────────────────────────────────────────────────────────────────────────────┘

                              ┌─────────────────┐
                              │   Client Apps   │
                              │ (Producers/     │
                              │  Consumers)     │
                              └────────┬────────┘
                                       │
                    ┌──────────────────┼──────────────────┐
                    │                  │                  │
                    ▼                  ▼                  ▼
           ┌───────────────┐  ┌───────────────┐  ┌───────────────┐
           │  Broker-01    │  │  Broker-02    │  │  Broker-03    │
           │  (Leader)     │  │  (Follower)   │  │  (Follower)   │
           │               │  │               │  │               │
           │ ID: 1         │  │ ID: 2         │  │ ID: 3         │
           │ Port: 9092    │  │ Port: 9092    │  │ Port: 9092    │
           │ JMX: 9999     │  │ JMX: 9999     │  │ JMX: 9999     │
           └───────┬───────┘  └───────┬───────┘  └───────┬───────┘
                   │                  │                  │
                   └──────────────────┼──────────────────┘
                                      │
                              ┌───────┴───────┐
                              │   ZooKeeper   │
                              │  Ensemble     │
                              │ (3 or 5 nodes)│
                              └───────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  监控 & 管理组件                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Prometheus  │  │   Grafana   │  │ Kafka Export│  │  CMAK/UI    │        │
│  │  (监控)      │  │  (可视化)    │  │  (指标采集)  │  │  (管理界面)  │        │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘         │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 Kubernetes 部署配置

# kafka-cluster.yaml - 使用 Strimzi Operator
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: loadbalancer
        tls: false
    config:
      # 性能优化
      num.partitions: 3
      default.replication.factor: 3
      min.insync.replicas: 2

      # 日志保留(注意:log.retention.bytes 按分区计算,
      # 与 log.segment.bytes 取值相同会导致每分区仅保留约一个日志段,
      # 生产环境请按实际容量需求调大)
      log.retention.hours: 168
      log.retention.bytes: 1073741824
      log.segment.bytes: 1073741824

      # 网络优化
      socket.send.buffer.bytes: 102400
      socket.receive.buffer.bytes: 102400
      socket.request.max.bytes: 104857600

      # 副本优化
      num.network.threads: 8
      num.io.threads: 8
      background.threads: 10

      # GC 日志:java.gc.log.options 并非 Kafka broker 配置项,不能写在 config 中;
      # Strimzi 中 GC 日志应通过 spec.kafka.jvmOptions(gcLoggingEnabled)配置

    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 500Gi
          class: fast-ssd
          deleteClaim: false

    resources:
      requests:
        memory: "8Gi"
        cpu: "2"
      limits:
        memory: "16Gi"
        cpu: "4"

    jvmOptions:
      -Xms: "8g"
      -Xmx: "8g"
      -XX:
        UseG1GC: true
        MaxGCPauseMillis: 20
        InitiatingHeapOccupancyPercent: 35

    livenessProbe:
      initialDelaySeconds: 60
      timeoutSeconds: 10
    readinessProbe:
      initialDelaySeconds: 60
      timeoutSeconds: 10

  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      class: standard
    resources:
      requests:
        memory: "2Gi"
        cpu: "1"
      limits:
        memory: "4Gi"
        cpu: "2"

  entityOperator:
    topicOperator: {}
    userOperator: {}

---
# Kafka Topic 示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000        # 7天
    segment.bytes: 1073741824      # 1GB
    cleanup.policy: delete
    min.insync.replicas: 2
    compression.type: lz4

---
# Kafka User 示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: app-producer
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: orders-topic
        operations:
          - Write
          - Describe
      - resource:
          type: topic
          name: orders-topic
        host: "*"
        operations:
          - Read
          - Describe

1.3 Docker Compose 部署(开发/测试)

# docker-compose.yml - Kafka 开发环境
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-1
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      # 注意:advertised listener 为容器内主机名,宿主机客户端经映射端口连接后
      # 会被重定向到无法解析的 kafka-N:9092;需将 kafka-1/2/3 写入宿主机 /etc/hosts,
      # 或改用 INTERNAL/EXTERNAL 双 listener 配置
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_HEAP_OPTS: "-Xms2g -Xmx2g"
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - kafka-1-data:/var/lib/kafka/data
    networks:
      - kafka-net
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 30s
      timeout: 10s
      retries: 5

  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-2
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9093:9092"
      - "10000:9999"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9999
      KAFKA_HEAP_OPTS: "-Xms2g -Xmx2g"
    volumes:
      - kafka-2-data:/var/lib/kafka/data
    networks:
      - kafka-net

  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka-3
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9094:9092"
      - "10001:9999"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9999
      KAFKA_HEAP_OPTS: "-Xms2g -Xmx2g"
    volumes:
      - kafka-3-data:/var/lib/kafka/data
    networks:
      - kafka-net

  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    container_name: kafka-exporter
    ports:
      - "9308:9308"
    environment:
      KAFKA_SERVER: kafka-1:9092,kafka-2:9092,kafka-3:9092
    networks:
      - kafka-net
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - kafka-net
    depends_on:
      - kafka-1

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:

networks:
  kafka-net:
    driver: bridge

2. 关键配置参数

2.1 Broker 核心配置

# ==============================================================================
# Kafka Broker 配置文件 - server.properties
# ==============================================================================

# ------------------------------------------------------------------------------
# 基础配置
# ------------------------------------------------------------------------------

# Broker 唯一标识(集群内唯一)
broker.id=1

# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://broker-01:9092,SSL://broker-01:9093

# ZooKeeper 连接
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka

# 日志目录(多目录可分散 I/O)
log.dirs=/data/kafka/logs-1,/data/kafka/logs-2

# ------------------------------------------------------------------------------
# Topic 默认配置
# ------------------------------------------------------------------------------

# 默认分区数
num.partitions=3

# 默认副本因子
default.replication.factor=3

# 最小同步副本数(保障数据可靠性)
min.insync.replicas=2

# 自动创建 Topic(生产环境建议关闭)
auto.create.topics.enable=false

# ------------------------------------------------------------------------------
# 日志管理
# ------------------------------------------------------------------------------

# 日志保留时间(小时)
log.retention.hours=168

# 日志保留大小(字节,按分区计算;-1 表示不按大小限制)
log.retention.bytes=1073741824

# 日志段大小
log.segment.bytes=1073741824

# 日志段滚动检查间隔
log.retention.check.interval.ms=300000

# 清理策略(delete/compact)
log.cleanup.policy=delete

# 压缩主题配置
log.cleaner.enable=true
log.cleaner.threads=2
log.cleaner.dedupe.buffer.size=134217728

# ------------------------------------------------------------------------------
# 网络配置
# ------------------------------------------------------------------------------

# Socket 发送缓冲区
socket.send.buffer.bytes=102400

# Socket 接收缓冲区
socket.receive.buffer.bytes=102400

# Socket 请求最大字节数
socket.request.max.bytes=104857600

# 网络线程数
num.network.threads=8

# I/O 线程数
num.io.threads=8

# 后台线程数
background.threads=10

# ------------------------------------------------------------------------------
# 副本配置
# ------------------------------------------------------------------------------

# 副本拉取线程数
num.replica.fetchers=4

# 副本拉取最大等待时间
replica.fetch.wait.max.ms=500

# 副本拉取最小字节数
replica.fetch.min.bytes=1

# 副本拉取最大字节数
replica.fetch.max.bytes=1048576

# 副本滞后消息数:replica.lag.max.messages 已于 Kafka 0.9.0 移除,
# ISR 判定仅依据下面的 replica.lag.time.max.ms,无需配置此项
# replica.lag.max.messages=0

# 副本滞后时间(毫秒,超过则从 ISR 移除)
replica.lag.time.max.ms=30000

# ------------------------------------------------------------------------------
# 日志刷新配置
# ------------------------------------------------------------------------------

# 每隔多少消息刷新
log.flush.interval.messages=10000

# 每隔多久刷新
log.flush.interval.ms=1000

# 每隔多久检查是否需要刷新
log.flush.scheduler.interval.ms=3000

# ------------------------------------------------------------------------------
# 消费者组配置
# ------------------------------------------------------------------------------

# 消费者组初始再平衡延迟
group.initial.rebalance.delay.ms=3000

# 消费者组最小会话超时
group.min.session.timeout.ms=6000

# 消费者组最大会话超时
group.max.session.timeout.ms=300000

# ------------------------------------------------------------------------------
# 事务配置
# ------------------------------------------------------------------------------

# 事务主题副本数
transaction.state.log.replication.factor=3

# 事务主题最小 ISR
transaction.state.log.min.isr=2

# 事务超时时间
transaction.max.timeout.ms=900000

# ------------------------------------------------------------------------------
# 安全配置(SSL/SASL)
# ------------------------------------------------------------------------------

# SSL 配置
ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=changeit

# SASL 配置
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=PLAIN

# ACL 配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false

2.2 生产者配置

# ==============================================================================
# Kafka Producer 配置
# ==============================================================================

# Bootstrap Servers
bootstrap.servers=broker-01:9092,broker-02:9092,broker-03:9092

# Key 序列化器
key.serializer=org.apache.kafka.common.serialization.StringSerializer

# Value 序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# ------------------------------------------------------------------------------
# 可靠性配置
# ------------------------------------------------------------------------------

# ACK 配置(0/1/all 或 -1)
acks=all

# 重试次数
retries=2147483647

# 重试间隔
retry.backoff.ms=100

# 幂等性(防止重复)
enable.idempotence=true

# 事务配置
transactional.id=tx-producer-001

# ------------------------------------------------------------------------------
# 批处理配置
# ------------------------------------------------------------------------------

# 批次大小(字节)
batch.size=16384

# 批次等待时间(毫秒)
linger.ms=10

# 缓冲区大小
buffer.memory=33554432

# 最大请求大小
max.request.size=1048576

# ------------------------------------------------------------------------------
# 压缩配置
# ------------------------------------------------------------------------------

# 压缩类型(none/gzip/snappy/lz4/zstd)
compression.type=lz4

# ------------------------------------------------------------------------------
# 性能配置
# ------------------------------------------------------------------------------

# 最大阻塞时间
max.block.ms=60000

# 请求超时
request.timeout.ms=30000

# 交付超时
delivery.timeout.ms=120000

# 连接超时
connections.max.idle.ms=540000

# 元数据刷新
metadata.max.age.ms=300000

# ------------------------------------------------------------------------------
# 安全配置
# ------------------------------------------------------------------------------

# 安全协议
security.protocol=SASL_SSL

# SASL 机制
sasl.mechanism=SCRAM-SHA-256

# SASL JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="password";

# SSL 配置
ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
ssl.truststore.password=changeit

2.3 消费者配置

# ==============================================================================
# Kafka Consumer 配置
# ==============================================================================

# Bootstrap Servers
bootstrap.servers=broker-01:9092,broker-02:9092,broker-03:9092

# Key 反序列化器
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Value 反序列化器
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# ------------------------------------------------------------------------------
# 消费组配置
# ------------------------------------------------------------------------------

# 消费组 ID
group.id=consumer-group-001

# 自动提交
enable.auto.commit=false

# 自动提交间隔
auto.commit.interval.ms=5000

# 初始偏移量(earliest/latest)
auto.offset.reset=earliest

# 会话超时
session.timeout.ms=30000

# 心跳间隔
heartbeat.interval.ms=10000

# 最大轮询间隔
max.poll.interval.ms=300000

# 最大轮询记录数
max.poll.records=500

# ------------------------------------------------------------------------------
# 性能配置
# ------------------------------------------------------------------------------

# 获取最小字节数
fetch.min.bytes=1

# 获取最大字节数
fetch.max.bytes=52428800

# 单分区获取最大字节数
max.partition.fetch.bytes=1048576

# 获取等待时间
fetch.max.wait.ms=500

# ------------------------------------------------------------------------------
# 隔离级别
# ------------------------------------------------------------------------------

# 隔离级别(read_uncommitted/read_committed)
isolation.level=read_committed

# ------------------------------------------------------------------------------
# 安全配置
# ------------------------------------------------------------------------------

# 安全协议
security.protocol=SASL_SSL

# SASL 机制
sasl.mechanism=SCRAM-SHA-256

# SASL JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="password";

3. 性能调优

3.1 JVM 调优

# ==============================================================================
# Kafka JVM 配置
# 启动参数:export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
# ==============================================================================

# 推荐 JVM 参数
KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-XX:ParallelGCThreads=8 \
-XX:ConcGCThreads=2 \
-XX:+UseStringDeduplication \
-Djava.awt.headless=true"

# G1 GC 日志(JDK 11+)
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=/var/log/kafka/gc.log:time,tags:filecount=10,filesize=100M"

# JMX 监控
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.rmi.port=9999 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=broker-01"

# ==============================================================================
# JVM 调优建议
# ==============================================================================
# 1. 堆内存一般 6-8GB 即可(与本文 -Xms8g/-Xmx8g 示例一致;过大的堆会导致 G1 GC 长时间暂停)
# 2. 使用 G1 GC 而非 CMS
# 3. MaxGCPauseMillis 设为 20-50ms
# 4. InitiatingHeapOccupancyPercent 设为 35-45
# 5. 预留内存给操作系统用于 Page Cache
# 6. 堆内存 = 系统内存 * 25%-50%

3.2 操作系统调优

# ==============================================================================
# 文件描述符
# ==============================================================================
# Kafka 需要大量文件描述符(每个分区多个文件)
# 查看:ulimit -n
# 建议:至少 100000

# 临时设置
ulimit -n 100000

# 永久设置 - /etc/security/limits.conf
kafka soft nofile 100000
kafka hard nofile 100000

# ==============================================================================
# 虚拟内存
# ==============================================================================
# 降低 Swap 使用
sysctl -w vm.swappiness=1

# 脏页比例
sysctl -w vm.dirty_ratio=80
sysctl -w vm.dirty_background_ratio=5

# ==============================================================================
# 文件系统
# ==============================================================================
# 推荐:XFS(性能优于 ext4)
# 挂载选项:noatime,nodiratime

# 挂载示例
mount -t xfs -o noatime,nodiratime /dev/sdb1 /data/kafka

# /etc/fstab
/dev/sdb1 /data/kafka xfs noatime,nodiratime 0 0

# ==============================================================================
# 网络
# ==============================================================================
sysctl -w net.core.wmem_default=131072
sysctl -w net.core.rmem_default=131072
sysctl -w net.core.wmem_max=2097152
sysctl -w net.core.rmem_max=2097152
sysctl -w net.ipv4.tcp_wmem='4096 65536 2097152'
sysctl -w net.ipv4.tcp_rmem='4096 65536 2097152'

# ==============================================================================
# 磁盘 I/O 调度
# ==============================================================================
# SSD/NVMe:none 或 mq-deadline
echo none > /sys/block/nvme0n1/queue/scheduler

3.3 磁盘性能测试

# ==============================================================================
# 磁盘性能测试
# ==============================================================================

# 顺序写入测试(Kafka 主要是顺序写)
fio --name=seqwrite \
    --ioengine=libaio \
    --iodepth=32 \
    --numjobs=4 \
    --bs=1M \
    --size=10G \
    --rw=write \
    --direct=1 \
    --filename=/data/kafka/testfile \
    --group_reporting

# 顺序读取测试
fio --name=seqread \
    --ioengine=libaio \
    --iodepth=32 \
    --numjobs=4 \
    --bs=1M \
    --size=10G \
    --rw=read \
    --direct=1 \
    --filename=/data/kafka/testfile \
    --group_reporting

# 随机读取测试(消费者可能随机读)
fio --name=randread \
    --ioengine=libaio \
    --iodepth=16 \
    --numjobs=4 \
    --bs=4k \
    --size=10G \
    --rw=randread \
    --direct=1 \
    --filename=/data/kafka/testfile \
    --group_reporting

# Kafka 性能测试工具
# 生产者测试
kafka-producer-perf-test.sh \
  --topic test-perf \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=broker-01:9092

# 消费者测试
kafka-consumer-perf-test.sh \
  --topic test-perf \
  --messages 1000000 \
  --bootstrap-server broker-01:9092

4. 监控与告警

4.1 Kafka Exporter 部署

# kafka-exporter-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-exporter
  template:
    metadata:
      labels:
        app: kafka-exporter
    spec:
      containers:
      - name: kafka-exporter
        image: danielqsj/kafka-exporter:latest
        args:
          - --kafka.server=broker-01:9092
          - --kafka.server=broker-02:9092
          - --kafka.server=broker-03:9092
          - --web.listen-address=:9308
          - --web.telemetry-path=/metrics
          - --log.level=info
          - --group.filter=".*"
          - --topic.filter=".*"
        ports:
        - containerPort: 9308
          name: metrics
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-exporter
  namespace: monitoring
spec:
  ports:
  - port: 9308
    targetPort: 9308
    name: metrics
  selector:
    app: kafka-exporter

4.2 Prometheus 配置

# prometheus-kafka.yml
scrape_configs:
  - job_name: 'kafka-exporter'
    static_configs:
      - targets: ['kafka-exporter:9308']
        labels:
          cluster: 'production'

  - job_name: 'kafka-jmx'
    static_configs:
      - targets:
          - 'broker-01:9999'
          - 'broker-02:9999'
          - 'broker-03:9999'
        labels:
          cluster: 'production'

4.3 告警规则

# kafka-alerts.yml
groups:
  - name: kafka-alerts
    rules:
      # Broker 宕机
      - alert: KafkaBrokerDown
        expr: kafka_brokers < 3
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka Broker 宕机"
          description: "Kafka 集群 Broker 数量 {{ $value }},少于预期的 3 个"

      # 副本不足
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_topic_partition_under_replicated_partition > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 分区副本不足"
          description: "Topic {{ $labels.topic }} 分区 {{ $labels.partition }} 副本不足"

      # 消息积压
      - alert: KafkaConsumerLag
        expr: kafka_consumergroup_lag > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 消费者组积压"
          description: "消费者组 {{ $labels.consumergroup }} Topic {{ $labels.topic }} 积压 {{ $value }} 条消息"

      # 严重消息积压
      - alert: KafkaConsumerLagCritical
        expr: kafka_consumergroup_lag > 1000000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka 消费者组严重积压"
          description: "消费者组 {{ $labels.consumergroup }} Topic {{ $labels.topic }} 积压 {{ $value }} 条消息,需立即处理"

      # 生产者发送失败
      - alert: KafkaProducerRecordSendRate
        expr: rate(kafka_producer_record_send_total[5m]) == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 生产者发送速率为零"
          description: "生产者 {{ $labels.client_id }} 发送速率为零"

      # 离线分区
      - alert: KafkaOfflinePartitions
        expr: kafka_controller_offline_partitions_count > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka 存在离线分区"
          description: "有 {{ $value }} 个分区处于离线状态"

      # 日志目录空间不足
      - alert: KafkaLogDirectorySpaceLow
        expr: (kafka_log_log_directory_size_bytes / kafka_log_log_directory_capacity_bytes) * 100 > 85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 日志目录空间不足"
          description: "日志目录 {{ $labels.log_directory }} 使用率 {{ $value | printf \"%.1f\" }}%"

      # 请求处理时间过长
      - alert: KafkaRequestLatencyHigh
        expr: histogram_quantile(0.95, rate(kafka_network_request_latency_seconds_bucket[5m])) > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 请求延迟过高"
          description: "Broker {{ $labels.instance }} 95 分位延迟 {{ $value | printf \"%.2f\" }}s"

      # 消费者组成员不稳定
      - alert: KafkaConsumerGroupMembersLow
        expr: kafka_consumergroup_members < 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka 消费者组成员过少"
          description: "消费者组 {{ $labels.consumergroup }} 只有 {{ $value }} 个成员"

4.4 Grafana 面板 JSON

{
  "dashboard": {
    "title": "Kafka Cluster Monitoring",
    "uid": "kafka-cluster",
    "panels": [
      {
        "title": "Brokers Overview",
        "type": "stat",
        "gridPos": {"x": 0, "y": 0, "w": 4, "h": 4},
        "targets": [
          {"expr": "kafka_brokers", "legendFormat": "Active Brokers"}
        ]
      },
      {
        "title": "Topics Count",
        "type": "stat",
        "gridPos": {"x": 4, "y": 0, "w": 4, "h": 4},
        "targets": [
          {"expr": "count(kafka_topic_partition_current_offset)", "legendFormat": "Topics"}
        ]
      },
      {
        "title": "Under Replicated Partitions",
        "type": "stat",
        "gridPos": {"x": 8, "y": 0, "w": 4, "h": 4},
        "targets": [
          {"expr": "sum(kafka_topic_partition_under_replicated_partition)", "legendFormat": "Count"}
        ],
        "fieldConfig": {
          "defaults": {
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": 0},
                {"color": "red", "value": 1}
              ]
            }
          }
        }
      },
      {
        "title": "Offline Partitions",
        "type": "stat",
        "gridPos": {"x": 12, "y": 0, "w": 4, "h": 4},
        "targets": [
          {"expr": "kafka_controller_offline_partitions_count", "legendFormat": "Offline"}
        ],
        "fieldConfig": {
          "defaults": {
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": 0},
                {"color": "red", "value": 1}
              ]
            }
          }
        }
      },
      {
        "title": "Messages In/Out Rate",
        "type": "graph",
        "gridPos": {"x": 0, "y": 4, "w": 12, "h": 8},
        "targets": [
          {
            "expr": "sum(rate(kafka_topic_partition_current_offset[5m])) by (topic)",
            "legendFormat": "{{ topic }} In"
          },
          {
            "expr": "sum(rate(kafka_consumergroup_current_offset[5m])) by (topic)",
            "legendFormat": "{{ topic }} Out"
          }
        ]
      },
      {
        "title": "Consumer Group Lag",
        "type": "graph",
        "gridPos": {"x": 12, "y": 4, "w": 12, "h": 8},
        "targets": [
          {
            "expr": "kafka_consumergroup_lag",
            "legendFormat": "{{ consumergroup }} - {{ topic }}"
          }
        ]
      },
      {
        "title": "Bytes In/Out Rate",
        "type": "graph",
        "gridPos": {"x": 0, "y": 12, "w": 12, "h": 8},
        "targets": [
          {
            "expr": "rate(kafka_server_brokertopicmetrics_bytesin_total[5m]) / 1024 / 1024",
            "legendFormat": "{{ instance }} In MB/s"
          },
          {
            "expr": "rate(kafka_server_brokertopicmetrics_bytesout_total[5m]) / 1024 / 1024",
            "legendFormat": "{{ instance }} Out MB/s"
          }
        ]
      },
      {
        "title": "Request Latency",
        "type": "graph",
        "gridPos": {"x": 12, "y": 12, "w": 12, "h": 8},
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(kafka_network_request_latency_seconds_bucket[5m]))",
            "legendFormat": "{{ instance }} p50"
          },
          {
            "expr": "histogram_quantile(0.95, rate(kafka_network_request_latency_seconds_bucket[5m]))",
            "legendFormat": "{{ instance }} p95"
          },
          {
            "expr": "histogram_quantile(0.99, rate(kafka_network_request_latency_seconds_bucket[5m]))",
            "legendFormat": "{{ instance }} p99"
          }
        ]
      }
    ]
  }
}

5. 常用运维命令

5.1 Topic 管理

# ==============================================================================
# Topic 管理命令
# ==============================================================================

# 创建 Topic
kafka-topics.sh --create \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

# 查看 Topic 列表
kafka-topics.sh --list \
  --bootstrap-server broker-01:9092

# 查看 Topic 详情
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --topic my-topic

# 修改 Topic 配置
kafka-configs.sh --alter \
  --bootstrap-server broker-01:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --add-config retention.ms=259200000

# 增加 Topic 分区(只能增加,不能减少)
kafka-topics.sh --alter \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --partitions 24

# 删除 Topic(需开启 delete.topic.enable=true)
kafka-topics.sh --delete \
  --bootstrap-server broker-01:9092 \
  --topic my-topic

# 查看 Topic 配置
kafka-configs.sh --describe \
  --bootstrap-server broker-01:9092 \
  --entity-type topics \
  --entity-name my-topic

5.2 消费者组管理

# ==============================================================================
# 消费者组管理命令
# ==============================================================================

# 查看消费者组列表
kafka-consumer-groups.sh --list \
  --bootstrap-server broker-01:9092

# 查看消费者组详情
kafka-consumer-groups.sh --describe \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group

# 查看消费者组状态
kafka-consumer-groups.sh --describe \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group \
  --state

# 查看消费者组成员
kafka-consumer-groups.sh --describe \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group \
  --members \
  --verbose

# 重置消费者组偏移量(需停止消费者)
# 重置到最早
kafka-consumer-groups.sh --reset-offsets \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --to-earliest \
  --execute

# 重置到最新
kafka-consumer-groups.sh --reset-offsets \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --to-latest \
  --execute

# 重置到指定时间
kafka-consumer-groups.sh --reset-offsets \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --to-datetime 2024-01-01T00:00:00.000 \
  --execute

# 删除消费者组(需停止消费者)
kafka-consumer-groups.sh --delete \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group

5.3 消息生产与消费

# ==============================================================================
# 消息生产与消费命令
# ==============================================================================

# 控制台生产者
kafka-console-producer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic

# 控制台生产者(带 Key)
kafka-console-producer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --property parse.key=true \
  --property key.separator=:

# 控制台消费者(从最新开始)
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic

# 控制台消费者(从最早开始)
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --from-beginning

# 控制台消费者(显示 Key 和时间戳)
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --property print.key=true \
  --property print.timestamp=true \
  --property key.separator=, \
  --from-beginning

# 指定分区消费
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --partition 0 \
  --offset 100 \
  --max-messages 10

# 消费者组消费
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --group test-consumer-group \
  --from-beginning

# 查看消息数量(Kafka 3.x 使用 kafka-get-offsets.sh;旧的 GetOffsetShell --broker-list 已弃用)
kafka-get-offsets.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --time -1

# 消费消息并计数
kafka-console-consumer.sh \
  --bootstrap-server broker-01:9092 \
  --topic my-topic \
  --from-beginning \
  --timeout-ms 5000 | wc -l

5.4 集群管理

# ==============================================================================
# 集群管理命令
# ==============================================================================

# 查看集群 ID
kafka-cluster.sh cluster-id \
  --bootstrap-server broker-01:9092

# 查看 Broker 列表
kafka-broker-api-versions.sh \
  --bootstrap-server broker-01:9092

# 查看集群元数据(仅适用于 KRaft 模式集群;ZooKeeper 模式下该命令不可用)
kafka-metadata-quorum.sh \
  --bootstrap-server broker-01:9092 \
  describe --status

# 重新分配分区
# 生成迁移计划
kafka-reassign-partitions.sh --generate \
  --bootstrap-server broker-01:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3"

# 执行迁移
kafka-reassign-partitions.sh --execute \
  --bootstrap-server broker-01:9092 \
  --reassignment-json-file move.json

# 验证迁移
kafka-reassign-partitions.sh --verify \
  --bootstrap-server broker-01:9092 \
  --reassignment-json-file move.json

# 优先副本选举(kafka-preferred-replica-election.sh 已在 Kafka 3.0 移除,
# 使用 kafka-leader-election.sh 替代)
kafka-leader-election.sh \
  --bootstrap-server broker-01:9092 \
  --election-type preferred \
  --all-topic-partitions

# 查看日志目录
kafka-log-dirs.sh --describe \
  --bootstrap-server broker-01:9092 \
  --broker-list 1,2,3

# 删除记录(删除指定 offset 之前的消息)
kafka-delete-records.sh \
  --bootstrap-server broker-01:9092 \
  --offset-json-file offsets.json

5.5 ACL 安全管理

# ==============================================================================
# ACL 安全管理命令
# ==============================================================================

# 创建用户(SCRAM)
kafka-configs.sh --alter \
  --bootstrap-server broker-01:9092 \
  --entity-type users \
  --entity-name producer-user \
  --add-config 'SCRAM-SHA-256=[password=producer-password]'

# 查看用户配置
kafka-configs.sh --describe \
  --bootstrap-server broker-01:9092 \
  --entity-type users \
  --entity-name producer-user

# 添加 ACL - 生产者权限
kafka-acls.sh --add \
  --bootstrap-server broker-01:9092 \
  --allow-principal User:producer-user \
  --operation Write \
  --operation Describe \
  --topic my-topic

# 添加 ACL - 消费者权限
kafka-acls.sh --add \
  --bootstrap-server broker-01:9092 \
  --allow-principal User:consumer-user \
  --operation Read \
  --operation Describe \
  --topic my-topic \
  --group my-consumer-group

# 查看 ACL 列表
kafka-acls.sh --list \
  --bootstrap-server broker-01:9092

# 删除 ACL
kafka-acls.sh --remove \
  --bootstrap-server broker-01:9092 \
  --allow-principal User:producer-user \
  --operation Write \
  --topic my-topic

# 超级用户配置(在 server.properties)
# super.users=User:admin;User:system

6. 故障排查案例

6.1 案例1:消费者组积压

# 场景:消费者组 lag 持续增长,无法追上生产速度

# 步骤1:确认积压情况
kafka-consumer-groups.sh --describe \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group

# 步骤2:检查消费者状态
kafka-consumer-groups.sh --describe \
  --bootstrap-server broker-01:9092 \
  --group my-consumer-group \
  --members \
  --verbose

# 步骤3:分析原因
# - 消费者数量少于分区数?
# - 消费者处理逻辑慢?
# - 消费者频繁 rebalance?
# - 消费者 GC 频繁?

# 解决方案:
# 1. 增加消费者实例(最多等于分区数)
# 2. 优化消费者处理逻辑
# 3. 调整 max.poll.records 和 max.poll.interval.ms
# 4. 增加消费者线程

# 步骤4:监控消费速率
kafka-consumer-perf-test.sh \
  --topic my-topic \
  --messages 10000 \
  --bootstrap-server broker-01:9092 \
  --group test-perf

6.2 案例2:副本同步延迟

# 场景:Under-replicated partitions 告警

# 步骤1:查看副本状态
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --under-replicated-partitions

# 步骤2:检查 ISR 列表
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --topic my-topic

# 步骤3:检查 Broker 状态
# - 是否有 Broker 宕机?
# - 网络是否正常?
# - 磁盘 I/O 是否正常?

# 步骤4:检查副本拉取速率
# JMX 指标:kafka.server:type=ReplicaFetcher,name=MaxLag

# 步骤5:调整参数
# 增加 replica.fetch.max.bytes
# 增加 num.replica.fetchers

# 步骤6:触发优先副本选举(Kafka 3.x 使用 kafka-leader-election.sh)
kafka-leader-election.sh \
  --bootstrap-server broker-01:9092 \
  --election-type preferred \
  --all-topic-partitions

6.3 案例3:Broker 宕机恢复

# 场景:Broker 宕机,需要恢复数据

# 步骤1:确认宕机 Broker
kafka-broker-api-versions.sh \
  --bootstrap-server broker-01:9092

# 步骤2:检查 Leader 分布(注意输出中 "Leader:" 后有空格,grep 时需要加引号匹配)
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 | grep "Leader: 1"

# 步骤3:恢复 Broker
# 启动 Broker 进程
systemctl start kafka

# 或者通过 systemd
kafka-server-start.sh -daemon /etc/kafka/server.properties

# 步骤4:监控恢复进度
# JMX 指标:kafka.controller:type=ControllerStats,name=OfflinePartitionsCount

# 步骤5:确认恢复完成
kafka-topics.sh --describe \
  --bootstrap-server broker-01:9092 \
  --topic my-topic

# 步骤6:触发优先副本选举,恢复 Leader 分布(Kafka 3.x 使用 kafka-leader-election.sh)
kafka-leader-election.sh \
  --bootstrap-server broker-01:9092 \
  --election-type preferred \
  --all-topic-partitions

6.4 案例4:磁盘空间不足

# 场景:Kafka 日志目录磁盘使用率超过 90%

# 步骤1:查看磁盘使用
df -h /data/kafka
du -sh /data/kafka/*

# 步骤2:查看 Topic 大小
kafka-log-dirs.sh --describe \
  --bootstrap-server broker-01:9092 \
  --broker-list 1,2,3

# 步骤3:调整日志保留策略
kafka-configs.sh --alter \
  --bootstrap-server broker-01:9092 \
  --entity-type topics \
  --entity-name large-topic \
  --add-config retention.ms=86400000  # 1天

# 步骤4:手动删除旧日志段(高危操作,仅在保留策略无法及时生效时使用,
# 优先考虑步骤3的 retention.ms 或 kafka-delete-records.sh)
# 停止 Broker
systemctl stop kafka

# 删除旧日志段(必须连同对应的 .index / .timeindex 等文件一并删除,
# 只删 .log 会导致段文件与索引不一致,Broker 启动可能失败)
rm /data/kafka/logs-1/my-topic-0/00000000000000000000.*

# 启动 Broker
systemctl start kafka

# 步骤5:迁移分区到新磁盘
# 准备迁移计划 move.json
kafka-reassign-partitions.sh --execute \
  --bootstrap-server broker-01:9092 \
  --reassignment-json-file move.json

# 步骤6:长期解决方案
# - 增加磁盘容量
# - 添加新 Broker
# - 调整全局保留策略

7. 最佳实践

7.1 Topic 设计原则

原则 说明
分区数 根据吞吐量计算,建议不超过单 Broker 4000 分区
副本因子 生产环境建议 3,重要数据建议 3+
min.insync.replicas 建议 2,保障数据可靠性
保留策略 根据业务需求,建议 7-30 天
压缩 网络带宽受限时开启,推荐 lz4 或 zstd

7.2 性能调优清单

参数 推荐值 说明
batch.size 16384-65536 批次大小,影响吞吐
linger.ms 10-100 等待时间,平衡延迟与吞吐
compression.type lz4/zstd 压缩算法,节省带宽
buffer.memory 33554432 生产者缓冲区
num.io.threads 8+ Broker I/O 线程
num.network.threads 8+ Broker 网络线程
log.flush.interval.ms 保持默认 持久性依靠副本机制保证,不建议强制频繁刷盘

7.3 监控指标优先级

优先级 指标 告警阈值
P0 Brokers 在线数 < 预期值
P0 离线分区数 > 0
P0 消息积压 > 100万
P1 副本不足分区数 > 0
P1 请求延迟 P99 > 500ms
P1 磁盘使用率 > 85%
P2 生产/消费速率 异常下降
P2 消费者组成员数 异常变化

8. 参考资料


文档版本: 1.0 更新日期: 2024-01-15 适用环境: Apache Kafka 3.x,Confluent Platform 7.x

results matching ""

    No results matching ""