Spaces:
Runtime error
Runtime error
| import io | |
| import warnings | |
| from typing import Any, Iterable, List, Optional | |
| from urllib.parse import urlencode | |
| from multidict import MultiDict, MultiDictProxy | |
| from . import hdrs, multipart, payload | |
| from .helpers import guess_filename | |
| from .payload import Payload | |
| __all__ = ("FormData",) | |
class FormData:
    """Helper class for form body generation.

    Supports multipart/form-data and application/x-www-form-urlencoded.

    Fields are collected with ``add_field()`` / ``add_fields()`` and the
    payload is produced by calling the instance.  The form is encoded as
    multipart as soon as one field carries a file object, a filename, a
    content type or a content transfer encoding; otherwise it is
    URL-encoded.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
    ) -> None:
        """Create the form and register the initial *fields*.

        Args:
            fields: A dict (its items are added), a list/tuple of records
                accepted by ``add_fields()``, or any other single object,
                which is treated as one record.
            quote_fields: Forwarded as ``quote_fields`` when the content
                disposition of each multipart part is set.
            charset: Encoding handed to the payload factory for multipart
                parts and to ``urlencode`` for URL-encoded forms
                (``None`` means "utf-8" for URL-encoded forms).
        """
        self._writer = multipart.MultipartWriter("form-data")
        # Each entry is a (disposition params, headers, value) triple.
        self._fields: List[Any] = []
        self._is_multipart = False
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A lone record (file object, multidict, ...) was passed.
            fields = (fields,)
        self.add_fields(*fields)

    def is_multipart(self) -> bool:
        """Return True when the form will be sent as multipart/form-data."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None,
    ) -> None:
        """Append one field to the form.

        Args:
            name: Field name, stored as the ``name`` disposition parameter.
            value: Field value.  ``io.IOBase`` instances always make the
                form multipart; a filename is guessed for them when none
                is given.
            content_type: Optional Content-Type header for the part.
            filename: Optional file name for the part.
            content_transfer_encoding: Deprecated; only validated and
                reported with a ``DeprecationWarning``.

        Raises:
            TypeError: If ``filename``, ``content_type`` or
                ``content_transfer_encoding`` is given but is not a ``str``.
                The form is left untouched in that case.
        """
        is_file = isinstance(value, io.IOBase)

        if not is_file and isinstance(value, (bytes, bytearray, memoryview)):
            if filename is None and content_transfer_encoding is None:
                msg = (
                    "In v4, passing bytes will no longer create a file field. "
                    "Please explicitly use the filename parameter or pass a BytesIO object."
                )
                warnings.warn(msg, DeprecationWarning)
                # Legacy behaviour: raw bytes become a file field named
                # after the field itself.
                filename = name

        # Validate everything before mutating any state, so a rejected call
        # cannot leave the form flagged as multipart.  The offending value
        # is wrapped in a 1-tuple because ``"%s" % value`` would unpack a
        # tuple value and produce a misleading formatting error.
        if filename is not None and not isinstance(filename, str):
            raise TypeError(
                "filename must be an instance of str. Got: %s" % (filename,)
            )
        if content_type is not None and not isinstance(content_type, str):
            raise TypeError(
                "content_type must be an instance of str. Got: %s" % (content_type,)
            )
        if content_transfer_encoding is not None and not isinstance(
            content_transfer_encoding, str
        ):
            raise TypeError(
                "content_transfer_encoding must be an instance"
                " of str. Got: %s" % (content_transfer_encoding,)
            )

        type_options: MultiDict[str] = MultiDict({"name": name})
        if filename is None and is_file:
            filename = guess_filename(value, name)
        if filename is not None:
            type_options["filename"] = filename

        headers = {}
        if content_type is not None:
            headers[hdrs.CONTENT_TYPE] = content_type

        if content_transfer_encoding is not None:
            msg = (
                "content_transfer_encoding is deprecated. "
                "To maintain compatibility with v4 please pass a BytesPayload."
            )
            warnings.warn(msg, DeprecationWarning)

        # Commit state only once nothing above can fail any more.
        if (
            is_file
            or filename is not None
            or content_type is not None
            or content_transfer_encoding is not None
        ):
            self._is_multipart = True
        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Append several fields given in shorthand form.

        Each record may be an ``io.IOBase`` object (its name is guessed,
        falling back to "unknown"), a ``MultiDict``/``MultiDictProxy`` (its
        items are queued as further records) or a two-element list/tuple
        ``(name, value)``.

        Raises:
            TypeError: If a record has any other shape.
        """
        to_add = list(fields)
        while to_add:
            rec = to_add.pop(0)

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore[arg-type]
            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                # Expanded items are processed after the records that are
                # already queued.
                to_add.extend(rec.items())
            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore[arg-type]
            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> "payload.BytesPayload":
        """Encode the fields as application/x-www-form-urlencoded.

        The charset is appended to the content type only when it differs
        from "utf-8".
        """
        # form data (x-www-form-urlencoded)
        data = [
            (type_options["name"], value)
            for type_options, _, value in self._fields
        ]

        charset = self._charset if self._charset is not None else "utf-8"
        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> "multipart.MultipartWriter":
        """Encode a list of fields using the multipart/form-data MIME format.

        Every stored field is converted to a payload and appended to the
        shared multipart writer, so this may only run once per instance.

        Raises:
            RuntimeError: If the form has already been processed.
            TypeError: If a field value cannot be converted to a payload.
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")

        for dispparams, headers, value in self._fields:
            try:
                if hdrs.CONTENT_TYPE in headers:
                    part = payload.get_payload(
                        value,
                        content_type=headers[hdrs.CONTENT_TYPE],
                        headers=headers,
                        encoding=self._charset,
                    )
                else:
                    part = payload.get_payload(
                        value, headers=headers, encoding=self._charset
                    )
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> "Payload":
        """Build the payload: multipart if required, URL-encoded otherwise."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()