crash when socket used in multiple threads #2043

Open
1 task done
Sec42 opened this issue Oct 22, 2024 · 8 comments

@Sec42

Sec42 commented Oct 22, 2024

This is a pyzmq bug

  • This is a pyzmq-specific bug, not an issue of zmq socket behavior. Don't worry if you're not sure! We'll figure it out together.

What pyzmq version?

26.2.0

What libzmq version?

4.3.5

Python version (and how it was installed)

Python 3.8.13 (7.3.9+dfsg-1ubuntu0.1, Nov 15 2022, 06:22:50) [PyPy 7.3.9 with GCC 11.3.0] on linux

OS

Ubuntu 22.04.5 LTS

What happened?

My script using zmq randomly dies with different assertion failures after a few (~1-8) hours.

Example asserts:

Assertion failed: false (src/object.cpp:142)
Assertion failed: ok (src/mailbox.cpp:72)

Code to reproduce bug

    import zmq

    url = "tcp://127.0.0.1:4223"

    context = zmq.Context()
    socket = context.socket(zmq.XPUB)
    socket.setsockopt(zmq.XPUB_VERBOSE, True)
    socket.bind(url)

    def zmq_thread(socket):
        try:
            while True:
                event = socket.recv()
                # Event is one byte 0=unsub or 1=sub, followed by topic
                if event[0] == 1:
                    log("new subscriber for", event[1:])
                elif event[0] == 0:
                    log("unsubscribed", event[1:])

        except zmq.error.ContextTerminated:
            pass

    from threading import Thread
    zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq')
    zthread.start()

    # real code of course does some more work locally and sends changing messages.
    while True:
        socket.send_string("FOO BAR")

Traceback, if applicable

No response

More info

Last time I checked, it did not fail when run without PyPy.

@minrk
Member

minrk commented Oct 22, 2024

libzmq sockets are not thread safe. You must not use a socket from multiple threads at once unless you are careful to use a lock around all socket methods.
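
As a rough illustration of "a lock around all socket methods", here is a minimal sketch. `LockedSocket` is a hypothetical helper, not part of pyzmq, and it only wraps the three calls shown. The receive is non-blocking on purpose: a blocking `recv` held under the lock would stall every sender.

```python
import threading

import zmq


class LockedSocket:
    """Hypothetical helper: serializes calls on one zmq socket with a lock."""

    def __init__(self, socket):
        self._socket = socket
        self._lock = threading.Lock()

    def send(self, data):
        with self._lock:
            self._socket.send(data)

    def try_recv(self):
        """Non-blocking receive; returns None when nothing is queued."""
        with self._lock:
            try:
                return self._socket.recv(zmq.NOBLOCK)
            except zmq.Again:
                return None

    def close(self):
        with self._lock:
            self._socket.close(linger=0)
```

Any socket method that is not routed through the lock (including `setsockopt` or polling) would still be unprotected, so this only helps if every thread goes through the wrapper.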

@Sec42
Author

Sec42 commented Oct 22, 2024

I'm not sure I understand how you are supposed to receive the (asynchronous) notifications about clients connecting/disconnecting with pyzmq then. Can you give an example of how to do it correctly?

Thanks

@minrk
Member

minrk commented Oct 22, 2024

You can use pollers or an event loop such as asyncio. Here's an asyncio version of your script that adds a subscriber, so you can see that the subscription messages come in while sending is happening:

import asyncio
from threading import Thread
import time

import zmq.asyncio

# for now
log = print

async def log_subscribers(socket):
    try:
        while True:
            event = await socket.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == 1:
                log("new subscriber for", event[1:])
            elif event[0] == 0:
                log("unsubscribed", event[1:])

    except zmq.ContextTerminated:
        pass

def subscriber_main(url):
    with zmq.Context() as ctx:
        for i in range(10):
            with ctx.socket(zmq.SUB) as sub:
                sub.connect(url)
                topic = f"topic{i}"
                sub.subscribe(topic)
                time.sleep(0.1)
                sub.unsubscribe(topic)
                time.sleep(0.1)

async def main():
    url = "tcp://127.0.0.1:4223"

    context = zmq.asyncio.Context()
    socket = context.socket(zmq.XPUB)
    socket.setsockopt(zmq.XPUB_VERBOSE, True)
    socket.bind(url)
    # spawn 'thread' but it's a coroutine in the same thread;
    # keep a reference so the task is not garbage-collected while running
    log_task = asyncio.create_task(log_subscribers(socket))
    
    # spawn subscribers in an actual thread, so this is a complete demo
    subscriber_thread = Thread(target=subscriber_main, args=(url,), daemon=True)
    subscriber_thread.start()

    # real code of course does some more work locally and sends changing messages.
    while subscriber_thread.is_alive():
        log("sending")
        await socket.send_string("FOO BAR")
        await asyncio.sleep(0.1)

if __name__ == "__main__":
    asyncio.run(main())

@Sec42
Author

Sec42 commented Oct 23, 2024

Thank you very much for your example. As far as I understand it, this will in effect check/poll the socket after each line is sent, right? I will do some tests based on your example.

@minrk
Member

minrk commented Oct 24, 2024

Approximately, yeah. It's not quite polling, but everywhere you see an await is an opportunity for another coroutine to take over and advance to its next await, if whatever it was waiting for has become ready. This is sometimes called "cooperative multitasking" because if you write code that has no awaits, no other tasks will run, unlike threads, which can be fully concurrent except when locks (such as the GIL) are held.

It is event-driven, so the log_subscribers coroutine will sleep until an event arrives indicating that the socket has something to receive. Similarly, if send is unavailable, the send_string will sleep until send can proceed.
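
To make the "every await is a switch point" idea concrete, here is a small sketch that uses only asyncio, no zmq. The names `worker`, `greedy`, and `run_pair` are made up for illustration. Two copies of `worker` take turns because each loop iteration awaits, while two copies of `greedy` run one after the other because nothing in the loop yields.

```python
import asyncio


async def worker(name, steps, trace):
    for i in range(steps):
        trace.append(f"{name}{i}")
        # Each await is a point where the event loop may switch tasks.
        await asyncio.sleep(0)


async def greedy(name, steps, trace):
    for i in range(steps):
        # No await in this loop, so no other task can run until it finishes.
        trace.append(f"{name}{i}")


async def run_pair(coro_fn):
    """Run two copies of coro_fn concurrently and return the order of steps."""
    trace = []
    await asyncio.gather(coro_fn("a", 2, trace), coro_fn("b", 2, trace))
    return trace


if __name__ == "__main__":
    print(asyncio.run(run_pair(worker)))  # steps of "a" and "b" interleave
    print(asyncio.run(run_pair(greedy)))  # all of "a" runs before any of "b"
```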

@Sec42
Author

Sec42 commented Nov 4, 2024

Again, thank you for your example. However, I couldn't get the asyncio version to work for my use case.
I made an alternative solution that runs a thread that does

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.poll()

and when that returns, signals the main thread to call

socket.recv()

It seems like this works without crashing. My question is, is that acceptable from the zmq "thread safety" standpoint?

@minrk changed the title from "BUG: pypy pyzmq randomly fails in asserts" to "crash when socket used in multiple threads" on Nov 5, 2024
@minrk added the question label on Nov 5, 2024
@minrk
Member

minrk commented Nov 5, 2024

If one thread only does polling, it is probably okay. I can't say for sure. The most important thing is that send/recv/set/getsockopt/close are not called from multiple threads concurrently. You may get a crash if your poller thread is still running when close is called in the main thread, for example.
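
One way to guarantee that send/recv/close never overlap is to let a single thread own the socket and have every other thread talk to it through queues. The sketch below assumes that design; `socket_owner`, `outbox`, `events`, and `stop` are hypothetical names, and the 10 ms poll timeout is an arbitrary choice that bounds how long a queued message waits before it is sent.

```python
import queue
import threading

import zmq


def socket_owner(ctx, url, outbox, events, stop):
    """All socket calls (bind, send, recv, close) happen in this one thread."""
    sock = ctx.socket(zmq.XPUB)
    sock.setsockopt(zmq.XPUB_VERBOSE, True)
    sock.bind(url)
    try:
        while not stop.is_set():
            # Send everything other threads have handed over so far.
            try:
                while True:
                    sock.send(outbox.get_nowait())
            except queue.Empty:
                pass
            # Wait up to 10 ms for a subscription event, then pass it on.
            if sock.poll(10, zmq.POLLIN):
                events.put(sock.recv())
    finally:
        # Closing also happens in the owning thread.
        sock.close(linger=0)
```

Other threads would call `outbox.put(b"...")` to publish and `events.get()` to read subscription events, and set `stop` before joining the owner thread, so no thread other than the owner ever touches the socket.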

@Sec42
Author

Sec42 commented Nov 17, 2024

To update you: no, it is not okay; it still crashed every once in a while.
I have given up trying to create a fancy solution and added

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

and do an

if len(poller.poll(0))>0:

in my main processing loop to check for events.

This costs me a few % performance, but at least it's stable.

Thanks for your help.
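
For readers who want the single-threaded approach above as a complete loop, here is a minimal sketch. The helper names `drain_subscription_events` and `main_loop` are invented for illustration; the zmq calls are the same ones used in this thread (`zmq.Poller`, `register`, `poll(0)`, `recv`, `send_string`).

```python
import zmq


def drain_subscription_events(socket, poller):
    """Return every event currently queued on the socket, without blocking."""
    events = []
    while poller.poll(0):  # timeout 0: return immediately
        events.append(socket.recv())
    return events


def main_loop(socket, iterations):
    """Check for events and send, all from the same thread."""
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    for _ in range(iterations):
        for event in drain_subscription_events(socket, poller):
            # Event is one byte 0=unsub or 1=sub, followed by topic
            kind = "new subscriber for" if event[0] == 1 else "unsubscribed"
            print(kind, event[1:])
        socket.send_string("FOO BAR")
```

Because polling, receiving, and sending all happen in one thread, no lock is needed; the cost is one non-blocking poll per loop iteration.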

2 participants