一、Zookeeper简介
Zookeeper是一个分布式协同工具,提供了可靠的分布式节点数据管理、状态同步、配置维护等功能,使得分布式应用中各个节点能够协同工作。它具有高性能,遵循CP原则,即一致性和分区容错性,保证了数据的一致性;同时可实现高可用性,保证节点的可用性。Zookeeper提供了轻量级的HTTP API,可以通过Java和其他编程语言使用。
二、锁概念
锁(Lock)是一种同步机制,用于管理对共享资源的访问。在分布式环境中,锁的实现变得更加困难,需要确保各个节点的同步和互斥。分布式锁就是在分布式环境中,实现锁的一种方式,保证各个节点的数据一致性和互斥性。
三、Zookeeper分布式锁实现
在分布式锁的实现中,先来看看Zookeeper的数据模型。Zookeeper中的数据模型是非常简单的,类似于Unix文件系统,它也是一个树状结构,称为ZNode。ZNode可以存储一些数据,可以看成一个目录,其中包含了一些子节点。每个ZNode都有一个版本号,可以实现对ZNode的乐观锁控制。
/** * ZooKeeper实现分布式锁 * 使用zk.create()方法的EPHEMERAL_SEQUENTIAL模式实现锁的节点创建 */ private ZooKeeper zk; private String lockName; private String lockPath; private int sessionTimeout; public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException { this.lockName = lockName; this.sessionTimeout = sessionTimeout; // 连接zookeeper zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) { System.out.println("已连接"); } } }); } // 创建锁的节点 private void createLockNode(String lockName) throws InterruptedException { try { String basePath = "/locks"; // 创建锁的节点,使用EPHEMERAL_SEQUENTIAL模式实现 lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (KeeperException e) { throw new RuntimeException(e); } } // 获取锁 public synchronized boolean getLock() throws InterruptedException { // 创建锁节点 createLockNode(lockName); // 获取/locks下的所有节点 ListallLocks = zk.getChildren("/locks", false); // 对节点按照名称进行排序 Collections.sort(allLocks); // 取出所有锁节点中最小的节点,并判断是否为当前锁 String currentLockPath = lockPath.substring("/locks".length() + 1); int index = allLocks.indexOf(currentLockPath); if (index == 0) { // 已经获取到锁 return true; } else { // 未获取到锁,删除当前锁节点 String preLockPath = "/locks/" + allLocks.get(index - 1); Stat stat = zk.exists(preLockPath, true); if (stat == null) { return getLock(); } else { CountDownLatch latch = new CountDownLatch(1); Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { latch.countDown(); } }; zk.getData(preLockPath, watcher, stat); latch.await(); return getLock(); } } } // 释放锁 public synchronized void releaseLock() { try { zk.delete(lockPath, -1); zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { throw new RuntimeException(e); } }
四、分布式锁实现的问题和优化
4.1 问题
在上述Zookeeper分布式锁的实现中,会出现“惊群效应”的问题。当有多个节点需要获取锁时,由于各个节点的状态变化都会触发Zookeeper的watcher机制,会使得羊群效应严重,性能严重下降。
4.2 优化
避免“惊群效应”的解决方案有很多种,这里介绍一种基于Zookeeper的改进方案。在Zookeeper的watcher机制下,多个节点的状态改变都会触发,导致了“惊群效应”的问题。因此,解决方案就是减少状态改变的次数,这样能减少watcher的触发次数,提高性能。
改进方案如下:
- 只在需要获取锁时,才创建锁节点
- 在获取到锁时,传入一个timeout参数作为锁的过期时间,如果在指定的时间内锁未被释放,自动删除锁节点
- 在释放锁时,先检查锁是否已经失效
/** * ZooKeeper实现分布式锁 - 改进版 */ private ZooKeeper zk; private String lockName; private String lockPath; private int sessionTimeout; public ZookeeperLock(String connectStr, String lockName, int sessionTimeout) throws IOException { this.lockName = lockName; this.sessionTimeout = sessionTimeout; // 连接zookeeper zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) { System.out.println("已连接"); } } }); } // 获取锁 public synchronized boolean getLock(long timeout) throws InterruptedException { long start = System.currentTimeMillis(); // 创建锁节点 createLockNode(lockName); // 获取/locks下的所有节点 ListallLocks = zk.getChildren("/locks", false); // 对节点按照名称进行排序 Collections.sort(allLocks); // 取出所有锁节点中最小的节点,并判断是否为当前锁 String currentLockPath = lockPath.substring("/locks".length() + 1); int index = allLocks.indexOf(currentLockPath); if (index == 0) { // 已经获取到锁 return true; } else { // 未获取到锁,监视前一个节点 String preLockPath = "/locks/" + allLocks.get(index - 1); Stat stat = zk.exists(preLockPath, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { synchronized (this) { notifyAll(); } } }); if (stat == null) { return getLock(timeout - (System.currentTimeMillis() - start)); } else { // 等待前一个锁释放 synchronized (this) { wait(timeout - (System.currentTimeMillis() - start)); } return getLock(timeout); } } } // 创建锁的节点 private void createLockNode(String lockName) throws InterruptedException { try { String basePath = "/locks"; // 创建锁的节点,使用EPHEMERAL_SEQUENTIAL模式实现 lockPath = zk.create(basePath + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (KeeperException e) { throw new RuntimeException(e); } } // 释放锁 public synchronized void releaseLock() { try { zk.delete(lockPath, -1); zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { throw new RuntimeException(e); } } // 检查锁是否已经失效 private boolean isLockExpired(String currentLockPath, long timeout) throws Exception { List allLocks = zk.getChildren("/locks", false); int index = allLocks.indexOf(currentLockPath); if (index < 0) { // 当前锁节点已经不存在了 return true; } else if (index == 0) { // 已经获取到了锁 return false; } else { // 未获取到锁,检查前一个节点是否已经失效 String preLockPath = "/locks/" + allLocks.get(index - 1); Stat stat = zk.exists(preLockPath, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { synchronized (this) { notifyAll(); } } }); if (stat == null) { // 前一个节点已经失效 return isLockExpired(currentLockPath, timeout); } else { // 前一个节点未失效 CountDownLatch latch = new CountDownLatch(1); Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { latch.countDown(); } }; zk.getData(preLockPath, watcher, stat); latch.await(timeout, TimeUnit.MILLISECONDS); return isLockExpired(currentLockPath, timeout); } } }
五、总结
在分布式系统中,实现分布式锁是比较复杂的一件事情。Zookeeper提供可靠的分布式节点数据管理功能,适合实现分布式锁。通过Zookeeper的全局有序性和watcher机制,能够确保分布式锁的同步和互斥。同时,为了避免“惊群效应”和提高分布式锁的性能,可以通过上述的优化方案。