diff --git a/hm/desktop/frobar/.dev/new.py b/hm/desktop/frobar/.dev/new.py index d5b74b8..31a1f09 100644 --- a/hm/desktop/frobar/.dev/new.py +++ b/hm/desktop/frobar/.dev/new.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio +import collections import datetime import enum import ipaddress @@ -411,6 +412,8 @@ class Bar(ComposableText): class Provider: + sectionType: type[Section] = Section + def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: self.modules: list[Module] = list() self.color = color @@ -435,7 +438,7 @@ class MirrorProvider(Provider): class SingleSectionProvider(MirrorProvider): async def run(self) -> None: await super().run() - self.section = Section(parent=self.module, color=self.color) + self.section = self.sectionType(parent=self.module, color=self.color) class StaticProvider(SingleSectionProvider): @@ -481,10 +484,46 @@ class StatefulSection(Section): self.bar.taskGroup.create_task(self.callback()) -class SingleStatefulSectionProvider(MirrorProvider): - async def run(self) -> None: - await super().run() - self.section = StatefulSection(parent=self.module, color=self.color) +class StatefulSectionProvider(Provider): + sectionType = StatefulSection + + +class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider): + section: StatefulSection + + +class MultiSectionsProvider(Provider): + + def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None: + super().__init__(color=color) + self.sectionKeys: dict[Module, dict[Sortable, Section]] = ( + collections.defaultdict(dict) + ) + self.updaters: dict[Section, typing.Callable] = dict() + + async def getSectionUpdater(self, section: Section) -> typing.Callable: + raise NotImplementedError() + + async def updateSections(self, sections: set[Sortable], module: Module) -> None: + moduleSections = self.sectionKeys[module] + async with asyncio.TaskGroup() as tg: + for sortKey in sections: + section = moduleSections.get(sortKey) + if not section: + section = self.sectionType( + parent=module, sortKey=sortKey, color=self.color + ) + self.updaters[section] = await self.getSectionUpdater(section) + moduleSections[sortKey] = section + + updater = self.updaters[section] + tg.create_task(updater()) + + missingKeys = set(moduleSections.keys()) - sections + for missingKey in missingKeys: + section = moduleSections.get(missingKey) + assert section + section.setText(None) class PeriodicProvider(Provider): @@ -543,6 +582,8 @@ class I3ModeProvider(SingleSectionProvider): i3.on(i3ipc.Event.MODE, self.on_mode) await i3.main() + # TODO Hide WorkspaceProvider when this is active + class I3WindowTitleProvider(SingleSectionProvider): # TODO FEAT To make this available from start, we need to find the @@ -557,44 +598,34 @@ class I3WindowTitleProvider(SingleSectionProvider): await i3.main() -class I3WorkspacesProvider(Provider): +class I3WorkspacesProvider(MultiSectionsProvider): COLOR_URGENT = rich.color.Color.parse("red") COLOR_FOCUSED = rich.color.Color.parse("yellow") # TODO Should be orange (not a terminal color) - COLOR_VISIBLE = rich.color.Color.parse("blue") + COLOR_VISIBLE = rich.color.Color.parse("cyan") COLOR_DEFAULT = rich.color.Color.parse("bright_black") - async def updateWorkspaces(self, i3: i3ipc.Connection) -> None: - """ - Since the i3 IPC interface cannot really tell you by events - when workspaces get invisible or not urgent anymore. - Relying on those exclusively would require reimplementing some of i3 logic. - Fetching all the workspaces on event looks ugly but is the most maintainable. - Times I tried to challenge this and failed: 2. - """ - workspaces = await i3.get_workspaces() - for workspace in workspaces: - module = self.modulesFromOutput[workspace.output] - if workspace.num in self.sections: - section = self.sections[workspace.num] - if section.parent != module: - section.unsetParent() - section.setParent(module) - else: - section = Section(parent=module, sortKey=workspace.num) - self.sections[workspace.num] = section + def __init__( + self, + ) -> None: + super().__init__() + self.workspaces: dict[int, i3ipc.WorkspaceReply] - def generate_switch_workspace(num: int) -> typing.Callable: - def switch_workspace() -> None: - self.bar.taskGroup.create_task( - i3.command(f"workspace number {num}") - ) + self.sections: dict[int, Section] = dict() + self.modulesFromOutput: dict[str, Module] = dict() + self.bar: Bar - return switch_workspace + async def getSectionUpdater(self, section: Section) -> typing.Callable: + assert isinstance(section.sortKey, int) + num = section.sortKey - section.setAction( - Button.CLICK_LEFT, generate_switch_workspace(workspace.num) - ) + def switch_to_workspace() -> None: + self.bar.taskGroup.create_task(self.i3.command(f"workspace number {num}")) + + section.setAction(Button.CLICK_LEFT, switch_to_workspace) + + async def update() -> None: + workspace = self.workspaces[num] name = workspace.name if workspace.urgent: section.color = self.COLOR_URGENT @@ -607,26 +638,34 @@ class I3WorkspacesProvider(Provider): if workspace.focused or workspace.visible: name = f"{name} X" # TODO Custom names section.setText(name) - workspacesNums = set(workspace.num for workspace in workspaces) - for num, section in self.sections.items(): - if num not in workspacesNums: - # This should delete the Section but it turned out to be hard - section.setText(None) + + return update + + async def updateWorkspaces(self) -> None: + """ + Since the i3 IPC interface cannot really tell you by events + when workspaces get invisible or not urgent anymore. + Relying on those exclusively would require reimplementing some of i3 logic. + Fetching all the workspaces on event looks ugly but is the most maintainable. + Times I tried to challenge this and failed: 2. + """ + workspaces = await self.i3.get_workspaces() + self.workspaces = dict() + modules = collections.defaultdict(set) + for workspace in workspaces: + self.workspaces[workspace.num] = workspace + module = self.modulesFromOutput[workspace.output] + modules[module].add(workspace.num) + + await asyncio.gather( + *[self.updateSections(nums, module) for module, nums in modules.items()] + ) def onWorkspaceChange( self, i3: i3ipc.Connection, e: i3ipc.Event | None = None ) -> None: # Cancelling the task doesn't seem to prevent performance double-events - self.bar.taskGroup.create_task(self.updateWorkspaces(i3)) - - def __init__( - self, - ) -> None: - super().__init__() - - self.sections: dict[int, Section] = dict() - self.modulesFromOutput: dict[str, Module] = dict() - self.bar: Bar + self.bar.taskGroup.create_task(self.updateWorkspaces()) async def run(self) -> None: for module in self.modules: @@ -635,10 +674,10 @@ class I3WorkspacesProvider(Provider): self.modulesFromOutput[output] = module self.bar = module.bar - i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() - i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) - self.onWorkspaceChange(i3) - await i3.main() + self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() + self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) + self.onWorkspaceChange(self.i3) + await self.i3.main() class AlertingProvider(Provider): @@ -785,166 +824,188 @@ class BatteryProvider(AlertingProvider, PeriodicStatefulProvider): self.section.setText(text) -class PulseaudioProvider(SingleSectionProvider): - async def update(self) -> None: - async with pulsectl_asyncio.PulseAsync("frobar-updater") as pulse: - text = "" - # TODO Sections - for sink in await pulse.sink_list(): - log.debug(f"{sink}") - if ( - sink.port_active.name == "analog-output-headphones" - or sink.port_active.description == "Headphones" - ): - icon = "" - elif ( - sink.port_active.name == "analog-output-speaker" - or sink.port_active.description == "Speaker" - ): - icon = "" if sink.mute else "" - elif sink.port_active.name in ("headset-output", "headphone-output"): - icon = "" - else: - icon = "?" - vol = await pulse.volume_get_all_chans(sink) - fg = (sink.mute and "#333333") or (vol > 1 and "#FF0000") or None - # TODO Show which is default +class PulseaudioProvider( + MirrorProvider, StatefulSectionProvider, MultiSectionsProvider +): + async def getSectionUpdater(self, section: Section) -> typing.Callable: + assert isinstance(section, StatefulSection) + assert isinstance(section.sortKey, str) - text += f" {icon} {vol:.0%}" - self.section.setText(text) + sink = self.sinks[section.sortKey] + + if ( + sink.port_active.name == "analog-output-headphones" + or sink.port_active.description == "Headphones" + ): + icon = "" + elif ( + sink.port_active.name == "analog-output-speaker" + or sink.port_active.description == "Speaker" + ): + icon = "" + elif sink.port_active.name in ("headset-output", "headphone-output"): + icon = "" + else: + icon = "?" + + section.numberStates = 3 + section.state = 1 + + # TODO Change volume with wheel + + async def updater() -> None: + assert isinstance(section, StatefulSection) + text = icon + sink = self.sinks[section.sortKey] + + async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse: + vol = await pulse.volume_get_all_chans(sink) + if section.state == 1: + text += f" {ramp(vol)}" + elif section.state == 2: + text += f" {vol:.0%}" + # TODO Show which is default + section.setText(text) + + section.setChangedState(updater) + + return updater + + async def update(self) -> None: + async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse: + self.sinks = dict((sink.name, sink) for sink in await pulse.sink_list()) + await self.updateSections(set(self.sinks.keys()), self.module) async def run(self) -> None: await super().run() await self.update() - async with pulsectl_asyncio.PulseAsync("frobar-events") as pulse: + async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse: async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink): await self.update() -class NetworkProviderSection(StatefulSection): - def __init__( - self, - parent: Module, - iface: str, - provider: "NetworkProvider", - ) -> None: - super().__init__(parent=parent, sortKey=iface, color=provider.color) - self.iface = iface - self.provider = provider - - self.ignore = False - self.icon = "?" - self.wifi = False - if iface == "lo": - self.ignore = True - elif iface.startswith("eth") or iface.startswith("enp"): - if "u" in iface: - self.icon = "" - else: - self.icon = "" - elif iface.startswith("wlan") or iface.startswith("wl"): - self.icon = "" - self.wifi = True - elif ( - iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg") - ): - self.icon = "" - - elif iface.startswith("docker"): - self.icon = "" - elif iface.startswith("veth"): - self.icon = "" - elif iface.startswith("vboxnet"): - self.icon = "" - self.numberStates = 5 if self.wifi else 4 - self.state = 1 if self.wifi else 0 - - self.setChangedState(self.update) - - async def update(self) -> None: - if self.ignore or not self.provider.if_stats[self.iface].isup: - self.setText(None) - return - text = self.icon - - state = self.state + (0 if self.wifi else 1) # SSID - if self.wifi and state >= 1: - cmd = ["iwgetid", self.iface, "--raw"] - proc = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE - ) - stdout, stderr = await proc.communicate() - text += f" {stdout.decode().strip()}" - - if state >= 2: # Address - for address in self.provider.if_addrs[self.iface]: - if address.family == socket.AF_INET: - net = ipaddress.IPv4Network( - (address.address, address.netmask), strict=False - ) - text += f" {address.address}/{net.prefixlen}" - break - - if state >= 3: # Speed - prevRecv = self.provider.prev_io_counters[self.iface].bytes_recv - recv = self.provider.io_counters[self.iface].bytes_recv - prevSent = self.provider.prev_io_counters[self.iface].bytes_sent - sent = self.provider.io_counters[self.iface].bytes_sent - dt = self.provider.time - self.provider.prev_time - - recvDiff = (recv - prevRecv) / dt - sentDiff = (sent - prevSent) / dt - text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}" - - if state >= 4: # Counter - text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}" - - self.setText(text) - - -class NetworkProvider(MirrorProvider, PeriodicProvider): +class NetworkProvider( + MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider +): def __init__( self, color: rich.color.Color = rich.color.Color.default(), ) -> None: super().__init__(color=color) - self.sections: dict[str, NetworkProviderSection] = dict() async def init(self) -> None: loop = asyncio.get_running_loop() self.time = loop.time() self.io_counters = psutil.net_io_counters(pernic=True) + async def doNothing(self) -> None: + pass + + @staticmethod + def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]: + relevant = True + icon = "?" + wifi = False + if iface == "lo": + relevant = False + elif iface.startswith("eth") or iface.startswith("enp"): + if "u" in iface: + icon = "" + else: + icon = "" + elif iface.startswith("wlan") or iface.startswith("wl"): + icon = "" + wifi = True + elif ( + iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg") + ): + icon = "" + + elif iface.startswith("docker"): + icon = "" + elif iface.startswith("veth"): + icon = "" + elif iface.startswith("vboxnet"): + icon = "" + + return relevant, icon, wifi + + async def getSectionUpdater(self, section: Section) -> typing.Callable: + + assert isinstance(section, StatefulSection) + assert isinstance(section.sortKey, str) + iface = section.sortKey + + relevant, icon, wifi = self.getIfaceAttributes(iface) + + if not relevant: + return self.doNothing + + section.numberStates = 5 if wifi else 4 + section.state = 1 if wifi else 0 + + async def update() -> None: + assert isinstance(section, StatefulSection) + + if not self.if_stats[iface].isup: + section.setText(None) + return + + text = icon + + state = section.state + (0 if wifi else 1) + if wifi and state >= 1: # SSID + cmd = ["iwgetid", iface, "--raw"] + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + text += f" {stdout.decode().strip()}" + + if state >= 2: # Address + for address in self.if_addrs[iface]: + if address.family == socket.AF_INET: + net = ipaddress.IPv4Network( + (address.address, address.netmask), strict=False + ) + text += f" {address.address}/{net.prefixlen}" + break + + if state >= 3: # Speed + prevRecv = self.prev_io_counters[iface].bytes_recv + recv = self.io_counters[iface].bytes_recv + prevSent = self.prev_io_counters[iface].bytes_sent + sent = self.io_counters[iface].bytes_sent + dt = self.time - self.prev_time + + recvDiff = (recv - prevRecv) / dt + sentDiff = (sent - prevSent) / dt + text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}" + + if state >= 4: # Counter + text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}" + + section.setText(text) + + section.setChangedState(update) + + return update + async def loop(self) -> None: loop = asyncio.get_running_loop() - async with asyncio.TaskGroup() as tg: - self.prev_io_counters = self.io_counters - self.prev_time = self.time - # On-demand would only benefit if_addrs: - # stats are used to determine display, - # and we want to keep previous io_counters - # so displaying stats is ~instant. - self.time = loop.time() - self.if_stats = psutil.net_if_stats() - self.if_addrs = psutil.net_if_addrs() - self.io_counters = psutil.net_io_counters(pernic=True) - for iface in self.if_stats: - section = self.sections.get(iface) - if not section: - section = NetworkProviderSection( - parent=self.module, iface=iface, provider=self - ) - self.sections[iface] = section + self.prev_io_counters = self.io_counters + self.prev_time = self.time + # On-demand would only benefit if_addrs: + # stats are used to determine display, + # and we want to keep previous io_counters + # so displaying stats is ~instant. + self.time = loop.time() + self.if_stats = psutil.net_if_stats() + self.if_addrs = psutil.net_if_addrs() + self.io_counters = psutil.net_io_counters(pernic=True) - tg.create_task(section.update()) - for iface, section in self.sections.items(): - if iface not in self.if_stats: - section.setText(None) - - async def onStateChange(self, section: StatefulSection) -> None: - assert isinstance(section, NetworkProviderSection) - await section.update() + await self.updateSections(set(self.if_stats.keys()), self.module) class TimeProvider(PeriodicStatefulProvider):