From a4ca731833e4e270a636594bbccc01e3301beb5d Mon Sep 17 00:00:00 2001 From: Setsugennoao <41454651+Setsugennoao@users.noreply.github.com> Date: Sun, 29 Oct 2023 16:54:07 +0100 Subject: [PATCH] Initial commit All this I initially wrote in https://github.com/Irrational-Encoding-Wizardry/vs-tools --- stgpytools/__init__.py | 5 + stgpytools/enums/__init__.py | 2 + stgpytools/enums/base.py | 77 +++++ stgpytools/enums/other.py | 56 ++++ stgpytools/exceptions/__init__.py | 5 + stgpytools/exceptions/base.py | 211 +++++++++++++ stgpytools/exceptions/enum.py | 11 + stgpytools/exceptions/file.py | 32 ++ stgpytools/exceptions/generic.py | 45 +++ stgpytools/exceptions/module.py | 39 +++ stgpytools/functions/__init__.py | 2 + stgpytools/functions/funcs.py | 152 +++++++++ stgpytools/functions/normalize.py | 254 +++++++++++++++ stgpytools/py.typed | 0 stgpytools/types/__init__.py | 6 + stgpytools/types/builtins.py | 75 +++++ stgpytools/types/file.py | 103 ++++++ stgpytools/types/funcs.py | 109 +++++++ stgpytools/types/generic.py | 54 ++++ stgpytools/types/supports.py | 127 ++++++++ stgpytools/types/utils.py | 509 ++++++++++++++++++++++++++++++ stgpytools/utils/__init__.py | 4 + stgpytools/utils/file.py | 253 +++++++++++++++ stgpytools/utils/funcs.py | 35 ++ stgpytools/utils/math.py | 149 +++++++++ stgpytools/utils/ranges.py | 89 ++++++ 26 files changed, 2404 insertions(+) create mode 100644 stgpytools/__init__.py create mode 100644 stgpytools/enums/__init__.py create mode 100644 stgpytools/enums/base.py create mode 100644 stgpytools/enums/other.py create mode 100644 stgpytools/exceptions/__init__.py create mode 100644 stgpytools/exceptions/base.py create mode 100644 stgpytools/exceptions/enum.py create mode 100644 stgpytools/exceptions/file.py create mode 100644 stgpytools/exceptions/generic.py create mode 100644 stgpytools/exceptions/module.py create mode 100644 stgpytools/functions/__init__.py create mode 100644 stgpytools/functions/funcs.py create mode 100644 stgpytools/functions/normalize.py create mode 100644 stgpytools/py.typed create mode 100644 stgpytools/types/__init__.py create mode 100644 stgpytools/types/builtins.py create mode 100644 stgpytools/types/file.py create mode 100644 stgpytools/types/funcs.py create mode 100644 stgpytools/types/generic.py create mode 100644 stgpytools/types/supports.py create mode 100644 stgpytools/types/utils.py create mode 100644 stgpytools/utils/__init__.py create mode 100644 stgpytools/utils/file.py create mode 100644 stgpytools/utils/funcs.py create mode 100644 stgpytools/utils/math.py create mode 100644 stgpytools/utils/ranges.py diff --git a/stgpytools/__init__.py b/stgpytools/__init__.py new file mode 100644 index 0000000..2ac507f --- /dev/null +++ b/stgpytools/__init__.py @@ -0,0 +1,5 @@ +from .enums import * # noqa: F401, F403 +from .exceptions import * # noqa: F401, F403 +from .functions import * # noqa: F401, F403 +from .types import * # noqa: F401, F403 +from .utils import * # noqa: F401, F403 diff --git a/stgpytools/enums/__init__.py b/stgpytools/enums/__init__.py new file mode 100644 index 0000000..b2c1021 --- /dev/null +++ b/stgpytools/enums/__init__.py @@ -0,0 +1,2 @@ +from .base import * # noqa: F401, F403 +from .other import * # noqa: F401, F403 diff --git a/stgpytools/enums/base.py b/stgpytools/enums/base.py new file mode 100644 index 0000000..182b872 --- /dev/null +++ b/stgpytools/enums/base.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +from enum import Enum +from typing import Any, TypeVar + +from ..exceptions import CustomValueError, NotFoundEnumValue +from ..types import FuncExceptT + +__all__ = [ + 'SelfEnum', + 'CustomEnum', 'CustomIntEnum', 'CustomStrEnum' +] + + +class CustomEnum(Enum): + """Base class for custom enums.""" + + @classmethod + def _missing_(cls: type[SelfEnum], value: Any) -> SelfEnum | None: + return cls.from_param(value) + + @classmethod + def from_param(cls: type[SelfEnum], value: Any, func_except: FuncExceptT | None = None) -> SelfEnum | None: + """ + Return the enum value from a parameter. + + :param value: Value to instantiate the enum class. + :param func_except: Exception function. + + :return: Enum value. + + :raises NotFoundEnumValue: Variable not found in the given enum. + """ + + if value is None: + return None + + if func_except is None: + func_except = cls.from_param + + if isinstance(value, cls): + return value + + if value is cls: + raise CustomValueError('You must select a member, not pass the enum!', func_except) + + try: + return cls(value) + except ValueError: + ... + + if isinstance(func_except, tuple): + func_name, var_name = func_except + else: + func_name, var_name = func_except, '' + + raise NotFoundEnumValue( + 'Value for "{var_name}" argument must be a valid {enum_name}.\n' + 'It can be a value in [{readable_enum}].', func_name, + var_name=var_name, enum_name=cls, + readable_enum=iter([f'{x.name} ({x.value})' for x in cls]) + ) + + +class CustomIntEnum(int, CustomEnum): + """Base class for custom int enums.""" + + value: int + + +class CustomStrEnum(str, CustomEnum): + """Base class for custom str enums.""" + + value: str + + +SelfEnum = TypeVar('SelfEnum', bound=CustomEnum) diff --git a/stgpytools/enums/other.py b/stgpytools/enums/other.py new file mode 100644 index 0000000..f211db0 --- /dev/null +++ b/stgpytools/enums/other.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from typing import TypeVar, overload + +__all__ = [ + 'Coordinate', + 'Position', + 'Size' +] + + +class Coordinate: + """ + Positive set of (x, y) coordinates. + + :raises ValueError: Negative values were passed. + """ + + x: int + """Horizontal coordinate.""" + + y: int + """Vertical coordinate.""" + + @overload + def __init__(self: SelfCoord, other: tuple[int, int] | SelfCoord, /) -> None: + ... + + @overload + def __init__(self: SelfCoord, x: int, y: int, /) -> None: + ... + + def __init__(self: SelfCoord, x_or_self: int | tuple[int, int] | SelfCoord, y: int, /) -> None: # type: ignore + from ..exceptions import CustomValueError + + if isinstance(x_or_self, int): + x = x_or_self + else: + x, y = x_or_self if isinstance(x_or_self, tuple) else (x_or_self.x, x_or_self.y) + + if x < 0 or y < 0: + raise CustomValueError("Values can't be negative!", self.__class__) + + self.x = x + self.y = y + + +SelfCoord = TypeVar('SelfCoord', bound=Coordinate) + + +class Position(Coordinate): + """Positive set of an (x,y) offset relative to the top left corner of an area.""" + + +class Size(Coordinate): + """Positive set of an (x,y), (horizontal,vertical), size of an area.""" diff --git a/stgpytools/exceptions/__init__.py b/stgpytools/exceptions/__init__.py new file mode 100644 index 0000000..5008cf6 --- /dev/null +++ b/stgpytools/exceptions/__init__.py @@ -0,0 +1,5 @@ +from .base import * # noqa: F401, F403 +from .enum import * # noqa: F401, F403 +from .file import * # noqa: F401, F403 +from .generic import * # noqa: F401, F403 +from .module import * # noqa: F401, F403 diff --git a/stgpytools/exceptions/base.py b/stgpytools/exceptions/base.py new file mode 100644 index 0000000..6a63647 --- /dev/null +++ b/stgpytools/exceptions/base.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import sys +from copy import deepcopy +from typing import TYPE_CHECKING, Any, TypeVar + +from ..types import MISSING, FuncExceptT, SupportsString + +__all__ = [ + 'CustomError', + + 'CustomValueError', + 'CustomIndexError', + 'CustomOverflowError', + 'CustomKeyError', + 'CustomTypeError', + 'CustomRuntimeError', + 'CustomNotImplementedError', + 'CustomPermissionError' +] + + +if TYPE_CHECKING: + class ExceptionT(Exception, type): + ... +else: + ExceptionT = Exception + + +class CustomErrorMeta(type): + """Custom base exception meta class.""" + + def __new__(cls: type[SelfCErrorMeta], *args: Any) -> SelfCErrorMeta: + return CustomErrorMeta.setup_exception(type.__new__(cls, *args)) # type: ignore + + @staticmethod + def setup_exception(exception: SelfCErrorMeta, override: str | ExceptionT | None = None) -> SelfCErrorMeta: + """ + Setup an exception for later use in CustomError. + + :param exception: Exception to update. + :param override: Optional name or exception from which get the override values. + + :return: Set up exception. + """ + + if override: + if isinstance(override, str): + over_name = over_qual = override + else: + over_name, over_qual = override.__name__, override.__qualname__ + + if over_name.startswith('Custom'): + exception.__name__ = over_name + else: + exception.__name__ = f'Custom{over_name}' + + exception.__qualname__ = over_qual + + if exception.__qualname__.startswith('Custom'): + exception.__qualname__ = exception.__qualname__[6:] + + if sys.stdout and sys.stdout.isatty(): + exception.__qualname__ = f'\033[0;31;1m{exception.__qualname__}\033[0m' + + exception.__module__ = Exception.__module__ + + return exception + + if TYPE_CHECKING: + def __getitem__(self, exception: type[Exception]) -> CustomError: + ... + + +SelfCErrorMeta = TypeVar('SelfCErrorMeta', bound=CustomErrorMeta) + + +class CustomError(ExceptionT, metaclass=CustomErrorMeta): + """Custom base exception class.""" + + def __init__( + self, message: SupportsString | None = None, func: FuncExceptT | None = None, reason: Any = None, **kwargs: Any + ) -> None: + """ + Instantiate a new exception with pretty printing and more. + + :param message: Message of the error. + :param func: Function this exception was raised from. + :param reason: Reason of the exception. For example, an optional parameter. + """ + + self.message = message + self.func = func + self.reason = reason + self.kwargs = kwargs + + super().__init__(message) + + def __class_getitem__(cls, exception: str | type[ExceptionT] | ExceptionT) -> CustomError: + if isinstance(exception, str): + class inner_exception(cls): # type: ignore + ... + else: + if not issubclass(exception, type): + exception = exception.__class__ # type: ignore + + class inner_exception(cls, exception): # type: ignore + ... + + return CustomErrorMeta.setup_exception(inner_exception, exception) # type: ignore + + def __call__( + self: SelfError, message: SupportsString | None = MISSING, + func: FuncExceptT | None = MISSING, reason: SupportsString | FuncExceptT | None = MISSING, # type: ignore + **kwargs: Any + ) -> SelfError: + """ + Copy an existing exception with defaults and instantiate a new one. + + :param message: Message of the error. + :param func: Function this exception was raised from. + :param reason: Reason of the exception. For example, an optional parameter. + """ + + err = deepcopy(self) + + if message is not MISSING: + err.message = message + + if func is not MISSING: # type: ignore[comparison-overlap] + err.func = func + + if reason is not MISSING: + err.reason = reason + + err.kwargs |= kwargs + + return err + + def __str__(self) -> str: + from ..functions import norm_display_name, norm_func_name + + message = self.message + + if not message: + message = 'An error occurred!' + + if self.func: + func_header = norm_func_name(self.func).strip() + + if sys.stdout and sys.stdout.isatty(): + func_header = f'\033[0;36m{func_header}\033[0m' + + func_header = f'({func_header}) ' + else: + func_header = '' + + if self.kwargs: + self.kwargs = { + key: norm_display_name(value) for key, value in self.kwargs.items() + } + + if self.reason: + reason = self.reason = norm_display_name(self.reason) + + if reason: + if not isinstance(self.reason, dict): + reason = f'({reason})' + + if sys.stdout and sys.stdout.isatty(): + reason = f'\033[0;33m{reason}\033[0m' + reason = f' {reason}' + else: + reason = '' + + return f'{func_header}{self.message!s}{reason}'.format(**self.kwargs).strip() + + +SelfError = TypeVar('SelfError', bound=CustomError) + + +class CustomValueError(CustomError, ValueError): + """Thrown when a specified value is invalid.""" + + +class CustomIndexError(CustomError, IndexError): + """Thrown when an index or generic numeric value is out of bound.""" + + +class CustomOverflowError(CustomError, OverflowError): + """Thrown when a value is out of range. e.g. temporal radius too big.""" + + +class CustomKeyError(CustomError, KeyError): + """Thrown when trying to access an non-existent key.""" + + +class CustomTypeError(CustomError, TypeError): + """Thrown when a passed argument is of wrong type.""" + + +class CustomRuntimeError(CustomError, RuntimeError): + """Thrown when a runtime error occurs.""" + + +class CustomNotImplementedError(CustomError, NotImplementedError): + """Thrown when you encounter a yet not implemented branch of code.""" + + +class CustomPermissionError(CustomError, PermissionError): + """Thrown when the user can't perform an action.""" diff --git a/stgpytools/exceptions/enum.py b/stgpytools/exceptions/enum.py new file mode 100644 index 0000000..739df7b --- /dev/null +++ b/stgpytools/exceptions/enum.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from .base import CustomKeyError + +__all__ = [ + 'NotFoundEnumValue' +] + + +class NotFoundEnumValue(CustomKeyError): + """Raised when you try to instantiate an Enum with unknown value""" diff --git a/stgpytools/exceptions/file.py b/stgpytools/exceptions/file.py new file mode 100644 index 0000000..ed9eac4 --- /dev/null +++ b/stgpytools/exceptions/file.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from .base import CustomError, CustomPermissionError + + +__all__ = [ + 'FileNotExistsError', + 'FileWasNotFoundError', + 'FilePermissionError', + 'FileTypeMismatchError', + 'FileIsADirectoryError' +] + + +class FileNotExistsError(CustomError, FileExistsError): + """Raised when a file doesn't exists""" + + +class FileWasNotFoundError(CustomError, FileNotFoundError): + """Raised when a file wasn't found but the path is correct, e.g. parent directory exists""" + + +class FilePermissionError(CustomPermissionError): + """Raised when you try to access a file but haven't got permissions to do so""" + + +class FileTypeMismatchError(CustomError, OSError): + """Raised when you try to access a file with a FileType != AUTO and it's another file type""" + + +class FileIsADirectoryError(CustomError, IsADirectoryError): + """Raised when you try to access a file but it's a directory instead""" diff --git a/stgpytools/exceptions/generic.py b/stgpytools/exceptions/generic.py new file mode 100644 index 0000000..51f5666 --- /dev/null +++ b/stgpytools/exceptions/generic.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from typing import Any, Iterable + +from ..types import FuncExceptT, SupportsString, T +from .base import CustomValueError + +__all__ = [ + 'MismatchError', 'MismatchRefError' +] + + +class MismatchError(CustomValueError): + """Raised when there's a mismatch between two or more values.""" + + @classmethod + def _item_to_name(cls, item: Any) -> str: + return str(item) + + @classmethod + def _reduce(cls, items: Iterable[T]) -> tuple[str]: + return tuple[str](dict.fromkeys(map(cls._item_to_name, items)).keys()) # type: ignore + + def __init__( + self, func: FuncExceptT, items: Iterable[T], message: SupportsString = 'All items must be equal!', + reason: Any = '{reduced_items}', **kwargs: Any + ) -> None: + super().__init__(message, func, reason, **kwargs, reduced_items=iter(self._reduce(items))) + + @classmethod + def check(cls, func: FuncExceptT, *items: T, **kwargs: Any) -> None: + if len(cls._reduce(items)) != 1: + raise cls(func, items, **kwargs) + + +class MismatchRefError(MismatchError): + def __init__( + self, func: FuncExceptT, base: T, ref: T, message: SupportsString = 'All items must be equal!', **kwargs: Any + ) -> None: + super().__init__(func, [base, ref], message, **kwargs) + + @classmethod + def check(cls, func: FuncExceptT, *items: T, **kwargs: Any) -> None: + if len(cls._reduce(items)) != 1: + raise cls(func, *items, **kwargs) diff --git a/stgpytools/exceptions/module.py b/stgpytools/exceptions/module.py new file mode 100644 index 0000000..6b75eed --- /dev/null +++ b/stgpytools/exceptions/module.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import Any + +from ..types import FuncExceptT, SupportsString +from .base import CustomError + +__all__ = [ + 'CustomImportError', + 'DependencyNotFoundError' +] + + +class CustomImportError(CustomError, ImportError): + """Raised when there's a general import error.""" + + def __init__( + self, func: FuncExceptT, package: str | ImportError, + message: SupportsString = "Import failed for package '{package}'!", + **kwargs: Any + ) -> None: + """ + :param func: Function this error was raised from. + :param package: Either the raised error or the name of the missing package. + :param message: Custom error message. + """ + + super().__init__(message, func, package=package if isinstance(package, str) else package.name, **kwargs) + + +class DependencyNotFoundError(CustomImportError): + """Raised when there's a missing optional dependency.""" + + def __init__( + self, func: FuncExceptT, package: str | ImportError, + message: SupportsString = "Missing dependency '{package}'!", + **kwargs: Any + ) -> None: + super().__init__(func, package, message, **kwargs) diff --git a/stgpytools/functions/__init__.py b/stgpytools/functions/__init__.py new file mode 100644 index 0000000..f342f02 --- /dev/null +++ b/stgpytools/functions/__init__.py @@ -0,0 +1,2 @@ +from .funcs import * # noqa: F401, F403 +from .normalize import * # noqa: F401, F403 diff --git a/stgpytools/functions/funcs.py b/stgpytools/functions/funcs.py new file mode 100644 index 0000000..4e730fa --- /dev/null +++ b/stgpytools/functions/funcs.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from typing import Callable, Concatenate, overload + +from ..exceptions import CustomRuntimeError +from ..types import MISSING, KwargsT, MissingT, P, R, T + +__all__ = [ + 'iterate', 'fallback', 'kwargs_fallback' +] + + +def iterate( + base: T, function: Callable[Concatenate[T | R, P], T | R], + count: int, *args: P.args, **kwargs: P.kwargs +) -> T | R: + """ + Execute a given function over the base value multiple times. + + Different from regular iteration functions is that you do not need to pass a partial object. + This function accepts *args and **kwargs. These will be passed on to the given function. + + Examples: + + >>> iterate(5, lambda x: x * 2, 2) + 20 + + :param base: Base value, etc. to iterate over. + :param function: Function to iterate over the base. + :param count: Number of times to execute function. + :param *args: Positional arguments to pass to the given function. + :param **kwargs: Keyword arguments to pass to the given function. + + :return: Value, etc. with the given function run over it + *n* amount of times based on the given count. + """ + + if count <= 0: + return base + + result: T | R = base + + for _ in range(count): + result = function(result, *args, **kwargs) + + return result + + +fallback_missing = object() + + +@overload +def fallback(value: T | None, fallback: T) -> T: + ... + + +@overload +def fallback(value: T | None, fallback0: T | None, default: T) -> T: + ... + + +@overload +def fallback(value: T | None, fallback0: T | None, fallback1: T | None, default: T) -> T: + ... + + +@overload +def fallback(value: T | None, *fallbacks: T | None) -> T | MissingT: + ... + + +@overload +def fallback(value: T | None, *fallbacks: T | None, default: T) -> T: + ... + + +def fallback(value: T | None, *fallbacks: T | None, default: T = fallback_missing) -> T | MissingT: # type: ignore + """ + Utility function that returns a value or a fallback if the value is None. + + Example: + + .. code-block:: python + + >>> fallback(5, 6) + 5 + >>> fallback(None, 6) + 6 + + :param value: Input value to evaluate. Can be None. + :param fallback_value: Value to return if the input value is None. + + :return: Input value or fallback value if input value is None. + """ + + if value is not None: + return value + + for fallback in fallbacks: + if fallback is not None: + return fallback + + if default is not fallback_missing: + return default + elif len(fallbacks) > 3: + return MISSING + + raise CustomRuntimeError('You need to specify a default/fallback value!') + + +@overload +def kwargs_fallback( + input_value: T | None, kwargs: tuple[KwargsT, str], fallback: T +) -> T: + ... + + +@overload +def kwargs_fallback( + input_value: T | None, kwargs: tuple[KwargsT, str], fallback0: T | None, default: T +) -> T: + ... + + +@overload +def kwargs_fallback( + input_value: T | None, kwargs: tuple[KwargsT, str], fallback0: T | None, fallback1: T | None, + default: T +) -> T: + ... + + +@overload +def kwargs_fallback( + input_value: T | None, kwargs: tuple[KwargsT, str], *fallbacks: T | None +) -> T | MissingT: + ... + + +@overload +def kwargs_fallback( + input_value: T | None, kwargs: tuple[KwargsT, str], *fallbacks: T | None, default: T +) -> T: + ... + + +def kwargs_fallback( # type: ignore + value: T | None, kwargs: tuple[KwargsT, str], *fallbacks: T | None, default: T = fallback_missing # type: ignore +) -> T | MissingT: + """Utility function to return a fallback value from kwargs if value was not found or is None.""" + + return fallback(value, kwargs[0].get(kwargs[1], None), *fallbacks, default=default) diff --git a/stgpytools/functions/normalize.py b/stgpytools/functions/normalize.py new file mode 100644 index 0000000..9930ee9 --- /dev/null +++ b/stgpytools/functions/normalize.py @@ -0,0 +1,254 @@ +from __future__ import annotations + +from fractions import Fraction +from typing import Any, Iterable, Iterator, Sequence, overload + +from ..types import F, SupportsString, T, SoftRange, SoftRangeN, SoftRangesN, StrictRange + +__all__ = [ + 'normalize_seq', + 'to_arr', + 'flatten', + 'normalize_list_to_ranges', + 'normalize_ranges_to_list', + 'normalize_range', + 'normalize_ranges', + 'invert_ranges', + 'norm_func_name', 'norm_display_name' +] + + +@overload +def normalize_seq(val: Sequence[T], length: int) -> list[T]: + ... + + +@overload +def normalize_seq(val: T | Sequence[T], length: int) -> list[T]: + ... + + +def normalize_seq(val: T | Sequence[T], length: int) -> list[T]: + """ + Normalize a sequence of values. + + :param val: Input value. + :param length: Amount of items in the output. + If original sequence length is less that this, + the last item will be repeated. + + :return: List of normalized values with a set amount of items. + """ + + val = to_arr(val) + + val += [val[-1]] * (length - len(val)) + + return val[:length] + + +_iterables_t = (list, tuple, range, zip, set, map, enumerate) + + +@overload +def to_arr(val: list[T], *, sub: bool = False) -> list[T]: + ... + + +@overload +def to_arr(val: T | Sequence[T], *, sub: bool = False) -> list[T]: + ... + + +def to_arr(val: T | Sequence[T], *, sub: bool = False) -> list[T]: + """Normalize any value into an iterable.""" + + if sub: + return list(val) if any(isinstance(val, x) for x in _iterables_t) else [val] # type: ignore + + return list(val) if type(val) in _iterables_t else [val] # type: ignore + + +@overload +def flatten(items: T | Iterable[T | Iterable[T | Iterable[T]]]) -> Iterable[T]: + ... + + +@overload +def flatten(items: T | Iterable[T | Iterable[T]]) -> Iterable[T]: # type: ignore + ... + + +@overload +def flatten(items: T | Iterable[T]) -> Iterable[T]: # type: ignore + ... + + +def flatten(items: Any) -> Any: + """Flatten an array of values.""" + + for val in items: + if isinstance(val, Iterable) and not isinstance(val, (str, bytes)): + for sub_x in flatten(val): + yield sub_x + else: + yield val + + +def normalize_range(ranges: SoftRange, /) -> Iterable[int]: + """ + Normalize ranges represented by a tuple to an iterable of frame numbers. + + :param ranges: Ranges to normalize. + + :return: List of positive frame ranges. + """ + + if isinstance(ranges, int): + return [ranges] + + if isinstance(ranges, tuple): + start, stop = ranges + step = -1 if stop < start else 1 + + return range(start, stop + step, step) + + return ranges + + +def normalize_list_to_ranges(flist: Iterable[int], min_length: int = 0) -> list[StrictRange]: + flist2 = list[list[int]]() + flist3 = list[int]() + + prev_n = -1 + + for n in sorted(set(flist)): + if prev_n + 1 != n: + if flist3: + flist2.append(flist3) + flist3 = [] + flist3.append(n) + prev_n = n + + if flist3: + flist2.append(flist3) + + flist4 = [i for i in flist2 if len(i) > min_length] + + return list(zip( + [i[0] for i in flist4], + [i[-1] for j, i in enumerate(flist4)] + )) + + +def normalize_ranges_to_list(ranges: Iterable[SoftRange]) -> list[int]: + out = list[int]() + + for srange in ranges: + out.extend(normalize_range(srange)) + + return out + + +def normalize_ranges(ranges: SoftRangeN | SoftRangesN, end: int) -> list[StrictRange]: + """ + Normalize ranges to a list of positive ranges. + + Frame ranges can include None and negative values. + None will be converted to either 0 if it's the first value in a SoftRange, or the end if it's the second item. + Negative values will be subtracted from the end. + + Examples: + + .. code-block:: python + + >>> normalize_ranges((None, None), end=1000) + [(0, 999)] + >>> normalize_ranges((24, -24), end=1000) + [(24, 975)] + >>> normalize_ranges([(24, 100), (80, 150)], end=1000) + [(24, 150)] + + + :param clip: Input clip. + :param franges: Frame range or list of frame ranges. + + :return: List of positive frame ranges. + """ + + ranges = ranges if isinstance(ranges, list) else [ranges] # type:ignore + + out = [] + + for r in ranges: + if r is None: + r = (None, None) + + if isinstance(r, tuple): + start, endd = r + if start is None: + start = 0 + if endd is None: + endd = end - 1 + else: + start = r + endd = r + + if start < 0: + start = end - 1 + start + + if endd < 0: + endd = end - 1 + endd + + out.append((start, endd)) + + return normalize_list_to_ranges([ + x for start, endd in out for x in range(start, endd + 1) + ]) + + +def invert_ranges(ranges: SoftRangeN | SoftRangesN, enda: int, endb: int | None) -> list[StrictRange]: + norm_ranges = normalize_ranges(enda if endb is None else endb, ranges) + + b_frames = {*normalize_ranges_to_list(norm_ranges)} + + return normalize_list_to_ranges({*range(enda)} - b_frames) + + +def norm_func_name(func_name: SupportsString | F) -> str: + """Normalize a class, function, or other object to obtain its name""" + + if isinstance(func_name, str): + return func_name.strip() + + if not isinstance(func_name, type) and not callable(func_name): + return str(func_name).strip() + + func = func_name + + if hasattr(func_name, '__name__'): + func_name = func.__name__ + elif hasattr(func_name, '__qualname__'): + func_name = func.__qualname__ + + if callable(func): + if hasattr(func, '__self__'): + func = func.__self__ if isinstance(func.__self__, type) else func.__self__.__class__ + func_name = f'{func.__name__}.{func_name}' + + return str(func_name).strip() + + +def norm_display_name(obj: object) -> str: + """Get a fancy name from any object.""" + + if isinstance(obj, Iterator): + return ', '.join(norm_display_name(v) for v in obj).strip() + + if isinstance(obj, Fraction): + return f'{obj.numerator}/{obj.denominator}' + + if isinstance(obj, dict): + return '(' + ', '.join(f'{k}={v}' for k, v in obj.items()) + ')' + + return norm_func_name(obj) diff --git a/stgpytools/py.typed b/stgpytools/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/stgpytools/types/__init__.py b/stgpytools/types/__init__.py new file mode 100644 index 0000000..f5c9896 --- /dev/null +++ b/stgpytools/types/__init__.py @@ -0,0 +1,6 @@ +from .builtins import * # noqa: F401, F403 +from .file import * # noqa: F401, F403 +from .funcs import * # noqa: F401, F403 +from .generic import * # noqa: F401, F403 +from .supports import * # noqa: F401, F403 +from .utils import * # noqa: F401, F403 diff --git a/stgpytools/types/builtins.py b/stgpytools/types/builtins.py new file mode 100644 index 0000000..d9af0d1 --- /dev/null +++ b/stgpytools/types/builtins.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Callable, ParamSpec, Sequence, SupportsFloat, SupportsIndex, TypeAlias, TypeVar, Union + +__all__ = [ + 'T', 'T0', 'T1', 'T2', 'T_contra', + + 'F', 'F0', 'F1', 'F2', + + 'P', 'P0', 'P1', 'P2', + 'R', 'R0', 'R1', 'R2', 'R_contra', + + 'Nb', + + 'StrictRange', 'SoftRange', 'SoftRangeN', 'SoftRangesN', + + 'Self', + + 'SingleOrArr', 'SingleOrArrOpt', + 'SingleOrSeq', 'SingleOrSeqOpt', + + 'SimpleByteData', 'SimpleByteDataArray', + 'ByteData', + + 'KwargsT' +] + +Nb = TypeVar('Nb', float, int) + +T = TypeVar('T') +T0 = TypeVar('T0') +T1 = TypeVar('T1') +T2 = TypeVar('T2') + +F = TypeVar('F', bound=Callable[..., Any]) +F0 = TypeVar('F0', bound=Callable[..., Any]) +F1 = TypeVar('F1', bound=Callable[..., Any]) +F2 = TypeVar('F2', bound=Callable[..., Any]) + +P = ParamSpec('P') +P0 = ParamSpec('P0') +P1 = ParamSpec('P1') +P2 = ParamSpec('P2') + +R = TypeVar('R') +R0 = TypeVar('R0') +R1 = TypeVar('R1') +R2 = TypeVar('R2') + +T_contra = TypeVar('T_contra', contravariant=True) +R_contra = TypeVar('R_contra', contravariant=True) + +Self = TypeVar('Self') + +StrictRange: TypeAlias = tuple[int, int] +SoftRange: TypeAlias = int | StrictRange | Sequence[int] + +SoftRangeN: TypeAlias = int | tuple[int | None, int | None] | None + +if TYPE_CHECKING: + SoftRangesN: TypeAlias = Sequence[SoftRangeN] +else: + SoftRangesN: TypeAlias = list[SoftRangeN] + +SingleOrArr = Union[T, list[T]] +SingleOrSeq = Union[T, Sequence[T]] +SingleOrArrOpt = Union[SingleOrArr[T], None] +SingleOrSeqOpt = Union[SingleOrSeq[T], None] + +SimpleByteData: TypeAlias = str | bytes | bytearray +SimpleByteDataArray = Union[SimpleByteData, Sequence[SimpleByteData]] + +ByteData: TypeAlias = SupportsFloat | SupportsIndex | SimpleByteData | memoryview + +KwargsT = dict[str, Any] diff --git a/stgpytools/types/file.py b/stgpytools/types/file.py new file mode 100644 index 0000000..a96693f --- /dev/null +++ b/stgpytools/types/file.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from os import PathLike, path +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, Iterable, Literal, TypeAlias, Union + +__all__ = [ + 'FilePathType', 'FileDescriptor', + 'FileOpener', + + 'OpenTextModeUpdating', + 'OpenTextModeWriting', + 'OpenTextModeReading', + + 'OpenBinaryModeUpdating', + 'OpenBinaryModeWriting', + 'OpenBinaryModeReading', + + 'OpenTextMode', + 'OpenBinaryMode', + + 'SPath', 'SPathLike' +] + +FileDescriptor: TypeAlias = int + +FilePathType: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] + +FileOpener: TypeAlias = Callable[[str, int], int] + +OpenTextModeUpdating: TypeAlias = Literal[ + 'r+', '+r', 'rt+', 'r+t', '+rt', 'tr+', 't+r', '+tr', 'w+', '+w', 'wt+', 'w+t', '+wt', 'tw+', 't+w', '+tw', + 'a+', '+a', 'at+', 'a+t', '+at', 'ta+', 't+a', '+ta', 'x+', '+x', 'xt+', 'x+t', '+xt', 'tx+', 't+x', '+tx', +] +OpenTextModeWriting: TypeAlias = Literal[ + 'w', 'wt', 'tw', 'a', 'at', 'ta', 'x', 'xt', 'tx' +] +OpenTextModeReading: TypeAlias = Literal[ + 'r', 'rt', 'tr', 'U', 'rU', 'Ur', 'rtU', 'rUt', 'Urt', 'trU', 'tUr', 'Utr' +] + +OpenBinaryModeUpdating: TypeAlias = Literal[ + 'rb+', 'r+b', '+rb', 'br+', 'b+r', '+br', 'wb+', 'w+b', '+wb', 'bw+', 'b+w', '+bw', + 'ab+', 'a+b', '+ab', 'ba+', 'b+a', '+ba', 'xb+', 'x+b', '+xb', 'bx+', 'b+x', '+bx' +] +OpenBinaryModeWriting: TypeAlias = Literal[ + 'wb', 'bw', 'ab', 'ba', 'xb', 'bx' +] +OpenBinaryModeReading: TypeAlias = Literal[ + 'rb', 'br', 'rbU', 'rUb', 'Urb', 'brU', 'bUr', 'Ubr' +] + +OpenTextMode: TypeAlias = OpenTextModeUpdating | OpenTextModeWriting | OpenTextModeReading +OpenBinaryMode: TypeAlias = OpenBinaryModeUpdating | OpenBinaryModeReading | OpenBinaryModeWriting + + +class SPath(Path): + """Modified version of pathlib.Path""" + _flavour = type(Path())._flavour # type: ignore + + if TYPE_CHECKING: + def __new__(cls, *args: SPathLike, **kwargs: Any) -> SPath: + ... + + def format(self, *args: Any, **kwargs: Any) -> SPath: + return SPath(self.to_str().format(*args, **kwargs)) + + def to_str(self) -> str: + return str(self) + + def get_folder(self) -> SPath: + folder_path = self.resolve() + + if folder_path.is_dir(): + return folder_path + + return SPath(path.dirname(folder_path)) + + def mkdirp(self) -> None: + return self.get_folder().mkdir(parents=True, exist_ok=True) + + def rmdirs(self, missing_ok: bool = False, ignore_errors: bool = True) -> None: + from shutil import rmtree + + try: + return rmtree(str(self.get_folder()), ignore_errors) + except FileNotFoundError: + if not missing_ok: + raise + + def read_lines( + self, encoding: str | None = None, errors: str | None = None, keepends: bool = False + ) -> list[str]: + return super().read_text(encoding, errors).splitlines(keepends) + + def write_lines( + self, data: Iterable[str], encoding: str | None = None, + errors: str | None = None, newline: str | None = None + ) -> int: + return super().write_text('\n'.join(data), encoding, errors, newline) + + +SPathLike = Union[str, Path, SPath] diff --git a/stgpytools/types/funcs.py b/stgpytools/types/funcs.py new file mode 100644 index 0000000..502ebd1 --- /dev/null +++ b/stgpytools/types/funcs.py @@ -0,0 +1,109 @@ +from __future__ import annotations +from functools import wraps + +from typing import TYPE_CHECKING, Any, Callable, Iterable, List, SupportsIndex, TypeAlias, TypeVar, overload + +from .builtins import T, P +from .supports import SupportsString + +__all__ = [ + 'StrList', + 'Sentinel' +] + + +class StrList(List[SupportsString]): + """Custom class for representing a recursively "stringable" list.""" + + if TYPE_CHECKING: + @overload + def __init__(self, __iterable: Iterable[SupportsString | None] = []) -> None: + ... + + @overload + def __init__(self, __iterable: Iterable[Iterable[SupportsString | None] | None] = []) -> None: + ... + + def __init__(self, __iterable: Any = []) -> None: + ... + + @property + def string(self) -> str: + return self.to_str() + + def to_str(self) -> str: + return str(self) + + def __str__(self) -> str: + from ..functions import flatten + + return ' '.join( + filter( + None, + (str(x).strip() for x in flatten(self) if x is not None) # type: ignore[var-annotated,arg-type] + ) + ) + + def __add__(self, __x: list[SupportsString]) -> StrList: # type: ignore[override] + return StrList(super().__add__(__x)) + + def __mul__(self, __n: SupportsIndex) -> StrList: + return StrList(super().__mul__(__n)) + + def __rmul__(self, __n: SupportsIndex) -> StrList: + return StrList(super().__rmul__(__n)) + + @property + def mlength(self) -> int: + return len(self) - 1 + + def append(self, *__object: SupportsString) -> None: + for __obj in __object: + super().append(__obj) + + +class SentinelDispatcher: + def check(self, ret_value: T, cond: bool) -> T | SentinelDispatcher: + return ret_value if cond else self + + def check_cb(self, callback: Callable[P, tuple[T, bool]]) -> Callable[P, T | SentinelDispatcher]: + @wraps(callback) + def _wrap(*args: P.args, **kwargs: P.kwargs) -> T | SentinelDispatcher: + return self.check(*callback(*args, **kwargs)) + + return _wrap + + def filter(self: SelfSentinel, items: Iterable[T | SelfSentinel]) -> Iterable[T]: + for item in items: + if item is self: + continue + + yield item # type: ignore + + @classmethod + def filter_multi(cls, items: Iterable[T | SelfSentinel], *sentinels: SelfSentinel) -> Iterable[T]: + for item in items: + if item in sentinels: + continue + + yield item # type: ignore + + def __getattr__(self, name: str) -> SentinelDispatcher: + if name not in _sentinels: + _sentinels[name] = SentinelDispatcher() + return _sentinels[name] + + def __setattr__(self, __name: str, __value: Any) -> None: + raise NameError + + def __call__(self) -> SentinelDispatcher: + return SentinelDispatcher() + + Type: TypeAlias = 'SentinelDispatcher' + + +Sentinel = SentinelDispatcher() + +_sentinels = dict[str, SentinelDispatcher]() + +SelfSentinel = TypeVar('SelfSentinel', bound=SentinelDispatcher) diff --git a/stgpytools/types/generic.py b/stgpytools/types/generic.py new file mode 100644 index 0000000..24a657b --- /dev/null +++ b/stgpytools/types/generic.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from enum import Enum, auto +from typing import Any, Callable, Literal, TypeAlias, Union + +from .builtins import F, SingleOrArr, SingleOrArrOpt +from .supports import SupportsString + +__all__ = [ + 'MissingT', 'MISSING', + + 'FuncExceptT', + + 'DataType', + + 'StrArr', 'StrArrOpt', + + 'PassthroughC' +] + + +class MissingTBase(Enum): + MissingT = auto() + + +MissingT: TypeAlias = Literal[MissingTBase.MissingT] +MISSING = MissingTBase.MissingT + +DataType = Union[str, bytes, bytearray, SupportsString] + +FuncExceptT = str | Callable[..., Any] | tuple[Callable[..., Any] | str, str] # type: ignore +""" +This type is used in specific functions that can throw an exception. +``` +def can_throw(..., *, func: FuncExceptT) -> None: + ... + if some_error: + raise CustomValueError('Some error occurred!!', func) + +def some_func() -> None: + ... + can_throw(..., func=some_func) +``` +If an error occurs, this will print a clear error ->\n +``ValueError: (some_func) Some error occurred!!`` +""" + +FuncExceptT = str | Callable[..., Any] | tuple[Callable[..., Any] | str, str] # type: ignore + + +StrArr = SingleOrArr[SupportsString] +StrArrOpt = SingleOrArrOpt[SupportsString] + +PassthroughC = Callable[[F], F] diff --git a/stgpytools/types/supports.py b/stgpytools/types/supports.py new file mode 100644 index 0000000..b6cc75d --- /dev/null +++ b/stgpytools/types/supports.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from abc import abstractmethod +from typing import ( + Any, Callable, Iterable, Protocol, SupportsFloat, SupportsIndex, TypeAlias, TypeVar, overload, runtime_checkable +) + +from .builtins import T0, T1, T2, T_contra + +__all__ = [ + 'SupportsTrunc', + + 'SupportsString', + + 'SupportsDunderLT', 'SupportsDunderGT', + 'SupportsDunderLE', 'SupportsDunderGE', + + 'SupportsFloatOrIndex', + + 'SupportsIndexing', + 'SupportsKeysAndGetItem', + + 'SupportsAllComparisons', + 'SupportsRichComparison', 'SupportsRichComparisonT', + 'ComparatorFunc' +] + + +_KT = TypeVar('_KT') +_VT_co = TypeVar('_VT_co', covariant=True) + + +@runtime_checkable +class SupportsTrunc(Protocol): + def __trunc__(self) -> int: + ... + + +@runtime_checkable +class SupportsString(Protocol): + @abstractmethod + def __str__(self) -> str: + ... + + +@runtime_checkable +class SupportsDunderLT(Protocol[T_contra]): + def __lt__(self, __other: T_contra) -> bool: + ... + + +@runtime_checkable +class SupportsDunderGT(Protocol[T_contra]): + def __gt__(self, __other: T_contra) -> bool: + ... + + +@runtime_checkable +class SupportsDunderLE(Protocol[T_contra]): + def __le__(self, __other: T_contra) -> bool: + ... + + +@runtime_checkable +class SupportsDunderGE(Protocol[T_contra]): + def __ge__(self, __other: T_contra) -> bool: + ... + + +@runtime_checkable +class SupportsAllComparisons( + SupportsDunderLT[Any], SupportsDunderGT[Any], SupportsDunderLE[Any], SupportsDunderGE[Any], Protocol +): + ... + + +SupportsRichComparison: TypeAlias = SupportsDunderLT[Any] | SupportsDunderGT[Any] +SupportsRichComparisonT = TypeVar('SupportsRichComparisonT', bound=SupportsRichComparison) + + +class ComparatorFunc(Protocol): + @overload + def __call__( + self, __arg1: SupportsRichComparisonT, __arg2: SupportsRichComparisonT, + *_args: SupportsRichComparisonT, key: None = ... + ) -> SupportsRichComparisonT: + ... + + @overload + def __call__(self, __arg1: T0, __arg2: T0, *_args: T0, key: Callable[[T0], SupportsRichComparison]) -> T0: + ... + + @overload + def __call__(self, __iterable: Iterable[SupportsRichComparisonT], *, key: None = ...) -> SupportsRichComparisonT: + ... + + @overload + def __call__(self, __iterable: Iterable[T0], *, key: Callable[[T0], SupportsRichComparison]) -> T0: + ... + + @overload + def __call__( + self, __iterable: Iterable[SupportsRichComparisonT], *, key: None = ..., default: T0 + ) -> SupportsRichComparisonT | T0: + ... + + @overload + def __call__( + self, __iterable: Iterable[T1], *, key: Callable[[T1], SupportsRichComparison], default: T2 + ) -> T1 | T2: + ... + + +class SupportsIndexing(Protocol[_VT_co]): + def __getitem__(self, __k: int) -> _VT_co: + ... + + +class SupportsKeysAndGetItem(Protocol[_KT, _VT_co]): + def keys(self) -> Iterable[_KT]: + ... + + def __getitem__(self, __k: _KT) -> _VT_co: + ... + + +SupportsFloatOrIndex: TypeAlias = SupportsFloat | SupportsIndex diff --git a/stgpytools/types/utils.py b/stgpytools/types/utils.py new file mode 100644 index 0000000..97918db --- /dev/null +++ b/stgpytools/types/utils.py @@ -0,0 +1,509 @@ +from __future__ import annotations + +from functools import wraps +from inspect import Signature, isclass +from typing import ( + TYPE_CHECKING, Any, Callable, Concatenate, Generator, Generic, Iterable, Iterator, Mapping, NoReturn, Protocol, + Sequence, TypeVar, cast, overload +) + +from .builtins import F0, P0, P1, R0, R1, T0, T1, T2, KwargsT, P, R, T + +__all__ = [ + 'copy_signature', + + 'inject_self', + + 'complex_hash', + + 'get_subclasses', + + 'classproperty', 'cachedproperty', + + 'KwargsNotNone', + + 'Singleton', 'to_singleton', + + 'LinearRangeLut' +] + + +class copy_signature(Generic[F0]): + """ + Type util to copy the signature of one function to another function.\n + Especially useful for passthrough functions. + + .. code-block:: + + class SomeClass: + def __init__( + self, some: Any, complex: Any, /, *args: Any, + long: Any, signature: Any, **kwargs: Any + ) -> None: + ... + + class SomeClassChild(SomeClass): + @copy_signature(SomeClass.__init__) + def __init__(*args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + # do some other thing + + class Example(SomeClass): + @copy_signature(SomeClass.__init__) + def __init__(*args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + # another thing + """ + + def __init__(self, target: F0) -> None: + """Copy the signature of ``target``.""" + + def __call__(self, wrapped: Callable[..., Any]) -> F0: + return cast(F0, wrapped) + + +class injected_self_func(Generic[T, P, R], Protocol): # type: ignore[misc] + @overload + @staticmethod + def __call__(*args: P.args, **kwargs: P.kwargs) -> R: + ... + + @overload + @staticmethod + def __call__(self: T, *args: P.args, **kwargs: P.kwargs) -> R: + ... + + @overload + @staticmethod + def __call__(self: T, _self: T, *args: P.args, **kwargs: P.kwargs) -> R: + ... + + @overload + @staticmethod + def __call__(cls: type[T], *args: P.args, **kwargs: P.kwargs) -> R: + ... + + @overload + @staticmethod + def __call__(cls: type[T], _cls: type[T], *args: P.args, **kwargs: P.kwargs) -> R: + ... + + @staticmethod # type: ignore + def __call__(*args: Any, **kwds: Any) -> Any: + ... + + +self_objects_cache = dict[type[T], T]() + + +class inject_self_base(Generic[T, P, R]): + def __init__(self, function: Callable[Concatenate[T, P], R], /, *, cache: bool = False) -> None: + """ + Wrap ``function`` to always have a self provided to it. + + :param function: Method to wrap. + :param cache: Whether to cache the self object. + """ + + self.cache = self.init_kwargs = None + + if isinstance(self, inject_self.cached): + self.cache = True + + self.function = function + + self.signature = self.first_key = self.init_kwargs = None + + self.args = tuple[Any]() + self.kwargs = dict[str, Any]() + + self.clean_kwargs = False + + def __get__( + self, class_obj: type[T] | T | None, class_type: type[T] | type[type[T]] # type: ignore + ) -> injected_self_func[T, P, R]: + if not self.signature or not self.first_key: # type: ignore + self.signature = Signature.from_callable(self.function, follow_wrapped=True, eval_str=True) # type: ignore + self.first_key = next(iter(list(self.signature.parameters.keys())), None) # type: ignore + + if isinstance(self, inject_self.init_kwargs): + from ..exceptions import CustomValueError + + if 4 not in {x.kind for x in self.signature.parameters.values()}: # type: ignore + raise CustomValueError( + 'This function hasn\'t got any kwargs!', 'inject_self.init_kwargs', self.function + ) + + self.init_kwargs = list[str]( # type: ignore + k for k, x in self.signature.parameters.items() if x.kind != 4 # type: ignore + ) + + @wraps(self.function) + def _wrapper(*args: Any, **kwargs: Any) -> Any: + first_arg = (args[0] if args else None) or ( + kwargs.get(self.first_key, None) if self.first_key else None # type: ignore + ) + + if ( + first_arg and ( + (is_obj := isinstance(first_arg, class_type)) + or isinstance(first_arg, type(class_type)) # noqa + or first_arg is class_type # noqa + ) + ): + obj = first_arg if is_obj else first_arg() + if args: + args = args[1:] + elif kwargs and self.first_key: + kwargs.pop(self.first_key) # type: ignore + elif class_obj is None: + if self.cache: + if class_type not in self_objects_cache: + obj = self_objects_cache[class_type] = class_type(*self.args, **self.kwargs) + else: + obj = self_objects_cache[class_type] + elif self.init_kwargs: + obj = class_type( # type: ignore + *self.args, **(self.kwargs | {k: v for k, v in kwargs.items() if k not in self.init_kwargs}) + ) + if self.clean_kwargs: + kwargs = {k: v for k, v in kwargs.items() if k in self.init_kwargs} + else: + obj = class_type(*self.args, **self.kwargs) + else: + obj = class_obj + + return self.function(obj, *args, **kwargs) + + return _wrapper + + @classmethod + def with_args( + cls, *args: Any, **kwargs: Any + ) -> Callable[[Callable[Concatenate[T0, P0], R0]], inject_self[T0, P0, R0]]: + """Provide custom args to instantiate the ``self`` object with.""" + + def _wrapper(function: Callable[Concatenate[T0, P0], R0]) -> inject_self[T0, P0, R0]: + inj = cls(function) # type: ignore + inj.args = args + inj.kwargs = kwargs + return inj # type: ignore + return _wrapper + + +class inject_self(Generic[T, P, R], inject_self_base[T, P, R]): # type: ignore + """Wrap a method so it always has a constructed ``self`` provided to it.""" + + class cached(Generic[T0, P0, R0], inject_self_base[T0, P0, R0]): # type: ignore + """ + Wrap a method so it always has a constructed ``self`` provided to it. + Once ``self`` is constructed, it will be reused. + """ + + class init_kwargs(Generic[T0, P0, R0], inject_self_base[T0, P0, R0]): # type: ignore + """ + Wrap a method so it always has a constructed ``self`` provided to it. + When constructed, kwargs to the function will be passed to the constructor. + """ + + @classmethod + def clean(cls, function: Callable[Concatenate[T0, P0], R0]) -> inject_self[T0, P0, R0]: + """Wrap a method, pass kwargs to the constructor and remove them from actual **kwargs.""" + inj = cls(function) + inj.clean_kwargs = True + return inj # type: ignore + + +class complex_hash(Generic[T]): + """ + Decorator for classes to add a ``__hash__`` method to them. + + Especially useful for NamedTuples. + """ + + def __new__(cls, class_type: T) -> T: # type: ignore + class inner_class_type(class_type): # type: ignore + def __hash__(self) -> int: + return complex_hash.hash( + self.__class__.__name__, *( + getattr(self, key) for key in self.__annotations__.keys() + ) + ) + + return inner_class_type # type: ignore + + @staticmethod + def hash(*args: Any) -> int: + """ + Recursively hash every unhashable object in ``*args``. + + :param *args: Objects to be hashed. + + :return: Hash of all the combined objects' hashes. + """ + + values = list[str]() + for value in args: + try: + new_hash = hash(value) + except TypeError: + if isinstance(value, Iterable): + new_hash = complex_hash.hash(*value) + else: + new_hash = hash(str(value)) + + values.append(str(new_hash)) + + return hash('_'.join(values)) + + +def get_subclasses(family: type[T], exclude: Sequence[type[T]] = []) -> list[type[T]]: + """ + Get all subclasses of a given type. + + :param family: "Main" type all other classes inherit from. + :param exclude: Excluded types from the yield. Note that they won't be excluded from search. + For examples, subclasses of these excluded classes will be yield. + + :return: List of all subclasses of "family". + """ + + def _subclasses(cls: type[T]) -> Generator[type[T], None, None]: + for subclass in cls.__subclasses__(): + yield from _subclasses(subclass) + if subclass in exclude: + continue + yield subclass + + return list(set(_subclasses(family))) + + +class classproperty(Generic[P, R, T, T0, P0]): + """ + Make a class property. A combination between classmethod and property. + """ + + __isabstractmethod__: bool = False + + class metaclass(type): + """This must be set for the decorator to work.""" + + def __setattr__(self, key: str, value: Any) -> None: + if key in self.__dict__: + obj = self.__dict__.get(key) + + if obj and type(obj) is classproperty: + return obj.__set__(self, value) + + return super(classproperty.metaclass, self).__setattr__(key, value) + + def __init__( + self, + fget: classmethod[R] | Callable[P, R], + fset: classmethod[None] | Callable[[T, T0], None] | None = None, + fdel: classmethod[None] | Callable[P0, None] | None = None, + doc: str | None = None, + ) -> None: + if not isinstance(fget, (classmethod, staticmethod)): + fget = classmethod(fget) + + self.fget = self._wrap(fget) + self.fset = self._wrap(fset) if fset is not None else fset + self.fdel = self._wrap(fdel) if fdel is not None else fdel + + self.doc = doc + + def _wrap(self, func: classmethod[R1] | Callable[P1, R1]) -> classmethod[R1]: + if not isinstance(func, (classmethod, staticmethod)): + func = classmethod(func) + + return func + + def getter(self, __fget: classmethod[R] | Callable[P1, R1]) -> classproperty[P1, R1, T, T0, P0]: + self.fget = self._wrap(__fget) # type: ignore + return self # type: ignore + + def setter(self, __fset: classmethod[None] | Callable[[T1, T2], None]) -> classproperty[P, R, T1, T2, P0]: + self.fset = self._wrap(__fset) + return self # type: ignore + + def deleter(self, __fdel: classmethod[None] | Callable[P1, None]) -> classproperty[P, R, T, T0, P1]: + self.fdel = self._wrap(__fdel) + return self # type: ignore + + def __get__(self, __obj: Any, __type: type | None = None) -> R: + if __type is None: + __type = type(__obj) + + return self.fget.__get__(__obj, __type)() + + def __set__(self, __obj: Any, __value: T1) -> None: + from ..exceptions import CustomError + + if not self.fset: + raise CustomError[AttributeError]("Can't set attribute") + + if isclass(__obj): + type_, __obj = __obj, None + else: + type_ = type(__obj) + + return self.fset.__get__(__obj, type_)(__value) + + def __delete__(self, __obj: Any) -> None: + from ..exceptions import CustomError + + if not self.fdel: + raise CustomError[AttributeError]("Can't delete attribute") + + if isclass(__obj): + type_, __obj = __obj, None + else: + type_ = type(__obj) + + return self.fdel.__delete__(__obj, type_)(__obj) # type: ignore + + +class cachedproperty(property, Generic[P, R, T, T0, P0]): + """ + Wrapper for a one-time get property, that will be cached. + + Keep in mind two things: + + * The cache is per-object. Don't hold a reference to itself or it will never get garbage collected. + * Your class has to either manually set __dict__[cachedproperty.cache_key] + or inherit from cachedproperty.baseclass. + """ + + __isabstractmethod__: bool = False + + cache_key = '_stgpt_cachedproperty_cache' + + class baseclass: + """Inherit from this class to automatically set the cache dict.""" + + if not TYPE_CHECKING: + def __new__(cls, *args: Any, **kwargs: Any) -> None: + try: + self = super().__new__(cls, *args, **kwargs) + except TypeError: + self = super().__new__(cls) + self.__dict__.__setitem__(cachedproperty.cache_key, dict[str, Any]()) + return self + + if TYPE_CHECKING: + def __init__( + self, fget: Callable[P, R], fset: Callable[[T, T0], None] | None = None, + fdel: Callable[P0, None] | None = None, doc: str | None = None, + ) -> None: + ... + + def getter(self, __fget: Callable[P1, R1]) -> cachedproperty[P1, R1, T, T0, P0]: + ... + + def setter(self, __fset: Callable[[T1, T2], None]) -> cachedproperty[P, R, T1, T2, P0]: + ... + + def deleter(self, __fdel: Callable[P1, None]) -> cachedproperty[P, R, T, T0, P1]: + ... + + def __get__(self, __obj: Any, __type: type | None = None) -> R: + function = self.fget.__get__(__obj, __type) # type: ignore + + cache = __obj.__dict__.get(cachedproperty.cache_key) + name = function.__name__ + + if name not in cache: + cache[name] = function() + + return cache[name] # type: ignore + + +class KwargsNotNone(KwargsT): + """Remove all None objects from this kwargs dict.""" + + if not TYPE_CHECKING: + def __new__(cls, *args: Any, **kwargs: Any) -> KwargsNotNone: + return KwargsT(**{ + key: value for key, value in KwargsT(*args, **kwargs).items() + if value is not None + }) + + +SingleMeta = TypeVar('SingleMeta', bound=type) + + +class SingletonMeta(type): + _instances = dict[type[SingleMeta], SingleMeta]() + _singleton_init: bool + + def __new__( + cls: type[SingletonSelf], name: str, bases: tuple[type, ...], namespace: dict[str, Any], **kwargs: Any + ) -> SingletonSelf: + return type.__new__(cls, name, bases, namespace | {'_singleton_init': kwargs.pop('init', False)}) + + def __call__(cls: type[SingletonSelf], *args: Any, **kwargs: Any) -> SingletonSelf: # type: ignore + if cls not in cls._instances: + cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs) + elif cls._singleton_init: + cls._instances[cls].__init__(*args, **kwargs) # type: ignore + + return cls._instances[cls] + + +SingletonSelf = TypeVar('SingletonSelf', bound=SingletonMeta) + + +class Singleton(metaclass=SingletonMeta): + """Handy class to inherit to have the SingletonMeta metaclass.""" + + +class to_singleton: + _ts_args = tuple[str, ...]() + _ts_kwargs = dict[str, Any]() + + def __new__(_cls, cls: type[T]) -> T: # type: ignore + return cls(*_cls._ts_args, **_cls._ts_kwargs) + + @staticmethod + def with_args(*args: Any, **kwargs: Any) -> to_singleton: + class _inner_singl(to_singleton): + _ts_args = args + _ts_kwargs = kwargs + + return _inner_singl + + +class LinearRangeLut(Mapping[int, int]): + __slots__ = ('ranges', '_ranges_idx_lut', '_misses_n') + + def __init__(self, ranges: Mapping[int, range]) -> None: + self.ranges = ranges + + self._ranges_idx_lut = list(self.ranges.items()) + self._misses_n = 0 + + def __getitem__(self, n: int) -> int: + for missed_hit, (idx, k) in enumerate(self._ranges_idx_lut): + if n in k: + break + + if missed_hit: + self._misses_n += 1 + + if self._misses_n > 2: + self._ranges_idx_lut = self._ranges_idx_lut[missed_hit:] + self._ranges_idx_lut[:missed_hit] + + return idx + + def __len__(self) -> int: + return len(self.ranges) + + def __iter__(self) -> Iterator[int]: + return iter(range(len(self))) + + def __setitem__(self, n: int, _range: range) -> NoReturn: + raise NotImplementedError + + def __delitem__(self, n: int) -> NoReturn: + raise NotImplementedError diff --git a/stgpytools/utils/__init__.py b/stgpytools/utils/__init__.py new file mode 100644 index 0000000..3522803 --- /dev/null +++ b/stgpytools/utils/__init__.py @@ -0,0 +1,4 @@ +from .file import * # noqa: F401, F403 +from .funcs import * # noqa: F401, F403 +from .math import * # noqa: F401, F403 +from .ranges import * # noqa: F401, F403 diff --git a/stgpytools/utils/file.py b/stgpytools/utils/file.py new file mode 100644 index 0000000..2cdfa7b --- /dev/null +++ b/stgpytools/utils/file.py @@ -0,0 +1,253 @@ +from __future__ import annotations + +import ctypes +import sys +from io import BufferedRandom, BufferedReader, BufferedWriter, FileIO, TextIOWrapper +from os import F_OK, R_OK, W_OK, X_OK, access, getenv, path +from pathlib import Path +from typing import IO, Any, BinaryIO, Callable, Literal, overload + +from ..exceptions import FileIsADirectoryError, FileNotExistsError, FilePermissionError, FileWasNotFoundError +from ..types import ( + FileOpener, FilePathType, FuncExceptT, OpenBinaryMode, OpenBinaryModeReading, OpenBinaryModeUpdating, + OpenBinaryModeWriting, OpenTextMode, SPath +) + +__all__ = [ + 'add_script_path_hook', + 'remove_script_path_hook', + + 'get_script_path', + + 'get_user_data_dir', + + 'check_perms', + 'open_file' +] + +_script_path_hooks = list[Callable[[], SPath | None]]() + + +def add_script_path_hook(hook: Callable[[], SPath | None]) -> None: + _script_path_hooks.append(hook) + + +def remove_script_path_hook(hook: Callable[[], SPath | None]) -> None: + _script_path_hooks.remove(hook) + + +def get_script_path() -> SPath: + for hook in reversed(_script_path_hooks): + if (script_path := hook()) is not None: + return script_path + + import __main__ + + return SPath(__main__.__file__) + + +def get_user_data_dir() -> Path: + """Get user data dir path.""" + + if sys.platform == 'win32': + buf = ctypes.create_unicode_buffer(1024) + ctypes.windll.shell32.SHGetFolderPathW(None, 28, None, 0, buf) + + if any([ord(c) > 255 for c in buf]): + buf2 = ctypes.create_unicode_buffer(1024) + if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024): + buf = buf2 + + return Path(path.normpath(buf.value)) + + if sys.platform == 'darwin': # type: ignore[unreachable] + return Path(path.expanduser('~/Library/Application Support/')) + + return Path(getenv('XDG_DATA_HOME', path.expanduser("~/.local/share"))) + + +def check_perms( + file: FilePathType, mode: OpenTextMode | OpenBinaryMode, strict: bool = False, + *, func: FuncExceptT | None = None +) -> bool: + """ + Confirm whether the user has write/read access to a file. + + :param file: Path to file. + :param mode: Read/Write mode. + :param func: Function that this was called from, only useful to *func writers. + + :param: True if the user has write/read access, else False. + + :raises FileNotExistsError: File could not be found. + :raises FilePermissionError: User does not have access to the file. + :raises FileIsADirectoryError: Given path is a directory, not a file. + :raises FileWasNotFoundError: Parent directories exist, but the given file could not be found. + """ + + file = Path(str(file)) + got_perms = False + + mode_i = F_OK + + if func is not None: + if not str(file): + raise FileNotExistsError(file, func) + + for char in 'rbU': + mode_str = mode.replace(char, '') + + if not mode_str: + mode_i = R_OK + elif 'x' in mode_str: + mode_i = X_OK + elif '+' in mode_str or 'w' in mode_str: + mode_i = W_OK + + check_file = file + + if not strict and mode_i != R_OK: + while not check_file.exists(): + check_file = check_file.parent + + got_perms = access(check_file, mode_i) + + if func is not None: + if not got_perms: + raise FilePermissionError(file, func) + + if strict: + if file.is_dir(): + raise FileIsADirectoryError(file, func) + elif not file.exists(): + if file.parent.exists(): + raise FileWasNotFoundError(file, func) + else: + raise FileNotExistsError(file, func) + + return got_perms + + +@overload +def open_file( + file: FilePathType, mode: OpenTextMode = 'r', buffering: int = ..., + encoding: str | None = None, errors: str | None = ..., newline: str | None = ..., + *, func: FuncExceptT | None = None +) -> TextIOWrapper: + ... + + +@overload +def open_file( + file: FilePathType, mode: OpenBinaryMode, buffering: Literal[0], + encoding: None = None, *, func: FuncExceptT | None = None +) -> FileIO: + ... + + +@overload +def open_file( + file: FilePathType, mode: OpenBinaryModeUpdating, buffering: Literal[-1, 1] = ..., + encoding: None = None, *, func: FuncExceptT | None = None +) -> BufferedRandom: + ... + + +@overload +def open_file( + file: FilePathType, mode: OpenBinaryModeWriting, buffering: Literal[-1, 1] = ..., + encoding: None = None, *, func: FuncExceptT | None = None +) -> BufferedWriter: + ... + + +@overload +def open_file( + file: FilePathType, mode: OpenBinaryModeReading, buffering: Literal[-1, 1] = ..., + encoding: None = None, *, func: FuncExceptT | None = None +) -> BufferedReader: + ... + + +@overload +def open_file( + file: FilePathType, mode: OpenBinaryMode, buffering: int = ..., + encoding: None = None, *, func: FuncExceptT | None = None +) -> BinaryIO: + ... + + +@overload +def open_file( + file: FilePathType, mode: str, buffering: int = ..., + encoding: str | None = ..., errors: str | None = ..., newline: str | None = ..., + closefd: bool = ..., opener: FileOpener | None = ..., *, func: FuncExceptT | None = None +) -> IO[Any]: + ... + + +def open_file(file: FilePathType, mode: Any = 'r+', *args: Any, func: FuncExceptT | None = None, **kwargs: Any) -> Any: + """ + Open file and return a stream. Raise OSError upon failure. + + :param file: Is either a text or byte string giving the name of the file to be opened. + It is also possible to use a string or bytearray as a file for both reading and writing. + For strings, StringIO can be used like a file opened in a text mode. + For bytes a BytesIO can be used like a file opened in a binary mode. + :param mode: This is an optional string that specifies the mode in which the file is opened. + It defaults to 'r' which means open for reading in text mode. + Other common values are: + 'w' for writing, and truncating the file if it already exists + 'x' for creating and writing to a new file + 'a' for appending, which on some Unix systems means that all writes append to the end + of the file regardless of the current seek position). + In text mode, if encoding is not specified the encoding used is platform dependent: + locale.getpreferredencoding(False) is called to get the current locale encoding. + For reading and writing raw bytes use binary mode and leave encoding unspecified. + :param buffering: This is an optional integer used to set the buffering policy. + Pass: + 0 to switch buffering off (only allowed in binary mode) + 1 to select line buffering (only usable in text mode) + Integer > 1 to indicate the size of a fixed-size chunk buffer. + When no buffering argument is given, the default buffering policy works as follows: + Binary files are buffered in fixed-size chunks; + the size of the buffer is chosen using a heuristic trying to determine the + underlying device's "block size" and falling back on io.DEFAULT_BUFFER_SIZE. + On many systems, the buffer will typically be 4096 or 8192 bytes long. + "Interactive" text files (files for which isatty() returns True) use line buffering. + Other text files use the policy described above for binary files. + :param: encoding: This is the name of the encoding used to decode or encode the file. + This should only be used in text mode + The default encoding is platform dependent, but any encoding supported by Python can be passed. + See the codecs module for the list of supported encodings. + :param newline: This parameter controls how universal newlines works (it only applies to text mode). + It can be None, '', '\n', '\r', and '\r\n'. + It works as follows: + On input, + if newline is None, universal newlines mode is enabled. + Lines in the input can end in '\n', '\r', or '\r\n', + and these are translated into '\n' before being returned to the caller. + If it is '', universal newline mode is enabled, but line endings are + returned to the caller untranslated. + If it has any of the other legal values, input lines are only terminated + by the given string, and the line ending is returned to the caller untranslated. + On output, + if newline is None, any '\n' characters written are translated to the system default + line separator, os.linesep. If newline is '' or '\n', no translation takes place. + If newline is any of the other legal values, any '\n' characters written are + translated to the given string. + + :return: A file object whose type depends on the mode, and through which the standard file operations + such as reading and writing are performed. + When open_file is used to open a file in a text mode ('w', 'r', 'wt', 'rt', etc.), + it returns a TextIOWrapper. + When used to open a file in a binary mode, the returned class varies: + in read binary mode, it returns a BufferedReader + in write binary and append binary modes, it returns a BufferedWriter + in read/write mode, it returns a BufferedRandom + + """ + + check_perms(file, mode, func=func) + + return open(file, mode, *args, errors='strict', closefd=True, **kwargs) # type: ignore diff --git a/stgpytools/utils/funcs.py b/stgpytools/utils/funcs.py new file mode 100644 index 0000000..bb6402b --- /dev/null +++ b/stgpytools/utils/funcs.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from functools import update_wrapper +from types import FunctionType +from typing import Sequence + +from ..types import F + +__all__ = [ + 'copy_func', + 'erase_module' +] + + +def copy_func(f: F) -> FunctionType: + """Try copying a function.""" + + try: + g = FunctionType( + f.__code__, f.__globals__, name=f.__name__, argdefs=f.__defaults__, closure=f.__closure__ + ) + g = update_wrapper(g, f) + g.__kwdefaults__ = f.__kwdefaults__ + return g + except BaseException: # for builtins + return f # type: ignore + + +def erase_module(func: F, modules: Sequence[str] | None = None) -> F: + """Delete the __module__ of the function.""" + + if hasattr(func, '__module__') and (True if modules is None else (func.__module__ in modules)): + func.__module__ = None # type: ignore + + return func diff --git a/stgpytools/utils/math.py b/stgpytools/utils/math.py new file mode 100644 index 0000000..d4d37e3 --- /dev/null +++ b/stgpytools/utils/math.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +from math import ceil, log +from typing import Sequence + +from ..types import Nb + +__all__ = [ + 'clamp', 'clamp_arr', + + 'cround', + + 'mod_x', 'mod2', 'mod4', 'mod8', + + 'next_power_of_y', 'next_power_of_2', + + 'spline_coeff' +] + + +def clamp(val: Nb, min_val: Nb, max_val: Nb) -> Nb: + """Faster max(min(value, max_val), min_val) "wrapper" """ + + return min_val if val < min_val else max_val if val > max_val else val + + +def clamp_arr(vals: Sequence[Nb], min_val: Nb, max_val: Nb) -> list[Nb]: + """Map an array to clamp.""" + + return [clamp(x, min_val, max_val) for x in vals] + + +def cround(x: float, *, eps: float = 1e-6) -> int: + """Rounding function that accounts for float's imprecision.""" + + return round(x + (eps if x > 0. else - eps)) + + +def mod_x(val: int | float, x: int) -> int: + """Force a value to be divisible by x (val % x == 0).""" + + if x == 0: + return cround(val) + + return cround(val / x) * x + + +def mod2(val: int | float) -> int: + """Force a value to be mod 2""" + + return mod_x(val, x=2) + + +def mod4(val: int | float) -> int: + """Force a value to be mod 4""" + + return mod_x(val, x=4) + + +def mod8(val: int | float) -> int: + """Force a value to be mod 8""" + + return mod_x(val, x=8) + + +def next_power_of_2(x: float) -> int: + """Get the next power of 2 of x.""" + + x = cround(x) + + if x == 0: + return 1 + + if x & (x - 1) == 0: + return x + + while x & (x - 1) > 0: + x &= (x - 1) + + return x << 1 + + +def next_power_of_y(x: float, y: int) -> int: + """Get the next power of y of x.""" + + if x == 0: + return 1 + + return int(y ** ceil(log(x, y))) + + +def spline_coeff( + x: int, coordinates: list[tuple[float, float]] = [ + (0, 0), (0.5, 0.1), (1, 0.6), (2, 0.9), (2.5, 1), (3, 1.1), (3.5, 1.15), (4, 1.2), (8, 1.25), (255, 1.5) + ] +) -> float: + """Get spline coefficient of an index and coordinates.""" + + length = len(coordinates) + + if length < 3: + raise ValueError("coordinates require at least three pairs") + + px, py = zip(*coordinates) + + matrix = [[1.0] + [0.0] * length] + + for i in range(1, length - 1): + p = [0.0] * (length + 1) + + p[i - 1] = px[i] - px[i - 1] + p[i] = 2 * (px[i + 1] - px[i - 1]) + p[i + 1] = px[i + 1] - px[i] + p[length] = 6 * (((py[i + 1] - py[i]) / p[i + 1]) - (py[i] - py[i - 1]) / p[i - 1]) + + matrix.append(p) + + matrix += [([0.0] * (length - 1) + [1.0, 0.0])] + + for i in range(length): + num = matrix[i][i] + + for j in range(length + 1): + matrix[i][j] /= num + + for j in range(length): + if i != j: + a = matrix[j][i] + + for k in range(i, length + 1): + matrix[j][k] -= a * matrix[i][k] + + for i in range(length + 1): + if x >= px[i] and x <= px[i + 1]: + break + + j = i + 1 + + h = px[j] - px[i] + + s = matrix[j][length] * float((x - px[i]) ** 3) + s -= matrix[i][length] * float((x - px[j]) ** 3) + + s /= 6 * h + + s += (py[j] / h - h * matrix[j][length] / 6) * (x - px[i]) + s -= (py[i] / h - h * matrix[i][length] / 6) * (x - px[j]) + + return s diff --git a/stgpytools/utils/ranges.py b/stgpytools/utils/ranges.py new file mode 100644 index 0000000..e522d1d --- /dev/null +++ b/stgpytools/utils/ranges.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from itertools import chain, zip_longest +from typing import Iterable, overload + +from ..exceptions import CustomIndexError +from ..types import T0, T + +__all__ = [ + 'ranges_product', + + 'interleave_arr' +] + + +@overload +def ranges_product(range0: range | int, range1: range | int, /) -> Iterable[tuple[int, int]]: + ... + + +@overload +def ranges_product(range0: range | int, range1: range | int, range2: range | int, /) -> Iterable[tuple[int, int, int]]: + ... + + +def ranges_product(*_iterables: range | int) -> Iterable[tuple[int, ...]]: + """ + Take two or three lengths/ranges and make a cartesian product of them. + + Useful for getting all coordinates of an image. + For example ranges_product(1920, 1080) will give you [(0, 0), (0, 1), (0, 2), ..., (1919, 1078), (1919, 1079)]. + """ + + n_iterables = len(_iterables) + + if n_iterables <= 1: + raise CustomIndexError(f'Not enough ranges passed! ({n_iterables})', ranges_product) + + iterables = [range(x) if isinstance(x, int) else x for x in _iterables] + + if n_iterables == 2: + first_it, second_it = iterables + + for xx in first_it: + for yy in second_it: + yield xx, yy + elif n_iterables == 3: + first_it, second_it, third_it = iterables + + for xx in first_it: + for yy in second_it: + for zz in third_it: + yield xx, yy, zz + else: + raise CustomIndexError(f'Too many ranges passed! ({n_iterables})', ranges_product) + + +def interleave_arr(arr0: Iterable[T], arr1: Iterable[T0], n: int = 2) -> Iterable[T | T0]: + """ + Interleave two arrays of variable length. + + :param arr0: First array to be interleaved. + :param arr1: Second array to be interleaved. + :param n: The number of elements from arr0 to include in the interleaved sequence + before including an element from arr1. + + :yield: Elements from either arr0 or arr01. + """ + if n == 1: + yield from (x for x in chain.from_iterable(zip_longest(arr0, arr1)) if x is not None) + + return + + arr0_i, arr1_i = iter(arr0), iter(arr1) + arr1_vals = arr0_vals = True + + while arr1_vals or arr0_vals: + if arr0_vals: + for _ in range(n): + try: + yield next(arr0_i) + except StopIteration: + arr0_vals = False + + if arr1_vals: + try: + yield next(arr1_i) + except StopIteration: + arr1_vals = False