zk的列Z令应用主要是针对三类: 在学Java API之前,应用用命我们先来了解一下zookeeper的及常常用命令。 连接zookeeper server。列Z令 获取帮助help。应用用命 连接远程节点。及常 关闭连接。列Z令 显示集群。应用用命 [zk: localhost:2181(CONNECTED) 0] config server.0=jt2:2888:3888:participant server.1=jt3:2888:3888:participant server.2=jt4:2888:3888:participant 创建一个znode。及常 命令语法:create [-s] [-e] [-c] [-t ttl] path [data] [acl] -s:创建的是带序列号的节点,序列号用0填充节点路径。 -e:创建的是临时节点。高防服务器 -c:创建的是容器节点 path:znode的路径,ZooKeeper中没有相对路径,所有路径都必须以’/开头。 data:znode携带的数据。 acl:这个节点的ACL。 #创建一个永久节点 [zk: localhost:2181(CONNECTED) 2] create /zkBase Created /zkBase #创建一个临时节点 [zk: localhost:2181(CONNECTED) 3] create -e /ephemeral_node 删除znode节点。 #删除节点前要求节点目录为空,不存在子节点 [zk: localhost:2181(CONNECTED) 34] delete /config Node not empty: /config [zk: localhost:2181(CONNECTED) 35] delete /config/topics/test [zk: localhost:2181(CONNECTED) 27] delete /ephemeral_node #如果要删除整个节点及子节点可以使用deleteall 显示一个节点的状态。 [zk: localhost:2181(CONNECTED) 11] stat /test cZxid = 0x180000000e ctime = Thu Jul 28 03:25:08 CST 2022 mZxid = 0x180000000e mtime = Thu Jul 28 03:25:08 CST 2022 pZxid = 0x180000000e cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 查看路径子节点。 命令语法:ls [-s] [-w] [-R] path。 获取指定路径下的数据。 [zk: localhost:2181(CONNECTED) 16] get /zookeeper/config server.0=jt2:2888:3888:participant server.1=jt3:2888:3888:participant server.2=jt4:2888:3888:participant 设置或者更新路径数据。 [zk: localhost:2181(CONNECTED) 19] set /test/hehe "haha" [zk: localhost:2181(CONNECTED) 20] get /test/hehe 设置ACL。 ACL权限 ACL 简写 允许的操作 CREATE c 创建子节点 READ r 获取节点的数据和它的子节点 WRITE w 设置节点的数据 DELETE d 删除子节点 (仅下一级节点) ADMIN a 设置 ACL 权限 ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限: 方案 描述 world 只有一个用户:anyone,代表所有人(默认) ip 使用IP地址认证 auth 使用已添加认证的用户认证 digest 使用“用户名:密码”方式认证 [zk: localhost:2181(CONNECTED) 21] getAcl /test world,anyone : cdrwa [zk: localhost:2181(CONNECTED) 22] create /mynode1 hello Created /mynode1 [zk: localhost:2181(CONNECTED) 23] addauth digest admin:admin [zk: localhost:2181(CONNECTED) 24] setAcl /mynode1 auth:admin:cdrwa [zk: localhost:2181(CONNECTED) 25] getAcl /mynode1 digest,admin:x1nq8J5GOJVPY6zgzhtTtA9izLc= 同步数据集群间数据。 [zk: localhost:2181(CONNECTED) 26] sync / 查看命令执行历史。源码下载 [zk: localhost:2181(CONNECTED) 27] history 17 - help 18 - getAllChildrenNumber /zookeeper 19 - set /test/hehe "haha" 20 - get /test/hehe 21 - getAcl /test 22 - create /mynode1 hello 23 - addauth digest admin:admin 24 - setAcl /mynode1 auth:admin:cdrwa 25 - getAcl /mynode1 26 - sync / 退出客户端。 [zk: localhost:2181(CONNECTED) 28] quit WATCHER:: WatchedEvent state:Closed type:None path:null 2022-07-28 03:33:49,307 [myid:] - INFO [main:ZooKeeper@1422] - Session: 0xcebb0001 closed zookeeper客户端和服务器会话的建立是一个异步的过程,也就是说在程序中,程序方法在处理完客户端初始化后立即返回(即程序继续往下执行代码,这样,在大多数情况下并没有真正的构建好一个可用会话,在会话的生命周期处于“CONNECTED”时才算真正的建立完毕,所以需要使用到多线程中的一个工具类CountDownLatch)。 (一共有4个构造方法,根据参数不同)。 Zookeeper(String connectString,int sessionTimeout,Watcher watcher) Zookeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly) Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd) 参数说明: 注意,整个创建会话的过程是异步的,构造方法会在初始化连接后即返回,并不代表真正建立好了一个会话,此时会话处于"CONNECTING"状态。当会话真正创建起来后,服务器会发送事件通知给客户端,只有客户端获取到这个通知后,会话才真正建立。 代码演示:public class ZkConnect implements Watcher { private static final Logger log = LoggerFactory.getLogger(ZkConnect.class); //public static final String zkServerPath = "192.168.8.74:2181,192.168.8.75:2181,192.168.8.76:2181"; public static final String zkServerPath = "127.0.0.1:2181"; public static final Integer timeout = 5000; public static CountDownLatch countDownLatch = new CountDownLatch(1); / * 客户端与zkServer连接是一个异步的过程,当连接成功后,客户端会收到一个watch通知 * 参数: * connectString: 连接服务器的ip字符串 * sessionTimeout: 超时时间,心跳收不到了,就超时 * watcher: 通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,就设置为null * canBeReadOnly: 可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写;此时数据被读取到的可能 * 是旧数据,此处建议设置为false * sessionId: 会话id * sessionPasswd: 会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话 */ public static void main(String[] args) throws Exception { ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkConnect()); log.warn("客户端开始连接zookeeper服务器。。。连接状态: { }", zk.getState()); countDownLatch.await(); // 如果不停顿一段时间, 会收不到watch通知 log.warn("连接状态: { }", zk.getState()); } @Override public void process(WatchedEvent event) { log.warn("接收到watch通知: { }", event); countDownLatch.countDown(); } }public class ZkReconnect implements Watcher { private static final Logger log = LogManager.getLogger(ZkReconnect.class); public static final String zkServerPath = "127.0.0.1:2181"; public static final Integer timeout = 5000; public static CountDownLatch countDownLatch1 = new CountDownLatch(1); public static CountDownLatch countDownLatch2 = new CountDownLatch(2); public static void main(String[] args) throws Exception { ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkReconnect()); long sessionId = zk.getSessionId(); byte[] sessionPasswd = zk.getSessionPasswd(); log.warn("客户端开始连接zookeeper服务器。。。连接状态: { }", zk.getState()); countDownLatch1.await(); // 如果不停顿一段时间, 会收不到watch通知 log.warn("连接状态: { }", zk.getState()); Thread.sleep(1000); log.warn("开始会话重连..."); ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZkReconnect(), sessionId, sessionPasswd); log.warn("重新连接, 状态: { }", zk.getState()); countDownLatch2.await(); log.warn("重新连接, 状态: { }", zk.getState()); } @Override public void process(WatchedEvent event) { log.warn("接收到watch通知: { }", event); countDownLatch1.countDown(); countDownLatch2.countDown(); } 提供了两套创建节点的方法,同步和异步创建节点方式。 String create(final String path,byte data[],List acl,CreateMode createMode);//同步方式创建 path:节点路径(名称):/nodeName。不允许递归创建节点,在父节点不存在的情况下,不允许创建子节点。 data[]:节点内容:要求类型是字节数组,也就是说不支持序列话方式,如果需要实现序列化,可使用java相关序列化框架,如Hessian,Kryo。 acl:节点权限:使用Ids.OPEN_ACL_UNSAFE开放权限即可。 createMode:节点类型:创建节点的类型,CreateMode.*,提供了如下所示的四种节点类型: 异步方式(在同步方法参数的基础上增加两个参数): cb:回调方法:注册一个异步回调方法,要实现。 ctx:传递给回调方法的参数,一般为上下文(Context)信息。 代码演示public class ZkNodeCreate implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZkNodeCreate.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZkNodeCreate() { } public ZkNodeCreate(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeCreate()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } public static void main(String[] args) throws InterruptedException { ZkNodeCreate zkNodeOperator = new ZkNodeCreate(zkServerPath); zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); new CountDownLatch(1).await(); } / * 同步或异步创建节点,都不支持子节点的递归创建,异步有一个callback函数 * 参数: * path: 创建的路径 * data: 存储的数据 * acl: 控制权限策略. Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa * Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa * createMode: 节点类型,是一个枚举 * PERSISTENT 持久节点 * PERSISTENT_SEQUENTIAL 持久顺序节点 * EPHEMERAL 临时节点 * EPHEMERAL_SEQUENTIAL 临时顺序节点 * @param path * @param data * @param acls */ private void createZKNode(String path, byte[] data, ArrayList acls) { String result = ""; try { // 同步创建 //result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL); //log.warn("同步创建临时节点: { } 成功。。。", result); // 异步创建 String ctx = "{ create:success}"; zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx); Thread.sleep(5000); log.warn("异步创建临时节点: { } 成功。。。", result); } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { log.warn("客户端连接接收到watch通知: { }", event); } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } private static class CreateNodeCallBack implements AsyncCallback.StringCallback { @Override public void processResult(int rc, String path, Object ctx, String name) { log.warn("异步创建节点:{ }, ctx: { }", path, (String)ctx); } } 修改节点数据。 public class ZkNodeUpdate implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZkNodeUpdate.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZkNodeUpdate() { } public ZkNodeUpdate(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeUpdate()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } public static void main(String[] args) throws KeeperException, InterruptedException { ZkNodeUpdate zkNodeOperator = new ZkNodeUpdate(zkServerPath); // 创建节点 zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); // 修改节点数据 第三个参数是版本号dataVersion,用于乐观锁控制 Stat stat = zkNodeOperator.getZooKeeper().setData("/testnode", "修改后的数据".getBytes(), 0); //zk.setData(path, data, version,new UpdateCallBack(),ctx);//异步修改 Thread.sleep(5000); log.warn("修改后, dataVersion版本: { }", stat.getVersion()); new CountDownLatch(1).await(); } private void createZKNode(String path, byte[] data, ArrayList acls) { String result = ""; try { // 异步创建 String ctx = "{ create:success}"; zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx); Thread.sleep(5000); log.warn("异步创建临时节点: { } 成功。。。", result); } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { log.warn("客户端连接接收到watch通知: { }", event); } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } private static class CreateNodeCallBack implements AsyncCallback.StringCallback { @Override public void processResult(int rc, String path, Object ctx, String name) { log.warn("异步创建节点:{ }, ctx: { }", path, (String)ctx); } } 同步或异步删除节点数据。 public static void main(String[] args) throws KeeperException, InterruptedException { ZkNodeDelete zkNodeOperator = new ZkNodeDelete(zkServerPath); // 创建节点 zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); // 同步删除节点 //zkNodeOperator.getZooKeeper().delete("/testnode", 1); // 第二个参数 dataVersion Thread.sleep(5000); // 异步删除节点 String ctx = "{ delete:success}"; zkNodeOperator.getZooKeeper().delete("/testnode", 0, new AsyncCallback.VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { log.warn("异步删除节点:{ }, ctx: { }", path, (String)ctx); } }, ctx); new CountDownLatch(1).await(); 节点查询。 获取节点数据。public class ZKGetNodeData implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZKGetNodeData.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZKGetNodeData() { } public ZKGetNodeData(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } private static CountDownLatch countDownLatch = new CountDownLatch(1); private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { ZKGetNodeData zkGetNodeData = new ZKGetNodeData(zkServerPath); zkGetNodeData.getZooKeeper().create("/testnode","testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(5000); // 第一个参数: 节点path; 第二个参数: true注册一个监听事件; 第三个参数: 获取的结果会保存在stat byte[] result = zkGetNodeData.getZooKeeper().getData("/testnode", true, stat); log.warn("当前值: { }", new String(result)); countDownLatch.await(); } @Override public void process(WatchedEvent event) { try { if (event.getType() == Event.EventType.NodeDataChanged) { ZKGetNodeData zkGetNodeData = new ZKGetNodeData(zkServerPath); byte[] result = zkGetNodeData.getZooKeeper().getData("/testnode", false, stat); log.warn("监听到值已经更改, 更改后的值为: { }, 版本号: { }", new String(result), stat.getVersion()); countDownLatch.countDown(); // 计数器减1 } else if (event.getType() == Event.EventType.NodeCreated) { } else if (event.getType() == Event.EventType.NodeDeleted) { } else if (event.getType() == Event.EventType.NodeChildrenChanged) { } } catch (Exception e) { e.printStackTrace(); } } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } }获取子节点列表。public class ZKGetChildrenList implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZKGetChildrenList.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZKGetChildrenList() { } public ZKGetChildrenList(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } private static CountDownLatch countDownLatch = new CountDownLatch(1); private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { ZKGetChildrenList zkGetChildrenList = new ZKGetChildrenList(zkServerPath); zkGetChildrenList.getZooKeeper().create("/zookeeper/bbb","bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(5000); // 同步调用: 参数1 节点路径, 参数2 true或false, 注册一个watch事件 List for (String child : children) { log.warn(child); } // 异步调用 // String ctx = "{ callback:ChildrenCallback}"; // zkGetChildrenList.getZooKeeper().getChildren("/testnode", true, new AsyncCallback.ChildrenCallback() { // @Override // public void processResult(int rc, String path, Object ctx, List // log.warn("callback, path: { }, children: { }", path, children.toString()); // } // }, ctx); countDownLatch.await(); } @Override public void process(WatchedEvent event) { try { if (event.getType() == Event.EventType.NodeDataChanged) { } else if (event.getType() == Event.EventType.NodeCreated) { } else if (event.getType() == Event.EventType.NodeDeleted) { } else if (event.getType() == Event.EventType.NodeChildrenChanged) { ZKGetChildrenList zkGetChildrenList = new ZKGetChildrenList(zkServerPath); List log.warn("监听到子节点改变, 改变后子节点数组为:"); for (String child : children) { log.warn(child); } countDownLatch.countDown(); // 计数器减1 } } catch (Exception e) { e.printStackTrace(); } } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } 判断节点是否存在。 public class ZKNodeExist implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZKNodeExist.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZKNodeExist() { } public ZKNodeExist(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZKNodeExist()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZKNodeExist zkNodeExist = new ZKNodeExist(zkServerPath); Stat stat = zkNodeExist.getZooKeeper().exists("/testnode", true); if (stat == null) { log.warn("节点/testnode不存在"); } else { log.warn("节点/testnode存在. stat: { }", stat); } countDownLatch.await(); } @Override public void process(WatchedEvent event) { try { if (event.getType() == Event.EventType.NodeDataChanged) { } else if (event.getType() == Event.EventType.NodeCreated) { } else if (event.getType() == Event.EventType.NodeDeleted) { } else if (event.getType() == Event.EventType.NodeChildrenChanged) { } countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } zookeeper有watch事件,是一次性触发的。当watch监视的数据发生变化时,通知在创建zookeeper是设置了Watcher的客户端。Watcher类监视的事件类型和状态类型如下所示: 事件类型(znode节点相关): 状态类型(客户端实例相关): Watcher的特性:一次性、客户端串行执行、轻量。 ACL(Access Control List),Zookeeper作为一个分布式协调框架,其内部存储的都是一些关乎分布式系统运行时状态的元数据,尤其是涉及到一些分布式锁、Master选举和协调等应用场景。我们需要有效的保障Zookeeper中的数据安全,Zookeeper提供了一套完善的ACL权限控制机制来保障数据的安全。 Zookeeper提供了三种模式,权限模式、授权对象、权限: 权限模式:Scheme,开发人员经常使用如下四种权限模式: 权限对象:指的是权限赋予给用户或者一个指定的实体,例如IP地址或机器等。在不同的模式下,授权对象是不同的。这种模式和授权对象一一对应。 权限:权限就是指那些通过权限检测后可以被允许执行的操作,在Zookeeper中,对数据的操作权限分为以下五大类: 自定义用户权限。 public class ZkNodeAcl implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZkNodeAcl.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZkNodeAcl() { } public ZkNodeAcl(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeAcl()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } public static void main(String[] args) throws InterruptedException, NoSuchAlgorithmException, KeeperException { ZkNodeAcl zkNodeOperator = new ZkNodeAcl(zkServerPath); ArrayList acls = new ArrayList(); Id test1 = new Id("digest", DigestAuthenticationProvider.generateDigest("test1:123456")); Id test2 = new Id("digest", DigestAuthenticationProvider.generateDigest("test2:123456")); acls.add(new ACL(ZooDefs.Perms.ALL,test1)); acls.add(new ACL(ZooDefs.Perms.READ,test2)); acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE,test2)); zkNodeOperator.createZKNode("/testacl", "heihei".getBytes(), acls); zkNodeOperator.getZooKeeper().addAuthInfo("digest", "test2:123456".getBytes()); Thread.sleep(10000); Stat stat = new Stat(); byte[] result = zkNodeOperator.getZooKeeper().getData("/testacl", false, stat); log.warn("当前值: { }, 版本: { }", new String(result), stat.getVersion()); new CountDownLatch(1).await(); } private void createZKNode(String path, byte[] data, ArrayList acls) { String result = ""; try { String ctx = "{ create:success}"; zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx); Thread.sleep(5000); log.warn("异步创建节点: { } 成功。。。", result); } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { log.warn("客户端连接接收到watch通知: { }", event); } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } acl之ip权限。 public class ZkNodeAclIp implements Watcher { private ZooKeeper zooKeeper = null; private static final Logger log = LoggerFactory.getLogger(ZkNodeAclIp.class); private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeout = 5000; public ZkNodeAclIp() { } public ZkNodeAclIp(String connectString) { try { zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeAclIp()); } catch (Exception e) { e.printStackTrace(); if (zooKeeper != null) { try { zooKeeper.close(); } catch (Exception e1) { e1.printStackTrace(); } } } } public static void main(String[] args) throws Exception { ZkNodeAclIp zkNodeAcl = new ZkNodeAclIp(zkServerPath); // ip 方式的 acl ArrayList aclsIP = new ArrayList<>(); Id ipId1 = new Id("ip", "127.0.0.1"); aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId1)); // 创建节点 zkNodeAcl.createZKNode("/testaclip", "testaclip".getBytes(), aclsIP); // 验证ip是否有权限 Stat stat = new Stat(); byte[] result = zkNodeAcl.getZooKeeper().getData("/testaclip", false, stat); log.warn("当前值: { }, 版本: { }", new String(result), stat.getVersion()); } / * 创建节点 * @param path * @param data * @param acls */ private void createZKNode(String path, byte[] data, ArrayList acls) { String result = ""; try { // 同步创建 result = zooKeeper.create(path, data, acls, CreateMode.PERSISTENT); log.warn("同步创建临时节点: { } 成功。。。", result); } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { log.warn("接收到watch通知: { }", event); } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。 引包: zookeeper curator-framework zookeeper curator-recipes zookeeper public class CuratorBase { private static final Logger log = LoggerFactory.getLogger(CuratorBase.class); //zk服务地址 static final String zk_path = "127.0.0.1:2181"; //会话超时,默认60秒 static final int session_timeout=60000; //连接超时时间 static final int connect_timeout=15000; / * 创建客户端 * @return */ private static CuratorFramework createClient(){ //重连策略:1秒3次 RetryPolicy retryPolicy = new RetryNTimes(1000,3); CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectionTimeoutMs(connect_timeout) .sessionTimeoutMs(session_timeout) .connectString(zk_path) .retryPolicy(retryPolicy) .build(); //开启链接 zkClient.start(); return zkClient; } public static void baseAPI() throws Exception { CuratorFramework zkCli = createClient(); CuratorFrameworkState state = zkCli.getState(); if(state.equals(CuratorFrameworkState.STARTED)){ / * 创建节点 * zk节点类型: * PERSISTENT : 持久化节点 * PERSISTENT_SEQUENTIAL : 持久化有序节点 * EPHEMERAL : 会话节点(伴随会话结束消失) * EPHEMERAL_SEQUENTIAL : 会话有序节点 */ String path = zkCli.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/curator/base/1", "curator测试".getBytes()); log.warn("path : { }",path); / * 获取节点数据 */ byte[] bytes = zkCli.getData().forPath(path); log.warn("节点数据 : { } ",new String(bytes)); / * 更新节点数据 */ zkCli.setData().forPath(path,"修改后的数据".getBytes()); byte[] bytes1 = zkCli.getData().forPath(path); log.warn("更新节点数据 : { }",new String(bytes1)); / * 获取子节点 */ List children_paths.forEach(x->{ log.warn(path+" 子节点:"+x); }); / * 检查节点状态 */ Stat stat = zkCli.checkExists().forPath(path); log.warn(path+" 节点状态:"+stat.toString()); / * 删除节点 */ zkCli.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); CountDownLatch countDownLatch = new CountDownLatch(1); ExecutorService executorService = Executors.newCachedThreadPool(); String path2 = zkCli.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { log.warn("code:" + curatorEvent.getResultCode()); log.warn("type:" + curatorEvent.getType()); log.warn("线程为:" + Thread.currentThread().getName()); countDownLatch.countDown(); } }, executorService) .forPath("/curator/base/2","curator测试2".getBytes()); countDownLatch.await(); if(path2!=null){ byte[] bytes2 = zkCli.getData().forPath(path2); log.warn("/curator/base/2 : "+ new String(bytes)); } } } public static void main(String[] args) throws Exception { baseAPI(); } }public class BaseOperator { public static CuratorFramework getClient() { return CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒 .sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒 .namespace("arch") //设置命名空间 .build(); } public static void create(final CuratorFramework client, final String path, final byte[] payload) throws Exception { client.create().creatingParentsIfNeeded().forPath(path, payload); } public static void createEphemeral(final CuratorFramework client, final String path, final byte[] payload) throws Exception { client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); } public static String createEphemeralSequential(final CuratorFramework client, final String path, final byte[] payload) throws Exception { return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload); } public static void setData(final CuratorFramework client, final String path, final byte[] payload) throws Exception { client.setData().forPath(path, payload); } public static void delete(final CuratorFramework client, final String path) throws Exception { client.delete().deletingChildrenIfNeeded().forPath(path); } public static void guaranteedDelete(final CuratorFramework client, final String path) throws Exception { client.delete().guaranteed().forPath(path); } public static String getData(final CuratorFramework client, final String path) throws Exception { return new String(client.getData().forPath(path)); } public static List return client.getChildren().forPath(path); } zookeeper原生支持通过注册watcher来进行事件监听,但是其使用不是特别方便,需要开发人员自己反复注册watcher,比较繁琐。 Curator引入Cache来实现对zookeeper服务端事务的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程Zookeeper视图的对比过程。同时,Curator能够自动为开发人员处理反复注册监听,从而大大简化原生api开发的繁琐过程。 NodeCache: public class ZkCuratorNodeCache { public static void main(String[] args) throws Exception { nodeCache(); } public static void nodeCache() throws Exception { final String path = "/nodeCache"; final CuratorFramework client = BaseOperator.getClient(); client.start(); // BaseOperator.delete(client, path); BaseOperator.create(client, path, "cache".getBytes()); final NodeCache nodeCache = new NodeCache(client, path); nodeCache.start(true); nodeCache.getListenable() .addListener(() -> System.out.println("节点数据发生变化,新数据为:" + new String(nodeCache.getCurrentData().getData()))); BaseOperator.setData(client, path, "cache1".getBytes()); BaseOperator.setData(client, path, "cache2".getBytes()); Thread.sleep(1000); client.close(); } NodeCache可以监听指定的节点,注册监听器后,节点的变化会通知相应的监听器。 Path Cache: Path Cache 用来监听ZNode的子节点事件,包括added、updateed、removed,Path Cache会同步子节点的状态,产生的事件会传递给注册的PathChildrenCacheListener。 public class ZkCuratorPathCache { public static void main(String[] args) throws Exception { pathChildrenCache(); } public static void pathChildrenCache() throws Exception { final String path = "/pathChildrenCache"; final CuratorFramework client = BaseOperator.getClient(); client.start(); final PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener((client1, event) -> { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED:" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED:" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED:" + event.getData().getPath()); break; case CONNECTION_LOST: System.out.println("CONNECTION_LOST:" + event.getData().getPath()); break; case CONNECTION_RECONNECTED: System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath()); break; case CONNECTION_SUSPENDED: System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath()); break; case INITIALIZED: System.out.println("INITIALIZED:" + event.getData().getPath()); break; default: break; } }); // client.create().withMode(CreateMode.PERSISTENT).forPath(path); Thread.sleep(1000); client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1"); Thread.sleep(1000); client.delete().forPath(path + "/c1"); Thread.sleep(1000); client.delete().forPath(path); //监听节点本身的变化不会通知 Thread.sleep(1000); client.close(); } TreeCache: Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。 public class ZkCuratorTreeCache { public static void main(String[] args) throws Exception { treeCache(); } public static void treeCache() throws Exception { final String path = "/treeChildrenCache"; final CuratorFramework client = BaseOperator.getClient(); client.start(); final TreeCache cache = new TreeCache(client, path); cache.start(); cache.getListenable().addListener((client1, event) -> { switch (event.getType()){ case NODE_ADDED: System.out.println("NODE_ADDED:" + event.getData().getPath()); break; case NODE_REMOVED: System.out.println("NODE_REMOVED:" + event.getData().getPath()); break; case NODE_UPDATED: System.out.println("NODE_UPDATED:" + event.getData().getPath()); break; case CONNECTION_LOST: System.out.println("CONNECTION_LOST:" + event.getData().getPath()); break; case CONNECTION_RECONNECTED: System.out.println("CONNECTION_RECONNECTED:" + event.getData().getPath()); break; case CONNECTION_SUSPENDED: System.out.println("CONNECTION_SUSPENDED:" + event.getData().getPath()); break; case INITIALIZED: System.out.println("INITIALIZED:" + event.getData().getPath()); break; default: break; } }); client.create().withMode(CreateMode.PERSISTENT).forPath(path); Thread.sleep(1000); client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1"); Thread.sleep(1000); BaseOperator.setData(client, path, "test".getBytes()); Thread.sleep(1000); client.delete().forPath(path + "/c1"); Thread.sleep(1000); client.delete().forPath(path); Thread.sleep(1000); client.close(); } 可重入锁Shared Reentrant Lock。 Shared意味着锁是全局可见的, 客户端都可以请求锁。Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类InterProcessMutex来实现。它的构造函数为: 不可重入锁Shared Lock。 使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。 可重入读写锁Shared Reentrant Read Write Lock 类似JDK的ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。主要由两个类实现: InterProcessReadWriteLock 信号量Shared Semaphore 一个计数的信号量类似JDK的Semaphore。JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。调用acquire会返回一个租约对象。客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。租约还可以通过下面的方式返还: public void returnAll(Collection 多锁对象Multi Shared Lock Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类: InterProcessMultiLock 它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。 public InterProcessMultiLock(List 代码演示: public class ZkCuratorLock { private static final String zk_server = "127.0.0.1:2181"; private static final String zk_path = "/curator/zklock"; public static void doWithLock(CuratorFramework curatorFramework){ List zkPaths.add(zk_path); InterProcessMultiLock lock = new InterProcessMultiLock(curatorFramework,zkPaths); // InterProcessMutex lock2 = new InterProcessMutex(curatorFramework,zk_path); try { if(lock.acquire(30, TimeUnit.SECONDS)){ long threadId = Thread.currentThread().getId(); System.out.println("线程-"+threadId+",acquire lock"); Thread.sleep(1000); System.out.println("线程-"+threadId+",replease lock"); } }catch (Exception e){ e.fillInStackTrace(); }finally { try { lock.release(); } catch (Exception e) { e.fillInStackTrace(); } } } public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(10); for(int i=10;i>0;i--){ es.execute(new Runnable() { @Override public void run() { RetryNTimes retryNTimes = new RetryNTimes(1000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zk_server, retryNTimes); curatorFramework.start(); ZkCuratorLock.doWithLock(curatorFramework); } }); } es.shutdown(); } DistributedBarrier构造函数中barrierPath参数用来确定一个栅栏,只要barrierPath参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下: 双栅栏Double Barrier 双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。双栅栏类是DistributedDoubleBarrier DistributedDoubleBarrier类实现了双栅栏的功能。它的构造函数如下: // client - the client // barrierPath - path to use // memberQty - the number of members in the barrier memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。当leave方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave。 注意:参数memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开! 与栅栏(DistributedBarrier)一样,双栅栏的barrierPath参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下: 利用ZooKeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。 1)SharedCount 这个类使用int类型来计数。主要涉及三个类。 2)DistributedAtomicLong 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。此计数器有一系列的操作: 你必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。 curator提供了两种方式,分别是Leader Latch和Leader Election。 随机从候选者中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。 public class LeaderLatchTest { private static final String PATH = "/demo/leader"; public static void main(String[] args) { List List try { for (int i = 0; i < 10; i++) { CuratorFramework client = getClient(); client.start(); clients.add(client); final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i); leaderLatch.addListener(new LeaderLatchListener() { @Override public void isLeader() { System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!"); } @Override public void notLeader() { System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!"); } }); latchList.add(leaderLatch); leaderLatch.start(); } Thread.sleep(1000 * 60); } catch (Exception e) { e.printStackTrace(); } finally { for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } for (LeaderLatch leaderLatch : latchList) { CloseableUtils.closeQuietly(leaderLatch); } } } public static CuratorFramework getClient() { return CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒 .sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒 .namespace("arch") //设置命名空间 .build(); } 通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。 public class LeaderSelectorTest { private static final String PATH = "/demo/leader"; public static void main(String[] args) { List List try { for (int i = 0; i < 10; i++) { CuratorFramework client = getClient(); client.start(); clients.add(client); final String name = "client#" + i; LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(name + ":I am leader."); Thread.sleep(2000); } }); leaderSelector.autoRequeue(); leaderSelector.start(); selectors.add(leaderSelector); } Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } finally { for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } for (LeaderSelector selector : selectors) { CloseableUtils.closeQuietly(selector); } } } public static CuratorFramework getClient() { return CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒 .sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒 .namespace("arch") //设置命名空间 .build(); } 至此Zookeeper的应用大体讲完了,在这里多说一句,技术的API不用去背,背也是背不住的,多使用就好了。第1章 常用命令
第2章 Java API使用
1、创建会话
2、创建节点
3、节点操作
4、Watcher机制及ACL
第3章 Curator应用
1、基础API
2、事件监听
3、分布式锁应用
4、栅栏barrier
5、计数器Counters
6、 选举