import pytest
import threading
from queue import Queue
import time
import sys

from .tutil import slow, gc_collect_harder
from .. import _thread_cache
from .._thread_cache import start_thread_soon, ThreadCache
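

# Basic behavior: start_thread_soon(fn, deliver) runs fn in a worker thread
# and then calls deliver with an outcome object capturing fn's return value or
# exception; unwrap() re-raises that exception in the caller.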
def test_thread_cache_basics():
    q = Queue()

    def fn():
        raise RuntimeError("hi")

    def deliver(outcome):
        q.put(outcome)

    start_thread_soon(fn, deliver)

    outcome = q.get()
    with pytest.raises(RuntimeError, match="hi"):
        outcome.unwrap()
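

# Once a job has finished, the cache should drop its reference to the
# submitted callable so it can be garbage collected; del_me.__del__ records
# whether that actually happened.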
def test_thread_cache_deref():
    res = [False]

    class del_me:
        def __call__(self):
            return 42

        def __del__(self):
            res[0] = True

    q = Queue()

    def deliver(outcome):
        q.put(outcome)

    start_thread_soon(del_me(), deliver)
    outcome = q.get()
    assert outcome.unwrap() == 42

    gc_collect_harder()
    assert res[0]


@slow
def test_spawning_new_thread_from_deliver_reuses_starting_thread():
    # We know that no-one else is using the thread cache, so if we keep
    # submitting new jobs the instant the previous one is finished, we should
    # keep getting the same thread over and over. This tests both that the
    # thread cache is LIFO, and that threads can be assigned new work *before*
    # deliver exits.

    # Make sure there are a few threads running, so if we weren't LIFO then we
    # could grab the wrong one.
    q = Queue()
    COUNT = 5
    for _ in range(COUNT):
        start_thread_soon(lambda: time.sleep(1), lambda result: q.put(result))
    for _ in range(COUNT):
        q.get().unwrap()

    seen_threads = set()
    done = threading.Event()

    def deliver(n, _):
        print(n)
        seen_threads.add(threading.current_thread())
        if n == 0:
            done.set()
        else:
            start_thread_soon(lambda: None, lambda _: deliver(n - 1, _))

    start_thread_soon(lambda: None, lambda _: deliver(5, _))

    done.wait()

    assert len(seen_threads) == 1


@slow
def test_idle_threads_exit(monkeypatch):
    # Temporarily set the idle timeout to something tiny, to speed up the
    # test. (But non-zero, so that the worker loop will at least yield the
    # CPU.)
    monkeypatch.setattr(_thread_cache, "IDLE_TIMEOUT", 0.0001)

    q = Queue()
    start_thread_soon(lambda: None, lambda _: q.put(threading.current_thread()))
    seen_thread = q.get()
    # Since the idle timeout is practically zero, after sleeping for 1 second
    # the thread should have exited.
    time.sleep(1)
    assert not seen_thread.is_alive()


def test_race_between_idle_exit_and_job_assignment(monkeypatch):
    # This is a lock where the first few times you try to acquire it with a
    # timeout, it waits until the lock is available and then pretends to time
    # out. Using this in our thread cache implementation causes the following
    # sequence:
    #
    # 1. start_thread_soon grabs the worker thread, assigns it a job, and
    #    releases its lock.
    # 2. The worker thread wakes up (because the lock has been released), but
    #    the JankyLock lies to it and tells it that the lock timed out. So the
    #    worker thread tries to exit.
    # 3. The worker thread checks for the race between exiting and being
    #    assigned a job, and discovers that it *is* in the process of being
    #    assigned a job, so it loops around and tries to acquire the lock
    #    again.
    # 4. Eventually the JankyLock admits that the lock is available, and
    #    everything proceeds as normal.

    class JankyLock:
        def __init__(self):
            self._lock = threading.Lock()
            self._counter = 3

        def acquire(self, timeout=None):
            self._lock.acquire()
            if timeout is None:
                return True
            else:
                if self._counter > 0:
                    self._counter -= 1
                    self._lock.release()
                    return False
                return True

        def release(self):
            self._lock.release()

    monkeypatch.setattr(_thread_cache, "Lock", JankyLock)

    tc = ThreadCache()
    done = threading.Event()
    tc.start_thread_soon(lambda: None, lambda _: done.set())
    done.wait()
    # Let's kill the thread we started, so it doesn't hang around until the
    # test suite finishes. Doesn't really do any harm, but it can be confusing
    # to see it in debug output. This is hacky, and leaves our ThreadCache
    # object in an inconsistent state... but it doesn't matter, because we're
    # not going to use it again anyway.
    tc.start_thread_soon(lambda: None, lambda _: sys.exit())