#!/usr/bin/env python3 import asyncio import datetime import enum import random import signal import typing import i3ipc class ComposableText: def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None: self.parent = parent prevParent = self while parent: prevParent = parent parent = parent.parent assert isinstance(prevParent, Bar) self.bar: Bar = prevParent def updateMarkup(self) -> None: self.bar.refresh.set() # OPTI See if worth caching the output def generateMarkup(self) -> str: raise NotImplementedError(f"{self} cannot generate markup") def getMarkup(self) -> str: return self.generateMarkup() def randomColor(seed: int | bytes | None = None) -> str: if seed is not None: random.seed(seed) return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3)) class Section(ComposableText): """ Colorable block separated by chevrons """ def __init__(self, parent: "Module") -> None: super().__init__(parent=parent) self.color = randomColor() self.text: str = "" self.size = 0 self.animationTask: asyncio.Task | None = None def isHidden(self) -> bool: return self.text is None # Geometric series ANIM_A = 0.025 ANIM_R = 0.9 async def animate(self) -> None: targetSize = len(self.text) increment = 1 if self.size < targetSize else -1 loop = asyncio.get_running_loop() frameTime = loop.time() animTime = self.ANIM_A while self.size != targetSize: self.size += increment self.updateMarkup() animTime *= self.ANIM_R frameTime += animTime sleepTime = frameTime - loop.time() # In case of stress, skip refreshing by not awaiting if sleepTime > 0: await asyncio.sleep(sleepTime) def setText(self, text: str | None) -> None: # OPTI Skip if same text oldText = self.text self.text = f" {text} " if oldText == self.text: return if len(oldText) == len(self.text): self.updateMarkup() else: if self.animationTask: self.animationTask.cancel() self.animationTask = self.bar.taskGroup.create_task(self.animate()) def generateMarkup(self) -> str: pad = max(0, self.size - len(self.text)) return self.text[: self.size] + " " * pad class Module(ComposableText): """ Sections handled by a same updater """ def __init__(self, parent: "Side") -> None: super().__init__(parent=parent) self.sections: list[Section] = [] self.mirroring: Module | None = None self.mirrors: list[Module] = list() def mirror(self, module: "Module") -> None: self.mirroring = module module.mirrors.append(self) def getSections(self) -> list[Section]: if self.mirroring: return self.mirroring.sections else: return self.sections def updateMarkup(self) -> None: super().updateMarkup() for mirror in self.mirrors: mirror.updateMarkup() class Alignment(enum.Enum): LEFT = "l" RIGHT = "r" CENTER = "c" class Side(ComposableText): def __init__(self, parent: "Screen", alignment: Alignment) -> None: super().__init__(parent=parent) self.alignment = alignment self.modules: list[Module] = [] def generateMarkup(self) -> str: if not self.modules: return "" text = "%{" + self.alignment.value + "}" lastSection: Section | None = None for module in self.modules: for section in module.getSections(): if section.isHidden(): continue if lastSection is None: if self.alignment == Alignment.LEFT: text += "%{B" + section.color + "}%{F-}" else: text += "%{B-}%{F" + section.color + "}%{R}%{F-}" else: if self.alignment == Alignment.RIGHT: if lastSection.color == section.color: text += "" else: text += "%{F" + section.color + "}%{R}" else: if lastSection.color == section.color: text += "" else: text += "%{R}%{B" + section.color + "}" text += "%{F-}" text += section.getMarkup() lastSection = section if self.alignment != Alignment.RIGHT: text += "%{R}%{B-}" return text class Screen(ComposableText): def __init__(self, parent: "Bar", output: str) -> None: super().__init__(parent=parent) self.output = output self.sides = dict() for alignment in Alignment: self.sides[alignment] = Side(parent=self, alignment=alignment) def generateMarkup(self) -> str: return ("%{Sn" + self.output + "}") + "".join( side.getMarkup() for side in self.sides.values() ) class Bar(ComposableText): """ Top-level """ def __init__(self) -> None: super().__init__() self.refresh = asyncio.Event() self.taskGroup = asyncio.TaskGroup() self.providers: list["Provider"] = list() self.running = True self.screens = [] i3 = i3ipc.Connection() for output in i3.get_outputs(): if not output.active: continue screen = Screen(parent=self, output=output.name) self.screens.append(screen) async def run(self) -> None: cmd = [ "lemonbar", "-b", "-a", "64", "-f", "DejaVuSansM Nerd Font:size=10", ] proc = await asyncio.create_subprocess_exec(*cmd, stdin=asyncio.subprocess.PIPE) async def refresher() -> None: assert proc.stdin while self.running: await self.refresh.wait() self.refresh.clear() proc.stdin.write(self.getMarkup().encode()) async with self.taskGroup as tg: ref = tg.create_task(refresher()) for provider in self.providers: tg.create_task(provider.run()) def exit() -> None: print("Terminating") ref.cancel() self.running = False loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, exit) def generateMarkup(self) -> str: return "".join(section.getMarkup() for section in self.screens) + "\n" def addProvider( self, provider: "Provider", alignment: Alignment = Alignment.LEFT, screenNum: int | None = None, ) -> None: """ screenNum: the provider will be added on this screen if set, all otherwise """ modules = list() for s, screen in enumerate(self.screens): if screenNum is None or s == screenNum: side = screen.sides[alignment] module = Module(parent=side) side.modules.append(module) modules.append(module) provider.modules = modules if modules: self.providers.append(provider) class Provider: def __init__(self) -> None: self.modules: list[Module] = list() async def run(self) -> None: raise NotImplementedError() class MirrorProvider(Provider): def __init__(self) -> None: super().__init__() self.module: Module async def run(self) -> None: self.module = self.modules[0] for module in self.modules[1:]: module.mirror(self.module) class SingleSectionProvider(MirrorProvider): def __init__(self) -> None: super().__init__() self.section: Section async def run(self) -> None: await super().run() self.section = Section(parent=self.module) self.module.sections.append(self.section) class StaticProvider(SingleSectionProvider): def __init__(self, text: str) -> None: self.text = text async def run(self) -> None: await super().run() self.section.setText(self.text) class TimeProvider(SingleSectionProvider): async def run(self) -> None: await super().run() while self.section.bar.running: now = datetime.datetime.now() # section.setText(now.strftime("%a %y-%m-%d %H:%M:%S.%f")) self.section.setText("-" * (now.second % 10)) remaining = 1 - now.microsecond / 1000000 await asyncio.sleep(remaining) async def main() -> None: bar = Bar() dualScreen = len(bar.screens) > 1 bar.addProvider(StaticProvider(text="i3 workspaces"), alignment=Alignment.LEFT) if dualScreen: bar.addProvider( StaticProvider(text="i3 title"), screenNum=0, alignment=Alignment.CENTER ) bar.addProvider( StaticProvider(text="mpris"), screenNum=1 if dualScreen else None, alignment=Alignment.CENTER, ) bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT) bar.addProvider( StaticProvider("pulse"), screenNum=1 if dualScreen else None, alignment=Alignment.RIGHT, ) bar.addProvider( StaticProvider("network"), screenNum=0 if dualScreen else None, alignment=Alignment.RIGHT, ) bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) await bar.run() asyncio.run(main())