zookeeper API

1.maven 依赖

   <!-- zk 原生api-->
   <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.4.14</version>
   </dependency>

2.创建 zk 连接

/**
* 参数说明:
* connectString, sessionTimeout, watcher, sessionId, sessionPasswd,canBeReadOnly
*
* connectString: 连接服务器的ip字符串,比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
* 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群,也可以在ip后加路径
* sessionTimeout:超时时间,心跳收不到了,那就超时
* watcher:通知事件,如果有对应的事件触发,则会收到一个通知,实现implements Watcher接口;如果不需要,那就设置为null。
* canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
*	此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
* sessionId:会话的id
* sessionPasswd:会话密码	当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
*
*/
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, (WatchedEvent watchedEvent) -> {
log.warn("接受到watch通知:{}", watchedEvent);});

也可以使用缓存的sessionId以及sessionPasswd重新获取会话

// 重连
long sessionId = zk.getSessionId();
String ssid = "0x" + Long.toHexString(sessionId); // 十六进制转换
log.warn("sessionId:{}", ssid);
byte[] sessionPasswd = zk.getSessionPasswd();
log.warn("开始会话重连...");
ZooKeeper zooKeeper = new ZooKeeper(zkServerPath,
    timeout,
    new ZKConnect(),
    sessionId,
    sessionPasswd);

3.创建 zk 节点

//同步创建,timeout 不能太短
zk.create("/lin", "qin".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//方式二: 异步创建节点
  zk.create("/worke_1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new CreateStringCallBack(), "创建");

 class CreateStringCallBack implements AsyncCallback.StringCallback {
        /**
         * @param i  i<0创建失败 i=0 创建成功
         * @param s  节点名称
         * @param o  传入的上下文
         * @param s1 节点名称
         */
        public void processResult(int i, String s, Object o, String s1) {  
        }
    }

CreateMode 创建模式源码:

public enum CreateMode {
    /**
     * The znode will not be automatically deleted upon client's disconnect.
       久节点:节点创建后,会一直存在,不会因客户端会话失效而删除
     */
    PERSISTENT (0, false, false),
    /**
    * The znode will not be automatically deleted upon client's disconnect,
    * and its name will be appended with a monotonically increasing number.
    持久顺序节点:基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后		缀,作为新的节点名;
    */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * The znode will be deleted upon the client's disconnect.
     临时节点:客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点
     */
    EPHEMERAL (1, true, false),
    /**
     * The znode will be deleted upon the client's disconnect, and its name
     * will be appended with a monotonically increasing number.临时顺序节点
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);

acl:控制权限策略:

Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
REATOR_ALL_ACL auth:user:password:cdrwa

4.修改 zk 节点数据

同步更新,也支持异步回调

zk.setData("/lin", "1234".getBytes(), 0);

5.删除 zk 节点

同步删除,要正确的版本好,类似 CAS 。

if (zk.exists("/lin", null) != null) {
	zk.delete("/lin", 1);
}

6.获取节点信息

Stat 对应节点的详细信息。访问则初始化连接时声明用户即可。

zk.addAuthInfo("digest", "user:user".getBytes());

// 获取节点信息
byte[] data = zk.getData("/lin", true, new Stat());
String result = new String(data);
System.out.println("当前值:" + result);

List<String> children = zk.getChildren("/", true, new Stat());
children.forEach(System.out::println);

7.自定义用户认证访问

//自定义用户认证访问
List<ACL> acls = new ArrayList<>();
Id lin = new Id("digest", DigestAuthenticationProvider.generateDigest("user:user"));
acls.add(new ACL(ZooDefs.Perms.ALL, lin));
zk.create("/lin", "qin".getBytes(), acls, CreateMode.PERSISTENT);

8.ip 权限

 List<ACL> aclsIP = new ArrayList<>();
 Id ipId1 = new Id("ip", "127.0.0.1");
 Id ipId2 = new Id("ip", "192.168.152.1");
 aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId1));
 aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId2));
 zk.create("/lin/testIp", "testIp".getBytes(), aclsIP, CreateMode.EPHEMERAL);
上次更新时间: 2024/5/7 05:59:02