Add new trio.to_thread module for running worker threads
For consistency with trio.from_thread, and to give us a place for
future extensions, like utilities for pushing context managers into
threads.

See python-trio/trio#810.
njsmith committed Jul 27, 2019
1 parent a9bb934 commit 1768aea
Showing 26 changed files with 176 additions and 180 deletions.
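
In practice, the new spelling looks like this (a minimal usage sketch, not part of the diff; the blocking helper and the file path are illustrative only):

    import trio

    def read_config(path):
        # An ordinary blocking function; it runs in a worker thread below.
        with open(path) as f:
            return f.read()

    async def main():
        # New name introduced by this commit (previously trio.run_sync_in_thread).
        text = await trio.to_thread.run_sync(read_config, "/etc/hosts")
        print(len(text))

    trio.run(main)
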
2 changes: 1 addition & 1 deletion docs/source/history.rst
@@ -655,7 +655,7 @@ CPython, or PyPy3 5.9+.
Other changes
~~~~~~~~~~~~~

* :func:`run_sync_in_thread` now has a :ref:`robust mechanism
* ``run_sync_in_worker_thread`` now has a :ref:`robust mechanism
for applying capacity limits to the number of concurrent threads
<worker-thread-limiting>` (`#10
<https://github.com/python-trio/trio/issues/10>`__, `#57
36 changes: 21 additions & 15 deletions docs/source/reference-core.rst
@@ -1471,7 +1471,7 @@ In acknowledgment of this reality, Trio provides two useful utilities
for working with real, operating-system level,
:mod:`threading`\-module-style threads. First, if you're in Trio but
need to push some blocking I/O into a thread, there's
:func:`run_sync_in_thread`. And if you're in a thread and need
`trio.to_thread.run_sync`. And if you're in a thread and need
to communicate back with Trio, you can use
:func:`trio.from_thread.run` and :func:`trio.from_thread.run_sync`.

@@ -1494,7 +1494,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N
threads start executing the first N jobs, while the other
(100,000 - N) jobs sit in a queue and wait their turn. Which is
generally what you want, and this is how
:func:`trio.run_sync_in_thread` works by default.
:func:`trio.to_thread.run_sync` works by default.
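
For example, a sketch of what that queueing looks like in practice (``crunch`` is a hypothetical blocking job, not taken from the documentation):

    import trio

    def crunch(n):
        # Hypothetical stand-in for real blocking work.
        return sum(i * i for i in range(n))

    async def main():
        limiter = trio.to_thread.current_default_thread_limiter()
        print("default thread limit:", limiter.total_tokens)  # typically 40
        async with trio.open_nursery() as nursery:
            # Only limiter.total_tokens of these jobs run in threads at once;
            # the rest wait their turn inside the limiter.
            for _ in range(100):
                nursery.start_soon(trio.to_thread.run_sync, crunch, 100_000)

    trio.run(main)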

The downside of this kind of thread pool is that sometimes, you need
more sophisticated logic for controlling how many threads are run at
@@ -1541,16 +1541,16 @@ re-using threads, but has no admission control policy: if you give it
responsible for providing the policy to make sure that this doesn't
happen – but since it *only* has to worry about policy, it can be much
simpler. In fact, all there is to it is the ``limiter=`` argument
passed to :func:`run_sync_in_thread`. This defaults to a global
passed to :func:`trio.to_thread.run_sync`. This defaults to a global
:class:`CapacityLimiter` object, which gives us the classic fixed-size
thread pool behavior. (See
:func:`current_default_thread_limiter`.) But if you want to use
"separate pools" for type A jobs and type B jobs, then it's just a
matter of creating two separate :class:`CapacityLimiter` objects and
passing them in when running these jobs. Or here's an example of
defining a custom policy that respects the global thread limit, while
making sure that no individual user can use more than 3 threads at a
time::
:func:`trio.to_thread.current_default_thread_limiter`.) But if you
want to use "separate pools" for type A jobs and type B jobs, then
it's just a matter of creating two separate :class:`CapacityLimiter`
objects and passing them in when running these jobs. Or here's an
example of defining a custom policy that respects the global thread
limit, while making sure that no individual user can use more than 3
threads at a time::

class CombinedLimiter:
def __init__(self, first, second):
@@ -1594,19 +1594,24 @@ time::
return combined_limiter


async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
# *args belong to async_fn; **kwargs belong to run_sync_in_thread
async def run_sync_in_thread_for_user(user_id, sync_fn, *args):
kwargs["limiter"] = get_user_limiter(user_id)
return await trio.run_sync_in_thread(async_fn, *args, **kwargs)
return await trio.to_thread.run_sync(sync_fn, *args)
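
A self-contained version of that per-user helper, as a sketch under the assumption that the per-user limiter is meant to be passed through ``run_sync``'s ``limiter=`` argument (the ``get_user_limiter()`` cache here is simplified; the full example above layers it together with the global limit via ``CombinedLimiter``):

    import trio

    _user_limiters = {}  # simplified per-user cache, illustration only

    def get_user_limiter(user_id):
        return _user_limiters.setdefault(user_id, trio.CapacityLimiter(3))

    async def run_sync_in_thread_for_user(user_id, sync_fn, *args):
        # Each user gets at most 3 worker threads at a time.
        limiter = get_user_limiter(user_id)
        return await trio.to_thread.run_sync(sync_fn, *args, limiter=limiter)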


.. module:: trio.to_thread
.. currentmodule:: trio

Putting blocking I/O into worker threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: run_sync_in_thread
.. autofunction:: trio.to_thread.run_sync

.. autofunction:: trio.to_thread.current_default_thread_limiter

.. autofunction:: current_default_thread_limiter

.. module:: trio.from_thread
.. currentmodule:: trio

Getting back into the Trio thread from another thread
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -1615,6 +1620,7 @@ Getting back into the Trio thread from another thread

.. autofunction:: trio.from_thread.run_sync


This will probably be clearer with an example. Here we demonstrate how
to spawn a child thread, and then use a :ref:`memory channel
<channels>` to send messages between the thread and a Trio task:
2 changes: 1 addition & 1 deletion docs/source/reference-core/from-thread-example.py
@@ -23,7 +23,7 @@ async def main():
# In a background thread, run:
# thread_fn(portal, receive_from_trio, send_to_trio)
nursery.start_soon(
trio.run_sync_in_thread, thread_fn, receive_from_trio, send_to_trio
trio.to_thread.run_sync, thread_fn, receive_from_trio, send_to_trio
)

# prints "1"
4 changes: 2 additions & 2 deletions docs/source/reference-hazmat.rst
@@ -403,8 +403,8 @@ This logic is a bit convoluted, but accomplishes all of the following:
loop outside of the ``except BlockingIOError:`` block.

These functions can also be useful in other situations. For example,
when :func:`trio.run_sync_in_thread` schedules some work to run
in a worker thread, it blocks until the work is finished (so it's a
when :func:`trio.to_thread.run_sync` schedules some work to run in a
worker thread, it blocks until the work is finished (so it's a
schedule point), but by default it doesn't allow cancellation. So to
make sure that the call always acts as a checkpoint, it calls
:func:`checkpoint_if_cancelled` before starting the thread.
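
Spelled out, the pattern described here looks roughly like the following (an assumed helper for illustration, not trio's actual implementation):

    import trio

    async def as_full_checkpoint(sync_fn, *args):
        # Make a non-cancellable operation still behave as a full checkpoint.
        await trio.hazmat.checkpoint_if_cancelled()     # bail out now if already cancelled
        result = sync_fn(*args)                         # the uninterruptible work itself
        await trio.hazmat.cancel_shielded_checkpoint()  # schedule point that ignores cancellation
        return result
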
2 changes: 1 addition & 1 deletion docs/source/reference-io.rst
@@ -492,7 +492,7 @@ To understand why, you need to know two things.
First, right now no mainstream operating system offers a generic,
reliable, native API for async file or filesystem operations, so we
have to fake it by using threads (specifically,
:func:`run_sync_in_thread`). This is cheap but isn't free: on a
:func:`trio.to_thread.run_sync`). This is cheap but isn't free: on a
typical PC, dispatching to a worker thread adds something like ~100 µs
of overhead to each operation. ("µs" is pronounced "microseconds", and
there are 1,000,000 µs in a second. Note that all the numbers here are
10 changes: 5 additions & 5 deletions newsfragments/810.removal.rst
@@ -1,5 +1,5 @@
``run_sync_in_worker_thread`` was too much of a mouthful – now it's
just called `run_sync_in_thread` (though the old name still works with
a deprecation warning, for now). Similarly,
``current_default_worker_thread_limiter`` is becoming
`current_default_thread_limiter`.
``run_sync_in_worker_thread`` has become `trio.to_thread.run_sync`, in
order to make it shorter, and more consistent with the new
``trio.from_thread``. And ``current_default_worker_thread_limiter`` is
now `trio.to_thread.current_default_thread_limiter`. (Of course the
old names still work with a deprecation warning, for now.)
2 changes: 1 addition & 1 deletion notes-to-self/blocking-read-hack.py
@@ -29,7 +29,7 @@ async def kill_it_after_timeout(new_fd):
async with trio.open_nursery() as nursery:
nursery.start_soon(kill_it_after_timeout, new_fd)
try:
data = await trio.run_sync_in_thread(os.read, new_fd, count)
data = await trio.to_thread.run_sync(os.read, new_fd, count)
except OSError as exc:
if cancel_requested and exc.errno == errno.ENOTCONN:
# Call was successfully cancelled. In a real version we'd
2 changes: 1 addition & 1 deletion notes-to-self/thread-dispatch-bench.py
@@ -2,7 +2,7 @@
# minimal a fashion as possible.
#
# This is useful to get a sense of the *lower-bound* cost of
# run_sync_in_thread
# trio.to_thread.run_sync

import threading
from queue import Queue
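
A rough companion measurement using trio itself (an illustrative sketch, not part of this file) might look like:

    import time
    import trio

    async def measure(n=1000):
        start = time.monotonic()
        for _ in range(n):
            await trio.to_thread.run_sync(lambda: None)
        per_call = (time.monotonic() - start) / n
        print(f"~{per_call * 1e6:.0f} µs per dispatch")

    trio.run(measure)
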
12 changes: 7 additions & 5 deletions trio/__init__.py
@@ -32,7 +32,6 @@
Event, CapacityLimiter, Semaphore, Lock, StrictFIFOLock, Condition
)

from ._threads import (run_sync_in_thread, current_default_thread_limiter)
from ._threads import BlockingTrioPortal as _BlockingTrioPortal

from ._highlevel_generic import aclose_forcefully, StapledStream
@@ -67,12 +66,14 @@

from ._deprecate import TrioDeprecationWarning

# Imported by default
# Submodules imported by default
from . import hazmat
from . import socket
from . import abc
from . import from_thread
# Not imported by default: testing
from . import to_thread
# Not imported by default, but mentioned here so static analysis tools like
# pylint will know that it exists.
if False:
from . import testing

@@ -104,13 +105,13 @@
),
"run_sync_in_worker_thread":
_deprecate.DeprecatedAttribute(
run_sync_in_thread,
to_thread.run_sync,
"0.12.0",
issue=810,
),
"current_default_worker_thread_limiter":
_deprecate.DeprecatedAttribute(
current_default_thread_limiter,
to_thread.current_default_thread_limiter,
"0.12.0",
issue=810,
),
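
The effect of these ``DeprecatedAttribute`` entries, sketched for a trio version that still ships the aliases: the old top-level names keep working, but touching them emits a ``trio.TrioDeprecationWarning`` that points at issue 810:

    import warnings
    import trio

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        _ = trio.run_sync_in_worker_thread  # deprecated alias for trio.to_thread.run_sync

    assert any(
        issubclass(w.category, trio.TrioDeprecationWarning) for w in caught
    )
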
@@ -163,6 +164,7 @@
fixup_module_metadata(socket.__name__, socket.__dict__)
fixup_module_metadata(abc.__name__, abc.__dict__)
fixup_module_metadata(from_thread.__name__, from_thread.__dict__)
fixup_module_metadata(to_thread.__name__, to_thread.__dict__)
fixup_module_metadata(__name__ + ".ssl", _deprecated_ssl_reexports.__dict__)
fixup_module_metadata(
__name__ + ".subprocess", _deprecated_subprocess_reexports.__dict__
8 changes: 4 additions & 4 deletions trio/_core/_entry_queue.py
@@ -131,10 +131,10 @@ class TrioToken:
This object has two uses:
1. It lets you re-enter the Trio run loop from external threads or signal
handlers. This is the low-level primitive that
:func:`trio.run_sync_in_thread` uses to receive results from
worker threads, that :func:`trio.open_signal_receiver` uses to receive
notifications about signals, and so forth.
handlers. This is the low-level primitive that `trio.to_thread`
and `trio.from_thread` use to communicate with worker threads, that
`trio.open_signal_receiver` uses to receive notifications about
signals, and so forth.
2. Each call to :func:`trio.run` has exactly one associated
:class:`TrioToken` object, so you can use it to identify a particular
2 changes: 1 addition & 1 deletion trio/_core/_traps.py
@@ -111,7 +111,7 @@ def abort_func(raise_cancel):
At that point there are again two possibilities. You can simply ignore
the cancellation altogether: wait for the operation to complete and
then reschedule and continue as normal. (For example, this is what
:func:`trio.run_sync_in_thread` does if cancellation is disabled.)
:func:`trio.to_thread.run_sync` does if cancellation is disabled.)
The other possibility is that the ``abort_func`` does succeed in
cancelling the operation, but for some reason isn't able to report that
right away. (Example: on Windows, it's possible to request that an
4 changes: 2 additions & 2 deletions trio/_core/tests/test_run.py
@@ -17,7 +17,7 @@

from .tutil import check_sequence_matches, gc_collect_harder
from ... import _core
from ..._threads import run_sync_in_thread
from ..._threads import to_thread_run_sync
from ..._timeouts import sleep, fail_after
from ..._util import aiter_compat
from ...testing import (
@@ -552,7 +552,7 @@ async def test_cancel_scope_repr(mock_clock):
scope.deadline = _core.current_time() + 10
assert "deadline is 10.00 seconds from now" in repr(scope)
# when not in async context, can't get the current time
assert "deadline" not in await run_sync_in_thread(repr, scope)
assert "deadline" not in await to_thread_run_sync(repr, scope)
scope.cancel()
assert "cancelled" in repr(scope)
assert "exited" in repr(scope)
10 changes: 5 additions & 5 deletions trio/_file_io.py
@@ -53,7 +53,7 @@
class AsyncIOWrapper(AsyncResource):
"""A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous
file object` interface. Wrapped methods that could block are executed in
:meth:`trio.run_sync_in_thread`.
:meth:`trio.to_thread.run_sync`.
All properties and methods defined in :mod:`~io` are exposed by this
wrapper, if they exist in the wrapped file object.
@@ -80,7 +80,7 @@ def __getattr__(self, name):
@async_wraps(self.__class__, self._wrapped.__class__, name)
async def wrapper(*args, **kwargs):
func = partial(meth, *args, **kwargs)
return await trio.run_sync_in_thread(func)
return await trio.to_thread.run_sync(func)

# cache the generated method
setattr(self, name, wrapper)
@@ -115,7 +115,7 @@ async def detach(self):
"""

raw = await trio.run_sync_in_thread(self._wrapped.detach)
raw = await trio.to_thread.run_sync(self._wrapped.detach)
return wrap_file(raw)

async def aclose(self):
@@ -128,7 +128,7 @@ async def aclose(self):

# ensure the underlying file is closed during cancellation
with trio.CancelScope(shield=True):
await trio.run_sync_in_thread(self._wrapped.close)
await trio.to_thread.run_sync(self._wrapped.close)

await trio.hazmat.checkpoint_if_cancelled()

@@ -165,7 +165,7 @@ async def open_file(
file = fspath(file)

_file = wrap_file(
await trio.run_sync_in_thread(
await trio.to_thread.run_sync(
io.open, file, mode, buffering, encoding, errors, newline, closefd,
opener
)
10 changes: 5 additions & 5 deletions trio/_path.py
@@ -58,7 +58,7 @@ def iter_wrapper_factory(cls, meth_name):
async def wrapper(self, *args, **kwargs):
meth = getattr(self._wrapped, meth_name)
func = partial(meth, *args, **kwargs)
items = await trio.run_sync_in_thread(func)
items = await trio.to_thread.run_sync(func)
return (rewrap_path(item) for item in items)

return wrapper
@@ -70,7 +70,7 @@ async def wrapper(self, *args, **kwargs):
args = unwrap_paths(args)
meth = getattr(self._wrapped, meth_name)
func = partial(meth, *args, **kwargs)
value = await trio.run_sync_in_thread(func)
value = await trio.to_thread.run_sync(func)
return rewrap_path(value)

return wrapper
@@ -83,7 +83,7 @@ async def wrapper(cls, *args, **kwargs):
args = unwrap_paths(args)
meth = getattr(cls._wraps, meth_name)
func = partial(meth, *args, **kwargs)
value = await trio.run_sync_in_thread(func)
value = await trio.to_thread.run_sync(func)
return rewrap_path(value)

return wrapper
@@ -145,7 +145,7 @@ def generate_iter(cls, attrs):

class Path(metaclass=AsyncAutoWrapperType):
"""A :class:`pathlib.Path` wrapper that executes blocking methods in
:meth:`trio.run_sync_in_thread`.
:meth:`trio.to_thread.run_sync`.
"""

@@ -185,7 +185,7 @@ async def open(self, *args, **kwargs):
"""

func = partial(self._wrapped.open, *args, **kwargs)
value = await trio.run_sync_in_thread(func)
value = await trio.to_thread.run_sync(func)
return trio.wrap_file(value)


9 changes: 4 additions & 5 deletions trio/_socket.py
@@ -7,7 +7,6 @@
import idna as _idna

import trio
from ._threads import run_sync_in_thread
from ._util import fspath
from . import _core

@@ -179,7 +178,7 @@ def numeric_only_failure(exc):
if hr is not None:
return await hr.getaddrinfo(host, port, family, type, proto, flags)
else:
return await run_sync_in_thread(
return await trio.to_thread.run_sync(
_stdlib_socket.getaddrinfo,
host,
port,
@@ -205,7 +204,7 @@ async def getnameinfo(sockaddr, flags):
if hr is not None:
return await hr.getnameinfo(sockaddr, flags)
else:
return await run_sync_in_thread(
return await trio.to_thread.run_sync(
_stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True
)

@@ -216,7 +215,7 @@ async def getprotobyname(name):
Like :func:`socket.getprotobyname`, but async.
"""
return await run_sync_in_thread(
return await trio.to_thread.run_sync(
_stdlib_socket.getprotobyname, name, cancellable=True
)

@@ -464,7 +463,7 @@ async def bind(self, address):
):
# Use a thread for the filesystem traversal (unless it's an
# abstract domain socket)
return await run_sync_in_thread(self._sock.bind, address)
return await trio.to_thread.run_sync(self._sock.bind, address)
else:
# POSIX actually says that bind can return EWOULDBLOCK and
# complete asynchronously, like connect. But in practice AFAICT
4 changes: 2 additions & 2 deletions trio/_subprocess_platform/waitid.py
@@ -5,7 +5,7 @@

from .. import _core, _subprocess
from .._sync import CapacityLimiter, Event
from .._threads import run_sync_in_thread
from .._threads import to_thread_run_sync

try:
from os import waitid
@@ -74,7 +74,7 @@ async def _waitid_system_task(pid: int, event: Event) -> None:
# call to trio.run is shutting down.

try:
await run_sync_in_thread(
await to_thread_run_sync(
sync_wait_reapable,
pid,
cancellable=True,