
[WIP] Move to asyncio #642

Closed · wants to merge 3 commits

Conversation

@chsasank (Contributor) commented Jun 7, 2021

Starting a draft of the move to asyncio discussed earlier in #527 and #639. This is a work in progress, meant to show how we can migrate and to gather feedback.

I will add comments noting where I got the inspiration to refactor something in a particular way.

@chsasank (Contributor, Author) commented Jun 7, 2021

AssociationSocket looks like StreamReader and StreamWriter from asyncio.streams. See the code in the Python source here: https://github.com/python/cpython/blob/3.9/Lib/asyncio/streams.py
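
For context, the reader/writer split in asyncio streams looks roughly like the following (a generic sketch of the stdlib API, not pynetdicom code; the host, port, and payload bytes are placeholders):

import asyncio

async def send_and_receive():
    # open_connection returns the (StreamReader, StreamWriter) pair that an
    # AssociationStream would roughly correspond to.
    reader, writer = await asyncio.open_connection("127.0.0.1", 11112)

    writer.write(b"\x01\x00" + bytes(4))  # queue outgoing bytes
    await writer.drain()                  # wait for the send buffer to flush

    header = await reader.readexactly(6)  # e.g. a fixed-size PDU header

    writer.close()
    await writer.wait_closed()
    return header

# asyncio.run(send_and_receive())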

@scaramallion (Member) commented:

Any change to the transport service should support TLS v1.3 as well

@chsasank (Contributor, Author) commented Jun 7, 2021

Correct. For now I'm ignoring TLS for the sake of the PoC.

# SO_REUSEADDR: reuse the socket in TIME_WAIT state without
# waiting for its natural timeout to expire
# Allows local address reuse
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@chsasank (Contributor, Author):

Not sure how important these socket options are.

class AssociationSocket:
"""A wrapper for a `socket
<https://docs.python.org/3/library/socket.html#socket-objects>`_ object.
class AssociationStream:
@chsasank (Contributor, Author) commented Jun 7, 2021:

Took inspiration from asyncio.StreamReader and asyncio.StreamWriter.

# SO_REUSEADDR: reuse the socket in TIME_WAIT state without
# waiting for its natural timeout to expire
# Allows local address reuse
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@chsasank (Contributor, Author):

Not sure how important these socket options are.
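
For what it's worth, asyncio exposes the same option through the reuse_address argument of loop.create_server() / asyncio.start_server(), and sets it by default on Unix, so the explicit setsockopt call may become unnecessary. A minimal sketch (the handler and port are placeholders):

import asyncio

async def handle_association(reader, writer):
    # Placeholder connection callback.
    writer.close()
    await writer.wait_closed()

async def main():
    # reuse_address maps to SO_REUSEADDR on the listening socket.
    server = await asyncio.start_server(
        handle_association, "127.0.0.1", 11112, reuse_address=True
    )
    async with server:
        await server.serve_forever()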

@@ -384,52 +217,32 @@ def tls_args(self, tls_args):
self._tls_args = tls_args


class RequestHandler(BaseRequestHandler):
"""Connection request handler for the ``AssociationServer``.
class AssociationProtocol(asyncio.Protocol):
@chsasank (Contributor, Author):

Took inspiration from asyncio.StreamReaderProtocol.
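
For reference, the callback surface an asyncio.Protocol subclass has to cover is small; a minimal sketch (the buffering below is illustrative, not the PR's actual logic):

import asyncio

class AssociationProtocol(asyncio.Protocol):
    """Receives raw bytes from the transport and feeds the association."""

    def connection_made(self, transport):
        # Called once when the TCP connection is established.
        self.transport = transport
        self._buffer = bytearray()

    def data_received(self, data):
        # Called whenever bytes arrive; a real implementation would hand
        # these to the DUL/association state machine instead of buffering.
        self._buffer.extend(data)

    def connection_lost(self, exc):
        # exc is None on a clean close, otherwise the error that occurred.
        pass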

@@ -646,81 +475,6 @@ def get_handlers(self, event):

return self._handlers[event]

def get_request(self):
@chsasank (Contributor, Author):

Note how I removed all the overrides of TCPServer and instead made it into a protocol factory that can be passed to asyncio.start_server.

Echo server example is here: https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
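
That docs example boils down to the following pattern, where the connection callback passed to asyncio.start_server plays the role of the per-connection handler:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())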

@chsasank (Contributor, Author) commented Jun 7, 2021

@scaramallion Added a general picture of what the transport with asyncio looks like. IMO this was the hardest part. Let me know if this approach sounds good. cc: @arunkant

def local(self):
"""Return a 2-tuple of the local server's ``(host, port)`` address."""
return self.server.server_address
asyncio.create_task(self._assoc.start())
@chsasank (Contributor, Author):

Note how the association reactor is an asyncio task. This is inspired by a line in asyncio.streams here: https://github.com/python/cpython/blob/89e50ab36fac6a0e7f1998501f36fcd2872a6604/Lib/asyncio/streams.py#L244

@chsasank (Contributor, Author) commented Jun 7, 2021

Also, any ideas on a testing plan? This is going to be a pretty comprehensive refactor, so it's best to test modules as we write them. I've had a hard time figuring this out because everything seems to depend on everything else.

@chsasank (Contributor, Author) commented Jun 8, 2021

On @arunkant's suggestion, I changed AssociationProtocol to subclass StreamReaderProtocol and removed the transport from AssociationStream.
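
A rough sketch of what subclassing StreamReaderProtocol can look like (this is my reading of the approach, not the PR's exact code; the _client_connected body is a placeholder):

import asyncio

class AssociationProtocol(asyncio.StreamReaderProtocol):
    """Reuses StreamReaderProtocol's reader/writer plumbing."""

    def __init__(self):
        super().__init__(
            asyncio.StreamReader(),
            client_connected_cb=self._client_connected,
        )

    async def _client_connected(self, reader, writer):
        # StreamReaderProtocol schedules this coroutine as a task once the
        # connection is made; the association reactor would be driven here.
        ...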

@SimonBiggs (Contributor) commented:

Might there be value in a staged approach? As in, having a pynetdicom.asyncio submodule that, for now, just provides an async wrapper around the current threaded server. This submodule naming approach is used by other libraries:

https://tqdm.github.io/docs/asyncio/

Then, over time, the asyncio module can iteratively become more and more "asyncio pure", while allowing for small PRs to make the changes as time goes on.

@chsasank (Contributor, Author) commented Aug 13, 2021

Sounds like a good idea, but I have no clue how to do this incrementally. Once I moved the transport to asyncio, there didn't seem to be any option other than migrating everything to the asyncio paradigm.

@SimonBiggs (Contributor) commented Aug 13, 2021

@chsasank threads and async can work together hand in hand. You lose a lot of the benefit of async, but refactoring is best done in small units, so having this happen piece by piece, with the goal of full async one day, will result in a much more reliable and maintainable refactor.

Here is a good tutorial on using threading and async together:

https://www.roguelynn.com/words/asyncio-sync-and-threaded/

And here is some example code that starts a pynetdicom server, but utilises an asyncio loop for all of the event handling:

# Copyright (C) 2021 Radiotherapy AI Pty Ltd

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import Union

import aiologger
import aiologger.formatters.base
import aiologger.levels
import pynetdicom
import pynetdicom.events
import pynetdicom.pdu_primitives

logger = aiologger.Logger.with_default_handlers(
    name=__name__,
    level=aiologger.levels.LogLevel.DEBUG,
    formatter=aiologger.formatters.base.Formatter(
        fmt="%(asctime)s.%(msecs)d %(levelname)s: %(message)s", datefmt="%H:%M:%S"
    ),
)

EventTuple = Union[pynetdicom.evt.NotificationEvent, pynetdicom.evt.InterventionEvent]
Primitive = Union[
    pynetdicom.pdu_primitives.A_ABORT,
    pynetdicom.pdu_primitives.A_ASSOCIATE,
    pynetdicom.pdu_primitives.A_P_ABORT,
    pynetdicom.pdu_primitives.A_RELEASE,
    pynetdicom.pdu_primitives.P_DATA,
]


def main(port: int):
    loop = asyncio.get_event_loop()
    loop.set_debug(True)

    loop.run_until_complete(_start_pynetdicom_server(port, loop))
    loop.run_forever()


async def _start_pynetdicom_server(port: int, loop: asyncio.AbstractEventLoop):
    def start_server():
        ae = _build_ae()
        handlers = _build_handlers(loop)
        ae.start_server(("", port), block=True, evt_handlers=handlers)

    asyncio.create_task(asyncio.to_thread(start_server))
    await logger.info("pynetdicom server initialised")


def _build_ae():
    ae = pynetdicom.AE()

    ae.network_timeout = None
    ae.acse_timeout = None
    ae.dimse_timeout = None
    ae.maximum_pdu_size = 0

    storage_sop_classes = [
        cx.abstract_syntax for cx in pynetdicom.AllStoragePresentationContexts
    ]
    verification_sop_classes = [
        cx.abstract_syntax for cx in pynetdicom.VerificationPresentationContexts
    ]
    sop_classes = storage_sop_classes + verification_sop_classes

    for uid in sop_classes:
        ae.add_supported_context(uid, pynetdicom.ALL_TRANSFER_SYNTAXES)

    return ae


def _build_handlers(loop: asyncio.AbstractEventLoop):
    events = [
        pynetdicom.evt.EVT_ESTABLISHED,
        pynetdicom.evt.EVT_C_STORE,
        pynetdicom.evt.EVT_ACSE_RECV,
    ]

    handlers = [(event, _logging_handler, [loop]) for event in events]

    return handlers


def _logging_handler(event: pynetdicom.events.Event, loop: asyncio.AbstractEventLoop):
    event_tuple: EventTuple = event.event
    name = event_tuple.name
    description = event_tuple.description
    _logger(loop, f"{name} | {description}")

    if event.event == pynetdicom.evt.EVT_ACSE_RECV:
        primitive: Primitive = event.primitive
        if isinstance(primitive, pynetdicom.pdu_primitives.A_RELEASE):
            _logger(loop, "About to close the association")

    return 0x0000


def _logger(loop: asyncio.AbstractEventLoop, msg: str):
    async def log():
        await logger.info(msg)

    asyncio.run_coroutine_threadsafe(log(), loop)

The key component is the following line:

asyncio.run_coroutine_threadsafe(log(), loop)

Essentially, the "asyncio" version of pynetdicom could just start off as a range of asyncio wrappers around the current threaded code, with none of the internals changed. Then, in separate PRs, stage by stage, small subsections of pynetdicom can have parts of their code "taken out of the wrapper" and moved up into pure asyncio, all the while keeping the user-facing asyncio API the same. That way, you're free to make pynetdicom releases, and users can begin using pynetdicom.asyncio if they wish, with the understanding that that portion of the code is currently under rapid development.

I would expect that files like transport.py might actually be among the very last to change over, as the approach I am proposing would mean the code gets "asyncified" from the top down, instead of from the bottom up.

What do you think?

I would expect the stages to follow in this order:

  • Leave all pynetdicom code as threaded; write an asyncio API via wrapper functions within a new module called pynetdicom.asyncio (see the sketch after this list).
  • Build out a testing suite that exercises this wrapper code, using it as it is intended to be used one day: entirely async.
  • Select a component that is as close to the surface of the wrapper functions as possible (no other threaded code currently depends on it), then create an "async version" of that component and build out the tests covering it.
    • It is likely that this "async version" will itself depend on threaded code, so in essence it will become a wrapper with some surrounding async glue code, and as time goes on the wrapper will "wrap" less and less code.
  • Rinse and repeat stage 3, stepping down through the code's dependency tree.
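
As a concrete illustration of stage 1, the wrapper can be as thin as running the existing blocking server in a worker thread (the pynetdicom.asyncio module and the serve function below are hypothetical names, not an existing API):

import asyncio
import pynetdicom

# Hypothetical contents of a future pynetdicom.asyncio module.
async def serve(ae: pynetdicom.AE, address, evt_handlers=None):
    """Await the existing threaded server without blocking the event loop."""
    # Stage 1: the internals stay threaded; only the calling API is async.
    await asyncio.to_thread(
        ae.start_server,
        address,
        block=True,
        evt_handlers=evt_handlers or [],
    )

A caller could then run asyncio.create_task(serve(ae, ("", 11112))) alongside other coroutines; later stages would replace the threaded internals behind this same signature.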

@SimonBiggs (Contributor) commented Aug 16, 2021

Hi @scaramallion and @chsasank,

I have had some success with utilising asyncio.Queue to feed the pynetdicom handler events through to an asyncio loop. I made a little standalone example repo which can be installed with:

pip install git+https://github.com/RadiotherapyAI/aiopynetdicom

and then the server can be run with:

aiopynetdicom

Here is a link to the async code handling the pynetdicom events:

https://github.com/RadiotherapyAI/aiopynetdicom/blob/525a08e1e506f860797c6b41be160164d0d914af/aiopynetdicom/_consume.py#L35-L50

I only just wrote this today, so it might have some issues, but since it looked like a promising approach I figured it would be nice for you to see it; it might give some ideas for the initial pynetdicom asyncio wrapper.

Cheers 🙂,
Simon
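
For anyone skimming, the hand-off Simon describes looks roughly like this (a sketch of the pattern, not the code from the linked repo; the handler and consumer names are placeholders):

import asyncio
import pynetdicom
import pynetdicom.events

def make_store_handler(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
    """Build an EVT_C_STORE handler that forwards datasets to the loop."""

    def handle_store(event: pynetdicom.events.Event):
        # Runs in a pynetdicom worker thread, so the queue must be touched
        # via the loop's thread-safe scheduling call.
        loop.call_soon_threadsafe(queue.put_nowait, event.dataset)
        return 0x0000  # Success

    return handle_store

async def consume(queue: asyncio.Queue):
    # Runs inside the event loop and does the actual async work.
    while True:
        dataset = await queue.get()
        # ... await whatever processing is needed for the dataset ...
        queue.task_done()

The queue and the consume() task live on the loop side, while make_store_handler(queue, loop) is registered as the EVT_C_STORE handler when the threaded ae.start_server is launched.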

@scaramallion (Member) commented:

Stale, closing
