zookeeper实现分布式锁

zookeeper实现分布式锁

流程很简单:

  1. 查看目标Node是否已经创建,已经创建,那么等待锁。
  2. 如果未创建,创建一个瞬时Node,表示已经占有锁。
  3. 如果创建失败,那么证明锁已经被其他线程占有了,那么同样等待锁。
  4. 当释放锁,或者当前Session超时的时候,节点被删除,唤醒之前等待锁的线程去争抢锁。

“惊群”就是在一个节点删除的时候,大量对这个节点的删除动作有订阅Watcher的线程会进行回调,这对Zk集群是十分不利的。所以需要避免这种现象的发生。

java解决“惊群”现象案例:

为了解决“惊群”问题,我们需要放弃订阅一个节点的策略,那么怎么做呢?

  1. 我们将锁抽象成目录,多个线程在此目录下创建瞬时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
  2. 首先创建顺序节点,然后获取当前目录下最小的节点,判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
  3. 获取锁失败的节点获取当前节点上一个顺序节点,对此节点注册监听,当节点删除的时候通知当前节点。
  4. 当unlock的时候删除节点之后会通知下一个节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by zhiming on 2017-02-05.
*/
public class FairLockTest {

    private String zkQurom = "localhost:2181";

    private String lockName = "/mylock";

    // Path of the EPHEMERAL_SEQUENTIAL znode this instance created; deleted by unlock().
    private String lockZnode = null;

    private ZooKeeper zk;

    public FairLockTest() {
        // Block until the session is actually established: the original returned
        // immediately and could use the client while still CONNECTING, which
        // fails with ConnectionLossException.
        final CountDownLatch connected = new CountDownLatch(1);
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event " + watchedEvent);
                    if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                        System.out.println("connection is established...");
                        connected.countDown();
                    }
                }
            });
            connected.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    // Create the persistent root node the lock children live under, if absent.
    private void ensureRootPath() {
        try {
            if (zk.exists(lockName, false) == null) {
                zk.create(lockName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Acquire the lock fairly (in node-creation order).
     *
     * Creates an EPHEMERAL_SEQUENTIAL child under the lock root, then loops:
     *  - if our node is the smallest child, we own the lock;
     *  - otherwise watch ONLY the immediate predecessor (avoids the herd
     *    effect) and block on a CountDownLatch until it is deleted.
     *
     * Fixes over the original: no Thread.interrupt()-based wake-up; no hang
     * when the predecessor vanishes between getChildren() and exists(); and
     * ownership is re-verified after waking instead of assumed.
     */
    public void lock() {
        ensureRootPath();
        try {
            String path = zk.create(lockName + "/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            lockZnode = path;
            String myNode = path.substring(path.lastIndexOf("/") + 1);
            while (true) {
                List<String> children = zk.getChildren(lockName, false);
                Collections.sort(children);
                if (myNode.equals(children.get(0))) {
                    System.out.println(Thread.currentThread().getName() + " get Lock...");
                    return;
                }
                // Find our immediate predecessor (largest node smaller than ours).
                String watchNode = null;
                for (int i = children.size() - 1; i >= 0; i--) {
                    if (children.get(i).compareTo(myNode) < 0) {
                        watchNode = children.get(i);
                        break;
                    }
                }
                if (watchNode == null) {
                    // No smaller sibling visible (unexpected); re-read and retry.
                    continue;
                }
                final CountDownLatch deleted = new CountDownLatch(1);
                Stat stat = zk.exists(lockName + "/" + watchNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                            deleted.countDown();
                        }
                    }
                });
                if (stat == null) {
                    // Predecessor was deleted between getChildren() and exists().
                    // The original slept regardless and could hang forever here;
                    // instead loop and re-evaluate the children.
                    continue;
                }
                System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + lockName + "/" + watchNode);
                deleted.await();
                System.out.println(Thread.currentThread().getName() + " notify");
                // Loop again to confirm we are now the smallest node before
                // claiming the lock.
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Release the lock by deleting our znode; ZooKeeper then fires the
     * NodeDeleted watch of the successor that was waiting on it.
     */
    public void unlock() {
        try {
            System.out.println(Thread.currentThread().getName() + "release Lock...");
            zk.delete(lockZnode, -1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            service.execute(() -> {
                FairLockTest test = new FairLockTest();
                try {
                    test.lock();
                    // Hold the lock briefly so contention is observable.
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                test.unlock();
            });
        }
        service.shutdown();
    }
}

同样上面的程序也有几点需要注意:

  1. Zookeeper的API没有提供直接的获取上一个节点或者最小节点的API需要我们自己实现。
  2. 使用了interrupt做线程的唤醒,这样不科学,因为不想将JVM的lock引进来所以没有用CountDownLatch来做流程控制。
  3. Watch也是要重新设置的,这里使用了Watch的复用,所以代码简单些。

手写代码实现分布式锁

加入Maven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class ZkLock implements Lock {
    // ZooKeeper server host:port
    private static final String IP_PORT = "127.0.0.1:2181";
    // Root path all lock children are created under
    private static final String ROOT_NODE = "/Lock";
    // Path of our immediate predecessor node (the one we watch while waiting)
    private volatile String beforePath;
    // Path of the ephemeral-sequential node this instance created
    private volatile String currPath;
    // ZooKeeper client (one session per lock instance)
    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkLock() {
        // Create the persistent root node on first use.
        if (!zkClient.exists(ROOT_NODE)) {
            zkClient.createPersistent(ROOT_NODE);
        }
    }

    // Acquire: retry tryLock(), blocking on the predecessor between attempts.
    public void lock() {
        if (tryLock()) {
            System.out.println("加锁成功!!");
        } else {
            // Wait (via watch) for the predecessor to be deleted, then retry.
            waitForLock();
            lock();
        }
    }

    // Non-blocking attempt: create our node (first call only) and check
    // whether it is the smallest child under the root.
    public synchronized boolean tryLock() {
        if (StringUtils.isBlank(currPath)) {
            currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
        }
        List<String> children = zkClient.getChildren(ROOT_NODE);
        Collections.sort(children);

        if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
            // Smallest node -> we own the lock.
            return true;
        } else {
            // Remember our immediate predecessor so waitForLock() can watch it.
            int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
            beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
            return false;
        }
    }

    // Release: delete our node (wakes the successor) and close the session.
    public void unlock() {
        zkClient.delete(currPath);
        zkClient.close();
    }

    // Block until the predecessor node is deleted.
    private void waitForLock() {
        // BUG FIX: the latch must be a fresh per-wait instance. The original
        // used a single static CountDownLatch(1); after any thread counted it
        // down once, every subsequent await() returned immediately and lock()
        // degenerated into a busy spin.
        final CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            // Node data updates are irrelevant to the lock protocol.
            public void handleDataChange(String s, Object o) throws Exception {
            }

            // Predecessor deleted -> stop waiting.
            public void handleDataDeleted(String s) throws Exception {
                latch.countDown();
            }
        };
        // Subscribe BEFORE the existence check so a delete between the two
        // calls is not missed.
        this.zkClient.subscribeDataChanges(beforePath, listener);
        if (zkClient.exists(beforePath)) {
            try {
                System.out.println("加锁失败 等待");
                latch.await();
            } catch (InterruptedException e) {
                // Preserve interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
        // Always drop the subscription once we stop waiting.
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    // Timed acquisition is not supported by this implementation.
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    // Interruptible acquisition is not supported by this implementation.
    public void lockInterruptibly() throws InterruptedException {
    }

    // Conditions are not supported by this implementation.
    public Condition newCondition() {
        return null;
    }
}

在Controller层加上锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
    // Serialize concurrent purchases across server instances by taking the
    // ZooKeeper-backed distributed lock around the service call.
    ZkLock zkLock = new ZkLock();
    boolean bool = false;
    try {
        zkLock.lock();
        // Run the flash-sale purchase while holding the lock.
        bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
    } catch (Exception e) {
        e.printStackTrace();
        bool = false;
    } finally {
        // Always release, whether the purchase succeeded or threw.
        zkLock.unlock();
    }
    return bool;
}

测试,依然起两台服务器,8080、8081。然后跑测试脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws Exception {
    // Endpoint template; the port placeholder is filled per request below.
    String url = "http://localhost:%s/mall/commodity/purchase";
    // Request parameters: commodity id and quantity.
    Map<String, String> params = new HashMap<>();
    params.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
    params.put("number", "1");
    // Fire 10 concurrent HttpClient requests, alternating between the two
    // servers (8080 and 8081).
    for (int i = 0; i < 10; i++) {
        String port = "808" + (i % 2);
        new CommodityThread(String.format(url, port), params).start();
    }
}

Curator实现分布式锁:

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrick Hunt(Zookeeper)以一句“Guava is to Java what Curator is to Zookeeper”给予Curator高度评价。

Curator封装了Zookeeper的常用API,也包装了很多常用Case的实现。它提供了基于Fluent的编程风格支持。还提供了Zookeeper的各种应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。

添加Maven依赖:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
                                     @RequestParam(name = "number") Integer number) throws Exception {
    boolean bool = false;
    // Exponential backoff: base sleep 1s, at most 3 retries.
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
    // Start the client before using any recipe.
    client.start();
    InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
    // Track whether we actually got the lock so the finally block only
    // releases what we own.
    boolean acquired = false;
    try {
        // Try to take the distributed lock, waiting at most 3 seconds.
        acquired = mutex.acquire(3, TimeUnit.SECONDS);
        if (acquired) {
            // Run the flash-sale purchase while holding the lock.
            bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // BUG FIX: the original called release() unconditionally; when the
        // 3s acquire timed out, release() throws (IllegalMonitorStateException
        // "You do not own the lock"). Only release a lock we hold.
        if (acquired) {
            mutex.release();
        }
        client.close();
    }
    return bool;
}

参考

https://juejin.im/post/6854573210756972557

-------------本文结束-------------
0%