1 27
import math
2

3 27
import attr
4 27
import outcome
5

6 27
import trio
7

8 27
from ._core import enable_ki_protection, ParkingLot
9 27
from ._deprecate import deprecated
10 27
from ._util import Final
11

12

13 27
@attr.s(repr=False, eq=False, hash=False)
class Event(metaclass=Final):
    """A level-triggered boolean flag for signalling between tasks, modeled
    on :class:`threading.Event`.

    The internal flag starts out False. Tasks call :meth:`wait` to sleep
    until the flag becomes True; if it is already True, :meth:`wait` returns
    right away. Calling :meth:`set` flips the flag to True and wakes every
    task currently blocked in :meth:`wait`.

    Because the flag is sticky, there are no lost wakeups: it makes no
    difference whether :meth:`set` runs just before or just after
    :meth:`wait`. If you want a lower-level wakeup primitive without this
    protection, look at :class:`Condition` or
    :class:`trio.lowlevel.ParkingLot`.

    .. note:: Unlike `threading.Event`, there is deliberately no
       `~threading.Event.clear` method: a Trio `Event` that has happened can
       never un-happen. To represent a series of events, create a fresh
       `Event` for each one (they're cheap!), or reach for another
       synchronization mechanism such as :ref:`channels <channels>` or
       `trio.lowlevel.ParkingLot`.

    """

    _lot = attr.ib(factory=ParkingLot, init=False)
    _flag = attr.ib(default=False, init=False)

    def is_set(self):
        """Return the current value of the internal flag."""
        return self._flag

    @enable_ki_protection
    def set(self):
        """Set the internal flag value to True, and wake any waiting tasks."""
        self._flag = True
        self._lot.unpark_all()

    async def wait(self):
        """Block until the internal flag value becomes True.

        If it's already True, then this method still executes a checkpoint,
        but returns immediately.

        """
        if not self._flag:
            await self._lot.park()
        else:
            # Already set: don't sleep, but keep this a checkpoint.
            await trio.lowlevel.checkpoint()

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this event's
          :meth:`wait` method.

        """
        return self._lot.statistics()
74

75

76 27
def async_cm(cls):
    """Class decorator: make *cls* usable as an async context manager.

    ``__aenter__`` delegates to ``cls.acquire()`` and ``__aexit__`` to
    ``cls.release()``, so entering blocks but exiting does not.
    """

    @enable_ki_protection
    async def __aenter__(self):
        await self.acquire()

    @enable_ki_protection
    async def __aexit__(self, *args):
        self.release()

    # Attach both methods, fixing up __qualname__ so tracebacks and repr()
    # show them as belonging to the decorated class.
    for method in (__aenter__, __aexit__):
        method.__qualname__ = "{}.{}".format(cls.__qualname__, method.__name__)
        setattr(cls, method.__name__, method)
    return cls
91

92

93 27
# Immutable snapshot returned by CapacityLimiter.statistics().
@attr.s(frozen=True)
class _CapacityLimiterStatistics:
    borrowed_tokens = attr.ib()  # number of tokens currently borrowed
    total_tokens = attr.ib()  # total capacity (int or math.inf)
    borrowers = attr.ib()  # list of entities currently holding a token
    tasks_waiting = attr.ib()  # tasks blocked in acquire()/acquire_on_behalf_of()
99

100

101 27
@async_cm
class CapacityLimiter(metaclass=Final):
    """An object for controlling access to a resource with limited capacity.

    Sometimes you need to put a limit on how many tasks can do something at
    the same time. For example, you might want to use some threads to run
    multiple blocking I/O operations in parallel... but if you use too many
    threads at once, then your system can become overloaded and it'll actually
    make things slower. One popular solution is to impose a policy like "run
    up to 40 threads at the same time, but no more". But how do you implement
    a policy like this?

    That's what :class:`CapacityLimiter` is for. You can think of a
    :class:`CapacityLimiter` object as a sack that starts out holding some fixed
    number of tokens::

       limit = trio.CapacityLimiter(40)

    Then tasks can come along and borrow a token out of the sack::

       # Borrow a token:
       async with limit:
           # We are holding a token!
           await perform_expensive_operation()
       # Exiting the 'async with' block puts the token back into the sack

    And crucially, if you try to borrow a token but the sack is empty, then
    you have to wait for another task to finish what it's doing and put its
    token back first before you can take it and continue.

    Another way to think of it: a :class:`CapacityLimiter` is like a sofa with a
    fixed number of seats, and if they're all taken then you have to wait for
    someone to get up before you can sit down.

    By default, :func:`trio.to_thread.run_sync` uses a
    :class:`CapacityLimiter` to limit the number of threads running at once;
    see `trio.to_thread.current_default_thread_limiter` for details.

    If you're familiar with semaphores, then you can think of this as a
    restricted semaphore that's specialized for one common use case, with
    additional error checking. For a more traditional semaphore, see
    :class:`Semaphore`.

    .. note::

       Don't confuse this with the `"leaky bucket"
       <https://en.wikipedia.org/wiki/Leaky_bucket>`__ or `"token bucket"
       <https://en.wikipedia.org/wiki/Token_bucket>`__ algorithms used to
       limit bandwidth usage on networks. The basic idea of using tokens to
       track a resource limit is similar, but this is a very simple sack where
       tokens aren't automatically created or destroyed over time; they're
       just borrowed and then put back.

    """

    def __init__(self, total_tokens):
        self._lot = ParkingLot()
        # Set of entities (tasks or arbitrary opaque objects) that currently
        # hold a token.
        self._borrowers = set()
        # Maps tasks attempting to acquire -> borrower, to handle on-behalf-of
        self._pending_borrowers = {}
        # invoke the property setter for validation
        self.total_tokens = total_tokens
        assert self._total_tokens == total_tokens

    def __repr__(self):
        return "<trio.CapacityLimiter at {:#x}, {}/{} with {} waiting>".format(
            id(self), len(self._borrowers), self._total_tokens, len(self._lot)
        )

    @property
    def total_tokens(self):
        """The total capacity available.

        You can change :attr:`total_tokens` by assigning to this attribute. If
        you make it larger, then the appropriate number of waiting tasks will
        be woken immediately to take the new tokens. If you decrease
        total_tokens below the number of tasks that are currently using the
        resource, then all current tasks will be allowed to finish as normal,
        but no new tasks will be allowed in until the total number of tasks
        drops below the new total_tokens.

        """
        return self._total_tokens

    @total_tokens.setter
    def total_tokens(self, new_total_tokens):
        # Note: bools pass this isinstance check since bool subclasses int,
        # but True would then fail the >= 1 check below... actually True == 1
        # passes; this matches the permissiveness of the original API.
        if not isinstance(new_total_tokens, int) and new_total_tokens != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        if new_total_tokens < 1:
            raise ValueError("total_tokens must be >= 1")
        self._total_tokens = new_total_tokens
        # If capacity grew, hand the new tokens to tasks already waiting.
        self._wake_waiters()

    def _wake_waiters(self):
        # Wake one waiter per free token; each woken task's reservation moves
        # from _pending_borrowers to _borrowers here, on its behalf, so the
        # token is already held by the time the task resumes.
        available = self._total_tokens - len(self._borrowers)
        for woken in self._lot.unpark(count=available):
            self._borrowers.add(self._pending_borrowers.pop(woken))

    @property
    def borrowed_tokens(self):
        """The amount of capacity that's currently in use."""
        return len(self._borrowers)

    @property
    def available_tokens(self):
        """The amount of capacity that's available to use."""
        return self.total_tokens - self.borrowed_tokens

    @enable_ki_protection
    def acquire_nowait(self):
        """Borrow a token from the sack, without blocking.

        Raises:
          WouldBlock: if no tokens are available.
          RuntimeError: if the current task already holds one of this sack's
              tokens.

        """
        self.acquire_on_behalf_of_nowait(trio.lowlevel.current_task())

    @enable_ki_protection
    def acquire_on_behalf_of_nowait(self, borrower):
        """Borrow a token from the sack on behalf of ``borrower``, without
        blocking.

        Args:
          borrower: A :class:`trio.lowlevel.Task` or arbitrary opaque object
             used to record who is borrowing this token. This is used by
             :func:`trio.to_thread.run_sync` to allow threads to "hold
             tokens", with the intention in the future of using it to `allow
             deadlock detection and other useful things
             <https://github.com/python-trio/trio/issues/182>`__

        Raises:
          WouldBlock: if no tokens are available.
          RuntimeError: if ``borrower`` already holds one of this sack's
              tokens.

        """
        if borrower in self._borrowers:
            raise RuntimeError(
                "this borrower is already holding one of this "
                "CapacityLimiter's tokens"
            )
        # The "not self._lot" check keeps things FIFO-ish: even if a token is
        # free, don't jump ahead of tasks already waiting in line.
        if len(self._borrowers) < self._total_tokens and not self._lot:
            self._borrowers.add(borrower)
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Borrow a token from the sack, blocking if necessary.

        Raises:
          RuntimeError: if the current task already holds one of this sack's
              tokens.

        """
        await self.acquire_on_behalf_of(trio.lowlevel.current_task())

    @enable_ki_protection
    async def acquire_on_behalf_of(self, borrower):
        """Borrow a token from the sack on behalf of ``borrower``, blocking if
        necessary.

        Args:
          borrower: A :class:`trio.lowlevel.Task` or arbitrary opaque object
             used to record who is borrowing this token; see
             :meth:`acquire_on_behalf_of_nowait` for details.

        Raises:
          RuntimeError: if ``borrower`` task already holds one of this sack's
             tokens.

        """
        # Standard trio pattern to make this method a checkpoint on every
        # path: check for cancellation up front, then either park (which is
        # itself a checkpoint) or execute a shielded checkpoint.
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_on_behalf_of_nowait(borrower)
        except trio.WouldBlock:
            task = trio.lowlevel.current_task()
            # Record who we're acquiring for, so _wake_waiters can complete
            # the handoff while we sleep.
            self._pending_borrowers[task] = borrower
            try:
                await self._lot.park()
            except trio.Cancelled:
                # We never got a token; drop the pending record before
                # propagating the cancellation.
                self._pending_borrowers.pop(task)
                raise
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Put a token back into the sack.

        Raises:
          RuntimeError: if the current task has not acquired one of this
              sack's tokens.

        """
        self.release_on_behalf_of(trio.lowlevel.current_task())

    @enable_ki_protection
    def release_on_behalf_of(self, borrower):
        """Put a token back into the sack on behalf of ``borrower``.

        Raises:
          RuntimeError: if the given borrower has not acquired one of this
              sack's tokens.

        """
        if borrower not in self._borrowers:
            raise RuntimeError(
                "this borrower isn't holding any of this CapacityLimiter's tokens"
            )
        self._borrowers.remove(borrower)
        # The freed token may let a waiting task proceed.
        self._wake_waiters()

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``borrowed_tokens``: The number of tokens currently borrowed from
          the sack.
        * ``total_tokens``: The total number of tokens in the sack. Usually
          this will be larger than ``borrowed_tokens``, but it's possible for
          it to be smaller if :attr:`total_tokens` was recently decreased.
        * ``borrowers``: A list of all tasks or other entities that currently
          hold a token.
        * ``tasks_waiting``: The number of tasks blocked on this
          :class:`CapacityLimiter`\'s :meth:`acquire` or
          :meth:`acquire_on_behalf_of` methods.

        """
        return _CapacityLimiterStatistics(
            borrowed_tokens=len(self._borrowers),
            total_tokens=self._total_tokens,
            # Use a list instead of a frozenset just in case we start to allow
            # one borrower to hold multiple tokens in the future
            borrowers=list(self._borrowers),
            tasks_waiting=len(self._lot),
        )
342

343

344 27
@async_cm
class Semaphore(metaclass=Final):
    """A `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`__.

    A semaphore holds an integer value, which can be incremented by
    calling :meth:`release` and decremented by calling :meth:`acquire` – but
    the value is never allowed to drop below zero. If the value is zero, then
    :meth:`acquire` will block until someone calls :meth:`release`.

    If you're looking for a :class:`Semaphore` to limit the number of tasks
    that can access some resource simultaneously, then consider using a
    :class:`CapacityLimiter` instead.

    This object's interface is similar to, but different from, that of
    :class:`threading.Semaphore`.

    A :class:`Semaphore` object can be used as an async context manager; it
    blocks on entry but not on exit.

    Args:
      initial_value (int): A non-negative integer giving semaphore's initial
        value.
      max_value (int or None): If given, makes this a "bounded" semaphore that
        raises an error if the value is about to exceed the given
        ``max_value``.

    """

    def __init__(self, initial_value, *, max_value=None):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an int")
        if initial_value < 0:
            # Message fixed to use the actual parameter name.
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be None or an int")
            if max_value < initial_value:
                # Message fixed: the parameter is "max_value", not "max_values".
                raise ValueError("max_value must be >= initial_value")

        # Invariants:
        # bool(self._lot) implies self._value == 0
        # (or equivalently: self._value > 0 implies not self._lot)
        self._lot = trio.lowlevel.ParkingLot()
        self._value = initial_value
        self._max_value = max_value

    def __repr__(self):
        if self._max_value is None:
            max_value_str = ""
        else:
            max_value_str = ", max_value={}".format(self._max_value)
        return "<trio.Semaphore({}{}) at {:#x}>".format(
            self._value, max_value_str, id(self)
        )

    @property
    def value(self):
        """The current value of the semaphore."""
        return self._value

    @property
    def max_value(self):
        """The maximum allowed value. May be None to indicate no limit."""
        return self._max_value

    @enable_ki_protection
    def acquire_nowait(self):
        """Attempt to decrement the semaphore value, without blocking.

        Raises:
          WouldBlock: if the value is zero.

        """
        if self._value > 0:
            # Invariant: a positive value means no-one can be parked.
            assert not self._lot
            self._value -= 1
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Decrement the semaphore value, blocking if necessary to avoid
        letting it drop below zero.

        """
        # Standard trio pattern to make this method a checkpoint on every
        # path: either park (a checkpoint) or do a shielded checkpoint.
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except trio.WouldBlock:
            await self._lot.park()
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Increment the semaphore value, possibly waking a task blocked in
        :meth:`acquire`.

        Raises:
          ValueError: if incrementing the value would cause it to exceed
              :attr:`max_value`.

        """
        if self._lot:
            # Hand the increment straight to a waiter instead of bumping
            # self._value, preserving the invariant that a non-empty lot
            # implies value == 0.
            assert self._value == 0
            self._lot.unpark(count=1)
        else:
            if self._max_value is not None and self._value == self._max_value:
                raise ValueError("semaphore released too many times")
            self._value += 1

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this semaphore's
          :meth:`acquire` method.

        """
        return self._lot.statistics()
465

466

467 27
# Immutable snapshot returned by _LockImpl.statistics().
@attr.s(frozen=True)
class _LockStatistics:
    locked = attr.ib()  # bool: is the lock currently held?
    owner = attr.ib()  # task holding the lock, or None
    tasks_waiting = attr.ib()  # number of tasks blocked in acquire()
472

473

474 27
# Shared implementation behind Lock and StrictFIFOLock. A single-owner,
# non-reentrant mutex: acquire_nowait() succeeds only when no-one owns the
# lock AND no-one is queued for it, and release() hands ownership directly
# to a queued waiter when there is one.
@async_cm
@attr.s(eq=False, hash=False, repr=False)
class _LockImpl:
    _lot = attr.ib(factory=ParkingLot, init=False)  # tasks blocked in acquire()
    _owner = attr.ib(default=None, init=False)  # task holding the lock, or None

    def __repr__(self):
        if self.locked():
            s1 = "locked"
            s2 = " with {} waiters".format(len(self._lot))
        else:
            s1 = "unlocked"
            s2 = ""
        return "<{} {} object at {:#x}{}>".format(
            s1, self.__class__.__name__, id(self), s2
        )

    def locked(self):
        """Check whether the lock is currently held.

        Returns:
          bool: True if the lock is held, False otherwise.

        """
        return self._owner is not None

    @enable_ki_protection
    def acquire_nowait(self):
        """Attempt to acquire the lock, without blocking.

        Raises:
          WouldBlock: if the lock is held.

        """

        task = trio.lowlevel.current_task()
        if self._owner is task:
            # Non-reentrant: fail loudly instead of deadlocking.
            raise RuntimeError("attempt to re-acquire an already held Lock")
        elif self._owner is None and not self._lot:
            # No-one owns it
            self._owner = task
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Acquire the lock, blocking if necessary."""
        # Standard trio pattern to make this method a checkpoint on every
        # path, whether or not it blocks.
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except trio.WouldBlock:
            # NOTE: it's important that the contended acquire path is just
            # "_lot.park()", because that's how Condition.wait() acquires the
            # lock as well.
            await self._lot.park()
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Release the lock.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        task = trio.lowlevel.current_task()
        if task is not self._owner:
            raise RuntimeError("can't release a Lock you don't own")
        if self._lot:
            # Hand ownership directly to a woken waiter; the lock is never
            # observably unlocked in between.
            (self._owner,) = self._lot.unpark(count=1)
        else:
            self._owner = None

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``locked``: boolean indicating whether the lock is held.
        * ``owner``: the :class:`trio.lowlevel.Task` currently holding the lock,
          or None if the lock is not held.
        * ``tasks_waiting``: The number of tasks blocked on this lock's
          :meth:`acquire` method.

        """
        return _LockStatistics(
            locked=self.locked(), owner=self._owner, tasks_waiting=len(self._lot)
        )
563

564

565 27
class Lock(_LockImpl, metaclass=Final):
    """A classic `mutex
    <https://en.wikipedia.org/wiki/Lock_(computer_science)>`__.

    A single-owner, non-reentrant lock: only the task that acquired it may
    release it (unlike :class:`threading.Lock`), and a task that tries to
    acquire it a second time gets an error instead of a deadlock.

    Instances work as async context managers: ``async with lock:`` may
    block on entry, but never on exit.

    """
577

578

579 27
class StrictFIFOLock(_LockImpl, metaclass=Final):
    r"""A variant of :class:`Lock` where tasks are guaranteed to acquire the
    lock in strict first-come-first-served order.

    An example of when this is useful is if you're implementing something like
    :class:`trio.SSLStream` or an HTTP/2 server using `h2
    <https://hyper-h2.readthedocs.io/>`__, where you have multiple concurrent
    tasks that are interacting with a shared state machine, and at
    unpredictable moments the state machine requests that a chunk of data be
    sent over the network. (For example, when using h2 simply reading incoming
    data can occasionally `create outgoing data to send
    <https://http2.github.io/http2-spec/#PING>`__.) The challenge is to make
    sure that these chunks are sent in the correct order, without being
    garbled.

    One option would be to use a regular :class:`Lock`, and wrap it around
    every interaction with the state machine::

        # This approach is sometimes workable but often sub-optimal; see below
        async with lock:
            state_machine.do_something()
            if state_machine.has_data_to_send():
                await conn.sendall(state_machine.get_data_to_send())

    But this can be problematic. If you're using h2 then *usually* reading
    incoming data doesn't create the need to send any data, so we don't want
    to force every task that tries to read from the network to sit and wait
    a potentially long time for ``sendall`` to finish. And in some situations
    this could even potentially cause a deadlock, if the remote peer is
    waiting for you to read some data before it accepts the data you're
    sending.

    :class:`StrictFIFOLock` provides an alternative. We can rewrite our
    example like::

        # Note: no awaits between when we start using the state machine and
        # when we block to take the lock!
        state_machine.do_something()
        if state_machine.has_data_to_send():
            # Notice that we fetch the data to send out of the state machine
            # *before* sleeping, so that other tasks won't see it.
            chunk = state_machine.get_data_to_send()
            async with strict_fifo_lock:
                await conn.sendall(chunk)

    First we do all our interaction with the state machine in a single
    scheduling quantum (notice there are no ``await``\s in there), so it's
    automatically atomic with respect to other tasks. And then if and only if
    we have data to send, we get in line to send it – and
    :class:`StrictFIFOLock` guarantees that each task will send its data in
    the same order that the state machine generated it.

    Currently, :class:`StrictFIFOLock` is identical to :class:`Lock`,
    but (a) this may not always be true in the future, especially if Trio ever
    implements `more sophisticated scheduling policies
    <https://github.com/python-trio/trio/issues/32>`__, and (b) the above code
    is relying on a pretty subtle property of its lock. Using a
    :class:`StrictFIFOLock` acts as an executable reminder that you're relying
    on this property.

    Like :class:`Lock`, a :class:`StrictFIFOLock` can be used as an async
    context manager; it blocks on entry but not on exit.

    """
640

641

642 27
# Immutable snapshot returned by Condition.statistics().
@attr.s(frozen=True)
class _ConditionStatistics:
    tasks_waiting = attr.ib()  # number of tasks blocked in Condition.wait()
    lock_statistics = attr.ib()  # _LockStatistics for the underlying Lock
646

647

648 27
@async_cm
class Condition(metaclass=Final):
    """A classic `condition variable
    <https://en.wikipedia.org/wiki/Monitor_(synchronization)>`__, similar to
    :class:`threading.Condition`.

    A :class:`Condition` object can be used as an async context manager to
    acquire the underlying lock; it blocks on entry but not on exit.

    Args:
      lock (Lock): the lock object to use. If given, must be a
          :class:`trio.Lock`. If None, a new :class:`Lock` will be allocated
          and used.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = Lock()
        # Deliberately an exact-type check rather than isinstance, so that
        # only a plain trio.Lock is accepted.
        if type(lock) is not Lock:
            raise TypeError("lock must be a trio.Lock")
        self._lock = lock
        self._lot = trio.lowlevel.ParkingLot()

    def locked(self):
        """Check whether the underlying lock is currently held.

        Returns:
          bool: True if the lock is held, False otherwise.

        """
        return self._lock.locked()

    def acquire_nowait(self):
        """Attempt to acquire the underlying lock, without blocking.

        Raises:
          WouldBlock: if the lock is currently held.

        """
        return self._lock.acquire_nowait()

    async def acquire(self):
        """Acquire the underlying lock, blocking if necessary."""
        await self._lock.acquire()

    def release(self):
        """Release the underlying lock."""
        self._lock.release()

    @enable_ki_protection
    async def wait(self):
        """Wait for another task to call :meth:`notify` or
        :meth:`notify_all`.

        When calling this method, you must hold the lock. It releases the lock
        while waiting, and then re-acquires it before waking up.

        There is a subtlety with how this method interacts with cancellation:
        when cancelled it will block to re-acquire the lock before raising
        :exc:`Cancelled`. This may cause cancellation to be less prompt than
        expected. The advantage is that it makes code like this work::

           async with condition:
               await condition.wait()

        If we didn't re-acquire the lock before waking up, and :meth:`wait`
        were cancelled here, then we'd crash in ``condition.__aexit__`` when
        we tried to release the lock we no longer held.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to wait")
        self.release()
        # NOTE: we go to sleep on self._lot, but we'll wake up on
        # self._lock._lot. That's all that's required to acquire a Lock.
        try:
            await self._lot.park()
        except BaseException:
            # park() can raise trio.Cancelled, which is a BaseException, so
            # anything narrower would miss it. Re-acquire the lock (shielded
            # from further cancellation) before letting the exception
            # propagate, so that ``condition.__aexit__`` can release it.
            with trio.CancelScope(shield=True):
                await self.acquire()
            raise

    def notify(self, n=1):
        """Wake one or more tasks that are blocked in :meth:`wait`.

        Args:
          n (int): The number of tasks to wake.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to notify")
        # Move waiters onto the lock's queue; they'll wake when the lock is
        # handed to them by a later release().
        self._lot.repark(self._lock._lot, count=n)

    def notify_all(self):
        """Wake all tasks that are currently blocked in :meth:`wait`.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to notify")
        self._lot.repark_all(self._lock._lot)

    def statistics(self):
        r"""Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this condition's
          :meth:`wait` method.
        * ``lock_statistics``: The result of calling the underlying
          :class:`Lock`\s  :meth:`~Lock.statistics` method.

        """
        return _ConditionStatistics(
            tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics()
        )
