dotfiles/hm/desktop/frobar/.dev/new.py

1297 lines
41 KiB
Python
Raw Normal View History

2024-08-13 01:58:51 +02:00
#!/usr/bin/env python3
import asyncio
2025-01-06 22:48:56 +01:00
import collections
2024-08-13 01:58:51 +02:00
import datetime
import enum
2024-09-26 23:04:11 +02:00
import ipaddress
2024-08-17 00:57:06 +02:00
import logging
2025-01-03 19:59:07 +01:00
import os
2024-08-14 03:29:34 +02:00
import signal
2024-09-26 23:04:11 +02:00
import socket
2025-01-10 00:16:48 +01:00
import time
2024-08-13 01:58:51 +02:00
import typing
2025-01-10 00:16:48 +01:00
import gi
2024-08-13 01:58:51 +02:00
import i3ipc
2024-08-17 00:57:06 +02:00
import i3ipc.aio
2024-08-25 09:48:36 +02:00
import psutil
2025-01-03 20:18:31 +01:00
import pulsectl
import pulsectl_asyncio
2025-01-03 18:56:20 +01:00
import rich.color
import rich.logging
import rich.terminal_theme
2025-01-10 00:16:48 +01:00
gi.require_version("Playerctl", "2.0")
import gi.repository.GLib
import gi.repository.Playerctl
2025-01-03 18:56:20 +01:00
logging.basicConfig(
level="DEBUG",
format="%(message)s",
datefmt="[%X]",
handlers=[rich.logging.RichHandler()],
)
log = logging.getLogger("frobar")
2024-08-17 00:57:06 +02:00
T = typing.TypeVar("T", bound="ComposableText")
2024-08-25 09:48:36 +02:00
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
2024-08-13 01:58:51 +02:00
2025-01-03 19:59:07 +01:00
# Display utilities
2024-08-13 01:58:51 +02:00
2024-09-26 23:04:11 +02:00
def humanSize(numi: int) -> str:
"""
Returns a string of width 3+3
"""
num = float(numi)
for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
if abs(num) < 1000:
if num >= 10:
2025-01-03 19:59:07 +01:00
return f"{int(num):3d}{unit}"
2024-09-26 23:04:11 +02:00
else:
2025-01-03 19:59:07 +01:00
return f"{num:.1f}{unit}"
2024-09-26 23:04:11 +02:00
num /= 1024
2025-01-03 19:59:07 +01:00
return f"{numi:d}YiB"
2025-01-03 20:18:31 +01:00
def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str:
if p < 0:
return ""
d, m = divmod(p, 1.0)
return states[-1] * int(d) + states[round(m * (len(states) - 1))]
2024-09-26 23:04:11 +02:00
2025-01-10 00:16:48 +01:00
def clip(text: str, length: int = 30) -> str:
if len(text) > length:
text = text[: length - 1] + ""
return text
2024-08-25 09:48:36 +02:00
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
def unsetParent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
2024-08-17 00:57:06 +02:00
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
parent = parent.parent
return parent
2024-08-14 00:28:17 +02:00
def updateMarkup(self) -> None:
2024-08-13 01:58:51 +02:00
self.bar.refresh.set()
2024-08-17 00:57:06 +02:00
# TODO OPTI See if worth caching the output
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup")
def getMarkup(self) -> str:
return self.generateMarkup()
2024-08-13 01:58:51 +02:00
2024-08-17 00:57:06 +02:00
class Button(enum.Enum):
CLICK_LEFT = "1"
CLICK_MIDDLE = "2"
CLICK_RIGHT = "3"
SCROLL_UP = "4"
SCROLL_DOWN = "5"
2024-08-13 01:58:51 +02:00
class Section(ComposableText):
"""
Colorable block separated by chevrons
"""
2025-01-03 18:56:20 +01:00
def __init__(
self,
parent: "Module",
sortKey: Sortable = 0,
color: rich.color.Color = rich.color.Color.default(),
) -> None:
2024-08-25 09:48:36 +02:00
super().__init__(parent=parent, sortKey=sortKey)
2024-08-17 00:57:06 +02:00
self.parent: "Module"
2025-01-03 18:56:20 +01:00
self.color = color
2024-08-17 00:57:06 +02:00
self.desiredText: str | None = None
self.text = ""
self.targetSize = -1
self.size = -1
2024-08-14 00:28:17 +02:00
self.animationTask: asyncio.Task | None = None
2024-08-17 00:57:06 +02:00
self.actions: dict[Button, str] = dict()
2024-08-14 00:28:17 +02:00
def isHidden(self) -> bool:
2024-08-17 00:57:06 +02:00
return self.size < 0
2024-08-14 00:28:17 +02:00
2024-08-17 00:57:06 +02:00
# Geometric series, with a cap
2024-08-14 00:28:17 +02:00
ANIM_A = 0.025
ANIM_R = 0.9
2025-01-10 00:16:48 +01:00
ANIM_MIN = 0.001
2024-08-14 00:28:17 +02:00
async def animate(self) -> None:
2024-08-17 00:57:06 +02:00
increment = 1 if self.size < self.targetSize else -1
2024-08-14 00:28:17 +02:00
loop = asyncio.get_running_loop()
frameTime = loop.time()
animTime = self.ANIM_A
2025-01-06 00:56:40 +01:00
skipped = 0
2024-08-14 00:28:17 +02:00
2024-08-17 00:57:06 +02:00
while self.size != self.targetSize:
2024-08-14 00:28:17 +02:00
self.size += increment
self.updateMarkup()
animTime *= self.ANIM_R
2024-08-17 00:57:06 +02:00
animTime = max(self.ANIM_MIN, animTime)
2024-08-14 00:28:17 +02:00
frameTime += animTime
sleepTime = frameTime - loop.time()
# In case of stress, skip refreshing by not awaiting
if sleepTime > 0:
2025-01-06 00:56:40 +01:00
if skipped > 0:
log.warning(f"Skipped {skipped} animation frame(s)")
skipped = 0
2024-08-14 00:28:17 +02:00
await asyncio.sleep(sleepTime)
2024-08-17 00:57:06 +02:00
else:
2025-01-06 00:56:40 +01:00
skipped += 1
2024-08-14 00:28:17 +02:00
def setText(self, text: str | None) -> None:
2024-08-17 00:57:06 +02:00
# OPTI Don't redraw nor reset animation if setting the same text
if self.desiredText == text:
2024-08-14 00:28:17 +02:00
return
2024-08-17 00:57:06 +02:00
self.desiredText = text
if text is None:
self.text = ""
self.targetSize = -1
else:
self.text = f" {text} "
self.targetSize = len(self.text)
if self.animationTask:
self.animationTask.cancel()
# OPTI Skip the whole animation task if not required
if self.size == self.targetSize:
2024-08-14 00:28:17 +02:00
self.updateMarkup()
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
2024-08-17 00:57:06 +02:00
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
del self.actions[button]
if callback:
command = self.bar.addAction(callback)
self.actions[button] = command
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-17 00:57:06 +02:00
assert not self.isHidden()
2024-08-14 00:28:17 +02:00
pad = max(0, self.size - len(self.text))
2024-08-17 00:57:06 +02:00
text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
return text
2024-08-13 01:58:51 +02:00
class Module(ComposableText):
"""
Sections handled by a same updater
"""
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
2024-08-17 00:57:06 +02:00
self.parent: "Side"
2024-08-25 09:48:36 +02:00
self.children: typing.MutableSequence[Section]
2024-08-17 00:57:06 +02:00
2024-08-14 02:45:25 +02:00
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
def mirror(self, module: "Module") -> None:
self.mirroring = module
module.mirrors.append(self)
2024-08-25 09:48:36 +02:00
def getSections(self) -> typing.Sequence[Section]:
2024-08-14 02:45:25 +02:00
if self.mirroring:
2024-08-25 09:48:36 +02:00
return self.mirroring.children
2024-08-14 02:45:25 +02:00
else:
2024-08-25 09:48:36 +02:00
return self.children
2024-08-14 02:45:25 +02:00
def updateMarkup(self) -> None:
super().updateMarkup()
for mirror in self.mirrors:
mirror.updateMarkup()
2024-08-13 01:58:51 +02:00
class Alignment(enum.Enum):
LEFT = "l"
RIGHT = "r"
CENTER = "c"
class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
2024-08-17 00:57:06 +02:00
self.parent: Screen
2024-08-25 09:48:36 +02:00
self.children: typing.MutableSequence[Module] = []
2024-08-17 00:57:06 +02:00
2024-08-13 01:58:51 +02:00
self.alignment = alignment
2025-01-03 18:56:20 +01:00
self.bar = parent.getFirstParentOfType(Bar)
2024-08-13 01:58:51 +02:00
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-25 09:48:36 +02:00
if not self.children:
2024-08-13 01:58:51 +02:00
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
2024-08-25 09:48:36 +02:00
for module in self.children:
2024-08-14 02:45:25 +02:00
for section in module.getSections():
2024-08-14 00:28:17 +02:00
if section.isHidden():
continue
2025-01-03 18:56:20 +01:00
hexa = section.color.get_truecolor(theme=self.bar.theme).hex
2024-08-13 01:58:51 +02:00
if lastSection is None:
if self.alignment == Alignment.LEFT:
2025-01-03 18:56:20 +01:00
text += "%{B" + hexa + "}%{F-}"
2024-08-13 01:58:51 +02:00
else:
2025-01-03 18:56:20 +01:00
text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
2024-08-13 01:58:51 +02:00
else:
if self.alignment == Alignment.RIGHT:
if lastSection.color == section.color:
text += ""
else:
2025-01-03 18:56:20 +01:00
text += "%{F" + hexa + "}%{R}"
2024-08-13 01:58:51 +02:00
else:
if lastSection.color == section.color:
text += ""
else:
2025-01-03 18:56:20 +01:00
text += "%{R}%{B" + hexa + "}"
2024-08-13 01:58:51 +02:00
text += "%{F-}"
2024-08-14 00:28:17 +02:00
text += section.getMarkup()
2024-08-13 01:58:51 +02:00
lastSection = section
if self.alignment != Alignment.RIGHT:
text += "%{R}%{B-}"
return text
class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
2024-08-17 00:57:06 +02:00
self.parent: "Bar"
2024-08-25 09:48:36 +02:00
self.children: typing.MutableSequence[Side]
2024-08-17 00:57:06 +02:00
2024-08-13 01:58:51 +02:00
self.output = output
for alignment in Alignment:
2024-08-25 09:48:36 +02:00
Side(parent=self, alignment=alignment)
2024-08-13 01:58:51 +02:00
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-13 01:58:51 +02:00
return ("%{Sn" + self.output + "}") + "".join(
2024-08-25 09:48:36 +02:00
side.getMarkup() for side in self.children
2024-08-13 01:58:51 +02:00
)
class Bar(ComposableText):
"""
Top-level
"""
2025-01-03 18:56:20 +01:00
def __init__(
self,
theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME,
) -> None:
2024-08-13 01:58:51 +02:00
super().__init__()
2024-08-25 09:48:36 +02:00
self.parent: None
self.children: typing.MutableSequence[Screen]
2024-09-26 23:04:11 +02:00
self.longRunningTasks: list[asyncio.Task] = list()
2025-01-03 18:56:20 +01:00
self.theme = theme
2024-08-25 09:48:36 +02:00
2024-08-13 01:58:51 +02:00
self.refresh = asyncio.Event()
2024-08-14 00:28:17 +02:00
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
2024-08-17 00:57:06 +02:00
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
2024-08-13 01:58:51 +02:00
2024-09-27 23:06:13 +02:00
self.periodicProviderTask: typing.Coroutine | None = None
2024-08-13 01:58:51 +02:00
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
2024-08-25 09:48:36 +02:00
Screen(parent=self, output=output.name)
2024-08-13 01:58:51 +02:00
2024-09-26 23:04:11 +02:00
def addLongRunningTask(self, coro: typing.Coroutine) -> None:
task = self.taskGroup.create_task(coro)
self.longRunningTasks.append(task)
2024-08-13 01:58:51 +02:00
async def run(self) -> None:
2024-08-14 02:45:25 +02:00
cmd = [
2024-08-13 01:58:51 +02:00
"lemonbar",
"-b",
"-a",
"64",
"-f",
"DejaVuSansM Nerd Font:size=10",
2025-01-03 18:56:20 +01:00
"-F",
self.theme.foreground_color.hex,
"-B",
self.theme.background_color.hex,
2024-08-14 02:45:25 +02:00
]
2024-08-17 00:57:06 +02:00
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
2024-08-13 01:58:51 +02:00
async def refresher() -> None:
assert proc.stdin
2024-08-17 00:57:06 +02:00
while True:
2024-08-13 01:58:51 +02:00
await self.refresh.wait()
self.refresh.clear()
2024-08-17 00:57:06 +02:00
markup = self.getMarkup()
proc.stdin.write(markup.encode())
async def actionHandler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
command = line.decode().strip()
callback = self.actions[command]
callback()
async with self.taskGroup:
2024-09-26 23:04:11 +02:00
self.addLongRunningTask(refresher())
self.addLongRunningTask(actionHandler())
2024-08-14 00:28:17 +02:00
for provider in self.providers:
2024-09-26 23:04:11 +02:00
self.addLongRunningTask(provider.run())
2024-08-13 01:58:51 +02:00
2024-08-14 03:29:34 +02:00
def exit() -> None:
2024-08-17 00:57:06 +02:00
log.info("Terminating")
2024-09-26 23:04:11 +02:00
for task in self.longRunningTasks:
2024-08-17 00:57:06 +02:00
task.cancel()
2024-08-14 03:29:34 +02:00
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, exit)
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-25 09:48:36 +02:00
return "".join(screen.getMarkup() for screen in self.children) + "\n"
2024-08-13 01:58:51 +02:00
def addProvider(
self,
provider: "Provider",
alignment: Alignment = Alignment.LEFT,
2024-08-14 00:28:17 +02:00
screenNum: int | None = None,
2024-08-13 01:58:51 +02:00
) -> None:
2024-08-14 00:28:17 +02:00
"""
screenNum: the provider will be added on this screen if set, all otherwise
"""
2024-08-13 01:58:51 +02:00
modules = list()
2024-08-25 09:48:36 +02:00
for s, screen in enumerate(self.children):
2024-08-14 02:45:25 +02:00
if screenNum is None or s == screenNum:
2024-08-25 09:48:36 +02:00
side = next(filter(lambda s: s.alignment == alignment, screen.children))
2024-08-13 01:58:51 +02:00
module = Module(parent=side)
modules.append(module)
provider.modules = modules
if modules:
self.providers.append(provider)
2024-08-17 00:57:06 +02:00
def addAction(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def removeAction(self, command: str) -> None:
del self.actions[command]
2024-08-13 01:58:51 +02:00
class Provider:
2025-01-06 22:48:56 +01:00
sectionType: type[Section] = Section
2025-01-03 18:56:20 +01:00
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
2024-08-13 01:58:51 +02:00
self.modules: list[Module] = list()
2025-01-03 18:56:20 +01:00
self.color = color
2024-08-13 01:58:51 +02:00
async def run(self) -> None:
2024-09-27 23:06:13 +02:00
# Not a NotImplementedError, otherwise can't combine all classes
pass
2024-08-13 01:58:51 +02:00
2024-08-14 02:45:25 +02:00
class MirrorProvider(Provider):
2025-01-03 18:56:20 +01:00
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
super().__init__(color=color)
2024-08-14 02:45:25 +02:00
self.module: Module
2024-08-13 01:58:51 +02:00
async def run(self) -> None:
2024-09-27 23:06:13 +02:00
await super().run()
2024-08-14 02:45:25 +02:00
self.module = self.modules[0]
for module in self.modules[1:]:
module.mirror(self.module)
class SingleSectionProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
2025-01-06 22:48:56 +01:00
self.section = self.sectionType(parent=self.module, color=self.color)
2024-08-14 02:45:25 +02:00
class StaticProvider(SingleSectionProvider):
2025-01-03 18:56:20 +01:00
def __init__(
self, text: str, color: rich.color.Color = rich.color.Color.default()
) -> None:
super().__init__(color=color)
2024-08-14 02:45:25 +02:00
self.text = text
async def run(self) -> None:
await super().run()
self.section.setText(self.text)
2024-08-25 09:48:36 +02:00
class StatefulSection(Section):
2024-08-17 00:57:06 +02:00
2025-01-03 18:56:20 +01:00
def __init__(
self,
parent: Module,
sortKey: Sortable = 0,
color: rich.color.Color = rich.color.Color.default(),
) -> None:
super().__init__(parent=parent, sortKey=sortKey, color=color)
2024-08-17 00:57:06 +02:00
self.state = 0
2024-08-25 09:48:36 +02:00
self.numberStates: int
2024-08-17 00:57:06 +02:00
2024-08-25 09:48:36 +02:00
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
2024-08-17 00:57:06 +02:00
def incrementState(self) -> None:
self.state += 1
self.changeState()
def decrementState(self) -> None:
self.state -= 1
self.changeState()
2024-09-27 23:06:13 +02:00
def setChangedState(self, callback: typing.Callable) -> None:
self.callback = callback
2024-08-17 00:57:06 +02:00
def changeState(self) -> None:
2024-08-25 09:48:36 +02:00
self.state %= self.numberStates
2024-09-27 23:06:13 +02:00
self.bar.taskGroup.create_task(self.callback())
2025-01-06 22:48:56 +01:00
class StatefulSectionProvider(Provider):
sectionType = StatefulSection
class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider):
section: StatefulSection
class MultiSectionsProvider(Provider):
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
super().__init__(color=color)
self.sectionKeys: dict[Module, dict[Sortable, Section]] = (
collections.defaultdict(dict)
)
self.updaters: dict[Section, typing.Callable] = dict()
async def getSectionUpdater(self, section: Section) -> typing.Callable:
raise NotImplementedError()
async def updateSections(self, sections: set[Sortable], module: Module) -> None:
moduleSections = self.sectionKeys[module]
async with asyncio.TaskGroup() as tg:
for sortKey in sections:
section = moduleSections.get(sortKey)
if not section:
section = self.sectionType(
parent=module, sortKey=sortKey, color=self.color
)
self.updaters[section] = await self.getSectionUpdater(section)
moduleSections[sortKey] = section
updater = self.updaters[section]
tg.create_task(updater())
missingKeys = set(moduleSections.keys()) - sections
for missingKey in missingKeys:
section = moduleSections.get(missingKey)
assert section
section.setText(None)
2024-09-27 23:06:13 +02:00
class PeriodicProvider(Provider):
async def init(self) -> None:
pass
async def loop(self) -> None:
raise NotImplementedError()
@classmethod
async def task(cls, bar: Bar) -> None:
providers = list()
for provider in bar.providers:
if isinstance(provider, PeriodicProvider):
providers.append(provider)
await provider.init()
while True:
# TODO Block bar update during the periodic update of the loops
loops = [provider.loop() for provider in providers]
asyncio.gather(*loops)
now = datetime.datetime.now()
# Hardcoded to 1 second... not sure if we want some more than that,
# and if the logic to check if a task should run would be a win
# compared to the task itself
remaining = 1 - now.microsecond / 1000000
await asyncio.sleep(remaining)
async def run(self) -> None:
await super().run()
for module in self.modules:
bar = module.getFirstParentOfType(Bar)
assert bar
if not bar.periodicProviderTask:
bar.periodicProviderTask = PeriodicProvider.task(bar)
bar.addLongRunningTask(bar.periodicProviderTask)
2024-08-17 00:57:06 +02:00
2024-08-25 09:48:36 +02:00
2024-09-27 23:06:13 +02:00
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
async def run(self) -> None:
await super().run()
self.section.setChangedState(self.loop)
2024-08-17 00:57:06 +02:00
# Providers
class I3ModeProvider(SingleSectionProvider):
def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(None if e.change == "default" else e.change)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.MODE, self.on_mode)
await i3.main()
2025-01-06 22:48:56 +01:00
# TODO Hide WorkspaceProvider when this is active
2024-08-17 00:57:06 +02:00
class I3WindowTitleProvider(SingleSectionProvider):
# TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array
def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(e.container.name)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WINDOW, self.on_window)
await i3.main()
2025-01-06 22:48:56 +01:00
class I3WorkspacesProvider(MultiSectionsProvider):
2025-01-03 18:56:20 +01:00
COLOR_URGENT = rich.color.Color.parse("red")
COLOR_FOCUSED = rich.color.Color.parse("yellow")
2025-01-03 19:59:07 +01:00
# TODO Should be orange (not a terminal color)
2025-01-06 22:48:56 +01:00
COLOR_VISIBLE = rich.color.Color.parse("cyan")
2025-01-03 18:56:20 +01:00
COLOR_DEFAULT = rich.color.Color.parse("bright_black")
2024-08-17 00:57:06 +02:00
2025-01-06 22:48:56 +01:00
def __init__(
self,
) -> None:
super().__init__()
self.workspaces: dict[int, i3ipc.WorkspaceReply]
2024-08-17 00:57:06 +02:00
2025-01-06 22:48:56 +01:00
self.sections: dict[int, Section] = dict()
self.modulesFromOutput: dict[str, Module] = dict()
self.bar: Bar
2024-08-17 00:57:06 +02:00
2025-01-06 22:48:56 +01:00
async def getSectionUpdater(self, section: Section) -> typing.Callable:
assert isinstance(section.sortKey, int)
num = section.sortKey
2024-08-17 00:57:06 +02:00
2025-01-06 22:48:56 +01:00
def switch_to_workspace() -> None:
self.bar.taskGroup.create_task(self.i3.command(f"workspace number {num}"))
section.setAction(Button.CLICK_LEFT, switch_to_workspace)
async def update() -> None:
workspace = self.workspaces[num]
2024-08-17 00:57:06 +02:00
name = workspace.name
if workspace.urgent:
2025-01-03 18:56:20 +01:00
section.color = self.COLOR_URGENT
2024-08-17 00:57:06 +02:00
elif workspace.focused:
2025-01-03 18:56:20 +01:00
section.color = self.COLOR_FOCUSED
2024-08-17 00:57:06 +02:00
elif workspace.visible:
2025-01-03 18:56:20 +01:00
section.color = self.COLOR_VISIBLE
else:
section.color = self.COLOR_DEFAULT
if workspace.focused or workspace.visible:
2025-01-10 00:16:48 +01:00
name = f"{name} X" # FIXME Custom names
2024-08-17 00:57:06 +02:00
section.setText(name)
2025-01-06 22:48:56 +01:00
return update
async def updateWorkspaces(self) -> None:
"""
Since the i3 IPC interface cannot really tell you by events
when workspaces get invisible or not urgent anymore.
Relying on those exclusively would require reimplementing some of i3 logic.
Fetching all the workspaces on event looks ugly but is the most maintainable.
Times I tried to challenge this and failed: 2.
"""
workspaces = await self.i3.get_workspaces()
self.workspaces = dict()
modules = collections.defaultdict(set)
for workspace in workspaces:
self.workspaces[workspace.num] = workspace
module = self.modulesFromOutput[workspace.output]
modules[module].add(workspace.num)
await asyncio.gather(
*[self.updateSections(nums, module) for module, nums in modules.items()]
)
2024-08-17 00:57:06 +02:00
def onWorkspaceChange(
self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
) -> None:
# Cancelling the task doesn't seem to prevent performance double-events
2025-01-06 22:48:56 +01:00
self.bar.taskGroup.create_task(self.updateWorkspaces())
2024-08-17 00:57:06 +02:00
async def run(self) -> None:
for module in self.modules:
screen = module.getFirstParentOfType(Screen)
output = screen.output
self.modulesFromOutput[output] = module
self.bar = module.bar
2025-01-06 22:48:56 +01:00
self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
self.onWorkspaceChange(self.i3)
await self.i3.main()
2024-08-17 00:57:06 +02:00
2025-01-10 00:16:48 +01:00
class MprisProvider(MirrorProvider):
STATUSES = {
gi.repository.Playerctl.PlaybackStatus.PLAYING: "",
gi.repository.Playerctl.PlaybackStatus.PAUSED: "",
gi.repository.Playerctl.PlaybackStatus.STOPPED: "",
}
PROVIDERS = {
"mpd": "",
"firefox": "",
"chromium": "",
"mpv": "",
}
@staticmethod
def get(
something: gi.overrides.GLib.Variant, key: str, default: typing.Any = None
) -> typing.Any:
if key in something.keys():
return something[key]
else:
return default
@staticmethod
def format_us(ms: int) -> str:
if ms < 60 * 60 * 1000000:
return time.strftime("%M:%S", time.gmtime(ms // 1000000))
else:
return str(datetime.timedelta(microseconds=ms))
def show_player(self, player_: gi.repository.Playerctl.Player) -> None:
player = player_.props.player_name
player = self.PROVIDERS.get(player, player)
status = self.STATUSES.get(player_.props.playback_status, "?")
self.status.setText(f"{player} {status}")
metadata = player_.props.metadata
album = self.get(metadata, "xesam:album")
if album:
self.album.setText(f"{clip(album)}")
else:
self.album.setText(None)
artists = self.get(metadata, "xesam:artist")
if artists:
artist = ", ".join(artists)
self.artist.setText(f"{clip(artist)}")
else:
self.artist.setText(None)
pos = player_.props.position # In µs
# FIXME Doesn't increment during play
text = f"{self.format_us(pos)}"
dur = self.get(metadata, "mpris:length")
if dur:
text += f"/{self.format_us(dur)}"
title = self.get(metadata, "xesam:title")
if title:
text += f" {clip(title)}"
self.title.setText(text)
def show_players(self, manager: gi.repository.Playerctl.PlayerManager, exclude: str | None = None) -> None:
# FIXME Opening and closing a chromium player, it will stay there (as default),
# which is not the behaviour of playerctl somehow
for name in manager.props.player_names:
if name == exclude: # DEBUG
continue
player = gi.repository.Playerctl.Player.new_from_name(name)
self.show_player(player)
break
else:
self.status.setText(None)
self.album.setText(None)
self.artist.setText(None)
self.title.setText(None)
def on_player_appear(
self,
manager: gi.repository.Playerctl.PlayerManager,
player: gi.repository.Playerctl.Player,
) -> None:
log.debug(f"Player appeared: {player.props.player_name}")
self.show_player(player)
def on_player_vanished(
self,
manager: gi.repository.Playerctl.PlayerManager,
player: gi.repository.Playerctl.Player,
) -> None:
log.debug(f"Player vanished: {player.props.player_name}")
self.show_player(player)
def on_event(
self,
player: gi.repository.Playerctl.Player,
_: typing.Any,
manager: gi.repository.Playerctl.PlayerManager,
) -> None:
log.debug(f"Player evented: {player.props.player_name}")
self.show_player(player)
def init_player(
self, manager: gi.repository.Playerctl.PlayerManager, name: str
) -> None:
player = gi.repository.Playerctl.Player.new_from_name(name)
# All events will cause the active player to change,
# so we listen on all events, even if the display won't change
player.connect("playback-status", self.on_event, manager)
player.connect("loop-status", self.on_event, manager)
player.connect("shuffle", self.on_event, manager)
player.connect("metadata", self.on_event, manager)
player.connect("volume", self.on_event, manager)
player.connect("seeked", self.on_event, manager)
manager.manage_player(player)
def on_name_appeared(
self, manager: gi.repository.Playerctl.PlayerManager, name: str
) -> None:
log.debug(f"Player name appeared: {name}")
self.init_player(manager, name)
def on_name_vanished(
self,
manager: gi.repository.Playerctl.PlayerManager,
name: str,
) -> None:
log.debug(f"Player name vanished: {name}")
self.show_players(manager, exclude=name)
def blocking(self) -> None:
manager = gi.repository.Playerctl.PlayerManager()
manager.connect("name-appeared", self.on_name_appeared)
manager.connect("player-appeared", self.on_player_appear)
manager.connect("name-vanished", self.on_name_vanished)
manager.connect("player-vanished", self.on_player_vanished)
for name in manager.props.player_names:
self.init_player(manager, name)
self.show_players(manager)
# FIXME Prevents graceful shutdown
main = gi.repository.GLib.MainLoop()
main.run()
async def run(self) -> None:
await super().run()
self.status = self.sectionType(parent=self.module, color=self.color)
self.album = self.sectionType(parent=self.module, color=self.color)
self.artist = self.sectionType(parent=self.module, color=self.color)
self.title = self.sectionType(parent=self.module, color=self.color)
# PyGObject can work with async code, but is experimental,
# also it running in its own event loop makes things tricky
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.blocking)
2025-01-03 19:59:07 +01:00
class AlertingProvider(Provider):
COLOR_NORMAL = rich.color.Color.parse("green")
COLOR_WARNING = rich.color.Color.parse("yellow")
COLOR_DANGER = rich.color.Color.parse("red")
warningThreshold: float
dangerThreshold: float
def updateLevel(self, level: float) -> None:
if level > self.dangerThreshold:
color = self.COLOR_DANGER
elif level > self.warningThreshold:
color = self.COLOR_WARNING
else:
color = self.COLOR_NORMAL
for module in self.modules:
for section in module.getSections():
section.color = color
class CpuProvider(AlertingProvider, PeriodicStatefulProvider):
async def init(self) -> None:
self.section.numberStates = 3
self.warningThreshold = 75
self.dangerThreshold = 95
async def loop(self) -> None:
percent = psutil.cpu_percent(percpu=False)
self.updateLevel(percent)
text = ""
if self.section.state >= 2:
percents = psutil.cpu_percent(percpu=True)
text += " " + "".join([ramp(p / 100) for p in percents])
elif self.section.state >= 1:
text += " " + ramp(percent / 100)
self.section.setText(text)
class LoadProvider(AlertingProvider, PeriodicStatefulProvider):
async def init(self) -> None:
self.section.numberStates = 3
self.warningThreshold = 5
self.dangerThreshold = 10
async def loop(self) -> None:
load = os.getloadavg()
self.updateLevel(load[0])
text = ""
loads = 3 if self.section.state >= 2 else self.section.state
for load_index in range(loads):
text += f" {load[load_index]:.2f}"
self.section.setText(text)
class RamProvider(AlertingProvider, PeriodicStatefulProvider):
async def init(self) -> None:
self.section.numberStates = 4
self.warningThreshold = 75
self.dangerThreshold = 95
async def loop(self) -> None:
mem = psutil.virtual_memory()
self.updateLevel(mem.percent)
text = ""
if self.section.state >= 1:
text += " " + ramp(mem.percent / 100)
if self.section.state >= 2:
text += humanSize(mem.total - mem.available)
if self.section.state >= 3:
text += "/" + humanSize(mem.total)
self.section.setText(text)
class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider):
RAMP = ""
MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
# For Intel, AMD and ARM respectively.
2025-01-03 20:18:31 +01:00
# FIXME Threshold doesn't seem to match old version
2025-01-03 19:59:07 +01:00
main: str
async def init(self) -> None:
self.section.numberStates = 2
self.warningThreshold = 75
self.dangerThreshold = 95
allTemp = psutil.sensors_temperatures()
for main in self.MAIN_TEMPS:
if main in allTemp:
self.main = main
break
else:
raise IndexError("Could not find suitable temperature sensor")
temp = allTemp[self.main][0]
self.warningThresold = temp.high or 90.0
self.dangerThresold = temp.critical or 100.0
async def loop(self) -> None:
allTemp = psutil.sensors_temperatures()
temp = allTemp[self.main][0]
self.updateLevel(temp.current)
text = ramp(temp.current / self.warningThreshold, self.RAMP)
if self.section.state >= 1:
text += f" {temp.current:.0f}°C"
self.section.setText(text)
class BatteryProvider(AlertingProvider, PeriodicStatefulProvider):
# TODO Support ACPID for events
RAMP = ""
async def init(self) -> None:
self.section.numberStates = 3
# TODO 1 refresh rate is too quick
self.warningThreshold = 75
self.dangerThreshold = 95
async def loop(self) -> None:
bat = psutil.sensors_battery()
if not bat:
self.section.setText(None)
self.updateLevel(100 - bat.percent)
text = "" if bat.power_plugged else ""
text += ramp(bat.percent / 100, self.RAMP)
if self.section.state >= 1:
text += f" {bat.percent:.0f}%"
if self.section.state >= 2:
h = int(bat.secsleft / 3600)
m = int((bat.secsleft - h * 3600) / 60)
text += f" ({h:d}:{m:02d})"
self.section.setText(text)
2025-01-06 22:48:56 +01:00
class PulseaudioProvider(
MirrorProvider, StatefulSectionProvider, MultiSectionsProvider
):
async def getSectionUpdater(self, section: Section) -> typing.Callable:
assert isinstance(section, StatefulSection)
assert isinstance(section.sortKey, str)
sink = self.sinks[section.sortKey]
if (
sink.port_active.name == "analog-output-headphones"
or sink.port_active.description == "Headphones"
):
icon = ""
elif (
sink.port_active.name == "analog-output-speaker"
or sink.port_active.description == "Speaker"
):
icon = ""
elif sink.port_active.name in ("headset-output", "headphone-output"):
icon = ""
else:
icon = "?"
section.numberStates = 3
section.state = 1
# TODO Change volume with wheel
async def updater() -> None:
assert isinstance(section, StatefulSection)
text = icon
sink = self.sinks[section.sortKey]
async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse:
2025-01-03 20:18:31 +01:00
vol = await pulse.volume_get_all_chans(sink)
2025-01-06 22:48:56 +01:00
if section.state == 1:
text += f" {ramp(vol)}"
elif section.state == 2:
text += f" {vol:.0%}"
# TODO Show which is default
section.setText(text)
2025-01-03 20:18:31 +01:00
2025-01-06 22:48:56 +01:00
section.setChangedState(updater)
return updater
async def update(self) -> None:
async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse:
self.sinks = dict((sink.name, sink) for sink in await pulse.sink_list())
await self.updateSections(set(self.sinks.keys()), self.module)
2025-01-03 20:18:31 +01:00
async def run(self) -> None:
await super().run()
await self.update()
2025-01-06 22:48:56 +01:00
async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse:
2025-01-03 20:18:31 +01:00
async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink):
await self.update()
2025-01-06 22:48:56 +01:00
class NetworkProvider(
MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider
):
2025-01-03 18:56:20 +01:00
def __init__(
self,
2025-01-06 22:48:56 +01:00
color: rich.color.Color = rich.color.Color.default(),
2025-01-03 18:56:20 +01:00
) -> None:
2025-01-06 22:48:56 +01:00
super().__init__(color=color)
async def init(self) -> None:
loop = asyncio.get_running_loop()
self.time = loop.time()
self.io_counters = psutil.net_io_counters(pernic=True)
2024-08-25 09:48:36 +02:00
2025-01-06 22:48:56 +01:00
async def doNothing(self) -> None:
pass
@staticmethod
def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]:
relevant = True
icon = "?"
wifi = False
2024-08-25 09:48:36 +02:00
if iface == "lo":
2025-01-06 22:48:56 +01:00
relevant = False
2024-08-25 09:48:36 +02:00
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
2025-01-06 22:48:56 +01:00
icon = ""
2024-08-25 09:48:36 +02:00
else:
2025-01-06 22:48:56 +01:00
icon = ""
2024-08-25 09:48:36 +02:00
elif iface.startswith("wlan") or iface.startswith("wl"):
2025-01-06 22:48:56 +01:00
icon = ""
wifi = True
2024-08-25 09:48:36 +02:00
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
2025-01-06 22:48:56 +01:00
icon = ""
2025-01-03 20:18:31 +01:00
2024-08-25 09:48:36 +02:00
elif iface.startswith("docker"):
2025-01-06 22:48:56 +01:00
icon = ""
2024-08-25 09:48:36 +02:00
elif iface.startswith("veth"):
2025-01-06 22:48:56 +01:00
icon = ""
2024-08-25 09:48:36 +02:00
elif iface.startswith("vboxnet"):
2025-01-06 22:48:56 +01:00
icon = ""
2024-08-25 09:48:36 +02:00
2025-01-06 22:48:56 +01:00
return relevant, icon, wifi
2024-09-26 23:04:11 +02:00
2025-01-06 22:48:56 +01:00
async def getSectionUpdater(self, section: Section) -> typing.Callable:
2024-09-26 23:04:11 +02:00
2025-01-06 22:48:56 +01:00
assert isinstance(section, StatefulSection)
assert isinstance(section.sortKey, str)
iface = section.sortKey
2024-09-26 23:04:11 +02:00
2025-01-06 22:48:56 +01:00
relevant, icon, wifi = self.getIfaceAttributes(iface)
2024-09-26 23:04:11 +02:00
2025-01-06 22:48:56 +01:00
if not relevant:
return self.doNothing
2024-09-26 23:04:11 +02:00
2025-01-06 22:48:56 +01:00
section.numberStates = 5 if wifi else 4
section.state = 1 if wifi else 0
2024-08-25 09:48:36 +02:00
2025-01-06 22:48:56 +01:00
async def update() -> None:
assert isinstance(section, StatefulSection)
2024-08-25 09:48:36 +02:00
2025-01-06 22:48:56 +01:00
if not self.if_stats[iface].isup:
section.setText(None)
return
2024-08-25 09:48:36 +02:00
2025-01-06 22:48:56 +01:00
text = icon
state = section.state + (0 if wifi else 1)
if wifi and state >= 1: # SSID
cmd = ["iwgetid", iface, "--raw"]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
text += f" {stdout.decode().strip()}"
if state >= 2: # Address
for address in self.if_addrs[iface]:
if address.family == socket.AF_INET:
net = ipaddress.IPv4Network(
(address.address, address.netmask), strict=False
)
text += f" {address.address}/{net.prefixlen}"
break
if state >= 3: # Speed
prevRecv = self.prev_io_counters[iface].bytes_recv
recv = self.io_counters[iface].bytes_recv
prevSent = self.prev_io_counters[iface].bytes_sent
sent = self.io_counters[iface].bytes_sent
dt = self.time - self.prev_time
recvDiff = (recv - prevRecv) / dt
sentDiff = (sent - prevSent) / dt
text += f"{humanSize(recvDiff)}{humanSize(sentDiff)}"
if state >= 4: # Counter
text += f"{humanSize(recv)}{humanSize(sent)}"
section.setText(text)
section.setChangedState(update)
return update
2024-08-25 09:48:36 +02:00
2024-09-27 23:06:13 +02:00
async def loop(self) -> None:
loop = asyncio.get_running_loop()
2025-01-06 22:48:56 +01:00
self.prev_io_counters = self.io_counters
self.prev_time = self.time
# On-demand would only benefit if_addrs:
# stats are used to determine display,
# and we want to keep previous io_counters
# so displaying stats is ~instant.
self.time = loop.time()
self.if_stats = psutil.net_if_stats()
self.if_addrs = psutil.net_if_addrs()
self.io_counters = psutil.net_io_counters(pernic=True)
2024-08-25 09:48:36 +02:00
2025-01-06 22:48:56 +01:00
await self.updateSections(set(self.if_stats.keys()), self.module)
2024-08-25 09:48:36 +02:00
2024-09-27 23:06:13 +02:00
class TimeProvider(PeriodicStatefulProvider):
2024-08-17 00:57:06 +02:00
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
2024-09-27 23:06:13 +02:00
async def init(self) -> None:
2024-08-25 09:48:36 +02:00
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
2024-08-13 01:58:51 +02:00
2024-09-27 23:06:13 +02:00
async def loop(self) -> None:
now = datetime.datetime.now()
format = self.FORMATS[self.section.state]
self.section.setText(now.strftime(format))
2024-08-13 01:58:51 +02:00
async def main() -> None:
2025-01-03 18:56:20 +01:00
FROGARIZED = [
"#092c0e",
"#143718",
"#5a7058",
"#677d64",
"#89947f",
"#99a08d",
"#fae2e3",
"#fff0f1",
"#e0332e",
"#cf4b15",
"#bb8801",
"#8d9800",
"#1fa198",
"#008dd1",
"#5c73c4",
"#d43982",
]
# TODO Configurable
2025-01-03 19:59:07 +01:00
# TODO Not super happy with the color management,
# while using an existing library is great, it's limited to ANSI colors
2025-01-03 18:56:20 +01:00
def base16_color(color: int) -> tuple[int, int, int]:
hexa = FROGARIZED[color]
return tuple(rich.color.parse_rgb_hex(hexa[1:]))
theme = rich.terminal_theme.TerminalTheme(
base16_color(0x0),
base16_color(0x0), # TODO should be 7, currently 0 so it's compatible with v2
[
base16_color(0x0), # black
base16_color(0x8), # red
base16_color(0xB), # green
base16_color(0xA), # yellow
base16_color(0xD), # blue
base16_color(0xE), # magenta
base16_color(0xC), # cyan
base16_color(0x5), # white
],
[
base16_color(0x3), # bright black
base16_color(0x8), # bright red
base16_color(0xB), # bright green
base16_color(0xA), # bright yellow
base16_color(0xD), # bright blue
base16_color(0xE), # bright magenta
base16_color(0xC), # bright cyan
base16_color(0x7), # bright white
],
)
bar = Bar(theme=theme)
2024-08-25 09:48:36 +02:00
dualScreen = len(bar.children) > 1
2025-01-03 20:18:31 +01:00
leftPreferred = 0 if dualScreen else None
rightPreferred = 1 if dualScreen else None
2024-08-14 02:45:25 +02:00
2025-01-03 18:56:20 +01:00
color = rich.color.Color.parse
bar.addProvider(I3ModeProvider(color=color("red")), alignment=Alignment.LEFT)
2024-08-17 00:57:06 +02:00
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
2024-08-14 02:45:25 +02:00
if dualScreen:
bar.addProvider(
2025-01-03 20:18:31 +01:00
I3WindowTitleProvider(color=color("white")),
screenNum=0,
alignment=Alignment.CENTER,
2024-08-14 02:45:25 +02:00
)
bar.addProvider(
2025-01-10 00:16:48 +01:00
MprisProvider(color=color("bright_white")),
2025-01-03 20:18:31 +01:00
screenNum=rightPreferred,
2024-08-14 02:45:25 +02:00
alignment=Alignment.CENTER,
)
2025-01-03 20:18:31 +01:00
bar.addProvider(CpuProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
bar.addProvider(LoadProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
bar.addProvider(RamProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT)
bar.addProvider(
TemperatureProvider(),
screenNum=leftPreferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
BatteryProvider(), screenNum=leftPreferred, alignment=Alignment.RIGHT
)
2025-01-03 18:56:20 +01:00
bar.addProvider(
2025-01-03 20:18:31 +01:00
PulseaudioProvider(color=color("magenta")),
screenNum=rightPreferred,
2024-08-14 02:45:25 +02:00
alignment=Alignment.RIGHT,
)
bar.addProvider(
2025-01-03 18:56:20 +01:00
NetworkProvider(color=color("blue")),
2025-01-03 20:18:31 +01:00
screenNum=leftPreferred,
2024-08-14 02:45:25 +02:00
alignment=Alignment.RIGHT,
)
2025-01-03 18:56:20 +01:00
bar.addProvider(TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT)
2024-08-14 02:45:25 +02:00
2024-08-13 01:58:51 +02:00
await bar.run()
asyncio.run(main())