Kafka生产者中的配置项key.serializer
和value.serializer
指示如何将用户通过其ProducerRecord
提供的键和值对象转换为字节。对于简单的字符串或字节类型,可以使用包含的ByteArraySerializer
或StringSerializer
进行序列化操作。
kafka在发送或者接收消息的时候实际是使用byte[]字节型数组进行传输的。但是我们平常使用的时候,不但可以使用byte[],还可以使用int、short、long、float、double、String等数据类型,这是因为在我们使用这些数据类型的时候,kafka根据我们指定的序列化和反序列化方式转成byte[]类型之后再进行传输来提高传输效率。
通常我们在使用kakfa发送或者接受消息的时候都需要指定消息的key和value序列化方式,如生产者我们可以设置value.serializer
为org.apache.kafka.common.serialization.StringSerializer
来设置value的序列化方式为字符串,即我们可以发送string类型的消息。目前kafka原生支持的序列化和反序列化方式如下两表所示,这些原生的序列化和反序列化的类都是在org.apache.kafka.common.serialization
包之下:
kafka序列化方式表
序列化方式 | 对应java数据类型 | 说明 |
---|---|---|
ByteArraySerializer | byte[] | 原生类型 |
ByteBufferSerializer | ByteBuffer | 关于ByteBuffer |
IntegerSerializer | Interger | |
ShortSerializer | Short | |
LongSerializer | Long | |
DoubleSerializer | Double | |
StringSerializer | String |
kafka反序列化方式表
序列化方式 | 对应java数据类型 | 说明 |
---|---|---|
ByteArrayDeserializer | byte[] | 原生类型 |
ByteBufferDeserializer | ByteBuffer | 关于ByteBuffer |
IntegerDeserializer | Interger | |
ShortDeserializer | Short | |
LongDeserializer | Long | |
DoubleDeserializer | Double | |
StringDeserializer | String |
Java原生类型的序列化和反序列化
上面我们了解一些关于kafka原生的一些序列化和反序列化方式。它们究竟是如实现的呢?以string类型为例子,我们看一下,kafka如何实现序列化/反序列化的。
kafka序列化/反序列化方式的实现代码在org.apache.kafka.common.serialization包下。
String 类型的序列化类
我们查看org.apache.kafka.common.serialization.StringSerializer这个类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; import java.io.UnsupportedEncodingException; import java.util.Map; /** * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding, * value.serializer.encoding or serializer.encoding. The first two take precedence over the last. */ public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); if (encodingValue instanceof String) encoding = (String) encodingValue; } @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } @Override public void close() { // nothing to do } } |
由上面的代码我们可以看出:
- String的序列化类是继承了
Serializer
接口,指定<String>
泛型,然后实现的Serializer
接口的configure()
、serialize()
、close()
方法。代码重点的实现是在serialize()
,可以看出这个方法将我们传入的String类型的数据,简单的通过data.getBytes(encoding)
方法进行了序列化。
String 类新的反序列化类
我们查看org.apache.kafka.common.serialization.StringDeserializer
这个类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; import java.io.UnsupportedEncodingException; import java.util.Map; /** * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding, * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last. */ public class StringDeserializer implements Deserializer<String> { private String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("deserializer.encoding"); if (encodingValue instanceof String) encoding = (String) encodingValue; } @Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } @Override public void close() { // nothing to do } } |
同样,由上面的代码我们可以看出:
- String类型的反序列化类是继承了
Deserializer
接口,指定<String>
泛型,然后实现的Deserializer
接口的configure()
、deserialize()
、close()
方法。代码重点的实现是在deserialize()
,可以看出这个方法将我们传入的byte[]
类型的数据,简单的通过return new String(data, encoding)
方法进行了反序列化得到了String类型的数据。
复杂Java对象的序列化及反序列化
通过上面,我们对kafka原生序列化/反序列化方式的了解,我们可以看出,kafka实现序列化/反序列化可以简单的总结为两步,第一步实现序列化Serializer
或者反序列化Deserializer
接口。第二步实现接口方法,将指定类型序列化成byte[]
或者将byte[]
反序列化成指定数据类型(String
)。
1 2 |
graph LR 生产者(String)--序列化-->传输和持久("Byte[]")--反序列化-->消费者(String) |
由于原生类型不支持复杂对象类型,所以接下来,我们来实现对复杂对象类型的自定义序列化/反序列化方式。
这里我们介绍两种方式:
利用Buffer缓冲区
问题陈述:
Chapter 4 Activity 4.2
Kafka 消息也可以传递复杂对象类型。通常,这些对象将有多个字段。例如供应商对象。
实体类 Supplier
1 2 3 4 5 |
@Data public class Supplier { private final int supplierId; private final String supplierName; } |
如果希望发送此类自定义对象或类型结构,则需要实现自定义序列化器和反序列化器。实现需要自定义序列化器和反序列化器。
解决方案 为了实现上述要求,利用缓冲区(Buffer,内存中预留指定大小的存储空间)用来对输入/输出(I/O)的数据作临时存储,这里由于需要保存字节数据所以我们使用ByteBuffer
。
自定义序列化类 SupplierSerializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public class SupplierSerializer implements Serializer<Supplier> { private final String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public byte[] serialize(String topic, Supplier data) { int sizeOfName; byte[] serializedName; try { if (data == null) { return null; } // 根据 字符编码 转换成字节数组 serializedName = data.getName().getBytes(encoding); // 通过sizeOfName可以帮助在反序列化的时候 // 可以知道要取多少个字符来根据指定编码格式转换成回字符串 sizeOfName = serializedName.length; ByteBuffer buf = ByteBuffer.allocate(4 + 4 + sizeOfName); buf.putInt(data.getID()); // supplierId 是int类型占4 bytes buf.putInt(sizeOfName); // supplierName 长度 buf.put(serializedName); // supplierName 内容 return buf.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Supplier to byte[]"); } } @Override public void close() { // nothing to do } } |
自定义反序列化类 SupplierDeserializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public class SupplierDeserializer implements Deserializer<Supplier> { private final String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { //Nothing to configure } @Override public Supplier deserialize(String topic, byte[] data) { try { if (data == null) { System.out.println("Null recieved at deserialize"); return null; } ByteBuffer buf = ByteBuffer.wrap(data); int id = buf.getInt(); // 获取 supplierId的 int类型的值 // 获取 supplierName长度 int sizeOfName = buf.getInt(); // 创建相同长度的字节数组 byte[] nameBytes = new byte[sizeOfName]; // get(nameBytes) : // 从当前位置开始相对读,读nameBytes.length个byte, // 并写入dst下标从offset到offset+length的区域 buf.get(nameBytes); // 根据获取 supplierName String deserializedName = new String(nameBytes, encoding); return new Supplier(id, deserializedName); } catch (Exception e) { throw new SerializationException("Error when deserializing byte[] to Supplier"); } } @Override public void close() { // nothing to do } } |
JSON格式的序列化及反序列化
实体类 JsonData
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import java.util.Date; import java.util.Map; // lombok @Data @AllArgsConstructor @NoArgsConstructor public class JsonData { @JsonProperty("lng") private double longitude; @JsonProperty("lat") private double latitude; private double weight; private Date timestamp; } |
什么是 JSON ?
- JSON 指的是 JavaScript 对象表示法(JavaScript Object Notation)
- JSON 是轻量级的文本数据交换格式,类似 XML
- JSON 比 XML 更小、更快,更易解析。
- JSON 独立于语言:JSON 使用 Javascript语法来描述数据对象,但是 JSON 仍然独立于语言和平台。JSON 解析器和 JSON 库支持许多不同的编程语言。 目前非常多的动态(PHP,JSP,.NET)编程语言都支持JSON。
- JSON 具有自我描述性,更易理解
JSON 实例
1 2 3 4 5 6 7 |
{ "sites": [ { "name":"NIIT" , "url":"www.niit.com.cn" }, { "name":"Google" , "url":"www.google.com" }, { "name":"WeiBo" , "url":"www.weibo.com" } ] } |
这个 sites 对象是包含 3 个站点记录(对象)的数组。
语法:
JSON 键必须是字符串,字符串必须使用双引号包裹。
JSON 值可以是:
- 数字(整数或浮点数)
- 字符串(在双引号中)
- 逻辑值(true 或 false)
- 数组(在中括号中)
- 对象(在大括号中)
- null
JSON的Java解析库 – Jackson
市面有很多用于解析JSON的用Java编写的第三方库,比如Jackson(fasterxml,可靠、灵活、可定制,使用广泛),Gson(Google, 轻量、简洁),国内比较著名的有FastJson(Alibaba个人开源,特点是快,虽然很有多历史漏洞,但是作为国人还是要支持一下,可以越来越好),这些库的用法比较类似,这里的案例使用的是Jackson
Jackson的maven依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.11.0</version> </dependency> |
基于Jackson的一个用于解析Json的工具类 JsonSerializerUtil
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
public class JsonSerializerUtil { /** * JSON序列化 * * @param object 对象 * @return JSON字符串 */ public static String serialize(Object object) { ObjectMapper mapper = new ObjectMapper(); try { return mapper.writeValueAsString(object); } catch (JsonProcessingException e) { e.printStackTrace(); return ""; } } /** * JSON字符串反序列化 * * @param jsonStr JSON字符串 * @return a Map */ public static Map deserialize(String jsonStr) { try { return deserialize(jsonStr, Map.class); } catch (Exception e) { e.printStackTrace(); return new HashMap(); } } public static <T> T deserialize(String jsonStr, Class<T> classType) throws Exception { return new ObjectMapper().readValue(jsonStr, classType); } } |
自定义序列化类 JsonDataSerializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public class JsonDataSerializer implements Serializer<JsonData> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public byte[] serialize(String topic, JsonData data) { return JsonSerializerUtil.serialize(data).getBytes(); } @Override public void close() { // nothing to do } } |
自定义反序列化类 JsonDataDeserializer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
public class JsonDataDeserializer implements Deserializer<JsonData> { private final String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { //Nothing to configure } @Override public JsonData deserialize(String topic, byte[] data) { if (data == null) { return null; } JsonData jsonData = null; try { String jsonString = new String(data, encoding); jsonData = JsonSerializerUtil.deserialize(jsonString, JsonData.class); } catch (Exception e) { e.printStackTrace(); } return jsonData; } @Override public void close() { // nothing to do } } |
作业
LG Activity Exer 1
需要将书籍(Book) 的信息通过Kafka传输, 书籍的相关字段有 书名 (name), 订购数量(quantityOrdered),单价(unitPrice),请编写一个生产者发送书籍消息, 在编写一个消费者消费书籍消息。
可以自行选择序列化的方式。
总结
实现序列化还有很多比较成熟的第三方序列化库可以使用(如avro,protoBuff等),关于采用什么样的方式去序列化数据还需要根据业务场景自己去定义。
Views: 559