frobar: Some dev

This commit is contained in:
Geoffrey Frogeye 2024-08-13 01:58:51 +02:00
parent 86f9f75bd7
commit 139d3a3ea8
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8
7 changed files with 245 additions and 633 deletions

View file

@ -1,94 +0,0 @@
#!/usr/bin/env python3
import typing
import subprocess
import time
# CORE
class Notifier:
pass
class Section:
def __init__(self) -> None:
self.text = b"(Loading)"
class Module:
def __init__(self) -> None:
self.bar: "Bar"
self.section = Section()
self.sections = [self.section]
class Alignment:
def __init__(self, *modules: Module) -> None:
self.bar: "Bar"
self.modules = modules
for module in modules:
module.bar = self.bar
class Screen:
def __init__(self, left: Alignment = Alignment(), right: Alignment = Alignment()) -> None:
self.bar: "Bar"
self.left = left
self.left.bar = self.bar
self.right = right or Alignment()
self.right.bar = self.bar
class Bar:
def __init__(self, *screens: Screen) -> None:
self.screens = screens
for screen in screens:
screen.bar = self
self.process = subprocess.Popen(["lemonbar"], stdin=subprocess.PIPE)
def display(self) -> None:
string = b""
for s, screen in enumerate(self.screens):
string += b"%%{S%d}" % s
for control, alignment in [(b'%{l}', screen.left), (b'%{r}', screen.right)]:
string += control
for module in alignment.modules:
for section in module.sections:
string += b"<%b> |" % section.text
string += b"\n"
print(string)
assert self.process.stdin
self.process.stdin.write(string)
self.process.stdin.flush()
def run(self) -> None:
while True:
self.display()
time.sleep(1)
# REUSABLE
class ClockNotifier(Notifier):
def run(self) -> None:
while True:
def __init__(self, text: bytes):
super().__init__()
self.section.text = text
class StaticModule(Module):
def __init__(self, text: bytes):
super().__init__()
self.section.text = text
# USER
if __name__ == "__main__":
bar = Bar(
Screen(Alignment(StaticModule(b"A"))),
Screen(Alignment(StaticModule(b"B"))),
)
bar.run()

View file

@ -0,0 +1,241 @@
#!/usr/bin/env python3
import asyncio
import datetime
import enum
import random
import typing
import i3ipc
class ComposableText:
def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None:
self.parent = parent
prevParent = self
while parent:
prevParent = parent
parent = parent.parent
assert isinstance(prevParent, Bar)
self.bar: Bar = prevParent
self.text: str
self.needsComposition = True
def composeText(self) -> str:
raise NotImplementedError(f"{self} cannot compose text")
def getText(self) -> str:
if self.needsComposition:
self.text = self.composeText()
print(f"{self} composed {self.text}")
self.needsComposition = False
return self.text
def setText(self, text: str) -> None:
self.text = text
self.needsComposition = False
parent = self.parent
while parent and not parent.needsComposition:
parent.needsComposition = True
parent = parent.parent
self.bar.refresh.set()
def randomColor(seed: int | bytes | None = None) -> str:
if seed is not None:
random.seed(seed)
return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3))
class Section(ComposableText):
"""
Colorable block separated by chevrons
"""
def __init__(self, parent: "Module") -> None:
super().__init__(parent=parent)
self.color = randomColor()
class Module(ComposableText):
"""
Sections handled by a same updater
"""
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
self.sections: list[Section] = []
def appendSection(self, section: Section) -> None:
self.sections.append(section)
class Alignment(enum.Enum):
LEFT = "l"
RIGHT = "r"
CENTER = "c"
class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
self.alignment = alignment
self.modules: list[Module] = []
def composeText(self) -> str:
if not self.modules:
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
for module in self.modules:
for section in module.sections:
if lastSection is None:
if self.alignment == Alignment.LEFT:
text += "%{B" + section.color + "}%{F-}"
else:
text += "%{B-}%{F" + section.color + "}%{R}%{F-}"
else:
if self.alignment == Alignment.RIGHT:
if lastSection.color == section.color:
text += ""
else:
text += "%{F" + section.color + "}%{R}"
else:
if lastSection.color == section.color:
text += ""
else:
text += "%{R}%{B" + section.color + "}"
text += "%{F-}"
text += " " + section.getText() + " "
# FIXME Should be handled by animation
# FIXME Support hidden sections
lastSection = section
if self.alignment != Alignment.RIGHT:
text += "%{R}%{B-}"
return text
class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
self.output = output
self.sides = dict()
for alignment in Alignment:
self.sides[alignment] = Side(parent=self, alignment=alignment)
def composeText(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.getText() for side in self.sides.values()
)
ScreenSelection = enum.Enum("ScreenSelection", ["ALL", "PRIMARY", "SECONDARY"])
class Bar(ComposableText):
"""
Top-level
"""
def __init__(self) -> None:
super().__init__()
self.providers: list["Provider"] = []
self.refresh = asyncio.Event()
self.screens = []
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
screen = Screen(parent=self, output=output.name)
self.screens.append(screen)
async def run(self) -> None:
proc = await asyncio.create_subprocess_exec(
"lemonbar",
"-b",
"-a",
"64",
"-f",
"DejaVuSansM Nerd Font:size=10",
stdin=asyncio.subprocess.PIPE,
)
async def refresher() -> None:
assert proc.stdin
while True:
await self.refresh.wait()
self.refresh.clear()
proc.stdin.write(self.getText().encode())
async with asyncio.TaskGroup() as tg:
tg.create_task(refresher())
for updater in self.providers:
tg.create_task(updater.run())
def composeText(self) -> str:
return "".join(section.getText() for section in self.screens) + "\n"
def addProvider(
self,
provider: "Provider",
alignment: Alignment = Alignment.LEFT,
screenSelection: ScreenSelection = ScreenSelection.ALL,
) -> None:
# FIXME Actually have a screenNum and screenCount args
modules = list()
for s, screen in enumerate(self.screens):
if (
screenSelection == ScreenSelection.ALL
or (screenSelection == ScreenSelection.PRIMARY and s == 0)
or (screenSelection == ScreenSelection.SECONDARY and s == 1)
):
side = screen.sides[alignment]
module = Module(parent=side)
side.modules.append(module)
modules.append(module)
provider.modules = modules
if modules:
self.providers.append(provider)
class Provider:
def __init__(self) -> None:
self.modules: list[Module] = list()
async def run(self) -> None:
raise NotImplementedError()
class TimeProvider(Provider):
async def run(self) -> None:
sections = list()
for module in self.modules:
section = Section(parent=module)
module.sections.append(section)
sections.append(section)
# FIXME Allow for mirror(ed) modules so no need for updaters to handle all
while True:
for s, section in enumerate(sections):
now = datetime.datetime.now()
section.setText(now.strftime("%a %y-%m-%d %H:%M:%S"))
await asyncio.sleep(1)
async def main() -> None:
bar = Bar()
bar.addProvider(TimeProvider())
# bar.addProvider(TimeProvider(), screenSelection=ScreenSelection.PRIMARY)
# bar.addProvider(TimeProvider(), alignment=Alignment.CENTER)
# bar.addProvider(TimeProvider(), alignment=Alignment.CENTER)
# bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
# bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
await bar.run()
# FIXME Why time not updating?
asyncio.run(main())

View file

@ -1,199 +0,0 @@
#!/usr/bin/env python3
"""
Debugging script
"""
import i3ipc
import os
import psutil
# import alsaaudio
from time import time
import subprocess
i3 = i3ipc.Connection()
lemonbar = subprocess.Popen(["lemonbar", "-b"], stdin=subprocess.PIPE)
# Utils
def upChart(p):
block = " ▁▂▃▄▅▆▇█"
return block[round(p * (len(block) - 1))]
def humanSizeOf(num, suffix="B"): # TODO Credit
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.0f%2s%s" % (num, unit, suffix)
num /= 1024.0
return "%.0f%2s%s" % (num, "Yi", suffix)
# Values
mode = ""
container = i3.get_tree().find_focused()
workspaces = i3.get_workspaces()
outputs = i3.get_outputs()
username = os.environ["USER"]
hostname = os.environ["HOSTNAME"]
if "-" in hostname:
hostname = hostname.split("-")[-1]
oldNetIO = dict()
oldTime = time()
def update():
activeOutputs = sorted(
sorted(list(filter(lambda o: o.active, outputs)), key=lambda o: o.rect.y),
key=lambda o: o.rect.x,
)
z = ""
for aOutput in range(len(activeOutputs)):
output = activeOutputs[aOutput]
# Mode || Workspaces
t = []
if mode != "":
t.append(mode)
else:
t.append(
" ".join(
[
(w.name.upper() if w.focused else w.name)
for w in workspaces
if w.output == output.name
]
)
)
# Windows Title
# if container:
# t.append(container.name)
# CPU
t.append(
"C" + "".join([upChart(p / 100) for p in psutil.cpu_percent(percpu=True)])
)
# Memory
t.append(
"M"
+ str(round(psutil.virtual_memory().percent))
+ "% "
+ "S"
+ str(round(psutil.swap_memory().percent))
+ "%"
)
# Disks
d = []
for disk in psutil.disk_partitions():
e = ""
if disk.device.startswith("/dev/sd"):
e += "S" + disk.device[-2:].upper()
elif disk.device.startswith("/dev/mmcblk"):
e += "M" + disk.device[-3] + disk.device[-1]
else:
e += "?"
e += " "
e += str(round(psutil.disk_usage(disk.mountpoint).percent)) + "%"
d.append(e)
t.append(" ".join(d))
# Network
netStats = psutil.net_if_stats()
netIO = psutil.net_io_counters(pernic=True)
net = []
for iface in filter(lambda i: i != "lo" and netStats[i].isup, netStats.keys()):
s = ""
if iface.startswith("eth"):
s += "E"
elif iface.startswith("wlan"):
s += "W"
else:
s += "?"
s += " "
now = time()
global oldNetIO, oldTime
sent = (
(oldNetIO[iface].bytes_sent if iface in oldNetIO else 0)
- (netIO[iface].bytes_sent if iface in netIO else 0)
) / (oldTime - now)
recv = (
(oldNetIO[iface].bytes_recv if iface in oldNetIO else 0)
- (netIO[iface].bytes_recv if iface in netIO else 0)
) / (oldTime - now)
s += (
""
+ humanSizeOf(abs(recv), "B/s")
+ ""
+ humanSizeOf(abs(sent), "B/s")
)
oldNetIO = netIO
oldTime = now
net.append(s)
t.append(" ".join(net))
# Battery
if os.path.isdir("/sys/class/power_supply/BAT0"):
with open("/sys/class/power_supply/BAT0/charge_now") as f:
charge_now = int(f.read())
with open("/sys/class/power_supply/BAT0/charge_full_design") as f:
charge_full = int(f.read())
t.append("B" + str(round(100 * charge_now / charge_full)) + "%")
# Volume
# t.append('V ' + str(alsaaudio.Mixer('Master').getvolume()[0]) + '%')
t.append(username + "@" + hostname)
# print(' - '.join(t))
# t = [output.name]
z += " - ".join(t) + "%{S" + str(aOutput + 1) + "}"
# lemonbar.stdin.write(bytes(' - '.join(t), 'utf-8'))
# lemonbar.stdin.write(bytes('%{S' + str(aOutput + 1) + '}', 'utf-8'))
lemonbar.stdin.write(bytes(z + "\n", "utf-8"))
lemonbar.stdin.flush()
# Event listeners
def on_mode(i3, e):
global mode
if e.change == "default":
mode = ""
else:
mode = e.change
update()
i3.on("mode", on_mode)
# def on_window_focus(i3, e):
# global container
# container = e.container
# update()
#
# i3.on("window::focus", on_window_focus)
def on_workspace_focus(i3, e):
global workspaces
workspaces = i3.get_workspaces()
update()
i3.on("workspace::focus", on_workspace_focus)
# Starting
update()
i3.main()

View file

@ -1,327 +0,0 @@
#!/usr/bin/env python3
"""
Beautiful script
"""
import subprocess
import time
import datetime
import os
import multiprocessing
import i3ipc
import difflib
# Constants
FONT = "DejaVuSansMono Nerd Font Mono"
# TODO Update to be in sync with base16
thm = [
"#002b36",
"#dc322f",
"#859900",
"#b58900",
"#268bd2",
"#6c71c4",
"#2aa198",
"#93a1a1",
"#657b83",
"#dc322f",
"#859900",
"#b58900",
"#268bd2",
"#6c71c4",
"#2aa198",
"#fdf6e3",
]
fg = "#93a1a1"
bg = "#002b36"
THEMES = {
"CENTER": (fg, bg),
"DEFAULT": (thm[0], thm[8]),
"1": (thm[0], thm[9]),
"2": (thm[0], thm[10]),
"3": (thm[0], thm[11]),
"4": (thm[0], thm[12]),
"5": (thm[0], thm[13]),
"6": (thm[0], thm[14]),
"7": (thm[0], thm[15]),
}
# Utils
def fitText(text, size):
"""
Add spaces or cut a string to be `size` characters long
"""
if size > 0:
t = len(text)
if t >= size:
return text[:size]
else:
diff = size - t
return text + " " * diff
else:
return ""
def fgColor(theme):
global THEMES
return THEMES[theme][0]
def bgColor(theme):
global THEMES
return THEMES[theme][1]
class Section:
def __init__(self, theme="DEFAULT"):
self.text = ""
self.size = 0
self.toSize = 0
self.theme = theme
self.visible = False
self.name = ""
def update(self, text):
if text == "":
self.toSize = 0
else:
if len(text) < len(self.text):
self.text = text + self.text[len(text) :]
else:
self.text = text
self.toSize = len(text) + 3
def updateSize(self):
"""
Set the size for the next frame of animation
Return if another frame is needed
"""
if self.toSize > self.size:
self.size += 1
elif self.toSize < self.size:
self.size -= 1
self.visible = self.size
return self.toSize == self.size
def draw(self, left=True, nextTheme="DEFAULT"):
s = ""
if self.visible:
if not left:
if self.theme == nextTheme:
s += ""
else:
s += "%{F" + bgColor(self.theme) + "}"
s += "%{B" + bgColor(nextTheme) + "}"
s += ""
s += "%{F" + fgColor(self.theme) + "}"
s += "%{B" + bgColor(self.theme) + "}"
s += " " if self.size > 1 else ""
s += fitText(self.text, self.size - 3)
s += " " if self.size > 2 else ""
if left:
if self.theme == nextTheme:
s += ""
else:
s += "%{F" + bgColor(self.theme) + "}"
s += "%{B" + bgColor(nextTheme) + "}"
s += ""
return s
# Section definition
sTime = Section("3")
hostname = os.environ["HOSTNAME"].split(".")[0]
sHost = Section("2")
sHost.update(
os.environ["USER"] + "@" + hostname.split("-")[-1] if "-" in hostname else hostname
)
# Groups definition
gLeft = []
gRight = [sTime, sHost]
# Bar handling
bar = subprocess.Popen(["lemonbar", "-f", FONT, "-b"], stdin=subprocess.PIPE)
def updateBar():
global timeLastUpdate, timeUpdate
global gLeft, gRight
global outputs
text = ""
for oi in range(len(outputs)):
output = outputs[oi]
gLeftFiltered = list(
filter(
lambda s: s.visible and (not s.output or s.output == output.name), gLeft
)
)
tLeft = ""
l = len(gLeftFiltered)
for gi in range(l):
g = gLeftFiltered[gi]
# Next visible section for transition
nextTheme = gLeftFiltered[gi + 1].theme if gi + 1 < l else "CENTER"
tLeft = tLeft + g.draw(True, nextTheme)
tRight = ""
for gi in range(len(gRight)):
g = gRight[gi]
nextTheme = "CENTER"
for gn in gRight[gi + 1 :]:
if gn.visible:
nextTheme = gn.theme
break
tRight = g.draw(False, nextTheme) + tRight
text += (
"%{l}"
+ tLeft
+ "%{r}"
+ tRight
+ "%{B"
+ bgColor("CENTER")
+ "}"
+ "%{S"
+ str(oi + 1)
+ "}"
)
bar.stdin.write(bytes(text + "\n", "utf-8"))
bar.stdin.flush()
# Values
i3 = i3ipc.Connection()
outputs = []
def on_output():
global outputs
outputs = sorted(
sorted(
list(filter(lambda o: o.active, i3.get_outputs())), key=lambda o: o.rect.y
),
key=lambda o: o.rect.x,
)
on_output()
def on_workspace_focus():
global i3
global gLeft
workspaces = i3.get_workspaces()
wNames = [w.name for w in workspaces]
sNames = [s.name for s in gLeft]
newGLeft = []
def actuate(section, workspace):
if workspace:
section.name = workspace.name
section.output = workspace.output
if workspace.visible:
section.update(workspace.name)
else:
section.update(workspace.name.split(" ")[0])
if workspace.focused:
section.theme = "4"
elif workspace.urgent:
section.theme = "1"
else:
section.theme = "6"
else:
section.update("")
section.theme = "6"
for tag, i, j, k, l in difflib.SequenceMatcher(None, sNames, wNames).get_opcodes():
if tag == "equal": # If the workspaces didn't changed
for a in range(j - i):
workspace = workspaces[k + a]
section = gLeft[i + a]
actuate(section, workspace)
newGLeft.append(section)
if tag in ("delete", "replace"): # If the workspaces were removed
for section in gLeft[i:j]:
if section.visible:
actuate(section, None)
newGLeft.append(section)
else:
del section
if tag in ("insert", "replace"): # If the workspaces were removed
for workspace in workspaces[k:l]:
section = Section()
actuate(section, workspace)
newGLeft.append(section)
gLeft = newGLeft
updateBar()
on_workspace_focus()
def i3events(i3childPipe):
global i3
# Proxy functions
def on_workspace_focus(i3, e):
global i3childPipe
i3childPipe.send("on_workspace_focus")
i3.on("workspace::focus", on_workspace_focus)
def on_output(i3, e):
global i3childPipe
i3childPipe.send("on_output")
i3.on("output", on_output)
i3.main()
i3parentPipe, i3childPipe = multiprocessing.Pipe()
i3process = multiprocessing.Process(target=i3events, args=(i3childPipe,))
i3process.start()
def updateValues():
# Time
now = datetime.datetime.now()
sTime.update(now.strftime("%x %X"))
def updateAnimation():
for s in set(gLeft + gRight):
s.updateSize()
updateBar()
lastUpdate = 0
while True:
now = time.time()
if i3parentPipe.poll():
msg = i3parentPipe.recv()
if msg == "on_workspace_focus":
on_workspace_focus()
elif msg == "on_output":
on_output()
# TODO Restart lemonbar
else:
print(msg)
updateAnimation()
if now >= lastUpdate + 1:
updateValues()
lastUpdate = now
time.sleep(0.05)

View file

@ -1,10 +0,0 @@
#!/usr/bin/env python3
import Xlib.display
dis = Xlib.display.Display()
nb = dis.screen_count()
for s in range(nb):
print(s)

View file

@ -12,7 +12,7 @@ let
in
# Tried using pyproject.nix but mpd2 dependency wouldn't resolve,
# is called pyton-mpd2 on PyPi but mpd2 in nixpkgs.
pkgs.python3Packages.buildPythonApplication {
pkgs.python3Packages.buildPythonApplication rec {
pname = "frobar";
version = "2.0";
@ -25,7 +25,8 @@ pkgs.python3Packages.buildPythonApplication {
pulsectl
pyinotify
];
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath ([ lemonbar ] ++ (with pkgs; [ wirelesstools playerctl ]))}" ];
nativeBuildInputs = [ lemonbar ] ++ (with pkgs; [ wirelesstools playerctl ]);
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" ];
src = ./.;
}

View file

@ -278,7 +278,7 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater):
icon = ""
elif sink.port_active.name == "analog-output-speaker":
icon = "" if sink.mute else ""
elif sink.port_active.name == "headset-output":
elif sink.port_active.name in ("headset-output", "headphone-output"):
icon = ""
else:
icon = "?"