1295 lines
41 KiB
Python
1295 lines
41 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import collections
|
|
import datetime
|
|
import enum
|
|
import ipaddress
|
|
import logging
|
|
import os
|
|
import signal
|
|
import socket
|
|
import time
|
|
import typing
|
|
|
|
import gi
|
|
import gi.events
|
|
import i3ipc
|
|
import i3ipc.aio
|
|
import psutil
|
|
import pulsectl
|
|
import pulsectl_asyncio
|
|
import rich.color
|
|
import rich.logging
|
|
import rich.terminal_theme
|
|
|
|
gi.require_version("Playerctl", "2.0")
|
|
|
|
import gi.repository.GLib
|
|
import gi.repository.Playerctl
|
|
|
|
# Send every log record through rich's handler; the handler does its own
# decoration, hence the bare message format.
logging.basicConfig(
    format="%(message)s",
    datefmt="[%X]",
    level="DEBUG",
    handlers=[rich.logging.RichHandler()],
)
log = logging.getLogger("frobar")

# T: type looked up by ComposableText.getFirstParentOfType
T = typing.TypeVar("T", bound="ComposableText")
# P / C: parent and children types of a ComposableText
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
# Key by which the children of a ComposableText are ordered
Sortable = str | int
|
|
|
|
# Display utilities
|
|
|
|
|
|
def humanSize(numi: int) -> str:
    """
    Format a byte count using binary-prefixed units.

    Returns a string of width 3+3 (three characters for the number, three
    for the unit) for every value below 1000 ZiB.  Larger values fall back
    to an unpadded integer count of YiB.

    numi: amount of bytes; floats (e.g. computed rates) are accepted too.
    """
    num = float(numi)
    # "B  " is padded so that every unit is three characters wide
    for unit in ("B  ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
        if abs(num) < 1000:
            if num >= 10:
                return f"{int(num):3d}{unit}"
            # Below 10 a decimal fits in the three characters
            return f"{num:.1f}{unit}"
        num /= 1024
    # Use the scaled value (not the raw input), converted to int so that
    # float inputs do not break the integer format
    return f"{int(num):d}YiB"
|
|
|
|
|
|
def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str:
    """
    Draw the level p as a gauge made of characters taken from states.

    Each whole unit of p yields one copy of the last state, followed by a
    single character picked proportionally to the fractional part.
    Negative levels yield an empty string.
    """
    if p < 0:
        return ""
    whole, fraction = divmod(p, 1.0)
    filled = states[-1] * int(whole)
    partial = states[round(fraction * (len(states) - 1))]
    return filled + partial
|
|
|
|
|
|
def clip(text: str, length: int = 30) -> str:
    """
    Shorten text to at most length characters.

    Text that is too long is cut and ends with an ellipsis character.
    """
    if len(text) <= length:
        return text
    return text[: length - 1] + "…"
|
|
|
|
|
|
class ComposableText(typing.Generic[P, C]):
    """
    Node of the tree of elements composing the bar.

    A node knows its parent, its children (kept ordered by sortKey) and the
    Bar sitting at the root of its ancestry.
    """

    def __init__(
        self,
        parent: typing.Optional[P] = None,
        sortKey: Sortable = 0,
    ) -> None:
        self.sortKey = sortKey
        self.children: typing.MutableSequence[C] = []
        self.parent: typing.Optional[P] = None
        if parent:
            self.setParent(parent)
        # Asserts when neither this node nor any ancestor is a Bar
        self.bar = self.getFirstParentOfType(Bar)

    def setParent(self, parent: P) -> None:
        """Attach this (currently orphan) node under parent."""
        assert self.parent is None
        siblings = parent.children
        siblings.append(self)
        assert isinstance(siblings, list)
        siblings.sort(key=lambda child: child.sortKey)
        self.parent = parent
        parent.updateMarkup()

    def unsetParent(self) -> None:
        """Detach this node from its parent."""
        formerParent = self.parent
        assert formerParent
        formerParent.children.remove(self)
        formerParent.updateMarkup()
        self.parent = None

    def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
        """Return the closest node of type typ, starting from self."""
        node = self
        while not isinstance(node, typ):
            assert node.parent, f"{self} doesn't have a parent of {typ}"
            node = node.parent
        return node

    def updateMarkup(self) -> None:
        """Ask the bar to redraw itself."""
        self.bar.refresh.set()
        # TODO OPTI See if worth caching the output

    def generateMarkup(self) -> str:
        """Build the markup of this node; to be provided by subclasses."""
        raise NotImplementedError(f"{self} cannot generate markup")

    def getMarkup(self) -> str:
        """Return the markup of this node."""
        return self.generateMarkup()
|
|
|
|
|
|
class Button(enum.Enum):
    """
    Mouse buttons; the value is the identifier placed in action markup.
    """

    # Clicks
    CLICK_LEFT = "1"
    CLICK_MIDDLE = "2"
    CLICK_RIGHT = "3"
    # Wheel
    SCROLL_UP = "4"
    SCROLL_DOWN = "5"
|
|
|
|
|
|
class Section(ComposableText):
    """
    Colorable block separated by chevrons
    """

    def __init__(
        self,
        parent: "Module",
        sortKey: Sortable = 0,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(parent=parent, sortKey=sortKey)
        self.parent: "Module"
        self.color = color

        # Text as requested through setText (None: hidden)
        self.desiredText: str | None = None
        # Text as displayed, padding included
        self.text = ""
        # Width the animation is heading to / currently displayed width.
        # A negative width means the section is hidden.
        self.targetSize = -1
        self.size = -1
        self.animationTask: asyncio.Task | None = None
        # Command registered on the bar for each button
        self.actions: dict[Button, str] = dict()

    def isHidden(self) -> bool:
        """Tell whether the section currently takes no room at all."""
        return self.size < 0

    # Geometric series, with a cap
    ANIM_A = 0.025
    ANIM_R = 0.9
    ANIM_MIN = 0.001

    async def animate(self) -> None:
        """
        Grow or shrink the section one character per frame until it
        reaches targetSize, with frames getting shorter and shorter.
        """
        increment = 1 if self.size < self.targetSize else -1
        loop = asyncio.get_running_loop()
        frameTime = loop.time()
        animTime = self.ANIM_A
        skipped = 0

        while self.size != self.targetSize:
            self.size += increment
            self.updateMarkup()

            animTime *= self.ANIM_R
            animTime = max(self.ANIM_MIN, animTime)
            frameTime += animTime
            sleepTime = frameTime - loop.time()

            # In case of stress, skip refreshing by not awaiting
            if sleepTime > 0:
                if skipped > 0:
                    log.warning(f"Skipped {skipped} animation frame(s)")
                    skipped = 0
                await asyncio.sleep(sleepTime)
            else:
                skipped += 1

    def setText(self, text: str | None) -> None:
        """
        Change the displayed text, animating the change of width.

        text: new content, or None to hide the section.
        """
        # OPTI Don't redraw nor reset animation if setting the same text
        if self.desiredText == text:
            return
        self.desiredText = text
        if text is None:
            self.text = ""
            self.targetSize = -1
        else:
            self.text = f" {text} "
            self.targetSize = len(self.text)
        if self.animationTask:
            self.animationTask.cancel()
        # OPTI Skip the whole animation task if not required
        if self.size == self.targetSize:
            self.updateMarkup()
        else:
            self.animationTask = self.bar.taskGroup.create_task(self.animate())

    def setAction(self, button: Button, callback: typing.Callable | None) -> None:
        """
        Bind callback to button, replacing any previous binding.

        callback: function to call, or None to only remove the binding.
        """
        if button in self.actions:
            command = self.actions[button]
            self.bar.removeAction(command)
            del self.actions[button]
        if callback:
            command = self.bar.addAction(callback)
            self.actions[button] = command

    def generateMarkup(self) -> str:
        """
        Return the visible part of the text wrapped in its action areas.

        Must not be called on a hidden section.
        """
        assert not self.isHidden()
        pad = max(0, self.size - len(self.text))
        text = self.text[: self.size] + " " * pad
        # lemonbar treats "%" as the start of a formatting block: a literal
        # percent sign has to be doubled, otherwise it is swallowed or,
        # worse, interpreted. Escaping happens after slicing so that the
        # animation still counts displayed characters.
        text = text.replace("%", "%%")
        for button, command in self.actions.items():
            text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
        return text
|
|
|
|
|
|
class Module(ComposableText):
    """
    Sections handled by a same updater
    """

    def __init__(self, parent: "Side") -> None:
        super().__init__(parent=parent)
        self.parent: "Side"
        self.children: typing.MutableSequence[Section]

        # Module whose sections are displayed in place of our own, if any
        self.mirroring: Module | None = None
        # Modules displaying our sections
        self.mirrors: list[Module] = list()

    def mirror(self, module: "Module") -> None:
        """Display the sections of module instead of our own."""
        self.mirroring = module
        module.mirrors.append(self)

    def getSections(self) -> typing.Sequence[Section]:
        """Return the sections to display, honouring mirroring."""
        source = self.mirroring if self.mirroring else self
        return source.children

    def updateMarkup(self) -> None:
        """Request a redraw for this module and each of its mirrors."""
        super().updateMarkup()
        for replica in self.mirrors:
            replica.updateMarkup()
|
|
|
|
|
|
class Alignment(enum.Enum):
    """
    Position of a Side on the screen; the value is the letter used in
    the alignment markup block.
    """

    LEFT = "l"
    RIGHT = "r"
    CENTER = "c"
|
|
|
|
|
|
class Side(ComposableText):
    """
    Group of modules sharing the same alignment on a screen.
    """

    def __init__(self, parent: "Screen", alignment: Alignment) -> None:
        super().__init__(parent=parent)
        self.parent: Screen
        self.children: typing.MutableSequence[Module] = []

        self.alignment = alignment
        self.bar = parent.getFirstParentOfType(Bar)

    def generateMarkup(self) -> str:
        """
        Concatenate the markup of every visible section, inserting the
        color transitions between neighbouring sections.
        """
        if not self.children:
            return ""

        isLeft = self.alignment == Alignment.LEFT
        isRight = self.alignment == Alignment.RIGHT

        text = "%{" + self.alignment.value + "}"
        lastSection: Section | None = None
        for module in self.children:
            for section in module.getSections():
                if section.isHidden():
                    continue
                hexa = section.color.get_truecolor(theme=self.bar.theme).hex
                if lastSection is None:
                    # Opening of the first visible section
                    if isLeft:
                        text += "%{B" + hexa + "}%{F-}"
                    else:
                        text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
                else:
                    # Transition from the previous section
                    sameColor = lastSection.color == section.color
                    if isRight:
                        if sameColor:
                            text += ""
                        else:
                            text += "%{F" + hexa + "}%{R}"
                    else:
                        if sameColor:
                            text += ""
                        else:
                            text += "%{R}%{B" + hexa + "}"
                    text += "%{F-}"
                text += section.getMarkup()
                lastSection = section
        if not isRight:
            # Closing of the last section
            text += "%{R}%{B-}"
        return text
|
|
|
|
|
|
class Screen(ComposableText):
    """
    One output of the display, holding one Side per Alignment.
    """

    def __init__(self, parent: "Bar", output: str) -> None:
        super().__init__(parent=parent)
        self.parent: "Bar"
        self.children: typing.MutableSequence[Side]

        self.output = output

        # Each Side registers itself as a child when constructed
        for alignment in Alignment:
            Side(parent=self, alignment=alignment)

    def generateMarkup(self) -> str:
        """Select the output, then append the markup of every side."""
        sides = "".join(side.getMarkup() for side in self.children)
        return ("%{Sn" + self.output + "}") + sides
|
|
|
|
|
|
class Bar(ComposableText):
    """
    Top-level
    """

    def __init__(
        self,
        theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME,
    ) -> None:
        super().__init__()
        self.parent: None
        self.children: typing.MutableSequence[Screen]
        # Tasks that get cancelled when asked to terminate
        self.longRunningTasks: list[asyncio.Task] = list()
        self.theme = theme

        # Set whenever the markup has to be sent again to lemonbar.
        # Must exist before the screens are created, as attaching a child
        # triggers updateMarkup.
        self.refresh = asyncio.Event()
        self.taskGroup = asyncio.TaskGroup()
        self.providers: list["Provider"] = list()
        # Commands emitted by lemonbar on click -> function to call
        self.actionIndex = 0
        self.actions: dict[str, typing.Callable] = dict()

        self.periodicProviderTask: typing.Coroutine | None = None

        # One Screen per active i3 output
        i3 = i3ipc.Connection()
        for output in i3.get_outputs():
            if not output.active:
                continue
            Screen(parent=self, output=output.name)

    def addLongRunningTask(self, coro: typing.Coroutine) -> None:
        """Start coro in the task group and remember it for termination."""
        task = self.taskGroup.create_task(coro)
        self.longRunningTasks.append(task)

    async def run(self) -> None:
        """
        Start lemonbar and every provider, and run until interrupted.

        Raises RuntimeError (through the task group) when lemonbar closes
        its output.
        """
        cmd = [
            "lemonbar",
            "-b",
            "-a",
            "64",
            "-f",
            "DejaVuSansM Nerd Font:size=10",
            "-F",
            self.theme.foreground_color.hex,
            "-B",
            self.theme.background_color.hex,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
        )

        async def refresher() -> None:
            # Send the whole markup to lemonbar each time a refresh is asked
            assert proc.stdin
            while True:
                await self.refresh.wait()
                self.refresh.clear()
                markup = self.getMarkup()
                proc.stdin.write(markup.encode())
                # Wait for the pipe to accept the data, so that frames do
                # not pile up in memory if lemonbar lags behind
                await proc.stdin.drain()

        async def actionHandler() -> None:
            # Run the callback matching each command printed by lemonbar
            assert proc.stdout
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF: an empty read would otherwise be looked up as
                    # the command "" and fail with an obscure KeyError
                    raise RuntimeError("lemonbar closed its output")
                command = line.decode().strip()
                callback = self.actions.get(command)
                if callback is None:
                    # The action may have been removed since the markup
                    # the user clicked on was sent
                    log.warning(f"Ignoring unknown action: {command!r}")
                    continue
                callback()

        async with self.taskGroup:
            self.addLongRunningTask(refresher())
            self.addLongRunningTask(actionHandler())
            for provider in self.providers:
                self.addLongRunningTask(provider.run())

            def exit() -> None:
                log.info("Terminating")
                for task in self.longRunningTasks:
                    task.cancel()

            loop = asyncio.get_running_loop()
            loop.add_signal_handler(signal.SIGINT, exit)

    def generateMarkup(self) -> str:
        """Return the markup of all screens as one line."""
        return "".join(screen.getMarkup() for screen in self.children) + "\n"

    def addProvider(
        self,
        provider: "Provider",
        alignment: Alignment = Alignment.LEFT,
        screenNum: int | None = None,
    ) -> None:
        """
        Give provider a module on the matching side of the screens.

        screenNum: the provider will be added on this screen if set, all otherwise

        The provider is only registered if it got at least one module.
        """
        modules = list()
        for s, screen in enumerate(self.children):
            if screenNum is None or s == screenNum:
                side = next(
                    side for side in screen.children if side.alignment == alignment
                )
                module = Module(parent=side)
                modules.append(module)
        provider.modules = modules
        if modules:
            self.providers.append(provider)

    def addAction(self, callback: typing.Callable) -> str:
        """Register callback and return the command that triggers it."""
        command = f"{self.actionIndex:x}"
        self.actions[command] = callback
        self.actionIndex += 1
        return command

    def removeAction(self, command: str) -> None:
        """Forget the callback registered under command."""
        del self.actions[command]
|
|
|
|
|
|
class Provider:
    """
    Base of everything that feeds modules with content.
    """

    # Class instantiated by providers when they create their sections
    sectionType: type[Section] = Section

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        self.color = color
        # Filled by Bar.addProvider
        self.modules: list[Module] = list()

    async def run(self) -> None:
        """Do nothing; subclasses chain their own work after this."""
        # Not a NotImplementedError, otherwise can't combine all classes
        return None
|
|
|
|
|
|
class MirrorProvider(Provider):
    """
    Provider showing the same sections in all of its modules.
    """

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        super().__init__(color=color)
        # First module, the one the other modules mirror; set by run()
        self.module: Module

    async def run(self) -> None:
        """Elect the first module and make every other one mirror it."""
        await super().run()
        self.module = self.modules[0]
        replicas = self.modules[1:]
        for replica in replicas:
            replica.mirror(self.module)
|
|
|
|
|
|
class SingleSectionProvider(MirrorProvider):
    """
    Provider owning exactly one section, available as self.section.
    """

    async def run(self) -> None:
        """Create the section in the mirrored module."""
        await super().run()
        sectionClass = self.sectionType
        self.section = sectionClass(parent=self.module, color=self.color)
|
|
|
|
|
|
class StaticProvider(SingleSectionProvider):
    """
    Provider displaying a text fixed at construction.
    """

    def __init__(
        self, text: str, color: rich.color.Color = rich.color.Color.default()
    ) -> None:
        super().__init__(color=color)
        self.text = text

    async def run(self) -> None:
        """Create the section, then display the text in it."""
        await super().run()
        section = self.section
        section.setText(self.text)
|
|
|
|
|
|
class StatefulSection(Section):
    """
    Section cycling through numbered states on left and right clicks.

    numberStates and the state-change callback are expected to be set by
    the owner of the section before the first click.
    """

    def __init__(
        self,
        parent: Module,
        sortKey: Sortable = 0,
        color: rich.color.Color = rich.color.Color.default(),
    ) -> None:
        super().__init__(parent=parent, sortKey=sortKey, color=color)
        self.state = 0
        self.numberStates: int

        for button, handler in (
            (Button.CLICK_LEFT, self.incrementState),
            (Button.CLICK_RIGHT, self.decrementState),
        ):
            self.setAction(button, handler)

    def incrementState(self) -> None:
        """Go to the next state."""
        self.state += 1
        self.changeState()

    def decrementState(self) -> None:
        """Go to the previous state."""
        self.state -= 1
        self.changeState()

    def setChangedState(self, callback: typing.Callable) -> None:
        """Set the coroutine function scheduled after each state change."""
        self.callback = callback

    def changeState(self) -> None:
        """Wrap the state within numberStates and schedule the callback."""
        self.state %= self.numberStates
        coro = self.callback()
        self.bar.taskGroup.create_task(coro)
|
|
|
|
|
|
class StatefulSectionProvider(Provider):
    """
    Provider whose sections are StatefulSection instances.
    """

    sectionType = StatefulSection
|
|
|
|
|
|
class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider):
    """
    Provider owning exactly one StatefulSection.
    """

    section: StatefulSection
|
|
|
|
|
|
class MultiSectionsProvider(Provider):
    """
    Provider managing a varying set of sections identified by sort keys.
    """

    def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
        super().__init__(color=color)
        # Sections created so far, per module then per sort key
        self.sectionKeys: dict[Module, dict[Sortable, Section]] = (
            collections.defaultdict(dict)
        )
        # Coroutine function refreshing each section
        self.updaters: dict[Section, typing.Callable] = dict()

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        """Return the coroutine function refreshing section (abstract)."""
        raise NotImplementedError()

    async def updateSections(self, sections: set[Sortable], module: Module) -> None:
        """
        Refresh, creating them if needed, the sections of module listed
        in sections, and hide the known ones that are not listed.
        """
        known = self.sectionKeys[module]
        async with asyncio.TaskGroup() as tg:
            for sortKey in sections:
                section = known.get(sortKey)
                if not section:
                    section = self.sectionType(
                        parent=module, sortKey=sortKey, color=self.color
                    )
                    self.updaters[section] = await self.getSectionUpdater(section)
                    known[sortKey] = section

                refresh = self.updaters[section]
                tg.create_task(refresh())

            # Sections known from a previous call but not requested anymore
            for staleKey in set(known.keys()) - sections:
                stale = known.get(staleKey)
                assert stale
                stale.setText(None)
|
|
|
|
|
|
class PeriodicProvider(Provider):
    """
    Provider refreshed by a task shared by all periodic providers of a bar.
    """

    async def init(self) -> None:
        """Hook called once before the first loop()."""
        pass

    async def loop(self) -> None:
        """Refresh the provider; called at each period (abstract)."""
        raise NotImplementedError()

    @classmethod
    async def task(cls, bar: Bar) -> None:
        """
        Initialize the periodic providers of bar, then forever run their
        loop() concurrently at the start of each second.

        A provider whose loop() fails is logged and does not prevent the
        other providers, nor the following rounds, from running.
        """
        providers = list()
        for provider in bar.providers:
            if isinstance(provider, PeriodicProvider):
                providers.append(provider)
                await provider.init()

        while True:
            # TODO Block bar update during the periodic update of the loops
            loops = [provider.loop() for provider in providers]
            # Await the round: a bare gather() would leave its exceptions
            # unretrieved and let rounds overlap. Exceptions are collected
            # rather than raised to keep the bar alive.
            results = await asyncio.gather(*loops, return_exceptions=True)
            for provider, result in zip(providers, results):
                if isinstance(result, Exception):
                    log.error(
                        f"Periodic update of {provider} failed", exc_info=result
                    )

            now = datetime.datetime.now()
            # Hardcoded to 1 second... not sure if we want some more than that,
            # and if the logic to check if a task should run would be a win
            # compared to the task itself
            remaining = 1 - now.microsecond / 1000000
            await asyncio.sleep(remaining)

    async def run(self) -> None:
        """Start the shared periodic task of the bar if not done yet."""
        await super().run()
        for module in self.modules:
            bar = module.getFirstParentOfType(Bar)
            assert bar
            if not bar.periodicProviderTask:
                bar.periodicProviderTask = PeriodicProvider.task(bar)
                bar.addLongRunningTask(bar.periodicProviderTask)
|
|
|
|
|
|
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
    """
    Periodic provider with a single stateful section.
    """

    async def run(self) -> None:
        """Make a change of state trigger a refresh of the section."""
        await super().run()
        refresh = self.loop
        self.section.setChangedState(refresh)
|
|
|
|
|
|
# Providers
|
|
|
|
|
|
class I3ModeProvider(SingleSectionProvider):
    """
    Shows the i3 binding mode, hidden while in the "default" one.
    """

    def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        mode = e.change
        self.section.setText(None if mode == "default" else mode)

    async def run(self) -> None:
        """Subscribe to mode events and process them forever."""
        await super().run()
        connection = i3ipc.aio.Connection(auto_reconnect=True)
        i3 = await connection.connect()
        i3.on(i3ipc.Event.MODE, self.on_mode)
        await i3.main()

    # TODO Hide WorkspaceProvider when this is active
|
|
|
|
|
|
class I3WindowTitleProvider(SingleSectionProvider):
    """
    Shows the name of the container of the last i3 window event.
    """

    # TODO FEAT To make this available from start, we need to find the
    # `focused=True` element following the `focus` array
    def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        title = e.container.name
        self.section.setText(title)

    async def run(self) -> None:
        """Subscribe to window events and process them forever."""
        await super().run()
        connection = i3ipc.aio.Connection(auto_reconnect=True)
        i3 = await connection.connect()
        i3.on(i3ipc.Event.WINDOW, self.on_window)
        await i3.main()
|
|
|
|
|
|
class I3WorkspacesProvider(MultiSectionsProvider):
    """
    Shows one clickable section per i3 workspace, on the screen of its
    output, colored after its state.
    """

    COLOR_URGENT = rich.color.Color.parse("red")
    COLOR_FOCUSED = rich.color.Color.parse("yellow")
    # TODO Should be orange (not a terminal color)
    COLOR_VISIBLE = rich.color.Color.parse("cyan")
    COLOR_DEFAULT = rich.color.Color.parse("bright_black")

    def __init__(
        self,
    ) -> None:
        super().__init__()
        # Last fetched workspaces, by number
        self.workspaces: dict[int, i3ipc.WorkspaceReply]

        self.sections: dict[int, Section] = dict()
        # Module to fill for each output name
        self.modulesFromOutput: dict[str, Module] = dict()
        self.bar: Bar

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        """
        Bind a left click on section to a switch to its workspace, and
        return the coroutine function refreshing its color and text.
        """
        assert isinstance(section.sortKey, int)
        num = section.sortKey

        def switch_to_workspace() -> None:
            command = self.i3.command(f"workspace number {num}")
            self.bar.taskGroup.create_task(command)

        section.setAction(Button.CLICK_LEFT, switch_to_workspace)

        async def update() -> None:
            workspace = self.workspaces[num]
            name = workspace.name
            # By decreasing priority
            if workspace.urgent:
                color = self.COLOR_URGENT
            elif workspace.focused:
                color = self.COLOR_FOCUSED
            elif workspace.visible:
                color = self.COLOR_VISIBLE
            else:
                color = self.COLOR_DEFAULT
            section.color = color
            if workspace.focused or workspace.visible:
                name = f"{name} X"  # FIXME Custom names
            section.setText(name)

        return update

    async def updateWorkspaces(self) -> None:
        """
        Since the i3 IPC interface cannot really tell you by events
        when workspaces get invisible or not urgent anymore.
        Relying on those exclusively would require reimplementing some of i3 logic.
        Fetching all the workspaces on event looks ugly but is the most maintainable.
        Times I tried to challenge this and failed: 2.
        """
        replies = await self.i3.get_workspaces()
        self.workspaces = dict()
        # Workspace numbers to show in each module
        numsPerModule = collections.defaultdict(set)
        for reply in replies:
            self.workspaces[reply.num] = reply
            module = self.modulesFromOutput[reply.output]
            numsPerModule[module].add(reply.num)

        updates = [
            self.updateSections(nums, module)
            for module, nums in numsPerModule.items()
        ]
        await asyncio.gather(*updates)

    def onWorkspaceChange(
        self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
    ) -> None:
        """Schedule a full refresh of the workspaces."""
        # Cancelling the task doesn't seem to prevent performance double-events
        self.bar.taskGroup.create_task(self.updateWorkspaces())

    async def run(self) -> None:
        """Index modules by output, then follow workspace events forever."""
        for module in self.modules:
            screen = module.getFirstParentOfType(Screen)
            self.modulesFromOutput[screen.output] = module
            self.bar = module.bar

        self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
        self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
        # Initial display, without waiting for an event
        self.onWorkspaceChange(self.i3)
        await self.i3.main()
|
|
|
|
|
|
class MprisProvider(MirrorProvider):
    """
    Shows the status, album, artist and title of a media player, using
    Playerctl.
    """

    STATUSES = {
        gi.repository.Playerctl.PlaybackStatus.PLAYING: "",
        gi.repository.Playerctl.PlaybackStatus.PAUSED: "",
        gi.repository.Playerctl.PlaybackStatus.STOPPED: "",
    }

    PROVIDERS = {
        "mpd": "",
        "firefox": "",
        "chromium": "",
        "mpv": "",
    }

    @staticmethod
    def get(
        something: gi.overrides.GLib.Variant, key: str, default: typing.Any = None
    ) -> typing.Any:
        """Return something[key], or default if the key is missing."""
        return something[key] if key in something.keys() else default

    @staticmethod
    def format_us(ms: int) -> str:
        """
        Format a duration given in microseconds, as MM:SS when under an
        hour and as a timedelta string otherwise.
        """
        anHour = 60 * 60 * 1000000
        if ms >= anHour:
            return str(datetime.timedelta(microseconds=ms))
        return time.strftime("%M:%S", time.gmtime(ms // 1000000))

    def show_player(self, player_: gi.repository.Playerctl.Player) -> None:
        """Fill the four sections from the state of player_."""
        props = player_.props
        player = self.PROVIDERS.get(props.player_name, props.player_name)
        status = self.STATUSES.get(props.playback_status, "?")
        self.status.setText(f"{player} {status}")

        metadata = props.metadata

        album = self.get(metadata, "xesam:album")
        self.album.setText(f" {clip(album)}" if album else None)

        artists = self.get(metadata, "xesam:artist")
        if artists:
            artist = ", ".join(artists)
            self.artist.setText(f" {clip(artist)}")
        else:
            self.artist.setText(None)

        pos = props.position  # In µs
        # FIXME Doesn't increment during play
        text = f" {self.format_us(pos)}"
        dur = self.get(metadata, "mpris:length")
        if dur:
            text += f"/{self.format_us(dur)}"
        title = self.get(metadata, "xesam:title")
        if title:
            text += f" {clip(title)}"
        self.title.setText(text)

    def show_players(
        self, manager: gi.repository.Playerctl.PlayerManager, exclude: str | None = None
    ) -> None:
        """
        Show the first player known to manager whose name is not exclude,
        or hide all the sections if there is none.
        """
        # FIXME Opening and closing a chromium player, it will stay there (as default),
        # which is not the behaviour of playerctl somehow
        for name in manager.props.player_names:
            if name == exclude:  # DEBUG
                continue
            player = gi.repository.Playerctl.Player.new_from_name(name)
            self.show_player(player)
            break
        else:
            for section in (self.status, self.album, self.artist, self.title):
                section.setText(None)

    def on_player_appear(
        self,
        manager: gi.repository.Playerctl.PlayerManager,
        player: gi.repository.Playerctl.Player,
    ) -> None:
        log.debug(f"Player appeared: {player.props.player_name}")
        self.show_player(player)

    def on_player_vanished(
        self,
        manager: gi.repository.Playerctl.PlayerManager,
        player: gi.repository.Playerctl.Player,
    ) -> None:
        log.debug(f"Player vanished: {player.props.player_name}")
        self.show_player(player)

    def on_event(
        self,
        player: gi.repository.Playerctl.Player,
        _: typing.Any,
        manager: gi.repository.Playerctl.PlayerManager,
    ) -> None:
        log.debug(f"Player evented: {player.props.player_name}")
        self.show_player(player)

    def init_player(
        self, manager: gi.repository.Playerctl.PlayerManager, name: str
    ) -> None:
        """Listen to the player called name and hand it to manager."""
        player = gi.repository.Playerctl.Player.new_from_name(name)
        # All events will cause the active player to change,
        # so we listen on all events, even if the display won't change
        for signalName in (
            "playback-status",
            "loop-status",
            "shuffle",
            "metadata",
            "volume",
            "seeked",
        ):
            player.connect(signalName, self.on_event, manager)
        manager.manage_player(player)

    def on_name_appeared(
        self, manager: gi.repository.Playerctl.PlayerManager, name: str
    ) -> None:
        log.debug(f"Player name appeared: {name}")
        self.init_player(manager, name)

    def on_name_vanished(
        self,
        manager: gi.repository.Playerctl.PlayerManager,
        name: str,
    ) -> None:
        log.debug(f"Player name vanished: {name}")
        self.show_players(manager, exclude=name)

    async def run(self) -> None:
        """Create the sections, hook the manager signals, show a player."""
        await super().run()
        makeSection = self.sectionType
        self.status = makeSection(parent=self.module, color=self.color)
        self.album = makeSection(parent=self.module, color=self.color)
        self.artist = makeSection(parent=self.module, color=self.color)
        self.title = makeSection(parent=self.module, color=self.color)

        manager = gi.repository.Playerctl.PlayerManager()
        manager.connect("name-appeared", self.on_name_appeared)
        manager.connect("player-appeared", self.on_player_appear)
        manager.connect("name-vanished", self.on_name_vanished)
        manager.connect("player-vanished", self.on_player_vanished)

        # Players that were already there before we started listening
        for name in manager.props.player_names:
            self.init_player(manager, name)

        self.show_players(manager)
|
|
|
|
|
|
class AlertingProvider(Provider):
    """
    Provider coloring its sections after a level compared to thresholds.

    Subclasses are expected to set warningThreshold and dangerThreshold.
    """

    COLOR_NORMAL = rich.color.Color.parse("green")
    COLOR_WARNING = rich.color.Color.parse("yellow")
    COLOR_DANGER = rich.color.Color.parse("red")

    warningThreshold: float
    dangerThreshold: float

    def updateLevel(self, level: float) -> None:
        """Color every section according to level."""
        if level > self.dangerThreshold:
            color = self.COLOR_DANGER
        elif level > self.warningThreshold:
            color = self.COLOR_WARNING
        else:
            color = self.COLOR_NORMAL
        sections = (
            section for module in self.modules for section in module.getSections()
        )
        for section in sections:
            section.color = color
|
|
|
|
|
|
class CpuProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Shows the CPU usage: icon only, global gauge, or one gauge per CPU.
    """

    async def init(self) -> None:
        self.section.numberStates = 3
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        percent = psutil.cpu_percent(percpu=False)
        self.updateLevel(percent)

        state = self.section.state
        text = ""
        if state >= 2:
            gauges = [ramp(p / 100) for p in psutil.cpu_percent(percpu=True)]
            text += " " + "".join(gauges)
        elif state >= 1:
            text += " " + ramp(percent / 100)
        self.section.setText(text)
|
|
|
|
|
|
class LoadProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Shows the load averages: icon only, the first one, or all three.
    """

    async def init(self) -> None:
        self.section.numberStates = 3
        self.warningThreshold = 5
        self.dangerThreshold = 10

    async def loop(self) -> None:
        load = os.getloadavg()
        self.updateLevel(load[0])

        state = self.section.state
        # Number of averages to display
        loads = 3 if state >= 2 else state
        text = "" + "".join(f" {load[index]:.2f}" for index in range(loads))
        self.section.setText(text)
|
|
|
|
|
|
class RamProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Shows the memory usage, with more details at each state.
    """

    async def init(self) -> None:
        self.section.numberStates = 4
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        mem = psutil.virtual_memory()
        self.updateLevel(mem.percent)

        state = self.section.state
        parts = [""]
        if state >= 1:
            parts.append(" " + ramp(mem.percent / 100))
        if state >= 2:
            parts.append(humanSize(mem.total - mem.available))
        if state >= 3:
            parts.append("/" + humanSize(mem.total))
        self.section.setText("".join(parts))
|
|
|
|
|
|
class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Shows the temperature of the main sensor as a gauge, and in degrees
    when in the second state.
    """

    RAMP = ""
    MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
    # For Intel, AMD and ARM respectively.
    # NOTE(review): a former FIXME here said the thresholds did not match
    # the old version; presumably caused by the misspelled attribute names
    # fixed in init() - confirm.

    # Name of the sensor selected by init()
    main: str

    async def init(self) -> None:
        """
        Select the first available sensor of MAIN_TEMPS and take the
        thresholds from it.

        Raises IndexError if none of the sensors is available.
        """
        self.section.numberStates = 2

        allTemp = psutil.sensors_temperatures()
        for main in self.MAIN_TEMPS:
            if main in allTemp:
                self.main = main
                break
        else:
            raise IndexError("Could not find suitable temperature sensor")

        temp = allTemp[self.main][0]
        # Thresholds advertised by the sensor, with a fallback when it has
        # none. These must be the attributes read by updateLevel() and
        # loop(): with the former misspelling they were never applied.
        self.warningThreshold = temp.high or 90.0
        self.dangerThreshold = temp.critical or 100.0

    async def loop(self) -> None:
        allTemp = psutil.sensors_temperatures()
        temp = allTemp[self.main][0]
        self.updateLevel(temp.current)

        # The gauge is full when reaching the warning threshold
        text = ramp(temp.current / self.warningThreshold, self.RAMP)
        if self.section.state >= 1:
            text += f" {temp.current:.0f}°C"
        self.section.setText(text)
|
|
|
|
|
|
class BatteryProvider(AlertingProvider, PeriodicStatefulProvider):
    """
    Shows the battery charge as a gauge, then as a percentage, then with
    the remaining time. Hidden when there is no battery.
    """

    # TODO Support ACPID for events
    RAMP = ""

    async def init(self) -> None:
        self.section.numberStates = 3
        # TODO 1 refresh rate is too quick

        # Thresholds apply to the discharge level (100 - percent)
        self.warningThreshold = 75
        self.dangerThreshold = 95

    async def loop(self) -> None:
        bat = psutil.sensors_battery()
        if not bat:
            self.section.setText(None)
            # Nothing more to display, and bat cannot be read below
            return

        self.updateLevel(100 - bat.percent)

        text = "" if bat.power_plugged else ""
        text += ramp(bat.percent / 100, self.RAMP)

        if self.section.state >= 1:
            text += f" {bat.percent:.0f}%"
            # psutil reports negative constants when the remaining time
            # is unknown or unlimited: there is no duration to show then
            if self.section.state >= 2 and bat.secsleft >= 0:
                h = int(bat.secsleft / 3600)
                m = int((bat.secsleft - h * 3600) / 60)
                text += f" ({h:d}:{m:02d})"

        self.section.setText(text)
|
|
|
|
|
|
class PulseaudioProvider(
    MirrorProvider, StatefulSectionProvider, MultiSectionsProvider
):
    """
    Shows one section per PulseAudio sink, with its volume.
    """

    async def getSectionUpdater(self, section: Section) -> typing.Callable:
        """
        Choose the icon of the sink behind section from its active port,
        and return the coroutine function displaying its volume.
        """
        assert isinstance(section, StatefulSection)
        assert isinstance(section.sortKey, str)

        sink = self.sinks[section.sortKey]
        port = sink.port_active

        if port.name == "analog-output-headphones" or port.description == "Headphones":
            icon = ""
        elif port.name == "analog-output-speaker" or port.description == "Speaker":
            icon = ""
        elif port.name in ("headset-output", "headphone-output"):
            icon = ""
        else:
            icon = "?"

        section.numberStates = 3
        section.state = 1

        # TODO Change volume with wheel

        async def updater() -> None:
            assert isinstance(section, StatefulSection)
            text = icon
            sink = self.sinks[section.sortKey]

            async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse:
                vol = await pulse.volume_get_all_chans(sink)
            state = section.state
            if state == 1:
                text += f" {ramp(vol)}"
            elif state == 2:
                text += f" {vol:.0%}"
            # TODO Show which is default
            section.setText(text)

        section.setChangedState(updater)

        return updater

    async def update(self) -> None:
        """List the sinks again and refresh a section for each of them."""
        async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse:
            self.sinks = dict((sink.name, sink) for sink in await pulse.sink_list())
            names = set(self.sinks.keys())
            await self.updateSections(names, self.module)

    async def run(self) -> None:
        """Display the sinks, then refresh them on each sink event."""
        await super().run()
        await self.update()
        async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse:
            sinkEvents = pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink)
            async for event in sinkEvents:
                await self.update()
|
|
|
|
|
|
class NetworkProvider(
|
|
MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider
|
|
):
|
|
def __init__(
    self,
    color: rich.color.Color = rich.color.Color.default(),
) -> None:
    """Only forwards color to the parent classes."""
    super().__init__(color=color)
|
|
|
|
async def init(self) -> None:
    """Take a first timestamped sample of the per-interface counters."""
    self.time = asyncio.get_running_loop().time()
    self.io_counters = psutil.net_io_counters(pernic=True)
|
|
|
|
async def doNothing(self) -> None:
    """Updater used for sections that are never displayed."""
    return None
|
|
|
|
@staticmethod
|
|
def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]:
|
|
relevant = True
|
|
icon = "?"
|
|
wifi = False
|
|
if iface == "lo":
|
|
relevant = False
|
|
elif iface.startswith("eth") or iface.startswith("enp"):
|
|
if "u" in iface:
|
|
icon = ""
|
|
else:
|
|
icon = ""
|
|
elif iface.startswith("wlan") or iface.startswith("wl"):
|
|
icon = ""
|
|
wifi = True
|
|
elif (
|
|
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
|
|
):
|
|
icon = ""
|
|
|
|
elif iface.startswith("docker"):
|
|
icon = ""
|
|
elif iface.startswith("veth"):
|
|
icon = ""
|
|
elif iface.startswith("vboxnet"):
|
|
icon = ""
|
|
|
|
return relevant, icon, wifi
|
|
|
|
async def getSectionUpdater(self, section: Section) -> typing.Callable:
|
|
|
|
assert isinstance(section, StatefulSection)
|
|
assert isinstance(section.sortKey, str)
|
|
iface = section.sortKey
|
|
|
|
relevant, icon, wifi = self.getIfaceAttributes(iface)
|
|
|
|
if not relevant:
|
|
return self.doNothing
|
|
|
|
section.numberStates = 5 if wifi else 4
|
|
section.state = 1 if wifi else 0
|
|
|
|
async def update() -> None:
|
|
assert isinstance(section, StatefulSection)
|
|
|
|
if not self.if_stats[iface].isup:
|
|
section.setText(None)
|
|
return
|
|
|
|
text = icon
|
|
|
|
state = section.state + (0 if wifi else 1)
|
|
if wifi and state >= 1: # SSID
|
|
cmd = ["iwgetid", iface, "--raw"]
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd, stdout=asyncio.subprocess.PIPE
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
text += f" {stdout.decode().strip()}"
|
|
|
|
if state >= 2: # Address
|
|
for address in self.if_addrs[iface]:
|
|
if address.family == socket.AF_INET:
|
|
net = ipaddress.IPv4Network(
|
|
(address.address, address.netmask), strict=False
|
|
)
|
|
text += f" {address.address}/{net.prefixlen}"
|
|
break
|
|
|
|
if state >= 3: # Speed
|
|
prevRecv = self.prev_io_counters[iface].bytes_recv
|
|
recv = self.io_counters[iface].bytes_recv
|
|
prevSent = self.prev_io_counters[iface].bytes_sent
|
|
sent = self.io_counters[iface].bytes_sent
|
|
dt = self.time - self.prev_time
|
|
|
|
recvDiff = (recv - prevRecv) / dt
|
|
sentDiff = (sent - prevSent) / dt
|
|
text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}"
|
|
|
|
if state >= 4: # Counter
|
|
text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}"
|
|
|
|
section.setText(text)
|
|
|
|
section.setChangedState(update)
|
|
|
|
return update
|
|
|
|
async def loop(self) -> None:
|
|
loop = asyncio.get_running_loop()
|
|
|
|
self.prev_io_counters = self.io_counters
|
|
self.prev_time = self.time
|
|
# On-demand would only benefit if_addrs:
|
|
# stats are used to determine display,
|
|
# and we want to keep previous io_counters
|
|
# so displaying stats is ~instant.
|
|
self.time = loop.time()
|
|
self.if_stats = psutil.net_if_stats()
|
|
self.if_addrs = psutil.net_if_addrs()
|
|
self.io_counters = psutil.net_io_counters(pernic=True)
|
|
|
|
await self.updateSections(set(self.if_stats.keys()), self.module)
|
|
|
|
|
|
class TimeProvider(PeriodicStatefulProvider):
    """
    Displays the current date and time,
    using the strftime format selected by the section state.
    """

    FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]

    async def init(self) -> None:
        """
        Starts on the second format, with one state per format.
        """
        self.section.state = 1
        self.section.numberStates = len(self.FORMATS)

    async def loop(self) -> None:
        """
        Sets the section text to the current time in the selected format.
        """
        stamp = datetime.datetime.now()
        pattern = self.FORMATS[self.section.state]
        self.section.setText(stamp.strftime(pattern))
|
|
|
|
|
|
async def main() -> None:
    """
    Builds the terminal theme and the bar, registers the providers on it,
    then runs the bar.
    """
    # TODO Configurable
    # TODO Not super happy with the color management,
    # while using an existing library is great, it's limited to ANSI colors
    FROGARIZED = (
        "#092c0e",
        "#143718",
        "#5a7058",
        "#677d64",
        "#89947f",
        "#99a08d",
        "#fae2e3",
        "#fff0f1",
        "#e0332e",
        "#cf4b15",
        "#bb8801",
        "#8d9800",
        "#1fa198",
        "#008dd1",
        "#5c73c4",
        "#d43982",
    )

    def base16_color(color: int) -> tuple[int, int, int]:
        # Palette entries are "#rrggbb", the parser wants the bare hex digits
        digits = FROGARIZED[color].removeprefix("#")
        return tuple(rich.color.parse_rgb_hex(digits))

    background = base16_color(0x0)
    # TODO should be 7, currently 0 so it's compatible with v2
    foreground = base16_color(0x0)
    # Palette index for: black, red, green, yellow, blue, magenta, cyan, white
    normalIndexes = (0x0, 0x8, 0xB, 0xA, 0xD, 0xE, 0xC, 0x5)
    # Same order, bright variants
    brightIndexes = (0x3, 0x8, 0xB, 0xA, 0xD, 0xE, 0xC, 0x7)
    theme = rich.terminal_theme.TerminalTheme(
        background,
        foreground,
        [base16_color(index) for index in normalIndexes],
        [base16_color(index) for index in brightIndexes],
    )

    bar = Bar(theme=theme)
    dualScreen = len(bar.children) > 1
    leftPreferred, rightPreferred = (0, 1) if dualScreen else (None, None)

    parseColor = rich.color.Color.parse

    bar.addProvider(I3ModeProvider(color=parseColor("red")), alignment=Alignment.LEFT)
    bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
    if dualScreen:
        bar.addProvider(
            I3WindowTitleProvider(color=parseColor("white")),
            screenNum=0,
            alignment=Alignment.CENTER,
        )
    bar.addProvider(
        MprisProvider(color=parseColor("bright_white")),
        screenNum=rightPreferred,
        alignment=Alignment.CENTER,
    )

    # Each provider is constructed right before being added, in this order
    for providerClass in (
        CpuProvider,
        LoadProvider,
        RamProvider,
        TemperatureProvider,
        BatteryProvider,
    ):
        bar.addProvider(
            providerClass(), screenNum=leftPreferred, alignment=Alignment.RIGHT
        )
    bar.addProvider(
        PulseaudioProvider(color=parseColor("magenta")),
        screenNum=rightPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(
        NetworkProvider(color=parseColor("blue")),
        screenNum=leftPreferred,
        alignment=Alignment.RIGHT,
    )
    bar.addProvider(TimeProvider(color=parseColor("cyan")), alignment=Alignment.RIGHT)

    await bar.run()
|
|
|
|
|
|
if __name__ == "__main__":
    # Using GLib's event loop so we can run GLib's code:
    # the GLib policy is installed as asyncio's event loop policy,
    # and main() is run to completion on the loop it provides.
    policy = gi.events.GLibEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)
    loop = policy.get_event_loop()
    loop.run_until_complete(main())
|