Kakfa管理操作的相关命令

主题操作

kafka-topics.sh脚本可以用来管理主题.

--topic 指定操作的主题名,除了创建之外都可以使用正则表达式,但是要使用\转义。另外命名不要使用两个下划线开头(__表示系统内建的主题,比如__consumer_offset),主题命名中不要将.和_混用(kafka会将.最终转换成_)。

创建主题

kafka-topics.sh --bootstrap-server hadoop000:9092 --create --topic test-1 --partitions 2 --replication-factor 2

注意: --bootstrap-server 后不要使用localhost, 而是跟上主机名

kafka-topics.sh --bootstrap-server hadoop000:9092 --create --topic test-2 --partitions 2 --replication-factor 2

--zookeeper localhost:2181/kafka 这样的写法从Kafka 2.2版本开始废弃,新版本kafka推荐使用--bootstrap-server替换

其中localhost:2181/kafka 来自于 kafka的server.propertieszookeeper.connect的配置

描述主题

列出所有主题详情

kafka-topics.sh --bootstrap-server hadoop000:9092 --describe

支持正则表达式

 ~]$ kafka-topics.sh --bootstrap-server hadoop000:9092 --describe --topic test-\.\+

修改主题

alter不能修改副本因子。分区数只能增加,不能减少

kafka-topics.sh --bootstrap-server hadoop000:9092 --alter --topic test-1 --partitions 3

删除主题

--delete 只是标记主题为删除状态, 并不会马上删除, 如果你希望立即从zookeeper中删除此主题的信息, 可以在配置文件 server.properties 添加配置 delete.topic.enable=true

kafka-topics.sh --bootstrap-server hadoop000:9092 --delete --topic test-2

列出主题

只列出主题名称, 不带详细信息

kafka-topics.sh --bootstrap-server hadoop000:9092 --list

消费者操作

使用 kafka-consumer-groups.sh 工具,我们可以列出、描述或删除消费者组。消费者组可以被手动删除,也可以在该组最后一次提交的偏移量到期时被自动删除。手动删除仅在组中没有任何活动成员时有效。

列出消费者群组

> bin/kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --list
test-consumer-group-1
test-consumer-group-2
test-consumer-group-3

如果使用新的消费者客户端,要列出消费者组,可以使用--bootstrap-server和--list选项。因为新的客户端已经删除了--zookeeper选项。

描述消费者群组

> bin/kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 
--describe --group my-group

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENTID
topic1 0  854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0  460537  803290  342753  consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 
topic3 2  243655  398812  155157  consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4

删除消费者群组

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 
--delete --group my-group --group my-other-group
Deletion of requested consumer groups('my-group', 'my-other-group') was successful.

当要删除的消费者组不为空时,执行上面命令你将得到以下错误: GroupNotEmptyException

消费偏移量管理

删除偏移量

删除某个主题的某个消费者组的偏移量

> kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --delete-offsets --group my-group --topic my-topic-1 --topic my-topic-2
TOPIC                          PARTITION       STATUS 
my-topic-1                     0               Successful
my-topic-2                     0               Successful

○当要删除的消费者组不为空时,执行上面命令你将得到以下错误: GroupNotEmptyException

重置偏移量

要重置消费者组的偏移量,可以使用“--reset-offsets”选项。此选项一次只支持一个消费者组。它需要定义以下范围: --all-topic 或 --topic。必须选择一个作用域,除非使用 “--from-file”从文件导入的方式。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC                          PARTITION  NEW-OFFSET
topic1                         0 

如果您使用的是旧的高级消费者,也就是消费者组元数据是存储在ZooKeeper中(即配置了offset .storage=ZooKeeper),则需要使用--zooKeeper而不是--bootstrap-server

配置项 描述
–-to-datetime 重置为某个时间点的偏移量.格式: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : 重置为最早的偏移量
--to-latest : 重置为最近的偏移量
--shift-by : 将当前的偏移量偏移n个单位, n可以为正数也可以是负数
--from-file : 利用CSV文件中的数据重置偏移量
--to-current : R将偏移量重置为当前值
--by-duration : 重置偏移量为从当前时间戳开始的时长, 格式: 'PnDTnHnMnS'
--to-offset : 重置偏移量为指定的值

动态配置修改

从Kafka 1.1.0 开始有了动态配置修改的新特性。动态意味着在修改了Broker的配置后,我们不需要重新启动Broker来使其生效。

这个新特性包含在一个名为kafka-configs.sh的命令行工具脚本中。一旦使用这个工具设置了配置参数,新的更改将永久存储在zookeeper集群中。

覆盖主题默认配置

有许多应用于主题的配置,可以针对单个主题更改这些配置以适应集群中的不同用例。大多数配置都在代理配置中指定了缺省值,除非设置配置覆盖,否则将应用该缺省值。

格式

kafka-configs.sh --bootstrap-server  --alter --entity-type topics --entity-name  --add-config = [,=...]

例子

# 下面的示例将my-topic主题的留存时间设置为1小时,即3600000ms:
$ kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

主题的有效可覆盖配置

配置(Key 说明
cleanup.policy 如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。
compression.type broker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4.
delete.retention.ms 压缩日志墓碑消息的最长存放时间
file.delete.delay.ms 从磁盘中删除此topic的日志端和索引之前需要等待的多长时间
配置(Key 说明
flush.messages 在强制将此topic的消息刷到磁盘之前接收的消息数
flush.ms 在强制将此topic的消息刷到磁盘之前需要的时间,单位是毫秒
index.interval.bytes 日志段索引中的条目之间可以产生多少字节的消息
max.message.bytes 此topic中当个消息的大小
retention.bytes 为topic保留的消息量的总字节数
retention.ms topic中消息保留的最长时间,单位是毫秒

描述覆盖的配置

可以使用命令行工具kafka-configs.sh来检查主题或客户机的特定配置。显示覆盖的配置需要使用--describe选项。例如,一下代码显示名为my-topic的主题的所有覆盖过的配置

> kafka-configs.sh --bootstrap-server  --describe --entity-type topics --entity-name my-topic

Configs for topic 'my-topic' are retention.ms=3600000

覆盖客户端默认配置

Kafka客户端(生产者和消费者)唯一可以覆盖的配置是生产者和消费者的配额,即允许具有指定客户端ID的所有客户端在每个broker上每秒生产或者消费的字节数。

格式

> kafka-configs.sh --bootstrap-server  --alter --entity-type clients --entity-name  --add-config =[,=...]

Kafka客户端的有效配置

配置key 描述
producer_bytes_rate The amount of messages, in bytes, that a singe client ID is allowed to produce to a single broker in one second.
consumer_bytes_rate The amount of messages, in bytes, that a single client ID is allowed to consume from a single broker in one second.

如何把Broker中的生产者客户端ID为client_a的配额producer_byte_rate设置成20MB/秒?

> bin/kafka-configs.sh
 --bootstrap-server hadoop000:9092 
 --alter
 --entity-type clients 
 --entity-name client_a
 --add-config 'producer_bytes_rate=20971520'

注意:

  • --add-config 可以同时指定多个配置, 配置之间使用逗号分割

  • 要指定生产者或者消费者的client_id,如果是Java APIs可以为客户端添加配置client.id, 如果是控制台生产者或者消费者可以使用--producer.config--consumer.config进行指定配置文件并且在配置文件中指定client.id.

移除覆盖的配置

可以完全删除动态配置,这将导致集群配置恢复到默认值,要删除配置覆盖,请使用--alter命令以及--delete-config命令。

下面的示例可以删除一个名为my-topic的主题的覆盖后的retention.ms配置, 删除后retention.ms将恢复为默认值:

kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

Completed updating config for entity: topic 'my-topic'.

Views: 623

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注