1 22
import pytest
2 22
import threading
3 22
from queue import Queue
4 22
import time
5 22
import sys
6

7 22
from .tutil import slow, gc_collect_harder
8 22
from .. import _thread_cache
9 22
from .._thread_cache import start_thread_soon, ThreadCache
10

11

12 22
def test_thread_cache_basics():
    """A job's exception is handed to ``deliver`` and re-raised by ``unwrap()``."""
    outcomes = Queue()

    def failing_job():
        raise RuntimeError("hi")

    def hand_back(result):
        # Pass whatever the cache delivers back to the test thread.
        outcomes.put(result)

    start_thread_soon(failing_job, hand_back)

    delivered = outcomes.get()
    with pytest.raises(RuntimeError, match="hi"):
        delivered.unwrap()
26

27

28 22
def test_thread_cache_deref():
    """The submitted callable must be finalized once its job has completed."""
    finalized = False

    class del_me:
        def __call__(self):
            return 42

        def __del__(self):
            # Record that the callable object was actually garbage collected.
            nonlocal finalized
            finalized = True

    outcomes = Queue()

    def hand_back(result):
        outcomes.put(result)

    start_thread_soon(del_me(), hand_back)
    delivered = outcomes.get()
    assert delivered.unwrap() == 42

    # After a thorough collection nothing should still be holding on to the
    # del_me instance, so its __del__ must have run.
    gc_collect_harder()
    assert finalized
49

50

51 22
@slow
def test_spawning_new_thread_from_deliver_reuses_starting_thread():
    """Chained jobs submitted from ``deliver`` should all run on one thread."""
    # Nobody else is using the thread cache, so if each new job is submitted
    # the instant the previous one finishes, the same thread should be picked
    # every time. This checks both that the thread cache is LIFO, and that a
    # thread can be assigned new work *before* deliver exits.

    # First get a few threads running, so that a non-LIFO cache would have
    # the opportunity to grab the wrong one.
    warmup_outcomes = Queue()
    COUNT = 5

    def nap():
        time.sleep(1)

    def collect(result):
        warmup_outcomes.put(result)

    for _ in range(COUNT):
        start_thread_soon(nap, collect)
    for _ in range(COUNT):
        warmup_outcomes.get().unwrap()

    threads_used = set()
    finished = threading.Event()

    def noop():
        return None

    def deliver(remaining, _):
        print(remaining)
        threads_used.add(threading.current_thread())
        if remaining == 0:
            finished.set()
            return
        # Schedule the next link of the chain from inside deliver itself.
        start_thread_soon(noop, lambda outcome: deliver(remaining - 1, outcome))

    start_thread_soon(noop, lambda outcome: deliver(5, outcome))

    finished.wait()

    assert len(threads_used) == 1
84

85

86 22
@slow
def test_idle_threads_exit(monkeypatch):
    """A worker thread that stays idle past ``IDLE_TIMEOUT`` should exit."""
    # Shrink the idle timeout to something tiny to speed up the test. It is
    # kept non-zero so that the worker loop will at least yield the CPU.
    monkeypatch.setattr(_thread_cache, "IDLE_TIMEOUT", 0.0001)

    reported = Queue()

    def noop():
        return None

    def report_current_thread(_):
        # deliver runs on the worker, so this captures the worker thread.
        reported.put(threading.current_thread())

    start_thread_soon(noop, report_current_thread)
    worker = reported.get()

    # With an idle timeout of 0.0001 seconds, a full second of sleeping
    # should be enough for the worker to have exited.
    time.sleep(1)
    assert not worker.is_alive()
100

101

102 22
def test_race_between_idle_exit_and_job_assignment(monkeypatch):
    """Exercise the race between a worker timing out and being handed a job."""
    # JankyLock is a lock where the first few acquisitions with a timeout
    # wait until the lock is available and then pretend to have timed out.
    # Plugging it into the thread cache implementation produces this
    # sequence:
    #
    # 1. start_thread_soon grabs the worker thread, assigns it a job, and
    #    releases its lock.
    # 2. The worker thread wakes up (because the lock has been released), but
    #    the JankyLock lies and reports a timeout, so the worker thread tries
    #    to exit.
    # 3. The worker thread checks for the race between exiting and being
    #    assigned a job, discovers that it *is* in the process of being
    #    assigned a job, and loops around to acquire the lock again.
    # 4. Eventually the JankyLock admits that the lock is available, and
    #    everything proceeds as normal.

    class JankyLock:
        def __init__(self):
            self._lock = threading.Lock()
            # Number of timed acquisitions that will still be reported as
            # timeouts.
            self._counter = 3

        def acquire(self, timeout=None):
            # Always block until the real lock is held, whatever the timeout.
            self._lock.acquire()
            if timeout is None:
                return True
            if self._counter > 0:
                # Fake a timeout: give the lock back and report failure.
                self._counter -= 1
                self._lock.release()
                return False
            return True

        def release(self):
            self._lock.release()

    monkeypatch.setattr(_thread_cache, "Lock", JankyLock)

    cache = ThreadCache()
    finished = threading.Event()

    def noop():
        return None

    cache.start_thread_soon(noop, lambda _: finished.set())
    finished.wait()

    # Kill the thread we started so it doesn't hang around until the test
    # suite finishes. It does no real harm, but it can be confusing to see in
    # debug output. This is hacky and leaves the ThreadCache object in an
    # inconsistent state, which doesn't matter because it is never used
    # again.
    cache.start_thread_soon(noop, lambda _: sys.exit())

Read our documentation on viewing source code.

Loading