zookeeper分布式集群Curator的分布式整型int计数器SharedCount
  TEZNKK3IfmPf 2023年11月14日 24 0

zookeeper分布式集群Curator的分布式整型int计数器SharedCount

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class Main {
    private int COUNT = 10;
    private int TIME = 50;

    public static void main(String[] args) {
        //初始化log4j,zookeeper否则报错。
        //org.apache.log4j.BasicConfigurator.configure();

        try {
            Main app = new Main();
            app.zk();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void zk() throws Exception {
        //任意位置的SharedCount,只要使用相同的path,都可以得到这个计数值。
        String path = "/path/count";

        CuratorFramework client = initClient(path);

        //分布式集群中存在的某一处计数。
        SharedCount baseCount = new SharedCount(client, path, 0);
        baseCount.start();

        //baseCount.setCount(0);

        baseCount.addListener(new SharedCountListener() {
            @Override
            public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
                System.out.println("countHasChanged最新=" + newCount);
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                //System.out.println(TAG + "状态变化:" + client.toString());
            }
        });

        List<SharedCount> sharedCountList = new ArrayList();

        //开启COUNT个线程,模拟对分布式中SharedCount的不同节点的赋值操作。
        ExecutorService service = Executors.newFixedThreadPool(COUNT);
        for (int i = 0; i < COUNT; i++) {
            SharedCount count = new SharedCount(client, path, 0);
            count.start();

            sharedCountList.add(count);

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep((long) (Math.random() * TIME));

                        setValue(count);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };

            service.submit(runnable);
        }

        service.shutdown();
        service.awaitTermination(TIME, TimeUnit.SECONDS);


        //使用完成后要记得关闭。

        for (SharedCount c : sharedCountList) {
            c.close();
        }

        baseCount.close();
    }

    private void setValue(SharedCount count) throws Exception {
        System.out.println("-------------");

        int oldCount = count.getCount();
        System.out.println("当前值=" + oldCount);
        System.out.println("当前VersionedValue version=" + count.getVersionedValue().getVersion());
        System.out.println("当前VersionedValue value=" + count.getVersionedValue().getValue());

        int n = (int) (Math.random() * 1000);
        System.out.println("尝试赋值:" + n);

        /**
         * trySetCount设置计数器和setCount不同,setCount强制更新计数器的值。
         * trySetCount第一个参数提供当前的VersionedValue,如果期间其它client更新了计数值,
         * 你的更新可能不成功,但是这时你的client更新了最新的值,
         * 若失败可再尝试更新。
         *
         * 只有当此client最后读取的值与zookeeper中的值相等时,才能更新.
         * 调用此方法,不成功时,会刷新本地缓存值.
         * 之后调用getCount()就是新值.
         */
        //boolean b = count.trySetCount(count.getVersionedValue(), n);
        //System.out.println(n + "更新?" + b);

        //或者使用setCount强制更新计数器的值。
        //此方法强制更新计数器值。
        count.setCount(n);
    }

    private CuratorFramework initClient(String path) throws Exception {
        CuratorFramework client = makeClient();
        client.start();

        boolean b = isPathExist(client, path);

        //如果不存在这个路径,stat为null,创建新的节点路径。
        if (!b) {
            String s = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path);

            System.out.println("创建 " + s);
        } else {
            System.out.println("已存在:" + path + ",不需重复创建");
        }

        return client;
    }

    //检测是否存在该路径。
    private boolean isPathExist(CuratorFramework client, String path) {
        boolean b = false;

        //检测是否存在该路径。
        try {
            Stat stat = client.checkExists().forPath(path);
            b = stat == null ? false : true;
        } catch (Exception e) {
            e.printStackTrace();
        }

        return b;
    }

    private CuratorFramework makeClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(getAddress())
                .sessionTimeoutMs(10 * 1000)
                .connectionTimeoutMs(20 * 1000)
                .retryPolicy(retryPolicy)
                .build();

        return client;
    }

    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }
}

 

 

输出:

创建 /path/count
-------------
当前值=825700910
当前VersionedValue version=0
当前VersionedValue value=825700910
尝试赋值:260
countHasChanged最新=260
-------------
当前值=260
当前VersionedValue version=1
当前VersionedValue value=260
尝试赋值:295
countHasChanged最新=295
-------------
当前值=295
当前VersionedValue version=2
当前VersionedValue value=295
尝试赋值:614
countHasChanged最新=614
-------------
当前值=614
当前VersionedValue version=3
当前VersionedValue value=614
尝试赋值:736
countHasChanged最新=736
-------------
当前值=736
当前VersionedValue version=4
当前VersionedValue value=736
尝试赋值:240
countHasChanged最新=240
-------------
当前值=240
当前VersionedValue version=5
当前VersionedValue value=240
尝试赋值:299
countHasChanged最新=299
-------------
当前值=299
当前VersionedValue version=6
当前VersionedValue value=299
尝试赋值:130
countHasChanged最新=130
-------------
当前值=130
当前VersionedValue version=7
当前VersionedValue value=130
尝试赋值:472
countHasChanged最新=472
-------------
当前值=472
当前VersionedValue version=8
当前VersionedValue value=472
尝试赋值:223
countHasChanged最新=223
-------------
当前值=223
当前VersionedValue version=9
当前VersionedValue value=223
尝试赋值:427
countHasChanged最新=427

 

注意:

1,如本例,首次启动,当/path/count第一次创建时候,即此时/path/count为空时候,虽然在为SharedCount构造函数时候设置了seedValue为0,但运行结果莫名其妙,第一次取出的当前值竟为825700910,后续使用trySetCount设置SharedCount的值屡试不成功。唯有使用setCount才能成功。

2,一个经验,在整个程序初始化,首次创建节点计数器时候,SharedCount的构造函数虽然设置了seedValue,但若在随后代码中使用trySetCount设置最新值不成功。若当前节点计数器(本例是/path/count)在前面程序中已经设置过值,则trySetCount才能成功。

3,setCount比trySetCount更能达到计数器值更新目标。正是如此,本例最终使用setCount更新计数器而没有使用trySetCount更新计数器。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月14日   37   0   0 zookeeper
TEZNKK3IfmPf