主题操作
kafka-topics.sh脚本可以用来管理主题.
--topic 指定操作的主题名,除了创建之外都可以使用正则表达式,但是要使用\转义。另外命名不要使用两个下划线开头(__表示系统内建的主题,比如__consumer_offset),主题命名中不要将.和_混用(kafka会将.最终转换成_)。
创建主题
kafka-topics.sh --bootstrap-server hadoop000:9092 --create --topic test-1 --partitions 2 --replication-factor 2
注意: --bootstrap-server 后不要使用localhost, 而是跟上主机名
kafka-topics.sh --bootstrap-server hadoop000:9092 --create --topic test-2 --partitions 2 --replication-factor 2
--zookeeper localhost:2181/kafka这样的写法从Kafka 2.2版本开始废弃,新版本kafka推荐使用--bootstrap-server替换其中
localhost:2181/kafka来自于 kafka的server.properties的zookeeper.connect的配置
描述主题
列出所有主题详情
kafka-topics.sh --bootstrap-server hadoop000:9092 --describe
支持正则表达式
~]$ kafka-topics.sh --bootstrap-server hadoop000:9092 --describe --topic test-\.\+
修改主题
alter不能修改副本因子。分区数只能增加,不能减少
kafka-topics.sh --bootstrap-server hadoop000:9092 --alter --topic test-1 --partitions 3
删除主题
--delete 只是标记主题为删除状态, 并不会马上删除, 如果你希望立即从zookeeper中删除此主题的信息, 可以在配置文件 server.properties 添加配置 delete.topic.enable=true
kafka-topics.sh --bootstrap-server hadoop000:9092 --delete --topic test-2
列出主题
只列出主题名称, 不带详细信息
kafka-topics.sh --bootstrap-server hadoop000:9092 --list
消费者操作
使用 kafka-consumer-groups.sh 工具,我们可以列出、描述或删除消费者组。消费者组可以被手动删除,也可以在该组最后一次提交的偏移量到期时被自动删除。手动删除仅在组中没有任何活动成员时有效。
列出消费者群组
> bin/kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --list
test-consumer-group-1
test-consumer-group-2
test-consumer-group-3
如果使用新的消费者客户端,要列出消费者组,可以使用--bootstrap-server和--list选项。因为新的客户端已经删除了--zookeeper选项。
描述消费者群组
> bin/kafka-consumer-groups.sh --bootstrap-server hadoop000:9092
--describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENTID
topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
删除消费者群组
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092
--delete --group my-group --group my-other-group
Deletion of requested consumer groups('my-group', 'my-other-group') was successful.
当要删除的消费者组不为空时,执行上面命令你将得到以下错误:
GroupNotEmptyException。
消费偏移量管理
删除偏移量
删除某个主题的某个消费者组的偏移量
> kafka-consumer-groups.sh --bootstrap-server hadoop000:9092 --delete-offsets --group my-group --topic my-topic-1 --topic my-topic-2
TOPIC PARTITION STATUS
my-topic-1 0 Successful
my-topic-2 0 Successful
○当要删除的消费者组不为空时,执行上面命令你将得到以下错误:
GroupNotEmptyException。
重置偏移量
要重置消费者组的偏移量,可以使用“--reset-offsets”选项。此选项一次只支持一个消费者组。它需要定义以下范围: --all-topic 或 --topic。必须选择一个作用域,除非使用 “--from-file”从文件导入的方式。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0
如果您使用的是旧的高级消费者,也就是消费者组元数据是存储在ZooKeeper中(即配置了
offset .storage=ZooKeeper),则需要使用--zooKeeper而不是--bootstrap-server。
| 配置项 | 描述 |
|---|---|
| –-to-datetime |
重置为某个时间点的偏移量.格式: 'YYYY-MM-DDTHH:mm:SS.sss' |
| --to-earliest : | 重置为最早的偏移量 |
| --to-latest : | 重置为最近的偏移量 |
| --shift-by |
将当前的偏移量偏移n个单位, n可以为正数也可以是负数 |
| --from-file : | 利用CSV文件中的数据重置偏移量 |
| --to-current : | R将偏移量重置为当前值 |
| --by-duration |
重置偏移量为从当前时间戳开始的时长, 格式: 'PnDTnHnMnS' |
| --to-offset : | 重置偏移量为指定的值 |
动态配置修改
从Kafka 1.1.0 开始有了动态配置修改的新特性。动态意味着在修改了Broker的配置后,我们不需要重新启动Broker来使其生效。
这个新特性包含在一个名为kafka-configs.sh的命令行工具脚本中。一旦使用这个工具设置了配置参数,新的更改将永久存储在zookeeper集群中。
覆盖主题默认配置
有许多应用于主题的配置,可以针对单个主题更改这些配置以适应集群中的不同用例。大多数配置都在代理配置中指定了缺省值,除非设置配置覆盖,否则将应用该缺省值。
格式
kafka-configs.sh --bootstrap-server --alter --entity-type topics --entity-name --add-config = [,=...]
例子
# 下面的示例将my-topic主题的留存时间设置为1小时,即3600000ms:
$ kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000
主题的有效可覆盖配置
| 配置(Key) | 说明 |
|---|---|
| cleanup.policy | 如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。 |
| compression.type | broker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4. |
| delete.retention.ms | 压缩日志墓碑消息的最长存放时间 |
| file.delete.delay.ms | 从磁盘中删除此topic的日志端和索引之前需要等待的多长时间 |
| 配置(Key) | 说明 |
|---|---|
| flush.messages | 在强制将此topic的消息刷到磁盘之前接收的消息数 |
| flush.ms | 在强制将此topic的消息刷到磁盘之前需要的时间,单位是毫秒 |
| index.interval.bytes | 日志段索引中的条目之间可以产生多少字节的消息 |
| max.message.bytes | 此topic中当个消息的大小 |
| retention.bytes | 为topic保留的消息量的总字节数 |
| retention.ms | topic中消息保留的最长时间,单位是毫秒 |
描述覆盖的配置
可以使用命令行工具kafka-configs.sh来检查主题或客户机的特定配置。显示覆盖的配置需要使用--describe选项。例如,一下代码显示名为my-topic的主题的所有覆盖过的配置
> kafka-configs.sh --bootstrap-server --describe --entity-type topics --entity-name my-topic
Configs for topic 'my-topic' are retention.ms=3600000
覆盖客户端默认配置
Kafka客户端(生产者和消费者)唯一可以覆盖的配置是生产者和消费者的配额,即允许具有指定客户端ID的所有客户端在每个broker上每秒生产或者消费的字节数。
格式
> kafka-configs.sh --bootstrap-server --alter --entity-type clients --entity-name --add-config =[,=...]
Kafka客户端的有效配置
| 配置(key) | 描述 |
|---|---|
| producer_bytes_rate | The amount of messages, in bytes, that a singe client ID is allowed to produce to a single broker in one second. |
| consumer_bytes_rate | The amount of messages, in bytes, that a single client ID is allowed to consume from a single broker in one second. |
如何把Broker中的生产者客户端ID为client_a的配额producer_byte_rate设置成20MB/秒?
> bin/kafka-configs.sh
--bootstrap-server hadoop000:9092
--alter
--entity-type clients
--entity-name client_a
--add-config 'producer_bytes_rate=20971520'
注意:
-
--add-config可以同时指定多个配置, 配置之间使用逗号分割 -
要指定生产者或者消费者的client_id,如果是Java APIs可以为客户端添加配置
client.id, 如果是控制台生产者或者消费者可以使用--producer.config或--consumer.config进行指定配置文件并且在配置文件中指定client.id.
移除覆盖的配置
可以完全删除动态配置,这将导致集群配置恢复到默认值,要删除配置覆盖,请使用--alter命令以及--delete-config命令。
下面的示例可以删除一个名为my-topic的主题的覆盖后的retention.ms配置, 删除后retention.ms将恢复为默认值:
kafka-configs.sh --bootstrap-server hadoop000:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
Completed updating config for entity: topic 'my-topic'.
Views: 623
