From 9adfcd23778f63d87f33edbef3ef8e6aa09c704e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?= Date: Fri, 10 Jan 2025 19:04:14 +0100 Subject: [PATCH] frobar: Now version 3! --- hm/desktop/frobar/.dev/new.py | 1343 ---------------------- hm/desktop/frobar/default.nix | 17 +- hm/desktop/frobar/frobar/__init__.py | 197 ++-- hm/desktop/frobar/frobar/common.py | 617 ++++++++++- hm/desktop/frobar/frobar/display.py | 756 ------------- hm/desktop/frobar/frobar/providers.py | 1477 ++++++++++--------------- hm/desktop/frobar/frobar/updaters.py | 240 ---- hm/desktop/frobar/setup.py | 12 +- 8 files changed, 1329 insertions(+), 3330 deletions(-) delete mode 100644 hm/desktop/frobar/.dev/new.py delete mode 100644 hm/desktop/frobar/frobar/display.py delete mode 100644 hm/desktop/frobar/frobar/updaters.py diff --git a/hm/desktop/frobar/.dev/new.py b/hm/desktop/frobar/.dev/new.py deleted file mode 100644 index 9284bc5..0000000 --- a/hm/desktop/frobar/.dev/new.py +++ /dev/null @@ -1,1343 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import collections -import datetime -import enum -import ipaddress -import logging -import os -import signal -import socket -import time -import typing - -import gi -import gi.events -import i3ipc -import i3ipc.aio -import psutil -import pulsectl -import pulsectl_asyncio -import rich.color -import rich.logging -import rich.terminal_theme - -gi.require_version("Playerctl", "2.0") - -import gi.repository.GLib -import gi.repository.Playerctl - -logging.basicConfig( - level="DEBUG", - format="%(message)s", - datefmt="[%X]", - handlers=[rich.logging.RichHandler()], -) -log = logging.getLogger("frobar") - -T = typing.TypeVar("T", bound="ComposableText") -P = typing.TypeVar("P", bound="ComposableText") -C = typing.TypeVar("C", bound="ComposableText") -Sortable = str | int - -# Display utilities - - -def humanSize(numi: int) -> str: - """ - Returns a string of width 3+3 - """ - num = float(numi) - for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"): - if abs(num) < 1000: - if num >= 10: - return f"{int(num):3d}{unit}" - else: - return f"{num:.1f}{unit}" - num /= 1024 - return f"{numi:d}YiB" - - -def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str: - if p < 0: - return "" - d, m = divmod(p, 1.0) - return states[-1] * int(d) + states[round(m * (len(states) - 1))] - - -def clip(text: str, length: int = 30) -> str: - if len(text) > length: - text = text[: length - 1] + "…" - return text - - -class ComposableText(typing.Generic[P, C]): - - def __init__( - self, - parent: typing.Optional[P] = None, - sortKey: Sortable = 0, - ) -> None: - self.parent: typing.Optional[P] = None - self.children: typing.MutableSequence[C] = list() - self.sortKey = sortKey - if parent: - self.setParent(parent) - self.bar = self.getFirstParentOfType(Bar) - - def setParent(self, parent: P) -> None: - assert self.parent is None - parent.children.append(self) - assert isinstance(parent.children, list) - parent.children.sort(key=lambda c: c.sortKey) - self.parent = parent - self.parent.updateMarkup() - - def unsetParent(self) -> None: - assert self.parent - self.parent.children.remove(self) - self.parent.updateMarkup() - self.parent = None - - def getFirstParentOfType(self, typ: typing.Type[T]) -> T: - parent = self - while not isinstance(parent, typ): - assert parent.parent, f"{self} doesn't have a parent of {typ}" - parent = parent.parent - return parent - - def updateMarkup(self) -> None: - self.bar.refresh.set() - # TODO OPTI See if worth caching the output - - def generateMarkup(self) -> str: - raise NotImplementedError(f"{self} cannot generate markup") - - def getMarkup(self) -> str: - return self.generateMarkup() - - -class Button(enum.Enum): - CLICK_LEFT = "1" - CLICK_MIDDLE = "2" - CLICK_RIGHT = "3" - SCROLL_UP = "4" - SCROLL_DOWN = "5" - - -class Section(ComposableText): - """ - Colorable block separated by chevrons - """ - - def __init__( - self, - parent: "Module", - sortKey: Sortable = 0, - color: rich.color.Color = rich.color.Color.default(), - ) -> None: - super().__init__(parent=parent, sortKey=sortKey) - self.parent: "Module" - self.color = color - - self.desiredText: str | None = None - self.text = "" - self.targetSize = -1 - self.size = -1 - self.animationTask: asyncio.Task | None = None - self.actions: dict[Button, str] = dict() - - def isHidden(self) -> bool: - return self.size < 0 - - # Geometric series, with a cap - ANIM_A = 0.025 - ANIM_R = 0.9 - ANIM_MIN = 0.001 - - async def animate(self) -> None: - increment = 1 if self.size < self.targetSize else -1 - loop = asyncio.get_running_loop() - frameTime = loop.time() - animTime = self.ANIM_A - skipped = 0 - - while self.size != self.targetSize: - self.size += increment - self.updateMarkup() - - animTime *= self.ANIM_R - animTime = max(self.ANIM_MIN, animTime) - frameTime += animTime - sleepTime = frameTime - loop.time() - - # In case of stress, skip refreshing by not awaiting - if sleepTime > 0: - if skipped > 0: - log.warning(f"Skipped {skipped} animation frame(s)") - skipped = 0 - await asyncio.sleep(sleepTime) - else: - skipped += 1 - - def setText(self, text: str | None) -> None: - # OPTI Don't redraw nor reset animation if setting the same text - if self.desiredText == text: - return - self.desiredText = text - if text is None: - self.text = "" - self.targetSize = -1 - else: - self.text = f" {text} " - self.targetSize = len(self.text) - if self.animationTask: - self.animationTask.cancel() - # OPTI Skip the whole animation task if not required - if self.size == self.targetSize: - self.updateMarkup() - else: - self.animationTask = self.bar.taskGroup.create_task(self.animate()) - - def setAction(self, button: Button, callback: typing.Callable | None) -> None: - if button in self.actions: - command = self.actions[button] - self.bar.removeAction(command) - del self.actions[button] - if callback: - command = self.bar.addAction(callback) - self.actions[button] = command - - def generateMarkup(self) -> str: - assert not self.isHidden() - pad = max(0, self.size - len(self.text)) - text = self.text[: self.size] + " " * pad - for button, command in self.actions.items(): - text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}" - return text - - -class Module(ComposableText): - """ - Sections handled by a same updater - """ - - def __init__(self, parent: "Side") -> None: - super().__init__(parent=parent) - self.parent: "Side" - self.children: typing.MutableSequence[Section] - - self.mirroring: Module | None = None - self.mirrors: list[Module] = list() - - def mirror(self, module: "Module") -> None: - self.mirroring = module - module.mirrors.append(self) - - def getSections(self) -> typing.Sequence[Section]: - if self.mirroring: - return self.mirroring.children - else: - return self.children - - def updateMarkup(self) -> None: - super().updateMarkup() - for mirror in self.mirrors: - mirror.updateMarkup() - - -class Alignment(enum.Enum): - LEFT = "l" - RIGHT = "r" - CENTER = "c" - - -class Side(ComposableText): - def __init__(self, parent: "Screen", alignment: Alignment) -> None: - super().__init__(parent=parent) - self.parent: Screen - self.children: typing.MutableSequence[Module] = [] - - self.alignment = alignment - self.bar = parent.getFirstParentOfType(Bar) - - def generateMarkup(self) -> str: - if not self.children: - return "" - text = "%{" + self.alignment.value + "}" - lastSection: Section | None = None - for module in self.children: - for section in module.getSections(): - if section.isHidden(): - continue - hexa = section.color.get_truecolor(theme=self.bar.theme).hex - if lastSection is None: - if self.alignment == Alignment.LEFT: - text += "%{B" + hexa + "}%{F-}" - else: - text += "%{B-}%{F" + hexa + "}%{R}%{F-}" - elif isinstance(lastSection, SpacerSection): - text += "%{B-}%{F" + hexa + "}%{R}%{F-}" - else: - if self.alignment == Alignment.RIGHT: - if lastSection.color == section.color: - text += "" - else: - text += "%{F" + hexa + "}%{R}" - else: - if lastSection.color == section.color: - text += "" - else: - text += "%{R}%{B" + hexa + "}" - text += "%{F-}" - text += section.getMarkup() - lastSection = section - if self.alignment != Alignment.RIGHT and lastSection: - text += "%{R}%{B-}" - return text - - -class Screen(ComposableText): - def __init__(self, parent: "Bar", output: str) -> None: - super().__init__(parent=parent) - self.parent: "Bar" - self.children: typing.MutableSequence[Side] - - self.output = output - - for alignment in Alignment: - Side(parent=self, alignment=alignment) - - def generateMarkup(self) -> str: - return ("%{Sn" + self.output + "}") + "".join( - side.getMarkup() for side in self.children - ) - - -class Bar(ComposableText): - """ - Top-level - """ - - def __init__( - self, - theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME, - ) -> None: - super().__init__() - self.parent: None - self.children: typing.MutableSequence[Screen] - self.longRunningTasks: list[asyncio.Task] = list() - self.theme = theme - - self.refresh = asyncio.Event() - self.taskGroup = asyncio.TaskGroup() - self.providers: list["Provider"] = list() - self.actionIndex = 0 - self.actions: dict[str, typing.Callable] = dict() - - self.periodicProviderTask: typing.Coroutine | None = None - - i3 = i3ipc.Connection() - for output in i3.get_outputs(): - if not output.active: - continue - Screen(parent=self, output=output.name) - - def addLongRunningTask(self, coro: typing.Coroutine) -> None: - task = self.taskGroup.create_task(coro) - self.longRunningTasks.append(task) - - async def run(self) -> None: - cmd = [ - "lemonbar", - "-b", - "-a", - "64", - "-f", - "DejaVuSansM Nerd Font:size=10", - "-F", - self.theme.foreground_color.hex, - "-B", - self.theme.background_color.hex, - ] - proc = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE - ) - - async def refresher() -> None: - assert proc.stdin - while True: - await self.refresh.wait() - self.refresh.clear() - markup = self.getMarkup() - proc.stdin.write(markup.encode()) - - async def actionHandler() -> None: - assert proc.stdout - while True: - line = await proc.stdout.readline() - command = line.decode().strip() - callback = self.actions[command] - callback() - - async with self.taskGroup: - self.addLongRunningTask(refresher()) - self.addLongRunningTask(actionHandler()) - for provider in self.providers: - self.addLongRunningTask(provider.run()) - - def exit() -> None: - log.info("Terminating") - for task in self.longRunningTasks: - task.cancel() - - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, exit) - - def generateMarkup(self) -> str: - return "".join(screen.getMarkup() for screen in self.children) + "\n" - - def addProvider( - self, - provider: "Provider", - alignment: Alignment = Alignment.LEFT, - screenNum: int | None = None, - ) -> None: - """ - screenNum: the provider will be added on this screen if set, all otherwise - """ - modules = list() - for s, screen in enumerate(self.children): - if screenNum is None or s == screenNum: - side = next(filter(lambda s: s.alignment == alignment, screen.children)) - module = Module(parent=side) - modules.append(module) - provider.modules = modules - if modules: - self.providers.append(provider) - - def addAction(self, callback: typing.Callable) -> str: - command = f"{self.actionIndex:x}" - self.actions[command] = callback - self.actionIndex += 1 - return command - - def removeAction(self, command: str) -> None: - del self.actions[command] - - -class Provider: - sectionType: type[Section] = Section - - def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: - self.modules: list[Module] = list() - self.color = color - - async def run(self) -> None: - # Not a NotImplementedError, otherwise can't combine all classes - pass - - -class MirrorProvider(Provider): - def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: - super().__init__(color=color) - self.module: Module - - async def run(self) -> None: - await super().run() - self.module = self.modules[0] - for module in self.modules[1:]: - module.mirror(self.module) - - -class SingleSectionProvider(MirrorProvider): - async def run(self) -> None: - await super().run() - self.section = self.sectionType(parent=self.module, color=self.color) - - -class StaticProvider(SingleSectionProvider): - def __init__( - self, text: str, color: rich.color.Color = rich.color.Color.default() - ) -> None: - super().__init__(color=color) - self.text = text - - async def run(self) -> None: - await super().run() - self.section.setText(self.text) - - -class SpacerSection(Section): - pass - - -class SpacerProvider(SingleSectionProvider): - sectionType = SpacerSection - - def __init__(self, length: int = 5) -> None: - super().__init__(color=rich.color.Color.default()) - self.length = length - - async def run(self) -> None: - await super().run() - assert isinstance(self.section, SpacerSection) - self.section.setText(" " * self.length) - - -class StatefulSection(Section): - - def __init__( - self, - parent: Module, - sortKey: Sortable = 0, - color: rich.color.Color = rich.color.Color.default(), - ) -> None: - super().__init__(parent=parent, sortKey=sortKey, color=color) - self.state = 0 - self.numberStates: int - - self.setAction(Button.CLICK_LEFT, self.incrementState) - self.setAction(Button.CLICK_RIGHT, self.decrementState) - - def incrementState(self) -> None: - self.state += 1 - self.changeState() - - def decrementState(self) -> None: - self.state -= 1 - self.changeState() - - def setChangedState(self, callback: typing.Callable) -> None: - self.callback = callback - - def changeState(self) -> None: - self.state %= self.numberStates - self.bar.taskGroup.create_task(self.callback()) - - -class StatefulSectionProvider(Provider): - sectionType = StatefulSection - - -class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider): - section: StatefulSection - - -class MultiSectionsProvider(Provider): - - def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: - super().__init__(color=color) - self.sectionKeys: dict[Module, dict[Sortable, Section]] = ( - collections.defaultdict(dict) - ) - self.updaters: dict[Section, typing.Callable] = dict() - - async def getSectionUpdater(self, section: Section) -> typing.Callable: - raise NotImplementedError() - - async def updateSections(self, sections: set[Sortable], module: Module) -> None: - moduleSections = self.sectionKeys[module] - async with asyncio.TaskGroup() as tg: - for sortKey in sections: - section = moduleSections.get(sortKey) - if not section: - section = self.sectionType( - parent=module, sortKey=sortKey, color=self.color - ) - self.updaters[section] = await self.getSectionUpdater(section) - moduleSections[sortKey] = section - - updater = self.updaters[section] - tg.create_task(updater()) - - missingKeys = set(moduleSections.keys()) - sections - for missingKey in missingKeys: - section = moduleSections.get(missingKey) - assert section - section.setText(None) - - -class PeriodicProvider(Provider): - async def init(self) -> None: - pass - - async def loop(self) -> None: - raise NotImplementedError() - - @classmethod - async def task(cls, bar: Bar) -> None: - providers = list() - for provider in bar.providers: - if isinstance(provider, PeriodicProvider): - providers.append(provider) - await provider.init() - - while True: - # TODO Block bar update during the periodic update of the loops - loops = [provider.loop() for provider in providers] - asyncio.gather(*loops) - - now = datetime.datetime.now() - # Hardcoded to 1 second... not sure if we want some more than that, - # and if the logic to check if a task should run would be a win - # compared to the task itself - remaining = 1 - now.microsecond / 1000000 - await asyncio.sleep(remaining) - - async def run(self) -> None: - await super().run() - for module in self.modules: - bar = module.getFirstParentOfType(Bar) - assert bar - if not bar.periodicProviderTask: - bar.periodicProviderTask = PeriodicProvider.task(bar) - bar.addLongRunningTask(bar.periodicProviderTask) - - -class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider): - async def run(self) -> None: - await super().run() - self.section.setChangedState(self.loop) - - -# Providers - - -class I3ModeProvider(SingleSectionProvider): - def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - self.section.setText(None if e.change == "default" else e.change) - - async def run(self) -> None: - await super().run() - i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() - i3.on(i3ipc.Event.MODE, self.on_mode) - await i3.main() - - # TODO Hide WorkspaceProvider when this is active - - -class I3WindowTitleProvider(SingleSectionProvider): - # TODO FEAT To make this available from start, we need to find the - # `focused=True` element following the `focus` array - def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - self.section.setText(e.container.name) - - async def run(self) -> None: - await super().run() - i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() - i3.on(i3ipc.Event.WINDOW, self.on_window) - await i3.main() - - -class I3WorkspacesProvider(MultiSectionsProvider): - COLOR_URGENT = rich.color.Color.parse("red") - COLOR_FOCUSED = rich.color.Color.parse("yellow") - # TODO Should be orange (not a terminal color) - COLOR_VISIBLE = rich.color.Color.parse("cyan") - COLOR_DEFAULT = rich.color.Color.parse("bright_black") - - def __init__( - self, - custom_names: dict[str, str] = {}, - ) -> None: - super().__init__() - self.workspaces: dict[int, i3ipc.WorkspaceReply] - self.custom_names = custom_names - - self.sections: dict[int, Section] = dict() - self.modulesFromOutput: dict[str, Module] = dict() - self.bar: Bar - - async def getSectionUpdater(self, section: Section) -> typing.Callable: - assert isinstance(section.sortKey, int) - num = section.sortKey - - def switch_to_workspace() -> None: - self.bar.taskGroup.create_task(self.i3.command(f"workspace number {num}")) - - section.setAction(Button.CLICK_LEFT, switch_to_workspace) - - async def update() -> None: - workspace = self.workspaces[num] - name = workspace.name - if workspace.urgent: - section.color = self.COLOR_URGENT - elif workspace.focused: - section.color = self.COLOR_FOCUSED - elif workspace.visible: - section.color = self.COLOR_VISIBLE - else: - section.color = self.COLOR_DEFAULT - if workspace.focused: - name = self.custom_names.get(name, name) - section.setText(name) - - return update - - async def updateWorkspaces(self) -> None: - """ - Since the i3 IPC interface cannot really tell you by events - when workspaces get invisible or not urgent anymore. - Relying on those exclusively would require reimplementing some of i3 logic. - Fetching all the workspaces on event looks ugly but is the most maintainable. - Times I tried to challenge this and failed: 2. - """ - workspaces = await self.i3.get_workspaces() - self.workspaces = dict() - modules = collections.defaultdict(set) - for workspace in workspaces: - self.workspaces[workspace.num] = workspace - module = self.modulesFromOutput[workspace.output] - modules[module].add(workspace.num) - - await asyncio.gather( - *[self.updateSections(nums, module) for module, nums in modules.items()] - ) - - def onWorkspaceChange( - self, i3: i3ipc.Connection, e: i3ipc.Event | None = None - ) -> None: - # Cancelling the task doesn't seem to prevent performance double-events - self.bar.taskGroup.create_task(self.updateWorkspaces()) - - async def run(self) -> None: - for module in self.modules: - screen = module.getFirstParentOfType(Screen) - output = screen.output - self.modulesFromOutput[output] = module - self.bar = module.bar - - self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() - self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) - self.onWorkspaceChange(self.i3) - await self.i3.main() - - -class MprisProvider(MirrorProvider): - - STATUSES = { - gi.repository.Playerctl.PlaybackStatus.PLAYING: "", - gi.repository.Playerctl.PlaybackStatus.PAUSED: "", - gi.repository.Playerctl.PlaybackStatus.STOPPED: "", - } - - PROVIDERS = { - "mpd": "", - "firefox": "", - "chromium": "", - "mpv": "", - } - - async def run(self) -> None: - await super().run() - self.status = self.sectionType(parent=self.module, color=self.color) - self.album = self.sectionType(parent=self.module, color=self.color) - self.artist = self.sectionType(parent=self.module, color=self.color) - self.title = self.sectionType(parent=self.module, color=self.color) - - self.manager = gi.repository.Playerctl.PlayerManager() - self.manager.connect("name-appeared", self.on_name_appeared) - self.manager.connect("player-vanished", self.on_player_vanished) - - self.playerctldName = gi.repository.Playerctl.PlayerName() - self.playerctldName.name = "playerctld" - self.playerctldName.source = gi.repository.Playerctl.Source.DBUS_SESSION - - self.player: gi.repository.Playerctl.Player | None = None - self.playing = asyncio.Event() - - for name in self.manager.props.player_names: - self.init_player(name) - - self.updateSections() - - while True: - # Occasionally it will skip a second - # but haven't managed to reproduce with debug info - await self.playing.wait() - self.updateTitle() - if self.player: - pos = self.player.props.position - rem = 1 - (pos % 1000000) / 1000000 - await asyncio.sleep(rem) - else: - self.playing.clear() - - @staticmethod - def get( - something: gi.overrides.GLib.Variant, key: str, default: typing.Any = None - ) -> typing.Any: - if key in something.keys(): - return something[key] - else: - return default - - @staticmethod - def formatUs(ms: int) -> str: - if ms < 60 * 60 * 1000000: - return time.strftime("%M:%S", time.gmtime(ms // 1000000)) - else: - return str(datetime.timedelta(microseconds=ms)) - - def findCurrentPlayer(self) -> None: - for name in [self.playerctldName] + self.manager.props.player_names: - # TODO Test what happens when playerctld is not available - self.player = gi.repository.Playerctl.Player.new_from_name(name) - if not self.player.props.can_play: - continue - break - else: - self.player = None - - def updateSections(self) -> None: - self.findCurrentPlayer() - - if self.player is None: - self.status.setText(None) - self.album.setText(None) - self.artist.setText(None) - self.title.setText(None) - self.playing.clear() - return - - player = self.player.props.player_name - player = self.PROVIDERS.get(player, player) - status = self.STATUSES.get(self.player.props.playback_status, "?") - self.status.setText(f"{player} {status}") - - if ( - self.player.props.playback_status - == gi.repository.Playerctl.PlaybackStatus.PLAYING - ): - self.playing.set() - else: - self.playing.clear() - - metadata = self.player.props.metadata - - album = self.get(metadata, "xesam:album") - if album: - self.album.setText(f" {clip(album)}") - else: - self.album.setText(None) - - artists = self.get(metadata, "xesam:artist") - if artists: - artist = ", ".join(artists) - self.artist.setText(f" {clip(artist)}") - else: - self.artist.setText(None) - - self.updateTitle() - - def updateTitle(self) -> None: - if self.player is None: - return - - metadata = self.player.props.metadata - pos = self.player.props.position # In µs - text = f" {self.formatUs(pos)}" - dur = self.get(metadata, "mpris:length") - if dur: - text += f"/{self.formatUs(dur)}" - title = self.get(metadata, "xesam:title") - if title: - text += f" {clip(title)}" - self.title.setText(text) - - def on_player_vanished( - self, - manager: gi.repository.Playerctl.PlayerManager, - player: gi.repository.Playerctl.Player, - ) -> None: - self.updateSections() - - def on_event( - self, - player: gi.repository.Playerctl.Player, - _: typing.Any, - manager: gi.repository.Playerctl.PlayerManager, - ) -> None: - self.updateSections() - - def init_player(self, name: gi.repository.Playerctl.PlayerName) -> None: - player = gi.repository.Playerctl.Player.new_from_name(name) - # All events will cause the active player to change, - # so we listen on all events, even if the display won't change - player.connect("playback-status", self.on_event, self.manager) - player.connect("loop-status", self.on_event, self.manager) - player.connect("shuffle", self.on_event, self.manager) - player.connect("metadata", self.on_event, self.manager) - player.connect("volume", self.on_event, self.manager) - player.connect("seeked", self.on_event, self.manager) - self.manager.manage_player(player) - - def on_name_appeared( - self, manager: gi.repository.Playerctl.PlayerManager, name: str - ) -> None: - self.init_player(name) - self.updateSections() - - -class AlertingProvider(Provider): - COLOR_NORMAL = rich.color.Color.parse("green") - COLOR_WARNING = rich.color.Color.parse("yellow") - COLOR_DANGER = rich.color.Color.parse("red") - - warningThreshold: float - dangerThreshold: float - - def updateLevel(self, level: float) -> None: - if level > self.dangerThreshold: - color = self.COLOR_DANGER - elif level > self.warningThreshold: - color = self.COLOR_WARNING - else: - color = self.COLOR_NORMAL - for module in self.modules: - for section in module.getSections(): - section.color = color - - -class CpuProvider(AlertingProvider, PeriodicStatefulProvider): - async def init(self) -> None: - self.section.numberStates = 3 - self.warningThreshold = 75 - self.dangerThreshold = 95 - - async def loop(self) -> None: - percent = psutil.cpu_percent(percpu=False) - self.updateLevel(percent) - - text = "" - if self.section.state >= 2: - percents = psutil.cpu_percent(percpu=True) - text += " " + "".join([ramp(p / 100) for p in percents]) - elif self.section.state >= 1: - text += " " + ramp(percent / 100) - self.section.setText(text) - - -class LoadProvider(AlertingProvider, PeriodicStatefulProvider): - async def init(self) -> None: - self.section.numberStates = 3 - self.warningThreshold = 5 - self.dangerThreshold = 10 - - async def loop(self) -> None: - load = os.getloadavg() - self.updateLevel(load[0]) - - text = "" - loads = 3 if self.section.state >= 2 else self.section.state - for load_index in range(loads): - text += f" {load[load_index]:.2f}" - self.section.setText(text) - - -class RamProvider(AlertingProvider, PeriodicStatefulProvider): - - async def init(self) -> None: - self.section.numberStates = 4 - self.warningThreshold = 75 - self.dangerThreshold = 95 - - async def loop(self) -> None: - mem = psutil.virtual_memory() - self.updateLevel(mem.percent) - - text = "" - if self.section.state >= 1: - text += " " + ramp(mem.percent / 100) - if self.section.state >= 2: - text += humanSize(mem.total - mem.available) - if self.section.state >= 3: - text += "/" + humanSize(mem.total) - self.section.setText(text) - - -class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider): - RAMP = "" - MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"] - # For Intel, AMD and ARM respectively. - - main: str - - async def init(self) -> None: - self.section.numberStates = 2 - - allTemp = psutil.sensors_temperatures() - for main in self.MAIN_TEMPS: - if main in allTemp: - self.main = main - break - else: - raise IndexError("Could not find suitable temperature sensor") - - temp = allTemp[self.main][0] - self.warningThreshold = temp.high or 90.0 - self.dangerThreshold = temp.critical or 100.0 - - async def loop(self) -> None: - allTemp = psutil.sensors_temperatures() - temp = allTemp[self.main][0] - self.updateLevel(temp.current) - - text = ramp(temp.current / self.warningThreshold, self.RAMP) - if self.section.state >= 1: - text += f" {temp.current:.0f}°C" - self.section.setText(text) - - -class BatteryProvider(AlertingProvider, PeriodicStatefulProvider): - # TODO Support ACPID for events - RAMP = "" - - async def init(self) -> None: - self.section.numberStates = 3 - # TODO 1 refresh rate is too quick - - self.warningThreshold = 75 - self.dangerThreshold = 95 - - async def loop(self) -> None: - bat = psutil.sensors_battery() - if not bat: - self.section.setText(None) - - self.updateLevel(100 - bat.percent) - - text = "" if bat.power_plugged else "" - text += ramp(bat.percent / 100, self.RAMP) - - if self.section.state >= 1: - text += f" {bat.percent:.0f}%" - if self.section.state >= 2: - h = int(bat.secsleft / 3600) - m = int((bat.secsleft - h * 3600) / 60) - text += f" ({h:d}:{m:02d})" - - self.section.setText(text) - - -class PulseaudioProvider( - MirrorProvider, StatefulSectionProvider, MultiSectionsProvider -): - async def getSectionUpdater(self, section: Section) -> typing.Callable: - assert isinstance(section, StatefulSection) - assert isinstance(section.sortKey, str) - - sink = self.sinks[section.sortKey] - - if ( - sink.port_active.name == "analog-output-headphones" - or sink.port_active.description == "Headphones" - ): - icon = "" - elif ( - sink.port_active.name == "analog-output-speaker" - or sink.port_active.description == "Speaker" - ): - icon = "" - elif sink.port_active.name in ("headset-output", "headphone-output"): - icon = "" - else: - icon = "?" - - section.numberStates = 3 - section.state = 1 - - # TODO Change volume with wheel - - async def updater() -> None: - assert isinstance(section, StatefulSection) - text = icon - sink = self.sinks[section.sortKey] - - async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse: - vol = await pulse.volume_get_all_chans(sink) - if section.state == 1: - text += f" {ramp(vol)}" - elif section.state == 2: - text += f" {vol:.0%}" - # TODO Show which is default - section.setText(text) - - section.setChangedState(updater) - - return updater - - async def update(self) -> None: - async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse: - self.sinks = dict((sink.name, sink) for sink in await pulse.sink_list()) - await self.updateSections(set(self.sinks.keys()), self.module) - - async def run(self) -> None: - await super().run() - await self.update() - async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse: - async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink): - await self.update() - - -class NetworkProvider( - MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider -): - def __init__( - self, - color: rich.color.Color = rich.color.Color.default(), - ) -> None: - super().__init__(color=color) - - async def init(self) -> None: - loop = asyncio.get_running_loop() - self.time = loop.time() - self.io_counters = psutil.net_io_counters(pernic=True) - - async def doNothing(self) -> None: - pass - - @staticmethod - def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]: - relevant = True - icon = "?" - wifi = False - if iface == "lo": - relevant = False - elif iface.startswith("eth") or iface.startswith("enp"): - if "u" in iface: - icon = "" - else: - icon = "" - elif iface.startswith("wlan") or iface.startswith("wl"): - icon = "" - wifi = True - elif ( - iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg") - ): - icon = "" - - elif iface.startswith("docker"): - icon = "" - elif iface.startswith("veth"): - icon = "" - elif iface.startswith("vboxnet"): - icon = "" - - return relevant, icon, wifi - - async def getSectionUpdater(self, section: Section) -> typing.Callable: - - assert isinstance(section, StatefulSection) - assert isinstance(section.sortKey, str) - iface = section.sortKey - - relevant, icon, wifi = self.getIfaceAttributes(iface) - - if not relevant: - return self.doNothing - - section.numberStates = 5 if wifi else 4 - section.state = 1 if wifi else 0 - - async def update() -> None: - assert isinstance(section, StatefulSection) - - if not self.if_stats[iface].isup: - section.setText(None) - return - - text = icon - - state = section.state + (0 if wifi else 1) - if wifi and state >= 1: # SSID - cmd = ["iwgetid", iface, "--raw"] - proc = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE - ) - stdout, stderr = await proc.communicate() - text += f" {stdout.decode().strip()}" - - if state >= 2: # Address - for address in self.if_addrs[iface]: - if address.family == socket.AF_INET: - net = ipaddress.IPv4Network( - (address.address, address.netmask), strict=False - ) - text += f" {address.address}/{net.prefixlen}" - break - - if state >= 3: # Speed - prevRecv = self.prev_io_counters[iface].bytes_recv - recv = self.io_counters[iface].bytes_recv - prevSent = self.prev_io_counters[iface].bytes_sent - sent = self.io_counters[iface].bytes_sent - dt = self.time - self.prev_time - - recvDiff = (recv - prevRecv) / dt - sentDiff = (sent - prevSent) / dt - text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}" - - if state >= 4: # Counter - text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}" - - section.setText(text) - - section.setChangedState(update) - - return update - - async def loop(self) -> None: - loop = asyncio.get_running_loop() - - self.prev_io_counters = self.io_counters - self.prev_time = self.time - # On-demand would only benefit if_addrs: - # stats are used to determine display, - # and we want to keep previous io_counters - # so displaying stats is ~instant. - self.time = loop.time() - self.if_stats = psutil.net_if_stats() - self.if_addrs = psutil.net_if_addrs() - self.io_counters = psutil.net_io_counters(pernic=True) - - await self.updateSections(set(self.if_stats.keys()), self.module) - - -class TimeProvider(PeriodicStatefulProvider): - FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] - - async def init(self) -> None: - self.section.state = 1 - self.section.numberStates = len(self.FORMATS) - - async def loop(self) -> None: - now = datetime.datetime.now() - format = self.FORMATS[self.section.state] - self.section.setText(now.strftime(format)) - - -async def main() -> None: - # TODO Configurable - FROGARIZED = [ - "#092c0e", - "#143718", - "#5a7058", - "#677d64", - "#89947f", - "#99a08d", - "#fae2e3", - "#fff0f1", - "#e0332e", - "#cf4b15", - "#bb8801", - "#8d9800", - "#1fa198", - "#008dd1", - "#5c73c4", - "#d43982", - ] - # TODO Not super happy with the color management, - # while using an existing library is great, it's limited to ANSI colors - - def base16_color(color: int) -> tuple[int, int, int]: - hexa = FROGARIZED[color] - return tuple(rich.color.parse_rgb_hex(hexa[1:])) - - theme = rich.terminal_theme.TerminalTheme( - base16_color(0x0), - base16_color(0x0), # TODO should be 7, currently 0 so it's compatible with v2 - [ - base16_color(0x0), # black - base16_color(0x8), # red - base16_color(0xB), # green - base16_color(0xA), # yellow - base16_color(0xD), # blue - base16_color(0xE), # magenta - base16_color(0xC), # cyan - base16_color(0x5), # white - ], - [ - base16_color(0x3), # bright black - base16_color(0x8), # bright red - base16_color(0xB), # bright green - base16_color(0xA), # bright yellow - base16_color(0xD), # bright blue - base16_color(0xE), # bright magenta - base16_color(0xC), # bright cyan - base16_color(0x7), # bright white - ], - ) - - bar = Bar(theme=theme) - dualScreen = len(bar.children) > 1 - leftPreferred = 0 if dualScreen else None - rightPreferred = 1 if dualScreen else None - - workspaces_suffixes = "▲■" - workspaces_names = dict( - (str(i + 1), f"{i+1} {c}") for i, c in enumerate(workspaces_suffixes) - ) - - color = rich.color.Color.parse - - bar.addProvider(I3ModeProvider(color=color("red")), alignment=Alignment.LEFT) - bar.addProvider( - I3WorkspacesProvider(custom_names=workspaces_names), alignment=Alignment.LEFT - ) - - if dualScreen: - bar.addProvider( - I3WindowTitleProvider(color=color("white")), - screenNum=0, - alignment=Alignment.CENTER, - ) - bar.addProvider( - MprisProvider(color=color("bright_white")), - screenNum=rightPreferred, - alignment=Alignment.CENTER, - ) - else: - bar.addProvider( - SpacerProvider(), - alignment=Alignment.LEFT, - ) - bar.addProvider( - MprisProvider(color=color("bright_white")), - alignment=Alignment.LEFT, - ) - - bar.addProvider(CpuProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT) - bar.addProvider(LoadProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT) - bar.addProvider(RamProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT) - bar.addProvider( - TemperatureProvider(), - screenNum=leftPreferred, - alignment=Alignment.RIGHT, - ) - bar.addProvider( - BatteryProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT - ) - bar.addProvider( - PulseaudioProvider(color=color("magenta")), - screenNum=rightPreferred, - alignment=Alignment.RIGHT, - ) - bar.addProvider( - NetworkProvider(color=color("blue")), - screenNum=leftPreferred, - alignment=Alignment.RIGHT, - ) - bar.addProvider(TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT) - - await bar.run() - - -if __name__ == "__main__": - # Using GLib's event loop so we can run GLib's code - policy = gi.events.GLibEventLoopPolicy() - asyncio.set_event_loop_policy(policy) - loop = policy.get_event_loop() - loop.run_until_complete(main()) diff --git a/hm/desktop/frobar/default.nix b/hm/desktop/frobar/default.nix index b966486..28bfbfc 100644 --- a/hm/desktop/frobar/default.nix +++ b/hm/desktop/frobar/default.nix @@ -22,18 +22,13 @@ in # is called pyton-mpd2 on PyPi but mpd2 in nixpkgs. pkgs.python3Packages.buildPythonApplication rec { pname = "frobar"; - version = "2.0"; + version = "3.0"; propagatedBuildInputs = with pkgs.python3Packages; [ - coloredlogs # old only i3ipc - mpd2 - notmuch psutil pulsectl-asyncio - pulsectl # old only pygobject3 - pyinotify rich ]; nativeBuildInputs = @@ -42,9 +37,15 @@ pkgs.python3Packages.buildPythonApplication rec { wirelesstools playerctl ]); - makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" ]; + makeWrapperArgs = [ + "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" + "--prefix GI_TYPELIB_PATH : ${GI_TYPELIB_PATH}" + ]; - GI_TYPELIB_PATH = pkgs.lib.makeSearchPath "lib/girepository-1.0" [ pkgs.glib.out pkgs.playerctl ]; + GI_TYPELIB_PATH = pkgs.lib.makeSearchPath "lib/girepository-1.0" [ + pkgs.glib.out + pkgs.playerctl + ]; src = ./.; } diff --git a/hm/desktop/frobar/frobar/__init__.py b/hm/desktop/frobar/frobar/__init__.py index de8d07a..29b120c 100644 --- a/hm/desktop/frobar/frobar/__init__.py +++ b/hm/desktop/frobar/frobar/__init__.py @@ -1,77 +1,146 @@ -#!/usr/bin/env python3 +import rich.color +import rich.logging +import rich.terminal_theme -from frobar import providers as fp -from frobar.display import Bar, BarGroupType -from frobar.updaters import Updater - -# TODO If multiple screen, expand the sections and share them -# TODO Graceful exit +import frobar.common +import frobar.providers +from frobar.common import Alignment -def run() -> None: - Bar.init() - Updater.init() +def main() -> None: + # TODO Configurable + FROGARIZED = [ + "#092c0e", + "#143718", + "#5a7058", + "#677d64", + "#89947f", + "#99a08d", + "#fae2e3", + "#fff0f1", + "#e0332e", + "#cf4b15", + "#bb8801", + "#8d9800", + "#1fa198", + "#008dd1", + "#5c73c4", + "#d43982", + ] + # TODO Not super happy with the color management, + # while using an existing library is great, it's limited to ANSI colors - # Bar.addSectionAll(fp.CpuProvider(), BarGroupType.RIGHT) - # Bar.addSectionAll(fp.NetworkProvider(theme=2), BarGroupType.RIGHT) + def base16_color(color: int) -> tuple[int, int, int]: + hexa = FROGARIZED[color] + return tuple(rich.color.parse_rgb_hex(hexa[1:])) - WORKSPACE_THEME = 8 - FOCUS_THEME = 2 - URGENT_THEME = 0 - CUSTOM_SUFFIXES = "▲■" - - customNames = dict() - for i in range(len(CUSTOM_SUFFIXES)): - short = str(i + 1) - full = short + " " + CUSTOM_SUFFIXES[i] - customNames[short] = full - Bar.addSectionAll( - fp.I3WorkspacesProvider( - theme=WORKSPACE_THEME, - themeFocus=FOCUS_THEME, - themeUrgent=URGENT_THEME, - themeMode=URGENT_THEME, - customNames=customNames, - ), - BarGroupType.LEFT, + theme = rich.terminal_theme.TerminalTheme( + base16_color(0x0), + base16_color(0x0), # TODO should be 7, currently 0 so it's compatible with v2 + [ + base16_color(0x0), # black + base16_color(0x8), # red + base16_color(0xB), # green + base16_color(0xA), # yellow + base16_color(0xD), # blue + base16_color(0xE), # magenta + base16_color(0xC), # cyan + base16_color(0x5), # white + ], + [ + base16_color(0x3), # bright black + base16_color(0x8), # bright red + base16_color(0xB), # bright green + base16_color(0xA), # bright yellow + base16_color(0xD), # bright blue + base16_color(0xE), # bright magenta + base16_color(0xC), # bright cyan + base16_color(0x7), # bright white + ], ) - # TODO Middle - Bar.addSectionAll(fp.MprisProvider(theme=9), BarGroupType.LEFT) - # Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT) - # Bar.addSectionAll(I3WindowTitleProvider(), BarGroupType.LEFT) + bar = frobar.common.Bar(theme=theme) + dualScreen = len(bar.children) > 1 + leftPreferred = 0 if dualScreen else None + rightPreferred = 1 if dualScreen else None - # TODO Computer modes + workspaces_suffixes = "▲■" + workspaces_names = dict( + (str(i + 1), f"{i+1} {c}") for i, c in enumerate(workspaces_suffixes) + ) - Bar.addSectionAll(fp.CpuProvider(), BarGroupType.RIGHT) - Bar.addSectionAll(fp.LoadProvider(), BarGroupType.RIGHT) - Bar.addSectionAll(fp.RamProvider(), BarGroupType.RIGHT) - Bar.addSectionAll(fp.TemperatureProvider(), BarGroupType.RIGHT) - Bar.addSectionAll(fp.BatteryProvider(), BarGroupType.RIGHT) + color = rich.color.Color.parse - # Peripherals - PERIPHERAL_THEME = 6 - NETWORK_THEME = 5 - # TODO Disk space provider - # TODO Screen (connected, autorandr configuration, bbswitch) provider - Bar.addSectionAll(fp.XautolockProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT) - Bar.addSectionAll(fp.PulseaudioProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT) - Bar.addSectionAll(fp.RfkillProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT) - Bar.addSectionAll(fp.NetworkProvider(theme=NETWORK_THEME), BarGroupType.RIGHT) + bar.addProvider( + frobar.providers.I3ModeProvider(color=color("red")), alignment=Alignment.LEFT + ) + bar.addProvider( + frobar.providers.I3WorkspacesProvider(custom_names=workspaces_names), + alignment=Alignment.LEFT, + ) - # Personal - # PERSONAL_THEME = 7 - # Bar.addSectionAll(fp.KeystoreProvider(theme=PERSONAL_THEME), BarGroupType.RIGHT) - # Bar.addSectionAll( - # fp.NotmuchUnreadProvider(dir="~/.mail/", theme=PERSONAL_THEME), - # BarGroupType.RIGHT, - # ) - # Bar.addSectionAll( - # fp.TodoProvider(dir="~/.vdirsyncer/currentCalendars/", theme=PERSONAL_THEME), - # BarGroupType.RIGHT, - # ) + if dualScreen: + bar.addProvider( + frobar.providers.I3WindowTitleProvider(color=color("white")), + screenNum=0, + alignment=Alignment.CENTER, + ) + bar.addProvider( + frobar.providers.MprisProvider(color=color("bright_white")), + screenNum=rightPreferred, + alignment=Alignment.CENTER, + ) + else: + bar.addProvider( + frobar.common.SpacerProvider(), + alignment=Alignment.LEFT, + ) + bar.addProvider( + frobar.providers.MprisProvider(color=color("bright_white")), + alignment=Alignment.LEFT, + ) - TIME_THEME = 4 - Bar.addSectionAll(fp.TimeProvider(theme=TIME_THEME), BarGroupType.RIGHT) + bar.addProvider( + frobar.providers.CpuProvider(), + screenNum=leftPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.LoadProvider(), + screenNum=leftPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.RamProvider(), + screenNum=leftPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.TemperatureProvider(), + screenNum=leftPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.BatteryProvider(), + screenNum=leftPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.PulseaudioProvider(color=color("magenta")), + screenNum=rightPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.NetworkProvider(color=color("blue")), + screenNum=leftPreferred, + alignment=Alignment.RIGHT, + ) + bar.addProvider( + frobar.providers.TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT + ) - # Bar.run() + bar.launch() + + +if __name__ == "__main__": + main() diff --git a/hm/desktop/frobar/frobar/common.py b/hm/desktop/frobar/frobar/common.py index 690e304..4a5dc59 100644 --- a/hm/desktop/frobar/frobar/common.py +++ b/hm/desktop/frobar/frobar/common.py @@ -1,5 +1,616 @@ -#!/usr/bin/env python3 +import asyncio +import collections +import datetime +import enum +import logging +import signal +import typing -import threading +import gi +import gi.events +import gi.repository.GLib +import i3ipc +import i3ipc.aio +import rich.color +import rich.logging +import rich.terminal_theme -notBusy = threading.Event() +logging.basicConfig( + level="DEBUG", + format="%(message)s", + datefmt="[%X]", + handlers=[rich.logging.RichHandler()], +) +log = logging.getLogger("frobar") + +T = typing.TypeVar("T", bound="ComposableText") +P = typing.TypeVar("P", bound="ComposableText") +C = typing.TypeVar("C", bound="ComposableText") +Sortable = str | int + +# Display utilities + + +def humanSize(numi: int) -> str: + """ + Returns a string of width 3+3 + """ + num = float(numi) + for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"): + if abs(num) < 1000: + if num >= 10: + return f"{int(num):3d}{unit}" + else: + return f"{num:.1f}{unit}" + num /= 1024 + return f"{numi:d}YiB" + + +def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str: + if p < 0: + return "" + d, m = divmod(p, 1.0) + return states[-1] * int(d) + states[round(m * (len(states) - 1))] + + +def clip(text: str, length: int = 30) -> str: + if len(text) > length: + text = text[: length - 1] + "…" + return text + + +class ComposableText(typing.Generic[P, C]): + + def __init__( + self, + parent: typing.Optional[P] = None, + sortKey: Sortable = 0, + ) -> None: + self.parent: typing.Optional[P] = None + self.children: typing.MutableSequence[C] = list() + self.sortKey = sortKey + if parent: + self.setParent(parent) + self.bar = self.getFirstParentOfType(Bar) + + def setParent(self, parent: P) -> None: + assert self.parent is None + parent.children.append(self) + assert isinstance(parent.children, list) + parent.children.sort(key=lambda c: c.sortKey) + self.parent = parent + self.parent.updateMarkup() + + def unsetParent(self) -> None: + assert self.parent + self.parent.children.remove(self) + self.parent.updateMarkup() + self.parent = None + + def getFirstParentOfType(self, typ: typing.Type[T]) -> T: + parent = self + while not isinstance(parent, typ): + assert parent.parent, f"{self} doesn't have a parent of {typ}" + parent = parent.parent + return parent + + def updateMarkup(self) -> None: + self.bar.refresh.set() + # TODO OPTI See if worth caching the output + + def generateMarkup(self) -> str: + raise NotImplementedError(f"{self} cannot generate markup") + + def getMarkup(self) -> str: + return self.generateMarkup() + + +class Button(enum.Enum): + CLICK_LEFT = "1" + CLICK_MIDDLE = "2" + CLICK_RIGHT = "3" + SCROLL_UP = "4" + SCROLL_DOWN = "5" + + +class Section(ComposableText): + """ + Colorable block separated by chevrons + """ + + def __init__( + self, + parent: "Module", + sortKey: Sortable = 0, + color: rich.color.Color = rich.color.Color.default(), + ) -> None: + super().__init__(parent=parent, sortKey=sortKey) + self.parent: "Module" + self.color = color + + self.desiredText: str | None = None + self.text = "" + self.targetSize = -1 + self.size = -1 + self.animationTask: asyncio.Task | None = None + self.actions: dict[Button, str] = dict() + + def isHidden(self) -> bool: + return self.size < 0 + + # Geometric series, with a cap + ANIM_A = 0.025 + ANIM_R = 0.9 + ANIM_MIN = 0.001 + + async def animate(self) -> None: + increment = 1 if self.size < self.targetSize else -1 + loop = asyncio.get_running_loop() + frameTime = loop.time() + animTime = self.ANIM_A + skipped = 0 + + while self.size != self.targetSize: + self.size += increment + self.updateMarkup() + + animTime *= self.ANIM_R + animTime = max(self.ANIM_MIN, animTime) + frameTime += animTime + sleepTime = frameTime - loop.time() + + # In case of stress, skip refreshing by not awaiting + if sleepTime > 0: + if skipped > 0: + log.warning(f"Skipped {skipped} animation frame(s)") + skipped = 0 + await asyncio.sleep(sleepTime) + else: + skipped += 1 + + def setText(self, text: str | None) -> None: + # OPTI Don't redraw nor reset animation if setting the same text + if self.desiredText == text: + return + self.desiredText = text + if text is None: + self.text = "" + self.targetSize = -1 + else: + self.text = f" {text} " + self.targetSize = len(self.text) + if self.animationTask: + self.animationTask.cancel() + # OPTI Skip the whole animation task if not required + if self.size == self.targetSize: + self.updateMarkup() + else: + self.animationTask = self.bar.taskGroup.create_task(self.animate()) + + def setAction(self, button: Button, callback: typing.Callable | None) -> None: + if button in self.actions: + command = self.actions[button] + self.bar.removeAction(command) + del self.actions[button] + if callback: + command = self.bar.addAction(callback) + self.actions[button] = command + + def generateMarkup(self) -> str: + assert not self.isHidden() + pad = max(0, self.size - len(self.text)) + text = self.text[: self.size] + " " * pad + for button, command in self.actions.items(): + text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}" + return text + + +class Module(ComposableText): + """ + Sections handled by a same updater + """ + + def __init__(self, parent: "Side") -> None: + super().__init__(parent=parent) + self.parent: "Side" + self.children: typing.MutableSequence[Section] + + self.mirroring: Module | None = None + self.mirrors: list[Module] = list() + + def mirror(self, module: "Module") -> None: + self.mirroring = module + module.mirrors.append(self) + + def getSections(self) -> typing.Sequence[Section]: + if self.mirroring: + return self.mirroring.children + else: + return self.children + + def updateMarkup(self) -> None: + super().updateMarkup() + for mirror in self.mirrors: + mirror.updateMarkup() + + +class Alignment(enum.Enum): + LEFT = "l" + RIGHT = "r" + CENTER = "c" + + +class Side(ComposableText): + def __init__(self, parent: "Screen", alignment: Alignment) -> None: + super().__init__(parent=parent) + self.parent: Screen + self.children: typing.MutableSequence[Module] = [] + + self.alignment = alignment + self.bar = parent.getFirstParentOfType(Bar) + + def generateMarkup(self) -> str: + if not self.children: + return "" + text = "%{" + self.alignment.value + "}" + lastSection: Section | None = None + for module in self.children: + for section in module.getSections(): + if section.isHidden(): + continue + hexa = section.color.get_truecolor(theme=self.bar.theme).hex + if lastSection is None: + if self.alignment == Alignment.LEFT: + text += "%{B" + hexa + "}%{F-}" + else: + text += "%{B-}%{F" + hexa + "}%{R}%{F-}" + elif isinstance(lastSection, SpacerSection): + text += "%{B-}%{F" + hexa + "}%{R}%{F-}" + else: + if self.alignment == Alignment.RIGHT: + if lastSection.color == section.color: + text += "" + else: + text += "%{F" + hexa + "}%{R}" + else: + if lastSection.color == section.color: + text += "" + else: + text += "%{R}%{B" + hexa + "}" + text += "%{F-}" + text += section.getMarkup() + lastSection = section + if self.alignment != Alignment.RIGHT and lastSection: + text += "%{R}%{B-}" + return text + + +class Screen(ComposableText): + def __init__(self, parent: "Bar", output: str) -> None: + super().__init__(parent=parent) + self.parent: "Bar" + self.children: typing.MutableSequence[Side] + + self.output = output + + for alignment in Alignment: + Side(parent=self, alignment=alignment) + + def generateMarkup(self) -> str: + return ("%{Sn" + self.output + "}") + "".join( + side.getMarkup() for side in self.children + ) + + +class Bar(ComposableText): + """ + Top-level + """ + + def __init__( + self, + theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME, + ) -> None: + super().__init__() + self.parent: None + self.children: typing.MutableSequence[Screen] + self.longRunningTasks: list[asyncio.Task] = list() + self.theme = theme + + self.refresh = asyncio.Event() + self.taskGroup = asyncio.TaskGroup() + self.providers: list["Provider"] = list() + self.actionIndex = 0 + self.actions: dict[str, typing.Callable] = dict() + + self.periodicProviderTask: typing.Coroutine | None = None + + i3 = i3ipc.Connection() + for output in i3.get_outputs(): + if not output.active: + continue + Screen(parent=self, output=output.name) + + def addLongRunningTask(self, coro: typing.Coroutine) -> None: + task = self.taskGroup.create_task(coro) + self.longRunningTasks.append(task) + + async def run(self) -> None: + cmd = [ + "lemonbar", + "-b", + "-a", + "64", + "-f", + "DejaVuSansM Nerd Font:size=10", + "-F", + self.theme.foreground_color.hex, + "-B", + self.theme.background_color.hex, + ] + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE + ) + + async def refresher() -> None: + assert proc.stdin + while True: + await self.refresh.wait() + self.refresh.clear() + markup = self.getMarkup() + proc.stdin.write(markup.encode()) + + async def actionHandler() -> None: + assert proc.stdout + while True: + line = await proc.stdout.readline() + command = line.decode().strip() + callback = self.actions[command] + callback() + + async with self.taskGroup: + self.addLongRunningTask(refresher()) + self.addLongRunningTask(actionHandler()) + for provider in self.providers: + self.addLongRunningTask(provider.run()) + + def exit() -> None: + log.info("Terminating") + for task in self.longRunningTasks: + task.cancel() + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, exit) + + def generateMarkup(self) -> str: + return "".join(screen.getMarkup() for screen in self.children) + "\n" + + def addProvider( + self, + provider: "Provider", + alignment: Alignment = Alignment.LEFT, + screenNum: int | None = None, + ) -> None: + """ + screenNum: the provider will be added on this screen if set, all otherwise + """ + modules = list() + for s, screen in enumerate(self.children): + if screenNum is None or s == screenNum: + side = next(filter(lambda s: s.alignment == alignment, screen.children)) + module = Module(parent=side) + modules.append(module) + provider.modules = modules + if modules: + self.providers.append(provider) + + def addAction(self, callback: typing.Callable) -> str: + command = f"{self.actionIndex:x}" + self.actions[command] = callback + self.actionIndex += 1 + return command + + def removeAction(self, command: str) -> None: + del self.actions[command] + + def launch(self) -> None: + # Using GLib's event loop so we can run GLib's code + policy = gi.events.GLibEventLoopPolicy() + asyncio.set_event_loop_policy(policy) + loop = policy.get_event_loop() + loop.run_until_complete(self.run()) + + +class Provider: + sectionType: type[Section] = Section + + def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: + self.modules: list[Module] = list() + self.color = color + + async def run(self) -> None: + # Not a NotImplementedError, otherwise can't combine all classes + pass + + +class MirrorProvider(Provider): + def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: + super().__init__(color=color) + self.module: Module + + async def run(self) -> None: + await super().run() + self.module = self.modules[0] + for module in self.modules[1:]: + module.mirror(self.module) + + +class SingleSectionProvider(MirrorProvider): + async def run(self) -> None: + await super().run() + self.section = self.sectionType(parent=self.module, color=self.color) + + +class StaticProvider(SingleSectionProvider): + def __init__( + self, text: str, color: rich.color.Color = rich.color.Color.default() + ) -> None: + super().__init__(color=color) + self.text = text + + async def run(self) -> None: + await super().run() + self.section.setText(self.text) + + +class SpacerSection(Section): + pass + + +class SpacerProvider(SingleSectionProvider): + sectionType = SpacerSection + + def __init__(self, length: int = 5) -> None: + super().__init__(color=rich.color.Color.default()) + self.length = length + + async def run(self) -> None: + await super().run() + assert isinstance(self.section, SpacerSection) + self.section.setText(" " * self.length) + + +class StatefulSection(Section): + + def __init__( + self, + parent: Module, + sortKey: Sortable = 0, + color: rich.color.Color = rich.color.Color.default(), + ) -> None: + super().__init__(parent=parent, sortKey=sortKey, color=color) + self.state = 0 + self.numberStates: int + + self.setAction(Button.CLICK_LEFT, self.incrementState) + self.setAction(Button.CLICK_RIGHT, self.decrementState) + + def incrementState(self) -> None: + self.state += 1 + self.changeState() + + def decrementState(self) -> None: + self.state -= 1 + self.changeState() + + def setChangedState(self, callback: typing.Callable) -> None: + self.callback = callback + + def changeState(self) -> None: + self.state %= self.numberStates + self.bar.taskGroup.create_task(self.callback()) + + +class StatefulSectionProvider(Provider): + sectionType = StatefulSection + + +class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider): + section: StatefulSection + + +class MultiSectionsProvider(Provider): + + def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: + super().__init__(color=color) + self.sectionKeys: dict[Module, dict[Sortable, Section]] = ( + collections.defaultdict(dict) + ) + self.updaters: dict[Section, typing.Callable] = dict() + + async def getSectionUpdater(self, section: Section) -> typing.Callable: + raise NotImplementedError() + + async def updateSections(self, sections: set[Sortable], module: Module) -> None: + moduleSections = self.sectionKeys[module] + async with asyncio.TaskGroup() as tg: + for sortKey in sections: + section = moduleSections.get(sortKey) + if not section: + section = self.sectionType( + parent=module, sortKey=sortKey, color=self.color + ) + self.updaters[section] = await self.getSectionUpdater(section) + moduleSections[sortKey] = section + + updater = self.updaters[section] + tg.create_task(updater()) + + missingKeys = set(moduleSections.keys()) - sections + for missingKey in missingKeys: + section = moduleSections.get(missingKey) + assert section + section.setText(None) + + +class PeriodicProvider(Provider): + async def init(self) -> None: + pass + + async def loop(self) -> None: + raise NotImplementedError() + + @classmethod + async def task(cls, bar: Bar) -> None: + providers = list() + for provider in bar.providers: + if isinstance(provider, PeriodicProvider): + providers.append(provider) + await provider.init() + + while True: + # TODO Block bar update during the periodic update of the loops + loops = [provider.loop() for provider in providers] + asyncio.gather(*loops) + + now = datetime.datetime.now() + # Hardcoded to 1 second... not sure if we want some more than that, + # and if the logic to check if a task should run would be a win + # compared to the task itself + remaining = 1 - now.microsecond / 1000000 + await asyncio.sleep(remaining) + + async def run(self) -> None: + await super().run() + for module in self.modules: + bar = module.getFirstParentOfType(Bar) + assert bar + if not bar.periodicProviderTask: + bar.periodicProviderTask = PeriodicProvider.task(bar) + bar.addLongRunningTask(bar.periodicProviderTask) + + +class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider): + async def run(self) -> None: + await super().run() + self.section.setChangedState(self.loop) + + +class AlertingProvider(Provider): + COLOR_NORMAL = rich.color.Color.parse("green") + COLOR_WARNING = rich.color.Color.parse("yellow") + COLOR_DANGER = rich.color.Color.parse("red") + + warningThreshold: float + dangerThreshold: float + + def updateLevel(self, level: float) -> None: + if level > self.dangerThreshold: + color = self.COLOR_DANGER + elif level > self.warningThreshold: + color = self.COLOR_WARNING + else: + color = self.COLOR_NORMAL + for module in self.modules: + for section in module.getSections(): + section.color = color diff --git a/hm/desktop/frobar/frobar/display.py b/hm/desktop/frobar/frobar/display.py deleted file mode 100644 index c24b891..0000000 --- a/hm/desktop/frobar/frobar/display.py +++ /dev/null @@ -1,756 +0,0 @@ -#!/usr/bin/env python3init - -import enum -import logging -import os -import signal -import subprocess -import threading -import time -import typing - -import coloredlogs -import i3ipc - -from frobar.common import notBusy - -coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") -log = logging.getLogger() - - -# TODO Allow deletion of Bar, BarGroup and Section for screen changes -# IDEA Use i3 ipc events rather than relying on xrandr or Xlib (less portable -# but easier) -# TODO Optimize to use write() calls instead of string concatenation (writing -# BarGroup strings should be a good compromise) -# TODO Use bytes rather than strings -# TODO Use default colors of lemonbar sometimes -# TODO Adapt bar height with font height -# TODO OPTI Static text objects that update its parents if modified -# TODO forceSize and changeText are different - - -Handle = typing.Callable[[], None] -Decorator = Handle | str | None -Element: typing.TypeAlias = typing.Union[str, "Text", None] -Part: typing.TypeAlias = typing.Union[str, "Text", "Section"] - - -class BarGroupType(enum.Enum): - LEFT = 0 - RIGHT = 1 - # TODO Middle - # MID_LEFT = 2 - # MID_RIGHT = 3 - - -class BarStdoutThread(threading.Thread): - def run(self) -> None: - while Bar.running: - assert Bar.process.stdout - handle = Bar.process.stdout.readline().strip() - if not len(handle): - Bar.stop() - if handle not in Bar.actionsH2F: - log.error("Unknown action: {}".format(handle)) - continue - function = Bar.actionsH2F[handle] - function() - - -class Bar: - """ - One bar for each screen - """ - - # Constants - FONTS = ["DejaVuSansM Nerd Font"] - FONTSIZE = 10 - - @staticmethod - def init() -> None: - Bar.running = True - Bar.everyone = set() - Section.init() - - cmd = [ - "lemonbar", - "-b", - "-a", - "64", - "-F", - Section.FGCOLOR, - "-B", - Section.BGCOLOR, - ] - for font in Bar.FONTS: - cmd += ["-f", "{}:size={}".format(font, Bar.FONTSIZE)] - Bar.process = subprocess.Popen( - cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE - ) - BarStdoutThread().start() - - i3 = i3ipc.Connection() - for output in i3.get_outputs(): - if not output.active: - continue - Bar(output.name) - - @staticmethod - def stop() -> None: - Bar.running = False - Bar.process.kill() - - # TODO This is not really the best way to do it I guess - os.killpg(os.getpid(), signal.SIGTERM) - - @staticmethod - def run() -> None: - Bar.forever() - i3 = i3ipc.Connection() - - def doStop(*args: list) -> None: - Bar.stop() - - try: - i3.on("ipc_shutdown", doStop) - i3.main() - except BaseException: - Bar.stop() - - # Class globals - everyone: set["Bar"] - string = "" - process: subprocess.Popen - running = False - - nextHandle = 0 - actionsF2H: dict[Handle, bytes] = dict() - actionsH2F: dict[bytes, Handle] = dict() - - @staticmethod - def getFunctionHandle(function: typing.Callable[[], None]) -> bytes: - assert callable(function) - if function in Bar.actionsF2H.keys(): - return Bar.actionsF2H[function] - - handle = "{:x}".format(Bar.nextHandle).encode() - Bar.nextHandle += 1 - - Bar.actionsF2H[function] = handle - Bar.actionsH2F[handle] = function - - return handle - - @staticmethod - def forever() -> None: - Bar.process.wait() - Bar.stop() - - def __init__(self, output: str) -> None: - self.output = output - self.groups = dict() - - for groupType in BarGroupType: - group = BarGroup(groupType, self) - self.groups[groupType] = group - - self.childsChanged = False - Bar.everyone.add(self) - - @staticmethod - def addSectionAll( - section: "Section", group: "BarGroupType" - ) -> None: - """ - .. note:: - Add the section before updating it for the first time. - """ - for bar in Bar.everyone: - bar.addSection(section, group=group) - section.added() - - def addSection(self, section: "Section", group: "BarGroupType") -> None: - self.groups[group].addSection(section) - - def update(self) -> None: - if self.childsChanged: - self.string = "%{Sn" + self.output + "}" - self.string += self.groups[BarGroupType.LEFT].string - self.string += self.groups[BarGroupType.RIGHT].string - - self.childsChanged = False - - @staticmethod - def updateAll() -> None: - if Bar.running: - Bar.string = "" - for bar in Bar.everyone: - bar.update() - Bar.string += bar.string - # Color for empty sections - Bar.string += BarGroup.color(*Section.EMPTY) - - string = Bar.string + "\n" - # print(string) - assert Bar.process.stdin - Bar.process.stdin.write(string.encode()) - Bar.process.stdin.flush() - - -class BarGroup: - """ - One for each group of each bar - """ - - everyone: set["BarGroup"] = set() - - def __init__(self, groupType: BarGroupType, parent: Bar): - - self.groupType = groupType - self.parent = parent - - self.sections: list["Section"] = list() - self.string = "" - self.parts: list[Part] = [] - - #: One of the sections that had their theme or visibility changed - self.childsThemeChanged = False - - #: One of the sections that had their text (maybe their size) changed - self.childsTextChanged = False - - BarGroup.everyone.add(self) - - def addSection(self, section: "Section") -> None: - self.sections.append(section) - section.addParent(self) - - def addSectionAfter(self, sectionRef: "Section", section: "Section") -> None: - index = self.sections.index(sectionRef) - self.sections.insert(index + 1, section) - section.addParent(self) - - ALIGNS = {BarGroupType.LEFT: "%{l}", BarGroupType.RIGHT: "%{r}"} - - @staticmethod - def fgColor(color: str) -> str: - return "%{F" + (color or "-") + "}" - - @staticmethod - def bgColor(color: str) -> str: - return "%{B" + (color or "-") + "}" - - @staticmethod - def color(fg: str, bg: str) -> str: - return BarGroup.fgColor(fg) + BarGroup.bgColor(bg) - - def update(self) -> None: - if self.childsThemeChanged: - parts: list[Part] = [BarGroup.ALIGNS[self.groupType]] - - secs = [sec for sec in self.sections if sec.visible] - lenS = len(secs) - for s in range(lenS): - sec = secs[s] - theme = Section.THEMES[sec.theme] - if self.groupType == BarGroupType.LEFT: - oSec = secs[s + 1] if s < lenS - 1 else None - else: - oSec = secs[s - 1] if s > 0 else None - oTheme = ( - Section.THEMES[oSec.theme] if oSec is not None else Section.EMPTY - ) - - if self.groupType == BarGroupType.LEFT: - if s == 0: - parts.append(BarGroup.bgColor(theme[1])) - parts.append(BarGroup.fgColor(theme[0])) - parts.append(sec) - if theme == oTheme: - parts.append("") - else: - parts.append(BarGroup.color(theme[1], oTheme[1]) + "") - else: - if theme is oTheme: - parts.append("") - else: - parts.append(BarGroup.fgColor(theme[1]) + "") - parts.append(BarGroup.color(*theme)) - parts.append(sec) - - # TODO OPTI Concatenate successive strings - self.parts = parts - - if self.childsTextChanged or self.childsThemeChanged: - self.string = "" - for part in self.parts: - if isinstance(part, str): - self.string += part - elif isinstance(part, Section): - self.string += part.curText - - self.parent.childsChanged = True - - self.childsThemeChanged = False - self.childsTextChanged = False - - @staticmethod - def updateAll() -> None: - for group in BarGroup.everyone: - group.update() - Bar.updateAll() - - -class SectionThread(threading.Thread): - ANIMATION_START = 0.025 - ANIMATION_STOP = 0.001 - ANIMATION_EVOLUTION = 0.9 - - def run(self) -> None: - while Section.somethingChanged.wait(): - notBusy.wait() - Section.updateAll() - animTime = self.ANIMATION_START - frameTime = time.perf_counter() - while len(Section.sizeChanging) > 0: - frameTime += animTime - curTime = time.perf_counter() - sleepTime = frameTime - curTime - time.sleep(sleepTime if sleepTime > 0 else 0) - Section.updateAll() - animTime *= self.ANIMATION_EVOLUTION - if animTime < self.ANIMATION_STOP: - animTime = self.ANIMATION_STOP - - -Theme = tuple[str, str] - - -class Section: - # TODO Update all of that to base16 - COLORS = [ - "#092c0e", - "#143718", - "#5a7058", - "#677d64", - "#89947f", - "#99a08d", - "#fae2e3", - "#fff0f1", - "#e0332e", - "#cf4b15", - "#bb8801", - "#8d9800", - "#1fa198", - "#008dd1", - "#5c73c4", - "#d43982", - ] - FGCOLOR = "#fff0f1" - BGCOLOR = "#092c0e" - - THEMES: list[Theme] = list() - EMPTY: Theme = (FGCOLOR, BGCOLOR) - - ICON: str | None = None - PERSISTENT = False - - #: Sections that do not have their destination size - sizeChanging: set["Section"] = set() - updateThread: threading.Thread = SectionThread(daemon=True) - somethingChanged = threading.Event() - lastChosenTheme = 0 - - @staticmethod - def init() -> None: - for t in range(8, 16): - Section.THEMES.append((Section.COLORS[0], Section.COLORS[t])) - Section.THEMES.append((Section.COLORS[0], Section.COLORS[3])) - Section.THEMES.append((Section.COLORS[0], Section.COLORS[6])) - - Section.updateThread.start() - - def __init__(self, theme: int | None = None) -> None: - #: Displayed section - #: Note: A section can be empty and displayed! - self.visible = False - - if theme is None: - theme = Section.lastChosenTheme - Section.lastChosenTheme = (Section.lastChosenTheme + 1) % len( - Section.THEMES - ) - self.theme = theme - - #: Displayed text - self.curText = "" - #: Displayed text size - self.curSize = 0 - - #: Destination text - self.dstText = Text(" ", Text(), " ") - #: Destination size - self.dstSize = 0 - - #: Groups that have this section - self.parents: set[BarGroup] = set() - - self.icon = self.ICON - self.persistent = self.PERSISTENT - - def __str__(self) -> str: - try: - return "<{}><{}>{:01d}{}{:02d}/{:02d}".format( - self.curText, - self.dstText, - self.theme, - "+" if self.visible else "-", - self.curSize, - self.dstSize, - ) - except Exception: - return super().__str__() - - def addParent(self, parent: BarGroup) -> None: - self.parents.add(parent) - - def appendAfter(self, section: "Section") -> None: - assert len(self.parents) - for parent in self.parents: - parent.addSectionAfter(self, section) - - def added(self) -> None: - pass - - def informParentsThemeChanged(self) -> None: - for parent in self.parents: - parent.childsThemeChanged = True - - def informParentsTextChanged(self) -> None: - for parent in self.parents: - parent.childsTextChanged = True - - def updateText(self, text: Element) -> None: - if isinstance(text, str): - text = Text(text) - elif isinstance(text, Text) and not len(text.elements): - text = None - - self.dstText[0] = ( - None - if (text is None and not self.persistent) - else ((" " + self.icon + " ") if self.icon else " ") - ) - self.dstText[1] = text - self.dstText[2] = ( - " " if self.dstText[1] is not None and len(self.dstText[1]) else None - ) - - self.dstSize = len(self.dstText) - self.dstText.setSection(self) - - if self.curSize == self.dstSize: - if self.dstSize > 0: - self.curText = str(self.dstText) - self.informParentsTextChanged() - else: - Section.sizeChanging.add(self) - Section.somethingChanged.set() - - def setDecorators(self, **kwargs: Handle) -> None: - self.dstText.setDecorators(**kwargs) - self.curText = str(self.dstText) - self.informParentsTextChanged() - Section.somethingChanged.set() - - def updateTheme(self, theme: int) -> None: - assert theme < len(Section.THEMES) - if theme == self.theme: - return - self.theme = theme - self.informParentsThemeChanged() - Section.somethingChanged.set() - - def updateVisibility(self, visibility: bool) -> None: - - self.visible = visibility - self.informParentsThemeChanged() - Section.somethingChanged.set() - - @staticmethod - def fit(text: str, size: int) -> str: - t = len(text) - return text[:size] if t >= size else text + " " * (size - t) - - def update(self) -> None: - # TODO Might profit of a better logic - if not self.visible: - self.updateVisibility(True) - return - - if self.dstSize > self.curSize: - self.curSize += 1 - elif self.dstSize < self.curSize: - self.curSize -= 1 - else: - # Visibility toggling must be done one step after curSize = 0 - if self.dstSize == 0: - self.updateVisibility(False) - Section.sizeChanging.remove(self) - return - - self.curText = self.dstText.text(size=self.curSize, pad=True) - self.informParentsTextChanged() - - @staticmethod - def updateAll() -> None: - """ - Process all sections for text size changes - """ - - for sizeChanging in Section.sizeChanging.copy(): - sizeChanging.update() - - BarGroup.updateAll() - - Section.somethingChanged.clear() - - @staticmethod - def ramp(p: float, ramp: str = " ▁▂▃▄▅▆▇█") -> str: - if p > 1: - return ramp[-1] - elif p < 0: - return ramp[0] - else: - return ramp[round(p * (len(ramp) - 1))] - - -class StatefulSection(Section): - # TODO FEAT Allow to temporary expand the section (e.g. when important change) - NUMBER_STATES: int - DEFAULT_STATE = 0 - - def __init__(self, theme: int | None) -> None: - Section.__init__(self, theme=theme) - self.state = self.DEFAULT_STATE - if hasattr(self, "onChangeState"): - self.onChangeState(self.state) - self.setDecorators( - clickLeft=self.incrementState, clickRight=self.decrementState - ) - - def incrementState(self) -> None: - newState = min(self.state + 1, self.NUMBER_STATES - 1) - self.changeState(newState) - - def decrementState(self) -> None: - newState = max(self.state - 1, 0) - self.changeState(newState) - - def changeState(self, state: int) -> None: - assert state < self.NUMBER_STATES - self.state = state - if hasattr(self, "onChangeState"): - self.onChangeState(state) - assert hasattr( - self, "refreshData" - ), "StatefulSection should be paired with some Updater" - self.refreshData() - - -class ColorCountsSection(StatefulSection): - # TODO FEAT Blend colors when not expanded - # TODO FEAT Blend colors with importance of count - # TODO FEAT Allow icons instead of counts - NUMBER_STATES = 3 - COLORABLE_ICON = "?" - - def __init__(self, theme: None | int = None) -> None: - StatefulSection.__init__(self, theme=theme) - - def subfetcher(self) -> list[tuple[int, str]]: - raise NotImplementedError("Interface must be implemented") - - def fetcher(self) -> typing.Union[None, "Text"]: - counts = self.subfetcher() - # Nothing - if not len(counts): - return None - # Icon colored - elif self.state == 0 and len(counts) == 1: - count, color = counts[0] - return Text(self.COLORABLE_ICON, fg=color) - # Icon - elif self.state == 0 and len(counts) > 1: - return Text(self.COLORABLE_ICON) - # Icon + Total - elif self.state == 1 and len(counts) > 1: - total = sum([count for count, color in counts]) - return Text(self.COLORABLE_ICON, " ", str(total)) - # Icon + Counts - else: - text = Text(self.COLORABLE_ICON) - for count, color in counts: - text.append(" ", Text(str(count), fg=color)) - return text - - -class Text: - def _setDecorators(self, decorators: dict[str, Decorator]) -> None: - # TODO OPTI Convert no decorator to strings - self.decorators = decorators - self.prefix: str | None = None - self.suffix: str | None = None - - def __init__(self, *args: Element, **kwargs: Decorator) -> None: - # TODO OPTI Concatenate consecutrive string - self.elements = list(args) - - self._setDecorators(kwargs) - self.section: Section - - def append(self, *args: Element) -> None: - self.elements += list(args) - - def prepend(self, *args: Element) -> None: - self.elements = list(args) + self.elements - - def setElements(self, *args: Element) -> None: - self.elements = list(args) - - def setDecorators(self, **kwargs: Decorator) -> None: - self._setDecorators(kwargs) - - def setSection(self, section: Section) -> None: - self.section = section - for element in self.elements: - if isinstance(element, Text): - element.setSection(section) - - def _genFixs(self) -> None: - if self.prefix is not None and self.suffix is not None: - return - - self.prefix = "" - self.suffix = "" - - def nest(prefix: str, suffix: str) -> None: - assert self.prefix is not None - assert self.suffix is not None - self.prefix = self.prefix + "%{" + prefix + "}" - self.suffix = "%{" + suffix + "}" + self.suffix - - def getColor(val: str) -> str: - # TODO Allow themes - assert len(val) == 7 - return val - - def button(number: str, function: Handle) -> None: - handle = Bar.getFunctionHandle(function) - nest("A" + number + ":" + handle.decode() + ":", "A" + number) - - for key, val in self.decorators.items(): - if val is None: - continue - if key == "fg": - reset = self.section.THEMES[self.section.theme][0] - assert isinstance(val, str) - nest("F" + getColor(val), "F" + reset) - elif key == "bg": - reset = self.section.THEMES[self.section.theme][1] - assert isinstance(val, str) - nest("B" + getColor(val), "B" + reset) - elif key == "clickLeft": - assert callable(val) - button("1", val) - elif key == "clickMiddle": - assert callable(val) - button("2", val) - elif key == "clickRight": - assert callable(val) - button("3", val) - elif key == "scrollUp": - assert callable(val) - button("4", val) - elif key == "scrollDown": - assert callable(val) - button("5", val) - else: - log.warn("Unkown decorator: {}".format(key)) - - def _text(self, size: int | None = None, pad: bool = False) -> tuple[str, int]: - self._genFixs() - assert self.prefix is not None - assert self.suffix is not None - curString = self.prefix - curSize = 0 - remSize = size - - for element in self.elements: - if element is None: - continue - elif isinstance(element, Text): - newString, newSize = element._text(size=remSize) - else: - newString = str(element) - if remSize is not None: - newString = newString[:remSize] - newSize = len(newString) - - curString += newString - curSize += newSize - - if remSize is not None: - remSize -= newSize - if remSize <= 0: - break - - curString += self.suffix - - if pad: - assert remSize is not None - if remSize > 0: - curString += " " * remSize - curSize += remSize - - if size is not None: - if pad: - assert size == curSize - else: - assert size >= curSize - return curString, curSize - - def text(self, size: int | None = None, pad: bool = False) -> str: - string, size = self._text(size=size, pad=pad) - return string - - def __str__(self) -> str: - self._genFixs() - assert self.prefix is not None - assert self.suffix is not None - curString = self.prefix - for element in self.elements: - if element is None: - continue - else: - curString += str(element) - curString += self.suffix - return curString - - def __len__(self) -> int: - curSize = 0 - for element in self.elements: - if element is None: - continue - elif isinstance(element, Text): - curSize += len(element) - else: - curSize += len(str(element)) - return curSize - - def __getitem__(self, index: int) -> Element: - return self.elements[index] - - def __setitem__(self, index: int, data: Element) -> None: - self.elements[index] = data diff --git a/hm/desktop/frobar/frobar/providers.py b/hm/desktop/frobar/frobar/providers.py index 7f9345c..7784fb4 100644 --- a/hm/desktop/frobar/frobar/providers.py +++ b/hm/desktop/frobar/frobar/providers.py @@ -1,958 +1,617 @@ -#!/usr/bin/env python3 - +import asyncio +import collections import datetime -import enum import ipaddress -import json -import logging import os -import random import socket -import subprocess import time +import typing -import coloredlogs +import gi import i3ipc -import mpd -import notmuch +import i3ipc.aio import psutil import pulsectl +import pulsectl_asyncio +import rich.color -from frobar.display import (ColorCountsSection, Element, Section, - StatefulSection, Text) -from frobar.updaters import (I3Updater, InotifyUpdater, MergedUpdater, - PeriodicUpdater, ThreadedUpdater, Updater) +from frobar.common import (AlertingProvider, Button, MirrorProvider, Module, + MultiSectionsProvider, PeriodicProvider, + PeriodicStatefulProvider, Screen, Section, + SingleSectionProvider, StatefulSection, + StatefulSectionProvider, clip, humanSize, ramp) -coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") -log = logging.getLogger() +gi.require_version("Playerctl", "2.0") +import gi.repository.Playerctl -# TODO Generator class (for I3WorkspacesProvider, NetworkProvider and later -# PulseaudioProvider and MpdProvider) +class I3ModeProvider(SingleSectionProvider): + def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: + self.section.setText(None if e.change == "default" else e.change) -def humanSize(numi: int) -> str: - """ - Returns a string of width 3+3 - """ - num = float(numi) - for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"): - if abs(num) < 1000: - if num >= 10: - return "{:3d}{}".format(int(num), unit) - else: - return "{:.1f}{}".format(num, unit) - num /= 1024 - return "{:d}YiB".format(numi) + async def run(self) -> None: + await super().run() + i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + i3.on(i3ipc.Event.MODE, self.on_mode) + await i3.main() + # TODO Hide WorkspaceProvider when this is active -def randomColor(seed: int | bytes = 0) -> str: - random.seed(seed) - return "#{:02x}{:02x}{:02x}".format(*[random.randint(0, 255) for _ in range(3)]) - -class TimeProvider(StatefulSection, PeriodicUpdater): - FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] - NUMBER_STATES = len(FORMATS) - DEFAULT_STATE = 1 - - def fetcher(self) -> str: - now = datetime.datetime.now() - return now.strftime(self.FORMATS[self.state]) - - def __init__(self, theme: int | None = None): - PeriodicUpdater.__init__(self) - StatefulSection.__init__(self, theme) - self.changeInterval(1) # TODO OPTI When state < 1 - - -class AlertLevel(enum.Enum): - NORMAL = 0 - WARNING = 1 - DANGER = 2 - - -class AlertingSection(StatefulSection): - # TODO EASE Correct settings for themes - ALERT_THEMES = {AlertLevel.NORMAL: 3, AlertLevel.WARNING: 1, AlertLevel.DANGER: 0} - PERSISTENT = True - - def getLevel(self, quantity: float) -> AlertLevel: - if quantity > self.dangerThresold: - return AlertLevel.DANGER - elif quantity > self.warningThresold: - return AlertLevel.WARNING - else: - return AlertLevel.NORMAL - - def updateLevel(self, quantity: float) -> None: - self.level = self.getLevel(quantity) - self.updateTheme(self.ALERT_THEMES[self.level]) - if self.level == AlertLevel.NORMAL: - return - # TODO Temporary update state - - def __init__(self, theme: int | None = None): - StatefulSection.__init__(self, theme) - self.dangerThresold = 0.90 - self.warningThresold = 0.75 - - -class CpuProvider(AlertingSection, PeriodicUpdater): - NUMBER_STATES = 3 - ICON = "" - - def fetcher(self) -> Element: - percent = psutil.cpu_percent(percpu=False) - self.updateLevel(percent / 100) - if self.state >= 2: - percents = psutil.cpu_percent(percpu=True) - return "".join([Section.ramp(p / 100) for p in percents]) - elif self.state >= 1: - return Section.ramp(percent / 100) - return "" - - def __init__(self, theme: int | None = None): - AlertingSection.__init__(self, theme) - PeriodicUpdater.__init__(self) - self.changeInterval(1) - - -class LoadProvider(AlertingSection, PeriodicUpdater): - NUMBER_STATES = 3 - ICON = "" - - def fetcher(self) -> Element: - load = os.getloadavg() - self.updateLevel(load[0]) - if self.state >= 2: - return " ".join(f"{load[i]:.2f}" for i in range(3)) - elif self.state >= 1: - return f"{load[0]:.2f}" - return "" - - def __init__(self, theme: int | None = None): - AlertingSection.__init__(self, theme) - PeriodicUpdater.__init__(self) - self.changeInterval(5) - self.warningThresold = 5 - self.dangerThresold = 10 - - -class RamProvider(AlertingSection, PeriodicUpdater): - """ - Shows free RAM - """ - - NUMBER_STATES = 4 - ICON = "" - - def fetcher(self) -> Element: - mem = psutil.virtual_memory() - freePerc = mem.percent / 100 - self.updateLevel(freePerc) - - if self.state < 1: - return None - - text = Text(Section.ramp(freePerc)) - if self.state >= 2: - freeStr = humanSize(mem.total - mem.available) - text.append(freeStr) - if self.state >= 3: - totalStr = humanSize(mem.total) - text.append("/", totalStr) - - return text - - def __init__(self, theme: int | None = None): - AlertingSection.__init__(self, theme) - PeriodicUpdater.__init__(self) - self.changeInterval(1) - - -class TemperatureProvider(AlertingSection, PeriodicUpdater): - NUMBER_STATES = 2 - RAMP = "" - MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"] - # For Intel, AMD and ARM respectively. - - def fetcher(self) -> Element: - allTemp = psutil.sensors_temperatures() - for main in self.MAIN_TEMPS: - if main in allTemp: - break - else: - return "?" - temp = allTemp[main][0] - - self.warningThresold = temp.high or 90.0 - self.dangerThresold = temp.critical or 100.0 - self.updateLevel(temp.current) - - self.icon = Section.ramp(temp.current / self.warningThresold, self.RAMP) - if self.state >= 1: - return "{:.0f}°C".format(temp.current) - return "" - - def __init__(self, theme: int | None = None): - AlertingSection.__init__(self, theme) - PeriodicUpdater.__init__(self) - self.changeInterval(5) - - -class BatteryProvider(AlertingSection, PeriodicUpdater): - # TODO Support ACPID for events - NUMBER_STATES = 3 - RAMP = "" - - def fetcher(self) -> Element: - bat = psutil.sensors_battery() - if not bat: - return None - - self.icon = ("" if bat.power_plugged else "") + Section.ramp( - bat.percent / 100, self.RAMP - ) - - self.updateLevel(1 - bat.percent / 100) - - if self.state < 1: - return "" - - t = Text("{:.0f}%".format(bat.percent)) - - if self.state < 2: - return t - - h = int(bat.secsleft / 3600) - m = int((bat.secsleft - h * 3600) / 60) - t.append(" ({:d}:{:02d})".format(h, m)) - return t - - def __init__(self, theme: int | None = None): - AlertingSection.__init__(self, theme) - PeriodicUpdater.__init__(self) - self.changeInterval(5) - - -class XautolockProvider(Section, InotifyUpdater): - ICON = "" - - def fetcher(self) -> str | None: - with open(self.path) as fd: - state = fd.read().strip() - if state == "enabled": - return None - elif state == "disabled": - return "" - else: - return "?" - - def __init__(self, theme: int | None = None): - Section.__init__(self, theme=theme) - InotifyUpdater.__init__(self) - # TODO XDG - self.path = os.path.realpath(os.path.expanduser("~/.cache/xautolock")) - self.addPath(self.path) - - -class PulseaudioProvider(StatefulSection, ThreadedUpdater): - NUMBER_STATES = 3 - DEFAULT_STATE = 1 - - def __init__(self, theme: int | None = None): - ThreadedUpdater.__init__(self) - StatefulSection.__init__(self, theme) - self.pulseEvents = pulsectl.Pulse("event-handler") - - self.pulseEvents.event_mask_set(pulsectl.PulseEventMaskEnum.sink) - self.pulseEvents.event_callback_set(self.handleEvent) - self.start() - self.refreshData() - - def fetcher(self) -> Element: - sinks = [] - with pulsectl.Pulse("list-sinks") as pulse: - for sink in pulse.sink_list(): - if ( - sink.port_active.name == "analog-output-headphones" - or sink.port_active.description == "Headphones" - ): - icon = "" - elif ( - sink.port_active.name == "analog-output-speaker" - or sink.port_active.description == "Speaker" - ): - icon = "" if sink.mute else "" - elif sink.port_active.name in ("headset-output", "headphone-output"): - icon = "" - else: - icon = "?" - vol = pulse.volume_get_all_chans(sink) - fg = (sink.mute and "#333333") or (vol > 1 and "#FF0000") or None - - t = Text(icon, fg=fg) - sinks.append(t) - - if self.state < 1: - continue - - if self.state < 2: - if not sink.mute: - ramp = " " - while vol >= 0: - ramp += self.ramp(vol if vol < 1 else 1) - vol -= 1 - t.append(ramp) - else: - t.append(" {:2.0f}%".format(vol * 100)) - - return Text(*sinks) - - def loop(self) -> None: - self.pulseEvents.event_listen() - - def handleEvent(self, ev: pulsectl.PulseEventInfo) -> None: - self.refreshData() - - -class NetworkProviderSection(StatefulSection, Updater): - NUMBER_STATES = 5 - DEFAULT_STATE = 1 - - def actType(self) -> None: - self.ssid = None - if self.iface.startswith("eth") or self.iface.startswith("enp"): - if "u" in self.iface: - self.icon = "" - else: - self.icon = "" - elif self.iface.startswith("wlan") or self.iface.startswith("wl"): - self.icon = "" - if self.showSsid: - cmd = ["iwgetid", self.iface, "--raw"] - p = subprocess.run(cmd, stdout=subprocess.PIPE) - self.ssid = p.stdout.strip().decode() - elif self.iface.startswith("tun") or self.iface.startswith("tap"): - self.icon = "" - elif self.iface.startswith("docker"): - self.icon = "" - elif self.iface.startswith("veth"): - self.icon = "" - elif self.iface.startswith("vboxnet"): - self.icon = "" - - def getAddresses( - self, - ) -> tuple[psutil._common.snicaddr, psutil._common.snicaddr]: - ipv4 = None - ipv6 = None - for address in self.parent.addrs[self.iface]: - if address.family == socket.AF_INET: - ipv4 = address - elif address.family == socket.AF_INET6: - ipv6 = address - return ipv4, ipv6 - - def fetcher(self) -> Element: - self.icon = "?" - self.persistent = False - if ( - self.iface not in self.parent.stats - or not self.parent.stats[self.iface].isup - or self.iface.startswith("lo") - ): - return None - - # Get addresses - ipv4, ipv6 = self.getAddresses() - if ipv4 is None and ipv6 is None: - return None - - text = [] - self.persistent = True - self.actType() - - if self.showSsid and self.ssid: - text.append(self.ssid) - - if self.showAddress: - if ipv4: - netStrFull = "{}/{}".format(ipv4.address, ipv4.netmask) - addr = ipaddress.IPv4Network(netStrFull, strict=False) - addrStr = "{}/{}".format(ipv4.address, addr.prefixlen) - text.append(addrStr) - # TODO IPV6 - # if ipv6: - # text += ' ' + ipv6.address - - if self.showSpeed: - recvDiff = ( - self.parent.IO[self.iface].bytes_recv - - self.parent.prevIO[self.iface].bytes_recv - ) - sentDiff = ( - self.parent.IO[self.iface].bytes_sent - - self.parent.prevIO[self.iface].bytes_sent - ) - recvDiff /= self.parent.dt - sentDiff /= self.parent.dt - text.append("↓{}↑{}".format(humanSize(recvDiff), humanSize(sentDiff))) - - if self.showTransfer: - text.append( - "⇓{}⇑{}".format( - humanSize(self.parent.IO[self.iface].bytes_recv), - humanSize(self.parent.IO[self.iface].bytes_sent), - ) - ) - - return " ".join(text) - - def onChangeState(self, state: int) -> None: - self.showSsid = state >= 1 - self.showAddress = state >= 2 - self.showSpeed = state >= 3 - self.showTransfer = state >= 4 - - def __init__(self, iface: str, parent: "NetworkProvider"): - Updater.__init__(self) - StatefulSection.__init__(self, theme=parent.theme) - self.iface = iface - self.parent = parent - - -class NetworkProvider(Section, PeriodicUpdater): - def fetchData(self) -> None: - self.prev = self.last - self.prevIO = self.IO - - self.stats = psutil.net_if_stats() - self.addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs() - self.IO: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True) - self.ifaces = self.stats.keys() - - self.last: float = time.perf_counter() - self.dt = self.last - self.prev - - def fetcher(self) -> None: - self.fetchData() - - # Add missing sections - lastSection: NetworkProvider | NetworkProviderSection = self - for iface in sorted(list(self.ifaces)): - if iface not in self.sections.keys(): - section = NetworkProviderSection(iface, self) - lastSection.appendAfter(section) - self.sections[iface] = section - else: - section = self.sections[iface] - lastSection = section - - # Refresh section text - for section in self.sections.values(): - section.refreshData() - - return None - - def __init__(self, theme: int | None = None): - PeriodicUpdater.__init__(self) - Section.__init__(self, theme) - - self.sections: dict[str, NetworkProviderSection] = dict() - self.last = 0 - self.IO = dict() - self.fetchData() - self.changeInterval(5) - - -class RfkillProvider(Section, PeriodicUpdater): - # TODO FEAT rfkill doesn't seem to indicate that the hardware switch is - # toggled - PATH = "/sys/class/rfkill" - - def fetcher(self) -> Element: - t = Text() - for device in os.listdir(self.PATH): - with open(os.path.join(self.PATH, device, "soft"), "rb") as f: - softBlocked = f.read().strip() != b"0" - with open(os.path.join(self.PATH, device, "hard"), "rb") as f: - hardBlocked = f.read().strip() != b"0" - - if not hardBlocked and not softBlocked: - continue - - with open(os.path.join(self.PATH, device, "type"), "rb") as f: - typ = f.read().strip() - - fg = (hardBlocked and "#CCCCCC") or (softBlocked and "#FF0000") or None - if typ == b"wlan": - icon = "" - elif typ == b"bluetooth": - icon = "" - else: - icon = "?" - - t.append(Text(icon, fg=fg)) - return t - - def __init__(self, theme: int | None = None): - PeriodicUpdater.__init__(self) - Section.__init__(self, theme) - self.changeInterval(5) - - -class SshAgentProvider(PeriodicUpdater): - def fetcher(self) -> Element: - cmd = ["ssh-add", "-l"] - proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) - if proc.returncode != 0: - return None - text = Text() - for line in proc.stdout.split(b"\n"): - if not len(line): - continue - fingerprint = line.split()[1] - text.append(Text("", fg=randomColor(seed=fingerprint))) - return text - - def __init__(self) -> None: - PeriodicUpdater.__init__(self) - self.changeInterval(5) - - -class GpgAgentProvider(PeriodicUpdater): - def fetcher(self) -> Element: - cmd = ["gpg-connect-agent", "keyinfo --list", "/bye"] - proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) - # proc = subprocess.run(cmd) - if proc.returncode != 0: - return None - text = Text() - for line in proc.stdout.split(b"\n"): - if not len(line) or line == b"OK": - continue - spli = line.split() - if spli[6] != b"1": - continue - keygrip = spli[2] - text.append(Text("", fg=randomColor(seed=keygrip))) - return text - - def __init__(self) -> None: - PeriodicUpdater.__init__(self) - self.changeInterval(5) - - -class KeystoreProvider(Section, MergedUpdater): - # TODO OPTI+FEAT Use ColorCountsSection and not MergedUpdater, this is useless - ICON = "" - - def __init__(self, theme: int | None = None): - MergedUpdater.__init__(self, SshAgentProvider(), GpgAgentProvider()) - Section.__init__(self, theme) - - -class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater): - COLORABLE_ICON = "" - - def subfetcher(self) -> list[tuple[int, str]]: - db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY, path=self.dir) - counts = [] - for account in self.accounts: - queryStr = "folder:/{}/ and tag:unread".format(account) - query = notmuch.Query(db, queryStr) - nbMsgs = query.count_messages() - if nbMsgs < 1: - continue - counts.append((nbMsgs, self.colors[account])) - # db.close() - return counts - - def __init__(self, dir: str = "~/.mail/", theme: int | None = None): - InotifyUpdater.__init__(self) - ColorCountsSection.__init__(self, theme) - - self.dir = os.path.realpath(os.path.expanduser(dir)) - assert os.path.isdir(self.dir) - - # Fetching account list - self.accounts = sorted( - [a for a in os.listdir(self.dir) if not a.startswith(".")] - ) - # Fetching colors - self.colors = dict() - for account in self.accounts: - filename = os.path.join(self.dir, account, "color") - with open(filename, "r") as f: - color = f.read().strip() - self.colors[account] = color - - self.addPath(os.path.join(self.dir, ".notmuch", "xapian")) - - -class TodoProvider(ColorCountsSection, InotifyUpdater): - # TODO OPT/UX Maybe we could get more data from the todoman python module - # TODO OPT Specific callback for specific directory - COLORABLE_ICON = "" - - def updateCalendarList(self) -> None: - calendars = sorted(os.listdir(self.dir)) - for calendar in calendars: - # If the calendar wasn't in the list - if calendar not in self.calendars: - self.addPath(os.path.join(self.dir, calendar), refresh=False) - - # Fetching name - path = os.path.join(self.dir, calendar, "displayname") - with open(path, "r") as f: - self.names[calendar] = f.read().strip() - - # Fetching color - path = os.path.join(self.dir, calendar, "color") - with open(path, "r") as f: - self.colors[calendar] = f.read().strip() - self.calendars: list[str] = calendars - - def __init__(self, dir: str, theme: int | None = None): - """ - :parm str dir: [main]path value in todoman.conf - """ - InotifyUpdater.__init__(self) - ColorCountsSection.__init__(self, theme=theme) - self.dir = os.path.realpath(os.path.expanduser(dir)) - assert os.path.isdir(self.dir) - - self.calendars = [] - self.colors: dict[str, str] = dict() - self.names: dict[str, str] = dict() - self.updateCalendarList() - self.refreshData() - - def countUndone(self, calendar: str | None) -> int: - cmd = ["todo", "--porcelain", "list"] - if calendar: - cmd.append(self.names[calendar]) - proc = subprocess.run(cmd, stdout=subprocess.PIPE) - data = json.loads(proc.stdout) - return len(data) - - def subfetcher(self) -> list[tuple[int, str]]: - counts = [] - - # TODO This an ugly optimisation that cuts on features, but todoman - # calls are very expensive so we keep that in the meanwhile - if self.state < 2: - c = self.countUndone(None) - if c > 0: - counts.append((c, "#00000")) - counts.append((0, "#FFFFF")) - return counts - # Optimisation ends here - - for calendar in self.calendars: - c = self.countUndone(calendar) - if c <= 0: - continue - counts.append((c, self.colors[calendar])) - return counts - - -class I3WindowTitleProvider(Section, I3Updater): +class I3WindowTitleProvider(SingleSectionProvider): # TODO FEAT To make this available from start, we need to find the # `focused=True` element following the `focus` array - # TODO Feat Make this output dependant if wanted def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - self.updateText(e.container.name) + self.section.setText(clip(e.container.name, 60)) - def __init__(self, theme: int | None = None): - I3Updater.__init__(self) - Section.__init__(self, theme=theme) - self.on("window", self.on_window) + async def run(self) -> None: + await super().run() + i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + i3.on(i3ipc.Event.WINDOW, self.on_window) + await i3.main() -class I3WorkspacesProviderSection(Section): - def selectTheme(self) -> int: - if self.workspace.urgent: - return self.parent.themeUrgent - elif self.workspace.focused: - return self.parent.themeFocus - elif self.workspace.visible: - return self.parent.themeVisible - else: - return self.parent.themeNormal - - # TODO On mode change the state (shown / hidden) gets overriden so every - # tab is shown - - def show(self) -> None: - self.updateTheme(self.selectTheme()) - self.updateText( - self.fullName if self.workspace.focused else self.workspace.name - ) - - def switchTo(self) -> None: - self.parent.i3.command("workspace {}".format(self.workspace.name)) - - def updateWorkspace(self, workspace: i3ipc.WorkspaceReply) -> None: - self.workspace = workspace - self.fullName: str = self.parent.customNames.get(workspace.name, workspace.name) - self.show() - - def __init__(self, parent: "I3WorkspacesProvider"): - Section.__init__(self) - self.parent = parent - self.setDecorators(clickLeft=self.switchTo) - self.tempText: Element = None - - def empty(self) -> None: - self.updateTheme(self.parent.themeNormal) - self.updateText(None) - - def tempShow(self) -> None: - self.updateText(self.tempText) - - def tempEmpty(self) -> None: - self.tempText = self.dstText[1] - self.updateText(None) - - -class I3WorkspacesProvider(Section, I3Updater): - - def updateWorkspace(self, workspace: i3ipc.WorkspaceReply) -> None: - section: Section | None = None - lastSectionOnOutput = self.modeSection - highestNumOnOutput = -1 - for sect in self.sections.values(): - if sect.workspace.num == workspace.num: - section = sect - break - elif ( - sect.workspace.num > highestNumOnOutput - and sect.workspace.num < workspace.num - and sect.workspace.output == workspace.output - ): - lastSectionOnOutput = sect - highestNumOnOutput = sect.workspace.num - else: - section = I3WorkspacesProviderSection(self) - self.sections[workspace.num] = section - - for bargroup in self.parents: - if bargroup.parent.output == workspace.output: - break - else: - bargroup = list(self.parents)[0] - bargroup.addSectionAfter(lastSectionOnOutput, section) - section.updateWorkspace(workspace) - - def updateWorkspaces(self) -> None: - workspaces = self.i3.get_workspaces() - for workspace in workspaces: - self.updateWorkspace(workspace) - - def added(self) -> None: - super().added() - self.appendAfter(self.modeSection) - self.updateWorkspaces() - - def on_workspace_change(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - self.updateWorkspaces() - - def on_workspace_empty(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - self.sections[e.current.num].empty() - - def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - if e.change == "default": - self.modeSection.updateText(None) - for section in self.sections.values(): - section.tempShow() - else: - self.modeSection.updateText(e.change) - for section in self.sections.values(): - section.tempEmpty() +class I3WorkspacesProvider(MultiSectionsProvider): + COLOR_URGENT = rich.color.Color.parse("red") + COLOR_FOCUSED = rich.color.Color.parse("yellow") + # TODO Should be orange (not a terminal color) + COLOR_VISIBLE = rich.color.Color.parse("cyan") + COLOR_DEFAULT = rich.color.Color.parse("bright_black") def __init__( self, - theme: int = 0, - themeVisible: int = 4, - themeFocus: int = 3, - themeUrgent: int = 1, - themeMode: int = 2, - customNames: dict[str, str] = dict(), - ): - I3Updater.__init__(self) - Section.__init__(self) - self.themeNormal = theme - self.themeFocus = themeFocus - self.themeUrgent = themeUrgent - self.themeVisible = themeVisible - self.customNames = customNames + custom_names: dict[str, str] = {}, + ) -> None: + super().__init__() + self.workspaces: dict[int, i3ipc.WorkspaceReply] + self.custom_names = custom_names - self.sections: dict[int, I3WorkspacesProviderSection] = dict() - # The event object doesn't have the visible property, - # so we have to fetch the list of workspaces anyways. - # This sacrifices a bit of performance for code simplicity. - self.on("workspace::init", self.on_workspace_change) - self.on("workspace::focus", self.on_workspace_change) - self.on("workspace::empty", self.on_workspace_empty) - self.on("workspace::urgent", self.on_workspace_change) - self.on("workspace::rename", self.on_workspace_change) - # TODO Un-handled/tested: reload, rename, restored, move + self.modulesFromOutput: dict[str, Module] = dict() - self.on("mode", self.on_mode) - self.modeSection = Section(theme=themeMode) + async def getSectionUpdater(self, section: Section) -> typing.Callable: + assert isinstance(section.sortKey, int) + num = section.sortKey + + def switch_to_workspace() -> None: + self.bar.taskGroup.create_task(self.i3.command(f"workspace number {num}")) + + section.setAction(Button.CLICK_LEFT, switch_to_workspace) + + async def update() -> None: + workspace = self.workspaces[num] + name = workspace.name + if workspace.urgent: + section.color = self.COLOR_URGENT + elif workspace.focused: + section.color = self.COLOR_FOCUSED + elif workspace.visible: + section.color = self.COLOR_VISIBLE + else: + section.color = self.COLOR_DEFAULT + if workspace.focused: + name = self.custom_names.get(name, name) + section.setText(name) + + return update + + async def updateWorkspaces(self) -> None: + """ + Since the i3 IPC interface cannot really tell you by events + when workspaces get invisible or not urgent anymore. + Relying on those exclusively would require reimplementing some of i3 logic. + Fetching all the workspaces on event looks ugly but is the most maintainable. + Times I tried to challenge this and failed: 2. + """ + workspaces = await self.i3.get_workspaces() + self.workspaces = dict() + modules = collections.defaultdict(set) + for workspace in workspaces: + self.workspaces[workspace.num] = workspace + module = self.modulesFromOutput[workspace.output] + modules[module].add(workspace.num) + + await asyncio.gather( + *[self.updateSections(nums, module) for module, nums in modules.items()] + ) + + def onWorkspaceChange( + self, i3: i3ipc.Connection, e: i3ipc.Event | None = None + ) -> None: + # Cancelling the task doesn't seem to prevent performance double-events + self.bar.taskGroup.create_task(self.updateWorkspaces()) + + async def run(self) -> None: + for module in self.modules: + screen = module.getFirstParentOfType(Screen) + output = screen.output + self.modulesFromOutput[output] = module + self.bar = module.bar + + self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) + self.onWorkspaceChange(self.i3) + await self.i3.main() -class MpdProvider(Section, ThreadedUpdater): - # TODO FEAT More informations and controls +class MprisProvider(MirrorProvider): - MAX_LENGTH = 50 + STATUSES = { + gi.repository.Playerctl.PlaybackStatus.PLAYING: "", + gi.repository.Playerctl.PlaybackStatus.PAUSED: "", + gi.repository.Playerctl.PlaybackStatus.STOPPED: "", + } - def connect(self) -> None: - self.mpd.connect("localhost", 6600) - - def __init__(self, theme: int | None = None): - ThreadedUpdater.__init__(self) - Section.__init__(self, theme) - - self.mpd = mpd.MPDClient() - self.connect() - self.refreshData() - self.start() - - def fetcher(self) -> Element: - stat = self.mpd.status() - if not len(stat) or stat["state"] == "stop": - return None - - cur = self.mpd.currentsong() - if not len(cur): - return None - - infos = [] - - def tryAdd(field: str) -> None: - if field in cur: - infos.append(cur[field]) - - tryAdd("title") - tryAdd("album") - tryAdd("artist") - - infosStr = " - ".join(infos) - if len(infosStr) > MpdProvider.MAX_LENGTH: - infosStr = infosStr[: MpdProvider.MAX_LENGTH - 1] + "…" - - return " {}".format(infosStr) - - def loop(self) -> None: - try: - self.mpd.idle("player") - self.refreshData() - except mpd.base.ConnectionError as e: - log.warn(e, exc_info=True) - self.connect() - except BaseException as e: - log.error(e, exc_info=True) - - -class MprisProviderSection(Section, Updater): - def __init__(self, parent: "MprisProvider"): - Updater.__init__(self) - Section.__init__(self, theme=parent.theme) - self.parent = parent - - -class MprisProvider(Section, ThreadedUpdater): - # TODO Controls (select player at least) - # TODO Use the Python native thing for it: - # https://github.com/altdesktop/playerctl?tab=readme-ov-file#using-the-library - # TODO Make it less sucky - - SECTIONS = [ - "{{ playerName }} {{ status }}", - "{{ album }}", - "{{ artist }}", - "{{ duration(position) }}/{{ duration(mpris:length) }}" " {{ title }}", - ] - - # nf-fd icons don't work (UTF-16?) - SUBSTITUTIONS = { - "Playing": "", - "Paused": "", - "Stopped": "", + PROVIDERS = { "mpd": "", "firefox": "", "chromium": "", "mpv": "", } - ICONS = { - 1: "", - 2: "", - 3: "", - } + async def run(self) -> None: + await super().run() + self.status = self.sectionType(parent=self.module, color=self.color) + self.album = self.sectionType(parent=self.module, color=self.color) + self.artist = self.sectionType(parent=self.module, color=self.color) + self.title = self.sectionType(parent=self.module, color=self.color) - MAX_SECTION_LENGTH = 40 + self.manager = gi.repository.Playerctl.PlayerManager() + self.manager.connect("name-appeared", self.on_name_appeared) + self.manager.connect("player-vanished", self.on_player_vanished) - def __init__(self, theme: int | None = None): - ThreadedUpdater.__init__(self) - Section.__init__(self, theme) + self.playerctldName = gi.repository.Playerctl.PlayerName() + self.playerctldName.name = "playerctld" + self.playerctldName.source = gi.repository.Playerctl.Source.DBUS_SESSION - self.line = "" - self.start() + self.player: gi.repository.Playerctl.Player | None = None + self.playing = asyncio.Event() - self.sections: list[Section] = [] + for name in self.manager.props.player_names: + self.init_player(name) - def fetcher(self) -> Element: - create = not len(self.sections) - populate = self.line - split = self.line.split("\t") + self.updateSections() - lastSection: Section = self - for i in range(len(self.SECTIONS)): - if create: - section = Section(theme=self.theme) - lastSection.appendAfter(section) - lastSection = section - self.sections.append(section) + while True: + # Occasionally it will skip a second + # but haven't managed to reproduce with debug info + await self.playing.wait() + self.updateTitle() + if self.player: + pos = self.player.props.position + rem = 1 - (pos % 1000000) / 1000000 + await asyncio.sleep(rem) else: - section = self.sections[i] + self.playing.clear() - if populate: - text = split[i] - if i == 0: - for key, val in self.SUBSTITUTIONS.items(): - text = text.replace(key, val) - if text: - if i in self.ICONS: - text = f"{self.ICONS[i]} {text}" - if len(text) > self.MAX_SECTION_LENGTH: - text = text[: self.MAX_SECTION_LENGTH - 1] + "…" - section.updateText(text) - else: - section.updateText(None) + @staticmethod + def get( + something: gi.overrides.GLib.Variant, key: str, default: typing.Any = None + ) -> typing.Any: + if key in something.keys(): + return something[key] + else: + return default + + @staticmethod + def formatUs(ms: int) -> str: + if ms < 60 * 60 * 1000000: + return time.strftime("%M:%S", time.gmtime(ms // 1000000)) + else: + return str(datetime.timedelta(microseconds=ms)) + + def findCurrentPlayer(self) -> None: + for name in [self.playerctldName] + self.manager.props.player_names: + # TODO Test what happens when playerctld is not available + self.player = gi.repository.Playerctl.Player.new_from_name(name) + if not self.player.props.can_play: + continue + break + else: + self.player = None + + def updateSections(self) -> None: + self.findCurrentPlayer() + + if self.player is None: + self.status.setText(None) + self.album.setText(None) + self.artist.setText(None) + self.title.setText(None) + self.playing.clear() + return + + player = self.player.props.player_name + player = self.PROVIDERS.get(player, player) + status = self.STATUSES.get(self.player.props.playback_status, "?") + self.status.setText(f"{player} {status}") + + if ( + self.player.props.playback_status + == gi.repository.Playerctl.PlaybackStatus.PLAYING + ): + self.playing.set() + else: + self.playing.clear() + + metadata = self.player.props.metadata + + album = self.get(metadata, "xesam:album") + if album: + self.album.setText(f" {clip(album)}") + else: + self.album.setText(None) + + artists = self.get(metadata, "xesam:artist") + if artists: + artist = ", ".join(artists) + self.artist.setText(f" {clip(artist)}") + else: + self.artist.setText(None) + + self.updateTitle() + + def updateTitle(self) -> None: + if self.player is None: + return + + metadata = self.player.props.metadata + pos = self.player.props.position # In µs + text = f" {self.formatUs(pos)}" + dur = self.get(metadata, "mpris:length") + if dur: + text += f"/{self.formatUs(dur)}" + title = self.get(metadata, "xesam:title") + if title: + text += f" {clip(title)}" + self.title.setText(text) + + def on_player_vanished( + self, + manager: gi.repository.Playerctl.PlayerManager, + player: gi.repository.Playerctl.Player, + ) -> None: + self.updateSections() + + def on_event( + self, + player: gi.repository.Playerctl.Player, + _: typing.Any, + manager: gi.repository.Playerctl.PlayerManager, + ) -> None: + self.updateSections() + + def init_player(self, name: gi.repository.Playerctl.PlayerName) -> None: + player = gi.repository.Playerctl.Player.new_from_name(name) + # All events will cause the active player to change, + # so we listen on all events, even if the display won't change + player.connect("playback-status", self.on_event, self.manager) + player.connect("loop-status", self.on_event, self.manager) + player.connect("shuffle", self.on_event, self.manager) + player.connect("metadata", self.on_event, self.manager) + player.connect("volume", self.on_event, self.manager) + player.connect("seeked", self.on_event, self.manager) + self.manager.manage_player(player) + + def on_name_appeared( + self, manager: gi.repository.Playerctl.PlayerManager, name: str + ) -> None: + self.init_player(name) + self.updateSections() + + +class CpuProvider(AlertingProvider, PeriodicStatefulProvider): + async def init(self) -> None: + self.section.numberStates = 3 + self.warningThreshold = 75 + self.dangerThreshold = 95 + + async def loop(self) -> None: + percent = psutil.cpu_percent(percpu=False) + self.updateLevel(percent) + + text = "" + if self.section.state >= 2: + percents = psutil.cpu_percent(percpu=True) + text += " " + "".join([ramp(p / 100) for p in percents]) + elif self.section.state >= 1: + text += " " + ramp(percent / 100) + self.section.setText(text) + + +class LoadProvider(AlertingProvider, PeriodicStatefulProvider): + async def init(self) -> None: + self.section.numberStates = 3 + self.warningThreshold = 5 + self.dangerThreshold = 10 + + async def loop(self) -> None: + load = os.getloadavg() + self.updateLevel(load[0]) + + text = "" + loads = 3 if self.section.state >= 2 else self.section.state + for load_index in range(loads): + text += f" {load[load_index]:.2f}" + self.section.setText(text) + + +class RamProvider(AlertingProvider, PeriodicStatefulProvider): + + async def init(self) -> None: + self.section.numberStates = 4 + self.warningThreshold = 75 + self.dangerThreshold = 95 + + async def loop(self) -> None: + mem = psutil.virtual_memory() + self.updateLevel(mem.percent) + + text = "" + if self.section.state >= 1: + text += " " + ramp(mem.percent / 100) + if self.section.state >= 2: + text += humanSize(mem.total - mem.available) + if self.section.state >= 3: + text += "/" + humanSize(mem.total) + self.section.setText(text) + + +class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider): + RAMP = "" + MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"] + # For Intel, AMD and ARM respectively. + + main: str + + async def init(self) -> None: + self.section.numberStates = 2 + + allTemp = psutil.sensors_temperatures() + for main in self.MAIN_TEMPS: + if main in allTemp: + self.main = main + break + else: + raise IndexError("Could not find suitable temperature sensor") + + temp = allTemp[self.main][0] + self.warningThreshold = temp.high or 90.0 + self.dangerThreshold = temp.critical or 100.0 + + async def loop(self) -> None: + allTemp = psutil.sensors_temperatures() + temp = allTemp[self.main][0] + self.updateLevel(temp.current) + + text = ramp(temp.current / self.warningThreshold, self.RAMP) + if self.section.state >= 1: + text += f" {temp.current:.0f}°C" + self.section.setText(text) + + +class BatteryProvider(AlertingProvider, PeriodicStatefulProvider): + # TODO Support ACPID for events + RAMP = "" + + async def init(self) -> None: + self.section.numberStates = 3 + # TODO 1 refresh rate is too quick + + self.warningThreshold = 75 + self.dangerThreshold = 95 + + async def loop(self) -> None: + bat = psutil.sensors_battery() + if not bat: + self.section.setText(None) + + self.updateLevel(100 - bat.percent) + + text = "" if bat.power_plugged else "" + text += ramp(bat.percent / 100, self.RAMP) + + if self.section.state >= 1: + text += f" {bat.percent:.0f}%" + if self.section.state >= 2: + h = int(bat.secsleft / 3600) + m = int((bat.secsleft - h * 3600) / 60) + text += f" ({h:d}:{m:02d})" + + self.section.setText(text) + + +class PulseaudioProvider( + MirrorProvider, StatefulSectionProvider, MultiSectionsProvider +): + async def getSectionUpdater(self, section: Section) -> typing.Callable: + assert isinstance(section, StatefulSection) + assert isinstance(section.sortKey, str) + + sink = self.sinks[section.sortKey] + + if ( + sink.port_active.name == "analog-output-headphones" + or sink.port_active.description == "Headphones" + ): + icon = "" + elif ( + sink.port_active.name == "analog-output-speaker" + or sink.port_active.description == "Speaker" + ): + icon = "" + elif sink.port_active.name in ("headset-output", "headphone-output"): + icon = "" + else: + icon = "?" + + section.numberStates = 3 + section.state = 1 + + # TODO Change volume with wheel + + async def updater() -> None: + assert isinstance(section, StatefulSection) + text = icon + sink = self.sinks[section.sortKey] + + async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse: + vol = await pulse.volume_get_all_chans(sink) + if section.state == 1: + text += f" {ramp(vol)}" + elif section.state == 2: + text += f" {vol:.0%}" + # TODO Show which is default + section.setText(text) + + section.setChangedState(updater) + + return updater + + async def update(self) -> None: + async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse: + self.sinks = dict((sink.name, sink) for sink in await pulse.sink_list()) + await self.updateSections(set(self.sinks.keys()), self.module) + + async def run(self) -> None: + await super().run() + await self.update() + async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse: + async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink): + await self.update() + + +class NetworkProvider( + MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider +): + def __init__( + self, + color: rich.color.Color = rich.color.Color.default(), + ) -> None: + super().__init__(color=color) + + async def init(self) -> None: + loop = asyncio.get_running_loop() + self.time = loop.time() + self.io_counters = psutil.net_io_counters(pernic=True) + + async def doNothing(self) -> None: + pass + + @staticmethod + def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]: + relevant = True + icon = "?" + wifi = False + if iface == "lo": + relevant = False + elif iface.startswith("eth") or iface.startswith("enp"): + if "u" in iface: + icon = "" else: - section.updateText(None) + icon = "" + elif iface.startswith("wlan") or iface.startswith("wl"): + icon = "" + wifi = True + elif ( + iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg") + ): + icon = "" - return None + elif iface.startswith("docker"): + icon = "" + elif iface.startswith("veth"): + icon = "" + elif iface.startswith("vboxnet"): + icon = "" - def loop(self) -> None: - cmd = [ - "playerctl", - "metadata", - "--format", - "\t".join(self.SECTIONS), - "--follow", - ] - p = subprocess.Popen(cmd, stdout=subprocess.PIPE) - assert p.stdout - while p.poll() is None: - self.line = p.stdout.readline().decode().strip() - self.refreshData() - p = subprocess.Popen(cmd, stdout=subprocess.PIPE) - assert p.stdout - while p.poll() is None: - self.line = p.stdout.readline().decode().strip() - self.refreshData() + return relevant, icon, wifi + + async def getSectionUpdater(self, section: Section) -> typing.Callable: + + assert isinstance(section, StatefulSection) + assert isinstance(section.sortKey, str) + iface = section.sortKey + + relevant, icon, wifi = self.getIfaceAttributes(iface) + + if not relevant: + return self.doNothing + + section.numberStates = 5 if wifi else 4 + section.state = 1 if wifi else 0 + + async def update() -> None: + assert isinstance(section, StatefulSection) + + if not self.if_stats[iface].isup: + section.setText(None) + return + + text = icon + + state = section.state + (0 if wifi else 1) + if wifi and state >= 1: # SSID + cmd = ["iwgetid", iface, "--raw"] + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + text += f" {stdout.decode().strip()}" + + if state >= 2: # Address + for address in self.if_addrs[iface]: + if address.family == socket.AF_INET: + net = ipaddress.IPv4Network( + (address.address, address.netmask), strict=False + ) + text += f" {address.address}/{net.prefixlen}" + break + + if state >= 3: # Speed + prevRecv = self.prev_io_counters[iface].bytes_recv + recv = self.io_counters[iface].bytes_recv + prevSent = self.prev_io_counters[iface].bytes_sent + sent = self.io_counters[iface].bytes_sent + dt = self.time - self.prev_time + + recvDiff = (recv - prevRecv) / dt + sentDiff = (sent - prevSent) / dt + text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}" + + if state >= 4: # Counter + text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}" + + section.setText(text) + + section.setChangedState(update) + + return update + + async def loop(self) -> None: + loop = asyncio.get_running_loop() + + self.prev_io_counters = self.io_counters + self.prev_time = self.time + # On-demand would only benefit if_addrs: + # stats are used to determine display, + # and we want to keep previous io_counters + # so displaying stats is ~instant. + self.time = loop.time() + self.if_stats = psutil.net_if_stats() + self.if_addrs = psutil.net_if_addrs() + self.io_counters = psutil.net_io_counters(pernic=True) + + await self.updateSections(set(self.if_stats.keys()), self.module) + + +class TimeProvider(PeriodicStatefulProvider): + FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] + + async def init(self) -> None: + self.section.state = 1 + self.section.numberStates = len(self.FORMATS) + + async def loop(self) -> None: + now = datetime.datetime.now() + format = self.FORMATS[self.section.state] + self.section.setText(now.strftime(format)) diff --git a/hm/desktop/frobar/frobar/updaters.py b/hm/desktop/frobar/frobar/updaters.py deleted file mode 100644 index b5e07d5..0000000 --- a/hm/desktop/frobar/frobar/updaters.py +++ /dev/null @@ -1,240 +0,0 @@ -#!/usr/bin/env python3 - -import functools -import logging -import math -import os -import subprocess -import threading -import time - -import coloredlogs -import i3ipc -import pyinotify - -from frobar.common import notBusy -from frobar.display import Element - -coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") -log = logging.getLogger() - -# TODO Sync bar update with PeriodicUpdater updates - - -class Updater: - @staticmethod - def init() -> None: - PeriodicUpdater.init() - InotifyUpdater.init() - notBusy.set() - - def updateText(self, text: Element) -> None: - print(text) - - def fetcher(self) -> Element: - return "{} refreshed".format(self) - - def __init__(self) -> None: - self.lock = threading.Lock() - - def refreshData(self) -> None: - # TODO OPTI Maybe discard the refresh if there's already another one? - self.lock.acquire() - try: - data = self.fetcher() - except BaseException as e: - log.error(e, exc_info=True) - data = "" - self.updateText(data) - self.lock.release() - - -class PeriodicUpdaterThread(threading.Thread): - def run(self) -> None: - # TODO Sync with system clock - counter = 0 - while True: - notBusy.set() - if PeriodicUpdater.intervalsChanged.wait( - timeout=PeriodicUpdater.intervalStep - ): - # ↑ sleeps here - notBusy.clear() - PeriodicUpdater.intervalsChanged.clear() - counter = 0 - for providerList in PeriodicUpdater.intervals.copy().values(): - for provider in providerList.copy(): - provider.refreshData() - else: - notBusy.clear() - assert PeriodicUpdater.intervalStep is not None - counter += PeriodicUpdater.intervalStep - counter = counter % PeriodicUpdater.intervalLoop - for interval in PeriodicUpdater.intervals.keys(): - if counter % interval == 0: - for provider in PeriodicUpdater.intervals[interval]: - provider.refreshData() - - -class PeriodicUpdater(Updater): - """ - Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__` - """ - - intervals: dict[int, set["PeriodicUpdater"]] = dict() - intervalStep: int | None = None - intervalLoop: int - updateThread: threading.Thread = PeriodicUpdaterThread(daemon=True) - intervalsChanged = threading.Event() - - @staticmethod - def gcds(*args: int) -> int: - return functools.reduce(math.gcd, args) - - @staticmethod - def lcm(a: int, b: int) -> int: - """Return lowest common multiple.""" - return a * b // math.gcd(a, b) - - @staticmethod - def lcms(*args: int) -> int: - """Return lowest common multiple.""" - return functools.reduce(PeriodicUpdater.lcm, args) - - @staticmethod - def updateIntervals() -> None: - intervalsList = list(PeriodicUpdater.intervals.keys()) - PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList) - PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList) - PeriodicUpdater.intervalsChanged.set() - - @staticmethod - def init() -> None: - PeriodicUpdater.updateThread.start() - - def __init__(self) -> None: - Updater.__init__(self) - self.interval: int | None = None - - def changeInterval(self, interval: int) -> None: - - if self.interval is not None: - PeriodicUpdater.intervals[self.interval].remove(self) - - self.interval = interval - - if interval not in PeriodicUpdater.intervals: - PeriodicUpdater.intervals[interval] = set() - PeriodicUpdater.intervals[interval].add(self) - - PeriodicUpdater.updateIntervals() - - -class InotifyUpdaterEventHandler(pyinotify.ProcessEvent): - def process_default(self, event: pyinotify.Event) -> None: - assert event.path in InotifyUpdater.paths - - if 0 in InotifyUpdater.paths[event.path]: - for provider in InotifyUpdater.paths[event.path][0]: - provider.refreshData() - - if event.name in InotifyUpdater.paths[event.path]: - for provider in InotifyUpdater.paths[event.path][event.name]: - provider.refreshData() - - -class InotifyUpdater(Updater): - """ - Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__` - """ - - wm = pyinotify.WatchManager() - paths: dict[str, dict[str | int, set["InotifyUpdater"]]] = dict() - - @staticmethod - def init() -> None: - notifier = pyinotify.ThreadedNotifier( - InotifyUpdater.wm, InotifyUpdaterEventHandler() - ) - notifier.start() - - # TODO Mask for folders - MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE - - def addPath(self, path: str, refresh: bool = True) -> None: - path = os.path.realpath(os.path.expanduser(path)) - - # Detect if file or folder - if os.path.isdir(path): - self.dirpath: str = path - # 0: Directory watcher - self.filename: str | int = 0 - elif os.path.isfile(path): - self.dirpath = os.path.dirname(path) - self.filename = os.path.basename(path) - else: - raise FileNotFoundError("No such file or directory: '{}'".format(path)) - - # Register watch action - if self.dirpath not in InotifyUpdater.paths: - InotifyUpdater.paths[self.dirpath] = dict() - if self.filename not in InotifyUpdater.paths[self.dirpath]: - InotifyUpdater.paths[self.dirpath][self.filename] = set() - InotifyUpdater.paths[self.dirpath][self.filename].add(self) - - # Add watch - InotifyUpdater.wm.add_watch(self.dirpath, InotifyUpdater.MASK) - - if refresh: - self.refreshData() - - -class ThreadedUpdaterThread(threading.Thread): - def __init__(self, updater: "ThreadedUpdater") -> None: - self.updater = updater - threading.Thread.__init__(self, daemon=True) - self.looping = True - - def run(self) -> None: - try: - while self.looping: - self.updater.loop() - except BaseException as e: - log.error("Error with {}".format(self.updater)) - log.error(e, exc_info=True) - self.updater.updateText("") - - -class ThreadedUpdater(Updater): - """ - Must implement loop(), and call start() - """ - - def __init__(self) -> None: - Updater.__init__(self) - self.thread = ThreadedUpdaterThread(self) - - def loop(self) -> None: - self.refreshData() - time.sleep(10) - - def start(self) -> None: - self.thread.start() - - -class I3Updater(ThreadedUpdater): - # TODO OPTI One i3 connection for all - - def __init__(self) -> None: - ThreadedUpdater.__init__(self) - self.i3 = i3ipc.Connection() - self.on = self.i3.on - self.start() - - def loop(self) -> None: - self.i3.main() - - -class MergedUpdater(Updater): - def __init__(self, *args: Updater) -> None: - raise NotImplementedError("Deprecated, as hacky and currently unused") diff --git a/hm/desktop/frobar/setup.py b/hm/desktop/frobar/setup.py index 275440c..8528d03 100644 --- a/hm/desktop/frobar/setup.py +++ b/hm/desktop/frobar/setup.py @@ -2,19 +2,17 @@ from setuptools import setup setup( name="frobar", - version="2.0", + version="3.0", install_requires=[ - "coloredlogs", - "notmuch", "i3ipc", - "python-mpd2", "psutil", - "pulsectl", - "pyinotify", + "pulsectl-asyncio", + "pygobject3", + "rich", ], entry_points={ "console_scripts": [ - "frobar = frobar:run", + "frobar = frobar:main", ] }, )