zookeeper API
1.maven 依赖
<!-- zk 原生api-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
2.创建 zk 连接
/**
* 参数说明:
* connectString, sessionTimeout, watcher, sessionId, sessionPasswd,canBeReadOnly
*
* connectString: 连接服务器的ip字符串,比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
* 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群,也可以在ip后加路径
* sessionTimeout:超时时间,心跳收不到了,那就超时
* watcher:通知事件,如果有对应的事件触发,则会收到一个通知,实现implements Watcher接口;如果不需要,那就设置为null。
* canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
* 此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
* sessionId:会话的id
* sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
*
*/
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, (WatchedEvent watchedEvent) -> {
log.warn("接受到watch通知:{}", watchedEvent);});
也可以使用缓存的sessionId
以及sessionPasswd
重新获取会话
// 重连
long sessionId = zk.getSessionId();
String ssid = "0x" + Long.toHexString(sessionId); // 十六进制转换
log.warn("sessionId:{}", ssid);
byte[] sessionPasswd = zk.getSessionPasswd();
log.warn("开始会话重连...");
ZooKeeper zooKeeper = new ZooKeeper(zkServerPath,
timeout,
new ZKConnect(),
sessionId,
sessionPasswd);
3.创建 zk 节点
//同步创建,timeout 不能太短
zk.create("/lin", "qin".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//方式二: 异步创建节点
zk.create("/worke_1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new CreateStringCallBack(), "创建");
class CreateStringCallBack implements AsyncCallback.StringCallback {
/**
* @param i i<0创建失败 i=0 创建成功
* @param s 节点名称
* @param o 传入的上下文
* @param s1 节点名称
*/
public void processResult(int i, String s, Object o, String s1) {
}
}
CreateMode 创建模式源码:
public enum CreateMode {
/**
* The znode will not be automatically deleted upon client's disconnect.
久节点:节点创建后,会一直存在,不会因客户端会话失效而删除
*/
PERSISTENT (0, false, false),
/**
* The znode will not be automatically deleted upon client's disconnect,
* and its name will be appended with a monotonically increasing number.
持久顺序节点:基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后 缀,作为新的节点名;
*/
PERSISTENT_SEQUENTIAL (2, false, true),
/**
* The znode will be deleted upon the client's disconnect.
临时节点:客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点
*/
EPHEMERAL (1, true, false),
/**
* The znode will be deleted upon the client's disconnect, and its name
* will be appended with a monotonically increasing number.临时顺序节点
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
acl:控制权限策略:
Ids.OPEN_ACL_UNSAFE | world:anyone:cdrwa |
---|---|
REATOR_ALL_ACL | auth:user:password:cdrwa |
4.修改 zk 节点数据
同步更新,也支持异步回调
zk.setData("/lin", "1234".getBytes(), 0);
5.删除 zk 节点
同步删除,要正确的版本好,类似 CAS 。
if (zk.exists("/lin", null) != null) {
zk.delete("/lin", 1);
}
6.获取节点信息
Stat 对应节点的详细信息。访问则初始化连接时声明用户即可。
zk.addAuthInfo("digest", "user:user".getBytes());
// 获取节点信息
byte[] data = zk.getData("/lin", true, new Stat());
String result = new String(data);
System.out.println("当前值:" + result);
List<String> children = zk.getChildren("/", true, new Stat());
children.forEach(System.out::println);
7.自定义用户认证访问
//自定义用户认证访问
List<ACL> acls = new ArrayList<>();
Id lin = new Id("digest", DigestAuthenticationProvider.generateDigest("user:user"));
acls.add(new ACL(ZooDefs.Perms.ALL, lin));
zk.create("/lin", "qin".getBytes(), acls, CreateMode.PERSISTENT);
8.ip 权限
List<ACL> aclsIP = new ArrayList<>();
Id ipId1 = new Id("ip", "127.0.0.1");
Id ipId2 = new Id("ip", "192.168.152.1");
aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId1));
aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId2));
zk.create("/lin/testIp", "testIp".getBytes(), aclsIP, CreateMode.EPHEMERAL);