dotfiles/hm/desktop/frobar/.dev/new.py

659 lines
20 KiB
Python
Raw Normal View History

2024-08-13 01:58:51 +02:00
#!/usr/bin/env python3
import asyncio
import datetime
import enum
2024-08-17 00:57:06 +02:00
import logging
2024-08-13 01:58:51 +02:00
import random
2024-08-14 03:29:34 +02:00
import signal
2024-08-13 01:58:51 +02:00
import typing
2024-08-17 00:57:06 +02:00
import coloredlogs
2024-08-13 01:58:51 +02:00
import i3ipc
2024-08-17 00:57:06 +02:00
import i3ipc.aio
2024-08-25 09:48:36 +02:00
import psutil
2024-08-17 00:57:06 +02:00
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
T = typing.TypeVar("T", bound="ComposableText")
2024-08-25 09:48:36 +02:00
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
2024-08-13 01:58:51 +02:00
2024-08-25 09:48:36 +02:00
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
def unsetParent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
2024-08-17 00:57:06 +02:00
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
parent = parent.parent
return parent
2024-08-14 00:28:17 +02:00
def updateMarkup(self) -> None:
2024-08-13 01:58:51 +02:00
self.bar.refresh.set()
2024-08-17 00:57:06 +02:00
# TODO OPTI See if worth caching the output
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup")
def getMarkup(self) -> str:
return self.generateMarkup()
2024-08-13 01:58:51 +02:00
def randomColor(seed: int | bytes | None = None) -> str:
if seed is not None:
random.seed(seed)
return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3))
2024-08-17 00:57:06 +02:00
class Button(enum.Enum):
CLICK_LEFT = "1"
CLICK_MIDDLE = "2"
CLICK_RIGHT = "3"
SCROLL_UP = "4"
SCROLL_DOWN = "5"
2024-08-13 01:58:51 +02:00
class Section(ComposableText):
"""
Colorable block separated by chevrons
"""
2024-08-25 09:48:36 +02:00
def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
2024-08-17 00:57:06 +02:00
self.parent: "Module"
2024-08-13 01:58:51 +02:00
self.color = randomColor()
2024-08-17 00:57:06 +02:00
self.desiredText: str | None = None
self.text = ""
self.targetSize = -1
self.size = -1
2024-08-14 00:28:17 +02:00
self.animationTask: asyncio.Task | None = None
2024-08-17 00:57:06 +02:00
self.actions: dict[Button, str] = dict()
2024-08-14 00:28:17 +02:00
def isHidden(self) -> bool:
2024-08-17 00:57:06 +02:00
return self.size < 0
2024-08-14 00:28:17 +02:00
2024-08-17 00:57:06 +02:00
# Geometric series, with a cap
2024-08-14 00:28:17 +02:00
ANIM_A = 0.025
ANIM_R = 0.9
2024-08-17 00:57:06 +02:00
ANIM_MIN = 0.001
2024-08-14 00:28:17 +02:00
async def animate(self) -> None:
2024-08-17 00:57:06 +02:00
increment = 1 if self.size < self.targetSize else -1
2024-08-14 00:28:17 +02:00
loop = asyncio.get_running_loop()
frameTime = loop.time()
animTime = self.ANIM_A
2024-08-17 00:57:06 +02:00
while self.size != self.targetSize:
2024-08-14 00:28:17 +02:00
self.size += increment
self.updateMarkup()
animTime *= self.ANIM_R
2024-08-17 00:57:06 +02:00
animTime = max(self.ANIM_MIN, animTime)
2024-08-14 00:28:17 +02:00
frameTime += animTime
sleepTime = frameTime - loop.time()
# In case of stress, skip refreshing by not awaiting
if sleepTime > 0:
await asyncio.sleep(sleepTime)
2024-08-17 00:57:06 +02:00
else:
log.warning("Skipped an animation frame")
2024-08-14 00:28:17 +02:00
def setText(self, text: str | None) -> None:
2024-08-17 00:57:06 +02:00
# OPTI Don't redraw nor reset animation if setting the same text
if self.desiredText == text:
2024-08-14 00:28:17 +02:00
return
2024-08-17 00:57:06 +02:00
self.desiredText = text
if text is None:
self.text = ""
self.targetSize = -1
else:
self.text = f" {text} "
self.targetSize = len(self.text)
if self.animationTask:
self.animationTask.cancel()
# OPTI Skip the whole animation task if not required
if self.size == self.targetSize:
2024-08-14 00:28:17 +02:00
self.updateMarkup()
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
2024-08-17 00:57:06 +02:00
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
del self.actions[button]
if callback:
command = self.bar.addAction(callback)
self.actions[button] = command
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-17 00:57:06 +02:00
assert not self.isHidden()
2024-08-14 00:28:17 +02:00
pad = max(0, self.size - len(self.text))
2024-08-17 00:57:06 +02:00
text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
return text
2024-08-13 01:58:51 +02:00
class Module(ComposableText):
"""
Sections handled by a same updater
"""
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
2024-08-17 00:57:06 +02:00
self.parent: "Side"
2024-08-25 09:48:36 +02:00
self.children: typing.MutableSequence[Section]
2024-08-17 00:57:06 +02:00
2024-08-14 02:45:25 +02:00
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
def mirror(self, module: "Module") -> None:
self.mirroring = module
module.mirrors.append(self)
2024-08-25 09:48:36 +02:00
def getSections(self) -> typing.Sequence[Section]:
2024-08-14 02:45:25 +02:00
if self.mirroring:
2024-08-25 09:48:36 +02:00
return self.mirroring.children
2024-08-14 02:45:25 +02:00
else:
2024-08-25 09:48:36 +02:00
return self.children
2024-08-14 02:45:25 +02:00
def updateMarkup(self) -> None:
super().updateMarkup()
for mirror in self.mirrors:
mirror.updateMarkup()
2024-08-13 01:58:51 +02:00
class Alignment(enum.Enum):
LEFT = "l"
RIGHT = "r"
CENTER = "c"
class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
2024-08-17 00:57:06 +02:00
self.parent: Screen
2024-08-25 09:48:36 +02:00
self.children: typing.MutableSequence[Module] = []
2024-08-17 00:57:06 +02:00
2024-08-13 01:58:51 +02:00
self.alignment = alignment
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-25 09:48:36 +02:00
if not self.children:
2024-08-13 01:58:51 +02:00
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
2024-08-25 09:48:36 +02:00
for module in self.children:
2024-08-14 02:45:25 +02:00
for section in module.getSections():
2024-08-14 00:28:17 +02:00
if section.isHidden():
continue
2024-08-13 01:58:51 +02:00
if lastSection is None:
if self.alignment == Alignment.LEFT:
text += "%{B" + section.color + "}%{F-}"
else:
text += "%{B-}%{F" + section.color + "}%{R}%{F-}"
else:
if self.alignment == Alignment.RIGHT:
if lastSection.color == section.color:
text += ""
else:
text += "%{F" + section.color + "}%{R}"
else:
if lastSection.color == section.color:
text += ""
else:
text += "%{R}%{B" + section.color + "}"
text += "%{F-}"
2024-08-14 00:28:17 +02:00
text += section.getMarkup()
2024-08-13 01:58:51 +02:00
lastSection = section
if self.alignment != Alignment.RIGHT:
text += "%{R}%{B-}"
return text
class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
2024-08-17 00:57:06 +02:00
self.parent: "Bar"
2024-08-25 09:48:36 +02:00
self.children: typing.MutableSequence[Side]
2024-08-17 00:57:06 +02:00
2024-08-13 01:58:51 +02:00
self.output = output
for alignment in Alignment:
2024-08-25 09:48:36 +02:00
Side(parent=self, alignment=alignment)
2024-08-13 01:58:51 +02:00
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-13 01:58:51 +02:00
return ("%{Sn" + self.output + "}") + "".join(
2024-08-25 09:48:36 +02:00
side.getMarkup() for side in self.children
2024-08-13 01:58:51 +02:00
)
class Bar(ComposableText):
"""
Top-level
"""
def __init__(self) -> None:
super().__init__()
2024-08-25 09:48:36 +02:00
self.parent: None
self.children: typing.MutableSequence[Screen]
2024-08-13 01:58:51 +02:00
self.refresh = asyncio.Event()
2024-08-14 00:28:17 +02:00
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
2024-08-17 00:57:06 +02:00
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
2024-08-13 01:58:51 +02:00
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
2024-08-25 09:48:36 +02:00
Screen(parent=self, output=output.name)
2024-08-13 01:58:51 +02:00
async def run(self) -> None:
2024-08-14 02:45:25 +02:00
cmd = [
2024-08-13 01:58:51 +02:00
"lemonbar",
"-b",
"-a",
"64",
"-f",
"DejaVuSansM Nerd Font:size=10",
2024-08-14 02:45:25 +02:00
]
2024-08-17 00:57:06 +02:00
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
2024-08-13 01:58:51 +02:00
async def refresher() -> None:
assert proc.stdin
2024-08-17 00:57:06 +02:00
while True:
2024-08-13 01:58:51 +02:00
await self.refresh.wait()
self.refresh.clear()
2024-08-17 00:57:06 +02:00
markup = self.getMarkup()
# log.debug(markup)
proc.stdin.write(markup.encode())
async def actionHandler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
command = line.decode().strip()
callback = self.actions[command]
callback()
longRunningTasks = list()
def addLongRunningTask(coro: typing.Coroutine) -> None:
task = self.taskGroup.create_task(coro)
longRunningTasks.append(task)
async with self.taskGroup:
addLongRunningTask(refresher())
addLongRunningTask(actionHandler())
2024-08-14 00:28:17 +02:00
for provider in self.providers:
2024-08-17 00:57:06 +02:00
addLongRunningTask(provider.run())
2024-08-13 01:58:51 +02:00
2024-08-14 03:29:34 +02:00
def exit() -> None:
2024-08-17 00:57:06 +02:00
log.info("Terminating")
for task in longRunningTasks:
task.cancel()
2024-08-14 03:29:34 +02:00
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, exit)
2024-08-14 00:28:17 +02:00
def generateMarkup(self) -> str:
2024-08-25 09:48:36 +02:00
return "".join(screen.getMarkup() for screen in self.children) + "\n"
2024-08-13 01:58:51 +02:00
def addProvider(
self,
provider: "Provider",
alignment: Alignment = Alignment.LEFT,
2024-08-14 00:28:17 +02:00
screenNum: int | None = None,
2024-08-13 01:58:51 +02:00
) -> None:
2024-08-14 00:28:17 +02:00
"""
screenNum: the provider will be added on this screen if set, all otherwise
"""
2024-08-13 01:58:51 +02:00
modules = list()
2024-08-25 09:48:36 +02:00
for s, screen in enumerate(self.children):
2024-08-14 02:45:25 +02:00
if screenNum is None or s == screenNum:
2024-08-25 09:48:36 +02:00
side = next(filter(lambda s: s.alignment == alignment, screen.children))
2024-08-13 01:58:51 +02:00
module = Module(parent=side)
modules.append(module)
provider.modules = modules
if modules:
self.providers.append(provider)
2024-08-17 00:57:06 +02:00
def addAction(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def removeAction(self, command: str) -> None:
del self.actions[command]
2024-08-13 01:58:51 +02:00
class Provider:
def __init__(self) -> None:
self.modules: list[Module] = list()
async def run(self) -> None:
raise NotImplementedError()
2024-08-14 02:45:25 +02:00
class MirrorProvider(Provider):
def __init__(self) -> None:
super().__init__()
self.module: Module
2024-08-13 01:58:51 +02:00
async def run(self) -> None:
2024-08-14 02:45:25 +02:00
self.module = self.modules[0]
for module in self.modules[1:]:
module.mirror(self.module)
class SingleSectionProvider(MirrorProvider):
2024-08-25 09:48:36 +02:00
SECTION_CLASS = Section
2024-08-14 02:45:25 +02:00
async def run(self) -> None:
await super().run()
2024-08-25 09:48:36 +02:00
self.section = self.SECTION_CLASS(parent=self.module)
2024-08-14 02:45:25 +02:00
class StaticProvider(SingleSectionProvider):
def __init__(self, text: str) -> None:
self.text = text
async def run(self) -> None:
await super().run()
self.section.setText(self.text)
2024-08-25 09:48:36 +02:00
class StatefulSection(Section):
2024-08-17 00:57:06 +02:00
2024-08-25 09:48:36 +02:00
def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
2024-08-17 00:57:06 +02:00
self.state = 0
2024-08-25 09:48:36 +02:00
self.numberStates: int
2024-08-17 00:57:06 +02:00
self.stateChanged = asyncio.Event()
2024-08-25 09:48:36 +02:00
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
2024-08-17 00:57:06 +02:00
def incrementState(self) -> None:
self.state += 1
self.changeState()
def decrementState(self) -> None:
self.state -= 1
self.changeState()
def changeState(self) -> None:
2024-08-25 09:48:36 +02:00
self.state %= self.numberStates
2024-08-17 00:57:06 +02:00
self.stateChanged.set()
self.stateChanged.clear()
2024-08-25 09:48:36 +02:00
class StatefulProvider(SingleSectionProvider):
SECTION_CLASS = StatefulSection
2024-08-17 00:57:06 +02:00
# Providers
class I3ModeProvider(SingleSectionProvider):
def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(None if e.change == "default" else e.change)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.MODE, self.on_mode)
await i3.main()
class I3WindowTitleProvider(SingleSectionProvider):
# TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array
def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(e.container.name)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WINDOW, self.on_window)
await i3.main()
class I3WorkspacesProvider(Provider):
# FIXME Custom names
# FIXME Colors
async def updateWorkspaces(self, i3: i3ipc.Connection) -> None:
"""
Since the i3 IPC interface cannot really tell you by events
when workspaces get invisible or not urgent anymore.
Relying on those exclusively would require reimplementing some of i3 logic.
Fetching all the workspaces on event looks ugly but is the most maintainable.
Times I tried to challenge this and failed: 2.
"""
workspaces = await i3.get_workspaces()
for workspace in workspaces:
module = self.modulesFromOutput[workspace.output]
if workspace.num in self.sections:
section = self.sections[workspace.num]
if section.parent != module:
2024-08-25 09:48:36 +02:00
section.unsetParent()
section.setParent(module)
2024-08-17 00:57:06 +02:00
else:
2024-08-25 09:48:36 +02:00
section = Section(parent=module, sortKey=workspace.num)
2024-08-17 00:57:06 +02:00
self.sections[workspace.num] = section
def generate_switch_workspace(num: int) -> typing.Callable:
def switch_workspace() -> None:
self.bar.taskGroup.create_task(
i3.command(f"workspace number {num}")
)
return switch_workspace
section.setAction(
Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
)
name = workspace.name
if workspace.urgent:
name = f"{name} !"
elif workspace.focused:
name = f"{name} +"
elif workspace.visible:
name = f"{name} *"
section.setText(name)
workspacesNums = set(workspace.num for workspace in workspaces)
for num, section in self.sections.items():
if num not in workspacesNums:
# This should delete the Section but it turned out to be hard
section.setText(None)
def onWorkspaceChange(
self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
) -> None:
# Cancelling the task doesn't seem to prevent performance double-events
self.bar.taskGroup.create_task(self.updateWorkspaces(i3))
def __init__(
self,
) -> None:
super().__init__()
self.sections: dict[int, Section] = dict()
self.modulesFromOutput: dict[str, Module] = dict()
self.bar: Bar
async def run(self) -> None:
for module in self.modules:
screen = module.getFirstParentOfType(Screen)
output = screen.output
self.modulesFromOutput[output] = module
self.bar = module.bar
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
self.onWorkspaceChange(i3)
await i3.main()
2024-08-25 09:48:36 +02:00
class NetworkProviderSection(StatefulSection):
def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
super().__init__(parent=parent, sortKey=iface)
self.iface = iface
self.provider = provider
self.ignore = False
self.icon = "?"
self.wifi = False
if iface == "lo":
self.ignore = True
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
self.icon = ""
else:
self.icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
self.icon = ""
self.wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
self.icon = ""
elif iface.startswith("docker"):
self.icon = ""
elif iface.startswith("veth"):
self.icon = ""
elif iface.startswith("vboxnet"):
self.icon = ""
self.numberStates = 5 if self.wifi else 4
self.state = 1 if self.wifi else 0
async def getText(self) -> str | None:
if self.ignore or not self.provider.if_stats[self.iface].isup:
return None
text = self.icon
return text
class NetworkProvider(MirrorProvider):
def __init__(self) -> None:
self.sections: dict[str, NetworkProviderSection] = dict()
async def updateIface(self, iface: str) -> None:
section = self.sections[iface]
section.setText(await section.getText())
async def run(self) -> None:
await super().run()
while True:
# if_addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
# io_counters: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)
async with asyncio.TaskGroup() as tg:
self.if_stats = psutil.net_if_stats()
for iface in self.if_stats:
if iface not in self.sections:
section = NetworkProviderSection(
parent=self.module, iface=iface, provider=self
)
self.sections[iface] = section
tg.create_task(self.updateIface(iface))
for iface, section in self.sections.items():
if iface not in self.if_stats:
section.setText(None)
tg.create_task(asyncio.sleep(1))
2024-08-17 00:57:06 +02:00
class TimeProvider(StatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
2024-08-14 02:45:25 +02:00
async def run(self) -> None:
await super().run()
2024-08-25 09:48:36 +02:00
assert isinstance(self.section, StatefulSection)
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
2024-08-13 01:58:51 +02:00
2024-08-17 00:57:06 +02:00
while True:
2024-08-14 00:28:17 +02:00
now = datetime.datetime.now()
2024-08-25 09:48:36 +02:00
format = self.FORMATS[self.section.state]
2024-08-17 00:57:06 +02:00
self.section.setText(now.strftime(format))
2024-08-14 00:28:17 +02:00
remaining = 1 - now.microsecond / 1000000
2024-08-17 00:57:06 +02:00
try:
2024-08-25 09:48:36 +02:00
await asyncio.wait_for(self.section.stateChanged.wait(), remaining)
2024-08-17 00:57:06 +02:00
except TimeoutError:
pass
2024-08-13 01:58:51 +02:00
async def main() -> None:
bar = Bar()
2024-08-25 09:48:36 +02:00
dualScreen = len(bar.children) > 1
2024-08-14 02:45:25 +02:00
2024-08-17 00:57:06 +02:00
bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
2024-08-14 02:45:25 +02:00
if dualScreen:
bar.addProvider(
2024-08-17 00:57:06 +02:00
I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER
2024-08-14 02:45:25 +02:00
)
bar.addProvider(
StaticProvider(text="mpris"),
screenNum=1 if dualScreen else None,
alignment=Alignment.CENTER,
)
bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT)
bar.addProvider(
StaticProvider("pulse"),
screenNum=1 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(
2024-08-25 09:48:36 +02:00
NetworkProvider(),
2024-08-14 02:45:25 +02:00
screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT,
)
2024-08-14 00:28:17 +02:00
bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
2024-08-14 02:45:25 +02:00
2024-08-13 01:58:51 +02:00
await bar.run()
asyncio.run(main())