dotfiles/hm/desktop/frobar/frobar/providers.py

945 lines
29 KiB
Python

#!/usr/bin/env python3
import datetime
import enum
import ipaddress
import json
import logging
import os
import random
import socket
import subprocess
import time
import coloredlogs
import i3ipc
import mpd
import notmuch
import psutil
import pulsectl
from frobar.display import (ColorCountsSection, Element, Section,
StatefulSection, Text)
from frobar.updaters import (I3Updater, InotifyUpdater, MergedUpdater,
PeriodicUpdater, ThreadedUpdater, Updater)
# Configure coloredlogs at import time: DEBUG level, "<LEVEL> <message>" format.
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
# Root logger (getLogger() is called without a name).
log = logging.getLogger()
# TODO Generator class (for I3WorkspacesProvider, NetworkProvider and later
# PulseaudioProvider and MpdProvider)
def humanSize(numi: int) -> str:
    """
    Format a byte count with a binary unit suffix.

    The value is divided by 1024 until its magnitude is below 1000 (or the
    largest unit, YiB, is reached). Values of at least 10 are rendered as a
    right-aligned 3-column integer, smaller ones with one decimal, so a
    non-negative input below 1000 YiB yields 3 characters of number followed
    by a 3-character unit.

    :param numi: size in bytes; floats (e.g. byte rates) are accepted too
    :return: e.g. ``"1.5KiB"``, ``" 12MiB"`` or ``"0.0B  "``
    """
    num = float(numi)
    # "B  " is padded so that every unit is exactly three characters wide.
    for unit in ("B  ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"):
        if abs(num) < 1000:
            break
        num /= 1024
    else:
        # Loop exhausted: num has been scaled eight times and is now in YiB.
        unit = "YiB"
    if num >= 10:
        return "{:3d}{}".format(int(num), unit)
    return "{:.1f}{}".format(num, unit)
def randomColor(seed: int | bytes = 0) -> str:
random.seed(seed)
return "#{:02x}{:02x}{:02x}".format(*[random.randint(0, 255) for _ in range(3)])
class TimeProvider(StatefulSection, PeriodicUpdater):
    """
    Clock section: the current state selects one of the strftime FORMATS.
    """

    FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"]
    NUMBER_STATES = len(FORMATS)
    DEFAULT_STATE = 1

    def __init__(self, theme: int | None = None):
        PeriodicUpdater.__init__(self)
        StatefulSection.__init__(self, theme)
        self.changeInterval(1)  # TODO OPTI When state < 1

    def fetcher(self) -> str:
        """Render the current local time with the format of the active state."""
        pattern = self.FORMATS[self.state]
        return datetime.datetime.now().strftime(pattern)
@enum.unique
class AlertLevel(enum.Enum):
    """Alert levels a section can report, numbered from 0 (NORMAL) upwards."""

    NORMAL = 0
    WARNING = 1
    DANGER = 2
class AlertingSection(StatefulSection):
    """
    Stateful section whose theme follows an alert level.

    The level is obtained by comparing a quantity against ``warningThresold``
    and ``dangerThresold`` (spelling kept: subclasses assign these names).
    """

    # TODO EASE Correct settings for themes
    ALERT_THEMES = {AlertLevel.NORMAL: 3, AlertLevel.WARNING: 1, AlertLevel.DANGER: 0}
    PERSISTENT = True

    def __init__(self, theme: int | None = None):
        StatefulSection.__init__(self, theme)
        # Default thresholds; subclasses overwrite them when their quantity
        # uses another scale.
        self.dangerThresold = 0.90
        self.warningThresold = 0.75

    def getLevel(self, quantity: float) -> AlertLevel:
        """Map *quantity* to an alert level (strictly-greater comparisons)."""
        if quantity > self.dangerThresold:
            return AlertLevel.DANGER
        if quantity > self.warningThresold:
            return AlertLevel.WARNING
        return AlertLevel.NORMAL

    def updateLevel(self, quantity: float) -> None:
        """Store the level for *quantity* and switch to the matching theme."""
        level = self.getLevel(quantity)
        self.level = level
        self.updateTheme(self.ALERT_THEMES[level])
        # TODO Temporary update state (when the level is not NORMAL)
class CpuProvider(AlertingSection, PeriodicUpdater):
    """
    CPU usage section: icon only, one global ramp, or one ramp per CPU.
    """

    NUMBER_STATES = 3
    ICON = ""

    def __init__(self, theme: int | None = None):
        AlertingSection.__init__(self, theme)
        PeriodicUpdater.__init__(self)
        self.changeInterval(1)

    def fetcher(self) -> Element:
        """Update the alert level from global usage and build the text."""
        usage = psutil.cpu_percent(percpu=False) / 100
        self.updateLevel(usage)
        if self.state < 1:
            return ""
        if self.state < 2:
            return Section.ramp(usage)
        perCpu = psutil.cpu_percent(percpu=True)
        return "".join(Section.ramp(value / 100) for value in perCpu)
class LoadProvider(AlertingSection, PeriodicUpdater):
    """
    Load average section: icon only, first average, or all three averages.
    """

    NUMBER_STATES = 3
    ICON = ""

    def __init__(self, theme: int | None = None):
        AlertingSection.__init__(self, theme)
        PeriodicUpdater.__init__(self)
        self.changeInterval(5)
        # Thresholds are absolute load values here, not ratios.
        self.warningThresold = 5
        self.dangerThresold = 10

    def fetcher(self) -> Element:
        """Alert on the first load average and format the requested values."""
        averages = os.getloadavg()
        first = averages[0]
        self.updateLevel(first)
        if self.state < 1:
            return ""
        if self.state < 2:
            return f"{first:.2f}"
        return " ".join(f"{value:.2f}" for value in averages)
class RamProvider(AlertingSection, PeriodicUpdater):
    """
    Shows RAM usage.

    The displayed size is ``total - available`` and the ramp/alert level use
    ``virtual_memory().percent`` (psutil documents that field as usage).
    """

    NUMBER_STATES = 4
    ICON = ""

    def __init__(self, theme: int | None = None):
        AlertingSection.__init__(self, theme)
        PeriodicUpdater.__init__(self)
        self.changeInterval(1)

    def fetcher(self) -> Element:
        """
        Build the text: nothing, ramp, ramp + used size, ramp + used/total.
        """
        mem = psutil.virtual_memory()
        usedRatio = mem.percent / 100
        self.updateLevel(usedRatio)
        if self.state < 1:
            return None

        text = Text(Section.ramp(usedRatio))
        if self.state >= 2:
            text.append(humanSize(mem.total - mem.available))
        if self.state >= 3:
            text.append("/", humanSize(mem.total))
        return text
class TemperatureProvider(AlertingSection, PeriodicUpdater):
    """
    Temperature section based on the first known sensor group reported by
    psutil; the icon is a ramp relative to the warning threshold.
    """

    NUMBER_STATES = 2
    RAMP = ""
    MAIN_TEMPS = ["coretemp", "amdgpu", "cpu_thermal"]
    # For Intel, AMD and ARM respectively.

    def __init__(self, theme: int | None = None):
        AlertingSection.__init__(self, theme)
        PeriodicUpdater.__init__(self)
        self.changeInterval(5)

    def fetcher(self) -> Element:
        """Return the temperature text, or "?" when no known sensor exists."""
        sensors = psutil.sensors_temperatures()
        # First entry of MAIN_TEMPS that psutil reports, if any.
        main = next((name for name in self.MAIN_TEMPS if name in sensors), None)
        if main is None:
            return "?"

        reading = sensors[main][0]
        # Prefer the thresholds advertised by the sensor, with fallbacks.
        self.warningThresold = reading.high or 90.0
        self.dangerThresold = reading.critical or 100.0
        self.updateLevel(reading.current)
        self.icon = Section.ramp(reading.current / self.warningThresold, self.RAMP)
        if self.state < 1:
            return ""
        return "{:.0f}°C".format(reading.current)
class BatteryProvider(AlertingSection, PeriodicUpdater):
    """
    Battery section: icon only, percentage, or percentage + remaining time.

    The alert level is driven by the *discharged* fraction, so a low battery
    triggers the warning/danger themes.
    """

    # TODO Support ACPID for events
    NUMBER_STATES = 3
    RAMP = ""

    def __init__(self, theme: int | None = None):
        AlertingSection.__init__(self, theme)
        PeriodicUpdater.__init__(self)
        self.changeInterval(5)

    def fetcher(self) -> Element:
        """
        Refresh icon and alert level, then build the text for the state.

        :return: None when psutil reports no battery, otherwise the text
        """
        bat = psutil.sensors_battery()
        if not bat:
            return None

        self.icon = ("" if bat.power_plugged else "") + Section.ramp(
            bat.percent / 100, self.RAMP
        )
        self.updateLevel(1 - bat.percent / 100)

        if self.state < 1:
            return ""
        t = Text("{:.0f}%".format(bat.percent))
        if self.state < 2:
            return t

        # psutil uses negative sentinels (POWER_TIME_UNKNOWN,
        # POWER_TIME_UNLIMITED) when no estimate exists; showing "(0:00)"
        # for those would be misleading, so the time is left out.
        if bat.secsleft >= 0:
            h = int(bat.secsleft / 3600)
            m = int((bat.secsleft - h * 3600) / 60)
            t.append(" ({:d}:{:02d})".format(h, m))
        return t
class XautolockProvider(Section, InotifyUpdater):
    """
    Reflects the content of the ~/.cache/xautolock state file, which is
    watched through inotify.
    """

    ICON = ""

    def __init__(self, theme: int | None = None):
        Section.__init__(self, theme=theme)
        InotifyUpdater.__init__(self)
        # TODO XDG
        self.path = os.path.realpath(os.path.expanduser("~/.cache/xautolock"))
        self.addPath(self.path)

    def fetcher(self) -> str | None:
        """
        Map the file content to text: None for "enabled", "" for "disabled",
        "?" for anything else.
        """
        with open(self.path) as fd:
            state = fd.read().strip()
        known = {"enabled": None, "disabled": ""}
        return known.get(state, "?")
class PulseaudioProvider(StatefulSection, ThreadedUpdater):
    """
    Lists PulseAudio sinks: icon only, icon + volume ramp, or icon + percent.

    A dedicated connection listens for sink events in the updater thread and
    triggers a refresh on each of them.
    """

    NUMBER_STATES = 3
    DEFAULT_STATE = 1

    def __init__(self, theme: int | None = None):
        ThreadedUpdater.__init__(self)
        StatefulSection.__init__(self, theme)
        self.pulseEvents = pulsectl.Pulse("event-handler")
        self.pulseEvents.event_mask_set(pulsectl.PulseEventMaskEnum.sink)
        self.pulseEvents.event_callback_set(self.handleEvent)
        self.start()
        self.refreshData()

    def handleEvent(self, ev: pulsectl.PulseEventInfo) -> None:
        """Event callback: any sink event refreshes the section."""
        self.refreshData()

    def loop(self) -> None:
        """Body of the updater thread: wait for PulseAudio events."""
        self.pulseEvents.event_listen()

    def fetcher(self) -> Element:
        """Build one Text per sink using a short-lived connection."""
        entries = []
        with pulsectl.Pulse("list-sinks") as pulse:
            for sink in pulse.sink_list():
                port = sink.port_active.name
                if port == "analog-output-headphones":
                    icon = ""
                elif port == "analog-output-speaker":
                    icon = "" if sink.mute else ""
                elif port == "headset-output":
                    icon = ""
                else:
                    icon = "?"

                vol = pulse.volume_get_all_chans(sink)
                # Grey when muted, red above 100 %, default color otherwise.
                if sink.mute:
                    fg = "#333333"
                elif vol > 1:
                    fg = "#FF0000"
                else:
                    fg = None

                entry = Text(icon, fg=fg)
                entries.append(entry)

                if self.state < 1:
                    continue
                if self.state >= 2:
                    entry.append(" {:2.0f}%".format(vol * 100))
                elif not sink.mute:
                    # One ramp glyph per started 100 % of volume.
                    bars = " "
                    remaining = vol
                    while remaining >= 0:
                        bars += self.ramp(remaining if remaining < 1 else 1)
                        remaining -= 1
                    entry.append(bars)
        return Text(*entries)
class NetworkProviderSection(StatefulSection, Updater):
    """
    Section describing one network interface of a NetworkProvider.

    Each state adds one piece of information: SSID (1), IPv4 address (2),
    transfer speed (3) and total transferred bytes (4). All data is read
    from the snapshots held by the parent provider.
    """

    NUMBER_STATES = 5
    DEFAULT_STATE = 1

    def __init__(self, iface: str, parent: "NetworkProvider"):
        Updater.__init__(self)
        StatefulSection.__init__(self, theme=parent.theme)
        self.iface = iface
        self.parent = parent

    def onChangeState(self, state: int) -> None:
        """Translate the numeric state into the individual display flags."""
        self.showSsid = state >= 1
        self.showAddress = state >= 2
        self.showSpeed = state >= 3
        self.showTransfer = state >= 4

    def actType(self) -> None:
        """Pick the icon from the interface name and fetch the SSID if shown."""
        self.ssid = None
        iface = self.iface
        if iface.startswith(("eth", "enp")):
            if "u" in iface:
                self.icon = ""
            else:
                self.icon = ""
        elif iface.startswith("wl"):  # also covers the "wlan" prefix
            self.icon = ""
            if self.showSsid:
                cmd = ["iwgetid", iface, "--raw"]
                p = subprocess.run(cmd, stdout=subprocess.PIPE)
                self.ssid = p.stdout.strip().decode()
        elif iface.startswith(("tun", "tap")):
            self.icon = ""
        elif iface.startswith("docker"):
            self.icon = ""
        elif iface.startswith("veth"):
            self.icon = ""
        elif iface.startswith("vboxnet"):
            self.icon = ""

    def getAddresses(
        self,
    ) -> tuple[psutil._common.snicaddr | None, psutil._common.snicaddr | None]:
        """
        Return the last IPv4 and IPv6 addresses listed for the interface.

        Either element is None when the interface has no address of that
        family, or when the parent has no address entry for it at all.
        """
        ipv4 = None
        ipv6 = None
        for address in self.parent.addrs.get(self.iface, ()):
            if address.family == socket.AF_INET:
                ipv4 = address
            elif address.family == socket.AF_INET6:
                ipv6 = address
        return ipv4, ipv6

    def fetcher(self) -> Element:
        """
        Build the text for the interface.

        :return: None when the interface is unknown, down, a loopback or has
            no IP address; the space-joined pieces of information otherwise
        """
        self.icon = "?"
        self.persistent = False
        stats = self.parent.stats.get(self.iface)
        if stats is None or not stats.isup or self.iface.startswith("lo"):
            return None

        # Get addresses
        ipv4, ipv6 = self.getAddresses()
        if ipv4 is None and ipv6 is None:
            return None

        text = []
        self.persistent = True
        self.actType()

        if self.showSsid and self.ssid:
            text.append(self.ssid)

        if self.showAddress:
            if ipv4:
                netStrFull = "{}/{}".format(ipv4.address, ipv4.netmask)
                addr = ipaddress.IPv4Network(netStrFull, strict=False)
                text.append("{}/{}".format(ipv4.address, addr.prefixlen))
            # TODO IPV6
            # if ipv6:
            #     text += ' ' + ipv6.address

        current = self.parent.IO.get(self.iface)
        if current is not None:
            if self.showSpeed:
                # An interface that appeared since the previous poll has no
                # earlier sample: use the current one so the rate is zero
                # instead of raising KeyError.
                previous = self.parent.prevIO.get(self.iface, current)
                recvDiff = (current.bytes_recv - previous.bytes_recv) / self.parent.dt
                sentDiff = (current.bytes_sent - previous.bytes_sent) / self.parent.dt
                text.append("{}{}".format(humanSize(recvDiff), humanSize(sentDiff)))

            if self.showTransfer:
                text.append(
                    "{}{}".format(
                        humanSize(current.bytes_recv),
                        humanSize(current.bytes_sent),
                    )
                )

        return " ".join(text)
class NetworkProvider(Section, PeriodicUpdater):
    """
    Periodically snapshots psutil network data and maintains one
    NetworkProviderSection per interface, inserted in sorted name order.
    """

    def __init__(self, theme: int | None = None):
        PeriodicUpdater.__init__(self)
        Section.__init__(self, theme)
        self.sections: dict[str, NetworkProviderSection] = dict()
        # Seed values so that the first fetchData() has a "previous" sample.
        self.last = 0
        self.IO = dict()
        self.fetchData()
        self.changeInterval(5)

    def fetchData(self) -> None:
        """Rotate the previous sample and take a new one."""
        self.prev, self.prevIO = self.last, self.IO

        self.stats = psutil.net_if_stats()
        self.addrs: dict[str, list[psutil._common.snicaddr]] = psutil.net_if_addrs()
        self.IO: dict[str, psutil._common.snetio] = psutil.net_io_counters(pernic=True)
        self.ifaces = self.stats.keys()

        # Time elapsed between the two samples, used for speed computation.
        self.last: float = time.perf_counter()
        self.dt = self.last - self.prev

    def fetcher(self) -> None:
        """Refresh data, create sections for new interfaces, refresh them all."""
        self.fetchData()

        # Add missing sections
        anchor: NetworkProvider | NetworkProviderSection = self
        for iface in sorted(self.ifaces):
            section = self.sections.get(iface)
            if section is None:
                section = NetworkProviderSection(iface, self)
                anchor.appendAfter(section)
                self.sections[iface] = section
            anchor = section

        # Refresh section text
        for section in self.sections.values():
            section.refreshData()

        return None
class RfkillProvider(Section, PeriodicUpdater):
    """
    Shows one icon per rfkill device that is soft- or hard-blocked, read
    from the files below PATH.
    """

    # TODO FEAT rfkill doesn't seem to indicate that the hardware switch is
    # toggled
    PATH = "/sys/class/rfkill"

    def __init__(self, theme: int | None = None):
        PeriodicUpdater.__init__(self)
        Section.__init__(self, theme)
        self.changeInterval(5)

    def _readAttribute(self, device: str, attribute: str) -> bytes:
        """Return the stripped binary content of PATH/device/attribute."""
        with open(os.path.join(self.PATH, device, attribute), "rb") as f:
            return f.read().strip()

    def fetcher(self) -> Element:
        """Collect an icon for every blocked device."""
        icons = {b"wlan": "", b"bluetooth": ""}
        t = Text()
        for device in os.listdir(self.PATH):
            softBlocked = self._readAttribute(device, "soft") != b"0"
            hardBlocked = self._readAttribute(device, "hard") != b"0"
            if not (hardBlocked or softBlocked):
                continue

            typ = self._readAttribute(device, "type")
            # A hard block takes precedence over a soft block for the color.
            if hardBlocked:
                fg = "#CCCCCC"
            elif softBlocked:
                fg = "#FF0000"
            else:
                fg = None
            t.append(Text(icons.get(typ, "?"), fg=fg))
        return t
class SshAgentProvider(PeriodicUpdater):
    """
    Lists the keys reported by ``ssh-add -l``, one colored icon per key.
    """

    def __init__(self) -> None:
        PeriodicUpdater.__init__(self)
        self.changeInterval(5)

    def fetcher(self) -> Element:
        """Return None when ssh-add fails, otherwise one Text per key."""
        proc = subprocess.run(
            ["ssh-add", "-l"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        )
        if proc.returncode != 0:
            return None

        text = Text()
        # filter(None, ...) skips empty lines such as the trailing one.
        for line in filter(None, proc.stdout.split(b"\n")):
            # The second field of each line seeds the color.
            fingerprint = line.split()[1]
            text.append(Text("", fg=randomColor(seed=fingerprint)))
        return text
class GpgAgentProvider(PeriodicUpdater):
    """
    Lists gpg-agent keys from ``keyinfo --list`` whose seventh field is "1",
    one colored icon per key.
    """

    def __init__(self) -> None:
        PeriodicUpdater.__init__(self)
        self.changeInterval(5)

    def fetcher(self) -> Element:
        """Return None when the agent query fails, otherwise the key icons."""
        proc = subprocess.run(
            ["gpg-connect-agent", "keyinfo --list", "/bye"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        if proc.returncode != 0:
            return None

        text = Text()
        for line in proc.stdout.split(b"\n"):
            if line in (b"", b"OK"):
                continue
            fields = line.split()
            # presumably field 6 is the "cached" flag — verify against the
            # gpg-agent KEYINFO documentation
            if fields[6] == b"1":
                keygrip = fields[2]
                text.append(Text("", fg=randomColor(seed=keygrip)))
        return text
class KeystoreProvider(Section, MergedUpdater):
    """
    Single section merging the SSH agent and GPG agent providers.
    """

    # TODO OPTI+FEAT Use ColorCountsSection and not MergedUpdater, this is useless
    ICON = ""

    def __init__(self, theme: int | None = None):
        sshAgent = SshAgentProvider()
        gpgAgent = GpgAgentProvider()
        MergedUpdater.__init__(self, sshAgent, gpgAgent)
        Section.__init__(self, theme)
class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater):
    """
    Counts unread messages per mail account using the notmuch database.

    Accounts are the non-hidden entries of the mail directory; each one must
    contain a ``color`` file holding the color to display its count with.
    """

    COLORABLE_ICON = ""

    def __init__(self, dir: str = "~/.mail/", theme: int | None = None):
        InotifyUpdater.__init__(self)
        ColorCountsSection.__init__(self, theme)

        self.dir = os.path.realpath(os.path.expanduser(dir))
        assert os.path.isdir(self.dir)

        # Fetching account list
        self.accounts = sorted(
            entry for entry in os.listdir(self.dir) if not entry.startswith(".")
        )

        # Fetching colors
        self.colors = dict()
        for account in self.accounts:
            with open(os.path.join(self.dir, account, "color"), "r") as f:
                self.colors[account] = f.read().strip()

        # Refresh whenever the notmuch index changes.
        self.addPath(os.path.join(self.dir, ".notmuch", "xapian"))

    def subfetcher(self) -> list[tuple[int, str]]:
        """Return (unread count, color) for every account with unread mail."""
        db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY, path=self.dir)
        counts = []
        for account in self.accounts:
            query = notmuch.Query(db, "folder:/{}/ and tag:unread".format(account))
            unread = query.count_messages()
            if unread >= 1:
                counts.append((unread, self.colors[account]))
        # db.close()
        return counts
class TodoProvider(ColorCountsSection, InotifyUpdater):
    """
    Counts undone tasks by calling the ``todo`` command line tool.

    Every sub-directory of the todoman path is a calendar providing a
    ``displayname`` and a ``color`` file.
    """

    # TODO OPT/UX Maybe we could get more data from the todoman python module
    # TODO OPT Specific callback for specific directory
    COLORABLE_ICON = ""

    def __init__(self, dir: str, theme: int | None = None):
        """
        :param str dir: [main]path value in todoman.conf
        """
        InotifyUpdater.__init__(self)
        ColorCountsSection.__init__(self, theme=theme)
        self.dir = os.path.realpath(os.path.expanduser(dir))
        assert os.path.isdir(self.dir)

        self.calendars = []
        self.colors: dict[str, str] = dict()
        self.names: dict[str, str] = dict()
        self.updateCalendarList()
        self.refreshData()

    def _readCalendarFile(self, calendar: str, filename: str) -> str:
        """Return the stripped content of a file inside a calendar directory."""
        with open(os.path.join(self.dir, calendar, filename), "r") as f:
            return f.read().strip()

    def updateCalendarList(self) -> None:
        """Watch newly found calendars and (re)load names and colors."""
        found = sorted(os.listdir(self.dir))
        for calendar in found:
            # If the calendar wasn't in the list
            if calendar not in self.calendars:
                self.addPath(os.path.join(self.dir, calendar), refresh=False)
            # Fetching name
            self.names[calendar] = self._readCalendarFile(calendar, "displayname")
            # Fetching color
            self.colors[calendar] = self._readCalendarFile(calendar, "color")
        self.calendars: list[str] = found

    def countUndone(self, calendar: str | None) -> int:
        """
        Count the entries listed by ``todo --porcelain list``.

        :param calendar: calendar directory name, or None for all calendars
        """
        cmd = ["todo", "--porcelain", "list"]
        if calendar:
            cmd.append(self.names[calendar])
        proc = subprocess.run(cmd, stdout=subprocess.PIPE)
        return len(json.loads(proc.stdout))

    def subfetcher(self) -> list[tuple[int, str]]:
        """Return the (count, color) pairs to display for the current state."""
        # TODO This an ugly optimisation that cuts on features, but todoman
        # calls are very expensive so we keep that in the meanwhile
        if self.state < 2:
            # NOTE(review): both color literals below have five hex digits;
            # kept as-is — confirm whether "#000000"/"#FFFFFF" was intended.
            counts = []
            total = self.countUndone(None)
            if total > 0:
                counts.append((total, "#00000"))
            counts.append((0, "#FFFFF"))
            return counts
        # Optimisation ends here

        counts = []
        for calendar in self.calendars:
            undone = self.countUndone(calendar)
            if undone > 0:
                counts.append((undone, self.colors[calendar]))
        return counts
class I3WindowTitleProvider(Section, I3Updater):
    """
    Displays the name of the container carried by i3 "window" events.
    """

    # TODO FEAT To make this available from start, we need to find the
    # `focused=True` element following the `focus` array
    # TODO Feat Make this output dependant if wanted

    def __init__(self, theme: int | None = None):
        I3Updater.__init__(self)
        Section.__init__(self, theme=theme)
        self.on("window", self.on_window)

    def on_window(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        """Event handler: show the name of the event's container."""
        title = e.container.name
        self.updateText(title)
class I3WorkspacesProviderSection(Section):
    """
    Section showing one i3 workspace; a left click switches to it.
    """

    def __init__(self, parent: "I3WorkspacesProvider"):
        Section.__init__(self)
        self.parent = parent
        self.setDecorators(clickLeft=self.switchTo)
        # Text saved by tempEmpty() so that tempShow() can restore it.
        self.tempText: Element = None

    def selectTheme(self) -> int:
        """Pick the parent's theme for the workspace: urgent > focused > visible."""
        workspace = self.workspace
        if workspace.urgent:
            return self.parent.themeUrgent
        if workspace.focused:
            return self.parent.themeFocus
        if workspace.visible:
            return self.parent.themeVisible
        return self.parent.themeNormal

    # TODO On mode change the state (shown / hidden) gets overriden so every
    # tab is shown

    def show(self) -> None:
        """Apply theme and text; the custom name is used only when focused."""
        self.updateTheme(self.selectTheme())
        if self.workspace.focused:
            label = self.fullName
        else:
            label = self.workspace.name
        self.updateText(label)

    def switchTo(self) -> None:
        """Ask i3 to switch to this section's workspace."""
        self.parent.i3.command("workspace {}".format(self.workspace.name))

    def updateWorkspace(self, workspace: i3ipc.WorkspaceReply) -> None:
        """Store the new workspace data and redraw the section."""
        self.workspace = workspace
        name = workspace.name
        self.fullName: str = self.parent.customNames.get(name, name)
        self.show()

    def empty(self) -> None:
        """Clear the section and reset it to the normal theme."""
        self.updateTheme(self.parent.themeNormal)
        self.updateText(None)

    def tempShow(self) -> None:
        """Restore the text saved by tempEmpty()."""
        self.updateText(self.tempText)

    def tempEmpty(self) -> None:
        """Save the text from ``dstText[1]`` and hide the section."""
        self.tempText = self.dstText[1]
        self.updateText(None)
class I3WorkspacesProvider(Section, I3Updater):
    """
    Maintains one I3WorkspacesProviderSection per i3 workspace plus a
    section showing the current binding mode.
    """

    def __init__(
        self,
        theme: int = 0,
        themeVisible: int = 4,
        themeFocus: int = 3,
        themeUrgent: int = 1,
        themeMode: int = 2,
        customNames: dict[str, str] | None = None,
    ):
        """
        :param theme: theme of workspaces that are neither visible, focused
            nor urgent
        :param themeVisible: theme of visible workspaces
        :param themeFocus: theme of the focused workspace
        :param themeUrgent: theme of urgent workspaces
        :param themeMode: theme of the binding mode section
        :param customNames: workspace name -> name shown when focused;
            defaults to a fresh empty mapping for every instance
        """
        I3Updater.__init__(self)
        Section.__init__(self)
        self.themeNormal = theme
        self.themeFocus = themeFocus
        self.themeUrgent = themeUrgent
        self.themeVisible = themeVisible
        # `is None` (not `or`) so that a caller-provided empty dict is still
        # the object being shared with the caller.
        self.customNames = customNames if customNames is not None else {}

        self.sections: dict[int, I3WorkspacesProviderSection] = dict()
        # The event object doesn't have the visible property,
        # so we have to fetch the list of workspaces anyways.
        # This sacrifices a bit of performance for code simplicity.
        self.on("workspace::init", self.on_workspace_change)
        self.on("workspace::focus", self.on_workspace_change)
        self.on("workspace::empty", self.on_workspace_empty)
        self.on("workspace::urgent", self.on_workspace_change)
        self.on("workspace::rename", self.on_workspace_change)
        # TODO Un-handled/tested: reload, rename, restored, move

        self.on("mode", self.on_mode)
        self.modeSection = Section(theme=themeMode)

    def updateWorkspace(self, workspace: i3ipc.WorkspaceReply) -> None:
        """
        Update the section of *workspace*, creating it first if needed.

        A new section is inserted after the section of the highest-numbered
        lower workspace on the same output seen before the search ends (or
        after the mode section), in the bar group of that output.
        """
        section: Section | None = None
        lastSectionOnOutput = self.modeSection
        highestNumOnOutput = -1
        for sect in self.sections.values():
            if sect.workspace.num == workspace.num:
                section = sect
                break
            elif (
                sect.workspace.num > highestNumOnOutput
                and sect.workspace.num < workspace.num
                and sect.workspace.output == workspace.output
            ):
                lastSectionOnOutput = sect
                highestNumOnOutput = sect.workspace.num
        else:
            # No existing section for this workspace number: create one.
            section = I3WorkspacesProviderSection(self)
            self.sections[workspace.num] = section

            for bargroup in self.parents:
                if bargroup.parent.output == workspace.output:
                    break
            else:
                # No bar group on that output: fall back to the first one.
                bargroup = list(self.parents)[0]
            bargroup.addSectionAfter(lastSectionOnOutput, section)
        section.updateWorkspace(workspace)

    def updateWorkspaces(self) -> None:
        """Query i3 for all workspaces and update each section."""
        for workspace in self.i3.get_workspaces():
            self.updateWorkspace(workspace)

    def added(self) -> None:
        """Insert the mode section after this one and do a first update."""
        super().added()
        self.appendAfter(self.modeSection)
        self.updateWorkspaces()

    def on_workspace_change(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        """Event handler: refresh every workspace section."""
        self.updateWorkspaces()

    def on_workspace_empty(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        """Event handler: clear the section of the emptied workspace."""
        self.sections[e.current.num].empty()

    def on_mode(self, i3: i3ipc.Connection, e: i3ipc.Event) -> None:
        """
        Event handler: outside of the "default" mode, show the mode name and
        temporarily hide the workspace sections.
        """
        if e.change == "default":
            self.modeSection.updateText(None)
            for section in self.sections.values():
                section.tempShow()
        else:
            self.modeSection.updateText(e.change)
            for section in self.sections.values():
                section.tempEmpty()
class MpdProvider(Section, ThreadedUpdater):
    """
    Shows the song currently handled by the MPD server on localhost:6600.

    The updater thread blocks on MPD's ``idle`` command and refreshes the
    section on every "player" event.
    """

    # TODO FEAT More informations and controls
    MAX_LENGTH = 50

    def __init__(self, theme: int | None = None):
        ThreadedUpdater.__init__(self)
        Section.__init__(self, theme)
        self.mpd = mpd.MPDClient()
        self.connect()
        self.refreshData()
        self.start()

    def connect(self) -> None:
        """Connect the client to the local MPD server."""
        self.mpd.connect("localhost", 6600)

    def fetcher(self) -> Element:
        """
        Return "title - album - artist" (available fields only), cut to
        MAX_LENGTH, or None when playback is stopped or no song is current.
        """
        stat = self.mpd.status()
        if not stat or stat["state"] == "stop":
            return None

        cur = self.mpd.currentsong()
        if not cur:
            return None

        infos = [cur[field] for field in ("title", "album", "artist") if field in cur]
        infosStr = " - ".join(infos)
        if len(infosStr) > MpdProvider.MAX_LENGTH:
            infosStr = infosStr[: MpdProvider.MAX_LENGTH - 1] + ""

        return "{}".format(infosStr)

    def loop(self) -> None:
        """
        One iteration of the updater thread: wait for a player event and
        refresh. A lost connection triggers a reconnection attempt; other
        errors are logged so that the thread keeps running.
        """
        try:
            self.mpd.idle("player")
            self.refreshData()
        except mpd.base.ConnectionError as e:
            # Logger.warn() is a deprecated alias of warning().
            log.warning(e, exc_info=True)
            self.connect()
        except Exception as e:
            # Exception rather than BaseException: SystemExit and
            # KeyboardInterrupt must not be swallowed here.
            log.error(e, exc_info=True)
class MprisProviderSection(Section, Updater):
    """
    Section attached to a MprisProvider, sharing its theme.
    """

    def __init__(self, parent: "MprisProvider"):
        Updater.__init__(self)
        Section.__init__(self, theme=parent.theme)
        # Keep a reference to the owning provider.
        self.parent = parent
class MprisProvider(Section, ThreadedUpdater):
    """
    Shows MPRIS player metadata obtained from ``playerctl --follow``.

    playerctl prints one tab-separated line per change, with one field per
    entry of SECTIONS; each field is displayed in its own section.
    """

    # TODO Controls (select player at least)
    # TODO Use the Python native thing for it:
    # https://github.com/altdesktop/playerctl?tab=readme-ov-file#using-the-library
    # TODO Make it less sucky

    SECTIONS = [
        "{{ playerName }} {{ status }}",
        "{{ album }}",
        "{{ artist }}",
        "{{ duration(position) }}|{{ duration(mpris:length) }}"
        " {{ title }}",
    ]

    # nf-fd icons don't work (UTF-16?)
    SUBSTITUTIONS = {
        "Playing": "",
        "Paused": "",
        "Stopped": "",
        "mpd": "",
        "firefox": "",
        "chromium": "",
        "mpv": "",
    }

    ICONS = {
        1: "",
        2: "",
        3: "",
    }

    def __init__(self, theme: int | None = None):
        ThreadedUpdater.__init__(self)
        Section.__init__(self, theme)
        self.line = ""
        # Everything fetcher() reads must exist before start(): presumably
        # start() launches the thread running loop(), which calls
        # refreshData() — otherwise fetcher() could hit a missing attribute.
        self.sections: list[Section] = []
        self.start()

    def fetcher(self) -> Element:
        """
        Create the sub-sections on first use and fill them from the last
        line read from playerctl. Always returns None: the provider itself
        shows no text.
        """
        create = not self.sections
        fields = self.line.split("\t") if self.line else []

        lastSection: Section = self
        for i in range(len(self.SECTIONS)):
            if create:
                section = Section(theme=self.theme)
                lastSection.appendAfter(section)
                lastSection = section
                self.sections.append(section)
            else:
                section = self.sections[i]

            # A line with fewer fields than SECTIONS leaves the rest empty.
            text = fields[i] if i < len(fields) else ""
            if i == 0:
                # The first field holds the player name and status.
                for key, val in self.SUBSTITUTIONS.items():
                    text = text.replace(key, val)
            if text:
                if i in self.ICONS:
                    text = f"{self.ICONS[i]} {text}"
                section.updateText(text)
            else:
                section.updateText(None)

        return None

    def loop(self) -> None:
        """
        Run playerctl in follow mode and refresh on every line it prints,
        until the process exits.
        """
        cmd = [
            "playerctl",
            "metadata",
            "--format",
            "\t".join(self.SECTIONS),
            "--follow",
        ]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        assert p.stdout
        while p.poll() is None:
            self.line = p.stdout.readline().decode().strip()
            self.refreshData()