1
# "High-level" networking interface
2

3 27
import errno
4 27
from contextlib import contextmanager
5

6 27
import trio
7 27
from . import socket as tsocket
8 27
from ._util import ConflictDetector, Final
9 27
from .abc import HalfCloseableStream, Listener
10

11
# XX TODO: this number was picked arbitrarily. We should do experiments to
# tune it. (Or make it dynamic -- one idea is to start small and increase it
# if we observe single reads filling up the whole buffer, at least within some
# limits.)
DEFAULT_RECEIVE_SIZE = 65536

# errno values that mean "the underlying socket object was already closed";
# an OSError carrying one of these is reported as ClosedResourceError.
_closed_stream_errnos = {
    # Unix
    errno.EBADF,
    # Windows
    errno.ENOTSOCK,
}
23

24

25 27
@contextmanager
def _translate_socket_errors_to_stream_errors():
    """Context manager that converts ``OSError`` into Trio stream errors.

    Any ``OSError`` raised inside the ``with`` body is re-raised as:

    * :exc:`trio.ClosedResourceError` if its ``errno`` is one of
      ``_closed_stream_errnos`` (raised ``from None``, so the original
      exception is not shown as the cause), or
    * :exc:`trio.BrokenResourceError` otherwise, chained to the original
      exception via ``from exc``.

    Exceptions that are not ``OSError`` propagate unchanged.
    """
    try:
        yield
    except OSError as exc:
        if exc.errno in _closed_stream_errnos:
            raise trio.ClosedResourceError("this socket was already closed") from None
        else:
            raise trio.BrokenResourceError(
                "socket connection broken: {}".format(exc)
            ) from exc
36

37

38 27
class SocketStream(HalfCloseableStream, metaclass=Final):
    """An implementation of the :class:`trio.abc.HalfCloseableStream`
    interface based on a raw network socket.

    Args:
      socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``,
          and be connected.

    By default for TCP sockets, :class:`SocketStream` enables ``TCP_NODELAY``,
    and (on platforms where it's supported) enables ``TCP_NOTSENT_LOWAT`` with
    a reasonable buffer size (currently 16 KiB) – see `issue #72
    <https://github.com/python-trio/trio/issues/72>`__ for discussion. You can
    of course override these defaults by calling :meth:`setsockopt`.

    Once a :class:`SocketStream` object is constructed, it implements the full
    :class:`trio.abc.HalfCloseableStream` interface. In addition, it provides
    a few extra features:

    .. attribute:: socket

       The Trio socket object that this stream wraps.

    """

    def __init__(self, socket):
        # Validate eagerly so misuse fails at construction time rather than
        # on the first send/receive.
        if not isinstance(socket, tsocket.SocketType):
            raise TypeError("SocketStream requires a Trio socket object")
        if socket.type != tsocket.SOCK_STREAM:
            raise ValueError("SocketStream requires a SOCK_STREAM socket")

        self.socket = socket
        # Shared by send_all, wait_send_all_might_not_block and send_eof, so
        # that overlapping use of the send side by two tasks is detected.
        self._send_conflict_detector = ConflictDetector(
            "another task is currently sending data on this SocketStream"
        )

        # Socket defaults:

        # Not supported on e.g. unix domain sockets
        try:
            self.setsockopt(tsocket.IPPROTO_TCP, tsocket.TCP_NODELAY, True)
        except OSError:
            # Best-effort default only; failure to set it is not an error.
            pass

        if hasattr(tsocket, "TCP_NOTSENT_LOWAT"):
            try:
                # 16 KiB is pretty arbitrary and could probably do with some
                # tuning. (Apple is also setting this by default in CFNetwork
                # apparently -- I'm curious what value they're using, though I
                # couldn't find it online trivially. CFNetwork-129.20 source
                # has no mentions of TCP_NOTSENT_LOWAT. This presentation says
                # "typically 8 kilobytes":
                # http://devstreaming.apple.com/videos/wwdc/2015/719ui2k57m/719/719_your_app_and_next_generation_networks.pdf?dl=1
                # ). The theory is that you want it to be bandwidth *
                # rescheduling interval.
                self.setsockopt(tsocket.IPPROTO_TCP, tsocket.TCP_NOTSENT_LOWAT, 2 ** 14)
            except OSError:
                # Best-effort default only; failure to set it is not an error.
                pass

    async def send_all(self, data):
        # Sending after send_eof() is reported as a closed resource.
        if self.socket.did_shutdown_SHUT_WR:
            raise trio.ClosedResourceError("can't send data after sending EOF")
        with self._send_conflict_detector:
            with _translate_socket_errors_to_stream_errors():
                with memoryview(data) as data:
                    if not data:
                        # Nothing to send: no socket call will be made, so
                        # check for a closed socket explicitly and still
                        # execute a checkpoint before returning.
                        if self.socket.fileno() == -1:
                            raise trio.ClosedResourceError("socket was already closed")
                        await trio.lowlevel.checkpoint()
                        return
                    total_sent = 0
                    # socket.send() may accept only part of the buffer; keep
                    # sending the unsent tail until everything is written.
                    while total_sent < len(data):
                        with data[total_sent:] as remaining:
                            sent = await self.socket.send(remaining)
                        total_sent += sent

    async def wait_send_all_might_not_block(self):
        with self._send_conflict_detector:
            # fileno() == -1 is how a closed socket is detected here.
            if self.socket.fileno() == -1:
                raise trio.ClosedResourceError
            with _translate_socket_errors_to_stream_errors():
                await self.socket.wait_writable()

    async def send_eof(self):
        with self._send_conflict_detector:
            await trio.lowlevel.checkpoint()
            # On macOS, calling shutdown a second time raises ENOTCONN, but
            # send_eof needs to be idempotent.
            if self.socket.did_shutdown_SHUT_WR:
                return
            with _translate_socket_errors_to_stream_errors():
                self.socket.shutdown(tsocket.SHUT_WR)

    async def receive_some(self, max_bytes=None):
        # None means "use the module default", DEFAULT_RECEIVE_SIZE.
        if max_bytes is None:
            max_bytes = DEFAULT_RECEIVE_SIZE
        if max_bytes < 1:
            raise ValueError("max_bytes must be >= 1")
        with _translate_socket_errors_to_stream_errors():
            return await self.socket.recv(max_bytes)

    async def aclose(self):
        # Close the socket first, then execute a checkpoint.
        self.socket.close()
        await trio.lowlevel.checkpoint()

    # __aenter__, __aexit__ inherited from HalfCloseableStream are OK

    def setsockopt(self, level, option, value):
        """Set an option on the underlying socket.

        See :meth:`socket.socket.setsockopt` for details.

        """
        return self.socket.setsockopt(level, option, value)

    def getsockopt(self, level, option, buffersize=0):
        """Check the current value of an option on the underlying socket.

        See :meth:`socket.socket.getsockopt` for details.

        """
        # This is to work around
        #   https://bitbucket.org/pypy/pypy/issues/2561
        # We should be able to drop it when the next PyPy3 beta is released.
        if buffersize == 0:
            return self.socket.getsockopt(level, option)
        else:
            return self.socket.getsockopt(level, option, buffersize)
165

166

167
################################################################
168
# SocketListener
169
################################################################
170

171
# Accept error handling
172
# =====================
173
#
174
# Literature review
175
# -----------------
176
#
177
# Here's a list of all the possible errors that accept() can return, according
178
# to the POSIX spec or the Linux, FreeBSD, macOS, and Windows docs:
179
#
180
# Can't happen with a Trio socket:
181
# - EAGAIN/(WSA)EWOULDBLOCK
182
# - EINTR
183
# - WSANOTINITIALISED
184
# - WSAEINPROGRESS: a blocking call is already in progress
185
# - WSAEINTR: someone called WSACancelBlockingCall, but we don't make blocking
186
#   calls in the first place
187
#
188
# Something is wrong with our call:
189
# - EBADF: not a file descriptor
190
# - (WSA)EINVAL: socket isn't listening, or (Linux, BSD) bad flags
191
# - (WSA)ENOTSOCK: not a socket
192
# - (WSA)EOPNOTSUPP: this kind of socket doesn't support accept
193
# - (Linux, FreeBSD, Windows) EFAULT: the sockaddr pointer points to readonly
194
#   memory
195
#
196
# Something is wrong with the environment:
197
# - (WSA)EMFILE: this process hit its fd limit
198
# - ENFILE: the system hit its fd limit
199
# - (WSA)ENOBUFS, ENOMEM: unspecified memory problems
200
#
201
# Something is wrong with the connection we were going to accept. There's a
202
# ton of variability between systems here:
203
# - ECONNABORTED: documented everywhere, but apparently only the BSDs do this
204
#   (signals a connection was closed/reset before being accepted)
205
# - EPROTO: unspecified protocol error
206
# - (Linux) EPERM: firewall rule prevented connection
207
# - (Linux) ENETDOWN, EPROTO, ENOPROTOOPT, EHOSTDOWN, ENONET, EHOSTUNREACH,
208
#   EOPNOTSUPP, ENETUNREACH, ENOSR, ESOCKTNOSUPPORT, EPROTONOSUPPORT,
209
#   ETIMEDOUT, ... or any other error that the socket could give, because
210
#   apparently if an error happens on a connection before it's accept()ed,
211
#   Linux will report that error from accept().
212
# - (Windows) WSAECONNRESET, WSAENETDOWN
213
#
214
#
215
# Code review
216
# -----------
217
#
218
# What do other libraries do?
219
#
220
# Twisted on Unix or when using nonblocking I/O on Windows:
221
# - ignores EPERM, with comment about Linux firewalls
222
# - logs and ignores EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED
223
#   Comment notes that ECONNABORTED is a BSDism and that Linux returns the
224
#   socket before having it fail, and macOS just silently discards it.
225
# - other errors are raised, which is logged + kills the socket
226
# ref: src/twisted/internet/tcp.py, Port.doRead
227
#
228
# Twisted using IOCP on Windows:
229
# - logs and ignores all errors
230
# ref: src/twisted/internet/iocpreactor/tcp.py, Port.handleAccept
231
#
232
# Tornado:
233
# - ignore ECONNABORTED (comment notes that it was observed on FreeBSD)
234
# - everything else raised, but all this does (by default) is cause it to be
235
#   logged and then ignored
236
# (ref: tornado/netutil.py, tornado/ioloop.py)
237
#
238
# libuv on Unix:
239
# - ignores ECONNABORTED
240
# - does a "trick" for EMFILE or ENFILE
241
# - all other errors passed to the connection_cb to be handled
242
# (ref: src/unix/stream.c:uv__server_io, uv__emfile_trick)
243
#
244
# libuv on Windows:
245
# src/win/tcp.c:uv_tcp_queue_accept
246
#   this calls AcceptEx, and then arranges to call:
247
# src/win/tcp.c:uv_process_tcp_accept_req
248
#   this gets the result from AcceptEx. If the original AcceptEx call failed,
249
#   then "we stop accepting connections and report this error to the
250
#   connection callback". I think this is for things like ENOTSOCK. If
251
#   AcceptEx successfully queues an overlapped operation, and then that
252
#   reports an error, it's just discarded.
253
#
254
# asyncio, selector mode:
255
# - ignores EWOULDBLOCK, EINTR, ECONNABORTED
256
# - on EMFILE, ENFILE, ENOBUFS, ENOMEM, logs an error and then disables the
257
#   listening loop for 1 second
258
# - everything else raises, but then the event loop just logs and ignores it
259
# (selector_events.py: BaseSelectorEventLoop._accept_connection)
260
#
261
#
262
# What should we do?
263
# ------------------
264
#
265
# When accept() returns an error, we can either ignore it or raise it.
266
#
267
# We have a long list of errors that should be ignored, and a long list of
268
# errors that should be raised. The big question is what to do with an error
269
# that isn't on either list. On Linux apparently you can get nearly arbitrary
270
# errors from accept() and they should be ignored, because it just indicates a
271
# socket that crashed before it began, and there isn't really anything to be
272
# done about this, plus on other platforms you may not get any indication at
273
# all, so programs have to tolerate not getting any indication too. OTOH if we
274
# get an unexpected error then it could indicate something arbitrarily bad --
275
# after all, it's unexpected.
276
#
277
# Given that we know that other libraries seem to be getting along fine with a
278
# fairly minimal list of errors to ignore, I think we'll be OK if we write
279
# down that list and then raise on everything else.
280
#
281
# The other question is what to do about the capacity problem errors: EMFILE,
282
# ENFILE, ENOBUFS, ENOMEM. Just flat out ignoring these is clearly not optimal
283
# -- at the very least you want to log them, and probably you want to take
284
# some remedial action. And if we ignore them then it prevents higher levels
285
# from doing anything clever with them. So we raise them.
286

287 27
# Names (not values) of the errnos that accept() may raise for a connection
# that failed before it could be accepted; these are ignored and the accept
# is retried. Names are used because not every platform defines all of them.
_ignorable_accept_errno_names = [
    # Linux can do this when a connection is denied by the firewall
    "EPERM",
    # BSDs with an early close/reset
    "ECONNABORTED",
    # All the other miscellany noted above -- may not happen in practice, but
    # whatever.
    "EPROTO",
    "ENETDOWN",
    "ENOPROTOOPT",
    "EHOSTDOWN",
    "ENONET",
    "EHOSTUNREACH",
    "EOPNOTSUPP",
    "ENETUNREACH",
    "ENOSR",
    "ESOCKTNOSUPPORT",
    "EPROTONOSUPPORT",
    "ETIMEDOUT",
    "ECONNRESET",
]

# Not all errnos are defined on all platforms, so resolve each name against
# the errno module and silently skip the ones this platform lacks.
_ignorable_accept_errnos = set()
for name in _ignorable_accept_errno_names:
    try:
        _ignorable_accept_errnos.add(getattr(errno, name))
    except AttributeError:
        pass
316

317

318 27
class SocketListener(Listener[SocketStream], metaclass=Final):
    """A :class:`~trio.abc.Listener` that uses a listening socket to accept
    incoming connections as :class:`SocketStream` objects.

    Args:
      socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``,
          and be listening.

    Note that the :class:`SocketListener` "takes ownership" of the given
    socket; closing the :class:`SocketListener` will also close the socket.

    .. attribute:: socket

       The Trio socket object that this listener wraps.

    """

    def __init__(self, socket):
        if not isinstance(socket, tsocket.SocketType):
            raise TypeError("SocketListener requires a Trio socket object")
        if socket.type != tsocket.SOCK_STREAM:
            raise ValueError("SocketListener requires a SOCK_STREAM socket")
        try:
            listening = socket.getsockopt(tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN)
        except OSError:
            # SO_ACCEPTCONN fails on macOS; we just have to trust the user.
            pass
        else:
            if not listening:
                raise ValueError("SocketListener requires a listening socket")

        self.socket = socket

    async def accept(self):
        """Accept an incoming connection.

        Returns:
          :class:`SocketStream`

        Raises:
          OSError: if the underlying call to ``accept`` raises an unexpected
              error.
          ClosedResourceError: if you already closed the socket.

        This method handles routine errors like ``ECONNABORTED``, but passes
        other errors on to its caller. In particular, it does *not* make any
        special effort to handle resource exhaustion errors like ``EMFILE``,
        ``ENFILE``, ``ENOBUFS``, ``ENOMEM``.

        """
        while True:
            try:
                sock, _ = await self.socket.accept()
            except OSError as exc:
                if exc.errno in _closed_stream_errnos:
                    raise trio.ClosedResourceError
                if exc.errno not in _ignorable_accept_errnos:
                    raise
                # Otherwise the errno is ignorable: fall through and loop
                # around to try accept() again.
            else:
                return SocketStream(sock)

    async def aclose(self):
        """Close this listener and its underlying socket."""
        self.socket.close()
        await trio.lowlevel.checkpoint()

Read our documentation on viewing source code .

Loading