【Zookeeper 学习笔记】—Zookeeper实现分布式锁

作者:一只小哈

出处:https://www.jianshu.com/p/5d12a01018e1


前几天分析了一下三种分布式锁的实现,但是没有利用zookeeper实现一个分布式锁,因为感觉基于Zookeeper实现分布式锁还是稍微复杂的,同时也需要使用Watcher机制,所以就单独搞一篇Zookeeper实现的分布式锁。

首先,第一种实现。我们可以利用Zookeeper不能重复创建一个节点的特性来实现一个分布式锁,这看起来和redis实现分布式锁很像。但是也是有差异的,后面会详细分析。
主要流程图如下:

上面的流程很简单:

  1. 查看目标Node是否已经创建,已经创建,那么等待锁。
  2. 如果未创建,创建一个瞬时Node,表示已经占有锁。
  3. 如果创建失败,那么证明锁已经被其他线程占有了,那么同样等待锁。
  4. 当释放锁,或者当前Session超时的时候,节点被删除,唤醒之前等待锁的线程去争抢锁。

上面是一个完整的流程,简单的代码实现如下:

package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;

/**
* Zookeepr实现分布式锁
*/
public class LockTest {

   private String zkQurom = "localhost:2181";

   private String lockNameSpace = "/mylock";

   private String nodeString = lockNameSpace + "/test1";

   private Lock mainLock;

   private ZooKeeper zk;

   public LockTest(){
       try {
           zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
               @Override
               public void process(WatchedEvent watchedEvent) {
                   System.out.println("Receive event "+watchedEvent);
                   if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                       System.out.println("connection is established...");
               }
           });
       } catch (IOException e) {
           e.printStackTrace();
       }

   }

   private void ensureRootPath() throws InterruptedException {
       try {
           if (zk.exists(lockNameSpace,true)==null){
               zk.create(lockNameSpace,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
           }
       } catch (KeeperException e) {
           e.printStackTrace();
       }
   }

   private void watchNode(String nodeString, final Thread thread) throws InterruptedException {
       try {
           zk.exists(nodeString, new Watcher() {
               @Override
               public void process(WatchedEvent watchedEvent) {
                   System.out.println( "==" + watchedEvent.toString());
                   if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                       System.out.println("Threre is a Thread released Lock==============");
                       thread.interrupt();
                   }
                   try {
                       zk.exists(nodeString,new Watcher() {
                           @Override
                           public void process(WatchedEvent watchedEvent) {
                               System.out.println( "==" + watchedEvent.toString());
                               if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                   System.out.println("Threre is a Thread released Lock==============");
                                   thread.interrupt();
                               }
                               try {
                                   zk.exists(nodeString,true);
                               } catch (KeeperException e) {
                                   e.printStackTrace();
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }

                       });
                   } catch (KeeperException e) {
                       e.printStackTrace();
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }

           });
       } catch (KeeperException e) {
           e.printStackTrace();
       }
   }

   /**
    * 获取锁
    * @return
    * @throws InterruptedException
    */
   public boolean lock() throws InterruptedException {
       String path = null;
       ensureRootPath();
       watchNode(nodeString,Thread.currentThread());
       while (true) {
           try {
               path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
           } catch (KeeperException e) {
               System.out.println(Thread.currentThread().getName() + "  getting Lock but can not get");
               try {
                   Thread.sleep(5000);
               }catch (InterruptedException ex){
                   System.out.println("thread is notify");
               }
           }
           if (!Strings.nullToEmpty(path).trim().isEmpty()) {
               System.out.println(Thread.currentThread().getName() + "  get Lock...");
               return true;
           }
       }
   }

   /**
    * 释放锁
    */
   public void unlock(){
       try {
           zk.delete(nodeString,-1);
           System.out.println("Thread.currentThread().getName() +  release Lock...");
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (KeeperException e) {
           e.printStackTrace();
       }
   }

   public static void main(String args[]) throws InterruptedException {
       ExecutorService service = Executors.newFixedThreadPool(10);
       for (int i = 0;i<4;i++){
           service.execute(()-> {
               LockTest test = new LockTest();
               try {
                   test.lock();
                   Thread.sleep(3000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               test.unlock();
           });
       }
       service.shutdown();
   }
}

代码比较糙,但是大致的实现思路和上述一致,这里需要注意:

  1. 因为使用的是原生的Zookeeper API实现,Watch需要重复的设置,所以代码复杂的些。
  2. 唤醒直接用的Thread.interupt这样其实控制流程其实是不好的。

其实上面的实现有优点也有缺点:
优点:
实现比较简单,有通知机制,能提供较快的响应,有点类似reentrantlock的思想,对于节点删除失败的场景由Session超时保证节点能够删除掉。
缺点:
重量级,同时在大量锁的情况下会有“惊群”的问题。

“惊群”就是在一个节点删除的时候,大量对这个节点的删除动作有订阅Watcher的线程会进行回调,这对Zk集群是十分不利的。所以需要避免这种现象的发生。

解决“惊群”:

为了解决“惊群“问题,我们需要放弃订阅一个节点的策略,那么怎么做呢?

  1. 我们将锁抽象成目录,多个线程在此目录下创建瞬时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
  2. 首先创建顺序节点,然后获取当前目录下最小的节点,判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
  3. 获取锁失败的节点获取当前节点上一个顺序节点,对此节点注册监听,当节点删除的时候通知当前节点。
  4. 当unlock的时候删除节点之后会通知下一个节点。

上面的实现和reentrantlock的公平锁实现还是比较类似的,下面是简单的实现:

package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by zhiming on 2017-02-05.
*/
public class FairLockTest {

   private String zkQurom = "localhost:2181";

   private String lockName = "/mylock";

   private String lockZnode = null;

   private ZooKeeper zk;

   public FairLockTest(){
       try {
           zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
               @Override
               public void process(WatchedEvent watchedEvent) {
                   System.out.println("Receive event "+watchedEvent);
                   if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                       System.out.println("connection is established...");
               }
           });
       } catch (IOException e) {
           e.printStackTrace();
       }

   }

   private void ensureRootPath(){
       try {
           if (zk.exists(lockName,true)==null){
               zk.create(lockName,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
   /**
    * 获取锁
    * @return
    * @throws InterruptedException
    */
   public void lock(){
       String path = null;
       ensureRootPath();
           try {
               path = zk.create(lockName+"/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
               lockZnode = path;
               List<String> minPath = zk.getChildren(lockName,false);
               System.out.println(minPath);
               Collections.sort(minPath);
               System.out.println(minPath.get(0)+" and path "+path);
               if (!Strings.nullToEmpty(path).trim().isEmpty()&&!Strings.nullToEmpty(minPath.get(0)).trim().isEmpty()&&path.equals(lockName+"/"+minPath.get(0))) {
                   System.out.println(Thread.currentThread().getName() + "  get Lock...");
                   return;
               }
               String watchNode = null;
               for (int i=minPath.size()-1;i>=0;i--){
                   if(minPath.get(i).compareTo(path.substring(path.lastIndexOf("/") + 1))<0){
                       watchNode = minPath.get(i);
                       break;
                   }
               }

               if (watchNode!=null){
                   final String watchNodeTmp = watchNode;
                   final Thread thread = Thread.currentThread();
                   Stat stat = zk.exists(lockName + "/" + watchNodeTmp,new Watcher() {
                       @Override
                       public void process(WatchedEvent watchedEvent) {
                           if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                               thread.interrupt();
                           }
                           try {
                               zk.exists(lockName + "/" + watchNodeTmp,true);
                           } catch (KeeperException e) {
                               e.printStackTrace();
                           } catch (InterruptedException e) {
                               e.printStackTrace();
                           }
                       }

                   });
                   if(stat != null){
                       System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + lockName + "/" + watchNode);
                   }
               }
               try {
                   Thread.sleep(1000000000);
               }catch (InterruptedException ex){
                   System.out.println(Thread.currentThread().getName() + " notify");
                   System.out.println(Thread.currentThread().getName() + "  get Lock...");
                   return;
               }

           } catch (Exception e) {
              e.printStackTrace();
           }
   }

   /**
    * 释放锁
    */
   public void unlock(){
       try {
           System.out.println(Thread.currentThread().getName() +  "release Lock...");
           zk.delete(lockZnode,-1);
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (KeeperException e) {
           e.printStackTrace();
       }
   }

   public static void main(String args[]) throws InterruptedException {
       ExecutorService service = Executors.newFixedThreadPool(10);
       for (int i = 0;i<4;i++){
           service.execute(()-> {
               FairLockTest test = new FairLockTest();
               try {
                   test.lock();
                   Thread.sleep(3000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               test.unlock();
           });
       }
       service.shutdown();
   }

}

同样上面的程序也有几点需要注意:

  1. Zookeeper的API没有提供直接的获取上一个节点或者最小节点的API需要我们自己实现。
  2. 使用了interrupt做线程的唤醒,这样不科学,因为不想将JVM的lock引进来所以没有用countdownlatch来做流程控制。
  3. Watch也是要重新设置的,这里使用了Watch的复用,所以代码简单些。

其实上面的实现还是很复杂的,因为你需要反复的去关注Watcher,实现一个Demo可以,做一个生产环境可用的Lock并不容易。因为你的代码bug在生产环境上会引起很严重的bug。

其实对于Zookeeper的一些常用功能是有一些成熟的包实现的,像Curator。Curator的确是足够牛逼,不仅封装了Zookeeper的常用API,也包装了很多常用Case的实现。但是它的编程风格其实还是吧比较难以接受的。

可以用Curator轻易的实现一个分布式锁:

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
   try
   {
       // do some work inside of the critical section here
   }
   finally
   {
       lock.release();
   }
}

是的就这么简单,一个直接拿过来可用的轮子。

基于Zookeeper的分布式锁就说完了。基于Zookeeper实现分布式锁,其实是不常用的。虽然它实现锁十分优雅,但编程复杂,同时还要单独维护一套Zookeeper集群,频繁的Watch对Zookeeper集群的压力还是蛮大的,如果不是原有的项目以来Zookeeper,同时锁的量级比较小的话,还是不用为妙。

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » 【Zookeeper 学习笔记】—Zookeeper实现分布式锁
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 技术驿站 | 致力打造 Java 精品博客

联系作者优质文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏