Manual fixes to Python file

To see if I like the rules
This commit is contained in:
Geoffrey Frogeye 2025-05-08 18:29:03 +02:00
parent 34b545890d
commit 8179433c41
10 changed files with 612 additions and 547 deletions

View file

@ -9,7 +9,7 @@ from frobar.common import Alignment
def main() -> None:
# TODO Configurable
FROGARIZED = [
frogarized = (
"#092c0e",
"#143718",
"#5a7058",
@ -26,12 +26,12 @@ def main() -> None:
"#008dd1",
"#5c73c4",
"#d43982",
]
)
# TODO Not super happy with the color management,
# while using an existing library is great, it's limited to ANSI colors
def base16_color(color: int) -> tuple[int, int, int]:
hexa = FROGARIZED[color]
hexa = frogarized[color]
return tuple(rich.color.parse_rgb_hex(hexa[1:]))
theme = rich.terminal_theme.TerminalTheme(
@ -62,83 +62,83 @@ def main() -> None:
)
bar = frobar.common.Bar(theme=theme)
dualScreen = len(bar.children) > 1
leftPreferred = 0 if dualScreen else None
rightPreferred = 1 if dualScreen else None
dual_screen = len(bar.children) > 1
left_preferred = 0 if dual_screen else None
right_preferred = 1 if dual_screen else None
workspaces_suffixes = "▲■"
workspaces_names = dict(
(str(i + 1), f"{i+1} {c}") for i, c in enumerate(workspaces_suffixes)
)
workspaces_names = {
str(i + 1): f"{i+1} {c}" for i, c in enumerate(workspaces_suffixes)
}
color = rich.color.Color.parse
bar.addProvider(
bar.add_provider(
frobar.providers.I3ModeProvider(color=color("red")),
alignment=Alignment.LEFT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.I3WorkspacesProvider(custom_names=workspaces_names),
alignment=Alignment.LEFT,
)
if dualScreen:
bar.addProvider(
if dual_screen:
bar.add_provider(
frobar.providers.I3WindowTitleProvider(color=color("white")),
screenNum=0,
screen_num=0,
alignment=Alignment.CENTER,
)
bar.addProvider(
bar.add_provider(
frobar.providers.MprisProvider(color=color("bright_white")),
screenNum=rightPreferred,
screen_num=right_preferred,
alignment=Alignment.CENTER,
)
else:
bar.addProvider(
bar.add_provider(
frobar.common.SpacerProvider(),
alignment=Alignment.LEFT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.MprisProvider(color=color("bright_white")),
alignment=Alignment.LEFT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.CpuProvider(),
screenNum=leftPreferred,
screen_num=left_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.LoadProvider(),
screenNum=leftPreferred,
screen_num=left_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.RamProvider(),
screenNum=leftPreferred,
screen_num=left_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.TemperatureProvider(),
screenNum=leftPreferred,
screen_num=left_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.BatteryProvider(),
screenNum=leftPreferred,
screen_num=left_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.PulseaudioProvider(color=color("magenta")),
screenNum=rightPreferred,
screen_num=right_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.NetworkProvider(color=color("blue")),
screenNum=leftPreferred,
screen_num=left_preferred,
alignment=Alignment.RIGHT,
)
bar.addProvider(
bar.add_provider(
frobar.providers.TimeProvider(color=color("cyan")),
alignment=Alignment.RIGHT,
)

View file

@ -30,15 +30,16 @@ Sortable = str | int
# Display utilities
UNIT_THRESHOLD = 1000
FLOAT_THRESHOLD = 10
def humanSize(numi: int) -> str:
"""
Returns a string of width 3+3
"""
def human_size(numi: int) -> str:
"""Return a string of width 3+3."""
num = float(numi)
for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
if abs(num) < 1000:
if num >= 10:
if abs(num) < UNIT_THRESHOLD:
if num >= FLOAT_THRESHOLD:
return f"{int(num):3d}{unit}"
return f"{num:.1f}{unit}"
num /= 1024
@ -64,46 +65,47 @@ def clip(text: str, length: int = 30) -> str:
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
parent: P | None = None,
sort_key: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
self.parent: P | None = None
self.children: typing.MutableSequence[C] = []
self.sortKey = sort_key
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
self.set_parent(parent)
self.bar = self.get_first_parent_of_type(Bar)
def setParent(self, parent: P) -> None:
def set_parent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
self.parent.update_markup()
def unsetParent(self) -> None:
def unset_parent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent.update_markup()
self.parent = None
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
def get_first_parent_of_type(self, typ: type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
parent = parent.parent
return parent
def updateMarkup(self) -> None:
def update_markup(self) -> None:
self.bar.refresh.set()
# TODO OPTI See if worth caching the output
def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup")
def generate_markup(self) -> str:
msg = f"{self} cannot generate markup"
raise NotImplementedError(msg)
def getMarkup(self) -> str:
return self.generateMarkup()
def get_markup(self) -> str:
return self.generate_markup()
class Button(enum.Enum):
@ -114,18 +116,19 @@ class Button(enum.Enum):
SCROLL_DOWN = "5"
RICH_DEFAULT_COLOR = rich.color.Color.default()
class Section(ComposableText):
"""
Colorable block separated by chevrons
"""
"""Colorable block separated by chevrons."""
def __init__(
self,
parent: "Module",
sortKey: Sortable = 0,
color: rich.color.Color = rich.color.Color.default(),
sort_key: Sortable = 0,
color: rich.color.Color = RICH_DEFAULT_COLOR,
) -> None:
super().__init__(parent=parent, sortKey=sortKey)
super().__init__(parent=parent, sort_key=sort_key)
self.parent: Module
self.color = color
@ -134,9 +137,9 @@ class Section(ComposableText):
self.targetSize = -1
self.size = -1
self.animationTask: asyncio.Task | None = None
self.actions: dict[Button, str] = dict()
self.actions: dict[Button, str] = {}
def isHidden(self) -> bool:
def is_hidden(self) -> bool:
return self.size < 0
# Geometric series, with a cap
@ -147,29 +150,29 @@ class Section(ComposableText):
async def animate(self) -> None:
increment = 1 if self.size < self.targetSize else -1
loop = asyncio.get_running_loop()
frameTime = loop.time()
animTime = self.ANIM_A
frame_time = loop.time()
anim_time = self.ANIM_A
skipped = 0
while self.size != self.targetSize:
self.size += increment
self.updateMarkup()
self.update_markup()
animTime *= self.ANIM_R
animTime = max(self.ANIM_MIN, animTime)
frameTime += animTime
sleepTime = frameTime - loop.time()
anim_time *= self.ANIM_R
anim_time = max(self.ANIM_MIN, anim_time)
frame_time += anim_time
sleep_time = frame_time - loop.time()
# In case of stress, skip refreshing by not awaiting
if sleepTime > 0:
if sleep_time > 0:
if skipped > 0:
log.warning(f"Skipped {skipped} animation frame(s)")
log.warning("Skipped %d animation frame(s)", skipped)
skipped = 0
await asyncio.sleep(sleepTime)
await asyncio.sleep(sleep_time)
else:
skipped += 1
def setText(self, text: str | None) -> None:
def set_text(self, text: str | None) -> None:
# OPTI Don't redraw nor reset animation if setting the same text
if self.desiredText == text:
return
@ -184,23 +187,23 @@ class Section(ComposableText):
self.animationTask.cancel()
# OPTI Skip the whole animation task if not required
if self.size == self.targetSize:
self.updateMarkup()
self.update_markup()
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
def setAction(
def set_action(
self, button: Button, callback: typing.Callable | None
) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
self.bar.remove_action(command)
del self.actions[button]
if callback:
command = self.bar.addAction(callback)
command = self.bar.add_action(callback)
self.actions[button] = command
def generateMarkup(self) -> str:
assert not self.isHidden()
def generate_markup(self) -> str:
assert not self.is_hidden()
pad = max(0, self.size - len(self.text))
text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
@ -209,9 +212,7 @@ class Section(ComposableText):
class Module(ComposableText):
"""
Sections handled by a same updater
"""
"""Sections handled by a same updater."""
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
@ -219,21 +220,21 @@ class Module(ComposableText):
self.children: typing.MutableSequence[Section]
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
self.mirrors: list[Module] = []
def mirror(self, module: "Module") -> None:
self.mirroring = module
module.mirrors.append(self)
def getSections(self) -> typing.Sequence[Section]:
def get_sections(self) -> typing.Sequence[Section]:
if self.mirroring:
return self.mirroring.children
return self.children
def updateMarkup(self) -> None:
super().updateMarkup()
def update_markup(self) -> None:
super().update_markup()
for mirror in self.mirrors:
mirror.updateMarkup()
mirror.update_markup()
class Alignment(enum.Enum):
@ -249,39 +250,41 @@ class Side(ComposableText):
self.children: typing.MutableSequence[Module] = []
self.alignment = alignment
self.bar = parent.getFirstParentOfType(Bar)
self.bar = parent.get_first_parent_of_type(Bar)
def generateMarkup(self) -> str:
def generate_markup(self) -> str:
if not self.children:
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
last_section: Section | None = None
for module in self.children:
for section in module.getSections():
if section.isHidden():
for section in module.get_sections():
if section.is_hidden():
continue
hexa = section.color.get_truecolor(theme=self.bar.theme).hex
if lastSection is None:
if self.alignment == Alignment.LEFT:
text += "%{B" + hexa + "}%{F-}"
else:
text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
elif isinstance(lastSection, SpacerSection):
if last_section is None:
text += (
"%{B" + hexa + "}%{F-}"
if self.alignment == Alignment.LEFT
else "%{B-}%{F" + hexa + "}%{R}%{F-}"
)
elif isinstance(last_section, SpacerSection):
text += "%{B-}%{F" + hexa + "}%{R}%{F-}"
else:
if self.alignment == Alignment.RIGHT:
if lastSection.color == section.color:
text += ""
else:
text += "%{F" + hexa + "}%{R}"
elif lastSection.color == section.color:
text += (
""
if last_section.color == section.color
else "%{F" + hexa + "}%{R}"
)
elif last_section.color == section.color:
text += ""
else:
text += "%{R}%{B" + hexa + "}"
text += "%{F-}"
text += section.getMarkup()
lastSection = section
if self.alignment != Alignment.RIGHT and lastSection:
text += section.get_markup()
last_section = section
if self.alignment != Alignment.RIGHT and last_section:
text += "%{R}%{B-}"
return text
@ -297,32 +300,33 @@ class Screen(ComposableText):
for alignment in Alignment:
Side(parent=self, alignment=alignment)
def generateMarkup(self) -> str:
def generate_markup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.getMarkup() for side in self.children
side.get_markup() for side in self.children
)
RICH_DEFAULT_THEME = rich.terminal_theme.DEFAULT_TERMINAL_THEME
class Bar(ComposableText):
"""
Top-level
"""
"""Top-level."""
def __init__(
self,
theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME,
theme: rich.terminal_theme.TerminalTheme = RICH_DEFAULT_THEME,
) -> None:
super().__init__()
self.parent: None
self.children: typing.MutableSequence[Screen]
self.longRunningTasks: list[asyncio.Task] = list()
self.longRunningTasks: list[asyncio.Task] = []
self.theme = theme
self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list[Provider] = list()
self.providers: list[Provider] = []
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
self.actions: dict[str, typing.Callable] = {}
self.periodicProviderTask: typing.Coroutine | None = None
@ -334,7 +338,7 @@ class Bar(ComposableText):
continue
Screen(parent=self, output=output.name)
def addLongRunningTask(self, coro: typing.Coroutine) -> None:
def add_long_running_task(self, coro: typing.Coroutine) -> None:
task = self.taskGroup.create_task(coro)
self.longRunningTasks.append(task)
@ -360,10 +364,10 @@ class Bar(ComposableText):
while True:
await self.refresh.wait()
self.refresh.clear()
markup = self.getMarkup()
markup = self.get_markup()
proc.stdin.write(markup.encode())
async def actionHandler() -> None:
async def action_handler() -> None:
assert proc.stdout
while True:
line = await proc.stdout.readline()
@ -371,39 +375,36 @@ class Bar(ComposableText):
callback = self.actions.get(command)
if callback is None:
# In some conditions on start it's empty
log.error(f"Unknown command: {command}")
log.error("Unknown command: %s", command)
return
callback()
async with self.taskGroup:
self.addLongRunningTask(refresher())
self.addLongRunningTask(actionHandler())
self.add_long_running_task(refresher())
self.add_long_running_task(action_handler())
for provider in self.providers:
self.addLongRunningTask(provider.run())
self.add_long_running_task(provider.run())
def exit() -> None:
def finish() -> None:
log.info("Terminating")
for task in self.longRunningTasks:
task.cancel()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, exit)
loop.add_signal_handler(signal.SIGINT, finish)
def generateMarkup(self) -> str:
return "".join(screen.getMarkup() for screen in self.children) + "\n"
def generate_markup(self) -> str:
return "".join(screen.get_markup() for screen in self.children) + "\n"
def addProvider(
def add_provider(
self,
provider: "Provider",
alignment: Alignment = Alignment.LEFT,
screenNum: int | None = None,
screen_num: int | None = None, # None = all screens
) -> None:
"""
screenNum: the provider will be added on this screen if set, all otherwise
"""
modules = list()
modules = []
for s, screen in enumerate(self.children):
if screenNum is None or s == screenNum:
if screen_num is None or s == screen_num:
side = next(
filter(lambda s: s.alignment == alignment, screen.children)
)
@ -413,13 +414,13 @@ class Bar(ComposableText):
if modules:
self.providers.append(provider)
def addAction(self, callback: typing.Callable) -> str:
def add_action(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def removeAction(self, command: str) -> None:
def remove_action(self, command: str) -> None:
del self.actions[command]
def launch(self) -> None:
@ -431,12 +432,10 @@ class Bar(ComposableText):
class Provider:
sectionType: type[Section] = Section
section_type: type[Section] = Section
def __init__(
self, color: rich.color.Color = rich.color.Color.default()
) -> None:
self.modules: list[Module] = list()
def __init__(self, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None:
self.modules: list[Module] = []
self.color = color
async def run(self) -> None:
@ -445,9 +444,7 @@ class Provider:
class MirrorProvider(Provider):
def __init__(
self, color: rich.color.Color = rich.color.Color.default()
) -> None:
def __init__(self, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None:
super().__init__(color=color)
self.module: Module
@ -461,19 +458,19 @@ class MirrorProvider(Provider):
class SingleSectionProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
self.section = self.sectionType(parent=self.module, color=self.color)
self.section = self.section_type(parent=self.module, color=self.color)
class StaticProvider(SingleSectionProvider):
def __init__(
self, text: str, color: rich.color.Color = rich.color.Color.default()
self, text: str, color: rich.color.Color = RICH_DEFAULT_COLOR
) -> None:
super().__init__(color=color)
self.text = text
async def run(self) -> None:
await super().run()
self.section.setText(self.text)
self.section.set_text(self.text)
class SpacerSection(Section):
@ -481,7 +478,7 @@ class SpacerSection(Section):
class SpacerProvider(SingleSectionProvider):
sectionType = SpacerSection
section_type = SpacerSection
def __init__(self, length: int = 5) -> None:
super().__init__(color=rich.color.Color.default())
@ -490,41 +487,41 @@ class SpacerProvider(SingleSectionProvider):
async def run(self) -> None:
await super().run()
assert isinstance(self.section, SpacerSection)
self.section.setText(" " * self.length)
self.section.set_text(" " * self.length)
class StatefulSection(Section):
def __init__(
self,
parent: Module,
sortKey: Sortable = 0,
color: rich.color.Color = rich.color.Color.default(),
sort_key: Sortable = 0,
color: rich.color.Color = RICH_DEFAULT_COLOR,
) -> None:
super().__init__(parent=parent, sortKey=sortKey, color=color)
super().__init__(parent=parent, sort_key=sort_key, color=color)
self.state = 0
self.numberStates: int
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
self.set_action(Button.CLICK_LEFT, self.increment_state)
self.set_action(Button.CLICK_RIGHT, self.decrement_state)
def incrementState(self) -> None:
def increment_state(self) -> None:
self.state += 1
self.changeState()
self.change_state()
def decrementState(self) -> None:
def decrement_state(self) -> None:
self.state -= 1
self.changeState()
self.change_state()
def setChangedState(self, callback: typing.Callable) -> None:
def set_changed_state(self, callback: typing.Callable) -> None:
self.callback = callback
def changeState(self) -> None:
def change_state(self) -> None:
self.state %= self.numberStates
self.bar.taskGroup.create_task(self.callback())
class StatefulSectionProvider(Provider):
sectionType = StatefulSection
section_type = StatefulSection
class SingleStatefulSectionProvider(
@ -534,46 +531,44 @@ class SingleStatefulSectionProvider(
class MultiSectionsProvider(Provider):
def __init__(
self, color: rich.color.Color = rich.color.Color.default()
) -> None:
def __init__(self, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None:
super().__init__(color=color)
self.sectionKeys: dict[Module, dict[Sortable, Section]] = (
collections.defaultdict(dict)
)
self.updaters: dict[Section, typing.Callable] = dict()
self.updaters: dict[Section, typing.Callable] = {}
async def getSectionUpdater(self, section: Section) -> typing.Callable:
async def get_section_updater(self, section: Section) -> typing.Callable:
raise NotImplementedError
@staticmethod
async def doNothing() -> None:
async def do_nothing() -> None:
pass
async def updateSections(
async def update_sections(
self, sections: set[Sortable], module: Module
) -> None:
moduleSections = self.sectionKeys[module]
module_sections = self.sectionKeys[module]
async with asyncio.TaskGroup() as tg:
for sortKey in sections:
section = moduleSections.get(sortKey)
for sort_key in sections:
section = module_sections.get(sort_key)
if not section:
section = self.sectionType(
parent=module, sortKey=sortKey, color=self.color
section = self.section_type(
parent=module, sort_key=sort_key, color=self.color
)
self.updaters[section] = await self.getSectionUpdater(
self.updaters[section] = await self.get_section_updater(
section
)
moduleSections[sortKey] = section
module_sections[sort_key] = section
updater = self.updaters[section]
tg.create_task(updater())
missingKeys = set(moduleSections.keys()) - sections
for missingKey in missingKeys:
section = moduleSections.get(missingKey)
missing_keys = set(module_sections.keys()) - sections
for missing_key in missing_keys:
section = module_sections.get(missing_key)
assert section
section.setText(None)
section.set_text(None)
class PeriodicProvider(Provider):
@ -585,7 +580,7 @@ class PeriodicProvider(Provider):
@classmethod
async def task(cls, bar: Bar) -> None:
providers = list()
providers = []
for provider in bar.providers:
if isinstance(provider, PeriodicProvider):
providers.append(provider)
@ -596,7 +591,7 @@ class PeriodicProvider(Provider):
loops = [provider.loop() for provider in providers]
asyncio.gather(*loops)
now = datetime.datetime.now()
now = datetime.datetime.now(datetime.UTC)
# Hardcoded to 1 second... not sure if we want some more than that,
# and if the logic to check if a task should run would be a win
# compared to the task itself
@ -606,11 +601,11 @@ class PeriodicProvider(Provider):
async def run(self) -> None:
await super().run()
for module in self.modules:
bar = module.getFirstParentOfType(Bar)
bar = module.get_first_parent_of_type(Bar)
assert bar
if not bar.periodicProviderTask:
bar.periodicProviderTask = PeriodicProvider.task(bar)
bar.addLongRunningTask(bar.periodicProviderTask)
bar.add_long_running_task(bar.periodicProviderTask)
class PeriodicStatefulProvider(
@ -618,7 +613,7 @@ class PeriodicStatefulProvider(
):
async def run(self) -> None:
await super().run()
self.section.setChangedState(self.loop)
self.section.set_changed_state(self.loop)
class AlertingProvider(Provider):
@ -626,16 +621,16 @@ class AlertingProvider(Provider):
COLOR_WARNING = rich.color.Color.parse("yellow")
COLOR_DANGER = rich.color.Color.parse("red")
warningThreshold: float
dangerThreshold: float
warning_threshold: float
danger_threshold: float
def updateLevel(self, level: float) -> None:
if level > self.dangerThreshold:
def update_level(self, level: float) -> None:
if level > self.danger_threshold:
color = self.COLOR_DANGER
elif level > self.warningThreshold:
elif level > self.warning_threshold:
color = self.COLOR_WARNING
else:
color = self.COLOR_NORMAL
for module in self.modules:
for section in module.getSections():
for section in module.get_sections():
section.color = color

View file

@ -16,6 +16,7 @@ import pulsectl_asyncio
import rich.color
from frobar.common import (
RICH_DEFAULT_COLOR,
AlertingProvider,
Button,
MirrorProvider,
@ -29,18 +30,20 @@ from frobar.common import (
StatefulSection,
StatefulSectionProvider,
clip,
humanSize,
human_size,
log,
ramp,
)
gi.require_version("Playerctl", "2.0")
import gi.repository.Playerctl
import gi.repository.Playerctl # noqa: E402
T = typing.TypeVar("T")
class I3ModeProvider(SingleSectionProvider):
def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(None if e.change == "default" else e.change)
def on_mode(self, _i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.set_text(None if e.change == "default" else e.change)
async def run(self) -> None:
await super().run()
@ -54,11 +57,11 @@ class I3ModeProvider(SingleSectionProvider):
class I3WindowTitleProvider(SingleSectionProvider):
# TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array
def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
def on_window(self, _i3: i3ipc.Connection, e: i3ipc.Event) -> None:
if e.container.name is None:
self.section.setText(None)
self.section.set_text(None)
else:
self.section.setText(clip(e.container.name, 60))
self.section.set_text(clip(e.container.name, 60))
async def run(self) -> None:
await super().run()
@ -76,15 +79,15 @@ class I3WorkspacesProvider(MultiSectionsProvider):
def __init__(
self,
custom_names: dict[str, str] = {},
custom_names: dict[str, str] | None = None,
) -> None:
super().__init__()
self.workspaces: dict[int, i3ipc.WorkspaceReply]
self.custom_names = custom_names
self.custom_names = custom_names or {}
self.modulesFromOutput: dict[str, Module] = dict()
self.modulesFromOutput: dict[str, Module] = {}
async def getSectionUpdater(self, section: Section) -> typing.Callable:
async def get_section_updater(self, section: Section) -> typing.Callable:
assert isinstance(section.sortKey, int)
num = section.sortKey
@ -93,13 +96,13 @@ class I3WorkspacesProvider(MultiSectionsProvider):
self.i3.command(f"workspace number {num}")
)
section.setAction(Button.CLICK_LEFT, switch_to_workspace)
section.set_action(Button.CLICK_LEFT, switch_to_workspace)
async def update() -> None:
workspace = self.workspaces.get(num)
if workspace is None:
log.warning(f"Can't find workspace {num}")
section.setText("X")
section.set_text("X")
return
name = workspace.name
@ -113,20 +116,18 @@ class I3WorkspacesProvider(MultiSectionsProvider):
section.color = self.COLOR_DEFAULT
if workspace.focused:
name = self.custom_names.get(name, name)
section.setText(name)
section.set_text(name)
return update
async def updateWorkspaces(self) -> None:
"""
Since the i3 IPC interface cannot really tell you by events
when workspaces get invisible or not urgent anymore.
Relying on those exclusively would require reimplementing some of i3 logic.
Fetching all the workspaces on event looks ugly but is the most maintainable.
Times I tried to challenge this and failed: 2.
"""
async def update_workspaces(self) -> None:
# Since the i3 IPC interface cannot really tell you by events
# when workspaces get invisible or not urgent anymore.
# Relying on those exclusively would need reimplementing the i3 logic.
# Fetching all the workspaces on event is ugly but maintainable.
# Times I tried to challenge this and failed: 2.
workspaces = await self.i3.get_workspaces()
self.workspaces = dict()
self.workspaces = {}
modules = collections.defaultdict(set)
for workspace in workspaces:
self.workspaces[workspace.num] = workspace
@ -135,38 +136,40 @@ class I3WorkspacesProvider(MultiSectionsProvider):
await asyncio.gather(
*[
self.updateSections(nums, module)
self.update_sections(nums, module)
for module, nums in modules.items()
]
)
def onWorkspaceChange(
self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
def on_workspace_change(
self,
_i3: i3ipc.Connection,
_event: i3ipc.Event | None = None,
) -> None:
# Cancelling the task doesn't seem to prevent performance double-events
self.bar.taskGroup.create_task(self.updateWorkspaces())
self.bar.taskGroup.create_task(self.update_workspaces())
async def run(self) -> None:
for module in self.modules:
screen = module.getFirstParentOfType(Screen)
screen = module.get_first_parent_of_type(Screen)
output = screen.output
self.modulesFromOutput[output] = module
self.bar = module.bar
self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
self.onWorkspaceChange(self.i3)
self.i3.on(i3ipc.Event.WORKSPACE, self.on_workspace_change)
self.on_workspace_change(self.i3)
await self.i3.main()
class MprisProvider(MirrorProvider):
STATUSES = {
STATUSES: typing.ClassVar = {
gi.repository.Playerctl.PlaybackStatus.PLAYING: "",
gi.repository.Playerctl.PlaybackStatus.PAUSED: "",
gi.repository.Playerctl.PlaybackStatus.STOPPED: "",
}
PROVIDERS = {
PROVIDERS: typing.ClassVar = {
"mpd": "",
"firefox": "",
"chromium": "",
@ -175,10 +178,10 @@ class MprisProvider(MirrorProvider):
async def run(self) -> None:
await super().run()
self.status = self.sectionType(parent=self.module, color=self.color)
self.album = self.sectionType(parent=self.module, color=self.color)
self.artist = self.sectionType(parent=self.module, color=self.color)
self.title = self.sectionType(parent=self.module, color=self.color)
self.status = self.section_type(parent=self.module, color=self.color)
self.album = self.section_type(parent=self.module, color=self.color)
self.artist = self.section_type(parent=self.module, color=self.color)
self.title = self.section_type(parent=self.module, color=self.color)
self.manager = gi.repository.Playerctl.PlayerManager()
self.manager.connect("name-appeared", self.on_name_appeared)
@ -196,13 +199,13 @@ class MprisProvider(MirrorProvider):
for name in self.manager.props.player_names:
self.init_player(name)
self.updateSections()
self.update_sections()
while True:
# Occasionally it will skip a second
# but haven't managed to reproduce with debug info
await self.playing.wait()
self.updateTitle()
self.update_title()
if self.player:
pos = self.player.props.position
rem = 1 - (pos % 1000000) / 1000000
@ -214,20 +217,20 @@ class MprisProvider(MirrorProvider):
def get(
something: gi.overrides.GLib.Variant,
key: str,
default: typing.Any = None,
) -> typing.Any:
if key in something.keys():
default: T | None = None,
) -> T | None:
if key in something.keys(): # noqa: SIM118
return something[key]
return default
@staticmethod
def formatUs(ms: int) -> str:
def format_us(ms: int) -> str:
if ms < 60 * 60 * 1000000:
return time.strftime("%M:%S", time.gmtime(ms // 1000000))
return str(datetime.timedelta(microseconds=ms))
def findCurrentPlayer(self) -> None:
for name in [self.playerctldName] + self.manager.props.player_names:
def find_current_player(self) -> None:
for name in [self.playerctldName, *self.manager.props.player_names]:
try:
self.player = gi.repository.Playerctl.Player.new_from_name(
name
@ -241,21 +244,21 @@ class MprisProvider(MirrorProvider):
else:
self.player = None
def updateSections(self) -> None:
self.findCurrentPlayer()
def update_sections(self) -> None:
self.find_current_player()
if self.player is None:
self.status.setText(None)
self.album.setText(None)
self.artist.setText(None)
self.title.setText(None)
self.status.set_text(None)
self.album.set_text(None)
self.artist.set_text(None)
self.title.set_text(None)
self.playing.clear()
return
player = self.player.props.player_name
player = self.PROVIDERS.get(player, player)
status = self.STATUSES.get(self.player.props.playback_status, "?")
self.status.setText(f"{player} {status}")
self.status.set_text(f"{player} {status}")
if (
self.player.props.playback_status
@ -269,48 +272,48 @@ class MprisProvider(MirrorProvider):
album = self.get(metadata, "xesam:album")
if album:
self.album.setText(f"{clip(album)}")
self.album.set_text(f"{clip(album)}")
else:
self.album.setText(None)
self.album.set_text(None)
artists = self.get(metadata, "xesam:artist")
if artists:
artist = ", ".join(artists)
self.artist.setText(f"{clip(artist)}")
self.artist.set_text(f"{clip(artist)}")
else:
self.artist.setText(None)
self.artist.set_text(None)
self.updateTitle()
self.update_title()
def updateTitle(self) -> None:
def update_title(self) -> None:
if self.player is None:
return
metadata = self.player.props.metadata
pos = self.player.props.position # In µs
text = f"{self.formatUs(pos)}"
text = f"{self.format_us(pos)}"
dur = self.get(metadata, "mpris:length")
if dur:
text += f"/{self.formatUs(dur)}"
text += f"/{self.format_us(dur)}"
title = self.get(metadata, "xesam:title")
if title:
text += f" {clip(title)}"
self.title.setText(text)
self.title.set_text(text)
def on_player_vanished(
self,
manager: gi.repository.Playerctl.PlayerManager,
player: gi.repository.Playerctl.Player,
_manager: gi.repository.Playerctl.PlayerManager,
_player: gi.repository.Playerctl.Player,
) -> None:
self.updateSections()
self.update_sections()
def on_event(
self,
player: gi.repository.Playerctl.Player,
_: typing.Any,
manager: gi.repository.Playerctl.PlayerManager,
_player: gi.repository.Playerctl.Player,
_event_data: gi.overrides.GLib.Variant,
_manager: gi.repository.Playerctl.PlayerManager,
) -> None:
self.updateSections()
self.update_sections()
def init_player(self, name: gi.repository.Playerctl.PlayerName) -> None:
player = gi.repository.Playerctl.Player.new_from_name(name)
@ -325,136 +328,165 @@ class MprisProvider(MirrorProvider):
self.manager.manage_player(player)
def on_name_appeared(
self, manager: gi.repository.Playerctl.PlayerManager, name: str
self,
_manager: gi.repository.Playerctl.PlayerManager,
name: str,
) -> None:
self.init_player(name)
self.updateSections()
self.update_sections()
class CpuProvider(AlertingProvider, PeriodicStatefulProvider):
STATE_MINIMIZED = 0
STATE_AGGREGATED = 1
STATE_FULL = 2
async def init(self) -> None:
self.section.numberStates = 3
self.warningThreshold = 75
self.dangerThreshold = 95
self.warning_threshold = 75
self.danger_threshold = 95
async def loop(self) -> None:
percent = psutil.cpu_percent(percpu=False)
self.updateLevel(percent)
self.update_level(percent)
text = ""
if self.section.state >= 2:
if self.section.state >= self.STATE_FULL:
percents = psutil.cpu_percent(percpu=True)
text += " " + "".join([ramp(p / 100) for p in percents])
elif self.section.state >= 1:
elif self.section.state >= self.STATE_AGGREGATED:
text += " " + ramp(percent / 100)
self.section.setText(text)
self.section.set_text(text)
class LoadProvider(AlertingProvider, PeriodicStatefulProvider):
STATE_MINIMIZED = 0
STATE_AGGREGATED = 1
STATE_FULL = 2
async def init(self) -> None:
self.section.numberStates = 3
self.warningThreshold = 5
self.dangerThreshold = 10
self.warning_threshold = 5
self.danger_threshold = 10
async def loop(self) -> None:
load = os.getloadavg()
self.updateLevel(load[0])
self.update_level(load[0])
text = ""
loads = 3 if self.section.state >= 2 else self.section.state
loads = (
3 if self.section.state >= self.STATE_FULL else self.section.state
)
for load_index in range(loads):
text += f" {load[load_index]:.2f}"
self.section.setText(text)
self.section.set_text(text)
class RamProvider(AlertingProvider, PeriodicStatefulProvider):
STATE_MINIMIZED = 0
STATE_ICON = 1
STATE_NUMBER = 2
STATE_FULL = 3
async def init(self) -> None:
self.section.numberStates = 4
self.warningThreshold = 75
self.dangerThreshold = 95
self.warning_threshold = 75
self.danger_threshold = 95
async def loop(self) -> None:
mem = psutil.virtual_memory()
self.updateLevel(mem.percent)
self.update_level(mem.percent)
text = ""
if self.section.state >= 1:
if self.section.state >= self.STATE_ICON:
text += " " + ramp(mem.percent / 100)
if self.section.state >= 2:
text += humanSize(mem.total - mem.available)
if self.section.state >= 3:
text += "/" + humanSize(mem.total)
self.section.setText(text)
if self.section.state >= self.STATE_NUMBER:
text += human_size(mem.total - mem.available)
if self.section.state >= self.STATE_FULL:
text += "/" + human_size(mem.total)
self.section.set_text(text)
class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider):
RAMP = ""
MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
MAIN_TEMPS = ("coretemp", "amdgpu", "cpu_thermal")
# For Intel, AMD and ARM respectively.
STATE_MINIMIZED = 0
STATE_FULL = 1
main: str
async def init(self) -> None:
self.section.numberStates = 2
allTemp = psutil.sensors_temperatures()
all_temps = psutil.sensors_temperatures()
for main in self.MAIN_TEMPS:
if main in allTemp:
if main in all_temps:
self.main = main
break
else:
raise IndexError("Could not find suitable temperature sensor")
msg = "Could not find suitable temperature sensor"
raise IndexError(msg)
temp = allTemp[self.main][0]
self.warningThreshold = temp.high or 90.0
self.dangerThreshold = temp.critical or 100.0
temp = all_temps[self.main][0]
self.warning_threshold = temp.high or 90.0
self.danger_threshold = temp.critical or 100.0
async def loop(self) -> None:
allTemp = psutil.sensors_temperatures()
temp = allTemp[self.main][0]
self.updateLevel(temp.current)
all_temps = psutil.sensors_temperatures()
temp = all_temps[self.main][0]
self.update_level(temp.current)
text = ramp(temp.current / self.warningThreshold, self.RAMP)
if self.section.state >= 1:
text = ramp(temp.current / self.warning_threshold, self.RAMP)
if self.section.state >= self.STATE_FULL:
text += f" {temp.current:.0f}°C"
self.section.setText(text)
self.section.set_text(text)
class BatteryProvider(AlertingProvider, PeriodicStatefulProvider):
# TODO Support ACPID for events
RAMP = ""
STATE_MINIMIZED = 0
STATE_PERCENTAGE = 1
STATE_ESTIMATE = 2
async def init(self) -> None:
self.section.numberStates = 3
# TODO 1 refresh rate is too quick
self.warningThreshold = 75
self.dangerThreshold = 95
self.warning_threshold = 75
self.danger_threshold = 95
async def loop(self) -> None:
bat = psutil.sensors_battery()
if not bat:
self.section.setText(None)
self.section.set_text(None)
self.updateLevel(100 - bat.percent)
self.update_level(100 - bat.percent)
text = "" if bat.power_plugged else ""
text += ramp(bat.percent / 100, self.RAMP)
if self.section.state >= 1:
if self.section.state >= self.STATE_PERCENTAGE:
text += f" {bat.percent:.0f}%"
if self.section.state >= 2:
if self.section.state >= self.STATE_ESTIMATE:
h = int(bat.secsleft / 3600)
m = int((bat.secsleft - h * 3600) / 60)
text += f" ({h:d}:{m:02d})"
self.section.setText(text)
self.section.set_text(text)
class PulseaudioProvider(
MirrorProvider, StatefulSectionProvider, MultiSectionsProvider
):
async def getSectionUpdater(self, section: Section) -> typing.Callable:
STATE_MINIMIZED = 0
STATE_BAR = 1
STATE_PERCENTAGE = 2
async def get_section_updater(self, section: Section) -> typing.Callable:
assert isinstance(section, StatefulSection)
assert isinstance(section.sortKey, str)
@ -478,7 +510,7 @@ class PulseaudioProvider(
icon = ""
section.numberStates = 3
section.state = 1
section.state = self.STATE_BAR
# TODO Change volume with wheel
@ -491,23 +523,21 @@ class PulseaudioProvider(
"frobar-get-volume"
) as pulse:
vol = await pulse.volume_get_all_chans(sink)
if section.state == 1:
if section.state == self.STATE_BAR:
text += f" {ramp(vol)}"
elif section.state == 2:
elif section.state == self.STATE_PERCENTAGE:
text += f" {vol:.0%}"
# TODO Show which is default
section.setText(text)
section.set_text(text)
section.setChangedState(updater)
section.set_changed_state(updater)
return updater
async def update(self) -> None:
async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse:
self.sinks = dict(
(sink.name, sink) for sink in await pulse.sink_list()
)
await self.updateSections(set(self.sinks.keys()), self.module)
self.sinks = {sink.name: sink for sink in await pulse.sink_list()}
await self.update_sections(set(self.sinks.keys()), self.module)
async def run(self) -> None:
await super().run()
@ -515,7 +545,7 @@ class PulseaudioProvider(
async with pulsectl_asyncio.PulseAsync(
"frobar-events-listener"
) as pulse:
async for event in pulse.subscribe_events(
async for _ in pulse.subscribe_events(
pulsectl.PulseEventMaskEnum.sink
):
await self.update()
@ -527,9 +557,15 @@ class NetworkProvider(
StatefulSectionProvider,
MultiSectionsProvider,
):
STATE_MINIMIZED = 0
STATE_SSID = 1
STATE_IP = 2
STATE_SPEED = 3
STATE_TOTALS = 4
def __init__(
self,
color: rich.color.Color = rich.color.Color.default(),
color: rich.color.Color = RICH_DEFAULT_COLOR,
) -> None:
super().__init__(color=color)
@ -539,27 +575,19 @@ class NetworkProvider(
self.io_counters = psutil.net_io_counters(pernic=True)
@staticmethod
def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]:
def get_iface_attribute(iface: str) -> tuple[bool, str, bool]:
relevant = True
icon = "?"
wifi = False
if iface == "lo":
relevant = False
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
icon = ""
else:
icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
elif iface.startswith(("eth", "enp")):
icon = "" if "u" in iface else ""
elif iface.startswith(("wlan", "wl")):
icon = ""
wifi = True
elif (
iface.startswith("tun")
or iface.startswith("tap")
or iface.startswith("wg")
):
elif iface.startswith(("tun", "tap", "wg")):
icon = ""
elif iface.startswith("docker"):
icon = ""
elif iface.startswith("veth"):
@ -569,30 +597,30 @@ class NetworkProvider(
return relevant, icon, wifi
async def getSectionUpdater(self, section: Section) -> typing.Callable:
async def get_section_updater(self, section: Section) -> typing.Callable:
assert isinstance(section, StatefulSection)
assert isinstance(section.sortKey, str)
iface = section.sortKey
relevant, icon, wifi = self.getIfaceAttributes(iface)
relevant, icon, wifi = self.get_iface_attribute(iface)
if not relevant:
return self.doNothing
return self.do_nothing
section.numberStates = 5 if wifi else 4
section.state = 1 if wifi else 0
section.state = self.STATE_SSID if wifi else self.STATE_MINIMIZED
async def update() -> None:
assert isinstance(section, StatefulSection)
if not self.if_stats[iface].isup:
section.setText(None)
section.set_text(None)
return
text = icon
state = section.state + (0 if wifi else 1)
if wifi and state >= 1: # SSID
if wifi and state >= self.STATE_SSID:
cmd = ["iwgetid", iface, "--raw"]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE
@ -600,7 +628,7 @@ class NetworkProvider(
stdout, stderr = await proc.communicate()
text += f" {stdout.decode().strip()}"
if state >= 2: # Address
if state >= self.STATE_IP:
for address in self.if_addrs[iface]:
if address.family == socket.AF_INET:
net = ipaddress.IPv4Network(
@ -609,23 +637,23 @@ class NetworkProvider(
text += f" {address.address}/{net.prefixlen}"
break
if state >= 3: # Speed
prevRecv = self.prev_io_counters[iface].bytes_recv
if state >= self.STATE_SPEED:
prev_recv = self.prev_io_counters[iface].bytes_recv
recv = self.io_counters[iface].bytes_recv
prevSent = self.prev_io_counters[iface].bytes_sent
prev_sent = self.prev_io_counters[iface].bytes_sent
sent = self.io_counters[iface].bytes_sent
dt = self.time - self.prev_time
recvDiff = (recv - prevRecv) / dt
sentDiff = (sent - prevSent) / dt
text += f"{humanSize(recvDiff)}{humanSize(sentDiff)}"
recv_diff = (recv - prev_recv) / dt
sent_diff = (sent - prev_sent) / dt
text += f"{human_size(recv_diff)}{human_size(sent_diff)}"
if state >= 4: # Counter
text += f"{humanSize(recv)}{humanSize(sent)}"
if state >= self.STATE_TOTALS:
text += f"{human_size(recv)}{human_size(sent)}"
section.setText(text)
section.set_text(text)
section.setChangedState(update)
section.set_changed_state(update)
return update
@ -643,17 +671,17 @@ class NetworkProvider(
self.if_addrs = psutil.net_if_addrs()
self.io_counters = psutil.net_io_counters(pernic=True)
await self.updateSections(set(self.if_stats.keys()), self.module)
await self.update_sections(set(self.if_stats.keys()), self.module)
class TimeProvider(PeriodicStatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
FORMATS = ("%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S")
async def init(self) -> None:
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
async def loop(self) -> None:
now = datetime.datetime.now()
format = self.FORMATS[self.section.state]
self.section.setText(now.strftime(format))
now = datetime.datetime.now(datetime.UTC).astimezone()
format_ = self.FORMATS[self.section.state]
self.section.set_text(now.strftime(format_))

View file

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import os
import sys
import sys # noqa: I001
import contextlib
import pathlib
# From https://github.com/python/cpython/blob/v3.7.0b5/Lib/site.py#L436
@ -11,7 +10,7 @@ def register_readline() -> None:
try:
import readline
import rlcompleter
import rlcompleter # noqa: F401
except ImportError:
return
@ -23,14 +22,13 @@ def register_readline() -> None:
else:
readline.parse_and_bind("tab: complete")
try:
# An OSError here could have many causes, but the most likely one
# is that there's no .inputrc file (or .editrc file in the case of
# Mac OS X + libedit) in the expected location. In that case, we
# want to ignore the exception.
cm = contextlib.suppress(OSError)
with cm:
readline.read_init_file()
except OSError:
# An OSError here could have many causes, but the most likely one
# is that there's no .inputrc file (or .editrc file in the case of
# Mac OS X + libedit) in the expected location. In that case, we
# want to ignore the exception.
pass
if readline.get_current_history_length() == 0:
# If no history was loaded, default to .python_history.
@ -38,14 +36,11 @@ def register_readline() -> None:
# each interpreter exit when readline was already configured
# through a PYTHONSTARTUP hook, see:
# http://bugs.python.org/issue5845#msg198636
history = os.path.join(
os.path.expanduser("~"), ".cache/python_history"
)
try:
history = pathlib.Path("~/.cache/python_history").expanduser()
cm = contextlib.suppress(OSError)
with cm:
readline.read_history_file(history)
except OSError:
pass
atexit.register(readline.write_history_file, history)
sys.__interactivehook__ = register_readline
sys.__interactivehook__ = register_readline # noqa: attr-defined