Spaces:
Paused
Paused
| from __future__ import annotations | |
| import codecs | |
| from collections.abc import Callable, Mapping | |
| from dataclasses import InitVar, dataclass, field | |
| from typing import Any | |
| from ..abc import ( | |
| AnyByteReceiveStream, | |
| AnyByteSendStream, | |
| AnyByteStream, | |
| ObjectReceiveStream, | |
| ObjectSendStream, | |
| ObjectStream, | |
| ) | |
# eq=False keeps identity-based equality and hashing; a plain @dataclass would
# generate __eq__ and set __hash__ to None, making stream objects unhashable.
@dataclass(eq=False)
class TextReceiveStream(ObjectReceiveStream[str]):
    """
    Stream wrapper that decodes bytes to strings using the given encoding.

    Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any
    completely received unicode characters as soon as they come in.

    :param transport_stream: any bytes-based receive stream
    :param encoding: character encoding to use for decoding bytes to strings (defaults
        to ``utf-8``)
    :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
        `codecs module documentation`_ for a comprehensive list of options)

    .. _codecs module documentation:
        https://docs.python.org/3/library/codecs.html#codec-objects

    """

    transport_stream: AnyByteReceiveStream
    # Init-only: both values are consumed by __post_init__ to build the decoder
    # and are not stored on the instance.
    encoding: InitVar[str] = "utf-8"
    errors: InitVar[str] = "strict"
    _decoder: codecs.IncrementalDecoder = field(init=False)

    def __post_init__(self, encoding: str, errors: str) -> None:
        """
        Create the incremental decoder for ``encoding`` with the ``errors`` scheme.

        :raises LookupError: if ``encoding`` does not name a known codec
        """
        decoder_class = codecs.getincrementaldecoder(encoding)
        self._decoder = decoder_class(errors=errors)

    async def receive(self) -> str:
        """
        Return the next non-empty piece of decoded text.

        Any exception raised by the wrapped stream's ``receive()`` or by the decoder
        propagates unchanged.
        """
        while True:
            chunk = await self.transport_stream.receive()
            decoded = self._decoder.decode(chunk)
            # A chunk may end in the middle of a multi-byte sequence, in which
            # case the decoder buffers it and yields nothing yet; keep reading.
            if decoded:
                return decoded

    async def aclose(self) -> None:
        """Close the wrapped stream, then discard any partially decoded input."""
        await self.transport_stream.aclose()
        self._decoder.reset()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Return the extra attributes of the wrapped transport stream."""
        return self.transport_stream.extra_attributes
# eq=False keeps identity-based equality and hashing; a plain @dataclass would
# generate __eq__ and set __hash__ to None, making stream objects unhashable.
@dataclass(eq=False)
class TextSendStream(ObjectSendStream[str]):
    """
    Sends strings to the wrapped stream as bytes using the given encoding.

    :param AnyByteSendStream transport_stream: any bytes-based send stream
    :param str encoding: character encoding to use for encoding strings to bytes
        (defaults to ``utf-8``)
    :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
        the `codecs module documentation`_ for a comprehensive list of options)

    .. _codecs module documentation:
        https://docs.python.org/3/library/codecs.html#codec-objects

    """

    transport_stream: AnyByteSendStream
    # Init-only: consumed by __post_init__ to look up the encoder function.
    encoding: InitVar[str] = "utf-8"
    # Regular field (not InitVar): the encoder function takes the error scheme
    # on every call, so it has to stay available on the instance.
    errors: str = "strict"
    _encoder: Callable[..., tuple[bytes, int]] = field(init=False)

    def __post_init__(self, encoding: str) -> None:
        """
        Look up the encoder function for ``encoding``.

        :raises LookupError: if ``encoding`` does not name a known codec
        """
        self._encoder = codecs.getencoder(encoding)

    async def send(self, item: str) -> None:
        """Encode ``item`` and send the resulting bytes to the wrapped stream."""
        # The encoder returns (encoded bytes, length consumed); only the bytes
        # are needed here.
        encoded = self._encoder(item, self.errors)[0]
        await self.transport_stream.send(encoded)

    async def aclose(self) -> None:
        """Close the wrapped stream."""
        await self.transport_stream.aclose()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Return the extra attributes of the wrapped transport stream."""
        return self.transport_stream.extra_attributes
# eq=False keeps identity-based equality and hashing; a plain @dataclass would
# generate __eq__ and set __hash__ to None, making stream objects unhashable.
@dataclass(eq=False)
class TextStream(ObjectStream[str]):
    """
    A bidirectional stream that decodes bytes to strings on receive and encodes strings
    to bytes on send.

    Extra attributes will be provided from both streams, with the receive stream
    providing the values in case of a conflict.

    :param AnyByteStream transport_stream: any bytes-based stream
    :param str encoding: character encoding to use for encoding/decoding strings to/from
        bytes (defaults to ``utf-8``)
    :param str errors: handling scheme for encoding and decoding errors (defaults to
        ``strict``; see the `codecs module documentation`_ for a comprehensive list of
        options)

    .. _codecs module documentation:
        https://docs.python.org/3/library/codecs.html#codec-objects

    """

    transport_stream: AnyByteStream
    # Init-only: both values are forwarded to the two wrapper streams built in
    # __post_init__ and are not stored on this instance.
    encoding: InitVar[str] = "utf-8"
    errors: InitVar[str] = "strict"
    _receive_stream: TextReceiveStream = field(init=False)
    _send_stream: TextSendStream = field(init=False)

    def __post_init__(self, encoding: str, errors: str) -> None:
        """Wrap the transport stream in a decoding and an encoding text stream."""
        # Both wrappers share the same underlying transport stream.
        self._receive_stream = TextReceiveStream(
            self.transport_stream, encoding=encoding, errors=errors
        )
        self._send_stream = TextSendStream(
            self.transport_stream, encoding=encoding, errors=errors
        )

    async def receive(self) -> str:
        """Return the next piece of decoded text from the receive side."""
        return await self._receive_stream.receive()

    async def send(self, item: str) -> None:
        """Encode ``item`` and send it through the send side."""
        await self._send_stream.send(item)

    async def send_eof(self) -> None:
        """Forward the end-of-file signal to the transport stream."""
        await self.transport_stream.send_eof()

    async def aclose(self) -> None:
        """Close the send side first, then the receive side."""
        await self._send_stream.aclose()
        await self._receive_stream.aclose()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Return the merged extra attributes of the send and receive streams."""
        # Later entries win in a dict display, so the receive stream's values
        # take precedence on conflicting keys.
        return {
            **self._send_stream.extra_attributes,
            **self._receive_stream.extra_attributes,
        }