#!/usr/bin/env python3
"""
Status bar for i3.

Builds a markup string from a tree of composable text elements
(Bar > Screen > Side > Module > Section) and writes it to the standard input
of a lemonbar subprocess every time an element requests a refresh.
"""

import asyncio
import datetime
import enum
import random
import typing

import i3ipc


class ComposableText:
    """
    Node of the markup tree.

    Every node keeps a reference to the Bar found at the root of its tree,
    so that any node can request a redraw.
    """

    def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None:
        """
        parent: node this one is attached to, None for the root (the Bar)
        """
        self.parent = parent

        # Walk up the ancestors, the last one found is the root of the tree
        root: ComposableText = self
        ancestor = parent
        while ancestor is not None:
            root = ancestor
            ancestor = ancestor.parent
        assert isinstance(root, Bar)
        self.bar: Bar = root

    def updateMarkup(self) -> None:
        """
        Signal the bar that its markup changed and should be written again.
        """
        self.bar.refresh.set()
        # OPTI See if worth caching the output

    def generateMarkup(self) -> str:
        """
        Build the markup for this node. To be implemented by subclasses.
        """
        raise NotImplementedError(f"{self} cannot generate markup")

    def getMarkup(self) -> str:
        """
        Return the markup for this node (currently regenerated on each call).
        """
        return self.generateMarkup()


def randomColor(seed: int | bytes | None = None) -> str:
    """
    Return a random color formatted as "#rrggbb".

    seed: if set, the color is derived deterministically from it. A private
    generator is used so the state of the global `random` module is untouched.
    """
    randint = random.randint if seed is None else random.Random(seed).randint
    return "#" + "".join(f"{randint(0, 0xff):02x}" for _ in range(3))


class Section(ComposableText):
    """
    Colorable block separated by chevrons
    """

    # Geometric series: the delay between two animation frames is multiplied
    # by ANIM_R at every frame, starting from ANIM_A seconds
    ANIM_A = 0.025
    ANIM_R = 0.9

    def __init__(self, parent: "Module") -> None:
        super().__init__(parent=parent)
        self.color = randomColor()
        # Padded text to display, None when the section should be hidden
        self.text: str | None = ""
        # Number of characters currently displayed, animated towards the
        # length of the text
        self.size = 0
        self.animationTask: asyncio.Task | None = None

    def _targetSize(self) -> int:
        """
        Width the section should eventually have: 0 when it has no text.
        """
        return 0 if self.text is None else len(self.text)

    def isHidden(self) -> bool:
        """
        Tell if the section should be left out of the markup: it has no text
        and has finished collapsing.
        """
        return self.text is None and self.size == 0

    async def animate(self) -> None:
        """
        Grow or shrink the displayed size one character at a time until it
        matches the text, requesting a refresh at every step.
        """
        targetSize = self._targetSize()
        increment = 1 if self.size < targetSize else -1
        loop = asyncio.get_running_loop()
        frameTime = loop.time()
        animTime = self.ANIM_A

        while self.size != targetSize:
            self.size += increment
            self.updateMarkup()

            animTime *= self.ANIM_R
            frameTime += animTime
            sleepTime = frameTime - loop.time()

            # In case of stress, skip refreshing by not awaiting
            if sleepTime > 0:
                await asyncio.sleep(sleepTime)

    def setText(self, text: str | None) -> None:
        """
        Change the displayed text.

        text: new content (shown padded with one space on each side), or None
        to collapse the section and then hide it.

        Does nothing if the text is unchanged. If the width stays the same
        the bar is simply refreshed, otherwise any running animation is
        cancelled and a new one is started in the task group of the bar.
        """
        newText = None if text is None else f" {text} "
        if newText == self.text:
            return

        oldTarget = self._targetSize()
        self.text = newText
        if oldTarget == self._targetSize():
            self.updateMarkup()
        else:
            if self.animationTask:
                self.animationTask.cancel()
            self.animationTask = self.bar.taskGroup.create_task(self.animate())

    def generateMarkup(self) -> str:
        """
        Return the text cut to the current size, padded with spaces if the
        current size is larger than the text (i.e. while shrinking).
        """
        text = self.text or ""
        pad = max(0, self.size - len(text))
        return text[: self.size] + " " * pad


class Module(ComposableText):
    """
    Sections handled by a same updater
    """

    def __init__(self, parent: "Side") -> None:
        super().__init__(parent=parent)
        self.sections: list[Section] = []


class Alignment(enum.Enum):
    """
    Side of the screen, the value is the letter used in the alignment block
    of the markup.
    """

    LEFT = "l"
    RIGHT = "r"
    CENTER = "c"


class Side(ComposableText):
    """
    All the modules displayed with a same alignment on a screen
    """

    def __init__(self, parent: "Screen", alignment: Alignment) -> None:
        super().__init__(parent=parent)
        self.alignment = alignment
        self.modules: list[Module] = []

    def generateMarkup(self) -> str:
        """
        Concatenate the markup of every visible section of every module,
        inserting color formatting blocks between sections.

        Returns an empty string if the side has no module.
        """
        if not self.modules:
            return ""

        parts = ["%{" + self.alignment.value + "}"]
        lastSection: Section | None = None
        for module in self.modules:
            for section in module.sections:
                if section.isHidden():
                    continue
                if lastSection is None:
                    # Opening of the first visible section
                    if self.alignment == Alignment.LEFT:
                        parts.append("%{B" + section.color + "}%{F-}")
                    else:
                        parts.append("%{B-}%{F" + section.color + "}%{R}%{F-}")
                else:
                    # Transition from the previous visible section.
                    # NOTE(review): same-colored neighbours append an empty
                    # string; presumably a separator glyph was intended
                    # here (and maybe next to the color blocks) - confirm
                    if self.alignment == Alignment.RIGHT:
                        if lastSection.color == section.color:
                            parts.append("")
                        else:
                            parts.append("%{F" + section.color + "}%{R}")
                    else:
                        if lastSection.color == section.color:
                            parts.append("")
                        else:
                            parts.append("%{R}%{B" + section.color + "}")
                    parts.append("%{F-}")
                parts.append(section.getMarkup())
                lastSection = section

        if self.alignment != Alignment.RIGHT:
            parts.append("%{R}%{B-}")
        return "".join(parts)


class Screen(ComposableText):
    """
    One output, with one Side per alignment
    """

    def __init__(self, parent: "Bar", output: str) -> None:
        """
        output: name of the output, as written in the screen selection block
        """
        super().__init__(parent=parent)
        self.output = output
        self.sides: dict[Alignment, Side] = {
            alignment: Side(parent=self, alignment=alignment)
            for alignment in Alignment
        }

    def generateMarkup(self) -> str:
        return ("%{Sn" + self.output + "}") + "".join(
            side.getMarkup() for side in self.sides.values()
        )


class Bar(ComposableText):
    """
    Top-level
    """

    def __init__(self) -> None:
        """
        Create one Screen for each active output reported by i3.
        """
        super().__init__()
        self.refresh = asyncio.Event()
        self.taskGroup = asyncio.TaskGroup()
        self.providers: list["Provider"] = []
        self.screens: list[Screen] = []

        i3 = i3ipc.Connection()
        for output in i3.get_outputs():
            if not output.active:
                continue
            screen = Screen(parent=self, output=output.name)
            self.screens.append(screen)

    async def run(self) -> None:
        """
        Start lemonbar, then run the refresher and every registered provider
        in the task group of the bar until they all finish or one fails.
        """
        proc = await asyncio.create_subprocess_exec(
            "lemonbar",
            "-b",
            "-a",
            "64",
            "-f",
            "DejaVuSansM Nerd Font:size=10",
            stdin=asyncio.subprocess.PIPE,
        )

        async def refresher() -> None:
            # Write the whole markup each time a refresh is requested
            assert proc.stdin
            while True:
                await self.refresh.wait()
                self.refresh.clear()
                proc.stdin.write(self.getMarkup().encode())
                # Wait for the pipe to accept the data, so the write buffer
                # cannot grow without bound if lemonbar reads slowly
                await proc.stdin.drain()

        async with self.taskGroup as tg:
            tg.create_task(refresher())
            for provider in self.providers:
                tg.create_task(provider.run())

    def generateMarkup(self) -> str:
        """
        Return the markup of all the screens as a single line.
        """
        return "".join(screen.getMarkup() for screen in self.screens) + "\n"

    def addProvider(
        self,
        provider: "Provider",
        alignment: Alignment = Alignment.LEFT,
        screenNum: int | None = None,
        screenCount: int | None = None,
    ) -> None:
        """
        Create a module for the provider on each matching screen.

        alignment: side of the screens the modules are added to
        screenNum: the provider will be added on this screen if set, all otherwise
        screenCount: the provider will be added if there is this many screens, always otherwise

        The provider is only registered (and thus run) if it got at least
        one module.
        """
        modules: list[Module] = []
        if screenCount is None or len(self.screens) == screenCount:
            for s, screen in enumerate(self.screens):
                if screenNum is not None and s != screenNum:
                    continue
                side = screen.sides[alignment]
                module = Module(parent=side)
                side.modules.append(module)
                modules.append(module)

        provider.modules = modules
        if modules:
            self.providers.append(provider)


class Provider:
    """
    Updater in charge of the content of its modules
    """

    def __init__(self) -> None:
        # Set by Bar.addProvider, one module per screen the provider is on
        self.modules: list[Module] = []

    async def run(self) -> None:
        """
        Keep the modules up to date. To be implemented by subclasses.
        """
        raise NotImplementedError()


class TimeProvider(Provider):
    """
    Updates one section per module every second
    """

    async def run(self) -> None:
        sections: list[Section] = []
        for module in self.modules:
            section = Section(parent=module)
            module.sections.append(section)
            sections.append(section)
        # FIXME Allow for mirror(ed) modules so no need for updaters to handle all

        while True:
            now = datetime.datetime.now()
            for section in sections:
                # section.setText(now.strftime("%a %y-%m-%d %H:%M:%S.%f"))
                section.setText("-" * (now.second % 10))

            # Sleep until the start of the next second
            remaining = 1 - now.microsecond / 1000000
            await asyncio.sleep(remaining)


async def main() -> None:
    """
    Build a bar with a few time providers and run it.
    """
    bar = Bar()
    bar.addProvider(TimeProvider())
    bar.addProvider(TimeProvider(), screenNum=1)
    bar.addProvider(TimeProvider(), alignment=Alignment.CENTER, screenCount=2)
    bar.addProvider(TimeProvider(), alignment=Alignment.CENTER)
    bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
    bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)

    await bar.run()


if __name__ == "__main__":
    asyncio.run(main())
# TODO Replace while True with while bar.running or something