Kafka Producer API

http://kafka.apache.org/25/documentation.html#api

Kafka 架构回顾

image-20211011145945791

1)Producer :消息生产者,就是向kafka broker发消息的客户端;

2)Consumer :消息消费者,向kafka broker取消息的客户端;

3)Topic :可以理解为一个队列;

4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;

6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;

7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

添加 maven 依赖

Producer API可以让应用程序发送数据流到Kafka集群的消息主题中。

Producer的使用可以查看官方文档 javadocs.

http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

要使用Producer可以添加以下Maven依赖:

Producer的简单使用

KafkaProducer是Kafka客户端,用于将一条消息(记录)发布到Kafka集群。

KafkaProducer是线程安全的,跨线程共享单个Producer实例通常比拥有多个Producer实例更快。

下面是一个使用Producer发送消息的简单示例,该记录使用包含0到99的数字序列的字符串作为键/值对。

注意这里send()发送消息是异步的.

Producer 生产过程

写入方式

producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

分区(Partition)

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:

image-20211011150538034

我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。

如何唯一标识一个消息: 主题->分区->偏移量

1)分区的原因

(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;

(2)可以提高并发,因为可以以Partition为单位读写了。

2)分区的原则

(1)指定了patition,则直接使用;

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

副本(Replication)

同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。

消息写入流程

image-20211011150947554

producer写入消息流程如下:

1)producer先从zookeeper的 "/brokers/…/state"节点找到该partition的leader

2)producer将消息发送给该leader

3)leader将消息写入本地log

4)followers从leader pull消息,写入本地log后向leader发送ACK

5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK

Broker 保存消息

存储方式

物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下:

存储策略

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:

1)基于时间:log.retention.hours=168

2)基于大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

Zookeeper存储结构

image-20211018163259464

注意:producer不在zk中注册,消费者在zk中注册。

producer 包括一个保存尚未传输到服务器的消息(record)的缓冲区空间池,以及一个负责将这些记录转换为请求并将其传输到集群的后台I/O线程。producer使用后未及时关闭将导致这些资源泄漏。

send()方法是异步的,调用send()方法会将待发送消息(record)添加到缓冲区等待发送,并立即返回。生产者会积攒一批消息后批量发送以提高效率。

acks配置项用来控制请求如何被认为已处理完成。我们指定的“all”设置将导致阻塞全部记录提交,这是最慢但最持久的设置。

如果请求失败,生产者可以自动重试,除非配置了retries配置项为0。启用重试可能导致出现重复消息的可能性(有关消息传递语义的详细信息,请参阅文档)。

Producer 在每个分区维都维护未发送记录的缓冲区。这些缓冲区的大小由batch.size配置项进行配置,增大这个配置的值可以允许一个批次处理更多消息,但需要更多的内存(因为我们通常会为每个活动分区拥有一个这样的缓冲区)。

默认情况下,即使缓冲区中有额外的未使用空间,缓冲数据也是可以立即发送出去的。然而如果你想减少请求的数量可以设置linger.ms为一个大于0的数, 这可以让Producer发送请求之前等待该毫秒数,这样可以有更多消息在一个批次中批量发送。

在上面的代码片段中,可能会在一个请求中发送所有100条记录,因为我们将延迟时间设置为1毫秒。但是,如果我们没有填满缓冲区,这个设置会给请求增加1毫秒的延迟,以等待更多的记录填充到缓冲区。因此在负载大的情况下,无论是否配置linger.ms,消息都是通过批处理发送的;将linger.ms的值设置为大于0的值的意义在于,在没有处于最大负载的情况下可以有效减少请求数量, 代价是增加一点点延迟。

配置项buffer.memory 用来控制Producer用于缓冲区的可用内存总量。如果消息发送到服务器的速度大于缓冲区耗尽的速度,当缓冲区空间耗尽时,其他消息的发送将被阻塞,阻塞时间的阈值由max.block决定。之后,它会抛出一个TimeoutException异常。

配置项 key.serializervalue.serializer 用来配置键值对象的序列化类, 对于简单的字符串或字节类型,可以使用Kafka自带的ByteArraySerializer或StringSerializer。

从Kafka 0.11这个版本开始,KafkaProducer支持另外两种模式:幂等Producer和事务Producer。幂等Producer加强了卡夫卡的交付语义,从至少一次交付(at least once delivery)加强为准确地一次交付(exactly once delivery), 幂等Producer的消息无论失败重试多少次都不会导致数据重复统计。而事务Producer是以原子操作将消息发送到多个分区(和主题!),即这些操作要么同时成功, 要么同时失败!.

若要启用等幂性(idempotence),则 enable.idempotence配置必须设置为true。此时retries配置值将默认为Integer.MAX_VALUE, 而acks配置默认为all。使用幂等性Producer在API上没有变化,因此不需要修改现有应用程序。

一旦启用了幂等性(idempotence),建议不要配置重试(retries)参数,因为它默认值为Integer.MAX_VALUE。另外,如果send(ProducerRecord)方法在进行无限次重试之后仍然返回错误(例如消息在缓冲区中过期),那么建议关闭Producer并检查最后生成的消息的内容,以确保没有重复消息。最后,生产者只能保证在单个会话中发送的消息的幂等性。

过时的Producer API

一些公司可能因为历史原因, 仍然在使用旧的(过时的)API.

pom.xml

简单代码示例

过时的生产者API不支持幂等性和事务.

复杂代码示例

自定义调度器代码

生产者代码

Producer 配置参数

所有producer的配置参阅官网

同步发送结合自定义调度器

自定义调度器

生产者代码

异步发送结合回调

这里回调采用使用匿名内部类的方式:

事务Producer

客户端能与之交互的Broker上安装Kafka版本要至少0.10.0。有些代理服务器可能不支持某些客户端特性。例如,事务api需要Kafka版本至少0.11.0。当使用的代码和实际版本不符合时将收到unsupportedVersionException异常。

要使用事务Producer API,必须设置 transactional.id 属性, 设置transactional.id可用来在单个Producer的实例跨多个会话时方便进行事务的恢复(回滚操作)。只要 transactional.id 设置好后,幂等性所依赖的所有相关的Producer配置就会自动生效。此外,事务中包含的主题应该做好持久性设置。特别是replication.factor 应该至少是3,主题的min.insync.replicas 应设为2。最后,为了从端到端实现事务保证,还必须将使用者的事务隔离级别配置为只读取提交(read only committed messages)。

所有新的事务api都是阻塞的,一旦失败就会抛出异常。下面的示例说明了如何使用新的api。它与上面的示例类似,只是100条消息都是属于一个事务(要么同时成功,要么同时失败)。

正如示例中所暗示的,每个生产者只能有一个开放的事务。所有在beginTransaction()commitTransaction()调用之间发送的消息都是单个事务的一部分。如果指定了事务id,则生产者发送的所有消息必须是事务的一部分。

事务Producer通过抛出异常来传递错误状态。因此不需要为producer.send()指定回调,也不需要调用producer.send().get()来返回将来执行的结果. 如果任何producer.send()或事务调用在事务期间遇到不可恢复的错误,就会抛出KafkaException。有关从事务发送中检测错误的详细信息,请参阅文档 send(ProducerRecord) 。通过在接收到KafkaException时调用producer.abortTransaction(),从而保证事务性。

Views: 471

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注