aio-libs / aiohttp
1 10
import asyncio
2 10
from contextlib import suppress
3 10
from typing import Any, Optional, Tuple
4

5 10
from .base_protocol import BaseProtocol
6 10
from .client_exceptions import (
7
    ClientOSError,
8
    ClientPayloadError,
9
    ServerDisconnectedError,
10
    ServerTimeoutError,
11
)
12 10
from .helpers import BaseTimerContext, set_exception, set_result
13 10
from .http import HttpResponseParser, RawResponseMessage
14 10
from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
15

16

17 10
class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
    """Helper class to adapt between Protocol and StreamReader.

    Bytes handed to ``data_received()`` go either to a custom payload
    parser installed with ``set_parser()`` or to the ``HttpResponseParser``
    created by ``set_response_params()``.  Every parsed
    ``(message, payload)`` pair is pushed into this object's own queue
    with ``feed_data()``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise both base classes and reset all per-connection state."""
        BaseProtocol.__init__(self, loop=loop)
        DataQueue.__init__(self, loop)

        # Set by force_close(), set_exception(), connection_lost() and by
        # any parsed message whose ``should_close`` is true.
        self._should_close = False

        self._payload: Optional[StreamReader] = None
        self._skip_payload = False
        self._payload_parser = None

        self._timer = None

        # Bytes that arrived while no parser could consume them; replayed
        # by set_parser() / set_response_params().
        self._tail = b""
        self._upgraded = False
        self._parser: Optional[HttpResponseParser] = None

        self._read_timeout: Optional[float] = None
        self._read_timeout_handle: Optional[asyncio.TimerHandle] = None

        # Completed (result or exception) by connection_lost().
        self.closed: asyncio.Future[None] = self._loop.create_future()

    @property
    def upgraded(self) -> bool:
        """Return the ``upgraded`` flag reported by the last HTTP parse."""
        return self._upgraded

    @property
    def should_close(self) -> bool:
        """Report whether the connection is flagged for closing.

        True when the current payload has not reached EOF, the connection
        was upgraded, the close flag is set, an exception is stored, a
        custom payload parser is installed, items are still queued, or
        unconsumed tail bytes remain.
        """
        if (
            self._payload is not None and not self._payload.is_eof()
        ) or self._upgraded:
            return True

        return (
            self._should_close
            or self.exception() is not None
            or self._payload_parser is not None
            or len(self) > 0
            or bool(self._tail)
        )

    def force_close(self) -> None:
        """Set the close flag so that ``should_close`` reports True."""
        self._should_close = True

    def close(self) -> None:
        """Close the transport, if any, and drop payload and read timeout.

        Does nothing when no transport is attached.
        """
        transport = self.transport
        if transport is not None:
            transport.close()
            self.transport = None
            self._payload = None
            self._drop_timeout()

    def is_connected(self) -> bool:
        """Return True if a transport is attached and it is not closing."""
        return self.transport is not None and not self.transport.is_closing()

    def connection_lost(self, exc: Optional[BaseException]) -> None:
        """Tear down parser state after the transport went away.

        Resolves ``self.closed``, signals EOF to whichever parsers are
        installed and, unless ``is_eof()`` is already true, stores an
        exception on this queue: ``ClientOSError`` for an ``OSError``,
        ``ServerDisconnectedError`` when *exc* is None, otherwise *exc*
        itself.  Replacement exceptions carry the error they stand in for
        as ``__cause__``.

        :param exc: the error that closed the connection, or None.
        """
        self._drop_timeout()

        if exc is None:
            set_result(self.closed, None)
        else:
            set_exception(self.closed, exc)

        if self._payload_parser is not None:
            # Best effort only: a failure here is deliberately ignored.
            with suppress(Exception):
                self._payload_parser.feed_eof()

        uncompleted = None
        if self._parser is not None:
            try:
                uncompleted = self._parser.feed_eof()
            except Exception as parser_exc:
                if self._payload is not None:
                    payload_exc = ClientPayloadError(
                        "Response payload is not completed"
                    )
                    # Keep the parser failure reachable for debugging.
                    payload_exc.__cause__ = parser_exc
                    self._payload.set_exception(payload_exc)
                    # parser_exc's traceback references this frame; drop
                    # the local so the frame does not point back at it.
                    del payload_exc

        if not self.is_eof():
            if isinstance(exc, OSError):
                wrapped = ClientOSError(*exc.args)
                # Preserve the original OSError instead of discarding it.
                wrapped.__cause__ = exc
                exc = wrapped
            if exc is None:
                exc = ServerDisconnectedError(uncompleted)
            # assigns self._should_close to True as side effect,
            # we do it anyway below
            self.set_exception(exc)

        self._should_close = True
        self._parser = None
        self._payload = None
        self._payload_parser = None
        self._reading_paused = False

        super().connection_lost(exc)

    def eof_received(self) -> None:
        """Cancel the pending read timeout when the peer signals EOF."""
        # should call parser.feed_eof() most likely
        self._drop_timeout()

    def pause_reading(self) -> None:
        """Pause reading via the base class and cancel the read timeout."""
        super().pause_reading()
        self._drop_timeout()

    def resume_reading(self) -> None:
        """Resume reading via the base class and re-arm the read timeout."""
        super().resume_reading()
        self._reschedule_timeout()

    def set_exception(self, exc: BaseException) -> None:
        """Set the close flag, cancel the read timeout and store *exc*.

        Storing is delegated to the next ``set_exception`` in the MRO.
        """
        self._should_close = True
        self._drop_timeout()
        super().set_exception(exc)

    def set_parser(self, parser: Any, payload: Any) -> None:
        """Install a custom payload parser and its payload object.

        Cancels the read timeout and replays any buffered tail bytes
        through ``data_received()`` so the new parser sees them.
        """
        # TODO: actual types are:
        #   parser: WebSocketReader
        #   payload: FlowControlDataQueue
        # but they are not generic enough
        # Need an ABC for both types
        self._payload = payload
        self._payload_parser = parser

        self._drop_timeout()

        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def set_response_params(
        self,
        *,
        timer: Optional[BaseTimerContext] = None,
        skip_payload: bool = False,
        read_until_eof: bool = False,
        auto_decompress: bool = True,
        read_timeout: Optional[float] = None,
        read_bufsize: int = 2 ** 16,
    ) -> None:
        """Configure response parsing and create a fresh HTTP parser.

        :param timer: passed through to ``HttpResponseParser``.
        :param skip_payload: when true, messages are queued with
            ``EMPTY_PAYLOAD``; the parser receives
            ``response_with_body=not skip_payload``.
        :param read_until_eof: passed through to ``HttpResponseParser``.
        :param auto_decompress: passed through to ``HttpResponseParser``.
        :param read_timeout: delay given to ``loop.call_later`` for the
            read timeout; a falsy value disables it.
        :param read_bufsize: passed through to ``HttpResponseParser``.
        """
        self._skip_payload = skip_payload

        self._read_timeout = read_timeout
        self._reschedule_timeout()

        self._parser = HttpResponseParser(
            self,
            self._loop,
            read_bufsize,
            timer=timer,
            payload_exception=ClientPayloadError,
            response_with_body=not skip_payload,
            read_until_eof=read_until_eof,
            auto_decompress=auto_decompress,
        )

        # Replay bytes that arrived before the parser existed.
        if self._tail:
            data, self._tail = self._tail, b""
            self.data_received(data)

    def _drop_timeout(self) -> None:
        """Cancel the pending read-timeout callback, if there is one."""
        if self._read_timeout_handle is not None:
            self._read_timeout_handle.cancel()
            self._read_timeout_handle = None

    def _reschedule_timeout(self) -> None:
        """Cancel any pending read timeout and arm a new one if configured."""
        self._drop_timeout()

        timeout = self._read_timeout
        if timeout:
            self._read_timeout_handle = self._loop.call_later(
                timeout, self._on_read_timeout
            )

    def _on_read_timeout(self) -> None:
        """Fail this queue and the current payload with a timeout error."""
        exc = ServerTimeoutError("Timeout on reading data from socket")
        self.set_exception(exc)
        if self._payload is not None:
            self._payload.set_exception(exc)

    def data_received(self, data: bytes) -> None:
        """Route incoming bytes to the active parser.

        Order of precedence: the custom payload parser if installed; the
        tail buffer when the connection is upgraded or no HTTP parser
        exists yet; otherwise the HTTP response parser, whose messages are
        queued with ``feed_data()``.
        """
        self._reschedule_timeout()

        if not data:
            return

        # custom payload parser
        if self._payload_parser is not None:
            eof, tail = self._payload_parser.feed_data(data)
            if eof:
                self._payload = None
                self._payload_parser = None

                if tail:
                    # Leftover bytes go through the regular dispatch again.
                    self.data_received(tail)
            return

        if self._upgraded or self._parser is None:
            # i.e. websocket connection, websocket parser is not set yet
            self._tail += data
            return

        # parse http messages
        try:
            messages, upgraded, tail = self._parser.feed_data(data)
        except BaseException as exc:
            if self.transport is not None:
                # connection.release() could be called BEFORE
                # data_received(), the transport is already
                # closed in this case
                self.transport.close()
            # should_close is True after the call
            self.set_exception(exc)
            return

        self._upgraded = upgraded

        # After the loop ``payload`` is the last message's payload, or
        # None when this chunk produced no complete message.
        payload: Optional[StreamReader] = None
        for message, payload in messages:
            if message.should_close:
                self._should_close = True

            self._payload = payload

            if self._skip_payload or message.code in (204, 304):
                self.feed_data((message, EMPTY_PAYLOAD), 0)
            else:
                self.feed_data((message, payload), 0)

        if payload is not None:
            # new message(s) was processed
            # register timeout handler unsubscribing
            # either on end-of-stream or immediately for
            # EMPTY_PAYLOAD
            if payload is not EMPTY_PAYLOAD:
                payload.on_eof(self._drop_timeout)
            else:
                self._drop_timeout()

        if tail:
            if upgraded:
                self.data_received(tail)
            else:
                self._tail = tail

Read our documentation on viewing source code.

Loading