1 5
import io
2 5
import json
3 5
import warnings
4 5
from typing import (
5
    Any,
6
    AsyncIterator,
7
    BinaryIO,
8
    Dict,
9
    List,
10
    Mapping,
11
    MutableMapping,
12
    Optional,
13
    Union,
14
    overload,
15
)
16

17 5
from typing_extensions import Literal
18

19 5
from .jsonstream import json_stream_list, json_stream_stream
20 5
from .utils import clean_map, compose_auth_header
21

22

23 5
class DockerImages(object):
24 5
    def __init__(self, docker):
25 5
        self.docker = docker
26

27 5
    async def list(self, **params) -> Mapping:
28
        """
29
        List of images
30
        """
31 4
        response = await self.docker._query_json("images/json", "GET", params=params)
32 4
        return response
33

34 5
    async def inspect(self, name: str) -> Mapping:
35
        """
36
        Return low-level information about an image
37

38
        Args:
39
            name: name of the image
40
        """
41 5
        response = await self.docker._query_json("images/{name}/json".format(name=name))
42 4
        return response
43

44 5
    async def get(self, name: str) -> Mapping:
45 4
        warnings.warn(
46
            """images.get is deprecated and will be removed in the next release,
47
            please use images.inspect instead.""",
48
            DeprecationWarning,
49
            stacklevel=2,
50
        )
51 4
        return await self.inspect(name)
52

53 5
    async def history(self, name: str) -> Mapping:
54 4
        response = await self.docker._query_json(
55
            "images/{name}/history".format(name=name)
56
        )
57 4
        return response
58

59 5
    @overload
60 5
    async def pull(
61
        self,
62
        from_image: str,
63
        *,
64
        auth: Optional[Union[MutableMapping, str, bytes]] = None,
65
        tag: str = None,
66
        repo: str = None,
67
        stream: Literal[False] = False,
68
    ) -> Dict[str, Any]:
69 0
        pass
70

71 5
    @overload  # noqa: F811
72 5
    def pull(
73
        self,
74
        from_image: str,
75
        *,
76
        auth: Optional[Union[MutableMapping, str, bytes]] = None,
77
        tag: str = None,
78
        repo: str = None,
79
        stream: Literal[True],
80
    ) -> AsyncIterator[Dict[str, Any]]:
81 0
        pass
82

83 5
    def pull(  # noqa: F811
84
        self,
85
        from_image: str,
86
        *,
87
        auth: Optional[Union[MutableMapping, str, bytes]] = None,
88
        tag: str = None,
89
        repo: str = None,
90
        stream: bool = False,
91
    ) -> Any:
92
        """
93
        Similar to `docker pull`, pull an image locally
94

95
        Args:
96
            fromImage: name of the image to pull
97
            repo: repository name given to an image when it is imported
98
            tag: if empty when pulling an image all tags
99
                 for the given image to be pulled
100
            auth: special {'auth': base64} pull private repo
101
        """
102 5
        image = from_image  # TODO: clean up
103 5
        params = {"fromImage": image}
104 5
        headers = {}
105 5
        if repo:
106 0
            params["repo"] = repo
107 5
        if tag:
108 0
            params["tag"] = tag
109 5
        if auth is not None:
110 4
            registry, has_registry_host, _ = image.partition("/")
111 4
            if not has_registry_host:
112 4
                raise ValueError(
113
                    "Image should have registry host "
114
                    "when auth information is provided"
115
                )
116
            # TODO: assert registry == repo?
117 4
            headers["X-Registry-Auth"] = compose_auth_header(auth, registry)
118 5
        cm = self.docker._query("images/create", "POST", params=params, headers=headers)
119 5
        return self._handle_response(cm, stream)
120

121 5
    def _handle_response(self, cm, stream):
122 5
        if stream:
123 4
            return self._handle_stream(cm)
124
        else:
125 5
            return self._handle_list(cm)
126

127 5
    async def _handle_stream(self, cm):
128 4
        async with cm as response:
129 4
            async for item in json_stream_stream(response):
130 4
                yield item
131

132 5
    async def _handle_list(self, cm):
133 5
        async with cm as response:
134 5
            return await json_stream_list(response)
135

136 5
    @overload
137 5
    async def push(
138
        self,
139
        name: str,
140
        *,
141
        auth: Union[MutableMapping, str, bytes] = None,
142
        tag: str = None,
143
        stream: Literal[False] = False,
144
    ) -> Dict[str, Any]:
145 0
        pass
146

147 5
    @overload  # noqa: F811
148 5
    def push(
149
        self,
150
        name: str,
151
        *,
152
        auth: Union[MutableMapping, str, bytes] = None,
153
        tag: str = None,
154
        stream: Literal[True],
155
    ) -> AsyncIterator[Dict[str, Any]]:
156 0
        pass
157

158 5
    def push(  # noqa: F811
159
        self,
160
        name: str,
161
        *,
162
        auth: Union[MutableMapping, str, bytes] = None,
163
        tag: str = None,
164
        stream: bool = False,
165
    ) -> Any:
166 4
        params = {}
167 4
        headers = {
168
            # Anonymous push requires a dummy auth header.
169
            "X-Registry-Auth": "placeholder"
170
        }
171 4
        if tag:
172 4
            params["tag"] = tag
173 4
        if auth is not None:
174 4
            registry, has_registry_host, _ = name.partition("/")
175 4
            if not has_registry_host:
176 0
                raise ValueError(
177
                    "Image should have registry host "
178
                    "when auth information is provided"
179
                )
180 4
            headers["X-Registry-Auth"] = compose_auth_header(auth, registry)
181 4
        cm = self.docker._query(
182
            "images/{name}/push".format(name=name),
183
            "POST",
184
            params=params,
185
            headers=headers,
186
        )
187 4
        return self._handle_response(cm, stream)
188

189 5
    async def tag(self, name: str, repo: str, *, tag: str = None) -> bool:
190
        """
191
        Tag the given image so that it becomes part of a repository.
192

193
        Args:
194
            repo: the repository to tag in
195
            tag: the name for the new tag
196
        """
197 4
        params = {"repo": repo}
198

199 4
        if tag:
200 4
            params["tag"] = tag
201

202 4
        async with self.docker._query(
203
            "images/{name}/tag".format(name=name),
204
            "POST",
205
            params=params,
206
            headers={"content-type": "application/json"},
207
        ):
208 4
            return True
209

210 5
    async def delete(
211
        self, name: str, *, force: bool = False, noprune: bool = False
212
    ) -> List:
213
        """
214
        Remove an image along with any untagged parent
215
        images that were referenced by that image
216

217
        Args:
218
            name: name/id of the image to delete
219
            force: remove the image even if it is being used
220
                   by stopped containers or has other tags
221
            noprune: don't delete untagged parent images
222

223
        Returns:
224
            List of deleted images
225
        """
226 4
        params = {"force": force, "noprune": noprune}
227 4
        return await self.docker._query_json(
228
            "images/{name}".format(name=name), "DELETE", params=params
229
        )
230

231 5
    @staticmethod
232 5
    async def _stream(fileobj: BinaryIO) -> AsyncIterator[bytes]:
233 4
        chunk = fileobj.read(io.DEFAULT_BUFFER_SIZE)
234 4
        while chunk:
235 4
            yield chunk
236 4
            chunk = fileobj.read(io.DEFAULT_BUFFER_SIZE)
237

238 5
    @overload
239 5
    async def build(
240
        self,
241
        *,
242
        remote: str = None,
243
        fileobj: BinaryIO = None,
244
        path_dockerfile: str = None,
245
        tag: str = None,
246
        quiet: bool = False,
247
        nocache: bool = False,
248
        buildargs: Mapping = None,
249
        pull: bool = False,
250
        rm: bool = True,
251
        forcerm: bool = False,
252
        labels: Mapping = None,
253
        stream: Literal[False] = False,
254
        encoding: str = None,
255
    ) -> Dict[str, Any]:
256 0
        pass
257

258 5
    @overload  # noqa: F811
259 5
    def build(
260
        self,
261
        *,
262
        remote: str = None,
263
        fileobj: BinaryIO = None,
264
        path_dockerfile: str = None,
265
        tag: str = None,
266
        quiet: bool = False,
267
        nocache: bool = False,
268
        buildargs: Mapping = None,
269
        pull: bool = False,
270
        rm: bool = True,
271
        forcerm: bool = False,
272
        labels: Mapping = None,
273
        stream: Literal[True],
274
        encoding: str = None,
275
    ) -> AsyncIterator[Dict[str, Any]]:
276 0
        pass
277

278 5
    def build(  # noqa: F811
279
        self,
280
        *,
281
        remote: str = None,
282
        fileobj: BinaryIO = None,
283
        path_dockerfile: str = None,
284
        tag: str = None,
285
        quiet: bool = False,
286
        nocache: bool = False,
287
        buildargs: Mapping = None,
288
        pull: bool = False,
289
        rm: bool = True,
290
        forcerm: bool = False,
291
        labels: Mapping = None,
292
        stream: bool = False,
293
        encoding: str = None,
294
    ) -> Any:
295
        """
296
        Build an image given a remote Dockerfile
297
        or a file object with a Dockerfile inside
298

299
        Args:
300
            path_dockerfile: path within the build context to the Dockerfile
301
            remote: a Git repository URI or HTTP/HTTPS context URI
302
            quiet: suppress verbose build output
303
            nocache: do not use the cache when building the image
304
            rm: remove intermediate containers after a successful build
305
            pull: downloads any updates to the FROM image in Dockerfiles
306
            encoding: set `Content-Encoding` for the file object your send
307
            forcerm: always remove intermediate containers, even upon failure
308
            labels: arbitrary key/value labels to set on the image
309
            fileobj: a tar archive compressed or not
310
        """
311 4
        headers = {}
312

313 4
        params = {
314
            "t": tag,
315
            "rm": rm,
316
            "q": quiet,
317
            "pull": pull,
318
            "remote": remote,
319
            "nocache": nocache,
320
            "forcerm": forcerm,
321
            "dockerfile": path_dockerfile,
322
        }
323

324 4
        if remote is None and fileobj is None:
325 0
            raise ValueError("You need to specify either remote or fileobj")
326

327 4
        if fileobj and remote:
328 0
            raise ValueError("You cannot specify both fileobj and remote")
329

330 4
        if fileobj and not encoding:
331 0
            raise ValueError("You need to specify an encoding")
332

333 4
        if remote is None and fileobj is None:
334 0
            raise ValueError("Either remote or fileobj needs to be provided.")
335

336 4
        data = None
337 4
        if fileobj:
338 4
            data = self._stream(fileobj)
339 4
            headers["content-type"] = "application/x-tar"
340

341 4
        if fileobj and encoding:
342 4
            headers["Content-Encoding"] = encoding
343

344 4
        if buildargs:
345 0
            params.update({"buildargs": json.dumps(buildargs)})
346

347 4
        if labels:
348 0
            params.update({"labels": json.dumps(labels)})
349

350 4
        cm = self.docker._query(
351
            "build",
352
            "POST",
353
            params=clean_map(params),
354
            headers=headers,
355
            data=data,
356
        )
357 4
        return self._handle_response(cm, stream)
358

359 5
    def export_image(self, name: str):
360
        """
361
        Get a tarball of an image by name or id.
362

363
        Args:
364
            name: name/id of the image to be exported
365

366
        Returns:
367
            Streamreader of tarball image
368
        """
369 4
        return _ExportCM(
370
            self.docker._query("images/{name}/get".format(name=name), "GET")
371
        )
372

373 5
    def import_image(self, data, stream: bool = False):
374
        """
375
        Import tarball of image to docker.
376

377
        Args:
378
            data: tarball data of image to be imported
379

380
        Returns:
381
            Tarball of the image
382
        """
383 4
        headers = {"Content-Type": "application/x-tar"}
384 4
        cm = self.docker._query_chunked_post(
385
            "images/load", "POST", data=data, headers=headers
386
        )
387 4
        return self._handle_response(cm, stream)
388

389

390 5
class _ExportCM:
391 5
    def __init__(self, cm):
392 4
        self._cm = cm
393

394 5
    async def __aenter__(self):
395 4
        resp = await self._cm.__aenter__()
396 4
        return resp.content
397

398 5
    async def __aexit__(self, exc_type, exc_val, exc_tb):
399 4
        return await self._cm.__aexit__(exc_type, exc_val, exc_tb)

Read our documentation on viewing source code .

Loading