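ZooKeeper provides a publish/subscribe facility for distributed data, and it implements distributed notification through its Watcher mechanism. A client registers a Watcher with the server. When a specified event on the server triggers that Watcher, the server notifies the client, which then handles the event. The Watcher mechanism is outlined in the figure below:
As the figure shows, when the client registers a Watcher with ZooKeeper, it also stores that Watcher in its local ZKWatchManager. When the server triggers the Watcher, it notifies the client. The client then takes the corresponding Watcher out of ZKWatchManager and runs its callback logic.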
The Watcher interface
The Watcher interface represents a standard event handler and declares exactly one abstract method:
abstract public void process(WatchedEvent event);
WatchedEvent
/**
 * A WatchedEvent represents a change on the ZooKeeper that a Watcher
 * is able to respond to. The WatchedEvent includes exactly what happened,
 * the current state of the ZooKeeper, and the path of the znode that
 * was involved in the event.
 */
public class WatchedEvent {
    // state of ZooKeeper when the event fired
    final private KeeperState keeperState;
    // type of the event that fired
    final private EventType eventType;
    // path of the znode involved in the event
    private String path;

    // some methods omitted

    /**
     * Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(),
                                keeperState.getIntValue(),
                                path);
    }
}
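How WatchedEvent and WatcherEvent differ and relate
In essence the two are the same thing: both wrap a server-side event. The difference is that WatchedEvent is a logical event, an object used while the client and server process the event, whereas WatcherEvent implements a serialization interface and can therefore be sent over the network. WatchedEvent provides the getWrapper method to convert a WatchedEvent into a WatcherEvent.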
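Whether it is a WatcherEvent or a WatchedEvent, the wrapper around a ZooKeeper server event is extremely simple. It contains only the server state when the event fired, the event type, and the path of the znode involved. For example:
KeeperState: SyncConnected
EventType: NodeDataChanged
Path: /test
The notification does not say what the node's data changed from or to. To find out, the client has to read the data again.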
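To make the split between the logical event and the wire event concrete, here is a minimal, self-contained sketch. The types `SketchState`, `SketchType`, `LogicalEvent` and `WireEvent` are hypothetical stand-ins for KeeperState, EventType, WatchedEvent and WatcherEvent. They are not ZooKeeper classes. The integer codes are copied from the KeeperState and EventType enums listed in this article. The byte layout produced by `DataOutputStream` is only an illustration and is not ZooKeeper's jute encoding. Note that nothing in either form carries the node's data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-ins for KeeperState / EventType (codes copied from the enums in this article).
enum SketchState {
    Disconnected(0), SyncConnected(3), Expired(-112);

    final int code;
    SketchState(int code) { this.code = code; }

    static SketchState fromCode(int code) {
        for (SketchState s : values()) if (s.code == code) return s;
        throw new IllegalArgumentException("unknown state code " + code);
    }
}

enum SketchType {
    None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4);

    final int code;
    SketchType(int code) { this.code = code; }

    static SketchType fromCode(int code) {
        for (SketchType t : values()) if (t.code == code) return t;
        throw new IllegalArgumentException("unknown event type code " + code);
    }
}

// In-process event (plays the role of WatchedEvent): state, type, path, nothing else.
final class LogicalEvent {
    final SketchState state;
    final SketchType type;
    final String path;

    LogicalEvent(SketchState state, SketchType type, String path) {
        this.state = state;
        this.type = type;
        this.path = path;
    }

    // Counterpart of getWrapper(): the enums become plain ints for transport.
    WireEvent toWire() {
        return new WireEvent(type.code, state.code, path);
    }

    static LogicalEvent fromWire(WireEvent w) {
        return new LogicalEvent(SketchState.fromCode(w.state), SketchType.fromCode(w.type), w.path);
    }
}

// Transport form (plays the role of WatcherEvent): ints plus a path, serializable to bytes.
final class WireEvent {
    final int type;
    final int state;
    final String path;

    WireEvent(int type, int state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }

    byte[] serialize() {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(type);
            out.writeInt(state);
            out.writeUTF(path); // 2-byte length followed by the path bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static WireEvent deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new WireEvent(in.readInt(), in.readInt(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A round trip `LogicalEvent.fromWire(WireEvent.deserialize(e.toWire().serialize()))` gives back the same state, type and path. To learn the new data, a client would still have to read the node again.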
KeeperState: the state of ZooKeeper when the event occurred
/**
 * Enumeration of states the ZooKeeper may be at the event
 */
public enum KeeperState {
    /** Unused, this state is never generated by the server */
    @Deprecated
    Unknown (-1),

    /** The client is in the disconnected state - it is not connected
     * to any server in the ensemble. */
    Disconnected (0),

    /** Unused, this state is never generated by the server */
    @Deprecated
    NoSyncConnected (1),

    /** The client is in the connected state - it is connected
     * to a server in the ensemble (one of the servers specified
     * in the host connection parameter during ZooKeeper client
     * creation). */
    SyncConnected (3),

    /**
     * Auth failed state
     */
    AuthFailed (4),

    /**
     * The client is connected to a read-only server, that is the
     * server which is not currently connected to the majority.
     * The only operations allowed after receiving this state is
     * read operations.
     * This state is generated for read-only clients only since
     * read/write clients aren't allowed to connect to r/o servers.
     */
    ConnectedReadOnly (5),

    /**
     * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
     * so that they can perform Zookeeper actions with their SASL-authorized permissions.
     */
    SaslAuthenticated(6),

    /** The serving cluster has expired this session. The ZooKeeper
     * client connection (the session) is no longer valid. You must
     * create a new client connection (instantiate a new ZooKeeper
     * instance) if you with to access the ensemble. */
    Expired (-112);
}
EventType: the type of event that occurred on ZooKeeper
/**
 * Enumeration of types of events that may occur on the ZooKeeper
 */
public enum EventType {
    None (-1),
    NodeCreated (1),
    NodeDeleted (2),
    NodeDataChanged (3),
    NodeChildrenChanged (4);
}
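Note that NodeDataChanged signals a change of the node's dataVersion, so it fires on every successful setData, even if the new data equals the old. NodeChildrenChanged signals a change to the node's list of children, that is, a child was added or removed. A change to a child's data does not trigger it.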
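How does the client register a Watcher?
When you create a ZooKeeper instance, you can pass a default Watcher to the constructor. The constructor is shown below (other constructor overloads omitted):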
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    // set the default Watcher
    watchManager.defaultWatcher = watcher;

    // parse the connection string
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // holds the ZooKeeper server addresses
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    // manages the client's socket I/O
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
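The constructor stores the default Watcher in the defaultWatcher field of ZKWatchManager. Besides this, a Watcher can also be registered through three APIs: getData, exists and getChildren. The registration logic is the same whichever API is used, so getData is used as the example here.
The getData API is declared as follows (thrown exceptions and similar details omitted):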
public byte[] getData(String path, boolean watch, Stat stat);
public byte[] getData(final String path, Watcher watcher, Stat stat);
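The overload with a boolean watch flag simply passes the default Watcher (or null) to the second overload. When getData is called to register a Watcher, the detailed logic is as follows: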
public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    // validate the path
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    // resolve the server-side path, i.e. prepend the chroot prefix
    final String serverPath = prependChroot(clientPath);

    // request header
    RequestHeader h = new RequestHeader();
    // set the operation type
    h.setType(ZooDefs.OpCode.getData);
    // request body
    GetDataRequest request = new GetDataRequest();
    // set the server-side path
    request.setPath(serverPath);
    // set whether to register a Watcher: only a flag, the Watcher itself is not sent
    request.setWatch(watcher != null);
    // response body
    GetDataResponse response = new GetDataResponse();
    // send the request over the network and wait for the reply header
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
As the code shows, everything ends up in the following call:
// call site in ZooKeeper
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

// ClientCnxn.submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);
    // block until the response has been processed (finishPacket sets finished)
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}

// ClientCnxn.queuePacket
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;
    // no Xid is generated for the Packet yet; it is generated at send time,
    // in ClientCnxnSocket::doIO()
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // add to the send queue to wait for sending
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
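In ZooKeeper, a Packet can be regarded as the smallest unit of the communication protocol, used to carry data between client and server. Any object that needs to be transmitted must be wrapped in a Packet. The WatchRegistration is wrapped into the Packet as well, and the Packet is then put into the send queue (outgoingQueue) to wait to be sent.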
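The hand-off between the calling thread in submitRequest() and the I/O thread that completes the packet is a classic wait/notify pattern. The sketch below reproduces only that pattern with the hypothetical classes `SyncPacket` and `SketchCnxn`. There is no networking and no ZooKeeper class involved: the background thread just fabricates a reply string in place of the server.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical packet: the caller waits on it until `finished` becomes true.
final class SyncPacket {
    final String request;
    String reply;      // written by the I/O thread before finished is set
    boolean finished;  // guarded by the packet's monitor

    SyncPacket(String request) {
        this.request = request;
    }
}

// Hypothetical connection with a background "send thread" that fakes the server reply.
final class SketchCnxn {
    private final BlockingQueue<SyncPacket> outgoingQueue = new LinkedBlockingQueue<>();

    SketchCnxn() {
        Thread sendThread = new Thread(() -> {
            try {
                while (true) {
                    SyncPacket p = outgoingQueue.take();
                    p.reply = "reply to " + p.request; // stands in for the network round trip
                    finishPacket(p);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sendThread.setDaemon(true);
        sendThread.start();
    }

    // Same shape as the synchronous branch of finishPacket(): set the flag, wake the waiter.
    private void finishPacket(SyncPacket p) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    }

    // Same shape as submitRequest(): enqueue, then wait until finished.
    String submitRequest(String request) throws InterruptedException {
        SyncPacket p = new SyncPacket(request);
        outgoingQueue.add(p);
        synchronized (p) {
            while (!p.finished) { // the loop also guards against spurious wakeups
                p.wait();
            }
        }
        return p.reply;
    }
}
```

Because the reply is written before the monitor is released in finishPacket and read after the monitor is reacquired in submitRequest, the caller is guaranteed to see it.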
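The ZooKeeper client then sends the request and waits for the response. When the response arrives, SendThread's readResponse method handles it and calls finishPacket, which takes the Watcher registration out of the Packet and registers the Watcher in ZKWatchManager.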
private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        // register the Watcher in ZKWatchManager
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {
        // synchronous call: wake up the thread waiting in submitRequest
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        p.finished = true;
        // asynchronous call: handled by the event thread
        eventThread.queuePacket(p);
    }
}

// WatchRegistration.register
/**
 * Register the watcher with the set of watches on path.
 * @param rc the result code of the operation that attempted to
 * add the watch on the path.
 */
public void register(int rc) {
    if (shouldAddWatch(rc)) {
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized(watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher);
        }
    }
}
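The key point of register(rc) is that the Watcher enters the client's local map only after the server's reply has arrived, and only if the result code allows it. Below is a minimal sketch of that rule using hypothetical classes (`SimpleWatcher`, `ClientWatchStore`, `Registration`, `DataRegistration`, `ExistsRegistration`). The exists variant, which also keeps the watch when the node does not exist (NONODE, code -101) and stores it in a separate map, reflects my understanding of ZooKeeper's ExistsWatchRegistration. It is not shown in the excerpt above, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of deferred, result-code-dependent registration (not ZooKeeper's classes).
interface SimpleWatcher {
    void process(String event);
}

// Client-side stores, named after ZKWatchManager's dataWatches / existWatches.
final class ClientWatchStore {
    final Map<String, Set<SimpleWatcher>> dataWatches = new HashMap<>();
    final Map<String, Set<SimpleWatcher>> existWatches = new HashMap<>();
}

abstract class Registration {
    static final int OK = 0;
    static final int NONODE = -101; // result code for "node does not exist"

    final SimpleWatcher watcher;
    final String clientPath;

    Registration(SimpleWatcher watcher, String clientPath) {
        this.watcher = watcher;
        this.clientPath = clientPath;
    }

    abstract Map<String, Set<SimpleWatcher>> getWatches(ClientWatchStore store, int rc);

    boolean shouldAddWatch(int rc) {
        return rc == OK;
    }

    // Called from the response path (cf. finishPacket), never before the reply arrives.
    void register(ClientWatchStore store, int rc) {
        if (!shouldAddWatch(rc)) {
            return;
        }
        Map<String, Set<SimpleWatcher>> watches = getWatches(store, rc);
        synchronized (watches) {
            watches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
        }
    }
}

final class DataRegistration extends Registration {
    DataRegistration(SimpleWatcher w, String path) { super(w, path); }

    @Override
    Map<String, Set<SimpleWatcher>> getWatches(ClientWatchStore store, int rc) {
        return store.dataWatches;
    }
}

// Assumed behavior of an exists() registration: a watch on a missing node is kept too.
final class ExistsRegistration extends Registration {
    ExistsRegistration(SimpleWatcher w, String path) { super(w, path); }

    @Override
    boolean shouldAddWatch(int rc) {
        return rc == OK || rc == NONODE;
    }

    @Override
    Map<String, Set<SimpleWatcher>> getWatches(ClientWatchStore store, int rc) {
        return rc == OK ? store.dataWatches : store.existWatches;
    }
}
```

Under this rule, a failed getData on a missing node leaves no watch behind, while an exists call on a missing node keeps one, so it can later report NodeCreated.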
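The registration flow is shown in the figure below:
Note: a Watcher registered on the client is never sent to the server. You can see this in Packet's createBB method, which does not serialize the Watcher: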
public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        if (requestHeader != null) {
            // serialize the header
            requestHeader.serialize(boa, "header");
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            // serialize the request
            request.serialize(boa, "request");
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}
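Only the header and the request are serialized (the request carries a flag saying whether a Watcher should be registered). The Watcher itself is not serialized, so it never reaches the server.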
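To see concretely that the Watcher object never reaches the wire, here is a sketch modeled on createBB(). It writes a length placeholder, a header and a request, then patches in the length. `SketchPacket` and its fields are hypothetical, and `DataOutputStream` stands in for jute's BinaryOutputArchive. Two packets that differ only in their Watcher object produce identical bytes; only the boolean watch flag changes the output.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical packet modeled on Packet.createBB(): only header and request are written.
final class SketchPacket {
    final int xid;          // header fields
    final int opType;
    final String path;      // request fields
    final boolean watch;    // the request carries only this flag
    final Object watcher;   // stays on the client, never written

    SketchPacket(int xid, int opType, String path, Object watcher) {
        this.xid = xid;
        this.opType = opType;
        this.path = path;
        this.watch = watcher != null;
        this.watcher = watcher;
    }

    ByteBuffer createBB() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);              // length placeholder, filled in below
            out.writeInt(xid);             // "header"
            out.writeInt(opType);
            out.writeUTF(path);            // "request"
            out.writeBoolean(watch);
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4);  // overwrite the placeholder with the body length
            bb.rewind();
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The server can therefore only learn that some Watcher should be registered, never which one; it has to use something of its own (the connection) as the Watcher.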
But if the Watcher is never sent to the server, how does the server know that a Watcher exists, and how does it deliver notifications?
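Sequence diagram of Watcher handling on the ZooKeeper server
FinalRequestProcessor.processRequest() checks whether the current request needs a Watcher to be registered. This is decided by the flag the client sent: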
case OpCode.getData: {
    /* some code omitted */
    // getDataRequest.getWatch() == true means a Watcher must be registered
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
            getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
}
If a Watcher needs to be registered, the server registers the current ServerCnxn itself. Here is the definition of ServerCnxn:
/**
 * Interface to a Server connection - represents a connection from a client
 * to the server.
 */
public abstract class ServerCnxn implements Stats, Watcher {
    // some code omitted
    public abstract void process(WatchedEvent event);
    // some code omitted
}
On the server, all Watchers are managed by WatchManager:
/**
 * This class manages watches. It allows watches to be associated with a string
 * and removes watchers and their watches in addition to managing triggers.
 */
public class WatchManager {
    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

    // indexed by data node: key is the path, value is the set of Watchers registered on it
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    // indexed by Watcher: key is the Watcher, value is the set of paths it watches
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

    public synchronized void addWatch(String path, Watcher watcher) {
        HashSet<Watcher> list = watchTable.get(path);
        if (list == null) {
            // don't waste memory if there are few watches on a node
            // rehash when the 4th entry is added, doubling size thereafter
            // seems like a good compromise
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);

        HashSet<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            // cnxns typically have many watches, so use default cap here
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }

    // some code omitted
}
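WatchManager keeps two indexes over the same registrations. A plausible reason for the reverse index watch2Paths is cleanup: when a client connection (a ServerCnxn, which is the Watcher) goes away, all of its paths can be found directly instead of scanning every path. The generic class `SketchWatchIndex<W>` below is hypothetical. Its `removeWatcher` method is my assumption of how such cleanup works, modeled on, but not copied from, ZooKeeper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical two-way index modeled on WatchManager (not ZooKeeper's class).
final class SketchWatchIndex<W> {
    // path -> watchers registered on it (like watchTable)
    private final Map<String, Set<W>> watchTable = new HashMap<>();
    // watcher -> paths it watches (like watch2Paths)
    private final Map<W, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>(4)).add(watcher);
        watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // Assumed cleanup for a closed connection: the reverse index lists its paths directly.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<W> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path); // drop empty entries to save memory
                }
            }
        }
    }

    synchronized Set<W> watchersOf(String path) {
        return new HashSet<>(watchTable.getOrDefault(path, Collections.emptySet()));
    }

    synchronized Set<String> pathsOf(W watcher) {
        return new HashSet<>(watch2Paths.getOrDefault(watcher, Collections.emptySet()));
    }
}
```

The price of the second map is that every add and remove must update both sides consistently, which is why all methods here share one lock.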
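Triggering a Watcher
We have seen that registration puts the ServerCnxn into WatchManager. We also saw earlier that the event type (EventType) can be NodeDataChanged, which fires when "the content of the data node watched by the Watcher changes". The implementation of setData (in DataTree) is as follows: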
public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    // stat information to return to the caller
    Stat s = new Stat();
    // look up the node first
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        // set the node data
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix;
    if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
        this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                - (lastdata == null ? 0 : lastdata.length));
    }
    // trigger the Watchers
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
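After setting the data, setData calls WatchManager's triggerWatch method to fire the Watchers. The two-argument triggerWatch(path, type) delegates to the three-argument version below with supress set to null: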
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // 1. wrap the event
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // 2. look up and remove the Watchers; the removal is what makes Watchers one-shot
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // 3. call each Watcher's process method
        w.process(e);
    }
    return watchers;
}
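So a server-side Watcher is one-shot: triggering it removes it, and it must be registered again to receive further notifications.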
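The one-shot behavior comes entirely from `watchTable.remove(path)`. Below is a minimal sketch with a hypothetical `OneShotWatchManager` that keeps the same structure: remove under the lock, fix up the reverse index, then call the watchers outside the lock, skipping any in `supress`. Unlike the original, it returns an empty set instead of null when nobody is watching, and events are plain strings rather than WatchedEvent objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of one-shot triggering, modeled on WatchManager.triggerWatch.
final class OneShotWatchManager {
    interface Watcher {
        void process(String event);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>(4)).add(watcher);
        watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    Set<Watcher> triggerWatch(String path, String type, Set<Watcher> supress) {
        String event = type + " " + path;
        Set<Watcher> watchers;
        synchronized (this) {
            // Removing the entry is what makes the watch one-shot.
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                return Collections.emptySet(); // the original returns null here
            }
            for (Watcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Callbacks run outside the lock; suppressed watchers are removed but not called.
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(event);
        }
        return watchers;
    }
}
```

Calling the watchers outside the lock keeps a slow process() (for example a network write) from blocking other registrations and triggers.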
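As mentioned above, what actually gets registered is the ServerCnxn. ServerCnxn has several implementations, such as NIOServerCnxn and NettyServerCnxn. Their logic is similar, so NIOServerCnxn is used as the example here: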
// simplified: logging and similar code removed
public void process(WatchedEvent event) {
    // arguments are xid, zxid, err; xid = -1 marks this reply as a notification
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    // wrap the WatchedEvent as a WatcherEvent so it can be serialized
    WatcherEvent e = event.getWrapper();
    try {
        // send the notification back to the client
        sendResponse(h, e, "notification");
    } catch (IOException e1) {
        close();
    }
}
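The server-to-client direction relies on a simple convention: a reply whose xid is -1 is a notification, not the answer to a request. The sketch below, with a hypothetical `NotificationCodec` class, writes a header (xid, zxid, err) followed by the event fields and shows how the receiving side can dispatch on the xid. The header fields follow the ReplyHeader(-1, -1L, 0) call above, but the byte encoding is plain `DataOutputStream`, not jute.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical codec illustrating the "xid == -1 means notification" convention.
final class NotificationCodec {
    static final int NOTIFICATION_XID = -1;

    // An ordinary reply: just a header (xid, zxid, err).
    static byte[] encodeReplyHeader(int xid, long zxid, int err) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(xid);
            out.writeLong(zxid);
            out.writeInt(err);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // What the server does in process(): header (-1, -1, 0) followed by type, state, path.
    static byte[] encodeNotification(int type, int state, String path) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.write(encodeReplyHeader(NOTIFICATION_XID, -1L, 0));
            out.writeInt(type);
            out.writeInt(state);
            out.writeUTF(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // What the client checks first in readResponse(): decide by xid.
    static String route(byte[] message) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
            int xid = in.readInt();
            in.readLong(); // zxid
            in.readInt();  // err
            if (xid == NOTIFICATION_XID) {
                int type = in.readInt();
                int state = in.readInt();
                String path = in.readUTF();
                return "notification type=" + type + " state=" + state + " path=" + path;
            }
            return "reply for xid=" + xid;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reserving a negative xid works because ordinary requests are numbered with positive, increasing xids, so the two kinds of message can never collide.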
Client-side Watcher callback
The notification is received by SendThread's readResponse method:
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // deserialize the header
    replyHdr.deserialize(bbia, "header");
    // *** some code omitted ***
    // xid == -1 marks a notification
    if (replyHdr.getXid() == -1) {
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }

        // rebuild the WatchedEvent
        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }

        // hand it to the EventThread's queue for the callback
        eventThread.queueEvent( we );
        return;
    }
    // *** some code omitted ***
}
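The chroot handling in readResponse() is the inverse of the prependChroot() call in getData(). The hypothetical helper class `ChrootPaths` below reproduces the stripping branch from the code above. The `toServerPath` direction is my reconstruction of what prependChroot does (prefix the chroot, and map "/" to the chroot itself); its body is not shown in this article, so treat it as an assumption.

```java
// Hypothetical helper mirroring the client's chroot conversions (not ZooKeeper's code).
final class ChrootPaths {
    private ChrootPaths() { }

    // Client path -> server path (assumed behavior of prependChroot).
    static String toServerPath(String chrootPath, String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        // "/" maps to the chroot itself rather than to chroot + "/"
        return clientPath.length() == 1 ? chrootPath : chrootPath + clientPath;
    }

    // Server path -> client path, same branches as in readResponse().
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.compareTo(chrootPath) == 0) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        // The original only logs a warning and leaves the path unchanged.
        System.err.println("Got server path " + serverPath
                + " which is too short for chroot path " + chrootPath);
        return serverPath;
    }
}
```

With a chroot of "/app", a watch set on "/x/y" is stored on the server as "/app/x/y" and reported back to the client as "/x/y", so client code never sees the prefix.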
EventThread's queueEvent method is as follows:
public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    // record the current session state
    sessionState = event.getState();

    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
The core of this step is ZKWatchManager's materialize method, which is declared in the ClientWatchManager interface:
public interface ClientWatchManager {
    /**
     * Return a set of watchers that should be notified of the event. The
     * manager must not notify the watcher(s), however it will update it's
     * internal structure as if the watches had triggered. The intent being
     * that the callee is now responsible for notifying the watchers of the
     * event, possibly at some later time.
     *
     * @param state event state
     * @param type event type
     * @param path event path
     * @return may be empty set but must not be null
     */
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);
}
ZKWatchManager implements materialize as follows:
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath)
{
    Set<Watcher> result = new HashSet<Watcher>();

    switch (type) {
    // *** some cases omitted ***
    case NodeDataChanged:
    case NodeCreated:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    // *** some cases omitted ***
    }

    return result;
}
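After identifying the EventType, the client removes the matching Watchers from the corresponding store or stores (one or more of dataWatches, existWatches and childWatches). Because remove is used, client-side Watchers are also one-shot. The resulting Watchers, paired with the event, are then added to EventThread's waitingEvents queue, where they wait to be consumed by that thread.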
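The dispatch by EventType can be sketched as follows. `SketchClientWatches` is a hypothetical class. The NodeDataChanged/NodeCreated branch mirrors the code above, while the NodeChildrenChanged and NodeDeleted branches are my assumption of how the omitted cases behave, based on the three stores named in the text. As in the original, remove() is what makes client-side watches one-shot.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch modeled on ZKWatchManager.materialize (not ZooKeeper's class).
final class SketchClientWatches {
    enum EventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

    interface Watcher {
        void process(EventType type, String path);
    }

    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    final Map<String, Set<Watcher>> existWatches = new HashMap<>();
    final Map<String, Set<Watcher>> childWatches = new HashMap<>();

    static void watch(Map<String, Set<Watcher>> store, String path, Watcher w) {
        synchronized (store) {
            store.computeIfAbsent(path, p -> new HashSet<>()).add(w);
        }
    }

    private static void addTo(Set<Watcher> from, Set<Watcher> to) {
        if (from != null) {
            to.addAll(from);
        }
    }

    // Removes (one-shot) and returns the watchers to notify; never returns null.
    Set<Watcher> materialize(EventType type, String path) {
        Set<Watcher> result = new HashSet<>();
        switch (type) {
            case NodeDataChanged:
            case NodeCreated: // as in the excerpt above
                synchronized (dataWatches) { addTo(dataWatches.remove(path), result); }
                synchronized (existWatches) { addTo(existWatches.remove(path), result); }
                break;
            case NodeChildrenChanged: // assumed
                synchronized (childWatches) { addTo(childWatches.remove(path), result); }
                break;
            case NodeDeleted: // assumed: every kind of watch on a deleted node fires
                synchronized (dataWatches) { addTo(dataWatches.remove(path), result); }
                synchronized (existWatches) { addTo(existWatches.remove(path), result); }
                synchronized (childWatches) { addTo(childWatches.remove(path), result); }
                break;
        }
        return result;
    }
}
```

Keeping separate stores per API means a data change on "/a" does not consume a children watch on "/a", and vice versa.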
EventThread's run method is as follows (logging removed):
@Override
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // process the event
                processEvent(event);
            }
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        // logging removed
    }
}
processEvent is as follows:
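private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    // call process on the Watcher the client registered
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    // logging removed
                }
            }
        } else {
            // handling of asynchronous callbacks omitted
        }
    } catch (Throwable t) {
        // logging removed
    }
}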
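As the code shows, the EventThread takes one WatcherSetEventPair (an event together with its set of Watchers) at a time from the waitingEvents queue and calls each Watcher's process method in turn, serially and synchronously. These are the Watchers the client actually registered.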
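A consequence of the single EventThread is that callbacks run one after another, so a slow Watcher delays every later notification. The sketch below, with a hypothetical `SketchEventThread`, keeps the essential shape of run() and processEvent(): one blocking queue, a sentinel object to request shutdown, draining the queue before exiting, and a per-watcher try/catch so that one failing callback does not stop the thread.

```java
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded dispatcher with the same shape as EventThread.
final class SketchEventThread extends Thread {
    interface Watcher {
        void process(String event);
    }

    // An event together with the watchers to notify (like WatcherSetEventPair).
    static final class Pair {
        final Set<Watcher> watchers;
        final String event;

        Pair(Set<Watcher> watchers, String event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();

    void queueEvent(Set<Watcher> watchers, String event) {
        waitingEvents.add(new Pair(watchers, event));
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object item = waitingEvents.take();
                if (item == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processEvent((Pair) item);
                }
                // After the sentinel, keep draining until the queue is empty, then exit.
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Watchers are called one by one on this thread, so a slow one delays the rest.
    private void processEvent(Pair pair) {
        for (Watcher w : pair.watchers) {
            try {
                w.process(pair.event);
            } catch (RuntimeException e) {
                // a failing callback must not stop the dispatcher thread
            }
        }
    }
}
```

In practice this means Watcher callbacks should return quickly and hand long-running work to another thread.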
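Summary of Watcher characteristics
One-shot: on both the client and the server, a Watcher fires only once. After it fires it is removed and must be registered again.
Serial on the client: client-side Watcher callbacks are executed serially and synchronously on the single EventThread, so a slow callback delays all later ones.
Lightweight: WatchedEvent is the smallest notification unit of ZooKeeper's Watcher mechanism. It contains only three things: the notification state, the event type and the node path. It says nothing about the content of the change. In the same spirit, the client does not send the Watcher object to the server when registering; it only sends a boolean flag in the request.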
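Because Watchers are one-shot and an event carries no data, the usual client pattern is to read the data again and re-register the Watcher inside the callback. The toy below is entirely hypothetical: `ToyZk` is an in-memory stand-in, not the ZooKeeper API, and its callbacks run synchronously inside setData. It only illustrates the pattern. In real ZooKeeper, changes made between a notification and the re-registration are not delivered as separate events, so re-reading the data is what keeps the client up to date.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory toy, NOT the ZooKeeper client API.
final class ToyZk {
    interface Watcher {
        void process(String path); // like WatchedEvent: only the path, no data
    }

    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Set<Watcher>> watches = new HashMap<>();

    // Reads the value and, if a watcher is given, registers a one-shot watch.
    String getData(String path, Watcher watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
        return data.get(path);
    }

    // Writes the value and fires (and removes) the watches on that path.
    void setData(String path, String value) {
        data.put(path, value);
        Set<Watcher> fired = watches.remove(path);
        if (fired != null) {
            for (Watcher w : fired) {
                w.process(path); // synchronous here; real callbacks run on the EventThread
            }
        }
    }
}

// Re-reads the data and re-arms itself on every notification.
final class ReRegisteringWatcher implements ToyZk.Watcher {
    private final ToyZk zk;
    final List<String> seen = new ArrayList<>();

    ReRegisteringWatcher(ToyZk zk) {
        this.zk = zk;
    }

    @Override
    public void process(String path) {
        // The event carries no data, so read it again and register again in one call.
        seen.add(zk.getData(path, this));
    }
}
```

A watcher registered once without re-arming sees only the first change; the re-registering watcher sees every change made while it is armed.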
This article is intended only as personal study notes.