Compare commits

...

2 commits

Author SHA1 Message Date
Geoffrey Frogeye 6690f3aa0d
frobarng: Mirroring and more 2024-08-14 02:45:25 +02:00
Geoffrey Frogeye 7d60269b49
frobarng: animations 2024-08-14 00:28:17 +02:00

View file

@ -20,27 +20,15 @@ class ComposableText:
assert isinstance(prevParent, Bar) assert isinstance(prevParent, Bar)
self.bar: Bar = prevParent self.bar: Bar = prevParent
self.text: str def updateMarkup(self) -> None:
self.needsComposition = True
def composeText(self) -> str:
raise NotImplementedError(f"{self} cannot compose text")
def getText(self) -> str:
if self.needsComposition:
self.text = self.composeText()
print(f"{self} composed {self.text}")
self.needsComposition = False
return self.text
def setText(self, text: str) -> None:
self.text = text
self.needsComposition = False
parent = self.parent
while parent and not parent.needsComposition:
parent.needsComposition = True
parent = parent.parent
self.bar.refresh.set() self.bar.refresh.set()
# OPTI See if worth caching the output
def generateMarkup(self) -> str:
raise NotImplementedError(f"{self} cannot generate markup")
def getMarkup(self) -> str:
return self.generateMarkup()
def randomColor(seed: int | bytes | None = None) -> str: def randomColor(seed: int | bytes | None = None) -> str:
@ -57,6 +45,53 @@ class Section(ComposableText):
def __init__(self, parent: "Module") -> None: def __init__(self, parent: "Module") -> None:
super().__init__(parent=parent) super().__init__(parent=parent)
self.color = randomColor() self.color = randomColor()
self.text: str = ""
self.size = 0
self.animationTask: asyncio.Task | None = None
def isHidden(self) -> bool:
return self.text is None
# Geometric series
ANIM_A = 0.025
ANIM_R = 0.9
async def animate(self) -> None:
targetSize = len(self.text)
increment = 1 if self.size < targetSize else -1
loop = asyncio.get_running_loop()
frameTime = loop.time()
animTime = self.ANIM_A
while self.size != targetSize:
self.size += increment
self.updateMarkup()
animTime *= self.ANIM_R
frameTime += animTime
sleepTime = frameTime - loop.time()
# In case of stress, skip refreshing by not awaiting
if sleepTime > 0:
await asyncio.sleep(sleepTime)
def setText(self, text: str | None) -> None:
# OPTI Skip if same text
oldText = self.text
self.text = f" {text} "
if oldText == self.text:
return
if len(oldText) == len(self.text):
self.updateMarkup()
else:
if self.animationTask:
self.animationTask.cancel()
self.animationTask = self.bar.taskGroup.create_task(self.animate())
def generateMarkup(self) -> str:
pad = max(0, self.size - len(self.text))
return self.text[: self.size] + " " * pad
class Module(ComposableText): class Module(ComposableText):
@ -67,9 +102,23 @@ class Module(ComposableText):
def __init__(self, parent: "Side") -> None: def __init__(self, parent: "Side") -> None:
super().__init__(parent=parent) super().__init__(parent=parent)
self.sections: list[Section] = [] self.sections: list[Section] = []
self.mirroring: Module | None = None
self.mirrors: list[Module] = list()
def appendSection(self, section: Section) -> None: def mirror(self, module: "Module") -> None:
self.sections.append(section) self.mirroring = module
module.mirrors.append(self)
def getSections(self) -> list[Section]:
if self.mirroring:
return self.mirroring.sections
else:
return self.sections
def updateMarkup(self) -> None:
super().updateMarkup()
for mirror in self.mirrors:
mirror.updateMarkup()
class Alignment(enum.Enum): class Alignment(enum.Enum):
@ -84,13 +133,15 @@ class Side(ComposableText):
self.alignment = alignment self.alignment = alignment
self.modules: list[Module] = [] self.modules: list[Module] = []
def composeText(self) -> str: def generateMarkup(self) -> str:
if not self.modules: if not self.modules:
return "" return ""
text = "%{" + self.alignment.value + "}" text = "%{" + self.alignment.value + "}"
lastSection: Section | None = None lastSection: Section | None = None
for module in self.modules: for module in self.modules:
for section in module.sections: for section in module.getSections():
if section.isHidden():
continue
if lastSection is None: if lastSection is None:
if self.alignment == Alignment.LEFT: if self.alignment == Alignment.LEFT:
text += "%{B" + section.color + "}%{F-}" text += "%{B" + section.color + "}%{F-}"
@ -108,9 +159,7 @@ class Side(ComposableText):
else: else:
text += "%{R}%{B" + section.color + "}" text += "%{R}%{B" + section.color + "}"
text += "%{F-}" text += "%{F-}"
text += " " + section.getText() + " " text += section.getMarkup()
# FIXME Should be handled by animation
# FIXME Support hidden sections
lastSection = section lastSection = section
if self.alignment != Alignment.RIGHT: if self.alignment != Alignment.RIGHT:
text += "%{R}%{B-}" text += "%{R}%{B-}"
@ -126,15 +175,12 @@ class Screen(ComposableText):
for alignment in Alignment: for alignment in Alignment:
self.sides[alignment] = Side(parent=self, alignment=alignment) self.sides[alignment] = Side(parent=self, alignment=alignment)
def composeText(self) -> str: def generateMarkup(self) -> str:
return ("%{Sn" + self.output + "}") + "".join( return ("%{Sn" + self.output + "}") + "".join(
side.getText() for side in self.sides.values() side.getMarkup() for side in self.sides.values()
) )
ScreenSelection = enum.Enum("ScreenSelection", ["ALL", "PRIMARY", "SECONDARY"])
class Bar(ComposableText): class Bar(ComposableText):
""" """
Top-level Top-level
@ -142,8 +188,9 @@ class Bar(ComposableText):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.providers: list["Provider"] = []
self.refresh = asyncio.Event() self.refresh = asyncio.Event()
self.taskGroup = asyncio.TaskGroup()
self.providers: list["Provider"] = list()
self.screens = [] self.screens = []
i3 = i3ipc.Connection() i3 = i3ipc.Connection()
@ -154,45 +201,43 @@ class Bar(ComposableText):
self.screens.append(screen) self.screens.append(screen)
async def run(self) -> None: async def run(self) -> None:
proc = await asyncio.create_subprocess_exec( cmd = [
"lemonbar", "lemonbar",
"-b", "-b",
"-a", "-a",
"64", "64",
"-f", "-f",
"DejaVuSansM Nerd Font:size=10", "DejaVuSansM Nerd Font:size=10",
stdin=asyncio.subprocess.PIPE, ]
) proc = await asyncio.create_subprocess_exec(*cmd, stdin=asyncio.subprocess.PIPE)
async def refresher() -> None: async def refresher() -> None:
assert proc.stdin assert proc.stdin
while True: while True:
await self.refresh.wait() await self.refresh.wait()
self.refresh.clear() self.refresh.clear()
proc.stdin.write(self.getText().encode()) proc.stdin.write(self.getMarkup().encode())
async with asyncio.TaskGroup() as tg: async with self.taskGroup as tg:
tg.create_task(refresher()) tg.create_task(refresher())
for updater in self.providers: for provider in self.providers:
tg.create_task(updater.run()) tg.create_task(provider.run())
def composeText(self) -> str: def generateMarkup(self) -> str:
return "".join(section.getText() for section in self.screens) + "\n" return "".join(section.getMarkup() for section in self.screens) + "\n"
def addProvider( def addProvider(
self, self,
provider: "Provider", provider: "Provider",
alignment: Alignment = Alignment.LEFT, alignment: Alignment = Alignment.LEFT,
screenSelection: ScreenSelection = ScreenSelection.ALL, screenNum: int | None = None,
) -> None: ) -> None:
# FIXME Actually have a screenNum and screenCount args """
screenNum: the provider will be added on this screen if set, all otherwise
"""
modules = list() modules = list()
for s, screen in enumerate(self.screens): for s, screen in enumerate(self.screens):
if ( if screenNum is None or s == screenNum:
screenSelection == ScreenSelection.ALL
or (screenSelection == ScreenSelection.PRIMARY and s == 0)
or (screenSelection == ScreenSelection.SECONDARY and s == 1)
):
side = screen.sides[alignment] side = screen.sides[alignment]
module = Module(parent=side) module = Module(parent=side)
side.modules.append(module) side.modules.append(module)
@ -210,32 +255,79 @@ class Provider:
raise NotImplementedError() raise NotImplementedError()
class TimeProvider(Provider): class MirrorProvider(Provider):
def __init__(self) -> None:
super().__init__()
self.module: Module
async def run(self) -> None: async def run(self) -> None:
sections = list() self.module = self.modules[0]
for module in self.modules: for module in self.modules[1:]:
section = Section(parent=module) module.mirror(self.module)
module.sections.append(section)
sections.append(section)
# FIXME Allow for mirror(ed) modules so no need for updaters to handle all class SingleSectionProvider(MirrorProvider):
def __init__(self) -> None:
super().__init__()
self.section: Section
async def run(self) -> None:
await super().run()
self.section = Section(parent=self.module)
self.module.sections.append(self.section)
class StaticProvider(SingleSectionProvider):
def __init__(self, text: str) -> None:
self.text = text
async def run(self) -> None:
await super().run()
self.section.setText(self.text)
class TimeProvider(SingleSectionProvider):
async def run(self) -> None:
await super().run()
while True: while True:
for s, section in enumerate(sections): now = datetime.datetime.now()
now = datetime.datetime.now() # section.setText(now.strftime("%a %y-%m-%d %H:%M:%S.%f"))
section.setText(now.strftime("%a %y-%m-%d %H:%M:%S")) self.section.setText("-" * (now.second % 10))
await asyncio.sleep(1) remaining = 1 - now.microsecond / 1000000
await asyncio.sleep(remaining)
async def main() -> None: async def main() -> None:
bar = Bar() bar = Bar()
bar.addProvider(TimeProvider()) dualScreen = len(bar.screens) > 1
# bar.addProvider(TimeProvider(), screenSelection=ScreenSelection.PRIMARY)
# bar.addProvider(TimeProvider(), alignment=Alignment.CENTER) bar.addProvider(StaticProvider(text="i3 workspaces"), alignment=Alignment.LEFT)
# bar.addProvider(TimeProvider(), alignment=Alignment.CENTER) if dualScreen:
# bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) bar.addProvider(
# bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT) StaticProvider(text="i3 title"), screenNum=0, alignment=Alignment.CENTER
)
bar.addProvider(
StaticProvider(text="mpris"),
screenNum=1 if dualScreen else None,
alignment=Alignment.CENTER,
)
bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT)
bar.addProvider(
StaticProvider("pulse"),
screenNum=1 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(
StaticProvider("network"),
screenNum=0 if dualScreen else None,
alignment=Alignment.RIGHT,
)
bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
await bar.run() await bar.run()
# FIXME Why time not updating?
asyncio.run(main()) asyncio.run(main())
# TODO Replace while True with while bar.running or something