分布式情况下,可以通过Zookeeper实现分布式锁,在Curator框架中,给我们封装了InterProcessMutex 这个类,可以实现分布式锁的功能,分布式锁的实现原理是,创建临时节点,回话关闭,临时节点就删除,其他的节点就可以通过判断是否有这个临时节点,来实现分布式锁。

案例

启动10个线程,共同完成减法的操作,通过分布式线程的作用,打印结果时,他们是一个一个的运行的,这说明分布式锁起到作用了。

package com.yellowcong.zookeeper.curator;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** * 创建日期:2017年10月14日 <br/> * 创建用户:yellowcong <br/> * 功能描述: */
public class CuratorLock {
      
        

    private static final String CONNECT_PATH = "192.168.66.110:2181,192.168.66.110:2182,192.168.66.110:2183";

    // Session 超时时间
    private static final int SESSION_TIME_OUT = 60000;

    //连接超时
    private static final int CONNECT_TIME_OUT = 5000;

    //定义
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
    public static void main(String[] args) throws Exception {
        /* */

        final AtomicInteger cnt = new AtomicInteger(10);

        for(int i=0;i<10;i++){
            new Thread(new Runnable() {

                public void run() {
                    //分布式锁
                    CuratorFramework cf = getConenction();
                    InterProcessMutex lock = null;
                    try {
                        System.out.println(Thread.currentThread().getName()+"\t线程准备");

                        //等10个线程都准备好了,再启动
                        cyclicBarrier.await();

                        //获取到锁
                        lock = new InterProcessMutex(cf,"/lock");
                        if(cnt.get() >= 0){
                            //分布式锁
                            lock.acquire();
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName()+"\t"+cnt.decrementAndGet());
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                    }finally{
                        //释放锁
                        try {
                            if(lock != null){
                                lock.release();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }

    /** * 获取连接 * 创建日期:2017年10月14日<br/> * 创建用户:yellowcong<br/> * 功能描述: * @return */
    private static CuratorFramework getConenction(){
        RetryPolicy retryPolicy = new   ExponentialBackoffRetry(5000, 10);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .retryPolicy(retryPolicy)
                .connectString(CONNECT_PATH)
                .sessionTimeoutMs(SESSION_TIME_OUT)
                .connectionTimeoutMs(CONNECT_TIME_OUT)
                .build();

        cf.start();

        return cf ;
    }


}