1. 数据存储
事务日志
快照日志
运行时日志 bin/zookeeper.out
2 基于 Java API 初探 zookeeper 的使用
2.1 zookeeper 增删改查
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.concurrent.CountDownLatch;/*** @author: xiepanpan* @Date: 2020/5/30* @Description: zookeeper 连接*/
public class ConnectionDemo {
public static void main(String[] args) throws Exception {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper =new ZooKeeper("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183", 4000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//如果收到了服务端的响应事件,连接成功if (Event.KeeperState.SyncConnected==watchedEvent.getState()) {
countDownLatch.countDown();}}});countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());//创建节点 第三个参数是权限 OPEN_ACL_UNSAFE 所有人都可以访问 第四个参数 节点类型 持久节点zooKeeper.create("/zk-persistent-xpp","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Thread.sleep(1000);//得到节点Stat stat = new Stat();byte[] bytes = zooKeeper.getData("/zk-persistent-xpp", null, stat);System.out.println(new String(bytes));//修改节点zooKeeper.setData("/zk-persistent-xpp","1".getBytes(),stat.getVersion());byte[] data = zooKeeper.getData("/zk-persistent-xpp", null, stat);System.out.println(new String(data));//删除节点 使用乐观锁 比较版本号zooKeeper.delete("/zk-persistent-xpp",stat.getVersion());zooKeeper.close();} catch (IOException e) {
e.printStackTrace();} catch (InterruptedException e) {
e.printStackTrace();}}
}
2.2 事件机制
Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听
事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper
实现分布式锁、集群管理等功能
watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端
只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(watcher 是一次性的操作)。 可以通过循环监听去达到永久监听效果
2.3 如何注册事件机制
通过这三个操作来绑定事件 :getData、Exists、getChildren
2.4 如何触发事件?
凡是事务类型的操作,都会触发监听事件。
create /delete /setData
2.5 watcher 事件类型
None (-1), 客户端链接状态发生变化的时候,会收到 none 的事件
NodeCreated (1), 创建节点的事件。 比如 zk-persis-mic
NodeDeleted (2), 删除节点的事件
NodeDataChanged (3), 节点数据发生变更
NodeChildrenChanged (4); 子节点被创建、被删除、会发生事件触发
2.6 操作对应的water事件类型
zk-persis-mic ( 监听事件) | zk-persis-mic/child (子节点监听事件) | |
---|---|---|
create(/ zk-persis-mic) | NodeCreated (exists getData) | 无 |
delete(/ zk-persis-mic) | NodeDeleted (exists getData) | 无 |
setData(/ zk-persis-mic) | NodeDataChanged (exists getData) | |
create(/ zk-persis-mic/child) | NodeChildrenChange(getChildren) | NodedCreated |
delete(/ zk-persis-mic/child) | NodeChildrenChange(getChildren) | NodedDeleted |
setData(/ zk-persis-mic/child) | NodeDataChanged |
3 Curator 客户端的使用,简单高效
3.1使用Curator对zookeeper增删改查
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;/*** @author: xiepanpan* @Date: 2020/5/31* @Description: 使用Curator框架来对zookeeper操作*/
public class CuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()//衰减的重试机制.retryPolicy(new ExponentialBackoffRetry(1000,3)).connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183").sessionTimeoutMs(4000)//隔离命名空间 以下所有操作都是基于该相对目录进行的.namespace("curator").build();curatorFramework.start();//创建节点//结果 /curator/xpp/mode1curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/xpp/node1","1".getBytes());//更改节点//保存节点状态Stat stat = new Stat();curatorFramework.getData().storingStatIn(stat).forPath("/xpp/node1");curatorFramework.setData().withVersion(stat.getVersion()).forPath("/xpp/node1","xx".getBytes());//删除节点curatorFramework.delete().deletingChildrenIfNeeded().forPath("/xpp/node1");}}
3.2 监听
PathChildrenCache 监听一个节点下的子节点的创建删除和更新
NodeCache监听一个节点的更新和创建事件
TreeCache 综合PathChildrenCache 和NodeCache 的特性
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;/*** @author: xiepanpan* @Date: 2020/5/31* @Description: 事件监听*/
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()//衰减的重试机制.retryPolicy(new ExponentialBackoffRetry(1000,3)).connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183").sessionTimeoutMs(4000)//隔离命名空间 以下所有操作都是基于该相对目录进行的.namespace("curator").build();curatorFramework.start();// addListenerWithNodeCache(curatorFramework,"/xpp");
// addListenerWithPathChildCache(curatorFramework,"/xpp");addListenerWithTreeCache(curatorFramework,"/xpp");System.in.read();}/*** 监听一个节点的更新和创建事件* @param curatorFramework* @param path* @throws Exception*/public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
//第三个参数 对详细内容数据的压缩final NodeCache nodeCache = new NodeCache(curatorFramework,path,false);NodeCacheListener nodeCacheListener = new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("Receive Event:"+nodeCache.getCurrentData().getPath());}};nodeCache.getListenable().addListener(nodeCacheListener);nodeCache.start();}/*** 监听一个节点子节点的创建删除和更新* @param curatorFramework* @param path* @throws Exception*/public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("Receive Event:"+pathChildrenCacheEvent.getType());}};pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);}/*** 综合节点监听事件 监听当前节点和子节点 节点上任何一个事件都能收到* @param curatorFramework* @param path* @throws Exception*/public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
TreeCache treeCache = new TreeCache(curatorFramework,path);TreeCacheListener treeCacheListener = new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent.getType()+"->"+treeCacheEvent.getData().getPath());}};treeCache.getListenable().addListener(treeCacheListener);treeCache.start();}
}
我的pom文件
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.4-beta</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.7.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.7.1</version></dependency>
zookeeper 我是用的3.4.14 版本
注意 Curator 版本 和zookeeper版本有对应关系 我的Curator版本是2.7.1
https://blog.csdn.net/glory1234work2115/article/details/51967507