#!/usr/bin/env python3 import datetime from updaters import * from display import * import pulsectl import psutil import subprocess import socket import ipaddress import logging import coloredlogs import json import notmuch import mpd import random coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') log = logging.getLogger() def humanSize(num): """ Returns a string of width 3+3 """ for unit in ('B ','KiB','MiB','GiB','TiB','PiB','EiB','ZiB'): if abs(num) < 1000: if num >= 10: return "{:3d}{}".format(int(num), unit) else: return "{:.1f}{}".format(num, unit) num /= 1024.0 return "{:d}YiB".format(num) def randomColor(seed=0): random.seed(seed) return '#{:02x}{:02x}{:02x}'.format(*[random.randint(0, 255) for _ in range(3)]) class TimeProvider(StatefulSection, PeriodicUpdater): FORMATS = ["%H:%M", "%d/%m %H:%M:%S", "%a %d/%m/%y %H:%M:%S"] NUMBER_STATES = len(FORMATS) def fetcher(self): now = datetime.datetime.now() return now.strftime(self.FORMATS[self.state]) def __init__(self, theme=None): PeriodicUpdater.__init__(self) StatefulSection.__init__(self, theme) self.changeInterval(1) # TODO OPTI When state < 1 class AlertLevel(enum.Enum): NORMAL = 0 WARNING = 1 DANGER = 2 class AlertingSection(StatefulSection): # TODO EASE Correct settings for themes THEMES = {AlertLevel.NORMAL: 2, AlertLevel.WARNING: 3, AlertLevel.DANGER: 1} PERSISTENT = True def getLevel(self, quantity): if quantity > self.dangerThresold: return AlertLevel.DANGER elif quantity > self.warningThresold: return AlertLevel.WARNING else: return AlertLevel.NORMAL def updateLevel(self, quantity): self.level = self.getLevel(quantity) self.updateTheme(self.THEMES[self.level]) if self.level == AlertLevel.NORMAL: return # TODO Temporary update state def __init__(self, theme): StatefulSection.__init__(self, theme) self.dangerThresold = 0.90 self.warningThresold = 0.75 class CpuProvider(AlertingSection, PeriodicUpdater): NUMBER_STATES = 3 ICON = '' def fetcher(self): percent = psutil.cpu_percent(percpu=False) self.updateLevel(percent/100) if self.state >= 2: percents = psutil.cpu_percent(percpu=True) return ''.join([Section.ramp(p/100) for p in percents]) elif self.state >= 1: return Section.ramp(percent/100) def __init__(self, theme=None, themeDanger=None, themeCritical=None): AlertingSection.__init__(self, theme) PeriodicUpdater.__init__(self) self.changeInterval(1) class RamProvider(Section, PeriodicUpdater): # TODO Use AlertingSection """ Shows free RAM """ def fetcher(self): mem = psutil.virtual_memory() ramp = Section.ramp(1-mem.percent/100) free = humanSize(mem.available) theme = self.themeCritical if mem.percent >= 90 else \ (self.themeDanger if mem.percent >= 75 else self.themeNormal) self.updateTheme(theme) return ' {}{}'.format(ramp, free) def __init__(self, theme=None, themeDanger=None, themeCritical=None): PeriodicUpdater.__init__(self) Section.__init__(self, theme) self.themeNormal = theme or self.theme self.themeDanger = themeDanger or self.theme self.themeCritical = themeCritical or self.theme self.changeInterval(1) class TemperatureProvider(Section, PeriodicUpdater): # TODO Use AlertingSection RAMP = "" def fetcher(self): allTemp = psutil.sensors_temperatures() if 'coretemp' not in allTemp: # TODO Opti Remove interval return '' temp = allTemp['coretemp'][0] theme = self.themeCritical if temp.current >= temp.critical else \ (self.themeDanger if temp.current >= temp.high else self.themeNormal) self.updateTheme(theme) ramp = Section.ramp(temp.current/temp.high, self.RAMP) return '{} {:.0f}°C'.format(ramp, temp.current) def __init__(self, theme=None, themeDanger=None, themeCritical=None): PeriodicUpdater.__init__(self) Section.__init__(self, theme=theme) self.themeNormal = theme or self.theme self.themeDanger = themeDanger or self.theme self.themeCritical = themeCritical or self.theme self.changeInterval(5) class BatteryProvider(Section, PeriodicUpdater): # TODO Use AlertingSection RAMP = "" def fetcher(self): bat = psutil.sensors_battery() text = '' if not bat: return text if bat.power_plugged: text += "" text += Section.ramp(bat.percent/100, self.RAMP) if bat.percent < 100: text += ' {:.0f}%'.format(bat.percent) theme = self.themeCritical if bat.percent < 10 else \ (self.themeDanger if bat.percent < 25 else self.themeNormal) self.updateTheme(theme) return text def __init__(self, theme=None, themeDanger=None, themeCritical=None): PeriodicUpdater.__init__(self) Section.__init__(self, theme) self.themeNormal = theme or self.theme self.themeDanger = themeDanger or self.theme self.themeCritical = themeCritical or self.theme self.changeInterval(5) class PulseaudioProvider(Section, ThreadedUpdater): def __init__(self, theme=None): ThreadedUpdater.__init__(self) Section.__init__(self, theme) self.pulseEvents = pulsectl.Pulse('event-handler') self.pulseEvents.event_mask_set(pulsectl.PulseEventMaskEnum.sink) self.pulseEvents.event_callback_set(self.handleEvent) self.start() self.refreshData() def fetcher(self): sinks = [] with pulsectl.Pulse('list-sinks') as pulse: for sink in pulse.sink_list(): vol = pulse.volume_get_all_chans(sink) if vol > 1: vol = 1 if sink.port_active.name == "analog-output-headphones": icon = "" elif sink.port_active.name == "analog-output-speaker": icon = "" else: icon = "?" ramp = "" if sink.mute else (" " + self.ramp(vol)) sinks.append(icon + ramp) return " ".join(sinks) def loop(self): self.pulseEvents.event_listen() def handleEvent(self, ev): self.refreshData() class NetworkProviderSection(StatefulSection, Updater): NUMBER_STATES = 4 def actType(self): self.ssid = None if self.iface.startswith('eth') or self.iface.startswith('enp'): if 'u' in self.iface: self.icon = '' else: self.icon = '' elif self.iface.startswith('wlan') or self.iface.startswith('wlp'): self.icon = '' if self.showSsid: cmd = ["iwgetid", self.iface, "--raw"] p = subprocess.run(cmd, stdout=subprocess.PIPE) self.ssid = p.stdout.strip().decode() elif self.iface.startswith('tun') or self.iface.startswith('tap'): self.icon = '' else: self.icon = '?' def getAddresses(self): ipv4 = None ipv6 = None for address in self.parent.addrs[self.iface]: if address.family == socket.AF_INET: ipv4 = address elif address.family == socket.AF_INET6: ipv6 = address return ipv4, ipv6 def fetcher(self): if self.iface not in self.parent.stats or \ not self.parent.stats[self.iface].isup or \ self.iface.startswith('lo'): return None # Get addresses ipv4, ipv6 = self.getAddresses() if ipv4 is None and ipv6 is None: return None text = [] self.actType() if self.showSsid and self.ssid: text.append(self.ssid) if self.showAddress: if ipv4: netStrFull = '{}/{}'.format(ipv4.address, ipv4.netmask) addr = ipaddress.IPv4Network(netStrFull, strict=False) addrStr = '{}/{}'.format(ipv4.address, addr.prefixlen) text.append(addrStr) # TODO IPV6 # if ipv6: # text += ' ' + ipv6.address if self.showSpeed: recvDiff = self.parent.IO[self.iface].bytes_recv \ - self.parent.prevIO[self.iface].bytes_recv sentDiff = self.parent.IO[self.iface].bytes_sent \ - self.parent.prevIO[self.iface].bytes_sent recvDiff /= self.parent.dt sentDiff /= self.parent.dt text.append('↓{}↑{}'.format(humanSize(recvDiff), humanSize(sentDiff))) if self.showTransfer: text.append('⇓{}⇑{}'.format( humanSize(self.parent.IO[self.iface].bytes_recv), humanSize(self.parent.IO[self.iface].bytes_sent))) return ' '.join(text) def onChangeState(self, state): self.showSsid = state >= 0 self.showAddress = state >= 1 self.showSpeed = state >= 2 self.showTransfer = state >= 3 def __init__(self, iface, parent): Updater.__init__(self) StatefulSection.__init__(self, theme=parent.theme) self.iface = iface self.parent = parent class NetworkProvider(Section, PeriodicUpdater): def fetchData(self): self.prev = self.last self.prevIO = self.IO self.stats = psutil.net_if_stats() self.addrs = psutil.net_if_addrs() self.IO = psutil.net_io_counters(pernic=True) self.ifaces = self.stats.keys() self.last = time.perf_counter() self.dt = self.last - self.prev def fetcher(self): self.fetchData() # Add missing sections lastSection = self for iface in sorted(list(self.ifaces)): if iface not in self.sections.keys(): section = NetworkProviderSection(iface, self) lastSection.appendAfter(section) self.sections[iface] = section else: section = self.sections[iface] lastSection = section # Refresh section text for section in self.sections.values(): section.refreshData() return None def addParent(self, parent): self.parents.add(parent) self.refreshData() def __init__(self, theme=None): PeriodicUpdater.__init__(self) Section.__init__(self, theme) self.sections = dict() self.last = 0 self.IO = dict() self.fetchData() self.changeInterval(5) class SshAgentProvider(PeriodicUpdater): def fetcher(self): cmd = ["ssh-add", "-l"] proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) if proc.returncode != 0: return None text = Text() for line in proc.stdout.split(b'\n'): if not len(line): continue fingerprint = line.split()[1] text.append(Text('', fg=randomColor(seed=fingerprint))) return text def __init__(self): PeriodicUpdater.__init__(self) self.changeInterval(5) class GpgAgentProvider(PeriodicUpdater): def fetcher(self): cmd = ["gpg-connect-agent", "keyinfo --list", "/bye"] proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) # proc = subprocess.run(cmd) if proc.returncode != 0: return None text = Text() for line in proc.stdout.split(b'\n'): if not len(line) or line == b'OK': continue spli = line.split() if spli[6] != b'1': continue keygrip = spli[2] text.append(Text('', fg=randomColor(seed=keygrip))) return text def __init__(self): PeriodicUpdater.__init__(self) self.changeInterval(5) class KeystoreProvider(Section, MergedUpdater): ICON = '' def __init__(self, theme=None): MergedUpdater.__init__(self, SshAgentProvider(), GpgAgentProvider()) Section.__init__(self, theme) class NotmuchUnreadProvider(ColorCountsSection, PeriodicUpdater): # TODO OPTI Transform InotifyUpdater (watching notmuch folder should be # enough) COLORABLE_ICON = '' def subfetcher(self): db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY, path=self.dir) counts = [] for account in self.accounts: queryStr = 'folder:/{}/ and tag:unread'.format(account) query = notmuch.Query(db, queryStr) nbMsgs = query.count_messages() if nbMsgs < 1: continue counts.append((nbMsgs, self.colors[account])) db.close() return counts def __init__(self, dir='~/.mail/', theme=None): PeriodicUpdater.__init__(self) ColorCountsSection.__init__(self, theme) self.dir = os.path.realpath(os.path.expanduser(dir)) assert os.path.isdir(self.dir) # Fetching account list self.accounts = sorted([a for a in os.listdir(self.dir) if not a.startswith('.')]) # Fetching colors self.colors = dict() for account in self.accounts: filename = os.path.join(self.dir, account, 'color') with open(filename, 'r') as f: color = f.read().strip() self.colors[account] = color self.changeInterval(10) class TodoProvider(ColorCountsSection, InotifyUpdater): # TODO OPT/UX Maybe we could get more data from the todoman python module # TODO OPT Specific callback for specific directory COLORABLE_ICON = '' def updateCalendarList(self): calendars = sorted(os.listdir(self.dir)) for calendar in calendars: # If the calendar wasn't in the list if calendar not in self.calendars: self.addPath(os.path.join(self.dir, calendar), refresh=False) self.calendars = calendars def __init__(self, dir, theme=None): """ :parm str dir: [main]path value in todoman.conf """ InotifyUpdater.__init__(self) ColorCountsSection.__init__(self, theme=theme) self.dir = os.path.realpath(os.path.expanduser(dir)) assert os.path.isdir(self.dir) self.calendars = [] self.addPath(self.dir) def getName(self, calendar): path = os.path.join(self.dir, calendar, 'displayname') with open(path, 'r') as f: name = f.read().strip() return name def getColor(self, calendar): path = os.path.join(self.dir, calendar, 'color') with open(path, 'r') as f: color = f.read().strip() return color def countUndone(self, calendar): cmd = ["todo", "--porcelain", "list", self.getName(calendar)] proc = subprocess.run(cmd, stdout=subprocess.PIPE) data = json.loads(proc.stdout) return len(data) def subfetcher(self): counts = [] self.updateCalendarList() for calendar in self.calendars: c = self.countUndone(calendar) if c <= 0: continue counts.append((c, self.getColor(calendar))) return counts class I3WindowTitleProvider(Section, I3Updater): # TODO FEAT To make this available from start, we need to find the # `focused=True` element following the `focus` array # TODO Feat Make this output dependant if wanted def on_window(self, i3, e): self.updateText(e.container.name) def __init__(self, theme=None): I3Updater.__init__(self) Section.__init__(self, theme=theme) self.on("window", self.on_window) class I3WorkspacesProvider(Section, I3Updater): # TODO Multi-screen THEME_NORMAL = 0 THEME_FOCUSED = 2 THEME_URGENT = 1 THEME_MODE = 1 def setName(self, section, origName): # TODO Custom names if origName: section.fullName = origName else: section.fullName = '' section.updateText(section.fullName) def initialPopulation(self, parent): """ Called on init Can't reuse addWorkspace since i3.get_workspaces() gives dict and not ConObjects """ workspaces = self.i3.get_workspaces() lastSection = self.modeSection for workspace in workspaces: # if parent.display != workspace["display"]: # continue theme = self.themeFocus if workspace["focused"] \ else (self.themeUrgent if workspace["urgent"] else self.theme) section = Section(theme=theme) parent.addSectionAfter(lastSection, section) self.setName(section, workspace["name"]) self.sections[workspace["num"]] = section lastSection = section def on_workspace_init(self, i3, e): workspace = e.current i = workspace.num if i in self.sections: section = self.sections[i] else: while i not in self.sections.keys() and i > 0: i -= 1 prevSection = self.sections[i] if i != 0 else self.modeSection section = Section() self.sections[workspace.num] = section prevSection.appendAfter(section) self.setName(section, workspace.name) def on_workspace_empty(self, i3, e): workspace = e.current section = self.sections[workspace.num] self.setName(section, None) def on_workspace_focus(self, i3, e): self.sections[e.current.num].updateTheme(self.themeFocus) self.sections[e.old.num].updateTheme(self.theme) def on_workspace_urgent(self, i3, e): self.sections[e.current.num].updateTheme(self.themeUrgent) def on_workspace_rename(self, i3, e): self.sections[e.current.num].updateText(e.name) def on_mode(self, i3, e): if e.change == 'default': self.modeSection.updateText('') for section in self.sections.values(): section.updateText(section.fullName) else: self.modeSection.updateText(e.change) for section in self.sections.values(): section.updateText('') def __init__(self, theme=0, themeFocus=3, themeUrgent=1, themeMode=2): I3Updater.__init__(self) Section.__init__(self) self.theme = theme self.themeFocus = themeFocus self.themeUrgent = themeUrgent self.sections = dict() self.on("workspace::init", self.on_workspace_init) self.on("workspace::focus", self.on_workspace_focus) self.on("workspace::empty", self.on_workspace_empty) self.on("workspace::urgent", self.on_workspace_urgent) self.on("workspace::rename", self.on_workspace_rename) # TODO Un-handled/tested: reload, rename, restored, move self.on("mode", self.on_mode) self.modeSection = Section(theme=themeMode) def addParent(self, parent): self.parents.add(parent) parent.addSection(self.modeSection) self.initialPopulation(parent) class MpdProvider(Section, ThreadedUpdater): # TODO FEAT More informations and controls MAX_LENGTH = 50 def connect(self): self.mpd.connect('localhost', 6600) def __init__(self, theme=None): ThreadedUpdater.__init__(self) Section.__init__(self, theme) self.mpd = mpd.MPDClient() self.connect() self.refreshData() self.start() def fetcher(self): cur = self.mpd.currentsong() if not len(cur): return '' infos = [] def tryAdd(field): if field in cur: infos.append(cur[field]) tryAdd("title") tryAdd("album") tryAdd("artist") infosStr = " - ".join(infos) if len(infosStr) > MpdProvider.MAX_LENGTH: infosStr = infosStr[:MpdProvider.MAX_LENGTH-1] + '…' return " {}".format(infosStr) def loop(self): try: self.mpd.idle('player') self.refreshData() except mpd.base.ConnectionError as e: log.warn(e, exc_info=True) self.connect() except BaseException as e: log.error(e, exc_info=True)