Imagine the following code:
import threading, zmq, time

context = zmq.Context()
receivers = []
poller = zmq.Poller()

def thread_fn(number: int):
    # each thread pushes 10 messages to "its" PULL socket
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:%d" % (6666 + number))
    for i in range(10):
        sender.send_string("message from thread %d" % number)

for i in range(3):
    new_receiver = context.socket(zmq.PULL)
    new_receiver.bind("tcp://*:%d" % (6666 + i))
    poller.register(new_receiver, zmq.POLLIN)
    receivers.append(new_receiver)
    # pass i via args: a lambda would read i late and could see a changed value
    threading.Thread(target=thread_fn, args=(i,), daemon=True).start()

while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break
    for i in range(3):
        if receivers[i] in socks:
            print("%d: process message %s" % (i, receivers[i].recv_string()))
            time.sleep(0.2)  # 'process' the data
The threads send their messages without pausing; the messages arrive in no particular order at the corresponding PULL sockets, where they get 'processed'.
Note: usually you would connect all senders to a single PULL socket, but this example deliberately uses more than one receiving socket.
The output is:
0: process message message from thread 0
1: process message message from thread 1
0: process message message from thread 0
1: process message message from thread 1
2: process message message from thread 2
0: process message message from thread 0
1: process message message from thread 1
2: process message message from thread 2
....
Now I want to read from all sockets as in the example, but I'd like to prioritise one socket.
I.e. I want the output to be:
0: process message message from thread 0 <-- socket 0 processed first
0: process message message from thread 0
0: process message message from thread 0
0: process message message from thread 0
0: process message message from thread 0
1: process message message from thread 1
1: process message message from thread 1
2: process message message from thread 2
1: process message message from thread 1
2: process message message from thread 2
....
Of course I can just poll the sockets separately with timeout=0, but I want to be sure ZeroMQ doesn't already do this for me.
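A minimal sketch of that manual approach, for illustration only. It assumes the sockets are passed in priority order and relies on `Socket.poll(timeout=0)` returning a non-zero event mask when a message is waiting; the helper name `next_message` is made up for this example and is not part of pyzmq.

```python
def next_message(poller, sockets_by_priority):
    """Return (index, message) from the highest-priority socket that has data.

    `sockets_by_priority` is a list of sockets, most important first.
    `poller` has all of these sockets registered for POLLIN.
    """
    while True:
        for index, sock in enumerate(sockets_by_priority):
            # timeout=0 makes this a non-blocking readiness check
            if sock.poll(timeout=0):
                return index, sock.recv_string()
        # Nothing is ready right now: block until any registered socket
        # becomes readable, then re-check from the top of the priority list.
        poller.poll()
```

With the code above, the main loop would call `next_message(poller, receivers)` repeatedly instead of iterating over `socks`. Because only one message is taken per call and the check restarts at index 0 every time, socket 0 is drained before sockets 1 and 2 are looked at.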
So the questions are:
Q1: Is there another way (besides the built-in .poll(timeout)) to make sure I've read the messages from one socket first, before waiting for messages on the other sockets?
Q2: Is there a known best practice for doing it manually?