#!/usr/bin/env python3
"""
Status bar generator: builds lemonbar markup from a tree of composable
text elements fed by asynchronous providers (i3, MPRIS, system metrics...).
"""

import asyncio
import collections
import datetime
import enum
import ipaddress
import logging
import os
import signal
import socket
import time
import typing

import gi
import gi.events
import i3ipc
import i3ipc.aio
import psutil
import pulsectl
import pulsectl_asyncio
import rich.color
import rich.logging
import rich.terminal_theme

# Must be called before importing the Playerctl namespace below
gi.require_version("Playerctl", "2.0")

import gi.repository.GLib
import gi.repository.Playerctl

logging.basicConfig(
    level="DEBUG",
    format="%(message)s",
    datefmt="[%X]",
    handlers=[rich.logging.RichHandler()],
)
log = logging.getLogger("frobar")

T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int

# Display utilities


def humanSize(numi: int) -> str:
    """
    Returns a string of width 3+3
    """
    num = float(numi)
    # Every unit is 3 characters wide so the output width stays constant
    for unit in ("B  ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
        if abs(num) < 1000:
            if num >= 10:
                return f"{int(num):3d}{unit}"
            else:
                return f"{num:.1f}{unit}"
        num /= 1024
    # At this point `num` is expressed in YiB, `numi` is still in bytes
    return f"{int(num):d}YiB"


def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str:
    """
    Returns a gauge for the ratio `p`, made of characters from `states`:
    one last state per whole unit, then the state matching the fraction.
    Returns an empty string if `p` is negative or `states` is empty.
    """
    if p < 0 or not states:
        return ""
    d, m = divmod(p, 1.0)
    return states[-1] * int(d) + states[round(m * (len(states) - 1))]


def clip(text: str, length: int = 30) -> str:
    """
    Shortens `text` to at most `length` characters, ending with an ellipsis
    """
    if len(text) > length:
        text = text[: length - 1] + "…"
    return text


class ComposableText(typing.Generic[P, C]):
    """
    Node of the tree the bar markup is generated from
    """

    def __init__(
        self,
        parent: typing.Optional[P] = None,
        sortKey: Sortable = 0,
    ) -> None:
        self.parent: typing.Optional[P] = None
        self.children: typing.MutableSequence[C] = list()
        self.sortKey = sortKey
        if parent is not None:
            self.setParent(parent)
        # For the Bar itself, this resolves to self
        self.bar = self.getFirstParentOfType(Bar)

    def setParent(self, parent: P) -> None:
        """
        Attaches this node to `parent`, keeping its children sorted by sortKey
        """
        assert self.parent is None
        parent.children.append(self)
        assert isinstance(parent.children, list)
        parent.children.sort(key=lambda c: c.sortKey)
        self.parent = parent
        self.parent.updateMarkup()

    def unsetParent(self) -> None:
        """
        Detaches this node from its parent
        """
        assert self.parent
        self.parent.children.remove(self)
        self.parent.updateMarkup()
        self.parent = None

    def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
        """
        Returns the closest node of type `typ`, starting from this node
        and walking up the parents
        """
        parent = self
        while not isinstance(parent, typ):
            assert parent.parent, f"{self} doesn't have a parent of {typ}"
            parent = parent.parent
        return parent

    def updateMarkup(self) -> None:
        """
        Requests a redraw of the bar
        """
        self.bar.refresh.set()
        # TODO OPTI See if worth caching the output

    def generateMarkup(self) -> str:
        raise NotImplementedError(f"{self} cannot generate markup")

    def getMarkup(self) -> str:
        return self.generateMarkup()


class Button(enum.Enum):
    """
    Mouse buttons, valued as used in the clickable area markup
    """

    CLICK_LEFT = "1"
    CLICK_MIDDLE = "2"
    CLICK_RIGHT = "3"
    SCROLL_UP = "4"
    SCROLL_DOWN = "5"


class Section(ComposableText):
    """
    Colorable block separated by chevrons
    """

    def __init__(
        self,
        parent: "Module",
        sortKey: Sortable = 0,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(parent=parent, sortKey=sortKey)
        self.parent: "Module"
        self.color = color
        self.desiredText: str | None = None
        self.text = ""
        # A size of -1 means hidden
        self.targetSize = -1
        self.size = -1
        self.animationTask: asyncio.Task | None = None
        self.actions: dict[Button, str] = dict()

    def isHidden(self) -> bool:
        return self.size < 0

    # Geometric series, with a cap
    ANIM_A = 0.025
    ANIM_R = 0.9
    ANIM_MIN = 0.001

    async def animate(self) -> None:
        """
        Grows or shrinks the section one character per frame
        until it reaches its target size
        """
        increment = 1 if self.size < self.targetSize else -1
        loop = asyncio.get_running_loop()
        frameTime = loop.time()
        animTime = self.ANIM_A
        skipped = 0

        while self.size != self.targetSize:
            self.size += increment
            self.updateMarkup()

            animTime *= self.ANIM_R
            animTime = max(self.ANIM_MIN, animTime)
            frameTime += animTime
            sleepTime = frameTime - loop.time()

            # In case of stress, skip refreshing by not awaiting
            if sleepTime > 0:
                if skipped > 0:
                    log.warning(f"Skipped {skipped} animation frame(s)")
                    skipped = 0
                await asyncio.sleep(sleepTime)
            else:
                skipped += 1

    def setText(self, text: str | None) -> None:
        """
        Sets the text to display, None hiding the section
        """
        # OPTI Don't redraw nor reset animation if setting the same text
        if self.desiredText == text:
            return
        self.desiredText = text
        if text is None:
            self.text = ""
            self.targetSize = -1
        else:
            self.text = f" {text} "
            self.targetSize = len(self.text)
        if self.animationTask:
            self.animationTask.cancel()
        # OPTI Skip the whole animation task if not required
        if self.size == self.targetSize:
            self.updateMarkup()
        else:
            self.animationTask = self.bar.taskGroup.create_task(self.animate())

    def setAction(self, button: Button, callback: typing.Callable | None) -> None:
        """
        Binds `callback` to `button` for this section,
        replacing any previous binding. None only removes the binding.
        """
        if button in self.actions:
            command = self.actions[button]
            self.bar.removeAction(command)
            del self.actions[button]
        if callback:
            command = self.bar.addAction(callback)
            self.actions[button] = command

    def generateMarkup(self) -> str:
        assert not self.isHidden()
        # During animations, the text is either truncated or padded
        pad = max(0, self.size - len(self.text))
        text = self.text[: self.size] + " " * pad
        for button, command in self.actions.items():
            text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
        return text


class Module(ComposableText):
    """
    Sections handled by a same updater
    """

    def __init__(self, parent: "Side") -> None:
        super().__init__(parent=parent)
        self.parent: "Side"
        self.children: typing.MutableSequence[Section]
        self.mirroring: Module | None = None
        self.mirrors: list[Module] = list()

    def mirror(self, module: "Module") -> None:
        """
        Makes this module display the sections of `module`
        """
        self.mirroring = module
        module.mirrors.append(self)

    def getSections(self) -> typing.Sequence[Section]:
        if self.mirroring:
            return self.mirroring.children
        else:
            return self.children

    def updateMarkup(self) -> None:
        super().updateMarkup()
        for mirror in self.mirrors:
            mirror.updateMarkup()


class Alignment(enum.Enum):
    """
    Horizontal position, valued as used in the alignment markup
    """

    LEFT = "l"
    RIGHT = "r"
    CENTER = "c"


class Side(ComposableText):
    """
    Modules of a screen sharing the same alignment
    """

    def __init__(self, parent: "Screen", alignment: Alignment) -> None:
        super().__init__(parent=parent)
        self.parent: Screen
        self.children: typing.MutableSequence[Module] = []
        self.alignment = alignment
        self.bar = parent.getFirstParentOfType(Bar)

    def generateMarkup(self) -> str:
        # NOTE(review): the separator literals between the formatting codes
        # are empty in this copy of the file. They presumably held chevron
        # glyphs: confirm against the upstream file before relying on them.
        if not self.children:
            return ""
        text = "%{" + self.alignment.value + "}"
        lastSection: Section | None = None
        for module in self.children:
            for section in module.getSections():
                if section.isHidden():
                    continue
                hexa = section.color.get_truecolor(theme=self.bar.theme).hex
                if lastSection is None:
                    if self.alignment == Alignment.LEFT:
                        text += "%{B" + hexa + "}%{F-}"
                    else:
                        text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
                elif isinstance(lastSection, SpacerSection):
                    text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
                else:
                    if self.alignment == Alignment.RIGHT:
                        if lastSection.color == section.color:
                            text += ""
                        else:
                            text += "%{F" + hexa + "}%{R}"
                    else:
                        if lastSection.color == section.color:
                            text += ""
                        else:
                            text += "%{R}%{B" + hexa + "}"
                    text += "%{F-}"
                text += section.getMarkup()
                lastSection = section
        if self.alignment != Alignment.RIGHT and lastSection:
            text += "%{R}%{B-}"
        return text


class Screen(ComposableText):
    """
    One output, holding one Side per Alignment
    """

    def __init__(self, parent: "Bar", output: str) -> None:
        super().__init__(parent=parent)
        self.parent: "Bar"
        self.children: typing.MutableSequence[Side]
        self.output = output

        for alignment in Alignment:
            Side(parent=self, alignment=alignment)

    def generateMarkup(self) -> str:
        return ("%{Sn" + self.output + "}") + "".join(
            side.getMarkup() for side in self.children
        )


class Bar(ComposableText):
    """
    Top-level
    """

    def __init__(
        self,
        theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME,
    ) -> None:
        super().__init__()
        self.parent: None
        self.children: typing.MutableSequence[Screen]
        self.longRunningTasks: list[asyncio.Task] = list()
        self.theme = theme
        self.refresh = asyncio.Event()
        self.taskGroup = asyncio.TaskGroup()
        self.providers: list["Provider"] = list()
        self.actionIndex = 0
        self.actions: dict[str, typing.Callable] = dict()

        self.periodicProviderTask: typing.Coroutine | None = None

        # One Screen per active i3 output
        i3 = i3ipc.Connection()
        for output in i3.get_outputs():
            if not output.active:
                continue
            Screen(parent=self, output=output.name)

    def addLongRunningTask(self, coro: typing.Coroutine) -> None:
        """
        Runs `coro` in the bar task group,
        registering it for cancellation at shutdown
        """
        task = self.taskGroup.create_task(coro)
        self.longRunningTasks.append(task)

    async def run(self) -> None:
        """
        Starts lemonbar and all the providers,
        returning once every task is finished or cancelled
        """
        cmd = [
            "lemonbar",
            "-b",
            "-a",
            "64",
            "-f",
            "DejaVuSansM Nerd Font:size=10",
            "-F",
            self.theme.foreground_color.hex,
            "-B",
            self.theme.background_color.hex,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
        )

        def terminate() -> None:
            log.info("Terminating")
            for task in self.longRunningTasks:
                task.cancel()

        async def refresher() -> None:
            assert proc.stdin
            while True:
                await self.refresh.wait()
                self.refresh.clear()
                markup = self.getMarkup()
                proc.stdin.write(markup.encode())

        async def actionHandler() -> None:
            assert proc.stdout
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF: lemonbar is gone, nothing left to display on
                    log.error("lemonbar closed its output")
                    terminate()
                    return
                command = line.decode().strip()
                callback = self.actions.get(command)
                if callback is None:
                    # Can happen when clicking an area
                    # whose action was removed in the meantime
                    log.warning(f"Unknown action: {command!r}")
                    continue
                callback()

        async with self.taskGroup:
            self.addLongRunningTask(refresher())
            self.addLongRunningTask(actionHandler())
            for provider in self.providers:
                self.addLongRunningTask(provider.run())

            loop = asyncio.get_running_loop()
            loop.add_signal_handler(signal.SIGINT, terminate)

    def generateMarkup(self) -> str:
        return "".join(screen.getMarkup() for screen in self.children) + "\n"

    def addProvider(
        self,
        provider: "Provider",
        alignment: Alignment = Alignment.LEFT,
        screenNum: int | None = None,
    ) -> None:
        """
        screenNum: the provider will be added on this screen if set, all otherwise
        """
        modules = list()
        for num, screen in enumerate(self.children):
            if screenNum is None or num == screenNum:
                side = next(
                    side for side in screen.children if side.alignment == alignment
                )
                module = Module(parent=side)
                modules.append(module)
        provider.modules = modules
        # Providers without any module to display on are not run
        if modules:
            self.providers.append(provider)

    def addAction(self, callback: typing.Callable) -> str:
        """
        Registers `callback`, returning the command identifying it in the markup
        """
        command = f"{self.actionIndex:x}"
        self.actions[command] = callback
        self.actionIndex += 1
        return command

    def removeAction(self, command: str) -> None:
        del self.actions[command]


class Provider:
    """
    Feeds content to the modules it was given by Bar.addProvider
    """

    sectionType: type[Section] = Section

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        self.modules: list[Module] = list()
        self.color = color

    async def run(self) -> None:
        # Not a NotImplementedError, otherwise can't combine all classes
        pass


class MirrorProvider(Provider):
    """
    Provider displaying the same thing on all its modules:
    only the first one is fed, the others mirror it
    """

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        super().__init__(color=color)
        self.module: Module

    async def run(self) -> None:
        await super().run()
        self.module = self.modules[0]
        for module in self.modules[1:]:
            module.mirror(self.module)


class SingleSectionProvider(MirrorProvider):
    """
    Provider with exactly one section, available as self.section
    """

    async def run(self) -> None:
        await super().run()
        self.section = self.sectionType(parent=self.module, color=self.color)


class StaticProvider(SingleSectionProvider):
    """
    Displays a fixed text
    """

    def __init__(
        self, text: str, color: rich.color.Color = rich.color.Color.default()
    ) -> None:
        super().__init__(color=color)
        self.text = text

    async def run(self) -> None:
        await super().run()
        self.section.setText(self.text)


class SpacerSection(Section):
    pass


class SpacerProvider(SingleSectionProvider):
    """
    Displays `length` spaces
    """

    sectionType = SpacerSection

    def __init__(self, length: int = 5) -> None:
        super().__init__(color=rich.color.Color.default())
        self.length = length

    async def run(self) -> None:
        await super().run()
        assert isinstance(self.section, SpacerSection)
        self.section.setText(" " * self.length)


class StatefulSection(Section):
    """
    Section cycling through numbered states with left and right clicks.
    numberStates and the changed state callback are to be set by the user
    of the section.
    """

    def __init__(
        self,
        parent: Module,
        sortKey: Sortable = 0,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(parent=parent, sortKey=sortKey, color=color)
        self.state = 0
        self.numberStates: int

        self.setAction(Button.CLICK_LEFT, self.incrementState)
        self.setAction(Button.CLICK_RIGHT, self.decrementState)

    def incrementState(self) -> None:
        self.state += 1
        self.changeState()

    def decrementState(self) -> None:
        self.state -= 1
        self.changeState()

    def setChangedState(self, callback: typing.Callable) -> None:
        """
        Sets the coroutine function to run after each state change
        """
        self.callback = callback

    def changeState(self) -> None:
        # Wrap around in both directions
        self.state %= self.numberStates
        self.bar.taskGroup.create_task(self.callback())


class StatefulSectionProvider(Provider):
    sectionType = StatefulSection


class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider):
    section: StatefulSection


class MultiSectionsProvider(Provider):
    """
    Provider with a varying number of sections, identified by their sort key
    """

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        super().__init__(color=color)
        self.sectionKeys: dict[Module, dict[Sortable, Section]] = (
            collections.defaultdict(dict)
        )
        self.updaters: dict[Section, typing.Callable] = dict()

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        """
        Returns the coroutine function refreshing `section`.
        Called once, when the section is created.
        """
        raise NotImplementedError()

    async def updateSections(self, sections: set[Sortable], module: Module) -> None:
        """
        Refreshes the sections of `module` identified by `sections`,
        creating them if needed, and hides the other ones
        """
        moduleSections = self.sectionKeys[module]
        async with asyncio.TaskGroup() as tg:
            for sortKey in sections:
                section = moduleSections.get(sortKey)
                if not section:
                    section = self.sectionType(
                        parent=module, sortKey=sortKey, color=self.color
                    )
                    self.updaters[section] = await self.getSectionUpdater(section)
                    moduleSections[sortKey] = section

                updater = self.updaters[section]
                tg.create_task(updater())

            missingKeys = set(moduleSections.keys()) - sections
            for missingKey in missingKeys:
                section = moduleSections.get(missingKey)
                assert section
                section.setText(None)


class PeriodicProvider(Provider):
    """
    Provider refreshed every second, by a task shared across the bar
    """

    async def init(self) -> None:
        pass

    async def loop(self) -> None:
        raise NotImplementedError()

    @classmethod
    async def task(cls, bar: Bar) -> None:
        """
        Initializes all the periodic providers of `bar`,
        then runs their loops once per second, forever
        """
        providers = list()
        for provider in bar.providers:
            if isinstance(provider, PeriodicProvider):
                providers.append(provider)
                await provider.init()

        while True:
            # TODO Block bar update during the periodic update of the loops
            loops = [provider.loop() for provider in providers]
            # Awaited so exceptions are retrieved, yet a failing provider
            # must not prevent the other ones from being refreshed
            results = await asyncio.gather(*loops, return_exceptions=True)
            for provider, result in zip(providers, results):
                if isinstance(result, Exception):
                    log.error(
                        f"Periodic update of {provider} failed", exc_info=result
                    )

            now = datetime.datetime.now()
            # Hardcoded to 1 second... not sure if we want some more than that,
            # and if the logic to check if a task should run would be a win
            # compared to the task itself
            remaining = 1 - now.microsecond / 1000000
            await asyncio.sleep(remaining)

    async def run(self) -> None:
        await super().run()
        for module in self.modules:
            bar = module.getFirstParentOfType(Bar)
            assert bar
            # The first periodic provider to run starts the shared task
            if not bar.periodicProviderTask:
                bar.periodicProviderTask = PeriodicProvider.task(bar)
                bar.addLongRunningTask(bar.periodicProviderTask)


class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
    async def run(self) -> None:
        await super().run()
        # Refresh right after a click instead of waiting for the next period
        self.section.setChangedState(self.loop)


# Providers


class I3ModeProvider(SingleSectionProvider):
    """
    Displays the i3 binding mode, unless it is the default one
    """

    def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        self.section.setText(None if e.change == "default" else e.change)

    async def run(self) -> None:
        await super().run()
        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.MODE, self.on_mode)
        await i3.main()

    # TODO Hide WorkspaceProvider when this is active


class I3WindowTitleProvider(SingleSectionProvider):
    """
    Displays the name of the container of the last i3 window event
    """

    # TODO FEAT To make this available from start, we need to find the
    # `focused=True` element following the `focus` array
    def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        self.section.setText(e.container.name)

    async def run(self) -> None:
        await super().run()
        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.WINDOW, self.on_window)
        await i3.main()


class I3WorkspacesProvider(MultiSectionsProvider):
    """
    Displays one clickable section per i3 workspace,
    on the screen the workspace is on
    """

    COLOR_URGENT = rich.color.Color.parse("red")
    COLOR_FOCUSED = rich.color.Color.parse("yellow")
    # TODO Should be orange (not a terminal color)
    COLOR_VISIBLE = rich.color.Color.parse("cyan")
    COLOR_DEFAULT = rich.color.Color.parse("bright_black")

    def __init__(
        self,
        custom_names: dict[str, str] | None = None,
    ) -> None:
        """
        custom_names: names to display instead of the workspace name,
        when the workspace is focused
        """
        super().__init__()
        self.workspaces: dict[int, i3ipc.WorkspaceReply]
        # Not a `{}` default argument, it would be shared between instances
        self.custom_names = custom_names if custom_names is not None else {}

        self.sections: dict[int, Section] = dict()
        self.modulesFromOutput: dict[str, Module] = dict()
        self.bar: Bar

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        assert isinstance(section.sortKey, int)
        num = section.sortKey

        def switch_to_workspace() -> None:
            self.bar.taskGroup.create_task(self.i3.command(f"workspace number {num}"))

        section.setAction(Button.CLICK_LEFT, switch_to_workspace)

        async def update() -> None:
            workspace = self.workspaces[num]
            name = workspace.name
            if workspace.urgent:
                color = self.COLOR_URGENT
            elif workspace.focused:
                color = self.COLOR_FOCUSED
            elif workspace.visible:
                color = self.COLOR_VISIBLE
            else:
                color = self.COLOR_DEFAULT
            if section.color != color:
                section.color = color
                # setText doesn't redraw when the text is unchanged,
                # so a color-only change has to request the redraw itself
                section.updateMarkup()
            if workspace.focused:
                name = self.custom_names.get(name, name)
            section.setText(name)

        return update

    async def updateWorkspaces(self) -> None:
        """
        The i3 IPC interface cannot really tell you by events
        when workspaces get invisible or not urgent anymore,
        and relying on events exclusively would require
        reimplementing some of i3 logic.
        Fetching all the workspaces on event looks ugly but is the most
        maintainable. Times I tried to challenge this and failed: 2.
        """
        workspaces = await self.i3.get_workspaces()
        self.workspaces = dict()
        modules = collections.defaultdict(set)
        for workspace in workspaces:
            self.workspaces[workspace.num] = workspace
            module = self.modulesFromOutput[workspace.output]
            modules[module].add(workspace.num)

        await asyncio.gather(
            *[self.updateSections(nums, module) for module, nums in modules.items()]
        )

    def onWorkspaceChange(
        self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
    ) -> None:
        # Cancelling the task doesn't seem to prevent performance double-events
        self.bar.taskGroup.create_task(self.updateWorkspaces())

    async def run(self) -> None:
        for module in self.modules:
            screen = module.getFirstParentOfType(Screen)
            output = screen.output
            self.modulesFromOutput[output] = module
            self.bar = module.bar

        self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
        # Initial display, without waiting for an event
        self.onWorkspaceChange(self.i3)
        await self.i3.main()


class MprisProvider(MirrorProvider):
    """
    Displays status, album, artist, position and title
    of the current media player, using Playerctl
    """

    # NOTE(review): the icon literals below are empty in this copy
    # of the file, presumably glyphs lost in transit: confirm.
    STATUSES = {
        gi.repository.Playerctl.PlaybackStatus.PLAYING: "",
        gi.repository.Playerctl.PlaybackStatus.PAUSED: "",
        gi.repository.Playerctl.PlaybackStatus.STOPPED: "",
    }

    PROVIDERS = {
        "mpd": "",
        "firefox": "",
        "chromium": "",
        "mpv": "",
    }

    async def run(self) -> None:
        await super().run()
        self.status = self.sectionType(parent=self.module, color=self.color)
        self.album = self.sectionType(parent=self.module, color=self.color)
        self.artist = self.sectionType(parent=self.module, color=self.color)
        self.title = self.sectionType(parent=self.module, color=self.color)

        self.manager = gi.repository.Playerctl.PlayerManager()
        self.manager.connect("name-appeared", self.on_name_appeared)
        self.manager.connect("player-vanished", self.on_player_vanished)

        self.playerctldName = gi.repository.Playerctl.PlayerName()
        self.playerctldName.name = "playerctld"
        self.playerctldName.source = gi.repository.Playerctl.Source.DBUS_SESSION

        self.player: gi.repository.Playerctl.Player | None = None
        self.playing = asyncio.Event()

        for name in self.manager.props.player_names:
            self.init_player(name)

        self.updateSections()

        while True:
            # Occasionally it will skip a second
            # but haven't managed to reproduce with debug info
            await self.playing.wait()
            self.updateTitle()
            if self.player:
                # Sleep until the displayed position changes second
                pos = self.player.props.position
                rem = 1 - (pos % 1000000) / 1000000
                await asyncio.sleep(rem)
            else:
                self.playing.clear()

    @staticmethod
    def get(
        something: gi.overrides.GLib.Variant, key: str, default: typing.Any = None
    ) -> typing.Any:
        """
        Returns something[key] if present, `default` otherwise
        """
        if key in something.keys():
            return something[key]
        else:
            return default

    @staticmethod
    def formatUs(ms: int) -> str:
        """
        Formats a duration in microseconds (despite the parameter name)
        as MM:SS if under an hour, as a timedelta otherwise
        """
        if ms < 60 * 60 * 1000000:
            return time.strftime("%M:%S", time.gmtime(ms // 1000000))
        else:
            return str(datetime.timedelta(microseconds=ms))

    def findCurrentPlayer(self) -> None:
        """
        Sets self.player to the first player able to play,
        trying playerctld first, or to None if there is no such player
        """
        for name in [self.playerctldName] + self.manager.props.player_names:
            # TODO Test what happens when playerctld is not available
            self.player = gi.repository.Playerctl.Player.new_from_name(name)
            if not self.player.props.can_play:
                continue
            break
        else:
            self.player = None

    def updateSections(self) -> None:
        """
        Refreshes all the sections from the current player,
        hiding them if there is none
        """
        self.findCurrentPlayer()

        if self.player is None:
            self.status.setText(None)
            self.album.setText(None)
            self.artist.setText(None)
            self.title.setText(None)
            self.playing.clear()
            return

        player = self.player.props.player_name
        player = self.PROVIDERS.get(player, player)
        status = self.STATUSES.get(self.player.props.playback_status, "?")
        self.status.setText(f"{player} {status}")

        if (
            self.player.props.playback_status
            == gi.repository.Playerctl.PlaybackStatus.PLAYING
        ):
            self.playing.set()
        else:
            self.playing.clear()

        metadata = self.player.props.metadata

        album = self.get(metadata, "xesam:album")
        if album:
            self.album.setText(f" {clip(album)}")
        else:
            self.album.setText(None)

        artists = self.get(metadata, "xesam:artist")
        if artists:
            artist = ", ".join(artists)
            self.artist.setText(f" {clip(artist)}")
        else:
            self.artist.setText(None)

        self.updateTitle()

    def updateTitle(self) -> None:
        """
        Refreshes the section with the position, duration and title
        """
        if self.player is None:
            return

        metadata = self.player.props.metadata
        pos = self.player.props.position  # In µs
        text = f" {self.formatUs(pos)}"
        dur = self.get(metadata, "mpris:length")
        if dur:
            text += f"/{self.formatUs(dur)}"
        title = self.get(metadata, "xesam:title")
        if title:
            text += f" {clip(title)}"
        self.title.setText(text)

    def on_player_vanished(
        self,
        manager: gi.repository.Playerctl.PlayerManager,
        player: gi.repository.Playerctl.Player,
    ) -> None:
        self.updateSections()

    def on_event(
        self,
        player: gi.repository.Playerctl.Player,
        _: typing.Any,
        manager: gi.repository.Playerctl.PlayerManager,
    ) -> None:
        self.updateSections()

    def init_player(self, name: gi.repository.Playerctl.PlayerName) -> None:
        """
        Starts managing and listening to the player called `name`
        """
        player = gi.repository.Playerctl.Player.new_from_name(name)
        # All events will cause the active player to change,
        # so we listen on all events, even if the display won't change
        player.connect("playback-status", self.on_event, self.manager)
        player.connect("loop-status", self.on_event, self.manager)
        player.connect("shuffle", self.on_event, self.manager)
        player.connect("metadata", self.on_event, self.manager)
        player.connect("volume", self.on_event, self.manager)
        player.connect("seeked", self.on_event, self.manager)
        self.manager.manage_player(player)

    def on_name_appeared(
        self, manager: gi.repository.Playerctl.PlayerManager, name: str
    ) -> None:
        self.init_player(name)
        self.updateSections()


class AlertingProvider(Provider):
    """
    Provider coloring its sections according to a level and two thresholds
    """

    COLOR_NORMAL = rich.color.Color.parse("green")
    COLOR_WARNING = rich.color.Color.parse("yellow")
    COLOR_DANGER = rich.color.Color.parse("red")

    warningThreshold: float
    dangerThreshold: float

    def updateLevel(self, level: float) -> None:
        if level > self.dangerThreshold:
            color = self.COLOR_DANGER
        elif level > self.warningThreshold:
            color = self.COLOR_WARNING
        else:
            color = self.COLOR_NORMAL
        for module in self.modules:
            for section in module.getSections():
                section.color = color


class CpuProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    States: icon only, global usage ramp, per-CPU usage ramps
    """

    async def init(self) -> None:
        self.section.numberStates = 3
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        percent = psutil.cpu_percent(percpu=False)
        self.updateLevel(percent)

        text = ""
        if self.section.state >= 2:
            percents = psutil.cpu_percent(percpu=True)
            text += " " + "".join([ramp(p / 100) for p in percents])
        elif self.section.state >= 1:
            text += " " + ramp(percent / 100)
        self.section.setText(text)


class LoadProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    States: icon only, first load average, all three load averages
    """

    async def init(self) -> None:
        self.section.numberStates = 3
        self.warningThreshold = 5
        self.dangerThreshold = 10

    async def loop(self) -> None:
        load = os.getloadavg()
        self.updateLevel(load[0])

        text = ""
        loads = 3 if self.section.state >= 2 else self.section.state
        for load_index in range(loads):
            text += f" {load[load_index]:.2f}"
        self.section.setText(text)


class RamProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    States: icon only, usage ramp, used size, used and total sizes
    """

    async def init(self) -> None:
        self.section.numberStates = 4
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        mem = psutil.virtual_memory()
        self.updateLevel(mem.percent)

        text = ""
        if self.section.state >= 1:
            text += " " + ramp(mem.percent / 100)
        if self.section.state >= 2:
            text += humanSize(mem.total - mem.available)
        if self.section.state >= 3:
            text += "/" + humanSize(mem.total)
        self.section.setText(text)


class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    States: ramp only, ramp and temperature
    """

    # NOTE(review): empty in this copy of the file, presumably glyphs
    # lost in transit: confirm. ramp() displays nothing while it is empty.
    RAMP = ""
    MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
    # For Intel, AMD and ARM respectively.

    main: str

    async def init(self) -> None:
        self.section.numberStates = 2

        allTemp = psutil.sensors_temperatures()
        for main in self.MAIN_TEMPS:
            if main in allTemp:
                self.main = main
                break
        else:
            raise IndexError("Could not find suitable temperature sensor")

        temp = allTemp[self.main][0]
        self.warningThreshold = temp.high or 90.0
        self.dangerThreshold = temp.critical or 100.0

    async def loop(self) -> None:
        allTemp = psutil.sensors_temperatures()
        temp = allTemp[self.main][0]
        self.updateLevel(temp.current)

        text = ramp(temp.current / self.warningThreshold, self.RAMP)
        if self.section.state >= 1:
            text += f" {temp.current:.0f}°C"
        self.section.setText(text)


class BatteryProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    States: ramp only, ramp and percentage, ramp, percentage and time left.
    Hidden when there is no battery.
    """

    # TODO Support ACPID for events
    # NOTE(review): empty in this copy of the file, presumably glyphs
    # lost in transit: confirm. ramp() displays nothing while it is empty.
    RAMP = ""

    async def init(self) -> None:
        self.section.numberStates = 3
        # TODO 1 refresh rate is too quick

        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        bat = psutil.sensors_battery()
        if not bat:
            self.section.setText(None)
            return

        # The level is the discharge, so that a low battery is alerting
        self.updateLevel(100 - bat.percent)

        text = "" if bat.power_plugged else ""
        text += ramp(bat.percent / 100, self.RAMP)

        if self.section.state >= 1:
            text += f" {bat.percent:.0f}%"
            # psutil reports unknown and unlimited times as negative constants
            if self.section.state >= 2 and bat.secsleft >= 0:
                h = int(bat.secsleft / 3600)
                m = int((bat.secsleft - h * 3600) / 60)
                text += f" ({h:d}:{m:02d})"

        self.section.setText(text)


class PulseaudioProvider(
    MirrorProvider, StatefulSectionProvider, MultiSectionsProvider
):
    """
    Displays one section per PulseAudio sink.
    States: icon only, volume ramp, volume percentage
    """

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        assert isinstance(section, StatefulSection)
        assert isinstance(section.sortKey, str)

        # NOTE(review): the icon literals below are empty in this copy
        # of the file, presumably glyphs lost in transit: confirm.
        sink = self.sinks[section.sortKey]
        if (
            sink.port_active.name == "analog-output-headphones"
            or sink.port_active.description == "Headphones"
        ):
            icon = ""
        elif (
            sink.port_active.name == "analog-output-speaker"
            or sink.port_active.description == "Speaker"
        ):
            icon = ""
        elif sink.port_active.name in ("headset-output", "headphone-output"):
            icon = ""
        else:
            icon = "?"

        section.numberStates = 3
        section.state = 1

        # TODO Change volume with wheel

        async def updater() -> None:
            assert isinstance(section, StatefulSection)
            text = icon
            sink = self.sinks[section.sortKey]

            async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse:
                vol = await pulse.volume_get_all_chans(sink)
            if section.state == 1:
                text += f" {ramp(vol)}"
            elif section.state == 2:
                text += f" {vol:.0%}"
            # TODO Show which is default
            section.setText(text)

        section.setChangedState(updater)

        return updater

    async def update(self) -> None:
        async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse:
            self.sinks = {sink.name: sink for sink in await pulse.sink_list()}
            await self.updateSections(set(self.sinks.keys()), self.module)

    async def run(self) -> None:
        await super().run()
        await self.update()
        async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse:
            async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink):
                await self.update()


class NetworkProvider(
    MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider
):
    """
    Displays one section per network interface that is up.
    States: icon only, SSID (wireless only), address, speed, counters
    """

    def __init__(
        self,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(color=color)

    async def init(self) -> None:
        loop = asyncio.get_running_loop()
        self.time = loop.time()
        self.io_counters = psutil.net_io_counters(pernic=True)

    async def doNothing(self) -> None:
        pass

    @staticmethod
    def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]:
        """
        Returns whether the interface should be displayed,
        its icon, and whether it is a wireless one, guessed from its name
        """
        # NOTE(review): the icon literals below are empty in this copy
        # of the file, presumably glyphs lost in transit: confirm.
        relevant = True
        icon = "?"
        wifi = False
        if iface == "lo":
            relevant = False
        elif iface.startswith(("eth", "enp")):
            if "u" in iface:
                icon = ""
            else:
                icon = ""
        elif iface.startswith(("wlan", "wl")):
            icon = ""
            wifi = True
        elif iface.startswith(("tun", "tap", "wg")):
            icon = ""
        elif iface.startswith("docker"):
            icon = ""
        elif iface.startswith("veth"):
            icon = ""
        elif iface.startswith("vboxnet"):
            icon = ""

        return relevant, icon, wifi

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        assert isinstance(section, StatefulSection)
        assert isinstance(section.sortKey, str)
        iface = section.sortKey

        relevant, icon, wifi = self.getIfaceAttributes(iface)

        if not relevant:
            return self.doNothing

        section.numberStates = 5 if wifi else 4
        section.state = 1 if wifi else 0

        async def update() -> None:
            assert isinstance(section, StatefulSection)

            if not self.if_stats[iface].isup:
                section.setText(None)
                return

            text = icon

            # Wired interfaces don't have the SSID state
            state = section.state + (0 if wifi else 1)
            if wifi and state >= 1:  # SSID
                cmd = ["iwgetid", iface, "--raw"]
                proc = await asyncio.create_subprocess_exec(
                    *cmd, stdout=asyncio.subprocess.PIPE
                )
                stdout, stderr = await proc.communicate()
                text += f" {stdout.decode().strip()}"

            if state >= 2:  # Address
                for address in self.if_addrs[iface]:
                    if address.family == socket.AF_INET:
                        net = ipaddress.IPv4Network(
                            (address.address, address.netmask), strict=False
                        )
                        text += f" {address.address}/{net.prefixlen}"
                        break

            if state >= 3 and iface in self.io_counters:  # Speed
                counters = self.io_counters[iface]
                # An interface that just (re)appeared has no previous sample
                previous = self.prev_io_counters.get(iface, counters)
                recv = counters.bytes_recv
                sent = counters.bytes_sent
                dt = self.time - self.prev_time
                if dt > 0:
                    recvDiff = (recv - previous.bytes_recv) / dt
                    sentDiff = (sent - previous.bytes_sent) / dt
                else:
                    recvDiff = sentDiff = 0.0
                text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}"

                if state >= 4:  # Counter
                    text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}"

            section.setText(text)

        section.setChangedState(update)

        return update

    async def loop(self) -> None:
        loop = asyncio.get_running_loop()

        self.prev_io_counters = self.io_counters
        self.prev_time = self.time
        # On-demand would only benefit if_addrs:
        # stats are used to determine display,
        # and we want to keep previous io_counters
        # so displaying stats is ~instant.
        self.time = loop.time()
        self.if_stats = psutil.net_if_stats()
        self.if_addrs = psutil.net_if_addrs()
        self.io_counters = psutil.net_io_counters(pernic=True)

        await self.updateSections(set(self.if_stats.keys()), self.module)


class TimeProvider(PeriodicStatefulProvider):
    """
    Displays the current time, each state being one of FORMATS
    """

    FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]

    async def init(self) -> None:
        self.section.state = 1
        self.section.numberStates = len(self.FORMATS)

    async def loop(self) -> None:
        now = datetime.datetime.now()
        timeFormat = self.FORMATS[self.section.state]
        self.section.setText(now.strftime(timeFormat))


async def main() -> None:
    """
    Builds the bar with its theme and providers, then runs it
    """
    # TODO Configurable
    FROGARIZED = [
        "#092c0e",
        "#143718",
        "#5a7058",
        "#677d64",
        "#89947f",
        "#99a08d",
        "#fae2e3",
        "#fff0f1",
        "#e0332e",
        "#cf4b15",
        "#bb8801",
        "#8d9800",
        "#1fa198",
        "#008dd1",
        "#5c73c4",
        "#d43982",
    ]
    # TODO Not super happy with the color management,
    # while using an existing library is great, it's limited to ANSI colors

    def base16_color(color: int) -> tuple[int, int, int]:
        hexa = FROGARIZED[color]
        return tuple(rich.color.parse_rgb_hex(hexa[1:]))

    theme = rich.terminal_theme.TerminalTheme(
        base16_color(0x0),
        base16_color(0x0),  # TODO should be 7, currently 0 so it's compatible with v2
        [
            base16_color(0x0),  # black
            base16_color(0x8),  # red
            base16_color(0xB),  # green
            base16_color(0xA),  # yellow
            base16_color(0xD),  # blue
            base16_color(0xE),  # magenta
            base16_color(0xC),  # cyan
            base16_color(0x5),  # white
        ],
        [
            base16_color(0x3),  # bright black
            base16_color(0x8),  # bright red
            base16_color(0xB),  # bright green
            base16_color(0xA),  # bright yellow
            base16_color(0xD),  # bright blue
            base16_color(0xE),  # bright magenta
            base16_color(0xC),  # bright cyan
            base16_color(0x7),  # bright white
        ],
    )

    bar = Bar(theme=theme)
    dualScreen = len(bar.children) > 1
    leftPreferred = 0 if dualScreen else None
    rightPreferred = 1 if dualScreen else None

    workspaces_suffixes = "▲■"
    workspaces_names = dict(
        (str(i + 1), f"{i+1} {c}") for i, c in enumerate(workspaces_suffixes)
    )

    color = rich.color.Color.parse

    bar.addProvider(I3ModeProvider(color=color("red")), alignment=Alignment.LEFT)
    bar.addProvider(
        I3WorkspacesProvider(custom_names=workspaces_names), alignment=Alignment.LEFT
    )

    if dualScreen:
        bar.addProvider(
            I3WindowTitleProvider(color=color("white")),
            screenNum=0,
            alignment=Alignment.CENTER,
        )
        bar.addProvider(
            MprisProvider(color=color("bright_white")),
            screenNum=rightPreferred,
            alignment=Alignment.CENTER,
        )
    else:
        bar.addProvider(
            SpacerProvider(),
            alignment=Alignment.LEFT,
        )
        bar.addProvider(
            MprisProvider(color=color("bright_white")),
            alignment=Alignment.LEFT,
        )

    bar.addProvider(CpuProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
    bar.addProvider(LoadProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
    bar.addProvider(RamProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
    bar.addProvider(
        TemperatureProvider(),
        screenNum=leftPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(
        BatteryProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT
    )
    bar.addProvider(
        PulseaudioProvider(color=color("magenta")),
        screenNum=rightPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(
        NetworkProvider(color=color("blue")),
        screenNum=leftPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT)

    await bar.run()


if __name__ == "__main__":
    # Using GLib's event loop so we can run GLib's code
    policy = gi.events.GLibEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)
    loop = policy.get_event_loop()
    loop.run_until_complete(main())