ZeroMQ发布/订阅模型

在前面一篇文章中ZeroMQ介绍已经介绍了ZeroMQ,以及相应的Python接口zmq,还介绍了一个REQ/RES模型,这篇文章就介绍ZeroMQ另外一种模型。

Publish/Subscribe

Publish/Subscribe是一个很经典的模型,发布消息端就是Publish端,接收消息端就是Subscribe。Publish只管发布消息,甚至都不需要知道是否有Subscribe存在。在现实生活中天气预报就是可以应用这个方式。
看下面两种典型的模型。

  • 一个SUB可以订阅多个PUB,各个SUB都是socket连接,而且可以独立。
  • 正常情况都是一个PUB有多个SUB订阅,类似client和server概念。

消息都是从PUB端推送到SUB端,消息的流通是单向的。下面实现一个模型:一个是PUB端代码,另一个是SUB端代码。

PUB端

初始化socket的时候指定zmq.PUB,然后绑定某个端口监听。

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

i = 0
time.sleep(1)
while True:
    topic = i
    messagedata = i
    print "%d %d" % (topic, messagedata)
    socket.send("%d %d" % (topic, messagedata))
    i = i + 1
    time.sleep(1)

SUB端

socket初始化的时候指定zmq.SUB,还需要设置属性zmq.SUBSCRIBE

import sys
import time
import zmq

port = 5556
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)
    
if len(sys.argv) > 2:
    port1 =  sys.argv[2]
    int(port1)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

if len(sys.argv) > 2:
    socket.connect ("tcp://localhost:%s" % port1)

# Subscribe to zipcode, default is NYC, 10001
# topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE,'')

# Process 5 updates
total_value = 0
i = 0 
while True:
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print topic, messagedata

测试运行

  • 可以运行一个PUB端,运行多个SUB端,PUB端每次发布消息,所有SUB都会收到。
  • PUB端和SUB端都无所谓启动顺序,都不会报错。但是一旦PUB端先启动起来,立马开始发布消息,这时候SUB端之后起来接收不到之前已经发布过的消息。
  • PUB和SUB正常工作的工程中,如果PUB突然退出,然后PUB再启动,SUB端还是能接收到PUB发布的消息,正常工作。

丢消息

  • 先运行SUB端,然后PUB端启动后就不断发消息,很快就发完,会导致丢消息。把PUB端代码修改成这样。
#PUB
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

for i in xrange(100):
    socket.send(i)

上面的代码会发现PUB发完了消息,SUB一个都没收到,官方的解释是,PUB端bind之后就发送消息,这时候PUB和SUB还没建立连接,于是消息全丢了。一个比较笨的方法就是,在PUB发布消息之前,sleep一会儿,等待SUB和PUB建立好连接再发送。

  • 一旦PUB和SUB连接好之后,我测试了,PUB端while循环发送100000条数据,三个SUB收到了所有的数据,这个还是很牛逼的。

总结

  • 对于PUB,它是只管发布消息,不管SUB是否存在,有多少个SUB监听,就都收到PUB发布的消息。
  • 为了确保PUB发布消息不丢,在发布消息之前必须保证PUB和SUB连接建立OK再发,否则可能会丢失建立连接前的数据。

标签:none

评论已关闭