1
# coding: utf-8
2

3 27
import threading
4 27
import queue as stdlib_queue
5 27
from itertools import count
6

7 27
import attr
8 27
import inspect
9 27
import outcome
10

11 27
import trio
12

13 27
from ._sync import CapacityLimiter
14 27
from ._core import (
15
    enable_ki_protection,
16
    disable_ki_protection,
17
    RunVar,
18
    TrioToken,
19
    start_thread_soon,
20
)
21 27
from ._util import coroutine_or_error
22

23
# Global due to Threading API, thread local storage for trio token
24 27
TOKEN_LOCAL = threading.local()
25

26 27
_limiter_local = RunVar("limiter")
27
# I pulled this number out of the air; it isn't based on anything. Probably we
28
# should make some kind of measurements to pick a good value.
29 27
DEFAULT_LIMIT = 40
30 27
_thread_counter = count()
31

32

33 27
def current_default_thread_limiter():
34
    """Get the default `~trio.CapacityLimiter` used by
35
    `trio.to_thread.run_sync`.
36

37
    The most common reason to call this would be if you want to modify its
38
    :attr:`~trio.CapacityLimiter.total_tokens` attribute.
39

40
    """
41 27
    try:
42 27
        limiter = _limiter_local.get()
43 27
    except LookupError:
44 27
        limiter = CapacityLimiter(DEFAULT_LIMIT)
45 27
        _limiter_local.set(limiter)
46 27
    return limiter
47

48

49
# Eventually we might build this into a full-fledged deadlock-detection
50
# system; see https://github.com/python-trio/trio/issues/182
51
# But for now we just need an object to stand in for the thread, so we can
52
# keep track of who's holding the CapacityLimiter's token.
53 27
@attr.s(frozen=True, eq=False, hash=False)
54 13
class ThreadPlaceholder:
55 27
    name = attr.ib()
56

57

58 27
@enable_ki_protection
59 27
async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None):
60
    """Convert a blocking operation into an async operation using a thread.
61

62
    These two lines are equivalent::
63

64
        sync_fn(*args)
65
        await trio.to_thread.run_sync(sync_fn, *args)
66

67
    except that if ``sync_fn`` takes a long time, then the first line will
68
    block the Trio loop while it runs, while the second line allows other Trio
69
    tasks to continue working while ``sync_fn`` runs. This is accomplished by
70
    pushing the call to ``sync_fn(*args)`` off into a worker thread.
71

72
    From inside the worker thread, you can get back into Trio using the
73
    functions in `trio.from_thread`.
74

75
    Args:
76
      sync_fn: An arbitrary synchronous callable.
77
      *args: Positional arguments to pass to sync_fn. If you need keyword
78
          arguments, use :func:`functools.partial`.
79
      cancellable (bool): Whether to allow cancellation of this operation. See
80
          discussion below.
81
      limiter (None, or CapacityLimiter-like object):
82
          An object used to limit the number of simultaneous threads. Most
83
          commonly this will be a `~trio.CapacityLimiter`, but it could be
84
          anything providing compatible
85
          :meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and
86
          :meth:`~trio.CapacityLimiter.release_on_behalf_of` methods. This
87
          function will call ``acquire_on_behalf_of`` before starting the
88
          thread, and ``release_on_behalf_of`` after the thread has finished.
89

90
          If None (the default), uses the default `~trio.CapacityLimiter`, as
91
          returned by :func:`current_default_thread_limiter`.
92

93
    **Cancellation handling**: Cancellation is a tricky issue here, because
94
    neither Python nor the operating systems it runs on provide any general
95
    mechanism for cancelling an arbitrary synchronous function running in a
96
    thread. This function will always check for cancellation on entry, before
97
    starting the thread. But once the thread is running, there are two ways it
98
    can handle being cancelled:
99

100
    * If ``cancellable=False``, the function ignores the cancellation and
101
      keeps going, just like if we had called ``sync_fn`` synchronously. This
102
      is the default behavior.
103

104
    * If ``cancellable=True``, then this function immediately raises
105
      `~trio.Cancelled`. In this case **the thread keeps running in
106
      background** – we just abandon it to do whatever it's going to do, and
107
      silently discard any return value or errors that it raises. Only use
108
      this if you know that the operation is safe and side-effect free. (For
109
      example: :func:`trio.socket.getaddrinfo` uses a thread with
110
      ``cancellable=True``, because it doesn't really affect anything if a
111
      stray hostname lookup keeps running in the background.)
112

113
      The ``limiter`` is only released after the thread has *actually*
114
      finished – which in the case of cancellation may be some time after this
115
      function has returned. If :func:`trio.run` finishes before the thread
116
      does, then the limiter release method will never be called at all.
117

118
    .. warning::
119

120
       You should not use this function to call long-running CPU-bound
121
       functions! In addition to the usual GIL-related reasons why using
122
       threads for CPU-bound work is not very effective in Python, there is an
123
       additional problem: on CPython, `CPU-bound threads tend to "starve out"
124
       IO-bound threads <https://bugs.python.org/issue7946>`__, so using
125
       threads for CPU-bound work is likely to adversely affect the main
126
       thread running Trio. If you need to do this, you're better off using a
127
       worker process, or perhaps PyPy (which still has a GIL, but may do a
128
       better job of fairly allocating CPU time between threads).
129

130
    Returns:
131
      Whatever ``sync_fn(*args)`` returns.
132

133
    Raises:
134
      Exception: Whatever ``sync_fn(*args)`` raises.
135

136
    """
137 27
    await trio.lowlevel.checkpoint_if_cancelled()
138 27
    if limiter is None:
139 27
        limiter = current_default_thread_limiter()
140

141
    # Holds a reference to the task that's blocked in this function waiting
142
    # for the result – or None if this function was cancelled and we should
143
    # discard the result.
144 27
    task_register = [trio.lowlevel.current_task()]
145 27
    name = f"trio.to_thread.run_sync-{next(_thread_counter)}"
146 27
    placeholder = ThreadPlaceholder(name)
147

148
    # This function gets scheduled into the Trio run loop to deliver the
149
    # thread's result.
150 27
    def report_back_in_trio_thread_fn(result):
151 27
        def do_release_then_return_result():
152
            # release_on_behalf_of is an arbitrary user-defined method, so it
153
            # might raise an error. If it does, we want that error to
154
            # replace the regular return value, and if the regular return was
155
            # already an exception then we want them to chain.
156 27
            try:
157 27
                return result.unwrap()
158
            finally:
159 27
                limiter.release_on_behalf_of(placeholder)
160

161 27
        result = outcome.capture(do_release_then_return_result)
162 27
        if task_register[0] is not None:
163 27
            trio.lowlevel.reschedule(task_register[0], result)
164

165 27
    current_trio_token = trio.lowlevel.current_trio_token()
166

167 27
    def worker_fn():
168 27
        TOKEN_LOCAL.token = current_trio_token
169 27
        try:
170 27
            ret = sync_fn(*args)
171

172 27
            if inspect.iscoroutine(ret):
173
                # Manually close coroutine to avoid RuntimeWarnings
174 27
                ret.close()
175 27
                raise TypeError(
176
                    "Trio expected a sync function, but {!r} appears to be "
177
                    "asynchronous".format(getattr(sync_fn, "__qualname__", sync_fn))
178
                )
179

180 27
            return ret
181
        finally:
182 27
            del TOKEN_LOCAL.token
183

184 27
    def deliver_worker_fn_result(result):
185 27
        try:
186 27
            current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result)
187 27
        except trio.RunFinishedError:
188
            # The entire run finished, so the task we're trying to contact is
189
            # certainly long gone -- it must have been cancelled and abandoned
190
            # us.
191 27
            pass
192

193 27
    await limiter.acquire_on_behalf_of(placeholder)
194 27
    try:
195 27
        start_thread_soon(worker_fn, deliver_worker_fn_result)
196 27
    except:
197 27
        limiter.release_on_behalf_of(placeholder)
198 27
        raise
199

200 27
    def abort(_):
201 27
        if cancellable:
202 27
            task_register[0] = None
203 27
            return trio.lowlevel.Abort.SUCCEEDED
204
        else:
205 27
            return trio.lowlevel.Abort.FAILED
206

207 27
    return await trio.lowlevel.wait_task_rescheduled(abort)
208

209

210 27
def _run_fn_as_system_task(cb, fn, *args, trio_token=None):
211
    """Helper function for from_thread.run and from_thread.run_sync.
212

213
    Since this internally uses TrioToken.run_sync_soon, all warnings about
214
    raised exceptions canceling all tasks should be noted.
215
    """
216

217 27
    if trio_token and not isinstance(trio_token, TrioToken):
218 27
        raise RuntimeError("Passed kwarg trio_token is not of type TrioToken")
219

220 27
    if not trio_token:
221 27
        try:
222 27
            trio_token = TOKEN_LOCAL.token
223 27
        except AttributeError:
224 27
            raise RuntimeError(
225
                "this thread wasn't created by Trio, pass kwarg trio_token=..."
226
            )
227

228
    # Avoid deadlock by making sure we're not called from Trio thread
229 27
    try:
230 27
        trio.lowlevel.current_task()
231 27
    except RuntimeError:
232 27
        pass
233
    else:
234 27
        raise RuntimeError("this is a blocking function; call it from a thread")
235

236 27
    q = stdlib_queue.Queue()
237 27
    trio_token.run_sync_soon(cb, q, fn, args)
238 27
    return q.get().unwrap()
239

240

241 27
def from_thread_run(afn, *args, trio_token=None):
242
    """Run the given async function in the parent Trio thread, blocking until it
243
    is complete.
244

245
    Returns:
246
      Whatever ``afn(*args)`` returns.
247

248
    Returns or raises whatever the given function returns or raises. It
249
    can also raise exceptions of its own:
250

251
    Raises:
252
        RunFinishedError: if the corresponding call to :func:`trio.run` has
253
            already completed, or if the run has started its final cleanup phase
254
            and can no longer spawn new system tasks.
255
        Cancelled: if the corresponding call to :func:`trio.run` completes
256
            while ``afn(*args)`` is running, then ``afn`` is likely to raise
257
            :exc:`trio.Cancelled`, and this will propagate out into
258
        RuntimeError: if you try calling this from inside the Trio thread,
259
            which would otherwise cause a deadlock.
260
        AttributeError: if no ``trio_token`` was provided, and we can't infer
261
            one from context.
262
        TypeError: if ``afn`` is not an asynchronous function.
263

264
    **Locating a Trio Token**: There are two ways to specify which
265
    `trio.run` loop to reenter:
266

267
        - Spawn this thread from `trio.to_thread.run_sync`. Trio will
268
          automatically capture the relevant Trio token and use it when you
269
          want to re-enter Trio.
270
        - Pass a keyword argument, ``trio_token`` specifiying a specific
271
          `trio.run` loop to re-enter. This is useful in case you have a
272
          "foreign" thread, spawned using some other framework, and still want
273
          to enter Trio.
274
    """
275

276 27
    def callback(q, afn, args):
277 27
        @disable_ki_protection
278 13
        async def unprotected_afn():
279 27
            coro = coroutine_or_error(afn, *args)
280 27
            return await coro
281

282 27
        async def await_in_trio_thread_task():
283 27
            q.put_nowait(await outcome.acapture(unprotected_afn))
284

285 27
        try:
286 27
            trio.lowlevel.spawn_system_task(await_in_trio_thread_task, name=afn)
287 26
        except RuntimeError:  # system nursery is closed
288 26
            q.put_nowait(
289
                outcome.Error(trio.RunFinishedError("system nursery is closed"))
290
            )
291

292 27
    return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token)
293

294

295 27
def from_thread_run_sync(fn, *args, trio_token=None):
296
    """Run the given sync function in the parent Trio thread, blocking until it
297
    is complete.
298

299
    Returns:
300
      Whatever ``fn(*args)`` returns.
301

302
    Returns or raises whatever the given function returns or raises. It
303
    can also raise exceptions of its own:
304

305
    Raises:
306
        RunFinishedError: if the corresponding call to `trio.run` has
307
            already completed.
308
        RuntimeError: if you try calling this from inside the Trio thread,
309
            which would otherwise cause a deadlock.
310
        AttributeError: if no ``trio_token`` was provided, and we can't infer
311
            one from context.
312
        TypeError: if ``fn`` is an async function.
313

314
    **Locating a Trio Token**: There are two ways to specify which
315
    `trio.run` loop to reenter:
316

317
        - Spawn this thread from `trio.to_thread.run_sync`. Trio will
318
          automatically capture the relevant Trio token and use it when you
319
          want to re-enter Trio.
320
        - Pass a keyword argument, ``trio_token`` specifiying a specific
321
          `trio.run` loop to re-enter. This is useful in case you have a
322
          "foreign" thread, spawned using some other framework, and still want
323
          to enter Trio.
324
    """
325

326 27
    def callback(q, fn, args):
327 27
        @disable_ki_protection
328 13
        def unprotected_fn():
329 27
            ret = fn(*args)
330

331 27
            if inspect.iscoroutine(ret):
332
                # Manually close coroutine to avoid RuntimeWarnings
333 27
                ret.close()
334 27
                raise TypeError(
335
                    "Trio expected a sync function, but {!r} appears to be "
336
                    "asynchronous".format(getattr(fn, "__qualname__", fn))
337
                )
338

339 27
            return ret
340

341 27
        res = outcome.capture(unprotected_fn)
342 27
        q.put_nowait(res)
343

344 27
    return _run_fn_as_system_task(callback, fn, *args, trio_token=trio_token)

Read our documentation on viewing source code .

Loading