1 26
import asyncio
2 26
import enum
3 26
import sys
4 26
import warnings
5 26
from types import TracebackType
6 26
from typing import Any, Optional, Type
7

8 26
from typing_extensions import final
9

10

11 26
__version__ = "4.0.0a3"
12

13

14 26
__all__ = ("timeout", "timeout_at")
15

16

17 26
def timeout(delay: Optional[float]) -> "Timeout":
18
    """timeout context manager.
19

20
    Useful in cases when you want to apply timeout logic around block
21
    of code or in cases when asyncio.wait_for is not suitable. For example:
22

23
    >>> async with timeout(0.001):
24
    ...     async with aiohttp.get('https://github.com') as r:
25
    ...         await r.text()
26

27

28
    delay - value in seconds or None to disable timeout logic
29
    """
30 26
    loop = _get_running_loop()
31 26
    if delay is not None:
32 26
        deadline = loop.time() + delay  # type: Optional[float]
33
    else:
34 26
        deadline = None
35 26
    return Timeout(deadline, loop)
36

37

38 26
def timeout_at(deadline: Optional[float]) -> "Timeout":
39
    """Schedule the timeout at absolute time.
40

41
    deadline arguments points on the time in the same clock system
42
    as loop.time().
43

44
    Please note: it is not POSIX time but a time with
45
    undefined starting base, e.g. the time of the system power on.
46

47
    >>> async with timeout_at(loop.time() + 10):
48
    ...     async with aiohttp.get('https://github.com') as r:
49
    ...         await r.text()
50

51

52
    """
53 26
    loop = _get_running_loop()
54 26
    return Timeout(deadline, loop)
55

56

57 26
class _State(enum.Enum):
58 26
    INIT = "INIT"
59 26
    ENTER = "ENTER"
60 26
    TIMEOUT = "TIMEOUT"
61 26
    EXIT = "EXIT"
62

63

64 26
@final
65 12
class Timeout:
66
    # Internal class, please don't instantiate it directly
67
    # Use timeout() and timeout_at() public factories instead.
68
    #
69
    # Implementation note: `async with timeout()` is preferred
70
    # over `with timeout()`.
71
    # While technically the Timeout class implementation
72
    # doesn't need to be async at all,
73
    # the `async with` statement explicitly points that
74
    # the context manager should be used from async function context.
75
    #
76
    # This design allows to avoid many silly misusages.
77
    #
78
    # TimeoutError is raised immadiatelly when scheduled
79
    # if the deadline is passed.
80
    # The purpose is to time out as sson as possible
81
    # without waiting for the next await expression.
82

83 26
    __slots__ = ("_deadline", "_loop", "_state", "_task", "_timeout_handler")
84

85 26
    def __init__(
86
        self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
87
    ) -> None:
88 26
        self._loop = loop
89 26
        self._state = _State.INIT
90

91 26
        task = _current_task(self._loop)
92 26
        self._task = task
93

94 26
        self._timeout_handler = None  # type: Optional[asyncio.Handle]
95 26
        if deadline is None:
96 26
            self._deadline = None  # type: Optional[float]
97
        else:
98 26
            self.shift_to(deadline)
99

100 26
    def __enter__(self) -> "Timeout":
101 26
        warnings.warn(
102
            "with timeout() is deprecated, use async with timeout() instead",
103
            DeprecationWarning,
104
            stacklevel=2,
105
        )
106 26
        self._do_enter()
107 26
        return self
108

109 26
    def __exit__(
110
        self,
111
        exc_type: Type[BaseException],
112
        exc_val: BaseException,
113
        exc_tb: TracebackType,
114
    ) -> Optional[bool]:
115 26
        self._do_exit(exc_type)
116 26
        return None
117

118 26
    async def __aenter__(self) -> "Timeout":
119 26
        self._do_enter()
120 26
        return self
121

122 26
    async def __aexit__(
123
        self,
124
        exc_type: Type[BaseException],
125
        exc_val: BaseException,
126
        exc_tb: TracebackType,
127
    ) -> Optional[bool]:
128 26
        self._do_exit(exc_type)
129 26
        return None
130

131 26
    @property
132 26
    def expired(self) -> bool:
133
        """Is timeout expired during execution?"""
134 26
        return self._state == _State.TIMEOUT
135

136 26
    @property
137 26
    def deadline(self) -> Optional[float]:
138 26
        return self._deadline
139

140 26
    def reject(self) -> None:
141
        """Reject scheduled timeout if any."""
142
        # cancel is maybe better name but
143
        # task.cancel() raises CancelledError in asyncio world.
144 26
        if self._state not in (_State.INIT, _State.ENTER):
145 26
            raise RuntimeError("invalid state {}".format(self._state.value))
146 26
        self._reject()
147

148 26
    def _reject(self) -> None:
149 26
        if self._timeout_handler is not None:
150 26
            self._timeout_handler.cancel()
151 26
            self._timeout_handler = None
152

153 26
    def shift_by(self, delay: float) -> None:
154
        """Advance timeout on delay seconds.
155

156
        The delay can be negative.
157
        """
158 26
        now = self._loop.time()
159 26
        self.shift_to(now + delay)
160

161 26
    def shift_to(self, deadline: float) -> None:
162
        """Advance timeout on the abdelay seconds.
163

164
        If new deadline is in the past
165
        the timeout is raised immediatelly.
166
        """
167 26
        if self._state == _State.EXIT:
168 26
            raise RuntimeError("cannot reschedule after exit from context manager")
169 26
        if self._state == _State.TIMEOUT:
170 26
            raise RuntimeError("cannot reschedule expired timeout")
171 26
        if self._timeout_handler is not None:
172 26
            self._timeout_handler.cancel()
173 26
        self._deadline = deadline
174 26
        now = self._loop.time()
175 26
        if deadline <= now:
176 26
            self._timeout_handler = None
177 26
            if self._state == _State.INIT:
178 26
                raise asyncio.TimeoutError
179
            else:
180
                # state is ENTER
181 26
                raise asyncio.CancelledError
182 26
        self._timeout_handler = self._loop.call_at(
183
            deadline, self._on_timeout, self._task
184
        )
185

186 26
    def _do_enter(self) -> None:
187 26
        if self._state != _State.INIT:
188 26
            raise RuntimeError("invalid state {}".format(self._state.value))
189 26
        self._state = _State.ENTER
190

191 26
    def _do_exit(self, exc_type: Type[BaseException]) -> None:
192 26
        if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
193 26
            self._timeout_handler = None
194 26
            raise asyncio.TimeoutError
195
        # timeout is not expired
196 26
        self._state = _State.EXIT
197 26
        self._reject()
198 26
        return None
199

200 26
    def _on_timeout(self, task: "asyncio.Task[None]") -> None:
201 26
        task.cancel()
202 26
        self._state = _State.TIMEOUT
203

204

205 26
def _current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]":
206 26
    if sys.version_info >= (3, 7):
207 18
        return asyncio.current_task(loop=loop)
208
    else:
209 8
        return asyncio.Task.current_task(loop=loop)
210

211

212 26
def _get_running_loop() -> asyncio.AbstractEventLoop:
213 26
    if sys.version_info >= (3, 7):
214 18
        return asyncio.get_running_loop()
215
    else:
216 8
        loop = asyncio.get_event_loop()
217 26
        if not loop.is_running():
218 8
            raise RuntimeError("no running event loop")
219 8
        return loop

Read our documentation on viewing source code .

Loading