#!/usr/bin/env python3 import math import functools import threading import pyinotify import os import time import logging import coloredlogs import i3ipc from display import Text coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') log = logging.getLogger() # TODO Sync bar update with PeriodicUpdater updates notBusy = threading.Event() class Updater: @staticmethod def init(): PeriodicUpdater.init() InotifyUpdater.init() notBusy.set() def updateText(self, text): print(text) def fetcher(self): return "{} refreshed".format(self) def __init__(self): self.lock = threading.Lock() def refreshData(self): # TODO OPTI Maybe discard the refresh if there's already another one? self.lock.acquire() try: data = self.fetcher() except BaseException as e: log.error(e, exc_info=True) data = "" self.updateText(data) self.lock.release() class PeriodicUpdaterThread(threading.Thread): def run(self): # TODO Sync with system clock counter = 0 while True: notBusy.set() if PeriodicUpdater.intervalsChanged \ .wait(timeout=PeriodicUpdater.intervalStep): # ↑ sleeps here notBusy.clear() PeriodicUpdater.intervalsChanged.clear() counter = 0 for providerList in PeriodicUpdater.intervals.copy().values(): for provider in providerList.copy(): provider.refreshData() else: notBusy.clear() counter += PeriodicUpdater.intervalStep counter = counter % PeriodicUpdater.intervalLoop for interval in PeriodicUpdater.intervals.keys(): if counter % interval == 0: for provider in PeriodicUpdater.intervals[interval]: provider.refreshData() class PeriodicUpdater(Updater): """ Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__` """ intervals = dict() intervalStep = None intervalLoop = None updateThread = PeriodicUpdaterThread(daemon=True) intervalsChanged = threading.Event() @staticmethod def gcds(*args): return functools.reduce(math.gcd, args) @staticmethod def lcm(a, b): """Return lowest common multiple.""" return a * b // math.gcd(a, b) @staticmethod def lcms(*args): """Return lowest common multiple.""" return functools.reduce(PeriodicUpdater.lcm, args) @staticmethod def updateIntervals(): intervalsList = list(PeriodicUpdater.intervals.keys()) PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList) PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList) PeriodicUpdater.intervalsChanged.set() @staticmethod def init(): PeriodicUpdater.updateThread.start() def __init__(self): Updater.__init__(self) self.interval = None def changeInterval(self, interval): assert isinstance(interval, int) if self.interval is not None: PeriodicUpdater.intervals[self.interval].remove(self) self.interval = interval if interval not in PeriodicUpdater.intervals: PeriodicUpdater.intervals[interval] = set() PeriodicUpdater.intervals[interval].add(self) PeriodicUpdater.updateIntervals() class InotifyUpdaterEventHandler(pyinotify.ProcessEvent): def process_default(self, event): # DEBUG # from pprint import pprint # pprint(event.__dict__) # return assert event.path in InotifyUpdater.paths if 0 in InotifyUpdater.paths[event.path]: for provider in InotifyUpdater.paths[event.path][0]: provider.refreshData() if event.name in InotifyUpdater.paths[event.path]: for provider in InotifyUpdater.paths[event.path][event.name]: provider.refreshData() class InotifyUpdater(Updater): """ Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__` """ wm = pyinotify.WatchManager() paths = dict() @staticmethod def init(): notifier = pyinotify.ThreadedNotifier(InotifyUpdater.wm, InotifyUpdaterEventHandler()) notifier.start() # TODO Mask for folders MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE def addPath(self, path, refresh=True): path = os.path.realpath(os.path.expanduser(path)) # Detect if file or folder if os.path.isdir(path): self.dirpath = path # 0: Directory watcher self.filename = 0 elif os.path.isfile(path): self.dirpath = os.path.dirname(path) self.filename = os.path.basename(path) else: raise FileNotFoundError("No such file or directory: '{}'" .format(path)) # Register watch action if self.dirpath not in InotifyUpdater.paths: InotifyUpdater.paths[self.dirpath] = dict() if self.filename not in InotifyUpdater.paths[self.dirpath]: InotifyUpdater.paths[self.dirpath][self.filename] = set() InotifyUpdater.paths[self.dirpath][self.filename].add(self) # Add watch InotifyUpdater.wm.add_watch(self.dirpath, InotifyUpdater.MASK) if refresh: self.refreshData() class ThreadedUpdaterThread(threading.Thread): def __init__(self, updater, *args, **kwargs): self.updater = updater threading.Thread.__init__(self, *args, **kwargs) self.looping = True def run(self): try: while self.looping: self.updater.loop() except BaseException as e: log.error("Error with {}".format(self.updater)) log.error(e, exc_info=True) self.updater.updateText("") class ThreadedUpdater(Updater): """ Must implement loop(), and call start() """ def __init__(self): Updater.__init__(self) self.thread = ThreadedUpdaterThread(self, daemon=True) def loop(self): self.refreshData() time.sleep(10) def start(self): self.thread.start() class I3Updater(ThreadedUpdater): # TODO OPTI One i3 connection for all def __init__(self): ThreadedUpdater.__init__(self) self.i3 = i3ipc.Connection() self.start() def on(self, event, function): self.i3.on(event, function) def loop(self): self.i3.main() class MergedUpdater(Updater): # TODO OPTI Do not update until end of periodic batch def fetcher(self): text = Text() for updater in self.updaters: text.append(self.texts[updater]) if not len(text): return None return text def __init__(self, *args): Updater.__init__(self) self.updaters = [] self.texts = dict() for updater in args: assert isinstance(updater, Updater) def newUpdateText(updater, text): self.texts[updater] = text self.refreshData() updater.updateText = newUpdateText.__get__(updater, Updater) self.updaters.append(updater) self.texts[updater] = ''