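Getting Started with ZeroMQ
- ZeroMQ: Basics and Advanced Topics
  - Downloading and Building ZeroMQ
  - ZeroMQ Documentation
  - Problems of Traditional Network Programming That ZeroMQ Solves
  - ZeroMQ Messaging Patterns
    - [1] REQ/REP request-reply pattern
    - [2] PUB/SUB publish-subscribe pattern
    - [3] PUSH/PULL pipeline pattern
    - [4] ROUTER/DEALER pattern
  - Differences Between the PUB/SUB and PUSH/PULL Patterns
  - ZeroMQ Examples
    - [1] REQ/REP request-reply pattern
    - [2] PUB/SUB publish-subscribe pattern
    - [3] PUSH/PULL pipeline pattern
    - [4] ROUTER/DEALER pattern
  - Limitations of ZeroMQ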
ZeroMQ: Basics and Advanced Topics
Downloading and Building ZeroMQ
Source download: http://download.zeromq.org/
Install the required packages on Ubuntu
sudo apt-get install libtool
sudo apt-get install pkg-config
sudo apt-get install build-essential
sudo apt-get install autoconf
sudo apt-get install automake
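Install the cryptography library
Sodium is an easy-to-use software library that provides encryption, decryption, signatures, password hashing, and more. Besides its powerful feature set, it also offers a compatible API and an external API to further improve its usability. Sodium's goal is to provide all the core operations needed to build higher-level cryptographic tools.
If it cannot be installed from the command line, download the source manually from the GitHub repository below and unpack it.
git clone https://github.com/jedisct1/libsodium.git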
cd libsodium
./autogen.sh -s
./configure && make check
sudo make install
sudo ldconfig
cd ..
Download, build, and install libzmq
# Download
git clone https://github.com/zeromq/libzmq.git
cd libzmq
# List the tags
git tag
# Check out a specific release rather than the master branch, which may contain bugs
git checkout v4.3.4
./autogen.sh
./configure && make check
sudo make install
sudo ldconfig
cd ..
To build a debug version, use ./configure --enable-debug
During sudo make install you can see exactly where the .so and .a files are installed:
libtool: install: /usr/bin/install -c src/.libs/libzmq.lai /usr/local/lib/libzmq.la
libtool: install: /usr/bin/install -c src/.libs/libzmq.a /usr/local/lib/libzmq.a
libtool: install: chmod 644 /usr/local/lib/libzmq.a
libtool: install: ranlib /usr/local/lib/libzmq.a
libtool: finish: PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin:/sbin" ldconfig -n /usr/local/lib
When compiling your own programs, link against libzmq, for example: gcc -o bin file.c -lzmq
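ZeroMQ Documentation
Official API reference: http://api.zeromq.org/
English guide: http://zguide.zeromq.org/page:all
Chinese guide: https://github.com/anjuke/zguide-cn (some of the code in this document is outdated; refer to the code in the linked GitHub repository instead)
Performance testing: http://wiki.zeromq.org/results:perf-howto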
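Problems of Traditional Network Programming That ZeroMQ Solves
- Many socket API calls are involved.
- A TCP connection is one-to-one; one-to-many communication needs extra machinery such as the reactor pattern.
- The programmer has to deal with many low-level socket details.
- Socket code is not portable across platforms.
- Messages have to be split and reassembled by hand.
- On a byte stream, sticky packets and partial packets (messages merged together or cut across reads) have to be handled.
- Network failures, such as dropped connections and reconnection, have to be handled by hand.
- The server and the client have to be started in a particular order.
- The I/O model has to be implemented by hand.
- Message buffering (high-water marks) has to be implemented by hand.
- Message encryption has to be implemented by hand.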
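To make the framing items in the list above concrete, here is a minimal sketch of the length-prefix framing that an application typically writes by hand on top of a raw TCP byte stream. This is the kind of work ZeroMQ takes over by delivering whole messages. The names `frame_encode` and `frame_decode` and the 4-byte big-endian header are assumptions chosen for illustration; they are not part of ZeroMQ or its wire protocol.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical helper: write a 4-byte big-endian length, then the payload.
// Returns the number of bytes written, or 0 if `out` is too small.
size_t frame_encode (const void *payload, uint32_t len,
                     uint8_t *out, size_t out_cap)
{
    assert (out != NULL);
    if (out_cap < 4 || out_cap - 4 < len)
        return 0;
    out [0] = (uint8_t) (len >> 24);
    out [1] = (uint8_t) (len >> 16);
    out [2] = (uint8_t) (len >> 8);
    out [3] = (uint8_t) len;
    if (len > 0)
        memcpy (out + 4, payload, len);
    return (size_t) len + 4;
}

// Hypothetical helper: try to extract one complete frame from the bytes
// received so far. Returns the bytes consumed (header plus payload), or 0
// when only a partial frame is available and more data must be read first.
size_t frame_decode (const uint8_t *in, size_t in_len,
                     const uint8_t **payload, uint32_t *len)
{
    assert (in != NULL && payload != NULL && len != NULL);
    if (in_len < 4)
        return 0;                       // Partial header
    uint32_t n = ((uint32_t) in [0] << 24) | ((uint32_t) in [1] << 16)
               | ((uint32_t) in [2] << 8)  |  (uint32_t) in [3];
    if (in_len - 4 < n)
        return 0;                       // Partial payload
    *payload = in + 4;
    *len = n;
    return (size_t) n + 4;
}
```

If two frames arrive in one read (a sticky packet), calling `frame_decode` twice, advancing by the returned count each time, separates them. If a read ends in the middle of a frame (a partial packet), the function returns 0 and the caller keeps the bytes until more arrive.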
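ZeroMQ Messaging Patterns
[1] REQ/REP request-reply pattern
[2] PUB/SUB publish-subscribe pattern
[3] PUSH/PULL pipeline pattern
As the figure above shows, a PUSH or PULL socket can act as either the client or the server.
- The Ventilator pushes tasks.
- Multiple Workers pull tasks, process them, and push the results to the Sink.
- The Sink pulls the results and consumes them.
This pattern resembles multiple threads competing to take messages from a shared global queue.
[4] ROUTER/DEALER pattern
Differences Between the PUB/SUB and PUSH/PULL Patterns
- In PUB/SUB, every message the publisher sends is delivered to all subscribers of the same topic.
- In PUSH/PULL, every message the sender pushes is delivered to exactly one receiver, much like multiple threads taking messages from a shared global queue.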
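The difference can be sketched with a toy delivery model that does not use libzmq at all. A PUB socket copies each message to every matching subscriber, while a PUSH socket hands each message to exactly one connected peer in turn (round-robin). The function names and the `counts` array are hypothetical; the code only counts deliveries and does no networking.

```c
#include <assert.h>
#include <stddef.h>

// Toy model of PUB/SUB: each of the `n_peers` subscribers (assumed to be
// subscribed to the same topic) receives every message.
void deliver_pub (int n_msgs, int counts [], size_t n_peers)
{
    assert (counts != NULL);
    for (int m = 0; m < n_msgs; m++)
        for (size_t p = 0; p < n_peers; p++)
            counts [p]++;
}

// Toy model of PUSH/PULL: each message goes to exactly one peer,
// chosen round-robin.
void deliver_push (int n_msgs, int counts [], size_t n_peers)
{
    assert (counts != NULL && n_peers > 0);
    for (int m = 0; m < n_msgs; m++)
        counts [(size_t) m % n_peers]++;
}
```

With 100 messages and 2 peers, `deliver_pub` leaves both counters at 100, while `deliver_push` leaves both at 50. The second result agrees with the taskwork output in the examples, where each of the two workers prints 50 tasks.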
ZeroMQ Examples
[1] REQ/REP request-reply pattern
hwclient.c
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
// Build: gcc -o hwclient hwclient.c -lzmq
int main (void)
{
    printf ("Connecting to hello world server...\n");
    void *context = zmq_ctx_new ();
    // Socket used to connect to the server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    int ret = 0;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        char buffer [10];
        printf ("Sending 1 Hello %d...\n", request_nbr);
        ret = zmq_send (requester, "Hello", 5, 0);
        printf ("zmq_send1 ret:%d\n", ret);
        // Second send without a receive in between: a REQ socket enforces
        // strict send/receive alternation, so this call returns -1 (errno EFSM)
        printf ("Sending 2 Hello %d...\n", request_nbr);
        ret = zmq_send (requester, "Hello", 5, 0);
        printf ("zmq_send2 ret:%d\n", ret);
        zmq_recv (requester, buffer, 6, 0);     // A reply must be received before the next send
        zmq_recv (requester, buffer, 6, 0);     // A second receive in a row fails the same way
        printf ("Received World %d\n", request_nbr);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}
hwserver.c
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
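// Build: gcc -o hwserver hwserver.c -lzmq
int main (void)
{
    void *context = zmq_ctx_new ();
    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "tcp://*:5555");      // The server side binds
    assert (rc == 0);

    while (1) {
        // Wait for the next client request
        char buffer [10];
        int size = zmq_recv (responder, buffer, sizeof (buffer) - 1, 0);
        if (size < 0)
            break;                              // Receive failed or was interrupted
        if (size > (int) sizeof (buffer) - 1)
            size = sizeof (buffer) - 1;         // zmq_recv reports the untruncated length
        buffer [size] = '\0';
        printf ("Received %s\n", buffer);
        sleep (1);                              // Do some 'work'
        // Send the reply
        zmq_send (responder, "World", 5, 0);
    }
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}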
Start hwserver
# ./hwserver
Debug hwclient
# gdb hwclient          // start gdb
(gdb) b main            // set a breakpoint at main
(gdb) r                 // run the program
(gdb) c                 // continue
# ctrl+c                // interrupt execution
(gdb) info threads      // list the threads of the hwclient process; three threads are running
  Id   Target Id         Frame
  3    Thread 0x7ffff65c7700 (LWP 29297) "ZMQbg/IO/0" 0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
  2    Thread 0x7ffff6dc8700 (LWP 29296) "ZMQbg/Reaper" 0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
* 1    Thread 0x7ffff7ef0740 (LWP 29292) "hwclient" 0x00007ffff7b00ddd in poll () from /lib64/libc.so.6
(gdb) thread 3          // switch to thread 3
[Switching to thread 3 (Thread 0x7ffff65c7700 (LWP 29297))]
#0 0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
(gdb) bt                // print the stack; thread 3 is waiting in epoll
#0 0x00007ffff7b0c0e3 in epoll_wait () from /lib64/libc.so.6
#1 0x00007ffff7f2e22b in zmq::epoll_t::loop (this=0x4095b0) at src/epoll.cpp:184
#2 0x00007ffff7f4f2d9 in zmq::worker_poller_base_t::worker_routine (arg_=0x4095b0) at src/poller_base.cpp:146
#3 0x00007ffff7f7a060 in thread_routine (arg_=0x409608) at src/thread.cpp:257
#4 0x00007ffff75f0ea5 in start_thread () from /lib64/libpthread.so.0
#5 0x00007ffff7b0bb0d in clone () from /lib64/libc.so.6
(gdb) q                 // quit gdb
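The send/receive lockstep that hwclient.c exercises can be pictured as a two-state machine. The sketch below is a toy model, not libzmq code: `req_model_t`, `req_model_send`, and `req_model_recv` are hypothetical names. A real REQ socket reports the same violation by returning -1 with errno set to EFSM.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

// Toy model of the lockstep a REQ socket enforces
typedef struct {
    bool awaiting_reply;    // true after a send, until the reply is received
} req_model_t;

// Returns 0 on success, -1 if a request is already outstanding
int req_model_send (req_model_t *s)
{
    assert (s != NULL);
    if (s->awaiting_reply)
        return -1;
    s->awaiting_reply = true;
    return 0;
}

// Returns 0 on success, -1 if no request has been sent yet
int req_model_recv (req_model_t *s)
{
    assert (s != NULL);
    if (!s->awaiting_reply)
        return -1;
    s->awaiting_reply = false;
    return 0;
}
```

Replaying the call order of hwclient.c against this model (send, send, receive, receive) gives 0, -1, 0, -1: the second call of each pair is rejected and leaves the state unchanged.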
[2] PUB/SUB publish-subscribe pattern
wuclient.c
#include "zhelpers.h"
// Build: gcc -o wuclient wuclient.c -lzmq
int main (int argc, char *argv [])
{
    // Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    printf ("zmq_connect\n");
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    // Subscribe to zipcode, default is NYC, 10001
    const char *filter = (argc > 1)? argv [1]: "10001 ";
    printf ("ZMQ_SUBSCRIBE\n");
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));      // The filter is matched as a prefix of each message
    assert (rc == 0);

    // Process 100 updates
    int update_nbr;
    long total_temp = 0;
    printf ("into for\n");
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
                &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        printf ("zipcode = %d, temperature:%d\n", zipcode, temperature);
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
            filter, (int) (total_temp / update_nbr));
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}
wuserver.c
#include "zhelpers.h"
int main (void)
{
    // Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    // Initialize random number generator
    srandom ((unsigned) time (NULL));
    // Declared once, outside the loop, so that the temperature counter
    // keeps its value from one iteration to the next
    int zipcode, temperature, relhumidity;
    temperature = 0;
    while (1) {
        // Get values that will fool the boss
        zipcode = randof (100000);
        // zipcode = 10001;
        if (++temperature < 0)
            temperature = 0;
        relhumidity = randof (50) + 10;

        // Send message to all subscribers
        char update [20];
        snprintf (update, sizeof (update), "%05d %d %d", zipcode, temperature, relhumidity);
        printf ("pub %05d %d %d\n", zipcode, temperature, relhumidity);
        s_send (publisher, update);     // Published whether or not anyone is subscribed
        // sleep(1);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}
Start the server
# ./wuserver
Client 1 subscribes to topic 10001
# ./wuclient 10001
zmq_connect
ZMQ_SUBSCRIBE
into for
zipcode = 10001, temperature:155055
zipcode = 10001, temperature:291385
zipcode = 10001, temperature:343617
zipcode = 10001, temperature:486772
zipcode = 10001, temperature:554440
zipcode = 10001, temperature:668110
...
Client 2 subscribes to topic 10002
# ./wuclient 10002
zmq_connect
ZMQ_SUBSCRIBE
into for
zipcode = 10002, temperature:38414
zipcode = 10002, temperature:100755
zipcode = 10002, temperature:125598
zipcode = 10002, temperature:196114
...
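The filter set with ZMQ_SUBSCRIBE is compared against the leading bytes of each published message, which is why the two clients above see only their own zipcode. Here is a minimal sketch of that prefix test. `matches_subscription` is a hypothetical helper name; libzmq performs the matching internally and does not expose such a function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

// Returns true when the first `filter_len` bytes of the message equal the
// filter. A zero-length filter matches every message.
bool matches_subscription (const char *msg, size_t msg_len,
                           const char *filter, size_t filter_len)
{
    assert (msg != NULL && filter != NULL);
    if (filter_len > msg_len)
        return false;
    return memcmp (msg, filter, filter_len) == 0;
}
```

With the "%05d %d %d" format used by wuserver.c, the filter "10001 " accepts "10001 42 30" and rejects "10002 42 30", while the shorter filter "1000" accepts both.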
[3] PUSH/PULL pipeline pattern
taskvent.c
#include "zhelpers.h"
int main (void)
{
    void *context = zmq_ctx_new ();

    // Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");

    // Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    printf ("Press Enter when the workers are ready: ");
    getchar ();
    printf ("Sending tasks to workers...\n");

    // The first message is "0" and signals start of batch
    s_send (sink, "0");

    // Initialize random number generator
    srandom ((unsigned) time (NULL));

    // Send 100 tasks
    int task_nbr;
    int total_msec = 0;     // Total expected cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        // Random workload from 1 to 100msecs
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);    // User code does not care which PULL peer gets the task
    }
    printf ("Total expected cost: %d msec\n", total_msec);
    zmq_close (sink);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
taskwork.c
#include "zhelpers.h"
int main (void)
{
    // Socket to receive messages on
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    // Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    // Process tasks forever
    while (1) {
        char *string = s_recv (receiver);
        printf ("%s.", string);     // Show progress
        fflush (stdout);
        s_sleep (atoi (string));    // Do the work
        free (string);
        s_send (sender, "");        // Send results to sink
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
tasksink.c
#include "zhelpers.h"
int main (void)
{
    // Prepare our context and socket
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    // Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    // Start our clock now
    int64_t start_time = s_clock ();

    // Process 100 confirmations
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if (task_nbr % 10 == 0)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    // Calculate and report duration of batch
    printf ("Total elapsed time: %d msec\n", (int) (s_clock () - start_time));
    zmq_close (receiver);
    zmq_ctx_destroy (context);
    return 0;
}
Start taskvent
# ./taskvent
Press Enter when the workers are ready:
Sending tasks to workers...
Total expected cost: 4830 msec
Start tasksink
# ./tasksink
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........Total elapsed time: 2640 msec
Start taskwork 1
# ./taskwork
69.72.7.85.100.89.19.16.75.52.20.97.78.32.57.74.55.77.83.3.95.37.2.67.83.33.19.6.8.97.6.90.15.22.28.79.14.36.69.91.20.54.53.34.97.25.100.62.80.11.
Start taskwork 2
# ./taskwork
87.52.38.41.34.55.11.54.31.58.100.4.79.2.69.26.45.62.62.82.91.13.53.76.19.3.30.96.38.65.70.60.67.97.24.23.16.16.2.51.24.38.60.61.30.2.14.15.83.8.
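In the run above, the task sizes printed by the two workers add up to 2593 msec and 2237 msec. Together that is the 4830 msec reported by taskvent, and the 2640 msec measured by tasksink is close to the larger of the two. The sketch below reproduces that arithmetic under a simplifying assumption: task i goes to worker i % n_workers, each worker processes its tasks one after another, and network overhead is ignored. `batch_elapsed_estimate` is a hypothetical name, not part of the example programs.

```c
#include <assert.h>
#include <stddef.h>

// Estimate the wall-clock time of a batch (in the same unit as `workloads`)
// as the busy time of the most heavily loaded worker.
int batch_elapsed_estimate (const int workloads [], size_t n_tasks,
                            size_t n_workers)
{
    assert (workloads != NULL && n_workers > 0);
    int longest = 0;
    for (size_t w = 0; w < n_workers; w++) {
        int busy = 0;
        // Sum the tasks that round-robin distribution assigns to worker w
        for (size_t i = w; i < n_tasks; i += n_workers)
            busy += workloads [i];
        if (busy > longest)
            longest = busy;
    }
    return longest;
}
```

For the workloads {10, 20, 30, 40} the estimate is 100 with one worker, 60 with two workers (40 and 60 per worker), and 40 with four workers, so adding workers shortens the batch only down to the size of the largest share.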
[4] ROUTER/DEALER pattern
rrworker.c
#include "zhelpers.h"
#include <unistd.h>
int main (void)
{
    void *context = zmq_ctx_new ();

    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");

    while (1) {
        // Wait for next request from client
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);

        // Do some 'work'
        sleep (1);

        // Send reply back to client
        s_send (responder, "World");
    }
    // We never get here, but clean up anyhow
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}
rrworker2.c
#include "zhelpers.h"
#include <unistd.h>
int main (void)
{
    void *context = zmq_ctx_new ();

    // Socket to talk to clients
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_connect (responder, "tcp://localhost:5560");

    while (1) {
        // Wait for next request from client
        char *string = s_recv (responder);
        printf ("Received request: [%s]\n", string);
        free (string);

        // Do some 'work'
        sleep (1);

        // Send reply back to client
        s_send (responder, "World2");
    }
    // We never get here, but clean up anyhow
    zmq_close (responder);
    zmq_ctx_destroy (context);
    return 0;
}
rrbroker.c
// Simple request-reply broker
#include "zhelpers.h"
int main (void)
{
    // Prepare our context and sockets
    void *context = zmq_ctx_new ();
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (frontend, "tcp://*:5559");
    zmq_bind (backend, "tcp://*:5560");

    // Initialize poll set
    zmq_pollitem_t items [] = {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend, 0, ZMQ_POLLIN, 0 }
    };
    // Switch messages between sockets
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            while (1) {
                // Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, frontend, 0);
                int more = zmq_msg_more (&message);
                zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      // Last message part
            }
        }
        if (items [1].revents & ZMQ_POLLIN) {
            while (1) {
                // Process all parts of the message
                zmq_msg_init (&message);
                zmq_msg_recv (&message, backend, 0);
                int more = zmq_msg_more (&message);
                zmq_msg_send (&message, frontend, more? ZMQ_SNDMORE: 0);
                zmq_msg_close (&message);
                if (!more)
                    break;      // Last message part
            }
        }
    }
    // We never get here, but clean up anyhow
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}
rrclient.c
#include "zhelpers.h"
int main (void)
{
    void *context = zmq_ctx_new ();

    // Socket to talk to server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5559");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        s_send (requester, "Hello");
        char *string = s_recv (requester);
        printf ("Received reply %d [%s]\n", request_nbr, string);
        free (string);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}
Start rrworker
# ./rrworker
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Start rrworker2
# ./rrworker2
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Received request: [Hello]
Start rrbroker
# ./rrbroker
Start rrclient 1
# ./rrclient
Received reply 0 [World2]
Received reply 1 [World2]
Received reply 2 [World2]
Received reply 3 [World2]
Received reply 4 [World2]
Received reply 5 [World2]
Received reply 6 [World2]
Received reply 7 [World2]
Received reply 8 [World]
Received reply 9 [World2]
Start rrclient 2
# ./rrclient
Received reply 0 [World]
Received reply 1 [World2]
Received reply 2 [World]
Received reply 3 [World]
Received reply 4 [World]
Received reply 5 [World]
Received reply 6 [World]
Received reply 7 [World]
Received reply 8 [World]
Received reply 9 [World]
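Both clients above get their own replies back even though the workers never learn who sent a request. This works because the broker's ROUTER socket prepends the sender's identity to every incoming message and, on the way out, removes the first frame and uses it to pick the destination. The toy model below illustrates that envelope handling without libzmq. `msg_model_t`, `router_model_recv`, and `router_model_send` are hypothetical names, and a real identity is a binary blob rather than a readable string.

```c
#include <assert.h>
#include <stddef.h>

#define MODEL_MAX_FRAMES 4

// Toy multipart message: an ordered list of frames
typedef struct {
    const char *frames [MODEL_MAX_FRAMES];
    size_t n;
} msg_model_t;

// What the application reads from ROUTER when a REQ client sends `body`:
// the client's identity, the empty delimiter frame added by REQ, the body.
msg_model_t router_model_recv (const char *identity, const char *body)
{
    msg_model_t m = { { identity, "", body }, 3 };
    return m;
}

// What ROUTER does on send: remove the first frame and treat it as the
// destination. Returns that identity; `m` keeps the remaining frames.
const char *router_model_send (msg_model_t *m)
{
    assert (m != NULL && m->n > 0);
    const char *identity = m->frames [0];
    for (size_t i = 1; i < m->n; i++)
        m->frames [i - 1] = m->frames [i];
    m->n--;
    return identity;
}
```

In this model a request from "client-A" becomes the frames ["client-A", "", "Hello"]. The worker's REP socket keeps the envelope and replaces only the body, so the reply ["client-A", "", "World"] comes back to the ROUTER, which strips "client-A" and delivers ["", "World"] to that client.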
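Limitations of ZeroMQ
The main reasons ZeroMQ is less widely used than Kafka or RocketMQ:
- Coupling: ZeroMQ is a library, so the application has to integrate ZeroMQ itself.
- No clustering
- No replication
- No persistence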