#!/usr/bin/env python3

import asyncio
import datetime
import enum
import ipaddress
import logging
import os
import signal
import socket
import typing

import i3ipc
import i3ipc.aio
import psutil
import pulsectl
import pulsectl_asyncio
import rich.color
import rich.logging
import rich.terminal_theme

logging.basicConfig(
    level="DEBUG",
    format="%(message)s",
    datefmt="[%X]",
    handlers=[rich.logging.RichHandler()],
)
log = logging.getLogger("frobar")

T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int

# Display utilities


def humanSize(numi: int) -> str:
    """
    Returns a string of width 3+3 (3 characters of value, 3 of unit)
    for non-negative sizes, using binary (1024-based) units.
    """
    num = float(numi)
    # "B  " is padded so that every unit is 3 characters wide
    for unit in ("B  ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
        if abs(num) < 1000:
            # 9.95 and above would be rendered as "10.0" (4 characters)
            # by the one-decimal format, so switch to the integer format early
            if num >= 9.95:
                return f"{int(num):3d}{unit}"
            return f"{num:.1f}{unit}"
        num /= 1024
    # Use the scaled value (not the raw byte count), and int() so that float
    # inputs do not break the integer format
    return f"{int(num):3d}YiB"


def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str:
    """
    Returns a gauge for the ratio p using the characters of states.

    Each whole unit of p is rendered as the last character of states, followed
    by one character picked in states for the fractional part.
    Returns an empty string if p is negative or if states is empty.
    """
    if p < 0 or not states:
        return ""
    d, m = divmod(p, 1.0)
    return states[-1] * int(d) + states[round(m * (len(states) - 1))]


class ComposableText(typing.Generic[P, C]):
    """
    Node of the tree (Bar > Screen > Side > Module > Section)
    that generates the markup sent to lemonbar.
    """

    def __init__(
        self,
        parent: typing.Optional[P] = None,
        sortKey: Sortable = 0,
    ) -> None:
        """
        parent: node to attach to, None for the root
        sortKey: used to order this node among its siblings
        """
        self.parent: typing.Optional[P] = None
        self.children: typing.MutableSequence[C] = list()
        self.sortKey = sortKey
        if parent is not None:
            self.setParent(parent)
        # For the root, this resolves to the node itself
        self.bar = self.getFirstParentOfType(Bar)

    def setParent(self, parent: P) -> None:
        """
        Attach this (currently orphan) node to parent,
        keeping the parent's children sorted by sortKey.
        """
        assert self.parent is None
        parent.children.append(self)
        assert isinstance(parent.children, list)
        parent.children.sort(key=lambda c: c.sortKey)
        self.parent = parent
        self.parent.updateMarkup()

    def unsetParent(self) -> None:
        """
        Detach this node from its parent.
        """
        assert self.parent
        self.parent.children.remove(self)
        self.parent.updateMarkup()
        self.parent = None

    def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
        """
        Returns the closest node of type typ going up the tree,
        starting with this node itself.
        """
        parent = self
        while not isinstance(parent, typ):
            assert parent.parent, f"{self} doesn't have a parent of {typ}"
            parent = parent.parent
        return parent

    def updateMarkup(self) -> None:
        """
        Signal that the bar needs to be redrawn.
        """
        self.bar.refresh.set()
        # TODO OPTI See if worth caching the output

    def generateMarkup(self) -> str:
        """
        Returns the lemonbar markup for this node. To be overridden.
        """
        raise NotImplementedError(f"{self} cannot generate markup")

    def getMarkup(self) -> str:
        """
        Returns the lemonbar markup for this node.
        """
        return self.generateMarkup()


class Button(enum.Enum):
    """
    Mouse buttons, valued as used in the action markup
    """

    CLICK_LEFT = "1"
    CLICK_MIDDLE = "2"
    CLICK_RIGHT = "3"
    SCROLL_UP = "4"
    SCROLL_DOWN = "5"


class Section(ComposableText):
    """
    Colorable block separated by chevrons
    """

    def __init__(
        self,
        parent: "Module",
        sortKey: Sortable = 0,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(parent=parent, sortKey=sortKey)
        self.parent: "Module"
        self.color = color
        # Text as given to setText, used to detect no-op updates
        self.desiredText: str | None = None
        # Text actually displayed (padded), truncated to size while animating
        self.text = ""
        # A negative size means the section is hidden
        self.targetSize = -1
        self.size = -1
        self.animationTask: asyncio.Task | None = None
        # Button -> command name registered on the bar
        self.actions: dict[Button, str] = dict()

    def isHidden(self) -> bool:
        """
        Returns whether the section should be left out of the markup.
        """
        return self.size < 0

    # Geometric series, with a cap
    ANIM_A = 0.025
    ANIM_R = 0.9
    ANIM_MIN = 0.001

    async def animate(self) -> None:
        """
        Grow or shrink the section one character per frame
        until size reaches targetSize.
        """
        increment = 1 if self.size < self.targetSize else -1
        loop = asyncio.get_running_loop()
        frameTime = loop.time()
        animTime = self.ANIM_A

        while self.size != self.targetSize:
            self.size += increment
            self.updateMarkup()

            animTime *= self.ANIM_R
            animTime = max(self.ANIM_MIN, animTime)
            frameTime += animTime
            sleepTime = frameTime - loop.time()

            # In case of stress, skip refreshing by not awaiting
            if sleepTime > 0:
                await asyncio.sleep(sleepTime)
            else:
                log.warning("Skipped an animation frame")

    def setText(self, text: str | None) -> None:
        """
        Change the displayed text, with an animation if its width changes.

        text: new text, None to hide the section
        """
        # OPTI Don't redraw nor reset animation if setting the same text
        if self.desiredText == text:
            return
        self.desiredText = text
        if text is None:
            self.text = ""
            self.targetSize = -1
        else:
            self.text = f" {text} "
            self.targetSize = len(self.text)
        if self.animationTask:
            self.animationTask.cancel()
        # OPTI Skip the whole animation task if not required
        if self.size == self.targetSize:
            self.updateMarkup()
        else:
            self.animationTask = self.bar.taskGroup.create_task(self.animate())

    def setAction(self, button: Button, callback: typing.Callable | None) -> None:
        """
        Set what happens when the section receives the given button.

        callback: called without arguments, None to remove the action
        """
        if button in self.actions:
            command = self.actions[button]
            self.bar.removeAction(command)
            del self.actions[button]
        if callback:
            command = self.bar.addAction(callback)
            self.actions[button] = command

    def generateMarkup(self) -> str:
        """
        Returns the text cut or padded to the current size,
        wrapped in one clickable area per registered action.
        """
        assert not self.isHidden()
        pad = max(0, self.size - len(self.text))
        text = self.text[: self.size] + " " * pad
        for button, command in self.actions.items():
            text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
        return text


class Module(ComposableText):
    """
    Sections handled by a same updater
    """

    def __init__(self, parent: "Side") -> None:
        super().__init__(parent=parent)
        self.parent: "Side"
        self.children: typing.MutableSequence[Section]
        # Module whose sections are displayed instead of this one's, if any
        self.mirroring: Module | None = None
        # Modules displaying the sections of this one
        self.mirrors: list[Module] = list()

    def mirror(self, module: "Module") -> None:
        """
        Make this module display the sections of the given module.
        """
        self.mirroring = module
        module.mirrors.append(self)

    def getSections(self) -> typing.Sequence[Section]:
        """
        Returns the sections to display: the ones of the mirrored module
        if any, its own otherwise.
        """
        if self.mirroring:
            return self.mirroring.children
        else:
            return self.children

    def updateMarkup(self) -> None:
        """
        Signal that this module and its mirrors need to be redrawn.
        """
        super().updateMarkup()
        for mirror in self.mirrors:
            mirror.updateMarkup()


class Alignment(enum.Enum):
    """
    Position on the screen, valued as used in the alignment markup
    """

    LEFT = "l"
    RIGHT = "r"
    CENTER = "c"


class Side(ComposableText):
    """
    Modules sharing the same alignment on a screen
    """

    def __init__(self, parent: "Screen", alignment: Alignment) -> None:
        super().__init__(parent=parent)
        self.parent: Screen
        self.children: typing.MutableSequence[Module] = []
        self.alignment = alignment
        self.bar = parent.getFirstParentOfType(Bar)

    def generateMarkup(self) -> str:
        """
        Returns the markup of every visible section of this side,
        with the color switches between them.
        """
        if not self.children:
            return ""
        text = "%{" + self.alignment.value + "}"
        lastSection: Section | None = None
        # NOTE(review): several literals below are empty or look like they are
        # missing a separator glyph; presumably chevron characters were lost
        # somewhere. They are kept exactly as found — TODO confirm
        for module in self.children:
            for section in module.getSections():
                if section.isHidden():
                    continue
                hexa = section.color.get_truecolor(theme=self.bar.theme).hex
                if lastSection is None:
                    if self.alignment == Alignment.LEFT:
                        text += "%{B" + hexa + "}%{F-}"
                    else:
                        text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
                else:
                    if self.alignment == Alignment.RIGHT:
                        if lastSection.color == section.color:
                            text += ""
                        else:
                            text += "%{F" + hexa + "}%{R}"
                    else:
                        if lastSection.color == section.color:
                            text += ""
                        else:
                            text += "%{R}%{B" + hexa + "}"
                    text += "%{F-}"
                text += section.getMarkup()
                lastSection = section
        if self.alignment != Alignment.RIGHT:
            text += "%{R}%{B-}"
        return text


class Screen(ComposableText):
    """
    One output, holding one Side per Alignment
    """

    def __init__(self, parent: "Bar", output: str) -> None:
        super().__init__(parent=parent)
        self.parent: "Bar"
        self.children: typing.MutableSequence[Side]
        self.output = output

        for alignment in Alignment:
            Side(parent=self, alignment=alignment)

    def generateMarkup(self) -> str:
        """
        Returns the output selector followed by the markup of each side.
        """
        return ("%{Sn" + self.output + "}") + "".join(
            side.getMarkup() for side in self.children
        )


class Bar(ComposableText):
    """
    Top-level
    """

    def __init__(
        self,
        theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME,
    ) -> None:
        """
        Creates one Screen per active output reported by i3.

        theme: used to resolve section colors, and for the default colors
        """
        super().__init__()
        self.parent: None
        self.children: typing.MutableSequence[Screen]
        # Tasks that are cancelled to stop the bar
        self.longRunningTasks: list[asyncio.Task] = list()
        self.theme = theme
        # Set whenever something changed and the bar should be redrawn
        self.refresh = asyncio.Event()
        self.taskGroup = asyncio.TaskGroup()
        self.providers: list["Provider"] = list()
        # Counter used to generate unique command names
        self.actionIndex = 0
        # Command name -> callback
        self.actions: dict[str, typing.Callable] = dict()

        self.periodicProviderTask: typing.Coroutine | None = None

        i3 = i3ipc.Connection()
        for output in i3.get_outputs():
            if not output.active:
                continue
            Screen(parent=self, output=output.name)

    def addLongRunningTask(self, coro: typing.Coroutine) -> None:
        """
        Schedule coro in the task group, to be cancelled when the bar stops.
        """
        task = self.taskGroup.create_task(coro)
        self.longRunningTasks.append(task)

    async def run(self) -> None:
        """
        Start lemonbar and every provider, and run until interrupted (SIGINT)
        or until lemonbar closes its output.
        """
        cmd = [
            "lemonbar",
            "-b",
            "-a",
            "64",
            "-f",
            "DejaVuSansM Nerd Font:size=10",
            "-F",
            self.theme.foreground_color.hex,
            "-B",
            self.theme.background_color.hex,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
        )

        def terminate() -> None:
            log.info("Terminating")
            for task in self.longRunningTasks:
                task.cancel()

        async def refresher() -> None:
            assert proc.stdin
            while True:
                await self.refresh.wait()
                self.refresh.clear()
                markup = self.getMarkup()
                proc.stdin.write(markup.encode())
                # Apply back-pressure instead of buffering without bound
                await proc.stdin.drain()

        async def actionHandler() -> None:
            assert proc.stdout
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF: readline() would now return immediately forever
                    log.error("lemonbar closed its output")
                    terminate()
                    return
                command = line.decode().strip()
                callback = self.actions.get(command)
                if callback is None:
                    # Can happen if the action was removed
                    # after the markup containing it was sent
                    log.warning("Unknown action received: %r", command)
                    continue
                callback()

        try:
            async with self.taskGroup:
                self.addLongRunningTask(refresher())
                self.addLongRunningTask(actionHandler())
                for provider in self.providers:
                    self.addLongRunningTask(provider.run())

                loop = asyncio.get_running_loop()
                loop.add_signal_handler(signal.SIGINT, terminate)
        finally:
            # Don't leave lemonbar behind
            if proc.returncode is None:
                try:
                    proc.terminate()
                except ProcessLookupError:
                    pass

    def generateMarkup(self) -> str:
        """
        Returns a full line of markup for lemonbar, covering every screen.
        """
        return "".join(screen.getMarkup() for screen in self.children) + "\n"

    def addProvider(
        self,
        provider: "Provider",
        alignment: Alignment = Alignment.LEFT,
        screenNum: int | None = None,
    ) -> None:
        """
        Create the modules of the provider on the matching screens.
        The provider is only registered if it got at least one module.

        screenNum: the provider will be added on this screen if set, all otherwise
        """
        modules = list()
        for num, screen in enumerate(self.children):
            if screenNum is None or num == screenNum:
                side = next(
                    filter(lambda side: side.alignment == alignment, screen.children)
                )
                module = Module(parent=side)
                modules.append(module)
        provider.modules = modules
        if modules:
            self.providers.append(provider)

    def addAction(self, callback: typing.Callable) -> str:
        """
        Register callback, and returns the command name
        lemonbar will send back to trigger it.
        """
        command = f"{self.actionIndex:x}"
        self.actions[command] = callback
        self.actionIndex += 1
        return command

    def removeAction(self, command: str) -> None:
        """
        Unregister the callback associated with command.
        """
        del self.actions[command]


class Provider:
    """
    Updater of the sections of one or several modules
    """

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        # Set by Bar.addProvider
        self.modules: list[Module] = list()
        self.color = color

    async def run(self) -> None:
        # Not a NotImplementedError, otherwise can't combine all classes
        pass


class MirrorProvider(Provider):
    """
    Provider displaying the same sections on all of its modules
    """

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        super().__init__(color=color)
        self.module: Module

    async def run(self) -> None:
        await super().run()
        # Sections go in the first module, the others mirror it
        self.module = self.modules[0]
        for module in self.modules[1:]:
            module.mirror(self.module)


class SingleSectionProvider(MirrorProvider):
    """
    MirrorProvider with a single Section, available as self.section
    """

    async def run(self) -> None:
        await super().run()
        self.section = Section(parent=self.module, color=self.color)


class StaticProvider(SingleSectionProvider):
    """
    Displays a fixed text
    """

    def __init__(
        self, text: str, color: rich.color.Color = rich.color.Color.default()
    ) -> None:
        super().__init__(color=color)
        self.text = text

    async def run(self) -> None:
        await super().run()
        self.section.setText(self.text)


class StatefulSection(Section):
    """
    Section with a state number that left and right clicks
    respectively increment and decrement
    """

    def __init__(
        self,
        parent: Module,
        sortKey: Sortable = 0,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(parent=parent, sortKey=sortKey, color=color)
        self.state = 0
        # To be set by the user of the section before any state change
        self.numberStates: int

        self.setAction(Button.CLICK_LEFT, self.incrementState)
        self.setAction(Button.CLICK_RIGHT, self.decrementState)

    def incrementState(self) -> None:
        self.state += 1
        self.changeState()

    def decrementState(self) -> None:
        self.state -= 1
        self.changeState()

    def setChangedState(self, callback: typing.Callable) -> None:
        """
        callback: coroutine function scheduled after each state change
        """
        self.callback = callback

    def changeState(self) -> None:
        """
        Wrap the state into [0, numberStates) and schedule the callback.
        """
        self.state %= self.numberStates
        self.bar.taskGroup.create_task(self.callback())


class SingleStatefulSectionProvider(MirrorProvider):
    """
    MirrorProvider with a single StatefulSection, available as self.section
    """

    async def run(self) -> None:
        await super().run()
        self.section = StatefulSection(parent=self.module, color=self.color)


class PeriodicProvider(Provider):
    """
    Provider whose loop() is called every second by a task shared per bar
    """

    async def init(self) -> None:
        """
        Called once before the first call to loop().
        """
        pass

    async def loop(self) -> None:
        """
        Called on every tick. To be overridden.
        """
        raise NotImplementedError()

    @classmethod
    async def task(cls, bar: Bar) -> None:
        """
        Initialize every PeriodicProvider of the bar,
        then run their loop() forever, at the start of each second.
        """
        providers = list()
        for provider in bar.providers:
            if isinstance(provider, PeriodicProvider):
                providers.append(provider)
                await provider.init()

        while True:
            # TODO Block bar update during the periodic update of the loops
            # The gather is awaited so the loops of a tick can't overlap
            # with the ones of the next, and so their exceptions are not
            # lost. One failing provider doesn't stop the others.
            results = await asyncio.gather(
                *(provider.loop() for provider in providers),
                return_exceptions=True,
            )
            for provider, result in zip(providers, results):
                if isinstance(result, Exception):
                    log.error(
                        "Periodic update of %s failed", provider, exc_info=result
                    )

            now = datetime.datetime.now()
            # Hardcoded to 1 second... not sure if we want some more than that,
            # and if the logic to check if a task should run would be a win
            # compared to the task itself
            remaining = 1 - now.microsecond / 1000000
            await asyncio.sleep(remaining)

    async def run(self) -> None:
        await super().run()
        for module in self.modules:
            bar = module.getFirstParentOfType(Bar)
            assert bar
            # Only the first PeriodicProvider to run starts the shared task
            if not bar.periodicProviderTask:
                bar.periodicProviderTask = PeriodicProvider.task(bar)
                bar.addLongRunningTask(bar.periodicProviderTask)


class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
    """
    PeriodicProvider with a single StatefulSection,
    also updated right after each state change
    """

    async def run(self) -> None:
        await super().run()
        self.section.setChangedState(self.loop)


# Providers


class I3ModeProvider(SingleSectionProvider):
    """
    Displays the i3 binding mode, hidden for the "default" one
    """

    def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        self.section.setText(None if e.change == "default" else e.change)

    async def run(self) -> None:
        await super().run()
        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.MODE, self.on_mode)
        await i3.main()


class I3WindowTitleProvider(SingleSectionProvider):
    """
    Displays the name of the container of the last i3 window event
    """

    # TODO FEAT To make this available from start, we need to find the
    # `focused=True` element following the `focus` array
    def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        self.section.setText(e.container.name)

    async def run(self) -> None:
        await super().run()
        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.WINDOW, self.on_window)
        await i3.main()


class I3WorkspacesProvider(Provider):
    """
    Displays one clickable section per i3 workspace, on the module
    of the output the workspace is on
    """

    COLOR_URGENT = rich.color.Color.parse("red")
    COLOR_FOCUSED = rich.color.Color.parse("yellow")
    # TODO Should be orange (not a terminal color)
    COLOR_VISIBLE = rich.color.Color.parse("blue")
    COLOR_DEFAULT = rich.color.Color.parse("bright_black")

    async def updateWorkspaces(self, i3: i3ipc.Connection) -> None:
        """
        Since the i3 IPC interface cannot really tell you by events
        when workspaces get invisible or not urgent anymore.
        Relying on those exclusively would require reimplementing some of i3 logic.
        Fetching all the workspaces on event looks ugly but is the most maintainable.
        Times I tried to challenge this and failed: 2.
        """
        workspaces = await i3.get_workspaces()
        for workspace in workspaces:
            module = self.modulesFromOutput.get(workspace.output)
            if module is None:
                # Screens are only created when the bar starts,
                # so there is nowhere to display this workspace
                log.warning(
                    "Workspace %s is on unknown output %s",
                    workspace.name,
                    workspace.output,
                )
                continue
            if workspace.num in self.sections:
                section = self.sections[workspace.num]
                if section.parent != module:
                    section.unsetParent()
                    section.setParent(module)
            else:
                section = Section(parent=module, sortKey=workspace.num)
                self.sections[workspace.num] = section

                def generate_switch_workspace(num: int) -> typing.Callable:
                    def switch_workspace() -> None:
                        self.bar.taskGroup.create_task(
                            i3.command(f"workspace number {num}")
                        )

                    return switch_workspace

                section.setAction(
                    Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
                )
            name = workspace.name
            if workspace.urgent:
                section.color = self.COLOR_URGENT
            elif workspace.focused:
                section.color = self.COLOR_FOCUSED
            elif workspace.visible:
                section.color = self.COLOR_VISIBLE
            else:
                section.color = self.COLOR_DEFAULT
            if workspace.focused or workspace.visible:
                name = f"{name} X"  # TODO Custom names
            section.setText(name)
        workspacesNums = {workspace.num for workspace in workspaces}
        for num, section in self.sections.items():
            if num not in workspacesNums:
                # This should delete the Section but it turned out to be hard
                section.setText(None)

    def onWorkspaceChange(
        self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
    ) -> None:
        # Cancelling the task doesn't seem to prevent performance double-events
        self.bar.taskGroup.create_task(self.updateWorkspaces(i3))

    def __init__(
        self,
    ) -> None:
        super().__init__()

        # Workspace number -> section
        self.sections: dict[int, Section] = dict()
        # Output name -> module
        self.modulesFromOutput: dict[str, Module] = dict()
        self.bar: Bar

    async def run(self) -> None:
        for module in self.modules:
            screen = module.getFirstParentOfType(Screen)
            output = screen.output
            self.modulesFromOutput[output] = module
            self.bar = module.bar

        i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
        self.onWorkspaceChange(i3)
        await i3.main()


class AlertingProvider(Provider):
    """
    Provider coloring its sections according to a level
    compared to warningThreshold and dangerThreshold
    """

    COLOR_NORMAL = rich.color.Color.parse("green")
    COLOR_WARNING = rich.color.Color.parse("yellow")
    COLOR_DANGER = rich.color.Color.parse("red")

    warningThreshold: float
    dangerThreshold: float

    def updateLevel(self, level: float) -> None:
        """
        Set the color of every section from level (strictly above
        a threshold to reach the associated color).
        """
        if level > self.dangerThreshold:
            color = self.COLOR_DANGER
        elif level > self.warningThreshold:
            color = self.COLOR_WARNING
        else:
            color = self.COLOR_NORMAL
        for module in self.modules:
            for section in module.getSections():
                section.color = color


class CpuProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    CPU usage. States: color only, global gauge, one gauge per CPU
    """

    async def init(self) -> None:
        self.section.numberStates = 3
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        percent = psutil.cpu_percent(percpu=False)
        self.updateLevel(percent)

        text = ""
        if self.section.state >= 2:
            percents = psutil.cpu_percent(percpu=True)
            text += " " + "".join([ramp(p / 100) for p in percents])
        elif self.section.state >= 1:
            text += " " + ramp(percent / 100)
        self.section.setText(text)


class LoadProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Load average. States: color only, first value only, all three values
    """

    async def init(self) -> None:
        self.section.numberStates = 3
        self.warningThreshold = 5
        self.dangerThreshold = 10

    async def loop(self) -> None:
        load = os.getloadavg()
        self.updateLevel(load[0])

        text = ""
        loads = 3 if self.section.state >= 2 else self.section.state
        for load_index in range(loads):
            text += f" {load[load_index]:.2f}"
        self.section.setText(text)


class RamProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Memory usage. States: color only, gauge, + used size, + total size
    """

    async def init(self) -> None:
        self.section.numberStates = 4
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        mem = psutil.virtual_memory()
        self.updateLevel(mem.percent)

        text = ""
        if self.section.state >= 1:
            text += " " + ramp(mem.percent / 100)
            if self.section.state >= 2:
                text += humanSize(mem.total - mem.available)
                if self.section.state >= 3:
                    text += "/" + humanSize(mem.total)
        self.section.setText(text)


class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Temperature of the first entry of the first available sensor
    of MAIN_TEMPS. States: gauge, + value
    """

    RAMP = ""
    MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
    # For Intel, AMD and ARM respectively.

    # FIXME Threshold doesn't seem to match old version

    main: str

    async def init(self) -> None:
        self.section.numberStates = 2

        allTemp = psutil.sensors_temperatures()
        for main in self.MAIN_TEMPS:
            if main in allTemp:
                self.main = main
                break
        else:
            raise IndexError("Could not find suitable temperature sensor")

        temp = allTemp[self.main][0]

        # Use the thresholds of the sensor if it reports any.
        # (The attribute names used to be misspelled, so these values
        # were never taken into account.)
        self.warningThreshold = temp.high or 90.0
        self.dangerThreshold = temp.critical or 100.0

    async def loop(self) -> None:
        allTemp = psutil.sensors_temperatures()
        temp = allTemp[self.main][0]
        self.updateLevel(temp.current)

        text = ramp(temp.current / self.warningThreshold, self.RAMP)
        if self.section.state >= 1:
            text += f" {temp.current:.0f}°C"
        self.section.setText(text)


class BatteryProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Battery charge, hidden when there is no battery.
    States: gauge, + percentage, + remaining time
    """

    # TODO Support ACPID for events
    RAMP = ""

    async def init(self) -> None:
        self.section.numberStates = 3
        # TODO 1 refresh rate is too quick

        # The level is the discharge (100 - charge)
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        bat = psutil.sensors_battery()
        if not bat:
            self.section.setText(None)
            # Nothing else can be read without a battery
            return

        self.updateLevel(100 - bat.percent)

        text = "" if bat.power_plugged else ""
        text += ramp(bat.percent / 100, self.RAMP)

        if self.section.state >= 1:
            text += f" {bat.percent:.0f}%"
            # psutil uses negative constants for unknown / unlimited time
            if self.section.state >= 2 and bat.secsleft >= 0:
                h = int(bat.secsleft / 3600)
                m = int((bat.secsleft - h * 3600) / 60)
                text += f" ({h:d}:{m:02d})"

        self.section.setText(text)


class PulseaudioProvider(SingleSectionProvider):
    """
    Displays an icon and the volume of each PulseAudio sink
    """

    async def update(self) -> None:
        async with pulsectl_asyncio.PulseAsync("frobar-updater") as pulse:
            text = ""
            # TODO Sections
            for sink in await pulse.sink_list():
                log.debug("%s", sink)
                # Don't fail on a sink without an active port
                port = getattr(sink, "port_active", None)
                portName = getattr(port, "name", None)
                portDescription = getattr(port, "description", None)
                if (
                    portName == "analog-output-headphones"
                    or portDescription == "Headphones"
                ):
                    icon = ""
                elif (
                    portName == "analog-output-speaker"
                    or portDescription == "Speaker"
                ):
                    icon = "" if sink.mute else ""
                elif portName in ("headset-output", "headphone-output"):
                    icon = ""
                else:
                    icon = "?"
                vol = await pulse.volume_get_all_chans(sink)
                # TODO Foreground color: "#333333" if muted,
                # "#FF0000" if the volume is over 100 %
                # TODO Show which is default
                text += f" {icon} {vol:.0%}"

            self.section.setText(text)

    async def run(self) -> None:
        await super().run()
        await self.update()
        async with pulsectl_asyncio.PulseAsync("frobar-events") as pulse:
            async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink):
                await self.update()


class NetworkProviderSection(StatefulSection):
    """
    Section for one network interface.
    States: icon, (+ SSID for Wi-Fi), + address, + speed, + counters
    """

    def __init__(
        self,
        parent: Module,
        iface: str,
        provider: "NetworkProvider",
    ) -> None:
        super().__init__(parent=parent, sortKey=iface, color=provider.color)
        self.iface = iface
        self.provider = provider

        # Ignored interfaces are never displayed
        self.ignore = False
        self.icon = "?"
        self.wifi = False
        if iface == "lo":
            self.ignore = True
        elif iface.startswith(("eth", "enp")):
            if "u" in iface:
                self.icon = ""
            else:
                self.icon = ""
        elif iface.startswith(("wlan", "wl")):
            self.icon = ""
            self.wifi = True
        elif iface.startswith(("tun", "tap", "wg")):
            self.icon = ""
        elif iface.startswith("docker"):
            self.icon = ""
        elif iface.startswith("veth"):
            self.icon = ""
        elif iface.startswith("vboxnet"):
            self.icon = ""
        self.numberStates = 5 if self.wifi else 4
        self.state = 1 if self.wifi else 0

        self.setChangedState(self.update)

    async def update(self) -> None:
        """
        Regenerate the text from the data last collected by the provider.
        The section is hidden if the interface is ignored, down or gone.
        """
        if self.ignore:
            self.setText(None)
            return
        # The interface might have disappeared since the section was created
        stats = self.provider.if_stats.get(self.iface)
        if stats is None or not stats.isup:
            self.setText(None)
            return

        text = self.icon

        # Wi-Fi has one more state (SSID), shift the others to align them
        state = self.state + (0 if self.wifi else 1)

        # SSID
        if self.wifi and state >= 1:
            cmd = ["iwgetid", self.iface, "--raw"]
            proc = await asyncio.create_subprocess_exec(
                *cmd, stdout=asyncio.subprocess.PIPE
            )
            stdout, _ = await proc.communicate()
            text += f" {stdout.decode().strip()}"

        if state >= 2:  # Address
            for address in self.provider.if_addrs.get(self.iface, ()):
                if address.family == socket.AF_INET:
                    net = ipaddress.IPv4Network(
                        (address.address, address.netmask), strict=False
                    )
                    text += f" {address.address}/{net.prefixlen}"
                    break

        # Counters can be missing for an interface that just (re)appeared
        prevCounters = self.provider.prev_io_counters.get(self.iface)
        counters = self.provider.io_counters.get(self.iface)
        if state >= 3 and counters is not None:  # Speed
            recv = counters.bytes_recv
            sent = counters.bytes_sent
            dt = self.provider.time - self.provider.prev_time
            if prevCounters is not None and dt > 0:
                recvDiff = (recv - prevCounters.bytes_recv) / dt
                sentDiff = (sent - prevCounters.bytes_sent) / dt
                text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}"

            if state >= 4:  # Counter
                text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}"

        self.setText(text)


class NetworkProvider(MirrorProvider, PeriodicProvider):
    """
    Displays one NetworkProviderSection per network interface
    """

    def __init__(
        self,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(color=color)
        # Interface name -> section
        self.sections: dict[str, NetworkProviderSection] = dict()

    async def init(self) -> None:
        loop = asyncio.get_running_loop()
        self.time = loop.time()
        self.io_counters = psutil.net_io_counters(pernic=True)

    async def loop(self) -> None:
        loop = asyncio.get_running_loop()

        async with asyncio.TaskGroup() as tg:
            self.prev_io_counters = self.io_counters
            self.prev_time = self.time
            # On-demand would only benefit if_addrs:
            # stats are used to determine display,
            # and we want to keep previous io_counters
            # so displaying stats is ~instant.
            self.time = loop.time()
            self.if_stats = psutil.net_if_stats()
            self.if_addrs = psutil.net_if_addrs()
            self.io_counters = psutil.net_io_counters(pernic=True)

            for iface in self.if_stats:
                section = self.sections.get(iface)
                if not section:
                    section = NetworkProviderSection(
                        parent=self.module, iface=iface, provider=self
                    )
                    self.sections[iface] = section

                tg.create_task(section.update())
            # Hide the sections of the interfaces that disappeared
            for iface, section in self.sections.items():
                if iface not in self.if_stats:
                    section.setText(None)

    async def onStateChange(self, section: StatefulSection) -> None:
        assert isinstance(section, NetworkProviderSection)
        await section.update()


class TimeProvider(PeriodicStatefulProvider):
    """
    Displays the current local time, one format of FORMATS per state
    """

    FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]

    async def init(self) -> None:
        self.section.state = 1
        self.section.numberStates = len(self.FORMATS)

    async def loop(self) -> None:
        now = datetime.datetime.now()
        timeFormat = self.FORMATS[self.section.state]
        self.section.setText(now.strftime(timeFormat))


async def main() -> None:
    """
    Build the bar with its theme and providers, and run it.
    """
    FROGARIZED = [
        "#092c0e",
        "#143718",
        "#5a7058",
        "#677d64",
        "#89947f",
        "#99a08d",
        "#fae2e3",
        "#fff0f1",
        "#e0332e",
        "#cf4b15",
        "#bb8801",
        "#8d9800",
        "#1fa198",
        "#008dd1",
        "#5c73c4",
        "#d43982",
    ]
    # TODO Configurable
    # TODO Not super happy with the color management,
    # while using an existing library is great, it's limited to ANSI colors

    def base16_color(color: int) -> tuple[int, int, int]:
        hexa = FROGARIZED[color]
        return tuple(rich.color.parse_rgb_hex(hexa[1:]))

    theme = rich.terminal_theme.TerminalTheme(
        base16_color(0x0),
        base16_color(0x0),  # TODO should be 7, currently 0 so it's compatible with v2
        [
            base16_color(0x0),  # black
            base16_color(0x8),  # red
            base16_color(0xB),  # green
            base16_color(0xA),  # yellow
            base16_color(0xD),  # blue
            base16_color(0xE),  # magenta
            base16_color(0xC),  # cyan
            base16_color(0x5),  # white
        ],
        [
            base16_color(0x3),  # bright black
            base16_color(0x8),  # bright red
            base16_color(0xB),  # bright green
            base16_color(0xA),  # bright yellow
            base16_color(0xD),  # bright blue
            base16_color(0xE),  # bright magenta
            base16_color(0xC),  # bright cyan
            base16_color(0x7),  # bright white
        ],
    )

    bar = Bar(theme=theme)
    dualScreen = len(bar.children) > 1
    # With a single screen, None puts the providers on every (i.e. the only) one
    leftPreferred = 0 if dualScreen else None
    rightPreferred = 1 if dualScreen else None

    color = rich.color.Color.parse
    bar.addProvider(I3ModeProvider(color=color("red")), alignment=Alignment.LEFT)
    bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
    if dualScreen:
        bar.addProvider(
            I3WindowTitleProvider(color=color("white")),
            screenNum=0,
            alignment=Alignment.CENTER,
        )
    bar.addProvider(
        StaticProvider(text="mpris", color=color("bright_white")),
        screenNum=rightPreferred,
        alignment=Alignment.CENTER,
    )

    bar.addProvider(CpuProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
    bar.addProvider(LoadProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
    bar.addProvider(RamProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
    bar.addProvider(
        TemperatureProvider(),
        screenNum=leftPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(
        BatteryProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT
    )
    bar.addProvider(
        PulseaudioProvider(color=color("magenta")),
        screenNum=rightPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(
        NetworkProvider(color=color("blue")),
        screenNum=leftPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT)

    await bar.run()


asyncio.run(main())