Kafka中的消息序列化和反序列化

Kafka生产者中的配置项key.serializer 和value.serializer指示如何将用户通过其ProducerRecord提供的键和值对象转换为字节。对于简单的字符串或字节类型,可以使用包含的ByteArraySerializer或StringSerializer进行序列化操作。 kafka在发送或者接收消息的时候实际是使用byte[]字节型数组进行传输的。但是我们平常使用的时候,不但可以使用byte[],还可以使用int、short、long、float、doubl... ... [查看更多]

Views: 559

Kafka Consumer API

高级API 在控制台创建发送者 创建消费者(过时API) 官方提供案例(自动维护消费情况, 新API) 高级消费者和简单的消费者有以下的区别。 1.自动/隐藏偏移管理(Offset Management ) 2.自动(简单)分区分配 3.Broker 故障转移 => 自动重新平衡 4.Consumer 故障转移 => 自动重新平衡 低级API 也叫Simple Consumer, 实际使用起来并不简单. 实现使用低级API读取指定topic,指定partition,... ... [查看更多]

Views: 551

玩转 Java 8 Stream API

先贴上几个案例,水平高超的同学可以挑战一下: 从员工集合中筛选出salary大于8000的员工,并放置到新的集合里。 统计员工的最高薪资、平均薪资、薪资之和。 将员工按薪资从高到低排序,同样薪资者年龄小者在前。 将员工按性别分类,将员工按性别和地区分类,将员工按薪资是否高于8000分为两部分。 用传统的迭代处理也不是很难,但代码就显得冗余了,跟Stream相比高下立判。 1 Stream概述 Java 8 是一个非常成功的版本,这个版本新增的Stream,配合同版本出现的 La... ... [查看更多]

Views: 538

Kafka的消息存储

为了便于说明问题,假设这里只有一个单节点的伪分布式Kafka集群。在这个Kafka broker实例的$KAFKA_HOME/config/server.properties中配置 log.dirs=/tmp/kafka-logs,以此来设置 Kafka 消息文件存储目录。并通过命令: 创建一个 topic:topic_test,partition的数量配置为4。接下来可以在 /tmp/kafka-logs 目录中可以看到生成了 4 个partition目录: 在Kafka文件存... ... [查看更多]

Views: 589

Kafka如何保证无消息丢失

配置Kafka无消息丢失 Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。Kafka 的 一个Broker或多个Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交,具体是一个Broker还是多个Broker取决ack参数的配置。 要想要消息不丢失,假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。 目前 Kafka Prod... ... [查看更多]

Views: 283