1 11
import sys
2 11
from typing import TYPE_CHECKING
3 11
from . import _core
4 11
from ._abc import SendStream, ReceiveStream
5 11
from ._util import ConflictDetector, Final
6 11
from ._core._windows_cffi import _handle, raise_winerror, kernel32, ffi
7

8 11
# Platform guard: TYPE_CHECKING is False at runtime, so this assert always
# passes when the module is actually executed; under a static type checker
# (TYPE_CHECKING true) it reduces to `sys.platform == "win32"`.
assert sys.platform == "win32" or not TYPE_CHECKING
9

10
# XX TODO: don't just make this up based on nothing.
11 11
# Size of the buffer that PipeReceiveStream.receive_some allocates when the
# caller passes max_bytes=None.
DEFAULT_RECEIVE_SIZE = 65536
12

13

14
# See the comments on _unix_pipes._FdHolder for discussion of why we set the
15
# handle to -1 when it's closed.
16 11
class _HandleHolder:
17 11
    def __init__(self, handle: int) -> None:
18 11
        self.handle = -1
19 27
        if not isinstance(handle, int):
20 11
            raise TypeError("handle must be an int")
21 11
        self.handle = handle
22 11
        _core.register_with_iocp(self.handle)
23

24 11
    @property
25 7
    def closed(self):
26 11
        return self.handle == -1
27

28 11
    def _close(self):
29 27
        if self.closed:
30 11
            return
31 11
        handle = self.handle
32 11
        self.handle = -1
33 27
        if not kernel32.CloseHandle(_handle(handle)):
34 11
            raise_winerror()
35

36 11
    async def aclose(self):
37 11
        self._close()
38 11
        await _core.checkpoint()
39

40 11
    def __del__(self):
41 11
        self._close()
42

43

44 11
class PipeSendStream(SendStream, metaclass=Final):
    """Send stream backed by a Windows named pipe handle.

    The pipe is expected to have been opened in OVERLAPPED mode.
    """

    def __init__(self, handle: int) -> None:
        """Take ownership of *handle* by wrapping it in a ``_HandleHolder``."""
        self._handle_holder = _HandleHolder(handle)
        # Entered around every send-side operation; the string is the message
        # handed to ConflictDetector for reporting overlapping use.
        self._conflict_detector = ConflictDetector(
            "another task is currently using this pipe"
        )

    async def send_all(self, data: bytes):
        """Write all of *data* to the pipe.

        An empty *data* performs no write and only executes a checkpoint.

        Raises:
            ClosedResourceError: if the pipe was already closed on entry.
            BrokenResourceError: if the overlapped write raises
                ``BrokenPipeError``.
        """
        with self._conflict_detector:
            holder = self._handle_holder
            if holder.closed:
                raise _core.ClosedResourceError("this pipe is already closed")

            if data:
                try:
                    sent = await _core.write_overlapped(holder.handle, data)
                except BrokenPipeError as exc:
                    raise _core.BrokenResourceError from exc
                # By my reading of MSDN, a short write cannot happen as long
                # as the pipe isn't in nonblocking mode, but double-check
                # anyway.
                assert sent == len(data)
            else:
                # Nothing to write, but the call must still checkpoint.
                await _core.checkpoint()

    async def wait_send_all_might_not_block(self) -> None:
        """Check that the pipe is open, then execute a checkpoint.

        Raises:
            ClosedResourceError: if the pipe was already closed on entry.
        """
        with self._conflict_detector:
            if self._handle_holder.closed:
                raise _core.ClosedResourceError("This pipe is already closed")

            # No real waiting is implemented yet (and it is probably not
            # needed).
            await _core.checkpoint()

    async def aclose(self):
        """Close the underlying handle via the handle holder."""
        await self._handle_holder.aclose()
83

84

85 11
class PipeReceiveStream(ReceiveStream, metaclass=Final):
    """Receive stream backed by a Windows pipe handle, read with overlapped I/O."""

    def __init__(self, handle: int) -> None:
        """Take ownership of *handle* by wrapping it in a ``_HandleHolder``."""
        self._handle_holder = _HandleHolder(handle)
        # Entered around every receive; the string is the message handed to
        # ConflictDetector for reporting overlapping use.
        self._conflict_detector = ConflictDetector(
            "another task is currently using this pipe"
        )

    async def receive_some(self, max_bytes=None) -> bytes:
        """Read up to *max_bytes* bytes from the pipe.

        Args:
            max_bytes: upper bound on the amount of data to read; when
                ``None``, ``DEFAULT_RECEIVE_SIZE`` is used.

        Returns:
            The data that was read, or ``b""`` when the read fails with
            ``BrokenPipeError`` while this stream is still open (treated as
            end-of-file).

        Raises:
            ClosedResourceError: if the pipe was closed on entry, or was
                closed by the time the read failed with ``BrokenPipeError``.
            TypeError: if *max_bytes* is neither ``None`` nor an ``int``.
            ValueError: if *max_bytes* is an ``int`` smaller than 1.
        """
        with self._conflict_detector:
            holder = self._handle_holder
            if holder.closed:
                raise _core.ClosedResourceError("this pipe is already closed")

            if max_bytes is None:
                max_bytes = DEFAULT_RECEIVE_SIZE
            elif not isinstance(max_bytes, int):
                raise TypeError("max_bytes must be integer >= 1")
            elif max_bytes < 1:
                raise ValueError("max_bytes must be integer >= 1")

            chunk = bytearray(max_bytes)
            try:
                received = await _core.readinto_overlapped(holder.handle, chunk)
            except BrokenPipeError:
                if holder.closed:
                    raise _core.ClosedResourceError(
                        "another task closed this pipe"
                    ) from None

                # Windows raises BrokenPipeError on one end of a pipe whenever
                # the other end closes, regardless of direction. Translate
                # that into the Unix behavior of handing EOF to the reader
                # when the writer closes.
                #
                # Since no exception is raised from here, a checkpoint is
                # required; readinto_overlapped raised, so it may not have
                # checkpointed on our behalf. Do it manually.
                await _core.checkpoint()
                return b""

            # Drop the unused tail of the preallocated buffer.
            del chunk[received:]
            return chunk

    async def aclose(self):
        """Close the underlying handle via the handle holder."""
        await self._handle_holder.aclose()

Read our documentation on viewing source code.

Loading