【www.bbyears.com--php安装】
有这么一个场景,多套测试的zookeeper集群之间数据拷贝,按之前的理解,如果可以的话,直接拷贝zk的data文件可以解决问题。
但是最近碰到这么一个事情,只是无意中删除了一个集群某一个路径下的数据,由于data数据并不可读,不能有选择的copy。所以,在这场景下,只能通过代码的方式解决了。
我所用到的2个场景:
1. 误删除数据,从其它zookeeper集群拷贝
2. 搭建测试环境,直接从线上导入部分节点数据到本地
zookeeper管理node,基本和我们的文件系统一致,这个需求就变得非常简单,可以直接转换为递归一个路径,然后在新的集群下创建。简单的实现如下:
import com.metaboy.common.zk.dao.ZkDaoImpl;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import java.util.List;
/**
* @author yuxiong.wangy
* Date: 14-8-14
* Time: 下午7:33
*/
public class ZookeeperDemo {
protected static CuratorFramework client_src;
protected static CuratorFramework client_dst;
protected static String namespace_src;
protected static String namespace_dst;
protected static String zkRoot_src = “/app”;
public static void main(String[] args) throws InterruptedException {
String zkConnectionStr_src = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;
client_src = getZKClient(zkConnectionStr_src,namespace_src,60000);
String zkConnectionStr_dst = “*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181”;
client_dst = getZKClient(zkConnectionStr_dst,namespace_dst,60000);
copyDataRecursion(zkRoot_src);
}
/*
* 递归拷贝数据
*/
public static void copyDataRecursion(String parent){
List
if(groups.size() > 0){
for(String group: groups){
String path = parent+”/”+group;
if(ZkDaoImpl.getData(client_src, path) != null){
ZkDaoImpl.createPersistentFile(client_dst, path, ZkDaoImpl.getData(client_src, path));
System.out.println(“[” + path + “]:” + ZkDaoImpl.getData(client_src,path));
}else{
ZkDaoImpl.createPersistentFile(client_dst, path);
System.out.println(“[” + path + “]:”);
}
if(ZkDaoImpl.getChildren(client_src,path).size() > 0){
copyDataRecursion(path);
}
}
}else {
ZkDaoImpl.createPersistentFile(client_dst,parent,ZkDaoImpl.getData(client_src,parent));
}
}
public static CuratorFramework getZKClient(String zkConnectionStr, String namespace, int sessionTimeout) throws InterruptedException {
int connectTimeout = 60000;
int retry = 3;
int retryTimeout = 10000;
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkConnectionStr)
.retryPolicy(new RetryNTimes(retry, retryTimeout)).connectionTimeoutMs(connectTimeout)
.sessionTimeoutMs(sessionTimeout).namespace(namespace).build();
client.start();
client.getZookeeperClient().blockUntilConnectedOrTimedOut();
return client;
}
}
ZkDaoImpl封装一个对ZK的操作的集合,类似下面这种,将原生的方法包装了下,使用上是差不多:
/**
* 获取文件数据
*
* @param path
* 文件路径
* @return 文件内容
*/
public static String getData(CuratorFramework client, String path) {
try {
return new String(client.getData().forPath(path));
} catch (Exception e) {
throw new ZkException(ZkErrors.GET_DATA_EXCEPTION, “path:” + path, e);
}
}