dotfiles/config/lemonbar/providers.py
2018-09-05 09:07:37 +02:00

398 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
import datetime
from updaters import *
from display import *
import pulsectl
import psutil
import subprocess
import socket
import ipaddress
import logging
import coloredlogs
import json
import i3ipc
from pprint import pprint
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
log = logging.getLogger()
def humanSize(num):
"""
Returns a string of width 3+3
"""
for unit in ('B ','KiB','MiB','GiB','TiB','PiB','EiB','ZiB'):
if abs(num) < 1000:
if num >= 10:
return "{:3d}{}".format(int(num), unit)
else:
return "{:.1f}{}".format(num, unit)
num /= 1024.0
return "{:d}YiB".format(num)
class TimeProvider(Section, PeriodicUpdater):
def fetcher(self):
now = datetime.datetime.now()
return now.strftime('%d/%m/%y %H:%M:%S')
def __init__(self):
PeriodicUpdater.__init__(self)
Section.__init__(self)
self.changeInterval(1)
class BatteryProvider(Section, PeriodicUpdater):
RAMP = ""
def fetcher(self):
with open(self.batdir + 'status') as f:
status = f.read().strip()
if status == "Full":
return ""
elif status == "Discharging":
icon = ""
elif status == "Charging":
icon = ""
else:
log.warn("Unknwon battery status: {}".format(status))
icon = "?"
with open(self.batdir + 'capacity') as f:
capacity = int(f.read())
icon += self.ramp(capacity/100, self.RAMP)
return '{} {}%'.format(icon, capacity)
def __init__(self, battery='BAT0'):
PeriodicUpdater.__init__(self)
Section.__init__(self)
self.batdir = '/sys/class/power_supply/{}/'.format(battery)
assert os.path.isdir(self.batdir)
self.changeInterval(5)
class PulseaudioProvider(Section, ThreadedUpdater):
def __init__(self):
ThreadedUpdater.__init__(self)
Section.__init__(self)
self.pulseEvents = pulsectl.Pulse('event-handler')
self.pulseEvents.event_mask_set(pulsectl.PulseEventMaskEnum.sink)
self.pulseEvents.event_callback_set(self.handleEvent)
self.start()
self.refreshData()
def fetcher(self):
sinks = []
with pulsectl.Pulse('list-sinks') as pulse:
for sink in pulse.sink_list():
vol = pulse.volume_get_all_chans(sink)
if vol > 1:
vol = 1
if sink.port_active.name == "analog-output-headphones":
icon = ""
elif sink.port_active.name == "analog-output-speaker":
icon = ""
else:
icon = "?"
ramp = "" if sink.mute else (" " + self.ramp(vol))
sinks.append(icon + ramp)
return " ".join(sinks)
def loop(self):
self.pulseEvents.event_listen()
def handleEvent(self, ev):
self.refreshData()
class NetworkProviderSection(Section, Updater):
THEME = 5
def fetcher(self):
text = ''
if self.iface not in self.parent.stats or \
not self.parent.stats[self.iface].isup or \
self.iface.startswith('lo'):
return text
# Get addresses
ipv4 = None
ipv6 = None
for address in self.parent.addrs[self.iface]:
if address.family == socket.AF_INET:
ipv4 = address
elif address.family == socket.AF_INET6:
ipv6 = address
if ipv4 is None and ipv6 is None:
return text
# Set icon
if self.iface.startswith('eth') or self.iface.startswith('enp'):
if 'u' in self.iface:
text = ''
else:
text = ''
elif self.iface.startswith('wlan') or self.iface.startswith('wlp'):
text = ''
cmd = ["iwgetid", self.iface, "--raw"]
p = subprocess.run(cmd, stdout=subprocess.PIPE)
text += ' ' + p.stdout.strip().decode()
elif self.iface.startswith('tun') or self.iface.startswith('tap'):
text = ''
else:
text = '?'
if self.showAddress:
if ipv4:
netStrFull = '{}/{}'.format(ipv4.address, ipv4.netmask)
addr = ipaddress.IPv4Network(netStrFull, strict=False)
text += ' {}/{}'.format(ipv4.address, addr.prefixlen)
# TODO IPV6
# if ipv6:
# text += ' ' + ipv6.address
if self.showSpeed:
recvDiff = self.parent.IO[self.iface].bytes_recv \
- self.parent.prevIO[self.iface].bytes_recv
sentDiff = self.parent.IO[self.iface].bytes_sent \
- self.parent.prevIO[self.iface].bytes_sent
recvDiff /= self.parent.dt
sentDiff /= self.parent.dt
text += '{}{}'.format(humanSize(recvDiff), humanSize(sentDiff))
if self.showTransfer:
text += '{}{}'.format(
humanSize(self.parent.IO[self.iface].bytes_recv),
humanSize(self.parent.IO[self.iface].bytes_sent))
return text
def cycleState(self):
newState = (self.state + 1) % 4
self.changeState(newState)
def changeState(self, state):
assert isinstance(state, int)
assert state < 4
self.state = state
self.showAddress = state >= 1
self.showSpeed = state >= 2
self.showTransfer = state >= 3
def __init__(self, iface, parent):
Section.__init__(self, theme=self.THEME)
self.iface = iface
self.parent = parent
self.changeState(1)
self.refreshData()
class NetworkProvider(Section, PeriodicUpdater):
def fetchData(self):
self.prev = self.last
self.prevIO = self.IO
self.stats = psutil.net_if_stats()
self.addrs = psutil.net_if_addrs()
self.IO = psutil.net_io_counters(pernic=True)
self.ifaces = self.stats.keys()
self.last = time.perf_counter()
self.dt = self.last - self.prev
def refreshData(self):
self.fetchData()
lastSection = self
for iface in sorted(list(self.ifaces)):
if iface not in self.sections.keys():
section = NetworkProviderSection(iface, self)
lastSection.appendAfter(section)
self.sections[iface] = section
else:
section = self.sections[iface]
section.refreshData()
lastSection = section
def addParent(self, parent):
self.parents.add(parent)
self.refreshData()
def __init__(self):
PeriodicUpdater.__init__(self)
Section.__init__(self)
self.sections = dict()
self.last = 0
self.IO = dict()
self.fetchData()
class TodoProvider(Section, InotifyUpdater):
# TODO OPT/UX Maybe we could get more data from the todoman python module
# TODO OPT Specific callback for specific directory
def updateCalendarList(self):
calendars = sorted(os.listdir(self.dir))
for calendar in calendars:
# If the calendar wasn't in the list
if calendar not in self.calendars:
self.addPath(os.path.join(self.dir, calendar), refresh=False)
self.calendars = calendars
def __init__(self, dir):
"""
:parm str dir: [main]path value in todoman.conf
"""
InotifyUpdater.__init__(self)
Section.__init__(self)
self.dir = os.path.realpath(os.path.expanduser(dir))
assert os.path.isdir(self.dir)
self.calendars = []
self.addPath(self.dir)
def getName(self, calendar):
path = os.path.join(self.dir, calendar, 'displayname')
with open(path, 'r') as f:
name = f.read().strip()
return name
def getColor(self, calendar):
path = os.path.join(self.dir, calendar, 'color')
with open(path, 'r') as f:
color = f.read().strip()
return color
def countUndone(self, calendar):
cmd = ["todo", "--porcelain", "list", self.getName(calendar)]
proc = subprocess.run(cmd, stdout=subprocess.PIPE)
data = json.loads(proc.stdout)
return len(data)
def fetcher(self):
text = []
self.updateCalendarList()
for calendar in self.calendars:
c = self.countUndone(calendar)
if c > 0:
color = self.getColor(calendar)
text += [' ']
text += [{"text": str(c), "fgColor": color}]
if len(text):
return [''] + text
else:
return ''
class I3Provider(Section, ThreadedUpdater):
# TODO Multi-screen
THEME_NORMAL = 0
THEME_FOCUSED = 2
THEME_URGENT = 1
THEME_MODE = 1
def setName(self, section, origName):
# TODO Custom names
if origName:
section.fullName = origName
else:
section.fullName = ''
section.updateText(section.fullName)
def initialPopulation(self, parent):
"""
Called on init
Can't reuse addWorkspace since i3.get_workspaces() gives dict and not
ConObjects
"""
workspaces = self.i3.get_workspaces()
lastSection = self.modeSection
for workspace in workspaces:
# if parent.display != workspace["display"]:
# continue
theme = self.THEME_FOCUSED if workspace["focused"] \
else (self.THEME_URGENT if workspace["urgent"]
else self.THEME_NORMAL)
section = Section(theme=theme)
parent.addSectionAfter(lastSection, section)
self.setName(section, workspace["name"])
self.sections[workspace["num"]] = section
lastSection = section
def on_workspace(self, i3, e):
print(304, e.change)
def on_workspace_init(self, i3, e):
workspace = e.current
i = workspace.num
if i in self.sections:
section = self.sections[i]
else:
while i not in self.sections.keys() and i > 0:
i -= 1
prevSection = self.sections[i] if i != 0 else self.modeSection
section = Section()
self.sections[workspace.num] = section
prevSection.appendAfter(section)
self.setName(section, workspace.name)
def on_workspace_empty(self, i3, e):
workspace = e.current
section = self.sections[workspace.num]
self.setName(section, None)
def on_workspace_focus(self, i3, e):
self.sections[e.current.num].updateTheme(self.THEME_FOCUSED)
self.sections[e.old.num].updateTheme(self.THEME_NORMAL)
def on_workspace_urgent(self, i3, e):
self.sections[e.current.num].updateTheme(self.THEME_URGENT)
def on_workspace_rename(self, i3, e):
self.sections[e.current.num].updateText(e.name)
def on_mode(self, i3, e):
if e.change == 'default':
self.modeSection.updateText('')
for section in self.sections.values():
section.updateText(section.fullName)
else:
self.modeSection.updateText(e.change)
for section in self.sections.values():
section.updateText('')
def __init__(self):
ThreadedUpdater.__init__(self)
Section.__init__(self)
self.i3 = i3ipc.Connection()
self.sections = dict()
self.i3.on("workspace::init", self.on_workspace_init)
self.i3.on("workspace::focus", self.on_workspace_focus)
self.i3.on("workspace::empty", self.on_workspace_empty)
self.i3.on("workspace::urgent", self.on_workspace_urgent)
self.i3.on("workspace::rename", self.on_workspace_rename)
# TODO Un-handled/tested: reload, rename, restored, move
self.i3.on("mode", self.on_mode)
self.modeSection = Section(theme=self.THEME_MODE)
self.start()
def addParent(self, parent):
self.parents.add(parent)
parent.addSection(self.modeSection)
self.initialPopulation(parent)
def loop(self):
self.i3.main()