From f81fd6bfd2021cdf4b11a4581495505059f99b15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?= Date: Sat, 17 Aug 2024 00:57:06 +0200 Subject: [PATCH] frobarng: Even more dev --- hm/desktop/frobar/.dev/new.py | 309 +++++++++++++++++++++++++++++----- 1 file changed, 268 insertions(+), 41 deletions(-) diff --git a/hm/desktop/frobar/.dev/new.py b/hm/desktop/frobar/.dev/new.py index bab7e0c..0e33ad2 100644 --- a/hm/desktop/frobar/.dev/new.py +++ b/hm/desktop/frobar/.dev/new.py @@ -3,27 +3,36 @@ import asyncio import datetime import enum +import logging import random import signal import typing +import coloredlogs import i3ipc +import i3ipc.aio + +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") +log = logging.getLogger() + +T = typing.TypeVar("T", bound="ComposableText") class ComposableText: + def getFirstParentOfType(self, typ: typing.Type[T]) -> T: + parent = self + while not isinstance(parent, typ): + assert parent.parent, f"{self} doesn't have a parent of {typ}" + parent = parent.parent + return parent + def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None: self.parent = parent - - prevParent = self - while parent: - prevParent = parent - parent = parent.parent - assert isinstance(prevParent, Bar) - self.bar: Bar = prevParent + self.bar = self.getFirstParentOfType(Bar) def updateMarkup(self) -> None: self.bar.refresh.set() - # OPTI See if worth caching the output + # TODO OPTI See if worth caching the output def generateMarkup(self) -> str: raise NotImplementedError(f"{self} cannot generate markup") @@ -38,6 +47,14 @@ def randomColor(seed: int | bytes | None = None) -> str: return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3)) +class Button(enum.Enum): + CLICK_LEFT = "1" + CLICK_MIDDLE = "2" + CLICK_RIGHT = "3" + SCROLL_UP = "4" + SCROLL_DOWN = "5" + + class Section(ComposableText): """ Colorable block separated by chevrons @@ -45,54 +62,80 @@ class Section(ComposableText): def __init__(self, parent: "Module") -> None: super().__init__(parent=parent) + self.parent: "Module" + self.color = randomColor() - self.text: str = "" - self.size = 0 + self.desiredText: str | None = None + self.text = "" + self.targetSize = -1 + self.size = -1 self.animationTask: asyncio.Task | None = None + self.actions: dict[Button, str] = dict() def isHidden(self) -> bool: - return self.text is None + return self.size < 0 - # Geometric series + # Geometric series, with a cap ANIM_A = 0.025 ANIM_R = 0.9 + ANIM_MIN = 0.001 async def animate(self) -> None: - targetSize = len(self.text) - increment = 1 if self.size < targetSize else -1 - + increment = 1 if self.size < self.targetSize else -1 loop = asyncio.get_running_loop() frameTime = loop.time() animTime = self.ANIM_A - while self.size != targetSize: + while self.size != self.targetSize: self.size += increment self.updateMarkup() animTime *= self.ANIM_R + animTime = max(self.ANIM_MIN, animTime) frameTime += animTime sleepTime = frameTime - loop.time() # In case of stress, skip refreshing by not awaiting if sleepTime > 0: await asyncio.sleep(sleepTime) + else: + log.warning("Skipped an animation frame") def setText(self, text: str | None) -> None: - # OPTI Skip if same text - oldText = self.text - self.text = f" {text} " - if oldText == self.text: + # OPTI Don't redraw nor reset animation if setting the same text + if self.desiredText == text: return - if len(oldText) == len(self.text): + self.desiredText = text + if text is None: + self.text = "" + self.targetSize = -1 + else: + self.text = f" {text} " + self.targetSize = len(self.text) + if self.animationTask: + self.animationTask.cancel() + # OPTI Skip the whole animation task if not required + if self.size == self.targetSize: self.updateMarkup() else: - if self.animationTask: - self.animationTask.cancel() self.animationTask = self.bar.taskGroup.create_task(self.animate()) + def setAction(self, button: Button, callback: typing.Callable | None) -> None: + if button in self.actions: + command = self.actions[button] + self.bar.removeAction(command) + del self.actions[button] + if callback: + command = self.bar.addAction(callback) + self.actions[button] = command + def generateMarkup(self) -> str: + assert not self.isHidden() pad = max(0, self.size - len(self.text)) - return self.text[: self.size] + " " * pad + text = self.text[: self.size] + " " * pad + for button, command in self.actions.items(): + text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}" + return text class Module(ComposableText): @@ -102,6 +145,8 @@ class Module(ComposableText): def __init__(self, parent: "Side") -> None: super().__init__(parent=parent) + self.parent: "Side" + self.sections: list[Section] = [] self.mirroring: Module | None = None self.mirrors: list[Module] = list() @@ -131,6 +176,8 @@ class Alignment(enum.Enum): class Side(ComposableText): def __init__(self, parent: "Screen", alignment: Alignment) -> None: super().__init__(parent=parent) + self.parent: Screen + self.alignment = alignment self.modules: list[Module] = [] @@ -170,6 +217,8 @@ class Side(ComposableText): class Screen(ComposableText): def __init__(self, parent: "Bar", output: str) -> None: super().__init__(parent=parent) + self.parent: "Bar" + self.output = output self.sides = dict() @@ -192,7 +241,8 @@ class Bar(ComposableText): self.refresh = asyncio.Event() self.taskGroup = asyncio.TaskGroup() self.providers: list["Provider"] = list() - self.running = True + self.actionIndex = 0 + self.actions: dict[str, typing.Callable] = dict() self.screens = [] i3 = i3ipc.Connection() @@ -211,24 +261,43 @@ class Bar(ComposableText): "-f", "DejaVuSansM Nerd Font:size=10", ] - proc = await asyncio.create_subprocess_exec(*cmd, stdin=asyncio.subprocess.PIPE) + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE + ) async def refresher() -> None: assert proc.stdin - while self.running: + while True: await self.refresh.wait() self.refresh.clear() - proc.stdin.write(self.getMarkup().encode()) + markup = self.getMarkup() + # log.debug(markup) + proc.stdin.write(markup.encode()) - async with self.taskGroup as tg: - ref = tg.create_task(refresher()) + async def actionHandler() -> None: + assert proc.stdout + while True: + line = await proc.stdout.readline() + command = line.decode().strip() + callback = self.actions[command] + callback() + + longRunningTasks = list() + + def addLongRunningTask(coro: typing.Coroutine) -> None: + task = self.taskGroup.create_task(coro) + longRunningTasks.append(task) + + async with self.taskGroup: + addLongRunningTask(refresher()) + addLongRunningTask(actionHandler()) for provider in self.providers: - tg.create_task(provider.run()) + addLongRunningTask(provider.run()) def exit() -> None: - print("Terminating") - ref.cancel() - self.running = False + log.info("Terminating") + for task in longRunningTasks: + task.cancel() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, exit) @@ -256,6 +325,15 @@ class Bar(ComposableText): if modules: self.providers.append(provider) + def addAction(self, callback: typing.Callable) -> str: + command = f"{self.actionIndex:x}" + self.actions[command] = callback + self.actionIndex += 1 + return command + + def removeAction(self, command: str) -> None: + del self.actions[command] + class Provider: def __init__(self) -> None: @@ -296,26 +374,175 @@ class StaticProvider(SingleSectionProvider): self.section.setText(self.text) -class TimeProvider(SingleSectionProvider): +class StatefulProvider(SingleSectionProvider): + # TODO Should actually be a Section descendant + NUMBER_STATES: int + + def __init__(self) -> None: + super().__init__() + self.state = 0 + self.stateChanged = asyncio.Event() + + def incrementState(self) -> None: + self.state += 1 + self.changeState() + + def decrementState(self) -> None: + self.state -= 1 + self.changeState() + + def changeState(self) -> None: + self.state %= self.NUMBER_STATES + self.stateChanged.set() + self.stateChanged.clear() + async def run(self) -> None: await super().run() + self.section.setAction(Button.CLICK_LEFT, self.incrementState) + self.section.setAction(Button.CLICK_RIGHT, self.decrementState) - while self.section.bar.running: + +# Providers + + +class I3ModeProvider(SingleSectionProvider): + def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: + self.section.setText(None if e.change == "default" else e.change) + + async def run(self) -> None: + await super().run() + i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + i3.on(i3ipc.Event.MODE, self.on_mode) + await i3.main() + + +class I3WindowTitleProvider(SingleSectionProvider): + # TODO FEAT To make this available from start, we need to find the + # `focused=True` element following the `focus` array + def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: + self.section.setText(e.container.name) + + async def run(self) -> None: + await super().run() + i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + i3.on(i3ipc.Event.WINDOW, self.on_window) + await i3.main() + + +class I3WorkspacesProvider(Provider): + # FIXME Custom names + # FIXME Colors + + async def updateWorkspaces(self, i3: i3ipc.Connection) -> None: + """ + Since the i3 IPC interface cannot really tell you by events + when workspaces get invisible or not urgent anymore. + Relying on those exclusively would require reimplementing some of i3 logic. + Fetching all the workspaces on event looks ugly but is the most maintainable. + Times I tried to challenge this and failed: 2. + """ + workspaces = await i3.get_workspaces() + for workspace in workspaces: + module = self.modulesFromOutput[workspace.output] + insert = False + if workspace.num in self.sections: + section = self.sections[workspace.num] + if section.parent != module: + section.parent.sections.remove(section) + section.parent = module + section.updateMarkup() + insert = True + else: + section = Section(parent=module) + self.sections[workspace.num] = section + insert = True + + def generate_switch_workspace(num: int) -> typing.Callable: + def switch_workspace() -> None: + self.bar.taskGroup.create_task( + i3.command(f"workspace number {num}") + ) + + return switch_workspace + + section.setAction( + Button.CLICK_LEFT, generate_switch_workspace(workspace.num) + ) + if insert: + module.sections.append(section) + revSections = dict((v, k) for k, v in self.sections.items()) + module.sections.sort(key=lambda s: revSections[s]) + name = workspace.name + if workspace.urgent: + name = f"{name} !" + elif workspace.focused: + name = f"{name} +" + elif workspace.visible: + name = f"{name} *" + section.setText(name) + workspacesNums = set(workspace.num for workspace in workspaces) + for num, section in self.sections.items(): + if num not in workspacesNums: + # This should delete the Section but it turned out to be hard + section.setText(None) + + def onWorkspaceChange( + self, i3: i3ipc.Connection, e: i3ipc.Event | None = None + ) -> None: + # Cancelling the task doesn't seem to prevent performance double-events + self.bar.taskGroup.create_task(self.updateWorkspaces(i3)) + + def __init__( + self, + ) -> None: + super().__init__() + + self.sections: dict[int, Section] = dict() + self.modulesFromOutput: dict[str, Module] = dict() + self.bar: Bar + + async def run(self) -> None: + for module in self.modules: + screen = module.getFirstParentOfType(Screen) + output = screen.output + self.modulesFromOutput[output] = module + self.bar = module.bar + + i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) + self.onWorkspaceChange(i3) + await i3.main() + + +class TimeProvider(StatefulProvider): + FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] + NUMBER_STATES = len(FORMATS) + + async def run(self) -> None: + await super().run() + self.state = 1 + + while True: now = datetime.datetime.now() - # section.setText(now.strftime("%a %y-%m-%d %H:%M:%S.%f")) - self.section.setText("-" * (now.second % 10)) + format = self.FORMATS[self.state] + self.section.setText(now.strftime(format)) + remaining = 1 - now.microsecond / 1000000 - await asyncio.sleep(remaining) + try: + await asyncio.wait_for(self.stateChanged.wait(), remaining) + except TimeoutError: + pass async def main() -> None: bar = Bar() dualScreen = len(bar.screens) > 1 - bar.addProvider(StaticProvider(text="i3 workspaces"), alignment=Alignment.LEFT) + bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT) + bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT) if dualScreen: bar.addProvider( - StaticProvider(text="i3 title"), screenNum=0, alignment=Alignment.CENTER + I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER ) bar.addProvider( StaticProvider(text="mpris"),