配置Kafka无消息丢失
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。Kafka 的 一个Broker或多个Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交,具体是一个Broker还是多个Broker取决ack参数的配置。
要想要消息不丢失,假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。
目前 Kafka Producer 的send方法是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,消息会异步继续发送到Broker中,在这个过程中是有可能失败的,比如网络抖动导致发送失败,比如消息格式不正确等导致Broker拒绝消息。
解决方法:Producer 要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。
通过callback(回调),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。举例来说,如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。总之,处理发送失败的责任在 Producer 端而非 Broker 端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.ExecutionException; public class ProducerWithCallBackExample { public static void main(String[] args) throws ExecutionException, InterruptedException { final Logger logger = LoggerFactory.getLogger(ProducerWithCallBackExample.class); //Create producer properties Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop000:9092"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //Create the Producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //Send 10 messages for (int i = 0; i < 10; i++) { //Create a Producer Record ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", "id_" + i, "Hello Kakfka" + i); logger.info("Key :" + "id_" + i); //Log the Key //send data producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { logger.info("Received new metadata. \n" + "Topic:" + recordMetadata.topic() + "\n" + "Pratition:" + recordMetadata.partition() + "\n" + "Offeset:" + recordMetadata.offset() + "\n" + "Timestamp" + recordMetadata.timestamp()); } else { logger.error("Error while producing", e); } } }); producer.flush(); } producer.close(); } } |
最佳实践
分享一下 Kafka 无消息丢失的配置的最佳实践:
- 不要使用
producer.send(msg)
,而要使用producer.send(msg, callback)
。记住,一定要使用带有回调通知的 send 方法。 - 设置
acks = all
。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。 - 设置
retries
为一个较大的值。这里的retries
同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0
的 Producer 能够自动重试消息发送,避免消息丢失。 - 设置
unclean.leader.election.enable = false
。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。 - 设置
replication.factor >= 3
。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas > 1
。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 - 确保
replication.factor > min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。 - 确保消息消费完成再提交。Consumer 端有个参数
enable.auto.commit
,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
Views: 283