Compare commits
2 commits
139d3a3ea8
...
6690f3aa0d
Author | SHA1 | Date | |
---|---|---|---|
Geoffrey Frogeye | 6690f3aa0d | ||
Geoffrey Frogeye | 7d60269b49 |
|
@ -20,27 +20,15 @@ class ComposableText:
|
||||||
assert isinstance(prevParent, Bar)
|
assert isinstance(prevParent, Bar)
|
||||||
self.bar: Bar = prevParent
|
self.bar: Bar = prevParent
|
||||||
|
|
||||||
self.text: str
|
def updateMarkup(self) -> None:
|
||||||
self.needsComposition = True
|
|
||||||
|
|
||||||
def composeText(self) -> str:
|
|
||||||
raise NotImplementedError(f"{self} cannot compose text")
|
|
||||||
|
|
||||||
def getText(self) -> str:
|
|
||||||
if self.needsComposition:
|
|
||||||
self.text = self.composeText()
|
|
||||||
print(f"{self} composed {self.text}")
|
|
||||||
self.needsComposition = False
|
|
||||||
return self.text
|
|
||||||
|
|
||||||
def setText(self, text: str) -> None:
|
|
||||||
self.text = text
|
|
||||||
self.needsComposition = False
|
|
||||||
parent = self.parent
|
|
||||||
while parent and not parent.needsComposition:
|
|
||||||
parent.needsComposition = True
|
|
||||||
parent = parent.parent
|
|
||||||
self.bar.refresh.set()
|
self.bar.refresh.set()
|
||||||
|
# OPTI See if worth caching the output
|
||||||
|
|
||||||
|
def generateMarkup(self) -> str:
|
||||||
|
raise NotImplementedError(f"{self} cannot generate markup")
|
||||||
|
|
||||||
|
def getMarkup(self) -> str:
|
||||||
|
return self.generateMarkup()
|
||||||
|
|
||||||
|
|
||||||
def randomColor(seed: int | bytes | None = None) -> str:
|
def randomColor(seed: int | bytes | None = None) -> str:
|
||||||
|
@ -57,6 +45,53 @@ class Section(ComposableText):
|
||||||
def __init__(self, parent: "Module") -> None:
|
def __init__(self, parent: "Module") -> None:
|
||||||
super().__init__(parent=parent)
|
super().__init__(parent=parent)
|
||||||
self.color = randomColor()
|
self.color = randomColor()
|
||||||
|
self.text: str = ""
|
||||||
|
self.size = 0
|
||||||
|
self.animationTask: asyncio.Task | None = None
|
||||||
|
|
||||||
|
def isHidden(self) -> bool:
|
||||||
|
return self.text is None
|
||||||
|
|
||||||
|
# Geometric series
|
||||||
|
ANIM_A = 0.025
|
||||||
|
ANIM_R = 0.9
|
||||||
|
|
||||||
|
async def animate(self) -> None:
|
||||||
|
targetSize = len(self.text)
|
||||||
|
increment = 1 if self.size < targetSize else -1
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
frameTime = loop.time()
|
||||||
|
animTime = self.ANIM_A
|
||||||
|
|
||||||
|
while self.size != targetSize:
|
||||||
|
self.size += increment
|
||||||
|
self.updateMarkup()
|
||||||
|
|
||||||
|
animTime *= self.ANIM_R
|
||||||
|
frameTime += animTime
|
||||||
|
sleepTime = frameTime - loop.time()
|
||||||
|
|
||||||
|
# In case of stress, skip refreshing by not awaiting
|
||||||
|
if sleepTime > 0:
|
||||||
|
await asyncio.sleep(sleepTime)
|
||||||
|
|
||||||
|
def setText(self, text: str | None) -> None:
|
||||||
|
# OPTI Skip if same text
|
||||||
|
oldText = self.text
|
||||||
|
self.text = f" {text} "
|
||||||
|
if oldText == self.text:
|
||||||
|
return
|
||||||
|
if len(oldText) == len(self.text):
|
||||||
|
self.updateMarkup()
|
||||||
|
else:
|
||||||
|
if self.animationTask:
|
||||||
|
self.animationTask.cancel()
|
||||||
|
self.animationTask = self.bar.taskGroup.create_task(self.animate())
|
||||||
|
|
||||||
|
def generateMarkup(self) -> str:
|
||||||
|
pad = max(0, self.size - len(self.text))
|
||||||
|
return self.text[: self.size] + " " * pad
|
||||||
|
|
||||||
|
|
||||||
class Module(ComposableText):
|
class Module(ComposableText):
|
||||||
|
@ -67,9 +102,23 @@ class Module(ComposableText):
|
||||||
def __init__(self, parent: "Side") -> None:
|
def __init__(self, parent: "Side") -> None:
|
||||||
super().__init__(parent=parent)
|
super().__init__(parent=parent)
|
||||||
self.sections: list[Section] = []
|
self.sections: list[Section] = []
|
||||||
|
self.mirroring: Module | None = None
|
||||||
|
self.mirrors: list[Module] = list()
|
||||||
|
|
||||||
def appendSection(self, section: Section) -> None:
|
def mirror(self, module: "Module") -> None:
|
||||||
self.sections.append(section)
|
self.mirroring = module
|
||||||
|
module.mirrors.append(self)
|
||||||
|
|
||||||
|
def getSections(self) -> list[Section]:
|
||||||
|
if self.mirroring:
|
||||||
|
return self.mirroring.sections
|
||||||
|
else:
|
||||||
|
return self.sections
|
||||||
|
|
||||||
|
def updateMarkup(self) -> None:
|
||||||
|
super().updateMarkup()
|
||||||
|
for mirror in self.mirrors:
|
||||||
|
mirror.updateMarkup()
|
||||||
|
|
||||||
|
|
||||||
class Alignment(enum.Enum):
|
class Alignment(enum.Enum):
|
||||||
|
@ -84,13 +133,15 @@ class Side(ComposableText):
|
||||||
self.alignment = alignment
|
self.alignment = alignment
|
||||||
self.modules: list[Module] = []
|
self.modules: list[Module] = []
|
||||||
|
|
||||||
def composeText(self) -> str:
|
def generateMarkup(self) -> str:
|
||||||
if not self.modules:
|
if not self.modules:
|
||||||
return ""
|
return ""
|
||||||
text = "%{" + self.alignment.value + "}"
|
text = "%{" + self.alignment.value + "}"
|
||||||
lastSection: Section | None = None
|
lastSection: Section | None = None
|
||||||
for module in self.modules:
|
for module in self.modules:
|
||||||
for section in module.sections:
|
for section in module.getSections():
|
||||||
|
if section.isHidden():
|
||||||
|
continue
|
||||||
if lastSection is None:
|
if lastSection is None:
|
||||||
if self.alignment == Alignment.LEFT:
|
if self.alignment == Alignment.LEFT:
|
||||||
text += "%{B" + section.color + "}%{F-}"
|
text += "%{B" + section.color + "}%{F-}"
|
||||||
|
@ -108,9 +159,7 @@ class Side(ComposableText):
|
||||||
else:
|
else:
|
||||||
text += "%{R}%{B" + section.color + "}"
|
text += "%{R}%{B" + section.color + "}"
|
||||||
text += "%{F-}"
|
text += "%{F-}"
|
||||||
text += " " + section.getText() + " "
|
text += section.getMarkup()
|
||||||
# FIXME Should be handled by animation
|
|
||||||
# FIXME Support hidden sections
|
|
||||||
lastSection = section
|
lastSection = section
|
||||||
if self.alignment != Alignment.RIGHT:
|
if self.alignment != Alignment.RIGHT:
|
||||||
text += "%{R}%{B-}"
|
text += "%{R}%{B-}"
|
||||||
|
@ -126,15 +175,12 @@ class Screen(ComposableText):
|
||||||
for alignment in Alignment:
|
for alignment in Alignment:
|
||||||
self.sides[alignment] = Side(parent=self, alignment=alignment)
|
self.sides[alignment] = Side(parent=self, alignment=alignment)
|
||||||
|
|
||||||
def composeText(self) -> str:
|
def generateMarkup(self) -> str:
|
||||||
return ("%{Sn" + self.output + "}") + "".join(
|
return ("%{Sn" + self.output + "}") + "".join(
|
||||||
side.getText() for side in self.sides.values()
|
side.getMarkup() for side in self.sides.values()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
ScreenSelection = enum.Enum("ScreenSelection", ["ALL", "PRIMARY", "SECONDARY"])
|
|
||||||
|
|
||||||
|
|
||||||
class Bar(ComposableText):
|
class Bar(ComposableText):
|
||||||
"""
|
"""
|
||||||
Top-level
|
Top-level
|
||||||
|
@ -142,8 +188,9 @@ class Bar(ComposableText):
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.providers: list["Provider"] = []
|
|
||||||
self.refresh = asyncio.Event()
|
self.refresh = asyncio.Event()
|
||||||
|
self.taskGroup = asyncio.TaskGroup()
|
||||||
|
self.providers: list["Provider"] = list()
|
||||||
|
|
||||||
self.screens = []
|
self.screens = []
|
||||||
i3 = i3ipc.Connection()
|
i3 = i3ipc.Connection()
|
||||||
|
@ -154,45 +201,43 @@ class Bar(ComposableText):
|
||||||
self.screens.append(screen)
|
self.screens.append(screen)
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
proc = await asyncio.create_subprocess_exec(
|
cmd = [
|
||||||
"lemonbar",
|
"lemonbar",
|
||||||
"-b",
|
"-b",
|
||||||
"-a",
|
"-a",
|
||||||
"64",
|
"64",
|
||||||
"-f",
|
"-f",
|
||||||
"DejaVuSansM Nerd Font:size=10",
|
"DejaVuSansM Nerd Font:size=10",
|
||||||
stdin=asyncio.subprocess.PIPE,
|
]
|
||||||
)
|
proc = await asyncio.create_subprocess_exec(*cmd, stdin=asyncio.subprocess.PIPE)
|
||||||
|
|
||||||
async def refresher() -> None:
|
async def refresher() -> None:
|
||||||
assert proc.stdin
|
assert proc.stdin
|
||||||
while True:
|
while True:
|
||||||
await self.refresh.wait()
|
await self.refresh.wait()
|
||||||
self.refresh.clear()
|
self.refresh.clear()
|
||||||
proc.stdin.write(self.getText().encode())
|
proc.stdin.write(self.getMarkup().encode())
|
||||||
|
|
||||||
async with asyncio.TaskGroup() as tg:
|
async with self.taskGroup as tg:
|
||||||
tg.create_task(refresher())
|
tg.create_task(refresher())
|
||||||
for updater in self.providers:
|
for provider in self.providers:
|
||||||
tg.create_task(updater.run())
|
tg.create_task(provider.run())
|
||||||
|
|
||||||
def composeText(self) -> str:
|
def generateMarkup(self) -> str:
|
||||||
return "".join(section.getText() for section in self.screens) + "\n"
|
return "".join(section.getMarkup() for section in self.screens) + "\n"
|
||||||
|
|
||||||
def addProvider(
|
def addProvider(
|
||||||
self,
|
self,
|
||||||
provider: "Provider",
|
provider: "Provider",
|
||||||
alignment: Alignment = Alignment.LEFT,
|
alignment: Alignment = Alignment.LEFT,
|
||||||
screenSelection: ScreenSelection = ScreenSelection.ALL,
|
screenNum: int | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
# FIXME Actually have a screenNum and screenCount args
|
"""
|
||||||
|
screenNum: the provider will be added on this screen if set, all otherwise
|
||||||
|
"""
|
||||||
modules = list()
|
modules = list()
|
||||||
for s, screen in enumerate(self.screens):
|
for s, screen in enumerate(self.screens):
|
||||||
if (
|
if screenNum is None or s == screenNum:
|
||||||
screenSelection == ScreenSelection.ALL
|
|
||||||
or (screenSelection == ScreenSelection.PRIMARY and s == 0)
|
|
||||||
or (screenSelection == ScreenSelection.SECONDARY and s == 1)
|
|
||||||
):
|
|
||||||
side = screen.sides[alignment]
|
side = screen.sides[alignment]
|
||||||
module = Module(parent=side)
|
module = Module(parent=side)
|
||||||
side.modules.append(module)
|
side.modules.append(module)
|
||||||
|
@ -210,32 +255,79 @@ class Provider:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
class TimeProvider(Provider):
|
class MirrorProvider(Provider):
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.module: Module
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
sections = list()
|
self.module = self.modules[0]
|
||||||
for module in self.modules:
|
for module in self.modules[1:]:
|
||||||
section = Section(parent=module)
|
module.mirror(self.module)
|
||||||
module.sections.append(section)
|
|
||||||
sections.append(section)
|
|
||||||
# FIXME Allow for mirror(ed) modules so no need for updaters to handle all
|
class SingleSectionProvider(MirrorProvider):
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.section: Section
|
||||||
|
|
||||||
|
async def run(self) -> None:
|
||||||
|
await super().run()
|
||||||
|
self.section = Section(parent=self.module)
|
||||||
|
self.module.sections.append(self.section)
|
||||||
|
|
||||||
|
|
||||||
|
class StaticProvider(SingleSectionProvider):
|
||||||
|
def __init__(self, text: str) -> None:
|
||||||
|
self.text = text
|
||||||
|
|
||||||
|
async def run(self) -> None:
|
||||||
|
await super().run()
|
||||||
|
self.section.setText(self.text)
|
||||||
|
|
||||||
|
|
||||||
|
class TimeProvider(SingleSectionProvider):
|
||||||
|
async def run(self) -> None:
|
||||||
|
await super().run()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
for s, section in enumerate(sections):
|
now = datetime.datetime.now()
|
||||||
now = datetime.datetime.now()
|
# section.setText(now.strftime("%a %y-%m-%d %H:%M:%S.%f"))
|
||||||
section.setText(now.strftime("%a %y-%m-%d %H:%M:%S"))
|
self.section.setText("-" * (now.second % 10))
|
||||||
await asyncio.sleep(1)
|
remaining = 1 - now.microsecond / 1000000
|
||||||
|
await asyncio.sleep(remaining)
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
async def main() -> None:
|
||||||
bar = Bar()
|
bar = Bar()
|
||||||
bar.addProvider(TimeProvider())
|
dualScreen = len(bar.screens) > 1
|
||||||
# bar.addProvider(TimeProvider(), screenSelection=ScreenSelection.PRIMARY)
|
|
||||||
# bar.addProvider(TimeProvider(), alignment=Alignment.CENTER)
|
bar.addProvider(StaticProvider(text="i3 workspaces"), alignment=Alignment.LEFT)
|
||||||
# bar.addProvider(TimeProvider(), alignment=Alignment.CENTER)
|
if dualScreen:
|
||||||
# bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
|
bar.addProvider(
|
||||||
# bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
|
StaticProvider(text="i3 title"), screenNum=0, alignment=Alignment.CENTER
|
||||||
|
)
|
||||||
|
bar.addProvider(
|
||||||
|
StaticProvider(text="mpris"),
|
||||||
|
screenNum=1 if dualScreen else None,
|
||||||
|
alignment=Alignment.CENTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
bar.addProvider(StaticProvider("C L M T B"), alignment=Alignment.RIGHT)
|
||||||
|
bar.addProvider(
|
||||||
|
StaticProvider("pulse"),
|
||||||
|
screenNum=1 if dualScreen else None,
|
||||||
|
alignment=Alignment.RIGHT,
|
||||||
|
)
|
||||||
|
bar.addProvider(
|
||||||
|
StaticProvider("network"),
|
||||||
|
screenNum=0 if dualScreen else None,
|
||||||
|
alignment=Alignment.RIGHT,
|
||||||
|
)
|
||||||
|
bar.addProvider(TimeProvider(), alignment=Alignment.RIGHT)
|
||||||
|
|
||||||
await bar.run()
|
await bar.run()
|
||||||
# FIXME Why time not updating?
|
|
||||||
|
|
||||||
|
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
# TODO Replace while True with while bar.running or something
|
||||||
|
|
Loading…
Reference in a new issue