Reformat all python files

This commit is contained in:
Geoffrey Frogeye 2025-05-08 17:55:34 +02:00
parent c0873f4343
commit a3f4c2a932
6 changed files with 103 additions and 44 deletions

View file

@ -27,8 +27,7 @@ def process_flake(flakeUri: str) -> None:
dep_url = dep["url"]
# if not local path, continue
if not (
dep_url.startswith("path:")
or dep_url.startswith("git+file:")
dep_url.startswith("path:") or dep_url.startswith("git+file:")
):
continue
if dep.get("flake", True):

View file

@ -36,7 +36,9 @@ def main() -> None:
theme = rich.terminal_theme.TerminalTheme(
base16_color(0x0),
base16_color(0x0), # TODO should be 7, currently 0 so it's compatible with v2
base16_color(
0x0
), # TODO should be 7, currently 0 so it's compatible with v2
[
base16_color(0x0), # black
base16_color(0x8), # red
@ -72,7 +74,8 @@ def main() -> None:
color = rich.color.Color.parse
bar.addProvider(
frobar.providers.I3ModeProvider(color=color("red")), alignment=Alignment.LEFT
frobar.providers.I3ModeProvider(color=color("red")),
alignment=Alignment.LEFT,
)
bar.addProvider(
frobar.providers.I3WorkspacesProvider(custom_names=workspaces_names),
@ -136,7 +139,8 @@ def main() -> None:
alignment=Alignment.RIGHT,
)
bar.addProvider(
frobar.providers.TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT
frobar.providers.TimeProvider(color=color("cyan")),
alignment=Alignment.RIGHT,
)
bar.launch()

View file

@ -63,7 +63,6 @@ def clip(text: str, length: int = 30) -> str:
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
@ -190,7 +189,9 @@ class Section(ComposableText):
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate())
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
def setAction(
self, button: Button, callback: typing.Callable | None
) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
@ -406,7 +407,9 @@ class Bar(ComposableText):
modules = list()
for s, screen in enumerate(self.children):
if screenNum is None or s == screenNum:
side = next(filter(lambda s: s.alignment == alignment, screen.children))
side = next(
filter(lambda s: s.alignment == alignment, screen.children)
)
module = Module(parent=side)
modules.append(module)
provider.modules = modules
@ -433,7 +436,9 @@ class Bar(ComposableText):
class Provider:
sectionType: type[Section] = Section
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
def __init__(
self, color: rich.color.Color = rich.color.Color.default()
) -> None:
self.modules: list[Module] = list()
self.color = color
@ -443,7 +448,9 @@ class Provider:
class MirrorProvider(Provider):
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
def __init__(
self, color: rich.color.Color = rich.color.Color.default()
) -> None:
super().__init__(color=color)
self.module: Module
@ -490,7 +497,6 @@ class SpacerProvider(SingleSectionProvider):
class StatefulSection(Section):
def __init__(
self,
parent: Module,
@ -524,13 +530,16 @@ class StatefulSectionProvider(Provider):
sectionType = StatefulSection
class SingleStatefulSectionProvider(StatefulSectionProvider, SingleSectionProvider):
class SingleStatefulSectionProvider(
StatefulSectionProvider, SingleSectionProvider
):
section: StatefulSection
class MultiSectionsProvider(Provider):
def __init__(self, color: rich.color.Color = rich.color.Color.default()) -> None:
def __init__(
self, color: rich.color.Color = rich.color.Color.default()
) -> None:
super().__init__(color=color)
self.sectionKeys: dict[Module, dict[Sortable, Section]] = (
collections.defaultdict(dict)
@ -544,7 +553,9 @@ class MultiSectionsProvider(Provider):
async def doNothing() -> None:
pass
async def updateSections(self, sections: set[Sortable], module: Module) -> None:
async def updateSections(
self, sections: set[Sortable], module: Module
) -> None:
moduleSections = self.sectionKeys[module]
async with asyncio.TaskGroup() as tg:
for sortKey in sections:
@ -553,7 +564,9 @@ class MultiSectionsProvider(Provider):
section = self.sectionType(
parent=module, sortKey=sortKey, color=self.color
)
self.updaters[section] = await self.getSectionUpdater(section)
self.updaters[section] = await self.getSectionUpdater(
section
)
moduleSections[sortKey] = section
updater = self.updaters[section]
@ -603,7 +616,9 @@ class PeriodicProvider(Provider):
bar.addLongRunningTask(bar.periodicProviderTask)
class PeriodicStatefulProvider(SingleStatefulSectionProvider, PeriodicProvider):
class PeriodicStatefulProvider(
SingleStatefulSectionProvider, PeriodicProvider
):
async def run(self) -> None:
await super().run()
self.section.setChangedState(self.loop)

View file

@ -15,11 +15,24 @@ import pulsectl
import pulsectl_asyncio
import rich.color
from frobar.common import (AlertingProvider, Button, MirrorProvider, Module,
MultiSectionsProvider, PeriodicProvider,
PeriodicStatefulProvider, Screen, Section,
SingleSectionProvider, StatefulSection,
StatefulSectionProvider, clip, humanSize, log, ramp)
from frobar.common import (
AlertingProvider,
Button,
MirrorProvider,
Module,
MultiSectionsProvider,
PeriodicProvider,
PeriodicStatefulProvider,
Screen,
Section,
SingleSectionProvider,
StatefulSection,
StatefulSectionProvider,
clip,
humanSize,
log,
ramp,
)
gi.require_version("Playerctl", "2.0")
import gi.repository.Playerctl
@ -76,7 +89,9 @@ class I3WorkspacesProvider(MultiSectionsProvider):
num = section.sortKey
def switch_to_workspace() -> None:
self.bar.taskGroup.create_task(self.i3.command(f"workspace number {num}"))
self.bar.taskGroup.create_task(
self.i3.command(f"workspace number {num}")
)
section.setAction(Button.CLICK_LEFT, switch_to_workspace)
@ -119,7 +134,10 @@ class I3WorkspacesProvider(MultiSectionsProvider):
modules[module].add(workspace.num)
await asyncio.gather(
*[self.updateSections(nums, module) for module, nums in modules.items()]
*[
self.updateSections(nums, module)
for module, nums in modules.items()
]
)
def onWorkspaceChange(
@ -142,7 +160,6 @@ class I3WorkspacesProvider(MultiSectionsProvider):
class MprisProvider(MirrorProvider):
STATUSES = {
gi.repository.Playerctl.PlaybackStatus.PLAYING: "",
gi.repository.Playerctl.PlaybackStatus.PAUSED: "",
@ -169,7 +186,9 @@ class MprisProvider(MirrorProvider):
self.playerctldName = gi.repository.Playerctl.PlayerName()
self.playerctldName.name = "playerctld"
self.playerctldName.source = gi.repository.Playerctl.Source.DBUS_SESSION
self.playerctldName.source = (
gi.repository.Playerctl.Source.DBUS_SESSION
)
self.player: gi.repository.Playerctl.Player | None = None
self.playing = asyncio.Event()
@ -193,7 +212,9 @@ class MprisProvider(MirrorProvider):
@staticmethod
def get(
something: gi.overrides.GLib.Variant, key: str, default: typing.Any = None
something: gi.overrides.GLib.Variant,
key: str,
default: typing.Any = None,
) -> typing.Any:
if key in something.keys():
return something[key]
@ -210,7 +231,9 @@ class MprisProvider(MirrorProvider):
def findCurrentPlayer(self) -> None:
for name in [self.playerctldName] + self.manager.props.player_names:
try:
self.player = gi.repository.Playerctl.Player.new_from_name(name)
self.player = gi.repository.Playerctl.Player.new_from_name(
name
)
except gi.repository.GLib.GError:
# Player not found, usually playerctld during startup
continue
@ -347,7 +370,6 @@ class LoadProvider(AlertingProvider, PeriodicStatefulProvider):
class RamProvider(AlertingProvider, PeriodicStatefulProvider):
async def init(self) -> None:
self.section.numberStates = 4
self.warningThreshold = 75
@ -467,7 +489,9 @@ class PulseaudioProvider(
text = icon
sink = self.sinks[section.sortKey]
async with pulsectl_asyncio.PulseAsync("frobar-get-volume") as pulse:
async with pulsectl_asyncio.PulseAsync(
"frobar-get-volume"
) as pulse:
vol = await pulse.volume_get_all_chans(sink)
if section.state == 1:
text += f" {ramp(vol)}"
@ -482,19 +506,28 @@ class PulseaudioProvider(
async def update(self) -> None:
async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse:
self.sinks = dict((sink.name, sink) for sink in await pulse.sink_list())
self.sinks = dict(
(sink.name, sink) for sink in await pulse.sink_list()
)
await self.updateSections(set(self.sinks.keys()), self.module)
async def run(self) -> None:
await super().run()
await self.update()
async with pulsectl_asyncio.PulseAsync("frobar-events-listener") as pulse:
async for event in pulse.subscribe_events(pulsectl.PulseEventMaskEnum.sink):
async with pulsectl_asyncio.PulseAsync(
"frobar-events-listener"
) as pulse:
async for event in pulse.subscribe_events(
pulsectl.PulseEventMaskEnum.sink
):
await self.update()
class NetworkProvider(
MirrorProvider, PeriodicProvider, StatefulSectionProvider, MultiSectionsProvider
MirrorProvider,
PeriodicProvider,
StatefulSectionProvider,
MultiSectionsProvider,
):
def __init__(
self,
@ -523,7 +556,9 @@ class NetworkProvider(
icon = ""
wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
iface.startswith("tun")
or iface.startswith("tap")
or iface.startswith("wg")
):
icon = ""
@ -537,7 +572,6 @@ class NetworkProvider(
return relevant, icon, wifi
async def getSectionUpdater(self, section: Section) -> typing.Callable:
assert isinstance(section, StatefulSection)
assert isinstance(section.sortKey, str)
iface = section.sortKey

View file

@ -3,10 +3,12 @@
import sys
import os
# From https://github.com/python/cpython/blob/v3.7.0b5/Lib/site.py#L436
# Changing the history file
def register_readline() -> None:
import atexit
try:
import readline
import rlcompleter
@ -15,11 +17,11 @@ def register_readline() -> None:
# Reading the initialization (config) file may not be enough to set a
# completion key, so we set one first and then read the file.
readline_doc = getattr(readline, '__doc__', '')
if readline_doc is not None and 'libedit' in readline_doc:
readline.parse_and_bind('bind ^I rl_complete')
readline_doc = getattr(readline, "__doc__", "")
if readline_doc is not None and "libedit" in readline_doc:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind('tab: complete')
readline.parse_and_bind("tab: complete")
try:
readline.read_init_file()
@ -36,12 +38,14 @@ def register_readline() -> None:
# each interpreter exit when readline was already configured
# through a PYTHONSTARTUP hook, see:
# http://bugs.python.org/issue5845#msg198636
history = os.path.join(os.path.expanduser('~'),
'.cache/python_history')
history = os.path.join(
os.path.expanduser("~"), ".cache/python_history"
)
try:
readline.read_history_file(history)
except OSError:
pass
atexit.register(readline.write_history_file, history)
sys.__interactivehook__ = register_readline

View file

@ -5,6 +5,9 @@ from subprocess import check_output
def get_pass(account):
return check_output("pass " + account, shell=True).splitlines()[0]
def beep():
check_output("play -n synth sine E4 sine A5 remix 1-2 fade 0.5 1.2 0.5 2", shell=True)
def beep():
check_output(
"play -n synth sine E4 sine A5 remix 1-2 fade 0.5 1.2 0.5 2",
shell=True,
)