| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import logging
- import threading
- import time
- logger = logging.getLogger(__name__)
class TaskRunner:
    """
    Class to run a list of tasks periodically at a given interval.

    All tasks are executed sequentially on one background daemon thread,
    so a slow task delays the tasks scheduled after it.
    """

    def __init__(self, tasks: tuple[tuple, ...]) -> None:
        """
        Initialize the TaskRunner.

        Parameters:
            tasks (tuple): Tuple of ``(callable, period)`` tuples, where
                ``callable`` takes no arguments and ``period`` is the
                calling period in seconds.
        """
        self.__tasks = tasks
        self.__stop_event = threading.Event()
        self.__thread = threading.Thread(target=self.__run, daemon=True)

    def start(self) -> None:
        """
        Start executing the tasks.
        """
        logger.info("Started a task runner thread.")
        logger.info("Tasks: %s", self.__tasks)
        self.__thread.start()

    def __run(self) -> None:
        """
        Private method where the thread executes the given methods
        at their specified frequencies.

        Every task is due immediately on the first pass; afterwards each
        task is rescheduled ``period`` seconds after the start of the pass
        in which it ran. The loop ends once the stop event is set.
        """
        if not self.__tasks:
            # Nothing to schedule: min() over an empty list would raise,
            # so simply idle until stop() is requested.
            self.__stop_event.wait()
            return

        # Monotonic clock: immune to system clock adjustments, which
        # would otherwise stall or burst the schedule.
        next_call_times = [time.monotonic()] * len(self.__tasks)
        while not self.__stop_event.is_set():
            now = time.monotonic()
            for i, (to_call, period) in enumerate(self.__tasks):
                if now < next_call_times[i]:
                    continue
                logger.debug(
                    "Now: %s, Calling: %s, Runner id: %s",
                    now,
                    to_call,
                    id(self),
                )
                # Keep the thread alive even if an exception occurs
                # so that other tasks can keep running
                try:
                    to_call()  # Call the method
                except Exception as e:
                    logger.exception(e)
                next_call_times[i] = now + period

            # Wait for the next call or stop event. The clock is read
            # again so the time spent inside the tasks is not added on
            # top of the period.
            time_to_next_call = min(next_call_times) - time.monotonic()
            logger.debug(
                "Time to next call: %s sec, Runner id: %s",
                time_to_next_call,
                id(self),
            )
            if time_to_next_call > 0:
                self.__stop_event.wait(time_to_next_call)

    def __del__(self) -> None:
        """
        Cleanup resources when the object is destroyed.
        """
        self.stop()

    def stop(self) -> None:
        """
        Signal the thread to stop and wait for graceful exit.

        Safe to call more than once. When called from the runner thread
        itself (i.e. from inside a task) the stop is only signalled,
        because a thread cannot join itself.
        """
        logger.debug("Firing stop event, Runner id: %s", id(self))
        self.__stop_event.set()
        if not self.__thread.is_alive():
            return
        if threading.current_thread() is self.__thread:
            # Thread.join() raises RuntimeError on the current thread;
            # the loop exits on its own once the running task returns.
            return
        logger.debug(
            "Waiting for the runner thread to terminate, Runner id: %s",
            id(self),
        )
        self.__thread.join()
|