dotfiles/config/lemonbar/updaters.py
2018-10-14 16:59:49 +02:00

270 lines
7.4 KiB
Python
Executable file

#!/usr/bin/env python3
import math
import functools
import threading
import pyinotify
import os
import time
import logging
import coloredlogs
import i3ipc
from display import Text
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
log = logging.getLogger()
# TODO Sync bar update with PeriodicUpdater updates
notBusy = threading.Event()
class Updater:
@staticmethod
def init():
PeriodicUpdater.init()
InotifyUpdater.init()
notBusy.set()
def updateText(self, text):
print(text)
def fetcher(self):
return "{} refreshed".format(self)
def __init__(self):
self.lock = threading.Lock()
def refreshData(self):
# TODO OPTI Maybe discard the refresh if there's already another one?
self.lock.acquire()
try:
data = self.fetcher()
except BaseException as e:
log.error(e, exc_info=True)
data = ""
self.updateText(data)
self.lock.release()
class PeriodicUpdaterThread(threading.Thread):
def run(self):
# TODO Sync with system clock
counter = 0
while True:
notBusy.set()
if PeriodicUpdater.intervalsChanged \
.wait(timeout=PeriodicUpdater.intervalStep):
# ↑ sleeps here
notBusy.clear()
PeriodicUpdater.intervalsChanged.clear()
counter = 0
for providerList in PeriodicUpdater.intervals.copy().values():
for provider in providerList.copy():
provider.refreshData()
else:
notBusy.clear()
counter += PeriodicUpdater.intervalStep
counter = counter % PeriodicUpdater.intervalLoop
for interval in PeriodicUpdater.intervals.keys():
if counter % interval == 0:
for provider in PeriodicUpdater.intervals[interval]:
provider.refreshData()
class PeriodicUpdater(Updater):
"""
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
"""
intervals = dict()
intervalStep = None
intervalLoop = None
updateThread = PeriodicUpdaterThread(daemon=True)
intervalsChanged = threading.Event()
@staticmethod
def gcds(*args):
return functools.reduce(math.gcd, args)
@staticmethod
def lcm(a, b):
"""Return lowest common multiple."""
return a * b // math.gcd(a, b)
@staticmethod
def lcms(*args):
"""Return lowest common multiple."""
return functools.reduce(PeriodicUpdater.lcm, args)
@staticmethod
def updateIntervals():
intervalsList = list(PeriodicUpdater.intervals.keys())
PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList)
PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList)
PeriodicUpdater.intervalsChanged.set()
@staticmethod
def init():
PeriodicUpdater.updateThread.start()
def __init__(self):
Updater.__init__(self)
self.interval = None
def changeInterval(self, interval):
assert isinstance(interval, int)
if self.interval is not None:
PeriodicUpdater.intervals[self.interval].remove(self)
self.interval = interval
if interval not in PeriodicUpdater.intervals:
PeriodicUpdater.intervals[interval] = set()
PeriodicUpdater.intervals[interval].add(self)
PeriodicUpdater.updateIntervals()
class InotifyUpdaterEventHandler(pyinotify.ProcessEvent):
def process_default(self, event):
# DEBUG
# from pprint import pprint
# pprint(event.__dict__)
# return
assert event.path in InotifyUpdater.paths
if 0 in InotifyUpdater.paths[event.path]:
for provider in InotifyUpdater.paths[event.path][0]:
provider.refreshData()
if event.name in InotifyUpdater.paths[event.path]:
for provider in InotifyUpdater.paths[event.path][event.name]:
provider.refreshData()
class InotifyUpdater(Updater):
"""
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
"""
wm = pyinotify.WatchManager()
paths = dict()
@staticmethod
def init():
notifier = pyinotify.ThreadedNotifier(InotifyUpdater.wm,
InotifyUpdaterEventHandler())
notifier.start()
# TODO Mask for folders
MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE
def addPath(self, path, refresh=True):
path = os.path.realpath(os.path.expanduser(path))
# Detect if file or folder
if os.path.isdir(path):
self.dirpath = path
# 0: Directory watcher
self.filename = 0
elif os.path.isfile(path):
self.dirpath = os.path.dirname(path)
self.filename = os.path.basename(path)
else:
raise FileNotFoundError("No such file or directory: '{}'"
.format(path))
# Register watch action
if self.dirpath not in InotifyUpdater.paths:
InotifyUpdater.paths[self.dirpath] = dict()
if self.filename not in InotifyUpdater.paths[self.dirpath]:
InotifyUpdater.paths[self.dirpath][self.filename] = set()
InotifyUpdater.paths[self.dirpath][self.filename].add(self)
# Add watch
InotifyUpdater.wm.add_watch(self.dirpath, InotifyUpdater.MASK)
if refresh:
self.refreshData()
class ThreadedUpdaterThread(threading.Thread):
def __init__(self, updater, *args, **kwargs):
self.updater = updater
threading.Thread.__init__(self, *args, **kwargs)
self.looping = True
def run(self):
try:
while self.looping:
self.updater.loop()
except BaseException as e:
log.error("Error with {}".format(self.updater))
log.error(e, exc_info=True)
self.updater.updateText("")
class ThreadedUpdater(Updater):
"""
Must implement loop(), and call start()
"""
def __init__(self):
Updater.__init__(self)
self.thread = ThreadedUpdaterThread(self, daemon=True)
def loop(self):
self.refreshData()
time.sleep(10)
def start(self):
self.thread.start()
class I3Updater(ThreadedUpdater):
# TODO OPTI One i3 connection for all
def __init__(self):
ThreadedUpdater.__init__(self)
self.i3 = i3ipc.Connection()
self.start()
def on(self, event, function):
self.i3.on(event, function)
def loop(self):
self.i3.main()
class MergedUpdater(Updater):
# TODO OPTI Do not update until end of periodic batch
def fetcher(self):
text = Text()
for updater in self.updaters:
text.append(self.texts[updater])
if not len(text):
return None
return text
def __init__(self, *args):
Updater.__init__(self)
self.updaters = []
self.texts = dict()
for updater in args:
assert isinstance(updater, Updater)
def newUpdateText(updater, text):
self.texts[updater] = text
self.refreshData()
updater.updateText = newUpdateText.__get__(updater, Updater)
self.updaters.append(updater)
self.texts[updater] = ''