taskrunner.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import logging
  2. import threading
  3. import time
  4. logger = logging.getLogger(__name__)
  5. class TaskRunner:
  6. """
  7. Class to run a list of tasks periodically at a given interval.
  8. """
  9. def __init__(self, tasks: tuple[tuple]) -> None:
  10. """
  11. Initialize the TaskRunner.
  12. Parameters:
  13. tasks (tuple): Tuple of tuples containing the callable
  14. and the calling period in seconds.
  15. """
  16. self.__tasks = tasks
  17. self.__stop_event = threading.Event()
  18. self.__thread = threading.Thread(target=self.__run, daemon=True)
  19. def start(self) -> None:
  20. """
  21. Start executing the tasks.
  22. """
  23. logger.info("Started a task runner thread.")
  24. logger.info(f"Tasks: {str(self.__tasks)}")
  25. self.__thread.start()
  26. def __run(self) -> None:
  27. """
  28. Private method where the thread executes the given methods
  29. at their specified frequencies.
  30. """
  31. next_call_times = [time.time() for _ in self.__tasks]
  32. while not self.__stop_event.is_set():
  33. now = time.time()
  34. for i, task in enumerate(self.__tasks):
  35. to_call, period = task
  36. if now >= next_call_times[i]:
  37. logger.debug(
  38. f"Now: {now}, Calling: {str(to_call)}, "
  39. f"Runner id: {id(self)}"
  40. )
  41. # Keep the thread alive even if an exception occurs
  42. # so that other tasks can keep running
  43. try:
  44. to_call() # Call the method
  45. except Exception as e:
  46. logger.exception(e)
  47. next_call_times[i] = now + period
  48. # Wait for the next call or stop event
  49. earliest_next_call_time = min(next_call_times) - now
  50. logger.debug(
  51. f"Time to next call: {earliest_next_call_time} sec, "
  52. f"Runner id: {id(self)}"
  53. )
  54. if earliest_next_call_time > 0:
  55. self.__stop_event.wait(earliest_next_call_time)
  56. def __del__(self) -> None:
  57. """
  58. Cleanup resources when the object is destroyed.
  59. """
  60. self.stop()
  61. def stop(self) -> None:
  62. """
  63. Signal the thread to stop and wait for graceful exit.
  64. """
  65. logger.debug(f"Firing stop event, Runner id: {id(self)}")
  66. self.__stop_event.set()
  67. if self.__thread.is_alive():
  68. logger.debug(
  69. f"Waiting for the runner thread to terminate, "
  70. f"Runner id: {id(self)}"
  71. )
  72. self.__thread.join()