From 139d3a3ea8e10f1424e940c22b79f28458c8cc6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?= Date: Tue, 13 Aug 2024 01:58:51 +0200 Subject: [PATCH] frobar: Some dev --- hm/desktop/frobar/.dev/barng.py | 94 -------- hm/desktop/frobar/.dev/new.py | 241 +++++++++++++++++++ hm/desktop/frobar/.dev/oldbar.py | 199 ---------------- hm/desktop/frobar/.dev/pip.py | 327 -------------------------- hm/desktop/frobar/.dev/x.py | 10 - hm/desktop/frobar/default.nix | 5 +- hm/desktop/frobar/frobar/providers.py | 2 +- 7 files changed, 245 insertions(+), 633 deletions(-) delete mode 100755 hm/desktop/frobar/.dev/barng.py create mode 100644 hm/desktop/frobar/.dev/new.py delete mode 100755 hm/desktop/frobar/.dev/oldbar.py delete mode 100755 hm/desktop/frobar/.dev/pip.py delete mode 100755 hm/desktop/frobar/.dev/x.py diff --git a/hm/desktop/frobar/.dev/barng.py b/hm/desktop/frobar/.dev/barng.py deleted file mode 100755 index f27e9b3..0000000 --- a/hm/desktop/frobar/.dev/barng.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 - -import typing -import subprocess -import time - -# CORE - - -class Notifier: - pass - - -class Section: - def __init__(self) -> None: - self.text = b"(Loading)" - - -class Module: - def __init__(self) -> None: - self.bar: "Bar" - self.section = Section() - self.sections = [self.section] - - -class Alignment: - def __init__(self, *modules: Module) -> None: - self.bar: "Bar" - self.modules = modules - for module in modules: - module.bar = self.bar - - -class Screen: - def __init__(self, left: Alignment = Alignment(), right: Alignment = Alignment()) -> None: - self.bar: "Bar" - self.left = left - self.left.bar = self.bar - self.right = right or Alignment() - self.right.bar = self.bar - - -class Bar: - def __init__(self, *screens: Screen) -> None: - self.screens = screens - for screen in screens: - screen.bar = self - self.process = subprocess.Popen(["lemonbar"], stdin=subprocess.PIPE) - - def display(self) -> None: - string = b"" - for s, screen in enumerate(self.screens): - string += b"%%{S%d}" % s - for control, alignment in [(b'%{l}', screen.left), (b'%{r}', screen.right)]: - string += control - for module in alignment.modules: - for section in module.sections: - string += b"<%b> |" % section.text - - string += b"\n" - print(string) - assert self.process.stdin - self.process.stdin.write(string) - self.process.stdin.flush() - - def run(self) -> None: - while True: - self.display() - time.sleep(1) - - -# REUSABLE - -class ClockNotifier(Notifier): - def run(self) -> None: - while True: - def __init__(self, text: bytes): - super().__init__() - self.section.text = text - -class StaticModule(Module): - def __init__(self, text: bytes): - super().__init__() - self.section.text = text - - -# USER - -if __name__ == "__main__": - bar = Bar( - Screen(Alignment(StaticModule(b"A"))), - Screen(Alignment(StaticModule(b"B"))), - ) - bar.run() diff --git a/hm/desktop/frobar/.dev/new.py b/hm/desktop/frobar/.dev/new.py new file mode 100644 index 0000000..9a9b859 --- /dev/null +++ b/hm/desktop/frobar/.dev/new.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 + +import asyncio +import datetime +import enum +import random +import typing + +import i3ipc + + +class ComposableText: + def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None: + self.parent = parent + + prevParent = self + while parent: + prevParent = parent + parent = parent.parent + assert isinstance(prevParent, Bar) + self.bar: Bar = prevParent + + self.text: str + self.needsComposition = True + + def composeText(self) -> str: + raise NotImplementedError(f"{self} cannot compose text") + + def getText(self) -> str: + if self.needsComposition: + self.text = self.composeText() + print(f"{self} composed {self.text}") + self.needsComposition = False + return self.text + + def setText(self, text: str) -> None: + self.text = text + self.needsComposition = False + parent = self.parent + while parent and not parent.needsComposition: + parent.needsComposition = True + parent = parent.parent + self.bar.refresh.set() + + +def randomColor(seed: int | bytes | None = None) -> str: + if seed is not None: + random.seed(seed) + return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3)) + + +class Section(ComposableText): + """ + Colorable block separated by chevrons + """ + + def __init__(self, parent: "Module") -> None: + super().__init__(parent=parent) + self.color = randomColor() + + +class Module(ComposableText): + """ + Sections handled by a same updater + """ + + def __init__(self, parent: "Side") -> None: + super().__init__(parent=parent) + self.sections: list[Section] = [] + + def appendSection(self, section: Section) -> None: + self.sections.append(section) + + +class Alignment(enum.Enum): + LEFT = "l" + RIGHT = "r" + CENTER = "c" + + +class Side(ComposableText): + def __init__(self, parent: "Screen", alignment: Alignment) -> None: + super().__init__(parent=parent) + self.alignment = alignment + self.modules: list[Module] = [] + + def composeText(self) -> str: + if not self.modules: + return "" + text = "%{" + self.alignment.value + "}" + lastSection: Section | None = None + for module in self.modules: + for section in module.sections: + if lastSection is None: + if self.alignment == Alignment.LEFT: + text += "%{B" + section.color + "}%{F-}" + else: + text += "%{B-}%{F" + section.color + "}%{R}%{F-}" + else: + if self.alignment == Alignment.RIGHT: + if lastSection.color == section.color: + text += "" + else: + text += "%{F" + section.color + "}%{R}" + else: + if lastSection.color == section.color: + text += "" + else: + text += "%{R}%{B" + section.color + "}" + text += "%{F-}" + text += " " + section.getText() + " " + # FIXME Should be handled by animation + # FIXME Support hidden sections + lastSection = section + if self.alignment != Alignment.RIGHT: + text += "%{R}%{B-}" + return text + + +class Screen(ComposableText): + def __init__(self, parent: "Bar", output: str) -> None: + super().__init__(parent=parent) + self.output = output + + self.sides = dict() + for alignment in Alignment: + self.sides[alignment] = Side(parent=self, alignment=alignment) + + def composeText(self) -> str: + return ("%{Sn" + self.output + "}") + "".join( + side.getText() for side in self.sides.values() + ) + + +ScreenSelection = enum.Enum("ScreenSelection", ["ALL", "PRIMARY", "SECONDARY"]) + + +class Bar(ComposableText): + """ + Top-level + """ + + def __init__(self) -> None: + super().__init__() + self.providers: list["Provider"] = [] + self.refresh = asyncio.Event() + + self.screens = [] + i3 = i3ipc.Connection() + for output in i3.get_outputs(): + if not output.active: + continue + screen = Screen(parent=self, output=output.name) + self.screens.append(screen) + + async def run(self) -> None: + proc = await asyncio.create_subprocess_exec( + "lemonbar", + "-b", + "-a", + "64", + "-f", + "DejaVuSansM Nerd Font:size=10", + stdin=asyncio.subprocess.PIPE, + ) + + async def refresher() -> None: + assert proc.stdin + while True: + await self.refresh.wait() + self.refresh.clear() + proc.stdin.write(self.getText().encode()) + + async with asyncio.TaskGroup() as tg: + tg.create_task(refresher()) + for updater in self.providers: + tg.create_task(updater.run()) + + def composeText(self) -> str: + return "".join(section.getText() for section in self.screens) + "\n" + + def addProvider( + self, + provider: "Provider", + alignment: Alignment = Alignment.LEFT, + screenSelection: ScreenSelection = ScreenSelection.ALL, + ) -> None: + # FIXME Actually have a screenNum and screenCount args + modules = list() + for s, screen in enumerate(self.screens): + if ( + screenSelection == ScreenSelection.ALL + or (screenSelection == ScreenSelection.PRIMARY and s == 0) + or (screenSelection == ScreenSelection.SECONDARY and s == 1) + ): + side = screen.sides[alignment] + module = Module(parent=side) + side.modules.append(module) + modules.append(module) + provider.modules = modules + if modules: + self.providers.append(provider) + + +class Provider: + def __init__(self) -> None: + self.modules: list[Module] = list() + + async def run(self) -> None: + raise NotImplementedError() + + +class TimeProvider(Provider): + async def run(self) -> None: + sections = list() + for module in self.modules: + section = Section(parent=module) + module.sections.append(section) + sections.append(section) + # FIXME Allow for mirror(ed) modules so no need for updaters to handle all + + while True: + for s, section in enumerate(sections): + now = datetime.datetime.now() + section.setText(now.strftime("%a %y-%m-%d %H:%M:%S")) + await asyncio.sleep(1) + + +async def main() -> None: + bar = Bar() + bar.addProvider(TimeProvider()) + # bar.addProvider(TimeProvider(), screenSelection=ScreenSelection.PRIMARY) + # bar.addProvider(TimeProvider(), alignment=Alignment.CENTER) + # bar.addProvider(TimeProvider(), alignment=Alignment.CENTER) + # bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) + # bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) + await bar.run() + # FIXME Why time not updating? + + +asyncio.run(main()) diff --git a/hm/desktop/frobar/.dev/oldbar.py b/hm/desktop/frobar/.dev/oldbar.py deleted file mode 100755 index 4032e22..0000000 --- a/hm/desktop/frobar/.dev/oldbar.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python3 - -""" -Debugging script -""" - -import i3ipc -import os -import psutil - -# import alsaaudio -from time import time -import subprocess - -i3 = i3ipc.Connection() -lemonbar = subprocess.Popen(["lemonbar", "-b"], stdin=subprocess.PIPE) - -# Utils -def upChart(p): - block = " ▁▂▃▄▅▆▇█" - return block[round(p * (len(block) - 1))] - - -def humanSizeOf(num, suffix="B"): # TODO Credit - for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: - if abs(num) < 1024.0: - return "%3.0f%2s%s" % (num, unit, suffix) - num /= 1024.0 - return "%.0f%2s%s" % (num, "Yi", suffix) - - -# Values -mode = "" -container = i3.get_tree().find_focused() -workspaces = i3.get_workspaces() -outputs = i3.get_outputs() - -username = os.environ["USER"] -hostname = os.environ["HOSTNAME"] -if "-" in hostname: - hostname = hostname.split("-")[-1] - -oldNetIO = dict() -oldTime = time() - - -def update(): - activeOutputs = sorted( - sorted(list(filter(lambda o: o.active, outputs)), key=lambda o: o.rect.y), - key=lambda o: o.rect.x, - ) - z = "" - for aOutput in range(len(activeOutputs)): - output = activeOutputs[aOutput] - # Mode || Workspaces - t = [] - if mode != "": - t.append(mode) - else: - t.append( - " ".join( - [ - (w.name.upper() if w.focused else w.name) - for w in workspaces - if w.output == output.name - ] - ) - ) - - # Windows Title - # if container: - # t.append(container.name) - - # CPU - t.append( - "C" + "".join([upChart(p / 100) for p in psutil.cpu_percent(percpu=True)]) - ) - - # Memory - t.append( - "M" - + str(round(psutil.virtual_memory().percent)) - + "% " - + "S" - + str(round(psutil.swap_memory().percent)) - + "%" - ) - - # Disks - d = [] - for disk in psutil.disk_partitions(): - e = "" - if disk.device.startswith("/dev/sd"): - e += "S" + disk.device[-2:].upper() - elif disk.device.startswith("/dev/mmcblk"): - e += "M" + disk.device[-3] + disk.device[-1] - else: - e += "?" - e += " " - e += str(round(psutil.disk_usage(disk.mountpoint).percent)) + "%" - d.append(e) - t.append(" ".join(d)) - - # Network - netStats = psutil.net_if_stats() - netIO = psutil.net_io_counters(pernic=True) - net = [] - for iface in filter(lambda i: i != "lo" and netStats[i].isup, netStats.keys()): - s = "" - if iface.startswith("eth"): - s += "E" - elif iface.startswith("wlan"): - s += "W" - else: - s += "?" - - s += " " - now = time() - global oldNetIO, oldTime - - sent = ( - (oldNetIO[iface].bytes_sent if iface in oldNetIO else 0) - - (netIO[iface].bytes_sent if iface in netIO else 0) - ) / (oldTime - now) - recv = ( - (oldNetIO[iface].bytes_recv if iface in oldNetIO else 0) - - (netIO[iface].bytes_recv if iface in netIO else 0) - ) / (oldTime - now) - s += ( - "↓" - + humanSizeOf(abs(recv), "B/s") - + " ↑" - + humanSizeOf(abs(sent), "B/s") - ) - - oldNetIO = netIO - oldTime = now - - net.append(s) - t.append(" ".join(net)) - - # Battery - if os.path.isdir("/sys/class/power_supply/BAT0"): - with open("/sys/class/power_supply/BAT0/charge_now") as f: - charge_now = int(f.read()) - with open("/sys/class/power_supply/BAT0/charge_full_design") as f: - charge_full = int(f.read()) - t.append("B" + str(round(100 * charge_now / charge_full)) + "%") - - # Volume - # t.append('V ' + str(alsaaudio.Mixer('Master').getvolume()[0]) + '%') - - t.append(username + "@" + hostname) - - # print(' - '.join(t)) - # t = [output.name] - - z += " - ".join(t) + "%{S" + str(aOutput + 1) + "}" - # lemonbar.stdin.write(bytes(' - '.join(t), 'utf-8')) - # lemonbar.stdin.write(bytes('%{S' + str(aOutput + 1) + '}', 'utf-8')) - - lemonbar.stdin.write(bytes(z + "\n", "utf-8")) - lemonbar.stdin.flush() - - -# Event listeners -def on_mode(i3, e): - global mode - if e.change == "default": - mode = "" - else: - mode = e.change - update() - - -i3.on("mode", on_mode) - -# def on_window_focus(i3, e): -# global container -# container = e.container -# update() -# -# i3.on("window::focus", on_window_focus) - - -def on_workspace_focus(i3, e): - global workspaces - workspaces = i3.get_workspaces() - update() - - -i3.on("workspace::focus", on_workspace_focus) - -# Starting - -update() - - -i3.main() diff --git a/hm/desktop/frobar/.dev/pip.py b/hm/desktop/frobar/.dev/pip.py deleted file mode 100755 index a51dc82..0000000 --- a/hm/desktop/frobar/.dev/pip.py +++ /dev/null @@ -1,327 +0,0 @@ -#!/usr/bin/env python3 - -""" -Beautiful script -""" - -import subprocess -import time -import datetime -import os -import multiprocessing -import i3ipc -import difflib - -# Constants -FONT = "DejaVuSansMono Nerd Font Mono" - -# TODO Update to be in sync with base16 -thm = [ - "#002b36", - "#dc322f", - "#859900", - "#b58900", - "#268bd2", - "#6c71c4", - "#2aa198", - "#93a1a1", - "#657b83", - "#dc322f", - "#859900", - "#b58900", - "#268bd2", - "#6c71c4", - "#2aa198", - "#fdf6e3", -] -fg = "#93a1a1" -bg = "#002b36" - -THEMES = { - "CENTER": (fg, bg), - "DEFAULT": (thm[0], thm[8]), - "1": (thm[0], thm[9]), - "2": (thm[0], thm[10]), - "3": (thm[0], thm[11]), - "4": (thm[0], thm[12]), - "5": (thm[0], thm[13]), - "6": (thm[0], thm[14]), - "7": (thm[0], thm[15]), -} - -# Utils - - -def fitText(text, size): - """ - Add spaces or cut a string to be `size` characters long - """ - if size > 0: - t = len(text) - if t >= size: - return text[:size] - else: - diff = size - t - return text + " " * diff - else: - return "" - - -def fgColor(theme): - global THEMES - return THEMES[theme][0] - - -def bgColor(theme): - global THEMES - return THEMES[theme][1] - - -class Section: - def __init__(self, theme="DEFAULT"): - self.text = "" - self.size = 0 - self.toSize = 0 - self.theme = theme - self.visible = False - self.name = "" - - def update(self, text): - if text == "": - self.toSize = 0 - else: - if len(text) < len(self.text): - self.text = text + self.text[len(text) :] - else: - self.text = text - self.toSize = len(text) + 3 - - def updateSize(self): - """ - Set the size for the next frame of animation - Return if another frame is needed - """ - if self.toSize > self.size: - self.size += 1 - elif self.toSize < self.size: - self.size -= 1 - self.visible = self.size - return self.toSize == self.size - - def draw(self, left=True, nextTheme="DEFAULT"): - s = "" - if self.visible: - if not left: - if self.theme == nextTheme: - s += "" - else: - s += "%{F" + bgColor(self.theme) + "}" - s += "%{B" + bgColor(nextTheme) + "}" - s += "" - s += "%{F" + fgColor(self.theme) + "}" - s += "%{B" + bgColor(self.theme) + "}" - s += " " if self.size > 1 else "" - s += fitText(self.text, self.size - 3) - s += " " if self.size > 2 else "" - if left: - if self.theme == nextTheme: - s += "" - else: - s += "%{F" + bgColor(self.theme) + "}" - s += "%{B" + bgColor(nextTheme) + "}" - s += "" - return s - - -# Section definition -sTime = Section("3") - -hostname = os.environ["HOSTNAME"].split(".")[0] -sHost = Section("2") -sHost.update( - os.environ["USER"] + "@" + hostname.split("-")[-1] if "-" in hostname else hostname -) - - -# Groups definition -gLeft = [] -gRight = [sTime, sHost] - -# Bar handling -bar = subprocess.Popen(["lemonbar", "-f", FONT, "-b"], stdin=subprocess.PIPE) - - -def updateBar(): - global timeLastUpdate, timeUpdate - global gLeft, gRight - global outputs - - text = "" - for oi in range(len(outputs)): - output = outputs[oi] - gLeftFiltered = list( - filter( - lambda s: s.visible and (not s.output or s.output == output.name), gLeft - ) - ) - tLeft = "" - l = len(gLeftFiltered) - for gi in range(l): - g = gLeftFiltered[gi] - # Next visible section for transition - nextTheme = gLeftFiltered[gi + 1].theme if gi + 1 < l else "CENTER" - tLeft = tLeft + g.draw(True, nextTheme) - - tRight = "" - for gi in range(len(gRight)): - g = gRight[gi] - nextTheme = "CENTER" - for gn in gRight[gi + 1 :]: - if gn.visible: - nextTheme = gn.theme - break - tRight = g.draw(False, nextTheme) + tRight - text += ( - "%{l}" - + tLeft - + "%{r}" - + tRight - + "%{B" - + bgColor("CENTER") - + "}" - + "%{S" - + str(oi + 1) - + "}" - ) - - bar.stdin.write(bytes(text + "\n", "utf-8")) - bar.stdin.flush() - - -# Values -i3 = i3ipc.Connection() -outputs = [] - - -def on_output(): - global outputs - outputs = sorted( - sorted( - list(filter(lambda o: o.active, i3.get_outputs())), key=lambda o: o.rect.y - ), - key=lambda o: o.rect.x, - ) - - -on_output() - - -def on_workspace_focus(): - global i3 - global gLeft - workspaces = i3.get_workspaces() - wNames = [w.name for w in workspaces] - sNames = [s.name for s in gLeft] - - newGLeft = [] - - def actuate(section, workspace): - if workspace: - section.name = workspace.name - section.output = workspace.output - if workspace.visible: - section.update(workspace.name) - else: - section.update(workspace.name.split(" ")[0]) - - if workspace.focused: - section.theme = "4" - elif workspace.urgent: - section.theme = "1" - else: - section.theme = "6" - else: - section.update("") - section.theme = "6" - - for tag, i, j, k, l in difflib.SequenceMatcher(None, sNames, wNames).get_opcodes(): - if tag == "equal": # If the workspaces didn't changed - for a in range(j - i): - workspace = workspaces[k + a] - section = gLeft[i + a] - actuate(section, workspace) - newGLeft.append(section) - if tag in ("delete", "replace"): # If the workspaces were removed - for section in gLeft[i:j]: - if section.visible: - actuate(section, None) - newGLeft.append(section) - else: - del section - if tag in ("insert", "replace"): # If the workspaces were removed - for workspace in workspaces[k:l]: - section = Section() - actuate(section, workspace) - newGLeft.append(section) - gLeft = newGLeft - - updateBar() - - -on_workspace_focus() - - -def i3events(i3childPipe): - global i3 - - # Proxy functions - def on_workspace_focus(i3, e): - global i3childPipe - i3childPipe.send("on_workspace_focus") - - i3.on("workspace::focus", on_workspace_focus) - - def on_output(i3, e): - global i3childPipe - i3childPipe.send("on_output") - - i3.on("output", on_output) - - i3.main() - - -i3parentPipe, i3childPipe = multiprocessing.Pipe() -i3process = multiprocessing.Process(target=i3events, args=(i3childPipe,)) -i3process.start() - - -def updateValues(): - # Time - now = datetime.datetime.now() - sTime.update(now.strftime("%x %X")) - - -def updateAnimation(): - for s in set(gLeft + gRight): - s.updateSize() - updateBar() - - -lastUpdate = 0 -while True: - now = time.time() - if i3parentPipe.poll(): - msg = i3parentPipe.recv() - if msg == "on_workspace_focus": - on_workspace_focus() - elif msg == "on_output": - on_output() - # TODO Restart lemonbar - else: - print(msg) - updateAnimation() - if now >= lastUpdate + 1: - updateValues() - lastUpdate = now - - time.sleep(0.05) diff --git a/hm/desktop/frobar/.dev/x.py b/hm/desktop/frobar/.dev/x.py deleted file mode 100755 index ff08c01..0000000 --- a/hm/desktop/frobar/.dev/x.py +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env python3 - -import Xlib.display - -dis = Xlib.display.Display() - -nb = dis.screen_count() - -for s in range(nb): - print(s) diff --git a/hm/desktop/frobar/default.nix b/hm/desktop/frobar/default.nix index ca2c174..f139f58 100644 --- a/hm/desktop/frobar/default.nix +++ b/hm/desktop/frobar/default.nix @@ -12,7 +12,7 @@ let in # Tried using pyproject.nix but mpd2 dependency wouldn't resolve, # is called pyton-mpd2 on PyPi but mpd2 in nixpkgs. -pkgs.python3Packages.buildPythonApplication { +pkgs.python3Packages.buildPythonApplication rec { pname = "frobar"; version = "2.0"; @@ -25,7 +25,8 @@ pkgs.python3Packages.buildPythonApplication { pulsectl pyinotify ]; - makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath ([ lemonbar ] ++ (with pkgs; [ wirelesstools playerctl ]))}" ]; + nativeBuildInputs = [ lemonbar ] ++ (with pkgs; [ wirelesstools playerctl ]); + makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" ]; src = ./.; } diff --git a/hm/desktop/frobar/frobar/providers.py b/hm/desktop/frobar/frobar/providers.py index 21c22e6..e0a32f8 100644 --- a/hm/desktop/frobar/frobar/providers.py +++ b/hm/desktop/frobar/frobar/providers.py @@ -278,7 +278,7 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater): icon = "" elif sink.port_active.name == "analog-output-speaker": icon = "" if sink.mute else "" - elif sink.port_active.name == "headset-output": + elif sink.port_active.name in ("headset-output", "headphone-output"): icon = "" else: icon = "?"