常用命令

日志组

  • 查看日志组列表
./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list
  • 删除组

组还在使用删除会报错

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.32.188:9092 --group mygroup --delete
  • 查看堆积
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.13.25:9092 --group mygroup --describe

topic

  • 查看 topic 内消息数目
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.13.33:9092  --topic  mytopic --time -1 --offsets 1 | awk -F  ":" '{sum += $3} END {print sum}'
  • 动态修改队列数据保存时间ms(不用重启)默认7天 604800000

修改后数据就保存时间短了。不会消费之前的数据。但是查看堆积还是有那么多数据量的。

 ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -topic test --alter --config retention.ms=1000
  • producer:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • comsumer:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 重置偏移量:

停止消费者,将消费组所有分区的位移调整为为最新位移

./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.100.81:9092,192.168.100.82:9092 --group mygroup --reset-offsets --all-topics --to-latest --execute
  • 删除topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
  • 创建topic:或者直接查看,其他参数就默认。也会新建
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 查看topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
  • 删除topic
./kafka-topics.sh --zookeeper node1:2181 --delete --topic test

分区迁移

针对扩容或则缩容,对应调整一下分区ID即可,json文件设置处理的 topic

{
    "topics":
    [{
            "topic": "mytopic"
        }
    ],
    "version": 1
}

迁移命令

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.32.187:2181 --topics-to-move-json-file topic.json  --broker-list "1,2,3,4,5" --generate

查看结果,可以 grep 一下结果 success 对比一下设置的分区知道执行百分比

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.32.187:2181 --reassignment-json-file topic.json  --verify  

取消迁移

$ bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 192.168.2.225:2181 --broker ${brokerId} --num.retries 3 --retry.interval.ms 60 

启动配置

  • 启动
./bin/kafka-server-start.sh config/server.properties
cd /root/linqin;nohup sh ./bin/kafka-server-start.sh ./config/server.properties > kafka.log & 2>&1
  • 停止:
./bin/kafka-server-stop.sh ./config/server.properties 
  • 集群配置:
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

定时打印消费数目

for i in `seq 10000`;do bin/kafka-consumer-groups.sh --describe --bootstrap-server 172.29.87.20:9092 --group topic |awk '{print $5}'|egrep -v "-"|egrep -v "LAG"|awk '{sum+=$1}END{print sum}' ;echo ------ ;sleep 10;done
上次更新时间: 2024/5/7 05:59:02