#!/usr/bin/env python3 import asyncio import datetime import enum import ipaddress import logging import random import signal import socket import typing import coloredlogs import i3ipc import i3ipc.aio import psutil coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() T = typing.TypeVar("T", bound="ComposableText") P = typing.TypeVar("P", bound="ComposableText") C = typing.TypeVar("C", bound="ComposableText") Sortable = str | int def humanSize(numi: int) -> str: """ Returns a string of width 3+3 """ num = float(numi) for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"): if abs(num) < 1000: if num >= 10: return "{:3d}{}".format(int(num), unit) else: return "{:.1f}{}".format(num, unit) num /= 1024 return "{:d}YiB".format(numi) class ComposableText(typing.Generic[P, C]): def __init__( self, parent: typing.Optional[P] = None, sortKey: Sortable = 0, ) -> None: self.parent: typing.Optional[P] = None self.children: typing.MutableSequence[C] = list() self.sortKey = sortKey if parent: self.setParent(parent) self.bar = self.getFirstParentOfType(Bar) def setParent(self, parent: P) -> None: assert self.parent is None parent.children.append(self) assert isinstance(parent.children, list) parent.children.sort(key=lambda c: c.sortKey) self.parent = parent self.parent.updateMarkup() def unsetParent(self) -> None: assert self.parent self.parent.children.remove(self) self.parent.updateMarkup() self.parent = None def getFirstParentOfType(self, typ: typing.Type[T]) -> T: parent = self while not isinstance(parent, typ): assert parent.parent, f"{self} doesn't have a parent of {typ}" parent = parent.parent return parent def updateMarkup(self) -> None: self.bar.refresh.set() # TODO OPTI See if worth caching the output def generateMarkup(self) -> str: raise NotImplementedError(f"{self} cannot generate markup") def getMarkup(self) -> str: return self.generateMarkup() def randomColor(seed: int | bytes | None = None) -> str: if seed is not None: random.seed(seed) return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3)) class Button(enum.Enum): CLICK_LEFT = "1" CLICK_MIDDLE = "2" CLICK_RIGHT = "3" SCROLL_UP = "4" SCROLL_DOWN = "5" class Section(ComposableText): """ Colorable block separated by chevrons """ def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None: super().__init__(parent=parent, sortKey=sortKey) self.parent: "Module" self.color = randomColor() self.desiredText: str | None = None self.text = "" self.targetSize = -1 self.size = -1 self.animationTask: asyncio.Task | None = None self.actions: dict[Button, str] = dict() def isHidden(self) -> bool: return self.size < 0 # Geometric series, with a cap ANIM_A = 0.025 ANIM_R = 0.9 ANIM_MIN = 0.001 async def animate(self) -> None: increment = 1 if self.size < self.targetSize else -1 loop = asyncio.get_running_loop() frameTime = loop.time() animTime = self.ANIM_A while self.size != self.targetSize: self.size += increment self.updateMarkup() animTime *= self.ANIM_R animTime = max(self.ANIM_MIN, animTime) frameTime += animTime sleepTime = frameTime - loop.time() # In case of stress, skip refreshing by not awaiting if sleepTime > 0: await asyncio.sleep(sleepTime) else: log.warning("Skipped an animation frame") def setText(self, text: str | None) -> None: # OPTI Don't redraw nor reset animation if setting the same text if self.desiredText == text: return self.desiredText = text if text is None: self.text = "" self.targetSize = -1 else: self.text = f" {text} " self.targetSize = len(self.text) if self.animationTask: self.animationTask.cancel() # OPTI Skip the whole animation task if not required if self.size == self.targetSize: self.updateMarkup() else: self.animationTask = self.bar.taskGroup.create_task(self.animate()) def setAction(self, button: Button, callback: typing.Callable | None) -> None: if button in self.actions: command = self.actions[button] self.bar.removeAction(command) del self.actions[button] if callback: command = self.bar.addAction(callback) self.actions[button] = command def generateMarkup(self) -> str: assert not self.isHidden() pad = max(0, self.size - len(self.text)) text = self.text[: self.size] + " " * pad for button, command in self.actions.items(): text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}" return text class Module(ComposableText): """ Sections handled by a same updater """ def __init__(self, parent: "Side") -> None: super().__init__(parent=parent) self.parent: "Side" self.children: typing.MutableSequence[Section] self.mirroring: Module | None = None self.mirrors: list[Module] = list() def mirror(self, module: "Module") -> None: self.mirroring = module module.mirrors.append(self) def getSections(self) -> typing.Sequence[Section]: if self.mirroring: return self.mirroring.children else: return self.children def updateMarkup(self) -> None: super().updateMarkup() for mirror in self.mirrors: mirror.updateMarkup() class Alignment(enum.Enum): LEFT = "l" RIGHT = "r" CENTER = "c" class Side(ComposableText): def __init__(self, parent: "Screen", alignment: Alignment) -> None: super().__init__(parent=parent) self.parent: Screen self.children: typing.MutableSequence[Module] = [] self.alignment = alignment def generateMarkup(self) -> str: if not self.children: return "" text = "%{" + self.alignment.value + "}" lastSection: Section | None = None for module in self.children: for section in module.getSections(): if section.isHidden(): continue if lastSection is None: if self.alignment == Alignment.LEFT: text += "%{B" + section.color + "}%{F-}" else: text += "%{B-}%{F" + section.color + "}%{R}%{F-}" else: if self.alignment == Alignment.RIGHT: if lastSection.color == section.color: text += "" else: text += "%{F" + section.color + "}%{R}" else: if lastSection.color == section.color: text += "" else: text += "%{R}%{B" + section.color + "}" text += "%{F-}" text += section.getMarkup() lastSection = section if self.alignment != Alignment.RIGHT: text += "%{R}%{B-}" return text class Screen(ComposableText): def __init__(self, parent: "Bar", output: str) -> None: super().__init__(parent=parent) self.parent: "Bar" self.children: typing.MutableSequence[Side] self.output = output for alignment in Alignment: Side(parent=self, alignment=alignment) def generateMarkup(self) -> str: return ("%{Sn" + self.output + "}") + "".join( side.getMarkup() for side in self.children ) class Bar(ComposableText): """ Top-level """ def __init__(self) -> None: super().__init__() self.parent: None self.children: typing.MutableSequence[Screen] self.longRunningTasks: list[asyncio.Task] = list() self.refresh = asyncio.Event() self.taskGroup = asyncio.TaskGroup() self.providers: list["Provider"] = list() self.actionIndex = 0 self.actions: dict[str, typing.Callable] = dict() self.periodicProviderTask: typing.Coroutine | None = None i3 = i3ipc.Connection() for output in i3.get_outputs(): if not output.active: continue Screen(parent=self, output=output.name) def addLongRunningTask(self, coro: typing.Coroutine) -> None: task = self.taskGroup.create_task(coro) self.longRunningTasks.append(task) async def run(self) -> None: cmd = [ "lemonbar", "-b", "-a", "64", "-f", "DejaVuSansM Nerd Font:size=10", ] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE ) async def refresher() -> None: assert proc.stdin while True: await self.refresh.wait() self.refresh.clear() markup = self.getMarkup() proc.stdin.write(markup.encode()) async def actionHandler() -> None: assert proc.stdout while True: line = await proc.stdout.readline() command = line.decode().strip() callback = self.actions[command] callback() async with self.taskGroup: self.addLongRunningTask(refresher()) self.addLongRunningTask(actionHandler()) for provider in self.providers: self.addLongRunningTask(provider.run()) def exit() -> None: log.info("Terminating") for task in self.longRunningTasks: task.cancel() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, exit) def generateMarkup(self) -> str: return "".join(screen.getMarkup() for screen in self.children) + "\n" def addProvider( self, provider: "Provider", alignment: Alignment = Alignment.LEFT, screenNum: int | None = None, ) -> None: """ screenNum: the provider will be added on this screen if set, all otherwise """ modules = list() for s, screen in enumerate(self.children): if screenNum is None or s == screenNum: side = next(filter(lambda s: s.alignment == alignment, screen.children)) module = Module(parent=side) modules.append(module) provider.modules = modules if modules: self.providers.append(provider) def addAction(self, callback: typing.Callable) -> str: command = f"{self.actionIndex:x}" self.actions[command] = callback self.actionIndex += 1 return command def removeAction(self, command: str) -> None: del self.actions[command] class Provider: def __init__(self) -> None: self.modules: list[Module] = list() async def run(self) -> None: # Not a NotImplementedError, otherwise can't combine all classes pass class MirrorProvider(Provider): def __init__(self) -> None: super().__init__() self.module: Module async def run(self) -> None: await super().run() self.module = self.modules[0] for module in self.modules[1:]: module.mirror(self.module) class SingleSectionProvider(MirrorProvider): async def run(self) -> None: await super().run() self.section = Section(parent=self.module) class StaticProvider(SingleSectionProvider): def __init__(self, text: str) -> None: self.text = text async def run(self) -> None: await super().run() self.section.setText(self.text) class StatefulSection(Section): def __init__(self, parent: Module, sortKey: Sortable = 0) -> None: super().__init__(parent=parent, sortKey=sortKey) self.state = 0 self.numberStates: int self.setAction(Button.CLICK_LEFT, self.incrementState) self.setAction(Button.CLICK_RIGHT, self.decrementState) def incrementState(self) -> None: self.state += 1 self.changeState() def decrementState(self) -> None: self.state -= 1 self.changeState() def setChangedState(self, callback: typing.Callable) -> None: self.callback = callback def changeState(self) -> None: self.state %= self.numberStates self.bar.taskGroup.create_task(self.callback()) class SingleStatefulSectionProvider(MirrorProvider): async def run(self) -> None: await super().run() self.section = StatefulSection(parent=self.module) class PeriodicProvider(Provider): async def init(self) -> None: pass async def loop(self) -> None: raise NotImplementedError() @classmethod async def task(cls, bar: Bar) -> None: providers = list() for provider in bar.providers: if isinstance(provider, PeriodicProvider): providers.append(provider) await provider.init() while True: # TODO Block bar update during the periodic update of the loops loops = [provider.loop() for provider in providers] asyncio.gather(*loops) now = datetime.datetime.now() # Hardcoded to 1 second... not sure if we want some more than that, # and if the logic to check if a task should run would be a win # compared to the task itself remaining = 1 - now.microsecond / 1000000 await asyncio.sleep(remaining) async def run(self) -> None: await super().run() for module in self.modules: bar = module.getFirstParentOfType(Bar) assert bar if not bar.periodicProviderTask: bar.periodicProviderTask = PeriodicProvider.task(bar) bar.addLongRunningTask(bar.periodicProviderTask) class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider): async def run(self) -> None: await super().run() self.section.setChangedState(self.loop) # Providers class I3ModeProvider(SingleSectionProvider): def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: self.section.setText(None if e.change == "default" else e.change) async def run(self) -> None: await super().run() i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() i3.on(i3ipc.Event.MODE, self.on_mode) await i3.main() class I3WindowTitleProvider(SingleSectionProvider): # TODO FEAT To make this available from start, we need to find the # `focused=True` element following the `focus` array def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: self.section.setText(e.container.name) async def run(self) -> None: await super().run() i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() i3.on(i3ipc.Event.WINDOW, self.on_window) await i3.main() class I3WorkspacesProvider(Provider): # TODO Custom names # TODO Colors async def updateWorkspaces(self, i3: i3ipc.Connection) -> None: """ Since the i3 IPC interface cannot really tell you by events when workspaces get invisible or not urgent anymore. Relying on those exclusively would require reimplementing some of i3 logic. Fetching all the workspaces on event looks ugly but is the most maintainable. Times I tried to challenge this and failed: 2. """ workspaces = await i3.get_workspaces() for workspace in workspaces: module = self.modulesFromOutput[workspace.output] if workspace.num in self.sections: section = self.sections[workspace.num] if section.parent != module: section.unsetParent() section.setParent(module) else: section = Section(parent=module, sortKey=workspace.num) self.sections[workspace.num] = section def generate_switch_workspace(num: int) -> typing.Callable: def switch_workspace() -> None: self.bar.taskGroup.create_task( i3.command(f"workspace number {num}") ) return switch_workspace section.setAction( Button.CLICK_LEFT, generate_switch_workspace(workspace.num) ) name = workspace.name if workspace.urgent: name = f"{name} !" elif workspace.focused: name = f"{name} +" elif workspace.visible: name = f"{name} *" section.setText(name) workspacesNums = set(workspace.num for workspace in workspaces) for num, section in self.sections.items(): if num not in workspacesNums: # This should delete the Section but it turned out to be hard section.setText(None) def onWorkspaceChange( self, i3: i3ipc.Connection, e: i3ipc.Event | None = None ) -> None: # Cancelling the task doesn't seem to prevent performance double-events self.bar.taskGroup.create_task(self.updateWorkspaces(i3)) def __init__( self, ) -> None: super().__init__() self.sections: dict[int, Section] = dict() self.modulesFromOutput: dict[str, Module] = dict() self.bar: Bar async def run(self) -> None: for module in self.modules: screen = module.getFirstParentOfType(Screen) output = screen.output self.modulesFromOutput[output] = module self.bar = module.bar i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) self.onWorkspaceChange(i3) await i3.main() class NetworkProviderSection(StatefulSection): def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None: super().__init__(parent=parent, sortKey=iface) self.iface = iface self.provider = provider self.ignore = False self.icon = "?" self.wifi = False if iface == "lo": self.ignore = True elif iface.startswith("eth") or iface.startswith("enp"): if "u" in iface: self.icon = "" else: self.icon = "" elif iface.startswith("wlan") or iface.startswith("wl"): self.icon = "" self.wifi = True elif ( iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg") ): self.icon = "" elif iface.startswith("docker"): self.icon = "" elif iface.startswith("veth"): self.icon = "" elif iface.startswith("vboxnet"): self.icon = "" self.numberStates = 5 if self.wifi else 4 self.state = 1 if self.wifi else 0 self.setChangedState(self.update) async def update(self) -> None: if self.ignore or not self.provider.if_stats[self.iface].isup: self.setText(None) return text = self.icon state = self.state + (0 if self.wifi else 1) # SSID if self.wifi and state >= 1: cmd = ["iwgetid", self.iface, "--raw"] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() text += f" {stdout.decode().strip()}" if state >= 2: # Address for address in self.provider.if_addrs[self.iface]: if address.family == socket.AF_INET: net = ipaddress.IPv4Network( (address.address, address.netmask), strict=False ) text += f" {net.with_prefixlen}" break if state >= 3: # Speed prevRecv = self.provider.prev_io_counters[self.iface].bytes_recv recv = self.provider.io_counters[self.iface].bytes_recv prevSent = self.provider.prev_io_counters[self.iface].bytes_sent sent = self.provider.io_counters[self.iface].bytes_sent dt = self.provider.time - self.provider.prev_time recvDiff = (recv - prevRecv) / dt sentDiff = (sent - prevSent) / dt text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}" if state >= 4: # Counter text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}" self.setText(text) class NetworkProvider(MirrorProvider, PeriodicProvider): def __init__(self) -> None: super().__init__() self.sections: dict[str, NetworkProviderSection] = dict() async def init(self) -> None: loop = asyncio.get_running_loop() self.time = loop.time() self.io_counters = psutil.net_io_counters(pernic=True) async def loop(self) -> None: loop = asyncio.get_running_loop() async with asyncio.TaskGroup() as tg: self.prev_io_counters = self.io_counters self.prev_time = self.time # On-demand would only benefit if_addrs: # stats are used to determine display, # and we want to keep previous io_counters # so displaying stats is ~instant. self.time = loop.time() self.if_stats = psutil.net_if_stats() self.if_addrs = psutil.net_if_addrs() self.io_counters = psutil.net_io_counters(pernic=True) for iface in self.if_stats: section = self.sections.get(iface) if not section: section = NetworkProviderSection( parent=self.module, iface=iface, provider=self ) self.sections[iface] = section tg.create_task(section.update()) for iface, section in self.sections.items(): if iface not in self.if_stats: section.setText(None) async def onStateChange(self, section: StatefulSection) -> None: assert isinstance(section, NetworkProviderSection) await section.update() class TimeProvider(PeriodicStatefulProvider): FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] async def init(self) -> None: self.section.state = 1 self.section.numberStates = len(self.FORMATS) async def loop(self) -> None: now = datetime.datetime.now() format = self.FORMATS[self.section.state] self.section.setText(now.strftime(format)) async def main() -> None: bar = Bar() dualScreen = len(bar.children) > 1 bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT) bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT) if dualScreen: bar.addProvider( I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER ) bar.addProvider( StaticProvider(text="mpris"), screenNum=1 if dualScreen else None, alignment=Alignment.CENTER, ) bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT) bar.addProvider( StaticProvider("pulse"), screenNum=1 if dualScreen else None, alignment=Alignment.RIGHT, ) bar.addProvider( NetworkProvider(), screenNum=0 if dualScreen else None, alignment=Alignment.RIGHT, ) bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) await bar.run() asyncio.run(main())