From 8179433c41f0f8d9f86c398d2466f98a1a472104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?= Date: Thu, 8 May 2025 18:29:03 +0200 Subject: [PATCH] Manual fixes to Python file To see if I like the rules --- common/frogarized/frogarized.py | 13 +- .../update-local-flakes.py | 28 +- curacao/desk/desk_mqtt.py | 272 +++++++------- hm/desktop/frobar/frobar/__init__.py | 66 ++-- hm/desktop/frobar/frobar/common.py | 311 ++++++++-------- hm/desktop/frobar/frobar/providers.py | 340 ++++++++++-------- hm/dev/pythonstartup.py | 33 +- os/wireless/apply.py | 18 +- os/wireless/import.py | 39 +- unprocessed/config/offlineimap.py | 39 +- 10 files changed, 612 insertions(+), 547 deletions(-) mode change 100755 => 100644 common/update-local-flakes/update-local-flakes.py mode change 100755 => 100644 curacao/desk/desk_mqtt.py mode change 100755 => 100644 os/wireless/import.py diff --git a/common/frogarized/frogarized.py b/common/frogarized/frogarized.py index c3aec87..013c746 100755 --- a/common/frogarized/frogarized.py +++ b/common/frogarized/frogarized.py @@ -1,3 +1,8 @@ +#!/usr/bin/env nix-shell +#! nix-shell -i python3 --pure +#! nix-shell -p python3 python3Packages.colorspacious python3Packages.numpy +# vim: filetype=python + import argparse import json @@ -90,14 +95,14 @@ if args.polarity == "light": frogarized_rgb = np.vstack([frogarized_rgb[7::-1], frogarized_rgb[8:]]) # Output -palette = dict() +palette = {} for i in range(16): rgb = frogarized_rgb[i] r, g, b = rgb - hex = f"#{r:02x}{g:02x}{b:02x}" - palette[f"base{i:02X}"] = hex + hexa = f"#{r:02x}{g:02x}{b:02x}" + palette[f"base{i:02X}"] = hexa if args.output == "truecolor": - print(f"\033[48;2;{r};{g};{b}m{hex}\033[0m") # ]] + print(f"\033[48;2;{r};{g};{b}m{hexa}\033[0m") # ]] # treesitter is silly and will consider brackets in strings # as indentation, hence the comment above if args.output == "json": diff --git a/common/update-local-flakes/update-local-flakes.py b/common/update-local-flakes/update-local-flakes.py old mode 100755 new mode 100644 index 3ccae12..b3d7fde --- a/common/update-local-flakes/update-local-flakes.py +++ b/common/update-local-flakes/update-local-flakes.py @@ -1,6 +1,6 @@ import argparse import json -import os +import pathlib import subprocess GET_INPUTS_CMD = [ @@ -12,28 +12,26 @@ GET_INPUTS_CMD = [ ] -def process_flake(flakeUri: str) -> None: +def process_flake(flake_uri: pathlib.Path) -> None: # get full path - flakeUri = os.path.normpath(flakeUri) - flakeFile = os.path.join(flakeUri, "flake.nix") - if not os.path.isfile(flakeFile): - raise FileNotFoundError(f"Flake not found: {flakeUri}") + flake_file = flake_uri / "flake.nix" + if not flake_file.is_file(): + msg = f"Flake not found: {flake_uri}" + raise FileNotFoundError(msg) # import dependencies - p = subprocess.run(GET_INPUTS_CMD, cwd=flakeUri, stdout=subprocess.PIPE, check=True) + p = subprocess.run( + GET_INPUTS_CMD, cwd=flake_uri, stdout=subprocess.PIPE, check=True + ) deps = json.loads(p.stdout) # for each dependency for dep_name, dep in deps.items(): dep_url = dep["url"] # if not local path, continue - if not ( - dep_url.startswith("path:") or dep_url.startswith("git+file:") - ): + if not dep_url.startswith(("path:", "git+file:")): continue if dep.get("flake", True): # get flake file corresponding - dep_path = dep_url.split(":")[1] - if not dep_path.startswith("/"): - dep_path = os.path.join(flakeUri, dep_path) + dep_path = pathlib.Path(flake_uri, dep_url.split(":")[1]) process_flake(dep_path) # update lockfile cmd = [ @@ -46,7 +44,7 @@ def process_flake(flakeUri: str) -> None: "update", dep_name, ] - p = subprocess.run(cmd, cwd=flakeUri, check=True) + p = subprocess.run(cmd, cwd=flake_uri, check=True) if __name__ == "__main__": @@ -54,6 +52,6 @@ if __name__ == "__main__": description="Recursively update lockfiles " "of flakes located on the system" ) - parser.add_argument("flake", help="Starting flake", default="/") + parser.add_argument("flake", help="Starting flake", type=pathlib.Path) args = parser.parse_args() process_flake(args.flake) diff --git a/curacao/desk/desk_mqtt.py b/curacao/desk/desk_mqtt.py old mode 100755 new mode 100644 index 52306c9..8e2bdf0 --- a/curacao/desk/desk_mqtt.py +++ b/curacao/desk/desk_mqtt.py @@ -12,7 +12,9 @@ import usb.util class Desk: """ - Controls my Linak desk, which is a CBD4P controller connected via USB2LIN06 + Controls my Linak desk. + + It is a CBD4P controller connected via USB2LIN06. This particular combination doesn't seem to report desk height, so it is estimated from the physical controller that does work. """ @@ -69,24 +71,27 @@ class Desk: # Better estimate a bit slower SPEED = (VALUE_TOP - VALUE_BOT) / FULL_TIME * SPEED_MARGIN # unit / s - def _cmToUnit(self, height: float) -> int: + HEADER_STRUCT = struct.Struct(" int: return round((height - self.HEIGHT_OFFSET) * self.HEIGHT_MULT) - def _unitToCm(self, height: int) -> float: + def _unit_to_cm(self, height: int) -> float: return height / self.HEIGHT_MULT + self.HEIGHT_OFFSET - def _get(self, typ: int, overflow_ok: bool = False) -> bytes: + def _get(self, typ: int) -> bytes: # Magic numbers: get class interface, HID get report raw = self._dev.ctrl_transfer( 0xA1, 0x01, 0x300 + typ, 0, self.BUF_LEN ).tobytes() - self.log.debug(f"Received {raw.hex()}") - assert raw[0] == typ - size = raw[1] - end = 2 + size - if not overflow_ok: - assert end < self.BUF_LEN - return raw[2:end] + self.log.debug("Received %s", raw.hex()) + typ, size = self.HEADER_STRUCT.unpack(raw[: self.HEADER_STRUCT.size]) + start = self.HEADER_STRUCT.size + end = self.HEADER_STRUCT.size + size + if end >= self.BUF_LEN: + raise OverflowError + return raw[start:end] # Non-implemented types: # 1, 7: some kind of stream when the device isn't initialized? # size reduces the faster you poll, increases when buttons are held @@ -96,7 +101,7 @@ class Desk: buf = bytes([typ]) + buf # The official apps pad, not that it doesn't seem to work without buf = buf + b"\x00" * (self.BUF_LEN - len(buf)) - self.log.debug(f"Sending {buf.hex()}") + self.log.debug("Sending %s", buf.hex()) # Magic numbers: set class interface, HID set report self._dev.ctrl_transfer(0x21, 0x09, 0x300 + typ, 0, buf) # Non-implemented types: @@ -110,9 +115,11 @@ class Desk: def _initialize(self) -> None: """ - Seems to take the USB2LIN06 out of "boot mode" - (name according to CBD4 Controller) which it is after reset. + Seems to take the USB2LIN06 out of "boot mode". + + It is like that after reset. Permits control and reading the report. + (name according to CBD4 Controller) """ buf = bytes([0x04, 0x00, 0xFB]) self._set(3, buf) @@ -122,9 +129,8 @@ class Desk: self.log = logging.getLogger("Desk") self._dev = usb.core.find(idVendor=Desk.VEND, idProduct=Desk.PROD) if not self._dev: - raise ValueError( - f"Device {Desk.VEND}:" f"{Desk.PROD:04d} " f"not found!" - ) + msg = f"Device {Desk.VEND}: {Desk.PROD:04d} not found!" + raise ValueError(msg) if self._dev.is_kernel_driver_active(0): self._dev.detach_kernel_driver(0) @@ -136,9 +142,7 @@ class Desk: self.fetch_callback: typing.Callable[[Desk], None] | None = None def _get_report(self) -> bytes: - raw = self._get(4) - assert len(raw) == 0x38 - return raw + return self._get(4) def _update_estimations(self) -> None: now = time.time() @@ -192,23 +196,19 @@ class Desk: try: raw = self._get_report() break - except usb.USBError as e: - self.log.error(e) + except usb.USBError: + self.log.exception("USB issue") else: raw = self._get_report() - # Allegedly, from decompiling: - # https://www.linak-us.com/products/controls/desk-control-basic-software/ - # Never reports anything in practice - self.value = struct.unpack(" None: """ + Move the desk to the given position in cm. + If any button is held during movement, the desk will stop moving, yet this will think it's still moving, throwing off the estimates. It's not a bug, it's a safety feature. @@ -243,7 +245,7 @@ class Desk: """ # Would to stop for a while before reversing course, without being able # to read the actual height it's just too annoying to implement - return self._move_to(self._cmToUnit(position)) + return self._move_to(self._cm_to_unit(position)) def stop(self) -> None: self.log.info("Stop moving") @@ -252,128 +254,144 @@ class Desk: def get_height_bounds(self) -> tuple[float, float]: return ( - self._unitToCm(int(self.est_value_bot)), - self._unitToCm(int(self.est_value_top)), + self._unit_to_cm(int(self.est_value_bot)), + self._unit_to_cm(int(self.est_value_top)), ) def get_height(self) -> float | None: if self.est_value is None: return None - return self._unitToCm(self.est_value) + return self._unit_to_cm(self.est_value) -if __name__ == "__main__": - logging.basicConfig() - log = logging.getLogger(__name__) +class App: + NDIGITS = 1 - desk = Desk() - serial = "000C-34E7" + def __init__(self) -> None: + logging.basicConfig() + self.log = logging.getLogger(__name__) - # Configure the required parameters for the MQTT broker - mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(host="192.168.7.53") - ndigits = 1 - target_height: float | None = None + self.desk = Desk() + self.desk.fetch_callback = self.fetch_callback + serial = "000C-34E7" - device_info = ha_mqtt_discoverable.DeviceInfo( - name="Desk", - identifiers=["Linak", serial], - manufacturer="Linak", - model="CBD4P", - suggested_area="Desk", - hw_version="77402", - sw_version="1.91", - serial_number=serial, - ) + # Configure the required parameters for the MQTT broker + mqtt_settings = ha_mqtt_discoverable.Settings.MQTT(host="192.168.7.53") + self.target_height: float | None = None - common_opts = { - "device": device_info, - "icon": "mdi:desk", - "unit_of_measurement": "cm", - "device_class": "distance", - "expire_after": 10, - } - # TODO Implement proper availability in hq-mqtt-discoverable + device_info = ha_mqtt_discoverable.DeviceInfo( + name="Desk", + identifiers=["Linak", serial], + manufacturer="Linak", + model="CBD4P", + suggested_area="Desk", + hw_version="77402", + sw_version="1.91", + serial_number=serial, + ) - height_info = ha_mqtt_discoverable.sensors.NumberInfo( - name="Height ", - min=desk.HEIGHT_BOT, - max=desk.HEIGHT_TOP, - mode="slider", - step=10 ** (-ndigits), - unique_id="desk_height", - **common_opts, - ) - height_settings = ha_mqtt_discoverable.Settings( - mqtt=mqtt_settings, entity=height_info - ) + common_opts = { + "device": device_info, + "icon": "mdi:desk", + "unit_of_measurement": "cm", + "device_class": "distance", + "expire_after": 10, + } + # TODO Implement proper availability in hq-mqtt-discoverable - def height_callback( - client: paho.mqtt.client.Client, - user_data: None, - message: paho.mqtt.client.MQTTMessage, - ) -> None: - global target_height - target_height = float(message.payload.decode()) - log.info(f"Requested height to {target_height:.1f}") + height_info = ha_mqtt_discoverable.sensors.NumberInfo( + name="Height ", + min=self.desk.HEIGHT_BOT, + max=self.desk.HEIGHT_TOP, + mode="slider", + step=10 ** (-self.NDIGITS), + unique_id="desk_height", + **common_opts, + ) + height_settings = ha_mqtt_discoverable.Settings( + mqtt=mqtt_settings, entity=height_info + ) - height = ha_mqtt_discoverable.sensors.Number( - height_settings, height_callback - ) + self.height = ha_mqtt_discoverable.sensors.Number( + height_settings, self.height_callback + ) - height_max_info = ha_mqtt_discoverable.sensors.SensorInfo( - name="Estimated height max", - unique_id="desk_height_max", - entity_category="diagnostic", - **common_opts, - ) - height_max_settings = ha_mqtt_discoverable.Settings( - mqtt=mqtt_settings, entity=height_max_info - ) - height_max = ha_mqtt_discoverable.sensors.Sensor(height_max_settings) + height_max_info = ha_mqtt_discoverable.sensors.SensorInfo( + name="Estimated height max", + unique_id="desk_height_max", + entity_category="diagnostic", + **common_opts, + ) + height_max_settings = ha_mqtt_discoverable.Settings( + mqtt=mqtt_settings, entity=height_max_info + ) + self.height_max = ha_mqtt_discoverable.sensors.Sensor( + height_max_settings + ) - height_min_info = ha_mqtt_discoverable.sensors.SensorInfo( - name="Estimated height min", - unique_id="desk_height_min", - entity_category="diagnostic", - **common_opts, - ) - height_min_settings = ha_mqtt_discoverable.Settings( - mqtt=mqtt_settings, entity=height_min_info - ) - height_min = ha_mqtt_discoverable.sensors.Sensor(height_min_settings) + height_min_info = ha_mqtt_discoverable.sensors.SensorInfo( + name="Estimated height min", + unique_id="desk_height_min", + entity_category="diagnostic", + **common_opts, + ) + height_min_settings = ha_mqtt_discoverable.Settings( + mqtt=mqtt_settings, entity=height_min_info + ) + self.height_min = ha_mqtt_discoverable.sensors.Sensor( + height_min_settings + ) - last_published_state = None + self.last_published_state: tuple[float | None, float, float] | None = ( + None + ) - def fetch_callback(desk: Desk) -> None: + def fetch_callback(self, desk: Desk) -> None: hcur = desk.get_height() hmin, hmax = desk.get_height_bounds() - global last_published_state state = hcur, hmin, hmax - if state == last_published_state: + if state == self.last_published_state: return - last_published_state = state + self.last_published_state = state # If none this will set as unknown # Also readings can be a bit outside the boundaries, # so this skips verification if isinstance(hcur, float): - hcur = round(hcur, ndigits=ndigits) - height._update_state(hcur) + hcur = round(hcur, ndigits=self.NDIGITS) + self.height._update_state(hcur) # noqa: SLF001 - height_max._update_state(round(hmax, ndigits=ndigits)) - height_min._update_state(round(hmin, ndigits=ndigits)) + self.height_max._update_state( # noqa: SLF001 + round(hmax, ndigits=self.NDIGITS), + ) + self.height_min._update_state( # noqa: SLF001 + round(hmin, ndigits=self.NDIGITS), + ) - desk.fetch_callback = fetch_callback + def height_callback( + self, + _client: paho.mqtt.client.Client, + _user_data: None, + message: paho.mqtt.client.MQTTMessage, + ) -> None: + self.target_height = float(message.payload.decode()) + self.log.info("Requested height to %.1f", self.target_height) - interval = 0.2 - # Need to be rective to catch - while True: - if target_height: - temp_target_height = target_height - # Allows queuing of other instructions while moving - target_height = None - desk.move_to(temp_target_height) - else: - time.sleep(interval) - desk.fetch() + def run(self) -> None: + interval = 0.2 + # Need to be rective to catch + while True: + if self.target_height: + temp_target_height = self.target_height + # Allows queuing of other instructions while moving + self.target_height = None + self.desk.move_to(temp_target_height) + else: + time.sleep(interval) + self.desk.fetch() + + +if __name__ == "__main__": + app = App() + app.run() diff --git a/hm/desktop/frobar/frobar/__init__.py b/hm/desktop/frobar/frobar/__init__.py index 4622b53..82caa66 100644 --- a/hm/desktop/frobar/frobar/__init__.py +++ b/hm/desktop/frobar/frobar/__init__.py @@ -9,7 +9,7 @@ from frobar.common import Alignment def main() -> None: # TODO Configurable - FROGARIZED = [ + frogarized = ( "#092c0e", "#143718", "#5a7058", @@ -26,12 +26,12 @@ def main() -> None: "#008dd1", "#5c73c4", "#d43982", - ] + ) # TODO Not super happy with the color management, # while using an existing library is great, it's limited to ANSI colors def base16_color(color: int) -> tuple[int, int, int]: - hexa = FROGARIZED[color] + hexa = frogarized[color] return tuple(rich.color.parse_rgb_hex(hexa[1:])) theme = rich.terminal_theme.TerminalTheme( @@ -62,83 +62,83 @@ def main() -> None: ) bar = frobar.common.Bar(theme=theme) - dualScreen = len(bar.children) > 1 - leftPreferred = 0 if dualScreen else None - rightPreferred = 1 if dualScreen else None + dual_screen = len(bar.children) > 1 + left_preferred = 0 if dual_screen else None + right_preferred = 1 if dual_screen else None workspaces_suffixes = "▲■" - workspaces_names = dict( - (str(i + 1), f"{i+1} {c}") for i, c in enumerate(workspaces_suffixes) - ) + workspaces_names = { + str(i + 1): f"{i+1} {c}" for i, c in enumerate(workspaces_suffixes) + } color = rich.color.Color.parse - bar.addProvider( + bar.add_provider( frobar.providers.I3ModeProvider(color=color("red")), alignment=Alignment.LEFT, ) - bar.addProvider( + bar.add_provider( frobar.providers.I3WorkspacesProvider(custom_names=workspaces_names), alignment=Alignment.LEFT, ) - if dualScreen: - bar.addProvider( + if dual_screen: + bar.add_provider( frobar.providers.I3WindowTitleProvider(color=color("white")), - screenNum=0, + screen_num=0, alignment=Alignment.CENTER, ) - bar.addProvider( + bar.add_provider( frobar.providers.MprisProvider(color=color("bright_white")), - screenNum=rightPreferred, + screen_num=right_preferred, alignment=Alignment.CENTER, ) else: - bar.addProvider( + bar.add_provider( frobar.common.SpacerProvider(), alignment=Alignment.LEFT, ) - bar.addProvider( + bar.add_provider( frobar.providers.MprisProvider(color=color("bright_white")), alignment=Alignment.LEFT, ) - bar.addProvider( + bar.add_provider( frobar.providers.CpuProvider(), - screenNum=leftPreferred, + screen_num=left_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.LoadProvider(), - screenNum=leftPreferred, + screen_num=left_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.RamProvider(), - screenNum=leftPreferred, + screen_num=left_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.TemperatureProvider(), - screenNum=leftPreferred, + screen_num=left_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.BatteryProvider(), - screenNum=leftPreferred, + screen_num=left_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.PulseaudioProvider(color=color("magenta")), - screenNum=rightPreferred, + screen_num=right_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.NetworkProvider(color=color("blue")), - screenNum=leftPreferred, + screen_num=left_preferred, alignment=Alignment.RIGHT, ) - bar.addProvider( + bar.add_provider( frobar.providers.TimeProvider(color=color("cyan")), alignment=Alignment.RIGHT, ) diff --git a/hm/desktop/frobar/frobar/common.py b/hm/desktop/frobar/frobar/common.py index 8af3762..f54bc75 100644 --- a/hm/desktop/frobar/frobar/common.py +++ b/hm/desktop/frobar/frobar/common.py @@ -30,15 +30,16 @@ Sortable = str | int # Display utilities +UNIT_THRESHOLD = 1000 +FLOAT_THRESHOLD = 10 -def humanSize(numi: int) -> str: - """ - Returns a string of width 3+3 - """ + +def human_size(numi: int) -> str: + """Return a string of width 3+3.""" num = float(numi) for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"): - if abs(num) < 1000: - if num >= 10: + if abs(num) < UNIT_THRESHOLD: + if num >= FLOAT_THRESHOLD: return f"{int(num):3d}{unit}" return f"{num:.1f}{unit}" num /= 1024 @@ -64,46 +65,47 @@ def clip(text: str, length: int = 30) -> str: class ComposableText(typing.Generic[P, C]): def __init__( self, - parent: typing.Optional[P] = None, - sortKey: Sortable = 0, + parent: P | None = None, + sort_key: Sortable = 0, ) -> None: - self.parent: typing.Optional[P] = None - self.children: typing.MutableSequence[C] = list() - self.sortKey = sortKey + self.parent: P | None = None + self.children: typing.MutableSequence[C] = [] + self.sortKey = sort_key if parent: - self.setParent(parent) - self.bar = self.getFirstParentOfType(Bar) + self.set_parent(parent) + self.bar = self.get_first_parent_of_type(Bar) - def setParent(self, parent: P) -> None: + def set_parent(self, parent: P) -> None: assert self.parent is None parent.children.append(self) assert isinstance(parent.children, list) parent.children.sort(key=lambda c: c.sortKey) self.parent = parent - self.parent.updateMarkup() + self.parent.update_markup() - def unsetParent(self) -> None: + def unset_parent(self) -> None: assert self.parent self.parent.children.remove(self) - self.parent.updateMarkup() + self.parent.update_markup() self.parent = None - def getFirstParentOfType(self, typ: typing.Type[T]) -> T: + def get_first_parent_of_type(self, typ: type[T]) -> T: parent = self while not isinstance(parent, typ): assert parent.parent, f"{self} doesn't have a parent of {typ}" parent = parent.parent return parent - def updateMarkup(self) -> None: + def update_markup(self) -> None: self.bar.refresh.set() # TODO OPTI See if worth caching the output - def generateMarkup(self) -> str: - raise NotImplementedError(f"{self} cannot generate markup") + def generate_markup(self) -> str: + msg = f"{self} cannot generate markup" + raise NotImplementedError(msg) - def getMarkup(self) -> str: - return self.generateMarkup() + def get_markup(self) -> str: + return self.generate_markup() class Button(enum.Enum): @@ -114,18 +116,19 @@ class Button(enum.Enum): SCROLL_DOWN = "5" +RICH_DEFAULT_COLOR = rich.color.Color.default() + + class Section(ComposableText): - """ - Colorable block separated by chevrons - """ + """Colorable block separated by chevrons.""" def __init__( self, parent: "Module", - sortKey: Sortable = 0, - color: rich.color.Color = rich.color.Color.default(), + sort_key: Sortable = 0, + color: rich.color.Color = RICH_DEFAULT_COLOR, ) -> None: - super().__init__(parent=parent, sortKey=sortKey) + super().__init__(parent=parent, sort_key=sort_key) self.parent: Module self.color = color @@ -134,9 +137,9 @@ class Section(ComposableText): self.targetSize = -1 self.size = -1 self.animationTask: asyncio.Task | None = None - self.actions: dict[Button, str] = dict() + self.actions: dict[Button, str] = {} - def isHidden(self) -> bool: + def is_hidden(self) -> bool: return self.size < 0 # Geometric series, with a cap @@ -147,29 +150,29 @@ class Section(ComposableText): async def animate(self) -> None: increment = 1 if self.size < self.targetSize else -1 loop = asyncio.get_running_loop() - frameTime = loop.time() - animTime = self.ANIM_A + frame_time = loop.time() + anim_time = self.ANIM_A skipped = 0 while self.size != self.targetSize: self.size += increment - self.updateMarkup() + self.update_markup() - animTime *= self.ANIM_R - animTime = max(self.ANIM_MIN, animTime) - frameTime += animTime - sleepTime = frameTime - loop.time() + anim_time *= self.ANIM_R + anim_time = max(self.ANIM_MIN, anim_time) + frame_time += anim_time + sleep_time = frame_time - loop.time() # In case of stress, skip refreshing by not awaiting - if sleepTime > 0: + if sleep_time > 0: if skipped > 0: - log.warning(f"Skipped {skipped} animation frame(s)") + log.warning("Skipped %d animation frame(s)", skipped) skipped = 0 - await asyncio.sleep(sleepTime) + await asyncio.sleep(sleep_time) else: skipped += 1 - def setText(self, text: str | None) -> None: + def set_text(self, text: str | None) -> None: # OPTI Don't redraw nor reset animation if setting the same text if self.desiredText == text: return @@ -184,23 +187,23 @@ class Section(ComposableText): self.animationTask.cancel() # OPTI Skip the whole animation task if not required if self.size == self.targetSize: - self.updateMarkup() + self.update_markup() else: self.animationTask = self.bar.taskGroup.create_task(self.animate()) - def setAction( + def set_action( self, button: Button, callback: typing.Callable | None ) -> None: if button in self.actions: command = self.actions[button] - self.bar.removeAction(command) + self.bar.remove_action(command) del self.actions[button] if callback: - command = self.bar.addAction(callback) + command = self.bar.add_action(callback) self.actions[button] = command - def generateMarkup(self) -> str: - assert not self.isHidden() + def generate_markup(self) -> str: + assert not self.is_hidden() pad = max(0, self.size - len(self.text)) text = self.text[: self.size] + " " * pad for button, command in self.actions.items(): @@ -209,9 +212,7 @@ class Section(ComposableText): class Module(ComposableText): - """ - Sections handled by a same updater - """ + """Sections handled by a same updater.""" def __init__(self, parent: "Side") -> None: super().__init__(parent=parent) @@ -219,21 +220,21 @@ class Module(ComposableText): self.children: typing.MutableSequence[Section] self.mirroring: Module | None = None - self.mirrors: list[Module] = list() + self.mirrors: list[Module] = [] def mirror(self, module: "Module") -> None: self.mirroring = module module.mirrors.append(self) - def getSections(self) -> typing.Sequence[Section]: + def get_sections(self) -> typing.Sequence[Section]: if self.mirroring: return self.mirroring.children return self.children - def updateMarkup(self) -> None: - super().updateMarkup() + def update_markup(self) -> None: + super().update_markup() for mirror in self.mirrors: - mirror.updateMarkup() + mirror.update_markup() class Alignment(enum.Enum): @@ -249,39 +250,41 @@ class Side(ComposableText): self.children: typing.MutableSequence[Module] = [] self.alignment = alignment - self.bar = parent.getFirstParentOfType(Bar) + self.bar = parent.get_first_parent_of_type(Bar) - def generateMarkup(self) -> str: + def generate_markup(self) -> str: if not self.children: return "" text = "%{" + self.alignment.value + "}" - lastSection: Section | None = None + last_section: Section | None = None for module in self.children: - for section in module.getSections(): - if section.isHidden(): + for section in module.get_sections(): + if section.is_hidden(): continue hexa = section.color.get_truecolor(theme=self.bar.theme).hex - if lastSection is None: - if self.alignment == Alignment.LEFT: - text += "%{B" + hexa + "}%{F-}" - else: - text += "%{B-}%{F" + hexa + "}%{R}%{F-}" - elif isinstance(lastSection, SpacerSection): + if last_section is None: + text += ( + "%{B" + hexa + "}%{F-}" + if self.alignment == Alignment.LEFT + else "%{B-}%{F" + hexa + "}%{R}%{F-}" + ) + elif isinstance(last_section, SpacerSection): text += "%{B-}%{F" + hexa + "}%{R}%{F-}" else: if self.alignment == Alignment.RIGHT: - if lastSection.color == section.color: - text += "" - else: - text += "%{F" + hexa + "}%{R}" - elif lastSection.color == section.color: + text += ( + "" + if last_section.color == section.color + else "%{F" + hexa + "}%{R}" + ) + elif last_section.color == section.color: text += "" else: text += "%{R}%{B" + hexa + "}" text += "%{F-}" - text += section.getMarkup() - lastSection = section - if self.alignment != Alignment.RIGHT and lastSection: + text += section.get_markup() + last_section = section + if self.alignment != Alignment.RIGHT and last_section: text += "%{R}%{B-}" return text @@ -297,32 +300,33 @@ class Screen(ComposableText): for alignment in Alignment: Side(parent=self, alignment=alignment) - def generateMarkup(self) -> str: + def generate_markup(self) -> str: return ("%{Sn" + self.output + "}") + "".join( - side.getMarkup() for side in self.children + side.get_markup() for side in self.children ) +RICH_DEFAULT_THEME = rich.terminal_theme.DEFAULT_TERMINAL_THEME + + class Bar(ComposableText): - """ - Top-level - """ + """Top-level.""" def __init__( self, - theme: rich.terminal_theme.TerminalTheme = rich.terminal_theme.DEFAULT_TERMINAL_THEME, + theme: rich.terminal_theme.TerminalTheme = RICH_DEFAULT_THEME, ) -> None: super().__init__() self.parent: None self.children: typing.MutableSequence[Screen] - self.longRunningTasks: list[asyncio.Task] = list() + self.longRunningTasks: list[asyncio.Task] = [] self.theme = theme self.refresh = asyncio.Event() self.taskGroup = asyncio.TaskGroup() - self.providers: list[Provider] = list() + self.providers: list[Provider] = [] self.actionIndex = 0 - self.actions: dict[str, typing.Callable] = dict() + self.actions: dict[str, typing.Callable] = {} self.periodicProviderTask: typing.Coroutine | None = None @@ -334,7 +338,7 @@ class Bar(ComposableText): continue Screen(parent=self, output=output.name) - def addLongRunningTask(self, coro: typing.Coroutine) -> None: + def add_long_running_task(self, coro: typing.Coroutine) -> None: task = self.taskGroup.create_task(coro) self.longRunningTasks.append(task) @@ -360,10 +364,10 @@ class Bar(ComposableText): while True: await self.refresh.wait() self.refresh.clear() - markup = self.getMarkup() + markup = self.get_markup() proc.stdin.write(markup.encode()) - async def actionHandler() -> None: + async def action_handler() -> None: assert proc.stdout while True: line = await proc.stdout.readline() @@ -371,39 +375,36 @@ class Bar(ComposableText): callback = self.actions.get(command) if callback is None: # In some conditions on start it's empty - log.error(f"Unknown command: {command}") + log.error("Unknown command: %s", command) return callback() async with self.taskGroup: - self.addLongRunningTask(refresher()) - self.addLongRunningTask(actionHandler()) + self.add_long_running_task(refresher()) + self.add_long_running_task(action_handler()) for provider in self.providers: - self.addLongRunningTask(provider.run()) + self.add_long_running_task(provider.run()) - def exit() -> None: + def finish() -> None: log.info("Terminating") for task in self.longRunningTasks: task.cancel() loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, exit) + loop.add_signal_handler(signal.SIGINT, finish) - def generateMarkup(self) -> str: - return "".join(screen.getMarkup() for screen in self.children) + "\n" + def generate_markup(self) -> str: + return "".join(screen.get_markup() for screen in self.children) + "\n" - def addProvider( + def add_provider( self, provider: "Provider", alignment: Alignment = Alignment.LEFT, - screenNum: int | None = None, + screen_num: int | None = None, # None = all screens ) -> None: - """ - screenNum: the provider will be added on this screen if set, all otherwise - """ - modules = list() + modules = [] for s, screen in enumerate(self.children): - if screenNum is None or s == screenNum: + if screen_num is None or s == screen_num: side = next( filter(lambda s: s.alignment == alignment, screen.children) ) @@ -413,13 +414,13 @@ class Bar(ComposableText): if modules: self.providers.append(provider) - def addAction(self, callback: typing.Callable) -> str: + def add_action(self, callback: typing.Callable) -> str: command = f"{self.actionIndex:x}" self.actions[command] = callback self.actionIndex += 1 return command - def removeAction(self, command: str) -> None: + def remove_action(self, command: str) -> None: del self.actions[command] def launch(self) -> None: @@ -431,12 +432,10 @@ class Bar(ComposableText): class Provider: - sectionType: type[Section] = Section + section_type: type[Section] = Section - def __init__( - self, color: rich.color.Color = rich.color.Color.default() - ) -> None: - self.modules: list[Module] = list() + def __init__(self, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None: + self.modules: list[Module] = [] self.color = color async def run(self) -> None: @@ -445,9 +444,7 @@ class Provider: class MirrorProvider(Provider): - def __init__( - self, color: rich.color.Color = rich.color.Color.default() - ) -> None: + def __init__(self, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None: super().__init__(color=color) self.module: Module @@ -461,19 +458,19 @@ class MirrorProvider(Provider): class SingleSectionProvider(MirrorProvider): async def run(self) -> None: await super().run() - self.section = self.sectionType(parent=self.module, color=self.color) + self.section = self.section_type(parent=self.module, color=self.color) class StaticProvider(SingleSectionProvider): def __init__( - self, text: str, color: rich.color.Color = rich.color.Color.default() + self, text: str, color: rich.color.Color = RICH_DEFAULT_COLOR ) -> None: super().__init__(color=color) self.text = text async def run(self) -> None: await super().run() - self.section.setText(self.text) + self.section.set_text(self.text) class SpacerSection(Section): @@ -481,7 +478,7 @@ class SpacerSection(Section): class SpacerProvider(SingleSectionProvider): - sectionType = SpacerSection + section_type = SpacerSection def __init__(self, length: int = 5) -> None: super().__init__(color=rich.color.Color.default()) @@ -490,41 +487,41 @@ class SpacerProvider(SingleSectionProvider): async def run(self) -> None: await super().run() assert isinstance(self.section, SpacerSection) - self.section.setText(" " * self.length) + self.section.set_text(" " * self.length) class StatefulSection(Section): def __init__( self, parent: Module, - sortKey: Sortable = 0, - color: rich.color.Color = rich.color.Color.default(), + sort_key: Sortable = 0, + color: rich.color.Color = RICH_DEFAULT_COLOR, ) -> None: - super().__init__(parent=parent, sortKey=sortKey, color=color) + super().__init__(parent=parent, sort_key=sort_key, color=color) self.state = 0 self.numberStates: int - self.setAction(Button.CLICK_LEFT, self.incrementState) - self.setAction(Button.CLICK_RIGHT, self.decrementState) + self.set_action(Button.CLICK_LEFT, self.increment_state) + self.set_action(Button.CLICK_RIGHT, self.decrement_state) - def incrementState(self) -> None: + def increment_state(self) -> None: self.state += 1 - self.changeState() + self.change_state() - def decrementState(self) -> None: + def decrement_state(self) -> None: self.state -= 1 - self.changeState() + self.change_state() - def setChangedState(self, callback: typing.Callable) -> None: + def set_changed_state(self, callback: typing.Callable) -> None: self.callback = callback - def changeState(self) -> None: + def change_state(self) -> None: self.state %= self.numberStates self.bar.taskGroup.create_task(self.callback()) class StatefulSectionProvider(Provider): - sectionType = StatefulSection + section_type = StatefulSection class SingleStatefulSectionProvider( @@ -534,46 +531,44 @@ class SingleStatefulSectionProvider( class MultiSectionsProvider(Provider): - def __init__( - self, color: rich.color.Color = rich.color.Color.default() - ) -> None: + def __init__(self, color: rich.color.Color = RICH_DEFAULT_COLOR) -> None: super().__init__(color=color) self.sectionKeys: dict[Module, dict[Sortable, Section]] = ( collections.defaultdict(dict) ) - self.updaters: dict[Section, typing.Callable] = dict() + self.updaters: dict[Section, typing.Callable] = {} - async def getSectionUpdater(self, section: Section) -> typing.Callable: + async def get_section_updater(self, section: Section) -> typing.Callable: raise NotImplementedError @staticmethod - async def doNothing() -> None: + async def do_nothing() -> None: pass - async def updateSections( + async def update_sections( self, sections: set[Sortable], module: Module ) -> None: - moduleSections = self.sectionKeys[module] + module_sections = self.sectionKeys[module] async with asyncio.TaskGroup() as tg: - for sortKey in sections: - section = moduleSections.get(sortKey) + for sort_key in sections: + section = module_sections.get(sort_key) if not section: - section = self.sectionType( - parent=module, sortKey=sortKey, color=self.color + section = self.section_type( + parent=module, sort_key=sort_key, color=self.color ) - self.updaters[section] = await self.getSectionUpdater( + self.updaters[section] = await self.get_section_updater( section ) - moduleSections[sortKey] = section + module_sections[sort_key] = section updater = self.updaters[section] tg.create_task(updater()) - missingKeys = set(moduleSections.keys()) - sections - for missingKey in missingKeys: - section = moduleSections.get(missingKey) + missing_keys = set(module_sections.keys()) - sections + for missing_key in missing_keys: + section = module_sections.get(missing_key) assert section - section.setText(None) + section.set_text(None) class PeriodicProvider(Provider): @@ -585,7 +580,7 @@ class PeriodicProvider(Provider): @classmethod async def task(cls, bar: Bar) -> None: - providers = list() + providers = [] for provider in bar.providers: if isinstance(provider, PeriodicProvider): providers.append(provider) @@ -596,7 +591,7 @@ class PeriodicProvider(Provider): loops = [provider.loop() for provider in providers] asyncio.gather(*loops) - now = datetime.datetime.now() + now = datetime.datetime.now(datetime.UTC) # Hardcoded to 1 second... not sure if we want some more than that, # and if the logic to check if a task should run would be a win # compared to the task itself @@ -606,11 +601,11 @@ class PeriodicProvider(Provider): async def run(self) -> None: await super().run() for module in self.modules: - bar = module.getFirstParentOfType(Bar) + bar = module.get_first_parent_of_type(Bar) assert bar if not bar.periodicProviderTask: bar.periodicProviderTask = PeriodicProvider.task(bar) - bar.addLongRunningTask(bar.periodicProviderTask) + bar.add_long_running_task(bar.periodicProviderTask) class PeriodicStatefulProvider( @@ -618,7 +613,7 @@ class PeriodicStatefulProvider( ): async def run(self) -> None: await super().run() - self.section.setChangedState(self.loop) + self.section.set_changed_state(self.loop) class AlertingProvider(Provider): @@ -626,16 +621,16 @@ class AlertingProvider(Provider): COLOR_WARNING = rich.color.Color.parse("yellow") COLOR_DANGER = rich.color.Color.parse("red") - warningThreshold: float - dangerThreshold: float + warning_threshold: float + danger_threshold: float - def updateLevel(self, level: float) -> None: - if level > self.dangerThreshold: + def update_level(self, level: float) -> None: + if level > self.danger_threshold: color = self.COLOR_DANGER - elif level > self.warningThreshold: + elif level > self.warning_threshold: color = self.COLOR_WARNING else: color = self.COLOR_NORMAL for module in self.modules: - for section in module.getSections(): + for section in module.get_sections(): section.color = color diff --git a/hm/desktop/frobar/frobar/providers.py b/hm/desktop/frobar/frobar/providers.py index fcda010..d9f5f86 100644 --- a/hm/desktop/frobar/frobar/providers.py +++ b/hm/desktop/frobar/frobar/providers.py @@ -16,6 +16,7 @@ import pulsectl_asyncio import rich.color from frobar.common import ( + RICH_DEFAULT_COLOR, AlertingProvider, Button, MirrorProvider, @@ -29,18 +30,20 @@ from frobar.common import ( StatefulSection, StatefulSectionProvider, clip, - humanSize, + human_size, log, ramp, ) gi.require_version("Playerctl", "2.0") -import gi.repository.Playerctl +import gi.repository.Playerctl # noqa: E402 + +T = typing.TypeVar("T") class I3ModeProvider(SingleSectionProvider): - def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: - self.section.setText(None if e.change == "default" else e.change) + def on_mode(self, _i3: i3ipc.Connection, e: i3ipc.Event) -> None: + self.section.set_text(None if e.change == "default" else e.change) async def run(self) -> None: await super().run() @@ -54,11 +57,11 @@ class I3ModeProvider(SingleSectionProvider): class I3WindowTitleProvider(SingleSectionProvider): # TODO FEAT To make this available from start, we need to find the # `focused=True` element following the `focus` array - def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None: + def on_window(self, _i3: i3ipc.Connection, e: i3ipc.Event) -> None: if e.container.name is None: - self.section.setText(None) + self.section.set_text(None) else: - self.section.setText(clip(e.container.name, 60)) + self.section.set_text(clip(e.container.name, 60)) async def run(self) -> None: await super().run() @@ -76,15 +79,15 @@ class I3WorkspacesProvider(MultiSectionsProvider): def __init__( self, - custom_names: dict[str, str] = {}, + custom_names: dict[str, str] | None = None, ) -> None: super().__init__() self.workspaces: dict[int, i3ipc.WorkspaceReply] - self.custom_names = custom_names + self.custom_names = custom_names or {} - self.modulesFromOutput: dict[str, Module] = dict() + self.modulesFromOutput: dict[str, Module] = {} - async def getSectionUpdater(self, section: Section) -> typing.Callable: + async def get_section_updater(self, section: Section) -> typing.Callable: assert isinstance(section.sortKey, int) num = section.sortKey @@ -93,13 +96,13 @@ class I3WorkspacesProvider(MultiSectionsProvider): self.i3.command(f"workspace number {num}") ) - section.setAction(Button.CLICK_LEFT, switch_to_workspace) + section.set_action(Button.CLICK_LEFT, switch_to_workspace) async def update() -> None: workspace = self.workspaces.get(num) if workspace is None: log.warning(f"Can't find workspace {num}") - section.setText("X") + section.set_text("X") return name = workspace.name @@ -113,20 +116,18 @@ class I3WorkspacesProvider(MultiSectionsProvider): section.color = self.COLOR_DEFAULT if workspace.focused: name = self.custom_names.get(name, name) - section.setText(name) + section.set_text(name) return update - async def updateWorkspaces(self) -> None: - """ - Since the i3 IPC interface cannot really tell you by events - when workspaces get invisible or not urgent anymore. - Relying on those exclusively would require reimplementing some of i3 logic. - Fetching all the workspaces on event looks ugly but is the most maintainable. - Times I tried to challenge this and failed: 2. - """ + async def update_workspaces(self) -> None: + # Since the i3 IPC interface cannot really tell you by events + # when workspaces get invisible or not urgent anymore. + # Relying on those exclusively would need reimplementing the i3 logic. + # Fetching all the workspaces on event is ugly but maintainable. + # Times I tried to challenge this and failed: 2. workspaces = await self.i3.get_workspaces() - self.workspaces = dict() + self.workspaces = {} modules = collections.defaultdict(set) for workspace in workspaces: self.workspaces[workspace.num] = workspace @@ -135,38 +136,40 @@ class I3WorkspacesProvider(MultiSectionsProvider): await asyncio.gather( *[ - self.updateSections(nums, module) + self.update_sections(nums, module) for module, nums in modules.items() ] ) - def onWorkspaceChange( - self, i3: i3ipc.Connection, e: i3ipc.Event | None = None + def on_workspace_change( + self, + _i3: i3ipc.Connection, + _event: i3ipc.Event | None = None, ) -> None: # Cancelling the task doesn't seem to prevent performance double-events - self.bar.taskGroup.create_task(self.updateWorkspaces()) + self.bar.taskGroup.create_task(self.update_workspaces()) async def run(self) -> None: for module in self.modules: - screen = module.getFirstParentOfType(Screen) + screen = module.get_first_parent_of_type(Screen) output = screen.output self.modulesFromOutput[output] = module self.bar = module.bar self.i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect() - self.i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange) - self.onWorkspaceChange(self.i3) + self.i3.on(i3ipc.Event.WORKSPACE, self.on_workspace_change) + self.on_workspace_change(self.i3) await self.i3.main() class MprisProvider(MirrorProvider): - STATUSES = { + STATUSES: typing.ClassVar = { gi.repository.Playerctl.PlaybackStatus.PLAYING: "", gi.repository.Playerctl.PlaybackStatus.PAUSED: "", gi.repository.Playerctl.PlaybackStatus.STOPPED: "", } - PROVIDERS = { + PROVIDERS: typing.ClassVar = { "mpd": "", "firefox": "", "chromium": "", @@ -175,10 +178,10 @@ class MprisProvider(MirrorProvider): async def run(self) -> None: await super().run() - self.status = self.sectionType(parent=self.module, color=self.color) - self.album = self.sectionType(parent=self.module, color=self.color) - self.artist = self.sectionType(parent=self.module, color=self.color) - self.title = self.sectionType(parent=self.module, color=self.color) + self.status = self.section_type(parent=self.module, color=self.color) + self.album = self.section_type(parent=self.module, color=self.color) + self.artist = self.section_type(parent=self.module, color=self.color) + self.title = self.section_type(parent=self.module, color=self.color) self.manager = gi.repository.Playerctl.PlayerManager() self.manager.connect("name-appeared", self.on_name_appeared) @@ -196,13 +199,13 @@ class MprisProvider(MirrorProvider): for name in self.manager.props.player_names: self.init_player(name) - self.updateSections() + self.update_sections() while True: # Occasionally it will skip a second # but haven't managed to reproduce with debug info await self.playing.wait() - self.updateTitle() + self.update_title() if self.player: pos = self.player.props.position rem = 1 - (pos % 1000000) / 1000000 @@ -214,20 +217,20 @@ class MprisProvider(MirrorProvider): def get( something: gi.overrides.GLib.Variant, key: str, - default: typing.Any = None, - ) -> typing.Any: - if key in something.keys(): + default: T | None = None, + ) -> T | None: + if key in something.keys(): # noqa: SIM118 return something[key] return default @staticmethod - def formatUs(ms: int) -> str: + def format_us(ms: int) -> str: if ms < 60 * 60 * 1000000: return time.strftime("%M:%S", time.gmtime(ms // 1000000)) return str(datetime.timedelta(microseconds=ms)) - def findCurrentPlayer(self) -> None: - for name in [self.playerctldName] + self.manager.props.player_names: + def find_current_player(self) -> None: + for name in [self.playerctldName, *self.manager.props.player_names]: try: self.player = gi.repository.Playerctl.Player.new_from_name( name @@ -241,21 +244,21 @@ class MprisProvider(MirrorProvider): else: self.player = None - def updateSections(self) -> None: - self.findCurrentPlayer() + def update_sections(self) -> None: + self.find_current_player() if self.player is None: - self.status.setText(None) - self.album.setText(None) - self.artist.setText(None) - self.title.setText(None) + self.status.set_text(None) + self.album.set_text(None) + self.artist.set_text(None) + self.title.set_text(None) self.playing.clear() return player = self.player.props.player_name player = self.PROVIDERS.get(player, player) status = self.STATUSES.get(self.player.props.playback_status, "?") - self.status.setText(f"{player} {status}") + self.status.set_text(f"{player} {status}") if ( self.player.props.playback_status @@ -269,48 +272,48 @@ class MprisProvider(MirrorProvider): album = self.get(metadata, "xesam:album") if album: - self.album.setText(f" {clip(album)}") + self.album.set_text(f" {clip(album)}") else: - self.album.setText(None) + self.album.set_text(None) artists = self.get(metadata, "xesam:artist") if artists: artist = ", ".join(artists) - self.artist.setText(f" {clip(artist)}") + self.artist.set_text(f" {clip(artist)}") else: - self.artist.setText(None) + self.artist.set_text(None) - self.updateTitle() + self.update_title() - def updateTitle(self) -> None: + def update_title(self) -> None: if self.player is None: return metadata = self.player.props.metadata pos = self.player.props.position # In µs - text = f" {self.formatUs(pos)}" + text = f" {self.format_us(pos)}" dur = self.get(metadata, "mpris:length") if dur: - text += f"/{self.formatUs(dur)}" + text += f"/{self.format_us(dur)}" title = self.get(metadata, "xesam:title") if title: text += f" {clip(title)}" - self.title.setText(text) + self.title.set_text(text) def on_player_vanished( self, - manager: gi.repository.Playerctl.PlayerManager, - player: gi.repository.Playerctl.Player, + _manager: gi.repository.Playerctl.PlayerManager, + _player: gi.repository.Playerctl.Player, ) -> None: - self.updateSections() + self.update_sections() def on_event( self, - player: gi.repository.Playerctl.Player, - _: typing.Any, - manager: gi.repository.Playerctl.PlayerManager, + _player: gi.repository.Playerctl.Player, + _event_data: gi.overrides.GLib.Variant, + _manager: gi.repository.Playerctl.PlayerManager, ) -> None: - self.updateSections() + self.update_sections() def init_player(self, name: gi.repository.Playerctl.PlayerName) -> None: player = gi.repository.Playerctl.Player.new_from_name(name) @@ -325,136 +328,165 @@ class MprisProvider(MirrorProvider): self.manager.manage_player(player) def on_name_appeared( - self, manager: gi.repository.Playerctl.PlayerManager, name: str + self, + _manager: gi.repository.Playerctl.PlayerManager, + name: str, ) -> None: self.init_player(name) - self.updateSections() + self.update_sections() class CpuProvider(AlertingProvider, PeriodicStatefulProvider): + STATE_MINIMIZED = 0 + STATE_AGGREGATED = 1 + STATE_FULL = 2 + async def init(self) -> None: self.section.numberStates = 3 - self.warningThreshold = 75 - self.dangerThreshold = 95 + self.warning_threshold = 75 + self.danger_threshold = 95 async def loop(self) -> None: percent = psutil.cpu_percent(percpu=False) - self.updateLevel(percent) + self.update_level(percent) text = "" - if self.section.state >= 2: + if self.section.state >= self.STATE_FULL: percents = psutil.cpu_percent(percpu=True) text += " " + "".join([ramp(p / 100) for p in percents]) - elif self.section.state >= 1: + elif self.section.state >= self.STATE_AGGREGATED: text += " " + ramp(percent / 100) - self.section.setText(text) + self.section.set_text(text) class LoadProvider(AlertingProvider, PeriodicStatefulProvider): + STATE_MINIMIZED = 0 + STATE_AGGREGATED = 1 + STATE_FULL = 2 + async def init(self) -> None: self.section.numberStates = 3 - self.warningThreshold = 5 - self.dangerThreshold = 10 + self.warning_threshold = 5 + self.danger_threshold = 10 async def loop(self) -> None: load = os.getloadavg() - self.updateLevel(load[0]) + self.update_level(load[0]) text = "" - loads = 3 if self.section.state >= 2 else self.section.state + loads = ( + 3 if self.section.state >= self.STATE_FULL else self.section.state + ) for load_index in range(loads): text += f" {load[load_index]:.2f}" - self.section.setText(text) + self.section.set_text(text) class RamProvider(AlertingProvider, PeriodicStatefulProvider): + STATE_MINIMIZED = 0 + STATE_ICON = 1 + STATE_NUMBER = 2 + STATE_FULL = 3 + async def init(self) -> None: self.section.numberStates = 4 - self.warningThreshold = 75 - self.dangerThreshold = 95 + self.warning_threshold = 75 + self.danger_threshold = 95 async def loop(self) -> None: mem = psutil.virtual_memory() - self.updateLevel(mem.percent) + self.update_level(mem.percent) text = "" - if self.section.state >= 1: + if self.section.state >= self.STATE_ICON: text += " " + ramp(mem.percent / 100) - if self.section.state >= 2: - text += humanSize(mem.total - mem.available) - if self.section.state >= 3: - text += "/" + humanSize(mem.total) - self.section.setText(text) + if self.section.state >= self.STATE_NUMBER: + text += human_size(mem.total - mem.available) + if self.section.state >= self.STATE_FULL: + text += "/" + human_size(mem.total) + self.section.set_text(text) class TemperatureProvider(AlertingProvider, PeriodicStatefulProvider): RAMP = "" - MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"] + MAIN_TEMPS = ("coretemp", "amdgpu", "cpu_thermal") # For Intel, AMD and ARM respectively. + STATE_MINIMIZED = 0 + STATE_FULL = 1 + main: str async def init(self) -> None: self.section.numberStates = 2 - allTemp = psutil.sensors_temperatures() + all_temps = psutil.sensors_temperatures() for main in self.MAIN_TEMPS: - if main in allTemp: + if main in all_temps: self.main = main break else: - raise IndexError("Could not find suitable temperature sensor") + msg = "Could not find suitable temperature sensor" + raise IndexError(msg) - temp = allTemp[self.main][0] - self.warningThreshold = temp.high or 90.0 - self.dangerThreshold = temp.critical or 100.0 + temp = all_temps[self.main][0] + self.warning_threshold = temp.high or 90.0 + self.danger_threshold = temp.critical or 100.0 async def loop(self) -> None: - allTemp = psutil.sensors_temperatures() - temp = allTemp[self.main][0] - self.updateLevel(temp.current) + all_temps = psutil.sensors_temperatures() + temp = all_temps[self.main][0] + self.update_level(temp.current) - text = ramp(temp.current / self.warningThreshold, self.RAMP) - if self.section.state >= 1: + text = ramp(temp.current / self.warning_threshold, self.RAMP) + if self.section.state >= self.STATE_FULL: text += f" {temp.current:.0f}°C" - self.section.setText(text) + self.section.set_text(text) class BatteryProvider(AlertingProvider, PeriodicStatefulProvider): # TODO Support ACPID for events RAMP = "" + STATE_MINIMIZED = 0 + STATE_PERCENTAGE = 1 + STATE_ESTIMATE = 2 + async def init(self) -> None: self.section.numberStates = 3 # TODO 1 refresh rate is too quick - self.warningThreshold = 75 - self.dangerThreshold = 95 + self.warning_threshold = 75 + self.danger_threshold = 95 async def loop(self) -> None: bat = psutil.sensors_battery() if not bat: - self.section.setText(None) + self.section.set_text(None) - self.updateLevel(100 - bat.percent) + self.update_level(100 - bat.percent) text = "" if bat.power_plugged else "" text += ramp(bat.percent / 100, self.RAMP) - if self.section.state >= 1: + if self.section.state >= self.STATE_PERCENTAGE: text += f" {bat.percent:.0f}%" - if self.section.state >= 2: + if self.section.state >= self.STATE_ESTIMATE: h = int(bat.secsleft / 3600) m = int((bat.secsleft - h * 3600) / 60) text += f" ({h:d}:{m:02d})" - self.section.setText(text) + self.section.set_text(text) class PulseaudioProvider( MirrorProvider, StatefulSectionProvider, MultiSectionsProvider ): - async def getSectionUpdater(self, section: Section) -> typing.Callable: + STATE_MINIMIZED = 0 + STATE_BAR = 1 + STATE_PERCENTAGE = 2 + + async def get_section_updater(self, section: Section) -> typing.Callable: assert isinstance(section, StatefulSection) assert isinstance(section.sortKey, str) @@ -478,7 +510,7 @@ class PulseaudioProvider( icon = "" section.numberStates = 3 - section.state = 1 + section.state = self.STATE_BAR # TODO Change volume with wheel @@ -491,23 +523,21 @@ class PulseaudioProvider( "frobar-get-volume" ) as pulse: vol = await pulse.volume_get_all_chans(sink) - if section.state == 1: + if section.state == self.STATE_BAR: text += f" {ramp(vol)}" - elif section.state == 2: + elif section.state == self.STATE_PERCENTAGE: text += f" {vol:.0%}" # TODO Show which is default - section.setText(text) + section.set_text(text) - section.setChangedState(updater) + section.set_changed_state(updater) return updater async def update(self) -> None: async with pulsectl_asyncio.PulseAsync("frobar-list-sinks") as pulse: - self.sinks = dict( - (sink.name, sink) for sink in await pulse.sink_list() - ) - await self.updateSections(set(self.sinks.keys()), self.module) + self.sinks = {sink.name: sink for sink in await pulse.sink_list()} + await self.update_sections(set(self.sinks.keys()), self.module) async def run(self) -> None: await super().run() @@ -515,7 +545,7 @@ class PulseaudioProvider( async with pulsectl_asyncio.PulseAsync( "frobar-events-listener" ) as pulse: - async for event in pulse.subscribe_events( + async for _ in pulse.subscribe_events( pulsectl.PulseEventMaskEnum.sink ): await self.update() @@ -527,9 +557,15 @@ class NetworkProvider( StatefulSectionProvider, MultiSectionsProvider, ): + STATE_MINIMIZED = 0 + STATE_SSID = 1 + STATE_IP = 2 + STATE_SPEED = 3 + STATE_TOTALS = 4 + def __init__( self, - color: rich.color.Color = rich.color.Color.default(), + color: rich.color.Color = RICH_DEFAULT_COLOR, ) -> None: super().__init__(color=color) @@ -539,27 +575,19 @@ class NetworkProvider( self.io_counters = psutil.net_io_counters(pernic=True) @staticmethod - def getIfaceAttributes(iface: str) -> tuple[bool, str, bool]: + def get_iface_attribute(iface: str) -> tuple[bool, str, bool]: relevant = True icon = "?" wifi = False if iface == "lo": relevant = False - elif iface.startswith("eth") or iface.startswith("enp"): - if "u" in iface: - icon = "" - else: - icon = "" - elif iface.startswith("wlan") or iface.startswith("wl"): + elif iface.startswith(("eth", "enp")): + icon = "" if "u" in iface else "" + elif iface.startswith(("wlan", "wl")): icon = "" wifi = True - elif ( - iface.startswith("tun") - or iface.startswith("tap") - or iface.startswith("wg") - ): + elif iface.startswith(("tun", "tap", "wg")): icon = "" - elif iface.startswith("docker"): icon = "" elif iface.startswith("veth"): @@ -569,30 +597,30 @@ class NetworkProvider( return relevant, icon, wifi - async def getSectionUpdater(self, section: Section) -> typing.Callable: + async def get_section_updater(self, section: Section) -> typing.Callable: assert isinstance(section, StatefulSection) assert isinstance(section.sortKey, str) iface = section.sortKey - relevant, icon, wifi = self.getIfaceAttributes(iface) + relevant, icon, wifi = self.get_iface_attribute(iface) if not relevant: - return self.doNothing + return self.do_nothing section.numberStates = 5 if wifi else 4 - section.state = 1 if wifi else 0 + section.state = self.STATE_SSID if wifi else self.STATE_MINIMIZED async def update() -> None: assert isinstance(section, StatefulSection) if not self.if_stats[iface].isup: - section.setText(None) + section.set_text(None) return text = icon state = section.state + (0 if wifi else 1) - if wifi and state >= 1: # SSID + if wifi and state >= self.STATE_SSID: cmd = ["iwgetid", iface, "--raw"] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE @@ -600,7 +628,7 @@ class NetworkProvider( stdout, stderr = await proc.communicate() text += f" {stdout.decode().strip()}" - if state >= 2: # Address + if state >= self.STATE_IP: for address in self.if_addrs[iface]: if address.family == socket.AF_INET: net = ipaddress.IPv4Network( @@ -609,23 +637,23 @@ class NetworkProvider( text += f" {address.address}/{net.prefixlen}" break - if state >= 3: # Speed - prevRecv = self.prev_io_counters[iface].bytes_recv + if state >= self.STATE_SPEED: + prev_recv = self.prev_io_counters[iface].bytes_recv recv = self.io_counters[iface].bytes_recv - prevSent = self.prev_io_counters[iface].bytes_sent + prev_sent = self.prev_io_counters[iface].bytes_sent sent = self.io_counters[iface].bytes_sent dt = self.time - self.prev_time - recvDiff = (recv - prevRecv) / dt - sentDiff = (sent - prevSent) / dt - text += f" ↓{humanSize(recvDiff)}↑{humanSize(sentDiff)}" + recv_diff = (recv - prev_recv) / dt + sent_diff = (sent - prev_sent) / dt + text += f" ↓{human_size(recv_diff)}↑{human_size(sent_diff)}" - if state >= 4: # Counter - text += f" ⇓{humanSize(recv)}⇑{humanSize(sent)}" + if state >= self.STATE_TOTALS: + text += f" ⇓{human_size(recv)}⇑{human_size(sent)}" - section.setText(text) + section.set_text(text) - section.setChangedState(update) + section.set_changed_state(update) return update @@ -643,17 +671,17 @@ class NetworkProvider( self.if_addrs = psutil.net_if_addrs() self.io_counters = psutil.net_io_counters(pernic=True) - await self.updateSections(set(self.if_stats.keys()), self.module) + await self.update_sections(set(self.if_stats.keys()), self.module) class TimeProvider(PeriodicStatefulProvider): - FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] + FORMATS = ("%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S") async def init(self) -> None: self.section.state = 1 self.section.numberStates = len(self.FORMATS) async def loop(self) -> None: - now = datetime.datetime.now() - format = self.FORMATS[self.section.state] - self.section.setText(now.strftime(format)) + now = datetime.datetime.now(datetime.UTC).astimezone() + format_ = self.FORMATS[self.section.state] + self.section.set_text(now.strftime(format_)) diff --git a/hm/dev/pythonstartup.py b/hm/dev/pythonstartup.py index e1561c5..b041246 100644 --- a/hm/dev/pythonstartup.py +++ b/hm/dev/pythonstartup.py @@ -1,7 +1,6 @@ -#!/usr/bin/env python3 - -import os -import sys +import sys # noqa: I001 +import contextlib +import pathlib # From https://github.com/python/cpython/blob/v3.7.0b5/Lib/site.py#L436 @@ -11,7 +10,7 @@ def register_readline() -> None: try: import readline - import rlcompleter + import rlcompleter # noqa: F401 except ImportError: return @@ -23,14 +22,13 @@ def register_readline() -> None: else: readline.parse_and_bind("tab: complete") - try: + # An OSError here could have many causes, but the most likely one + # is that there's no .inputrc file (or .editrc file in the case of + # Mac OS X + libedit) in the expected location. In that case, we + # want to ignore the exception. + cm = contextlib.suppress(OSError) + with cm: readline.read_init_file() - except OSError: - # An OSError here could have many causes, but the most likely one - # is that there's no .inputrc file (or .editrc file in the case of - # Mac OS X + libedit) in the expected location. In that case, we - # want to ignore the exception. - pass if readline.get_current_history_length() == 0: # If no history was loaded, default to .python_history. @@ -38,14 +36,11 @@ def register_readline() -> None: # each interpreter exit when readline was already configured # through a PYTHONSTARTUP hook, see: # http://bugs.python.org/issue5845#msg198636 - history = os.path.join( - os.path.expanduser("~"), ".cache/python_history" - ) - try: + history = pathlib.Path("~/.cache/python_history").expanduser() + cm = contextlib.suppress(OSError) + with cm: readline.read_history_file(history) - except OSError: - pass atexit.register(readline.write_history_file, history) -sys.__interactivehook__ = register_readline +sys.__interactivehook__ = register_readline # noqa: attr-defined diff --git a/os/wireless/apply.py b/os/wireless/apply.py index accd75c..7e2acec 100644 --- a/os/wireless/apply.py +++ b/os/wireless/apply.py @@ -1,14 +1,15 @@ """ -Add the networks saved in wireless_networks to wpa_supplicant, -without restarting it or touching its config file. +Add the networks saved in wireless_networks to wpa_supplicant. + +Does not restart it or touch its config file. """ import json -import os +import pathlib import re import subprocess -NETWORKS_FILE = "/etc/keys/wireless_networks.json" +NETWORKS_FILE = pathlib.Path("/etc/keys/wireless_networks.json") def wpa_cli(command: list[str]) -> list[bytes]: @@ -20,7 +21,7 @@ def wpa_cli(command: list[str]) -> list[bytes]: return lines -network_numbers: dict[str, int] = dict() +network_numbers: dict[str, int] = {} networks_tsv = wpa_cli(["list_networks"]) networks_tsv.pop(0) for network_line in networks_tsv: @@ -34,8 +35,8 @@ for network_line in networks_tsv: ).decode() network_numbers[ssid] = number -if os.path.isfile(NETWORKS_FILE): - with open(NETWORKS_FILE) as fd: +if NETWORKS_FILE.is_file(): + with NETWORKS_FILE.open() as fd: networks = json.load(fd) for network in networks: @@ -54,4 +55,5 @@ if os.path.isfile(NETWORKS_FILE): value_str = str(value) ret = wpa_cli(["set_network", number_str, key, value_str]) if ret[0] != b"OK": - raise RuntimeError(f"Couldn't set {key} for {ssid}, got {ret}") + msg = f"Couldn't set {key} for {ssid}, got {ret}" + raise RuntimeError(msg) diff --git a/os/wireless/import.py b/os/wireless/import.py old mode 100755 new mode 100644 index e19b511..30293b0 --- a/os/wireless/import.py +++ b/os/wireless/import.py @@ -1,7 +1,4 @@ -""" -Exports Wi-Fi networks configuration stored in pass -into a format readable by Nix. -""" +"""Exports Wi-Fi networks from password store.""" # TODO EAP ca_cert=/etc/ssl/... probably won't work. Example fix: # builtins.fetchurl { @@ -11,52 +8,60 @@ into a format readable by Nix. import json import os +import pathlib import subprocess import yaml # passpy doesn't handle encoding properly, so doing this with calls -PASSWORD_STORE = os.environ["PASSWORD_STORE_DIR"] +PASSWORD_STORE = pathlib.Path(os.environ["PASSWORD_STORE_DIR"]) SUBFOLDER = "wifi" -def list_networks() -> list[str]: +def list_networks() -> list[pathlib.Path]: paths = [] - pass_folder = os.path.join(PASSWORD_STORE, SUBFOLDER) + pass_folder = PASSWORD_STORE / SUBFOLDER for filename in os.listdir(pass_folder): if not filename.endswith(".gpg"): continue - filepath = os.path.join(pass_folder, filename) - if not os.path.isfile(filepath): + filepath = pass_folder / filename + if not filepath.is_file(): continue file = filename[:-4] - path = os.path.join(SUBFOLDER, file) + path = pathlib.Path(SUBFOLDER, file) paths.append(path) paths.sort() return paths -networks: list[dict[str, str | list[str] | int]] = list() +networks: list[dict[str, str | list[str] | int]] = [] for path in list_networks(): - proc = subprocess.run(["pass", path], stdout=subprocess.PIPE, check=True) + proc = subprocess.run( + ["pass", path], # noqa: S607 + stdout=subprocess.PIPE, + check=True, + ) raw = proc.stdout.decode() split = raw.split("\n") password = split[0] - data = yaml.safe_load("\n".join(split[1:])) or dict() + data = yaml.safe_load("\n".join(split[1:])) or {} # Helpers to prevent repetition suffixes = data.pop("suffixes", [""]) data.setdefault("key_mgmt", ["WPA-PSK"] if password else ["NONE"]) if password: - if any(map(lambda m: "PSK" in m.split("-"), data["key_mgmt"])): + if any("PSK" in m.split("-") for m in data["key_mgmt"]): data["psk"] = password - if any(map(lambda m: "EAP" in m.split("-"), data["key_mgmt"])): + if any("EAP" in m.split("-") for m in data["key_mgmt"]): data["password"] = password - assert "ssid" in data, f"{path}: Missing SSID" + + if "ssid" in data: + msg = f"{path}: Missing SSID" + raise KeyError(msg) data.setdefault("disabled", 0) for suffix in suffixes: @@ -64,5 +69,5 @@ for path in list_networks(): network["ssid"] += suffix networks.append(network) -with open("wireless_networks.json", "w") as fd: +with pathlib.Path("wireless_networks.json").open("w") as fd: json.dump(networks, fd, indent=4) diff --git a/unprocessed/config/offlineimap.py b/unprocessed/config/offlineimap.py index 49aa933..72d5e90 100644 --- a/unprocessed/config/offlineimap.py +++ b/unprocessed/config/offlineimap.py @@ -1,13 +1,32 @@ -#! /usr/bin/env python2 -from subprocess import check_output +import subprocess -def get_pass(account): - return check_output("pass " + account, shell=True).splitlines()[0] - - -def beep(): - check_output( - "play -n synth sine E4 sine A5 remix 1-2 fade 0.5 1.2 0.5 2", - shell=True, +def get_pass(account: str) -> str: + proc = subprocess.run( + ["pass", account], # noqa: S607 + stdout=subprocess.PIPE, + check=True, ) + + raw = proc.stdout.decode() + return raw.splitlines()[0] + + +def beep() -> None: + cmd = [ + "play", + "-n", + "synth", + "sine", + "E4", + "sine", + "A5", + "remix", + "1-2", + "fade", + "0.5", + "1.2", + "0.5", + "2", + ] + subprocess.run(cmd, check=False)