1 27
import errno
2 27
import logging
3 27
import os
4

5 27
import trio
6

7
# errno values that accept(2) can fail with, and which indicate that the
# system is overloaded.
ACCEPT_CAPACITY_ERRNOS = {
    errno.EMFILE,  # process is out of file descriptors
    errno.ENFILE,  # system is out of file descriptors
    errno.ENOBUFS,  # kernel hit a memory limitation creating the socket
    errno.ENOMEM,  # (same as ENOBUFS)
}

# Seconds to wait before calling accept again after one of those errors
SLEEP_TIME = 0.1

# Standard library logger that receives the complaint when this happens
LOGGER = logging.getLogger("trio.serve_listeners")
21

22

23 27
async def _run_handler(stream, handler):
    """Run ``handler(stream)`` and then forcefully close ``stream``.

    The close happens however ``handler`` exits, so a handler that neglects
    to close its stream does not leave it open. Any exception raised by
    ``handler`` still propagates to the caller.
    """
    try:
        await handler(stream)
    finally:
        # Runs on normal return and on any exception alike.
        await trio.aclose_forcefully(stream)
28

29

30 27
async def _serve_one_listener(listener, handler_nursery, handler):
    """Accept connections from ``listener`` forever, starting one handler
    task per connection.

    Each accepted stream is handed to ``handler_nursery.start_soon`` wrapped
    in ``_run_handler``. An :class:`OSError` from ``accept`` whose errno is in
    ``ACCEPT_CAPACITY_ERRNOS`` is logged at ERROR level and followed by a
    ``SLEEP_TIME`` second pause before retrying; any other :class:`OSError`
    is re-raised.

    ``listener`` is used as an async context manager for the whole loop.
    """
    async with listener:
        while True:
            try:
                stream = await listener.accept()
            except OSError as exc:
                # Anything other than a capacity problem is not ours to
                # handle: let it propagate.
                if exc.errno not in ACCEPT_CAPACITY_ERRNOS:
                    raise
                LOGGER.error(
                    "accept returned %s (%s); retrying in %s seconds",
                    errno.errorcode[exc.errno],
                    os.strerror(exc.errno),
                    SLEEP_TIME,
                    exc_info=True,
                )
                # Back off briefly, then go around and try accept again.
                await trio.sleep(SLEEP_TIME)
                continue
            handler_nursery.start_soon(_run_handler, stream, handler)
49

50

51 27
async def serve_listeners(
    handler, listeners, *, handler_nursery=None, task_status=trio.TASK_STATUS_IGNORED
):
    r"""Accept incoming connections on every listener in ``listeners``, and
    start a task running ``handler(stream)`` for each connection.

    .. warning::

       Exceptions raised by ``handler`` get no special treatment from this
       function – so by default such an exception will propagate out and
       crash your server. To avoid this, either catch exceptions inside
       your ``handler``, or pass a ``handler_nursery`` object that responds
       to exceptions in some other way.

    Args:

      handler: An async callable. For each incoming connection it will be
          invoked like ``handler_nursery.start_soon(handler, stream)``.

      listeners: A list of :class:`~trio.abc.Listener` objects.
          :func:`serve_listeners` takes responsibility for closing them.

      handler_nursery: The nursery in which handlers are started, or any
          object that has a ``start_soon`` method. With ``None`` (the
          default), :func:`serve_listeners` creates a new nursery
          internally and uses that one.

      task_status: Lets this function be used with ``nursery.start``, which
          will return ``listeners``.

    Returns:

      This function never returns unless cancelled.

    Resource handling:

      A ``stream`` that ``handler`` neglects to close will be closed using
      :func:`trio.aclose_forcefully`.

    Error handling:

      Most errors coming from :meth:`~trio.abc.Listener.accept` are allowed to
      propagate out (crashing the server in the process). The exception is
      errors which indicate that the server is temporarily overloaded; those
      are handled specially. They are :class:`OSError`\s with one of the
      following errnos:

      * ``EMFILE``: process is out of file descriptors
      * ``ENFILE``: system is out of file descriptors
      * ``ENOBUFS``, ``ENOMEM``: the kernel hit some sort of memory limitation
        when trying to create a socket object

      On getting one of these errors, :func:`serve_listeners`:

      * Logs the error to the standard library logger ``trio.serve_listeners``
        (level = ERROR, with exception information included). By default this
        causes it to be printed to stderr.
      * Waits 100 ms before calling ``accept`` again, in hopes that the
        system will recover.

    """
    async with trio.open_nursery() as nursery:
        # Handlers go into the caller's nursery when one was supplied;
        # otherwise they share the internal nursery with the accept loops.
        spawn_into = nursery if handler_nursery is None else handler_nursery
        for listener in listeners:
            nursery.start_soon(_serve_one_listener, listener, spawn_into, handler)
        # The listeners were already queueing connections before we were
        # called; started() is still deferred to the very end so that any
        # error raised above is seen before we report that we're up.
        task_status.started(listeners)

Read our documentation on viewing source code .

Loading