1 22
import pytest
2

3 22
import weakref
4

5 22
from ..testing import wait_all_tasks_blocked, assert_checkpoints
6

7 22
from .. import _core
8 22
from .. import _timeouts
9 22
from .._timeouts import sleep_forever, move_on_after
10 22
from .._sync import *
11

12

13 22
async def test_Event():
14 22
    e = Event()
15 22
    assert not e.is_set()
16 22
    assert e.statistics().tasks_waiting == 0
17

18 22
    e.set()
19 22
    assert e.is_set()
20 22
    with assert_checkpoints():
21 22
        await e.wait()
22

23 22
    e = Event()
24

25 22
    record = []
26

27 22
    async def child():
28 22
        record.append("sleeping")
29 22
        await e.wait()
30 22
        record.append("woken")
31

32 22
    async with _core.open_nursery() as nursery:
33 22
        nursery.start_soon(child)
34 22
        nursery.start_soon(child)
35 22
        await wait_all_tasks_blocked()
36 22
        assert record == ["sleeping", "sleeping"]
37 22
        assert e.statistics().tasks_waiting == 2
38 22
        e.set()
39 22
        await wait_all_tasks_blocked()
40 22
        assert record == ["sleeping", "sleeping", "woken", "woken"]
41

42

43 22
async def test_CapacityLimiter():
44 22
    with pytest.raises(TypeError):
45 22
        CapacityLimiter(1.0)
46 22
    with pytest.raises(ValueError):
47 22
        CapacityLimiter(-1)
48 22
    c = CapacityLimiter(2)
49 22
    repr(c)  # smoke test
50 22
    assert c.total_tokens == 2
51 22
    assert c.borrowed_tokens == 0
52 22
    assert c.available_tokens == 2
53 22
    with pytest.raises(RuntimeError):
54 22
        c.release()
55 22
    assert c.borrowed_tokens == 0
56 22
    c.acquire_nowait()
57 22
    assert c.borrowed_tokens == 1
58 22
    assert c.available_tokens == 1
59

60 22
    stats = c.statistics()
61 22
    assert stats.borrowed_tokens == 1
62 22
    assert stats.total_tokens == 2
63 22
    assert stats.borrowers == [_core.current_task()]
64 22
    assert stats.tasks_waiting == 0
65

66
    # Can't re-acquire when we already have it
67 22
    with pytest.raises(RuntimeError):
68 22
        c.acquire_nowait()
69 22
    assert c.borrowed_tokens == 1
70 22
    with pytest.raises(RuntimeError):
71 22
        await c.acquire()
72 22
    assert c.borrowed_tokens == 1
73

74
    # We can acquire on behalf of someone else though
75 22
    with assert_checkpoints():
76 22
        await c.acquire_on_behalf_of("someone")
77

78
    # But then we've run out of capacity
79 22
    assert c.borrowed_tokens == 2
80 22
    with pytest.raises(_core.WouldBlock):
81 22
        c.acquire_on_behalf_of_nowait("third party")
82

83 22
    assert set(c.statistics().borrowers) == {_core.current_task(), "someone"}
84

85
    # Until we release one
86 22
    c.release_on_behalf_of(_core.current_task())
87 22
    assert c.statistics().borrowers == ["someone"]
88

89 22
    c.release_on_behalf_of("someone")
90 22
    assert c.borrowed_tokens == 0
91 22
    with assert_checkpoints():
92 22
        async with c:
93 22
            assert c.borrowed_tokens == 1
94

95 22
    async with _core.open_nursery() as nursery:
96 22
        await c.acquire_on_behalf_of("value 1")
97 22
        await c.acquire_on_behalf_of("value 2")
98 22
        nursery.start_soon(c.acquire_on_behalf_of, "value 3")
99 22
        await wait_all_tasks_blocked()
100 22
        assert c.borrowed_tokens == 2
101 22
        assert c.statistics().tasks_waiting == 1
102 22
        c.release_on_behalf_of("value 2")
103
        # Fairness:
104 22
        assert c.borrowed_tokens == 2
105 22
        with pytest.raises(_core.WouldBlock):
106 22
            c.acquire_nowait()
107

108 22
    c.release_on_behalf_of("value 3")
109 22
    c.release_on_behalf_of("value 1")
110

111

112 22
async def test_CapacityLimiter_inf():
113 22
    from math import inf
114

115 22
    c = CapacityLimiter(inf)
116 22
    repr(c)  # smoke test
117 22
    assert c.total_tokens == inf
118 22
    assert c.borrowed_tokens == 0
119 22
    assert c.available_tokens == inf
120 22
    with pytest.raises(RuntimeError):
121 22
        c.release()
122 22
    assert c.borrowed_tokens == 0
123 22
    c.acquire_nowait()
124 22
    assert c.borrowed_tokens == 1
125 22
    assert c.available_tokens == inf
126

127

128 22
async def test_CapacityLimiter_change_total_tokens():
129 22
    c = CapacityLimiter(2)
130

131 22
    with pytest.raises(TypeError):
132 22
        c.total_tokens = 1.0
133

134 22
    with pytest.raises(ValueError):
135 22
        c.total_tokens = 0
136

137 22
    with pytest.raises(ValueError):
138 22
        c.total_tokens = -10
139

140 22
    assert c.total_tokens == 2
141

142 22
    async with _core.open_nursery() as nursery:
143 22
        for i in range(5):
144 22
            nursery.start_soon(c.acquire_on_behalf_of, i)
145 22
            await wait_all_tasks_blocked()
146 22
        assert set(c.statistics().borrowers) == {0, 1}
147 22
        assert c.statistics().tasks_waiting == 3
148 22
        c.total_tokens += 2
149 22
        assert set(c.statistics().borrowers) == {0, 1, 2, 3}
150 22
        assert c.statistics().tasks_waiting == 1
151 22
        c.total_tokens -= 3
152 22
        assert c.borrowed_tokens == 4
153 22
        assert c.total_tokens == 1
154 22
        c.release_on_behalf_of(0)
155 22
        c.release_on_behalf_of(1)
156 22
        c.release_on_behalf_of(2)
157 22
        assert set(c.statistics().borrowers) == {3}
158 22
        assert c.statistics().tasks_waiting == 1
159 22
        c.release_on_behalf_of(3)
160 22
        assert set(c.statistics().borrowers) == {4}
161 22
        assert c.statistics().tasks_waiting == 0
162

163

164
# regression test for issue #548
165 22
async def test_CapacityLimiter_memleak_548():
166 22
    limiter = CapacityLimiter(total_tokens=1)
167 22
    await limiter.acquire()
168

169 22
    async with _core.open_nursery() as n:
170 22
        n.start_soon(limiter.acquire)
171 22
        await wait_all_tasks_blocked()  # give it a chance to run the task
172 22
        n.cancel_scope.cancel()
173

174
    # if this is 1, the acquire call (despite being killed) is still there in the task, and will
175
    # leak memory all the while the limiter is active
176 22
    assert len(limiter._pending_borrowers) == 0
177

178

179 22
async def test_Semaphore():
180 22
    with pytest.raises(TypeError):
181 22
        Semaphore(1.0)
182 22
    with pytest.raises(ValueError):
183 22
        Semaphore(-1)
184 22
    s = Semaphore(1)
185 22
    repr(s)  # smoke test
186 22
    assert s.value == 1
187 22
    assert s.max_value is None
188 22
    s.release()
189 22
    assert s.value == 2
190 22
    assert s.statistics().tasks_waiting == 0
191 22
    s.acquire_nowait()
192 22
    assert s.value == 1
193 22
    with assert_checkpoints():
194 22
        await s.acquire()
195 22
    assert s.value == 0
196 22
    with pytest.raises(_core.WouldBlock):
197 22
        s.acquire_nowait()
198

199 22
    s.release()
200 22
    assert s.value == 1
201 22
    with assert_checkpoints():
202 22
        async with s:
203 22
            assert s.value == 0
204 22
    assert s.value == 1
205 22
    s.acquire_nowait()
206

207 22
    record = []
208

209 22
    async def do_acquire(s):
210 22
        record.append("started")
211 22
        await s.acquire()
212 22
        record.append("finished")
213

214 22
    async with _core.open_nursery() as nursery:
215 22
        nursery.start_soon(do_acquire, s)
216 22
        await wait_all_tasks_blocked()
217 22
        assert record == ["started"]
218 22
        assert s.value == 0
219 22
        s.release()
220
        # Fairness:
221 22
        assert s.value == 0
222 22
        with pytest.raises(_core.WouldBlock):
223 22
            s.acquire_nowait()
224 22
    assert record == ["started", "finished"]
225

226

227 22
async def test_Semaphore_bounded():
228 22
    with pytest.raises(TypeError):
229 22
        Semaphore(1, max_value=1.0)
230 22
    with pytest.raises(ValueError):
231 22
        Semaphore(2, max_value=1)
232 22
    bs = Semaphore(1, max_value=1)
233 22
    assert bs.max_value == 1
234 22
    repr(bs)  # smoke test
235 22
    with pytest.raises(ValueError):
236 22
        bs.release()
237 22
    assert bs.value == 1
238 22
    bs.acquire_nowait()
239 22
    assert bs.value == 0
240 22
    bs.release()
241 22
    assert bs.value == 1
242

243

244 22
@pytest.mark.parametrize("lockcls", [Lock, StrictFIFOLock], ids=lambda fn: fn.__name__)
245 9
async def test_Lock_and_StrictFIFOLock(lockcls):
246 22
    l = lockcls()  # noqa
247 22
    assert not l.locked()
248

249
    # make sure locks can be weakref'ed (gh-331)
250 22
    r = weakref.ref(l)
251 22
    assert r() is l
252

253 22
    repr(l)  # smoke test
254
    # make sure repr uses the right name for subclasses
255 22
    assert lockcls.__name__ in repr(l)
256 22
    with assert_checkpoints():
257 22
        async with l:
258 22
            assert l.locked()
259 22
            repr(l)  # smoke test (repr branches on locked/unlocked)
260 22
    assert not l.locked()
261 22
    l.acquire_nowait()
262 22
    assert l.locked()
263 22
    l.release()
264 22
    assert not l.locked()
265 22
    with assert_checkpoints():
266 22
        await l.acquire()
267 22
    assert l.locked()
268 22
    l.release()
269 22
    assert not l.locked()
270

271 22
    l.acquire_nowait()
272 22
    with pytest.raises(RuntimeError):
273
        # Error out if we already own the lock
274 22
        l.acquire_nowait()
275 22
    l.release()
276 22
    with pytest.raises(RuntimeError):
277
        # Error out if we don't own the lock
278 22
        l.release()
279

280 22
    holder_task = None
281

282 22
    async def holder():
283
        nonlocal holder_task
284 22
        holder_task = _core.current_task()
285 22
        async with l:
286 22
            await sleep_forever()
287

288 22
    async with _core.open_nursery() as nursery:
289 22
        assert not l.locked()
290 22
        nursery.start_soon(holder)
291 22
        await wait_all_tasks_blocked()
292 22
        assert l.locked()
293
        # WouldBlock if someone else holds the lock
294 22
        with pytest.raises(_core.WouldBlock):
295 22
            l.acquire_nowait()
296
        # Can't release a lock someone else holds
297 22
        with pytest.raises(RuntimeError):
298 22
            l.release()
299

300 22
        statistics = l.statistics()
301 22
        print(statistics)
302 22
        assert statistics.locked
303 22
        assert statistics.owner is holder_task
304 22
        assert statistics.tasks_waiting == 0
305

306 22
        nursery.start_soon(holder)
307 22
        await wait_all_tasks_blocked()
308 22
        statistics = l.statistics()
309 22
        print(statistics)
310 22
        assert statistics.tasks_waiting == 1
311

312 22
        nursery.cancel_scope.cancel()
313

314 22
    statistics = l.statistics()
315 22
    assert not statistics.locked
316 22
    assert statistics.owner is None
317 22
    assert statistics.tasks_waiting == 0
318

319

320 22
async def test_Condition():
321 22
    with pytest.raises(TypeError):
322 22
        Condition(Semaphore(1))
323 22
    with pytest.raises(TypeError):
324 22
        Condition(StrictFIFOLock)
325 22
    l = Lock()  # noqa
326 22
    c = Condition(l)
327 22
    assert not l.locked()
328 22
    assert not c.locked()
329 22
    with assert_checkpoints():
330 22
        await c.acquire()
331 22
    assert l.locked()
332 22
    assert c.locked()
333

334 22
    c = Condition()
335 22
    assert not c.locked()
336 22
    c.acquire_nowait()
337 22
    assert c.locked()
338 22
    with pytest.raises(RuntimeError):
339 22
        c.acquire_nowait()
340 22
    c.release()
341

342 22
    with pytest.raises(RuntimeError):
343
        # Can't wait without holding the lock
344 22
        await c.wait()
345 22
    with pytest.raises(RuntimeError):
346
        # Can't notify without holding the lock
347 22
        c.notify()
348 22
    with pytest.raises(RuntimeError):
349
        # Can't notify without holding the lock
350 22
        c.notify_all()
351

352 22
    finished_waiters = set()
353

354 22
    async def waiter(i):
355 22
        async with c:
356 22
            await c.wait()
357 22
        finished_waiters.add(i)
358

359 22
    async with _core.open_nursery() as nursery:
360 22
        for i in range(3):
361 22
            nursery.start_soon(waiter, i)
362 22
            await wait_all_tasks_blocked()
363 22
        async with c:
364 22
            c.notify()
365 22
        assert c.locked()
366 22
        await wait_all_tasks_blocked()
367 22
        assert finished_waiters == {0}
368 22
        async with c:
369 22
            c.notify_all()
370 22
        await wait_all_tasks_blocked()
371 22
        assert finished_waiters == {0, 1, 2}
372

373 22
    finished_waiters = set()
374 22
    async with _core.open_nursery() as nursery:
375 22
        for i in range(3):
376 22
            nursery.start_soon(waiter, i)
377 22
            await wait_all_tasks_blocked()
378 22
        async with c:
379 22
            c.notify(2)
380 22
            statistics = c.statistics()
381 22
            print(statistics)
382 22
            assert statistics.tasks_waiting == 1
383 22
            assert statistics.lock_statistics.tasks_waiting == 2
384
        # exiting the context manager hands off the lock to the first task
385 22
        assert c.statistics().lock_statistics.tasks_waiting == 1
386

387 22
        await wait_all_tasks_blocked()
388 22
        assert finished_waiters == {0, 1}
389

390 22
        async with c:
391 22
            c.notify_all()
392

393
    # After being cancelled still hold the lock (!)
394
    # (Note that c.__aexit__ checks that we hold the lock as well)
395 22
    with _core.CancelScope() as scope:
396 22
        async with c:
397 22
            scope.cancel()
398 22
            try:
399 22
                await c.wait()
400
            finally:
401 22
                assert c.locked()
402

403

404 22
from .._sync import async_cm
405 22
from .._channel import open_memory_channel
406

407
# Three ways of implementing a Lock in terms of a channel. Used to let us put
408
# the channel through the generic lock tests.
409

410

411 22
@async_cm
412 9
class ChannelLock1:
413 22
    def __init__(self, capacity):
414 22
        self.s, self.r = open_memory_channel(capacity)
415 22
        for _ in range(capacity - 1):
416 22
            self.s.send_nowait(None)
417

418 22
    def acquire_nowait(self):
419 22
        self.s.send_nowait(None)
420

421 22
    async def acquire(self):
422 22
        await self.s.send(None)
423

424 22
    def release(self):
425 22
        self.r.receive_nowait()
426

427

428 22
@async_cm
429 9
class ChannelLock2:
430 22
    def __init__(self):
431 22
        self.s, self.r = open_memory_channel(10)
432 22
        self.s.send_nowait(None)
433

434 22
    def acquire_nowait(self):
435 22
        self.r.receive_nowait()
436

437 22
    async def acquire(self):
438 22
        await self.r.receive()
439

440 22
    def release(self):
441 22
        self.s.send_nowait(None)
442

443

444 22
@async_cm
445 9
class ChannelLock3:
446 22
    def __init__(self):
447 22
        self.s, self.r = open_memory_channel(0)
448
        # self.acquired is true when one task acquires the lock and
449
        # only becomes false when it's released and no tasks are
450
        # waiting to acquire.
451 22
        self.acquired = False
452

453 22
    def acquire_nowait(self):
454 22
        assert not self.acquired
455 22
        self.acquired = True
456

457 22
    async def acquire(self):
458 22
        if self.acquired:
459 22
            await self.s.send(None)
460
        else:
461 22
            self.acquired = True
462 22
            await _core.checkpoint()
463

464 22
    def release(self):
465 22
        try:
466 22
            self.r.receive_nowait()
467 22
        except _core.WouldBlock:
468 22
            assert self.acquired
469 22
            self.acquired = False
470

471

472 22
lock_factories = [
473
    lambda: CapacityLimiter(1),
474
    lambda: Semaphore(1),
475
    Lock,
476
    StrictFIFOLock,
477
    lambda: ChannelLock1(10),
478
    lambda: ChannelLock1(1),
479
    ChannelLock2,
480
    ChannelLock3,
481
]
482 22
lock_factory_names = [
483
    "CapacityLimiter(1)",
484
    "Semaphore(1)",
485
    "Lock",
486
    "StrictFIFOLock",
487
    "ChannelLock1(10)",
488
    "ChannelLock1(1)",
489
    "ChannelLock2",
490
    "ChannelLock3",
491
]
492

493 22
generic_lock_test = pytest.mark.parametrize(
494
    "lock_factory", lock_factories, ids=lock_factory_names
495
)
496

497

498
# Spawn a bunch of workers that take a lock and then yield; make sure that
499
# only one worker is ever in the critical section at a time.
500 22
@generic_lock_test
501 9
async def test_generic_lock_exclusion(lock_factory):
502 22
    LOOPS = 10
503 22
    WORKERS = 5
504 22
    in_critical_section = False
505 22
    acquires = 0
506

507 22
    async def worker(lock_like):
508
        nonlocal in_critical_section, acquires
509 22
        for _ in range(LOOPS):
510 22
            async with lock_like:
511 22
                acquires += 1
512 22
                assert not in_critical_section
513 22
                in_critical_section = True
514 22
                await _core.checkpoint()
515 22
                await _core.checkpoint()
516 22
                assert in_critical_section
517 22
                in_critical_section = False
518

519 22
    async with _core.open_nursery() as nursery:
520 22
        lock_like = lock_factory()
521 22
        for _ in range(WORKERS):
522 22
            nursery.start_soon(worker, lock_like)
523 22
    assert not in_critical_section
524 22
    assert acquires == LOOPS * WORKERS
525

526

527
# Several workers queue on the same lock; make sure they each get it, in
528
# order.
529 22
@generic_lock_test
530 9
async def test_generic_lock_fifo_fairness(lock_factory):
531 22
    initial_order = []
532 22
    record = []
533 22
    LOOPS = 5
534

535 22
    async def loopy(name, lock_like):
536
        # Record the order each task was initially scheduled in
537 22
        initial_order.append(name)
538 22
        for _ in range(LOOPS):
539 22
            async with lock_like:
540 22
                record.append(name)
541

542 22
    lock_like = lock_factory()
543 22
    async with _core.open_nursery() as nursery:
544 22
        nursery.start_soon(loopy, 1, lock_like)
545 22
        nursery.start_soon(loopy, 2, lock_like)
546 22
        nursery.start_soon(loopy, 3, lock_like)
547
    # The first three could be in any order due to scheduling randomness,
548
    # but after that they should repeat in the same order
549 22
    for i in range(LOOPS):
550 22
        assert record[3 * i : 3 * (i + 1)] == initial_order
551

552

553 22
@generic_lock_test
554 9
async def test_generic_lock_acquire_nowait_blocks_acquire(lock_factory):
555 22
    lock_like = lock_factory()
556

557 22
    record = []
558

559 22
    async def lock_taker():
560 22
        record.append("started")
561 22
        async with lock_like:
562 22
            pass
563 22
        record.append("finished")
564

565 22
    async with _core.open_nursery() as nursery:
566 22
        lock_like.acquire_nowait()
567 22
        nursery.start_soon(lock_taker)
568 22
        await wait_all_tasks_blocked()
569 22
        assert record == ["started"]
570 22
        lock_like.release()

Read our documentation on viewing source code .

Loading