Skip to content

Commit f675f55

Browse files
committed
using netflix Curator to test zk
1 parent 088f071 commit f675f55

File tree

10 files changed

+155
-127
lines changed

10 files changed

+155
-127
lines changed

pom.xml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@
2828
<version>12.0</version>
2929
</dependency>
3030
<dependency>
31-
<groupId>org.apache.zookeeper</groupId>
32-
<artifactId>zookeeper</artifactId>
33-
<version>3.3.5</version>
31+
<groupId>com.netflix.curator</groupId>
32+
<artifactId>curator-recipes</artifactId>
33+
<version>1.1.14</version>
34+
</dependency>
35+
<dependency>
36+
<groupId>com.netflix.curator</groupId>
37+
<artifactId>curator-test</artifactId>
38+
<version>1.1.14</version>
39+
<scope>test</scope>
3440
</dependency>
3541
<dependency>
3642
<groupId>com.sun.jersey</groupId>
Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package org.zookeeper.app;
22

3+
import com.netflix.curator.framework.api.CuratorWatcher;
34
import org.apache.zookeeper.CreateMode;
4-
import org.apache.zookeeper.KeeperException;
55
import org.apache.zookeeper.Watcher;
66
import org.apache.zookeeper.ZooDefs;
77
import org.apache.zookeeper.data.Stat;
8-
import org.zookeeper.tdg.ConnectionWatcher;
9-
10-
import java.io.UnsupportedEncodingException;
8+
import org.zookeeper.tdg.CuratorConnection;
119

1210
/**
1311
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -25,20 +23,23 @@
2523
* See the License for the specific language governing permissions and
2624
* limitations under the License.
2725
*/
28-
public class ActiveKeyValueStore extends ConnectionWatcher {
26+
public class ActiveKeyValueStore extends CuratorConnection {
2927

3028
public static final String CHARSET = "UTF-8";
3129

32-
public void write(String path, String value) throws InterruptedException, KeeperException, UnsupportedEncodingException {
33-
Stat stat = zk.exists(path, false);
30+
public void write(String path, String value) throws Exception {
31+
Stat stat = client.checkExists().forPath(path);
3432
if (stat == null)
35-
zk.create(path, value.getBytes(CHARSET), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
33+
client.create()
34+
.withMode(CreateMode.PERSISTENT)
35+
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
36+
.forPath(path, value.getBytes(CHARSET));
3637
else
37-
zk.setData(path, value.getBytes(CHARSET), -1);
38+
client.setData().forPath(path, value.getBytes(CHARSET));
3839
}
3940

40-
public String read(String path, Watcher watcher) throws InterruptedException, KeeperException, UnsupportedEncodingException {
41-
byte[] data = zk.getData(path,watcher,null);
41+
public String read(String path, CuratorWatcher watcher) throws Exception {
42+
byte[] data = client.getData().usingWatcher(watcher).forPath(path);
4243
return new String(data,CHARSET);
4344
}
4445
}
Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package org.zookeeper.app;
22

3-
import org.apache.zookeeper.KeeperException;
4-
53
import java.io.IOException;
6-
import java.io.UnsupportedEncodingException;
74
import java.util.Random;
85
import java.util.concurrent.TimeUnit;
96

@@ -23,7 +20,7 @@
2320
* See the License for the specific language governing permissions and
2421
* limitations under the License.
2522
*/
26-
public class ConfigUpdater {
23+
public class ConfigUpdater implements Runnable {
2724
public static final String PATH = "/config";
2825

2926
private ActiveKeyValueStore store;
@@ -34,17 +31,19 @@ public ConfigUpdater(String hosts) throws IOException, InterruptedException {
3431
store.connect(hosts);
3532
}
3633

37-
public void run() throws InterruptedException, UnsupportedEncodingException, KeeperException {
38-
while(true){
39-
String value = random.nextInt(100) + "";
40-
store.write(PATH, value);
41-
System.out.printf("Set %s to %s\n",PATH,value);
42-
TimeUnit.SECONDS.sleep(random.nextInt(10));
34+
@Override
35+
public void run() {
36+
try {
37+
for (int i = 0; i < 3; i++) {
38+
String value = random.nextInt(100) + "";
39+
store.write(PATH, value);
40+
System.out.printf("Set %s to %s\n", PATH, value);
41+
TimeUnit.SECONDS.sleep(random.nextInt(10));
42+
}
43+
} catch (Exception e) {
44+
//do nothing
45+
} finally {
46+
store.close();
4347
}
4448
}
45-
46-
public static void main(String[] args) throws InterruptedException, KeeperException, IOException {
47-
ConfigUpdater configUpdater = new ConfigUpdater("localhost");
48-
configUpdater.run();
49-
}
5049
}

src/main/java/org/zookeeper/app/ConfigWatcher.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.zookeeper.app;
22

3+
import com.netflix.curator.framework.api.CuratorWatcher;
34
import org.apache.zookeeper.KeeperException;
45
import org.apache.zookeeper.WatchedEvent;
56
import org.apache.zookeeper.Watcher;
@@ -23,39 +24,33 @@
2324
* See the License for the specific language governing permissions and
2425
* limitations under the License.
2526
*/
26-
public class ConfigWatcher implements Watcher {
27+
public class ConfigWatcher implements CuratorWatcher {
2728
private ActiveKeyValueStore store;
2829

2930
public ConfigWatcher(String hosts) throws IOException, InterruptedException {
3031
store = new ActiveKeyValueStore();
3132
store.connect(hosts);
3233
}
3334

34-
public void displayConfig() throws InterruptedException, UnsupportedEncodingException, KeeperException {
35+
public void displayConfig() throws Exception{
3536
String value = store.read(ConfigUpdater.PATH, this);
3637
System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
3738
}
3839

3940
@Override
4041
public void process(WatchedEvent event) {
41-
if (event.getType() == Event.EventType.NodeDataChanged)
42+
if (event.getType() == Watcher.Event.EventType.NodeDataChanged)
4243
try {
4344
displayConfig();
4445
} catch (InterruptedException e) {
4546
System.err.println("Interrupted. Exiting.");
4647
Thread.currentThread().interrupt();
4748
} catch (KeeperException e) {
4849
System.err.printf("KeeperException: %s. Exiting.\n", e);
49-
} catch (UnsupportedEncodingException e) {
50+
} catch (Exception e) {
5051
System.err.println("Unsupported. Exiting.");
5152
Thread.currentThread().interrupt();
5253
}
5354
}
5455

55-
public static void main(String[] args) throws InterruptedException, KeeperException, IOException {
56-
ConfigWatcher configWatcher = new ConfigWatcher("localhost");
57-
configWatcher.displayConfig();
58-
59-
Thread.sleep(Long.MAX_VALUE);
60-
}
6156
}

src/main/java/org/zookeeper/tdg/CreateGroup.java

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
import org.apache.zookeeper.*;
44

5-
import java.io.IOException;
6-
import java.util.concurrent.CountDownLatch;
7-
85
/**
96
* Licensed to the Apache Software Foundation (ASF) under one or more
107
* contributor license agreements. See the NOTICE file distributed with
@@ -21,39 +18,14 @@
2118
* See the License for the specific language governing permissions and
2219
* limitations under the License.
2320
*/
24-
public class CreateGroup implements Watcher {
21+
public class CreateGroup extends CuratorConnection {
2522

26-
private static final int SESSION_TIMEOUT = 5000;
27-
28-
private ZooKeeper zk;
29-
private CountDownLatch connectedSignal = new CountDownLatch(1);
30-
31-
@Override
32-
public void process(WatchedEvent watchedEvent) {
33-
if(watchedEvent.getState() == Event.KeeperState.SyncConnected)
34-
connectedSignal.countDown();
35-
}
36-
37-
public void connect(String hosts) throws IOException, InterruptedException {
38-
zk = new ZooKeeper(hosts,SESSION_TIMEOUT,this);
39-
connectedSignal.await();
40-
}
41-
42-
public void create(String groupName) throws KeeperException, InterruptedException {
23+
public void create(String groupName) throws Exception {
4324
String path = "/" +groupName;
44-
String createdPath = zk.create(path,null/*data*/, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
25+
String createdPath = client.create()
26+
.withMode(CreateMode.PERSISTENT)
27+
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
28+
.forPath(path);
4529
System.out.println("Created "+ createdPath);
4630
}
47-
48-
public void close() throws InterruptedException{
49-
zk.close();
50-
}
51-
52-
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
53-
CreateGroup createGroup = new CreateGroup();
54-
createGroup.connect(args[0]);
55-
createGroup.create(args[1]);
56-
createGroup.close();
57-
}
58-
5931
}

src/main/java/org/zookeeper/tdg/ConnectionWatcher.java renamed to src/main/java/org/zookeeper/tdg/CuratorConnection.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package org.zookeeper.tdg;
22

3+
import com.netflix.curator.framework.CuratorFramework;
4+
import com.netflix.curator.framework.CuratorFrameworkFactory;
5+
import com.netflix.curator.retry.RetryOneTime;
36
import org.apache.zookeeper.WatchedEvent;
47
import org.apache.zookeeper.Watcher;
58
import org.apache.zookeeper.ZooKeeper;
9+
import org.omg.CORBA.TIMEOUT;
610

711
import java.io.IOException;
812
import java.util.concurrent.CountDownLatch;
@@ -23,24 +27,17 @@
2327
* See the License for the specific language governing permissions and
2428
* limitations under the License.
2529
*/
26-
public class ConnectionWatcher implements Watcher {
27-
private static final int SESSION_TIMEOUT = 5000;
30+
public class CuratorConnection {
31+
private static final int TIMEOUT = 1000;
2832

29-
protected ZooKeeper zk;
30-
private CountDownLatch connectedSignal = new CountDownLatch(1);
31-
32-
@Override
33-
public void process(WatchedEvent watchedEvent) {
34-
if (watchedEvent.getState() == Event.KeeperState.SyncConnected)
35-
connectedSignal.countDown();
36-
}
33+
protected CuratorFramework client;
3734

3835
public void connect(String hosts) throws IOException, InterruptedException {
39-
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
40-
connectedSignal.await();
36+
client = CuratorFrameworkFactory.newClient(hosts, new RetryOneTime(TIMEOUT));
37+
client.start();
4138
}
4239

43-
public void close() throws InterruptedException {
44-
zk.close();
40+
public void close() {
41+
client.close();
4542
}
4643
}

src/main/java/org/zookeeper/tdg/DeleteGroup.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,20 @@
2020
* See the License for the specific language governing permissions and
2121
* limitations under the License.
2222
*/
23-
public class DeleteGroup extends ConnectionWatcher {
23+
public class DeleteGroup extends CuratorConnection {
2424

25-
public void delete(String groupName) throws KeeperException, InterruptedException {
25+
public void delete(String groupName) throws Exception {
2626
String path = "/" + groupName;
2727

2828
try {
29-
List<String> children = zk.getChildren(path, false);
29+
List<String> children = client.getChildren().forPath(path);
3030
for (String child : children) {
31-
zk.delete(path + "/" + child, -1);
31+
client.delete().withVersion(-1).forPath(path + "/" + child);
3232
}
33-
zk.delete(path, -1);
33+
client.delete().withVersion(-1).forPath(path);
3434
} catch (KeeperException.NoNodeException e) {
3535
System.out.printf("Group %s does not exist\n", groupName);
3636
System.exit(1);
3737
}
3838
}
39-
40-
public static void main(String[] args) throws Exception {
41-
DeleteGroup deleteGroup = new DeleteGroup();
42-
deleteGroup.connect(args[0]);
43-
deleteGroup.delete(args[1]);
44-
deleteGroup.close();
45-
}
4639
}
Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package org.zookeeper.tdg;
22

3-
import org.apache.zookeeper.CreateMode;
4-
import org.apache.zookeeper.KeeperException;
5-
import org.apache.zookeeper.ZooDefs;
6-
7-
import java.io.IOException;
8-
93
/**
104
* Licensed to the Apache Software Foundation (ASF) under one or more
115
* contributor license agreements. See the NOTICE file distributed with
@@ -22,19 +16,11 @@
2216
* See the License for the specific language governing permissions and
2317
* limitations under the License.
2418
*/
25-
public class JoinGroup extends ConnectionWatcher {
19+
public class JoinGroup extends CuratorConnection {
2620

27-
public void join(String groupName, String memberName) throws InterruptedException, KeeperException {
21+
public void join(String groupName, String memberName) throws Exception {
2822
String path = "/" + groupName + "/" + memberName;
29-
String createdPath = zk.create(path,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
23+
String createdPath = client.create().forPath(path);
3024
System.out.println("Created " + createdPath);
3125
}
32-
33-
public static void main(String[] args) throws InterruptedException, KeeperException, IOException {
34-
JoinGroup joinGroup = new JoinGroup();
35-
joinGroup.connect(args[0]);
36-
joinGroup.join(args[1],args[2]);
37-
38-
Thread.sleep(Long.MAX_VALUE);
39-
}
4026
}

src/main/java/org/zookeeper/tdg/ListGroup.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import org.apache.zookeeper.KeeperException;
44

5-
import java.io.IOException;
65
import java.util.List;
76

87
/**
@@ -21,13 +20,13 @@
2120
* See the License for the specific language governing permissions and
2221
* limitations under the License.
2322
*/
24-
public class ListGroup extends ConnectionWatcher {
23+
public class ListGroup extends CuratorConnection {
2524

26-
public void list(String groupName) throws InterruptedException, KeeperException {
25+
public void list(String groupName) throws Exception {
2726
String path = "/" + groupName;
2827

2928
try {
30-
List<String> children = zk.getChildren(path, false);
29+
List<String> children = client.getChildren().forPath(path);
3130
if (children.isEmpty()) {
3231
System.out.printf("No members in group %s\n", groupName);
3332
System.exit(1);
@@ -39,11 +38,4 @@ public void list(String groupName) throws InterruptedException, KeeperException
3938
System.exit(1);
4039
}
4140
}
42-
43-
public static void main(String[] args) throws InterruptedException, KeeperException, IOException {
44-
ListGroup listGroup = new ListGroup();
45-
listGroup.connect(args[0]);
46-
listGroup.list(args[1]);
47-
listGroup.close();
48-
}
4941
}

0 commit comments

Comments
 (0)