Kakfa管理操作的相关命令

主题操作

kafka-topics.sh脚本可以用来管理主题.

--topic 指定操作的主题名,除了创建之外都可以使用正则表达式,但是要使用\转义。另外命名不要使用两个下划线开头(__表示系统内建的主题,比如__consumer_offset),主题命名中不要将.和_混用(kafka会将.最终转换成_)。

创建主题

注意: --bootstrap-server 后不要使用localhost, 而是跟上主机名

--zookeeper localhost:2181/kafka 这样的写法从Kafka 2.2版本开始废弃,新版本kafka推荐使用--bootstrap-server替换

其中localhost:2181/kafka 来自于 kafka的server.propertieszookeeper.connect的配置

描述主题

列出所有主题详情

支持正则表达式

修改主题

alter不能修改副本因子。分区数只能增加,不能减少

删除主题

--delete 只是标记主题为删除状态, 并不会马上删除, 如果你希望立即从zookeeper中删除此主题的信息, 可以在配置文件 server.properties 添加配置 delete.topic.enable=true

列出主题

只列出主题名称, 不带详细信息

消费者操作

使用 kafka-consumer-groups.sh 工具,我们可以列出、描述或删除消费者组。消费者组可以被手动删除,也可以在该组最后一次提交的偏移量到期时被自动删除。手动删除仅在组中没有任何活动成员时有效。

列出消费者群组

如果使用新的消费者客户端,要列出消费者组,可以使用–bootstrap-server和–list选项。因为新的客户端已经删除了–zookeeper选项。

描述消费者群组

删除消费者群组

当要删除的消费者组不为空时,执行上面命令你将得到以下错误: GroupNotEmptyException

消费偏移量管理

删除偏移量

删除某个主题的某个消费者组的偏移量

○当要删除的消费者组不为空时,执行上面命令你将得到以下错误: GroupNotEmptyException

重置偏移量

要重置消费者组的偏移量,可以使用“–reset-offsets”选项。此选项一次只支持一个消费者组。它需要定义以下范围: –all-topic 或 –topic。必须选择一个作用域,除非使用 “–from-file”从文件导入的方式。

如果您使用的是旧的高级消费者,也就是消费者组元数据是存储在ZooKeeper中(即配置了offset .storage=ZooKeeper),则需要使用--zooKeeper而不是--bootstrap-server

配置项 描述
–-to-datetime 重置为某个时间点的偏移量.格式: ‘YYYY-MM-DDTHH:mm:SS.sss’
–to-earliest : 重置为最早的偏移量
–to-latest : 重置为最近的偏移量
–shift-by : 将当前的偏移量偏移n个单位, n可以为正数也可以是负数
–from-file : 利用CSV文件中的数据重置偏移量
–to-current : R将偏移量重置为当前值
–by-duration : 重置偏移量为从当前时间戳开始的时长, 格式: ‘PnDTnHnMnS’
–to-offset : 重置偏移量为指定的值

动态配置修改

从Kafka 1.1.0 开始有了动态配置修改的新特性。动态意味着在修改了Broker的配置后,我们不需要重新启动Broker来使其生效。

这个新特性包含在一个名为kafka-configs.sh的命令行工具脚本中。一旦使用这个工具设置了配置参数,新的更改将永久存储在zookeeper集群中。

覆盖主题默认配置

有许多应用于主题的配置,可以针对单个主题更改这些配置以适应集群中的不同用例。大多数配置都在代理配置中指定了缺省值,除非设置配置覆盖,否则将应用该缺省值。

格式

例子

主题的有效可覆盖配置

配置(Key 说明
cleanup.policy 如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。
compression.type broker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4.
delete.retention.ms 压缩日志墓碑消息的最长存放时间
file.delete.delay.ms 从磁盘中删除此topic的日志端和索引之前需要等待的多长时间
配置(Key 说明
flush.messages 在强制将此topic的消息刷到磁盘之前接收的消息数
flush.ms 在强制将此topic的消息刷到磁盘之前需要的时间,单位是毫秒
index.interval.bytes 日志段索引中的条目之间可以产生多少字节的消息
max.message.bytes 此topic中当个消息的大小
retention.bytes 为topic保留的消息量的总字节数
retention.ms topic中消息保留的最长时间,单位是毫秒

描述覆盖的配置

可以使用命令行工具kafka-configs.sh来检查主题或客户机的特定配置。显示覆盖的配置需要使用–describe选项。例如,一下代码显示名为my-topic的主题的所有覆盖过的配置

覆盖客户端默认配置

Kafka客户端(生产者和消费者)唯一可以覆盖的配置是生产者和消费者的配额,即允许具有指定客户端ID的所有客户端在每个broker上每秒生产或者消费的字节数。

格式

Kafka客户端的有效配置

配置key 描述
producer_bytes_rate The amount of messages, in bytes, that a singe client ID is allowed to produce to a single broker in one second.
consumer_bytes_rate The amount of messages, in bytes, that a single client ID is allowed to consume from a single broker in one second.

如何把Broker中的生产者客户端ID为client_a的配额producer_byte_rate设置成20MB/秒?

注意:

  • --add-config 可以同时指定多个配置, 配置之间使用逗号分割

  • 要指定生产者或者消费者的client_id,如果是Java APIs可以为客户端添加配置client.id, 如果是控制台生产者或者消费者可以使用--producer.config--consumer.config进行指定配置文件并且在配置文件中指定client.id.

移除覆盖的配置

可以完全删除动态配置,这将导致集群配置恢复到默认值,要删除配置覆盖,请使用--alter命令以及--delete-config命令。

下面的示例可以删除一个名为my-topic的主题的覆盖后的retention.ms配置, 删除后retention.ms将恢复为默认值:

Views: 623

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注