ZooKeeper 是一个基于观察者模式设计的开源分布式协调服务,它负责存储和管理大家都关心的数据,一旦这些数据发生变化,便会通知相应的观察者做出响应。
它数据模型结构如下,和Unix文件系统非常相似:
其中每个节点称为ZNode
,可以通过其路径唯一标识,并存储不超过 1MB
的数据。
总结:投票过半数时,服务器 id 大的胜出
总结: ①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出
Zookeeper 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线(注册中心)、软负载均衡等。
ZooKeeper一般部署为集群模式,且节点数为奇数,如3或5台等,当整个集群中有半数以上的节点存活时,那么整个集群环境才可用。
服务器台数多:好处,提高可靠性;坏处:提高通信延时。
为什么ZooKeeper集群一般为奇数个节点呢?
答:因为ZooKeeper集群选举leader时,必须要求半数以上的节点通过(不包含半数),因此3台和2台服务器都是要求至少2台可用,整个集群才可用,以防止“脑裂”现象,同理,5台和4台也是类似情况。
x1# 1. 安装JDK
2https://www.oracle.com/java/technologies/javase/jdk11-archive-downloads.html
3
4# 2. 下载zookpper
5https://zookeeper.apache.org/releases.html
6http://mirror.bit.edu.cn/apache/zookeeper/
7
8# 3. 上传和解压安装包
9mkdir /usr/local/zookpper
10tar -zxvf zookeeper-3.4.12.tar.gz
11
12# 4. 创建data和logs目录
13mkdir /usr/local/zookeeper/zookeeper-3.4.12/data
14mkdir /usr/local/zookeeper/zookeeper-3.4.12/logs
15
16# 5. 拷贝配置模板
17cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
18
221# 通信心跳时间(毫秒)
2tickTime=2000
3# Leader和Follower初始连接时能容忍的最多心跳数
4initLimit=10
5# Leader和Follower数据同步时能容忍的最多心跳数(如果超过,则认为Follwer死掉,将其剔除)
6syncLimit=5
7
8# 数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存到这个文件夹中)
9dataDir=/usr/local/zookeeper/zookeeper-3.4.12/data
10dataLogDir=/usr/local/zookeeper/zookeeper-3.4.12/logs
11
12# 管理服务端端口
13admin.serverPort=8888
14# 客户端连接的端口
15clientPort=2181
16
17# 集群配置
18# server.服务器编号=服务器IP:Zookeeper服务器之间的通信端口:Leader选举的端口
19server.0=10.0.0.116:2888:3888
20server.1=10.0.0.117:2888:3888
21server.2=10.0.0.118:2888:3888
22
21cd /usr/local/zookeeper/zookeeper-3.4.12/data # 必须配置在 dataDir 目录
2echo 0 > myid # 服务器编号,在集群中唯一,与服务器IP相对应
修改用户启动配置文件:vim ~/.bash_profile
,然后重新加载:source ~/.bash_profile
。
21export ZK_HOME=/usr/local/software/zookeeper-3.4.12
2export PATH=$PATH:$ZK_HOME/bin
在cd /etc/rc.d/init.d
目录创建zookeeper服务:vim zookeeper
,并授权:chmod +x /etc/rc.d/init.d/zookeeper
。
71
2#chkconfig: 2345 10 90
3#description: service zookeeper
4export JAVA_HOME=/usr/local/jdk1.8.0_191
5export ZOO_LOG_DIR=/opt/zookeeper/log
6ZOOKEEPER_HOME=/usr/local/zookeeper-cluster/zookeeper-3.4.12/
7su root ${ZOOKEEPER_HOME}/bin/zkServer.sh "$1"
然后添加到开机启动项:chkconfig --add zookeeper
,可通过chkconfig --list
查看是否添加成功。
最后,可以通过reboot
命令重启机器,查看zookeeper是否开机启动:service zookeeper status
。
注意:若出现“服务 zookeeper 不支持 chkconfig”错误,请检查
/etc/init.d/zookeeper
文件的内容。
151# 启动
2zkServer.sh start
3
4# 停止
5zkServer.sh stop
6
7# 重启服务
8zkServer.sh restart
9
10# 查看节点状态
11[root@loaclhost ~]# zkServer.sh status
12ZooKeeper JMX enabled by default
13Using config: /usr/local/zookeeper-cluster/zookeeper-3.4.12/bin/../conf/zoo.cfg
14Mode: leader
15
注意:如果查看ZK状态时发现ZK不在运行中,先使用
`./zkServer.sh start-foreground
查看启动报错信息。
141# 连接到ZK
2zkCli.sh -server 127.0.0.1:2181
3
4# 测试如下
5--------------------------------------------
6[zk: 127.0.0.1:2181(CONNECTED) 4] create /test
7Created /test
8[zk: 127.0.0.1:2181(CONNECTED) 5] ls /
9[test, zookeeper]
10--------------------------------------------
11
12# 退出
13quit
14
131# 查看节点
2ls /
3ls /狗狗/边牧 # 查询节点
4ls -R / # 递归查询节点
5ls -s /猩猩 # 查询节点的详细信息
6stat /猩猩 # 查看节点状态
7
8# 查看节点值
9get /骆驼
10get -s /猩猩 # 查询节点的详细信息
11get -w /猩猩 # 监听节点的详细信息(注意:注册一次只能监听一次,想再次监听,需要再次注册)
12ls -w /猩猩 # 监听节点的详细信息(注意:注册一次只能监听一次,想再次监听,需要再次注册)
13
161# 创建节点
2create /狗狗
3create /狗狗/边牧
4create -s /bb/bb1 # 序列节点
5create -e /cc # 临时节点 在会话结束后,自动被删除(通过这个特性,zk可以实现服务注册与发现的效果)
6create -e -s /dd # 临时序列节点
7create \[-s\] \[-e\] path data acl # 创建权限节点
8
9# 删除节点
10delete /aa # 下面没有子节点的,可以直接用 delete
11deleteall /手机 # 下面有子节点的,用 deleteall
12
13# 设置节点值
14set /猩猩 园外区3号
15create /骆驼 园区2号 # 创建节点并写入值
16
71# 查看帮助
2help
3
4# 添加认证用户
5addauth digest maidou # 不带密码
6addauth digest beita:123456 # 带密码
7
431public class ZkApiTest {
2 public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
3
4 // 建立连接
5 // 参数1:连接字符串(支持以逗号分隔配置多个,随机进行连接)
6 // 参数2:会话超时时间
7 // 参数3:监听器
8 // 其它参数:是否只读、密码等
9 ZooKeeper zooKeeper = new ZooKeeper("10.210.5.252:2181", 1000 * 60, null);
10
11 // 创建节点
12 // 参数1:节点路径
13 // 参数2:节点数据
14 // 参数3:权限控制,一般是无权限控制,即ZooDefs.Ids.OPEN_ACL_UNSAFE
15 // 参数4:节点类型,可选 PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMERAL、EPHEMERAL_SEQUENTIAL 等
16 zooKeeper.create("/test", "test ok!".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
17
18 // 查询子节点
19 List<String> children = zooKeeper.getChildren("/", null);
20 System.out.println("子节点:" + children);
21
22 // 判断节点是否存在
23 Stat existStat = zooKeeper.exists("/test", null);
24 System.out.println(existStat != null ? "节点存在" : "节点不存在");
25
26 // 查询节点数据
27 Stat stat = new Stat();
28 byte[] data = zooKeeper.getData("/test", false, stat);
29 System.out.println("当前事务ID:" + stat.getCzxid());
30 System.out.println("当前节点数据:" + new String(data));
31
32 // 修改节点数据
33 Stat updatestat = zooKeeper.setData("/test", "new-data".getBytes(), -1);
34 System.out.println("当前事务ID:" + updatestat.getCzxid());
35
36 // 删除节点
37 zooKeeper.delete("/test", -1);
38
39 // 关闭连接
40 zooKeeper.close();
41 }
42}
43
注意:
- ZooKeeper集群中,只有 Leader 有权限发起数据修改操作,如果向 Follower 发起修改请求,则会转给 Leader 处理。
231public class DistributeServer {
2 private static ZooKeeper zk = null;
3 private static String parentNode = "/test";
4 private static String serverNode = "/server01";
5
6 public static void main(String[] args) throws Exception {
7 // 1. 连接到ZK
8 zk = new ZooKeeper("10.210.5.252:2181", 1000 * 60, new
9 Watcher() {
10
11 public void process(WatchedEvent event) {
12 }
13 });
14
15 // 2. 注册服务(临时节点)
16 zk.create(parentNode + serverNode, serverNode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
17
18 // 3. 执行业务代码
19 System.out.println("server01 is working ...");
20 Thread.sleep(Long.MAX_VALUE);
21 }
22}
23
471public class DistributeClient {
2 private static ZooKeeper zk = null;
3 private static String parentNode = "/test";
4
5 public static void main(String[] args) throws Exception {
6 // 1. 连接到ZK
7 zk = new ZooKeeper("10.210.5.252:2181", 1000 * 60, new
8 Watcher() {
9
10 public void process(WatchedEvent event) {
11 try {
12 // 再次启动监听
13 watchServerList();
14 } catch (Exception e) {
15 e.printStackTrace();
16 }
17 }
18 });
19
20 // 2. 监听当前在线服务
21 watchServerList();
22
23 // 3. 执行业务代码
24 System.out.println("client is working ...");
25 Thread.sleep(Long.MAX_VALUE);
26 }
27
28
29 // 监听当前在线服务
30 public static void watchServerList() throws KeeperException, InterruptedException {
31 List<String> servers = new ArrayList<>();
32
33 // 1. 查询 /test 的子节点,并监听 /test 节点
34 List<String> children = zk.getChildren(parentNode, true);
35
36 // 2. 查询子节点信息
37 for (String child : children) {
38 String childNode = parentNode + "/" + child;
39 byte[] data = zk.getData(childNode, false, null);
40 servers.add(childNode + ":" + new String(data));
41 }
42
43 // 3. 打印
44 System.out.println("servers = " + servers);
45 }
46
47}
101操作步骤:
21> 启动客户端进行“服务监听”
32> 启动服务端进行“服务注册”
43> 关闭服务端模拟”服务关闭“
5
6客户端控制台打印:
7servers = []
8servers = [/test/server010000000011:/server01]
9servers = []
10
1741// 分布式锁实现
2public class DistributedLock {
3
4 // ZK连接
5 private ZooKeeper zk;
6
7 // 连接信息
8 private String connectString = "10.210.5.252:2181";
9 private int sessionTimeout = 1000 * 60;
10
11 // 根节点、子节点前缀
12 private String rootNode = "locks";
13 private String subNode = "seq-";
14
15 // 当前节点(等待锁或持有锁)
16 private String currentNode;
17
18 // 前置节点(当前节点等待的节点)
19 private String waitPath;
20
21 // 线程协作工具
22 private CountDownLatch waitLatch = new CountDownLatch(1);
23
24 // 和 zk 服务建立连接,并创建根节点
25 public DistributedLock() throws IOException, InterruptedException, KeeperException {
26 CountDownLatch connectLatch = new CountDownLatch(1);
27
28 // 连接到ZK
29 zk = new ZooKeeper(connectString, sessionTimeout, new
30 Watcher() {
31
32 public void process(WatchedEvent event) {
33 // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
34 if (event.getState() == Event.KeeperState.SyncConnected) {
35 connectLatch.countDown();
36 }
37
38 // 发生了 waitPath 的删除事件
39 if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
40 waitLatch.countDown();
41 }
42 }
43 });
44
45 // 等待连接建立
46 connectLatch.await();
47
48 //获取根节点状态
49 Stat stat = zk.exists("/" + rootNode, false);
50
51 //如果根节点不存在,则创建根节点,根节点类型为永久节点
52 if (stat == null) {
53 System.out.println("根节点不存在");
54 zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
55 }
56 }
57
58 // 加锁方法
59 public void zkLock() {
60 try {
61 // 在根节点下创建临时顺序节点,返回值为创建的节点路径
62 currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
63
64 // wait 一小会, 让结果更清晰一些
65 Thread.sleep(10);
66
67 // 查询所有子节点
68 // 注意:没有必要监听 /locks 的子节点的变化情况
69 List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
70
71 // 如果列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
72 if (childrenNodes.size() == 1) {
73 return;
74 }
75
76 // 对子节点进行排序(从小到大)
77 Collections.sort(childrenNodes);
78
79 // 获取当前节点的位置
80 String thisNode = currentNode.substring(("/" + rootNode + "/").length());
81 int index = childrenNodes.indexOf(thisNode);
82
83 // 如果未找到当前节点,则为数据异常
84 if (index == -1) {
85 throw new RuntimeException("数据异常");
86 }
87
88 // 如果thisNode 在列表中最小, 当前 client 获得锁
89 if (index == 0) {
90 return;
91 }
92
93 // 获得前置节点(当前节点的前1节点)
94 this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
95
96 // 在前置节点上注册监听器,
97 // 当前置节点被删除时, zookeeper 会回调监听器的 process 方法
98 zk.getData(waitPath, true, new Stat());
99
100 // 等待前置节点被回调
101 waitLatch.await();
102
103 } catch (KeeperException | InterruptedException e) {
104 e.printStackTrace();
105 }
106 }
107
108 // 解锁方法
109 public void zkUnlock() {
110 try {
111 zk.delete(this.currentNode, -1);
112 } catch (InterruptedException | KeeperException e) {
113 e.printStackTrace();
114 }
115 }
116
117}
118
119// 分布式锁测试
120public class DistributedLockTest {
121
122 // 测试
123 public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
124
125 // 创建2个分布式锁
126 final DistributedLock lock1 = new DistributedLock();
127 final DistributedLock lock2 = new DistributedLock();
128
129 // 线程1
130 new Thread(new Runnable() {
131
132 public void run() {
133 try {
134 lock1.zkLock();
135 System.out.println("线程 1 获取锁");
136
137 // 执行业务
138 Thread.sleep(5 * 1000);
139
140 lock1.zkUnlock();
141 System.out.println("线程 1 释放锁");
142 } catch (Exception e) {
143 e.printStackTrace();
144 }
145 }
146 }).start();
147
148 // 线程2
149 new Thread(new Runnable() {
150
151 public void run() {
152 try {
153 lock2.zkLock();
154 System.out.println("线程 2 获取锁");
155
156 // 执行业务
157 Thread.sleep(5 * 1000);
158
159 lock2.zkUnlock();
160 System.out.println("线程 2 释放锁");
161 } catch (Exception e) {
162 e.printStackTrace();
163 }
164 }
165 }).start();
166 }
167}
168
169// 结果打印
170线程 2 获取锁
171线程 2 释放锁
172线程 1 获取锁
173线程 1 释放锁
174
首先需导入 Curator
相关依赖:
161<!-- curator -->
2<dependency>
3 <groupId>org.apache.curator</groupId>
4 <artifactId>curator-framework</artifactId>
5 <version>4.3.0</version>
6</dependency>
7<dependency>
8 <groupId>org.apache.curator</groupId>
9 <artifactId>curator-recipes</artifactId>
10 <version>4.3.0</version>
11</dependency>
12<dependency>
13 <groupId>org.apache.curator</groupId>
14 <artifactId>curator-client</artifactId>
15 <version>4.3.0</version>
16</dependency>
然后创建一个可重入锁并测试:
951public class CuratorLockTest {
2 private String connectString = "10.210.5.252:2181";
3 private String rootNode = "/locks";
4 private int connectionTimeout = 1000 * 60;
5 private int sessionTimeout = 1000 * 60;
6
7 // 测试
8 public static void main(String[] args) {
9 new CuratorLockTest().test();
10 }
11
12 // 测试
13 private void test() {
14 // 创建两个分布式锁
15 final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
16 final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
17
18 // 线程1
19 new Thread(new Runnable() {
20
21 public void run() {
22 try {
23 lock1.acquire();
24 System.out.println("线程 1 获取锁");
25
26 lock1.acquire();
27 System.out.println("线程 1 再次获取锁");
28
29 // 执行业务
30 Thread.sleep(3 * 1000);
31
32 lock1.release();
33 System.out.println("线程 1 释放锁");
34
35 lock1.release();
36 System.out.println("线程 1 再次释放锁");
37 } catch (Exception e) {
38 e.printStackTrace();
39 }
40 }
41 }).start();
42
43 // 线程2
44 new Thread(new Runnable() {
45
46 public void run() {
47 try {
48 lock2.acquire();
49 System.out.println("线程 2 获取锁");
50
51 lock2.acquire();
52 System.out.println("线程 2 再次获取锁");
53
54 // 执行业务
55 Thread.sleep(5 * 1000);
56
57 lock2.release();
58 System.out.println("线程 2 释放锁");
59
60 lock2.release();
61 System.out.println("线程 2 再次释放锁");
62 } catch (Exception e) {
63 e.printStackTrace();
64 }
65 }
66 }).start();
67 }
68
69 // 分布式锁初始化
70 public CuratorFramework getCuratorFramework() {
71 // 重试策略,初试时间 3 秒,重试 3 次
72 RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
73
74 //通过工厂创建 Curator
75 CuratorFramework client = CuratorFrameworkFactory.builder()
76 .connectString(connectString)
77 .connectionTimeoutMs(connectionTimeout)
78 .sessionTimeoutMs(sessionTimeout)
79 .retryPolicy(policy).build();
80
81 //开启连接
82 client.start();
83 return client;
84 }
85}
86
87// 结果打印
88线程 1 获取锁
89线程 1 再次获取锁
90线程 1 释放锁
91线程 1 再次释放锁
92线程 2 获取锁
93线程 2 再次获取锁
94线程 2 释放锁
95线程 2 再次释放锁
提示:
- 更多 Curator 框架的用法请参考:https://curator.apache.org/index.html