#!/usr/bin/env python3
"""
Status bar for i3: builds a tree of text sections, renders it as lemonbar
markup and feeds it to a `lemonbar` subprocess.

Tree layout: Bar > Screen > Side > Module > Section.
Providers own one or more modules and keep their sections up to date.
"""

import asyncio
import datetime
import enum
import logging
import random
import signal
import typing

import coloredlogs
import i3ipc
import i3ipc.aio
import psutil

coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()

T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int


class ComposableText(typing.Generic[P, C]):
    """
    Node of the markup tree.

    Each node knows its parent, keeps its children sorted by `sortKey`,
    and holds a reference to the `Bar` at the root of its tree.
    """

    def __init__(
        self,
        parent: typing.Optional[P] = None,
        sortKey: Sortable = 0,
    ) -> None:
        """
        parent: node to attach to (None for the root)
        sortKey: ordering key among the siblings
        """
        self.parent: typing.Optional[P] = None
        self.children: typing.MutableSequence[C] = list()
        self.sortKey = sortKey
        if parent is not None:
            self.setParent(parent)
        # For the root (a Bar) this resolves to the node itself
        self.bar = self.getFirstParentOfType(Bar)

    def setParent(self, parent: P) -> None:
        """
        Attach this (currently orphan) node to `parent`,
        keeping the siblings sorted, and request a redraw.
        """
        assert self.parent is None
        parent.children.append(self)
        assert isinstance(parent.children, list)
        parent.children.sort(key=lambda c: c.sortKey)
        self.parent = parent
        self.parent.updateMarkup()

    def unsetParent(self) -> None:
        """
        Detach this node from its parent and request a redraw.
        """
        assert self.parent
        self.parent.children.remove(self)
        self.parent.updateMarkup()
        self.parent = None

    def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
        """
        Walk up the tree (starting at this node itself)
        and return the first node that is an instance of `typ`.
        """
        parent = self
        while not isinstance(parent, typ):
            assert parent.parent, f"{self} doesn't have a parent of {typ}"
            parent = parent.parent
        return parent

    def updateMarkup(self) -> None:
        """
        Signal the bar that it needs to be redrawn.
        """
        self.bar.refresh.set()
        # TODO OPTI See if worth caching the output

    def generateMarkup(self) -> str:
        """
        Build the markup for this node. To be implemented by subclasses.
        """
        raise NotImplementedError(f"{self} cannot generate markup")

    def getMarkup(self) -> str:
        """
        Return the markup for this node.
        """
        return self.generateMarkup()


def randomColor(seed: int | bytes | None = None) -> str:
    """
    Return a random color as `#rrggbb`.

    seed: if set, the color is derived deterministically from it.
    A private generator is used in that case so that the global
    `random` state is left untouched.
    """
    randint = random.randint if seed is None else random.Random(seed).randint
    return "#" + "".join(f"{randint(0, 0xff):02x}" for _ in range(3))


class Button(enum.Enum):
    """
    Mouse buttons, valued as used in the `%{A<button>:...}` markup.
    """

    CLICK_LEFT = "1"
    CLICK_MIDDLE = "2"
    CLICK_RIGHT = "3"
    SCROLL_UP = "4"
    SCROLL_DOWN = "5"


class Section(ComposableText):
    """
    Colorable block separated by chevrons
    """

    def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
        super().__init__(parent=parent, sortKey=sortKey)
        self.parent: "Module"

        self.color = randomColor()
        # Text last requested through setText, without padding
        self.desiredText: str | None = None
        # Padded text to display
        self.text = ""
        # Width (in characters) the section is animating towards; -1: hidden
        self.targetSize = -1
        # Width currently displayed; -1: hidden
        self.size = -1
        self.animationTask: asyncio.Task | None = None
        # Button -> command string registered on the bar
        self.actions: dict[Button, str] = dict()

    def isHidden(self) -> bool:
        """
        Tell if the section currently takes no space at all.
        """
        return self.size < 0

    # Geometric series, with a cap:
    # the delay between frames is multiplied by ANIM_R at each frame,
    # starting from ANIM_A, and never goes below ANIM_MIN (seconds)
    ANIM_A = 0.025
    ANIM_R = 0.9
    ANIM_MIN = 0.001

    async def animate(self) -> None:
        """
        Grow or shrink the section one character at a time
        until it reaches `targetSize`.
        """
        increment = 1 if self.size < self.targetSize else -1
        loop = asyncio.get_running_loop()
        frameTime = loop.time()
        animTime = self.ANIM_A

        while self.size != self.targetSize:
            self.size += increment
            self.updateMarkup()

            animTime *= self.ANIM_R
            animTime = max(self.ANIM_MIN, animTime)
            # Frames are scheduled on absolute times so delays don't add up
            frameTime += animTime
            sleepTime = frameTime - loop.time()

            # In case of stress, skip refreshing by not awaiting
            if sleepTime > 0:
                await asyncio.sleep(sleepTime)
            else:
                log.warning("Skipped an animation frame")

    def setText(self, text: str | None) -> None:
        """
        Change the displayed text, animating the size change.

        text: new text, or None to hide the section
        """
        # OPTI Don't redraw nor reset animation if setting the same text
        if self.desiredText == text:
            return
        self.desiredText = text
        if text is None:
            self.text = ""
            self.targetSize = -1
        else:
            self.text = f" {text} "
            self.targetSize = len(self.text)
        if self.animationTask:
            self.animationTask.cancel()
        # OPTI Skip the whole animation task if not required
        if self.size == self.targetSize:
            self.updateMarkup()
        else:
            self.animationTask = self.bar.taskGroup.create_task(self.animate())

    def setAction(self, button: Button, callback: typing.Callable | None) -> None:
        """
        Bind `callback` to `button` for this section,
        replacing the previous binding if any.

        callback: called without arguments; None only removes the binding
        """
        previous = self.actions.pop(button, None)
        if previous is not None:
            self.bar.removeAction(previous)
        if callback:
            self.actions[button] = self.bar.addAction(callback)

    def generateMarkup(self) -> str:
        """
        Return the text cut or padded to the current size,
        wrapped in the clickable areas of its actions.
        """
        assert not self.isHidden()
        pad = max(0, self.size - len(self.text))
        text = self.text[: self.size] + " " * pad
        # The text may come from outside (window titles...):
        # escape percent signs so they are never parsed as formatting blocks.
        # Done after cutting so an escape sequence is never cut in half.
        text = text.replace("%", "%%")
        for button, command in self.actions.items():
            text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
        return text


class Module(ComposableText):
    """
    Sections handled by a same updater
    """

    def __init__(self, parent: "Side") -> None:
        super().__init__(parent=parent)
        self.parent: "Side"
        self.children: typing.MutableSequence[Section]

        # Module whose sections are shown instead of our own, if any
        self.mirroring: Module | None = None
        # Modules showing our sections
        self.mirrors: list[Module] = list()

    def mirror(self, module: "Module") -> None:
        """
        Make this module display the sections of `module`.
        """
        self.mirroring = module
        module.mirrors.append(self)

    def getSections(self) -> typing.Sequence[Section]:
        """
        Return the sections to display:
        those of the mirrored module if any, our own otherwise.
        """
        if self.mirroring:
            return self.mirroring.children
        else:
            return self.children

    def updateMarkup(self) -> None:
        """
        Request a redraw, for this module and the ones mirroring it.
        """
        super().updateMarkup()
        for mirror in self.mirrors:
            mirror.updateMarkup()


class Alignment(enum.Enum):
    """
    Position on the screen, valued as used in the `%{<alignment>}` markup.
    """

    LEFT = "l"
    RIGHT = "r"
    CENTER = "c"


class Side(ComposableText):
    """
    Modules of a screen sharing a same alignment
    """

    def __init__(self, parent: "Screen", alignment: Alignment) -> None:
        super().__init__(parent=parent)
        self.parent: Screen
        self.children: typing.MutableSequence[Module] = []

        self.alignment = alignment

    def generateMarkup(self) -> str:
        """
        Concatenate the markup of every visible section of this side,
        switching colors between sections.
        """
        if not self.children:
            return ""
        text = "%{" + self.alignment.value + "}"
        lastSection: Section | None = None
        # NOTE(review): several literals below are empty or lack a visible
        # separator in this copy of the file; presumably they hold
        # chevron glyphs. Confirm against the original file.
        for module in self.children:
            for section in module.getSections():
                if section.isHidden():
                    continue
                if lastSection is None:
                    # First visible section of the side
                    if self.alignment == Alignment.LEFT:
                        text += "%{B" + section.color + "}%{F-}"
                    else:
                        text += "%{B-}%{F" + section.color + "}%{R}%{F-}"
                else:
                    # Transition from the previous section
                    if self.alignment == Alignment.RIGHT:
                        if lastSection.color == section.color:
                            text += ""
                        else:
                            text += "%{F" + section.color + "}%{R}"
                    else:
                        if lastSection.color == section.color:
                            text += ""
                        else:
                            text += "%{R}%{B" + section.color + "}"
                    text += "%{F-}"
                text += section.getMarkup()
                lastSection = section
        if self.alignment != Alignment.RIGHT:
            text += "%{R}%{B-}"
        return text


class Screen(ComposableText):
    """
    One monitor, made of one side per alignment
    """

    def __init__(self, parent: "Bar", output: str) -> None:
        """
        output: name of the output, as used in the `%{Sn<output>}` markup
        """
        super().__init__(parent=parent)
        self.parent: "Bar"
        self.children: typing.MutableSequence[Side]

        self.output = output

        for alignment in Alignment:
            Side(parent=self, alignment=alignment)

    def generateMarkup(self) -> str:
        """
        Select the output then concatenate the markup of its sides.
        """
        return ("%{Sn" + self.output + "}") + "".join(
            side.getMarkup() for side in self.children
        )


class Bar(ComposableText):
    """
    Top-level
    """

    def __init__(self) -> None:
        """
        Create one screen per active i3 output.
        """
        super().__init__()
        self.parent: None
        self.children: typing.MutableSequence[Screen]

        # Set whenever the markup needs to be sent again to lemonbar
        self.refresh = asyncio.Event()
        # Entered in run(); every task of the bar is created on it
        self.taskGroup = asyncio.TaskGroup()
        self.providers: list["Provider"] = list()
        # Next command number to hand out
        self.actionIndex = 0
        # Command (as printed by lemonbar on click) -> callback
        self.actions: dict[str, typing.Callable] = dict()

        i3 = i3ipc.Connection()
        for output in i3.get_outputs():
            if not output.active:
                continue
            Screen(parent=self, output=output.name)

    async def run(self) -> None:
        """
        Start lemonbar and the providers, then run until interrupted
        (SIGINT) or until one of the tasks fails.
        """
        cmd = [
            "lemonbar",
            "-b",
            "-a",
            "64",
            "-f",
            "DejaVuSansM Nerd Font:size=10",
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
        )

        async def refresher() -> None:
            """Send the markup to lemonbar each time a refresh is requested."""
            assert proc.stdin
            while True:
                await self.refresh.wait()
                self.refresh.clear()
                markup = self.getMarkup()
                # log.debug(markup)
                proc.stdin.write(markup.encode())
                # Don't buffer frames faster than lemonbar reads them
                await proc.stdin.drain()

        async def actionHandler() -> None:
            """Run the callback matching each command printed by lemonbar."""
            assert proc.stdout
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF: without this, the loop would spin on empty reads
                    raise RuntimeError("lemonbar closed its output")
                command = line.decode().strip()
                callback = self.actions.get(command)
                if callback is None:
                    # Can happen if the action got removed
                    # between the last redraw and the click
                    log.warning("Ignoring unknown action: %r", command)
                    continue
                callback()

        longRunningTasks = list()

        def addLongRunningTask(coro: typing.Coroutine) -> None:
            task = self.taskGroup.create_task(coro)
            longRunningTasks.append(task)

        async with self.taskGroup:
            addLongRunningTask(refresher())
            addLongRunningTask(actionHandler())
            for provider in self.providers:
                addLongRunningTask(provider.run())

            def terminate() -> None:
                log.info("Terminating")
                for task in longRunningTasks:
                    task.cancel()

            loop = asyncio.get_running_loop()
            loop.add_signal_handler(signal.SIGINT, terminate)

    def generateMarkup(self) -> str:
        """
        Concatenate the markup of every screen, as one line.
        """
        return "".join(screen.getMarkup() for screen in self.children) + "\n"

    def addProvider(
        self,
        provider: "Provider",
        alignment: Alignment = Alignment.LEFT,
        screenNum: int | None = None,
    ) -> None:
        """
        Give the provider a module on each selected screen.
        The provider is only registered if it got at least one module.

        screenNum: the provider will be added on this screen if set, all otherwise
        """
        modules = list()
        for num, screen in enumerate(self.children):
            if screenNum is None or num == screenNum:
                side = next(
                    side for side in screen.children if side.alignment == alignment
                )
                module = Module(parent=side)
                modules.append(module)
        provider.modules = modules
        if modules:
            self.providers.append(provider)

    def addAction(self, callback: typing.Callable) -> str:
        """
        Register a callback, and return the command to put in the markup
        for it to be called.
        """
        command = f"{self.actionIndex:x}"
        self.actions[command] = callback
        self.actionIndex += 1
        return command

    def removeAction(self, command: str) -> None:
        """
        Unregister the callback of a command returned by addAction.
        """
        del self.actions[command]


class Provider:
    """
    Updater for the modules the bar assigned to it
    """

    def __init__(self) -> None:
        # Overwritten by Bar.addProvider
        self.modules: list[Module] = list()

    async def run(self) -> None:
        """
        Keep the modules up to date. To be implemented by subclasses.
        """
        raise NotImplementedError()


class MirrorProvider(Provider):
    """
    Provider showing the same sections on all of its modules
    """

    def __init__(self) -> None:
        super().__init__()
        # Set in run(): the module holding the actual sections
        self.module: Module

    async def run(self) -> None:
        """
        Make every module but the first mirror the first one.
        """
        self.module = self.modules[0]
        for module in self.modules[1:]:
            module.mirror(self.module)


class SingleSectionProvider(MirrorProvider):
    """
    Provider with a single section, of class SECTION_CLASS
    """

    SECTION_CLASS = Section

    async def run(self) -> None:
        """
        Create the section.
        """
        await super().run()
        self.section = self.SECTION_CLASS(parent=self.module)


class StaticProvider(SingleSectionProvider):
    """
    Provider showing a fixed text
    """

    def __init__(self, text: str) -> None:
        super().__init__()
        self.text = text

    async def run(self) -> None:
        await super().run()
        self.section.setText(self.text)


class StatefulSection(Section):
    """
    Section cycling through `numberStates` states
    on left click (forward) and right click (backward)
    """

    def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
        super().__init__(parent=parent, sortKey=sortKey)
        self.state = 0
        # To be set by whoever uses the section, before the first click
        self.numberStates: int
        self.stateChanged = asyncio.Event()

        self.setAction(Button.CLICK_LEFT, self.incrementState)
        self.setAction(Button.CLICK_RIGHT, self.decrementState)

    def incrementState(self) -> None:
        """
        Go to the next state.
        """
        self.state += 1
        self.changeState()

    def decrementState(self) -> None:
        """
        Go to the previous state.
        """
        self.state -= 1
        self.changeState()

    def changeState(self) -> None:
        """
        Wrap the state around and wake up whoever waits on `stateChanged`.
        """
        self.state %= self.numberStates
        # Set then clear right away: only the current waiters are notified
        self.stateChanged.set()
        self.stateChanged.clear()


class StatefulProvider(SingleSectionProvider):
    """
    Provider with a single stateful section
    """

    SECTION_CLASS = StatefulSection


# Providers


class I3ModeProvider(SingleSectionProvider):
    """
    Shows the i3 binding mode, hidden when in the default one
    """

    def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        self.section.setText(None if e.change == "default" else e.change)

    async def run(self) -> None:
        await super().run()
        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.MODE, self.on_mode)
        await i3.main()


class I3WindowTitleProvider(SingleSectionProvider):
    """
    Shows the name of the container of the last window event
    """

    # TODO FEAT To make this available from start, we need to find the
    # `focused=True` element following the `focus` array
    def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        self.section.setText(e.container.name)

    async def run(self) -> None:
        await super().run()
        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.WINDOW, self.on_window)
        await i3.main()


class I3WorkspacesProvider(Provider):
    """
    Shows one clickable section per workspace, on the screen it is on
    """

    # FIXME Custom names
    # FIXME Colors

    def __init__(
        self,
    ) -> None:
        super().__init__()

        # Workspace number -> section
        self.sections: dict[int, Section] = dict()
        # Output name -> module displayed on that output
        self.modulesFromOutput: dict[str, Module] = dict()
        # Set in run()
        self.bar: Bar

    async def updateWorkspaces(self, i3: i3ipc.Connection) -> None:
        """
        Since the i3 IPC interface cannot really tell you by events
        when workspaces get invisible or not urgent anymore.
        Relying on those exclusively would require reimplementing some of i3 logic.
        Fetching all the workspaces on event looks ugly but is the most maintainable.
        Times I tried to challenge this and failed: 2.
        """
        workspaces = await i3.get_workspaces()
        # Numbers of the workspaces we are able to display
        shownNums: set[int] = set()
        for workspace in workspaces:
            module = self.modulesFromOutput.get(workspace.output)
            if module is None:
                # Workspace on an output we have no module on:
                # nowhere to display it (its section, if any, is hidden below)
                continue
            shownNums.add(workspace.num)

            if workspace.num in self.sections:
                section = self.sections[workspace.num]
                if section.parent != module:
                    # The workspace moved to another output
                    section.unsetParent()
                    section.setParent(module)
            else:
                section = Section(parent=module, sortKey=workspace.num)
                self.sections[workspace.num] = section

                def generate_switch_workspace(num: int) -> typing.Callable:
                    def switch_workspace() -> None:
                        self.bar.taskGroup.create_task(
                            i3.command(f"workspace number {num}")
                        )

                    return switch_workspace

                section.setAction(
                    Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
                )

            name = workspace.name
            if workspace.urgent:
                name = f"{name} !"
            elif workspace.focused:
                name = f"{name} +"
            elif workspace.visible:
                name = f"{name} *"
            section.setText(name)

        for num, section in self.sections.items():
            if num not in shownNums:
                # This should delete the Section but it turned out to be hard
                section.setText(None)

    def onWorkspaceChange(
        self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
    ) -> None:
        """
        Schedule an update of the workspaces.
        """
        # Cancelling the task doesn't seem to prevent performance double-events
        self.bar.taskGroup.create_task(self.updateWorkspaces(i3))

    async def run(self) -> None:
        for module in self.modules:
            screen = module.getFirstParentOfType(Screen)
            output = screen.output
            self.modulesFromOutput[output] = module
            self.bar = module.bar

        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
        # Initial display, without waiting for an event
        self.onWorkspaceChange(i3)
        await i3.main()


class NetworkProviderSection(StatefulSection):
    """
    Section for one network interface
    """

    def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
        super().__init__(parent=parent, sortKey=iface)
        self.iface = iface
        self.provider = provider

        self.ignore = False
        self.icon = "?"
        self.wifi = False
        # NOTE(review): the icons below are empty strings in this copy of
        # the file; presumably they hold icon font glyphs.
        # Confirm against the original file.
        if iface == "lo":
            self.ignore = True
        elif iface.startswith(("eth", "enp")):
            if "u" in iface:
                self.icon = ""
            else:
                self.icon = ""
        elif iface.startswith(("wlan", "wl")):
            self.icon = ""
            self.wifi = True
        elif iface.startswith(("tun", "tap", "wg")):
            self.icon = ""
        elif iface.startswith("docker"):
            self.icon = ""
        elif iface.startswith("veth"):
            self.icon = ""
        elif iface.startswith("vboxnet"):
            self.icon = ""

        self.numberStates = 5 if self.wifi else 4
        self.state = 1 if self.wifi else 0

    async def getText(self) -> str | None:
        """
        Return the text to display: the icon,
        or None if the interface is ignored or down.
        """
        if self.ignore or not self.provider.if_stats[self.iface].isup:
            return None
        return self.icon


class NetworkProvider(MirrorProvider):
    """
    Shows one section per network interface, refreshed every second
    """

    def __init__(self) -> None:
        super().__init__()
        # Interface name -> section
        self.sections: dict[str, NetworkProviderSection] = dict()
        # Result of the last psutil.net_if_stats() call
        self.if_stats: dict[str, typing.Any] = dict()

    async def updateIface(self, iface: str) -> None:
        """
        Refresh the text of the section of an interface.
        """
        section = self.sections[iface]
        section.setText(await section.getText())

    async def run(self) -> None:
        await super().run()

        while True:
            # if_addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
            # io_counters: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)

            async with asyncio.TaskGroup() as tg:
                self.if_stats = psutil.net_if_stats()
                for iface in self.if_stats:
                    if iface not in self.sections:
                        section = NetworkProviderSection(
                            parent=self.module, iface=iface, provider=self
                        )
                        self.sections[iface] = section

                    tg.create_task(self.updateIface(iface))
                # Hide the interfaces that disappeared
                for iface, section in self.sections.items():
                    if iface not in self.if_stats:
                        section.setText(None)

                # Makes each iteration last at least one second
                tg.create_task(asyncio.sleep(1))


class TimeProvider(StatefulProvider):
    """
    Shows the current time, in a format selected by the section state
    """

    FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]

    async def run(self) -> None:
        await super().run()
        assert isinstance(self.section, StatefulSection)
        self.section.state = 1
        self.section.numberStates = len(self.FORMATS)

        while True:
            now = datetime.datetime.now()
            fmt = self.FORMATS[self.section.state]
            self.section.setText(now.strftime(fmt))

            # Wake up at the next second, or earlier if the format changes
            remaining = 1 - now.microsecond / 1000000
            try:
                await asyncio.wait_for(self.section.stateChanged.wait(), remaining)
            except TimeoutError:
                pass


async def main() -> None:
    """
    Set up the bar with its providers and run it.
    """
    bar = Bar()
    dualScreen = len(bar.children) > 1

    bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
    bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
    if dualScreen:
        bar.addProvider(
            I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER
        )
    bar.addProvider(
        StaticProvider(text="mpris"),
        screenNum=1 if dualScreen else None,
        alignment=Alignment.CENTER,
    )

    bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT)
    bar.addProvider(
        StaticProvider("pulse"),
        screenNum=1 if dualScreen else None,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(
        NetworkProvider(),
        screenNum=0 if dualScreen else None,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)

    await bar.run()


if __name__ == "__main__":
    asyncio.run(main())