import errno
import os
import select
import sys
import tempfile
from typing import Tuple

import pytest

from .._core.tests.tutil import gc_collect_harder, skip_if_fbsd_pipes_broken
from .. import _core, move_on_after
from ..testing import wait_all_tasks_blocked, check_one_way_stream
12

posix = os.name == "posix"
# Module-level pytest mark: every test in this file is skipped off POSIX.
pytestmark = pytest.mark.skipif(not posix, reason="posix only")
if posix:
    from .._unix_pipes import FdStream
else:
    # Off POSIX the import itself is expected to fail; assert that it does.
    with pytest.raises(ImportError):
        from .._unix_pipes import FdStream
20

21

# Have to use quoted types so import doesn't crash on windows
async def make_pipe() -> "Tuple[FdStream, FdStream]":
    """Create an OS pipe and wrap both ends in ``FdStream`` objects.

    Returns:
        A ``(send, receive)`` pair: the stream wrapping the pipe's write end
        comes first, followed by the stream wrapping its read end.
    """
    r, w = os.pipe()
    return FdStream(w), FdStream(r)
27

28

async def make_clogged_pipe():
    """Return a ``(send, receive)`` pipe pair whose buffer has been filled.

    Data is written straight to the send side's file descriptor until
    ``os.write`` raises ``BlockingIOError``, which is taken as the signal
    that the pipe buffer is full.

    NOTE(review): this relies on the send fd being non-blocking, presumably
    arranged by ``FdStream`` -- confirm against ``_unix_pipes``.
    """
    s, r = await make_pipe()

    # for the getattr:
    # https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3
    buf_size = getattr(select, "PIPE_BUF", 8192)

    # We want to totally fill up the pipe buffer.
    # This requires working around a weird feature that POSIX pipes
    # have.
    # If you do a write of <= PIPE_BUF bytes, then it's guaranteed
    # to either complete entirely, or not at all. So if we tried to
    # write PIPE_BUF bytes, and the buffer's free space is only
    # PIPE_BUF/2, then the write will raise BlockingIOError... even
    # though a smaller write could still succeed! To avoid this,
    # make sure to write >PIPE_BUF bytes each time, which disables
    # the special behavior.
    # For details, search for PIPE_BUF here:
    #   http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
    payload = b"x" * buf_size * 2

    try:
        while True:
            os.write(s.fileno(), payload)
    except BlockingIOError:
        # Expected exit from the loop: the buffer cannot take any more data.
        pass
    return s, r
53

54

async def test_send_pipe():
    """Bytes sent through an ``FdStream`` on the write end reach the read end."""
    r, w = os.pipe()
    try:
        async with FdStream(w) as send:
            assert send.fileno() == w
            await send.send_all(b"123")
            assert (os.read(r, 8)) == b"123"
    finally:
        # The read end is a raw fd that no stream owns; close it even when
        # an assertion above fails so the descriptor is not leaked.
        os.close(r)
63

64

async def test_receive_pipe():
    """Bytes written to the raw write end are received via ``FdStream``."""
    r, w = os.pipe()
    try:
        async with FdStream(r) as recv:
            assert (recv.fileno()) == r
            os.write(w, b"123")
            assert (await recv.receive_some(8)) == b"123"
    finally:
        # The write end is a raw fd that no stream owns; close it even when
        # an assertion above fails so the descriptor is not leaked.
        os.close(w)
73

74

async def test_pipes_combined():
    """Push 1 MiB through a pipe with a concurrent sender and reader."""
    write, read = await make_pipe()
    count = 2 ** 20

    async def sender():
        big = bytearray(count)
        await write.send_all(big)

    async def reader():
        # Give the sender a chance to block before starting to drain.
        await wait_all_tasks_blocked()
        received = 0
        while received < count:
            received += len(await read.receive_some(4096))

        assert received == count

    # ``async with`` closes both streams even if a task fails; on exit the
    # read side is closed first and then the write side.
    async with write, read:
        async with _core.open_nursery() as n:
            n.start_soon(sender)
            n.start_soon(reader)
97

98

async def test_pipe_errors():
    """Invalid arguments are rejected with the expected exception types."""
    # A non-integer fd is rejected at construction time.
    with pytest.raises(TypeError):
        FdStream(None)

    r, w = os.pipe()
    os.close(w)
    async with FdStream(r) as s:
        # Asking for zero bytes is an error.
        with pytest.raises(ValueError):
            await s.receive_some(0)
108

109

async def test_del():
    """Dropping the last reference to a stream closes its file descriptor."""
    w, r = await make_pipe()
    f1, f2 = w.fileno(), r.fileno()
    del w, r
    gc_collect_harder()

    # Both fds should already be closed, so closing them again must fail
    # with EBADF.
    with pytest.raises(OSError) as excinfo:
        os.close(f1)
    assert excinfo.value.errno == errno.EBADF

    with pytest.raises(OSError) as excinfo:
        os.close(f2)
    assert excinfo.value.errno == errno.EBADF
123

124

async def test_async_with():
    """Leaving an ``async with`` block closes both streams."""
    w, r = await make_pipe()
    async with w, r:
        pass

    # A closed stream reports -1 as its fd.
    assert w.fileno() == -1
    assert r.fileno() == -1

    # Closing what fileno() now reports must fail with EBADF.
    with pytest.raises(OSError) as excinfo:
        os.close(w.fileno())
    assert excinfo.value.errno == errno.EBADF

    with pytest.raises(OSError) as excinfo:
        os.close(r.fileno())
    assert excinfo.value.errno == errno.EBADF
140

141

async def test_misdirected_aclose_regression():
    """A second ``aclose()`` on an old stream must not disturb a reused fd."""
    # https://github.com/python-trio/trio/issues/661#issuecomment-456582356
    w, r = await make_pipe()
    old_r_fd = r.fileno()

    # Close the original objects
    await w.aclose()
    await r.aclose()

    # Do a little dance to get a new pipe whose receive handle matches the old
    # receive handle.
    r2_fd, w2_fd = os.pipe()
    if r2_fd != old_r_fd:  # pragma: no cover
        os.dup2(r2_fd, old_r_fd)
        os.close(r2_fd)
    async with FdStream(old_r_fd) as r2:
        assert r2.fileno() == old_r_fd

        # And now set up a background task that's working on the new receive
        # handle
        async def expect_eof():
            assert await r2.receive_some(10) == b""

        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_eof)
            await wait_all_tasks_blocked()

            # Here's the key test: does calling aclose() again on the *old*
            # handle, cause the task blocked on the *new* handle to raise
            # ClosedResourceError?
            await r.aclose()
            await wait_all_tasks_blocked()

            # Guess we survived! Close the new write handle so that the task
            # gets an EOF and can exit cleanly.
            os.close(w2_fd)
178

179

async def test_close_at_bad_time_for_receive_some(monkeypatch):
    """Closing right after ``wait_readable`` returns gives ClosedResourceError.

    Args:
        monkeypatch: pytest fixture used to swap in a patched
            ``wait_readable`` on ``TheIOManager``.
    """
    # We used to have race conditions where if one task was using the pipe,
    # and another closed it at *just* the wrong moment, it would give an
    # unexpected error instead of ClosedResourceError:
    # https://github.com/python-trio/trio/issues/661
    #
    # This tests what happens if the pipe gets closed in the moment *between*
    # when receive_some wakes up, and when it tries to call os.read
    async def expect_closedresourceerror():
        with pytest.raises(_core.ClosedResourceError):
            await r.receive_some(10)

    orig_wait_readable = _core._run.TheIOManager.wait_readable

    async def patched_wait_readable(*args, **kwargs):
        # Let the real wait finish, then close the stream before the caller
        # gets to run again.
        await orig_wait_readable(*args, **kwargs)
        await r.aclose()

    monkeypatch.setattr(_core._run.TheIOManager, "wait_readable", patched_wait_readable)
    s, r = await make_pipe()
    async with s, r:
        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_closedresourceerror)
            await wait_all_tasks_blocked()
            # Trigger everything by waking up the receiver
            await s.send_all(b"x")
206

207

async def test_close_at_bad_time_for_send_all(monkeypatch):
    """Closing right after ``wait_writable`` returns gives ClosedResourceError.

    Args:
        monkeypatch: pytest fixture used to swap in a patched
            ``wait_writable`` on ``TheIOManager``.
    """
    # We used to have race conditions where if one task was using the pipe,
    # and another closed it at *just* the wrong moment, it would give an
    # unexpected error instead of ClosedResourceError:
    # https://github.com/python-trio/trio/issues/661
    #
    # This tests what happens if the pipe gets closed in the moment *between*
    # when send_all wakes up, and when it tries to call os.write
    async def expect_closedresourceerror():
        with pytest.raises(_core.ClosedResourceError):
            await s.send_all(b"x" * 100)

    orig_wait_writable = _core._run.TheIOManager.wait_writable

    async def patched_wait_writable(*args, **kwargs):
        # Let the real wait finish, then close the stream before the caller
        # gets to run again.
        await orig_wait_writable(*args, **kwargs)
        await s.aclose()

    monkeypatch.setattr(_core._run.TheIOManager, "wait_writable", patched_wait_writable)
    s, r = await make_clogged_pipe()
    async with s, r:
        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_closedresourceerror)
            await wait_all_tasks_blocked()
            # Trigger everything by waking up the sender
            await r.receive_some(10000)
234

235

# On FreeBSD, directories are readable, and we haven't found any other trick
# for making an unreadable fd, so there's no way to run this test. Fortunately
# the logic this is testing doesn't depend on the platform, so testing on
# other platforms is probably good enough.
@pytest.mark.skipif(
    sys.platform.startswith("freebsd"),
    reason="no way to make read() return a bizarro error on FreeBSD",
)
async def test_bizarro_OSError_from_receive():
    """An unexpected ``OSError`` from reading surfaces as BrokenResourceError."""
    # Make sure that if the read syscall returns some bizarro error, then we
    # get a BrokenResourceError. This is incredibly unlikely; there's almost
    # no way to trigger a failure here intentionally (except for EBADF, but we
    # exploit that to detect file closure, so it takes a different path). So
    # we set up a strange scenario where the pipe fd somehow transmutes into a
    # directory fd, causing os.read to raise IsADirectoryError (yes, that's a
    # real built-in exception type).
    s, r = await make_pipe()
    async with s, r:
        dir_fd = os.open("/", os.O_DIRECTORY, 0)
        try:
            # Replace the receive fd in place with the directory fd.
            os.dup2(dir_fd, r.fileno())
            with pytest.raises(_core.BrokenResourceError):
                await r.receive_some(10)
        finally:
            os.close(dir_fd)
261

262

@skip_if_fbsd_pipes_broken
async def test_pipe_fully():
    """Run the generic one-way-stream checks against the pipe factories."""
    await check_one_way_stream(make_pipe, make_clogged_pipe)

Read our documentation on viewing source code .

Loading