ZooKeeper有一個Java和C綁定的官方API。ZooKeeper社區提供了對於大多數語言(.NET,Python等)的非官方API。使用ZooKeeper的API,應用程式可以連接,互動,運算元據,協調,以及從ZooKeeper集成斷開。
ZooKeeper API有一組豐富的功能,在一個簡單而安全的方式在ZooKeeper集成獲得所有功能。ZooKeeper API提供同步和非同步方法。
ZooKeeper的集成和ZooKeeper API 在各個方面完全互補,它有利於開發商在一個簡便的方式。 在本章討論Java綁定。
ZooKeeper的API基礎知識
應用程式使用 ZooKeeper 集成的交互稱為 ZooKeeper 客戶端。
Znode 是 ZooKeeper 集成的核心組件,ZooKeeper API提供一個方法來處理znode所有使用ZooKeeper 集成。
客戶端應遵循下麵給出帶 ZooKeeper 集成一個清晰的交互步驟。
-
連接到ZooKeeper 。ZooKeeper 集成分配客戶端的會話ID。
-
定期發送心跳到伺服器。否則,ZooKeeper 集成過期的會話ID,那麼客戶端需要重新連接。
-
獲得/設置只要znodes會話ID是活動的。
-
從 ZooKeeper 集成斷開,當所有的任務都完成後。如果客戶端處於非活動狀態較長時間,那麼 ZooKeeper 集成會自動斷開客戶機。
Java綁定
讓我們這一章中理解最重要的ZooKeeper API。ZooKeeper API的中心部分是ZooKeeper 類。它提供了一些選項來連接 ZooKeeper 集成在其構造,有以下幾種方法 −
-
connect − 連接到 ZooKeeper 的集成
-
create − 創建一個 znode
-
exists − 檢查znode是否存在及其資訊
-
getData − 從一個特定的znode獲取數據
-
setData − 設置數據在特定znode
-
getChildren − 得到一個特定 znode 的所有可用子節點
-
delete − 得到一個特定的 znode 及其所有子節點
-
close − 關閉連接
連接到 ZooKeeper 集合
ZooKeeper類通過它的構造函數提供了連接功能。構造函數的簽名如下:
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
在這裏,
-
connectionString − ZooKeeper集合主機。
-
sessionTimeout − 以毫秒為單位會話超時。
-
watcher − 一個執行對象“觀察者”的介面。ZooKeeper 集合返回通過監控器對象的連接狀態。
讓我們創建一個新的輔助類 ZooKeeperConnection 並添加一個方法連接。在連接方法創建一個 ZooKeeper 對象,連接到 ZooKeeper 集合,然後返回該對象。
這裏CountDownLatch用於停止(等待)主進程,直到客戶端與 ZooKeeper 集合連接。
ZooKeeper集合通過觀察回調回應連接狀態。一旦客戶與 ZooKeeper 集合連接,觀察者回調調用CountDownLatch 釋放鎖倒計時方法在主進程中等待監視回調會被調用。
下麵是完整的代碼,ZooKeeper集合連接。
代碼: ZooKeeperConnection.java
// import java classes import java.io.IOException; import java.util.concurrent.CountDownLatch; // import zookeeper classes import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class ZooKeeperConnection { // declare zookeeper instance to access ZooKeeper ensemble private ZooKeeper zoo; final CountDownLatch connectedSignal = new CountDownLatch(1); // Method to connect zookeeper ensemble. public ZooKeeper connect(String host) throws IOException,InterruptedException { zoo = new ZooKeeper(host,5000,new Watcher() { public void process(WatchedEvent we) { if (we.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); return zoo; } // Method to disconnect from zookeeper server public void close() throws InterruptedException { zoo.close(); } }
保存上述代碼,它將被用於下一部分,用於連接的ZooKeeper集合。
創建一個Znode
ZooKeeper類提供了一個方法來在集合 ZooKeeper 創建一個新的 znode。創建方法的簽名如下:
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
Where,
-
path − Znode的路徑。例如 /myapp1, /myapp2, /myapp1/mydata1, myapp2/mydata1/myanothersubdata
-
data − 在一個指定的znode路徑存儲數據
-
acl − 要創建節點的訪問控制列表。 ZooKeeperAPI提供了一個靜態介面ZooDefs.Ids得到一些基本的ACL列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回ACL開放的 znodes 列表。
-
createMode − 節點的類型,可以是短暫的,連續的,或兩者。這是一個枚舉類型。
讓我們創建一個新的Java應用程式來檢查 ZooKeeper API 創建功能。創建一個檔ZKCreate.java。在main方法中,創建一個類型ZooKeeperConnection 的對象,並調用connect方法連接到 ZooKeeper 集合。
連接方法將返回 ZooKeeper 對象 zk。 現在,調用自定義路徑和數據創建 zk 對象的方法。.
完整的程式代碼,以創建一個znode如下 -
代碼: ZKCreate.java
import java.io.IOException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; public class ZKCreate { // create static instance for zookeeper class. private static ZooKeeper zk; // create static instance for ZooKeeperConnection class. private static ZooKeeperConnection conn; // Method to create znode in zookeeper ensemble public static void create(String path, byte[] data) throws KeeperException,InterruptedException { zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public static void main(String[] args) { // znode path String path = "/MyFirstZnode"; // Assign path to znode // data in byte array byte[] data = "My first zookeeper app”.getBytes(); // Declare data try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); create(path, data); // Create the data to the specified path conn.close(); } catch (Exception e) { System.out.println(e.getMessage()); //Catch error message } } }
一旦應用程式編譯和執行,使用指定數據的 znode 將在ZooKeeper集合創建。您可以使用 ZooKeeper CLI 的 zkCli.sh 來檢查它。
cd /path/to/zookeeper bin/zkCli.sh >>> get /MyFirstZnode
Exists – 檢查一個Znode的存在
ZooKeeper類提供了 exists 方法來檢查 znode 的存在。如果指定的 znode 存在它返回一個 znode 元數據。exists 方法的簽名如下 −
exists(String path, boolean watcher)
在這裏 ,
-
path − Znode 路徑
-
watcher − 布爾值,指定是否監視指定的znode與否
讓我們創建一個新的Java應用程式來檢查 ZooKeeper API的 “exists” 功能。創建一個檔 “ZKExists.java”。
在 main 方法,創建 ZooKeeper 對象, “zk” 使用 “ZooKeeperConnection” 對象. 然後再調用zk” 對象的“exists”方法和指定的“path”。完整的列表如下 -
代碼: ZKExists.java
import java.io.IOException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKExists { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path, true); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; // Assign znode to the specified path try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); // Stat checks the path of the znode if(stat != null) { System.out.println("Node exists and the node version is " + stat.getVersion()); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); // Catches error messages } } }
一旦應用程式編譯和執行,會得到下麵的輸出。
Node exists and the node version is 1.
getData 方法
ZooKeeper類提供 getData方法來獲取連接在指定 znode 及其狀態的數據。getData方法的簽名如下 -
getData(String path, Watcher watcher, Stat stat)
這裏,
-
path − Znode 路徑.
-
watcher − Watcher類型的回調函數。ZooKeeper集合將通知通過觀察者回調時指定的節點改變的數據。這是一次性的通知。
-
stat − 返回 znode 元數據。
讓我們創建一個新的Java應用程式,以瞭解的ZooKeeperAPI的 getData 功能。創建一個檔 ZKGetData.java. 在main方法,用ZooKeeperConnection創建一個的ZooKeeper對象zk。然後,調用zk對象的自定義路徑GetData方法。
下麵是完整的程式代碼,以從規定的節點獲得數據-
代碼: ZKGetData.java
import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKGetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path,true); } public static void main(String[] args) throws InterruptedException, KeeperException { String path = "/MyFirstZnode"; final CountDownLatch connectedSignal = new CountDownLatch(1); try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); if(stat != null) { byte[] b = zk.getData(path, new Watcher() { public void process(WatchedEvent we) { if (we.getType() == Event.EventType.None) { switch(we.getState()) { case Expired: connectedSignal.countDown(); break; } } else { String path = "/MyFirstZnode"; try { byte[] bn = zk.getData(path, false, null); String data = new String(bn, "UTF-8"); System.out.println(data); connectedSignal.countDown(); } catch(Exception ex) { System.out.println(ex.getMessage()); } } } }, null); String data = new String(b, "UTF-8"); System.out.println(data); connectedSignal.await(); } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } } }
一旦應用程式編譯和執行,會得到下麵的輸出
My first zookeeper app
應用程式將等待來自的ZooKeeper集合進一步通知。通過使用 ZooKeeper CLI zkCli.sh 更改指定znode的數據。
cd /path/to/zookeeper bin/zkCli.sh >>> set /MyFirstZnode Hello
現在,應用程式將列印以下的輸出並退出。
Hello
setData 方法
ZooKeeper類提供SetData方法來修改附著在指定 znode 的數據。SetData方法的簽名如下 -
setData(String path, byte[] data, int version)
在這裏,
-
path − Znode 路徑
-
data − 數據存儲在一個指定的znode路徑。
-
version − 當前znode的版本。ZooKeeper更新數據在znode的版本號改變了以後。
現在,讓我們創建一個新的Java應用程式,以瞭解ZooKeeper API 的 setData 功能的使用。 創建一個檔ZKSetData.java。
在main方法中,使用ZooKeeperConnection創建一個ZooKeeper對象ZK。 然後,使用指定路徑,新的數據,和節點版本調用zk對象SetData方法。
下麵是完整的程式代碼用來修改附加在指定znode的數據。
代碼: ZKSetData.java
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import java.io.IOException; public class ZKSetData { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to update the data in a znode. Similar to getData but without watcher. public static void update(String path, byte[] data) throws KeeperException,InterruptedException { zk.setData(path, data, zk.exists(path,true).getVersion()); } public static void main(String[] args) throws InterruptedException,KeeperException { String path= "/MyFirstZnode"; byte[] data = "Success".getBytes(); //Assign data which is to be updated. try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); update(path, data); // Update znode data to the specified path } catch(Exception e) { System.out.println(e.getMessage()); } } }
一旦應用程式被編譯和執行時,指定znode的數據將被改變,並且它可以使用 ZooKeeperCLI,zkCli.sh 進行檢查
cd /path/to/zookeeper bin/zkCli.sh >>> get /MyFirstZnode
getChildren 方法
ZooKeeper類提供 getChildren方法來得到一個特定的 znode 所有子節點。getChildren 方法的簽名如下 -
getChildren(String path, Watcher watcher)
在這裏,
-
path − Znode 路徑.
-
watcher − 調用“Watcher”類型函數. ZooKeeper集合將通知在指定的 znode 被刪除或znode以下子節點創建/刪除。 這是一次性的通知。
代碼: ZKGetChildren.java
import java.io.IOException; import java.util.*; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.Stat; public class ZKGetChildren { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static Stat znode_exists(String path) throws KeeperException,InterruptedException { return zk.exists(path,true); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; // Assign path to the znode try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); Stat stat = znode_exists(path); // Stat checks the path if(stat!= null) { //“getChildren” method- get all the children of znode.It has two args, path and watch List <String> children = zk.getChildren(path, false); for(int i = 0; i < children.size(); i++) System.out.println(children.get(i)); //Print children's } else { System.out.println("Node does not exists"); } } catch(Exception e) { System.out.println(e.getMessage()); } } }
在運行程式之前,讓我們使用 ZooKeeperCLI,zkCli.sh 創建 /MyFirstZnode 的兩個子節點。
cd /path/to/zookeeper bin/zkCli.sh >>> create /MyFirstZnode/myfirstsubnode Hi >>> create /MyFirstZnode/mysecondsubmode Hi
現在,編譯並運行該程式將輸出上面創建znodes。
myfirstsubnode mysecondsubnode
刪除一個Znode
ZooKeeper類提供了 delete 方法來刪除指定 znode。delete方法的簽名如下 -
delete(String path, int version)
在這裏,
-
path − Znode 路徑
-
version − 當前 znode 的版本
讓我們創建一個新的Java應用程式,以瞭解ZooKeeperAPI的刪除功能。
創建一個檔 ZKDelete.java. 在main方法中,使用 ZooKeeperConnection對象創建一個 ZooKeeper 對象ZK。然後,調用 zk 對象的 delete方法與節點的指定的路徑和版本。
完整的程式代碼,以刪除一個znode如下 -
代碼: ZKDelete.java
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException; public class ZKDelete { private static ZooKeeper zk; private static ZooKeeperConnection conn; // Method to check existence of znode and its status, if znode is available. public static void delete(String path) throws KeeperException,InterruptedException { zk.delete(path,zk.exists(path,true).getVersion()); } public static void main(String[] args) throws InterruptedException,KeeperException { String path = "/MyFirstZnode"; //Assign path to the znode try { conn = new ZooKeeperConnection(); zk = conn.connect("localhost"); delete(path); //delete the node with the specified path } catch(Exception e) { System.out.println(e.getMessage()); // catches error messages } } }