分布式锁基于zookeeper实现
  TEZNKK3IfmPf 2023年11月15日 18 0

 

public class LockUtil {
    // Class-wide logger.
    private static final Logger logger = LoggerFactory.getLogger(LockUtil.class);
    // Payload written into every znode this class creates (the root node and the lock nodes).
    private static final byte[]  data      = { 0x12, 0x34 };
    // Default watcher handed to the ZooKeeper client; it only logs the type of each event.
    private static Watcher watcher =  new Watcher() {  
        
        public void process(WatchedEvent event) {  
            LoggerUtils.logInfo(logger,"process : " + event.getType());  
        }  
    }; 
    // Client shared by all LockUtil instances; stays null when the static initializer fails.
    private static ZooKeeper           zookeeper=null;
    private final String         root;                                     // root node path
    // Name of this instance's own lock node under root; null while it has none.
    private String               id;
    // LockNode view of id, used for ordering against the other children.
    private LockNode             idName;
    // Name of the first child in sorted order, as seen by the latest acquireLock pass.
    private String               ownerId;
    // Name of the predecessor node this instance last put a watch on.
    private String               lastChildId;
    // Failures recorded by ensureExists/acquireLock and rethrown by lock()/tryLock().
    // NOTE(review): nothing visible here ever resets them, so a failure looks permanent - confirm.
    private Throwable            other     = null;
    private KeeperException      exception = null;
    private InterruptedException interrupt = null;
    // Used both as the ZooKeeper session timeout and as the wait period inside lock().
    private static final Long DEFAULT_TIMEOUT_PERIOD=1000L;
    // In-JVM lock taken by lock()/tryLock() and released by unlock().
    private ReentrantLock       reentrantLock = new ReentrantLock();

    static{
        try
        {
            zookeeper =new ZooKeeper("ip:port", DEFAULT_TIMEOUT_PERIOD.intValue(),watcher);
        }
        catch (IOException e)
        {
            logger.error("获取zookeeper错误");
        }
    }
     
    public LockUtil(String root) {
        this.root = root;
        ensureExists(root);
       
    }
    

    /**
     * 尝试获取锁操作,阻塞式可被中断
     * @throws NestableRuntimeException 
     */
    public void lock() throws InterruptedException, KeeperException, NestableRuntimeException {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }

        if (isOwner()) {//锁重入
            return;
        }
        reentrantLock.lock();
        BooleanMutex mutex = new BooleanMutex();
        acquireLock(mutex);
        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
        try {
            mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
            // mutex.get();
        } catch (TimeoutException e) {
            if (!mutex.state()) {
                lock();
            }
        }

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }
    }

    /**
     * 尝试获取锁对象, 不会阻塞
     * 
     * @throws InterruptedException
     * @throws KeeperException
     * @throws NestableRuntimeException 
     */
    public boolean tryLock() throws KeeperException, NestableRuntimeException {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (isOwner()) {//锁重入
            return true;
        }
        reentrantLock.lock();
        acquireLock(null);

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            Thread.currentThread().interrupt();
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }

        return isOwner();
    }

    /**
     * 释放锁对象
     */
    public void unlock() throws KeeperException {
        if (id != null) {
            try {
                zookeeper.delete(root + "/" + id, -1);
                reentrantLock.unlock();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } finally {
                id = null;
            }
        } else {
            //do nothing
        }
    }

    private void ensureExists(final String path) {
        try {
            Stat stat = zookeeper.exists(path, false);
            if (stat != null) {
                return;
            }

            zookeeper.create(path, data,Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            exception = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupt = e;
        }
    }

    /**
     * 返回锁对象对应的path
     */
    public String getRoot() {
        return root;
    }

    /**
     * 判断当前是不是锁的owner
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    /**
     * 返回当前的节点id
     */
    public String getId() {
        return this.id;
    }

    // ===================== helper method =============================

    /**
     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
     */
    private Boolean acquireLock(final BooleanMutex mutex) {
        try {
            do {
                if (id == null) {//构建当前lock的唯一标识
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    //如果第一次,则创建一个节点
                    String path = zookeeper.create(root + "/" + prefix, data,
                            Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    int index = path.lastIndexOf("/");
                    id = StringUtils.substring(path, index + 1);
                    idName = new LockNode(id);
                }

                if (id != null) {
                    List<String> names = zookeeper.getChildren(root, false);
                    if (names.isEmpty()) {
                        id = null;//异常情况,重新创建一个
                    } else {
                        //对节点进行排序
                        SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
                        for (String name : names) {
                            sortedNames.add(new LockNode(name));
                        }

                        if (sortedNames.contains(idName) == false) {
                            id = null;//清空为null,重新创建一个
                            continue;
                        }

                        //将第一个节点做为ownerId
                        ownerId = sortedNames.first().getName();
                        if (mutex != null && isOwner()) {
                            mutex.set(true);//直接更新状态,返回
                            return true;
                        } else if (mutex == null) {
                            return isOwner();
                        }

                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            //关注一下排队在自己之前的最近的一个节点
                            LockNode lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            //异步watcher处理
                            Stat stat=zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {

                                public void asyncProcess(WatchedEvent event) {
                                    acquireLock(mutex);
                                }

                            });

                            if (stat == null) {
                                acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
                            }
                        } else {
                            if (isOwner()) {
                                mutex.set(true);
                            } else {
                                id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
                            }
                        }
                    }
                }
            } while (id == null);
        } catch (KeeperException e) {
            exception = e;
            if (mutex != null) {
                mutex.set(true);
            }
        } catch (InterruptedException e) {
            interrupt = e;
            if (mutex != null) {
                mutex.set(true);
            }
        } catch (Throwable e) {
            other = e;
            if (mutex != null) {
                mutex.set(true);
            }
        }

        if (isOwner() && mutex != null) {
            mutex.set(true);
        }
        return Boolean.FALSE;
    }
public class LockNode implements Comparable<LockNode>
{
    private final String name;

    private String       prefix;

    private int          sequence = -1;

    public LockNode(String name)
    {
        this.name = name;
        this.prefix = name;
        int idx = name.lastIndexOf('-');
        if (idx >= 0)
        {
            this.prefix = name.substring(0, idx);
            try
            {
                this.sequence = Integer.parseInt(name.substring(idx + 1));
            }
            catch (Exception e)
            {
                // ignore
            }
        }
    }

    public int compareTo(LockNode that)
    {
        int s1 = this.sequence;
        int s2 = that.sequence;
        if (s1 == -1 && s2 == -1)
        {
            return this.name.compareTo(that.name);
        }
        if (s1 == -1)
        {
            return -1;
        }
        else if (s2 == -1)
        {
            return 1;
        }
        else
        {
            return s1 - s2;
        }
    }

    public String getName()
    {
        return name;
    }

    public int getSequence()
    {
        return sequence;
    }

    public String getPrefix()
    {
        return prefix;
    }

    public String toString()
    {
        return name.toString();
    }

    // ==================== hashcode & equals方法=======================
    @Override
    public int hashCode()
    {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj)
    {
        if (this == obj)
        {
            return true;
        }
        if (obj == null)
        {
            return false;
        }
        if (getClass() != obj.getClass())
        {
            return false;
        }
        LockNode other = (LockNode) obj;
        if (name == null)
        {
            if (other.name != null)
            {
                return false;
            }
        }
        else if (!name.equals(other.name))
        {
            return false;
        }
        return true;
    }
}

 

/**
 * A boolean gate threads can wait on: {@code get} blocks until the value is
 * {@code true}, and {@code set(true)} releases every waiting thread. The value can be
 * switched back to {@code false} and reused.
 */
public class BooleanMutex {

    private final Sync sync;

    /** Creates a mutex whose value is {@code false}. */
    public BooleanMutex() {
        this(Boolean.FALSE);
    }

    /**
     * Creates a mutex with the given initial value.
     *
     * @param mutex initial value; must not be null
     */
    public BooleanMutex(Boolean mutex) {
        sync = new Sync();
        set(mutex);
    }

    /**
     * Blocks until the value is {@code true}.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void get() throws InterruptedException {
        sync.innerGet();
    }

    /**
     * Blocks until the value is {@code true} or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the value is still {@code false} after the timeout
     */
    public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        sync.innerGet(unit.toNanos(timeout));
    }

    /**
     * Sets the value; setting {@code true} wakes the threads blocked in {@code get}.
     *
     * @param mutex new value; must not be null
     */
    public void set(Boolean mutex) {
        if (mutex) {
            sync.innerSetTrue();
        } else {
            sync.innerSetFalse();
        }
    }

    /** @return the current value */
    public boolean state() {
        return sync.innerState();
    }

    /**
     * Synchronization control for BooleanMutex. Uses the AQS sync state to hold the
     * value. Static, because it needs nothing from the enclosing instance.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;

        /** State value representing TRUE. */
        private static final int  TRUE             = 1;
        /** State value representing FALSE. */
        private static final int  FALSE            = 2;

        private boolean isTrue(int state) {
            return (state & TRUE) != 0;
        }

        /**
         * AQS hook: a shared acquire succeeds only while the value is TRUE; otherwise
         * the caller is queued until it is released.
         */
        @Override
        protected int tryAcquireShared(int ignore) {
            return isTrue(getState()) ? 1 : -1;
        }

        /** AQS hook: a release is always allowed; it only serves to wake the waiters. */
        @Override
        protected boolean tryReleaseShared(int ignore) {
            return true;
        }

        boolean innerState() {
            return isTrue(getState());
        }

        void innerGet() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout)) {
                throw new TimeoutException();
            }
        }

        void innerSetTrue() {
            for (;;) {
                int s = getState();
                if (s == TRUE) {
                    return; // already TRUE, nothing to do
                }
                if (compareAndSetState(s, TRUE)) { // CAS guards against concurrent updates
                    releaseShared(0); // wake the blocked threads
                    return;
                }
            }
        }

        void innerSetFalse() {
            for (;;) {
                int s = getState();
                if (s == FALSE) {
                    return; // already FALSE, nothing to do
                }
                if (compareAndSetState(s, FALSE)) { // CAS already stored FALSE
                    return;
                }
            }
        }

    }
}
public abstract class AsyncWatcher implements Watcher
{
    private static final int       DEFAULT_POOL_SIZE    = 30;

    private static final int       DEFAULT_ACCEPT_COUNT = 60;

    private static ExecutorService executor             = new ThreadPoolExecutor(
                                                                1,
                                                                DEFAULT_POOL_SIZE,
                                                                0L,
                                                                TimeUnit.MILLISECONDS,
                                                                new ArrayBlockingQueue(
                                                                        DEFAULT_ACCEPT_COUNT),
                                                                new NamedThreadFactory(
                                                                        "Arbitrate-Async-Watcher"),
                                                                new ThreadPoolExecutor.CallerRunsPolicy());

    public void process(final WatchedEvent event)
    {
        executor.execute(new Runnable()
        {// 提交异步处理
            @Override
            public void run()
            {
                asyncProcess(event);
            }
        });
    }

    public abstract void asyncProcess(WatchedEvent event);
}

 

/**
 * Checked exception that wraps another {@link Throwable}.
 * Despite its name it extends {@link Exception}, so callers must declare or catch it.
 */
public class NestableRuntimeException extends Exception {

    /**
     * @param cause the wrapped failure, available through {@link #getCause()}
     */
    public NestableRuntimeException(Throwable cause) {
        super(cause);
    }
}

 

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

上一篇: leetcode-dp-204 下一篇: leetcode-53-dp
  1. 分享:
最后一次编辑于 2023年11月15日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月15日   20   0   0 edn
  TEZNKK3IfmPf   2023年11月14日   30   0   0 zookeeper
  TEZNKK3IfmPf   2023年11月15日   16   0   0 edn配置文件
TEZNKK3IfmPf