Source code for haka_mqtt.scheduler

from bisect import insort_right, bisect_left


[docs]class Deadline(object): def __init__(self, deadline_entry): self.__deadline_entry = deadline_entry
[docs] def expired(self): """bool: True if callback has already been called; False otherwise.""" return self.__deadline_entry.expired
[docs] def cancel(self): """Stops a scheduled callback from being made; has no effect if cancel is called after the callback has already been made.""" self.__deadline_entry.cancel()
class _DeadlineEntry(object): def __init__(self, instant, queue, cb): self.instant = instant self.cb = cb self.queue = queue self.expired = False def cancel(self): if not self.expired: self.expired = True idx = bisect_left(self.queue, self) while idx < len(self.queue): if id(self.queue[idx]) == id(self): del self.queue[idx] break elif self.queue[idx] > self: raise AssertionError() else: idx += 1 def __eq__(self, other): return self.instant == other.instant def __ne__(self, other): return self.instant != other.instant def __lt__(self, other): return self.instant < other.instant def __gt__(self, other): return self.instant > other.instant def __le__(self, other): return self.instant <= other.instant def __ge__(self, other): return self.instant >= other.instant
[docs]class Scheduler(object): def __init__(self): self._queue = []
[docs] def instant(self): """Returns the current tick. Returns ------- int """ raise NotImplementedError()
[docs] def remaining(self): """Duration remaining to next scheduled callback. Returns ------- int or None """ if self._queue: rv = self._queue[0].instant - self.instant() else: rv = None return rv
[docs] def add(self, duration, cb): """Adds `duration` to `self.instant()` and calls all scheduled callbacks. Parameters ---------- duration: int Number of ticks passed. cb: callable() No calling with so Returns ------- Deadline """ de = _DeadlineEntry(self.instant() + duration, self._queue, cb) insort_right(self._queue, de) return Deadline(de)
def __len__(self): return len(self._queue)
[docs]class DurationScheduler(Scheduler): def __init__(self): Scheduler.__init__(self) self._instant = 0 self._queue = []
[docs] def instant(self): """Returns the current tick. Returns ------- int """ return self._instant
[docs] def poll(self, duration): """Adds `duration` to `self.instant()` and calls all scheduled callbacks. Parameters ---------- duration: int """ self._instant += duration while self._queue and self._queue[0].instant <= self.instant(): de = self._queue.pop(0) de.expired = True de.cb()
[docs]class ClockScheduler(Scheduler): def __init__(self, clock): Scheduler.__init__(self) self.__clock = clock
[docs] def instant(self): """Current clock instant. Returns ------- int Current clock scheduler. """ return self.__clock.time()
[docs] def poll(self): """Calls all callbacks awaiting execution to this point.""" while self._queue and self._queue[0].instant <= self.instant(): de = self._queue.pop(0) de.expired = True de.cb()