Compare commits

..

No commits in common. "4d39ac076974f506c1b56d0c4acdfe13d72586b2" and "f81fd6bfd2021cdf4b11a4581495505059f99b15" have entirely different histories.

3 changed files with 59 additions and 151 deletions

View file

@ -1,4 +1,4 @@
{ pkgs, lib, nixos-hardware, unixpkgs, ... }:
{ pkgs, lib, config, nixos-hardware, displaylinknixpkgs, ... }:
let
displays = {
embedded = {
@ -65,10 +65,7 @@ in
};
nixpkgs.overlays = [
(self: super: {
displaylink = (import unixpkgs {
inherit (super) system;
config.allowUnfree = true;
}).displaylink;
displaylink = (import displaylinknixpkgs { inherit (super) system; config.allowUnfree = true; }).displaylink;
})
];
services = {

View file

@ -4,7 +4,7 @@
inputs = {
# Packages
nixpkgs.url = "nixpkgs/nixos-24.05";
unixpkgs.url = "nixpkgs/master";
displaylinknixpkgs.url = "github:GeoffreyFrogeye/nixpkgs/displaylink-600";
# OS
disko = {
url = "disko";

View file

@ -11,45 +11,14 @@ import typing
import coloredlogs
import i3ipc
import i3ipc.aio
import psutil
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
def unsetParent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
class ComposableText:
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
@ -57,6 +26,10 @@ class ComposableText(typing.Generic[P, C]):
parent = parent.parent
return parent
def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None:
self.parent = parent
self.bar = self.getFirstParentOfType(Bar)
def updateMarkup(self) -> None:
self.bar.refresh.set()
# TODO OPTI See if worth caching the output
@ -87,8 +60,8 @@ class Section(ComposableText):
Colorable block separated by chevrons
"""
def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
def __init__(self, parent: "Module") -> None:
super().__init__(parent=parent)
self.parent: "Module"
self.color = randomColor()
@ -173,8 +146,8 @@ class Module(ComposableText):
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
self.parent: "Side"
self.children: typing.MutableSequence[Section]
self.sections: list[Section] = []
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
@ -182,11 +155,11 @@ class Module(ComposableText):
self.mirroring = module
module.mirrors.append(self)
def getSections(self) -> typing.Sequence[Section]:
def getSections(self) -> list[Section]:
if self.mirroring:
return self.mirroring.children
return self.mirroring.sections
else:
return self.children
return self.sections
def updateMarkup(self) -> None:
super().updateMarkup()
@ -204,16 +177,16 @@ class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
self.parent: Screen
self.children: typing.MutableSequence[Module] = []
self.alignment = alignment
self.modules: list[Module] = []
def generateMarkup(self) -> str:
if not self.children:
if not self.modules:
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
for module in self.children:
for module in self.modules:
for section in module.getSections():
if section.isHidden():
continue
@ -245,16 +218,16 @@ class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
self.parent: "Bar"
self.children: typing.MutableSequence[Side]
self.output = output
self.sides = dict()
for alignment in Alignment:
Side(parent=self, alignment=alignment)
self.sides[alignment] = Side(parent=self, alignment=alignment)
def generateMarkup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.getMarkup() for side in self.children
side.getMarkup() for side in self.sides.values()
)
@ -265,20 +238,19 @@ class Bar(ComposableText):
def __init__(self) -> None:
super().__init__()
self.parent: None
self.children: typing.MutableSequence[Screen]
self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
self.screens = []
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
Screen(parent=self, output=output.name)
screen = Screen(parent=self, output=output.name)
self.screens.append(screen)
async def run(self) -> None:
cmd = [
@ -331,7 +303,7 @@ class Bar(ComposableText):
loop.add_signal_handler(signal.SIGINT, exit)
def generateMarkup(self) -> str:
return "".join(screen.getMarkup() for screen in self.children) + "\n"
return "".join(section.getMarkup() for section in self.screens) + "\n"
def addProvider(
self,
@ -343,10 +315,11 @@ class Bar(ComposableText):
screenNum: the provider will be added on this screen if set, all otherwise
"""
modules = list()
for s, screen in enumerate(self.children):
for s, screen in enumerate(self.screens):
if screenNum is None or s == screenNum:
side = next(filter(lambda s: s.alignment == alignment, screen.children))
side = screen.sides[alignment]
module = Module(parent=side)
side.modules.append(module)
modules.append(module)
provider.modules = modules
if modules:
@ -382,11 +355,14 @@ class MirrorProvider(Provider):
class SingleSectionProvider(MirrorProvider):
SECTION_CLASS = Section
def __init__(self) -> None:
super().__init__()
self.section: Section
async def run(self) -> None:
await super().run()
self.section = self.SECTION_CLASS(parent=self.module)
self.section = Section(parent=self.module)
self.module.sections.append(self.section)
class StaticProvider(SingleSectionProvider):
@ -398,17 +374,15 @@ class StaticProvider(SingleSectionProvider):
self.section.setText(self.text)
class StatefulSection(Section):
class StatefulProvider(SingleSectionProvider):
# TODO Should actually be a Section descendant
NUMBER_STATES: int
def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
def __init__(self) -> None:
super().__init__()
self.state = 0
self.numberStates: int
self.stateChanged = asyncio.Event()
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
def incrementState(self) -> None:
self.state += 1
self.changeState()
@ -418,13 +392,14 @@ class StatefulSection(Section):
self.changeState()
def changeState(self) -> None:
self.state %= self.numberStates
self.state %= self.NUMBER_STATES
self.stateChanged.set()
self.stateChanged.clear()
class StatefulProvider(SingleSectionProvider):
SECTION_CLASS = StatefulSection
async def run(self) -> None:
await super().run()
self.section.setAction(Button.CLICK_LEFT, self.incrementState)
self.section.setAction(Button.CLICK_RIGHT, self.decrementState)
# Providers
@ -469,14 +444,18 @@ class I3WorkspacesProvider(Provider):
workspaces = await i3.get_workspaces()
for workspace in workspaces:
module = self.modulesFromOutput[workspace.output]
insert = False
if workspace.num in self.sections:
section = self.sections[workspace.num]
if section.parent != module:
section.unsetParent()
section.setParent(module)
section.parent.sections.remove(section)
section.parent = module
section.updateMarkup()
insert = True
else:
section = Section(parent=module, sortKey=workspace.num)
section = Section(parent=module)
self.sections[workspace.num] = section
insert = True
def generate_switch_workspace(num: int) -> typing.Callable:
def switch_workspace() -> None:
@ -489,6 +468,10 @@ class I3WorkspacesProvider(Provider):
section.setAction(
Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
)
if insert:
module.sections.append(section)
revSections = dict((v, k) for k, v in self.sections.items())
module.sections.sort(key=lambda s: revSections[s])
name = workspace.name
if workspace.urgent:
name = f"{name} !"
@ -531,101 +514,29 @@ class I3WorkspacesProvider(Provider):
await i3.main()
class NetworkProviderSection(StatefulSection):
def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
super().__init__(parent=parent, sortKey=iface)
self.iface = iface
self.provider = provider
self.ignore = False
self.icon = "?"
self.wifi = False
if iface == "lo":
self.ignore = True
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
self.icon = ""
else:
self.icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
self.icon = ""
self.wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
self.icon = ""
elif iface.startswith("docker"):
self.icon = ""
elif iface.startswith("veth"):
self.icon = ""
elif iface.startswith("vboxnet"):
self.icon = ""
self.numberStates = 5 if self.wifi else 4
self.state = 1 if self.wifi else 0
async def getText(self) -> str | None:
if self.ignore or not self.provider.if_stats[self.iface].isup:
return None
text = self.icon
return text
class NetworkProvider(MirrorProvider):
def __init__(self) -> None:
self.sections: dict[str, NetworkProviderSection] = dict()
async def updateIface(self, iface: str) -> None:
section = self.sections[iface]
section.setText(await section.getText())
async def run(self) -> None:
await super().run()
while True:
# if_addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
# io_counters: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)
async with asyncio.TaskGroup() as tg:
self.if_stats = psutil.net_if_stats()
for iface in self.if_stats:
if iface not in self.sections:
section = NetworkProviderSection(
parent=self.module, iface=iface, provider=self
)
self.sections[iface] = section
tg.create_task(self.updateIface(iface))
for iface, section in self.sections.items():
if iface not in self.if_stats:
section.setText(None)
tg.create_task(asyncio.sleep(1))
class TimeProvider(StatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
NUMBER_STATES = len(FORMATS)
async def run(self) -> None:
await super().run()
assert isinstance(self.section, StatefulSection)
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
self.state = 1
while True:
now = datetime.datetime.now()
format = self.FORMATS[self.section.state]
format = self.FORMATS[self.state]
self.section.setText(now.strftime(format))
remaining = 1 - now.microsecond / 1000000
try:
await asyncio.wait_for(self.section.stateChanged.wait(), remaining)
await asyncio.wait_for(self.stateChanged.wait(), remaining)
except TimeoutError:
pass
async def main() -> None:
bar = Bar()
dualScreen = len(bar.children) > 1
dualScreen = len(bar.screens) > 1
bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
@ -646,7 +557,7 @@ async def main() -> None:
alignment=Alignment.RIGHT,
)
bar.addProvider(
NetworkProvider(),
StaticProvider("network"),
screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT,
)