1 5
import json
2 5
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, overload
3

4 5
import aiohttp
5 5
from typing_extensions import Literal
6 5
from yarl import URL
7

8 5
from .stream import Stream
9

10

11 5
if TYPE_CHECKING:
12 0
    from .docker import Docker
13

14

15
# When a Tty is allocated for an "exec" operation, the stdout and stderr are streamed
16
# straight to the client.
17
# When a Tty is NOT allocated, then stdout and stderr are multiplexed using the format
18
# given at
19
# https://docs.docker.com/engine/api/v1.40/#operation/ContainerAttach under the "Stream
20
# Format" heading. Note that that documentation is for "docker attach" but the format
21
# also applies to "docker exec."
22

23

24 5
class Exec:
25 5
    def __init__(self, docker: "Docker", id: str, tty: Optional[bool]) -> None:
26 4
        self.docker = docker
27 4
        self._id = id
28 4
        self._tty = tty
29

30 5
    @property
31 5
    def id(self) -> str:
32 4
        return self._id
33

34 5
    async def inspect(self) -> Dict[str, Any]:
35 4
        ret = await self.docker._query_json(f"exec/{self._id}/json")
36 4
        self._tty = ret["ProcessConfig"]["tty"]
37 4
        return ret
38

39 5
    async def resize(self, *, h: Optional[int] = None, w: Optional[int] = None) -> None:
40 4
        dct: Dict[str, int] = {}
41 4
        if h is not None:
42 4
            dct["h"] = h
43 4
        if w is not None:
44 4
            dct["w"] = w
45 4
        if not dct:
46 0
            return
47 4
        url = URL(f"exec/{self._id}/resize").with_query(dct)
48 4
        async with self.docker._query(url, method="POST") as resp:
49 4
            resp
50

51 5
    @overload
52 5
    def start(
53
        self,
54
        *,
55
        timeout: aiohttp.ClientTimeout = None,
56
        detach: Literal[False] = False,
57
    ) -> Stream:
58 0
        pass
59

60 5
    @overload  # noqa
61 5
    async def start(
62
        self,
63
        *,
64
        timeout: aiohttp.ClientTimeout = None,
65
        detach: Literal[True],
66
    ) -> bytes:
67 0
        pass
68

69 5
    def start(self, *, timeout=None, detach=False):  # noqa
70
        """
71
        Start this exec instance.
72
        Args:
73
            timeout: The timeout in seconds for the request to start the exec instance.
74
            detach: Indicates whether we should detach from the command (like the `-d`
75
                option to `docker exec`).
76
            tty: Indicates whether a TTY should be allocated (like the `-t` option to
77
                `docker exec`).
78
        Returns:
79
            If `detach` is `True`, this method will return the result of the exec
80
            process as a binary string.
81
            If `detach` is False, an `aiohttp.ClientWebSocketResponse` will be returned.
82
            You can use it to send and receive data the same wa as the response of
83
            "ws_connect" of aiohttp. If `tty` is `False`, then the messages returned
84
            from `receive*` will have their `extra` attribute set to 1 if the data was
85
            from stdout or 2 if from stderr.
86
        """
87 4
        if detach:
88 4
            return self._start_detached(timeout, self._tty)
89
        else:
90

91 4
            async def setup() -> Tuple[URL, bytes, bool]:
92 4
                if self._tty is None:
93 4
                    await self.inspect()  # should restore tty
94 4
                assert self._tty is not None
95 4
                return (
96
                    URL(f"exec/{self._id}/start"),
97
                    json.dumps({"Detach": False, "Tty": self._tty}).encode("utf8"),
98
                    self._tty,
99
                )
100

101 4
            return Stream(self.docker, setup, timeout)
102

103 5
    async def _start_detached(
104
        self,
105
        timeout: aiohttp.ClientTimeout = None,
106
        tty: bool = False,
107
    ) -> bytes:
108 4
        if self._tty is None:
109 4
            await self.inspect()  # should restore tty
110 4
        assert self._tty is not None
111 4
        async with self.docker._query(
112
            f"exec/{self._id}/start",
113
            method="POST",
114
            headers={"Content-Type": "application/json"},
115
            data=json.dumps({"Detach": True, "Tty": tty}),
116
            timeout=timeout,
117
        ) as response:
118 4
            result = await response.read()
119 4
            await response.release()
120 4
            return result

Read our documentation on viewing source code .

Loading