1 22
import threading
2 22
import queue as stdlib_queue
3 22
from itertools import count
4

5 22
import attr
6 22
import inspect
7 22
import outcome
8

9 22
import trio
10

11 22
from ._sync import CapacityLimiter
12 22
from ._core import (
13
    enable_ki_protection,
14
    disable_ki_protection,
15
    RunVar,
16
    TrioToken,
17
    start_thread_soon,
18
)
19 22
from ._util import coroutine_or_error
20

21
# Global due to Threading API, thread local storage for trio token
22 22
TOKEN_LOCAL = threading.local()
23

24 22
_limiter_local = RunVar("limiter")
25
# I pulled this number out of the air; it isn't based on anything. Probably we
26
# should make some kind of measurements to pick a good value.
27 22
DEFAULT_LIMIT = 40
28 22
_thread_counter = count()
29

30

31 22
def current_default_thread_limiter():
32
    """Get the default `~trio.CapacityLimiter` used by
33
    `trio.to_thread.run_sync`.
34

35
    The most common reason to call this would be if you want to modify its
36
    :attr:`~trio.CapacityLimiter.total_tokens` attribute.
37

38
    """
39 22
    try:
40 22
        limiter = _limiter_local.get()
41 22
    except LookupError:
42 22
        limiter = CapacityLimiter(DEFAULT_LIMIT)
43 22
        _limiter_local.set(limiter)
44 22
    return limiter
45

46

47
# Eventually we might build this into a full-fledged deadlock-detection
48
# system; see https://github.com/python-trio/trio/issues/182
49
# But for now we just need an object to stand in for the thread, so we can
50
# keep track of who's holding the CapacityLimiter's token.
51 22
@attr.s(frozen=True, eq=False, hash=False)
52 9
class ThreadPlaceholder:
53 22
    name = attr.ib()
54

55

56 22
@enable_ki_protection
57 22
async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None):
58
    """Convert a blocking operation into an async operation using a thread.
59

60
    These two lines are equivalent::
61

62
        sync_fn(*args)
63
        await trio.to_thread.run_sync(sync_fn, *args)
64

65
    except that if ``sync_fn`` takes a long time, then the first line will
66
    block the Trio loop while it runs, while the second line allows other Trio
67
    tasks to continue working while ``sync_fn`` runs. This is accomplished by
68
    pushing the call to ``sync_fn(*args)`` off into a worker thread.
69

70
    From inside the worker thread, you can get back into Trio using the
71
    functions in `trio.from_thread`.
72

73
    Args:
74
      sync_fn: An arbitrary synchronous callable.
75
      *args: Positional arguments to pass to sync_fn. If you need keyword
76
          arguments, use :func:`functools.partial`.
77
      cancellable (bool): Whether to allow cancellation of this operation. See
78
          discussion below.
79
      limiter (None, or CapacityLimiter-like object):
80
          An object used to limit the number of simultaneous threads. Most
81
          commonly this will be a `~trio.CapacityLimiter`, but it could be
82
          anything providing compatible
83
          :meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and
84
          :meth:`~trio.CapacityLimiter.release_on_behalf_of` methods. This
85
          function will call ``acquire_on_behalf_of`` before starting the
86
          thread, and ``release_on_behalf_of`` after the thread has finished.
87

88
          If None (the default), uses the default `~trio.CapacityLimiter`, as
89
          returned by :func:`current_default_thread_limiter`.
90

91
    **Cancellation handling**: Cancellation is a tricky issue here, because
92
    neither Python nor the operating systems it runs on provide any general
93
    mechanism for cancelling an arbitrary synchronous function running in a
94
    thread. This function will always check for cancellation on entry, before
95
    starting the thread. But once the thread is running, there are two ways it
96
    can handle being cancelled:
97

98
    * If ``cancellable=False``, the function ignores the cancellation and
99
      keeps going, just like if we had called ``sync_fn`` synchronously. This
100
      is the default behavior.
101

102
    * If ``cancellable=True``, then this function immediately raises
103
      `~trio.Cancelled`. In this case **the thread keeps running in
104
      background** – we just abandon it to do whatever it's going to do, and
105
      silently discard any return value or errors that it raises. Only use
106
      this if you know that the operation is safe and side-effect free. (For
107
      example: :func:`trio.socket.getaddrinfo` uses a thread with
108
      ``cancellable=True``, because it doesn't really affect anything if a
109
      stray hostname lookup keeps running in the background.)
110

111
      The ``limiter`` is only released after the thread has *actually*
112
      finished – which in the case of cancellation may be some time after this
113
      function has returned. If :func:`trio.run` finishes before the thread
114
      does, then the limiter release method will never be called at all.
115

116
    .. warning::
117

118
       You should not use this function to call long-running CPU-bound
119
       functions! In addition to the usual GIL-related reasons why using
120
       threads for CPU-bound work is not very effective in Python, there is an
121
       additional problem: on CPython, `CPU-bound threads tend to "starve out"
122
       IO-bound threads <https://bugs.python.org/issue7946>`__, so using
123
       threads for CPU-bound work is likely to adversely affect the main
124
       thread running Trio. If you need to do this, you're better off using a
125
       worker process, or perhaps PyPy (which still has a GIL, but may do a
126
       better job of fairly allocating CPU time between threads).
127

128
    Returns:
129
      Whatever ``sync_fn(*args)`` returns.
130

131
    Raises:
132
      Exception: Whatever ``sync_fn(*args)`` raises.
133

134
    """
135 22
    await trio.lowlevel.checkpoint_if_cancelled()
136 22
    if limiter is None:
137 22
        limiter = current_default_thread_limiter()
138

139
    # Holds a reference to the task that's blocked in this function waiting
140
    # for the result – or None if this function was cancelled and we should
141
    # discard the result.
142 22
    task_register = [trio.lowlevel.current_task()]
143 22
    name = f"trio.to_thread.run_sync-{next(_thread_counter)}"
144 22
    placeholder = ThreadPlaceholder(name)
145

146
    # This function gets scheduled into the Trio run loop to deliver the
147
    # thread's result.
148 22
    def report_back_in_trio_thread_fn(result):
149 22
        def do_release_then_return_result():
150
            # release_on_behalf_of is an arbitrary user-defined method, so it
151
            # might raise an error. If it does, we want that error to
152
            # replace the regular return value, and if the regular return was
153
            # already an exception then we want them to chain.
154 22
            try:
155 22
                return result.unwrap()
156
            finally:
157 22
                limiter.release_on_behalf_of(placeholder)
158

159 22
        result = outcome.capture(do_release_then_return_result)
160 22
        if task_register[0] is not None:
161 22
            trio.lowlevel.reschedule(task_register[0], result)
162

163 22
    current_trio_token = trio.lowlevel.current_trio_token()
164

165 22
    def worker_fn():
166 22
        TOKEN_LOCAL.token = current_trio_token
167 22
        try:
168 22
            ret = sync_fn(*args)
169

170 22
            if inspect.iscoroutine(ret):
171
                # Manually close coroutine to avoid RuntimeWarnings
172 22
                ret.close()
173 22
                raise TypeError(
174
                    "Trio expected a sync function, but {!r} appears to be "
175
                    "asynchronous".format(getattr(sync_fn, "__qualname__", sync_fn))
176
                )
177

178 22
            return ret
179
        finally:
180 22
            del TOKEN_LOCAL.token
181

182 22
    def deliver_worker_fn_result(result):
183 22
        try:
184 22
            current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result)
185 22
        except trio.RunFinishedError:
186
            # The entire run finished, so the task we're trying to contact is
187
            # certainly long gone -- it must have been cancelled and abandoned
188
            # us.
189 22
            pass
190

191 22
    await limiter.acquire_on_behalf_of(placeholder)
192 22
    try:
193 22
        start_thread_soon(worker_fn, deliver_worker_fn_result)
194 22
    except:
195 22
        limiter.release_on_behalf_of(placeholder)
196 22
        raise
197

198 22
    def abort(_):
199 22
        if cancellable:
200 22
            task_register[0] = None
201 22
            return trio.lowlevel.Abort.SUCCEEDED
202
        else:
203 22
            return trio.lowlevel.Abort.FAILED
204

205 22
    return await trio.lowlevel.wait_task_rescheduled(abort)
206

207

208 22
def _run_fn_as_system_task(cb, fn, *args, trio_token=None):
209
    """Helper function for from_thread.run and from_thread.run_sync.
210

211
    Since this internally uses TrioToken.run_sync_soon, all warnings about
212
    raised exceptions canceling all tasks should be noted.
213
    """
214

215 22
    if trio_token and not isinstance(trio_token, TrioToken):
216 22
        raise RuntimeError("Passed kwarg trio_token is not of type TrioToken")
217

218 22
    if not trio_token:
219 22
        try:
220 22
            trio_token = TOKEN_LOCAL.token
221 22
        except AttributeError:
222 22
            raise RuntimeError(
223
                "this thread wasn't created by Trio, pass kwarg trio_token=..."
224
            )
225

226
    # Avoid deadlock by making sure we're not called from Trio thread
227 22
    try:
228 22
        trio.lowlevel.current_task()
229 22
    except RuntimeError:
230 22
        pass
231
    else:
232 22
        raise RuntimeError("this is a blocking function; call it from a thread")
233

234 22
    q = stdlib_queue.Queue()
235 22
    trio_token.run_sync_soon(cb, q, fn, args)
236 22
    return q.get().unwrap()
237

238

239 22
def from_thread_run(afn, *args, trio_token=None):
240
    """Run the given async function in the parent Trio thread, blocking until it
241
    is complete.
242

243
    Returns:
244
      Whatever ``afn(*args)`` returns.
245

246
    Returns or raises whatever the given function returns or raises. It
247
    can also raise exceptions of its own:
248

249
    Raises:
250
        RunFinishedError: if the corresponding call to :func:`trio.run` has
251
            already completed.
252
        Cancelled: if the corresponding call to :func:`trio.run` completes
253
            while ``afn(*args)`` is running, then ``afn`` is likely to raise
254
            :exc:`trio.Cancelled`, and this will propagate out into
255
        RuntimeError: if you try calling this from inside the Trio thread,
256
            which would otherwise cause a deadlock.
257
        AttributeError: if no ``trio_token`` was provided, and we can't infer
258
            one from context.
259
        TypeError: if ``afn`` is not an asynchronous function.
260

261
    **Locating a Trio Token**: There are two ways to specify which
262
    `trio.run` loop to reenter:
263

264
        - Spawn this thread from `trio.to_thread.run_sync`. Trio will
265
          automatically capture the relevant Trio token and use it when you
266
          want to re-enter Trio.
267
        - Pass a keyword argument, ``trio_token`` specifiying a specific
268
          `trio.run` loop to re-enter. This is useful in case you have a
269
          "foreign" thread, spawned using some other framework, and still want
270
          to enter Trio.
271
    """
272

273 22
    def callback(q, afn, args):
274 22
        @disable_ki_protection
275 9
        async def unprotected_afn():
276 22
            coro = coroutine_or_error(afn, *args)
277 22
            return await coro
278

279 22
        async def await_in_trio_thread_task():
280 22
            q.put_nowait(await outcome.acapture(unprotected_afn))
281

282 22
        trio.lowlevel.spawn_system_task(await_in_trio_thread_task, name=afn)
283

284 22
    return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token)
285

286

287 22
def from_thread_run_sync(fn, *args, trio_token=None):
288
    """Run the given sync function in the parent Trio thread, blocking until it
289
    is complete.
290

291
    Returns:
292
      Whatever ``fn(*args)`` returns.
293

294
    Returns or raises whatever the given function returns or raises. It
295
    can also raise exceptions of its own:
296

297
    Raises:
298
        RunFinishedError: if the corresponding call to `trio.run` has
299
            already completed.
300
        RuntimeError: if you try calling this from inside the Trio thread,
301
            which would otherwise cause a deadlock.
302
        AttributeError: if no ``trio_token`` was provided, and we can't infer
303
            one from context.
304
        TypeError: if ``fn`` is an async function.
305

306
    **Locating a Trio Token**: There are two ways to specify which
307
    `trio.run` loop to reenter:
308

309
        - Spawn this thread from `trio.to_thread.run_sync`. Trio will
310
          automatically capture the relevant Trio token and use it when you
311
          want to re-enter Trio.
312
        - Pass a keyword argument, ``trio_token`` specifiying a specific
313
          `trio.run` loop to re-enter. This is useful in case you have a
314
          "foreign" thread, spawned using some other framework, and still want
315
          to enter Trio.
316
    """
317

318 22
    def callback(q, fn, args):
319 22
        @disable_ki_protection
320 9
        def unprotected_fn():
321 22
            ret = fn(*args)
322

323 22
            if inspect.iscoroutine(ret):
324
                # Manually close coroutine to avoid RuntimeWarnings
325 22
                ret.close()
326 22
                raise TypeError(
327
                    "Trio expected a sync function, but {!r} appears to be "
328
                    "asynchronous".format(getattr(fn, "__qualname__", fn))
329
                )
330

331 22
            return ret
332

333 22
        res = outcome.capture(unprotected_fn)
334 22
        q.put_nowait(res)
335

336 22
    return _run_fn_as_system_task(callback, fn, *args, trio_token=trio_token)

Read our documentation on viewing source code .

Loading