diff --git a/hm/desktop/frobar/.dev/new.py b/hm/desktop/frobar/.dev/new.py index 0e33ad2..36ee2ec 100644 --- a/hm/desktop/frobar/.dev/new.py +++ b/hm/desktop/frobar/.dev/new.py @@ -11,14 +11,45 @@ import typing import coloredlogs import i3ipc import i3ipc.aio +import psutil coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() T = typing.TypeVar("T", bound="ComposableText") +P = typing.TypeVar("P", bound="ComposableText") +C = typing.TypeVar("C", bound="ComposableText") +Sortable = str | int -class ComposableText: +class ComposableText(typing.Generic[P, C]): + + def __init__( + self, + parent: typing.Optional[P] = None, + sortKey: Sortable = 0, + ) -> None: + self.parent: typing.Optional[P] = None + self.children: typing.MutableSequence[C] = list() + self.sortKey = sortKey + if parent: + self.setParent(parent) + self.bar = self.getFirstParentOfType(Bar) + + def setParent(self, parent: P) -> None: + assert self.parent is None + parent.children.append(self) + assert isinstance(parent.children, list) + parent.children.sort(key=lambda c: c.sortKey) + self.parent = parent + self.parent.updateMarkup() + + def unsetParent(self) -> None: + assert self.parent + self.parent.children.remove(self) + self.parent.updateMarkup() + self.parent = None + def getFirstParentOfType(self, typ: typing.Type[T]) -> T: parent = self while not isinstance(parent, typ): @@ -26,10 +57,6 @@ class ComposableText: parent = parent.parent return parent - def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None: - self.parent = parent - self.bar = self.getFirstParentOfType(Bar) - def updateMarkup(self) -> None: self.bar.refresh.set() # TODO OPTI See if worth caching the output @@ -60,8 +87,8 @@ class Section(ComposableText): Colorable block separated by chevrons """ - def __init__(self, parent: "Module") -> None: - super().__init__(parent=parent) + def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None: + super().__init__(parent=parent, sortKey=sortKey) self.parent: "Module" self.color = randomColor() @@ -146,8 +173,8 @@ class Module(ComposableText): def __init__(self, parent: "Side") -> None: super().__init__(parent=parent) self.parent: "Side" + self.children: typing.MutableSequence[Section] - self.sections: list[Section] = [] self.mirroring: Module | None = None self.mirrors: list[Module] = list() @@ -155,11 +182,11 @@ class Module(ComposableText): self.mirroring = module module.mirrors.append(self) - def getSections(self) -> list[Section]: + def getSections(self) -> typing.Sequence[Section]: if self.mirroring: - return self.mirroring.sections + return self.mirroring.children else: - return self.sections + return self.children def updateMarkup(self) -> None: super().updateMarkup() @@ -177,16 +204,16 @@ class Side(ComposableText): def __init__(self, parent: "Screen", alignment: Alignment) -> None: super().__init__(parent=parent) self.parent: Screen + self.children: typing.MutableSequence[Module] = [] self.alignment = alignment - self.modules: list[Module] = [] def generateMarkup(self) -> str: - if not self.modules: + if not self.children: return "" text = "%{" + self.alignment.value + "}" lastSection: Section | None = None - for module in self.modules: + for module in self.children: for section in module.getSections(): if section.isHidden(): continue @@ -218,16 +245,16 @@ class Screen(ComposableText): def __init__(self, parent: "Bar", output: str) -> None: super().__init__(parent=parent) self.parent: "Bar" + self.children: typing.MutableSequence[Side] self.output = output - self.sides = dict() for alignment in Alignment: - self.sides[alignment] = Side(parent=self, alignment=alignment) + Side(parent=self, alignment=alignment) def generateMarkup(self) -> str: return ("%{Sn" + self.output + "}") + "".join( - side.getMarkup() for side in self.sides.values() + side.getMarkup() for side in self.children ) @@ -238,19 +265,20 @@ class Bar(ComposableText): def __init__(self) -> None: super().__init__() + self.parent: None + self.children: typing.MutableSequence[Screen] + self.refresh = asyncio.Event() self.taskGroup = asyncio.TaskGroup() self.providers: list["Provider"] = list() self.actionIndex = 0 self.actions: dict[str, typing.Callable] = dict() - self.screens = [] i3 = i3ipc.Connection() for output in i3.get_outputs(): if not output.active: continue - screen = Screen(parent=self, output=output.name) - self.screens.append(screen) + Screen(parent=self, output=output.name) async def run(self) -> None: cmd = [ @@ -303,7 +331,7 @@ class Bar(ComposableText): loop.add_signal_handler(signal.SIGINT, exit) def generateMarkup(self) -> str: - return "".join(section.getMarkup() for section in self.screens) + "\n" + return "".join(screen.getMarkup() for screen in self.children) + "\n" def addProvider( self, @@ -315,11 +343,10 @@ class Bar(ComposableText): screenNum: the provider will be added on this screen if set, all otherwise """ modules = list() - for s, screen in enumerate(self.screens): + for s, screen in enumerate(self.children): if screenNum is None or s == screenNum: - side = screen.sides[alignment] + side = next(filter(lambda s: s.alignment == alignment, screen.children)) module = Module(parent=side) - side.modules.append(module) modules.append(module) provider.modules = modules if modules: @@ -355,14 +382,11 @@ class MirrorProvider(Provider): class SingleSectionProvider(MirrorProvider): - def __init__(self) -> None: - super().__init__() - self.section: Section + SECTION_CLASS = Section async def run(self) -> None: await super().run() - self.section = Section(parent=self.module) - self.module.sections.append(self.section) + self.section = self.SECTION_CLASS(parent=self.module) class StaticProvider(SingleSectionProvider): @@ -374,15 +398,17 @@ class StaticProvider(SingleSectionProvider): self.section.setText(self.text) -class StatefulProvider(SingleSectionProvider): - # TODO Should actually be a Section descendant - NUMBER_STATES: int +class StatefulSection(Section): - def __init__(self) -> None: - super().__init__() + def __init__(self, parent: Module, sortKey: Sortable = 0) -> None: + super().__init__(parent=parent, sortKey=sortKey) self.state = 0 + self.numberStates: int self.stateChanged = asyncio.Event() + self.setAction(Button.CLICK_LEFT, self.incrementState) + self.setAction(Button.CLICK_RIGHT, self.decrementState) + def incrementState(self) -> None: self.state += 1 self.changeState() @@ -392,14 +418,13 @@ class StatefulProvider(SingleSectionProvider): self.changeState() def changeState(self) -> None: - self.state %= self.NUMBER_STATES + self.state %= self.numberStates self.stateChanged.set() self.stateChanged.clear() - async def run(self) -> None: - await super().run() - self.section.setAction(Button.CLICK_LEFT, self.incrementState) - self.section.setAction(Button.CLICK_RIGHT, self.decrementState) + +class StatefulProvider(SingleSectionProvider): + SECTION_CLASS = StatefulSection # Providers @@ -444,18 +469,14 @@ class I3WorkspacesProvider(Provider): workspaces = await i3.get_workspaces() for workspace in workspaces: module = self.modulesFromOutput[workspace.output] - insert = False if workspace.num in self.sections: section = self.sections[workspace.num] if section.parent != module: - section.parent.sections.remove(section) - section.parent = module - section.updateMarkup() - insert = True + section.unsetParent() + section.setParent(module) else: - section = Section(parent=module) + section = Section(parent=module, sortKey=workspace.num) self.sections[workspace.num] = section - insert = True def generate_switch_workspace(num: int) -> typing.Callable: def switch_workspace() -> None: @@ -468,10 +489,6 @@ class I3WorkspacesProvider(Provider): section.setAction( Button.CLICK_LEFT, generate_switch_workspace(workspace.num) ) - if insert: - module.sections.append(section) - revSections = dict((v, k) for k, v in self.sections.items()) - module.sections.sort(key=lambda s: revSections[s]) name = workspace.name if workspace.urgent: name = f"{name} !" @@ -514,29 +531,101 @@ class I3WorkspacesProvider(Provider): await i3.main() -class TimeProvider(StatefulProvider): - FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] - NUMBER_STATES = len(FORMATS) +class NetworkProviderSection(StatefulSection): + def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None: + super().__init__(parent=parent, sortKey=iface) + self.iface = iface + self.provider = provider + + self.ignore = False + self.icon = "?" + self.wifi = False + if iface == "lo": + self.ignore = True + elif iface.startswith("eth") or iface.startswith("enp"): + if "u" in iface: + self.icon = "" + else: + self.icon = "" + elif iface.startswith("wlan") or iface.startswith("wl"): + self.icon = "" + self.wifi = True + elif ( + iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg") + ): + self.icon = "" + elif iface.startswith("docker"): + self.icon = "" + elif iface.startswith("veth"): + self.icon = "" + elif iface.startswith("vboxnet"): + self.icon = "" + self.numberStates = 5 if self.wifi else 4 + self.state = 1 if self.wifi else 0 + + async def getText(self) -> str | None: + if self.ignore or not self.provider.if_stats[self.iface].isup: + return None + + text = self.icon + return text + + +class NetworkProvider(MirrorProvider): + def __init__(self) -> None: + self.sections: dict[str, NetworkProviderSection] = dict() + + async def updateIface(self, iface: str) -> None: + section = self.sections[iface] + section.setText(await section.getText()) async def run(self) -> None: await super().run() - self.state = 1 + + while True: + # if_addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs() + # io_counters: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True) + + async with asyncio.TaskGroup() as tg: + self.if_stats = psutil.net_if_stats() + for iface in self.if_stats: + if iface not in self.sections: + section = NetworkProviderSection( + parent=self.module, iface=iface, provider=self + ) + self.sections[iface] = section + + tg.create_task(self.updateIface(iface)) + for iface, section in self.sections.items(): + if iface not in self.if_stats: + section.setText(None) + tg.create_task(asyncio.sleep(1)) + + +class TimeProvider(StatefulProvider): + FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] + + async def run(self) -> None: + await super().run() + assert isinstance(self.section, StatefulSection) + self.section.state = 1 + self.section.numberStates = len(self.FORMATS) while True: now = datetime.datetime.now() - format = self.FORMATS[self.state] + format = self.FORMATS[self.section.state] self.section.setText(now.strftime(format)) remaining = 1 - now.microsecond / 1000000 try: - await asyncio.wait_for(self.stateChanged.wait(), remaining) + await asyncio.wait_for(self.section.stateChanged.wait(), remaining) except TimeoutError: pass async def main() -> None: bar = Bar() - dualScreen = len(bar.screens) > 1 + dualScreen = len(bar.children) > 1 bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT) bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT) @@ -557,7 +646,7 @@ async def main() -> None: alignment=Alignment.RIGHT, ) bar.addProvider( - StaticProvider("network"), + NetworkProvider(), screenNum=0 if dualScreen else None, alignment=Alignment.RIGHT, )