Apache Curator客户端的使用

  • zk 原生 api 的不足之处
    • 超时重连,不支持自动,需要手动操作
    • watch 注册一次会失效
    • 不支持递归创建节点
  • apache curator 提供更多解决方案并且实现简单:比如 分布式锁
  • apache curator 提供常用的 zookeeper工具类

依赖

这个依赖和 zk 原生依赖有版本对应关系,都存在要注意版本兼容。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

1.创建连接

  • build 后要start客户端。
  • .namespace("workspace")指定命名空间,该客户端成功创建的节点都会有这个前缀。
  • 可以使用getState()查看客户端的状态。
 /**
 * 方式一:推荐使用
 * curator 连接 zookeeper的策略ExponentialBackoffRetry
 * 参数介绍:
 * int baseSleepTimeMs, int maxRetries, int maxSleepMs
 * baseSleepTimeMs:初始化 sleep的时间
 * maxRetries:最大重试次数
 * maxSleepMs:最大重试时间
 */
 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

/**
* 方式二:推荐使用
* curator 连接 zookeeper 的策略:RetryNTimes
* n:重试次数
* sleepMsBetweenRetries 每次重试间隔时间
*/
RetryPolicy retryPolicy1 = new RetryNTimes(3, 5000);

/**
* 方式三: 不推荐使用
* curator链接zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重试间隔的时间,
* 只会重置一次,第一次失败隔 3 秒之后重置一次
*/
RetryPolicy retryPolicy2 = new RetryOneTime(3000);

/**
* 永远重试,不推荐使用
*/
//RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs);

/**
* curator连接 zookeeper的策略:RetryUntilElapsed
* maxElapsedTimeMs:最大重试时间
* sleepMsBetweenRetries:每次重试间隔
* 重试时间超过 maxElapsedTimeMs 后,就不再重试
*/
RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);


client = CuratorFrameworkFactory.builder().connectString(zkServerPath)
                                .sessionTimeoutMs(60000)
                                .retryPolicy(retryPolicy1)
                                .build();

client.start();

2.创建节点

  //创建节点
  String nodePath = "/lin/qin";
  byte[] data = "iamlinqin".getBytes();
  /**
  * creatingParentsIfNeeded():递归创建节点;
  * PERSISTENT:持久节点
  * OPEN_ACL_UNSAFE:权限,所有用户都可以操作
  */
  curatorConntion.client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.PERSISTENT)
      .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
      .forPath(nodePath, data);

3.修改数据

String nodePath = "/lin/qin";
byte[] newData = "666".getBytes();
curatorConntion.client.setData()
     .withVersion(0)
     .forPath(nodePath,newData);

4.查询节点数据

/*查询节点*/
Stat stat = new Stat();
byte[] getResultData = curatorConntion.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为:"+new String(getResultData));
System.out.println("该节点的版本号为:"+ stat.getVersion());


/* 查询当前节点下的子节点 */
List<String> childNodes = curatorConntion.client.getChildren().forPath("/lin");
for (String childNode : childNodes) {
    System.out.println(childNode);
}

Stat stat1 = curatorConntion.client.checkExists().forPath("/lin/qin1");
if (stat1 == null) {
    System.out.println("该节点不存在!");
}

5.删除数据节点

删除一个节点

client.delete().forPath("path");

注意,此方法只能删除叶子节点,否则会抛出异常。

删除一个节点,并且递归删除其所有的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

删除一个节点,强制指定版本进行删除

client.delete().withVersion(10086).forPath("path");

删除一个节点,强制保证删除

client.delete().guaranteed().forPath("path");

guaranteed() 接口是一个保障措施,只要客户端会话有效,那么 Curator 会在后台持续进行删除操作,直到删除节点成功。

**注意:**上面的多个流式接口是可以自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

6.设置 watch

//watcher事件,当使用 usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
/*************监听只会触发一次***************/
//        curatorConntion.client.getData().usingWatcher((CuratorWatcher) watchedEvent -> {
//            System.out.println("getData触发watcher,节点路径为:" + watchedEvent.getPath());
//        }).forPath(nodePath);


/*************一次监听N次触发***************/
//NodeCache:监听数据节点的变更,会触发事件
final NodeCache nodeCache = new NodeCache(curatorConntion.client, nodePath);
//buildInitial:初始化的时候获取node的值并且缓存,默认false,不缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
    System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
    System.out.println("节点初始化数据为空....");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
    System.out.println("空节点");
    return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据: " + data);
}
});

7.事务

CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个ZooKeeper事务. 可以复合 create, setData, check, and/or delete 等操作然后调用 commit() 作为一个原子操作提交。一个例子如下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

8.异步任务

一个异步创建节点的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

注意:如果 inBackground() 方法不指定 executor,那么会默认使用 Curator 的 EventThread 去进行异步处理。

上次更新时间: 2024/5/7 05:59:02