Merge remote-tracking branch 'origin/main'

This commit is contained in:
Geoffrey Frogeye 2024-09-02 16:28:40 +02:00
commit ae61f0c6d4
4 changed files with 433 additions and 115 deletions

View file

@ -1,4 +1,4 @@
{ pkgs, lib, config, nixos-hardware, displaylinknixpkgs, ... }: { pkgs, lib, nixos-hardware, unixpkgs, ... }:
let let
displays = { displays = {
embedded = { embedded = {
@ -65,7 +65,10 @@ in
}; };
nixpkgs.overlays = [ nixpkgs.overlays = [
(self: super: { (self: super: {
displaylink = (import displaylinknixpkgs { inherit (super) system; config.allowUnfree = true; }).displaylink; displaylink = (import unixpkgs {
inherit (super) system;
config.allowUnfree = true;
}).displaylink;
}) })
]; ];
services = { services = {

View file

@ -142,11 +142,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1722217815, "lastModified": 1724895876,
"narHash": "sha256-8r5AJ3n8WEDw3rsZLALSuFQ5kJyWOcssNZvPxYLr2yc=", "narHash": "sha256-GSqAwa00+vRuHbq9O/yRv7Ov7W/pcMLis3HmeHv8a+Q=",
"owner": "nix-community", "owner": "nix-community",
"repo": "disko", "repo": "disko",
"rev": "1e6f8a7b4634fc051cc9361959bf414fcf17e094", "rev": "511388d837178979de66d14ca4a2ebd5f7991cd3",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -154,22 +154,6 @@
"type": "indirect" "type": "indirect"
} }
}, },
"displaylinknixpkgs": {
"locked": {
"lastModified": 1717533296,
"narHash": "sha256-TOxOpOYy/tQB+eYQOTPQXNeUmkMghLVDBO0Gc2nj/vs=",
"owner": "GeoffreyFrogeye",
"repo": "nixpkgs",
"rev": "99006b6f4cd24796b1ff6b6981b8f44c9cebd301",
"type": "github"
},
"original": {
"owner": "GeoffreyFrogeye",
"ref": "displaylink-600",
"repo": "nixpkgs",
"type": "github"
}
},
"flake-compat": { "flake-compat": {
"locked": { "locked": {
"lastModified": 1696426674, "lastModified": 1696426674,
@ -224,11 +208,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1719994518, "lastModified": 1725024810,
"narHash": "sha256-pQMhCCHyQGRzdfAkdJ4cIWiw+JNuWsTX7f0ZYSyz0VY=", "narHash": "sha256-ODYRm8zHfLTH3soTFWE452ydPYz2iTvr9T8ftDMUQ3E=",
"owner": "hercules-ci", "owner": "hercules-ci",
"repo": "flake-parts", "repo": "flake-parts",
"rev": "9227223f6d922fee3c7b190b2cc238a99527bbb7", "rev": "af510d4a62d071ea13925ce41c95e3dec816c01d",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -285,11 +269,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1721042469, "lastModified": 1724857454,
"narHash": "sha256-6FPUl7HVtvRHCCBQne7Ylp4p+dpP3P/OYuzjztZ4s70=", "narHash": "sha256-Qyl9Q4QMTLZnnBb/8OuQ9LSkzWjBU1T5l5zIzTxkkhk=",
"owner": "cachix", "owner": "cachix",
"repo": "git-hooks.nix", "repo": "git-hooks.nix",
"rev": "f451c19376071a90d8c58ab1a953c6e9840527fd", "rev": "4509ca64f1084e73bc7a721b20c669a8d4c5ebe6",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -408,11 +392,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1722082646, "lastModified": 1724994893,
"narHash": "sha256-od8dBWVP/ngg0cuoyEl/w9D+TCNDj6Kh4tr151Aax7w=", "narHash": "sha256-yutISDGg6HUaZqCaa54EcsfTwew3vhNtt/FNXBBo44g=",
"owner": "lnl7", "owner": "lnl7",
"repo": "nix-darwin", "repo": "nix-darwin",
"rev": "0413754b3cdb879ba14f6e96915e5fdf06c6aab6", "rev": "c8d3157d1f768e382de5526bb38e74d2245cad04",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -476,11 +460,11 @@
}, },
"nixos-hardware": { "nixos-hardware": {
"locked": { "locked": {
"lastModified": 1722332872, "lastModified": 1724878143,
"narHash": "sha256-2xLM4sc5QBfi0U/AANJAW21Bj4ZX479MHPMPkB+eKBU=", "narHash": "sha256-UjpKo92iZ25M05kgSOw/Ti6VZwpgdlOa73zHj8OcaDk=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixos-hardware", "repo": "nixos-hardware",
"rev": "14c333162ba53c02853add87a0000cbd7aa230c2", "rev": "95c3dfe6ef2e96ddc1ccdd7194e3cda02ca9a8ef",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -490,11 +474,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1722221733, "lastModified": 1725001927,
"narHash": "sha256-sga9SrrPb+pQJxG1ttJfMPheZvDOxApFfwXCFO0H9xw=", "narHash": "sha256-eV+63gK0Mp7ygCR0Oy4yIYSNcum2VQwnZamHxYTNi+M=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "12bf09802d77264e441f48e25459c10c93eada2e", "rev": "6e99f2a27d600612004fbd2c3282d614bfee6421",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -549,11 +533,11 @@
"treefmt-nix": "treefmt-nix" "treefmt-nix": "treefmt-nix"
}, },
"locked": { "locked": {
"lastModified": 1722353133, "lastModified": 1725107436,
"narHash": "sha256-16GH5+2ctcKLSAG8cxtR6YZwAemymIJGKHatLfMWA7I=", "narHash": "sha256-84Rz+GeFifzaJHnyMlkz4TdnrWxQdryTQKU3XVFQR1Q=",
"owner": "nix-community", "owner": "nix-community",
"repo": "nixvim", "repo": "nixvim",
"rev": "d69fb1bd7114a56532e666dc450c46cb42d382e0", "rev": "7cae6d0202140ec322e18b65b63d03b423d595f7",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -603,11 +587,11 @@
}, },
"nur": { "nur": {
"locked": { "locked": {
"lastModified": 1722362301, "lastModified": 1725177927,
"narHash": "sha256-e60ThtBCc2Y0jNSEgADr0/zZ2R4elsdMAGVGGNejF4c=", "narHash": "sha256-l6Wu5dnme8LpkdLYe+/WxKzK5Pgi96Iiuge9wfnzb4E=",
"owner": "nix-community", "owner": "nix-community",
"repo": "NUR", "repo": "NUR",
"rev": "c96302f9ba5788dd1913ea4d001c29ed29101bd2", "rev": "90b3a926d1c4d52c2d3851702be75cbde4e13a0f",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -619,7 +603,6 @@
"root": { "root": {
"inputs": { "inputs": {
"disko": "disko", "disko": "disko",
"displaylinknixpkgs": "displaylinknixpkgs",
"flake-utils": "flake-utils", "flake-utils": "flake-utils",
"home-manager": "home-manager", "home-manager": "home-manager",
"nix-on-droid": "nix-on-droid", "nix-on-droid": "nix-on-droid",
@ -627,7 +610,8 @@
"nixpkgs": "nixpkgs", "nixpkgs": "nixpkgs",
"nixvim": "nixvim", "nixvim": "nixvim",
"nur": "nur", "nur": "nur",
"stylix": "stylix" "stylix": "stylix",
"unixpkgs": "unixpkgs"
} }
}, },
"scss-reset": { "scss-reset": {
@ -700,11 +684,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1722330636, "lastModified": 1724833132,
"narHash": "sha256-uru7JzOa33YlSRwf9sfXpJG+UAV+bnBEYMjrzKrQZFw=", "narHash": "sha256-F4djBvyNRAXGusJiNYInqR6zIMI3rvlp6WiKwsRISos=",
"owner": "numtide", "owner": "numtide",
"repo": "treefmt-nix", "repo": "treefmt-nix",
"rev": "768acdb06968e53aa1ee8de207fd955335c754b7", "rev": "3ffd842a5f50f435d3e603312eefa4790db46af5",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -712,6 +696,21 @@
"repo": "treefmt-nix", "repo": "treefmt-nix",
"type": "github" "type": "github"
} }
},
"unixpkgs": {
"locked": {
"lastModified": 1725183711,
"narHash": "sha256-gkjg8FfjL92azt3gzZUm1+v+U4y+wbQE630uIf4Aybo=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "a2c345850e5e1d96c62e7fa8ca6c9d77ebad1c37",
"type": "github"
},
"original": {
"id": "nixpkgs",
"ref": "master",
"type": "indirect"
}
} }
}, },
"root": "root", "root": "root",

View file

@ -4,7 +4,7 @@
inputs = { inputs = {
# Packages # Packages
nixpkgs.url = "nixpkgs/nixos-24.05"; nixpkgs.url = "nixpkgs/nixos-24.05";
displaylinknixpkgs.url = "github:GeoffreyFrogeye/nixpkgs/displaylink-600"; unixpkgs.url = "nixpkgs/master";
# OS # OS
disko = { disko = {
url = "disko"; url = "disko";

View file

@ -3,27 +3,63 @@
import asyncio import asyncio
import datetime import datetime
import enum import enum
import logging
import random import random
import signal import signal
import typing import typing
import coloredlogs
import i3ipc import i3ipc
import i3ipc.aio
import psutil
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
class ComposableText: class ComposableText(typing.Generic[P, C]):
def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None:
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent self.parent = parent
self.parent.updateMarkup()
prevParent = self def unsetParent(self) -> None:
while parent: assert self.parent
prevParent = parent self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
assert parent.parent, f"{self} doesn't have a parent of {typ}"
parent = parent.parent parent = parent.parent
assert isinstance(prevParent, Bar) return parent
self.bar: Bar = prevParent
def updateMarkup(self) -> None: def updateMarkup(self) -> None:
self.bar.refresh.set() self.bar.refresh.set()
# OPTI See if worth caching the output # TODO OPTI See if worth caching the output
def generateMarkup(self) -> str: def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup") raise NotImplementedError(f"{self} cannot generate markup")
@ -38,61 +74,95 @@ def randomColor(seed: int | bytes | None = None) -> str:
return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3)) return "#" + "".join(f"{random.randint(0, 0xff):02x}" for _ in range(3))
class Button(enum.Enum):
CLICK_LEFT = "1"
CLICK_MIDDLE = "2"
CLICK_RIGHT = "3"
SCROLL_UP = "4"
SCROLL_DOWN = "5"
class Section(ComposableText): class Section(ComposableText):
""" """
Colorable block separated by chevrons Colorable block separated by chevrons
""" """
def __init__(self, parent: "Module") -> None: def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
super().__init__(parent=parent) super().__init__(parent=parent, sortKey=sortKey)
self.parent: "Module"
self.color = randomColor() self.color = randomColor()
self.text: str = "" self.desiredText: str | None = None
self.size = 0 self.text = ""
self.targetSize = -1
self.size = -1
self.animationTask: asyncio.Task | None = None self.animationTask: asyncio.Task | None = None
self.actions: dict[Button, str] = dict()
def isHidden(self) -> bool: def isHidden(self) -> bool:
return self.text is None return self.size < 0
# Geometric series # Geometric series, with a cap
ANIM_A = 0.025 ANIM_A = 0.025
ANIM_R = 0.9 ANIM_R = 0.9
ANIM_MIN = 0.001
async def animate(self) -> None: async def animate(self) -> None:
targetSize = len(self.text) increment = 1 if self.size < self.targetSize else -1
increment = 1 if self.size < targetSize else -1
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
frameTime = loop.time() frameTime = loop.time()
animTime = self.ANIM_A animTime = self.ANIM_A
while self.size != targetSize: while self.size != self.targetSize:
self.size += increment self.size += increment
self.updateMarkup() self.updateMarkup()
animTime *= self.ANIM_R animTime *= self.ANIM_R
animTime = max(self.ANIM_MIN, animTime)
frameTime += animTime frameTime += animTime
sleepTime = frameTime - loop.time() sleepTime = frameTime - loop.time()
# In case of stress, skip refreshing by not awaiting # In case of stress, skip refreshing by not awaiting
if sleepTime > 0: if sleepTime > 0:
await asyncio.sleep(sleepTime) await asyncio.sleep(sleepTime)
else:
log.warning("Skipped an animation frame")
def setText(self, text: str | None) -> None: def setText(self, text: str | None) -> None:
# OPTI Skip if same text # OPTI Don't redraw nor reset animation if setting the same text
oldText = self.text if self.desiredText == text:
self.text = f" {text} "
if oldText == self.text:
return return
if len(oldText) == len(self.text): self.desiredText = text
self.updateMarkup() if text is None:
self.text = ""
self.targetSize = -1
else: else:
self.text = f" {text} "
self.targetSize = len(self.text)
if self.animationTask: if self.animationTask:
self.animationTask.cancel() self.animationTask.cancel()
# OPTI Skip the whole animation task if not required
if self.size == self.targetSize:
self.updateMarkup()
else:
self.animationTask = self.bar.taskGroup.create_task(self.animate()) self.animationTask = self.bar.taskGroup.create_task(self.animate())
def setAction(self, button: Button, callback: typing.Callable | None) -> None:
if button in self.actions:
command = self.actions[button]
self.bar.removeAction(command)
del self.actions[button]
if callback:
command = self.bar.addAction(callback)
self.actions[button] = command
def generateMarkup(self) -> str: def generateMarkup(self) -> str:
assert not self.isHidden()
pad = max(0, self.size - len(self.text)) pad = max(0, self.size - len(self.text))
return self.text[: self.size] + " " * pad text = self.text[: self.size] + " " * pad
for button, command in self.actions.items():
text = "%{A" + button.value + ":" + command + ":}" + text + "%{A}"
return text
class Module(ComposableText): class Module(ComposableText):
@ -102,7 +172,9 @@ class Module(ComposableText):
def __init__(self, parent: "Side") -> None: def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent) super().__init__(parent=parent)
self.sections: list[Section] = [] self.parent: "Side"
self.children: typing.MutableSequence[Section]
self.mirroring: Module | None = None self.mirroring: Module | None = None
self.mirrors: list[Module] = list() self.mirrors: list[Module] = list()
@ -110,11 +182,11 @@ class Module(ComposableText):
self.mirroring = module self.mirroring = module
module.mirrors.append(self) module.mirrors.append(self)
def getSections(self) -> list[Section]: def getSections(self) -> typing.Sequence[Section]:
if self.mirroring: if self.mirroring:
return self.mirroring.sections return self.mirroring.children
else: else:
return self.sections return self.children
def updateMarkup(self) -> None: def updateMarkup(self) -> None:
super().updateMarkup() super().updateMarkup()
@ -131,15 +203,17 @@ class Alignment(enum.Enum):
class Side(ComposableText): class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None: def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent) super().__init__(parent=parent)
self.parent: Screen
self.children: typing.MutableSequence[Module] = []
self.alignment = alignment self.alignment = alignment
self.modules: list[Module] = []
def generateMarkup(self) -> str: def generateMarkup(self) -> str:
if not self.modules: if not self.children:
return "" return ""
text = "%{" + self.alignment.value + "}" text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None lastSection: Section | None = None
for module in self.modules: for module in self.children:
for section in module.getSections(): for section in module.getSections():
if section.isHidden(): if section.isHidden():
continue continue
@ -170,15 +244,17 @@ class Side(ComposableText):
class Screen(ComposableText): class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None: def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent) super().__init__(parent=parent)
self.parent: "Bar"
self.children: typing.MutableSequence[Side]
self.output = output self.output = output
self.sides = dict()
for alignment in Alignment: for alignment in Alignment:
self.sides[alignment] = Side(parent=self, alignment=alignment) Side(parent=self, alignment=alignment)
def generateMarkup(self) -> str: def generateMarkup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join( return ("%{Sn" + self.output + "}") + "".join(
side.getMarkup() for side in self.sides.values() side.getMarkup() for side in self.children
) )
@ -189,18 +265,20 @@ class Bar(ComposableText):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.parent: None
self.children: typing.MutableSequence[Screen]
self.refresh = asyncio.Event() self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup() self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list() self.providers: list["Provider"] = list()
self.running = True self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
self.screens = []
i3 = i3ipc.Connection() i3 = i3ipc.Connection()
for output in i3.get_outputs(): for output in i3.get_outputs():
if not output.active: if not output.active:
continue continue
screen = Screen(parent=self, output=output.name) Screen(parent=self, output=output.name)
self.screens.append(screen)
async def run(self) -> None: async def run(self) -> None:
cmd = [ cmd = [
@ -211,30 +289,49 @@ class Bar(ComposableText):
"-f", "-f",
"DejaVuSansM Nerd Font:size=10", "DejaVuSansM Nerd Font:size=10",
] ]
proc = await asyncio.create_subprocess_exec(*cmd, stdin=asyncio.subprocess.PIPE) proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE
)
async def refresher() -> None: async def refresher() -> None:
assert proc.stdin assert proc.stdin
while self.running: while True:
await self.refresh.wait() await self.refresh.wait()
self.refresh.clear() self.refresh.clear()
proc.stdin.write(self.getMarkup().encode()) markup = self.getMarkup()
# log.debug(markup)
proc.stdin.write(markup.encode())
async with self.taskGroup as tg: async def actionHandler() -> None:
ref = tg.create_task(refresher()) assert proc.stdout
while True:
line = await proc.stdout.readline()
command = line.decode().strip()
callback = self.actions[command]
callback()
longRunningTasks = list()
def addLongRunningTask(coro: typing.Coroutine) -> None:
task = self.taskGroup.create_task(coro)
longRunningTasks.append(task)
async with self.taskGroup:
addLongRunningTask(refresher())
addLongRunningTask(actionHandler())
for provider in self.providers: for provider in self.providers:
tg.create_task(provider.run()) addLongRunningTask(provider.run())
def exit() -> None: def exit() -> None:
print("Terminating") log.info("Terminating")
ref.cancel() for task in longRunningTasks:
self.running = False task.cancel()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, exit) loop.add_signal_handler(signal.SIGINT, exit)
def generateMarkup(self) -> str: def generateMarkup(self) -> str:
return "".join(section.getMarkup() for section in self.screens) + "\n" return "".join(screen.getMarkup() for screen in self.children) + "\n"
def addProvider( def addProvider(
self, self,
@ -246,16 +343,24 @@ class Bar(ComposableText):
screenNum: the provider will be added on this screen if set, all otherwise screenNum: the provider will be added on this screen if set, all otherwise
""" """
modules = list() modules = list()
for s, screen in enumerate(self.screens): for s, screen in enumerate(self.children):
if screenNum is None or s == screenNum: if screenNum is None or s == screenNum:
side = screen.sides[alignment] side = next(filter(lambda s: s.alignment == alignment, screen.children))
module = Module(parent=side) module = Module(parent=side)
side.modules.append(module)
modules.append(module) modules.append(module)
provider.modules = modules provider.modules = modules
if modules: if modules:
self.providers.append(provider) self.providers.append(provider)
def addAction(self, callback: typing.Callable) -> str:
command = f"{self.actionIndex:x}"
self.actions[command] = callback
self.actionIndex += 1
return command
def removeAction(self, command: str) -> None:
del self.actions[command]
class Provider: class Provider:
def __init__(self) -> None: def __init__(self) -> None:
@ -277,14 +382,11 @@ class MirrorProvider(Provider):
class SingleSectionProvider(MirrorProvider): class SingleSectionProvider(MirrorProvider):
def __init__(self) -> None: SECTION_CLASS = Section
super().__init__()
self.section: Section
async def run(self) -> None: async def run(self) -> None:
await super().run() await super().run()
self.section = Section(parent=self.module) self.section = self.SECTION_CLASS(parent=self.module)
self.module.sections.append(self.section)
class StaticProvider(SingleSectionProvider): class StaticProvider(SingleSectionProvider):
@ -296,26 +398,240 @@ class StaticProvider(SingleSectionProvider):
self.section.setText(self.text) self.section.setText(self.text)
class TimeProvider(SingleSectionProvider): class StatefulSection(Section):
def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.state = 0
self.numberStates: int
self.stateChanged = asyncio.Event()
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
def incrementState(self) -> None:
self.state += 1
self.changeState()
def decrementState(self) -> None:
self.state -= 1
self.changeState()
def changeState(self) -> None:
self.state %= self.numberStates
self.stateChanged.set()
self.stateChanged.clear()
class StatefulProvider(SingleSectionProvider):
SECTION_CLASS = StatefulSection
# Providers
class I3ModeProvider(SingleSectionProvider):
def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(None if e.change == "default" else e.change)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.MODE, self.on_mode)
await i3.main()
class I3WindowTitleProvider(SingleSectionProvider):
# TODO FEAT To make this available from start, we need to find the
# `focused=True` element following the `focus` array
def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
self.section.setText(e.container.name)
async def run(self) -> None:
await super().run()
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WINDOW, self.on_window)
await i3.main()
class I3WorkspacesProvider(Provider):
# FIXME Custom names
# FIXME Colors
async def updateWorkspaces(self, i3: i3ipc.Connection) -> None:
"""
Since the i3 IPC interface cannot really tell you by events
when workspaces get invisible or not urgent anymore.
Relying on those exclusively would require reimplementing some of i3 logic.
Fetching all the workspaces on event looks ugly but is the most maintainable.
Times I tried to challenge this and failed: 2.
"""
workspaces = await i3.get_workspaces()
for workspace in workspaces:
module = self.modulesFromOutput[workspace.output]
if workspace.num in self.sections:
section = self.sections[workspace.num]
if section.parent != module:
section.unsetParent()
section.setParent(module)
else:
section = Section(parent=module, sortKey=workspace.num)
self.sections[workspace.num] = section
def generate_switch_workspace(num: int) -> typing.Callable:
def switch_workspace() -> None:
self.bar.taskGroup.create_task(
i3.command(f"workspace number {num}")
)
return switch_workspace
section.setAction(
Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
)
name = workspace.name
if workspace.urgent:
name = f"{name} !"
elif workspace.focused:
name = f"{name} +"
elif workspace.visible:
name = f"{name} *"
section.setText(name)
workspacesNums = set(workspace.num for workspace in workspaces)
for num, section in self.sections.items():
if num not in workspacesNums:
# This should delete the Section but it turned out to be hard
section.setText(None)
def onWorkspaceChange(
self, i3: i3ipc.Connection, e: i3ipc.Event | None = None
) -> None:
# Cancelling the task doesn't seem to prevent performance double-events
self.bar.taskGroup.create_task(self.updateWorkspaces(i3))
def __init__(
self,
) -> None:
super().__init__()
self.sections: dict[int, Section] = dict()
self.modulesFromOutput: dict[str, Module] = dict()
self.bar: Bar
async def run(self) -> None:
for module in self.modules:
screen = module.getFirstParentOfType(Screen)
output = screen.output
self.modulesFromOutput[output] = module
self.bar = module.bar
i3 = await i3ipc.aio.Connection(auto_reconnect=True).connect()
i3.on(i3ipc.Event.WORKSPACE, self.onWorkspaceChange)
self.onWorkspaceChange(i3)
await i3.main()
class NetworkProviderSection(StatefulSection):
def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
super().__init__(parent=parent, sortKey=iface)
self.iface = iface
self.provider = provider
self.ignore = False
self.icon = "?"
self.wifi = False
if iface == "lo":
self.ignore = True
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
self.icon = ""
else:
self.icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
self.icon = ""
self.wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
self.icon = ""
elif iface.startswith("docker"):
self.icon = ""
elif iface.startswith("veth"):
self.icon = ""
elif iface.startswith("vboxnet"):
self.icon = ""
self.numberStates = 5 if self.wifi else 4
self.state = 1 if self.wifi else 0
async def getText(self) -> str | None:
if self.ignore or not self.provider.if_stats[self.iface].isup:
return None
text = self.icon
return text
class NetworkProvider(MirrorProvider):
def __init__(self) -> None:
self.sections: dict[str, NetworkProviderSection] = dict()
async def updateIface(self, iface: str) -> None:
section = self.sections[iface]
section.setText(await section.getText())
async def run(self) -> None: async def run(self) -> None:
await super().run() await super().run()
while self.section.bar.running: while True:
# if_addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
# io_counters: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)
async with asyncio.TaskGroup() as tg:
self.if_stats = psutil.net_if_stats()
for iface in self.if_stats:
if iface not in self.sections:
section = NetworkProviderSection(
parent=self.module, iface=iface, provider=self
)
self.sections[iface] = section
tg.create_task(self.updateIface(iface))
for iface, section in self.sections.items():
if iface not in self.if_stats:
section.setText(None)
tg.create_task(asyncio.sleep(1))
class TimeProvider(StatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
async def run(self) -> None:
await super().run()
assert isinstance(self.section, StatefulSection)
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
while True:
now = datetime.datetime.now() now = datetime.datetime.now()
# section.setText(now.strftime("%a %y-%m-%d %H:%M:%S.%f")) format = self.FORMATS[self.section.state]
self.section.setText("-" * (now.second % 10)) self.section.setText(now.strftime(format))
remaining = 1 - now.microsecond / 1000000 remaining = 1 - now.microsecond / 1000000
await asyncio.sleep(remaining) try:
await asyncio.wait_for(self.section.stateChanged.wait(), remaining)
except TimeoutError:
pass
async def main() -> None: async def main() -> None:
bar = Bar() bar = Bar()
dualScreen = len(bar.screens) > 1 dualScreen = len(bar.children) > 1
bar.addProvider(StaticProvider(text="i3 workspaces"), alignment=Alignment.LEFT) bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
if dualScreen: if dualScreen:
bar.addProvider( bar.addProvider(
StaticProvider(text="i3 title"), screenNum=0, alignment=Alignment.CENTER I3WindowTitleProvider(), screenNum=0, alignment=Alignment.CENTER
) )
bar.addProvider( bar.addProvider(
StaticProvider(text="mpris"), StaticProvider(text="mpris"),
@ -330,7 +646,7 @@ async def main() -> None:
alignment=Alignment.RIGHT, alignment=Alignment.RIGHT,
) )
bar.addProvider( bar.addProvider(
StaticProvider("network"), NetworkProvider(),
screenNum=0 if dualScreen else None, screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT, alignment=Alignment.RIGHT,
) )