本文记录本人使用kafka过程中常用的运维指令。

一 准备客户端环境

安装好jdk和kafka

1. 配置producer/consumer.properties

修改config/producer.properties和config/consumer.properties,添加以下内容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='1qaz@WSX';"


2. 配置静态jaas文件(可选)

如果使用的kafka版本太旧,或者properties文件的sasl.jaas.config无法生效的时候,可配置静态jaas文件
添加config/kafka_jaas.conf文件,然后配置环境变量使之启用

export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_jaas.conf"

账号密码认证方式,文件格式如下

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};

如果是keytab的话,格式应该是这样

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  serviceName="kafka"
  keyTab="/mnt/XXX.keytab"
  principal="XXX/XXX@XXX.COM";
};

二 常用命令

# 查看主题列表:注意使用zk连接,也可自行替换成--bootstrap-server
bin/kafka-topics.sh --zookeeper $HOSTNAME:$ZK_CLIENT_PORT/kafka --list 
# 创建主题:注意使用zk连接,也可自行替换成--bootstrap-server
bin/kafka-topics.sh --zookeeper $HOSTNAME:$ZK_CLIENT_PORT/kafka --create  --topic $TOPIC_NAME --partitions 1 --replication-factor 1
# 生产消息
bin/kafka-console-producer.sh --bootstrap-server $KAFKA_HOSTNAME:$KAFKA_PORT --topic $TOPIC_NAME --producer.config config/producer.properties
# 消费消息:从头开始接收数据并打印时间戳
bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_HOSTNAME:$KAFKA_PORT --from-beginning  --topic $TOPIC_NAME --consumer.config config/consumer.properties --property print.timestamp=true
# 接收1条最新的数据
bin/kafka-console-consumer.sh --bootstrap-server $HOSTNAME:$KAFKA_PORT --max-messages 1 --topic $TOPIC_NAME --consumer.config config/consumer.properties
# 查看消费组信息列表
bin/kafka-consumer-groups.sh --bootstrap-server  $KAFKA_HOSTNAME:$KAFKA_PORT --list  --command-config config/consumer.properties
# 查看指定消费组的消费/堆积情况
bin/kafka-consumer-groups.sh --bootstrap-server  $KAFKA_HOSTNAME:$KAFKA_PORT --describe --group $groupname  --command-config config/consumer.properties

三 压测命令


# 压测 预设吞吐量为5w/s,总共发送100w条数据,每条数据10字节(自动生成)
bin/kafka-producer-perf-test.sh --topic $TOPIC_NAME \
--num-record 1000000  \
--throughput 50000 \
--record-size=10 \
--producer-props bootstrap.servers=$KAFKA_HOSTNAME:$KAFKA_PORT \
--producer.config config/producer.properties

# 压测 预设吞吐量为1w/s,总共发送150w条数据,数据来自data.txt(按行切割)
bin/kafka-producer-perf-test.sh --topic $TOPIC_NAME \
--num-record 1500000  \
--throughput 10000 \
--payload-file=data.txt \
--producer-props bootstrap.servers=$KAFKA_HOSTNAME:$KAFKA_PORT \
--producer.config config/producer.properties

# 压测数据可使用如下方法获取
# 从指定主题中获取1000条数据写入到 data.txt文件中
bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_HOSTNAME:$KAFKA_PORT \
--topic $TOPIC_NAME \
--max-messages=1000 \
--consumer.config config/consumer.properties >> data.txt


Q.E.D.