1 22
from collections import deque
2 22
import threading
3

4 22
import attr
5

6 22
from .. import _core
7 22
from .._util import NoPublicConstructor
8 22
from ._wakeup_socketpair import WakeupSocketpair
9

10

11 22
@attr.s(slots=True)
12 9
class EntryQueue:
13
    # This used to use a queue.Queue. but that was broken, because Queues are
14
    # implemented in Python, and not reentrant -- so it was thread-safe, but
15
    # not signal-safe. deque is implemented in C, so each operation is atomic
16
    # WRT threads (and this is guaranteed in the docs), AND each operation is
17
    # atomic WRT signal delivery (signal handlers can run on either side, but
18
    # not *during* a deque operation). dict makes similar guarantees - and on
19
    # CPython 3.6 and PyPy, it's even ordered!
20 22
    queue = attr.ib(factory=deque)
21 22
    idempotent_queue = attr.ib(factory=dict)
22

23 22
    wakeup = attr.ib(factory=WakeupSocketpair)
24 22
    done = attr.ib(default=False)
25
    # Must be a reentrant lock, because it's acquired from signal handlers.
26
    # RLock is signal-safe as of cpython 3.2. NB that this does mean that the
27
    # lock is effectively *disabled* when we enter from signal context. The
28
    # way we use the lock this is OK though, because when
29
    # run_sync_soon is called from a signal it's atomic WRT the
30
    # main thread -- it just might happen at some inconvenient place. But if
31
    # you look at the one place where the main thread holds the lock, it's
32
    # just to make 1 assignment, so that's atomic WRT a signal anyway.
33 22
    lock = attr.ib(factory=threading.RLock)
34

35 22
    async def task(self):
36 22
        assert _core.currently_ki_protected()
37
        # RLock has two implementations: a signal-safe version in _thread, and
38
        # and signal-UNsafe version in threading. We need the signal safe
39
        # version. Python 3.2 and later should always use this anyway, but,
40
        # since the symptoms if this goes wrong are just "weird rare
41
        # deadlocks", then let's make a little check.
42
        # See:
43
        #     https://bugs.python.org/issue13697#msg237140
44 22
        assert self.lock.__class__.__module__ == "_thread"
45

46 22
        def run_cb(job):
47
            # We run this with KI protection enabled; it's the callback's
48
            # job to disable it if it wants it disabled. Exceptions are
49
            # treated like system task exceptions (i.e., converted into
50
            # TrioInternalError and cause everything to shut down).
51 22
            sync_fn, args = job
52 22
            try:
53 22
                sync_fn(*args)
54 22
            except BaseException as exc:
55

56 22
                async def kill_everything(exc):
57 22
                    raise exc
58

59 22
                try:
60 22
                    _core.spawn_system_task(kill_everything, exc)
61 21
                except RuntimeError:
62
                    # We're quite late in the shutdown process and the
63
                    # system nursery is already closed.
64
                    # TODO(2020-06): this is a gross hack and should
65
                    # be fixed soon when we address #1607.
66 21
                    _core.current_task().parent_nursery.start_soon(kill_everything, exc)
67

68 22
            return True
69

70
        # This has to be carefully written to be safe in the face of new items
71
        # being queued while we iterate, and to do a bounded amount of work on
72
        # each pass:
73 22
        def run_all_bounded():
74 22
            for _ in range(len(self.queue)):
75 22
                run_cb(self.queue.popleft())
76 22
            for job in list(self.idempotent_queue):
77 22
                del self.idempotent_queue[job]
78 22
                run_cb(job)
79

80 22
        try:
81 13
            while True:
82 22
                run_all_bounded()
83 22
                if not self.queue and not self.idempotent_queue:
84 22
                    await self.wakeup.wait_woken()
85
                else:
86 22
                    await _core.checkpoint()
87 22
        except _core.Cancelled:
88
            # Keep the work done with this lock held as minimal as possible,
89
            # because it doesn't protect us against concurrent signal delivery
90
            # (see the comment above). Notice that this code would still be
91
            # correct if written like:
92
            #   self.done = True
93
            #   with self.lock:
94
            #       pass
95
            # because all we want is to force run_sync_soon
96
            # to either be completely before or completely after the write to
97
            # done. That's why we don't need the lock to protect
98
            # against signal handlers.
99 22
            with self.lock:
100 22
                self.done = True
101
            # No more jobs will be submitted, so just clear out any residual
102
            # ones:
103 22
            run_all_bounded()
104 22
            assert not self.queue
105 22
            assert not self.idempotent_queue
106

107 22
    def close(self):
108 22
        self.wakeup.close()
109

110 22
    def size(self):
111 22
        return len(self.queue) + len(self.idempotent_queue)
112

113 22
    def run_sync_soon(self, sync_fn, *args, idempotent=False):
114 22
        with self.lock:
115 22
            if self.done:
116 22
                raise _core.RunFinishedError("run() has exited")
117
            # We have to hold the lock all the way through here, because
118
            # otherwise the main thread might exit *while* we're doing these
119
            # calls, and then our queue item might not be processed, or the
120
            # wakeup call might trigger an OSError b/c the IO manager has
121
            # already been shut down.
122 22
            if idempotent:
123 22
                self.idempotent_queue[(sync_fn, args)] = None
124
            else:
125 22
                self.queue.append((sync_fn, args))
126 22
            self.wakeup.wakeup_thread_and_signal_safe()
127

128

129 22
class TrioToken(metaclass=NoPublicConstructor):
130
    """An opaque object representing a single call to :func:`trio.run`.
131

132
    It has no public constructor; instead, see :func:`current_trio_token`.
133

134
    This object has two uses:
135

136
    1. It lets you re-enter the Trio run loop from external threads or signal
137
       handlers. This is the low-level primitive that :func:`trio.to_thread`
138
       and `trio.from_thread` use to communicate with worker threads, that
139
       `trio.open_signal_receiver` uses to receive notifications about
140
       signals, and so forth.
141

142
    2. Each call to :func:`trio.run` has exactly one associated
143
       :class:`TrioToken` object, so you can use it to identify a particular
144
       call.
145

146
    """
147

148 22
    __slots__ = ("_reentry_queue",)
149

150 22
    def __init__(self, reentry_queue):
151 22
        self._reentry_queue = reentry_queue
152

153 22
    def run_sync_soon(self, sync_fn, *args, idempotent=False):
154
        """Schedule a call to ``sync_fn(*args)`` to occur in the context of a
155
        Trio task.
156

157
        This is safe to call from the main thread, from other threads, and
158
        from signal handlers. This is the fundamental primitive used to
159
        re-enter the Trio run loop from outside of it.
160

161
        The call will happen "soon", but there's no guarantee about exactly
162
        when, and no mechanism provided for finding out when it's happened.
163
        If you need this, you'll have to build your own.
164

165
        The call is effectively run as part of a system task (see
166
        :func:`~trio.lowlevel.spawn_system_task`). In particular this means
167
        that:
168

169
        * :exc:`KeyboardInterrupt` protection is *enabled* by default; if
170
          you want ``sync_fn`` to be interruptible by control-C, then you
171
          need to use :func:`~trio.lowlevel.disable_ki_protection`
172
          explicitly.
173

174
        * If ``sync_fn`` raises an exception, then it's converted into a
175
          :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You
176
          should be careful that ``sync_fn`` doesn't crash.
177

178
        All calls with ``idempotent=False`` are processed in strict
179
        first-in first-out order.
180

181
        If ``idempotent=True``, then ``sync_fn`` and ``args`` must be
182
        hashable, and Trio will make a best-effort attempt to discard any
183
        call submission which is equal to an already-pending call. Trio
184
        will process these in first-in first-out order.
185

186
        Any ordering guarantees apply separately to ``idempotent=False``
187
        and ``idempotent=True`` calls; there's no rule for how calls in the
188
        different categories are ordered with respect to each other.
189

190
        :raises trio.RunFinishedError:
191
              if the associated call to :func:`trio.run`
192
              has already exited. (Any call that *doesn't* raise this error
193
              is guaranteed to be fully processed before :func:`trio.run`
194
              exits.)
195

196
        """
197 22
        self._reentry_queue.run_sync_soon(sync_fn, *args, idempotent=idempotent)

Read our documentation on viewing source code .

Loading