frobar: Type hint

That was TEDIOUS
This commit is contained in:
Geoffrey Frogeye 2024-06-18 00:31:29 +02:00
parent a489830949
commit e09774c4ca
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8
4 changed files with 298 additions and 286 deletions

View file

@ -1,10 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from frobar.providers import *
from frobar import providers as fp
from frobar.display import Bar, BarGroupType
from frobar.updaters import Updater
# TODO If multiple screen, expand the sections and share them # TODO If multiple screen, expand the sections and share them
# TODO Graceful exit # TODO Graceful exit
def run():
def run() -> None:
Bar.init() Bar.init()
Updater.init() Updater.init()
@ -19,7 +23,7 @@ def run():
full = short + " " + CUSTOM_SUFFIXES[i] full = short + " " + CUSTOM_SUFFIXES[i]
customNames[short] = full customNames[short] = full
Bar.addSectionAll( Bar.addSectionAll(
I3WorkspacesProvider( fp.I3WorkspacesProvider(
theme=WORKSPACE_THEME, theme=WORKSPACE_THEME,
themeFocus=FOCUS_THEME, themeFocus=FOCUS_THEME,
themeUrgent=URGENT_THEME, themeUrgent=URGENT_THEME,
@ -30,37 +34,40 @@ def run():
) )
# TODO Middle # TODO Middle
Bar.addSectionAll(MpdProvider(theme=9), BarGroupType.LEFT) Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT)
# Bar.addSectionAll(I3WindowTitleProvider(), BarGroupType.LEFT) # Bar.addSectionAll(I3WindowTitleProvider(), BarGroupType.LEFT)
# TODO Computer modes # TODO Computer modes
SYSTEM_THEME = 3 Bar.addSectionAll(fp.CpuProvider(), BarGroupType.RIGHT)
DANGER_THEME = 1 Bar.addSectionAll(fp.LoadProvider(), BarGroupType.RIGHT)
CRITICAL_THEME = 0 Bar.addSectionAll(fp.RamProvider(), BarGroupType.RIGHT)
Bar.addSectionAll(CpuProvider(), BarGroupType.RIGHT) Bar.addSectionAll(fp.TemperatureProvider(), BarGroupType.RIGHT)
Bar.addSectionAll(LoadProvider(), BarGroupType.RIGHT) Bar.addSectionAll(fp.BatteryProvider(), BarGroupType.RIGHT)
Bar.addSectionAll(RamProvider(), BarGroupType.RIGHT)
Bar.addSectionAll(TemperatureProvider(), BarGroupType.RIGHT)
Bar.addSectionAll(BatteryProvider(), BarGroupType.RIGHT)
# Peripherals # Peripherals
PERIPHERAL_THEME = 6 PERIPHERAL_THEME = 6
NETWORK_THEME = 5 NETWORK_THEME = 5
# TODO Disk space provider # TODO Disk space provider
# TODO Screen (connected, autorandr configuration, bbswitch) provider # TODO Screen (connected, autorandr configuration, bbswitch) provider
Bar.addSectionAll(XautolockProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT) Bar.addSectionAll(fp.XautolockProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT)
Bar.addSectionAll(PulseaudioProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT) Bar.addSectionAll(fp.PulseaudioProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT)
Bar.addSectionAll(RfkillProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT) Bar.addSectionAll(fp.RfkillProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT)
Bar.addSectionAll(NetworkProvider(theme=NETWORK_THEME), BarGroupType.RIGHT) Bar.addSectionAll(fp.NetworkProvider(theme=NETWORK_THEME), BarGroupType.RIGHT)
# Personal # Personal
PERSONAL_THEME = 7 # PERSONAL_THEME = 7
# Bar.addSectionAll(KeystoreProvider(theme=PERSONAL_THEME), BarGroupType.RIGHT) # Bar.addSectionAll(fp.KeystoreProvider(theme=PERSONAL_THEME), BarGroupType.RIGHT)
# Bar.addSectionAll(NotmuchUnreadProvider(dir='~/.mail/', theme=PERSONAL_THEME), BarGroupType.RIGHT) # Bar.addSectionAll(
# Bar.addSectionAll(TodoProvider(dir='~/.vdirsyncer/currentCalendars/', theme=PERSONAL_THEME), BarGroupType.RIGHT) # fp.NotmuchUnreadProvider(dir="~/.mail/", theme=PERSONAL_THEME),
# BarGroupType.RIGHT,
# )
# Bar.addSectionAll(
# fp.TodoProvider(dir="~/.vdirsyncer/currentCalendars/", theme=PERSONAL_THEME),
# BarGroupType.RIGHT,
# )
TIME_THEME = 4 TIME_THEME = 4
Bar.addSectionAll(TimeProvider(theme=TIME_THEME), BarGroupType.RIGHT) Bar.addSectionAll(fp.TimeProvider(theme=TIME_THEME), BarGroupType.RIGHT)
# Bar.run() # Bar.run()

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3init
import enum import enum
import logging import logging
@ -7,6 +7,7 @@ import signal
import subprocess import subprocess
import threading import threading
import time import time
import typing
import coloredlogs import coloredlogs
import i3ipc import i3ipc
@ -29,6 +30,12 @@ log = logging.getLogger()
# TODO forceSize and changeText are different # TODO forceSize and changeText are different
Handle = typing.Callable[[], None]
Decorator = Handle | str | None
Element: typing.TypeAlias = typing.Union[str, "Text", None]
Part: typing.TypeAlias = typing.Union[str, "Text", "Section"]
class BarGroupType(enum.Enum): class BarGroupType(enum.Enum):
LEFT = 0 LEFT = 0
RIGHT = 1 RIGHT = 1
@ -40,6 +47,7 @@ class BarGroupType(enum.Enum):
class BarStdoutThread(threading.Thread): class BarStdoutThread(threading.Thread):
def run(self) -> None: def run(self) -> None:
while Bar.running: while Bar.running:
assert Bar.process.stdout
handle = Bar.process.stdout.readline().strip() handle = Bar.process.stdout.readline().strip()
if not len(handle): if not len(handle):
Bar.stop() Bar.stop()
@ -70,8 +78,7 @@ class Bar:
Bar.process = subprocess.Popen( Bar.process = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
) )
Bar.stdoutThread = BarStdoutThread() BarStdoutThread().start()
Bar.stdoutThread.start()
# Debug # Debug
Bar(0) Bar(0)
@ -90,7 +97,7 @@ class Bar:
Bar.forever() Bar.forever()
i3 = i3ipc.Connection() i3 = i3ipc.Connection()
def doStop(*args) -> None: def doStop(*args: list) -> None:
Bar.stop() Bar.stop()
print(88) print(88)
@ -102,17 +109,17 @@ class Bar:
Bar.stop() Bar.stop()
# Class globals # Class globals
everyone = set() everyone: set["Bar"] = set()
string = "" string = ""
process = None process: subprocess.Popen
running = False running = False
nextHandle = 0 nextHandle = 0
actionsF2H = dict() actionsF2H: dict[Handle, bytes] = dict()
actionsH2F = dict() actionsH2F: dict[bytes, Handle] = dict()
@staticmethod @staticmethod
def getFunctionHandle(function): def getFunctionHandle(function: typing.Callable[[], None]) -> bytes:
assert callable(function) assert callable(function)
if function in Bar.actionsF2H.keys(): if function in Bar.actionsF2H.keys():
return Bar.actionsF2H[function] return Bar.actionsF2H[function]
@ -126,12 +133,11 @@ class Bar:
return handle return handle
@staticmethod @staticmethod
def forever(): def forever() -> None:
Bar.process.wait() Bar.process.wait()
Bar.stop() Bar.stop()
def __init__(self, screen): def __init__(self, screen: int) -> None:
assert isinstance(screen, int)
self.screen = "%{S" + str(screen) + "}" self.screen = "%{S" + str(screen) + "}"
self.groups = dict() self.groups = dict()
@ -144,23 +150,21 @@ class Bar:
self.everyone.add(self) self.everyone.add(self)
@staticmethod @staticmethod
def addSectionAll(section, group, screens=None): def addSectionAll(
section: "Section", group: "BarGroupType", screens: None = None
) -> None:
""" """
.. note:: .. note::
Add the section before updating it for the first time. Add the section before updating it for the first time.
""" """
assert isinstance(section, Section)
assert isinstance(group, BarGroupType)
# TODO screens selection # TODO screens selection
for bar in Bar.everyone: for bar in Bar.everyone:
bar.addSection(section, group=group) bar.addSection(section, group=group)
def addSection(self, section, group): def addSection(self, section: "Section", group: "BarGroupType") -> None:
assert isinstance(section, Section)
assert isinstance(group, BarGroupType)
self.groups[group].addSection(section) self.groups[group].addSection(section)
def update(self): def update(self) -> None:
if self.childsChanged: if self.childsChanged:
self.string = self.screen self.string = self.screen
self.string += self.groups[BarGroupType.LEFT].string self.string += self.groups[BarGroupType.LEFT].string
@ -169,7 +173,7 @@ class Bar:
self.childsChanged = False self.childsChanged = False
@staticmethod @staticmethod
def updateAll(): def updateAll() -> None:
if Bar.running: if Bar.running:
Bar.string = "" Bar.string = ""
for bar in Bar.everyone: for bar in Bar.everyone:
@ -179,6 +183,7 @@ class Bar:
Bar.string += BarGroup.color(*Section.EMPTY) Bar.string += BarGroup.color(*Section.EMPTY)
# print(Bar.string) # print(Bar.string)
assert Bar.process.stdin
Bar.process.stdin.write(bytes(Bar.string + "\n", "utf-8")) Bar.process.stdin.write(bytes(Bar.string + "\n", "utf-8"))
Bar.process.stdin.flush() Bar.process.stdin.flush()
@ -188,18 +193,16 @@ class BarGroup:
One for each group of each bar One for each group of each bar
""" """
everyone = set() everyone: set["BarGroup"] = set()
def __init__(self, groupType, parent): def __init__(self, groupType: BarGroupType, parent: Bar):
assert isinstance(groupType, BarGroupType)
assert isinstance(parent, Bar)
self.groupType = groupType self.groupType = groupType
self.parent = parent self.parent = parent
self.sections = list() self.sections: list["Section"] = list()
self.string = "" self.string = ""
self.parts = [] self.parts: list[Part] = []
#: One of the sections that had their theme or visibility changed #: One of the sections that had their theme or visibility changed
self.childsThemeChanged = False self.childsThemeChanged = False
@ -209,11 +212,11 @@ class BarGroup:
BarGroup.everyone.add(self) BarGroup.everyone.add(self)
def addSection(self, section): def addSection(self, section: "Section") -> None:
self.sections.append(section) self.sections.append(section)
section.addParent(self) section.addParent(self)
def addSectionAfter(self, sectionRef, section): def addSectionAfter(self, sectionRef: "Section", section: "Section") -> None:
index = self.sections.index(sectionRef) index = self.sections.index(sectionRef)
self.sections.insert(index + 1, section) self.sections.insert(index + 1, section)
section.addParent(self) section.addParent(self)
@ -221,20 +224,20 @@ class BarGroup:
ALIGNS = {BarGroupType.LEFT: "%{l}", BarGroupType.RIGHT: "%{r}"} ALIGNS = {BarGroupType.LEFT: "%{l}", BarGroupType.RIGHT: "%{r}"}
@staticmethod @staticmethod
def fgColor(color): def fgColor(color: str) -> str:
return "%{F" + (color or "-") + "}" return "%{F" + (color or "-") + "}"
@staticmethod @staticmethod
def bgColor(color): def bgColor(color: str) -> str:
return "%{B" + (color or "-") + "}" return "%{B" + (color or "-") + "}"
@staticmethod @staticmethod
def color(fg, bg): def color(fg: str, bg: str) -> str:
return BarGroup.fgColor(fg) + BarGroup.bgColor(bg) return BarGroup.fgColor(fg) + BarGroup.bgColor(bg)
def update(self): def update(self) -> None:
if self.childsThemeChanged: if self.childsThemeChanged:
parts = [BarGroup.ALIGNS[self.groupType]] parts: list[Part] = [BarGroup.ALIGNS[self.groupType]]
secs = [sec for sec in self.sections if sec.visible] secs = [sec for sec in self.sections if sec.visible]
lenS = len(secs) lenS = len(secs)
@ -283,7 +286,7 @@ class BarGroup:
self.childsTextChanged = False self.childsTextChanged = False
@staticmethod @staticmethod
def updateAll(): def updateAll() -> None:
for group in BarGroup.everyone: for group in BarGroup.everyone:
group.update() group.update()
Bar.updateAll() Bar.updateAll()
@ -294,7 +297,7 @@ class SectionThread(threading.Thread):
ANIMATION_STOP = 0.001 ANIMATION_STOP = 0.001
ANIMATION_EVOLUTION = 0.9 ANIMATION_EVOLUTION = 0.9
def run(self): def run(self) -> None:
while Section.somethingChanged.wait(): while Section.somethingChanged.wait():
notBusy.wait() notBusy.wait()
Section.updateAll() Section.updateAll()
@ -311,6 +314,9 @@ class SectionThread(threading.Thread):
animTime = self.ANIMATION_STOP animTime = self.ANIMATION_STOP
Theme = tuple[str, str]
class Section: class Section:
# TODO Update all of that to base16 # TODO Update all of that to base16
COLORS = [ COLORS = [
@ -334,20 +340,20 @@ class Section:
FGCOLOR = "#fff0f1" FGCOLOR = "#fff0f1"
BGCOLOR = "#092c0e" BGCOLOR = "#092c0e"
THEMES = list() THEMES: list[Theme] = list()
EMPTY = (FGCOLOR, BGCOLOR) EMPTY: Theme = (FGCOLOR, BGCOLOR)
ICON = None ICON: str | None = None
PERSISTENT = False PERSISTENT = False
#: Sections that do not have their destination size #: Sections that do not have their destination size
sizeChanging = set() sizeChanging: set["Section"] = set()
updateThread = SectionThread(daemon=True) updateThread: threading.Thread = SectionThread(daemon=True)
somethingChanged = threading.Event() somethingChanged = threading.Event()
lastChosenTheme = 0 lastChosenTheme = 0
@staticmethod @staticmethod
def init(): def init() -> None:
for t in range(8, 16): for t in range(8, 16):
Section.THEMES.append((Section.COLORS[0], Section.COLORS[t])) Section.THEMES.append((Section.COLORS[0], Section.COLORS[t]))
Section.THEMES.append((Section.COLORS[0], Section.COLORS[3])) Section.THEMES.append((Section.COLORS[0], Section.COLORS[3]))
@ -355,7 +361,7 @@ class Section:
Section.updateThread.start() Section.updateThread.start()
def __init__(self, theme=None): def __init__(self, theme: int | None = None) -> None:
#: Displayed section #: Displayed section
#: Note: A section can be empty and displayed! #: Note: A section can be empty and displayed!
self.visible = False self.visible = False
@ -378,12 +384,12 @@ class Section:
self.dstSize = 0 self.dstSize = 0
#: Groups that have this section #: Groups that have this section
self.parents = set() self.parents: set[BarGroup] = set()
self.icon = self.ICON self.icon = self.ICON
self.persistent = self.PERSISTENT self.persistent = self.PERSISTENT
def __str__(self): def __str__(self) -> str:
try: try:
return "<{}><{}>{:01d}{}{:02d}/{:02d}".format( return "<{}><{}>{:01d}{}{:02d}/{:02d}".format(
self.curText, self.curText,
@ -393,26 +399,26 @@ class Section:
self.curSize, self.curSize,
self.dstSize, self.dstSize,
) )
except: except Exception:
return super().__str__() return super().__str__()
def addParent(self, parent): def addParent(self, parent: BarGroup) -> None:
self.parents.add(parent) self.parents.add(parent)
def appendAfter(self, section): def appendAfter(self, section: "Section") -> None:
assert len(self.parents) assert len(self.parents)
for parent in self.parents: for parent in self.parents:
parent.addSectionAfter(self, section) parent.addSectionAfter(self, section)
def informParentsThemeChanged(self): def informParentsThemeChanged(self) -> None:
for parent in self.parents: for parent in self.parents:
parent.childsThemeChanged = True parent.childsThemeChanged = True
def informParentsTextChanged(self): def informParentsTextChanged(self) -> None:
for parent in self.parents: for parent in self.parents:
parent.childsTextChanged = True parent.childsTextChanged = True
def updateText(self, text): def updateText(self, text: Element) -> None:
if isinstance(text, str): if isinstance(text, str):
text = Text(text) text = Text(text)
elif isinstance(text, Text) and not len(text.elements): elif isinstance(text, Text) and not len(text.elements):
@ -439,14 +445,13 @@ class Section:
Section.sizeChanging.add(self) Section.sizeChanging.add(self)
Section.somethingChanged.set() Section.somethingChanged.set()
def setDecorators(self, **kwargs): def setDecorators(self, **kwargs: Handle) -> None:
self.dstText.setDecorators(**kwargs) self.dstText.setDecorators(**kwargs)
self.curText = str(self.dstText) self.curText = str(self.dstText)
self.informParentsTextChanged() self.informParentsTextChanged()
Section.somethingChanged.set() Section.somethingChanged.set()
def updateTheme(self, theme): def updateTheme(self, theme: int) -> None:
assert isinstance(theme, int)
assert theme < len(Section.THEMES) assert theme < len(Section.THEMES)
if theme == self.theme: if theme == self.theme:
return return
@ -454,19 +459,18 @@ class Section:
self.informParentsThemeChanged() self.informParentsThemeChanged()
Section.somethingChanged.set() Section.somethingChanged.set()
def updateVisibility(self, visibility): def updateVisibility(self, visibility: bool) -> None:
assert isinstance(visibility, bool)
self.visible = visibility self.visible = visibility
self.informParentsThemeChanged() self.informParentsThemeChanged()
Section.somethingChanged.set() Section.somethingChanged.set()
@staticmethod @staticmethod
def fit(text, size): def fit(text: str, size: int) -> str:
t = len(text) t = len(text)
return text[:size] if t >= size else text + [" "] * (size - t) return text[:size] if t >= size else text + " " * (size - t)
def update(self): def update(self) -> None:
# TODO Might profit of a better logic # TODO Might profit of a better logic
if not self.visible: if not self.visible:
self.updateVisibility(True) self.updateVisibility(True)
@ -487,7 +491,7 @@ class Section:
self.informParentsTextChanged() self.informParentsTextChanged()
@staticmethod @staticmethod
def updateAll(): def updateAll() -> None:
""" """
Process all sections for text size changes Process all sections for text size changes
""" """
@ -500,7 +504,7 @@ class Section:
Section.somethingChanged.clear() Section.somethingChanged.clear()
@staticmethod @staticmethod
def ramp(p, ramp=" ▁▂▃▄▅▆▇█"): def ramp(p: float, ramp: str = " ▁▂▃▄▅▆▇█") -> str:
if p > 1: if p > 1:
return ramp[-1] return ramp[-1]
elif p < 0: elif p < 0:
@ -511,11 +515,11 @@ class Section:
class StatefulSection(Section): class StatefulSection(Section):
# TODO FEAT Allow to temporary expand the section (e.g. when important change) # TODO FEAT Allow to temporary expand the section (e.g. when important change)
NUMBER_STATES = None NUMBER_STATES: int
DEFAULT_STATE = 0 DEFAULT_STATE = 0
def __init__(self, *args, **kwargs): def __init__(self, theme: int | None) -> None:
Section.__init__(self, *args, **kwargs) Section.__init__(self, theme=theme)
self.state = self.DEFAULT_STATE self.state = self.DEFAULT_STATE
if hasattr(self, "onChangeState"): if hasattr(self, "onChangeState"):
self.onChangeState(self.state) self.onChangeState(self.state)
@ -523,20 +527,22 @@ class StatefulSection(Section):
clickLeft=self.incrementState, clickRight=self.decrementState clickLeft=self.incrementState, clickRight=self.decrementState
) )
def incrementState(self): def incrementState(self) -> None:
newState = min(self.state + 1, self.NUMBER_STATES - 1) newState = min(self.state + 1, self.NUMBER_STATES - 1)
self.changeState(newState) self.changeState(newState)
def decrementState(self): def decrementState(self) -> None:
newState = max(self.state - 1, 0) newState = max(self.state - 1, 0)
self.changeState(newState) self.changeState(newState)
def changeState(self, state): def changeState(self, state: int) -> None:
assert isinstance(state, int)
assert state < self.NUMBER_STATES assert state < self.NUMBER_STATES
self.state = state self.state = state
if hasattr(self, "onChangeState"): if hasattr(self, "onChangeState"):
self.onChangeState(state) self.onChangeState(state)
assert hasattr(
self, "refreshData"
), "StatefulSection should be paired with some Updater"
self.refreshData() self.refreshData()
@ -547,10 +553,13 @@ class ColorCountsSection(StatefulSection):
NUMBER_STATES = 3 NUMBER_STATES = 3
COLORABLE_ICON = "?" COLORABLE_ICON = "?"
def __init__(self, theme=None): def __init__(self, theme: None | int = None) -> None:
StatefulSection.__init__(self, theme=theme) StatefulSection.__init__(self, theme=theme)
def fetcher(self): def subfetcher(self) -> list[tuple[int, str]]:
raise NotImplementedError("Interface must be implemented")
def fetcher(self) -> typing.Union[None, "Text"]:
counts = self.subfetcher() counts = self.subfetcher()
# Nothing # Nothing
if not len(counts): if not len(counts):
@ -565,67 +574,66 @@ class ColorCountsSection(StatefulSection):
# Icon + Total # Icon + Total
elif self.state == 1 and len(counts) > 1: elif self.state == 1 and len(counts) > 1:
total = sum([count for count, color in counts]) total = sum([count for count, color in counts])
return Text(self.COLORABLE_ICON, " ", total) return Text(self.COLORABLE_ICON, " ", str(total))
# Icon + Counts # Icon + Counts
else: else:
text = Text(self.COLORABLE_ICON) text = Text(self.COLORABLE_ICON)
for count, color in counts: for count, color in counts:
text.append(" ", Text(count, fg=color)) text.append(" ", Text(str(count), fg=color))
return text return text
class Text: class Text:
def _setElements(self, elements): def _setDecorators(self, decorators: dict[str, Decorator]) -> None:
# TODO OPTI Concatenate consecutrive string
self.elements = list(elements)
def _setDecorators(self, decorators):
# TODO OPTI Convert no decorator to strings # TODO OPTI Convert no decorator to strings
self.decorators = decorators self.decorators = decorators
self.prefix = None self.prefix: str | None = None
self.suffix = None self.suffix: str | None = None
def __init__(self, *args: Element, **kwargs: Decorator) -> None:
# TODO OPTI Concatenate consecutrive string
self.elements = list(args)
def __init__(self, *args, **kwargs):
self._setElements(args)
self._setDecorators(kwargs) self._setDecorators(kwargs)
self.section = None self.section: Section
def append(self, *args): def append(self, *args: Element) -> None:
self._setElements(self.elements + list(args)) self.elements += list(args)
def prepend(self, *args): def prepend(self, *args: Element) -> None:
self._setElements(list(args) + self.elements) self.elements = list(args) + self.elements
def setElements(self, *args): def setElements(self, *args: Element) -> None:
self._setElements(args) self.elements = list(args)
def setDecorators(self, **kwargs): def setDecorators(self, **kwargs: Decorator) -> None:
self._setDecorators(kwargs) self._setDecorators(kwargs)
def setSection(self, section): def setSection(self, section: Section) -> None:
assert isinstance(section, Section)
self.section = section self.section = section
for element in self.elements: for element in self.elements:
if isinstance(element, Text): if isinstance(element, Text):
element.setSection(section) element.setSection(section)
def _genFixs(self): def _genFixs(self) -> None:
if self.prefix is not None and self.suffix is not None: if self.prefix is not None and self.suffix is not None:
return return
self.prefix = "" self.prefix = ""
self.suffix = "" self.suffix = ""
def nest(prefix, suffix): def nest(prefix: str, suffix: str) -> None:
assert self.prefix is not None
assert self.suffix is not None
self.prefix = self.prefix + "%{" + prefix + "}" self.prefix = self.prefix + "%{" + prefix + "}"
self.suffix = "%{" + suffix + "}" + self.suffix self.suffix = "%{" + suffix + "}" + self.suffix
def getColor(val): def getColor(val: str) -> str:
# TODO Allow themes # TODO Allow themes
assert isinstance(val, str) and len(val) == 7 assert len(val) == 7
return val return val
def button(number, function): def button(number: str, function: Handle) -> None:
handle = Bar.getFunctionHandle(function) handle = Bar.getFunctionHandle(function)
nest("A" + number + ":" + handle.decode() + ":", "A" + number) nest("A" + number + ":" + handle.decode() + ":", "A" + number)
@ -634,25 +642,34 @@ class Text:
continue continue
if key == "fg": if key == "fg":
reset = self.section.THEMES[self.section.theme][0] reset = self.section.THEMES[self.section.theme][0]
assert isinstance(val, str)
nest("F" + getColor(val), "F" + reset) nest("F" + getColor(val), "F" + reset)
elif key == "bg": elif key == "bg":
reset = self.section.THEMES[self.section.theme][1] reset = self.section.THEMES[self.section.theme][1]
assert isinstance(val, str)
nest("B" + getColor(val), "B" + reset) nest("B" + getColor(val), "B" + reset)
elif key == "clickLeft": elif key == "clickLeft":
assert callable(val)
button("1", val) button("1", val)
elif key == "clickMiddle": elif key == "clickMiddle":
assert callable(val)
button("2", val) button("2", val)
elif key == "clickRight": elif key == "clickRight":
assert callable(val)
button("3", val) button("3", val)
elif key == "scrollUp": elif key == "scrollUp":
assert callable(val)
button("4", val) button("4", val)
elif key == "scrollDown": elif key == "scrollDown":
assert callable(val)
button("5", val) button("5", val)
else: else:
log.warn("Unkown decorator: {}".format(key)) log.warn("Unkown decorator: {}".format(key))
def _text(self, size=None, pad=False): def _text(self, size: int | None = None, pad: bool = False) -> tuple[str, int]:
self._genFixs() self._genFixs()
assert self.prefix is not None
assert self.suffix is not None
curString = self.prefix curString = self.prefix
curSize = 0 curSize = 0
remSize = size remSize = size
@ -678,9 +695,11 @@ class Text:
curString += self.suffix curString += self.suffix
if pad and remSize > 0: if pad:
curString += " " * remSize assert remSize is not None
curSize += remSize if remSize > 0:
curString += " " * remSize
curSize += remSize
if size is not None: if size is not None:
if pad: if pad:
@ -689,12 +708,14 @@ class Text:
assert size >= curSize assert size >= curSize
return curString, curSize return curString, curSize
def text(self, *args, **kwargs): def text(self, size: int | None = None, pad: bool = False) -> str:
string, size = self._text(*args, **kwargs) string, size = self._text(size=size, pad=pad)
return string return string
def __str__(self): def __str__(self) -> str:
self._genFixs() self._genFixs()
assert self.prefix is not None
assert self.suffix is not None
curString = self.prefix curString = self.prefix
for element in self.elements: for element in self.elements:
if element is None: if element is None:
@ -704,7 +725,7 @@ class Text:
curString += self.suffix curString += self.suffix
return curString return curString
def __len__(self): def __len__(self) -> int:
curSize = 0 curSize = 0
for element in self.elements: for element in self.elements:
if element is None: if element is None:
@ -715,8 +736,8 @@ class Text:
curSize += len(str(element)) curSize += len(str(element))
return curSize return curSize
def __getitem__(self, index): def __getitem__(self, index: int) -> Element:
return self.elements[index] return self.elements[index]
def __setitem__(self, index, data): def __setitem__(self, index: int, data: Element) -> None:
self.elements[index] = data self.elements[index] = data

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import datetime import datetime
import enum
import ipaddress import ipaddress
import json import json
import logging import logging
@ -8,15 +9,19 @@ import os
import random import random
import socket import socket
import subprocess import subprocess
import time
import coloredlogs import coloredlogs
import i3ipc
import mpd import mpd
import notmuch import notmuch
import psutil import psutil
import pulsectl import pulsectl
from frobar.display import * from frobar.display import (BarGroup, ColorCountsSection, Element, Section,
from frobar.updaters import * StatefulSection, Text)
from frobar.updaters import (I3Updater, InotifyUpdater, MergedUpdater,
PeriodicUpdater, ThreadedUpdater, Updater)
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger() log = logging.getLogger()
@ -25,7 +30,7 @@ log = logging.getLogger()
# PulseaudioProvider and MpdProvider) # PulseaudioProvider and MpdProvider)
def humanSize(num): def humanSize(num: int) -> str:
""" """
Returns a string of width 3+3 Returns a string of width 3+3
""" """
@ -35,11 +40,11 @@ def humanSize(num):
return "{:3d}{}".format(int(num), unit) return "{:3d}{}".format(int(num), unit)
else: else:
return "{:.1f}{}".format(num, unit) return "{:.1f}{}".format(num, unit)
num /= 1024.0 num //= 1024
return "{:d}YiB".format(num) return "{:d}YiB".format(num)
def randomColor(seed=0): def randomColor(seed: int | bytes = 0) -> str:
random.seed(seed) random.seed(seed)
return "#{:02x}{:02x}{:02x}".format(*[random.randint(0, 255) for _ in range(3)]) return "#{:02x}{:02x}{:02x}".format(*[random.randint(0, 255) for _ in range(3)])
@ -49,11 +54,11 @@ class TimeProvider(StatefulSection, PeriodicUpdater):
NUMBER_STATES = len(FORMATS) NUMBER_STATES = len(FORMATS)
DEFAULT_STATE = 1 DEFAULT_STATE = 1
def fetcher(self): def fetcher(self) -> str:
now = datetime.datetime.now() now = datetime.datetime.now()
return now.strftime(self.FORMATS[self.state]) return now.strftime(self.FORMATS[self.state])
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
StatefulSection.__init__(self, theme) StatefulSection.__init__(self, theme)
self.changeInterval(1) # TODO OPTI When state < 1 self.changeInterval(1) # TODO OPTI When state < 1
@ -67,10 +72,10 @@ class AlertLevel(enum.Enum):
class AlertingSection(StatefulSection): class AlertingSection(StatefulSection):
# TODO EASE Correct settings for themes # TODO EASE Correct settings for themes
THEMES = {AlertLevel.NORMAL: 3, AlertLevel.WARNING: 1, AlertLevel.DANGER: 0} ALERT_THEMES = {AlertLevel.NORMAL: 3, AlertLevel.WARNING: 1, AlertLevel.DANGER: 0}
PERSISTENT = True PERSISTENT = True
def getLevel(self, quantity): def getLevel(self, quantity: float) -> AlertLevel:
if quantity > self.dangerThresold: if quantity > self.dangerThresold:
return AlertLevel.DANGER return AlertLevel.DANGER
elif quantity > self.warningThresold: elif quantity > self.warningThresold:
@ -78,14 +83,14 @@ class AlertingSection(StatefulSection):
else: else:
return AlertLevel.NORMAL return AlertLevel.NORMAL
def updateLevel(self, quantity): def updateLevel(self, quantity: float) -> None:
self.level = self.getLevel(quantity) self.level = self.getLevel(quantity)
self.updateTheme(self.THEMES[self.level]) self.updateTheme(self.ALERT_THEMES[self.level])
if self.level == AlertLevel.NORMAL: if self.level == AlertLevel.NORMAL:
return return
# TODO Temporary update state # TODO Temporary update state
def __init__(self, theme): def __init__(self, theme: int | None = None):
StatefulSection.__init__(self, theme) StatefulSection.__init__(self, theme)
self.dangerThresold = 0.90 self.dangerThresold = 0.90
self.warningThresold = 0.75 self.warningThresold = 0.75
@ -95,7 +100,7 @@ class CpuProvider(AlertingSection, PeriodicUpdater):
NUMBER_STATES = 3 NUMBER_STATES = 3
ICON = "" ICON = ""
def fetcher(self): def fetcher(self) -> Element:
percent = psutil.cpu_percent(percpu=False) percent = psutil.cpu_percent(percpu=False)
self.updateLevel(percent / 100) self.updateLevel(percent / 100)
if self.state >= 2: if self.state >= 2:
@ -103,31 +108,35 @@ class CpuProvider(AlertingSection, PeriodicUpdater):
return "".join([Section.ramp(p / 100) for p in percents]) return "".join([Section.ramp(p / 100) for p in percents])
elif self.state >= 1: elif self.state >= 1:
return Section.ramp(percent / 100) return Section.ramp(percent / 100)
return ""
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
AlertingSection.__init__(self, theme) AlertingSection.__init__(self, theme)
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(1) self.changeInterval(1)
class LoadProvider(AlertingSection, PeriodicUpdater): class LoadProvider(AlertingSection, PeriodicUpdater):
NUMBER_STATES = 3 NUMBER_STATES = 3
ICON = "" ICON = ""
def fetcher(self): def fetcher(self) -> Element:
load = os.getloadavg() load = os.getloadavg()
self.updateLevel(load[0]) self.updateLevel(load[0])
if self.state >= 2: if self.state >= 2:
return " ".join(f"{load[i]:.2f}" for i in range(3)) return " ".join(f"{load[i]:.2f}" for i in range(3))
elif self.state >= 1: elif self.state >= 1:
return f"{load[0]:.2f}" return f"{load[0]:.2f}"
return ""
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
AlertingSection.__init__(self, theme) AlertingSection.__init__(self, theme)
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(5) self.changeInterval(5)
self.warningThresold = 5 self.warningThresold = 5
self.dangerThresold = 10 self.dangerThresold = 10
class RamProvider(AlertingSection, PeriodicUpdater): class RamProvider(AlertingSection, PeriodicUpdater):
""" """
Shows free RAM Shows free RAM
@ -136,7 +145,7 @@ class RamProvider(AlertingSection, PeriodicUpdater):
NUMBER_STATES = 4 NUMBER_STATES = 4
ICON = "" ICON = ""
def fetcher(self): def fetcher(self) -> Element:
mem = psutil.virtual_memory() mem = psutil.virtual_memory()
freePerc = mem.percent / 100 freePerc = mem.percent / 100
self.updateLevel(freePerc) self.updateLevel(freePerc)
@ -154,7 +163,7 @@ class RamProvider(AlertingSection, PeriodicUpdater):
return text return text
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
AlertingSection.__init__(self, theme) AlertingSection.__init__(self, theme)
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(1) self.changeInterval(1)
@ -166,7 +175,7 @@ class TemperatureProvider(AlertingSection, PeriodicUpdater):
MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"] MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
# For Intel, AMD and ARM respectively. # For Intel, AMD and ARM respectively.
def fetcher(self): def fetcher(self) -> Element:
allTemp = psutil.sensors_temperatures() allTemp = psutil.sensors_temperatures()
for main in self.MAIN_TEMPS: for main in self.MAIN_TEMPS:
if main in allTemp: if main in allTemp:
@ -182,8 +191,9 @@ class TemperatureProvider(AlertingSection, PeriodicUpdater):
self.icon = Section.ramp(temp.current / temp.high, self.RAMP) self.icon = Section.ramp(temp.current / temp.high, self.RAMP)
if self.state >= 1: if self.state >= 1:
return "{:.0f}°C".format(temp.current) return "{:.0f}°C".format(temp.current)
return ""
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
AlertingSection.__init__(self, theme) AlertingSection.__init__(self, theme)
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(5) self.changeInterval(5)
@ -194,10 +204,9 @@ class BatteryProvider(AlertingSection, PeriodicUpdater):
NUMBER_STATES = 3 NUMBER_STATES = 3
RAMP = "" RAMP = ""
def fetcher(self): def fetcher(self) -> Element:
bat = psutil.sensors_battery() bat = psutil.sensors_battery()
if not bat: if not bat:
self.icon = None
return None return None
self.icon = ("" if bat.power_plugged else "") + Section.ramp( self.icon = ("" if bat.power_plugged else "") + Section.ramp(
@ -207,7 +216,7 @@ class BatteryProvider(AlertingSection, PeriodicUpdater):
self.updateLevel(1 - bat.percent / 100) self.updateLevel(1 - bat.percent / 100)
if self.state < 1: if self.state < 1:
return return ""
t = Text("{:.0f}%".format(bat.percent)) t = Text("{:.0f}%".format(bat.percent))
@ -219,7 +228,7 @@ class BatteryProvider(AlertingSection, PeriodicUpdater):
t.append(" ({:d}:{:02d})".format(h, m)) t.append(" ({:d}:{:02d})".format(h, m))
return t return t
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
AlertingSection.__init__(self, theme) AlertingSection.__init__(self, theme)
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(5) self.changeInterval(5)
@ -238,7 +247,7 @@ class XautolockProvider(Section, InotifyUpdater):
else: else:
return "?" return "?"
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
Section.__init__(self, theme=theme) Section.__init__(self, theme=theme)
InotifyUpdater.__init__(self) InotifyUpdater.__init__(self)
# TODO XDG # TODO XDG
@ -250,7 +259,7 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater):
NUMBER_STATES = 3 NUMBER_STATES = 3
DEFAULT_STATE = 1 DEFAULT_STATE = 1
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
ThreadedUpdater.__init__(self) ThreadedUpdater.__init__(self)
StatefulSection.__init__(self, theme) StatefulSection.__init__(self, theme)
self.pulseEvents = pulsectl.Pulse("event-handler") self.pulseEvents = pulsectl.Pulse("event-handler")
@ -260,7 +269,7 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater):
self.start() self.start()
self.refreshData() self.refreshData()
def fetcher(self): def fetcher(self) -> Element:
sinks = [] sinks = []
with pulsectl.Pulse("list-sinks") as pulse: with pulsectl.Pulse("list-sinks") as pulse:
for sink in pulse.sink_list(): for sink in pulse.sink_list():
@ -293,10 +302,10 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater):
return Text(*sinks) return Text(*sinks)
def loop(self): def loop(self) -> None:
self.pulseEvents.event_listen() self.pulseEvents.event_listen()
def handleEvent(self, ev): def handleEvent(self, ev: pulsectl.PulseEventInfo) -> None:
self.refreshData() self.refreshData()
@ -304,7 +313,7 @@ class NetworkProviderSection(StatefulSection, Updater):
NUMBER_STATES = 5 NUMBER_STATES = 5
DEFAULT_STATE = 1 DEFAULT_STATE = 1
def actType(self): def actType(self) -> None:
self.ssid = None self.ssid = None
if self.iface.startswith("eth") or self.iface.startswith("enp"): if self.iface.startswith("eth") or self.iface.startswith("enp"):
if "u" in self.iface: if "u" in self.iface:
@ -325,10 +334,10 @@ class NetworkProviderSection(StatefulSection, Updater):
self.icon = "" self.icon = ""
elif self.iface.startswith("vboxnet"): elif self.iface.startswith("vboxnet"):
self.icon = "" self.icon = ""
else:
self.icon = "?"
def getAddresses(self): def getAddresses(
self,
) -> tuple[psutil._common.snicaddr, psutil._common.snicaddr]:
ipv4 = None ipv4 = None
ipv6 = None ipv6 = None
for address in self.parent.addrs[self.iface]: for address in self.parent.addrs[self.iface]:
@ -338,8 +347,8 @@ class NetworkProviderSection(StatefulSection, Updater):
ipv6 = address ipv6 = address
return ipv4, ipv6 return ipv4, ipv6
def fetcher(self): def fetcher(self) -> Element:
self.icon = None self.icon = "?"
self.persistent = False self.persistent = False
if ( if (
self.iface not in self.parent.stats self.iface not in self.parent.stats
@ -393,13 +402,13 @@ class NetworkProviderSection(StatefulSection, Updater):
return " ".join(text) return " ".join(text)
def onChangeState(self, state): def onChangeState(self, state: int) -> None:
self.showSsid = state >= 1 self.showSsid = state >= 1
self.showAddress = state >= 2 self.showAddress = state >= 2
self.showSpeed = state >= 3 self.showSpeed = state >= 3
self.showTransfer = state >= 4 self.showTransfer = state >= 4
def __init__(self, iface, parent): def __init__(self, iface: str, parent: "NetworkProvider"):
Updater.__init__(self) Updater.__init__(self)
StatefulSection.__init__(self, theme=parent.theme) StatefulSection.__init__(self, theme=parent.theme)
self.iface = iface self.iface = iface
@ -407,23 +416,23 @@ class NetworkProviderSection(StatefulSection, Updater):
class NetworkProvider(Section, PeriodicUpdater): class NetworkProvider(Section, PeriodicUpdater):
def fetchData(self): def fetchData(self) -> None:
self.prev = self.last self.prev = self.last
self.prevIO = self.IO self.prevIO = self.IO
self.stats = psutil.net_if_stats() self.stats = psutil.net_if_stats()
self.addrs = psutil.net_if_addrs() self.addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
self.IO = psutil.net_io_counters(pernic=True) self.IO: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)
self.ifaces = self.stats.keys() self.ifaces = self.stats.keys()
self.last = time.perf_counter() self.last: float = time.perf_counter()
self.dt = self.last - self.prev self.dt = self.last - self.prev
def fetcher(self): def fetcher(self) -> None:
self.fetchData() self.fetchData()
# Add missing sections # Add missing sections
lastSection = self lastSection: NetworkProvider | NetworkProviderSection = self
for iface in sorted(list(self.ifaces)): for iface in sorted(list(self.ifaces)):
if iface not in self.sections.keys(): if iface not in self.sections.keys():
section = NetworkProviderSection(iface, self) section = NetworkProviderSection(iface, self)
@ -439,15 +448,15 @@ class NetworkProvider(Section, PeriodicUpdater):
return None return None
def addParent(self, parent): def addParent(self, parent: BarGroup) -> None:
self.parents.add(parent) self.parents.add(parent)
self.refreshData() self.refreshData()
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
Section.__init__(self, theme) Section.__init__(self, theme)
self.sections = dict() self.sections: dict[str, NetworkProviderSection] = dict()
self.last = 0 self.last = 0
self.IO = dict() self.IO = dict()
self.fetchData() self.fetchData()
@ -459,7 +468,7 @@ class RfkillProvider(Section, PeriodicUpdater):
# toggled # toggled
PATH = "/sys/class/rfkill" PATH = "/sys/class/rfkill"
def fetcher(self): def fetcher(self) -> Element:
t = Text() t = Text()
for device in os.listdir(self.PATH): for device in os.listdir(self.PATH):
with open(os.path.join(self.PATH, device, "soft"), "rb") as f: with open(os.path.join(self.PATH, device, "soft"), "rb") as f:
@ -473,7 +482,7 @@ class RfkillProvider(Section, PeriodicUpdater):
with open(os.path.join(self.PATH, device, "type"), "rb") as f: with open(os.path.join(self.PATH, device, "type"), "rb") as f:
typ = f.read().strip() typ = f.read().strip()
fg = (hardBlocked and "#CCCCCC") or (softBlocked and "#FF0000") fg = (hardBlocked and "#CCCCCC") or (softBlocked and "#FF0000") or None
if typ == b"wlan": if typ == b"wlan":
icon = "" icon = ""
elif typ == b"bluetooth": elif typ == b"bluetooth":
@ -484,14 +493,14 @@ class RfkillProvider(Section, PeriodicUpdater):
t.append(Text(icon, fg=fg)) t.append(Text(icon, fg=fg))
return t return t
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
Section.__init__(self, theme) Section.__init__(self, theme)
self.changeInterval(5) self.changeInterval(5)
class SshAgentProvider(PeriodicUpdater): class SshAgentProvider(PeriodicUpdater):
def fetcher(self): def fetcher(self) -> Element:
cmd = ["ssh-add", "-l"] cmd = ["ssh-add", "-l"]
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
if proc.returncode != 0: if proc.returncode != 0:
@ -504,13 +513,13 @@ class SshAgentProvider(PeriodicUpdater):
text.append(Text("", fg=randomColor(seed=fingerprint))) text.append(Text("", fg=randomColor(seed=fingerprint)))
return text return text
def __init__(self): def __init__(self) -> None:
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(5) self.changeInterval(5)
class GpgAgentProvider(PeriodicUpdater): class GpgAgentProvider(PeriodicUpdater):
def fetcher(self): def fetcher(self) -> Element:
cmd = ["gpg-connect-agent", "keyinfo --list", "/bye"] cmd = ["gpg-connect-agent", "keyinfo --list", "/bye"]
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
# proc = subprocess.run(cmd) # proc = subprocess.run(cmd)
@ -527,7 +536,7 @@ class GpgAgentProvider(PeriodicUpdater):
text.append(Text("", fg=randomColor(seed=keygrip))) text.append(Text("", fg=randomColor(seed=keygrip)))
return text return text
def __init__(self): def __init__(self) -> None:
PeriodicUpdater.__init__(self) PeriodicUpdater.__init__(self)
self.changeInterval(5) self.changeInterval(5)
@ -536,7 +545,7 @@ class KeystoreProvider(Section, MergedUpdater):
# TODO OPTI+FEAT Use ColorCountsSection and not MergedUpdater, this is useless # TODO OPTI+FEAT Use ColorCountsSection and not MergedUpdater, this is useless
ICON = "" ICON = ""
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
MergedUpdater.__init__(self, SshAgentProvider(), GpgAgentProvider()) MergedUpdater.__init__(self, SshAgentProvider(), GpgAgentProvider())
Section.__init__(self, theme) Section.__init__(self, theme)
@ -544,24 +553,21 @@ class KeystoreProvider(Section, MergedUpdater):
class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater): class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater):
COLORABLE_ICON = "" COLORABLE_ICON = ""
def subfetcher(self): def subfetcher(self) -> list[tuple[int, str]]:
db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY, path=self.dir) db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY, path=self.dir)
counts = [] counts = []
for account in self.accounts: for account in self.accounts:
queryStr = "folder:/{}/ and tag:unread".format(account) queryStr = "folder:/{}/ and tag:unread".format(account)
query = notmuch.Query(db, queryStr) query = notmuch.Query(db, queryStr)
nbMsgs = query.count_messages() nbMsgs = query.count_messages()
if account == "frogeye":
global q
q = query
if nbMsgs < 1: if nbMsgs < 1:
continue continue
counts.append((nbMsgs, self.colors[account])) counts.append((nbMsgs, self.colors[account]))
# db.close() # db.close()
return counts return counts
def __init__(self, dir="~/.mail/", theme=None): def __init__(self, dir: str = "~/.mail/", theme: int | None = None):
PeriodicUpdater.__init__(self) InotifyUpdater.__init__(self)
ColorCountsSection.__init__(self, theme) ColorCountsSection.__init__(self, theme)
self.dir = os.path.realpath(os.path.expanduser(dir)) self.dir = os.path.realpath(os.path.expanduser(dir))
@ -587,7 +593,7 @@ class TodoProvider(ColorCountsSection, InotifyUpdater):
# TODO OPT Specific callback for specific directory # TODO OPT Specific callback for specific directory
COLORABLE_ICON = "" COLORABLE_ICON = ""
def updateCalendarList(self): def updateCalendarList(self) -> None:
calendars = sorted(os.listdir(self.dir)) calendars = sorted(os.listdir(self.dir))
for calendar in calendars: for calendar in calendars:
# If the calendar wasn't in the list # If the calendar wasn't in the list
@ -603,9 +609,9 @@ class TodoProvider(ColorCountsSection, InotifyUpdater):
path = os.path.join(self.dir, calendar, "color") path = os.path.join(self.dir, calendar, "color")
with open(path, "r") as f: with open(path, "r") as f:
self.colors[calendar] = f.read().strip() self.colors[calendar] = f.read().strip()
self.calendars = calendars self.calendars: list[str] = calendars
def __init__(self, dir, theme=None): def __init__(self, dir: str, theme: int | None = None):
""" """
:parm str dir: [main]path value in todoman.conf :parm str dir: [main]path value in todoman.conf
""" """
@ -615,12 +621,12 @@ class TodoProvider(ColorCountsSection, InotifyUpdater):
assert os.path.isdir(self.dir) assert os.path.isdir(self.dir)
self.calendars = [] self.calendars = []
self.colors = dict() self.colors: dict[str, str] = dict()
self.names = dict() self.names: dict[str, str] = dict()
self.updateCalendarList() self.updateCalendarList()
self.refreshData() self.refreshData()
def countUndone(self, calendar): def countUndone(self, calendar: str | None) -> int:
cmd = ["todo", "--porcelain", "list"] cmd = ["todo", "--porcelain", "list"]
if calendar: if calendar:
cmd.append(self.names[calendar]) cmd.append(self.names[calendar])
@ -628,7 +634,7 @@ class TodoProvider(ColorCountsSection, InotifyUpdater):
data = json.loads(proc.stdout) data = json.loads(proc.stdout)
return len(data) return len(data)
def subfetcher(self): def subfetcher(self) -> list[tuple[int, str]]:
counts = [] counts = []
# TODO This an ugly optimisation that cuts on features, but todoman # TODO This an ugly optimisation that cuts on features, but todoman
@ -653,17 +659,17 @@ class I3WindowTitleProvider(Section, I3Updater):
# TODO FEAT To make this available from start, we need to find the # TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array # `focused=True` element following the `focus` array
# TODO Feat Make this output dependant if wanted # TODO Feat Make this output dependant if wanted
def on_window(self, i3, e): def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.updateText(e.container.name) self.updateText(e.container.name)
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
I3Updater.__init__(self) I3Updater.__init__(self)
Section.__init__(self, theme=theme) Section.__init__(self, theme=theme)
self.on("window", self.on_window) self.on("window", self.on_window)
class I3WorkspacesProviderSection(Section): class I3WorkspacesProviderSection(Section):
def selectTheme(self): def selectTheme(self) -> int:
if self.urgent: if self.urgent:
return self.parent.themeUrgent return self.parent.themeUrgent
elif self.focused: elif self.focused:
@ -674,39 +680,39 @@ class I3WorkspacesProviderSection(Section):
# TODO On mode change the state (shown / hidden) gets overriden so every # TODO On mode change the state (shown / hidden) gets overriden so every
# tab is shown # tab is shown
def show(self): def show(self) -> None:
self.updateTheme(self.selectTheme()) self.updateTheme(self.selectTheme())
self.updateText(self.fullName if self.focused else self.shortName) self.updateText(self.fullName if self.focused else self.shortName)
def changeState(self, focused, urgent): def changeState(self, focused: bool, urgent: bool) -> None:
self.focused = focused self.focused = focused
self.urgent = urgent self.urgent = urgent
self.show() self.show()
def setName(self, name): def setName(self, name: str) -> None:
self.shortName = name self.shortName = name
self.fullName = ( self.fullName: str = (
self.parent.customNames[name] if name in self.parent.customNames else name self.parent.customNames[name] if name in self.parent.customNames else name
) )
def switchTo(self): def switchTo(self) -> None:
self.parent.i3.command("workspace {}".format(self.shortName)) self.parent.i3.command("workspace {}".format(self.shortName))
def __init__(self, name, parent): def __init__(self, name: str, parent: "I3WorkspacesProvider"):
Section.__init__(self) Section.__init__(self)
self.parent = parent self.parent = parent
self.setName(name) self.setName(name)
self.setDecorators(clickLeft=self.switchTo) self.setDecorators(clickLeft=self.switchTo)
self.tempText = None self.tempText: Element = None
def empty(self): def empty(self) -> None:
self.updateTheme(self.parent.themeNormal) self.updateTheme(self.parent.themeNormal)
self.updateText(None) self.updateText(None)
def tempShow(self): def tempShow(self) -> None:
self.updateText(self.tempText) self.updateText(self.tempText)
def tempEmpty(self): def tempEmpty(self) -> None:
self.tempText = self.dstText[1] self.tempText = self.dstText[1]
self.updateText(None) self.updateText(None)
@ -714,7 +720,7 @@ class I3WorkspacesProviderSection(Section):
class I3WorkspacesProvider(Section, I3Updater): class I3WorkspacesProvider(Section, I3Updater):
# TODO FEAT Multi-screen # TODO FEAT Multi-screen
def initialPopulation(self, parent): def initialPopulation(self, parent: BarGroup) -> None:
""" """
Called on init Called on init
Can't reuse addWorkspace since i3.get_workspaces() gives dict and not Can't reuse addWorkspace since i3.get_workspaces() gives dict and not
@ -735,7 +741,7 @@ class I3WorkspacesProvider(Section, I3Updater):
lastSection = section lastSection = section
def on_workspace_init(self, i3, e): def on_workspace_init(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
workspace = e.current workspace = e.current
i = workspace.num i = workspace.num
if i in self.sections: if i in self.sections:
@ -753,24 +759,24 @@ class I3WorkspacesProvider(Section, I3Updater):
section.urgent = workspace.urgent section.urgent = workspace.urgent
section.show() section.show()
def on_workspace_empty(self, i3, e): def on_workspace_empty(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.sections[e.current.num].empty() self.sections[e.current.num].empty()
def on_workspace_focus(self, i3, e): def on_workspace_focus(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.sections[e.old.num].focused = False self.sections[e.old.num].focused = False
self.sections[e.old.num].show() self.sections[e.old.num].show()
self.sections[e.current.num].focused = True self.sections[e.current.num].focused = True
self.sections[e.current.num].show() self.sections[e.current.num].show()
def on_workspace_urgent(self, i3, e): def on_workspace_urgent(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.sections[e.current.num].urgent = e.current.urgent self.sections[e.current.num].urgent = e.current.urgent
self.sections[e.current.num].show() self.sections[e.current.num].show()
def on_workspace_rename(self, i3, e): def on_workspace_rename(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.sections[e.current.num].setName(e.name) self.sections[e.current.num].setName(e.name)
self.sections[e.current.num].show() self.sections[e.current.num].show()
def on_mode(self, i3, e): def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
if e.change == "default": if e.change == "default":
self.modeSection.updateText(None) self.modeSection.updateText(None)
for section in self.sections.values(): for section in self.sections.values():
@ -781,7 +787,12 @@ class I3WorkspacesProvider(Section, I3Updater):
section.tempEmpty() section.tempEmpty()
def __init__( def __init__(
self, theme=0, themeFocus=3, themeUrgent=1, themeMode=2, customNames=dict() self,
theme: int = 0,
themeFocus: int = 3,
themeUrgent: int = 1,
themeMode: int = 2,
customNames: dict[str, str] = dict(),
): ):
I3Updater.__init__(self) I3Updater.__init__(self)
Section.__init__(self) Section.__init__(self)
@ -790,7 +801,7 @@ class I3WorkspacesProvider(Section, I3Updater):
self.themeUrgent = themeUrgent self.themeUrgent = themeUrgent
self.customNames = customNames self.customNames = customNames
self.sections = dict() self.sections: dict[str, I3WorkspacesProviderSection] = dict()
self.on("workspace::init", self.on_workspace_init) self.on("workspace::init", self.on_workspace_init)
self.on("workspace::focus", self.on_workspace_focus) self.on("workspace::focus", self.on_workspace_focus)
self.on("workspace::empty", self.on_workspace_empty) self.on("workspace::empty", self.on_workspace_empty)
@ -801,7 +812,7 @@ class I3WorkspacesProvider(Section, I3Updater):
self.on("mode", self.on_mode) self.on("mode", self.on_mode)
self.modeSection = Section(theme=themeMode) self.modeSection = Section(theme=themeMode)
def addParent(self, parent): def addParent(self, parent: BarGroup) -> None:
self.parents.add(parent) self.parents.add(parent)
parent.addSection(self.modeSection) parent.addSection(self.modeSection)
self.initialPopulation(parent) self.initialPopulation(parent)
@ -812,10 +823,10 @@ class MpdProvider(Section, ThreadedUpdater):
MAX_LENGTH = 50 MAX_LENGTH = 50
def connect(self): def connect(self) -> None:
self.mpd.connect("localhost", 6600) self.mpd.connect("localhost", 6600)
def __init__(self, theme=None): def __init__(self, theme: int | None = None):
ThreadedUpdater.__init__(self) ThreadedUpdater.__init__(self)
Section.__init__(self, theme) Section.__init__(self, theme)
@ -824,7 +835,7 @@ class MpdProvider(Section, ThreadedUpdater):
self.refreshData() self.refreshData()
self.start() self.start()
def fetcher(self): def fetcher(self) -> Element:
stat = self.mpd.status() stat = self.mpd.status()
if not len(stat) or stat["state"] == "stop": if not len(stat) or stat["state"] == "stop":
return None return None
@ -835,7 +846,7 @@ class MpdProvider(Section, ThreadedUpdater):
infos = [] infos = []
def tryAdd(field): def tryAdd(field: str) -> None:
if field in cur: if field in cur:
infos.append(cur[field]) infos.append(cur[field])
@ -849,7 +860,7 @@ class MpdProvider(Section, ThreadedUpdater):
return "{}".format(infosStr) return "{}".format(infosStr)
def loop(self): def loop(self) -> None:
try: try:
self.mpd.idle("player") self.mpd.idle("player")
self.refreshData() self.refreshData()

View file

@ -11,7 +11,7 @@ import coloredlogs
import i3ipc import i3ipc
import pyinotify import pyinotify
from frobar.display import Text from frobar.display import Element
from frobar.notbusy import notBusy from frobar.notbusy import notBusy
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
@ -20,24 +20,23 @@ log = logging.getLogger()
# TODO Sync bar update with PeriodicUpdater updates # TODO Sync bar update with PeriodicUpdater updates
class Updater: class Updater:
@staticmethod @staticmethod
def init(): def init() -> None:
PeriodicUpdater.init() PeriodicUpdater.init()
InotifyUpdater.init() InotifyUpdater.init()
notBusy.set() notBusy.set()
def updateText(self, text): def updateText(self, text: Element) -> None:
print(text) print(text)
def fetcher(self): def fetcher(self) -> Element:
return "{} refreshed".format(self) return "{} refreshed".format(self)
def __init__(self): def __init__(self) -> None:
self.lock = threading.Lock() self.lock = threading.Lock()
def refreshData(self): def refreshData(self) -> None:
# TODO OPTI Maybe discard the refresh if there's already another one? # TODO OPTI Maybe discard the refresh if there's already another one?
self.lock.acquire() self.lock.acquire()
try: try:
@ -50,7 +49,7 @@ class Updater:
class PeriodicUpdaterThread(threading.Thread): class PeriodicUpdaterThread(threading.Thread):
def run(self): def run(self) -> None:
# TODO Sync with system clock # TODO Sync with system clock
counter = 0 counter = 0
while True: while True:
@ -67,6 +66,7 @@ class PeriodicUpdaterThread(threading.Thread):
provider.refreshData() provider.refreshData()
else: else:
notBusy.clear() notBusy.clear()
assert PeriodicUpdater.intervalStep is not None
counter += PeriodicUpdater.intervalStep counter += PeriodicUpdater.intervalStep
counter = counter % PeriodicUpdater.intervalLoop counter = counter % PeriodicUpdater.intervalLoop
for interval in PeriodicUpdater.intervals.keys(): for interval in PeriodicUpdater.intervals.keys():
@ -80,43 +80,42 @@ class PeriodicUpdater(Updater):
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__` Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
""" """
intervals = dict() intervals: dict[int, set["PeriodicUpdater"]] = dict()
intervalStep = None intervalStep: int | None = None
intervalLoop = None intervalLoop: int
updateThread = PeriodicUpdaterThread(daemon=True) updateThread: threading.Thread = PeriodicUpdaterThread(daemon=True)
intervalsChanged = threading.Event() intervalsChanged = threading.Event()
@staticmethod @staticmethod
def gcds(*args): def gcds(*args: int) -> int:
return functools.reduce(math.gcd, args) return functools.reduce(math.gcd, args)
@staticmethod @staticmethod
def lcm(a, b): def lcm(a: int, b: int) -> int:
"""Return lowest common multiple.""" """Return lowest common multiple."""
return a * b // math.gcd(a, b) return a * b // math.gcd(a, b)
@staticmethod @staticmethod
def lcms(*args): def lcms(*args: int) -> int:
"""Return lowest common multiple.""" """Return lowest common multiple."""
return functools.reduce(PeriodicUpdater.lcm, args) return functools.reduce(PeriodicUpdater.lcm, args)
@staticmethod @staticmethod
def updateIntervals(): def updateIntervals() -> None:
intervalsList = list(PeriodicUpdater.intervals.keys()) intervalsList = list(PeriodicUpdater.intervals.keys())
PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList) PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList)
PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList) PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList)
PeriodicUpdater.intervalsChanged.set() PeriodicUpdater.intervalsChanged.set()
@staticmethod @staticmethod
def init(): def init() -> None:
PeriodicUpdater.updateThread.start() PeriodicUpdater.updateThread.start()
def __init__(self): def __init__(self) -> None:
Updater.__init__(self) Updater.__init__(self)
self.interval = None self.interval: int | None = None
def changeInterval(self, interval): def changeInterval(self, interval: int) -> None:
assert isinstance(interval, int)
if self.interval is not None: if self.interval is not None:
PeriodicUpdater.intervals[self.interval].remove(self) PeriodicUpdater.intervals[self.interval].remove(self)
@ -131,7 +130,7 @@ class PeriodicUpdater(Updater):
class InotifyUpdaterEventHandler(pyinotify.ProcessEvent): class InotifyUpdaterEventHandler(pyinotify.ProcessEvent):
def process_default(self, event): def process_default(self, event: pyinotify.Event) -> None:
# DEBUG # DEBUG
# from pprint import pprint # from pprint import pprint
# pprint(event.__dict__) # pprint(event.__dict__)
@ -154,10 +153,10 @@ class InotifyUpdater(Updater):
""" """
wm = pyinotify.WatchManager() wm = pyinotify.WatchManager()
paths = dict() paths: dict[str, dict[str | int, set["InotifyUpdater"]]] = dict()
@staticmethod @staticmethod
def init(): def init() -> None:
notifier = pyinotify.ThreadedNotifier( notifier = pyinotify.ThreadedNotifier(
InotifyUpdater.wm, InotifyUpdaterEventHandler() InotifyUpdater.wm, InotifyUpdaterEventHandler()
) )
@ -166,14 +165,14 @@ class InotifyUpdater(Updater):
# TODO Mask for folders # TODO Mask for folders
MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE
def addPath(self, path, refresh=True): def addPath(self, path: str, refresh: bool = True) -> None:
path = os.path.realpath(os.path.expanduser(path)) path = os.path.realpath(os.path.expanduser(path))
# Detect if file or folder # Detect if file or folder
if os.path.isdir(path): if os.path.isdir(path):
self.dirpath = path self.dirpath: str = path
# 0: Directory watcher # 0: Directory watcher
self.filename = 0 self.filename: str | int = 0
elif os.path.isfile(path): elif os.path.isfile(path):
self.dirpath = os.path.dirname(path) self.dirpath = os.path.dirname(path)
self.filename = os.path.basename(path) self.filename = os.path.basename(path)
@ -195,12 +194,12 @@ class InotifyUpdater(Updater):
class ThreadedUpdaterThread(threading.Thread): class ThreadedUpdaterThread(threading.Thread):
def __init__(self, updater, *args, **kwargs): def __init__(self, updater: "ThreadedUpdater") -> None:
self.updater = updater self.updater = updater
threading.Thread.__init__(self, *args, **kwargs) threading.Thread.__init__(self, daemon=True)
self.looping = True self.looping = True
def run(self): def run(self) -> None:
try: try:
while self.looping: while self.looping:
self.updater.loop() self.updater.loop()
@ -215,57 +214,31 @@ class ThreadedUpdater(Updater):
Must implement loop(), and call start() Must implement loop(), and call start()
""" """
def __init__(self): def __init__(self) -> None:
Updater.__init__(self) Updater.__init__(self)
self.thread = ThreadedUpdaterThread(self, daemon=True) self.thread = ThreadedUpdaterThread(self)
def loop(self): def loop(self) -> None:
self.refreshData() self.refreshData()
time.sleep(10) time.sleep(10)
def start(self): def start(self) -> None:
self.thread.start() self.thread.start()
class I3Updater(ThreadedUpdater): class I3Updater(ThreadedUpdater):
# TODO OPTI One i3 connection for all # TODO OPTI One i3 connection for all
def __init__(self): def __init__(self) -> None:
ThreadedUpdater.__init__(self) ThreadedUpdater.__init__(self)
self.i3 = i3ipc.Connection() self.i3 = i3ipc.Connection()
self.on = self.i3.on
self.start() self.start()
def on(self, event, function): def loop(self) -> None:
self.i3.on(event, function)
def loop(self):
self.i3.main() self.i3.main()
class MergedUpdater(Updater): class MergedUpdater(Updater):
# TODO OPTI Do not update until end of periodic batch def __init__(self, *args: Updater) -> None:
def fetcher(self): raise NotImplementedError("Deprecated, as hacky and currently unused")
text = Text()
for updater in self.updaters:
text.append(self.texts[updater])
if not len(text):
return None
return text
def __init__(self, *args):
Updater.__init__(self)
self.updaters = []
self.texts = dict()
for updater in args:
assert isinstance(updater, Updater)
def newUpdateText(updater, text):
self.texts[updater] = text
self.refreshData()
updater.updateText = newUpdateText.__get__(updater, Updater)
self.updaters.append(updater)
self.texts[updater] = ""