Spaces:
Paused
Paused
| import os | |
| import re | |
| import sys | |
| import typing as t | |
| from functools import update_wrapper | |
| from types import ModuleType | |
| from types import TracebackType | |
| from ._compat import _default_text_stderr | |
| from ._compat import _default_text_stdout | |
| from ._compat import _find_binary_writer | |
| from ._compat import auto_wrap_for_ansi | |
| from ._compat import binary_streams | |
| from ._compat import open_stream | |
| from ._compat import should_strip_ansi | |
| from ._compat import strip_ansi | |
| from ._compat import text_streams | |
| from ._compat import WIN | |
| from .globals import resolve_color_default | |
| if t.TYPE_CHECKING: | |
| import typing_extensions as te | |
| P = te.ParamSpec("P") | |
| R = t.TypeVar("R") | |
| def _posixify(name: str) -> str: | |
| return "-".join(name.split()).lower() | |
| def safecall(func: "t.Callable[P, R]") -> "t.Callable[P, t.Optional[R]]": | |
| """Wraps a function so that it swallows exceptions.""" | |
| def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> t.Optional[R]: | |
| try: | |
| return func(*args, **kwargs) | |
| except Exception: | |
| pass | |
| return None | |
| return update_wrapper(wrapper, func) | |
| def make_str(value: t.Any) -> str: | |
| """Converts a value into a valid string.""" | |
| if isinstance(value, bytes): | |
| try: | |
| return value.decode(sys.getfilesystemencoding()) | |
| except UnicodeError: | |
| return value.decode("utf-8", "replace") | |
| return str(value) | |
| def make_default_short_help(help: str, max_length: int = 45) -> str: | |
| """Returns a condensed version of help string.""" | |
| # Consider only the first paragraph. | |
| paragraph_end = help.find("\n\n") | |
| if paragraph_end != -1: | |
| help = help[:paragraph_end] | |
| # Collapse newlines, tabs, and spaces. | |
| words = help.split() | |
| if not words: | |
| return "" | |
| # The first paragraph started with a "no rewrap" marker, ignore it. | |
| if words[0] == "\b": | |
| words = words[1:] | |
| total_length = 0 | |
| last_index = len(words) - 1 | |
| for i, word in enumerate(words): | |
| total_length += len(word) + (i > 0) | |
| if total_length > max_length: # too long, truncate | |
| break | |
| if word[-1] == ".": # sentence end, truncate without "..." | |
| return " ".join(words[: i + 1]) | |
| if total_length == max_length and i != last_index: | |
| break # not at sentence end, truncate with "..." | |
| else: | |
| return " ".join(words) # no truncation needed | |
| # Account for the length of the suffix. | |
| total_length += len("...") | |
| # remove words until the length is short enough | |
| while i > 0: | |
| total_length -= len(words[i]) + (i > 0) | |
| if total_length <= max_length: | |
| break | |
| i -= 1 | |
| return " ".join(words[:i]) + "..." | |
| class LazyFile: | |
| """A lazy file works like a regular file but it does not fully open | |
| the file but it does perform some basic checks early to see if the | |
| filename parameter does make sense. This is useful for safely opening | |
| files for writing. | |
| """ | |
| def __init__( | |
| self, | |
| filename: t.Union[str, "os.PathLike[str]"], | |
| mode: str = "r", | |
| encoding: t.Optional[str] = None, | |
| errors: t.Optional[str] = "strict", | |
| atomic: bool = False, | |
| ): | |
| self.name: str = os.fspath(filename) | |
| self.mode = mode | |
| self.encoding = encoding | |
| self.errors = errors | |
| self.atomic = atomic | |
| self._f: t.Optional[t.IO[t.Any]] | |
| self.should_close: bool | |
| if self.name == "-": | |
| self._f, self.should_close = open_stream(filename, mode, encoding, errors) | |
| else: | |
| if "r" in mode: | |
| # Open and close the file in case we're opening it for | |
| # reading so that we can catch at least some errors in | |
| # some cases early. | |
| open(filename, mode).close() | |
| self._f = None | |
| self.should_close = True | |
| def __getattr__(self, name: str) -> t.Any: | |
| return getattr(self.open(), name) | |
| def __repr__(self) -> str: | |
| if self._f is not None: | |
| return repr(self._f) | |
| return f"<unopened file '{format_filename(self.name)}' {self.mode}>" | |
| def open(self) -> t.IO[t.Any]: | |
| """Opens the file if it's not yet open. This call might fail with | |
| a :exc:`FileError`. Not handling this error will produce an error | |
| that Click shows. | |
| """ | |
| if self._f is not None: | |
| return self._f | |
| try: | |
| rv, self.should_close = open_stream( | |
| self.name, self.mode, self.encoding, self.errors, atomic=self.atomic | |
| ) | |
| except OSError as e: # noqa: E402 | |
| from .exceptions import FileError | |
| raise FileError(self.name, hint=e.strerror) from e | |
| self._f = rv | |
| return rv | |
| def close(self) -> None: | |
| """Closes the underlying file, no matter what.""" | |
| if self._f is not None: | |
| self._f.close() | |
| def close_intelligently(self) -> None: | |
| """This function only closes the file if it was opened by the lazy | |
| file wrapper. For instance this will never close stdin. | |
| """ | |
| if self.should_close: | |
| self.close() | |
| def __enter__(self) -> "LazyFile": | |
| return self | |
| def __exit__( | |
| self, | |
| exc_type: t.Optional[t.Type[BaseException]], | |
| exc_value: t.Optional[BaseException], | |
| tb: t.Optional[TracebackType], | |
| ) -> None: | |
| self.close_intelligently() | |
| def __iter__(self) -> t.Iterator[t.AnyStr]: | |
| self.open() | |
| return iter(self._f) # type: ignore | |
| class KeepOpenFile: | |
| def __init__(self, file: t.IO[t.Any]) -> None: | |
| self._file: t.IO[t.Any] = file | |
| def __getattr__(self, name: str) -> t.Any: | |
| return getattr(self._file, name) | |
| def __enter__(self) -> "KeepOpenFile": | |
| return self | |
| def __exit__( | |
| self, | |
| exc_type: t.Optional[t.Type[BaseException]], | |
| exc_value: t.Optional[BaseException], | |
| tb: t.Optional[TracebackType], | |
| ) -> None: | |
| pass | |
| def __repr__(self) -> str: | |
| return repr(self._file) | |
| def __iter__(self) -> t.Iterator[t.AnyStr]: | |
| return iter(self._file) | |
| def echo( | |
| message: t.Optional[t.Any] = None, | |
| file: t.Optional[t.IO[t.Any]] = None, | |
| nl: bool = True, | |
| err: bool = False, | |
| color: t.Optional[bool] = None, | |
| ) -> None: | |
| """Print a message and newline to stdout or a file. This should be | |
| used instead of :func:`print` because it provides better support | |
| for different data, files, and environments. | |
| Compared to :func:`print`, this does the following: | |
| - Ensures that the output encoding is not misconfigured on Linux. | |
| - Supports Unicode in the Windows console. | |
| - Supports writing to binary outputs, and supports writing bytes | |
| to text outputs. | |
| - Supports colors and styles on Windows. | |
| - Removes ANSI color and style codes if the output does not look | |
| like an interactive terminal. | |
| - Always flushes the output. | |
| :param message: The string or bytes to output. Other objects are | |
| converted to strings. | |
| :param file: The file to write to. Defaults to ``stdout``. | |
| :param err: Write to ``stderr`` instead of ``stdout``. | |
| :param nl: Print a newline after the message. Enabled by default. | |
| :param color: Force showing or hiding colors and other styles. By | |
| default Click will remove color if the output does not look like | |
| an interactive terminal. | |
| .. versionchanged:: 6.0 | |
| Support Unicode output on the Windows console. Click does not | |
| modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()`` | |
| will still not support Unicode. | |
| .. versionchanged:: 4.0 | |
| Added the ``color`` parameter. | |
| .. versionadded:: 3.0 | |
| Added the ``err`` parameter. | |
| .. versionchanged:: 2.0 | |
| Support colors on Windows if colorama is installed. | |
| """ | |
| if file is None: | |
| if err: | |
| file = _default_text_stderr() | |
| else: | |
| file = _default_text_stdout() | |
| # There are no standard streams attached to write to. For example, | |
| # pythonw on Windows. | |
| if file is None: | |
| return | |
| # Convert non bytes/text into the native string type. | |
| if message is not None and not isinstance(message, (str, bytes, bytearray)): | |
| out: t.Optional[t.Union[str, bytes]] = str(message) | |
| else: | |
| out = message | |
| if nl: | |
| out = out or "" | |
| if isinstance(out, str): | |
| out += "\n" | |
| else: | |
| out += b"\n" | |
| if not out: | |
| file.flush() | |
| return | |
| # If there is a message and the value looks like bytes, we manually | |
| # need to find the binary stream and write the message in there. | |
| # This is done separately so that most stream types will work as you | |
| # would expect. Eg: you can write to StringIO for other cases. | |
| if isinstance(out, (bytes, bytearray)): | |
| binary_file = _find_binary_writer(file) | |
| if binary_file is not None: | |
| file.flush() | |
| binary_file.write(out) | |
| binary_file.flush() | |
| return | |
| # ANSI style code support. For no message or bytes, nothing happens. | |
| # When outputting to a file instead of a terminal, strip codes. | |
| else: | |
| color = resolve_color_default(color) | |
| if should_strip_ansi(file, color): | |
| out = strip_ansi(out) | |
| elif WIN: | |
| if auto_wrap_for_ansi is not None: | |
| file = auto_wrap_for_ansi(file) # type: ignore | |
| elif not color: | |
| out = strip_ansi(out) | |
| file.write(out) # type: ignore | |
| file.flush() | |
| def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']") -> t.BinaryIO: | |
| """Returns a system stream for byte processing. | |
| :param name: the name of the stream to open. Valid names are ``'stdin'``, | |
| ``'stdout'`` and ``'stderr'`` | |
| """ | |
| opener = binary_streams.get(name) | |
| if opener is None: | |
| raise TypeError(f"Unknown standard stream '{name}'") | |
| return opener() | |
| def get_text_stream( | |
| name: "te.Literal['stdin', 'stdout', 'stderr']", | |
| encoding: t.Optional[str] = None, | |
| errors: t.Optional[str] = "strict", | |
| ) -> t.TextIO: | |
| """Returns a system stream for text processing. This usually returns | |
| a wrapped stream around a binary stream returned from | |
| :func:`get_binary_stream` but it also can take shortcuts for already | |
| correctly configured streams. | |
| :param name: the name of the stream to open. Valid names are ``'stdin'``, | |
| ``'stdout'`` and ``'stderr'`` | |
| :param encoding: overrides the detected default encoding. | |
| :param errors: overrides the default error mode. | |
| """ | |
| opener = text_streams.get(name) | |
| if opener is None: | |
| raise TypeError(f"Unknown standard stream '{name}'") | |
| return opener(encoding, errors) | |
| def open_file( | |
| filename: str, | |
| mode: str = "r", | |
| encoding: t.Optional[str] = None, | |
| errors: t.Optional[str] = "strict", | |
| lazy: bool = False, | |
| atomic: bool = False, | |
| ) -> t.IO[t.Any]: | |
| """Open a file, with extra behavior to handle ``'-'`` to indicate | |
| a standard stream, lazy open on write, and atomic write. Similar to | |
| the behavior of the :class:`~click.File` param type. | |
| If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is | |
| wrapped so that using it in a context manager will not close it. | |
| This makes it possible to use the function without accidentally | |
| closing a standard stream: | |
| .. code-block:: python | |
| with open_file(filename) as f: | |
| ... | |
| :param filename: The name of the file to open, or ``'-'`` for | |
| ``stdin``/``stdout``. | |
| :param mode: The mode in which to open the file. | |
| :param encoding: The encoding to decode or encode a file opened in | |
| text mode. | |
| :param errors: The error handling mode. | |
| :param lazy: Wait to open the file until it is accessed. For read | |
| mode, the file is temporarily opened to raise access errors | |
| early, then closed until it is read again. | |
| :param atomic: Write to a temporary file and replace the given file | |
| on close. | |
| .. versionadded:: 3.0 | |
| """ | |
| if lazy: | |
| return t.cast( | |
| t.IO[t.Any], LazyFile(filename, mode, encoding, errors, atomic=atomic) | |
| ) | |
| f, should_close = open_stream(filename, mode, encoding, errors, atomic=atomic) | |
| if not should_close: | |
| f = t.cast(t.IO[t.Any], KeepOpenFile(f)) | |
| return f | |
| def format_filename( | |
| filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]", | |
| shorten: bool = False, | |
| ) -> str: | |
| """Format a filename as a string for display. Ensures the filename can be | |
| displayed by replacing any invalid bytes or surrogate escapes in the name | |
| with the replacement character ``�``. | |
| Invalid bytes or surrogate escapes will raise an error when written to a | |
| stream with ``errors="strict". This will typically happen with ``stdout`` | |
| when the locale is something like ``en_GB.UTF-8``. | |
| Many scenarios *are* safe to write surrogates though, due to PEP 538 and | |
| PEP 540, including: | |
| - Writing to ``stderr``, which uses ``errors="backslashreplace"``. | |
| - The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens | |
| stdout and stderr with ``errors="surrogateescape"``. | |
| - None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``. | |
| - Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``. | |
| Python opens stdout and stderr with ``errors="surrogateescape"``. | |
| :param filename: formats a filename for UI display. This will also convert | |
| the filename into unicode without failing. | |
| :param shorten: this optionally shortens the filename to strip of the | |
| path that leads up to it. | |
| """ | |
| if shorten: | |
| filename = os.path.basename(filename) | |
| else: | |
| filename = os.fspath(filename) | |
| if isinstance(filename, bytes): | |
| filename = filename.decode(sys.getfilesystemencoding(), "replace") | |
| else: | |
| filename = filename.encode("utf-8", "surrogateescape").decode( | |
| "utf-8", "replace" | |
| ) | |
| return filename | |
| def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str: | |
| r"""Returns the config folder for the application. The default behavior | |
| is to return whatever is most appropriate for the operating system. | |
| To give you an idea, for an app called ``"Foo Bar"``, something like | |
| the following folders could be returned: | |
| Mac OS X: | |
| ``~/Library/Application Support/Foo Bar`` | |
| Mac OS X (POSIX): | |
| ``~/.foo-bar`` | |
| Unix: | |
| ``~/.config/foo-bar`` | |
| Unix (POSIX): | |
| ``~/.foo-bar`` | |
| Windows (roaming): | |
| ``C:\Users\<user>\AppData\Roaming\Foo Bar`` | |
| Windows (not roaming): | |
| ``C:\Users\<user>\AppData\Local\Foo Bar`` | |
| .. versionadded:: 2.0 | |
| :param app_name: the application name. This should be properly capitalized | |
| and can contain whitespace. | |
| :param roaming: controls if the folder should be roaming or not on Windows. | |
| Has no effect otherwise. | |
| :param force_posix: if this is set to `True` then on any POSIX system the | |
| folder will be stored in the home folder with a leading | |
| dot instead of the XDG config home or darwin's | |
| application support folder. | |
| """ | |
| if WIN: | |
| key = "APPDATA" if roaming else "LOCALAPPDATA" | |
| folder = os.environ.get(key) | |
| if folder is None: | |
| folder = os.path.expanduser("~") | |
| return os.path.join(folder, app_name) | |
| if force_posix: | |
| return os.path.join(os.path.expanduser(f"~/.{_posixify(app_name)}")) | |
| if sys.platform == "darwin": | |
| return os.path.join( | |
| os.path.expanduser("~/Library/Application Support"), app_name | |
| ) | |
| return os.path.join( | |
| os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), | |
| _posixify(app_name), | |
| ) | |
| class PacifyFlushWrapper: | |
| """This wrapper is used to catch and suppress BrokenPipeErrors resulting | |
| from ``.flush()`` being called on broken pipe during the shutdown/final-GC | |
| of the Python interpreter. Notably ``.flush()`` is always called on | |
| ``sys.stdout`` and ``sys.stderr``. So as to have minimal impact on any | |
| other cleanup code, and the case where the underlying file is not a broken | |
| pipe, all calls and attributes are proxied. | |
| """ | |
| def __init__(self, wrapped: t.IO[t.Any]) -> None: | |
| self.wrapped = wrapped | |
| def flush(self) -> None: | |
| try: | |
| self.wrapped.flush() | |
| except OSError as e: | |
| import errno | |
| if e.errno != errno.EPIPE: | |
| raise | |
| def __getattr__(self, attr: str) -> t.Any: | |
| return getattr(self.wrapped, attr) | |
| def _detect_program_name( | |
| path: t.Optional[str] = None, _main: t.Optional[ModuleType] = None | |
| ) -> str: | |
| """Determine the command used to run the program, for use in help | |
| text. If a file or entry point was executed, the file name is | |
| returned. If ``python -m`` was used to execute a module or package, | |
| ``python -m name`` is returned. | |
| This doesn't try to be too precise, the goal is to give a concise | |
| name for help text. Files are only shown as their name without the | |
| path. ``python`` is only shown for modules, and the full path to | |
| ``sys.executable`` is not shown. | |
| :param path: The Python file being executed. Python puts this in | |
| ``sys.argv[0]``, which is used by default. | |
| :param _main: The ``__main__`` module. This should only be passed | |
| during internal testing. | |
| .. versionadded:: 8.0 | |
| Based on command args detection in the Werkzeug reloader. | |
| :meta private: | |
| """ | |
| if _main is None: | |
| _main = sys.modules["__main__"] | |
| if not path: | |
| path = sys.argv[0] | |
| # The value of __package__ indicates how Python was called. It may | |
| # not exist if a setuptools script is installed as an egg. It may be | |
| # set incorrectly for entry points created with pip on Windows. | |
| # It is set to "" inside a Shiv or PEX zipapp. | |
| if getattr(_main, "__package__", None) in {None, ""} or ( | |
| os.name == "nt" | |
| and _main.__package__ == "" | |
| and not os.path.exists(path) | |
| and os.path.exists(f"{path}.exe") | |
| ): | |
| # Executed a file, like "python app.py". | |
| return os.path.basename(path) | |
| # Executed a module, like "python -m example". | |
| # Rewritten by Python from "-m script" to "/path/to/script.py". | |
| # Need to look at main module to determine how it was executed. | |
| py_module = t.cast(str, _main.__package__) | |
| name = os.path.splitext(os.path.basename(path))[0] | |
| # A submodule like "example.cli". | |
| if name != "__main__": | |
| py_module = f"{py_module}.{name}" | |
| return f"python -m {py_module.lstrip('.')}" | |
| def _expand_args( | |
| args: t.Iterable[str], | |
| *, | |
| user: bool = True, | |
| env: bool = True, | |
| glob_recursive: bool = True, | |
| ) -> t.List[str]: | |
| """Simulate Unix shell expansion with Python functions. | |
| See :func:`glob.glob`, :func:`os.path.expanduser`, and | |
| :func:`os.path.expandvars`. | |
| This is intended for use on Windows, where the shell does not do any | |
| expansion. It may not exactly match what a Unix shell would do. | |
| :param args: List of command line arguments to expand. | |
| :param user: Expand user home directory. | |
| :param env: Expand environment variables. | |
| :param glob_recursive: ``**`` matches directories recursively. | |
| .. versionchanged:: 8.1 | |
| Invalid glob patterns are treated as empty expansions rather | |
| than raising an error. | |
| .. versionadded:: 8.0 | |
| :meta private: | |
| """ | |
| from glob import glob | |
| out = [] | |
| for arg in args: | |
| if user: | |
| arg = os.path.expanduser(arg) | |
| if env: | |
| arg = os.path.expandvars(arg) | |
| try: | |
| matches = glob(arg, recursive=glob_recursive) | |
| except re.error: | |
| matches = [] | |
| if not matches: | |
| out.append(arg) | |
| else: | |
| out.extend(matches) | |
| return out | |