主题操作
kafka-topics.sh
脚本可以用来管理主题.
--topic
指定操作的主题名,除了创建之外都可以使用正则表达式,但是要使用\转义。另外命名不要使用两个下划线开头(__
表示系统内建的主题,比如__consumer_offset
),主题命名中不要将.和_混用(kafka会将.
最终转换成_
)。
创建主题
1 |
kafka-topics.sh --bootstrap-server hadoop000:9092 --create --topic test-1 --partitions 2 --replication-factor 2 |
注意: --bootstrap-server
后不要使用localhost, 而是跟上主机名
1 |
kafka-topics.sh --bootstrap-server hadoop000:9092 --create --topic test-2 --partitions 2 --replication-factor 2 |
--zookeeper localhost:2181/kafka
这样的写法从Kafka 2.2版本开始废弃,新版本kafka推荐使用--bootstrap-server
替换其中
localhost:2181/kafka
来自于 kafka的server.properties
的zookeeper.connect
的配置
描述主题
列出所有主题详情
1 |
kafka-topics.sh --bootstrap-server hadoop000:9092 --describe |
支持正则表达式
1 |
~]$ kafka-topics.sh --bootstrap-server hadoop000:9092 --describe --topic test-\.\+ |
修改主题
alter不能修改副本因子。分区数只能增加,不能减少
1 |
kafka-topics.sh --bootstrap-server hadoop000:9092 --alter --topic test-1 --partitions 3 |
删除主题
--delete
只是标记主题为删除状态, 并不会马上删除, 如果你希望立即从zookeeper中删除此主题的信息, 可以在配置文件 server.properties
添加配置 delete.topic.enable=true
1 |
kafka-topics.sh --bootstrap-server hadoop000:9092 --delete --topic test-2 |
列出主题
只列出主题名称, 不带详细信息
1 |
kafka-topics.sh --bootstrap-server hadoop000:9092 --list |
消费者操作
使用 kafka-consumer-groups.sh 工具,我们可以列出、描述或删除消费者组。消费者组可以被手动删除,也可以在该组最后一次提交的偏移量到期时被自动删除。手动删除仅在组中没有任何活动成员时有效。
列出消费者群组
1 2 3 4 5 |
> bin/kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --list test-consumer-group-1 test-consumer-group-2 test-consumer-group-3 |
如果使用新的消费者客户端,要列出消费者组,可以使用–bootstrap-server和–list选项。因为新的客户端已经删除了–zookeeper选项。
描述消费者群组
1 2 3 4 5 6 7 8 |
> bin/kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --describe --group my-group TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENTID topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 |
删除消费者群组
1 2 3 4 |
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group Deletion of requested consumer groups('my-group', 'my-other-group') was successful. |
当要删除的消费者组不为空时,执行上面命令你将得到以下错误:
GroupNotEmptyException
。
消费偏移量管理
删除偏移量
删除某个主题的某个消费者组的偏移量
1 2 3 4 5 |
> kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --delete-offsets --group my-group --topic my-topic-1 --topic my-topic-2 TOPIC PARTITION STATUS my-topic-1 0 Successful my-topic-2 0 Successful |
○当要删除的消费者组不为空时,执行上面命令你将得到以下错误:
GroupNotEmptyException
。
重置偏移量
要重置消费者组的偏移量,可以使用“–reset-offsets”选项。此选项一次只支持一个消费者组。它需要定义以下范围: –all-topic 或 –topic。必须选择一个作用域,除非使用 “–from-file”从文件导入的方式。
1 2 3 4 |
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest TOPIC PARTITION NEW-OFFSET topic1 0 |
如果您使用的是旧的高级消费者,也就是消费者组元数据是存储在ZooKeeper中(即配置了
offset .storage=ZooKeeper
),则需要使用--zooKeeper
而不是--bootstrap-server
。
配置项 | 描述 |
---|---|
–-to-datetime |
重置为某个时间点的偏移量.格式: ‘YYYY-MM-DDTHH:mm:SS.sss’ |
–to-earliest : | 重置为最早的偏移量 |
–to-latest : | 重置为最近的偏移量 |
–shift-by |
将当前的偏移量偏移n个单位, n可以为正数也可以是负数 |
–from-file : | 利用CSV文件中的数据重置偏移量 |
–to-current : | R将偏移量重置为当前值 |
–by-duration |
重置偏移量为从当前时间戳开始的时长, 格式: ‘PnDTnHnMnS’ |
–to-offset : | 重置偏移量为指定的值 |
动态配置修改
从Kafka 1.1.0 开始有了动态配置修改的新特性。动态意味着在修改了Broker的配置后,我们不需要重新启动Broker来使其生效。
这个新特性包含在一个名为kafka-configs.sh
的命令行工具脚本中。一旦使用这个工具设置了配置参数,新的更改将永久存储在zookeeper集群中。
覆盖主题默认配置
有许多应用于主题的配置,可以针对单个主题更改这些配置以适应集群中的不同用例。大多数配置都在代理配置中指定了缺省值,除非设置配置覆盖,否则将应用该缺省值。
格式
1 2 |
kafka-configs.sh --bootstrap-server <broker_list> --alter --entity-type topics --entity-name <topic name> --add-config <key>= <value>[,<key>=<value>...] |
例子
1 2 |
# 下面的示例将my-topic主题的留存时间设置为1小时,即3600000ms: $ kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000 |
主题的有效可覆盖配置
配置(Key) | 说明 |
---|---|
cleanup.policy | 如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。 |
compression.type | broker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4. |
delete.retention.ms | 压缩日志墓碑消息的最长存放时间 |
file.delete.delay.ms | 从磁盘中删除此topic的日志端和索引之前需要等待的多长时间 |
配置(Key) | 说明 |
---|---|
flush.messages | 在强制将此topic的消息刷到磁盘之前接收的消息数 |
flush.ms | 在强制将此topic的消息刷到磁盘之前需要的时间,单位是毫秒 |
index.interval.bytes | 日志段索引中的条目之间可以产生多少字节的消息 |
max.message.bytes | 此topic中当个消息的大小 |
retention.bytes | 为topic保留的消息量的总字节数 |
retention.ms | topic中消息保留的最长时间,单位是毫秒 |
描述覆盖的配置
可以使用命令行工具kafka-configs.sh
来检查主题或客户机的特定配置。显示覆盖的配置需要使用–describe选项。例如,一下代码显示名为my-topic的主题的所有覆盖过的配置
1 2 3 4 |
> kafka-configs.sh --bootstrap-server <broker_list> --describe --entity-type topics --entity-name my-topic Configs for topic 'my-topic' are retention.ms=3600000 |
覆盖客户端默认配置
Kafka客户端(生产者和消费者)唯一可以覆盖的配置是生产者和消费者的配额,即允许具有指定客户端ID的所有客户端在每个broker上每秒生产或者消费的字节数。
格式
1 2 |
> kafka-configs.sh --bootstrap-server <broker_list> --alter --entity-type clients --entity-name <client ID> --add-config <key>=<value>[,<key>=<value>...] |
Kafka客户端的有效配置
配置(key) | 描述 |
---|---|
producer_bytes_rate | The amount of messages, in bytes, that a singe client ID is allowed to produce to a single broker in one second. |
consumer_bytes_rate | The amount of messages, in bytes, that a single client ID is allowed to consume from a single broker in one second. |
如何把Broker中的生产者客户端ID为client_a的配额producer_byte_rate
设置成20MB/秒?
1 2 3 4 5 6 |
> bin/kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type clients --entity-name client_a --add-config 'producer_bytes_rate=20971520' |
注意:
-
--add-config
可以同时指定多个配置, 配置之间使用逗号分割 -
要指定生产者或者消费者的client_id,如果是Java APIs可以为客户端添加配置
client.id
, 如果是控制台生产者或者消费者可以使用--producer.config
或--consumer.config
进行指定配置文件并且在配置文件中指定client.id
.
移除覆盖的配置
可以完全删除动态配置,这将导致集群配置恢复到默认值,要删除配置覆盖,请使用--alter
命令以及--delete-config
命令。
下面的示例可以删除一个名为my-topic
的主题的覆盖后的retention.ms
配置, 删除后retention.ms
将恢复为默认值:
1 2 3 4 |
kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms Completed updating config for entity: topic 'my-topic'. |
Views: 623