dotfiles/hm/desktop/frobar/frobar/updaters.py

241 lines
7 KiB
Python

#!/usr/bin/env python3
import functools
import logging
import math
import os
import subprocess
import threading
import time
import coloredlogs
import i3ipc
import pyinotify
from frobar.common import notBusy
from frobar.display import Element
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# TODO Sync bar update with PeriodicUpdater updates
class Updater:
@staticmethod
def init() -> None:
PeriodicUpdater.init()
InotifyUpdater.init()
notBusy.set()
def updateText(self, text: Element) -> None:
print(text)
def fetcher(self) -> Element:
return "{} refreshed".format(self)
def __init__(self) -> None:
self.lock = threading.Lock()
def refreshData(self) -> None:
# TODO OPTI Maybe discard the refresh if there's already another one?
self.lock.acquire()
try:
data = self.fetcher()
except BaseException as e:
log.error(e, exc_info=True)
data = ""
self.updateText(data)
self.lock.release()
class PeriodicUpdaterThread(threading.Thread):
def run(self) -> None:
# TODO Sync with system clock
counter = 0
while True:
notBusy.set()
if PeriodicUpdater.intervalsChanged.wait(
timeout=PeriodicUpdater.intervalStep
):
# ↑ sleeps here
notBusy.clear()
PeriodicUpdater.intervalsChanged.clear()
counter = 0
for providerList in PeriodicUpdater.intervals.copy().values():
for provider in providerList.copy():
provider.refreshData()
else:
notBusy.clear()
assert PeriodicUpdater.intervalStep is not None
counter += PeriodicUpdater.intervalStep
counter = counter % PeriodicUpdater.intervalLoop
for interval in PeriodicUpdater.intervals.keys():
if counter % interval == 0:
for provider in PeriodicUpdater.intervals[interval]:
provider.refreshData()
class PeriodicUpdater(Updater):
"""
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
"""
intervals: dict[int, set["PeriodicUpdater"]] = dict()
intervalStep: int | None = None
intervalLoop: int
updateThread: threading.Thread = PeriodicUpdaterThread(daemon=True)
intervalsChanged = threading.Event()
@staticmethod
def gcds(*args: int) -> int:
return functools.reduce(math.gcd, args)
@staticmethod
def lcm(a: int, b: int) -> int:
"""Return lowest common multiple."""
return a * b // math.gcd(a, b)
@staticmethod
def lcms(*args: int) -> int:
"""Return lowest common multiple."""
return functools.reduce(PeriodicUpdater.lcm, args)
@staticmethod
def updateIntervals() -> None:
intervalsList = list(PeriodicUpdater.intervals.keys())
PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList)
PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList)
PeriodicUpdater.intervalsChanged.set()
@staticmethod
def init() -> None:
PeriodicUpdater.updateThread.start()
def __init__(self) -> None:
Updater.__init__(self)
self.interval: int | None = None
def changeInterval(self, interval: int) -> None:
if self.interval is not None:
PeriodicUpdater.intervals[self.interval].remove(self)
self.interval = interval
if interval not in PeriodicUpdater.intervals:
PeriodicUpdater.intervals[interval] = set()
PeriodicUpdater.intervals[interval].add(self)
PeriodicUpdater.updateIntervals()
class InotifyUpdaterEventHandler(pyinotify.ProcessEvent):
def process_default(self, event: pyinotify.Event) -> None:
assert event.path in InotifyUpdater.paths
if 0 in InotifyUpdater.paths[event.path]:
for provider in InotifyUpdater.paths[event.path][0]:
provider.refreshData()
if event.name in InotifyUpdater.paths[event.path]:
for provider in InotifyUpdater.paths[event.path][event.name]:
provider.refreshData()
class InotifyUpdater(Updater):
"""
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
"""
wm = pyinotify.WatchManager()
paths: dict[str, dict[str | int, set["InotifyUpdater"]]] = dict()
@staticmethod
def init() -> None:
notifier = pyinotify.ThreadedNotifier(
InotifyUpdater.wm, InotifyUpdaterEventHandler()
)
notifier.start()
# TODO Mask for folders
MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE
def addPath(self, path: str, refresh: bool = True) -> None:
path = os.path.realpath(os.path.expanduser(path))
# Detect if file or folder
if os.path.isdir(path):
self.dirpath: str = path
# 0: Directory watcher
self.filename: str | int = 0
elif os.path.isfile(path):
self.dirpath = os.path.dirname(path)
self.filename = os.path.basename(path)
else:
raise FileNotFoundError("No such file or directory: '{}'".format(path))
# Register watch action
if self.dirpath not in InotifyUpdater.paths:
InotifyUpdater.paths[self.dirpath] = dict()
if self.filename not in InotifyUpdater.paths[self.dirpath]:
InotifyUpdater.paths[self.dirpath][self.filename] = set()
InotifyUpdater.paths[self.dirpath][self.filename].add(self)
# Add watch
InotifyUpdater.wm.add_watch(self.dirpath, InotifyUpdater.MASK)
if refresh:
self.refreshData()
class ThreadedUpdaterThread(threading.Thread):
def __init__(self, updater: "ThreadedUpdater") -> None:
self.updater = updater
threading.Thread.__init__(self, daemon=True)
self.looping = True
def run(self) -> None:
try:
while self.looping:
self.updater.loop()
except BaseException as e:
log.error("Error with {}".format(self.updater))
log.error(e, exc_info=True)
self.updater.updateText("")
class ThreadedUpdater(Updater):
"""
Must implement loop(), and call start()
"""
def __init__(self) -> None:
Updater.__init__(self)
self.thread = ThreadedUpdaterThread(self)
def loop(self) -> None:
self.refreshData()
time.sleep(10)
def start(self) -> None:
self.thread.start()
class I3Updater(ThreadedUpdater):
# TODO OPTI One i3 connection for all
def __init__(self) -> None:
ThreadedUpdater.__init__(self)
self.i3 = i3ipc.Connection()
self.on = self.i3.on
self.start()
def loop(self) -> None:
self.i3.main()
class MergedUpdater(Updater):
def __init__(self, *args: Updater) -> None:
raise NotImplementedError("Deprecated, as hacky and currently unused")