ZK实现分布式锁
时间:2022-07-24
本文章向大家介绍ZK实现分布式锁,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
上一篇说了ZK是什么以及能干什么,今儿这篇就来用ZK实现分布式锁,分别用java原生的zookeeper客户端、ZKClient实现。
一、分布式锁
分布式锁的思路是每个客户端都在某个目录下注册一个临时有序节点,每次最小的节点会获取锁,当前节点会去监听上一个较小节点,如果较小节点失效之后,就会去获取锁。
java原生zookeeper客户端
(1)引入jar包
(2)创建ZK客户端连接单例
public class ZookeeperClient {
//zk集群地址
public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
//计数器,用于等待连接成功
public static CountDownLatch countDownLatch = new CountDownLatch(1);
//连接超时时间
public static final int SESSION_TIMEOUT = 5000;
//用volatile修饰单例,防止赋值时发生指令重排
private volatile static ZooKeeper instance;
//用Double check获取单例
public static ZooKeeper getInstance() throws IOException, InterruptedException {
if (instance == null ){
synchronized (ZookeeperClient.class) {
if (instance == null) {
//连接时注册一个监听,监听连接状态变化
instance = new ZooKeeper(ZOOKEEPER_CONNECT, SESSION_TIMEOUT, new Watcher() {
//监听回调方法
@Override
public void process(WatchedEvent watchedEvent) {
//当连接状态变成connected,就说明连接成功
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
});
//等待连接成功
countDownLatch.await();
}
}
}
return instance;
}
public static int getSessionTimeout() {
return SESSION_TIMEOUT;
}
}
上述代码中的CountDownLatch是因为连接时会耗时较长,所以需要添加一个计数器进行阻塞,否则会在connecting阶段就被释放了。
(3)创建分布式锁客户端
public class DistibutedLock {
//根目录,客户端都会去此目录下创建临时有序子节点
private final String ROOT_PATH = "/lock";
//客户端
private ZooKeeper zookeeper;
//session超时时间
private int SESSION_TIMEOUT;
//当前客户端创建有序节点的名称
private String lockId;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public DistibutedLock() throws IOException, InterruptedException {
this.zookeeper =ZookeeperClient.getInstance();
this.SESSION_TIMEOUT = ZookeeperClient.getSessionTimeout();
}
public boolean lock(){
try {
//创建临时有序子节点
lockId = zookeeper.create(ROOT_PATH+"/","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"创建节点"+lockId+",开始竞争锁");
//获取/lock目录下所有子节点
List<String> children = zookeeper.getChildren(ROOT_PATH, true);
//用SortedSet对子节点从小到大进行排序
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child : children) {
sortedSet.add(ROOT_PATH+"/"+child);
}
//获取最小节点名称
String first = sortedSet.first();
//如果当前创建节点就是最小节点,则获取锁
if (first.equals(lockId)) {
System.out.println(Thread.currentThread().getName()+"获取锁"+lockId);
return true;
}
//获取比当前id小的节点集合
SortedSet<String> frontSet = sortedSet.headSet(lockId);
if (!frontSet.isEmpty()) {
//取集合中最后一个元素,也就是临近最小节点
String last = frontSet.last();
System.out.println(lockId+"监听"+last);
//当前节点去监听上一个节点,当上一个节点被删除的时候
//当前节点就可以获取锁
zookeeper.exists(last, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
countDownLatch.countDown();
}
}
});
countDownLatch.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
//释放锁
public boolean unLock(){
try {
System.out.println(Thread.currentThread().getName() + "开始删除锁" + lockId);
//删除当前节点
zookeeper.delete(lockId, -1);
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
}
(4)测试代码
//等待器,当所有线程都执行到某个步骤才停止阻塞
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
//模拟十个线程去获取锁
for (int i = 0; i < 10; i++) {
new Thread(()-> {
DistibutedLock lock = null;
try {
lock = new DistibutedLock();
cyclicBarrier.await();
lock.lock();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
if(lock!=null){
lock.unLock();
}
}
}).start();
}
运行结果:按照创建顺序去获取锁
ZKClient
(1)引入jar包
(2)创建ZK客户端连接单例
public class ZKClientInstance {
public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
private volatile static ZkClient instance;
public static ZkClient getInstance(){
if (instance == null) {
synchronized (ZKClientInstance.class) {
if (instance == null) {
instance = new ZkClient(ZOOKEEPER_CONNECT,5000,
5000,new SerializableSerializer());
}
}
}
return instance;
}
}
(3)创建分布式锁客户端
public class ZKClientDisLock {
private static final String ROOT_PATH = "/lock";
private ZkClient zkClient;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private String lockId;
public ZKClientDisLock(ZkClient zkClient) {
this.zkClient = zkClient;
}
public boolean lock(){
lockId = zkClient.createEphemeralSequential(ROOT_PATH + "/", "123");
List<String> children = zkClient.getChildren(ROOT_PATH);
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child : children) {
sortedSet.add(ROOT_PATH+"/"+child);
}
String first = sortedSet.first();
if (first.equals(lockId)) {
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
return true;
}
SortedSet<String> frontSet = sortedSet.headSet(lockId);
if (null != frontSet && frontSet.size() > 0) {
String last = frontSet.last();
IZkDataListener iZkDataListener = null;
try {
System.out.println(lockId + "监听" + last + "节点变化");
iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(last, iZkDataListener);
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
} catch (Exception e) {
}finally {
zkClient.unsubscribeDataChanges(last,iZkDataListener);
}
return true;
}
return false;
}
public void unLock(){
System.out.println(Thread.currentThread().getName()+ "释放锁"+ lockId + "-----");
zkClient.delete(lockId);
}
}
(3)测试代码
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(()-> {
ZKClientDisLock lock = null;
try {
lock = new ZKClientDisLock(ZKClientInstance.getInstance());
cyclicBarrier.await();
lock.lock();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
if(lock!=null){
lock.unLock();
}
}
}).start();
}
运行结果:
上述就是用java原生api以及ZKClient实现的分布式锁。
还有一种是用apache-curator实现,其可以实现可重入锁、排它锁、读写锁。之后有机会介绍curator的使用方法。
- 大数据助你购买航空延误险,飞机延误未必是坏事!
- 作为一个有追求的程序员,你应该掌握的七种武器
- 腾讯云直播答题方案解析
- C+虚函数实现多态性的思考
- Zzreal的大数据笔记-SparkDay04
- 180数字集团品牌升级 启用域名180.ai
- iDC预测全球物联网花费在2018年至7720亿美金
- 北京青年报:用数据说话应是网络强国标配
- 3字母域名谁不爱?游戏门户启用价值六位数的域名
- 中国版Space X首台“民营火箭”发动机试车成功;贾跃亭甘薇在美欢度圣诞,证监局喊破嗓子他也不回家;谷歌AI新技能
- 我的写作工具链
- 微信小程序更新新能力:四大功能让小程序更火爆!一起来了解更新吧,快来学习吧
- CentOS下ssh免密码问题
- flask-script
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 前端网络高级篇(四)CORS 跨域
- 自建私有docker仓库-Harbor
- 前端网络高级篇(五)常见网络攻击
- 一文搞懂Cookie,Session,Token,JWT
- 前端知识点系列二:CSS
- [HTTP趣谈]支持跨域及相关cookie设置
- pytest封神之路第三步 精通fixture
- 前端网络高级篇(六)网站性能优化
- 使用Webrtc和React Js在网络上共享跨平台的点对点文件
- JSBridge小科普
- Greenplum编译安装
- 宏任务和微任务到底是什么?
- React中的setState是异步的吗?
- java安全编码指南之:堆污染Heap pollution
- ECMAScript6基础学习教程(五)对象