Stop using RawGit URL in PyPI
1 | 22 |
import threading |
2 | 22 |
import queue as stdlib_queue |
3 | 22 |
from itertools import count |
4 |
|
|
5 | 22 |
import attr |
6 | 22 |
import inspect |
7 | 22 |
import outcome |
8 |
|
|
9 | 22 |
import trio |
10 |
|
|
11 | 22 |
from ._sync import CapacityLimiter |
12 | 22 |
from ._core import ( |
13 |
enable_ki_protection, |
|
14 |
disable_ki_protection, |
|
15 |
RunVar, |
|
16 |
TrioToken, |
|
17 |
start_thread_soon, |
|
18 |
)
|
|
19 | 22 |
from ._util import coroutine_or_error |
20 |
|
|
21 |
# Global due to Threading API, thread local storage for trio token
|
|
22 | 22 |
TOKEN_LOCAL = threading.local() |
23 |
|
|
24 | 22 |
_limiter_local = RunVar("limiter") |
25 |
# I pulled this number out of the air; it isn't based on anything. Probably we
|
|
26 |
# should make some kind of measurements to pick a good value.
|
|
27 | 22 |
DEFAULT_LIMIT = 40 |
28 | 22 |
_thread_counter = count() |
29 |
|
|
30 |
|
|
31 | 22 |
def current_default_thread_limiter(): |
32 |
"""Get the default `~trio.CapacityLimiter` used by
|
|
33 |
`trio.to_thread.run_sync`.
|
|
34 |
|
|
35 |
The most common reason to call this would be if you want to modify its
|
|
36 |
:attr:`~trio.CapacityLimiter.total_tokens` attribute.
|
|
37 |
|
|
38 |
"""
|
|
39 | 22 |
try: |
40 | 22 |
limiter = _limiter_local.get() |
41 | 22 |
except LookupError: |
42 | 22 |
limiter = CapacityLimiter(DEFAULT_LIMIT) |
43 | 22 |
_limiter_local.set(limiter) |
44 | 22 |
return limiter |
45 |
|
|
46 |
|
|
47 |
# Eventually we might build this into a full-fledged deadlock-detection
|
|
48 |
# system; see https://github.com/python-trio/trio/issues/182
|
|
49 |
# But for now we just need an object to stand in for the thread, so we can
|
|
50 |
# keep track of who's holding the CapacityLimiter's token.
|
|
51 |
@attr.s(frozen=True, eq=False, hash=False) |
|
52 | 9 |
class ThreadPlaceholder: |
53 | 22 |
name = attr.ib() |
54 |
|
|
55 |
|
|
56 | 22 |
@enable_ki_protection
|
57 | 22 |
async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None): |
58 |
"""Convert a blocking operation into an async operation using a thread.
|
|
59 |
|
|
60 |
These two lines are equivalent::
|
|
61 |
|
|
62 |
sync_fn(*args)
|
|
63 |
await trio.to_thread.run_sync(sync_fn, *args)
|
|
64 |
|
|
65 |
except that if ``sync_fn`` takes a long time, then the first line will
|
|
66 |
block the Trio loop while it runs, while the second line allows other Trio
|
|
67 |
tasks to continue working while ``sync_fn`` runs. This is accomplished by
|
|
68 |
pushing the call to ``sync_fn(*args)`` off into a worker thread.
|
|
69 |
|
|
70 |
From inside the worker thread, you can get back into Trio using the
|
|
71 |
functions in `trio.from_thread`.
|
|
72 |
|
|
73 |
Args:
|
|
74 |
sync_fn: An arbitrary synchronous callable.
|
|
75 |
*args: Positional arguments to pass to sync_fn. If you need keyword
|
|
76 |
arguments, use :func:`functools.partial`.
|
|
77 |
cancellable (bool): Whether to allow cancellation of this operation. See
|
|
78 |
discussion below.
|
|
79 |
limiter (None, or CapacityLimiter-like object):
|
|
80 |
An object used to limit the number of simultaneous threads. Most
|
|
81 |
commonly this will be a `~trio.CapacityLimiter`, but it could be
|
|
82 |
anything providing compatible
|
|
83 |
:meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and
|
|
84 |
:meth:`~trio.CapacityLimiter.release_on_behalf_of` methods. This
|
|
85 |
function will call ``acquire_on_behalf_of`` before starting the
|
|
86 |
thread, and ``release_on_behalf_of`` after the thread has finished.
|
|
87 |
|
|
88 |
If None (the default), uses the default `~trio.CapacityLimiter`, as
|
|
89 |
returned by :func:`current_default_thread_limiter`.
|
|
90 |
|
|
91 |
**Cancellation handling**: Cancellation is a tricky issue here, because
|
|
92 |
neither Python nor the operating systems it runs on provide any general
|
|
93 |
mechanism for cancelling an arbitrary synchronous function running in a
|
|
94 |
thread. This function will always check for cancellation on entry, before
|
|
95 |
starting the thread. But once the thread is running, there are two ways it
|
|
96 |
can handle being cancelled:
|
|
97 |
|
|
98 |
* If ``cancellable=False``, the function ignores the cancellation and
|
|
99 |
keeps going, just like if we had called ``sync_fn`` synchronously. This
|
|
100 |
is the default behavior.
|
|
101 |
|
|
102 |
* If ``cancellable=True``, then this function immediately raises
|
|
103 |
`~trio.Cancelled`. In this case **the thread keeps running in
|
|
104 |
background** – we just abandon it to do whatever it's going to do, and
|
|
105 |
silently discard any return value or errors that it raises. Only use
|
|
106 |
this if you know that the operation is safe and side-effect free. (For
|
|
107 |
example: :func:`trio.socket.getaddrinfo` uses a thread with
|
|
108 |
``cancellable=True``, because it doesn't really affect anything if a
|
|
109 |
stray hostname lookup keeps running in the background.)
|
|
110 |
|
|
111 |
The ``limiter`` is only released after the thread has *actually*
|
|
112 |
finished – which in the case of cancellation may be some time after this
|
|
113 |
function has returned. If :func:`trio.run` finishes before the thread
|
|
114 |
does, then the limiter release method will never be called at all.
|
|
115 |
|
|
116 |
.. warning::
|
|
117 |
|
|
118 |
You should not use this function to call long-running CPU-bound
|
|
119 |
functions! In addition to the usual GIL-related reasons why using
|
|
120 |
threads for CPU-bound work is not very effective in Python, there is an
|
|
121 |
additional problem: on CPython, `CPU-bound threads tend to "starve out"
|
|
122 |
IO-bound threads <https://bugs.python.org/issue7946>`__, so using
|
|
123 |
threads for CPU-bound work is likely to adversely affect the main
|
|
124 |
thread running Trio. If you need to do this, you're better off using a
|
|
125 |
worker process, or perhaps PyPy (which still has a GIL, but may do a
|
|
126 |
better job of fairly allocating CPU time between threads).
|
|
127 |
|
|
128 |
Returns:
|
|
129 |
Whatever ``sync_fn(*args)`` returns.
|
|
130 |
|
|
131 |
Raises:
|
|
132 |
Exception: Whatever ``sync_fn(*args)`` raises.
|
|
133 |
|
|
134 |
"""
|
|
135 | 22 |
await trio.lowlevel.checkpoint_if_cancelled() |
136 |
if limiter is None: |
|
137 | 22 |
limiter = current_default_thread_limiter() |
138 |
|
|
139 |
# Holds a reference to the task that's blocked in this function waiting
|
|
140 |
# for the result – or None if this function was cancelled and we should
|
|
141 |
# discard the result.
|
|
142 | 22 |
task_register = [trio.lowlevel.current_task()] |
143 | 22 |
name = f"trio.to_thread.run_sync-{next(_thread_counter)}" |
144 | 22 |
placeholder = ThreadPlaceholder(name) |
145 |
|
|
146 |
# This function gets scheduled into the Trio run loop to deliver the
|
|
147 |
# thread's result.
|
|
148 | 22 |
def report_back_in_trio_thread_fn(result): |
149 | 22 |
def do_release_then_return_result(): |
150 |
# release_on_behalf_of is an arbitrary user-defined method, so it
|
|
151 |
# might raise an error. If it does, we want that error to
|
|
152 |
# replace the regular return value, and if the regular return was
|
|
153 |
# already an exception then we want them to chain.
|
|
154 | 22 |
try: |
155 |
return result.unwrap() |
|
156 |
finally: |
|
157 | 22 |
limiter.release_on_behalf_of(placeholder) |
158 |
|
|
159 | 22 |
result = outcome.capture(do_release_then_return_result) |
160 |
if task_register[0] is not None: |
|
161 | 22 |
trio.lowlevel.reschedule(task_register[0], result) |
162 |
|
|
163 | 22 |
current_trio_token = trio.lowlevel.current_trio_token() |
164 |
|
|
165 | 22 |
def worker_fn(): |
166 | 22 |
TOKEN_LOCAL.token = current_trio_token |
167 | 22 |
try: |
168 | 22 |
ret = sync_fn(*args) |
169 |
|
|
170 |
if inspect.iscoroutine(ret): |
|
171 |
# Manually close coroutine to avoid RuntimeWarnings
|
|
172 | 22 |
ret.close() |
173 | 22 |
raise TypeError( |
174 |
"Trio expected a sync function, but {!r} appears to be "
|
|
175 |
"asynchronous".format(getattr(sync_fn, "__qualname__", sync_fn)) |
|
176 |
)
|
|
177 |
|
|
178 |
return ret |
|
179 |
finally: |
|
180 |
del TOKEN_LOCAL.token |
|
181 |
|
|
182 | 22 |
def deliver_worker_fn_result(result): |
183 | 22 |
try: |
184 | 22 |
current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result) |
185 | 22 |
except trio.RunFinishedError: |
186 |
# The entire run finished, so the task we're trying to contact is
|
|
187 |
# certainly long gone -- it must have been cancelled and abandoned
|
|
188 |
# us.
|
|
189 | 22 |
pass
|
190 |
|
|
191 | 22 |
await limiter.acquire_on_behalf_of(placeholder) |
192 | 22 |
try: |
193 | 22 |
start_thread_soon(worker_fn, deliver_worker_fn_result) |
194 | 22 |
except: |
195 | 22 |
limiter.release_on_behalf_of(placeholder) |
196 | 22 |
raise
|
197 |
|
|
198 | 22 |
def abort(_): |
199 |
if cancellable: |
|
200 | 22 |
task_register[0] = None |
201 | 22 |
return trio.lowlevel.Abort.SUCCEEDED |
202 |
else: |
|
203 | 22 |
return trio.lowlevel.Abort.FAILED |
204 |
|
|
205 | 22 |
return await trio.lowlevel.wait_task_rescheduled(abort) |
206 |
|
|
207 |
|
|
208 | 22 |
def _run_fn_as_system_task(cb, fn, *args, trio_token=None): |
209 |
"""Helper function for from_thread.run and from_thread.run_sync.
|
|
210 |
|
|
211 |
Since this internally uses TrioToken.run_sync_soon, all warnings about
|
|
212 |
raised exceptions canceling all tasks should be noted.
|
|
213 |
"""
|
|
214 |
|
|
215 |
if trio_token and not isinstance(trio_token, TrioToken): |
|
216 | 22 |
raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") |
217 |
|
|
218 |
if not trio_token: |
|
219 | 22 |
try: |
220 | 22 |
trio_token = TOKEN_LOCAL.token |
221 | 22 |
except AttributeError: |
222 | 22 |
raise RuntimeError( |
223 |
"this thread wasn't created by Trio, pass kwarg trio_token=..."
|
|
224 |
)
|
|
225 |
|
|
226 |
# Avoid deadlock by making sure we're not called from Trio thread
|
|
227 | 22 |
try: |
228 | 22 |
trio.lowlevel.current_task() |
229 | 22 |
except RuntimeError: |
230 | 22 |
pass
|
231 |
else: |
|
232 | 22 |
raise RuntimeError("this is a blocking function; call it from a thread") |
233 |
|
|
234 | 22 |
q = stdlib_queue.Queue() |
235 | 22 |
trio_token.run_sync_soon(cb, q, fn, args) |
236 | 22 |
return q.get().unwrap() |
237 |
|
|
238 |
|
|
239 | 22 |
def from_thread_run(afn, *args, trio_token=None): |
240 |
"""Run the given async function in the parent Trio thread, blocking until it
|
|
241 |
is complete.
|
|
242 |
|
|
243 |
Returns:
|
|
244 |
Whatever ``afn(*args)`` returns.
|
|
245 |
|
|
246 |
Returns or raises whatever the given function returns or raises. It
|
|
247 |
can also raise exceptions of its own:
|
|
248 |
|
|
249 |
Raises:
|
|
250 |
RunFinishedError: if the corresponding call to :func:`trio.run` has
|
|
251 |
already completed.
|
|
252 |
Cancelled: if the corresponding call to :func:`trio.run` completes
|
|
253 |
while ``afn(*args)`` is running, then ``afn`` is likely to raise
|
|
254 |
:exc:`trio.Cancelled`, and this will propagate out into
|
|
255 |
RuntimeError: if you try calling this from inside the Trio thread,
|
|
256 |
which would otherwise cause a deadlock.
|
|
257 |
AttributeError: if no ``trio_token`` was provided, and we can't infer
|
|
258 |
one from context.
|
|
259 |
TypeError: if ``afn`` is not an asynchronous function.
|
|
260 |
|
|
261 |
**Locating a Trio Token**: There are two ways to specify which
|
|
262 |
`trio.run` loop to reenter:
|
|
263 |
|
|
264 |
- Spawn this thread from `trio.to_thread.run_sync`. Trio will
|
|
265 |
automatically capture the relevant Trio token and use it when you
|
|
266 |
want to re-enter Trio.
|
|
267 |
- Pass a keyword argument, ``trio_token`` specifiying a specific
|
|
268 |
`trio.run` loop to re-enter. This is useful in case you have a
|
|
269 |
"foreign" thread, spawned using some other framework, and still want
|
|
270 |
to enter Trio.
|
|
271 |
"""
|
|
272 |
|
|
273 | 22 |
def callback(q, afn, args): |
274 | 22 |
@disable_ki_protection
|
275 | 9 |
async def unprotected_afn(): |
276 | 22 |
coro = coroutine_or_error(afn, *args) |
277 | 22 |
return await coro |
278 |
|
|
279 | 22 |
async def await_in_trio_thread_task(): |
280 | 22 |
q.put_nowait(await outcome.acapture(unprotected_afn)) |
281 |
|
|
282 | 22 |
trio.lowlevel.spawn_system_task(await_in_trio_thread_task, name=afn) |
283 |
|
|
284 | 22 |
return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token) |
285 |
|
|
286 |
|
|
287 | 22 |
def from_thread_run_sync(fn, *args, trio_token=None): |
288 |
"""Run the given sync function in the parent Trio thread, blocking until it
|
|
289 |
is complete.
|
|
290 |
|
|
291 |
Returns:
|
|
292 |
Whatever ``fn(*args)`` returns.
|
|
293 |
|
|
294 |
Returns or raises whatever the given function returns or raises. It
|
|
295 |
can also raise exceptions of its own:
|
|
296 |
|
|
297 |
Raises:
|
|
298 |
RunFinishedError: if the corresponding call to `trio.run` has
|
|
299 |
already completed.
|
|
300 |
RuntimeError: if you try calling this from inside the Trio thread,
|
|
301 |
which would otherwise cause a deadlock.
|
|
302 |
AttributeError: if no ``trio_token`` was provided, and we can't infer
|
|
303 |
one from context.
|
|
304 |
TypeError: if ``fn`` is an async function.
|
|
305 |
|
|
306 |
**Locating a Trio Token**: There are two ways to specify which
|
|
307 |
`trio.run` loop to reenter:
|
|
308 |
|
|
309 |
- Spawn this thread from `trio.to_thread.run_sync`. Trio will
|
|
310 |
automatically capture the relevant Trio token and use it when you
|
|
311 |
want to re-enter Trio.
|
|
312 |
- Pass a keyword argument, ``trio_token`` specifiying a specific
|
|
313 |
`trio.run` loop to re-enter. This is useful in case you have a
|
|
314 |
"foreign" thread, spawned using some other framework, and still want
|
|
315 |
to enter Trio.
|
|
316 |
"""
|
|
317 |
|
|
318 | 22 |
def callback(q, fn, args): |
319 | 22 |
@disable_ki_protection
|
320 | 9 |
def unprotected_fn(): |
321 | 22 |
ret = fn(*args) |
322 |
|
|
323 |
if inspect.iscoroutine(ret): |
|
324 |
# Manually close coroutine to avoid RuntimeWarnings
|
|
325 | 22 |
ret.close() |
326 | 22 |
raise TypeError( |
327 |
"Trio expected a sync function, but {!r} appears to be "
|
|
328 |
"asynchronous".format(getattr(fn, "__qualname__", fn)) |
|
329 |
)
|
|
330 |
|
|
331 | 22 |
return ret |
332 |
|
|
333 | 22 |
res = outcome.capture(unprotected_fn) |
334 | 22 |
q.put_nowait(res) |
335 |
|
|
336 | 22 |
return _run_fn_as_system_task(callback, fn, *args, trio_token=trio_token) |
Read our documentation on viewing source code .