# coding: utf-8

from abc import ABCMeta, abstractmethod
from typing import Generic, TypeVar

import trio
6

7

8
# We use ABCMeta instead of ABC, plus set __slots__=(), so as not to force a
# __dict__ onto subclasses.
class Clock(metaclass=ABCMeta):
    """The interface for custom run loop clocks."""

    __slots__ = ()

    @abstractmethod
    def start_clock(self):
        """Do any setup this clock might need.

        Called at the beginning of the run.

        """

    @abstractmethod
    def current_time(self) -> float:
        """Return the current time, according to this clock.

        This is used to implement functions like :func:`trio.current_time` and
        :func:`trio.move_on_after`.

        Returns:
            float: The current time.

        """

    @abstractmethod
    def deadline_to_sleep_time(self, deadline: float) -> float:
        """Compute the real time until the given deadline.

        This is called before we enter a system-specific wait function like
        :func:`select.select`, to get the timeout to pass.

        For a clock using wall-time, this should be something like::

           return deadline - self.current_time()

        but of course it may be different if you're implementing some kind of
        virtual clock.

        Args:
            deadline (float): The absolute time of the next deadline,
                according to this clock.

        Returns:
            float: The number of real seconds to sleep until the given
            deadline. May be :data:`math.inf`.

        """
58

59

class Instrument(metaclass=ABCMeta):
    """The interface for run loop instrumentation.

    Instruments don't have to inherit from this abstract base class, and all
    of these methods are optional. This class serves mostly as documentation.

    """

    __slots__ = ()

    def before_run(self) -> None:
        """Called at the beginning of :func:`trio.run`."""

    def after_run(self) -> None:
        """Called just before :func:`trio.run` returns."""

    def task_spawned(self, task) -> None:
        """Called when the given task is created.

        Args:
            task (trio.lowlevel.Task): The new task.

        """

    def task_scheduled(self, task) -> None:
        """Called when the given task becomes runnable.

        It may still be some time before it actually runs, if there are other
        runnable tasks ahead of it.

        Args:
            task (trio.lowlevel.Task): The task that became runnable.

        """

    def before_task_step(self, task) -> None:
        """Called immediately before we resume running the given task.

        Args:
            task (trio.lowlevel.Task): The task that is about to run.

        """

    def after_task_step(self, task) -> None:
        """Called when we return to the main run loop after a task has yielded.

        Args:
            task (trio.lowlevel.Task): The task that just ran.

        """

    def task_exited(self, task) -> None:
        """Called when the given task exits.

        Args:
            task (trio.lowlevel.Task): The finished task.

        """

    def before_io_wait(self, timeout: float) -> None:
        """Called before blocking to wait for I/O readiness.

        Args:
            timeout (float): The number of seconds we are willing to wait.

        """

    def after_io_wait(self, timeout: float) -> None:
        """Called after handling pending I/O.

        Args:
            timeout (float): The number of seconds we were willing to
                wait. This much time may or may not have elapsed, depending on
                whether any I/O was ready.

        """
136

137

class HostnameResolver(metaclass=ABCMeta):
    """If you have a custom hostname resolver, then implementing
    :class:`HostnameResolver` allows you to register this to be used by Trio.

    See :func:`trio.socket.set_custom_hostname_resolver`.

    """

    __slots__ = ()

    @abstractmethod
    async def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
        """A custom implementation of :func:`~trio.socket.getaddrinfo`.

        Called by :func:`trio.socket.getaddrinfo`.

        If ``host`` is given as a numeric IP address, then
        :func:`~trio.socket.getaddrinfo` may handle the request itself rather
        than calling this method.

        Any required IDNA encoding is handled before calling this function;
        your implementation can assume that it will never see U-labels like
        ``"café.com"``, and only needs to handle A-labels like
        ``b"xn--caf-dma.com"``.

        """

    @abstractmethod
    async def getnameinfo(self, sockaddr, flags):
        """A custom implementation of :func:`~trio.socket.getnameinfo`.

        Called by :func:`trio.socket.getnameinfo`.

        """
172

173

class SocketFactory(metaclass=ABCMeta):
    """If you write a custom class implementing the Trio socket interface,
    then you can use a :class:`SocketFactory` to get Trio to use it.

    See :func:`trio.socket.set_custom_socket_factory`.

    """

    # Empty __slots__, like the other ABCs in this module, so that this base
    # class does not force a __dict__ onto subclasses that define __slots__.
    __slots__ = ()

    @abstractmethod
    def socket(self, family=None, type=None, proto=None):
        """Create and return a socket object.

        Your socket object must inherit from :class:`trio.socket.SocketType`,
        which is an empty class whose only purpose is to "mark" which classes
        should be considered valid Trio sockets.

        Called by :func:`trio.socket.socket`.

        Note that unlike :func:`trio.socket.socket`, this does not take a
        ``fileno=`` argument. If a ``fileno=`` is specified, then
        :func:`trio.socket.socket` returns a regular Trio socket object
        instead of calling this method.

        """
198

199

class AsyncResource(metaclass=ABCMeta):
    """A standard interface for resources that need to be cleaned up, and
    where that cleanup may require blocking operations.

    This class distinguishes between "graceful" closes, which may perform I/O
    and thus block, and a "forceful" close, which cannot. For example, cleanly
    shutting down a TLS-encrypted connection requires sending a "goodbye"
    message; but if a peer has become non-responsive, then sending this
    message might block forever, so we may want to just drop the connection
    instead. Therefore the :meth:`aclose` method is unusual in that it
    should always close the connection (or at least make its best attempt)
    *even if it fails*; failure indicates a failure to achieve grace, not a
    failure to close the connection.

    Objects that implement this interface can be used as async context
    managers, i.e., you can write::

      async with create_resource() as some_async_resource:
          ...

    Entering the context manager is synchronous (not a checkpoint); exiting it
    calls :meth:`aclose`. The default implementations of
    ``__aenter__`` and ``__aexit__`` should be adequate for all subclasses.

    """

    __slots__ = ()

    @abstractmethod
    async def aclose(self):
        """Close this resource, possibly blocking.

        IMPORTANT: This method may block in order to perform a "graceful"
        shutdown. But, if this fails, then it still *must* close any
        underlying resources before returning. An error from this method
        indicates a failure to achieve grace, *not* a failure to close the
        connection.

        For example, suppose we call :meth:`aclose` on a TLS-encrypted
        connection. This requires sending a "goodbye" message; but if the peer
        has become non-responsive, then our attempt to send this message might
        block forever, and eventually time out and be cancelled. In this case
        the :meth:`aclose` method on :class:`~trio.SSLStream` will
        immediately close the underlying transport stream using
        :func:`trio.aclose_forcefully` before raising :exc:`~trio.Cancelled`.

        If the resource is already closed, then this method should silently
        succeed.

        Once this method completes, any other pending or future operations on
        this resource should generally raise :exc:`~trio.ClosedResourceError`,
        unless there's a good reason to do otherwise.

        See also: :func:`trio.aclose_forcefully`.

        """

    async def __aenter__(self):
        """Enter the ``async with`` block, returning this resource itself."""
        return self

    async def __aexit__(self, *args):
        """Leave the ``async with`` block by awaiting :meth:`aclose`.

        The exception details passed in ``args`` are ignored; since nothing
        truthy is returned, an in-flight exception is not suppressed.
        """
        await self.aclose()
262

263

class SendStream(AsyncResource):
    """A standard interface for sending data on a byte stream.

    The underlying stream may be unidirectional, or bidirectional. If it's
    bidirectional, then you probably want to also implement
    :class:`ReceiveStream`, which makes your object a :class:`Stream`.

    :class:`SendStream` objects also implement the :class:`AsyncResource`
    interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
    or using an ``async with`` block.

    If you want to send Python objects rather than raw bytes, see
    :class:`SendChannel`.

    """

    __slots__ = ()

    @abstractmethod
    async def send_all(self, data):
        """Sends the given data through the stream, blocking if necessary.

        Args:
          data (bytes, bytearray, or memoryview): The data to send.

        Raises:
          trio.BusyResourceError: if another task is already executing a
              :meth:`send_all`, :meth:`wait_send_all_might_not_block`, or
              :meth:`HalfCloseableStream.send_eof` on this stream.
          trio.BrokenResourceError: if something has gone wrong, and the stream
              is broken.
          trio.ClosedResourceError: if you previously closed this stream
              object, or if another task closes this stream object while
              :meth:`send_all` is running.

        Most low-level operations in Trio provide a guarantee: if they raise
        :exc:`trio.Cancelled`, this means that they had no effect, so the
        system remains in a known state. This is **not true** for
        :meth:`send_all`. If this operation raises :exc:`trio.Cancelled` (or
        any other exception for that matter), then it may have sent some, all,
        or none of the requested data, and there is no way to know which.

        """

    @abstractmethod
    async def wait_send_all_might_not_block(self):
        """Block until it's possible that :meth:`send_all` might not block.

        This method may return early: it's possible that after it returns,
        :meth:`send_all` will still block. (In the worst case, if no better
        implementation is available, then it might always return immediately
        without blocking. It's nice to do better than that when possible,
        though.)

        This method **must not** return *late*: if it's possible for
        :meth:`send_all` to complete without blocking, then it must
        return. When implementing it, err on the side of returning early.

        Raises:
          trio.BusyResourceError: if another task is already executing a
              :meth:`send_all`, :meth:`wait_send_all_might_not_block`, or
              :meth:`HalfCloseableStream.send_eof` on this stream.
          trio.BrokenResourceError: if something has gone wrong, and the stream
              is broken.
          trio.ClosedResourceError: if you previously closed this stream
              object, or if another task closes this stream object while
              :meth:`wait_send_all_might_not_block` is running.

        Note:

          This method is intended to aid in implementing protocols that want
          to delay choosing which data to send until the last moment. E.g.,
          suppose you're working on an implementation of a remote display server
          like `VNC
          <https://en.wikipedia.org/wiki/Virtual_Network_Computing>`__, and
          the network connection is currently backed up so that if you call
          :meth:`send_all` now then it will sit for 0.5 seconds before actually
          sending anything. In this case it doesn't make sense to take a
          screenshot, then wait 0.5 seconds, and then send it, because the
          screen will keep changing while you wait; it's better to wait 0.5
          seconds, then take the screenshot, and then send it, because this
          way the data you deliver will be more
          up-to-date. Using :meth:`wait_send_all_might_not_block` makes it
          possible to implement the better strategy.

          If you use this method, you might also want to read up on
          ``TCP_NOTSENT_LOWAT``.

          Further reading:

          * `Prioritization Only Works When There's Pending Data to Prioritize
            <https://insouciant.org/tech/prioritization-only-works-when-theres-pending-data-to-prioritize/>`__

          * WWDC 2015: Your App and Next Generation Networks: `slides
            <http://devstreaming.apple.com/videos/wwdc/2015/719ui2k57m/719/719_your_app_and_next_generation_networks.pdf?dl=1>`__,
            `video and transcript
            <https://developer.apple.com/videos/play/wwdc2015/719/>`__

        """
363

364

class ReceiveStream(AsyncResource):
    """A standard interface for receiving data on a byte stream.

    The underlying stream may be unidirectional, or bidirectional. If it's
    bidirectional, then you probably want to also implement
    :class:`SendStream`, which makes your object a :class:`Stream`.

    :class:`ReceiveStream` objects also implement the :class:`AsyncResource`
    interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
    or using an ``async with`` block.

    If you want to receive Python objects rather than raw bytes, see
    :class:`ReceiveChannel`.

    `ReceiveStream` objects can be used in ``async for`` loops. Each iteration
    will produce an arbitrary sized chunk of bytes, like calling
    `receive_some` with no arguments. Every chunk will contain at least one
    byte, and the loop automatically exits when reaching end-of-file.

    """

    __slots__ = ()

    @abstractmethod
    async def receive_some(self, max_bytes=None):
        """Wait until there is data available on this stream, and then return
        some of it.

        A return value of ``b""`` (an empty bytestring) indicates that the
        stream has reached end-of-file. Implementations should be careful that
        they return ``b""`` if, and only if, the stream has reached
        end-of-file!

        Args:
          max_bytes (int): The maximum number of bytes to return. Must be
              greater than zero. Optional; if omitted, then the stream object
              is free to pick a reasonable default.

        Returns:
          bytes or bytearray: The data received.

        Raises:
          trio.BusyResourceError: if two tasks attempt to call
              :meth:`receive_some` on the same stream at the same time.
          trio.BrokenResourceError: if something has gone wrong, and the stream
              is broken.
          trio.ClosedResourceError: if you previously closed this stream
              object, or if another task closes this stream object while
              :meth:`receive_some` is running.

        """

    def __aiter__(self):
        """Return ``self``; the stream is its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next chunk from :meth:`receive_some`.

        An empty (falsy) chunk marks end-of-file and ends the iteration.
        """
        data = await self.receive_some()
        if not data:
            raise StopAsyncIteration
        return data
425

426

class Stream(SendStream, ReceiveStream):
    """A standard interface for interacting with bidirectional byte streams.

    A :class:`Stream` is an object that implements both the
    :class:`SendStream` and :class:`ReceiveStream` interfaces.

    If implementing this interface, you should consider whether you can go one
    step further and implement :class:`HalfCloseableStream`.

    """

    __slots__ = ()
439

440

class HalfCloseableStream(Stream):
    """This interface extends :class:`Stream` to also allow closing the send
    part of the stream without closing the receive part.

    """

    __slots__ = ()

    @abstractmethod
    async def send_eof(self):
        """Send an end-of-file indication on this stream, if possible.

        The difference between :meth:`send_eof` and
        :meth:`~AsyncResource.aclose` is that :meth:`send_eof` is a
        *unidirectional* end-of-file indication. After you call this method,
        you shouldn't try sending any more data on this stream, and your
        remote peer should receive an end-of-file indication (eventually,
        after receiving all the data you sent before that). But, they may
        continue to send data to you, and you can continue to receive it by
        calling :meth:`~ReceiveStream.receive_some`. You can think of it as
        calling :meth:`~AsyncResource.aclose` on just the
        :class:`SendStream` "half" of the stream object (and in fact that's
        literally how :class:`trio.StapledStream` implements it).

        Examples:

        * On a socket, this corresponds to ``shutdown(..., SHUT_WR)`` (`man
          page <https://linux.die.net/man/2/shutdown>`__).

        * The SSH protocol provides the ability to multiplex bidirectional
          "channels" on top of a single encrypted connection. A Trio
          implementation of SSH could expose these channels as
          :class:`HalfCloseableStream` objects, and calling :meth:`send_eof`
          would send an ``SSH_MSG_CHANNEL_EOF`` request (see `RFC 4254 §5.3
          <https://tools.ietf.org/html/rfc4254#section-5.3>`__).

        * On an SSL/TLS-encrypted connection, the protocol doesn't provide any
          way to do a unidirectional shutdown without closing the connection
          entirely, so :class:`~trio.SSLStream` implements
          :class:`Stream`, not :class:`HalfCloseableStream`.

        If an EOF has already been sent, then this method should silently
        succeed.

        Raises:
          trio.BusyResourceError: if another task is already executing a
              :meth:`~SendStream.send_all`,
              :meth:`~SendStream.wait_send_all_might_not_block`, or
              :meth:`send_eof` on this stream.
          trio.BrokenResourceError: if something has gone wrong, and the stream
              is broken.
          trio.ClosedResourceError: if you previously closed this stream
              object, or if another task closes this stream object while
              :meth:`send_eof` is running.

        """
497

498

499
# A regular invariant generic type
T = TypeVar("T")

# The type of object produced by a ReceiveChannel (covariant because
# ReceiveChannel[Derived] can be passed to someone expecting
# ReceiveChannel[Base])
ReceiveType = TypeVar("ReceiveType", covariant=True)

# The type of object accepted by a SendChannel (contravariant because
# SendChannel[Base] can be passed to someone expecting
# SendChannel[Derived])
SendType = TypeVar("SendType", contravariant=True)

# The type of object produced by a Listener (covariant plus must be
# an AsyncResource)
T_resource = TypeVar("T_resource", bound=AsyncResource, covariant=True)
515

516

class Listener(AsyncResource, Generic[T_resource]):
    """A standard interface for listening for incoming connections.

    :class:`Listener` objects also implement the :class:`AsyncResource`
    interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
    or using an ``async with`` block.

    """

    __slots__ = ()

    @abstractmethod
    async def accept(self):
        """Wait until an incoming connection arrives, and then return it.

        Returns:
          AsyncResource: An object representing the incoming connection. In
          practice this is generally some kind of :class:`Stream`,
          but in principle you could also define a :class:`Listener` that
          returned, say, channel objects.

        Raises:
          trio.BusyResourceError: if two tasks attempt to call
              :meth:`accept` on the same listener at the same time.
          trio.ClosedResourceError: if you previously closed this listener
              object, or if another task closes this listener object while
              :meth:`accept` is running.

        Listeners don't generally raise :exc:`~trio.BrokenResourceError`,
        because for listeners there is no general condition of "the
        network/remote peer broke the connection" that can be handled in a
        generic way, like there is for streams. Other errors *can* occur and
        be raised from :meth:`accept` – for example, if you run out of file
        descriptors then you might get an :class:`OSError` with its errno set
        to ``EMFILE``.

        """
554

555

class SendChannel(AsyncResource, Generic[SendType]):
    """A standard interface for sending Python objects to some receiver.

    `SendChannel` objects also implement the `AsyncResource` interface, so
    they can be closed by calling `~AsyncResource.aclose` or using an ``async
    with`` block.

    If you want to send raw bytes rather than Python objects, see
    `SendStream`.

    """

    __slots__ = ()

    @abstractmethod
    async def send(self, value: SendType) -> None:
        """Attempt to send an object through the channel, blocking if necessary.

        Args:
          value (object): The object to send.

        Raises:
          trio.BrokenResourceError: if something has gone wrong, and the
              channel is broken. For example, you may get this if the receiver
              has already been closed.
          trio.ClosedResourceError: if you previously closed this
              :class:`SendChannel` object, or if another task closes it while
              :meth:`send` is running.
          trio.BusyResourceError: some channels allow multiple tasks to call
              `send` at the same time, but others don't. If you try to call
              `send` simultaneously from multiple tasks on a channel that
              doesn't support it, then you can get `~trio.BusyResourceError`.

        """
590

591

class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
    """A standard interface for receiving Python objects from some sender.

    You can iterate over a :class:`ReceiveChannel` using an ``async for``
    loop::

       async for value in receive_channel:
           ...

    This is equivalent to calling :meth:`receive` repeatedly. The loop exits
    without error when `receive` raises `~trio.EndOfChannel`.

    `ReceiveChannel` objects also implement the `AsyncResource` interface, so
    they can be closed by calling `~AsyncResource.aclose` or using an ``async
    with`` block.

    If you want to receive raw bytes rather than Python objects, see
    `ReceiveStream`.

    """

    __slots__ = ()

    @abstractmethod
    async def receive(self) -> ReceiveType:
        """Attempt to receive an incoming object, blocking if necessary.

        Returns:
          object: Whatever object was received.

        Raises:
          trio.EndOfChannel: if the sender has been closed cleanly, and no
              more objects are coming. This is not an error condition.
          trio.ClosedResourceError: if you previously closed this
              :class:`ReceiveChannel` object.
          trio.BrokenResourceError: if something has gone wrong, and the
              channel is broken.
          trio.BusyResourceError: some channels allow multiple tasks to call
              `receive` at the same time, but others don't. If you try to call
              `receive` simultaneously from multiple tasks on a channel that
              doesn't support it, then you can get `~trio.BusyResourceError`.

        """

    def __aiter__(self):
        """Return ``self``; the channel is its own async iterator."""
        return self

    async def __anext__(self) -> ReceiveType:
        """Return the next object from :meth:`receive`.

        `~trio.EndOfChannel` from :meth:`receive` ends the iteration.
        """
        try:
            return await self.receive()
        except trio.EndOfChannel:
            # EndOfChannel is the expected end-of-iteration signal, not an
            # error, so don't chain it onto StopAsyncIteration as context.
            raise StopAsyncIteration from None
644

645

class Channel(SendChannel[T], ReceiveChannel[T]):
    """A standard interface for interacting with bidirectional channels.

    A `Channel` is an object that implements both the `SendChannel` and
    `ReceiveChannel` interfaces, so you can both send and receive objects.

    """

    # Empty __slots__, like the other ABCs in this module, so that this base
    # class does not force a __dict__ onto subclasses that define __slots__.
    __slots__ = ()