aio-libs / aiohttp
1 10
import io
2 10
from typing import Any, Iterable, List, Optional
3 10
from urllib.parse import urlencode
4

5 10
from multidict import MultiDict, MultiDictProxy
6

7 10
from . import hdrs, multipart, payload
8 10
from .helpers import guess_filename
9 10
from .payload import Payload
10

11 10
__all__ = ("FormData",)
12

13

14 10
class FormData:
15
    """Helper class for multipart/form-data and
16
    application/x-www-form-urlencoded body generation."""
17

18 10
    def __init__(
19
        self,
20
        fields: Iterable[Any] = (),
21
        quote_fields: bool = True,
22
        charset: Optional[str] = None,
23
    ) -> None:
24 10
        self._writer = multipart.MultipartWriter("form-data")
25 10
        self._fields = []  # type: List[Any]
26 10
        self._is_multipart = False
27 10
        self._is_processed = False
28 10
        self._quote_fields = quote_fields
29 10
        self._charset = charset
30

31 10
        if isinstance(fields, dict):
32 10
            fields = list(fields.items())
33 10
        elif not isinstance(fields, (list, tuple)):
34 10
            fields = (fields,)
35 10
        self.add_fields(*fields)
36

37 10
    @property
38 10
    def is_multipart(self) -> bool:
39 10
        return self._is_multipart
40

41 10
    def add_field(
42
        self,
43
        name: str,
44
        value: Any,
45
        *,
46
        content_type: Optional[str] = None,
47
        filename: Optional[str] = None,
48
        content_transfer_encoding: Optional[str] = None,
49
    ) -> None:
50

51 10
        if isinstance(value, io.IOBase):
52 10
            self._is_multipart = True
53 10
        elif isinstance(value, (bytes, bytearray, memoryview)):
54 10
            if filename is None and content_transfer_encoding is None:
55 10
                filename = name
56

57 10
        type_options: MultiDict[str] = MultiDict({"name": name})
58 10
        if filename is not None and not isinstance(filename, str):
59 10
            raise TypeError(
60
                "filename must be an instance of str. " "Got: %s" % filename
61
            )
62 10
        if filename is None and isinstance(value, io.IOBase):
63 10
            filename = guess_filename(value, name)
64 10
        if filename is not None:
65 10
            type_options["filename"] = filename
66 10
            self._is_multipart = True
67

68 10
        headers = {}
69 10
        if content_type is not None:
70 10
            if not isinstance(content_type, str):
71 10
                raise TypeError(
72
                    "content_type must be an instance of str. " "Got: %s" % content_type
73
                )
74 10
            headers[hdrs.CONTENT_TYPE] = content_type
75 10
            self._is_multipart = True
76 10
        if content_transfer_encoding is not None:
77 10
            if not isinstance(content_transfer_encoding, str):
78 10
                raise TypeError(
79
                    "content_transfer_encoding must be an instance"
80
                    " of str. Got: %s" % content_transfer_encoding
81
                )
82 10
            headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding
83 10
            self._is_multipart = True
84

85 10
        self._fields.append((type_options, headers, value))
86

87 10
    def add_fields(self, *fields: Any) -> None:
88 10
        to_add = list(fields)
89

90 10
        while to_add:
91 10
            rec = to_add.pop(0)
92

93 10
            if isinstance(rec, io.IOBase):
94 10
                k = guess_filename(rec, "unknown")
95 10
                self.add_field(k, rec)  # type: ignore[arg-type]
96

97 10
            elif isinstance(rec, (MultiDictProxy, MultiDict)):
98 10
                to_add.extend(rec.items())
99

100 10
            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
101 10
                k, fp = rec
102 10
                self.add_field(k, fp)  # type: ignore[arg-type]
103

104
            else:
105 10
                raise TypeError(
106
                    "Only io.IOBase, multidict and (name, file) "
107
                    "pairs allowed, use .add_field() for passing "
108
                    "more complex parameters, got {!r}".format(rec)
109
                )
110

111 10
    def _gen_form_urlencoded(self) -> payload.BytesPayload:
112
        # form data (x-www-form-urlencoded)
113 10
        data = []
114 10
        for type_options, _, value in self._fields:
115 10
            data.append((type_options["name"], value))
116

117 10
        charset = self._charset if self._charset is not None else "utf-8"
118

119 10
        if charset == "utf-8":
120 10
            content_type = "application/x-www-form-urlencoded"
121
        else:
122 10
            content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset
123

124 10
        return payload.BytesPayload(
125
            urlencode(data, doseq=True, encoding=charset).encode(),
126
            content_type=content_type,
127
        )
128

129 10
    def _gen_form_data(self) -> multipart.MultipartWriter:
130
        """Encode a list of fields using the multipart/form-data MIME format"""
131 10
        if self._is_processed:
132 10
            raise RuntimeError("Form data has been processed already")
133 10
        for dispparams, headers, value in self._fields:
134 10
            try:
135 10
                if hdrs.CONTENT_TYPE in headers:
136 10
                    part = payload.get_payload(
137
                        value,
138
                        content_type=headers[hdrs.CONTENT_TYPE],
139
                        headers=headers,
140
                        encoding=self._charset,
141
                    )
142
                else:
143 10
                    part = payload.get_payload(
144
                        value, headers=headers, encoding=self._charset
145
                    )
146 10
            except Exception as exc:
147 10
                raise TypeError(
148
                    "Can not serialize value type: %r\n "
149
                    "headers: %r\n value: %r" % (type(value), headers, value)
150
                ) from exc
151

152 10
            if dispparams:
153 10
                part.set_content_disposition(
154
                    "form-data", quote_fields=self._quote_fields, **dispparams
155
                )
156
                # FIXME cgi.FieldStorage doesn't likes body parts with
157
                # Content-Length which were sent via chunked transfer encoding
158 10
                assert part.headers is not None
159 10
                part.headers.popall(hdrs.CONTENT_LENGTH, None)
160

161 10
            self._writer.append_payload(part)
162

163 10
        self._is_processed = True
164 10
        return self._writer
165

166 10
    def __call__(self) -> Payload:
167 10
        if self._is_multipart:
168 10
            return self._gen_form_data()
169
        else:
170 10
            return self._gen_form_urlencoded()

Read our documentation on viewing source code .

Loading