ZMQ/ZeroMQ简介、安装使用及zmq_poll函数
  qrJHiMhufrJ3 2023年11月13日 17 0

1、什么是ZMQ

  ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上运行。

  ZeroMQ(也拼写为ÖMQ、0MQ或ZMQ)是一个高性能异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

  ZeroMQ支持多种传输(TCP、进程内、进程间、多播、WebSocket等)上的通用消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递与线程间消息传递一样简单。这使您的代码保持清晰、模块化和极易扩展。

  ZeroMQ是由大量贡献者开发的。有许多流行编程语言的第三方绑定,以及C#和Java的本机端口。

2、ZMQ的特点

  1、 组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须现有服务端启动,在启动客户端,否则会报错。

  2、ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。

  3、ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。

  4、ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。

  5、ZMQ提供了多种模式进行消息路由,如请求-应答模式,发布-订阅模式等,这些模式可以用来搭建网络拓扑结构。

  6、ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。同时ZeroMQ不在乎目的是否存在。

  7、TCP的通信拓扑是一对一的,而ZMQ可以是一对一、一对多、多对一或者多对多。

  8、ZeroMQ传输的是消息,TCP传输字节。

3、ZMQ与TCP的区别

3.1、连接的区别

  1、使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm。

  2、当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定。

  3、连接是异步的,并由一组消息队列做缓冲。

  4、连接会表现出某种消息模式,这是由创建连接的套接字类型决定的。

  5、一个套接字可以有多个输入和输出连接。

  6、ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了。

  7、应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。

3.2、传输数据的区别

  1、ZMQ套接字传输的是消息,而不是字节(TCP)或帧(UDP)。消息指的是一段指定长度的二进制数据块,这种设计是为了性能优化而考虑的,所以可能会比较难以理解。

  2、ZMQ套接字在后台进行I/O操作,也就是说无论是接收还是发送消息,它都会先传送到一个本地的缓冲队列,这个内存队列的大小是可以配置的。

  3、ZMQ套接字可以和多个套接字进行连接(如果套接字类型允许的话)。TCP协议只能进行点对点的连接,而ZMQ则可以进行一对多(类似于无线广播)、多对多(类似于邮局)、多对一(类似于信箱),当然也包括一对一的情况。

  4、ZMQ套接字可以发送消息给多个端点(扇出模型),或从多个端点中接收消息(扇入模型)。

图片.png

4、编译安装

4.1、安装依赖

sudo apt-get install libtool pkg-config build-essential autoconf automake

4.2、编译安装ZMQ使用的加密库

git clone git://github.com/jedisct1/libsodium.git
cd libsodium 
./autogen.sh 
./configure
make check
sudo make install
sudo ldconfig
cd ../

4.3、编译安装libzmq

git clone git://github.com/zeromq/libzmq.git
cd libzmq
./autogen.sh
./configure –with-libsodium
make 
sudo make install
sudo ldconfig
cd ../

4.4、安装ZMQ的c库

  添加编译选项 -lczmq -lzmq。

git clone git://github.com/zeromq/czmq.git
cd czmq
./autogen.sh
./configure && make check
sudo make install
sudo ldconfig
cd -

4.5、添加ZMQ的C++库

git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # bootstrap-vcpkg.bat for Powershell
./vcpkg integrate install
./vcpkg install cppzmq
cd -

5、使用

5.1、cmake使用

  cmake下使用,需要再CMakeList文件中添加如下内容:

#find cppzmq wrapper, installed by make of cppzmq
find_package(cppzmq)
target_link_libraries(*Your Project Name* cppzmq)

5.2、实例

  C++开发可以参考如下实例代码进行开发工作:   server端:

#include <zmq.hpp>
 
int main()
{
    zmq::context_t ctx;
    zmq::socket_t sock(ctx, zmq::socket_type::push);
    sock.bind("inproc://test");
    sock.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
}

  Client端:

#include <iostream>
#include <zmq_addon.hpp>
 
int main()
{
    zmq::context_t ctx;
    zmq::socket_t sock1(ctx, zmq::socket_type::push);
    zmq::socket_t sock2(ctx, zmq::socket_type::pull);
    sock1.bind("tcp://127.0.0.1:*");
    const std::string last_endpoint =
        sock1.get(zmq::sockopt::last_endpoint);
    std::cout << "Connecting to "
              << last_endpoint << std::endl;
    sock2.connect(last_endpoint);
 
    std::array<zmq::const_buffer, 2> send_msgs = {
        zmq::str_buffer("foo"),
        zmq::str_buffer("bar!")
    };
    if (!zmq::send_multipart(sock1, send_msgs)) {
        return 1;
    }
 
    std::vector<zmq::message_t> recv_msgs;
    const auto ret = zmq::recv_multipart(
        sock2, std::back_inserter(recv_msgs));
    if (!ret) {
        return 1;
    }
    std::cout << "Got " << *ret
              << " messages" << std::endl;
    return 0;
}

  在之前的示例中,主程序的循环体内会做以下几件事:

    1、等待套接字的消息。

    2、处理消息。

    3、返回第一步。

6、zmq_poll()的使用

  如果我们想要读取多个套接字中的消息呢?最简单的方法是将套接字连接到多个端点上,让ZMQ使用公平队列的机制来接受消息。如果不同端点上的套接字类型是一致的,那可以使用这种方法。但是,如果一个套接字的类型是PULL,另一个是PUB怎么办?如果现在开始混用套接字类型,那将来就没有可靠性可言了。

  正确的方法应该是使用zmq_poll()函数。更好的方法是将zmq_poll()包装成一个框架,编写一个事件驱动的反应器,但这个就比较复杂了,我们这里暂不讨论。

  我们先不使用zmq_poll(),而用NOBLOCK(非阻塞)的方式来实现从多个套接字读取消息的功能。下面将气象信息服务和并行处理这两个示例结合起来:
  msreader: Multiple socket reader in C

#include "zhelpers.h"
 
int main (void) 
{
    //  准备上下文和套接字
    void *context = zmq_init (1);
 
    //  连接至任务分发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
 
    //  连接至天气服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
 
    //  处理从两个套接字中接收到的消息
    //  这里我们会优先处理从任务分发器接收到的消息
    while (1) {
        //  处理等待中的任务
        int rc;
        for (rc = 0; !rc; ) {
            zmq_msg_t task;
            zmq_msg_init (&task);
            if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {
                //  处理任务
            }
            zmq_msg_close (&task);
        }
        //  处理等待中的气象更新
        for (rc = 0; !rc; ) {
            zmq_msg_t update;
            zmq_msg_init (&update);
            if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {
                //  处理气象更新
            }
            zmq_msg_close (&update);
        }
        // 没有消息,等待1毫秒
        s_sleep (1);
    }
    //  程序不会运行到这里,但还是做正确的退出清理工作
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

  这种方式的缺点之一是,在收到第一条消息之前会有1毫秒的延迟,这在高压力的程序中还是会构成问题的。此外,你还需要翻阅诸如nanosleep()的函数,不会造成循环次数的激增。

  示例中将任务分发器的优先级提升了,你可以做一个改进,轮流处理消息,正如ZMQ内部做的公平队列机制一样。

  下面,让我们看看如何用zmq_poll()来实现同样的功能:

  mspoller: Multiple socket poller in C

#include "zhelpers.h"
 
int main (void) 
{
    void *context = zmq_init (1);
 
    //  连接任务分发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");
 
    //  连接气象更新服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
 
    //  初始化轮询对象
    zmq_pollitem_t items [] = {
        { receiver, 0, ZMQ_POLLIN, 0 },
        { subscriber, 0, ZMQ_POLLIN, 0 }
    };
    //  处理来自两个套接字的消息
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_recv (receiver, &message, 0);
            //  处理任务
            zmq_msg_close (&message);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_recv (subscriber, &message, 0);
            //  处理气象更新
            zmq_msg_close (&message);
        }
    }
    //  程序不会运行到这儿
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

qrJHiMhufrJ3