frobar: Now version 3!
This commit is contained in:
parent
42d3d1b3a6
commit
9adfcd2377
File diff suppressed because it is too large
Load diff
|
@ -22,18 +22,13 @@ in
|
|||
# is called pyton-mpd2 on PyPi but mpd2 in nixpkgs.
|
||||
pkgs.python3Packages.buildPythonApplication rec {
|
||||
pname = "frobar";
|
||||
version = "2.0";
|
||||
version = "3.0";
|
||||
|
||||
propagatedBuildInputs = with pkgs.python3Packages; [
|
||||
coloredlogs # old only
|
||||
i3ipc
|
||||
mpd2
|
||||
notmuch
|
||||
psutil
|
||||
pulsectl-asyncio
|
||||
pulsectl # old only
|
||||
pygobject3
|
||||
pyinotify
|
||||
rich
|
||||
];
|
||||
nativeBuildInputs =
|
||||
|
@ -42,9 +37,15 @@ pkgs.python3Packages.buildPythonApplication rec {
|
|||
wirelesstools
|
||||
playerctl
|
||||
]);
|
||||
makeWrapperArgs = [ "--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}" ];
|
||||
makeWrapperArgs = [
|
||||
"--prefix PATH : ${pkgs.lib.makeBinPath nativeBuildInputs}"
|
||||
"--prefix GI_TYPELIB_PATH : ${GI_TYPELIB_PATH}"
|
||||
];
|
||||
|
||||
GI_TYPELIB_PATH = pkgs.lib.makeSearchPath "lib/girepository-1.0" [ pkgs.glib.out pkgs.playerctl ];
|
||||
GI_TYPELIB_PATH = pkgs.lib.makeSearchPath "lib/girepository-1.0" [
|
||||
pkgs.glib.out
|
||||
pkgs.playerctl
|
||||
];
|
||||
|
||||
src = ./.;
|
||||
}
|
||||
|
|
|
@ -1,77 +1,146 @@
|
|||
#!/usr/bin/env python3
|
||||
import rich.color
|
||||
import rich.logging
|
||||
import rich.terminal_theme
|
||||
|
||||
from frobar import providers as fp
|
||||
from frobar.display import Bar, BarGroupType
|
||||
from frobar.updaters import Updater
|
||||
|
||||
# TODO If multiple screen, expand the sections and share them
|
||||
# TODO Graceful exit
|
||||
import frobar.common
|
||||
import frobar.providers
|
||||
from frobar.common import Alignment
|
||||
|
||||
|
||||
def run() -> None:
|
||||
Bar.init()
|
||||
Updater.init()
|
||||
def main() -> None:
|
||||
# TODO Configurable
|
||||
FROGARIZED = [
|
||||
"#092c0e",
|
||||
"#143718",
|
||||
"#5a7058",
|
||||
"#677d64",
|
||||
"#89947f",
|
||||
"#99a08d",
|
||||
"#fae2e3",
|
||||
"#fff0f1",
|
||||
"#e0332e",
|
||||
"#cf4b15",
|
||||
"#bb8801",
|
||||
"#8d9800",
|
||||
"#1fa198",
|
||||
"#008dd1",
|
||||
"#5c73c4",
|
||||
"#d43982",
|
||||
]
|
||||
# TODO Not super happy with the color management,
|
||||
# while using an existing library is great, it's limited to ANSI colors
|
||||
|
||||
# Bar.addSectionAll(fp.CpuProvider(), BarGroupType.RIGHT)
|
||||
# Bar.addSectionAll(fp.NetworkProvider(theme=2), BarGroupType.RIGHT)
|
||||
def base16_color(color: int) -> tuple[int, int, int]:
|
||||
hexa = FROGARIZED[color]
|
||||
return tuple(rich.color.parse_rgb_hex(hexa[1:]))
|
||||
|
||||
WORKSPACE_THEME = 8
|
||||
FOCUS_THEME = 2
|
||||
URGENT_THEME = 0
|
||||
CUSTOM_SUFFIXES = "▲■"
|
||||
|
||||
customNames = dict()
|
||||
for i in range(len(CUSTOM_SUFFIXES)):
|
||||
short = str(i + 1)
|
||||
full = short + " " + CUSTOM_SUFFIXES[i]
|
||||
customNames[short] = full
|
||||
Bar.addSectionAll(
|
||||
fp.I3WorkspacesProvider(
|
||||
theme=WORKSPACE_THEME,
|
||||
themeFocus=FOCUS_THEME,
|
||||
themeUrgent=URGENT_THEME,
|
||||
themeMode=URGENT_THEME,
|
||||
customNames=customNames,
|
||||
),
|
||||
BarGroupType.LEFT,
|
||||
theme = rich.terminal_theme.TerminalTheme(
|
||||
base16_color(0x0),
|
||||
base16_color(0x0), # TODO should be 7, currently 0 so it's compatible with v2
|
||||
[
|
||||
base16_color(0x0), # black
|
||||
base16_color(0x8), # red
|
||||
base16_color(0xB), # green
|
||||
base16_color(0xA), # yellow
|
||||
base16_color(0xD), # blue
|
||||
base16_color(0xE), # magenta
|
||||
base16_color(0xC), # cyan
|
||||
base16_color(0x5), # white
|
||||
],
|
||||
[
|
||||
base16_color(0x3), # bright black
|
||||
base16_color(0x8), # bright red
|
||||
base16_color(0xB), # bright green
|
||||
base16_color(0xA), # bright yellow
|
||||
base16_color(0xD), # bright blue
|
||||
base16_color(0xE), # bright magenta
|
||||
base16_color(0xC), # bright cyan
|
||||
base16_color(0x7), # bright white
|
||||
],
|
||||
)
|
||||
|
||||
# TODO Middle
|
||||
Bar.addSectionAll(fp.MprisProvider(theme=9), BarGroupType.LEFT)
|
||||
# Bar.addSectionAll(fp.MpdProvider(theme=9), BarGroupType.LEFT)
|
||||
# Bar.addSectionAll(I3WindowTitleProvider(), BarGroupType.LEFT)
|
||||
bar = frobar.common.Bar(theme=theme)
|
||||
dualScreen = len(bar.children) > 1
|
||||
leftPreferred = 0 if dualScreen else None
|
||||
rightPreferred = 1 if dualScreen else None
|
||||
|
||||
# TODO Computer modes
|
||||
workspaces_suffixes = "▲■"
|
||||
workspaces_names = dict(
|
||||
(str(i + 1), f"{i+1} {c}") for i, c in enumerate(workspaces_suffixes)
|
||||
)
|
||||
|
||||
Bar.addSectionAll(fp.CpuProvider(), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.LoadProvider(), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.RamProvider(), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.TemperatureProvider(), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.BatteryProvider(), BarGroupType.RIGHT)
|
||||
color = rich.color.Color.parse
|
||||
|
||||
# Peripherals
|
||||
PERIPHERAL_THEME = 6
|
||||
NETWORK_THEME = 5
|
||||
# TODO Disk space provider
|
||||
# TODO Screen (connected, autorandr configuration, bbswitch) provider
|
||||
Bar.addSectionAll(fp.XautolockProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.PulseaudioProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.RfkillProvider(theme=PERIPHERAL_THEME), BarGroupType.RIGHT)
|
||||
Bar.addSectionAll(fp.NetworkProvider(theme=NETWORK_THEME), BarGroupType.RIGHT)
|
||||
bar.addProvider(
|
||||
frobar.providers.I3ModeProvider(color=color("red")), alignment=Alignment.LEFT
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.I3WorkspacesProvider(custom_names=workspaces_names),
|
||||
alignment=Alignment.LEFT,
|
||||
)
|
||||
|
||||
# Personal
|
||||
# PERSONAL_THEME = 7
|
||||
# Bar.addSectionAll(fp.KeystoreProvider(theme=PERSONAL_THEME), BarGroupType.RIGHT)
|
||||
# Bar.addSectionAll(
|
||||
# fp.NotmuchUnreadProvider(dir="~/.mail/", theme=PERSONAL_THEME),
|
||||
# BarGroupType.RIGHT,
|
||||
# )
|
||||
# Bar.addSectionAll(
|
||||
# fp.TodoProvider(dir="~/.vdirsyncer/currentCalendars/", theme=PERSONAL_THEME),
|
||||
# BarGroupType.RIGHT,
|
||||
# )
|
||||
if dualScreen:
|
||||
bar.addProvider(
|
||||
frobar.providers.I3WindowTitleProvider(color=color("white")),
|
||||
screenNum=0,
|
||||
alignment=Alignment.CENTER,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.MprisProvider(color=color("bright_white")),
|
||||
screenNum=rightPreferred,
|
||||
alignment=Alignment.CENTER,
|
||||
)
|
||||
else:
|
||||
bar.addProvider(
|
||||
frobar.common.SpacerProvider(),
|
||||
alignment=Alignment.LEFT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.MprisProvider(color=color("bright_white")),
|
||||
alignment=Alignment.LEFT,
|
||||
)
|
||||
|
||||
TIME_THEME = 4
|
||||
Bar.addSectionAll(fp.TimeProvider(theme=TIME_THEME), BarGroupType.RIGHT)
|
||||
bar.addProvider(
|
||||
frobar.providers.CpuProvider(),
|
||||
screenNum=leftPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.LoadProvider(),
|
||||
screenNum=leftPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.RamProvider(),
|
||||
screenNum=leftPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.TemperatureProvider(),
|
||||
screenNum=leftPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.BatteryProvider(),
|
||||
screenNum=leftPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.PulseaudioProvider(color=color("magenta")),
|
||||
screenNum=rightPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.NetworkProvider(color=color("blue")),
|
||||
screenNum=leftPreferred,
|
||||
alignment=Alignment.RIGHT,
|
||||
)
|
||||
bar.addProvider(
|
||||
frobar.providers.TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT
|
||||
)
|
||||
|
||||
# Bar.run()
|
||||
bar.launch()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
@ -1,5 +1,616 @@
|
|||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import collections
|
||||
import datetime
|
||||
import enum
|
||||
import logging
|
||||
import signal
|
||||
import typing
|
||||
|
||||
import threading
|
||||
import gi
|
||||
import gi.events
|
||||
import gi.repository.GLib
|
||||
import i3ipc
|
||||
import i3ipc.aio
|
||||
import rich.color
|
||||
import rich.logging
|
||||
import rich.terminal_theme
|
||||
|
||||
notBusy = threading.Event()
|
||||
logging.basicConfig(
|
||||
level="DEBUG",
|
||||
format="%(message)s",
|
||||
datefmt="[%X]",
|
||||
handlers=[rich.logging.RichHandler()],
|
||||
)
|
||||
log = logging.getLogger("frobar")
|
||||
|
||||
T = typing.TypeVar("T", bound="ComposableText")
|
||||
P = typing.TypeVar("P", bound="ComposableText")
|
||||
C = typing.TypeVar("C", bound="ComposableText")
|
||||
Sortable = str | int
|
||||
|
||||
# Display utilities
|
||||
|
||||
|
||||
def humanSize(numi: int) -> str:
|
||||
"""
|
||||
Returns a string of width 3+3
|
||||
"""
|
||||
num = float(numi)
|
||||
for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
|
||||
if abs(num) < 1000:
|
||||
if num >= 10:
|
||||
return f"{int(num):3d}{unit}"
|
||||
else:
|
||||
return f"{num:.1f}{unit}"
|
||||
num /= 1024
|
||||
return f"{numi:d}YiB"
|
||||
|
||||
|
||||
def ramp(p: float, states: str = " ▁▂▃▄▅▆▇█") -> str:
|
||||
if p < 0:
|
||||
return ""
|
||||
d, m = divmod(p, 1.0)
|
||||
return states[-1] * int(d) + states[round(m * (len(states) - 1))]
|
||||
|
||||
|
||||
def clip(text: str, length: int = 30) -> str:
|
||||
if len(text) > length:
|
||||
text = text[: length - 1] + "…"
|
||||
return text
|
||||
|
||||
|
||||
class ComposableText(typing.Generic[P, C]):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: typing.Optional[P] = None,
|
||||
sortKey: Sortable = 0,
|
||||
) -> None:
|
||||
self.parent: typing.Optional[P] = None
|
||||
self.children: typing.MutableSequence[C] = list()
|
||||
self.sortKey = sortKey
|
||||
if parent:
|
||||
self.setParent(parent)
|
||||
self.bar = self.getFirstParentOfType(Bar)
|
||||
|
||||
def setParent(self, parent: P) -> None:
|
||||
assert self.parent is None
|
||||
parent.children.append(self)
|
||||
assert isinstance(parent.children, list)
|
||||
parent.children.sort(key=lambda c: c.sortKey)
|
||||
self.parent = parent
|
||||
self.parent.updateMarkup()
|
||||
|
||||
def unsetParent(self) -> None:
|
||||
assert self.parent
|
||||
self.parent.children.remove(self)
|
||||
self.parent.updateMarkup()
|
||||
self.parent = None
|
||||
|
||||
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
|
||||
parent = self
|
||||
while not isinstance(parent, typ):
|
||||
assert parent.parent, f"{self} doesn't have a parent of {typ}"
|
||||
parent = parent.parent
|
||||
return parent
|
||||
|
||||
def updateMarkup(self) -> None:
|
||||
self.bar.refresh.set()
|
||||
# TODO OPTI See if worth caching the output
|
||||
|
||||
def generateMarkup(self) -> str:
|
||||
raise NotImplementedError(f"{self} cannot generate markup")
|
||||
|
||||
def getMarkup(self) -> str:
|
||||
return self.generateMarkup()
|
||||
|
||||
|
||||
class Button(enum.Enum):
|
||||
CLICK_LEFT = "1"
|
||||
CLICK_MIDDLE = "2"
|
||||
CLICK_RIGHT = "3"
|
||||
SCROLL_UP = "4"
|
||||
SCROLL_DOWN = "5"
|
||||
|
||||
|
||||
class Section(ComposableText):
|
||||
"""
|
||||
Colorable block separated by chevrons
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: "Module",
|
||||
sortKey: Sortable = 0,
|
||||
color: rich.color.Color = rich.color.Color.default(),
|
||||
) -> None:
|
||||
super().__init__(parent=parent, sortKey=sortKey)
|
||||
self.parent: "Module"
|
||||
self.color = color
|
||||
|
||||
self.desiredText: str | None = None
|
||||
self.text = ""
|
||||
self.targetSize = -1
|
||||
self.size = -1
|
||||
self.animationTask: asyncio.Task | None = None
|
||||
self.actions: dict[Button, str] = dict()
|
||||
|
||||
def isHidden(self) -> bool:
|
||||
return self.size < 0
|
||||
|
||||
# Geometric series, with a cap
|
||||
ANIM_A = 0.025
|
||||
ANIM_R = 0.9
|
||||
ANIM_MIN = 0.001
|
||||
|
||||
async def animate(self) -> None:
|
||||
increment = 1 if self.size < self.targetSize else -1
|
||||
loop = asyncio.get_running_loop()
|
||||
frameTime = loop.time()
|
||||
animTime = self.ANIM_A
|
||||
skipped = 0
|
||||
|
||||
while self.size != self.targetSize:
|
||||
self.size += increment
|
||||
self.updateMarkup()
|
||||
|
||||
animTime *= self.ANIM_R
|
||||
animTime = max(self.ANIM_MIN, animTime)
|
||||
frameTime += animTime
|
||||
sleepTime = frameTime - loop.time()
|
||||
|
||||
# In case of stress, skip refreshing by not awaiting
|
||||
if sleepTime > 0:
|
||||
if skipped > 0:
|
||||
log.warning(f"Skipped {skipped} animation frame(s)")
|
||||
skipped = 0
|
||||
await asyncio.sleep(sleepTime)
|
||||
else:
|
||||
skipped += 1
|
||||
|
||||
def setText(self, text: str | None) -> None:
|
||||
# OPTI Don't redraw nor reset animation if setting the same text
|
||||
if self.desiredText == text:
|
||||
return
|
||||
self.desiredText = text
|
||||
if text is None:
|
||||
self.text = ""
|
||||
self.targetSize = -1
|
||||
else:
|
||||
self.text = f" {text} "
|
||||
self.targetSize = len(self.text)
|
||||
if self.animationTask:
|
||||
self.animationTask.cancel()
|
||||
# OPTI Skip the whole animation task if not required
|
||||
if self.size == self.targetSize:
|
||||
self.updateMarkup()
|
||||
else:
|
||||
self.animationTask = self.bar.taskGroup.create_task(self.animate())
|
||||
|
||||
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
|
||||
if button in self.actions:
|
||||
command = self.actions[button]
|
||||
self.bar.removeAction(command)
|
||||
del self.actions[button]
|
||||
if callback:
|
||||
command = self.bar.addAction(callback)
|
||||
self.actions[button] = command
|
||||
|
||||
def generateMarkup(self) -> str:
|
||||
assert not self.isHidden()
|
||||
pad = max(0, self.size - len(self.text))
|
||||
text = self.text[: self.size] + " " * pad
|
||||
for button, command in self.actions.items():
|
||||
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
|
||||
return text
|
||||
|
||||
|
||||
class Module(ComposableText):
|
||||
"""
|
||||
Sections handled by a same updater
|
||||
"""
|
||||
|
||||
def __init__(self, parent: "Side") -> None:
|
||||
super().__init__(parent=parent)
|
||||
self.parent: "Side"
|
||||
self.children: typing.MutableSequence[Section]
|
||||
|
||||
self.mirroring: Module | None = None
|
||||
self.mirrors: list[Module] = list()
|
||||
|
||||
def mirror(self, module: "Module") -> None:
|
||||
self.mirroring = module
|
||||
module.mirrors.append(self)
|
||||
|
||||
def getSections(self) -> typing.Sequence[Section]:
|
||||
if self.mirroring:
|
||||
return self.mirroring.children
|
||||
else:
|
||||
return self.children
|
||||
|
||||
def updateMarkup(self) -> None:
|
||||
super().updateMarkup()
|
||||
for mirror in self.mirrors:
|
||||
mirror.updateMarkup()
|
||||
|
||||
|
||||
class Alignment(enum.Enum):
|
||||
LEFT = "l"
|
||||
RIGHT = "r"
|
||||
CENTER = "c"
|
||||
|
||||
|
||||
class Side(ComposableText):
|
||||
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
|
||||
super().__init__(parent=parent)
|
||||
self.parent: Screen
|
||||
self.children: typing.MutableSequence[Module] = []
|
||||
|
||||
self.alignment = alignment
|
||||
self.bar = parent.getFirstParentOfType(Bar)
|
||||
|
||||
def generateMarkup(self) -> str:
|
||||
if not self.children:
|
||||
return ""
|
||||
text = "%{" + self.alignment.value + "}"
|
||||
lastSection: Section | None = None
|
||||
for module in self.children:
|
||||
for section in module.getSections():
|
||||
if section.isHidden():
|
||||
continue
|
||||
hexa = section.color.get_truecolor(theme=self.bar.theme).hex
|
||||
if lastSection is None:
|
||||
if self.alignment == Alignment.LEFT:
|
||||
text += "%{B" + hexa + "}%{F-}"
|
||||
else:
|
||||
text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
|
||||
elif isinstance(lastSection, SpacerSection):
|
||||
text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
|
||||
else:
|
||||
if self.alignment == Alignment.RIGHT:
|
||||
if lastSection.color == section.color:
|
||||
text += ""
|
||||
else:
|
||||
text += "%{F" + hexa + "}%{R}"
|
||||
else:
|
||||
if lastSection.color == section.color:
|
||||
text += ""
|
||||
else:
|
||||
text += "%{R}%{B" + hexa + "}"
|
||||
text += "%{F-}"
|
||||
text += section.getMarkup()
|
||||
lastSection = section
|
||||
if self.alignment != Alignment.RIGHT and lastSection:
|
||||
text += "%{R}%{B-}"
|
||||
return text
|
||||
|
||||
|
||||
class Screen(ComposableText):
|
||||
def __init__(self, parent: "Bar", output: str) -> None:
|
||||
super().__init__(parent=parent)
|
||||
self.parent: "Bar"
|
||||
self.children: typing.MutableSequence[Side]
|
||||
|
||||
self.output = output
|
||||
|
||||
for alignment in Alignment:
|
||||
Side(parent=self, alignment=alignment)
|
||||
|
||||
def generateMarkup(self) -> str:
|
||||
return ("%{Sn" + self.output + "}") + "".join(
|
||||
side.getMarkup() for side in self.children
|
||||
)
|
||||
|
||||
|
||||
class Bar(ComposableText):
|
||||
"""
|
||||
Top-level
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.parent: None
|
||||
self.children: typing.MutableSequence[Screen]
|
||||
self.longRunningTasks: list[asyncio.Task] = list()
|
||||
self.theme = theme
|
||||
|
||||
self.refresh = asyncio.Event()
|
||||
self.taskGroup = asyncio.TaskGroup()
|
||||
self.providers: list["Provider"] = list()
|
||||
self.actionIndex = 0
|
||||
self.actions: dict[str, typing.Callable] = dict()
|
||||
|
||||
self.periodicProviderTask: typing.Coroutine | None = None
|
||||
|
||||
i3 = i3ipc.Connection()
|
||||
for output in i3.get_outputs():
|
||||
if not output.active:
|
||||
continue
|
||||
Screen(parent=self, output=output.name)
|
||||
|
||||
def addLongRunningTask(self, coro: typing.Coroutine) -> None:
|
||||
task = self.taskGroup.create_task(coro)
|
||||
self.longRunningTasks.append(task)
|
||||
|
||||
async def run(self) -> None:
|
||||
cmd = [
|
||||
"lemonbar",
|
||||
"-b",
|
||||
"-a",
|
||||
"64",
|
||||
"-f",
|
||||
"DejaVuSansM Nerd Font:size=10",
|
||||
"-F",
|
||||
self.theme.foreground_color.hex,
|
||||
"-B",
|
||||
self.theme.background_color.hex,
|
||||
]
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
async def refresher() -> None:
|
||||
assert proc.stdin
|
||||
while True:
|
||||
await self.refresh.wait()
|
||||
self.refresh.clear()
|
||||
markup = self.getMarkup()
|
||||
proc.stdin.write(markup.encode())
|
||||
|
||||
async def actionHandler() -> None:
|
||||
assert proc.stdout
|
||||
while True:
|
||||
line = await proc.stdout.readline()
|
||||
command = line.decode().strip()
|
||||
callback = self.actions[command]
|
||||
callback()
|
||||
|
||||
async with self.taskGroup:
|
||||
self.addLongRunningTask(refresher())
|
||||
self.addLongRunningTask(actionHandler())
|
||||
for provider in self.providers:
|
||||
self.addLongRunningTask(provider.run())
|
||||
|
||||
def exit() -> None:
|
||||
log.info("Terminating")
|
||||
for task in self.longRunningTasks:
|
||||
task.cancel()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.add_signal_handler(signal.SIGINT, exit)
|
||||
|
||||
def generateMarkup(self) -> str:
|
||||
return "".join(screen.getMarkup() for screen in self.children) + "\n"
|
||||
|
||||
def addProvider(
|
||||
self,
|
||||
provider: "Provider",
|
||||
alignment: Alignment = Alignment.LEFT,
|
||||
screenNum: int | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
screenNum: the provider will be added on this screen if set, all otherwise
|
||||
"""
|
||||
modules = list()
|
||||
for s, screen in enumerate(self.children):
|
||||
if screenNum is None or s == screenNum:
|
||||
side = next(filter(lambda s: s.alignment == alignment, screen.children))
|
||||
module = Module(parent=side)
|
||||
modules.append(module)
|
||||
provider.modules = modules
|
||||
if modules:
|
||||
self.providers.append(provider)
|
||||
|
||||
def addAction(self, callback: typing.Callable) -> str:
|
||||
command = f"{self.actionIndex:x}"
|
||||
self.actions[command] = callback
|
||||
self.actionIndex += 1
|
||||
return command
|
||||
|
||||
def removeAction(self, command: str) -> None:
|
||||
del self.actions[command]
|
||||
|
||||
def launch(self) -> None:
|
||||
# Using GLib's event loop so we can run GLib's code
|
||||
policy = gi.events.GLibEventLoopPolicy()
|
||||
asyncio.set_event_loop_policy(policy)
|
||||
loop = policy.get_event_loop()
|
||||
loop.run_until_complete(self.run())
|
||||
|
||||
|
||||
class Provider:
|
||||
sectionType: type[Section] = Section
|
||||
|
||||
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
|
||||
self.modules: list[Module] = list()
|
||||
self.color = color
|
||||
|
||||
async def run(self) -> None:
|
||||
# Not a NotImplementedError, otherwise can't combine all classes
|
||||
pass
|
||||
|
||||
|
||||
class MirrorProvider(Provider):
|
||||
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
|
||||
super().__init__(color=color)
|
||||
self.module: Module
|
||||
|
||||
async def run(self) -> None:
|
||||
await super().run()
|
||||
self.module = self.modules[0]
|
||||
for module in self.modules[1:]:
|
||||
module.mirror(self.module)
|
||||
|
||||
|
||||
class SingleSectionProvider(MirrorProvider):
|
||||
async def run(self) -> None:
|
||||
await super().run()
|
||||
self.section = self.sectionType(parent=self.module, color=self.color)
|
||||
|
||||
|
||||
class StaticProvider(SingleSectionProvider):
|
||||
def __init__(
|
||||
self, text: str, color: rich.color.Color = rich.color.Color.default()
|
||||
) -> None:
|
||||
super().__init__(color=color)
|
||||
self.text = text
|
||||
|
||||
async def run(self) -> None:
|
||||
await super().run()
|
||||
self.section.setText(self.text)
|
||||
|
||||
|
||||
class SpacerSection(Section):
|
||||
pass
|
||||
|
||||
|
||||
class SpacerProvider(SingleSectionProvider):
|
||||
sectionType = SpacerSection
|
||||
|
||||
def __init__(self, length: int = 5) -> None:
|
||||
super().__init__(color=rich.color.Color.default())
|
||||
self.length = length
|
||||
|
||||
async def run(self) -> None:
|
||||
await super().run()
|
||||
assert isinstance(self.section, SpacerSection)
|
||||
self.section.setText(" " * self.length)
|
||||
|
||||
|
||||
class StatefulSection(Section):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parent: Module,
|
||||
sortKey: Sortable = 0,
|
||||
color: rich.color.Color = rich.color.Color.default(),
|
||||
) -> None:
|
||||
super().__init__(parent=parent, sortKey=sortKey, color=color)
|
||||
self.state = 0
|
||||
self.numberStates: int
|
||||
|
||||
self.setAction(Button.CLICK_LEFT, self.incrementState)
|
||||
self.setAction(Button.CLICK_RIGHT, self.decrementState)
|
||||
|
||||
def incrementState(self) -> None:
|
||||
self.state += 1
|
||||
self.changeState()
|
||||
|
||||
def decrementState(self) -> None:
|
||||
self.state -= 1
|
||||
self.changeState()
|
||||
|
||||
def setChangedState(self, callback: typing.Callable) -> None:
|
||||
self.callback = callback
|
||||
|
||||
def changeState(self) -> None:
|
||||
self.state %= self.numberStates
|
||||
self.bar.taskGroup.create_task(self.callback())
|
||||
|
||||
|
||||
class StatefulSectionProvider(Provider):
|
||||
sectionType = StatefulSection
|
||||
|
||||
|
||||
class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider):
|
||||
section: StatefulSection
|
||||
|
||||
|
||||
class MultiSectionsProvider(Provider):
|
||||
|
||||
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
|
||||
super().__init__(color=color)
|
||||
self.sectionKeys: dict[Module, dict[Sortable, Section]] = (
|
||||
collections.defaultdict(dict)
|
||||
)
|
||||
self.updaters: dict[Section, typing.Callable] = dict()
|
||||
|
||||
async def getSectionUpdater(self, section: Section) -> typing.Callable:
|
||||
raise NotImplementedError()
|
||||
|
||||
async def updateSections(self, sections: set[Sortable], module: Module) -> None:
|
||||
moduleSections = self.sectionKeys[module]
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
for sortKey in sections:
|
||||
section = moduleSections.get(sortKey)
|
||||
if not section:
|
||||
section = self.sectionType(
|
||||
parent=module, sortKey=sortKey, color=self.color
|
||||
)
|
||||
self.updaters[section] = await self.getSectionUpdater(section)
|
||||
moduleSections[sortKey] = section
|
||||
|
||||
updater = self.updaters[section]
|
||||
tg.create_task(updater())
|
||||
|
||||
missingKeys = set(moduleSections.keys()) - sections
|
||||
for missingKey in missingKeys:
|
||||
section = moduleSections.get(missingKey)
|
||||
assert section
|
||||
section.setText(None)
|
||||
|
||||
|
||||
class PeriodicProvider(Provider):
|
||||
async def init(self) -> None:
|
||||
pass
|
||||
|
||||
async def loop(self) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
async def task(cls, bar: Bar) -> None:
|
||||
providers = list()
|
||||
for provider in bar.providers:
|
||||
if isinstance(provider, PeriodicProvider):
|
||||
providers.append(provider)
|
||||
await provider.init()
|
||||
|
||||
while True:
|
||||
# TODO Block bar update during the periodic update of the loops
|
||||
loops = [provider.loop() for provider in providers]
|
||||
asyncio.gather(*loops)
|
||||
|
||||
now = datetime.datetime.now()
|
||||
# Hardcoded to 1 second... not sure if we want some more than that,
|
||||
# and if the logic to check if a task should run would be a win
|
||||
# compared to the task itself
|
||||
remaining = 1 - now.microsecond / 1000000
|
||||
await asyncio.sleep(remaining)
|
||||
|
||||
async def run(self) -> None:
|
||||
await super().run()
|
||||
for module in self.modules:
|
||||
bar = module.getFirstParentOfType(Bar)
|
||||
assert bar
|
||||
if not bar.periodicProviderTask:
|
||||
bar.periodicProviderTask = PeriodicProvider.task(bar)
|
||||
bar.addLongRunningTask(bar.periodicProviderTask)
|
||||
|
||||
|
||||
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
|
||||
async def run(self) -> None:
|
||||
await super().run()
|
||||
self.section.setChangedState(self.loop)
|
||||
|
||||
|
||||
class AlertingProvider(Provider):
|
||||
COLOR_NORMAL = rich.color.Color.parse("green")
|
||||
COLOR_WARNING = rich.color.Color.parse("yellow")
|
||||
COLOR_DANGER = rich.color.Color.parse("red")
|
||||
|
||||
warningThreshold: float
|
||||
dangerThreshold: float
|
||||
|
||||
def updateLevel(self, level: float) -> None:
|
||||
if level > self.dangerThreshold:
|
||||
color = self.COLOR_DANGER
|
||||
elif level > self.warningThreshold:
|
||||
color = self.COLOR_WARNING
|
||||
else:
|
||||
color = self.COLOR_NORMAL
|
||||
for module in self.modules:
|
||||
for section in module.getSections():
|
||||
section.color = color
|
||||
|
|
|
@ -1,756 +0,0 @@
|
|||
#!/usr/bin/env python3init
|
||||
|
||||
import enum
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import typing
|
||||
|
||||
import coloredlogs
|
||||
import i3ipc
|
||||
|
||||
from frobar.common import notBusy
|
||||
|
||||
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
# TODO Allow deletion of Bar, BarGroup and Section for screen changes
|
||||
# IDEA Use i3 ipc events rather than relying on xrandr or Xlib (less portable
|
||||
# but easier)
|
||||
# TODO Optimize to use write() calls instead of string concatenation (writing
|
||||
# BarGroup strings should be a good compromise)
|
||||
# TODO Use bytes rather than strings
|
||||
# TODO Use default colors of lemonbar sometimes
|
||||
# TODO Adapt bar height with font height
|
||||
# TODO OPTI Static text objects that update its parents if modified
|
||||
# TODO forceSize and changeText are different
|
||||
|
||||
|
||||
Handle = typing.Callable[[], None]
|
||||
Decorator = Handle | str | None
|
||||
Element: typing.TypeAlias = typing.Union[str, "Text", None]
|
||||
Part: typing.TypeAlias = typing.Union[str, "Text", "Section"]
|
||||
|
||||
|
||||
class BarGroupType(enum.Enum):
|
||||
LEFT = 0
|
||||
RIGHT = 1
|
||||
# TODO Middle
|
||||
# MID_LEFT = 2
|
||||
# MID_RIGHT = 3
|
||||
|
||||
|
||||
class BarStdoutThread(threading.Thread):
|
||||
def run(self) -> None:
|
||||
while Bar.running:
|
||||
assert Bar.process.stdout
|
||||
handle = Bar.process.stdout.readline().strip()
|
||||
if not len(handle):
|
||||
Bar.stop()
|
||||
if handle not in Bar.actionsH2F:
|
||||
log.error("Unknown action: {}".format(handle))
|
||||
continue
|
||||
function = Bar.actionsH2F[handle]
|
||||
function()
|
||||
|
||||
|
||||
class Bar:
|
||||
"""
|
||||
One bar for each screen
|
||||
"""
|
||||
|
||||
# Constants
|
||||
FONTS = ["DejaVuSansM Nerd Font"]
|
||||
FONTSIZE = 10
|
||||
|
||||
@staticmethod
|
||||
def init() -> None:
|
||||
Bar.running = True
|
||||
Bar.everyone = set()
|
||||
Section.init()
|
||||
|
||||
cmd = [
|
||||
"lemonbar",
|
||||
"-b",
|
||||
"-a",
|
||||
"64",
|
||||
"-F",
|
||||
Section.FGCOLOR,
|
||||
"-B",
|
||||
Section.BGCOLOR,
|
||||
]
|
||||
for font in Bar.FONTS:
|
||||
cmd += ["-f", "{}:size={}".format(font, Bar.FONTSIZE)]
|
||||
Bar.process = subprocess.Popen(
|
||||
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
|
||||
)
|
||||
BarStdoutThread().start()
|
||||
|
||||
i3 = i3ipc.Connection()
|
||||
for output in i3.get_outputs():
|
||||
if not output.active:
|
||||
continue
|
||||
Bar(output.name)
|
||||
|
||||
@staticmethod
|
||||
def stop() -> None:
|
||||
Bar.running = False
|
||||
Bar.process.kill()
|
||||
|
||||
# TODO This is not really the best way to do it I guess
|
||||
os.killpg(os.getpid(), signal.SIGTERM)
|
||||
|
||||
@staticmethod
|
||||
def run() -> None:
|
||||
Bar.forever()
|
||||
i3 = i3ipc.Connection()
|
||||
|
||||
def doStop(*args: list) -> None:
|
||||
Bar.stop()
|
||||
|
||||
try:
|
||||
i3.on("ipc_shutdown", doStop)
|
||||
i3.main()
|
||||
except BaseException:
|
||||
Bar.stop()
|
||||
|
||||
# Class globals
|
||||
everyone: set["Bar"]
|
||||
string = ""
|
||||
process: subprocess.Popen
|
||||
running = False
|
||||
|
||||
nextHandle = 0
|
||||
actionsF2H: dict[Handle, bytes] = dict()
|
||||
actionsH2F: dict[bytes, Handle] = dict()
|
||||
|
||||
@staticmethod
|
||||
def getFunctionHandle(function: typing.Callable[[], None]) -> bytes:
|
||||
assert callable(function)
|
||||
if function in Bar.actionsF2H.keys():
|
||||
return Bar.actionsF2H[function]
|
||||
|
||||
handle = "{:x}".format(Bar.nextHandle).encode()
|
||||
Bar.nextHandle += 1
|
||||
|
||||
Bar.actionsF2H[function] = handle
|
||||
Bar.actionsH2F[handle] = function
|
||||
|
||||
return handle
|
||||
|
||||
@staticmethod
|
||||
def forever() -> None:
|
||||
Bar.process.wait()
|
||||
Bar.stop()
|
||||
|
||||
def __init__(self, output: str) -> None:
|
||||
self.output = output
|
||||
self.groups = dict()
|
||||
|
||||
for groupType in BarGroupType:
|
||||
group = BarGroup(groupType, self)
|
||||
self.groups[groupType] = group
|
||||
|
||||
self.childsChanged = False
|
||||
Bar.everyone.add(self)
|
||||
|
||||
@staticmethod
|
||||
def addSectionAll(
|
||||
section: "Section", group: "BarGroupType"
|
||||
) -> None:
|
||||
"""
|
||||
.. note::
|
||||
Add the section before updating it for the first time.
|
||||
"""
|
||||
for bar in Bar.everyone:
|
||||
bar.addSection(section, group=group)
|
||||
section.added()
|
||||
|
||||
def addSection(self, section: "Section", group: "BarGroupType") -> None:
|
||||
self.groups[group].addSection(section)
|
||||
|
||||
def update(self) -> None:
|
||||
if self.childsChanged:
|
||||
self.string = "%{Sn" + self.output + "}"
|
||||
self.string += self.groups[BarGroupType.LEFT].string
|
||||
self.string += self.groups[BarGroupType.RIGHT].string
|
||||
|
||||
self.childsChanged = False
|
||||
|
||||
@staticmethod
|
||||
def updateAll() -> None:
|
||||
if Bar.running:
|
||||
Bar.string = ""
|
||||
for bar in Bar.everyone:
|
||||
bar.update()
|
||||
Bar.string += bar.string
|
||||
# Color for empty sections
|
||||
Bar.string += BarGroup.color(*Section.EMPTY)
|
||||
|
||||
string = Bar.string + "\n"
|
||||
# print(string)
|
||||
assert Bar.process.stdin
|
||||
Bar.process.stdin.write(string.encode())
|
||||
Bar.process.stdin.flush()
|
||||
|
||||
|
||||
class BarGroup:
|
||||
"""
|
||||
One for each group of each bar
|
||||
"""
|
||||
|
||||
everyone: set["BarGroup"] = set()
|
||||
|
||||
def __init__(self, groupType: BarGroupType, parent: Bar):
|
||||
|
||||
self.groupType = groupType
|
||||
self.parent = parent
|
||||
|
||||
self.sections: list["Section"] = list()
|
||||
self.string = ""
|
||||
self.parts: list[Part] = []
|
||||
|
||||
#: One of the sections that had their theme or visibility changed
|
||||
self.childsThemeChanged = False
|
||||
|
||||
#: One of the sections that had their text (maybe their size) changed
|
||||
self.childsTextChanged = False
|
||||
|
||||
BarGroup.everyone.add(self)
|
||||
|
||||
def addSection(self, section: "Section") -> None:
|
||||
self.sections.append(section)
|
||||
section.addParent(self)
|
||||
|
||||
def addSectionAfter(self, sectionRef: "Section", section: "Section") -> None:
|
||||
index = self.sections.index(sectionRef)
|
||||
self.sections.insert(index + 1, section)
|
||||
section.addParent(self)
|
||||
|
||||
ALIGNS = {BarGroupType.LEFT: "%{l}", BarGroupType.RIGHT: "%{r}"}
|
||||
|
||||
@staticmethod
|
||||
def fgColor(color: str) -> str:
|
||||
return "%{F" + (color or "-") + "}"
|
||||
|
||||
@staticmethod
|
||||
def bgColor(color: str) -> str:
|
||||
return "%{B" + (color or "-") + "}"
|
||||
|
||||
@staticmethod
|
||||
def color(fg: str, bg: str) -> str:
|
||||
return BarGroup.fgColor(fg) + BarGroup.bgColor(bg)
|
||||
|
||||
def update(self) -> None:
|
||||
if self.childsThemeChanged:
|
||||
parts: list[Part] = [BarGroup.ALIGNS[self.groupType]]
|
||||
|
||||
secs = [sec for sec in self.sections if sec.visible]
|
||||
lenS = len(secs)
|
||||
for s in range(lenS):
|
||||
sec = secs[s]
|
||||
theme = Section.THEMES[sec.theme]
|
||||
if self.groupType == BarGroupType.LEFT:
|
||||
oSec = secs[s + 1] if s < lenS - 1 else None
|
||||
else:
|
||||
oSec = secs[s - 1] if s > 0 else None
|
||||
oTheme = (
|
||||
Section.THEMES[oSec.theme] if oSec is not None else Section.EMPTY
|
||||
)
|
||||
|
||||
if self.groupType == BarGroupType.LEFT:
|
||||
if s == 0:
|
||||
parts.append(BarGroup.bgColor(theme[1]))
|
||||
parts.append(BarGroup.fgColor(theme[0]))
|
||||
parts.append(sec)
|
||||
if theme == oTheme:
|
||||
parts.append("")
|
||||
else:
|
||||
parts.append(BarGroup.color(theme[1], oTheme[1]) + "")
|
||||
else:
|
||||
if theme is oTheme:
|
||||
parts.append("")
|
||||
else:
|
||||
parts.append(BarGroup.fgColor(theme[1]) + "")
|
||||
parts.append(BarGroup.color(*theme))
|
||||
parts.append(sec)
|
||||
|
||||
# TODO OPTI Concatenate successive strings
|
||||
self.parts = parts
|
||||
|
||||
if self.childsTextChanged or self.childsThemeChanged:
|
||||
self.string = ""
|
||||
for part in self.parts:
|
||||
if isinstance(part, str):
|
||||
self.string += part
|
||||
elif isinstance(part, Section):
|
||||
self.string += part.curText
|
||||
|
||||
self.parent.childsChanged = True
|
||||
|
||||
self.childsThemeChanged = False
|
||||
self.childsTextChanged = False
|
||||
|
||||
@staticmethod
|
||||
def updateAll() -> None:
|
||||
for group in BarGroup.everyone:
|
||||
group.update()
|
||||
Bar.updateAll()
|
||||
|
||||
|
||||
class SectionThread(threading.Thread):
|
||||
ANIMATION_START = 0.025
|
||||
ANIMATION_STOP = 0.001
|
||||
ANIMATION_EVOLUTION = 0.9
|
||||
|
||||
def run(self) -> None:
|
||||
while Section.somethingChanged.wait():
|
||||
notBusy.wait()
|
||||
Section.updateAll()
|
||||
animTime = self.ANIMATION_START
|
||||
frameTime = time.perf_counter()
|
||||
while len(Section.sizeChanging) > 0:
|
||||
frameTime += animTime
|
||||
curTime = time.perf_counter()
|
||||
sleepTime = frameTime - curTime
|
||||
time.sleep(sleepTime if sleepTime > 0 else 0)
|
||||
Section.updateAll()
|
||||
animTime *= self.ANIMATION_EVOLUTION
|
||||
if animTime < self.ANIMATION_STOP:
|
||||
animTime = self.ANIMATION_STOP
|
||||
|
||||
|
||||
Theme = tuple[str, str]
|
||||
|
||||
|
||||
class Section:
|
||||
# TODO Update all of that to base16
|
||||
COLORS = [
|
||||
"#092c0e",
|
||||
"#143718",
|
||||
"#5a7058",
|
||||
"#677d64",
|
||||
"#89947f",
|
||||
"#99a08d",
|
||||
"#fae2e3",
|
||||
"#fff0f1",
|
||||
"#e0332e",
|
||||
"#cf4b15",
|
||||
"#bb8801",
|
||||
"#8d9800",
|
||||
"#1fa198",
|
||||
"#008dd1",
|
||||
"#5c73c4",
|
||||
"#d43982",
|
||||
]
|
||||
FGCOLOR = "#fff0f1"
|
||||
BGCOLOR = "#092c0e"
|
||||
|
||||
THEMES: list[Theme] = list()
|
||||
EMPTY: Theme = (FGCOLOR, BGCOLOR)
|
||||
|
||||
ICON: str | None = None
|
||||
PERSISTENT = False
|
||||
|
||||
#: Sections that do not have their destination size
|
||||
sizeChanging: set["Section"] = set()
|
||||
updateThread: threading.Thread = SectionThread(daemon=True)
|
||||
somethingChanged = threading.Event()
|
||||
lastChosenTheme = 0
|
||||
|
||||
@staticmethod
|
||||
def init() -> None:
|
||||
for t in range(8, 16):
|
||||
Section.THEMES.append((Section.COLORS[0], Section.COLORS[t]))
|
||||
Section.THEMES.append((Section.COLORS[0], Section.COLORS[3]))
|
||||
Section.THEMES.append((Section.COLORS[0], Section.COLORS[6]))
|
||||
|
||||
Section.updateThread.start()
|
||||
|
||||
def __init__(self, theme: int | None = None) -> None:
|
||||
#: Displayed section
|
||||
#: Note: A section can be empty and displayed!
|
||||
self.visible = False
|
||||
|
||||
if theme is None:
|
||||
theme = Section.lastChosenTheme
|
||||
Section.lastChosenTheme = (Section.lastChosenTheme + 1) % len(
|
||||
Section.THEMES
|
||||
)
|
||||
self.theme = theme
|
||||
|
||||
#: Displayed text
|
||||
self.curText = ""
|
||||
#: Displayed text size
|
||||
self.curSize = 0
|
||||
|
||||
#: Destination text
|
||||
self.dstText = Text(" ", Text(), " ")
|
||||
#: Destination size
|
||||
self.dstSize = 0
|
||||
|
||||
#: Groups that have this section
|
||||
self.parents: set[BarGroup] = set()
|
||||
|
||||
self.icon = self.ICON
|
||||
self.persistent = self.PERSISTENT
|
||||
|
||||
def __str__(self) -> str:
|
||||
try:
|
||||
return "<{}><{}>{:01d}{}{:02d}/{:02d}".format(
|
||||
self.curText,
|
||||
self.dstText,
|
||||
self.theme,
|
||||
"+" if self.visible else "-",
|
||||
self.curSize,
|
||||
self.dstSize,
|
||||
)
|
||||
except Exception:
|
||||
return super().__str__()
|
||||
|
||||
def addParent(self, parent: BarGroup) -> None:
|
||||
self.parents.add(parent)
|
||||
|
||||
def appendAfter(self, section: "Section") -> None:
|
||||
assert len(self.parents)
|
||||
for parent in self.parents:
|
||||
parent.addSectionAfter(self, section)
|
||||
|
||||
def added(self) -> None:
|
||||
pass
|
||||
|
||||
def informParentsThemeChanged(self) -> None:
|
||||
for parent in self.parents:
|
||||
parent.childsThemeChanged = True
|
||||
|
||||
def informParentsTextChanged(self) -> None:
|
||||
for parent in self.parents:
|
||||
parent.childsTextChanged = True
|
||||
|
||||
def updateText(self, text: Element) -> None:
|
||||
if isinstance(text, str):
|
||||
text = Text(text)
|
||||
elif isinstance(text, Text) and not len(text.elements):
|
||||
text = None
|
||||
|
||||
self.dstText[0] = (
|
||||
None
|
||||
if (text is None and not self.persistent)
|
||||
else ((" " + self.icon + " ") if self.icon else " ")
|
||||
)
|
||||
self.dstText[1] = text
|
||||
self.dstText[2] = (
|
||||
" " if self.dstText[1] is not None and len(self.dstText[1]) else None
|
||||
)
|
||||
|
||||
self.dstSize = len(self.dstText)
|
||||
self.dstText.setSection(self)
|
||||
|
||||
if self.curSize == self.dstSize:
|
||||
if self.dstSize > 0:
|
||||
self.curText = str(self.dstText)
|
||||
self.informParentsTextChanged()
|
||||
else:
|
||||
Section.sizeChanging.add(self)
|
||||
Section.somethingChanged.set()
|
||||
|
||||
def setDecorators(self, **kwargs: Handle) -> None:
|
||||
self.dstText.setDecorators(**kwargs)
|
||||
self.curText = str(self.dstText)
|
||||
self.informParentsTextChanged()
|
||||
Section.somethingChanged.set()
|
||||
|
||||
def updateTheme(self, theme: int) -> None:
|
||||
assert theme < len(Section.THEMES)
|
||||
if theme == self.theme:
|
||||
return
|
||||
self.theme = theme
|
||||
self.informParentsThemeChanged()
|
||||
Section.somethingChanged.set()
|
||||
|
||||
def updateVisibility(self, visibility: bool) -> None:
|
||||
|
||||
self.visible = visibility
|
||||
self.informParentsThemeChanged()
|
||||
Section.somethingChanged.set()
|
||||
|
||||
@staticmethod
|
||||
def fit(text: str, size: int) -> str:
|
||||
t = len(text)
|
||||
return text[:size] if t >= size else text + " " * (size - t)
|
||||
|
||||
def update(self) -> None:
|
||||
# TODO Might profit of a better logic
|
||||
if not self.visible:
|
||||
self.updateVisibility(True)
|
||||
return
|
||||
|
||||
if self.dstSize > self.curSize:
|
||||
self.curSize += 1
|
||||
elif self.dstSize < self.curSize:
|
||||
self.curSize -= 1
|
||||
else:
|
||||
# Visibility toggling must be done one step after curSize = 0
|
||||
if self.dstSize == 0:
|
||||
self.updateVisibility(False)
|
||||
Section.sizeChanging.remove(self)
|
||||
return
|
||||
|
||||
self.curText = self.dstText.text(size=self.curSize, pad=True)
|
||||
self.informParentsTextChanged()
|
||||
|
||||
@staticmethod
|
||||
def updateAll() -> None:
|
||||
"""
|
||||
Process all sections for text size changes
|
||||
"""
|
||||
|
||||
for sizeChanging in Section.sizeChanging.copy():
|
||||
sizeChanging.update()
|
||||
|
||||
BarGroup.updateAll()
|
||||
|
||||
Section.somethingChanged.clear()
|
||||
|
||||
@staticmethod
|
||||
def ramp(p: float, ramp: str = " ▁▂▃▄▅▆▇█") -> str:
|
||||
if p > 1:
|
||||
return ramp[-1]
|
||||
elif p < 0:
|
||||
return ramp[0]
|
||||
else:
|
||||
return ramp[round(p * (len(ramp) - 1))]
|
||||
|
||||
|
||||
class StatefulSection(Section):
|
||||
# TODO FEAT Allow to temporary expand the section (e.g. when important change)
|
||||
NUMBER_STATES: int
|
||||
DEFAULT_STATE = 0
|
||||
|
||||
def __init__(self, theme: int | None) -> None:
|
||||
Section.__init__(self, theme=theme)
|
||||
self.state = self.DEFAULT_STATE
|
||||
if hasattr(self, "onChangeState"):
|
||||
self.onChangeState(self.state)
|
||||
self.setDecorators(
|
||||
clickLeft=self.incrementState, clickRight=self.decrementState
|
||||
)
|
||||
|
||||
def incrementState(self) -> None:
|
||||
newState = min(self.state + 1, self.NUMBER_STATES - 1)
|
||||
self.changeState(newState)
|
||||
|
||||
def decrementState(self) -> None:
|
||||
newState = max(self.state - 1, 0)
|
||||
self.changeState(newState)
|
||||
|
||||
def changeState(self, state: int) -> None:
|
||||
assert state < self.NUMBER_STATES
|
||||
self.state = state
|
||||
if hasattr(self, "onChangeState"):
|
||||
self.onChangeState(state)
|
||||
assert hasattr(
|
||||
self, "refreshData"
|
||||
), "StatefulSection should be paired with some Updater"
|
||||
self.refreshData()
|
||||
|
||||
|
||||
class ColorCountsSection(StatefulSection):
|
||||
# TODO FEAT Blend colors when not expanded
|
||||
# TODO FEAT Blend colors with importance of count
|
||||
# TODO FEAT Allow icons instead of counts
|
||||
NUMBER_STATES = 3
|
||||
COLORABLE_ICON = "?"
|
||||
|
||||
def __init__(self, theme: None | int = None) -> None:
|
||||
StatefulSection.__init__(self, theme=theme)
|
||||
|
||||
def subfetcher(self) -> list[tuple[int, str]]:
|
||||
raise NotImplementedError("Interface must be implemented")
|
||||
|
||||
def fetcher(self) -> typing.Union[None, "Text"]:
|
||||
counts = self.subfetcher()
|
||||
# Nothing
|
||||
if not len(counts):
|
||||
return None
|
||||
# Icon colored
|
||||
elif self.state == 0 and len(counts) == 1:
|
||||
count, color = counts[0]
|
||||
return Text(self.COLORABLE_ICON, fg=color)
|
||||
# Icon
|
||||
elif self.state == 0 and len(counts) > 1:
|
||||
return Text(self.COLORABLE_ICON)
|
||||
# Icon + Total
|
||||
elif self.state == 1 and len(counts) > 1:
|
||||
total = sum([count for count, color in counts])
|
||||
return Text(self.COLORABLE_ICON, " ", str(total))
|
||||
# Icon + Counts
|
||||
else:
|
||||
text = Text(self.COLORABLE_ICON)
|
||||
for count, color in counts:
|
||||
text.append(" ", Text(str(count), fg=color))
|
||||
return text
|
||||
|
||||
|
||||
class Text:
|
||||
def _setDecorators(self, decorators: dict[str, Decorator]) -> None:
|
||||
# TODO OPTI Convert no decorator to strings
|
||||
self.decorators = decorators
|
||||
self.prefix: str | None = None
|
||||
self.suffix: str | None = None
|
||||
|
||||
def __init__(self, *args: Element, **kwargs: Decorator) -> None:
|
||||
# TODO OPTI Concatenate consecutrive string
|
||||
self.elements = list(args)
|
||||
|
||||
self._setDecorators(kwargs)
|
||||
self.section: Section
|
||||
|
||||
def append(self, *args: Element) -> None:
|
||||
self.elements += list(args)
|
||||
|
||||
def prepend(self, *args: Element) -> None:
|
||||
self.elements = list(args) + self.elements
|
||||
|
||||
def setElements(self, *args: Element) -> None:
|
||||
self.elements = list(args)
|
||||
|
||||
def setDecorators(self, **kwargs: Decorator) -> None:
|
||||
self._setDecorators(kwargs)
|
||||
|
||||
def setSection(self, section: Section) -> None:
|
||||
self.section = section
|
||||
for element in self.elements:
|
||||
if isinstance(element, Text):
|
||||
element.setSection(section)
|
||||
|
||||
def _genFixs(self) -> None:
|
||||
if self.prefix is not None and self.suffix is not None:
|
||||
return
|
||||
|
||||
self.prefix = ""
|
||||
self.suffix = ""
|
||||
|
||||
def nest(prefix: str, suffix: str) -> None:
|
||||
assert self.prefix is not None
|
||||
assert self.suffix is not None
|
||||
self.prefix = self.prefix + "%{" + prefix + "}"
|
||||
self.suffix = "%{" + suffix + "}" + self.suffix
|
||||
|
||||
def getColor(val: str) -> str:
|
||||
# TODO Allow themes
|
||||
assert len(val) == 7
|
||||
return val
|
||||
|
||||
def button(number: str, function: Handle) -> None:
|
||||
handle = Bar.getFunctionHandle(function)
|
||||
nest("A" + number + ":" + handle.decode() + ":", "A" + number)
|
||||
|
||||
for key, val in self.decorators.items():
|
||||
if val is None:
|
||||
continue
|
||||
if key == "fg":
|
||||
reset = self.section.THEMES[self.section.theme][0]
|
||||
assert isinstance(val, str)
|
||||
nest("F" + getColor(val), "F" + reset)
|
||||
elif key == "bg":
|
||||
reset = self.section.THEMES[self.section.theme][1]
|
||||
assert isinstance(val, str)
|
||||
nest("B" + getColor(val), "B" + reset)
|
||||
elif key == "clickLeft":
|
||||
assert callable(val)
|
||||
button("1", val)
|
||||
elif key == "clickMiddle":
|
||||
assert callable(val)
|
||||
button("2", val)
|
||||
elif key == "clickRight":
|
||||
assert callable(val)
|
||||
button("3", val)
|
||||
elif key == "scrollUp":
|
||||
assert callable(val)
|
||||
button("4", val)
|
||||
elif key == "scrollDown":
|
||||
assert callable(val)
|
||||
button("5", val)
|
||||
else:
|
||||
log.warn("Unkown decorator: {}".format(key))
|
||||
|
||||
def _text(self, size: int | None = None, pad: bool = False) -> tuple[str, int]:
|
||||
self._genFixs()
|
||||
assert self.prefix is not None
|
||||
assert self.suffix is not None
|
||||
curString = self.prefix
|
||||
curSize = 0
|
||||
remSize = size
|
||||
|
||||
for element in self.elements:
|
||||
if element is None:
|
||||
continue
|
||||
elif isinstance(element, Text):
|
||||
newString, newSize = element._text(size=remSize)
|
||||
else:
|
||||
newString = str(element)
|
||||
if remSize is not None:
|
||||
newString = newString[:remSize]
|
||||
newSize = len(newString)
|
||||
|
||||
curString += newString
|
||||
curSize += newSize
|
||||
|
||||
if remSize is not None:
|
||||
remSize -= newSize
|
||||
if remSize <= 0:
|
||||
break
|
||||
|
||||
curString += self.suffix
|
||||
|
||||
if pad:
|
||||
assert remSize is not None
|
||||
if remSize > 0:
|
||||
curString += " " * remSize
|
||||
curSize += remSize
|
||||
|
||||
if size is not None:
|
||||
if pad:
|
||||
assert size == curSize
|
||||
else:
|
||||
assert size >= curSize
|
||||
return curString, curSize
|
||||
|
||||
def text(self, size: int | None = None, pad: bool = False) -> str:
|
||||
string, size = self._text(size=size, pad=pad)
|
||||
return string
|
||||
|
||||
def __str__(self) -> str:
|
||||
self._genFixs()
|
||||
assert self.prefix is not None
|
||||
assert self.suffix is not None
|
||||
curString = self.prefix
|
||||
for element in self.elements:
|
||||
if element is None:
|
||||
continue
|
||||
else:
|
||||
curString += str(element)
|
||||
curString += self.suffix
|
||||
return curString
|
||||
|
||||
def __len__(self) -> int:
|
||||
curSize = 0
|
||||
for element in self.elements:
|
||||
if element is None:
|
||||
continue
|
||||
elif isinstance(element, Text):
|
||||
curSize += len(element)
|
||||
else:
|
||||
curSize += len(str(element))
|
||||
return curSize
|
||||
|
||||
def __getitem__(self, index: int) -> Element:
|
||||
return self.elements[index]
|
||||
|
||||
def __setitem__(self, index: int, data: Element) -> None:
|
||||
self.elements[index] = data
|
File diff suppressed because it is too large
Load diff
|
@ -1,240 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
|
||||
import coloredlogs
|
||||
import i3ipc
|
||||
import pyinotify
|
||||
|
||||
from frobar.common import notBusy
|
||||
from frobar.display import Element
|
||||
|
||||
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
|
||||
log = logging.getLogger()
|
||||
|
||||
# TODO Sync bar update with PeriodicUpdater updates
|
||||
|
||||
|
||||
class Updater:
|
||||
@staticmethod
|
||||
def init() -> None:
|
||||
PeriodicUpdater.init()
|
||||
InotifyUpdater.init()
|
||||
notBusy.set()
|
||||
|
||||
def updateText(self, text: Element) -> None:
|
||||
print(text)
|
||||
|
||||
def fetcher(self) -> Element:
|
||||
return "{} refreshed".format(self)
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def refreshData(self) -> None:
|
||||
# TODO OPTI Maybe discard the refresh if there's already another one?
|
||||
self.lock.acquire()
|
||||
try:
|
||||
data = self.fetcher()
|
||||
except BaseException as e:
|
||||
log.error(e, exc_info=True)
|
||||
data = ""
|
||||
self.updateText(data)
|
||||
self.lock.release()
|
||||
|
||||
|
||||
class PeriodicUpdaterThread(threading.Thread):
|
||||
def run(self) -> None:
|
||||
# TODO Sync with system clock
|
||||
counter = 0
|
||||
while True:
|
||||
notBusy.set()
|
||||
if PeriodicUpdater.intervalsChanged.wait(
|
||||
timeout=PeriodicUpdater.intervalStep
|
||||
):
|
||||
# ↑ sleeps here
|
||||
notBusy.clear()
|
||||
PeriodicUpdater.intervalsChanged.clear()
|
||||
counter = 0
|
||||
for providerList in PeriodicUpdater.intervals.copy().values():
|
||||
for provider in providerList.copy():
|
||||
provider.refreshData()
|
||||
else:
|
||||
notBusy.clear()
|
||||
assert PeriodicUpdater.intervalStep is not None
|
||||
counter += PeriodicUpdater.intervalStep
|
||||
counter = counter % PeriodicUpdater.intervalLoop
|
||||
for interval in PeriodicUpdater.intervals.keys():
|
||||
if counter % interval == 0:
|
||||
for provider in PeriodicUpdater.intervals[interval]:
|
||||
provider.refreshData()
|
||||
|
||||
|
||||
class PeriodicUpdater(Updater):
|
||||
"""
|
||||
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
|
||||
"""
|
||||
|
||||
intervals: dict[int, set["PeriodicUpdater"]] = dict()
|
||||
intervalStep: int | None = None
|
||||
intervalLoop: int
|
||||
updateThread: threading.Thread = PeriodicUpdaterThread(daemon=True)
|
||||
intervalsChanged = threading.Event()
|
||||
|
||||
@staticmethod
|
||||
def gcds(*args: int) -> int:
|
||||
return functools.reduce(math.gcd, args)
|
||||
|
||||
@staticmethod
|
||||
def lcm(a: int, b: int) -> int:
|
||||
"""Return lowest common multiple."""
|
||||
return a * b // math.gcd(a, b)
|
||||
|
||||
@staticmethod
|
||||
def lcms(*args: int) -> int:
|
||||
"""Return lowest common multiple."""
|
||||
return functools.reduce(PeriodicUpdater.lcm, args)
|
||||
|
||||
@staticmethod
|
||||
def updateIntervals() -> None:
|
||||
intervalsList = list(PeriodicUpdater.intervals.keys())
|
||||
PeriodicUpdater.intervalStep = PeriodicUpdater.gcds(*intervalsList)
|
||||
PeriodicUpdater.intervalLoop = PeriodicUpdater.lcms(*intervalsList)
|
||||
PeriodicUpdater.intervalsChanged.set()
|
||||
|
||||
@staticmethod
|
||||
def init() -> None:
|
||||
PeriodicUpdater.updateThread.start()
|
||||
|
||||
def __init__(self) -> None:
|
||||
Updater.__init__(self)
|
||||
self.interval: int | None = None
|
||||
|
||||
def changeInterval(self, interval: int) -> None:
|
||||
|
||||
if self.interval is not None:
|
||||
PeriodicUpdater.intervals[self.interval].remove(self)
|
||||
|
||||
self.interval = interval
|
||||
|
||||
if interval not in PeriodicUpdater.intervals:
|
||||
PeriodicUpdater.intervals[interval] = set()
|
||||
PeriodicUpdater.intervals[interval].add(self)
|
||||
|
||||
PeriodicUpdater.updateIntervals()
|
||||
|
||||
|
||||
class InotifyUpdaterEventHandler(pyinotify.ProcessEvent):
|
||||
def process_default(self, event: pyinotify.Event) -> None:
|
||||
assert event.path in InotifyUpdater.paths
|
||||
|
||||
if 0 in InotifyUpdater.paths[event.path]:
|
||||
for provider in InotifyUpdater.paths[event.path][0]:
|
||||
provider.refreshData()
|
||||
|
||||
if event.name in InotifyUpdater.paths[event.path]:
|
||||
for provider in InotifyUpdater.paths[event.path][event.name]:
|
||||
provider.refreshData()
|
||||
|
||||
|
||||
class InotifyUpdater(Updater):
|
||||
"""
|
||||
Needs to call :func:`PeriodicUpdater.changeInterval` in `__init__`
|
||||
"""
|
||||
|
||||
wm = pyinotify.WatchManager()
|
||||
paths: dict[str, dict[str | int, set["InotifyUpdater"]]] = dict()
|
||||
|
||||
@staticmethod
|
||||
def init() -> None:
|
||||
notifier = pyinotify.ThreadedNotifier(
|
||||
InotifyUpdater.wm, InotifyUpdaterEventHandler()
|
||||
)
|
||||
notifier.start()
|
||||
|
||||
# TODO Mask for folders
|
||||
MASK = pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE
|
||||
|
||||
def addPath(self, path: str, refresh: bool = True) -> None:
|
||||
path = os.path.realpath(os.path.expanduser(path))
|
||||
|
||||
# Detect if file or folder
|
||||
if os.path.isdir(path):
|
||||
self.dirpath: str = path
|
||||
# 0: Directory watcher
|
||||
self.filename: str | int = 0
|
||||
elif os.path.isfile(path):
|
||||
self.dirpath = os.path.dirname(path)
|
||||
self.filename = os.path.basename(path)
|
||||
else:
|
||||
raise FileNotFoundError("No such file or directory: '{}'".format(path))
|
||||
|
||||
# Register watch action
|
||||
if self.dirpath not in InotifyUpdater.paths:
|
||||
InotifyUpdater.paths[self.dirpath] = dict()
|
||||
if self.filename not in InotifyUpdater.paths[self.dirpath]:
|
||||
InotifyUpdater.paths[self.dirpath][self.filename] = set()
|
||||
InotifyUpdater.paths[self.dirpath][self.filename].add(self)
|
||||
|
||||
# Add watch
|
||||
InotifyUpdater.wm.add_watch(self.dirpath, InotifyUpdater.MASK)
|
||||
|
||||
if refresh:
|
||||
self.refreshData()
|
||||
|
||||
|
||||
class ThreadedUpdaterThread(threading.Thread):
|
||||
def __init__(self, updater: "ThreadedUpdater") -> None:
|
||||
self.updater = updater
|
||||
threading.Thread.__init__(self, daemon=True)
|
||||
self.looping = True
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
while self.looping:
|
||||
self.updater.loop()
|
||||
except BaseException as e:
|
||||
log.error("Error with {}".format(self.updater))
|
||||
log.error(e, exc_info=True)
|
||||
self.updater.updateText("")
|
||||
|
||||
|
||||
class ThreadedUpdater(Updater):
|
||||
"""
|
||||
Must implement loop(), and call start()
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
Updater.__init__(self)
|
||||
self.thread = ThreadedUpdaterThread(self)
|
||||
|
||||
def loop(self) -> None:
|
||||
self.refreshData()
|
||||
time.sleep(10)
|
||||
|
||||
def start(self) -> None:
|
||||
self.thread.start()
|
||||
|
||||
|
||||
class I3Updater(ThreadedUpdater):
|
||||
# TODO OPTI One i3 connection for all
|
||||
|
||||
def __init__(self) -> None:
|
||||
ThreadedUpdater.__init__(self)
|
||||
self.i3 = i3ipc.Connection()
|
||||
self.on = self.i3.on
|
||||
self.start()
|
||||
|
||||
def loop(self) -> None:
|
||||
self.i3.main()
|
||||
|
||||
|
||||
class MergedUpdater(Updater):
|
||||
def __init__(self, *args: Updater) -> None:
|
||||
raise NotImplementedError("Deprecated, as hacky and currently unused")
|
|
@ -2,19 +2,17 @@ from setuptools import setup
|
|||
|
||||
setup(
|
||||
name="frobar",
|
||||
version="2.0",
|
||||
version="3.0",
|
||||
install_requires=[
|
||||
"coloredlogs",
|
||||
"notmuch",
|
||||
"i3ipc",
|
||||
"python-mpd2",
|
||||
"psutil",
|
||||
"pulsectl",
|
||||
"pyinotify",
|
||||
"pulsectl-asyncio",
|
||||
"pygobject3",
|
||||
"rich",
|
||||
],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"frobar = frobar:run",
|
||||
"frobar = frobar:main",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue