Kafka中的消息序列化和反序列化

Kafka生产者中的配置项key.serializervalue.serializer指示如何将用户通过其ProducerRecord提供的键和值对象转换为字节。对于简单的字符串或字节类型,可以使用包含的ByteArraySerializerStringSerializer进行序列化操作。

kafka在发送或者接收消息的时候实际是使用byte[]字节型数组进行传输的。但是我们平常使用的时候,不但可以使用byte[],还可以使用int、short、long、float、double、String等数据类型,这是因为在我们使用这些数据类型的时候,kafka根据我们指定的序列化和反序列化方式转成byte[]类型之后再进行传输来提高传输效率。

通常我们在使用kakfa发送或者接受消息的时候都需要指定消息的key和value序列化方式,如生产者我们可以设置value.serializerorg.apache.kafka.common.serialization.StringSerializer来设置value的序列化方式为字符串,即我们可以发送string类型的消息。目前kafka原生支持的序列化和反序列化方式如下两表所示,这些原生的序列化和反序列化的类都是在org.apache.kafka.common.serialization包之下:

kafka序列化方式表

序列化方式 对应java数据类型 说明
ByteArraySerializer byte[] 原生类型
ByteBufferSerializer ByteBuffer 关于ByteBuffer
IntegerSerializer Interger
ShortSerializer Short
LongSerializer Long
DoubleSerializer Double
StringSerializer String

kafka反序列化方式表

序列化方式 对应java数据类型 说明
ByteArrayDeserializer byte[] 原生类型
ByteBufferDeserializer ByteBuffer 关于ByteBuffer
IntegerDeserializer Interger
ShortDeserializer Short
LongDeserializer Long
DoubleDeserializer Double
StringDeserializer String

Java原生类型的序列化和反序列化

上面我们了解一些关于kafka原生的一些序列化和反序列化方式。它们究竟是如实现的呢?以string类型为例子,我们看一下,kafka如何实现序列化/反序列化的。

kafka序列化/反序列化方式的实现代码在org.apache.kafka.common.serialization包下。

String 类型的序列化类

我们查看org.apache.kafka.common.serialization.StringSerializer这个类。

由上面的代码我们可以看出:

  • String的序列化类是继承了Serializer接口,指定<String>泛型,然后实现的Serializer接口的configure()serialize()close()方法。代码重点的实现是在serialize(),可以看出这个方法将我们传入的String类型的数据,简单的通过data.getBytes(encoding)方法进行了序列化。

String 类新的反序列化类

我们查看org.apache.kafka.common.serialization.StringDeserializer这个类。

同样,由上面的代码我们可以看出:

  • String类型的反序列化类是继承了Deserializer接口,指定<String>泛型,然后实现的Deserializer接口的configure()deserialize()close()方法。代码重点的实现是在deserialize(),可以看出这个方法将我们传入的byte[]类型的数据,简单的通过return new String(data, encoding)方法进行了反序列化得到了String类型的数据。

复杂Java对象的序列化及反序列化

通过上面,我们对kafka原生序列化/反序列化方式的了解,我们可以看出,kafka实现序列化/反序列化可以简单的总结为两步,第一步实现序列化Serializer或者反序列化Deserializer接口。第二步实现接口方法,将指定类型序列化成byte[]或者将byte[]反序列化成指定数据类型(String)。

由于原生类型不支持复杂对象类型,所以接下来,我们来实现对复杂对象类型的自定义序列化/反序列化方式。

这里我们介绍两种方式:

利用Buffer缓冲区

问题陈述:

Chapter 4 Activity 4.2

Kafka 消息也可以传递复杂对象类型。通常,这些对象将有多个字段。例如供应商对象。

实体类 Supplier

如果希望发送此类自定义对象或类型结构,则需要实现自定义序列化器和反序列化器。实现需要自定义序列化器和反序列化器。

解决方案 为了实现上述要求,利用缓冲区(Buffer,内存中预留指定大小的存储空间)用来对输入/输出(I/O)的数据作临时存储,这里由于需要保存字节数据所以我们使用ByteBuffer

自定义序列化类 SupplierSerializer

自定义反序列化类 SupplierDeserializer

JSON格式的序列化及反序列化

实体类 JsonData

什么是 JSON ?

  • JSON 指的是 JavaScript 对象表示法(JavaScript Object Notation)
  • JSON 是轻量级的文本数据交换格式,类似 XML
  • JSON 比 XML 更小、更快,更易解析。
  • JSON 独立于语言:JSON 使用 Javascript语法来描述数据对象,但是 JSON 仍然独立于语言和平台。JSON 解析器和 JSON 库支持许多不同的编程语言。 目前非常多的动态(PHP,JSP,.NET)编程语言都支持JSON。
  • JSON 具有自我描述性,更易理解

JSON 实例

这个 sites 对象是包含 3 个站点记录(对象)的数组。

语法:

JSON 键必须是字符串,字符串必须使用双引号包裹。

JSON 值可以是:

  • 数字(整数或浮点数)
  • 字符串(在双引号中)
  • 逻辑值(true 或 false)
  • 数组(在中括号中)
  • 对象(在大括号中)
  • null

JSON的Java解析库 – Jackson

市面有很多用于解析JSON的用Java编写的第三方库,比如Jackson(fasterxml,可靠、灵活、可定制,使用广泛),Gson(Google, 轻量、简洁),国内比较著名的有FastJson(Alibaba个人开源,特点是快,虽然很有多历史漏洞,但是作为国人还是要支持一下,可以越来越好),这些库的用法比较类似,这里的案例使用的是Jackson

Jackson的maven依赖

基于Jackson的一个用于解析Json的工具类 JsonSerializerUtil

自定义序列化类 JsonDataSerializer

自定义反序列化类 JsonDataDeserializer

作业

LG Activity Exer 1

需要将书籍(Book) 的信息通过Kafka传输, 书籍的相关字段有 书名 (name), 订购数量(quantityOrdered),单价(unitPrice),请编写一个生产者发送书籍消息, 在编写一个消费者消费书籍消息。

可以自行选择序列化的方式。

总结

实现序列化还有很多比较成熟的第三方序列化库可以使用(如avro,protoBuff等),关于采用什么样的方式去序列化数据还需要根据业务场景自己去定义。

Views: 559

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注