【Zookeeper源码分析】—Watcher机制(一)

作者:leesf

出处:https://www.cnblogs.com/leesf456/p/6518040.html


一、前言

前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

二、总体框图

对于Watcher机制而言,主要涉及的类主要如下。

说明:

  • Watcher,接口类型,其定义了process方法,需子类实现。
  • Event,接口类型,Watcher的内部类,无任何方法。
  • KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态。
  • EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事件类型。
  • WatchedEvent,表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType。
  • ClientWatchManager,接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现。
  • ZKWatchManager,Zookeeper的内部类,继承ClientWatchManager。
  • MyWatcher,ZooKeeperMain的内部类,继承Watcher。
  • ServerCnxn,接口类型,继承Watcher,表示客户端与服务端的一个连接。
  • WatchManager,管理Watcher。

三、Watcher源码分析

3.1 内部类

Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下

public interface Event {}

说明:以看到,Event接口并没有定义任何属性和方法,但是其包含了KeeperState和EventType两个内部枚举类。

3.2 接口方法

abstract public void process(WatchedEvent event);

说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

四、Event源码分析

3.1 内部类

1. KeeperState


public enum KeeperState { // 事件发生时Zookeeper的状态 / Unused, this state is never generated by the server */ @Deprecated // 未知状态,不再使用,服务器不会产生此状态 Unknown (-1), / The client is in the disconnected state - it is not connected \* to any server in the ensemble. */ // 断开 Disconnected (0), / Unused, this state is never generated by the server */ @Deprecated // 未同步连接,不再使用,服务器不会产生此状态 NoSyncConnected (1), / The client is in the connected state - it is connected \* to a server in the ensemble (one of the servers specified \* in the host connection parameter during ZooKeeper client \* creation). */ // 同步连接状态 SyncConnected (3), / \* Auth failed state */ // 认证失败状态 AuthFailed (4), / \* The client is connected to a read-only server, that is the \* server which is not currently connected to the majority. \* The only operations allowed after receiving this state is \* read operations. \* This state is generated for read-only clients only since \* read/write clients aren't allowed to connect to r/o servers. */ // 只读连接状态 ConnectedReadOnly (5), / \* SaslAuthenticated: used to notify clients that they are SASL-authenticated, \* so that they can perform Zookeeper actions with their SASL-authorized permissions. */ // SASL认证通过状态 SaslAuthenticated(6), / The serving cluster has expired this session. The ZooKeeper \* client connection (the session) is no longer valid. You must \* create a new client connection (instantiate a new ZooKeeper \* instance) if you with to access the ensemble. */ // 过期状态 Expired (-112); // 代表状态的整形值 private final int intValue; // Integer representation of value // for sending over wire // 构造函数 KeeperState(int intValue) { this.intValue = intValue; } // 返回整形值 public int getIntValue() { return intValue; } // 从整形值构造相应的状态 public static KeeperState fromInt(int intValue) { switch(intValue) { case -1: return KeeperState.Unknown; case 0: return KeeperState.Disconnected; case 1: return KeeperState.NoSyncConnected; case 3: return KeeperState.SyncConnected; case 4: return KeeperState.AuthFailed; case 5: return KeeperState.ConnectedReadOnly; case 6: return KeeperState.SaslAuthenticated; case -112: return KeeperState.Expired; default: throw new RuntimeException("Invalid integer value for conversion to KeeperState"); } } }

说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

2. EventType 


public enum EventType { // 事件类型 // 无 None (-1), // 结点创建 NodeCreated (1), // 结点删除 NodeDeleted (2), // 结点数据变化 NodeDataChanged (3), // 结点子节点变化 NodeChildrenChanged (4); // 代表事件类型的整形 private final int intValue; // Integer representation of value // for sending over wire // 构造函数 EventType(int intValue) { this.intValue = intValue; } // 返回整形 public int getIntValue() { return intValue; } // 从整形构造相应的事件 public static EventType fromInt(int intValue) { switch(intValue) { case -1: return EventType.None; case 1: return EventType.NodeCreated; case 2: return EventType.NodeDeleted; case 3: return EventType.NodeDataChanged; case 4: return EventType.NodeChildrenChanged; default: throw new RuntimeException("Invalid integer value for conversion to EventType"); } } } }

说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

五、WatchedEvent

5.1 类的属性


public class WatchedEvent { // Zookeeper的状态 final private KeeperState keeperState; // 事件类型 final private EventType eventType; // 事件所涉及节点的路径 private String path; }

说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

5.2 构造函数

1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 


public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { // 初始化属性 this.keeperState = keeperState; this.eventType = eventType; this.path = path; }

说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

2. public WatchedEvent(WatcherEvent eventMessage)型构造函数


public WatchedEvent(WatcherEvent eventMessage) { // 从eventMessage中取出相应属性进行赋值 keeperState = KeeperState.fromInt(eventMessage.getState()); eventType = EventType.fromInt(eventMessage.getType()); path = eventMessage.getPath(); }

说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

六、ClientWatchManager

6.1 接口方法 

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

七、ZKWatchManager

7.1 类的属性 


private static class ZKWatchManager implements ClientWatchManager { // 数据变化的Watchers private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); // 节点存在与否的Watchers private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); // 子节点变化的Watchers private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); }

说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

7.2 核心方法分析

1. materialize方法


public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { // 新生成结果Watcher集合 Set<Watcher> result = new HashSet<Watcher>(); switch (type) { // 确定事件类型 case None: // 无类型 // 添加默认Watcher result.add(defaultWatcher); // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接) boolean clear = ClientCnxn.getDisableAutoResetWatch() && state != Watcher.Event.KeeperState.SyncConnected; synchronized(dataWatches) { // 同步块 for(Set<Watcher> ws: dataWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 dataWatches.clear(); } } synchronized(existWatches) { // 同步块 for(Set<Watcher> ws: existWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 existWatches.clear(); } } synchronized(childWatches) { // 同步块 for(Set<Watcher> ws: childWatches.values()) { // 添加至结果集合 result.addAll(ws); } if (clear) { // 是否需要清空 childWatches.clear(); } } // 返回结果 return result; case NodeDataChanged: // 节点数据变化 case NodeCreated: // 创建节点 synchronized (dataWatches) { // 同步块 // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(existWatches.remove(clientPath), result); } break; case NodeChildrenChanged: // 节点子节点变化 synchronized (childWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(childWatches.remove(clientPath), result); } break; case NodeDeleted: // 删除节点 synchronized (dataWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(dataWatches.remove(clientPath), result); } // XXX This shouldn't be needed, but just in case synchronized (existWatches) { // 移除clientPath对应的Watcher Set<Watcher> list = existWatches.remove(clientPath); if (list != null) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(existWatches.remove(clientPath), result); LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); } } synchronized (childWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合 addTo(childWatches.remove(clientPath), result); } break; default: // 缺省处理 String msg = "Unhandled watch event type " + type \+ " with state " + state + " on path " + clientPath; LOG.error(msg); throw new RuntimeException(msg); } // 返回结果集合 return result; } }

说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

八、总结

针对Watcher机制的第一部分的源码分析就已经完成,可以看到此部分的源码相对简单,之后会分析org.apache.zookeeper.server下的WatchManager和ClientWatchManager所在外部类ZooKeeper,也谢谢各位园友的观看~

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » 【Zookeeper源码分析】—Watcher机制(一)
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 技术驿站 | 致力打造 Java 精品博客

联系作者优质文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏