723 lines
20 KiB
Python
723 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import enum
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
|
|
import coloredlogs
|
|
import i3ipc
|
|
|
|
from frobar.notbusy import notBusy
|
|
|
|
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
|
|
log = logging.getLogger()
|
|
|
|
|
|
# TODO Allow deletion of Bar, BarGroup and Section for screen changes
|
|
# IDEA Use i3 ipc events rather than relying on xrandr or Xlib (less portable
|
|
# but easier)
|
|
# TODO Optimize to use write() calls instead of string concatenation (writing
|
|
# BarGroup strings should be a good compromise)
|
|
# TODO Use bytes rather than strings
|
|
# TODO Use default colors of lemonbar sometimes
|
|
# TODO Adapt bar height with font height
|
|
# TODO OPTI Static text objects that update its parents if modified
|
|
# TODO forceSize and changeText are different
|
|
|
|
|
|
class BarGroupType(enum.Enum):
|
|
LEFT = 0
|
|
RIGHT = 1
|
|
# TODO Middle
|
|
# MID_LEFT = 2
|
|
# MID_RIGHT = 3
|
|
|
|
|
|
class BarStdoutThread(threading.Thread):
|
|
def run(self) -> None:
|
|
while Bar.running:
|
|
handle = Bar.process.stdout.readline().strip()
|
|
if not len(handle):
|
|
Bar.stop()
|
|
if handle not in Bar.actionsH2F:
|
|
log.error("Unknown action: {}".format(handle))
|
|
continue
|
|
function = Bar.actionsH2F[handle]
|
|
function()
|
|
|
|
|
|
class Bar:
|
|
"""
|
|
One bar for each screen
|
|
"""
|
|
|
|
# Constants
|
|
FONTS = ["DejaVuSansM Nerd Font"]
|
|
FONTSIZE = 10
|
|
|
|
@staticmethod
|
|
def init() -> None:
|
|
Bar.running = True
|
|
Section.init()
|
|
|
|
cmd = ["lemonbar", "-b", "-a", "64"]
|
|
for font in Bar.FONTS:
|
|
cmd += ["-f", "{}:size={}".format(font, Bar.FONTSIZE)]
|
|
Bar.process = subprocess.Popen(
|
|
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
|
|
)
|
|
Bar.stdoutThread = BarStdoutThread()
|
|
Bar.stdoutThread.start()
|
|
|
|
# Debug
|
|
Bar(0)
|
|
# Bar(1)
|
|
|
|
@staticmethod
|
|
def stop() -> None:
|
|
Bar.running = False
|
|
Bar.process.kill()
|
|
|
|
# TODO This is not really the best way to do it I guess
|
|
os.killpg(os.getpid(), signal.SIGTERM)
|
|
|
|
@staticmethod
|
|
def run() -> None:
|
|
Bar.forever()
|
|
i3 = i3ipc.Connection()
|
|
|
|
def doStop(*args) -> None:
|
|
Bar.stop()
|
|
print(88)
|
|
|
|
try:
|
|
i3.on("ipc_shutdown", doStop)
|
|
i3.main()
|
|
except BaseException:
|
|
print(93)
|
|
Bar.stop()
|
|
|
|
# Class globals
|
|
everyone = set()
|
|
string = ""
|
|
process = None
|
|
running = False
|
|
|
|
nextHandle = 0
|
|
actionsF2H = dict()
|
|
actionsH2F = dict()
|
|
|
|
@staticmethod
|
|
def getFunctionHandle(function):
|
|
assert callable(function)
|
|
if function in Bar.actionsF2H.keys():
|
|
return Bar.actionsF2H[function]
|
|
|
|
handle = "{:x}".format(Bar.nextHandle).encode()
|
|
Bar.nextHandle += 1
|
|
|
|
Bar.actionsF2H[function] = handle
|
|
Bar.actionsH2F[handle] = function
|
|
|
|
return handle
|
|
|
|
@staticmethod
|
|
def forever():
|
|
Bar.process.wait()
|
|
Bar.stop()
|
|
|
|
def __init__(self, screen):
|
|
assert isinstance(screen, int)
|
|
self.screen = "%{S" + str(screen) + "}"
|
|
self.groups = dict()
|
|
|
|
for groupType in BarGroupType:
|
|
group = BarGroup(groupType, self)
|
|
self.groups[groupType] = group
|
|
|
|
self.childsChanged = False
|
|
|
|
self.everyone.add(self)
|
|
|
|
@staticmethod
|
|
def addSectionAll(section, group, screens=None):
|
|
"""
|
|
.. note::
|
|
Add the section before updating it for the first time.
|
|
"""
|
|
assert isinstance(section, Section)
|
|
assert isinstance(group, BarGroupType)
|
|
# TODO screens selection
|
|
for bar in Bar.everyone:
|
|
bar.addSection(section, group=group)
|
|
|
|
def addSection(self, section, group):
|
|
assert isinstance(section, Section)
|
|
assert isinstance(group, BarGroupType)
|
|
self.groups[group].addSection(section)
|
|
|
|
def update(self):
|
|
if self.childsChanged:
|
|
self.string = self.screen
|
|
self.string += self.groups[BarGroupType.LEFT].string
|
|
self.string += self.groups[BarGroupType.RIGHT].string
|
|
|
|
self.childsChanged = False
|
|
|
|
@staticmethod
|
|
def updateAll():
|
|
if Bar.running:
|
|
Bar.string = ""
|
|
for bar in Bar.everyone:
|
|
bar.update()
|
|
Bar.string += bar.string
|
|
# Color for empty sections
|
|
Bar.string += BarGroup.color(*Section.EMPTY)
|
|
|
|
# print(Bar.string)
|
|
Bar.process.stdin.write(bytes(Bar.string + "\n", "utf-8"))
|
|
Bar.process.stdin.flush()
|
|
|
|
|
|
class BarGroup:
|
|
"""
|
|
One for each group of each bar
|
|
"""
|
|
|
|
everyone = set()
|
|
|
|
def __init__(self, groupType, parent):
|
|
assert isinstance(groupType, BarGroupType)
|
|
assert isinstance(parent, Bar)
|
|
|
|
self.groupType = groupType
|
|
self.parent = parent
|
|
|
|
self.sections = list()
|
|
self.string = ""
|
|
self.parts = []
|
|
|
|
#: One of the sections that had their theme or visibility changed
|
|
self.childsThemeChanged = False
|
|
|
|
#: One of the sections that had their text (maybe their size) changed
|
|
self.childsTextChanged = False
|
|
|
|
BarGroup.everyone.add(self)
|
|
|
|
def addSection(self, section):
|
|
self.sections.append(section)
|
|
section.addParent(self)
|
|
|
|
def addSectionAfter(self, sectionRef, section):
|
|
index = self.sections.index(sectionRef)
|
|
self.sections.insert(index + 1, section)
|
|
section.addParent(self)
|
|
|
|
ALIGNS = {BarGroupType.LEFT: "%{l}", BarGroupType.RIGHT: "%{r}"}
|
|
|
|
@staticmethod
|
|
def fgColor(color):
|
|
return "%{F" + (color or "-") + "}"
|
|
|
|
@staticmethod
|
|
def bgColor(color):
|
|
return "%{B" + (color or "-") + "}"
|
|
|
|
@staticmethod
|
|
def color(fg, bg):
|
|
return BarGroup.fgColor(fg) + BarGroup.bgColor(bg)
|
|
|
|
def update(self):
|
|
if self.childsThemeChanged:
|
|
parts = [BarGroup.ALIGNS[self.groupType]]
|
|
|
|
secs = [sec for sec in self.sections if sec.visible]
|
|
lenS = len(secs)
|
|
for s in range(lenS):
|
|
sec = secs[s]
|
|
theme = Section.THEMES[sec.theme]
|
|
if self.groupType == BarGroupType.LEFT:
|
|
oSec = secs[s + 1] if s < lenS - 1 else None
|
|
else:
|
|
oSec = secs[s - 1] if s > 0 else None
|
|
oTheme = (
|
|
Section.THEMES[oSec.theme] if oSec is not None else Section.EMPTY
|
|
)
|
|
|
|
if self.groupType == BarGroupType.LEFT:
|
|
if s == 0:
|
|
parts.append(BarGroup.bgColor(theme[1]))
|
|
parts.append(BarGroup.fgColor(theme[0]))
|
|
parts.append(sec)
|
|
if theme == oTheme:
|
|
parts.append("")
|
|
else:
|
|
parts.append(BarGroup.color(theme[1], oTheme[1]) + "")
|
|
else:
|
|
if theme is oTheme:
|
|
parts.append("")
|
|
else:
|
|
parts.append(BarGroup.fgColor(theme[1]) + "")
|
|
parts.append(BarGroup.color(*theme))
|
|
parts.append(sec)
|
|
|
|
# TODO OPTI Concatenate successive strings
|
|
self.parts = parts
|
|
|
|
if self.childsTextChanged or self.childsThemeChanged:
|
|
self.string = ""
|
|
for part in self.parts:
|
|
if isinstance(part, str):
|
|
self.string += part
|
|
elif isinstance(part, Section):
|
|
self.string += part.curText
|
|
|
|
self.parent.childsChanged = True
|
|
|
|
self.childsThemeChanged = False
|
|
self.childsTextChanged = False
|
|
|
|
@staticmethod
|
|
def updateAll():
|
|
for group in BarGroup.everyone:
|
|
group.update()
|
|
Bar.updateAll()
|
|
|
|
|
|
class SectionThread(threading.Thread):
|
|
ANIMATION_START = 0.025
|
|
ANIMATION_STOP = 0.001
|
|
ANIMATION_EVOLUTION = 0.9
|
|
|
|
def run(self):
|
|
while Section.somethingChanged.wait():
|
|
notBusy.wait()
|
|
Section.updateAll()
|
|
animTime = self.ANIMATION_START
|
|
frameTime = time.perf_counter()
|
|
while len(Section.sizeChanging) > 0:
|
|
frameTime += animTime
|
|
curTime = time.perf_counter()
|
|
sleepTime = frameTime - curTime
|
|
time.sleep(sleepTime if sleepTime > 0 else 0)
|
|
Section.updateAll()
|
|
animTime *= self.ANIMATION_EVOLUTION
|
|
if animTime < self.ANIMATION_STOP:
|
|
animTime = self.ANIMATION_STOP
|
|
|
|
|
|
class Section:
|
|
# TODO Update all of that to base16
|
|
COLORS = [
|
|
"#092c0e",
|
|
"#143718",
|
|
"#5a7058",
|
|
"#677d64",
|
|
"#89947f",
|
|
"#99a08d",
|
|
"#fae2e3",
|
|
"#fff0f1",
|
|
"#e0332e",
|
|
"#cf4b15",
|
|
"#bb8801",
|
|
"#8d9800",
|
|
"#1fa198",
|
|
"#008dd1",
|
|
"#5c73c4",
|
|
"#d43982",
|
|
]
|
|
FGCOLOR = "#fff0f1"
|
|
BGCOLOR = "#092c0e"
|
|
|
|
THEMES = list()
|
|
EMPTY = (FGCOLOR, BGCOLOR)
|
|
|
|
ICON = None
|
|
PERSISTENT = False
|
|
|
|
#: Sections that do not have their destination size
|
|
sizeChanging = set()
|
|
updateThread = SectionThread(daemon=True)
|
|
somethingChanged = threading.Event()
|
|
lastChosenTheme = 0
|
|
|
|
@staticmethod
|
|
def init():
|
|
for t in range(8, 16):
|
|
Section.THEMES.append((Section.COLORS[0], Section.COLORS[t]))
|
|
Section.THEMES.append((Section.COLORS[0], Section.COLORS[3]))
|
|
Section.THEMES.append((Section.COLORS[0], Section.COLORS[6]))
|
|
|
|
Section.updateThread.start()
|
|
|
|
def __init__(self, theme=None):
|
|
#: Displayed section
|
|
#: Note: A section can be empty and displayed!
|
|
self.visible = False
|
|
|
|
if theme is None:
|
|
theme = Section.lastChosenTheme
|
|
Section.lastChosenTheme = (Section.lastChosenTheme + 1) % len(
|
|
Section.THEMES
|
|
)
|
|
self.theme = theme
|
|
|
|
#: Displayed text
|
|
self.curText = ""
|
|
#: Displayed text size
|
|
self.curSize = 0
|
|
|
|
#: Destination text
|
|
self.dstText = Text(" ", Text(), " ")
|
|
#: Destination size
|
|
self.dstSize = 0
|
|
|
|
#: Groups that have this section
|
|
self.parents = set()
|
|
|
|
self.icon = self.ICON
|
|
self.persistent = self.PERSISTENT
|
|
|
|
def __str__(self):
|
|
try:
|
|
return "<{}><{}>{:01d}{}{:02d}/{:02d}".format(
|
|
self.curText,
|
|
self.dstText,
|
|
self.theme,
|
|
"+" if self.visible else "-",
|
|
self.curSize,
|
|
self.dstSize,
|
|
)
|
|
except:
|
|
return super().__str__()
|
|
|
|
def addParent(self, parent):
|
|
self.parents.add(parent)
|
|
|
|
def appendAfter(self, section):
|
|
assert len(self.parents)
|
|
for parent in self.parents:
|
|
parent.addSectionAfter(self, section)
|
|
|
|
def informParentsThemeChanged(self):
|
|
for parent in self.parents:
|
|
parent.childsThemeChanged = True
|
|
|
|
def informParentsTextChanged(self):
|
|
for parent in self.parents:
|
|
parent.childsTextChanged = True
|
|
|
|
def updateText(self, text):
|
|
if isinstance(text, str):
|
|
text = Text(text)
|
|
elif isinstance(text, Text) and not len(text.elements):
|
|
text = None
|
|
|
|
self.dstText[0] = (
|
|
None
|
|
if (text is None and not self.persistent)
|
|
else ((" " + self.icon + " ") if self.icon else " ")
|
|
)
|
|
self.dstText[1] = text
|
|
self.dstText[2] = (
|
|
" " if self.dstText[1] is not None and len(self.dstText[1]) else None
|
|
)
|
|
|
|
self.dstSize = len(self.dstText)
|
|
self.dstText.setSection(self)
|
|
|
|
if self.curSize == self.dstSize:
|
|
if self.dstSize > 0:
|
|
self.curText = str(self.dstText)
|
|
self.informParentsTextChanged()
|
|
else:
|
|
Section.sizeChanging.add(self)
|
|
Section.somethingChanged.set()
|
|
|
|
def setDecorators(self, **kwargs):
|
|
self.dstText.setDecorators(**kwargs)
|
|
self.curText = str(self.dstText)
|
|
self.informParentsTextChanged()
|
|
Section.somethingChanged.set()
|
|
|
|
def updateTheme(self, theme):
|
|
assert isinstance(theme, int)
|
|
assert theme < len(Section.THEMES)
|
|
if theme == self.theme:
|
|
return
|
|
self.theme = theme
|
|
self.informParentsThemeChanged()
|
|
Section.somethingChanged.set()
|
|
|
|
def updateVisibility(self, visibility):
|
|
assert isinstance(visibility, bool)
|
|
|
|
self.visible = visibility
|
|
self.informParentsThemeChanged()
|
|
Section.somethingChanged.set()
|
|
|
|
@staticmethod
|
|
def fit(text, size):
|
|
t = len(text)
|
|
return text[:size] if t >= size else text + [" "] * (size - t)
|
|
|
|
def update(self):
|
|
# TODO Might profit of a better logic
|
|
if not self.visible:
|
|
self.updateVisibility(True)
|
|
return
|
|
|
|
if self.dstSize > self.curSize:
|
|
self.curSize += 1
|
|
elif self.dstSize < self.curSize:
|
|
self.curSize -= 1
|
|
else:
|
|
# Visibility toggling must be done one step after curSize = 0
|
|
if self.dstSize == 0:
|
|
self.updateVisibility(False)
|
|
Section.sizeChanging.remove(self)
|
|
return
|
|
|
|
self.curText = self.dstText.text(size=self.curSize, pad=True)
|
|
self.informParentsTextChanged()
|
|
|
|
@staticmethod
|
|
def updateAll():
|
|
"""
|
|
Process all sections for text size changes
|
|
"""
|
|
|
|
for sizeChanging in Section.sizeChanging.copy():
|
|
sizeChanging.update()
|
|
|
|
BarGroup.updateAll()
|
|
|
|
Section.somethingChanged.clear()
|
|
|
|
@staticmethod
|
|
def ramp(p, ramp=" ▁▂▃▄▅▆▇█"):
|
|
if p > 1:
|
|
return ramp[-1]
|
|
elif p < 0:
|
|
return ramp[0]
|
|
else:
|
|
return ramp[round(p * (len(ramp) - 1))]
|
|
|
|
|
|
class StatefulSection(Section):
|
|
# TODO FEAT Allow to temporary expand the section (e.g. when important change)
|
|
NUMBER_STATES = None
|
|
DEFAULT_STATE = 0
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
Section.__init__(self, *args, **kwargs)
|
|
self.state = self.DEFAULT_STATE
|
|
if hasattr(self, "onChangeState"):
|
|
self.onChangeState(self.state)
|
|
self.setDecorators(
|
|
clickLeft=self.incrementState, clickRight=self.decrementState
|
|
)
|
|
|
|
def incrementState(self):
|
|
newState = min(self.state + 1, self.NUMBER_STATES - 1)
|
|
self.changeState(newState)
|
|
|
|
def decrementState(self):
|
|
newState = max(self.state - 1, 0)
|
|
self.changeState(newState)
|
|
|
|
def changeState(self, state):
|
|
assert isinstance(state, int)
|
|
assert state < self.NUMBER_STATES
|
|
self.state = state
|
|
if hasattr(self, "onChangeState"):
|
|
self.onChangeState(state)
|
|
self.refreshData()
|
|
|
|
|
|
class ColorCountsSection(StatefulSection):
|
|
# TODO FEAT Blend colors when not expanded
|
|
# TODO FEAT Blend colors with importance of count
|
|
# TODO FEAT Allow icons instead of counts
|
|
NUMBER_STATES = 3
|
|
COLORABLE_ICON = "?"
|
|
|
|
def __init__(self, theme=None):
|
|
StatefulSection.__init__(self, theme=theme)
|
|
|
|
def fetcher(self):
|
|
counts = self.subfetcher()
|
|
# Nothing
|
|
if not len(counts):
|
|
return None
|
|
# Icon colored
|
|
elif self.state == 0 and len(counts) == 1:
|
|
count, color = counts[0]
|
|
return Text(self.COLORABLE_ICON, fg=color)
|
|
# Icon
|
|
elif self.state == 0 and len(counts) > 1:
|
|
return Text(self.COLORABLE_ICON)
|
|
# Icon + Total
|
|
elif self.state == 1 and len(counts) > 1:
|
|
total = sum([count for count, color in counts])
|
|
return Text(self.COLORABLE_ICON, " ", total)
|
|
# Icon + Counts
|
|
else:
|
|
text = Text(self.COLORABLE_ICON)
|
|
for count, color in counts:
|
|
text.append(" ", Text(count, fg=color))
|
|
return text
|
|
|
|
|
|
class Text:
|
|
def _setElements(self, elements):
|
|
# TODO OPTI Concatenate consecutrive string
|
|
self.elements = list(elements)
|
|
|
|
def _setDecorators(self, decorators):
|
|
# TODO OPTI Convert no decorator to strings
|
|
self.decorators = decorators
|
|
self.prefix = None
|
|
self.suffix = None
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._setElements(args)
|
|
self._setDecorators(kwargs)
|
|
self.section = None
|
|
|
|
def append(self, *args):
|
|
self._setElements(self.elements + list(args))
|
|
|
|
def prepend(self, *args):
|
|
self._setElements(list(args) + self.elements)
|
|
|
|
def setElements(self, *args):
|
|
self._setElements(args)
|
|
|
|
def setDecorators(self, **kwargs):
|
|
self._setDecorators(kwargs)
|
|
|
|
def setSection(self, section):
|
|
assert isinstance(section, Section)
|
|
self.section = section
|
|
for element in self.elements:
|
|
if isinstance(element, Text):
|
|
element.setSection(section)
|
|
|
|
def _genFixs(self):
|
|
if self.prefix is not None and self.suffix is not None:
|
|
return
|
|
|
|
self.prefix = ""
|
|
self.suffix = ""
|
|
|
|
def nest(prefix, suffix):
|
|
self.prefix = self.prefix + "%{" + prefix + "}"
|
|
self.suffix = "%{" + suffix + "}" + self.suffix
|
|
|
|
def getColor(val):
|
|
# TODO Allow themes
|
|
assert isinstance(val, str) and len(val) == 7
|
|
return val
|
|
|
|
def button(number, function):
|
|
handle = Bar.getFunctionHandle(function)
|
|
nest("A" + number + ":" + handle.decode() + ":", "A" + number)
|
|
|
|
for key, val in self.decorators.items():
|
|
if val is None:
|
|
continue
|
|
if key == "fg":
|
|
reset = self.section.THEMES[self.section.theme][0]
|
|
nest("F" + getColor(val), "F" + reset)
|
|
elif key == "bg":
|
|
reset = self.section.THEMES[self.section.theme][1]
|
|
nest("B" + getColor(val), "B" + reset)
|
|
elif key == "clickLeft":
|
|
button("1", val)
|
|
elif key == "clickMiddle":
|
|
button("2", val)
|
|
elif key == "clickRight":
|
|
button("3", val)
|
|
elif key == "scrollUp":
|
|
button("4", val)
|
|
elif key == "scrollDown":
|
|
button("5", val)
|
|
else:
|
|
log.warn("Unkown decorator: {}".format(key))
|
|
|
|
def _text(self, size=None, pad=False):
|
|
self._genFixs()
|
|
curString = self.prefix
|
|
curSize = 0
|
|
remSize = size
|
|
|
|
for element in self.elements:
|
|
if element is None:
|
|
continue
|
|
elif isinstance(element, Text):
|
|
newString, newSize = element._text(size=remSize)
|
|
else:
|
|
newString = str(element)
|
|
if remSize is not None:
|
|
newString = newString[:remSize]
|
|
newSize = len(newString)
|
|
|
|
curString += newString
|
|
curSize += newSize
|
|
|
|
if remSize is not None:
|
|
remSize -= newSize
|
|
if remSize <= 0:
|
|
break
|
|
|
|
curString += self.suffix
|
|
|
|
if pad and remSize > 0:
|
|
curString += " " * remSize
|
|
curSize += remSize
|
|
|
|
if size is not None:
|
|
if pad:
|
|
assert size == curSize
|
|
else:
|
|
assert size >= curSize
|
|
return curString, curSize
|
|
|
|
def text(self, *args, **kwargs):
|
|
string, size = self._text(*args, **kwargs)
|
|
return string
|
|
|
|
def __str__(self):
|
|
self._genFixs()
|
|
curString = self.prefix
|
|
for element in self.elements:
|
|
if element is None:
|
|
continue
|
|
else:
|
|
curString += str(element)
|
|
curString += self.suffix
|
|
return curString
|
|
|
|
def __len__(self):
|
|
curSize = 0
|
|
for element in self.elements:
|
|
if element is None:
|
|
continue
|
|
elif isinstance(element, Text):
|
|
curSize += len(element)
|
|
else:
|
|
curSize += len(str(element))
|
|
return curSize
|
|
|
|
def __getitem__(self, index):
|
|
return self.elements[index]
|
|
|
|
def __setitem__(self, index, data):
|
|
self.elements[index] = data
|