package CreateGroup; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ConnectionBean; public class TestZkGroup implements Watcher { private static final int SESSION_TIMEOUT = 5000; private ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(2); Childwatcher childwatcher = new Childwatcher(); private void getthreadname(String funname){ Thread current = Thread.currentThread(); System.out.println(funname + " is call in " + current.getName()); } @Override // 運行在另外一個線程 main-EventThread public void process(WatchedEvent event) { getthreadname("process"); System.out.println((event.getType())); // 打印狀態 if (event.getState() == KeeperState.SyncConnected){ connectedSignal.countDown(); } } // 測試自定義監聽 public Watcher wh = new Watcher() { public void process(WatchedEvent event) { getthreadname("Watcher::process"); System.out.println("回調watcher實例: 路徑" + event.getPath() + " 類型:" + event.getType()); } }; public void connect(String hosts) throws IOException, InterruptedException{ getthreadname("connect"); zk = new ZooKeeper(hosts, SESSION_TIMEOUT, wh); // 最后一個參數用this會調用自身的監聽 wh 代表? //connectedSignal.await(); // 主線程掛起 } // 加入組(可以理解成一個創建本次連接的一個組) public void join(String groupname, String meberName) throws KeeperException, InterruptedException{ String path = "/" + groupname + "/" + meberName ; // EPHEMERAL斷開將被刪除 String createdpath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("join : " + createdpath); } public List<String> getChilds(String path) throws KeeperException, InterruptedException { // TODO Auto-generated method stub if (zk != null) { // return zk.getChildren(path, false); // false表示沒有設置觀察 //zk.getChildren(path, true, childwatcher.processResult, null); zk.getChildren( path, true, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { // stat參數代表元數據 // TODO Auto-generated method stub System.out.println("****"); for (int i = 0; i < children.size() - 1; i ++){ System.out.println("mempath = " + children.get(i) + stat); } } }, null); } return null; } public void create(String path) throws KeeperException, InterruptedException, IOException{ // Ids.OPEN_ACL_UNSAFE 開放式ACL,允許任何客戶端對znode進行讀寫 // CreateMode.PERSISTENT 持久的znode,本次連接斷開后還會存在,應該有持久化操作. // PERSISTENT_SEQUENTIAL 持久化順序,這個由zookeeper來保證 String createdpath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("crateed : " + createdpath); } private void close() throws InterruptedException{ if (zk != null){ zk.close(); } } public static class Childwatcher implements Children2Callback{ public ChildrenCallback processResult; @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { // TODO Auto-generated method stub System.out.println("**** path " + stat); } } public void delete(String groupname) throws InterruptedException, KeeperException{ zk.delete(groupname, -1); } public Stat isexist(String groupname) throws InterruptedException, KeeperException{ return zk.exists(groupname, true); // this } public void write(String path, String value)throws Exception{ Stat stat=zk.exists(path, false); if(stat==null){ zk.create(path, value.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes("UTF-8"), -1); } } public String read(String path,Watcher watch)throws Exception{ byte[] data=zk.getData(path, watch, null); return new String(data, "UTF-8"); } public static void main(String[] args) throws Exception { String hosts = "localhost"; String groupname = "zktest" ; String meberName = String.valueOf(System.currentTimeMillis()); String path = "/" + groupname; // create TestZkGroup test = new TestZkGroup(); // 連接 test.connect(hosts); // if (null != test.isexist(path)){ test.delete(path); } test.isexist(path); test.create(path); test.isexist(path); test.write(path, "test"); test.isexist(path); String result = test.read(path, test.wh); System.out.println(path + " value = " + result); int sum = 0; for (int j = 0 ; j< 10000; j++){ sum++; Thread.sleep(10); } System.out.println(sum); test.close(); System.exit(2); // 一個本地連接的znode test.connect(hosts); test.join(groupname, meberName); // 遍歷 List<String> memlist = test.getChilds("/" + "zktest" ); if (memlist != null){ for (int i = 0; i < memlist.size() - 1; i ++){ System.out.println("mempath = " + memlist.get(i)); } } } }
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
