frobarng: Forgotten dev

This commit is contained in:
Geoffrey Frogeye 2024-08-25 09:48:36 +02:00
parent f81fd6bfd2
commit c375cb2e11
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8

View file

@ -11,14 +11,45 @@ import typing
import coloredlogs
import i3ipc
import i3ipc.aio
import psutil
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
T = typing.TypeVar("T", bound="ComposableText")
P = typing.TypeVar("P", bound="ComposableText")
C = typing.TypeVar("C", bound="ComposableText")
Sortable = str | int
class ComposableText:
class ComposableText(typing.Generic[P, C]):
def __init__(
self,
parent: typing.Optional[P] = None,
sortKey: Sortable = 0,
) -> None:
self.parent: typing.Optional[P] = None
self.children: typing.MutableSequence[C] = list()
self.sortKey = sortKey
if parent:
self.setParent(parent)
self.bar = self.getFirstParentOfType(Bar)
def setParent(self, parent: P) -> None:
assert self.parent is None
parent.children.append(self)
assert isinstance(parent.children, list)
parent.children.sort(key=lambda c: c.sortKey)
self.parent = parent
self.parent.updateMarkup()
def unsetParent(self) -> None:
assert self.parent
self.parent.children.remove(self)
self.parent.updateMarkup()
self.parent = None
def getFirstParentOfType(self, typ: typing.Type[T]) -> T:
parent = self
while not isinstance(parent, typ):
@ -26,10 +57,6 @@ class ComposableText:
parent = parent.parent
return parent
def __init__(self, parent: typing.Optional["ComposableText"] = None) -> None:
self.parent = parent
self.bar = self.getFirstParentOfType(Bar)
def updateMarkup(self) -> None:
self.bar.refresh.set()
# TODO OPTI See if worth caching the output
@ -60,8 +87,8 @@ class Section(ComposableText):
Colorable block separated by chevrons
"""
def __init__(self, parent: "Module") -> None:
super().__init__(parent=parent)
def __init__(self, parent: "Module", sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.parent: "Module"
self.color = randomColor()
@ -146,8 +173,8 @@ class Module(ComposableText):
def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent)
self.parent: "Side"
self.children: typing.MutableSequence[Section]
self.sections: list[Section] = []
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
@ -155,11 +182,11 @@ class Module(ComposableText):
self.mirroring = module
module.mirrors.append(self)
def getSections(self) -> list[Section]:
def getSections(self) -> typing.Sequence[Section]:
if self.mirroring:
return self.mirroring.sections
return self.mirroring.children
else:
return self.sections
return self.children
def updateMarkup(self) -> None:
super().updateMarkup()
@ -177,16 +204,16 @@ class Side(ComposableText):
def __init__(self, parent: "Screen", alignment: Alignment) -> None:
super().__init__(parent=parent)
self.parent: Screen
self.children: typing.MutableSequence[Module] = []
self.alignment = alignment
self.modules: list[Module] = []
def generateMarkup(self) -> str:
if not self.modules:
if not self.children:
return ""
text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None
for module in self.modules:
for module in self.children:
for section in module.getSections():
if section.isHidden():
continue
@ -218,16 +245,16 @@ class Screen(ComposableText):
def __init__(self, parent: "Bar", output: str) -> None:
super().__init__(parent=parent)
self.parent: "Bar"
self.children: typing.MutableSequence[Side]
self.output = output
self.sides = dict()
for alignment in Alignment:
self.sides[alignment] = Side(parent=self, alignment=alignment)
Side(parent=self, alignment=alignment)
def generateMarkup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join(
side.getMarkup() for side in self.sides.values()
side.getMarkup() for side in self.children
)
@ -238,19 +265,20 @@ class Bar(ComposableText):
def __init__(self) -> None:
super().__init__()
self.parent: None
self.children: typing.MutableSequence[Screen]
self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
self.actionIndex = 0
self.actions: dict[str, typing.Callable] = dict()
self.screens = []
i3 = i3ipc.Connection()
for output in i3.get_outputs():
if not output.active:
continue
screen = Screen(parent=self, output=output.name)
self.screens.append(screen)
Screen(parent=self, output=output.name)
async def run(self) -> None:
cmd = [
@ -303,7 +331,7 @@ class Bar(ComposableText):
loop.add_signal_handler(signal.SIGINT, exit)
def generateMarkup(self) -> str:
return "".join(section.getMarkup() for section in self.screens) + "\n"
return "".join(screen.getMarkup() for screen in self.children) + "\n"
def addProvider(
self,
@ -315,11 +343,10 @@ class Bar(ComposableText):
screenNum: the provider will be added on this screen if set, all otherwise
"""
modules = list()
for s, screen in enumerate(self.screens):
for s, screen in enumerate(self.children):
if screenNum is None or s == screenNum:
side = screen.sides[alignment]
side = next(filter(lambda s: s.alignment == alignment, screen.children))
module = Module(parent=side)
side.modules.append(module)
modules.append(module)
provider.modules = modules
if modules:
@ -355,14 +382,11 @@ class MirrorProvider(Provider):
class SingleSectionProvider(MirrorProvider):
def __init__(self) -> None:
super().__init__()
self.section: Section
SECTION_CLASS = Section
async def run(self) -> None:
await super().run()
self.section = Section(parent=self.module)
self.module.sections.append(self.section)
self.section = self.SECTION_CLASS(parent=self.module)
class StaticProvider(SingleSectionProvider):
@ -374,15 +398,17 @@ class StaticProvider(SingleSectionProvider):
self.section.setText(self.text)
class StatefulProvider(SingleSectionProvider):
# TODO Should actually be a Section descendant
NUMBER_STATES: int
class StatefulSection(Section):
def __init__(self) -> None:
super().__init__()
def __init__(self, parent: Module, sortKey: Sortable = 0) -> None:
super().__init__(parent=parent, sortKey=sortKey)
self.state = 0
self.numberStates: int
self.stateChanged = asyncio.Event()
self.setAction(Button.CLICK_LEFT, self.incrementState)
self.setAction(Button.CLICK_RIGHT, self.decrementState)
def incrementState(self) -> None:
self.state += 1
self.changeState()
@ -392,14 +418,13 @@ class StatefulProvider(SingleSectionProvider):
self.changeState()
def changeState(self) -> None:
self.state %= self.NUMBER_STATES
self.state %= self.numberStates
self.stateChanged.set()
self.stateChanged.clear()
async def run(self) -> None:
await super().run()
self.section.setAction(Button.CLICK_LEFT, self.incrementState)
self.section.setAction(Button.CLICK_RIGHT, self.decrementState)
class StatefulProvider(SingleSectionProvider):
SECTION_CLASS = StatefulSection
# Providers
@ -444,18 +469,14 @@ class I3WorkspacesProvider(Provider):
workspaces = await i3.get_workspaces()
for workspace in workspaces:
module = self.modulesFromOutput[workspace.output]
insert = False
if workspace.num in self.sections:
section = self.sections[workspace.num]
if section.parent != module:
section.parent.sections.remove(section)
section.parent = module
section.updateMarkup()
insert = True
section.unsetParent()
section.setParent(module)
else:
section = Section(parent=module)
section = Section(parent=module, sortKey=workspace.num)
self.sections[workspace.num] = section
insert = True
def generate_switch_workspace(num: int) -> typing.Callable:
def switch_workspace() -> None:
@ -468,10 +489,6 @@ class I3WorkspacesProvider(Provider):
section.setAction(
Button.CLICK_LEFT, generate_switch_workspace(workspace.num)
)
if insert:
module.sections.append(section)
revSections = dict((v, k) for k, v in self.sections.items())
module.sections.sort(key=lambda s: revSections[s])
name = workspace.name
if workspace.urgent:
name = f"{name} !"
@ -514,29 +531,101 @@ class I3WorkspacesProvider(Provider):
await i3.main()
class TimeProvider(StatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
NUMBER_STATES = len(FORMATS)
class NetworkProviderSection(StatefulSection):
def __init__(self, parent: Module, iface: str, provider: "NetworkProvider") -> None:
super().__init__(parent=parent, sortKey=iface)
self.iface = iface
self.provider = provider
self.ignore = False
self.icon = "?"
self.wifi = False
if iface == "lo":
self.ignore = True
elif iface.startswith("eth") or iface.startswith("enp"):
if "u" in iface:
self.icon = ""
else:
self.icon = ""
elif iface.startswith("wlan") or iface.startswith("wl"):
self.icon = ""
self.wifi = True
elif (
iface.startswith("tun") or iface.startswith("tap") or iface.startswith("wg")
):
self.icon = ""
elif iface.startswith("docker"):
self.icon = ""
elif iface.startswith("veth"):
self.icon = ""
elif iface.startswith("vboxnet"):
self.icon = ""
self.numberStates = 5 if self.wifi else 4
self.state = 1 if self.wifi else 0
async def getText(self) -> str | None:
if self.ignore or not self.provider.if_stats[self.iface].isup:
return None
text = self.icon
return text
class NetworkProvider(MirrorProvider):
def __init__(self) -> None:
self.sections: dict[str, NetworkProviderSection] = dict()
async def updateIface(self, iface: str) -> None:
section = self.sections[iface]
section.setText(await section.getText())
async def run(self) -> None:
await super().run()
self.state = 1
while True:
# if_addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
# io_counters: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)
async with asyncio.TaskGroup() as tg:
self.if_stats = psutil.net_if_stats()
for iface in self.if_stats:
if iface not in self.sections:
section = NetworkProviderSection(
parent=self.module, iface=iface, provider=self
)
self.sections[iface] = section
tg.create_task(self.updateIface(iface))
for iface, section in self.sections.items():
if iface not in self.if_stats:
section.setText(None)
tg.create_task(asyncio.sleep(1))
class TimeProvider(StatefulProvider):
FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
async def run(self) -> None:
await super().run()
assert isinstance(self.section, StatefulSection)
self.section.state = 1
self.section.numberStates = len(self.FORMATS)
while True:
now = datetime.datetime.now()
format = self.FORMATS[self.state]
format = self.FORMATS[self.section.state]
self.section.setText(now.strftime(format))
remaining = 1 - now.microsecond / 1000000
try:
await asyncio.wait_for(self.stateChanged.wait(), remaining)
await asyncio.wait_for(self.section.stateChanged.wait(), remaining)
except TimeoutError:
pass
async def main() -> None:
bar = Bar()
dualScreen = len(bar.screens) > 1
dualScreen = len(bar.children) > 1
bar.addProvider(I3ModeProvider(), alignment=Alignment.LEFT)
bar.addProvider(I3WorkspacesProvider(), alignment=Alignment.LEFT)
@ -557,7 +646,7 @@ async def main() -> None:
alignment=Alignment.RIGHT,
)
bar.addProvider(
StaticProvider("network"),
NetworkProvider(),
screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT,
)