Spaces:
Paused
Paused
| """ | |
| Cycler | |
| ====== | |
| Cycling through combinations of values, producing dictionaries. | |
| You can add cyclers:: | |
| from cycler import cycler | |
| cc = (cycler(color=list('rgb')) + | |
| cycler(linestyle=['-', '--', '-.'])) | |
| for d in cc: | |
| print(d) | |
| Results in:: | |
| {'color': 'r', 'linestyle': '-'} | |
| {'color': 'g', 'linestyle': '--'} | |
| {'color': 'b', 'linestyle': '-.'} | |
| You can multiply cyclers:: | |
| from cycler import cycler | |
| cc = (cycler(color=list('rgb')) * | |
| cycler(linestyle=['-', '--', '-.'])) | |
| for d in cc: | |
| print(d) | |
| Results in:: | |
| {'color': 'r', 'linestyle': '-'} | |
| {'color': 'r', 'linestyle': '--'} | |
| {'color': 'r', 'linestyle': '-.'} | |
| {'color': 'g', 'linestyle': '-'} | |
| {'color': 'g', 'linestyle': '--'} | |
| {'color': 'g', 'linestyle': '-.'} | |
| {'color': 'b', 'linestyle': '-'} | |
| {'color': 'b', 'linestyle': '--'} | |
| {'color': 'b', 'linestyle': '-.'} | |
| """ | |
| from __future__ import annotations | |
| from collections.abc import Hashable, Iterable, Generator | |
| import copy | |
| from functools import reduce | |
| from itertools import product, cycle | |
| from operator import mul, add | |
| # Dict, List, Union required for runtime cast calls | |
| from typing import TypeVar, Generic, Callable, Union, Dict, List, Any, overload, cast | |
| __version__ = "0.12.1" | |
| K = TypeVar("K", bound=Hashable) | |
| L = TypeVar("L", bound=Hashable) | |
| V = TypeVar("V") | |
| U = TypeVar("U") | |
| def _process_keys( | |
| left: Cycler[K, V] | Iterable[dict[K, V]], | |
| right: Cycler[K, V] | Iterable[dict[K, V]] | None, | |
| ) -> set[K]: | |
| """ | |
| Helper function to compose cycler keys. | |
| Parameters | |
| ---------- | |
| left, right : iterable of dictionaries or None | |
| The cyclers to be composed. | |
| Returns | |
| ------- | |
| keys : set | |
| The keys in the composition of the two cyclers. | |
| """ | |
| l_peek: dict[K, V] = next(iter(left)) if left != [] else {} | |
| r_peek: dict[K, V] = next(iter(right)) if right is not None else {} | |
| l_key: set[K] = set(l_peek.keys()) | |
| r_key: set[K] = set(r_peek.keys()) | |
| if l_key & r_key: | |
| raise ValueError("Can not compose overlapping cycles") | |
| return l_key | r_key | |
| def concat(left: Cycler[K, V], right: Cycler[K, U]) -> Cycler[K, V | U]: | |
| r""" | |
| Concatenate `Cycler`\s, as if chained using `itertools.chain`. | |
| The keys must match exactly. | |
| Examples | |
| -------- | |
| >>> num = cycler('a', range(3)) | |
| >>> let = cycler('a', 'abc') | |
| >>> num.concat(let) | |
| cycler('a', [0, 1, 2, 'a', 'b', 'c']) | |
| Returns | |
| ------- | |
| `Cycler` | |
| The concatenated cycler. | |
| """ | |
| if left.keys != right.keys: | |
| raise ValueError( | |
| "Keys do not match:\n" | |
| "\tIntersection: {both!r}\n" | |
| "\tDisjoint: {just_one!r}".format( | |
| both=left.keys & right.keys, just_one=left.keys ^ right.keys | |
| ) | |
| ) | |
| _l = cast(Dict[K, List[Union[V, U]]], left.by_key()) | |
| _r = cast(Dict[K, List[Union[V, U]]], right.by_key()) | |
| return reduce(add, (_cycler(k, _l[k] + _r[k]) for k in left.keys)) | |
| class Cycler(Generic[K, V]): | |
| """ | |
| Composable cycles. | |
| This class has compositions methods: | |
| ``+`` | |
| for 'inner' products (zip) | |
| ``+=`` | |
| in-place ``+`` | |
| ``*`` | |
| for outer products (`itertools.product`) and integer multiplication | |
| ``*=`` | |
| in-place ``*`` | |
| and supports basic slicing via ``[]``. | |
| Parameters | |
| ---------- | |
| left, right : Cycler or None | |
| The 'left' and 'right' cyclers. | |
| op : func or None | |
| Function which composes the 'left' and 'right' cyclers. | |
| """ | |
| def __call__(self): | |
| return cycle(self) | |
| def __init__( | |
| self, | |
| left: Cycler[K, V] | Iterable[dict[K, V]] | None, | |
| right: Cycler[K, V] | None = None, | |
| op: Any = None, | |
| ): | |
| """ | |
| Semi-private init. | |
| Do not use this directly, use `cycler` function instead. | |
| """ | |
| if isinstance(left, Cycler): | |
| self._left: Cycler[K, V] | list[dict[K, V]] = Cycler( | |
| left._left, left._right, left._op | |
| ) | |
| elif left is not None: | |
| # Need to copy the dictionary or else that will be a residual | |
| # mutable that could lead to strange errors | |
| self._left = [copy.copy(v) for v in left] | |
| else: | |
| self._left = [] | |
| if isinstance(right, Cycler): | |
| self._right: Cycler[K, V] | None = Cycler( | |
| right._left, right._right, right._op | |
| ) | |
| else: | |
| self._right = None | |
| self._keys: set[K] = _process_keys(self._left, self._right) | |
| self._op: Any = op | |
| def __contains__(self, k): | |
| return k in self._keys | |
| def keys(self) -> set[K]: | |
| """The keys this Cycler knows about.""" | |
| return set(self._keys) | |
| def change_key(self, old: K, new: K) -> None: | |
| """ | |
| Change a key in this cycler to a new name. | |
| Modification is performed in-place. | |
| Does nothing if the old key is the same as the new key. | |
| Raises a ValueError if the new key is already a key. | |
| Raises a KeyError if the old key isn't a key. | |
| """ | |
| if old == new: | |
| return | |
| if new in self._keys: | |
| raise ValueError( | |
| f"Can't replace {old} with {new}, {new} is already a key" | |
| ) | |
| if old not in self._keys: | |
| raise KeyError( | |
| f"Can't replace {old} with {new}, {old} is not a key" | |
| ) | |
| self._keys.remove(old) | |
| self._keys.add(new) | |
| if self._right is not None and old in self._right.keys: | |
| self._right.change_key(old, new) | |
| # self._left should always be non-None | |
| # if self._keys is non-empty. | |
| elif isinstance(self._left, Cycler): | |
| self._left.change_key(old, new) | |
| else: | |
| # It should be completely safe at this point to | |
| # assume that the old key can be found in each | |
| # iteration. | |
| self._left = [{new: entry[old]} for entry in self._left] | |
| def _from_iter(cls, label: K, itr: Iterable[V]) -> Cycler[K, V]: | |
| """ | |
| Class method to create 'base' Cycler objects | |
| that do not have a 'right' or 'op' and for which | |
| the 'left' object is not another Cycler. | |
| Parameters | |
| ---------- | |
| label : hashable | |
| The property key. | |
| itr : iterable | |
| Finite length iterable of the property values. | |
| Returns | |
| ------- | |
| `Cycler` | |
| New 'base' cycler. | |
| """ | |
| ret: Cycler[K, V] = cls(None) | |
| ret._left = list({label: v} for v in itr) | |
| ret._keys = {label} | |
| return ret | |
| def __getitem__(self, key: slice) -> Cycler[K, V]: | |
| # TODO : maybe add numpy style fancy slicing | |
| if isinstance(key, slice): | |
| trans = self.by_key() | |
| return reduce(add, (_cycler(k, v[key]) for k, v in trans.items())) | |
| else: | |
| raise ValueError("Can only use slices with Cycler.__getitem__") | |
| def __iter__(self) -> Generator[dict[K, V], None, None]: | |
| if self._right is None: | |
| for left in self._left: | |
| yield dict(left) | |
| else: | |
| if self._op is None: | |
| raise TypeError( | |
| "Operation cannot be None when both left and right are defined" | |
| ) | |
| for a, b in self._op(self._left, self._right): | |
| out = {} | |
| out.update(a) | |
| out.update(b) | |
| yield out | |
| def __add__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]: | |
| """ | |
| Pair-wise combine two equal length cyclers (zip). | |
| Parameters | |
| ---------- | |
| other : Cycler | |
| """ | |
| if len(self) != len(other): | |
| raise ValueError( | |
| f"Can only add equal length cycles, not {len(self)} and {len(other)}" | |
| ) | |
| return Cycler( | |
| cast(Cycler[Union[K, L], Union[V, U]], self), | |
| cast(Cycler[Union[K, L], Union[V, U]], other), | |
| zip | |
| ) | |
| def __mul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]: | |
| ... | |
| def __mul__(self, other: int) -> Cycler[K, V]: | |
| ... | |
| def __mul__(self, other): | |
| """ | |
| Outer product of two cyclers (`itertools.product`) or integer | |
| multiplication. | |
| Parameters | |
| ---------- | |
| other : Cycler or int | |
| """ | |
| if isinstance(other, Cycler): | |
| return Cycler( | |
| cast(Cycler[Union[K, L], Union[V, U]], self), | |
| cast(Cycler[Union[K, L], Union[V, U]], other), | |
| product | |
| ) | |
| elif isinstance(other, int): | |
| trans = self.by_key() | |
| return reduce( | |
| add, (_cycler(k, v * other) for k, v in trans.items()) | |
| ) | |
| else: | |
| return NotImplemented | |
| def __rmul__(self, other: Cycler[L, U]) -> Cycler[K | L, V | U]: | |
| ... | |
| def __rmul__(self, other: int) -> Cycler[K, V]: | |
| ... | |
| def __rmul__(self, other): | |
| return self * other | |
| def __len__(self) -> int: | |
| op_dict: dict[Callable, Callable[[int, int], int]] = {zip: min, product: mul} | |
| if self._right is None: | |
| return len(self._left) | |
| l_len = len(self._left) | |
| r_len = len(self._right) | |
| return op_dict[self._op](l_len, r_len) | |
| # iadd and imul do not exapand the the type as the returns must be consistent with | |
| # self, thus they flag as inconsistent with add/mul | |
| def __iadd__(self, other: Cycler[K, V]) -> Cycler[K, V]: # type: ignore[misc] | |
| """ | |
| In-place pair-wise combine two equal length cyclers (zip). | |
| Parameters | |
| ---------- | |
| other : Cycler | |
| """ | |
| if not isinstance(other, Cycler): | |
| raise TypeError("Cannot += with a non-Cycler object") | |
| # True shallow copy of self is fine since this is in-place | |
| old_self = copy.copy(self) | |
| self._keys = _process_keys(old_self, other) | |
| self._left = old_self | |
| self._op = zip | |
| self._right = Cycler(other._left, other._right, other._op) | |
| return self | |
| def __imul__(self, other: Cycler[K, V] | int) -> Cycler[K, V]: # type: ignore[misc] | |
| """ | |
| In-place outer product of two cyclers (`itertools.product`). | |
| Parameters | |
| ---------- | |
| other : Cycler | |
| """ | |
| if not isinstance(other, Cycler): | |
| raise TypeError("Cannot *= with a non-Cycler object") | |
| # True shallow copy of self is fine since this is in-place | |
| old_self = copy.copy(self) | |
| self._keys = _process_keys(old_self, other) | |
| self._left = old_self | |
| self._op = product | |
| self._right = Cycler(other._left, other._right, other._op) | |
| return self | |
| def __eq__(self, other: object) -> bool: | |
| if not isinstance(other, Cycler): | |
| return False | |
| if len(self) != len(other): | |
| return False | |
| if self.keys ^ other.keys: | |
| return False | |
| return all(a == b for a, b in zip(self, other)) | |
| __hash__ = None # type: ignore | |
| def __repr__(self) -> str: | |
| op_map = {zip: "+", product: "*"} | |
| if self._right is None: | |
| lab = self.keys.pop() | |
| itr = list(v[lab] for v in self) | |
| return f"cycler({lab!r}, {itr!r})" | |
| else: | |
| op = op_map.get(self._op, "?") | |
| msg = "({left!r} {op} {right!r})" | |
| return msg.format(left=self._left, op=op, right=self._right) | |
| def _repr_html_(self) -> str: | |
| # an table showing the value of each key through a full cycle | |
| output = "<table>" | |
| sorted_keys = sorted(self.keys, key=repr) | |
| for key in sorted_keys: | |
| output += f"<th>{key!r}</th>" | |
| for d in iter(self): | |
| output += "<tr>" | |
| for k in sorted_keys: | |
| output += f"<td>{d[k]!r}</td>" | |
| output += "</tr>" | |
| output += "</table>" | |
| return output | |
| def by_key(self) -> dict[K, list[V]]: | |
| """ | |
| Values by key. | |
| This returns the transposed values of the cycler. Iterating | |
| over a `Cycler` yields dicts with a single value for each key, | |
| this method returns a `dict` of `list` which are the values | |
| for the given key. | |
| The returned value can be used to create an equivalent `Cycler` | |
| using only `+`. | |
| Returns | |
| ------- | |
| transpose : dict | |
| dict of lists of the values for each key. | |
| """ | |
| # TODO : sort out if this is a bottle neck, if there is a better way | |
| # and if we care. | |
| keys = self.keys | |
| out: dict[K, list[V]] = {k: list() for k in keys} | |
| for d in self: | |
| for k in keys: | |
| out[k].append(d[k]) | |
| return out | |
| # for back compatibility | |
| _transpose = by_key | |
| def simplify(self) -> Cycler[K, V]: | |
| """ | |
| Simplify the cycler into a sum (but no products) of cyclers. | |
| Returns | |
| ------- | |
| simple : Cycler | |
| """ | |
| # TODO: sort out if it is worth the effort to make sure this is | |
| # balanced. Currently it is is | |
| # (((a + b) + c) + d) vs | |
| # ((a + b) + (c + d)) | |
| # I would believe that there is some performance implications | |
| trans = self.by_key() | |
| return reduce(add, (_cycler(k, v) for k, v in trans.items())) | |
| concat = concat | |
| def cycler(arg: Cycler[K, V]) -> Cycler[K, V]: | |
| ... | |
| def cycler(**kwargs: Iterable[V]) -> Cycler[str, V]: | |
| ... | |
| def cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]: | |
| ... | |
| def cycler(*args, **kwargs): | |
| """ | |
| Create a new `Cycler` object from a single positional argument, | |
| a pair of positional arguments, or the combination of keyword arguments. | |
| cycler(arg) | |
| cycler(label1=itr1[, label2=iter2[, ...]]) | |
| cycler(label, itr) | |
| Form 1 simply copies a given `Cycler` object. | |
| Form 2 composes a `Cycler` as an inner product of the | |
| pairs of keyword arguments. In other words, all of the | |
| iterables are cycled simultaneously, as if through zip(). | |
| Form 3 creates a `Cycler` from a label and an iterable. | |
| This is useful for when the label cannot be a keyword argument | |
| (e.g., an integer or a name that has a space in it). | |
| Parameters | |
| ---------- | |
| arg : Cycler | |
| Copy constructor for Cycler (does a shallow copy of iterables). | |
| label : name | |
| The property key. In the 2-arg form of the function, | |
| the label can be any hashable object. In the keyword argument | |
| form of the function, it must be a valid python identifier. | |
| itr : iterable | |
| Finite length iterable of the property values. | |
| Can be a single-property `Cycler` that would | |
| be like a key change, but as a shallow copy. | |
| Returns | |
| ------- | |
| cycler : Cycler | |
| New `Cycler` for the given property | |
| """ | |
| if args and kwargs: | |
| raise TypeError( | |
| "cycler() can only accept positional OR keyword arguments -- not both." | |
| ) | |
| if len(args) == 1: | |
| if not isinstance(args[0], Cycler): | |
| raise TypeError( | |
| "If only one positional argument given, it must " | |
| "be a Cycler instance." | |
| ) | |
| return Cycler(args[0]) | |
| elif len(args) == 2: | |
| return _cycler(*args) | |
| elif len(args) > 2: | |
| raise TypeError( | |
| "Only a single Cycler can be accepted as the lone " | |
| "positional argument. Use keyword arguments instead." | |
| ) | |
| if kwargs: | |
| return reduce(add, (_cycler(k, v) for k, v in kwargs.items())) | |
| raise TypeError("Must have at least a positional OR keyword arguments") | |
| def _cycler(label: K, itr: Iterable[V]) -> Cycler[K, V]: | |
| """ | |
| Create a new `Cycler` object from a property name and iterable of values. | |
| Parameters | |
| ---------- | |
| label : hashable | |
| The property key. | |
| itr : iterable | |
| Finite length iterable of the property values. | |
| Returns | |
| ------- | |
| cycler : Cycler | |
| New `Cycler` for the given property | |
| """ | |
| if isinstance(itr, Cycler): | |
| keys = itr.keys | |
| if len(keys) != 1: | |
| msg = "Can not create Cycler from a multi-property Cycler" | |
| raise ValueError(msg) | |
| lab = keys.pop() | |
| # Doesn't need to be a new list because | |
| # _from_iter() will be creating that new list anyway. | |
| itr = (v[lab] for v in itr) | |
| return Cycler._from_iter(label, itr) | |