This commit is contained in:
Geoffrey Frogeye 2025-05-27 19:26:30 +02:00
parent 492f085d52
commit bb021a1aae
30 changed files with 487 additions and 573 deletions

View file

@ -1,23 +1,12 @@
{
pkgs ? import <nixpkgs> {
# nixpkgs ? builtins.getFlake "github:GeoffreyFrogeye/nixpkgs/zelbar",
nixpkgs ? /nix/store/8g86qw3c2fr56bhhvqznrlic4jig9hb3-source,
pkgs ? import nixpkgs {
config = { };
overlays = [ ];
},
...
}:
let
lemonbar = (
pkgs.lemonbar-xft.overrideAttrs (old: {
src = pkgs.fetchFromGitHub {
owner = "drscream";
repo = "lemonbar-xft";
rev = "a64a2a6a6d643f4d92f9d7600722710eebce7bdb";
sha256 = "sha256-T5FhEPIiDt/9paJwL9Sj84CBtA0YFi1hZz0+87Hd6jU=";
# https://github.com/drscream/lemonbar-xft/pull/2
};
})
);
in
# Tried using pyproject.nix but mpd2 dependency wouldn't resolve,
# is called pyton-mpd2 on PyPi but mpd2 in nixpkgs.
pkgs.python3Packages.buildPythonApplication rec {
@ -31,12 +20,12 @@ pkgs.python3Packages.buildPythonApplication rec {
pygobject3
rich
];
nativeBuildInputs =
[ lemonbar ]
++ (with pkgs; [
wirelesstools
playerctl
]);
# TODO Might just be buildInputs, maybe without the need for prefix?
nativeBuildInputs = with pkgs; [
wirelesstools
playerctl
zelbar
];
makeWrapperArgs = [
"--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}"
"--prefix GI_TYPELIB_PATH : ${GI_TYPELIB_PATH}"

View file

@ -36,9 +36,7 @@ def main() -> None:
theme = rich.terminal_theme.TerminalTheme(
base16_color(0x0),
base16_color(
0x0
), # TODO should be 7, currently 0 so it's compatible with v2
base16_color(0x7),
[
base16_color(0x0), # black
base16_color(0x8), # red
@ -68,7 +66,7 @@ def main() -> None:
workspaces_suffixes = "▲■"
workspaces_names = {
str(i + 1): f"{i+1} {c}" for i, c in enumerate(workspaces_suffixes)
str(i + 1): f"{i + 1} {c}" for i, c in enumerate(workspaces_suffixes)
}
color = rich.color.Color.parse

View file

@ -4,6 +4,7 @@ import datetime
import enum
import logging
import signal
import sys
import typing
import gi
@ -62,6 +63,10 @@ def clip(text: str, length: int = 30) -> str:
return text
def color_0x(color: rich.color.ColorTriplet) -> str:
return f"0x{color.red:02X}{color.green:02X}{color.blue:02X}"
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
@ -73,6 +78,7 @@ class ComposableText(typing.Generic[P, C]):
self.sortKey = sort_key
if parent:
self.set_parent(parent)
self.screen = self.get_first_parent_of_type(Screen)
self.bar = self.get_first_parent_of_type(Bar)
def set_parent(self, parent: P) -> None:
@ -92,12 +98,14 @@ class ComposableText(typing.Generic[P, C]):
def get_first_parent_of_type(self, typ: type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
if not parent.parent:
msg = f"{self} doesn't have a parent of {typ}"
raise RuntimeError(msg)
parent = parent.parent
return parent
def update_markup(self) -> None:
self.bar.refresh.set()
self.parent.update_markup()
# TODO OPTI See if worth caching the output
def generate_markup(self) -> str:
@ -191,23 +199,24 @@ class Section(ComposableText):
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
def set_action(
self, button: Button, callback: typing.Callable | None
) -> None:
def set_action(self, button: Button, callback: typing.Callable | None) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.remove_action(command)
del self.actions[button]
if callback:
command = self.bar.add_action(callback)
command = self.screen.add_action(callback)
self.actions[button] = command
def generate_markup(self) -> str:
assert not self.is_hidden()
pad = max(0, self.size - len(self.text))
text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
if text:
for button, command in self.actions.items():
# TODO zelbar doesn't support other button types
if button == Button.CLICK_LEFT:
text = "%{A:" + command + "}" + text
return text
@ -239,8 +248,8 @@ class Module(ComposableText):
class Alignment(enum.Enum):
LEFT = "l"
RIGHT = "r"
CENTER = "c"
RIGHT = "r"
class Side(ComposableText):
@ -255,38 +264,64 @@ class Side(ComposableText):
def generate_markup(self) -> str:
if not self.children:
return ""
text = "%{" + self.alignment.value + "}"
markup = ""
last_section: Section | None = None
default = self.bar.theme.background_color
current = default # Fallback value
def text(
text: str,
bg: rich.color.ColorTriplet = default,
fg: rich.color.ColorTriplet = default,
) -> None:
if not text:
return ""
return (
"%{F:"
+ color_0x(fg)
+ "}%{B:"
+ color_0x(bg)
+ "}%{"
+ self.alignment.value
+ "}"
+ text
)
for module in self.children:
for section in module.get_sections():
if section.is_hidden():
continue
hexa = section.color.get_truecolor(theme=self.bar.theme).hex
current = section.color.get_truecolor(theme=self.bar.theme)
if last_section is None:
text += (
"%{B" + hexa + "}%{F-}"
if self.alignment == Alignment.LEFT
else "%{B-}%{F" + hexa + "}%{R}%{F-}"
markup += (
text("", default, current)
if self.alignment != Alignment.LEFT
else ""
)
elif isinstance(last_section, SpacerSection):
text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
markup += text("", default, current)
elif last_section.color == section.color:
markup += text(
"" if self.alignment == Alignment.RIGHT else "",
current,
default,
)
else:
if self.alignment == Alignment.RIGHT:
text += (
""
if last_section.color == section.color
else "%{F" + hexa + "}%{R}"
)
elif last_section.color == section.color:
text += ""
else:
text += "%{R}%{B" + hexa + "}"
text += "%{F-}"
text += section.get_markup()
lastone = last_section.color.get_truecolor(theme=self.bar.theme)
markup += (
text("", lastone, current)
if self.alignment == Alignment.RIGHT
else text("", current, lastone)
)
markup += text(section.get_markup(), current, default)
last_section = section
if self.alignment != Alignment.RIGHT and last_section:
text += "%{R}%{B-}"
return text
markup += (
text("", default, current)
if self.alignment != Alignment.RIGHT and last_section
else ""
)
return markup
class Screen(ComposableText):
@ -296,15 +331,79 @@ class Screen(ComposableText):
self.children: typing.MutableSequence[Side]
self.output = output
self.refresh = asyncio.Event()
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = {}
for alignment in Alignment:
Side(parent=self, alignment=alignment)
def update_markup(self) -> str:
self.screen.refresh.set()
def generate_markup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.get_markup() for side in self.children
return "".join(side.get_markup() for side in self.children) + "\n"
async def run(self) -> None:
cmd = [
"zelbar",
"-btm",
"-g",
"0:20",
"-fn",
"DejaVuSansM Nerd Font:size=13",
"-F",
color_0x(self.bar.theme.foreground_color),
"-B",
color_0x(self.bar.theme.background_color),
"-o",
self.output,
]
print(" ".join(cmd))
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
async def refresher() -> None:
assert proc.stdin
while True:
await self.refresh.wait()
self.refresh.clear()
markup = self.get_markup()
# sys.stdout.write(markup) # DEBUG
proc.stdin.write(markup.encode())
async def action_handler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
try:
command = line.decode().strip()
except UnicodeDecodeError:
# FIXME zelbar seems to have some memory issues
log.exception("Not unicode: %s", str(line))
continue
callback = self.actions.get(command)
if callback is None:
log.error("Unknown command: %s", command)
continue
callback()
self.bar.add_long_running_task(refresher())
self.bar.add_long_running_task(action_handler())
def add_action(self, callback: typing.Callable) -> str:
command = f"com{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def remove_action(self, command: str) -> None:
del self.actions[command]
RICH_DEFAULT_THEME = rich.terminal_theme.DEFAULT_TERMINAL_THEME
@ -322,11 +421,8 @@ class Bar(ComposableText):
self.longRunningTasks: list[asyncio.Task] = []
self.theme = theme
self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list[Provider] = []
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = {}
self.periodicProviderTask: typing.Coroutine | None = None
@ -343,45 +439,9 @@ class Bar(ComposableText):
self.longRunningTasks.append(task)
async def run(self) -> None:
cmd = [
"lemonbar",
"-b",
"-a",
"64",
"-f",
"DejaVuSansM Nerd Font:size=10",
"-F",
self.theme.foreground_color.hex,
"-B",
self.theme.background_color.hex,
]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
async def refresher() -> None:
assert proc.stdin
while True:
await self.refresh.wait()
self.refresh.clear()
markup = self.get_markup()
proc.stdin.write(markup.encode())
async def action_handler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
command = line.decode().strip()
callback = self.actions.get(command)
if callback is None:
# In some conditions on start it's empty
log.error("Unknown command: %s", command)
return
callback()
async with self.taskGroup:
self.add_long_running_task(refresher())
self.add_long_running_task(action_handler())
for screen in self.children:
await screen.run()
for provider in self.providers:
self.add_long_running_task(provider.run())
@ -393,9 +453,6 @@ class Bar(ComposableText):
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, finish)
def generate_markup(self) -> str:
return "".join(screen.get_markup() for screen in self.children) + "\n"
def add_provider(
self,
provider: "Provider",
@ -405,24 +462,13 @@ class Bar(ComposableText):
modules = []
for s, screen in enumerate(self.children):
if screen_num is None or s == screen_num:
side = next(
filter(lambda s: s.alignment == alignment, screen.children)
)
side = next(filter(lambda s: s.alignment == alignment, screen.children))
module = Module(parent=side)
modules.append(module)
provider.modules = modules
if modules:
self.providers.append(provider)
def add_action(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def remove_action(self, command: str) -> None:
del self.actions[command]
def launch(self) -> None:
# Using GLib's event loop so we can run GLib's code
policy = gi.events.GLibEventLoopPolicy()
@ -430,6 +476,9 @@ class Bar(ComposableText):
loop = policy.get_event_loop()
loop.run_until_complete(self.run())
def update_markup(self) -> None:
pass
class Provider:
section_type: type[Section] = Section
@ -462,9 +511,7 @@ class SingleSectionProvider(MirrorProvider):
class StaticProvider(SingleSectionProvider):
def __init__(
self, text: str, color: rich.color.Color = RICH_DEFAULT_COLOR
) -> None:
def __init__(self, text: str, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None:
super().__init__(color=color)
self.text = text
@ -524,9 +571,7 @@ class StatefulSectionProvider(Provider):
section_type = StatefulSection
class SingleStatefulSectionProvider(
StatefulSectionProvider, SingleSectionProvider
):
class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider):
section: StatefulSection
@ -545,9 +590,7 @@ class MultiSectionsProvider(Provider):
async def do_nothing() -> None:
pass
async def update_sections(
self, sections: set[Sortable], module: Module
) -> None:
async def update_sections(self, sections: set[Sortable], module: Module) -> None:
module_sections = self.sectionKeys[module]
async with asyncio.TaskGroup() as tg:
for sort_key in sections:
@ -556,9 +599,7 @@ class MultiSectionsProvider(Provider):
section = self.section_type(
parent=module, sort_key=sort_key, color=self.color
)
self.updaters[section] = await self.get_section_updater(
section
)
self.updaters[section] = await self.get_section_updater(section)
module_sections[sort_key] = section
updater = self.updaters[section]
@ -608,9 +649,7 @@ class PeriodicProvider(Provider):
bar.add_long_running_task(bar.periodicProviderTask)
class PeriodicStatefulProvider(
SingleStatefulSectionProvider, PeriodicProvider
):
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
async def run(self) -> None:
await super().run()
self.section.set_changed_state(self.loop)

View file

@ -6,25 +6,25 @@
}:
{
config = lib.mkIf config.frogeye.desktop.xorg {
xsession.windowManager.i3.config.bars = [ ];
wayland.windowManager.sway.config.bars = [ ];
programs.autorandr.hooks.postswitch = {
frobar = "${pkgs.systemd}/bin/systemctl --user restart frobar";
};
systemd.user.services.frobar = {
Unit = {
Description = "frobar";
After = [ "graphical-session-pre.target" ];
PartOf = [ "graphical-session.target" ];
After = [ "kanshi.service" ];
PartOf = [ "kanshi.service" ];
};
Service = {
# Wait for i3 to start. Can't use ExecStartPre because otherwise it blocks graphical-session.target, and there's nothing i3/systemd
# TODO Do that better
ExecStart = ''${pkgs.bash}/bin/bash -c "while ! ${pkgs.i3}/bin/i3-msg; do ${pkgs.coreutils}/bin/sleep 1; done; ${pkgs.callPackage ./. { }}/bin/frobar"'';
ExecStart = ''${pkgs.bash}/bin/bash -c "while ! ${pkgs.sway}/bin/swaymsg; do ${pkgs.coreutils}/bin/sleep 1; done; ${pkgs.callPackage ./. { }}/bin/frobar"'';
};
Install = {
WantedBy = [ "graphical-session.target" ];
WantedBy = [ "kanshi.service" ];
};
};
};