Apache Curator客户端的使用
- zk 原生 api 的不足之处
- 超时重连,不支持自动,需要手动操作
- watch 注册一次会失效
- 不支持递归创建节点
- apache curator 提供更多解决方案并且实现简单:比如 分布式锁
- apache curator 提供常用的 zookeeper工具类
依赖
这个依赖和 zk 原生依赖有版本对应关系,都存在要注意版本兼容。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
1.创建连接
- build 后要
start
客户端。 .namespace("workspace")
指定命名空间,该客户端成功创建的节点都会有这个前缀。- 可以使用
getState()
查看客户端的状态。
/**
* 方式一:推荐使用
* curator 连接 zookeeper的策略ExponentialBackoffRetry
* 参数介绍:
* int baseSleepTimeMs, int maxRetries, int maxSleepMs
* baseSleepTimeMs:初始化 sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* 方式二:推荐使用
* curator 连接 zookeeper 的策略:RetryNTimes
* n:重试次数
* sleepMsBetweenRetries 每次重试间隔时间
*/
RetryPolicy retryPolicy1 = new RetryNTimes(3, 5000);
/**
* 方式三: 不推荐使用
* curator链接zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重试间隔的时间,
* 只会重置一次,第一次失败隔 3 秒之后重置一次
*/
RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 永远重试,不推荐使用
*/
//RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs);
/**
* curator连接 zookeeper的策略:RetryUntilElapsed
* maxElapsedTimeMs:最大重试时间
* sleepMsBetweenRetries:每次重试间隔
* 重试时间超过 maxElapsedTimeMs 后,就不再重试
*/
RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
client = CuratorFrameworkFactory.builder().connectString(zkServerPath)
.sessionTimeoutMs(60000)
.retryPolicy(retryPolicy1)
.build();
client.start();
2.创建节点
//创建节点
String nodePath = "/lin/qin";
byte[] data = "iamlinqin".getBytes();
/**
* creatingParentsIfNeeded():递归创建节点;
* PERSISTENT:持久节点
* OPEN_ACL_UNSAFE:权限,所有用户都可以操作
*/
curatorConntion.client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
3.修改数据
String nodePath = "/lin/qin";
byte[] newData = "666".getBytes();
curatorConntion.client.setData()
.withVersion(0)
.forPath(nodePath,newData);
4.查询节点数据
/*查询节点*/
Stat stat = new Stat();
byte[] getResultData = curatorConntion.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为:"+new String(getResultData));
System.out.println("该节点的版本号为:"+ stat.getVersion());
/* 查询当前节点下的子节点 */
List<String> childNodes = curatorConntion.client.getChildren().forPath("/lin");
for (String childNode : childNodes) {
System.out.println(childNode);
}
Stat stat1 = curatorConntion.client.checkExists().forPath("/lin/qin1");
if (stat1 == null) {
System.out.println("该节点不存在!");
}
5.删除数据节点
删除一个节点
client.delete().forPath("path");
注意,此方法只能删除叶子节点,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
删除一个节点,强制保证删除
client.delete().guaranteed().forPath("path");
guaranteed() 接口是一个保障措施,只要客户端会话有效,那么 Curator 会在后台持续进行删除操作,直到删除节点成功。
**注意:**上面的多个流式接口是可以自由组合的,例如:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
6.设置 watch
//watcher事件,当使用 usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
/*************监听只会触发一次***************/
// curatorConntion.client.getData().usingWatcher((CuratorWatcher) watchedEvent -> {
// System.out.println("getData触发watcher,节点路径为:" + watchedEvent.getPath());
// }).forPath(nodePath);
/*************一次监听N次触发***************/
//NodeCache:监听数据节点的变更,会触发事件
final NodeCache nodeCache = new NodeCache(curatorConntion.client, nodePath);
//buildInitial:初始化的时候获取node的值并且缓存,默认false,不缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空....");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
System.out.println("空节点");
return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据: " + data);
}
});
7.事务
CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个ZooKeeper事务. 可以复合 create, setData, check, and/or delete 等操作然后调用 commit() 作为一个原子操作提交。一个例子如下:
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
8.异步任务
一个异步创建节点的例子如下:
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
注意:如果 inBackground() 方法不指定 executor,那么会默认使用 Curator 的 EventThread 去进行异步处理。