zookeeper大规模分布式集群中任一单点设备上线下线心跳感知系统(一)
  TEZNKK3IfmPf 2023年11月14日 27 0

zookeeper大规模分布式集群中任一单点设备上线下线心跳感知系统(一)

分布式集群中,为了能全局感知任一单点设备的存活状态,经常有心跳感知系统的设计需求,要实现这样的心跳感知。
常规的做法无法就是保持一个Socket长连接或者http短连接,但是这样的实现手法往往扩展性极差,且问题非常多,维护成本很高。
而zookeeper恰恰就是这种分布式集群大规模设备心跳感知系统的最佳“框架”性解决方案。现在假设一个客户端,它需要实时监测分布式集群中的设备节点上线/下线/掉线情况,那么它可以(假设叫做 程序A):

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class Main {
    private ZooKeeper zk;
    private String DEVICE_PATH = "/devices";

    public static void main(String[] args) {
        Main main = new Main();
        try {
            main.test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void test() throws Exception {
        int t = 10;
        zk = new ZooKeeper(getAddress(), t * 1000, new MyWatcher());

        System.out.println("开始连接...");
        while (!zk.getState().isConnected()) {
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("连接建立");

        //创建/devices节点路径的工作可以手动。不必一定代码完成。
        if (!checkNodeExist(DEVICE_PATH)) {
            String path = zk.create(DEVICE_PATH, "server".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("创建:" + path);
        }

        synchronized (this) {
            wait();
        }
    }

    private class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (!checkNodeExist(DEVICE_PATH)) {
                return;
            }

            System.out.println("---------");

            try {
                List<String> children = zk.getChildren(DEVICE_PATH, true);

                for (String c : children) {
                    String path = DEVICE_PATH + "/" + c;
                    byte[] data = zk.getData(path, false, null);
                    System.out.println("设备在线:" + path + " @ " + new String(data));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }

    private boolean checkNodeExist(String path) {
        boolean b = false;

        try {
            Stat stat = zk.exists(path, true);
            b = stat == null ? false : true;
        } catch (Exception e) {
            e.printStackTrace();
        }

        return b;
    }
}

注意,上面的程序A代码不要把它理解为服务器端,事实上程序A可以运行在分布式集群中的任何设备终端上,它实现的一个功能是捕获和感知任何设备的上线和下线情况。当有新设备接入程序A所在的集群中时候,就被MyWatcher捕获和感知到,同样,只要有设备断开、离开集群,也会被MyWatcher捕获和感知到。

 

 

下面再写一个程序B,程序B模拟一个新设备接入分布式集群:

import org.apache.zookeeper.*;

import java.util.concurrent.TimeUnit;

public class Main {
    private ZooKeeper zooKeeper;
    private String DEVICE_PATH = "/devices";

    public static void main(String[] args) {
        //初始化log4j,zookeeper否则报错。
        //org.apache.log4j.BasicConfigurator.configure();

        try {
            Main app = new Main();
            app.zk();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void zk() throws Exception {
        zooKeeper = new ZooKeeper(getAddress(), 10 * 1000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                //System.out.println(event.toString());
            }
        });

        System.out.println("开始连接...");
        while (!zooKeeper.getState().isConnected()) {
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("连接建立");

        login();

        synchronized (this) {
            wait();
        }
    }

    private void login() throws Exception {
        //创建节点。
        String path = zooKeeper.create(DEVICE_PATH + "/client", "ip:127.0.0.1".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        byte[] data = zooKeeper.getData(path, null, null);
        System.out.println(path + " => " + new String(data));
        System.out.println("注册设备成功");
    }

    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }
}

运行程序B,相当于分布式集群中有新设备进入,那么程序B代表的设备就会被程序A捕获。若程序B离开、断开与集群的连接,那么同样可以被程序A感知。

实际上,把程序A的心跳、感知功能代码(其实就是程序A的MyWatcher)写入到程序B中,那么程序A,和程序B就可以互相感知,这样就做到在分布式环境中,设备心跳,上线/下线的互相感知系统。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
TEZNKK3IfmPf