diff --git a/.github/CODESTYLE.md b/.github/CODESTYLE.md index 9499611..cfd199f 100644 --- a/.github/CODESTYLE.md +++ b/.github/CODESTYLE.md @@ -19,39 +19,39 @@ Listed is a example class used demonstrate general rules you should follow throu ```python class ExampleClass: - """ - ExampleClass - ------------ - Example class for CODESTYLE.md - """ - # ^^^ reST Docstring Format - - _private_attribute : int # private attributes begin with a lowercase - public_attribute : int # type hint for integer is defined here - - def __init__( - self, - public_attribute: int # type hint for parameters - ) -> None: # the expected return value of method """ - Initializes a ExampleClass - - Parameters - ---------- - :param public_attribute: example attribute - """ - self.public_attribute = public_attribute - self.private_attribute = square(public_attribute) - - def square(self, value: int) -> int: - """ - Example method that square roots a value - - Parameters - ---------- - :param value: value that you want squared + ExampleClass + ------------ + Example class for CODESTYLE.md """ - return value**2 + # ^^^ reST Docstring Format + + _private_attribute : int # private attributes begin with a lowercase + public_attribute : int # type hint for integer is defined here + + def __init__( + self, + public_attribute: int # type hint for parameters + ) -> None: # the expected return value of method + """ + Initializes a ExampleClass + + Parameters + ---------- + :param public_attribute: example attribute + """ + self.public_attribute = public_attribute + self.private_attribute = square(public_attribute) + + def square(self, value: int) -> int: + """ + Example method that square roots a value + + Parameters + ---------- + :param value: value that you want squared + """ + return value**2 ``` @@ -87,7 +87,7 @@ Currently, documentation for this project resides in markdown files. - Use of HTML is permitted - [reference style links](https://www.markdownguide.org/basic-syntax/#reference-style-links) are not required by are appreciated - Exceedingly long lines are to be broken - - The indents are to be two spaces + - The indents are to be 4 spaces ```markdown diff --git a/ruff.toml b/ruff.toml index 12673e1..ae553fb 100644 --- a/ruff.toml +++ b/ruff.toml @@ -1,5 +1,3 @@ -indent-width = 2 - [format] # Exclude commonly ignored directories. exclude = [ diff --git a/src/thread/__init__.py b/src/thread/__init__.py index b0505e8..4c9062a 100644 --- a/src/thread/__init__.py +++ b/src/thread/__init__.py @@ -38,12 +38,12 @@ # Wildcard Export __all__ = [ - 'Thread', - 'ParallelProcessing', - 'threaded', - 'processor', - 'types', - 'exceptions', - 'Settings', - '__version__', + 'Thread', + 'ParallelProcessing', + 'threaded', + 'processor', + 'types', + 'exceptions', + 'Settings', + '__version__', ] diff --git a/src/thread/__main__.py b/src/thread/__main__.py index 4dd94a0..f02849d 100644 --- a/src/thread/__main__.py +++ b/src/thread/__main__.py @@ -2,4 +2,4 @@ from .cli import app if __name__ == '__main__': - app(prog_name='thread') + app(prog_name='thread') diff --git a/src/thread/_types.py b/src/thread/_types.py index ee393d6..3f81b10 100644 --- a/src/thread/_types.py +++ b/src/thread/_types.py @@ -16,13 +16,13 @@ # Variable Types ThreadStatus = Literal[ -'Idle', -'Running', -'Invoking hooks', -'Completed', -'Errored', -'Kill Scheduled', -'Killed', + 'Idle', + 'Running', + 'Invoking hooks', + 'Completed', + 'Errored', + 'Kill Scheduled', + 'Killed', ] diff --git a/src/thread/cli.py b/src/thread/cli.py index d8c5c26..1498369 100644 --- a/src/thread/cli.py +++ b/src/thread/cli.py @@ -1,10 +1,10 @@ try: - import importlib + import importlib - thread_cli = importlib.import_module('thread-cli') - app = thread_cli.app + thread_cli = importlib.import_module('thread-cli') + app = thread_cli.app except ModuleNotFoundError: - def app(prog_name='thread'): - print('thread-cli not found, please install it with `pip install thread-cli`') - exit(1) + def app(prog_name='thread'): + print('thread-cli not found, please install it with `pip install thread-cli`') + exit(1) diff --git a/src/thread/decorators/_processor.py b/src/thread/decorators/_processor.py index 7cea5bb..505d58c 100644 --- a/src/thread/decorators/_processor.py +++ b/src/thread/decorators/_processor.py @@ -19,146 +19,148 @@ NoParamReturn = Callable[ -Concatenate[Sequence[_DataT], _TargetP], -ParallelProcessing[_TargetP, _TargetT, _DataT], + Concatenate[Sequence[_DataT], _TargetP], + ParallelProcessing[_TargetP, _TargetT, _DataT], ] WithParamReturn = Callable[ -[TargetFunction[_DataT, _TargetP, _TargetT]], -NoParamReturn[_DataT, _TargetP, _TargetT], + [TargetFunction[_DataT, _TargetP, _TargetT]], + NoParamReturn[_DataT, _TargetP, _TargetT], ] FullParamReturn = Callable[ -Concatenate[Sequence[_DataT], _TargetP], -ParallelProcessing[_TargetP, _TargetT, _DataT], + Concatenate[Sequence[_DataT], _TargetP], + ParallelProcessing[_TargetP, _TargetT, _DataT], ] @overload def processor( - __function: TargetFunction[_DataT, _TargetP, _TargetT], - ) -> NoParamReturn[_DataT, _TargetP, _TargetT]: - ... + __function: TargetFunction[_DataT, _TargetP, _TargetT], +) -> NoParamReturn[_DataT, _TargetP, _TargetT]: + ... @overload def processor( - *, - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - **overflow_kwargs: Overflow_In, - ) -> WithParamReturn[_DataT, _TargetP, _TargetT]: - ... + *, + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + **overflow_kwargs: Overflow_In, +) -> WithParamReturn[_DataT, _TargetP, _TargetT]: + ... @overload def processor( - __function: TargetFunction[_DataT, _TargetP, _TargetT], - *, - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - **overflow_kwargs: Overflow_In, - ) -> FullParamReturn[_DataT, _TargetP, _TargetT]: - ... + __function: TargetFunction[_DataT, _TargetP, _TargetT], + *, + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + **overflow_kwargs: Overflow_In, +) -> FullParamReturn[_DataT, _TargetP, _TargetT]: + ... def processor( - __function: Optional[TargetFunction[_DataT, _TargetP, _TargetT]] = None, - *, - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - **overflow_kwargs: Overflow_In, - ) -> Union[ - NoParamReturn[_DataT, _TargetP, _TargetT], - WithParamReturn[_DataT, _TargetP, _TargetT], - FullParamReturn[_DataT, _TargetP, _TargetT], + __function: Optional[TargetFunction[_DataT, _TargetP, _TargetT]] = None, + *, + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + **overflow_kwargs: Overflow_In, +) -> Union[ + NoParamReturn[_DataT, _TargetP, _TargetT], + WithParamReturn[_DataT, _TargetP, _TargetT], + FullParamReturn[_DataT, _TargetP, _TargetT], ]: - """ - Decorate a function to run it in a thread - - Parameters - ---------- - :param __function: The function to run in a thread - :param args: Keyword-Only arguments to pass into `thread.Thread` - :param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread` - :param ignore_errors: Keyword-Only arguments to pass into `thread.Thread` - :param suppress_errors: Keyword-Only arguments to pass into `thread.Thread` - :param **: Keyword-Only arguments to pass into `thread.Thread` - - Returns - ------- - :return decorator: - - Use Case - -------- - Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned - - >>> @thread.threaded - >>> def myfunction(*args, **kwargs): ... - - >>> myJob = myfunction(1, 2) - >>> type(myjob) - > Thread - - You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread` - >>> @thread.threaded(daemon = True) - >>> def myfunction(): - ... ... - - Args will be ordered infront of function-parsed args parsed into `thread.Thread.args` - >>> @thread.threaded(args = (1)) - >>> def myfunction(*args): - >>> print(args) - >>> - >>> myfunction(4, 6).get_return_value() - 1, 4, 6 - """ - - if not callable(__function): - - def wrapper( - func: TargetFunction[_DataT, _TargetP, _TargetT], - ) -> FullParamReturn[_DataT, _TargetP, _TargetT]: - return processor( - func, - args=args, - kwargs=kwargs, - ignore_errors=ignore_errors, - suppress_errors=suppress_errors, - **overflow_kwargs, - ) - - return wrapper - - overflow_kwargs.update( - {'ignore_errors': ignore_errors, 'suppress_errors': suppress_errors} - ) - - kwargs = dict(kwargs) - - @wraps(__function) - def wrapped( - data: Sequence[_DataT], - *parsed_args: _TargetP.args, - **parsed_kwargs: _TargetP.kwargs, - ) -> ParallelProcessing[_TargetP, _TargetT, _DataT]: - kwargs.update(parsed_kwargs) - - processed_args = (*args, *parsed_args) - processed_kwargs = {i: v for i, v in kwargs.items() if i not in ['args', 'kwargs']} - - job = ParallelProcessing( - function=__function, - dataset=data, - args=processed_args, - kwargs=processed_kwargs, - **overflow_kwargs, + """ + Decorate a function to run it in a thread + + Parameters + ---------- + :param __function: The function to run in a thread + :param args: Keyword-Only arguments to pass into `thread.Thread` + :param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread` + :param ignore_errors: Keyword-Only arguments to pass into `thread.Thread` + :param suppress_errors: Keyword-Only arguments to pass into `thread.Thread` + :param **: Keyword-Only arguments to pass into `thread.Thread` + + Returns + ------- + :return decorator: + + Use Case + -------- + Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned + + >>> @thread.threaded + >>> def myfunction(*args, **kwargs): ... + + >>> myJob = myfunction(1, 2) + >>> type(myjob) + > Thread + + You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread` + >>> @thread.threaded(daemon = True) + >>> def myfunction(): + ... ... + + Args will be ordered infront of function-parsed args parsed into `thread.Thread.args` + >>> @thread.threaded(args = (1)) + >>> def myfunction(*args): + >>> print(args) + >>> + >>> myfunction(4, 6).get_return_value() + 1, 4, 6 + """ + + if not callable(__function): + + def wrapper( + func: TargetFunction[_DataT, _TargetP, _TargetT], + ) -> FullParamReturn[_DataT, _TargetP, _TargetT]: + return processor( + func, + args=args, + kwargs=kwargs, + ignore_errors=ignore_errors, + suppress_errors=suppress_errors, + **overflow_kwargs, + ) + + return wrapper + + overflow_kwargs.update( + {'ignore_errors': ignore_errors, 'suppress_errors': suppress_errors} ) - job.start() - return job - return wrapped + kwargs = dict(kwargs) + + @wraps(__function) + def wrapped( + data: Sequence[_DataT], + *parsed_args: _TargetP.args, + **parsed_kwargs: _TargetP.kwargs, + ) -> ParallelProcessing[_TargetP, _TargetT, _DataT]: + kwargs.update(parsed_kwargs) + + processed_args = (*args, *parsed_args) + processed_kwargs = { + i: v for i, v in kwargs.items() if i not in ['args', 'kwargs'] + } + + job = ParallelProcessing( + function=__function, + dataset=data, + args=processed_args, + kwargs=processed_kwargs, + **overflow_kwargs, + ) + job.start() + return job + + return wrapped diff --git a/src/thread/decorators/_threaded.py b/src/thread/decorators/_threaded.py index efed3d1..0f3a06c 100644 --- a/src/thread/decorators/_threaded.py +++ b/src/thread/decorators/_threaded.py @@ -24,117 +24,120 @@ @overload def threaded(__function: TargetFunction[P, T]) -> NoParamReturn[P, T]: - ... + ... @overload def threaded( - *, - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - **overflow_kwargs: Overflow_In, - ) -> WithParamReturn[P, T]: - ... + *, + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + **overflow_kwargs: Overflow_In, +) -> WithParamReturn[P, T]: + ... @overload def threaded( - __function: TargetFunction[P, T], - *, - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - **overflow_kwargs: Overflow_In, - ) -> FullParamReturn[P, T]: - ... + __function: TargetFunction[P, T], + *, + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + **overflow_kwargs: Overflow_In, +) -> FullParamReturn[P, T]: + ... def threaded( - __function: Optional[TargetFunction[P, T]] = None, - *, - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - **overflow_kwargs: Overflow_In, - ) -> Union[NoParamReturn[P, T], WithParamReturn[P, T], FullParamReturn[P, T]]: - """ - Decorate a function to run it in a thread - - Parameters - ---------- - :param __function: The function to run in a thread - :param args: Keyword-Only arguments to pass into `thread.Thread` - :param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread` - :param ignore_errors: Keyword-Only arguments to pass into `thread.Thread` - :param suppress_errors: Keyword-Only arguments to pass into `thread.Thread` - :param **: Keyword-Only arguments to pass into `thread.Thread` - - Returns - ------- - :return decorator: - - Use Case - -------- - Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned - - >>> @thread.threaded - >>> def myfunction(*args, **kwargs): ... - - >>> myJob = myfunction(1, 2) - >>> type(myjob) - > Thread - - You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread` - >>> @thread.threaded(daemon = True) - >>> def myfunction(): - ... ... - - Args will be ordered infront of function-parsed args parsed into `thread.Thread.args` - >>> @thread.threaded(args = (1)) - >>> def myfunction(*args): - >>> print(args) - >>> - >>> myfunction(4, 6).get_return_value() - 1, 4, 6 - """ - - if not callable(__function): - - def wrapper(func: TargetFunction[P, T]) -> FullParamReturn[P, T]: - return threaded( - func, - args=args, - kwargs=kwargs, - ignore_errors=ignore_errors, - suppress_errors=suppress_errors, - **overflow_kwargs, - ) - - return wrapper - - overflow_kwargs.update( - {'ignore_errors': ignore_errors, 'suppress_errors': suppress_errors} - ) - - kwargs = dict(kwargs) - - @wraps(__function) - def wrapped(*parsed_args: P.args, **parsed_kwargs: P.kwargs) -> Thread[P, T]: - kwargs.update(parsed_kwargs) - - processed_args = (*args, *parsed_args) - processed_kwargs = { - i: v for i, v in parsed_kwargs.items() if i not in ['args', 'kwargs'] - } - - job = Thread( - target=__function, args=processed_args, kwargs=processed_kwargs, **overflow_kwargs + __function: Optional[TargetFunction[P, T]] = None, + *, + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + **overflow_kwargs: Overflow_In, +) -> Union[NoParamReturn[P, T], WithParamReturn[P, T], FullParamReturn[P, T]]: + """ + Decorate a function to run it in a thread + + Parameters + ---------- + :param __function: The function to run in a thread + :param args: Keyword-Only arguments to pass into `thread.Thread` + :param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread` + :param ignore_errors: Keyword-Only arguments to pass into `thread.Thread` + :param suppress_errors: Keyword-Only arguments to pass into `thread.Thread` + :param **: Keyword-Only arguments to pass into `thread.Thread` + + Returns + ------- + :return decorator: + + Use Case + -------- + Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned + + >>> @thread.threaded + >>> def myfunction(*args, **kwargs): ... + + >>> myJob = myfunction(1, 2) + >>> type(myjob) + > Thread + + You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread` + >>> @thread.threaded(daemon = True) + >>> def myfunction(): + ... ... + + Args will be ordered infront of function-parsed args parsed into `thread.Thread.args` + >>> @thread.threaded(args = (1)) + >>> def myfunction(*args): + >>> print(args) + >>> + >>> myfunction(4, 6).get_return_value() + 1, 4, 6 + """ + + if not callable(__function): + + def wrapper(func: TargetFunction[P, T]) -> FullParamReturn[P, T]: + return threaded( + func, + args=args, + kwargs=kwargs, + ignore_errors=ignore_errors, + suppress_errors=suppress_errors, + **overflow_kwargs, + ) + + return wrapper + + overflow_kwargs.update( + {'ignore_errors': ignore_errors, 'suppress_errors': suppress_errors} ) - job.start() - return job - return wrapped + kwargs = dict(kwargs) + + @wraps(__function) + def wrapped(*parsed_args: P.args, **parsed_kwargs: P.kwargs) -> Thread[P, T]: + kwargs.update(parsed_kwargs) + + processed_args = (*args, *parsed_args) + processed_kwargs = { + i: v for i, v in parsed_kwargs.items() if i not in ['args', 'kwargs'] + } + + job = Thread( + target=__function, + args=processed_args, + kwargs=processed_kwargs, + **overflow_kwargs, + ) + job.start() + return job + + return wrapped diff --git a/src/thread/exceptions.py b/src/thread/exceptions.py index 28c126f..184ee18 100644 --- a/src/thread/exceptions.py +++ b/src/thread/exceptions.py @@ -9,58 +9,60 @@ class ErrorBase(Exception): - """Base exception class for all errors within this library""" + """Base exception class for all errors within this library""" - message: str = 'Something went wrong!' + message: str = 'Something went wrong!' - def __init__(self, message: Optional[str] = None, *args: Any, **kwargs: Any) -> None: - message = message or self.message - super().__init__(message, *args, **kwargs) + def __init__( + self, message: Optional[str] = None, *args: Any, **kwargs: Any + ) -> None: + message = message or self.message + super().__init__(message, *args, **kwargs) # THREAD ERRORS # class ThreadStillRunningError(ErrorBase): - """Exception class for attempting to invoke a method which requires the thread not be running, but isn't""" + """Exception class for attempting to invoke a method which requires the thread not be running, but isn't""" - message: str = 'Thread is still running, unable to invoke method. You can wait for the thread to terminate with `Thread.join()` or check with `Thread.is_alive()`' + message: str = 'Thread is still running, unable to invoke method. You can wait for the thread to terminate with `Thread.join()` or check with `Thread.is_alive()`' class ThreadNotRunningError(ErrorBase): - """Exception class for attempting to invoke a method which requires the thread to be running, but isn't""" + """Exception class for attempting to invoke a method which requires the thread to be running, but isn't""" - message: str = ( - 'Thread is not running, unable to invoke method. Have you ran `Thread.start()`?' - ) + message: str = ( + 'Thread is not running, unable to invoke method. Have you ran `Thread.start()`?' + ) class ThreadNotInitializedError(ErrorBase): - """Exception class for attempting to invoke a method which requires the thread to be initialized, but isn't""" + """Exception class for attempting to invoke a method which requires the thread to be initialized, but isn't""" - message: str = 'Thread is not initialized, unable to invoke method.' + message: str = 'Thread is not initialized, unable to invoke method.' class HookRuntimeError(ErrorBase): - """Exception class for hook runtime errors""" + """Exception class for hook runtime errors""" - message: str = 'Encountered runtime errors in hooks' - count: int = 0 + message: str = 'Encountered runtime errors in hooks' + count: int = 0 - def __init__( - self, message: Optional[str] = '', extra: Sequence[Tuple[Exception, str]] = [] + def __init__( + self, message: Optional[str] = '', extra: Sequence[Tuple[Exception, str]] = [] ) -> None: - """ - Extra for parsing all hooks that errored - - Parameters - ---------- - :param message: The message to be parsed, can be left blank - :param extra: Tuple of (Exception_Raised, function_name) - """ - new_message: str = message or self.message - - for i, v in enumerate(extra): - trace = '\n'.join(traceback.format_stack()) - new_message += f'\n\n{i}. {v[1]}\n>>>>>>>>>>' - new_message += f'{trace}\n{v[0]}' - new_message += '<<<<<<<<<<' - super().__init__(new_message) + """ + Extra for parsing all hooks that errored + + Parameters + ---------- + :param message: The message to be parsed, can be left blank + :param extra: Tuple of (Exception_Raised, function_name) + """ + new_message: str = message or self.message + + for i, v in enumerate(extra): + trace = '\n'.join(traceback.format_stack()) + new_message += f'\n\n{i}. {v[1]}\n>>>>>>>>>>' + new_message += f'{trace}\n{v[0]}' + new_message += '<<<<<<<<<<' + super().__init__(new_message) diff --git a/src/thread/thread.py b/src/thread/thread.py index 9297236..d597f3a 100644 --- a/src/thread/thread.py +++ b/src/thread/thread.py @@ -21,16 +21,16 @@ class ParallelProcessing: ... from .utils.algorithm import chunk_split from ._types import ( - ThreadStatus, - Data_In, - Data_Out, - Overflow_In, - TargetFunction, - _Target_P, - _Target_T, - DatasetFunction, - _Dataset_T, - HookFunction, + ThreadStatus, + Data_In, + Data_Out, + Overflow_In, + TargetFunction, + _Target_P, + _Target_T, + DatasetFunction, + _Dataset_T, + HookFunction, ) from typing_extensions import Generic, ParamSpec from typing import List, Optional, Union, Mapping, Sequence, Tuple, Generator @@ -40,510 +40,513 @@ class ParallelProcessing: ... class Thread(threading.Thread, Generic[_Target_P, _Target_T]): - """ - Wraps python's `threading.Thread` class - --------------------------------------- - - Type-Safe and provides more functionality on top - """ - - status: ThreadStatus - hooks: List[HookFunction] - _returned_value: Data_Out - - errors: List[Exception] - ignore_errors: Sequence[type[Exception]] - suppress_errors: bool - - # threading.Thread stuff - _initialized: bool - - def __init__( - self, - target: TargetFunction[_Target_P, _Target_T], - args: Sequence[Data_In] = (), - kwargs: Mapping[str, Data_In] = {}, - ignore_errors: Sequence[type[Exception]] = (), - suppress_errors: bool = False, - name: Optional[str] = None, - daemon: bool = False, - group=None, - *overflow_args: Overflow_In, - **overflow_kwargs: Overflow_In, - ) -> None: - """ - Initializes a thread - - Parameters - ---------- - :param target: This should be a function that takes in anything and returns anything - :param args: This should be an interable sequence of arguments parsed to the `target` function (e.g. tuple('foo', 'bar')) - :param kwargs: This should be the kwargs parsed to the `target` function (e.g. dict(foo = 'bar')) - :param ignore_errors: This should be an interable sequence of all exceptions to ignore. To ignore all exceptions, parse tuple(Exception) - :param suppress_errors: This should be a boolean indicating whether exceptions will be raised, else will only write to internal `errors` property - :param name: This is an argument parsed to `threading.Thread` - :param daemon: This is an argument parsed to `threading.Thread` - :param group: This does nothing right now, but should be left as None - :param *: These are arguments parsed to `threading.Thread` - :param **: These are arguments parsed to `thread.Thread` - """ - _target = self._wrap_target(target) - self._returned_value = None - self.status = 'Idle' - self.hooks = [] - - self.errors = [] - self.ignore_errors = ignore_errors - self.suppress_errors = suppress_errors - - super().__init__( - target=_target, - args=args, - kwargs=kwargs, - name=name, - daemon=daemon, - group=group, - *overflow_args, - **overflow_kwargs, - ) - - def _wrap_target( - self, target: TargetFunction[_Target_P, _Target_T] - ) -> TargetFunction[_Target_P, Union[_Target_T, None]]: - """Wraps the target function""" - - @wraps(target) - def wrapper( - *args: _Target_P.args, **kwargs: _Target_P.kwargs - ) -> Union[_Target_T, None]: - try: - self.status = 'Running' - - global Threads - Threads.add(self) - - try: - self._returned_value = target(*args, **kwargs) - except Exception as e: - if not any(isinstance(e, ignore) for ignore in self.ignore_errors): - self.status = 'Errored' - self.errors.append(e) - return - - self.status = 'Invoking hooks' - self._invoke_hooks() - Threads.remove(self) - self.status = 'Completed' - - except SystemExit: - self.status = 'Killed' - print('KILLED ident: %s' % self.ident) - return - - return wrapper - - def _invoke_hooks(self) -> None: - """Invokes hooks in the thread""" - errors: List[Tuple[Exception, str]] = [] - for hook in self.hooks: - try: - hook(self._returned_value) - except Exception as e: - if not any(isinstance(e, ignore) for ignore in self.ignore_errors): - errors.append((e, hook.__name__)) - - if len(errors) > 0: - self.errors.append(exceptions.HookRuntimeError(None, errors)) - - def _handle_exceptions(self) -> None: - """Raises exceptions if not suppressed in the main thread""" - if self.suppress_errors: - return - - for e in self.errors: - raise e - - @property - def result(self) -> _Target_T: - """ - The return value of the thread - - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - ThreadNotRunningError: If the thread is not running - ThreadStillRunningError: If the thread is still running - """ - if not self._initialized: - raise exceptions.ThreadNotInitializedError() - if self.status in ['Idle', 'Killed']: - raise exceptions.ThreadNotRunningError() - - self._handle_exceptions() - if self.status in ['Invoking hooks', 'Completed']: - return self._returned_value - else: - raise exceptions.ThreadStillRunningError() - - def is_alive(self) -> bool: - """ - See if thread is still alive - - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - """ - if not self._initialized: - raise exceptions.ThreadNotInitializedError() - return super().is_alive() - - def add_hook(self, hook: HookFunction[_Target_T]) -> None: """ - Adds a hook to the thread - ------------------------- - Hooks are executed automatically after a successful thread execution. - The returned value is parsed directly into the hook - - Parameters - ---------- - :param hook: This should be a function which takes the output value of `target` and should return None - """ - self.hooks.append(hook) - - def join(self, timeout: Optional[float] = None) -> bool: - """ - Halts the current thread execution until a thread completes or exceeds the timeout - - Parameters - ---------- - :param timeout: The maximum time allowed to halt the thread - - Returns - ------- - :returns bool: True if the thread is no-longer alive - - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - ThreadNotRunningError: If the thread is not running - """ - if not self._initialized: - raise exceptions.ThreadNotInitializedError() - - if self.status == ['Idle', 'Killed']: - raise exceptions.ThreadNotRunningError() - - super().join(timeout) - self._handle_exceptions() - return not self.is_alive() + Wraps python's `threading.Thread` class + --------------------------------------- - def get_return_value(self) -> _Target_T: + Type-Safe and provides more functionality on top """ - Halts the current thread execution until the thread completes - - Returns - ------- - :returns Any: The return value of the target function - """ - self.join() - return self.result - - def kill(self, yielding: bool = False, timeout: float = 5) -> bool: - """ - Schedules a thread to be killed - - Parameters - ---------- - :param yielding: If true, halts the current thread execution until the thread is killed - :param timeout: The maximum number of seconds to wait before exiting - - Returns - ------- - :returns bool: False if the it exceeded the timeout without being killed - - Raises - ------ - ValueError: If the thread ident does not exist - ThreadNotInitializedError: If the thread is not initialized - ThreadNotRunningError: If the thread is not running - """ - if not self.is_alive(): - raise exceptions.ThreadNotRunningError() - - self.status = 'Kill Scheduled' - - res: int = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(self.ident), ctypes.py_object(SystemExit) - ) - - if res == 0: - raise ValueError('Thread IDENT does not exist') - elif res > 1: - # Unexpected behaviour, something seriously went wrong - # https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc - ctypes.pythonapi.PyThreadState_SetAsyncExc(self.ident, None) - raise SystemError( - f'Killing thread with ident [{self.ident}] failed!\nPyThreadState_SetAsyncExc returned: {res}' - ) - if not yielding: - return True - - start = time.perf_counter() - while self.status != 'Killed': - time.sleep(0.01) - if (time.perf_counter() - start) >= timeout: - return False - - return True - - def start(self) -> None: - """ - Starts the thread - - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - ThreadStillRunningError: If there already is a running thread - """ - if self.is_alive(): - raise exceptions.ThreadStillRunningError() + status: ThreadStatus + hooks: List[HookFunction] + _returned_value: Data_Out + + errors: List[Exception] + ignore_errors: Sequence[type[Exception]] + suppress_errors: bool + + # threading.Thread stuff + _initialized: bool + + def __init__( + self, + target: TargetFunction[_Target_P, _Target_T], + args: Sequence[Data_In] = (), + kwargs: Mapping[str, Data_In] = {}, + ignore_errors: Sequence[type[Exception]] = (), + suppress_errors: bool = False, + name: Optional[str] = None, + daemon: bool = False, + group=None, + *overflow_args: Overflow_In, + **overflow_kwargs: Overflow_In, + ) -> None: + """ + Initializes a thread + + Parameters + ---------- + :param target: This should be a function that takes in anything and returns anything + :param args: This should be an interable sequence of arguments parsed to the `target` function (e.g. tuple('foo', 'bar')) + :param kwargs: This should be the kwargs parsed to the `target` function (e.g. dict(foo = 'bar')) + :param ignore_errors: This should be an interable sequence of all exceptions to ignore. To ignore all exceptions, parse tuple(Exception) + :param suppress_errors: This should be a boolean indicating whether exceptions will be raised, else will only write to internal `errors` property + :param name: This is an argument parsed to `threading.Thread` + :param daemon: This is an argument parsed to `threading.Thread` + :param group: This does nothing right now, but should be left as None + :param *: These are arguments parsed to `threading.Thread` + :param **: These are arguments parsed to `thread.Thread` + """ + _target = self._wrap_target(target) + self._returned_value = None + self.status = 'Idle' + self.hooks = [] + + self.errors = [] + self.ignore_errors = ignore_errors + self.suppress_errors = suppress_errors + + super().__init__( + target=_target, + args=args, + kwargs=kwargs, + name=name, + daemon=daemon, + group=group, + *overflow_args, + **overflow_kwargs, + ) + + def _wrap_target( + self, target: TargetFunction[_Target_P, _Target_T] + ) -> TargetFunction[_Target_P, Union[_Target_T, None]]: + """Wraps the target function""" + + @wraps(target) + def wrapper( + *args: _Target_P.args, **kwargs: _Target_P.kwargs + ) -> Union[_Target_T, None]: + try: + self.status = 'Running' + + global Threads + Threads.add(self) + + try: + self._returned_value = target(*args, **kwargs) + except Exception as e: + if not any(isinstance(e, ignore) for ignore in self.ignore_errors): + self.status = 'Errored' + self.errors.append(e) + return + + self.status = 'Invoking hooks' + self._invoke_hooks() + Threads.remove(self) + self.status = 'Completed' + + except SystemExit: + self.status = 'Killed' + print('KILLED ident: %s' % self.ident) + return + + return wrapper + + def _invoke_hooks(self) -> None: + """Invokes hooks in the thread""" + errors: List[Tuple[Exception, str]] = [] + for hook in self.hooks: + try: + hook(self._returned_value) + except Exception as e: + if not any(isinstance(e, ignore) for ignore in self.ignore_errors): + errors.append((e, hook.__name__)) + + if len(errors) > 0: + self.errors.append(exceptions.HookRuntimeError(None, errors)) + + def _handle_exceptions(self) -> None: + """Raises exceptions if not suppressed in the main thread""" + if self.suppress_errors: + return - super().start() + for e in self.errors: + raise e + + @property + def result(self) -> _Target_T: + """ + The return value of the thread + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + ThreadNotRunningError: If the thread is not running + ThreadStillRunningError: If the thread is still running + """ + if not self._initialized: + raise exceptions.ThreadNotInitializedError() + if self.status in ['Idle', 'Killed']: + raise exceptions.ThreadNotRunningError() + + self._handle_exceptions() + if self.status in ['Invoking hooks', 'Completed']: + return self._returned_value + else: + raise exceptions.ThreadStillRunningError() + + def is_alive(self) -> bool: + """ + See if thread is still alive + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + """ + if not self._initialized: + raise exceptions.ThreadNotInitializedError() + return super().is_alive() + + def add_hook(self, hook: HookFunction[_Target_T]) -> None: + """ + Adds a hook to the thread + ------------------------- + Hooks are executed automatically after a successful thread execution. + The returned value is parsed directly into the hook + + Parameters + ---------- + :param hook: This should be a function which takes the output value of `target` and should return None + """ + self.hooks.append(hook) + + def join(self, timeout: Optional[float] = None) -> bool: + """ + Halts the current thread execution until a thread completes or exceeds the timeout + + Parameters + ---------- + :param timeout: The maximum time allowed to halt the thread + + Returns + ------- + :returns bool: True if the thread is no-longer alive + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + ThreadNotRunningError: If the thread is not running + """ + if not self._initialized: + raise exceptions.ThreadNotInitializedError() + + if self.status == ['Idle', 'Killed']: + raise exceptions.ThreadNotRunningError() + + super().join(timeout) + self._handle_exceptions() + return not self.is_alive() + + def get_return_value(self) -> _Target_T: + """ + Halts the current thread execution until the thread completes + + Returns + ------- + :returns Any: The return value of the target function + """ + self.join() + return self.result + + def kill(self, yielding: bool = False, timeout: float = 5) -> bool: + """ + Schedules a thread to be killed + + Parameters + ---------- + :param yielding: If true, halts the current thread execution until the thread is killed + :param timeout: The maximum number of seconds to wait before exiting + + Returns + ------- + :returns bool: False if the it exceeded the timeout without being killed + + Raises + ------ + ValueError: If the thread ident does not exist + ThreadNotInitializedError: If the thread is not initialized + ThreadNotRunningError: If the thread is not running + """ + if not self.is_alive(): + raise exceptions.ThreadNotRunningError() + + self.status = 'Kill Scheduled' + + res: int = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.ident), ctypes.py_object(SystemExit) + ) + + if res == 0: + raise ValueError('Thread IDENT does not exist') + elif res > 1: + # Unexpected behaviour, something seriously went wrong + # https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc + ctypes.pythonapi.PyThreadState_SetAsyncExc(self.ident, None) + raise SystemError( + f'Killing thread with ident [{self.ident}] failed!\nPyThreadState_SetAsyncExc returned: {res}' + ) + + if not yielding: + return True + + start = time.perf_counter() + while self.status != 'Killed': + time.sleep(0.01) + if (time.perf_counter() - start) >= timeout: + return False + + return True + + def start(self) -> None: + """ + Starts the thread + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + ThreadStillRunningError: If there already is a running thread + """ + if self.is_alive(): + raise exceptions.ThreadStillRunningError() + + super().start() _P = ParamSpec('_P') class _ThreadWorker: - progress: float - thread: Thread + progress: float + thread: Thread - def __init__(self, thread: Thread, progress: float = 0) -> None: - self.thread = thread - self.progress = progress + def __init__(self, thread: Thread, progress: float = 0) -> None: + self.thread = thread + self.progress = progress class ParallelProcessing(Generic[_Target_P, _Target_T, _Dataset_T]): - """ - Multi-Threaded Parallel Processing - --------------------------------------- - - Type-Safe and provides more functionality on top - """ - - _threads: List[_ThreadWorker] - _completed: int - - status: ThreadStatus - function: TargetFunction - dataset: Sequence[Data_In] - max_threads: int - - overflow_args: Sequence[Overflow_In] - overflow_kwargs: Mapping[str, Overflow_In] - - def __init__( - self, - function: DatasetFunction[_Dataset_T, _Target_P, _Target_T], - dataset: Sequence[_Dataset_T], - max_threads: int = 8, - *overflow_args: Overflow_In, - **overflow_kwargs: Overflow_In, - ) -> None: - """ - Initializes a new Multi-Threaded Pool\n - Best for data processing - - Splits a dataset as evenly as it can among the threads and run them in parallel - - Parameters - ---------- - :param function: This should be the function to validate each data entry in the `dataset`, the first argument parsed will be a value of the dataset - :param dataset: This should be an iterable sequence of data entries - :param max_threads: This should be an integer value of the max threads allowed - :param *: These are arguments parsed to `threading.Thread` and `Thread` - :param **: These are arguments parsed to `thread.Thread` and `Thread` - - Raises - ------ - AssertionError: invalid `max_threads` """ - assert 0 <= max_threads, 'Cannot run a thread pool with max threads set to 0' - - self._threads = [] - self._completed = 0 - - self.status = 'Idle' - self.function = self._wrap_function(function) - self.dataset = dataset - self.max_threads = max_threads - - self.overflow_args = overflow_args - self.overflow_kwargs = overflow_kwargs - - def _wrap_function(self, function: TargetFunction) -> TargetFunction: - @wraps(function) - def wrapper( - index: int, - length: int, - data_chunk: Generator[_Dataset_T, None, None], - *args: _Target_P.args, - **kwargs: _Target_P.kwargs, - ) -> List[_Target_T]: - computed: List[Data_Out] = [] - - i = 0 - for data_entry in data_chunk: - v = function(data_entry, *args, **kwargs) - computed.append(v) - self._threads[index].progress = round((i + 1) / length, 5) - i += 1 - - self._completed += 1 - if self._completed == len(self._threads): - self.status = 'Completed' - - return computed - - return wrapper - - @property - def results(self) -> List[_Dataset_T]: - """ - The return value of the threads if completed + Multi-Threaded Parallel Processing + --------------------------------------- - Raises - ------ - ThreadNotInitializedError: If the threads are not initialized - ThreadNotRunningError: If the threads are not running - ThreadStillRunningError: If the threads are still running + Type-Safe and provides more functionality on top """ - if len(self._threads) == 0: - raise exceptions.ThreadNotInitializedError() - - results: List[Data_Out] = [] - for entry in self._threads: - results += entry.thread.result - return results - def is_alive(self) -> bool: - """ - See if any threads are still alive + _threads: List[_ThreadWorker] + _completed: int - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - """ - if len(self._threads) == 0: - raise exceptions.ThreadNotInitializedError() - return any(entry.thread.is_alive() for entry in self._threads) + status: ThreadStatus + function: TargetFunction + dataset: Sequence[Data_In] + max_threads: int - def get_return_values(self) -> List[_Dataset_T]: - """ - Halts the current thread execution until the thread completes + overflow_args: Sequence[Overflow_In] + overflow_kwargs: Mapping[str, Overflow_In] - Returns - ------- - :returns Any: The return value of the target function - """ - results: List[Data_Out] = [] - for entry in self._threads: - entry.thread.join() - results += entry.thread.result - return results - - def join(self) -> bool: - """ - Halts the current thread execution until a thread completes or exceeds the timeout - - Returns - ------- - :returns bool: True if the thread is no-longer alive - - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - ThreadNotRunningError: If the thread is not running - """ - if len(self._threads) == 0: - raise exceptions.ThreadNotInitializedError() - - if self.status == 'Idle': - raise exceptions.ThreadNotRunningError() - - for entry in self._threads: - entry.thread.join() - return True - - def kill(self) -> None: - """ - Kills the threads - - Raises - ------ - ThreadNotInitializedError: If the thread is not initialized - ThreadNotRunningError: If the thread is not running - """ - for entry in self._threads: - entry.thread.kill() - - def start(self) -> None: - """ - Starts the threads + def __init__( + self, + function: DatasetFunction[_Dataset_T, _Target_P, _Target_T], + dataset: Sequence[_Dataset_T], + max_threads: int = 8, + *overflow_args: Overflow_In, + **overflow_kwargs: Overflow_In, + ) -> None: + """ + Initializes a new Multi-Threaded Pool\n + Best for data processing + + Splits a dataset as evenly as it can among the threads and run them in parallel + + Parameters + ---------- + :param function: This should be the function to validate each data entry in the `dataset`, the first argument parsed will be a value of the dataset + :param dataset: This should be an iterable sequence of data entries + :param max_threads: This should be an integer value of the max threads allowed + :param *: These are arguments parsed to `threading.Thread` and `Thread` + :param **: These are arguments parsed to `thread.Thread` and `Thread` + + Raises + ------ + AssertionError: invalid `max_threads` + """ + assert 0 <= max_threads, 'Cannot run a thread pool with max threads set to 0' + + self._threads = [] + self._completed = 0 + + self.status = 'Idle' + self.function = self._wrap_function(function) + self.dataset = dataset + self.max_threads = max_threads + + self.overflow_args = overflow_args + self.overflow_kwargs = overflow_kwargs + + def _wrap_function(self, function: TargetFunction) -> TargetFunction: + @wraps(function) + def wrapper( + index: int, + length: int, + data_chunk: Generator[_Dataset_T, None, None], + *args: _Target_P.args, + **kwargs: _Target_P.kwargs, + ) -> List[_Target_T]: + computed: List[Data_Out] = [] + + i = 0 + for data_entry in data_chunk: + v = function(data_entry, *args, **kwargs) + computed.append(v) + self._threads[index].progress = round((i + 1) / length, 5) + i += 1 + + self._completed += 1 + if self._completed == len(self._threads): + self.status = 'Completed' + + return computed + + return wrapper + + @property + def results(self) -> List[_Dataset_T]: + """ + The return value of the threads if completed + + Raises + ------ + ThreadNotInitializedError: If the threads are not initialized + ThreadNotRunningError: If the threads are not running + ThreadStillRunningError: If the threads are still running + """ + if len(self._threads) == 0: + raise exceptions.ThreadNotInitializedError() + + results: List[Data_Out] = [] + for entry in self._threads: + results += entry.thread.result + return results + + def is_alive(self) -> bool: + """ + See if any threads are still alive + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + """ + if len(self._threads) == 0: + raise exceptions.ThreadNotInitializedError() + return any(entry.thread.is_alive() for entry in self._threads) + + def get_return_values(self) -> List[_Dataset_T]: + """ + Halts the current thread execution until the thread completes + + Returns + ------- + :returns Any: The return value of the target function + """ + results: List[Data_Out] = [] + for entry in self._threads: + entry.thread.join() + results += entry.thread.result + return results + + def join(self) -> bool: + """ + Halts the current thread execution until a thread completes or exceeds the timeout + + Returns + ------- + :returns bool: True if the thread is no-longer alive + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + ThreadNotRunningError: If the thread is not running + """ + if len(self._threads) == 0: + raise exceptions.ThreadNotInitializedError() + + if self.status == 'Idle': + raise exceptions.ThreadNotRunningError() + + for entry in self._threads: + entry.thread.join() + return True + + def kill(self) -> None: + """ + Kills the threads + + Raises + ------ + ThreadNotInitializedError: If the thread is not initialized + ThreadNotRunningError: If the thread is not running + """ + for entry in self._threads: + entry.thread.kill() + + def start(self) -> None: + """ + Starts the threads + + Raises + ------ + ThreadStillRunningError: If there already is a running thread + """ + if self.status == 'Running': + raise exceptions.ThreadStillRunningError() - Raises - ------ - ThreadStillRunningError: If there already is a running thread - """ - if self.status == 'Running': - raise exceptions.ThreadStillRunningError() - - self.status = 'Running' - max_threads = min(self.max_threads, len(self.dataset)) - - parsed_args = self.overflow_kwargs.get('args', []) - name_format = ( - self.overflow_kwargs.get('name') and self.overflow_kwargs['name'] + '%s' - ) - self.overflow_kwargs = { - i: v for i, v in self.overflow_kwargs.items() if i != 'name' and i != 'args' - } - - i = 0 - for chunkStart, chunkEnd in chunk_split(len(self.dataset), max_threads): - chunk_thread = Thread( - target=self.function, - args=[ - i, - chunkEnd - chunkStart, - (self.dataset[x] for x in range(chunkStart, chunkEnd)), - *parsed_args, - *self.overflow_args, - ], - name=name_format and name_format % i or None, - **self.overflow_kwargs, - ) - self._threads.append(_ThreadWorker(chunk_thread, 0)) - chunk_thread.start() - i += 1 + self.status = 'Running' + max_threads = min(self.max_threads, len(self.dataset)) + + parsed_args = self.overflow_kwargs.get('args', []) + name_format = ( + self.overflow_kwargs.get('name') and self.overflow_kwargs['name'] + '%s' + ) + self.overflow_kwargs = { + i: v for i, v in self.overflow_kwargs.items() if i != 'name' and i != 'args' + } + + i = 0 + for chunkStart, chunkEnd in chunk_split(len(self.dataset), max_threads): + chunk_thread = Thread( + target=self.function, + args=[ + i, + chunkEnd - chunkStart, + (self.dataset[x] for x in range(chunkStart, chunkEnd)), + *parsed_args, + *self.overflow_args, + ], + name=name_format and name_format % i or None, + **self.overflow_kwargs, + ) + self._threads.append(_ThreadWorker(chunk_thread, 0)) + chunk_thread.start() + i += 1 # Handle abrupt exit def service_shutdown(signum, frame): - if Settings.GRACEFUL_EXIT_ENABLED: - print('\nCaught signal %d' % signum) - print('Gracefully killing active threads') - - for thread in Threads: - if isinstance(thread, Thread): - try: - thread.kill() - except (exceptions.ThreadNotRunningError, exceptions.ThreadNotInitializedError): - pass - except Exception: - print('Failed to kill ident: %d' % thread.ident or 0) - sys.exit(0) + if Settings.GRACEFUL_EXIT_ENABLED: + print('\nCaught signal %d' % signum) + print('Gracefully killing active threads') + + for thread in Threads: + if isinstance(thread, Thread): + try: + thread.kill() + except ( + exceptions.ThreadNotRunningError, + exceptions.ThreadNotInitializedError, + ): + pass + except Exception: + print('Failed to kill ident: %d' % thread.ident or 0) + sys.exit(0) # Register the signal handlers diff --git a/src/thread/utils/__init__.py b/src/thread/utils/__init__.py index 1063cca..030ef71 100644 --- a/src/thread/utils/__init__.py +++ b/src/thread/utils/__init__.py @@ -5,5 +5,5 @@ from .config import Settings from . import ( - algorithm, + algorithm, ) diff --git a/src/thread/utils/algorithm.py b/src/thread/utils/algorithm.py index 0250abf..84c3d9c 100644 --- a/src/thread/utils/algorithm.py +++ b/src/thread/utils/algorithm.py @@ -12,42 +12,42 @@ def chunk_split(dataset_length: int, number_of_chunks: int) -> List[Tuple[int, int]]: - """ - Splits a dataset into balanced chunks + """ + Splits a dataset into balanced chunks - If the size of the dataset is not fully divisible by the number of chunks, it is split like this - > `[ [n+1], [n+1], [n+1], [n], [n], [n] ]` + If the size of the dataset is not fully divisible by the number of chunks, it is split like this + > `[ [n+1], [n+1], [n+1], [n], [n], [n] ]` - Parameters - ---------- - :param dataset_length: This should be the length of the dataset you want to split into chunks - :param number_of_chunks: The should be the number of chunks it will attempt to split into + Parameters + ---------- + :param dataset_length: This should be the length of the dataset you want to split into chunks + :param number_of_chunks: The should be the number of chunks it will attempt to split into - Returns - ------- - :returns list[tuple[int, int]]: The chunked dataset slices + Returns + ------- + :returns list[tuple[int, int]]: The chunked dataset slices - Raises - ------ - AssertionError: The number of chunks specified is larger than the dataset size - """ - assert ( - dataset_length >= number_of_chunks - ), 'The number of chunks specified is larger than the dataset size' + Raises + ------ + AssertionError: The number of chunks specified is larger than the dataset size + """ + assert ( + dataset_length >= number_of_chunks + ), 'The number of chunks specified is larger than the dataset size' - chunk_count = dataset_length // number_of_chunks - overflow = dataset_length % number_of_chunks + chunk_count = dataset_length // number_of_chunks + overflow = dataset_length % number_of_chunks - i = 0 - split = [] - while i < dataset_length: - chunk_length = chunk_count + int(overflow > 0) - b = i + chunk_length + i = 0 + split = [] + while i < dataset_length: + chunk_length = chunk_count + int(overflow > 0) + b = i + chunk_length - split.append((i, b)) - overflow -= 1 - i = b + split.append((i, b)) + overflow -= 1 + i = b - return split + return split diff --git a/src/thread/utils/config.py b/src/thread/utils/config.py index f9108c9..46b4480 100644 --- a/src/thread/utils/config.py +++ b/src/thread/utils/config.py @@ -1,14 +1,14 @@ class Settings: - """ - # Settings - `Non Instantiable` - """ + """ + # Settings + `Non Instantiable` + """ - GRACEFUL_EXIT_ENABLED: bool = True + GRACEFUL_EXIT_ENABLED: bool = True - def __init__(self): - raise NotImplementedError('This class is not instantiable') + def __init__(self): + raise NotImplementedError('This class is not instantiable') - @staticmethod - def set_graceful_exit(enabled: bool = True): - Settings.GRACEFUL_EXIT_ENABLED = enabled + @staticmethod + def set_graceful_exit(enabled: bool = True): + Settings.GRACEFUL_EXIT_ENABLED = enabled diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 9e5ed4d..8d9b4be 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -3,43 +3,43 @@ def test_chunking_1(): - assert algorithm.chunk_split(5, 1) == [(0, 5)] + assert algorithm.chunk_split(5, 1) == [(0, 5)] def test_chunking_2(): - assert algorithm.chunk_split(5, 2) == [(0, 3), (3, 5)] + assert algorithm.chunk_split(5, 2) == [(0, 3), (3, 5)] def test_chunking_3(): - assert algorithm.chunk_split(100, 8) == [ - (0, 13), - (13, 26), - (26, 39), - (39, 52), - (52, 64), - (64, 76), - (76, 88), - (88, 100), - ] + assert algorithm.chunk_split(100, 8) == [ + (0, 13), + (13, 26), + (26, 39), + (39, 52), + (52, 64), + (64, 76), + (76, 88), + (88, 100), + ] def test_chunking_dynamic(): - dataset_length = random.randint(400, int(10e6)) - thread_count = random.randint(2, 100) + dataset_length = random.randint(400, int(10e6)) + thread_count = random.randint(2, 100) - expected_chunk_length_low = dataset_length // thread_count - expected_chunk_high = dataset_length % thread_count + expected_chunk_length_low = dataset_length // thread_count + expected_chunk_high = dataset_length % thread_count - i = 0 - heap = [] - while i < dataset_length: - chunk_length = expected_chunk_length_low + int(expected_chunk_high > 0) - b = i + chunk_length + i = 0 + heap = [] + while i < dataset_length: + chunk_length = expected_chunk_length_low + int(expected_chunk_high > 0) + b = i + chunk_length - heap.append((i, b)) - expected_chunk_high -= 1 - i = b + heap.append((i, b)) + expected_chunk_high -= 1 + i = b - assert ( - algorithm.chunk_split(dataset_length, thread_count) == heap - ), f'\nLength: {dataset_length}\nThreads: {thread_count}\nExpected: {heap}\nActual: {algorithm.chunk_split(dataset_length, thread_count)}' + assert ( + algorithm.chunk_split(dataset_length, thread_count) == heap + ), f'\nLength: {dataset_length}\nThreads: {thread_count}\nExpected: {heap}\nActual: {algorithm.chunk_split(dataset_length, thread_count)}' diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 54b4f24..4b676d0 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -4,87 +4,90 @@ # >>>>>>>>>> Dummy Functions <<<<<<<<<< # def _dummy_target_raiseToPower(x: float, power: float, delay: float = 0): - time.sleep(delay) - return x**power - - + time.sleep(delay) + return x**power # >>>>>>>>>> Threaded <<<<<<<<<< # def test_threadedCreationNoParam(): - @threaded - def _run(*args): - return _dummy_target_raiseToPower(*args) - - x = _run(2, 2) - assert x.get_return_value() == 4 + @threaded + def _run(*args): + return _dummy_target_raiseToPower(*args) + + x = _run(2, 2) + assert x.get_return_value() == 4 + def test_threadedCreationEmptyParam(): - @threaded() - def _run(*args): - return _dummy_target_raiseToPower(*args) - - x = _run(2, 2) - assert x.get_return_value() == 4 + @threaded() + def _run(*args): + return _dummy_target_raiseToPower(*args) + + x = _run(2, 2) + assert x.get_return_value() == 4 + def test_threadedCreationWithParam(): - @threaded(daemon = True) - def _run(*args): - return _dummy_target_raiseToPower(*args) - - x = _run(2, 2) - assert x.daemon - assert x.get_return_value() == 4 + @threaded(daemon=True) + def _run(*args): + return _dummy_target_raiseToPower(*args) -def test_threadedArgJoin(): - @threaded(daemon = True, args = (1, 2, 3)) - def _run(*args): - return args - - x = _run(8, 9) - assert x.get_return_value() == (1, 2, 3, 8, 9) + x = _run(2, 2) + assert x.daemon + assert x.get_return_value() == 4 +def test_threadedArgJoin(): + @threaded(daemon=True, args=(1, 2, 3)) + def _run(*args): + return args + + x = _run(8, 9) + assert x.get_return_value() == (1, 2, 3, 8, 9) def test_processorCreationNoParam(): - @processor - def _run(args): - return _dummy_target_raiseToPower(*args) - - x = _run([[2, 2]]) - assert x.get_return_values() == [4] + @processor + def _run(args): + return _dummy_target_raiseToPower(*args) + + x = _run([[2, 2]]) + assert x.get_return_values() == [4] + def test_processorCreationEmptyParam(): - @processor() - def _run(args): - return _dummy_target_raiseToPower(*args) - - x = _run([[2, 2]]) - assert x.get_return_values() == [4] + @processor() + def _run(args): + return _dummy_target_raiseToPower(*args) + + x = _run([[2, 2]]) + assert x.get_return_values() == [4] + def test_processorCreationWithParam(): - @processor(daemon = True) - def _run(args): - return _dummy_target_raiseToPower(*args) - - x = _run([[2, 2]]) - assert len(x._threads) == 1 - assert x._threads[0].thread.daemon - assert x.get_return_values() == [4] + @processor(daemon=True) + def _run(args): + return _dummy_target_raiseToPower(*args) + + x = _run([[2, 2]]) + assert len(x._threads) == 1 + assert x._threads[0].thread.daemon + assert x.get_return_values() == [4] + def test_processorArgJoin(): - @processor(daemon = True, args = (1, 2, 3)) - def _run(data, *args): - return [*args, *data] - - x = _run([[8, 9]]) - assert x.get_return_values() == [[1, 2, 3, 8, 9]] + @processor(daemon=True, args=(1, 2, 3)) + def _run(data, *args): + return [*args, *data] + + x = _run([[8, 9]]) + assert x.get_return_values() == [[1, 2, 3, 8, 9]] + def test_processorMultiArgJoin(): - @processor(daemon = True, args = (1, 2, 3)) - def _run(data, *args): - return [*args, *data] - - x = _run([[8, 9], [10, 11]]) - assert x.get_return_values() == [[1, 2, 3, 8, 9], [1, 2, 3, 10, 11]] + @processor(daemon=True, args=(1, 2, 3)) + def _run(data, *args): + return [*args, *data] + + x = _run([[8, 9], [10, 11]]) + assert x.get_return_values() == [[1, 2, 3, 8, 9], [1, 2, 3, 10, 11]] diff --git a/tests/test_parallelprocessing.py b/tests/test_parallelprocessing.py index 53b68ac..78491ea 100644 --- a/tests/test_parallelprocessing.py +++ b/tests/test_parallelprocessing.py @@ -5,68 +5,58 @@ # >>>>>>>>>> Dummy Functions <<<<<<<<<< # def _dummy_dataProcessor(data_in: int, delay: float = 0) -> int: - time.sleep(delay) - return data_in - -def _dummy_raiseException(x: Exception, delay: float = 0): - time.sleep(delay) - raise x + time.sleep(delay) + return data_in +def _dummy_raiseException(x: Exception, delay: float = 0): + time.sleep(delay) + raise x # >>>>>>>>>> General Use <<<<<<<<<< # def test_threadsScaleDown(): - """This test is for testing if threads scale down `max_threads` when the dataset is lesser than the thread count""" - dataset = list(range(0, 2)) - new = ParallelProcessing( - function = _dummy_dataProcessor, - dataset = dataset, - max_threads = 4, - kwargs = { 'delay': 2 }, - daemon = True - ) - new.start() - assert len(new._threads) == 2 - -def test_threadsProcessing(): - """This test is for testing if threads correctly order data in the `dataset` arrangement""" - dataset = list(range(0, 500)) - new = ParallelProcessing( - function = _dummy_dataProcessor, - dataset = dataset, - args = [0.001], - daemon = True - ) - new.start() - assert new.get_return_values() == dataset + """This test is for testing if threads scale down `max_threads` when the dataset is lesser than the thread count""" + dataset = list(range(0, 2)) + new = ParallelProcessing( + function=_dummy_dataProcessor, + dataset=dataset, + max_threads=4, + kwargs={'delay': 2}, + daemon=True, + ) + new.start() + assert len(new._threads) == 2 +def test_threadsProcessing(): + """This test is for testing if threads correctly order data in the `dataset` arrangement""" + dataset = list(range(0, 500)) + new = ParallelProcessing( + function=_dummy_dataProcessor, dataset=dataset, args=[0.001], daemon=True + ) + new.start() + assert new.get_return_values() == dataset # >>>>>>>>>> Raising Exceptions <<<<<<<<<< # def test_raises_StillRunningError(): - """This test should raise ThreadStillRunningError""" - dataset = list(range(0, 8)) - new = ParallelProcessing( - function = _dummy_dataProcessor, - dataset = dataset, - args = [1], - daemon = True - ) - new.start() - with pytest.raises(exceptions.ThreadStillRunningError): - new.results + """This test should raise ThreadStillRunningError""" + dataset = list(range(0, 8)) + new = ParallelProcessing( + function=_dummy_dataProcessor, dataset=dataset, args=[1], daemon=True + ) + new.start() + with pytest.raises(exceptions.ThreadStillRunningError): + new.results + def test_raises_RunTimeError(): - """This test should raise a RunTimeError""" - dataset = [RuntimeError()] * 8 - new = ParallelProcessing( - function = _dummy_raiseException, - dataset = dataset, - args = [0.01], - daemon = True - ) - with pytest.raises(RuntimeError): - new.start() - new.join() + """This test should raise a RunTimeError""" + dataset = [RuntimeError()] * 8 + new = ParallelProcessing( + function=_dummy_raiseException, dataset=dataset, args=[0.01], daemon=True + ) + with pytest.raises(RuntimeError): + new.start() + new.join() diff --git a/tests/test_thread.py b/tests/test_thread.py index 47b5c20..a5dd6e4 100644 --- a/tests/test_thread.py +++ b/tests/test_thread.py @@ -5,142 +5,133 @@ # >>>>>>>>>> Dummy Functions <<<<<<<<<< # def _dummy_target_raiseToPower(x: float, power: float, delay: float = 0): - time.sleep(delay) - return x**power + time.sleep(delay) + return x**power -def _dummy_raiseException(x: Exception, delay: float = 0): - time.sleep(delay) - raise x -def _dummy_iterative(itemCount: int, pTime: float = 0.1, delay: float = 0): - time.sleep(delay) - for i in range(itemCount): - time.sleep(pTime) +def _dummy_raiseException(x: Exception, delay: float = 0): + time.sleep(delay) + raise x +def _dummy_iterative(itemCount: int, pTime: float = 0.1, delay: float = 0): + time.sleep(delay) + for i in range(itemCount): + time.sleep(pTime) # >>>>>>>>>> General Use <<<<<<<<<< # def test_threadCreation(): - """This test is for testing parsing of args and kwargs and `.join()` method""" - new = Thread( - target = _dummy_target_raiseToPower, - args = [4], - kwargs = { 'power': 2 }, - daemon = True - ) - new.start() - assert new.join() - assert new.result == 16 + """This test is for testing parsing of args and kwargs and `.join()` method""" + new = Thread( + target=_dummy_target_raiseToPower, args=[4], kwargs={'power': 2}, daemon=True + ) + new.start() + assert new.join() + assert new.result == 16 + def test_threadingThreadParsing(): - """This test is for testing parsing arguments to `threading.Thead`""" - new = Thread( - target = _dummy_target_raiseToPower, - args = [4, 2, 5], - name = 'testingThread', - daemon = True - ) - new.start() - assert new.name == 'testingThread' + """This test is for testing parsing arguments to `threading.Thead`""" + new = Thread( + target=_dummy_target_raiseToPower, + args=[4, 2, 5], + name='testingThread', + daemon=True, + ) + new.start() + assert new.name == 'testingThread' + def test_suppressAll(): - """This test is for testing that errors are suppressed properly""" - new = Thread( - target = _dummy_raiseException, - args = [ValueError()], - suppress_errors = True, - daemon = True - ) - new.start() - new.join() - assert len(new.errors) == 1 - assert isinstance(new.errors[0], ValueError) + """This test is for testing that errors are suppressed properly""" + new = Thread( + target=_dummy_raiseException, + args=[ValueError()], + suppress_errors=True, + daemon=True, + ) + new.start() + new.join() + assert len(new.errors) == 1 + assert isinstance(new.errors[0], ValueError) + def test_ignoreSpecificError(): - """This test is for testing that specific errors are ignored properly""" - new = Thread( - target = _dummy_raiseException, - args = [ValueError()], - ignore_errors = [ValueError], - daemon = True - ) - new.start() - new.join() - assert len(new.errors) == 0 + """This test is for testing that specific errors are ignored properly""" + new = Thread( + target=_dummy_raiseException, + args=[ValueError()], + ignore_errors=[ValueError], + daemon=True, + ) + new.start() + new.join() + assert len(new.errors) == 0 -def test_ignoreAll(): - """This test is for testing that all errors are ignored properly""" - new = Thread( - target = _dummy_raiseException, - args = [ValueError()], - ignore_errors = [Exception], - daemon = True - ) - new.start() - new.join() - assert len(new.errors) == 0 -def test_threadKilling(): - """This test is for testing that threads are killed properly""" - stdout = [] +def test_ignoreAll(): + """This test is for testing that all errors are ignored properly""" + new = Thread( + target=_dummy_raiseException, + args=[ValueError()], + ignore_errors=[Exception], + daemon=True, + ) + new.start() + new.join() + assert len(new.errors) == 0 - def _dummy_target_killThread(x: int, delay: float = 0): - for i in range(x): - stdout.append(i) - time.sleep(delay) - new = Thread( - target = _dummy_target_killThread, - args = [4, 1], - daemon = True - ) - new.start() - new.kill(True) - assert not new.is_alive() - assert len(stdout) != 4 +def test_threadKilling(): + """This test is for testing that threads are killed properly""" + stdout = [] + def _dummy_target_killThread(x: int, delay: float = 0): + for i in range(x): + stdout.append(i) + time.sleep(delay) + new = Thread(target=_dummy_target_killThread, args=[4, 1], daemon=True) + new.start() + new.kill(True) + assert not new.is_alive() + assert len(stdout) != 4 # >>>>>>>>>> Raising Exceptions <<<<<<<<<< # def test_raises_stillRunningError(): - """This test should raise ThreadStillRunningError""" - new = Thread( - target = _dummy_target_raiseToPower, - args = [4, 2, 5], - daemon = True - ) - new.start() + """This test should raise ThreadStillRunningError""" + new = Thread(target=_dummy_target_raiseToPower, args=[4, 2, 5], daemon=True) + new.start() + + with pytest.raises(exceptions.ThreadStillRunningError): + new.result - with pytest.raises(exceptions.ThreadStillRunningError): - new.result def test_raises_ignoreSpecificError(): - """This test is for testing that non-specified errors are not ignored""" - new = Thread( - target = _dummy_raiseException, - args = [FileExistsError()], - ignore_errors = [ValueError], - suppress_errors = False, - daemon = True - ) - with pytest.raises(FileExistsError): - new.start() - new.join() + """This test is for testing that non-specified errors are not ignored""" + new = Thread( + target=_dummy_raiseException, + args=[FileExistsError()], + ignore_errors=[ValueError], + suppress_errors=False, + daemon=True, + ) + with pytest.raises(FileExistsError): + new.start() + new.join() + def test_raises_HookError(): - """This test should raise """ - new = Thread( - target = _dummy_target_raiseToPower, - args = [4, 2], - daemon = True - ) - - def newhook(x: int): - raise RuntimeError() - new.add_hook(newhook) - - with pytest.raises(exceptions.HookRuntimeError): - new.start() - new.join() + """This test should raise""" + new = Thread(target=_dummy_target_raiseToPower, args=[4, 2], daemon=True) + + def newhook(x: int): + raise RuntimeError() + + new.add_hook(newhook) + + with pytest.raises(exceptions.HookRuntimeError): + new.start() + new.join()