kafka初学者必须知道的常用命令分享
1、启动kafka server 。进入安装目录,在安装目录的bin路径上,使用下图的命令进行启动。需要说明的是你需要在config目录下配置好server.properties 这个文件。关于这个文件的配置我在这里就不多介绍了,你可以上去kafka的官网或者其他网站上理解各个参数的使用。

2、关闭kafka server。这个就比较简单,直接用下面的命令关闭一下就可以了。

3、创建Topic 。关于创建主题,大家也可以去了解kafka的工作原理,创建主题的命令可用下面的命令操作。
bin/kafka-create-topic.sh
--topic <topic> 指定topic的名字
--zookeeper <urls> 指定zookeeper的连接地址,格式为host:port
--partition <integer> 指定partition的数量,默认为1
--replica <integer> 指定partition的备份数,默认为1
--replica-assignment-list 手动分配replica到broker上,格式为
<broker_id_for_part1_replica1 : broker_id_for_part1_replica2,
broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...>
下图为创建主题的两个例子。

4、查看Topic的信息。查看你创建的主题信息,可以采用下面的命令:
bin/kafka-list-topic.sh
--topic <topic> 指定topic的名字
--zookeeper <urls> 指定zookeeper的连接地址,格式host:port
--unavailable-partitions 只显示leader不可用的partitions
--under-replicated-partitions 只显示正在replicate的partitions
下图为查看topic的例子。

5、重新分配partition ,可以采用下面的命令:
bin/kafka-reassign-partitions.sh
--zookeeper <urls> 指定zookeeper的连接地址,格式host:port
--broker-list<brokerlist> 指定partition需要重新分配到哪些节点,格式为”0,1,2”
--topics-to-move-json-file <topics to reassign json file path> 指定JSON文件的地址,文件内容是需要重新分配的topic列表。这个选项和manual-assignment-json-file选项需要指定其中
的一个。文件内容的格式为
{"topics": [{"topic": "test"},{"topic": "test1"}], "version":1 }
--manual-assignment-json-file<manual assignment json file path> 指定JSON文件的地址,文件内容是手动分配的策略。这个选项和topics-to-move-json-file选项需要指定其中的一个。文件内容的格式为
{"partitions":
[{"topic": "test", "partition": 1, "replicas": [1,2,3] }], "version":1 }
--status-check-json-file<partition reassignment json file path> 指定JSON文件的地址,文件内容是partition和partition需要分配到的新的replica的列表。这个JSON文件可以从模拟执行的结果得到。
--execute 如果使用这个选项,那么会执行真实的重新分配分区的操作。如果不指定这个选项,默认会进行模拟执行。
例子如下图所示。

6、增加Topic的partition数量,命令为:
bin/kafka-add-partitions.sh
--zookeeper 指定zookeeper的连接地址,格式host:port
--topic 指定topic的名字
--partition 指定添加的partition的数量
--replica-assignment-list 手动分配replica到broker上,格式为
<broker_id_for_part1_replica1 : broker_id_for_part1_replica2, broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...>
例子如下图所示。

7、手动均衡Topic,让partition选择preferred replica作为leader
bin/kafka-preferred-replica-election.sh
--zookeeper 指定zookeeper的连接地址,格式host:port
--path-to-json-file 指定需要重新进行leader选举的partition列表文件所在的地址,文件内容的格式为
{“partitions”: [{“topic”: “test”,“partitions”: 1},{“topic”: “test”, “partitions”: 2}]}
默认值为所有存在的partition 。
例子如下图所示。

8、从一个Topic读消息,把消息重放到另一个Topic 。命令格式如下:
bin/kafka-replay-log-producer.sh
--async 如果设置该选项,则异步发送消息
--batch-size <Integer: batch size> 每次批量发送的消息数,默认值200
--broker-list <hostname:port> 指定broker列表,格式host:port
--compression-codec <Integer: compression docec> 如果设置该选项,则消息经过压缩后再发送,默认值0。(0:none, 1:gizp, 2:snappy)
--delay-btw-batch-ms<Long: ms> 两次批量发送的间隔时间,默认值0
--inputtopic<input-topic> 消息消费的源Topic
--messages <Integer: count> 需要发送的消息数,默认值-1,即发送所有消息
--outputtopic<output-topic> 消息发送至的目标Topic
--reporting-interval <Integer: size> 打印过程信息的时间间隔,默认值5000
--threads <Integer: threads> 发送消息的线程数,默认值1
--zookeeper <zookeeper url> 指定zookeeper的连接地址,格式host:port,默认值为127.0.0.1:2181
例子如下图所示。

9、查看Consumer的消费和积压信息。命令格式如下:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
--broker-info 显示broker的信息
--group <group name> 必须,指定consumer group的名字
--topic<topic> 指定topic的名字
--zkconnect<urls> 必须,指定zookeeper的连接地址,格式host:port
例子如下图所示。
