#!/usr/bin/env python3

# Contest Management System - http://cms-dev.github.io/
# Copyright © 2013 Luca Wehrstedt <luca.wehrstedt@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re
import time
from collections import deque
from weakref import WeakSet

from gevent import Timeout
from gevent.pywsgi import WSGIHandler
from gevent.queue import Queue, Empty
from werkzeug.exceptions import NotAcceptable
from werkzeug.wrappers import Request


__all__ = [
    "format_event",
    "Publisher", "Subscriber", "EventSource",
    ]


def format_event(id_, event, data):
    """Format the parameters to be sent on an event stream.

    Produce a text that, written on a Server-Sent Events connection,
    will cause the client to receive an event of the given type with
    the given data, and set the last event ID to the given id. No colon
    nor line breaks (i.e. "\\r\\n", "\\r", "\\n") are allowed in the
    event name and all line breaks in the event data will become "\\n".

    id_ (unicode): the ID of the event.
    event (unicode): the name of the event, or None.
    data (unicode): the content of the event, or None.

    return (bytes): the value to write on the stream.

    raise (TypeError): if any parameter isn't unicode.
    raise (ValueError): if event contains illegal characters.

    """
    if not isinstance(id_, str):
        raise TypeError("Id isn't unicode.")
    result = [b"id:%s" % id_.encode('utf-8')]

    if event is not None and event != "message":
        if not isinstance(event, str):
            raise TypeError("Event isn't unicode.")
        if not set("\r\n:").isdisjoint(event):
            raise ValueError("Event cannot contain '\\r', '\\n' or ':'.")
        result += [b"event:%s" % event.encode('utf-8')]

    if data is not None:
        if not isinstance(data, str):
            raise TypeError("Data isn't unicode.")
        for line in re.split("\r\n|(?<!\r)\n|\r(?!\n)", data):
            result += [b"data:%s" % line.encode('utf-8')]

    result += [b'\n']

    return b'\n'.join(result)


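# An illustrative example (a comment only, not used anywhere in this
# module) of what format_event produces:
#
#     format_event("1a2b", "score", "line1\nline2")
#
# returns
#
#     b"id:1a2b\nevent:score\ndata:line1\ndata:line2\n\n"
#
# i.e. one field per line ("id:", "event:", "data:") and a blank line
# terminating the event, as the SSE wire format requires.

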
class Publisher:
    """The publish part of a pub-sub broadcast system.

    Publish-subscribe is actually an improper name, as there's just one
    "topic", making it a simple broadcast system. The publisher class
    is responsible for receiving messages to be sent, keeping them in
    a cache for a while, instantiating subscribers, each with its own
    queue, and pushing new messages to all these queues.

    """
    def __init__(self, size):
        """Instantiate a new publisher.

        size (int): the number of messages to keep in cache.

        """
        # We use a deque as it's efficient to add messages to one end
        # and have the ones at the other end be dropped when the total
        # number exceeds the given limit.
        self._cache = deque(maxlen=size)
        # We use a WeakSet as we want queues to vanish automatically
        # when no one else is using (i.e. fetching from) them.
        self._sub_queues = WeakSet()

    def put(self, event, data):
        """Dispatch a new item to all subscribers.

        See format_event for details about the parameters.

        event (unicode): the type of event the client will receive.
        data (unicode): the associated data.

        """
        # Number of microseconds since epoch.
        key = int(time.time() * 1_000_000)
        msg = format_event("%x" % key, event, data)
        # Put into cache.
        self._cache.append((key, msg))
        # Send to all subscribers.
        for queue in self._sub_queues:
            queue.put(msg)

    def get_subscriber(self, last_event_id=None):
        """Obtain a new subscriber.

        The returned subscriber will receive all messages after the one
        with the given index (if they are still in the cache).

        last_event_id (unicode|None): the ID of the last message the
            client received, to request the ones generated since then
            to be sent again. If not given, no past messages will be
            sent.

        return (Subscriber): a new subscriber instance.

        """
        queue = Queue()
        # If a valid last_event_id is provided, see if the cache can
        # supply the missed events.
        if last_event_id is not None and \
                re.match("^[0-9A-Fa-f]+$", last_event_id):
            last_event_key = int(last_event_id, 16)
            if len(self._cache) > 0 and last_event_key >= self._cache[0][0]:
                # All missed events are in cache.
                for key, msg in self._cache:
                    if key > last_event_key:
                        queue.put(msg)
            else:
                # Some events may be missing. Ask to reinit.
                queue.put(b"event:reinit\n\n")
        # Store the queue and return a subscriber bound to it.
        self._sub_queues.add(queue)
        return Subscriber(queue)


class Subscriber:
    """The subscribe part of a pub-sub broadcast system.

    This class receives the messages sent to the Publisher that created
    it.

    """
    def __init__(self, queue):
        """Create a new subscriber.

        Make it wait for messages on the given queue, managed by the
        Publisher.

        queue (Queue): a message queue.

        """
        self._queue = queue

    def get(self):
        """Retrieve new messages.

        Obtain all messages that were put in the associated publisher
        since this method was last called, or (on the first call) since
        the last_event_id given to get_subscriber.

        return ([objects]): the items put in the publisher, in order
            (actually, returns a generator, not a list).

        Note that messages that have already been dropped from the
        publisher's cache are not recovered: in that case
        get_subscriber has queued a "reinit" event, which will be among
        the yielded items.

        """
        # Block until we have something to do.
        self._queue.peek()
        # Fetch all items that are immediately available.
        try:
            while True:
                yield self._queue.get_nowait()
        except Empty:
            pass


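# A minimal usage sketch of the Publisher/Subscriber pair (a comment
# only; the values below are made up for the example):
#
#     pub = Publisher(size=250)
#     sub = pub.get_subscriber()
#     pub.put("score", "user123 100")
#     for msg in sub.get():
#         ...  # msg is the bytes produced by format_event
#
# get() blocks until at least one message is available, then yields
# everything currently queued.

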
class EventSource:
    """A class that implements a Server-Sent Events [1] handler.

    This class is intended to be extended: it takes charge of all the
    hard work of managing a stream of events, leaving to the subclass
    only the fun of determining which events to send.

    Server-Sent Events are a way to implement server push using
    long-polling over HTTP connections, preferably with chunked
    transfer encoding. This use wasn't a design goal of WSGI, but this
    class, which is a WSGI application, should be able to manage it.
    It has been written to work on a gevent.pywsgi server, but should
    handle other servers as well.

    [1] http://www.w3.org/TR/eventsource/

    """
    _GLOBAL_TIMEOUT = 600
    _WRITE_TIMEOUT = 30
    _PING_TIMEOUT = 15

    _CACHE_SIZE = 250

    def __init__(self):
        """Create an event source.

        """
        self._pub = Publisher(self._CACHE_SIZE)

    def send(self, event, data):
        """Send the event to the stream.

        Intended for subclasses to push new events to clients. See
        format_event for the meaning of the parameters.

        event (unicode): the type of the event.
        data (unicode): the data of the event.

        """
        self._pub.put(event, data)

    def __call__(self, environ, start_response):
        """Execute this instance as a WSGI application.

        See the PEP for the meaning of parameters. The separation of
        __call__ and wsgi_app eases the insertion of middlewares.

        """
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ, start_response):
        """Execute this instance as a WSGI application.

        See the PEP for the meaning of parameters. The separation of
        __call__ and wsgi_app eases the insertion of middlewares.

        """
        request = Request(environ)
        request.encoding_errors = "strict"

        # The problem here is that we'd like to send an infinite stream
        # of events, but WSGI has been designed to handle only finite
        # responses. Hence, to do this we will have to "abuse" the API
        # a little. This works well with gevent's pywsgi implementation
        # but it may not with others (still PEP-compliant). Therefore,
        # just to be extra-safe, we will terminate the response anyway,
        # after a long timeout, to make it finite.

        # The first such "hack" is the mechanism to trigger the chunked
        # transfer-encoding. The PEP states just that "the server *may*
        # use chunked encoding" to send each piece of data we give it,
        # if we don't specify a Content-Length header and if both the
        # client and the server support it. According to the HTTP spec.
        # all (and only) HTTP/1.1 compliant clients have to support it.
        # We'll assume that the server software supports it too, and
        # actually uses it (gevent does!) even if we have no way to
        # check it. We cannot try to force such behavior as the PEP
        # doesn't even allow us to set the Transfer-Encoding header.

        # The second abuse is the use of the write() callable, returned
        # by start_response, even if the PEP strongly discourages its
        # use in new applications. We do it because we need a way to
        # detect when the client disconnects, and we hope to achieve
        # this by seeing when a call to write() fails, i.e. raises an
        # exception. This behavior isn't documented by the PEP, but it
        # seems reasonable and it's present in gevent (which raises an
        # OSError).

        # The third non-standard behavior that we expect (related to
        # the previous one) is that no one in the application-to-client
        # chain does response buffering: neither any middleware nor the
        # server (gevent doesn't!). This should also hold outside the
        # server realm (i.e. no proxy buffering) but that's definitely
        # not our responsibility.

        # The fourth "hack" is to avoid an error being printed in the
        # logs. If the client terminates the connection, we catch and
        # silently ignore the exception and return gracefully, making
        # the server try to write the last zero-sized chunk (used to
        # mark the end of the stream). This will fail and produce an
        # error. To avoid this we detect if we're running on a gevent
        # server and make it "forget" this was a chunked response.

        # Check if the client will understand what we will produce.
        if request.accept_mimetypes.quality("text/event-stream") <= 0:
            return NotAcceptable()(environ, start_response)

        # Initialize the response and get the write() callback. The
        # Cache-Control header is useless for conforming clients, as
        # the spec. already imposes that behavior on them, but we set
        # it explicitly to avoid unwanted caching by unaware proxies and
        # middlewares.
        write = start_response(
            "200 OK", [("Content-Type", "text/event-stream; charset=utf-8"),
                       ("Cache-Control", "no-cache")])

        # This is part of the fourth hack (see above).
        if hasattr(start_response, "__self__") and \
                isinstance(start_response.__self__, WSGIHandler):
            handler = start_response.__self__
        else:
            handler = None

        # One-shot means that we will terminate the request after the
        # first batch of sent events. We do this when we believe the
        # client doesn't support chunked transfer. As this encoding was
        # introduced in HTTP/1.1 (as mandatory!), we restrict this to
        # requests in that HTTP version. Also, if the request comes
        # from an XMLHttpRequest it has probably been sent by a
        # polyfill (not by the native browser implementation), which
        # will be able to read the response body only when it has been
        # fully received.
        if environ["SERVER_PROTOCOL"] != "HTTP/1.1" or request.is_xhr:
            one_shot = True
        else:
            one_shot = False

        # As per the Server-Sent Events [1] spec., this is the way for
        # the client to tell us the ID of the last event it received
        # and to ask us to send it the ones that happened since then.
        # [1] http://www.w3.org/TR/eventsource/
        # The spec. requires implementations to retry the connection
        # when it fails, adding the "Last-Event-ID" HTTP header. But in
        # case of an error they stop, and we have to (manually) delete
        # the EventSource and create a new one. To obtain that behavior
        # again we give the "last_event_id" as a URL query parameter
        # (with lower priority, to have the header override it).
        last_event_id = request.headers.get("Last-Event-ID")
        if last_event_id is None:
            last_event_id = request.args.get("last_event_id")

        # We subscribe to the publisher to receive events.
        sub = self._pub.get_subscriber(last_event_id)

        # Send some data down the pipe. We need that to make the user
        # agent announce the connection (see the spec.). Since it's a
        # comment it will be ignored.
        write(b":\n")

        # XXX We could make the client change its reconnection timeout
        # by sending a "retry:" line.

        # As a last line of defence against very badly-behaving servers
        # we don't want the request to last longer than _GLOBAL_TIMEOUT
        # seconds (see above). We use "False" to just make control exit
        # the with block, instead of raising an exception.
        with Timeout(self._GLOBAL_TIMEOUT, False):
            # Repeat indefinitely.
            while True:
                # Proxies often have a read timeout. We try not to hit
                # it by not being idle for more than _PING_TIMEOUT
                # seconds, sending a ping (i.e. a comment) if there's
                # no real data.
                try:
                    with Timeout(self._PING_TIMEOUT):
                        data = b"".join(sub.get())
                        got_sth = True
                except Timeout:
                    data = b":\n"
                    got_sth = False

                try:
                    with Timeout(self._WRITE_TIMEOUT):
                        write(data)
                # The PEP doesn't say what has to happen when a write
                # fails. We're conservative, and allow any unexpected
                # event to interrupt the request. We hope it's enough
                # to detect when the client disconnects. It is with
                # gevent, which raises an OSError. The timeout (we
                # catch that too) is just an extra precaution.
                except Exception:
                    # This is part of the fourth hack (see above).
                    if handler is not None:
                        handler.response_use_chunked = False
                    break

                # If we decided this is one-shot, stop the long-poll as
                # soon as we have sent the client some real data.
                if one_shot and got_sth:
                    break

        # An empty iterable tells the server not to send anything.
        return []
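

# A hedged usage sketch (a comment only; the subclass name, port and
# event payload below are made up for the example):
#
#     from gevent.pywsgi import WSGIServer
#
#     class ScoreEventSource(EventSource):
#         def new_score(self, user, score):
#             # Subclasses only need to call send(); caching, replay
#             # via Last-Event-ID, pings and timeouts are handled by
#             # EventSource.wsgi_app.
#             self.send("score", "%s %d" % (user, score))
#
#     source = ScoreEventSource()
#     server = WSGIServer(("0.0.0.0", 8888), source)
#     # server.serve_forever() would then stream events to clients
#     # connecting with an SSE-capable user agent.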
