#!/usr/bin/env python3 import asyncio import datetime import enum import logging import random import signal import typing import coloredlogs import i3ipc import i3ipc.aio coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() T = typing.TypeVar("T", bound="ComposableText") class ComposableText: def getFirstParentOfType(self, typ: typing.Type[T]) -> T: parent = self while not isinstance(parent, typ): assert parent.parent, f"{self} doesn't have a parent of {typ}" parent = parent.parent return parent def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None: self.parent = parent self.bar = self.getFirstParentOfType(Bar) def updateMarkup(self) -> None: self.bar.refresh.set() # TODO OPTI See if worth caching the output def generateMarkup(self) -> str: raise NotImplementedError(f"{self} cannot generate markup") def getMarkup(self) -> str: return self.generateMarkup() def randomColor(seed: int | bytes | None = None) -> str: if seed is not None: random.seed(seed) return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3)) class Button(enum.Enum): CLICK_LEFT = "1" CLICK_MIDDLE = "2" CLICK_RIGHT = "3" SCROLL_UP = "4" SCROLL_DOWN = "5" class Section(ComposableText): """ Colorable block separated by chevrons """ def __init__(self, parent: "Module") -> None: super().__init__(parent=parent) self.parent: "Module" self.color = randomColor() self.desiredText: str | None = None self.text = "" self.targetSize = -1 self.size = -1 self.animationTask: asyncio.Task | None = None self.actions: dict[Button, str] = dict() def isHidden(self) -> bool: return self.size < 0 # Geometric series, with a cap ANIM_A = 0.025 ANIM_R = 0.9 ANIM_MIN = 0.001 async def animate(self) -> None: increment = 1 if self.size < self.targetSize else -1 loop = asyncio.get_running_loop() frameTime = loop.time() animTime = self.ANIM_A while self.size != self.targetSize: self.size += increment self.updateMarkup() animTime *= self.ANIM_R animTime = max(self.ANIM_MIN, animTime) frameTime += animTime sleepTime = frameTime - loop.time() # In case of stress, skip refreshing by not awaiting if sleepTime > 0: await asyncio.sleep(sleepTime) else: log.warning("Skipped an animation frame") def setText(self, text: str | None) -> None: # OPTI Don't redraw nor reset animation if setting the same text if self.desiredText == text: return self.desiredText = text if text is None: self.text = "" self.targetSize = -1 else: self.text = f" {text} " self.targetSize = len(self.text) if self.animationTask: self.animationTask.cancel() # OPTI Skip the whole animation task if not required if self.size == self.targetSize: self.updateMarkup() else: self.animationTask = self.bar.taskGroup.create_task(self.animate()) def setAction(self, button: Button, callback: typing.Callable | None) -> None: if button in self.actions: command = self.actions[button] self.bar.removeAction(command) del self.actions[button] if callback: command = self.bar.addAction(callback) self.actions[button] = command def generateMarkup(self) -> str: assert not self.isHidden() pad = max(0, self.size - len(self.text)) text = self.text[: self.size] + " " * pad for button, command in self.actions.items(): text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}" return text class Module(ComposableText): """ Sections handled by a same updater """ def __init__(self, parent: "Side") -> None: super().__init__(parent=parent) self.parent: "Side" self.sections: list[Section] = [] self.mirroring: Module | None = None self.mirrors: list[Module] = list() def mirror(self, module: "Module") -> None: self.mirroring = module module.mirrors.append(self) def getSections(self) -> list[Section]: if self.mirroring: return self.mirroring.sections else: return self.sections def updateMarkup(self) -> None: super().updateMarkup() for mirror in self.mirrors: mirror.updateMarkup() class Alignment(enum.Enum): LEFT = "l" RIGHT = "r" CENTER = "c" class Side(ComposableText): def __init__(self, parent: "Screen", alignment: Alignment) -> None: super().__init__(parent=parent) self.parent: Screen self.alignment = alignment self.modules: list[Module] = [] def generateMarkup(self) -> str: if not self.modules: return "" text = "%{" + self.alignment.value + "}" lastSection: Section | None = None for module in self.modules: for section in module.getSections(): if section.isHidden(): continue if lastSection is None: if self.alignment == Alignment.LEFT: text += "%{B" + section.color + "}%{F-}" else: text += "%{B-}%{F" + section.color + "}%{R}%{F-}" else: if self.alignment == Alignment.RIGHT: if lastSection.color == section.color: text += "" else: text += "%{F" + section.color + "}%{R}" else: if lastSection.color == section.color: text += "" else: text += "%{R}%{B" + section.color + "}" text += "%{F-}" text += section.getMarkup() lastSection = section if self.alignment != Alignment.RIGHT: text += "%{R}%{B-}" return text class Screen(ComposableText): def __init__(self, parent: "Bar", output: str) -> None: super().__init__(parent=parent) self.parent: "Bar" self.output = output self.sides = dict() for alignment in Alignment: self.sides[alignment] = Side(parent=self, alignment=alignment) def generateMarkup(self) -> str: return ("%{Sn" + self.output + "}") + "".join( side.getMarkup() for side in self.sides.values() ) class Bar(ComposableText): """ Top-level """ def __init__(self) -> None: super().__init__() self.refresh = asyncio.Event() self.taskGroup = asyncio.TaskGroup() self.providers: list["Provider"] = list() self.actionIndex = 0 self.actions: dict[str, typing.Callable] = dict() self.screens = [] i3 = i3ipc.Connection() for output in i3.get_outputs(): if not output.active: continue screen = Screen(parent=self, output=output.name) self.screens.append(screen) async def run(self) -> None: cmd = [ "lemonbar", "-b", "-a", "64", "-f", "DejaVuSansM Nerd Font:size=10", ] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE ) async def refresher() -> None: assert proc.stdin while True: await self.refresh.wait() self.refresh.clear() markup = self.getMarkup() # log.debug(markup) proc.stdin.write(markup.encode()) async def actionHandler() -> None: assert proc.stdout while True: line = await proc.stdout.readline() command = line.decode().strip() callback = self.actions[command] callback() longRunningTasks = list() def addLongRunningTask(coro: typing.Coroutine) -> None: task = self.taskGroup.create_task(coro) longRunningTasks.append(task) async with self.taskGroup: addLongRunningTask(refresher()) addLongRunningTask(actionHandler()) for provider in self.providers: addLongRunningTask(provider.run()) def exit() -> None: log.info("Terminating") for task in longRunningTasks: task.cancel() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, exit) def generateMarkup(self) -> str: return "".join(section.getMarkup() for section in self.screens) + "\n" def addProvider( self, provider: "Provider", alignment: Alignment = Alignment.LEFT, screenNum: int | None = None, ) -> None: """ screenNum: the provider will be added on this screen if set, all otherwise """ modules = list() for s, screen in enumerate(self.screens): if screenNum is None or s == screenNum: side = screen.sides[alignment] module = Module(parent=side) side.modules.append(module) modules.append(module) provider.modules = modules if modules: self.providers.append(provider) def addAction(self, callback: typing.Callable) -> str: command = f"{self.actionIndex:x}" self.actions[command] = callback self.actionIndex += 1 return command def removeAction(self, command: str) -> None: del self.actions[command] class Provider: def __init__(self) -> None: self.modules: list[Module] = list() async def run(self) -> None: raise NotImplementedError() class MirrorProvider(Provider): def __init__(self) -> None: super().__init__() self.module: Module async def run(self) -> None: self.module = self.modules[0] for module in self.modules[1:]: module.mirror(self.module) class SingleSectionProvider(MirrorProvider): def __init__(self) -> None: super().__init__() self.section: Section async def run(self) -> None: await super().run() self.section = Section(parent=self.module) self.module.sections.append(self.section) class StaticProvider(SingleSectionProvider): def __init__(self, text: str) -> None: self.text = text async def run(self) -> None: await super().run() self.section.setText(self.text) class StatefulProvider(SingleSectionProvider): # TODO Should actually be a Section descendant NUMBER_STATES: int def __init__(self) -> None: super().__init__() self.state = 0 self.stateChanged = asyncio.Event() def incrementState(self) -> None: self.state += 1 self.changeState() def decrementState(self) -> None: self.state -= 1 self.changeState() def changeState(self) -> None: self.state %= self.NUMBER_STATES self.stateChanged.set() self.stateChanged.clear() async def run(self) -> None: await super().run() self.section.setAction(Button.CLICK_LEFT, self.incrementState) self.section.setAction(Button.CLICK_RIGHT, self.decrementState) # Providers class I3ModeProvider(SingleSectionProvider): def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: self.section.setText(None if e.change == "default" else e.change) async def run(self) -> None: await super().run() i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() i3.on(i3ipc.Event.MODE, self.on_mode) await i3.main() class I3WindowTitleProvider(SingleSectionProvider): # TODO FEAT To make this available from start, we need to find the # `focused=True` element following the `focus` array def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: self.section.setText(e.container.name) async def run(self) -> None: await super().run() i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() i3.on(i3ipc.Event.WINDOW, self.on_window) await i3.main() class I3WorkspacesProvider(Provider): # FIXME Custom names # FIXME Colors async def updateWorkspaces(self, i3: i3ipc.Connection) -> None: """ Since the i3 IPC interface cannot really tell you by events when workspaces get invisible or not urgent anymore. Relying on those exclusively would require reimplementing some of i3 logic. Fetching all the workspaces on event looks ugly but is the most maintainable. Times I tried to challenge this and failed: 2. """ workspaces = await i3.get_workspaces() for workspace in workspaces: module = self.modulesFromOutput[workspace.output] insert = False if workspace.num in self.sections: section = self.sections[workspace.num] if section.parent != module: section.parent.sections.remove(section) section.parent = module section.updateMarkup() insert = True else: section = Section(parent=module) self.sections[workspace.num] = section insert = True def generate_switch_workspace(num: int) -> typing.Callable: def switch_workspace() -> None: self.bar.taskGroup.create_task( i3.command(f"workspace number {num}") ) return switch_workspace section.setAction( Button.CLICK_LEFT, generate_switch_workspace(workspace.num) ) if insert: module.sections.append(section) revSections = dict((v, k) for k, v in self.sections.items()) module.sections.sort(key=lambda s: revSections[s]) name = workspace.name if workspace.urgent: name = f"{name} !" elif workspace.focused: name = f"{name} +" elif workspace.visible: name = f"{name} *" section.setText(name) workspacesNums = set(workspace.num for workspace in workspaces) for num, section in self.sections.items(): if num not in workspacesNums: # This should delete the Section but it turned out to be hard section.setText(None) def onWorkspaceChange( self, i3: i3ipc.Connection, e: i3ipc.Event | None = None ) -> None: # Cancelling the task doesn't seem to prevent performance double-events self.bar.taskGroup.create_task(self.updateWorkspaces(i3)) def __init__( self, ) -> None: super().__init__() self.sections: dict[int, Section] = dict() self.modulesFromOutput: dict[str, Module] = dict() self.bar: Bar async def run(self) -> None: for module in self.modules: screen = module.getFirstParentOfType(Screen) output = screen.output self.modulesFromOutput[output] = module self.bar = module.bar i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) self.onWorkspaceChange(i3) await i3.main() class TimeProvider(StatefulProvider): FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] NUMBER_STATES = len(FORMATS) async def run(self) -> None: await super().run() self.state = 1 while True: now = datetime.datetime.now() format = self.FORMATS[self.state] self.section.setText(now.strftime(format)) remaining = 1 - now.microsecond / 1000000 try: await asyncio.wait_for(self.stateChanged.wait(), remaining) except TimeoutError: pass async def main() -> None: bar = Bar() dualScreen = len(bar.screens) > 1 bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT) bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT) if dualScreen: bar.addProvider( I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER ) bar.addProvider( StaticProvider(text="mpris"), screenNum=1 if dualScreen else None, alignment=Alignment.CENTER, ) bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT) bar.addProvider( StaticProvider("pulse"), screenNum=1 if dualScreen else None, alignment=Alignment.RIGHT, ) bar.addProvider( StaticProvider("network"), screenNum=0 if dualScreen else None, alignment=Alignment.RIGHT, ) bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) await bar.run() asyncio.run(main())