本章节内容:
-
理解 ZooKeeper 架构
-
认识 ZooKeeper 的数据模型
-
ZooKeeper 的操作
- 数据操作
- ACL 权限控制
- Watch 监控
-
理解 ZooKeeper 的内部工作原理
其中,数据模型是最重要的,很多 ZooKeeper 中典型的应用场景都是利用这些基础模块实现的。比如我们可以利用数据模型中的临时节点和 Watch 监控机制来实现一个发布订阅的功能。
ZooKeeper Service
ZooKeeper 作为一个分布式一致性协调框架,给出了在分布式环境下一致性问题的工业解决方案,目前流行的很多开源框架技术背后都有 ZooKeeper 的身影。开发中我们应该如何使用 ZooKeeper?在这之前,我们先要对 ZooKeeper 的基础知识进行全面的掌握。
ZooKeeper 以高可用的集群方式提供集中服务, ZooKeeper 集群被称为 Ensemble(合唱团).
The members of the ensemble are aware of each other’s state. This means that the current in-memory state, transaction logs, and the point-in-time copies of the state of the service are stored in a durable manner in the local data store by the individual hosts that together form the ensemble.
ZooKeeper 数据模型
下面部署一个开发测试环境,并在上面做一些简单的操作。来探索 ZooKeeper 的数据模型:
配置文件
1 2 3 |
tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 |
操作 Zookeeper
启动 Zookeeper
1 |
$ bin/zkServer.sh start |
完整的写法
bin/zkServer.sh conf/zoo.cfg
默认使用
zoo.cfg
作为配置文件
查看进程是否启动
1 2 3 |
$ jps 4020 Jps 4001 QuorumPeerMain |
查看状态:
1 2 3 4 |
$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: standalone |
启动客户端:
1 |
$ bin/zkCli.sh |
完整的写法:
bin/zkCli.sh -server 127.0.0.1:2181
退出客户端:
1 |
[zk: localhost:2181(CONNECTED) 0] quit |
停止 Zookeeper
1 |
$ bin/zkServer.sh stop |
配置参数解读
Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下:
-
tickTime =2000
:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,时间单位为毫秒。它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。(session 的最小超时时间是 2*tickTime)
-
initLimit =10
:LF 初始通信时限集群中的 Follower 跟随者服务器与 Leader 领导者服务器之间初始连接时能容忍的最多心跳数(tickTime 的数量),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。
-
syncLimit =5
:LF 同步通信时限集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。
-
dataDir
:数据文件目录+数据持久化路径主要用于保存 Zookeeper 中的数据。
-
dataLogDir
:日志文件目录Zookeeper 保存日志文件的目录zoo-sample.cfg 中没有, 需要额外添加
-
clientPort
=2181:客户端连接端口监听客户端连接的端口。
这样单机版的开发环境就已经构建完成了,接下来我们通过 ZooKeeper 提供的 create 命令来创建几个节点,分别是:“/locks”,“/servers”,“/works”:
1 2 3 |
create /locks create /servers create /works |
ZooKeeper 命名空间的层级结构
最终在 ZooKeeper 服务器上会得到一个具有层级关系的数据结构,非常像 Linux 中的文件系统,有一个根目录,下面还有很多子目录。如下图所示:
ZooKeeper 的数据模型有一个根节点(/),根节点下可以创建子节点,并在子节点下可以继续创建下一级节点。ZooKeeper 树中的每一层级用斜杠(/)分隔开,且只能用绝对路径(如“get /work/task1”)的方式查询 ZooKeeper 节点,而不能使用相对路径。
为什么 ZooKeeper 客户端的操作中不支持使用相对路径呢?
因为 ZooKeeper 在底层实现的时候,用节点的完整路径来作为 key 缓存节点数据以提高性能。如果使用相对路径,那么就需要在客户端进行路径拼接,这样会增加客户端的开销,降低性能。
znode 节点类型与特性
知道了 ZooKeeper 的数据模型是一种树形结构,就像在 MySQL 中数据是存在于数据表中,ZooKeeper 中的数据是由多个数据节点最终构成的一个层级的树状结构,和我们在创建 MySOL 数据表时会定义不同类型的数据列字段,ZooKeeper 中的数据节点也分为持久节点、临时节点、有序节点三种类型:
1、持久节点 persistent
我们第一个介绍的是持久节点,这种节点也是在 ZooKeeper 最为常用的,几乎所有业务场景中都会包含持久节点的创建。之所以叫作持久节点是因为一旦将节点创建为持久节点,该数据节点会一直存储在 ZooKeeper 服务器上,即使创建该节点的客户端与服务端的会话关闭了,该节点依然不会被删除。如果我们想删除持久节点,就要显式调用 delete 函数进行删除操作。
2、临时节点 ephemeral
接下来我们来介绍临时节点。从名称上我们可以看出该节点的一个最重要的特性就是临时性。所谓临时性是指,如果将节点创建为临时节点,那么该节点数据不会一直存储在 ZooKeeper 服务器上。当创建该临时节点的客户端会话因超时或发生异常而关闭时,该节点也相应在 ZooKeeper 服务器上被删除。同样,我们可以像删除持久节点一样主动删除临时节点。
在平时的开发中,我们可以利用临时节点的这一特性来做服务器集群内机器运行情况的统计,将集群设置为“/servers”节点,并为集群下的每台服务器创建一个临时节点“/servers/host”,当服务器下线时该节点自动被删除,最后统计临时节点个数就可以知道集群中的运行情况。如图所示.
3、有序节点 sequential
最后我们再说一下有序节点,其实有序节点并不算是一种单独种类的节点,而是在之前提到的持久节点和临时节点特性的基础上,增加了一个节点有序的性质。所谓节点有序是说在我们创建有序节点的时候,ZooKeeper 服务器会自动使用一个单调递增的数字作为后缀,追加到我们创建节点的后边。例如一个客户端创建了一个路径为 works/task- 的有序节点,那么 ZooKeeper 将会生成一个序号并追加到该节点的路径后,最后该节点的路径为 works/task-1, 以后重复创建时序号将递增。通过这种方式我们可以直观的查看到节点的创建顺序。
实际有序节点的编号从 works/task-0000000000 开始, 并递增
zookeeper 3.5.x 中新引入了 container 节点 和 ttl 节点
- container 节点用来存放子节点,如果 container 节点中的子节点为 0 ,则 container 节点在未来(60s 后)会被服务器删除。
- ttl 节点默认禁用,需要通过配置开启, 如果 ttl 节点没有子节点,或者 ttl 节点在 指定的时间内没有被修改则会被服务器删除。
上述这几种数据节点虽然类型不同,但 ZooKeeper 中的每个节点都维护有这些内容:一个二进制数组(byte data[]),用来存储节点的数据、ACL 访问控制信息、子节点数据(`因为临时节点不允许有子节点,所以其子节点字段为 null),除此之外每个数据节点还有一个记录自身状态信息的字段 stat。下面我们详细说明节点的状态信息。
节点的状态结构
每个节点都有属于自己的状态信息,这就很像我们每个人的身份信息一样,我们打开之前的客户端,执行 stat /zk_test,可以看到控制台输出了一些信息,这些就是节点状态信息。
每一个节点都有一个自己的状态属性,记录了节点本身的一些信息,
数据节点的版本
这里我们重点讲解一下版本相关的属性,在 ZooKeeper 中为数据节点引入了版本的概念,每个数据节点有 3 种类型的版本信息,对数据节点的任何更新操作都会引起版本号的变化。ZooKeeper 的版本信息表示的是对节点数据内容、子节点信息或者是 ACL 信息的修改次数。
ZooKeeper API 操作
Operation | Description |
---|---|
create | Creates a znode in a specified path of the ZooKeeper namespace |
delete | Deletes a znode from a specified path of the ZooKeeper namespace |
exists | Checks if a znode exists in the path |
getChildren | Gets a list of children of a znode |
getData | Gets the data associated with a znode |
setData | Sets/writes data into the data field of a znode |
getACL | Gets the ACL of a znode |
setACL | Sets the ACL in a znode |
sync | Synchronizes a client’s view of a znode with ZooKeeper |
ZooKeeper 还支持通过名为 multi 的操作批量更新 znodes。这一组批量操作要么全部成功,要么全部失败。
ZooKeeper APIs 中的更新操作,如delete或setData,必须指定要更新的 znode 的版本号。(设置为-1 则不对版本进行校验)
版本号可以通过 exists()调用返回的状态信息中获取。
Read and Write operation in ZooKeeper
-
读请求:在客户端当前连接的 ZooKeeper 服务器本地处理。
-
写请求: These are forwarded to the leader and go through majority consensus before a response is generated.
- majority consensus(多数共识): 一般情况下,ZooKeeper 集群中的服务器数量为奇数,这样可以保证 majority consensus 的一致性。例如,当 ZooKeeper 集群中有 3 台服务器时,只要有 2 台服务器同意,就可以保证 majority consensus 的一致性。当 ZooKeeper 集群中有 5 台服务器时,只要有 3 台服务器同意,就可以保证 majority consensus 的一致性。(超过半数)
ZooKeeper Watch 操作
现在让我们来学习 ZooKeeper 又一关键技术——Watch 监控机制,并用它实现一个发布订阅功能。
发布订阅: 订阅者订阅某个主题,当主题有更新时,订阅者会收到通知。如:
- 订阅者订阅了某个新闻网站的新闻,当网站有新闻更新时,订阅者会收到通知。
- 订阅者订阅了某个电商网站的商品,当网站有商品更新时,订阅者会收到通知。
- 订阅者订阅了某个微博用户的动态,当用户有动态更新时,订阅者会收到通知。
- 订阅者订阅了某个微信公众号的文章,当公众号有文章更新时,订阅者会收到通知。
- 订阅者订阅了某个微信群的消息,当群里有消息更新时,订阅者会收到通知。
Watch(监听器) 是一种简单的机制,让客户端获得关于 ZooKeeper 集合中变化的通知。客户端可以在读取特定 znode 上设置的监听。对于 znode(客户端在其上注册了监听器)的任何更改,监听器都会向已注册的客户端发送通知。
Znode 的更改事件包括:
- Znode 本身的数据变化
- 以及其下的子节点变化。
设置的监听器一经触发就会移除。因此如果客户端希望再次收到通知,则必须重新设置监听。
另外当连接会话过期时,客户端将与服务器断开连接,相关的监听器也将被删除。
对于在指定 znode 上注册的监听器, 会触发事件通知的操作有:
-
对 znode 数据的任何更改,例如使用 setData 操作将新数据写入 znode 的数据字段时。
-
对 znode 子节点的任何更改。例如,使用 delete 操作删除 znode 的子节点。
-
创建或删除当前的 znode。
监听和通知机制
Watch 机制如何实现
正如我们可以通过点击视频网站上的”收藏“按钮来订阅我们喜欢的内容,ZooKeeper 的客户端也可以通过 Watch 机制来订阅当服务器上某一节点的数据或状态发生变化时收到相应的通知,我们可以通过向 ZooKeeper 客户端的构造方法中传递 Watcher 参数的方式实现:
1 |
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) |
上面代码定义了一个了 ZooKeeper 客户端对象实例,并传入三个参数:
- connectString 服务端地址
- sessionTimeout:超时时间
- Watcher:监控事件
这个 Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中。
除此之外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件:
etData(String path, Watcher watcher, Stat stat)
知道了 ZooKeeper 添加服务器监控事件的方式,下面我们来讲解一下触发通知的条件。ZooKeeper 的 znode 相关的状态和事件包括有:
Watch 机制的底层原理
ZooKeeper 的 Watch 机制是通过 ZKWatchManager
来实现的,它是 ZooKeeper 类的内部类,负责管理所有的 Watcher。
ZKWatchManager 中有三个 HashMap,分别用于存储数据 Watcher、子节点 Watcher 和存在 Watcher。
-
dataWatches:用于存储所有的数据 Watcher,它是一个 HashMap,key 是 znode 的路径,value 是一个 WatcherSet,它是一个 HashSet,用于存储对该 znode 的所有数据 Watcher。
-
childWatches:用于存储所有的子节点 Watcher,它也是一个 HashMap,key 是 znode 的路径,value 是一个 WatcherSet,它是一个 HashSet,用于存储对该 znode 的所有子节点 Watcher。
-
existWatches:用于存储所有的存在 Watcher,它也是一个 HashMap,key 是 znode 的路径,value 是一个 WatcherSet,它是一个 HashSet,用于存储对该 znode 的所有存在 Watcher。
从设计模式角度出发来分析其底层实现:
Watch 机制理解为是分布式环境下的观察者模式。所以接下来我们就以观察者模式的角度点来看看 ZooKeeper 底层 Watch 是如何实现的。
实现观察者模式最核心或者说关键的代码就是创建一个列表来存放观察者。
而在 ZooKeeper 中则是在客户端和服务器端分别实现两个存放观察者列表,即:ZKWatchManager 和 WatchManager。其核心操作就是围绕着这两个展开的
客户端 Watch 注册实现过程
我们先看一下客户端的实现过程,在发送一个 Watch 监控事件的会话请求时,ZooKeeper 客户端主要做了两个工作:
- 标记该会话是一个带有 Watch 事件的请求
- 将 Watch 事件存储到 ZKWatchManager
我们以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:
1 2 3 4 5 6 7 8 9 10 11 12 |
public byte[] getData(final String path, Watcher watcher, Stat stat){ ... WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } RequestHeader h = new RequestHeader(); request.setWatch(watcher != null); ... GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); } |
之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:
1 2 3 4 5 6 7 8 9 |
public Packet queuePacket(RequestHeader h, ReplyHeader r,...) { Packet packet = null; ... packet = new Packet(h, r, request, response, watchRegistration); ... outgoingQueue.add(packet); ... return packet; } |
最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:
1 2 3 4 5 6 7 |
private void finishPacket(Packet p) { int err = p.replyHeader.getErr(); if (p.watchRegistration != null) { p.watchRegistration.register(err); } ... } |
服务端 Watch 注册实现过程
Zookeeper 服务端处理 Watch 事件基本有 2 个过程:
- 解析收到的请求是否带有 Watch 注册事件
- 将对应的 Watch 事件存储到 WatchManager
FinalRequestProcessor
类中的 processRequest
函数:
当 getDataRequest.getWatch()
值为 True 时,表明该请求需要进行 Watch 监控注册。并通过 zks.getZKDatabase().getData
函数将 Watch 事件注册到服务端的 WatchManager 中。
1 2 3 4 5 6 7 8 9 10 |
public void processRequest(Request request) { ... byte b[] = zks.getZKDatabase().getData( getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null ); rsp = new GetDataResponse(b, stat); .. } |
服务端 Watch 事件的触发过程
以 setData
接口即“节点数据内容发生变更”事件为例。在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch
方法触发数据变更事件。
1 2 3 4 5 6 7 |
public Stat setData(String path, byte data[], ...){ Stat s = new Stat(); DataNode n = nodes.get(path); ... dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; } |
下面我们进入 triggerWatch
函数内部。
- 首先,封装了一个具有会话状态、事件类型、数据节点 3 种属性的 WatchedEvent 对象。
- 查询该节点注册的 Watch 事件,如果为空说明该节点没有注册过 Watch 事件。如果存在 Watch 事件则添加到定义的 Wathers 集合中,并在 WatchManager 管理中删除。
- 最后,通过调用 process 方法向客户端发送通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Set<Watcher> triggerWatch(String path, EventType type...) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); Set<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); ... for (Watcher w : watchers) { Set<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; } |
客户端回调的处理过程
客户端使用 SendThread.readResponse()
方法来统一处理服务端的响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
if (replyHdr.getXid() == -1) { // -1 means notification ... WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); ... if (chrootPath != null) { // chroot path is set String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); ... event.setPath(serverPath.substring(chrootPath.length())); ... } WatchedEvent we = new WatchedEvent(event); ... eventThread.queueEvent( we ); // queue the event } |
-
首先反序列化服务器发送请求头信息
replyHdr.deserialize(bbia, "header")
,并判断相属性字段 xid 的值为 -1,表示该请求响应为通知类型。 -
在处理通知类型时,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。接着判断客户端是否配置了 chrootPath 属性,如果为 True 说明客户端配置了 chrootPath 属性。需要对接收到的节点路径进行 chrootPath 处理。最后调用
eventThread.queueEvent( )
方法将接收到的事件交给 EventThread 线程进行处理.
接下来我们来看一下 EventThread.queueEvent()
方法内部的执行逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public Set<Watcher> materialize(...) { Set<Watcher> result = new HashSet<Watcher>(); ... switch (type) { ... case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; .... } return result; } |
首先按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public void run() { try { isRunning = true; while (true) { Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } ... } |
获取到对应的 Watcher 信息后,将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理。
最后调用 processEvent(event)
方法来最终执行实现了 Watcher
接口的 process()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
private void processEvent(Object event) { ... if (event instanceof WatcherSetEventPair) { WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } } |
使用 ZooKeeper 实现发布订阅模式
在系统开发的过程中会用到各种各样的配置信息,如数据库配置项、第三方接口、服务地址等,这些配置操作在我们开发过程中很容易完成,但是放到一个大规模的集群中配置起来就比较麻烦了。
我们可以利用 ZooKeeper 的发布订阅功能实现自动完成服务器配置信息的维护。
我们可以把诸如数据库配置项这样的信息存储在 ZooKeeper 数据节点中。如图中的 /confs/data_item1。
服务器集群客户端对该节点添加 Watch 事件监控,当集群中的服务启动时,会读取该节点数据获取数据配置信息。而当该节点数据发生变化时,ZooKeeper 服务器会发送 Watch 事件给各个客户端,集群中的客户端在接收到该通知后,重新读取节点的数据库配置信息。
在前面我们使用 Watch 机制实现了一个分布式环境下的配置管理功能,通过对 ZooKeeper 服务器节点添加数据变更事件,实现当数据库配置项信息变更后,集群中的各个客户端能接收到该变更事件的通知,并获取最新的配置信息。要注意一点是,我们提到 Watch 具有一次性,所以当我们获得服务器通知后要再次添加 Watch 事件。因此如果要继续保持监听, 需要重新注册 Watch 事件。
对于 watch,ZooKeeper 提供了这些保障:
- Watch 与其他事件、其他 watch 以及异步回复都是有序的。 ZooKeeper 客户端库保证所有事件都会按顺序分发。
- 客户端会保障它在看到相应的 znode 的新数据之前接收到 watch 事件。//这保证了在 process()再次利用 zk client 访问时数据是存在的
- 从 ZooKeeper(客户端)接收到的 watch 事件顺序一定和 ZooKeeper 服务所看到的事件顺序是一致的。
关于 Watch 的一些值得注意的事情
- Watch 是一次性触发器,如果得到了一个 watch 事件,而希望在以后发生变更时继续得到通知,应该再设置一个 watch。
- 因为 watch 是一次性触发器,而获得事件再发送一个新的设置 watch 的请求这一过程会有延时,所以无法确保看到了所有发生在 ZooKeeper 上的 一个节点上的事件。所以请处理好在这个时间窗口中可能会发生多次 znode 变更的这种情况。(可以不处理,但至少要意识到这一点)。//也就是说,在 process()中如果处理得慢而没有注册 new watch 时,在这期间有其它事件出现时是不会通知!!
那么这个问题如何解决?
12可以在客户端添加 Watch 事件时,同时指定一个版本号,</br>当 ZooKeeper 服务器发送通知时,会将该节点的最新版本号一并发送给客户端,</br>客户端在收到通知后,可以根据版本号判断该通知是否已经处理过,</br>如果已经处理过则忽略该通知,否则继续处理该通知。这样就能保证客户端能够收到所有的通知。
- 一个 watch 对象或一个函数/上下文对,为一个事件只会被通知一次。比如,如果同一个 watch 对象在同一个文件上分别通过 exists 和 getData 注册了两次,而这个文件之后被删除了,这时这个 watch 对象将只会收到一次该文件的 deletion 通知。//同一个 watch 注册同一个节点多次只会生成一个 event。
- 当从一个服务器上断开时(比如服务器出故障了),在再次连接上之前,将无法获得任何 watch。请使用这些会话事件来进入安全模式:在 disconnected 状态下将不会收到事件,所以程序在此期间应该谨慎行事。
移除 Watch 事件
在前面我们介绍了如何添加 Watch 事件,那么如何移除 Watch 事件呢?ZooKeeper 提供了两种方式来移除 Watch 事件:
-
通过调用 ZooKeeper 的 removeWatches() 方法来移除 Watch 事件。
-
通过调用 ZooKeeper 的 exists()、getData()、getChildren() 等方法来移除 Watch 事件。具体的移除方式是在调用这些方法时,将 watch 参数设置为 null。
使用 ZooKeeper 实现锁
学习了 ZooKeeper 的数据模型和数据节点的相关知识,下面我们通过实际的应用进一步加深理解。
设想这样一个情景:一个购物网站,某个商品库存只剩一件,客户 A 搜索到这件商品并准备下单,但在这期间客户 B 也查询到了该件商品并提交了购买,于此同时,客户 A 也下单购买了此商品,这样就出现了只有一件库存的商品实际上卖出了两件的情况。为了解决这个问题,我们可以在客户 A 对商品进行操作的时候对这件商品进行锁定从而避免这种超卖的情况发生。
实现锁的方式有很多中,这里我们主要介绍两种:悲观锁、乐观锁。
悲观锁
悲观锁认为进程对临界区的竞争总是会出现,为了保证进程在操作数据时,该条数据不被其他进程修改。数据会一直处于被锁定的状态。
我们假设一个具有 n 个进程的应用,同时访问临界区资源,我们通过进程创建 ZooKeeper 节点 /locks 的方式获取锁。
线程 a 通过成功创建 ZooKeeper 节点“/locks”的方式获取锁后继续执行,如下图所示:
这时进程 b 也要访问临界区资源,于是进程 b 也尝试创建“/locks”节点来获取锁,因为之前进程 a 已经创建该节点,所以进程 b 创建节点失败无法获得锁。
这样就实现了一个简单的悲观锁,不过这也有一个隐含的问题,就是当进程 a 因为异常中断导致 /locks 节点始终存在,其他线程因为无法再次创建节点而无法获取锁,这就产生了一个死锁问题。针对这种情况我们可以通过将节点设置为临时节点的方式避免。并通过在服务器端添加监听事件来通知其他进程重新获取锁。
乐观锁
乐观锁认为,进程对临界区资源的竞争不会总是出现,所以相对悲观锁而言。加锁方式没有那么激烈,不会全程的锁定资源,而是在数据进行提交更新的时候,对数据的冲突与否进行检测,如果发现冲突了,则拒绝操作。
乐观锁基本可以分为读取、校验、写入三个步骤。
CAS(Compare-And-Swap),即比较并替换,就是一个乐观锁的实现。CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。
在 ZooKeeper 中的 version 属性就是用来实现乐观锁机制中的“校验”的,ZooKeeper 每个节点都有数据版本的概念,在调用更新操作的时候,假如有一个客户端试图进行更新操作,它会携带上次获取到的 version 值进行更新。而如果在这段时间内,ZooKeeper 服务器上该节点的数值恰好已经被其他客户端更新了,那么其数据版本一定也会发生变化,因此肯定与客户端携带的 version 无法匹配,便无法成功更新,因此可以有效地避免一些分布式更新的并发问题。
在 ZooKeeper 的底层实现中,当服务端处理 setDataRequest 请求时,首先会调用 checkAndIncVersion 方法进行数据版本校验。ZooKeeper 会从 setDataRequest 请求中获取当前请求的版本 version,同时通过 getRecordForPath 方法获取服务器数据记录 nodeRecord, 从中得到当前服务器上的版本信息 currentversion。如果 version 为 -1,表示该请求操作不使用乐观锁,可以忽略版本对比;如果 version 不是 -1,那么就对比 version 和 currentversion,如果相等,则进行更新操作,否则就会抛出 BadVersionException 异常中断操作。
总结
本节课主要介绍了 ZooKeeper 的基础知识点——数据模型。并深入介绍了节点类型、stat 状态属性等知识,并利用目前学到的知识解决了集群中服务器运行情况统计、悲观锁、乐观锁等问题。这些知识对接下来的课程至关重要,请务必掌握。
Views: 485