1 22
import os
2 22
import tempfile
3 22
from contextlib import contextmanager
4

5 22
import pytest
6

7 22
# os.name is "nt" on Windows.
on_windows = (os.name == "nt")

# Every test in this module is Windows-only, so skip the whole module on
# any other platform.
pytestmark = pytest.mark.skipif(not on_windows, reason="windows only")
10

11 22
from .tutil import slow, gc_collect_harder
12 22
from ... import _core, sleep, move_on_after
13 22
from ...testing import wait_all_tasks_blocked
14

15 22
if on_windows:
16 9
    from .._windows_cffi import (
17
        ffi,
18
        kernel32,
19
        INVALID_HANDLE_VALUE,
20
        raise_winerror,
21
        FileFlags,
22
    )
23

24

25
# The undocumented API that this is testing should be changed to stop using
26
# UnboundedQueue (or just removed until we have time to redo it), but until
27
# then we filter out the warning.
28 22
@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning")
async def test_completion_key_listen():
    """Post ten packets to a monitored completion key and check they arrive in order.

    A background task posts packets numbered 0..9 through
    ``PostQueuedCompletionStatus``; the main task drains the queue returned
    by ``monitor_completion_key`` and checks that each packet's
    ``dwNumberOfBytesTransferred`` matches the next expected number.
    """
    n_packets = 10

    async def post(key):
        iocp_handle = ffi.cast("HANDLE", _core.current_iocp())
        for seq in range(n_packets):
            print("post", seq)
            # Hit a checkpoint before packets 0, 3, 6 and 9.
            if not seq % 3:
                await _core.checkpoint()
            posted = kernel32.PostQueuedCompletionStatus(
                iocp_handle, seq, key, ffi.NULL
            )
            assert posted

    with _core.monitor_completion_key() as (key, queue):
        async with _core.open_nursery() as nursery:
            nursery.start_soon(post, key)
            received = 0
            print("loop")
            async for batch in queue:  # pragma: no branch
                print("got some", batch)
                for event in batch:
                    assert event.lpOverlapped == 0
                    # The sequence number travels in the byte-count field.
                    assert event.dwNumberOfBytesTransferred == received
                    received += 1
                if received == n_packets:
                    break
            print("end loop")
53

54

55 22
async def test_readinto_overlapped():
    """Read a 4 KiB file in eight concurrent 512-byte overlapped reads.

    Also checks that ``readinto_overlapped`` raises ``BufferError`` when
    given an immutable ``bytes`` object as the destination.
    """
    chunk = 512
    # 1024 bytes each of b"1", b"2", b"3" and b"4".
    data = b"".join(bytes([digit]) * 1024 for digit in b"1234")
    buffer = bytearray(len(data))

    with tempfile.TemporaryDirectory() as tdir:
        tfile = os.path.join(tdir, "numbers.txt")
        with open(tfile, "wb") as out:
            out.write(data)
            out.flush()

        # CreateFileW takes a NUL-terminated UTF-16 path.
        rawname = tfile.encode("utf-16le") + b"\0\0"
        rawname_buf = ffi.from_buffer(rawname)
        handle = kernel32.CreateFileW(
            ffi.cast("LPCWSTR", rawname_buf),
            FileFlags.GENERIC_READ,
            FileFlags.FILE_SHARE_READ,
            ffi.NULL,  # no security attributes
            FileFlags.OPEN_EXISTING,
            FileFlags.FILE_FLAG_OVERLAPPED,
            ffi.NULL,  # no template file
        )
        if handle == INVALID_HANDLE_VALUE:  # pragma: no cover
            raise_winerror()

        try:
            with memoryview(buffer) as buffer_view:

                async def read_region(start, end):
                    # Fill buffer[start:end] from the same offset in the file.
                    destination = buffer_view[start:end]
                    await _core.readinto_overlapped(handle, destination, start)

                _core.register_with_iocp(handle)
                async with _core.open_nursery() as nursery:
                    for offset in range(0, len(data), chunk):
                        nursery.start_soon(read_region, offset, offset + chunk)

                # The nursery has exited, so every region has been read.
                assert buffer == data

                with pytest.raises(BufferError):
                    await _core.readinto_overlapped(handle, b"immutable")
        finally:
            kernel32.CloseHandle(handle)
98

99

100 22
@contextmanager
def pipe_with_overlapped_read():
    """Yield ``(write_fp, read_handle)`` for a pipe whose read end is overlapped.

    ``write_fp`` is a binary file object wrapping the write end of the pipe;
    ``read_handle`` is the read-end handle returned by
    ``asyncio.windows_utils.pipe``. Both handles are closed with
    ``CloseHandle`` when the context exits.
    """
    import msvcrt
    from asyncio.windows_utils import pipe

    # overlapped=(True, False): overlapped read end, non-overlapped write end.
    read_handle, write_handle = pipe(overlapped=(True, False))
    try:
        write_fd = msvcrt.open_osfhandle(write_handle, 0)
        # closefd=False: closing the file object leaves the descriptor open;
        # the underlying handle is closed in the finally clause below.
        writer = os.fdopen(write_fd, "wb", closefd=False)
        yield writer, read_handle
    finally:
        for raw_handle in (read_handle, write_handle):
            kernel32.CloseHandle(ffi.cast("HANDLE", raw_handle))
112

113

114 22
def test_forgot_to_register_with_iocp():
    """Cancelling a read on a handle never passed to register_with_iocp() fails loudly.

    The run is expected to end with ``TrioInternalError`` whose message
    names the task ("xyz") and hints at the missing registration.
    """
    with pipe_with_overlapped_read() as (write_fp, read_handle):
        # Leaving the ``with`` closes the file object.
        with write_fp:
            write_fp.write(b"test\n")

        run_finished = False

        async def main():
            one_byte = bytearray(1)
            try:
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(
                        _core.readinto_overlapped, read_handle, one_byte, name="xyz"
                    )
                    await wait_all_tasks_blocked()
                    nursery.cancel_scope.cancel()
            finally:
                # Run loop is exited without unwinding running tasks, so
                # we don't get here until the main() coroutine is GC'ed
                assert run_finished

        with pytest.raises(_core.TrioInternalError) as exc_info:
            _core.run(main)
        run_finished = True

        # A plain str holds no reference to the exception.
        message = str(exc_info.value)
        assert "Failed to cancel overlapped I/O in xyz " in message
        assert "forget to call register_with_iocp()?" in message

        # Make sure the Nursery.__del__ assertion about dangling children
        # gets put with the correct test
        del exc_info
        gc_collect_harder()
145

146

147 22
@slow
async def test_too_late_to_cancel():
    """Cancel an overlapped read after the OS has already completed it.

    The read's data must still land in the buffer, and a follow-up read on
    the same handle must work normally.
    """
    import time

    with pipe_with_overlapped_read() as (write_fp, read_handle):
        _core.register_with_iocp(read_handle)
        buf = bytearray(6)
        async with _core.open_nursery() as nursery:
            # Start an async read in the background
            nursery.start_soon(_core.readinto_overlapped, read_handle, buf)
            await wait_all_tasks_blocked()

            # Synchronous write to the other end of the pipe
            with write_fp:
                write_fp.write(b"test1\ntest2\n")

            # Note: not trio.sleep! We're making sure the OS level
            # ReadFile completes, before Trio has a chance to execute
            # another checkpoint and notice it completed.
            time.sleep(1)
            nursery.cancel_scope.cancel()
        assert buf[:6] == b"test1\n"

        # Do another I/O to make sure we've actually processed the
        # fallback completion that was posted when CancelIoEx failed.
        n_read = await _core.readinto_overlapped(read_handle, buf)
        assert n_read == 6
        assert buf[:6] == b"test2\n"
174

175

176 22
def test_lsp_that_hooks_select_gives_good_error(monkeypatch):
    """A mismatch between SIO_BASE_HANDLE and SIO_BSP_HANDLE_SELECT raises RuntimeError.

    ``_io_windows._get_underlying_socket`` is patched so that the
    SIO_BSP_HANDLE_SELECT query returns a different handle from every other
    query.
    """
    from .. import _io_windows
    from .._windows_cffi import WSAIoctls, _handle

    def patched_get_underlying(sock, *, which=WSAIoctls.SIO_BASE_HANDLE):
        if hasattr(sock, "fileno"):  # pragma: no branch
            sock = sock.fileno()
        # Only the SELECT query gets the off-by-one handle.
        wants_select = which == WSAIoctls.SIO_BSP_HANDLE_SELECT
        return _handle((sock + 1) if wants_select else sock)

    monkeypatch.setattr(_io_windows, "_get_underlying_socket", patched_get_underlying)

    expected = "SIO_BASE_HANDLE and SIO_BSP_HANDLE_SELECT differ"
    with pytest.raises(RuntimeError, match=expected):
        _core.run(sleep, 0)
193

194

195 22
def test_lsp_that_completely_hides_base_socket_gives_good_error(monkeypatch):
    """An LSP that never reveals the base socket produces a RuntimeError."""
    # This tests behavior with an LSP that fails SIO_BASE_HANDLE and returns
    # self for SIO_BSP_HANDLE_SELECT (like Komodia), but also returns
    # self for SIO_BSP_HANDLE_POLL. No known LSP does this, but we want to
    # make sure we get an error rather than an infinite loop.

    from .. import _io_windows
    from .._windows_cffi import WSAIoctls, _handle

    def patched_get_underlying(sock, *, which=WSAIoctls.SIO_BASE_HANDLE):
        if hasattr(sock, "fileno"):  # pragma: no branch
            sock = sock.fileno()
        # The base-handle query always fails.
        if which == WSAIoctls.SIO_BASE_HANDLE:
            raise OSError("nope")
        # Every other query hands back the socket itself.
        return _handle(sock)

    monkeypatch.setattr(_io_windows, "_get_underlying_socket", patched_get_underlying)

    expected = "SIO_BASE_HANDLE failed and SIO_BSP_HANDLE_POLL didn't return a diff"
    with pytest.raises(RuntimeError, match=expected):
        _core.run(sleep, 0)

Read our documentation on viewing source code.

Loading