1 27
import os
2 27
import errno
3

4 27
from ._abc import Stream
5 27
from ._util import ConflictDetector, Final
6

7 27
import trio
8

9 27
# This module can only be imported on POSIX platforms.
#
# The check lives here, instead of guarding the import over in lowlevel.py,
# because that arrangement keeps jedi's static analysis happy.
if os.name != "posix":
    raise ImportError

# Number of bytes receive_some() asks for when the caller gives no max_bytes.
# XX TODO: is this a good number? who knows... it does match the default Linux
# pipe capacity though.
DEFAULT_RECEIVE_SIZE = 64 * 1024
17

18

19 16
class _FdHolder:
    # This class holds onto a raw file descriptor, in non-blocking mode, and
    # is responsible for managing its lifecycle. In particular, it's
    # responsible for making sure it gets closed, and also for tracking
    # whether it's been closed.
    #
    # The way we track closure is to set the .fd field to -1, discarding the
    # original value. You might think that this is a strange idea, since it
    # overloads the same field to do two different things. Wouldn't it be more
    # natural to have a dedicated .closed field? But that would be more
    # error-prone. Fds are represented by small integers, and once an fd is
    # closed, its integer value may be reused immediately. If we accidentally
    # used the old fd after being closed, we might end up doing something to
    # another unrelated fd that happened to get assigned the same integer
    # value. By throwing away the integer value immediately, it becomes
    # impossible to make this mistake – we'll just get an EBADF.
    #
    # (This trick was copied from the stdlib socket module.)
    #
    # Invariant: whenever self.fd != -1, self._original_is_blocking is set,
    # so _raw_close() can always restore the blocking mode.
    def __init__(self, fd: int):
        """Take ownership of *fd* and switch it to non-blocking mode.

        Raises:
          TypeError: if *fd* is not an int.
          OSError: if the blocking mode of *fd* cannot be queried or changed.
        """
        # make sure self.fd is always initialized to *something*, because even
        # if we error out here then __del__ will run and access it.
        self.fd = -1
        if not isinstance(fd, int):
            raise TypeError("file descriptor must be an int")
        # Record the original blocking state *before* claiming the fd. If
        # this call fails (e.g. EBADF), the holder stays in the "closed"
        # state and __del__ is a no-op, instead of tripping over a missing
        # _original_is_blocking attribute.
        self._original_is_blocking = os.get_blocking(fd)
        self.fd = fd
        # Ensure non-blocking mode is enabled. If this fails we already own
        # the fd, so __del__ will restore its mode and close it.
        os.set_blocking(fd, False)

    @property
    def closed(self):
        """True once the fd has been released (self.fd is -1)."""
        return self.fd == -1

    def _raw_close(self):
        # This doesn't assume it's in a Trio context, so it can be called from
        # __del__. You should never call it from Trio context, because it
        # skips calling notify_fd_close. But from __del__, skipping that is
        # OK, because notify_fd_close just wakes up other tasks that are
        # waiting on this fd, and those tasks hold a reference to this object.
        # So if __del__ is being called, we know there aren't any tasks that
        # need to be woken.
        if self.closed:
            return
        fd = self.fd
        # Forget the integer first, so nothing can reuse it after the close.
        self.fd = -1
        try:
            os.set_blocking(fd, self._original_is_blocking)
        finally:
            # Close even if restoring the mode failed: we no longer remember
            # the fd, so skipping the close here would leak it for good.
            os.close(fd)

    def __del__(self):
        self._raw_close()

    async def aclose(self):
        """Close the fd from Trio context; always executes a checkpoint."""
        if not self.closed:
            # Tell Trio the fd is about to be closed before we close it.
            trio.lowlevel.notify_closing(self.fd)
            self._raw_close()
        await trio.lowlevel.checkpoint()
75

76

77 16
class FdStream(Stream, metaclass=Final):
    """
    Represents a stream given the file descriptor to a pipe, TTY, etc.

    *fd* must refer to a file that is open for reading and/or writing and
    supports non-blocking I/O (pipes and TTYs will work, on-disk files probably
    not).  The returned stream takes ownership of the fd, so closing the stream
    will close the fd too.  As with `os.fdopen`, you should not directly use
    an fd after you have wrapped it in a stream using this function.

    To be used as a Trio stream, an open file must be placed in non-blocking
    mode.  Unfortunately, this impacts all I/O that goes through the
    underlying open file, including I/O that uses a different
    file descriptor than the one that was passed to Trio. If other threads
    or processes are using file descriptors that are related through `os.dup`
    or inheritance across `os.fork` to the one that Trio is using, they are
    unlikely to be prepared to have non-blocking I/O semantics suddenly
    thrust upon them.  For example, you can use
    ``FdStream(os.dup(sys.stdin.fileno()))`` to obtain a stream for reading
    from standard input, but it is only safe to do so with heavy caveats: your
    stdin must not be shared by any other processes and you must not make any
    calls to synchronous methods of `sys.stdin` until the stream returned by
    `FdStream` is closed. See `issue #174
    <https://github.com/python-trio/trio/issues/174>`__ for a discussion of the
    challenges involved in relaxing this restriction.

    Args:
      fd (int): The fd to be wrapped.

    Returns:
      A new `FdStream` object.
    """

    def __init__(self, fd: int):
        # The holder owns the fd from here on; closing the stream closes it.
        self._fd_holder = _FdHolder(fd)
        # One detector per direction: sending and receiving each get their
        # own guard.
        self._send_conflict_detector = ConflictDetector(
            "another task is using this stream for send"
        )
        self._receive_conflict_detector = ConflictDetector(
            "another task is using this stream for receive"
        )

    async def send_all(self, data: bytes):
        with self._send_conflict_detector:
            # The closed check must come first: send_all(b"") never reaches
            # os.write, yet it still has to raise on a closed pipe.
            if self._fd_holder.closed:
                raise trio.ClosedResourceError("file was already closed")
            await trio.lowlevel.checkpoint()
            total = len(data)
            # adapted from the SocketStream code
            with memoryview(data) as buffer:
                offset = 0
                while offset < total:
                    with buffer[offset:] as tail:
                        try:
                            # Re-read the fd on every attempt, so a close
                            # that happened while we waited shows up as -1.
                            written = os.write(self._fd_holder.fd, tail)
                            offset += written
                        except BlockingIOError:
                            await trio.lowlevel.wait_writable(self._fd_holder.fd)
                        except OSError as exc:
                            if exc.errno != errno.EBADF:
                                raise trio.BrokenResourceError from exc
                            # EBADF: report it as a closed stream.
                            raise trio.ClosedResourceError(
                                "file was already closed"
                            ) from None

    async def wait_send_all_might_not_block(self) -> None:
        with self._send_conflict_detector:
            if self._fd_holder.closed:
                raise trio.ClosedResourceError("file was already closed")
            try:
                await trio.lowlevel.wait_writable(self._fd_holder.fd)
            except BrokenPipeError as exc:
                # kqueue: raises EPIPE on wait_writable instead
                # of sending, which is annoying
                raise trio.BrokenResourceError from exc

    async def receive_some(self, max_bytes=None) -> bytes:
        with self._receive_conflict_detector:
            # Validate the size argument before doing anything else.
            if max_bytes is None:
                max_bytes = DEFAULT_RECEIVE_SIZE
            elif not isinstance(max_bytes, int):
                raise TypeError("max_bytes must be integer >= 1")
            elif max_bytes < 1:
                raise ValueError("max_bytes must be integer >= 1")

            await trio.lowlevel.checkpoint()
            # Keep retrying until a read goes through without blocking.
            while True:
                try:
                    return os.read(self._fd_holder.fd, max_bytes)
                except BlockingIOError:
                    await trio.lowlevel.wait_readable(self._fd_holder.fd)
                except OSError as exc:
                    if exc.errno != errno.EBADF:
                        raise trio.BrokenResourceError from exc
                    # EBADF: report it as a closed stream.
                    raise trio.ClosedResourceError(
                        "file was already closed"
                    ) from None

    async def aclose(self):
        # Closing is delegated entirely to the fd holder.
        await self._fd_holder.aclose()

    def fileno(self):
        # Returns the holder's current fd value, which is -1 once closed.
        return self._fd_holder.fd

Read our documentation on viewing source code .

Loading