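# NOTE: "Spaces: Paused" appeared here; it looks like page-header residue from the site this listing was copied from, not part of the module.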
import io
import logging
import os
import pathlib
import shutil
import sys
import tempfile
from collections import OrderedDict
from contextlib import contextmanager
from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple,
                    Union)

from .parser import Binding, parse_stream
from .variables import parse_variables
# A type alias for a string path to be used for the paths in this file.
# These paths may flow to `open()` and `shutil.move()`; `shutil.move()`
# only accepts string paths, not byte paths or file descriptors. See
# https://github.com/python/typeshed/pull/6832.
StrPath = Union[str, 'os.PathLike[str]']

# Module-level logger shared by every function and method in this file.
logger = logging.getLogger(__name__)
def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]:
    """Pass bindings through unchanged, warning about each one flagged as an error.

    Parameters:
        mappings: Bindings to inspect (callers in this file pass the output
            of `parse_stream`).

    Yields:
        Every binding from `mappings`, in order. Bindings whose `error`
        attribute is truthy are still yielded; a warning naming the line
        where the statement starts is logged first.
    """
    for mapping in mappings:
        if mapping.error:
            logger.warning(
                "Python-dotenv could not parse statement starting at line %s",
                mapping.original.line,
            )
        yield mapping
class DotEnv:
    """Parse a dotenv source and expose it as a dict or as environment variables.

    The source is the file at `dotenv_path` when that path names an existing
    file, otherwise `stream` when one was supplied, otherwise an empty
    document.
    """

    def __init__(
        self,
        dotenv_path: Optional[StrPath],
        stream: Optional[IO[str]] = None,
        verbose: bool = False,
        encoding: Optional[str] = None,
        interpolate: bool = True,
        override: bool = True,
    ) -> None:
        """Store the configuration; nothing is read until the data is requested.

        Parameters:
            dotenv_path: Path of the dotenv file, or `None`.
            stream: Text stream used when `dotenv_path` is falsy or is not an
                existing file.
            verbose: Whether to log when the file or a requested key is missing.
            encoding: Encoding passed to `open()` when reading `dotenv_path`.
            interpolate: Whether parsed values go through `resolve_variables`.
            override: Passed to `resolve_variables`; also decides whether
                `set_as_environment_variables` replaces variables that are
                already present in `os.environ`.
        """
        self.dotenv_path: Optional[StrPath] = dotenv_path
        self.stream: Optional[IO[str]] = stream
        # Cache for `dict()`; `None` means "not parsed yet".
        self._dict: Optional[Dict[str, Optional[str]]] = None
        self.verbose: bool = verbose
        self.encoding: Optional[str] = encoding
        self.interpolate: bool = interpolate
        self.override: bool = override

    @contextmanager
    def _get_stream(self) -> Iterator[IO[str]]:
        """Context manager yielding the text stream to parse.

        Must be a context manager because `parse` uses it in a `with`
        statement. A file opened here is closed on exit; a caller-supplied
        `stream` is yielded as-is and left open.
        """
        if self.dotenv_path and os.path.isfile(self.dotenv_path):
            with open(self.dotenv_path, encoding=self.encoding) as stream:
                yield stream
        elif self.stream is not None:
            yield self.stream
        else:
            if self.verbose:
                logger.info(
                    "Python-dotenv could not find configuration file %s.",
                    self.dotenv_path or '.env',
                )
            # Nothing to read: behave as if the source were empty.
            yield io.StringIO('')

    def dict(self) -> Dict[str, Optional[str]]:
        """Return dotenv as dict

        The source is parsed on the first call and the result is cached.
        The cache check is `is not None` so that an empty result is cached
        too instead of triggering a re-parse on every call.
        """
        if self._dict is not None:
            return self._dict

        raw_values = self.parse()

        if self.interpolate:
            self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
        else:
            self._dict = OrderedDict(raw_values)

        return self._dict

    def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
        """Yield `(key, value)` for every parsed binding that has a key.

        Bindings without a key are skipped; bindings flagged as errors are
        reported by `with_warn_for_invalid_lines`.
        """
        with self._get_stream() as stream:
            for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
                if mapping.key is not None:
                    yield mapping.key, mapping.value

    def set_as_environment_variables(self) -> bool:
        """
        Load the current dotenv as system environment variable.

        Returns:
            `False` when the dotenv source produced no keys, otherwise `True`
            (even if every key was skipped).
        """
        values = self.dict()
        if not values:
            return False

        for k, v in values.items():
            # Leave pre-existing variables alone unless overriding is enabled.
            if k in os.environ and not self.override:
                continue
            # Keys without a value are never exported.
            if v is not None:
                os.environ[k] = v

        return True

    def get(self, key: str) -> Optional[str]:
        """Return the value stored for `key`, or `None` when the key is absent.

        A present key may itself map to `None`. When the key is absent and
        `verbose` is set, a warning is logged.
        """
        data = self.dict()

        if key in data:
            return data[key]

        if self.verbose:
            logger.warning("Key %s not found in %s.", key, self.dotenv_path)

        return None
def get_key(
    dotenv_path: StrPath,
    key_to_get: str,
    encoding: Optional[str] = "utf-8",
) -> Optional[str]:
    """
    Get the value of a given key from the given .env.

    Parameters:
        dotenv_path: Path of the .env file to read.
        key_to_get: Name of the key to look up.
        encoding: Encoding used to read the file.

    Returns `None` if the key isn't found or doesn't have a value.
    The lookup is done with `verbose=True`, so a missing file or key is logged.
    """
    return DotEnv(dotenv_path, verbose=True, encoding=encoding).get(key_to_get)
@contextmanager
def rewrite(
    path: StrPath,
    encoding: Optional[str],
) -> Iterator[Tuple[IO[str], IO[str]]]:
    """Context manager for rewriting the file at `path` through a temporary file.

    Yields a `(source, dest)` pair: `source` is `path` opened for reading and
    `dest` is a writable named temporary file. If the `with` body finishes
    without raising, the temporary file is moved over `path`; otherwise the
    temporary file is deleted and the exception is re-raised.

    Parameters:
        path: File to rewrite. It is created (touched) first if missing.
        encoding: Encoding used for both the source and the temporary file.
    """
    pathlib.Path(path).touch()

    with tempfile.NamedTemporaryFile(mode="w", encoding=encoding, delete=False) as dest:
        error = None
        try:
            with open(path, encoding=encoding) as source:
                yield (source, dest)
        except BaseException as err:
            # Remember the failure; cleanup happens once `dest` is closed.
            error = err

    # Runs after the temporary file has been closed, so everything written
    # to it is on disk before it replaces (or is removed instead of) `path`.
    if error is None:
        shutil.move(dest.name, path)
    else:
        os.unlink(dest.name)
        raise error from None
def set_key(
    dotenv_path: StrPath,
    key_to_set: str,
    value_to_set: str,
    quote_mode: str = "always",
    export: bool = False,
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str, str]:
    """
    Adds or Updates a key/value to the given .env

    The file is rewritten through `rewrite()`, which creates it first when it
    does not exist yet.

    Parameters:
        dotenv_path: Path of the .env file to modify.
        key_to_set: Name of the key to add or replace.
        value_to_set: New value for the key.
        quote_mode: `"always"` wraps the value in single quotes, `"never"`
            writes it as-is, `"auto"` quotes it unless it is purely
            alphanumeric.
        export: Whether to prefix the written line with `export `.
        encoding: Encoding used to read and write the file.

    Returns:
        The tuple `(True, key_to_set, value_to_set)`.

    Raises:
        ValueError: If `quote_mode` is not one of the three accepted values.
    """
    if quote_mode not in ("always", "auto", "never"):
        raise ValueError(f"Unknown quote_mode: {quote_mode}")

    quote = (
        quote_mode == "always"
        or (quote_mode == "auto" and not value_to_set.isalnum())
    )

    if quote:
        # Escape embedded single quotes so the quoted value stays well-formed.
        escaped = value_to_set.replace("'", "\\'")
        value_out = f"'{escaped}'"
    else:
        value_out = value_to_set

    if export:
        line_out = f'export {key_to_set}={value_out}\n'
    else:
        line_out = f"{key_to_set}={value_out}\n"

    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        replaced = False
        missing_newline = False
        for mapping in with_warn_for_invalid_lines(parse_stream(source)):
            if mapping.key == key_to_set:
                dest.write(line_out)
                replaced = True
            else:
                dest.write(mapping.original.string)
                # Track whether the last copied statement lacks a trailing newline.
                missing_newline = not mapping.original.string.endswith("\n")
        if not replaced:
            # Appending: make sure the new line starts on its own line.
            if missing_newline:
                dest.write("\n")
            dest.write(line_out)

    return True, key_to_set, value_to_set
def unset_key(
    dotenv_path: StrPath,
    key_to_unset: str,
    quote_mode: str = "always",
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str]:
    """
    Removes a given key from the given `.env` file.

    If the .env path given doesn't exist, fails.
    If the given key doesn't exist in the .env, fails.

    Parameters:
        dotenv_path: Path of the .env file to modify.
        key_to_unset: Name of the key to remove.
        quote_mode: Not used by this function; kept in the signature for
            existing callers.
        encoding: Encoding used to read and write the file.

    Returns:
        `(True, key_to_unset)` when the key was removed. Both failure cases
        log a warning and return `(None, key_to_unset)` rather than raising.
    """
    if not os.path.exists(dotenv_path):
        logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
        return None, key_to_unset

    removed = False
    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        for mapping in with_warn_for_invalid_lines(parse_stream(source)):
            if mapping.key == key_to_unset:
                # Drop the statement by not copying it to the destination.
                removed = True
            else:
                dest.write(mapping.original.string)

    if not removed:
        logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path)
        return None, key_to_unset

    return removed, key_to_unset
def resolve_variables(
    values: Iterable[Tuple[str, Optional[str]]],
    override: bool,
) -> Mapping[str, Optional[str]]:
    """Resolve variable references inside dotenv values.

    Pairs are processed in order, so a value can refer to names defined by
    earlier pairs as well as to `os.environ`.

    Parameters:
        values: `(name, value)` pairs; a `None` value is kept as `None`.
        override: When true, names already resolved from `values` take
            precedence over `os.environ` during lookup; when false,
            `os.environ` takes precedence.

    Returns:
        A dict mapping each name to its resolved value (or `None`).
    """
    new_values: Dict[str, Optional[str]] = {}

    for (name, value) in values:
        if value is None:
            result = None
        else:
            atoms = parse_variables(value)
            # Build the lookup table; the mapping applied last wins on conflicts.
            env: Dict[str, Optional[str]] = {}
            if override:
                env.update(os.environ)  # type: ignore
                env.update(new_values)
            else:
                env.update(new_values)
                env.update(os.environ)  # type: ignore
            result = "".join(atom.resolve(env) for atom in atoms)

        new_values[name] = result

    return new_values
def _walk_to_root(path: str) -> Iterator[str]:
    """
    Yield directories starting from the given directory up to the root

    Parameters:
        path: Existing file or directory. For a file, the walk starts at the
            directory containing it.

    Yields:
        Absolute directory paths, from the starting directory up to the
        filesystem root, each exactly once.

    Raises:
        IOError: If `path` does not exist (raised on first iteration, since
            this is a generator).
    """
    if not os.path.exists(path):
        raise IOError('Starting path not found')

    if os.path.isfile(path):
        path = os.path.dirname(path)

    last_dir = None
    current_dir = os.path.abspath(path)
    # The parent of the root is the root itself, which ends the loop.
    while last_dir != current_dir:
        yield current_dir
        parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
        last_dir, current_dir = current_dir, parent_dir
def find_dotenv(
    filename: str = '.env',
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
) -> str:
    """
    Search in increasingly higher folders for the given file

    Parameters:
        filename: Name of the file to look for in each directory.
        raise_error_if_not_found: Whether to raise instead of returning an
            empty string when no file is found.
        usecwd: Whether to start from the current working directory instead
            of the directory of the calling file.

    Returns path to the file if found, or an empty string otherwise

    Raises:
        IOError: If the file is not found and `raise_error_if_not_found` is set.
    """

    def _is_interactive():
        """ Decide whether this is running in a REPL or IPython notebook """
        try:
            main = __import__('__main__', None, None, fromlist=['__file__'])
        except ModuleNotFoundError:
            return False
        return not hasattr(main, '__file__')

    if usecwd or _is_interactive() or getattr(sys, 'frozen', False):
        # Should work without __file__, e.g. in REPL or IPython notebook.
        path = os.getcwd()
    else:
        # will work for .py files
        frame = sys._getframe()
        current_file = __file__

        # Walk up the call stack past frames that belong to this file or
        # whose code filename does not exist on disk.
        while frame.f_code.co_filename == current_file or not os.path.exists(
            frame.f_code.co_filename
        ):
            assert frame.f_back is not None
            frame = frame.f_back
        frame_filename = frame.f_code.co_filename
        path = os.path.dirname(os.path.abspath(frame_filename))

    for dirname in _walk_to_root(path):
        check_path = os.path.join(dirname, filename)
        if os.path.isfile(check_path):
            return check_path

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
def load_dotenv(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    override: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> bool:
    """Parse a .env file and then load all the variables found as environment variables.

    Parameters:
        dotenv_path: Absolute or relative path to .env file.
        stream: Text stream (such as `io.StringIO`) with .env content, used if
            `dotenv_path` is `None`.
        verbose: Whether to log a message when the .env file is missing.
        override: Whether to override the system environment variables with the variables
            from the `.env` file.
        interpolate: Whether to resolve variable references inside values.
        encoding: Encoding to be used to read the file.
    Returns:
        Bool: True if the .env source yielded at least one key, else False.
        Keys without a value, and keys already present in the environment
        when `override` is False, are not exported but still count.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
    .env file.
    """
    if dotenv_path is None and stream is None:
        dotenv_path = find_dotenv()

    dotenv = DotEnv(
        dotenv_path=dotenv_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=override,
        encoding=encoding,
    )
    return dotenv.set_as_environment_variables()
def dotenv_values(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> Dict[str, Optional[str]]:
    """
    Parse a .env file and return its content as a dict.

    The returned dict will have `None` values for keys without values in the .env file.
    For example, `foo=bar` results in `{"foo": "bar"}` whereas `foo` alone results in
    `{"foo": None}`

    Parameters:
        dotenv_path: Absolute or relative path to the .env file.
        stream: `StringIO` object with .env content, used if `dotenv_path` is `None`.
        verbose: Whether to log a message when the .env file is missing.
        interpolate: Whether to resolve variable references inside values.
        encoding: Encoding to be used to read the file.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
    .env file.
    """
    if dotenv_path is None and stream is None:
        dotenv_path = find_dotenv()

    # `override=True` is fixed here; nothing is written to `os.environ`.
    return DotEnv(
        dotenv_path=dotenv_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=True,
        encoding=encoding,
    ).dict()