diff --git a/config/i3/screentime b/config/i3/screentime index 9104117..1cbd1b5 100755 --- a/config/i3/screentime +++ b/config/i3/screentime @@ -20,27 +20,28 @@ class ScreenTime: line["date"] = now.timestamp() print("WROTE", line) - with open(self.csv_path, 'a') as typedfd: + with open(self.csv_path, "a") as typedfd: writer = csv.DictWriter(typedfd, fieldnames=self.FIELDS) writer.writerow(line) - def on_window_event(self, _: i3ipc.connection.Connection, - e: i3ipc.events.WindowEvent) -> None: + def on_window_event( + self, _: i3ipc.connection.Connection, e: i3ipc.events.WindowEvent + ) -> None: focused = self.i3.get_tree().find_focused() - self.write({ - "type": "window_" + e.change, - "class": focused.window_class, - "role": focused.window_role, - "title": focused.window_title, - "instance": focused.window_instance, - }) + self.write( + { + "type": "window_" + e.change, + "class": focused.window_class, + "role": focused.window_role, + "title": focused.window_title, + "instance": focused.window_instance, + } + ) - def on_mode_event(self, _: i3ipc.connection.Connection, - e: i3ipc.events.ModeEvent) -> None: - self.write({ - "type": "mode", - "title": e.change - }) + def on_mode_event( + self, _: i3ipc.connection.Connection, e: i3ipc.events.ModeEvent + ) -> None: + self.write({"type": "mode", "title": e.change}) def __init__(self) -> None: self.i3 = i3ipc.Connection() @@ -48,11 +49,11 @@ class ScreenTime: self.i3.on(i3ipc.Event.MODE, self.on_mode_event) self.csv_path = os.path.join( - os.path.expanduser( - os.getenv('XDG_CACHE_PATH', '~/.cache/')), - 'screentime.csv') + os.path.expanduser(os.getenv("XDG_CACHE_PATH", "~/.cache/")), + "screentime.csv", + ) if not os.path.isfile(self.csv_path): - with open(self.csv_path, 'w') as typedfd: + with open(self.csv_path, "w") as typedfd: writer = csv.DictWriter(typedfd, fieldnames=self.FIELDS) writer.writeheader() self.write({"type": "start"}) @@ -61,6 +62,6 @@ class ScreenTime: self.i3.main() -if __name__ == '__main__': +if __name__ == "__main__": ST = ScreenTime() ST.main() diff --git a/config/lemonbar/bar.py b/config/lemonbar/bar.py index 4b55aa7..45127ef 100755 --- a/config/lemonbar/bar.py +++ b/config/lemonbar/bar.py @@ -11,14 +11,23 @@ if __name__ == "__main__": WORKSPACE_THEME = 0 FOCUS_THEME = 3 URGENT_THEME = 1 - CUSTOM_SUFFIXES = '▲■' + CUSTOM_SUFFIXES = "▲■" customNames = dict() for i in range(len(CUSTOM_SUFFIXES)): - short = str(i+1) - full = short + ' ' + CUSTOM_SUFFIXES[i] + short = str(i + 1) + full = short + " " + CUSTOM_SUFFIXES[i] customNames[short] = full - Bar.addSectionAll(I3WorkspacesProvider(theme=WORKSPACE_THEME, themeFocus=FOCUS_THEME, themeUrgent=URGENT_THEME, themeMode=URGENT_THEME, customNames=customNames), BarGroupType.LEFT) + Bar.addSectionAll( + I3WorkspacesProvider( + theme=WORKSPACE_THEME, + themeFocus=FOCUS_THEME, + themeUrgent=URGENT_THEME, + themeMode=URGENT_THEME, + customNames=customNames, + ), + BarGroupType.LEFT, + ) # TODO Middle Bar.addSectionAll(MpdProvider(theme=7), BarGroupType.LEFT) diff --git a/config/lemonbar/display.py b/config/lemonbar/display.py index b1803bd..d30f273 100755 --- a/config/lemonbar/display.py +++ b/config/lemonbar/display.py @@ -11,7 +11,7 @@ import logging import coloredlogs import updaters -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() @@ -62,12 +62,13 @@ class Bar: Bar.running = True Section.init() - cmd = ['lemonbar', '-b', '-a', '64'] + cmd = ["lemonbar", "-b", "-a", "64"] for font in Bar.FONTS: cmd += ["-f", "{}:size={}".format(font, Bar.FONTSIZE)] - Bar.process = subprocess.Popen(cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE) - Bar.stdoutThread = BarStdoutThread() + Bar.process = subprocess.Popen( + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE + ) + Bar.stdoutThread = BarStdoutThread() Bar.stdoutThread.start() # Debug @@ -92,7 +93,7 @@ class Bar: print(88) try: - i3.on('ipc_shutdown', doStop) + i3.on("ipc_shutdown", doStop) i3.main() except BaseException: print(93) @@ -114,7 +115,7 @@ class Bar: if function in Bar.actionsF2H.keys(): return Bar.actionsF2H[function] - handle = '{:x}'.format(Bar.nextHandle).encode() + handle = "{:x}".format(Bar.nextHandle).encode() Bar.nextHandle += 1 Bar.actionsF2H[function] = handle @@ -177,7 +178,7 @@ class Bar: Bar.string += BarGroup.color(*Section.EMPTY) # print(Bar.string) - Bar.process.stdin.write(bytes(Bar.string + '\n', 'utf-8')) + Bar.process.stdin.write(bytes(Bar.string + "\n", "utf-8")) Bar.process.stdin.flush() @@ -196,7 +197,7 @@ class BarGroup: self.parent = parent self.sections = list() - self.string = '' + self.string = "" self.parts = [] #: One of the sections that had their theme or visibility changed @@ -220,11 +221,11 @@ class BarGroup: @staticmethod def fgColor(color): - return "%{F" + (color or '-') + "}" + return "%{F" + (color or "-") + "}" @staticmethod def bgColor(color): - return "%{B" + (color or '-') + "}" + return "%{B" + (color or "-") + "}" @staticmethod def color(fg, bg): @@ -243,8 +244,9 @@ class BarGroup: oSec = secs[s + 1] if s < lenS - 1 else None else: oSec = secs[s - 1] if s > 0 else None - oTheme = Section.THEMES[oSec.theme] \ - if oSec is not None else Section.EMPTY + oTheme = ( + Section.THEMES[oSec.theme] if oSec is not None else Section.EMPTY + ) if self.groupType == BarGroupType.LEFT: if s == 0: @@ -263,7 +265,6 @@ class BarGroup: parts.append(BarGroup.color(*theme)) parts.append(sec) - # TODO OPTI Concatenate successive strings self.parts = parts @@ -315,11 +316,26 @@ class Section: # COLORS = ['#272822', '#383830', '#49483e', '#75715e', '#a59f85', '#f8f8f2', # '#f5f4f1', '#f9f8f5', '#f92672', '#fd971f', '#f4bf75', '#a6e22e', # '#a1efe4', '#66d9ef', '#ae81ff', '#cc6633'] - COLORS = ['#181818', '#AB4642', '#A1B56C', '#F7CA88', '#7CAFC2', '#BA8BAF', - '#86C1B9', '#D8D8D8', '#585858', '#AB4642', '#A1B56C', '#F7CA88', - '#7CAFC2', '#BA8BAF', '#86C1B9', '#F8F8F8'] - FGCOLOR = '#F8F8F2' - BGCOLOR = '#272822' + COLORS = [ + "#181818", + "#AB4642", + "#A1B56C", + "#F7CA88", + "#7CAFC2", + "#BA8BAF", + "#86C1B9", + "#D8D8D8", + "#585858", + "#AB4642", + "#A1B56C", + "#F7CA88", + "#7CAFC2", + "#BA8BAF", + "#86C1B9", + "#F8F8F8", + ] + FGCOLOR = "#F8F8F2" + BGCOLOR = "#272822" THEMES = list() EMPTY = (FGCOLOR, BGCOLOR) @@ -347,17 +363,18 @@ class Section: if theme is None: theme = Section.lastChosenTheme - Section.lastChosenTheme = (Section.lastChosenTheme + 1) \ - % len(Section.THEMES) + Section.lastChosenTheme = (Section.lastChosenTheme + 1) % len( + Section.THEMES + ) self.theme = theme #: Displayed text - self.curText = '' + self.curText = "" #: Displayed text size self.curSize = 0 #: Destination text - self.dstText = Text(' ', Text(), ' ') + self.dstText = Text(" ", Text(), " ") #: Destination size self.dstSize = 0 @@ -367,13 +384,16 @@ class Section: self.icon = self.ICON self.persistent = self.PERSISTENT - def __str__(self): try: - return "<{}><{}>{:01d}{}{:02d}/{:02d}" \ - .format(self.curText, self.dstText, - self.theme, "+" if self.visible else "-", - self.curSize, self.dstSize) + return "<{}><{}>{:01d}{}{:02d}/{:02d}".format( + self.curText, + self.dstText, + self.theme, + "+" if self.visible else "-", + self.curSize, + self.dstSize, + ) except: return super().__str__() @@ -399,9 +419,15 @@ class Section: elif isinstance(text, Text) and not len(text.elements): text = None - self.dstText[0] = None if (text is None and not self.persistent) else ((' ' + self.icon + ' ') if self.icon else ' ') + self.dstText[0] = ( + None + if (text is None and not self.persistent) + else ((" " + self.icon + " ") if self.icon else " ") + ) self.dstText[1] = text - self.dstText[2] = ' ' if self.dstText[1] is not None and len(self.dstText[1]) else None + self.dstText[2] = ( + " " if self.dstText[1] is not None and len(self.dstText[1]) else None + ) self.dstSize = len(self.dstText) self.dstText.setSection(self) @@ -481,7 +507,7 @@ class Section: elif p < 0: return ramp[0] else: - return ramp[round(p * (len(ramp)-1))] + return ramp[round(p * (len(ramp) - 1))] class StatefulSection(Section): @@ -492,10 +518,11 @@ class StatefulSection(Section): def __init__(self, *args, **kwargs): Section.__init__(self, *args, **kwargs) self.state = self.DEFAULT_STATE - if hasattr(self, 'onChangeState'): + if hasattr(self, "onChangeState"): self.onChangeState(self.state) - self.setDecorators(clickLeft=self.incrementState, - clickRight=self.decrementState) + self.setDecorators( + clickLeft=self.incrementState, clickRight=self.decrementState + ) def incrementState(self): newState = min(self.state + 1, self.NUMBER_STATES - 1) @@ -509,16 +536,17 @@ class StatefulSection(Section): assert isinstance(state, int) assert state < self.NUMBER_STATES self.state = state - if hasattr(self, 'onChangeState'): + if hasattr(self, "onChangeState"): self.onChangeState(state) self.refreshData() + class ColorCountsSection(StatefulSection): # TODO FEAT Blend colors when not expanded # TODO FEAT Blend colors with importance of count # TODO FEAT Allow icons instead of counts NUMBER_STATES = 3 - COLORABLE_ICON = '?' + COLORABLE_ICON = "?" def __init__(self, theme=None): StatefulSection.__init__(self, theme=theme) @@ -538,12 +566,12 @@ class ColorCountsSection(StatefulSection): # Icon + Total elif self.state == 1 and len(counts) > 1: total = sum([count for count, color in counts]) - return Text(self.COLORABLE_ICON, ' ', total) + return Text(self.COLORABLE_ICON, " ", total) # Icon + Counts else: text = Text(self.COLORABLE_ICON) for count, color in counts: - text.append(' ', Text(count, fg=color)) + text.append(" ", Text(count, fg=color)) return text @@ -586,12 +614,12 @@ class Text: if self.prefix is not None and self.suffix is not None: return - self.prefix = '' - self.suffix = '' + self.prefix = "" + self.suffix = "" def nest(prefix, suffix): - self.prefix = self.prefix + '%{' + prefix + '}' - self.suffix = '%{' + suffix + '}' + self.suffix + self.prefix = self.prefix + "%{" + prefix + "}" + self.suffix = "%{" + suffix + "}" + self.suffix def getColor(val): # TODO Allow themes @@ -600,27 +628,27 @@ class Text: def button(number, function): handle = Bar.getFunctionHandle(function) - nest('A' + number + ':' + handle.decode() + ':', 'A' + number) + nest("A" + number + ":" + handle.decode() + ":", "A" + number) for key, val in self.decorators.items(): if val is None: continue - if key == 'fg': + if key == "fg": reset = self.section.THEMES[self.section.theme][0] - nest('F' + getColor(val), 'F' + reset) - elif key == 'bg': + nest("F" + getColor(val), "F" + reset) + elif key == "bg": reset = self.section.THEMES[self.section.theme][1] - nest('B' + getColor(val), 'B' + reset) + nest("B" + getColor(val), "B" + reset) elif key == "clickLeft": - button('1', val) + button("1", val) elif key == "clickMiddle": - button('2', val) + button("2", val) elif key == "clickRight": - button('3', val) + button("3", val) elif key == "scrollUp": - button('4', val) + button("4", val) elif key == "scrollDown": - button('5', val) + button("5", val) else: log.warn("Unkown decorator: {}".format(key)) @@ -652,7 +680,7 @@ class Text: curString += self.suffix if pad and remSize > 0: - curString += ' ' * remSize + curString += " " * remSize curSize += remSize if size is not None: @@ -688,7 +716,6 @@ class Text: curSize += len(str(element)) return curSize - def __getitem__(self, index): return self.elements[index] diff --git a/config/lemonbar/oldbar.py b/config/lemonbar/oldbar.py index 2b36ad5..4032e22 100755 --- a/config/lemonbar/oldbar.py +++ b/config/lemonbar/oldbar.py @@ -7,150 +7,188 @@ Debugging script import i3ipc import os import psutil + # import alsaaudio from time import time import subprocess i3 = i3ipc.Connection() -lemonbar = subprocess.Popen(['lemonbar', '-b'], stdin=subprocess.PIPE) +lemonbar = subprocess.Popen(["lemonbar", "-b"], stdin=subprocess.PIPE) # Utils def upChart(p): - block = ' ▁▂▃▄▅▆▇█' - return block[round(p * (len(block)-1))] + block = " ▁▂▃▄▅▆▇█" + return block[round(p * (len(block) - 1))] -def humanSizeOf(num, suffix='B'): # TODO Credit - for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']: + +def humanSizeOf(num, suffix="B"): # TODO Credit + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.0f%2s%s" % (num, unit, suffix) num /= 1024.0 - return "%.0f%2s%s" % (num, 'Yi', suffix) + return "%.0f%2s%s" % (num, "Yi", suffix) + # Values -mode = '' +mode = "" container = i3.get_tree().find_focused() workspaces = i3.get_workspaces() outputs = i3.get_outputs() -username = os.environ['USER'] -hostname = os.environ['HOSTNAME'] -if '-' in hostname: - hostname = hostname.split('-')[-1] +username = os.environ["USER"] +hostname = os.environ["HOSTNAME"] +if "-" in hostname: + hostname = hostname.split("-")[-1] oldNetIO = dict() oldTime = time() + def update(): - activeOutputs = sorted(sorted(list(filter(lambda o: o.active, outputs)), key=lambda o: o.rect.y), key=lambda o: o.rect.x) - z = '' + activeOutputs = sorted( + sorted(list(filter(lambda o: o.active, outputs)), key=lambda o: o.rect.y), + key=lambda o: o.rect.x, + ) + z = "" for aOutput in range(len(activeOutputs)): output = activeOutputs[aOutput] # Mode || Workspaces t = [] - if (mode != ''): + if mode != "": t.append(mode) else: - t.append(' '.join([(w.name.upper() if w.focused else w.name) for w in workspaces if w.output == output.name])) + t.append( + " ".join( + [ + (w.name.upper() if w.focused else w.name) + for w in workspaces + if w.output == output.name + ] + ) + ) # Windows Title - #if container: + # if container: # t.append(container.name) # CPU - t.append('C' + ''.join([upChart(p/100) for p in psutil.cpu_percent(percpu=True)])) + t.append( + "C" + "".join([upChart(p / 100) for p in psutil.cpu_percent(percpu=True)]) + ) # Memory - t.append('M' + str(round(psutil.virtual_memory().percent)) + '% ' + - 'S' + str(round(psutil.swap_memory().percent)) + '%') + t.append( + "M" + + str(round(psutil.virtual_memory().percent)) + + "% " + + "S" + + str(round(psutil.swap_memory().percent)) + + "%" + ) # Disks d = [] for disk in psutil.disk_partitions(): - e = '' - if disk.device.startswith('/dev/sd'): - e += 'S' + disk.device[-2:].upper() - elif disk.device.startswith('/dev/mmcblk'): - e += 'M' + disk.device[-3] + disk.device[-1] + e = "" + if disk.device.startswith("/dev/sd"): + e += "S" + disk.device[-2:].upper() + elif disk.device.startswith("/dev/mmcblk"): + e += "M" + disk.device[-3] + disk.device[-1] else: - e += '?' - e += ' ' - e += str(round(psutil.disk_usage(disk.mountpoint).percent)) + '%' + e += "?" + e += " " + e += str(round(psutil.disk_usage(disk.mountpoint).percent)) + "%" d.append(e) - t.append(' '.join(d)) + t.append(" ".join(d)) # Network netStats = psutil.net_if_stats() netIO = psutil.net_io_counters(pernic=True) net = [] - for iface in filter(lambda i: i != 'lo' and netStats[i].isup, netStats.keys()): - s = '' - if iface.startswith('eth'): - s += 'E' - elif iface.startswith('wlan'): - s += 'W' + for iface in filter(lambda i: i != "lo" and netStats[i].isup, netStats.keys()): + s = "" + if iface.startswith("eth"): + s += "E" + elif iface.startswith("wlan"): + s += "W" else: - s += '?' + s += "?" - s += ' ' + s += " " now = time() global oldNetIO, oldTime - sent = ((oldNetIO[iface].bytes_sent if iface in oldNetIO else 0) - (netIO[iface].bytes_sent if iface in netIO else 0)) / (oldTime - now) - recv = ((oldNetIO[iface].bytes_recv if iface in oldNetIO else 0) - (netIO[iface].bytes_recv if iface in netIO else 0)) / (oldTime - now) - s += '↓' + humanSizeOf(abs(recv), 'B/s') + ' ↑' + humanSizeOf(abs(sent), 'B/s') + sent = ( + (oldNetIO[iface].bytes_sent if iface in oldNetIO else 0) + - (netIO[iface].bytes_sent if iface in netIO else 0) + ) / (oldTime - now) + recv = ( + (oldNetIO[iface].bytes_recv if iface in oldNetIO else 0) + - (netIO[iface].bytes_recv if iface in netIO else 0) + ) / (oldTime - now) + s += ( + "↓" + + humanSizeOf(abs(recv), "B/s") + + " ↑" + + humanSizeOf(abs(sent), "B/s") + ) oldNetIO = netIO oldTime = now net.append(s) - t.append(' '.join(net)) + t.append(" ".join(net)) # Battery - if os.path.isdir('/sys/class/power_supply/BAT0'): - with open('/sys/class/power_supply/BAT0/charge_now') as f: + if os.path.isdir("/sys/class/power_supply/BAT0"): + with open("/sys/class/power_supply/BAT0/charge_now") as f: charge_now = int(f.read()) - with open('/sys/class/power_supply/BAT0/charge_full_design') as f: + with open("/sys/class/power_supply/BAT0/charge_full_design") as f: charge_full = int(f.read()) - t.append('B' + str(round(100*charge_now/charge_full)) + '%') + t.append("B" + str(round(100 * charge_now / charge_full)) + "%") # Volume # t.append('V ' + str(alsaaudio.Mixer('Master').getvolume()[0]) + '%') - t.append(username + '@' + hostname) + t.append(username + "@" + hostname) # print(' - '.join(t)) # t = [output.name] - z += ' - '.join(t) + '%{S' + str(aOutput + 1) + '}' - #lemonbar.stdin.write(bytes(' - '.join(t), 'utf-8')) - #lemonbar.stdin.write(bytes('%{S' + str(aOutput + 1) + '}', 'utf-8')) + z += " - ".join(t) + "%{S" + str(aOutput + 1) + "}" + # lemonbar.stdin.write(bytes(' - '.join(t), 'utf-8')) + # lemonbar.stdin.write(bytes('%{S' + str(aOutput + 1) + '}', 'utf-8')) - lemonbar.stdin.write(bytes(z+'\n', 'utf-8')) + lemonbar.stdin.write(bytes(z + "\n", "utf-8")) lemonbar.stdin.flush() + # Event listeners def on_mode(i3, e): global mode - if (e.change == 'default'): - mode = '' - else : + if e.change == "default": + mode = "" + else: mode = e.change update() + i3.on("mode", on_mode) -#def on_window_focus(i3, e): +# def on_window_focus(i3, e): # global container # container = e.container # update() # -#i3.on("window::focus", on_window_focus) +# i3.on("window::focus", on_window_focus) + def on_workspace_focus(i3, e): global workspaces workspaces = i3.get_workspaces() update() + i3.on("workspace::focus", on_workspace_focus) # Starting diff --git a/config/lemonbar/pip.py b/config/lemonbar/pip.py index 655b6b2..7cabff3 100755 --- a/config/lemonbar/pip.py +++ b/config/lemonbar/pip.py @@ -16,22 +16,37 @@ import difflib FONT = "DejaVu Sans Mono for Powerline" # TODO Update to be in sync with base16 -thm = ['#002b36', '#dc322f', '#859900', '#b58900', '#268bd2', '#6c71c4', - '#2aa198', '#93a1a1', '#657b83', '#dc322f', '#859900', '#b58900', - '#268bd2', '#6c71c4', '#2aa198', '#fdf6e3'] -fg = '#93a1a1' -bg = '#002b36' +thm = [ + "#002b36", + "#dc322f", + "#859900", + "#b58900", + "#268bd2", + "#6c71c4", + "#2aa198", + "#93a1a1", + "#657b83", + "#dc322f", + "#859900", + "#b58900", + "#268bd2", + "#6c71c4", + "#2aa198", + "#fdf6e3", +] +fg = "#93a1a1" +bg = "#002b36" THEMES = { - 'CENTER': (fg, bg), - 'DEFAULT': (thm[0], thm[8]), - '1': (thm[0], thm[9]), - '2': (thm[0], thm[10]), - '3': (thm[0], thm[11]), - '4': (thm[0], thm[12]), - '5': (thm[0], thm[13]), - '6': (thm[0], thm[14]), - '7': (thm[0], thm[15]), + "CENTER": (fg, bg), + "DEFAULT": (thm[0], thm[8]), + "1": (thm[0], thm[9]), + "2": (thm[0], thm[10]), + "3": (thm[0], thm[11]), + "4": (thm[0], thm[12]), + "5": (thm[0], thm[13]), + "6": (thm[0], thm[14]), + "7": (thm[0], thm[15]), } # Utils @@ -49,7 +64,7 @@ def fitText(text, size): diff = size - t return text + " " * diff else: - return '' + return "" def fgColor(theme): @@ -63,20 +78,20 @@ def bgColor(theme): class Section: - def __init__(self, theme='DEFAULT'): - self.text = '' + def __init__(self, theme="DEFAULT"): + self.text = "" self.size = 0 self.toSize = 0 self.theme = theme self.visible = False - self.name = '' + self.name = "" def update(self, text): - if text == '': + if text == "": self.toSize = 0 else: if len(text) < len(self.text): - self.text = text + self.text[len(text):] + self.text = text + self.text[len(text) :] else: self.text = text self.toSize = len(text) + 3 @@ -93,39 +108,39 @@ class Section: self.visible = self.size return self.toSize == self.size - def draw(self, left=True, nextTheme='DEFAULT'): - s = '' + def draw(self, left=True, nextTheme="DEFAULT"): + s = "" if self.visible: if not left: if self.theme == nextTheme: - s += '' + s += "" else: - s += '%{F' + bgColor(self.theme) + '}' - s += '%{B' + bgColor(nextTheme) + '}' - s += '' - s += '%{F' + fgColor(self.theme) + '}' - s += '%{B' + bgColor(self.theme) + '}' - s += ' ' if self.size > 1 else '' + s += "%{F" + bgColor(self.theme) + "}" + s += "%{B" + bgColor(nextTheme) + "}" + s += "" + s += "%{F" + fgColor(self.theme) + "}" + s += "%{B" + bgColor(self.theme) + "}" + s += " " if self.size > 1 else "" s += fitText(self.text, self.size - 3) - s += ' ' if self.size > 2 else '' + s += " " if self.size > 2 else "" if left: if self.theme == nextTheme: - s += '' + s += "" else: - s += '%{F' + bgColor(self.theme) + '}' - s += '%{B' + bgColor(nextTheme) + '}' - s += '' + s += "%{F" + bgColor(self.theme) + "}" + s += "%{B" + bgColor(nextTheme) + "}" + s += "" return s # Section definition -sTime = Section('3') +sTime = Section("3") -hostname = os.environ['HOSTNAME'].split('.')[0] -sHost = Section('2') +hostname = os.environ["HOSTNAME"].split(".")[0] +sHost = Section("2") sHost.update( - os.environ['USER'] + '@' + hostname.split('-')[-1] - if '-' in hostname else hostname) + os.environ["USER"] + "@" + hostname.split("-")[-1] if "-" in hostname else hostname +) # Groups definition @@ -133,7 +148,7 @@ gLeft = [] gRight = [sTime, sHost] # Bar handling -bar = subprocess.Popen(['lemonbar', '-f', FONT, '-b'], stdin=subprocess.PIPE) +bar = subprocess.Popen(["lemonbar", "-f", FONT, "-b"], stdin=subprocess.PIPE) def updateBar(): @@ -141,35 +156,45 @@ def updateBar(): global gLeft, gRight global outputs - text = '' + text = "" for oi in range(len(outputs)): output = outputs[oi] gLeftFiltered = list( filter( - lambda s: s.visible and ( - not s.output or s.output == output.name), - gLeft)) - tLeft = '' + lambda s: s.visible and (not s.output or s.output == output.name), gLeft + ) + ) + tLeft = "" l = len(gLeftFiltered) for gi in range(l): g = gLeftFiltered[gi] # Next visible section for transition - nextTheme = gLeftFiltered[gi + 1].theme if gi + 1 < l else 'CENTER' + nextTheme = gLeftFiltered[gi + 1].theme if gi + 1 < l else "CENTER" tLeft = tLeft + g.draw(True, nextTheme) - tRight = '' + tRight = "" for gi in range(len(gRight)): g = gRight[gi] - nextTheme = 'CENTER' - for gn in gRight[gi + 1:]: + nextTheme = "CENTER" + for gn in gRight[gi + 1 :]: if gn.visible: nextTheme = gn.theme break tRight = g.draw(False, nextTheme) + tRight - text += '%{l}' + tLeft + '%{r}' + tRight + \ - '%{B' + bgColor('CENTER') + '}' + '%{S' + str(oi + 1) + '}' + text += ( + "%{l}" + + tLeft + + "%{r}" + + tRight + + "%{B" + + bgColor("CENTER") + + "}" + + "%{S" + + str(oi + 1) + + "}" + ) - bar.stdin.write(bytes(text + '\n', 'utf-8')) + bar.stdin.write(bytes(text + "\n", "utf-8")) bar.stdin.flush() @@ -182,12 +207,10 @@ def on_output(): global outputs outputs = sorted( sorted( - list( - filter( - lambda o: o.active, - i3.get_outputs())), - key=lambda o: o.rect.y), - key=lambda o: o.rect.x) + list(filter(lambda o: o.active, i3.get_outputs())), key=lambda o: o.rect.y + ), + key=lambda o: o.rect.x, + ) on_output() @@ -209,34 +232,33 @@ def on_workspace_focus(): if workspace.visible: section.update(workspace.name) else: - section.update(workspace.name.split(' ')[0]) + section.update(workspace.name.split(" ")[0]) if workspace.focused: - section.theme = '4' + section.theme = "4" elif workspace.urgent: - section.theme = '1' + section.theme = "1" else: - section.theme = '6' + section.theme = "6" else: - section.update('') - section.theme = '6' + section.update("") + section.theme = "6" - for tag, i, j, k, l in difflib.SequenceMatcher( - None, sNames, wNames).get_opcodes(): - if tag == 'equal': # If the workspaces didn't changed + for tag, i, j, k, l in difflib.SequenceMatcher(None, sNames, wNames).get_opcodes(): + if tag == "equal": # If the workspaces didn't changed for a in range(j - i): workspace = workspaces[k + a] section = gLeft[i + a] actuate(section, workspace) newGLeft.append(section) - if tag in ('delete', 'replace'): # If the workspaces were removed + if tag in ("delete", "replace"): # If the workspaces were removed for section in gLeft[i:j]: if section.visible: actuate(section, None) newGLeft.append(section) else: del section - if tag in ('insert', 'replace'): # If the workspaces were removed + if tag in ("insert", "replace"): # If the workspaces were removed for workspace in workspaces[k:l]: section = Section() actuate(section, workspace) @@ -255,12 +277,14 @@ def i3events(i3childPipe): # Proxy functions def on_workspace_focus(i3, e): global i3childPipe - i3childPipe.send('on_workspace_focus') + i3childPipe.send("on_workspace_focus") + i3.on("workspace::focus", on_workspace_focus) def on_output(i3, e): global i3childPipe - i3childPipe.send('on_output') + i3childPipe.send("on_output") + i3.on("output", on_output) i3.main() @@ -274,7 +298,7 @@ i3process.start() def updateValues(): # Time now = datetime.datetime.now() - sTime.update(now.strftime('%x %X')) + sTime.update(now.strftime("%x %X")) def updateAnimation(): @@ -288,9 +312,9 @@ while True: now = time.time() if i3parentPipe.poll(): msg = i3parentPipe.recv() - if msg == 'on_workspace_focus': + if msg == "on_workspace_focus": on_workspace_focus() - elif msg == 'on_output': + elif msg == "on_output": on_output() # TODO Restart lemonbar else: diff --git a/config/lemonbar/providers.py b/config/lemonbar/providers.py index 708b513..c65f528 100755 --- a/config/lemonbar/providers.py +++ b/config/lemonbar/providers.py @@ -16,17 +16,18 @@ import mpd import random import math -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() # TODO Generator class (for I3WorkspacesProvider, NetworkProvider and later # PulseaudioProvider and MpdProvider) + def humanSize(num): """ Returns a string of width 3+3 """ - for unit in ('B ','KiB','MiB','GiB','TiB','PiB','EiB','ZiB'): + for unit in ("B ", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"): if abs(num) < 1000: if num >= 10: return "{:3d}{}".format(int(num), unit) @@ -35,16 +36,15 @@ def humanSize(num): num /= 1024.0 return "{:d}YiB".format(num) + def randomColor(seed=0): random.seed(seed) - return '#{:02x}{:02x}{:02x}'.format(*[random.randint(0, 255) for _ in range(3)]) + return "#{:02x}{:02x}{:02x}".format(*[random.randint(0, 255) for _ in range(3)]) class TimeProvider(StatefulSection, PeriodicUpdater): - FORMATS = ["%H:%M", - "%m-%d %H:%M:%S", - "%a %y-%m-%d %H:%M:%S"] + FORMATS = ["%H:%M", "%m-%d %H:%M:%S", "%a %y-%m-%d %H:%M:%S"] NUMBER_STATES = len(FORMATS) DEFAULT_STATE = 1 @@ -55,18 +55,18 @@ class TimeProvider(StatefulSection, PeriodicUpdater): def __init__(self, theme=None): PeriodicUpdater.__init__(self) StatefulSection.__init__(self, theme) - self.changeInterval(1) # TODO OPTI When state < 1 + self.changeInterval(1) # TODO OPTI When state < 1 + class AlertLevel(enum.Enum): NORMAL = 0 WARNING = 1 DANGER = 2 + class AlertingSection(StatefulSection): # TODO EASE Correct settings for themes - THEMES = {AlertLevel.NORMAL: 2, - AlertLevel.WARNING: 3, - AlertLevel.DANGER: 1} + THEMES = {AlertLevel.NORMAL: 2, AlertLevel.WARNING: 3, AlertLevel.DANGER: 1} PERSISTENT = True def getLevel(self, quantity): @@ -92,16 +92,16 @@ class AlertingSection(StatefulSection): class CpuProvider(AlertingSection, PeriodicUpdater): NUMBER_STATES = 3 - ICON = '' + ICON = "" def fetcher(self): percent = psutil.cpu_percent(percpu=False) - self.updateLevel(percent/100) + self.updateLevel(percent / 100) if self.state >= 2: percents = psutil.cpu_percent(percpu=True) - return ''.join([Section.ramp(p/100) for p in percents]) + return "".join([Section.ramp(p / 100) for p in percents]) elif self.state >= 1: - return Section.ramp(percent/100) + return Section.ramp(percent / 100) def __init__(self, theme=None): AlertingSection.__init__(self, theme) @@ -113,12 +113,13 @@ class RamProvider(AlertingSection, PeriodicUpdater): """ Shows free RAM """ + NUMBER_STATES = 4 - ICON = '' + ICON = "" def fetcher(self): mem = psutil.virtual_memory() - freePerc = mem.percent/100 + freePerc = mem.percent / 100 self.updateLevel(freePerc) if self.state < 1: @@ -130,7 +131,7 @@ class RamProvider(AlertingSection, PeriodicUpdater): text.append(freeStr) if self.state >= 3: totalStr = humanSize(mem.total) - text.append('/', totalStr) + text.append("/", totalStr) return text @@ -146,18 +147,18 @@ class TemperatureProvider(AlertingSection, PeriodicUpdater): def fetcher(self): allTemp = psutil.sensors_temperatures() - if 'coretemp' not in allTemp: + if "coretemp" not in allTemp: # TODO Opti Remove interval - return '' - temp = allTemp['coretemp'][0] + return "" + temp = allTemp["coretemp"][0] self.warningThresold = temp.high self.dangerThresold = temp.critical self.updateLevel(temp.current) - self.icon = Section.ramp(temp.current/temp.high, self.RAMP) + self.icon = Section.ramp(temp.current / temp.high, self.RAMP) if self.state >= 1: - return '{:.0f}°C'.format(temp.current) + return "{:.0f}°C".format(temp.current) def __init__(self, theme=None): AlertingSection.__init__(self, theme) @@ -176,15 +177,16 @@ class BatteryProvider(AlertingSection, PeriodicUpdater): self.icon = None return None - self.icon = ("" if bat.power_plugged else "") + \ - Section.ramp(bat.percent/100, self.RAMP) + self.icon = ("" if bat.power_plugged else "") + Section.ramp( + bat.percent / 100, self.RAMP + ) - self.updateLevel(1-bat.percent/100) + self.updateLevel(1 - bat.percent / 100) if self.state < 1: return - t = Text('{:.0f}%'.format(bat.percent)) + t = Text("{:.0f}%".format(bat.percent)) if self.state < 2: return t @@ -200,7 +202,6 @@ class BatteryProvider(AlertingSection, PeriodicUpdater): self.changeInterval(5) - class PulseaudioProvider(StatefulSection, ThreadedUpdater): NUMBER_STATES = 3 DEFAULT_STATE = 1 @@ -208,28 +209,27 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater): def __init__(self, theme=None): ThreadedUpdater.__init__(self) StatefulSection.__init__(self, theme) - self.pulseEvents = pulsectl.Pulse('event-handler') + self.pulseEvents = pulsectl.Pulse("event-handler") self.pulseEvents.event_mask_set(pulsectl.PulseEventMaskEnum.sink) self.pulseEvents.event_callback_set(self.handleEvent) self.start() self.refreshData() - def fetcher(self): sinks = [] - with pulsectl.Pulse('list-sinks') as pulse: + with pulsectl.Pulse("list-sinks") as pulse: for sink in pulse.sink_list(): if sink.port_active.name == "analog-output-headphones": icon = "" elif sink.port_active.name == "analog-output-speaker": icon = "" if sink.mute else "" elif sink.port_active.name == "headset-output": - icon = '' + icon = "" else: icon = "?" vol = pulse.volume_get_all_chans(sink) - fg = (sink.mute and '#333333') or (vol > 1 and '#FF0000') or None + fg = (sink.mute and "#333333") or (vol > 1 and "#FF0000") or None t = Text(icon, fg=fg) sinks.append(t) @@ -245,7 +245,7 @@ class PulseaudioProvider(StatefulSection, ThreadedUpdater): vol -= 1 t.append(ramp) else: - t.append(" {:2.0f}%".format(vol*100)) + t.append(" {:2.0f}%".format(vol * 100)) return Text(*sinks) @@ -263,27 +263,27 @@ class NetworkProviderSection(StatefulSection, Updater): def actType(self): self.ssid = None - if self.iface.startswith('eth') or self.iface.startswith('enp'): - if 'u' in self.iface: - self.icon = '' + if self.iface.startswith("eth") or self.iface.startswith("enp"): + if "u" in self.iface: + self.icon = "" else: - self.icon = '' - elif self.iface.startswith('wlan') or self.iface.startswith('wl'): - self.icon = '' + self.icon = "" + elif self.iface.startswith("wlan") or self.iface.startswith("wl"): + self.icon = "" if self.showSsid: cmd = ["iwgetid", self.iface, "--raw"] p = subprocess.run(cmd, stdout=subprocess.PIPE) self.ssid = p.stdout.strip().decode() - elif self.iface.startswith('tun') or self.iface.startswith('tap'): - self.icon = '' - elif self.iface.startswith('docker'): - self.icon = '' - elif self.iface.startswith('veth'): - self.icon = '' - elif self.iface.startswith('vboxnet'): - self.icon = '' + elif self.iface.startswith("tun") or self.iface.startswith("tap"): + self.icon = "" + elif self.iface.startswith("docker"): + self.icon = "" + elif self.iface.startswith("veth"): + self.icon = "" + elif self.iface.startswith("vboxnet"): + self.icon = "" else: - self.icon = '?' + self.icon = "?" def getAddresses(self): ipv4 = None @@ -298,9 +298,11 @@ class NetworkProviderSection(StatefulSection, Updater): def fetcher(self): self.icon = None self.persistent = False - if self.iface not in self.parent.stats or \ - not self.parent.stats[self.iface].isup or \ - self.iface.startswith('lo'): + if ( + self.iface not in self.parent.stats + or not self.parent.stats[self.iface].isup + or self.iface.startswith("lo") + ): return None # Get addresses @@ -317,30 +319,36 @@ class NetworkProviderSection(StatefulSection, Updater): if self.showAddress: if ipv4: - netStrFull = '{}/{}'.format(ipv4.address, ipv4.netmask) + netStrFull = "{}/{}".format(ipv4.address, ipv4.netmask) addr = ipaddress.IPv4Network(netStrFull, strict=False) - addrStr = '{}/{}'.format(ipv4.address, addr.prefixlen) + addrStr = "{}/{}".format(ipv4.address, addr.prefixlen) text.append(addrStr) # TODO IPV6 # if ipv6: # text += ' ' + ipv6.address if self.showSpeed: - recvDiff = self.parent.IO[self.iface].bytes_recv \ + recvDiff = ( + self.parent.IO[self.iface].bytes_recv - self.parent.prevIO[self.iface].bytes_recv - sentDiff = self.parent.IO[self.iface].bytes_sent \ + ) + sentDiff = ( + self.parent.IO[self.iface].bytes_sent - self.parent.prevIO[self.iface].bytes_sent + ) recvDiff /= self.parent.dt sentDiff /= self.parent.dt - text.append('↓{}↑{}'.format(humanSize(recvDiff), - humanSize(sentDiff))) + text.append("↓{}↑{}".format(humanSize(recvDiff), humanSize(sentDiff))) if self.showTransfer: - text.append('⇓{}⇑{}'.format( - humanSize(self.parent.IO[self.iface].bytes_recv), - humanSize(self.parent.IO[self.iface].bytes_sent))) + text.append( + "⇓{}⇑{}".format( + humanSize(self.parent.IO[self.iface].bytes_recv), + humanSize(self.parent.IO[self.iface].bytes_sent), + ) + ) - return ' '.join(text) + return " ".join(text) def onChangeState(self, state): self.showSsid = state >= 1 @@ -402,32 +410,33 @@ class NetworkProvider(Section, PeriodicUpdater): self.fetchData() self.changeInterval(5) + class RfkillProvider(Section, PeriodicUpdater): # TODO FEAT rfkill doesn't seem to indicate that the hardware switch is # toggled - PATH = '/sys/class/rfkill' + PATH = "/sys/class/rfkill" def fetcher(self): t = Text() for device in os.listdir(self.PATH): - with open(os.path.join(self.PATH, device, 'soft'), 'rb') as f: - softBlocked = f.read().strip() != b'0' - with open(os.path.join(self.PATH, device, 'hard'), 'rb') as f: - hardBlocked = f.read().strip() != b'0' + with open(os.path.join(self.PATH, device, "soft"), "rb") as f: + softBlocked = f.read().strip() != b"0" + with open(os.path.join(self.PATH, device, "hard"), "rb") as f: + hardBlocked = f.read().strip() != b"0" if not hardBlocked and not softBlocked: continue - with open(os.path.join(self.PATH, device, 'type'), 'rb') as f: + with open(os.path.join(self.PATH, device, "type"), "rb") as f: typ = f.read().strip() - fg = (hardBlocked and '#CCCCCC') or (softBlocked and '#FF0000') - if typ == b'wlan': - icon = '' - elif typ == b'bluetooth': - icon = '' + fg = (hardBlocked and "#CCCCCC") or (softBlocked and "#FF0000") + if typ == b"wlan": + icon = "" + elif typ == b"bluetooth": + icon = "" else: - icon = '?' + icon = "?" t.append(Text(icon, fg=fg)) return t @@ -437,6 +446,7 @@ class RfkillProvider(Section, PeriodicUpdater): Section.__init__(self, theme) self.changeInterval(5) + class SshAgentProvider(PeriodicUpdater): def fetcher(self): cmd = ["ssh-add", "-l"] @@ -444,17 +454,18 @@ class SshAgentProvider(PeriodicUpdater): if proc.returncode != 0: return None text = Text() - for line in proc.stdout.split(b'\n'): + for line in proc.stdout.split(b"\n"): if not len(line): continue fingerprint = line.split()[1] - text.append(Text('', fg=randomColor(seed=fingerprint))) + text.append(Text("", fg=randomColor(seed=fingerprint))) return text def __init__(self): PeriodicUpdater.__init__(self) self.changeInterval(5) + class GpgAgentProvider(PeriodicUpdater): def fetcher(self): cmd = ["gpg-connect-agent", "keyinfo --list", "/bye"] @@ -463,39 +474,41 @@ class GpgAgentProvider(PeriodicUpdater): if proc.returncode != 0: return None text = Text() - for line in proc.stdout.split(b'\n'): - if not len(line) or line == b'OK': + for line in proc.stdout.split(b"\n"): + if not len(line) or line == b"OK": continue spli = line.split() - if spli[6] != b'1': + if spli[6] != b"1": continue keygrip = spli[2] - text.append(Text('', fg=randomColor(seed=keygrip))) + text.append(Text("", fg=randomColor(seed=keygrip))) return text def __init__(self): PeriodicUpdater.__init__(self) self.changeInterval(5) + class KeystoreProvider(Section, MergedUpdater): # TODO OPTI+FEAT Use ColorCountsSection and not MergedUpdater, this is useless - ICON = '' + ICON = "" def __init__(self, theme=None): MergedUpdater.__init__(self, SshAgentProvider(), GpgAgentProvider()) Section.__init__(self, theme) + class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater): - COLORABLE_ICON = '' + COLORABLE_ICON = "" def subfetcher(self): db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY, path=self.dir) counts = [] for account in self.accounts: - queryStr = 'folder:/{}/ and tag:unread'.format(account) + queryStr = "folder:/{}/ and tag:unread".format(account) query = notmuch.Query(db, queryStr) nbMsgs = query.count_messages() - if account == 'frogeye': + if account == "frogeye": global q q = query if nbMsgs < 1: @@ -504,7 +517,7 @@ class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater): # db.close() return counts - def __init__(self, dir='~/.mail/', theme=None): + def __init__(self, dir="~/.mail/", theme=None): PeriodicUpdater.__init__(self) ColorCountsSection.__init__(self, theme) @@ -512,23 +525,24 @@ class NotmuchUnreadProvider(ColorCountsSection, InotifyUpdater): assert os.path.isdir(self.dir) # Fetching account list - self.accounts = sorted([a for a in os.listdir(self.dir) - if not a.startswith('.')]) + self.accounts = sorted( + [a for a in os.listdir(self.dir) if not a.startswith(".")] + ) # Fetching colors self.colors = dict() for account in self.accounts: - filename = os.path.join(self.dir, account, 'color') - with open(filename, 'r') as f: + filename = os.path.join(self.dir, account, "color") + with open(filename, "r") as f: color = f.read().strip() self.colors[account] = color - self.addPath(os.path.join(self.dir, '.notmuch', 'xapian')) + self.addPath(os.path.join(self.dir, ".notmuch", "xapian")) class TodoProvider(ColorCountsSection, InotifyUpdater): # TODO OPT/UX Maybe we could get more data from the todoman python module # TODO OPT Specific callback for specific directory - COLORABLE_ICON = '' + COLORABLE_ICON = "" def updateCalendarList(self): calendars = sorted(os.listdir(self.dir)) @@ -538,13 +552,13 @@ class TodoProvider(ColorCountsSection, InotifyUpdater): self.addPath(os.path.join(self.dir, calendar), refresh=False) # Fetching name - path = os.path.join(self.dir, calendar, 'displayname') - with open(path, 'r') as f: + path = os.path.join(self.dir, calendar, "displayname") + with open(path, "r") as f: self.names[calendar] = f.read().strip() # Fetching color - path = os.path.join(self.dir, calendar, 'color') - with open(path, 'r') as f: + path = os.path.join(self.dir, calendar, "color") + with open(path, "r") as f: self.colors[calendar] = f.read().strip() self.calendars = calendars @@ -579,8 +593,8 @@ class TodoProvider(ColorCountsSection, InotifyUpdater): if self.state < 2: c = self.countUndone(None) if c > 0: - counts.append((c, '#00000')) - counts.append((0, '#FFFFF')) + counts.append((c, "#00000")) + counts.append((0, "#FFFFF")) return counts # Optimisation ends here @@ -591,6 +605,7 @@ class TodoProvider(ColorCountsSection, InotifyUpdater): counts.append((c, self.colors[calendar])) return counts + class I3WindowTitleProvider(Section, I3Updater): # TODO FEAT To make this available from start, we need to find the # `focused=True` element following the `focus` array @@ -603,6 +618,7 @@ class I3WindowTitleProvider(Section, I3Updater): Section.__init__(self, theme=theme) self.on("window", self.on_window) + class I3WorkspacesProviderSection(Section): def selectTheme(self): if self.urgent: @@ -626,12 +642,12 @@ class I3WorkspacesProviderSection(Section): def setName(self, name): self.shortName = name - self.fullName = self.parent.customNames[name] \ - if name in self.parent.customNames else name + self.fullName = ( + self.parent.customNames[name] if name in self.parent.customNames else name + ) def switchTo(self): - self.parent.i3.command('workspace {}'.format(self.shortName)) - + self.parent.i3.command("workspace {}".format(self.shortName)) def __init__(self, name, parent): Section.__init__(self) @@ -652,7 +668,6 @@ class I3WorkspacesProviderSection(Section): self.updateText(None) - class I3WorkspacesProvider(Section, I3Updater): # TODO FEAT Multi-screen @@ -713,7 +728,7 @@ class I3WorkspacesProvider(Section, I3Updater): self.sections[e.current.num].show() def on_mode(self, i3, e): - if e.change == 'default': + if e.change == "default": self.modeSection.updateText(None) for section in self.sections.values(): section.tempShow() @@ -722,7 +737,9 @@ class I3WorkspacesProvider(Section, I3Updater): for section in self.sections.values(): section.tempEmpty() - def __init__(self, theme=0, themeFocus=3, themeUrgent=1, themeMode=2, customNames=dict()): + def __init__( + self, theme=0, themeFocus=3, themeUrgent=1, themeMode=2, customNames=dict() + ): I3Updater.__init__(self) Section.__init__(self) self.themeNormal = theme @@ -746,13 +763,14 @@ class I3WorkspacesProvider(Section, I3Updater): parent.addSection(self.modeSection) self.initialPopulation(parent) + class MpdProvider(Section, ThreadedUpdater): # TODO FEAT More informations and controls MAX_LENGTH = 50 def connect(self): - self.mpd.connect('localhost', 6600) + self.mpd.connect("localhost", 6600) def __init__(self, theme=None): ThreadedUpdater.__init__(self) @@ -784,18 +802,16 @@ class MpdProvider(Section, ThreadedUpdater): infosStr = " - ".join(infos) if len(infosStr) > MpdProvider.MAX_LENGTH: - infosStr = infosStr[:MpdProvider.MAX_LENGTH-1] + '…' + infosStr = infosStr[: MpdProvider.MAX_LENGTH - 1] + "…" return " {}".format(infosStr) def loop(self): try: - self.mpd.idle('player') + self.mpd.idle("player") self.refreshData() except mpd.base.ConnectionError as e: log.warn(e, exc_info=True) self.connect() except BaseException as e: log.error(e, exc_info=True) - - diff --git a/config/lemonbar/updaters.py b/config/lemonbar/updaters.py index b8c4ae6..d3e5986 100755 --- a/config/lemonbar/updaters.py +++ b/config/lemonbar/updaters.py @@ -11,13 +11,14 @@ import coloredlogs import i3ipc from display import Text -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() # TODO Sync bar update with PeriodicUpdater updates notBusy = threading.Event() + class Updater: @staticmethod def init(): @@ -52,8 +53,9 @@ class PeriodicUpdaterThread(threading.Thread): counter = 0 while True: notBusy.set() - if PeriodicUpdater.intervalsChanged \ - .wait(timeout=PeriodicUpdater.intervalStep): + if PeriodicUpdater.intervalsChanged.wait( + timeout=PeriodicUpdater.intervalStep + ): # ↑ sleeps here notBusy.clear() PeriodicUpdater.intervalsChanged.clear() @@ -127,7 +129,6 @@ class PeriodicUpdater(Updater): class InotifyUpdaterEventHandler(pyinotify.ProcessEvent): - def process_default(self, event): # DEBUG # from pprint import pprint @@ -155,8 +156,9 @@ class InotifyUpdater(Updater): @staticmethod def init(): - notifier = pyinotify.ThreadedNotifier(InotifyUpdater.wm, - InotifyUpdaterEventHandler()) + notifier = pyinotify.ThreadedNotifier( + InotifyUpdater.wm, InotifyUpdaterEventHandler() + ) notifier.start() # TODO Mask for folders @@ -174,8 +176,7 @@ class InotifyUpdater(Updater): self.dirpath = os.path.dirname(path) self.filename = os.path.basename(path) else: - raise FileNotFoundError("No such file or directory: '{}'" - .format(path)) + raise FileNotFoundError("No such file or directory: '{}'".format(path)) # Register watch action if self.dirpath not in InotifyUpdater.paths: @@ -266,4 +267,4 @@ class MergedUpdater(Updater): updater.updateText = newUpdateText.__get__(updater, Updater) self.updaters.append(updater) - self.texts[updater] = '' + self.texts[updater] = "" diff --git a/config/scripts/archive b/config/scripts/archive index 9eaeaa1..b942455 100755 --- a/config/scripts/archive +++ b/config/scripts/archive @@ -6,7 +6,7 @@ import logging import os import sys -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() # Coding conventions: @@ -15,10 +15,10 @@ log = logging.getLogger() # TODO Config arparse and pass args to the functions. No globals # Finding directories -assert 'HOME' in os.environ, "Home directory unknown" -DOCS = os.path.realpath(os.path.join(os.environ['HOME'], 'Documents')) +assert "HOME" in os.environ, "Home directory unknown" +DOCS = os.path.realpath(os.path.join(os.environ["HOME"], "Documents")) assert os.path.isdir(DOCS), "Documents folder not found" -ARCS = os.path.realpath(os.path.join(os.environ['HOME'], 'Archives')) +ARCS = os.path.realpath(os.path.join(os.environ["HOME"], "Archives")) assert os.path.isdir(ARCS), "Archives folder not found" @@ -27,7 +27,7 @@ def dirRange(relpath): res = list() for p in range(len(splits)): - partPath = os.path.join(*splits[:p+1]) + partPath = os.path.join(*splits[: p + 1]) arcPath = os.path.join(os.path.join(ARCS, partPath)) docPath = os.path.join(os.path.join(DOCS, partPath)) @@ -36,6 +36,7 @@ def dirRange(relpath): return res + def travel(relpath): """ Dunno what this will do, let's write code and see. @@ -60,8 +61,10 @@ def travel(relpath): elif os.path.islink(docPath) and os.path.exists(arcPath): currentLink = os.readlink(docPath) if currentLink != linkPath: - log.warning(f"'{docPath}' is pointing to '{currentLink}' " + - f"but should point to '{linkPath}'.") + log.warning( + f"'{docPath}' is pointing to '{currentLink}' " + + f"but should point to '{linkPath}'." + ) # TODO Fixing if asked for sys.exit(1) log.debug("Early link already exists {docPath} → {arcPath}") @@ -69,13 +72,11 @@ def travel(relpath): elif not os.path.exists(docPath) and os.path.exists(arcPath): log.debug("Only existing on archive side, linking") print(f"ln -s {linkPath} {docPath}") - elif os.path.exists(docPath) and not os.path.exists(arcPath) \ - and isLast: + elif os.path.exists(docPath) and not os.path.exists(arcPath) and isLast: log.debug("Only existing on doc side, moving and linking") print(f"mv {docPath} {arcPath}") print(f"ln -s {linkPath} {docPath}") - elif os.path.exists(docPath) and not os.path.exists(arcPath) \ - and not isLast: + elif os.path.exists(docPath) and not os.path.exists(arcPath) and not isLast: raise NotImplementedError("Here comes the trouble") else: log.error("Unhandled case") @@ -103,8 +104,10 @@ def ensureLink(relpath): if os.path.islink(docPath): currentLink = os.readlink(docPath) if currentLink != linkPath: - log.warning(f"'{docPath}' is pointing to '{currentLink}' " + - f"but should point to '{linkPath}'. Fixing") + log.warning( + f"'{docPath}' is pointing to '{currentLink}' " + + f"but should point to '{linkPath}'. Fixing" + ) if args.dry: print(f"rm {docPath}") else: @@ -117,10 +120,13 @@ def ensureLink(relpath): elif os.path.isdir(docPath): continue else: - raise RuntimeError(f"'{docPath}' exists and is not a directory " + - f"or a link. Unable to link it to '{linkPath}'") - raise RuntimeError(f"'{docPath}' is a directory. Unable to link it to " + - f"'{linkPath}'") + raise RuntimeError( + f"'{docPath}' exists and is not a directory " + + f"or a link. Unable to link it to '{linkPath}'" + ) + raise RuntimeError( + f"'{docPath}' is a directory. Unable to link it to " + f"'{linkPath}'" + ) def archive(docdir): @@ -134,8 +140,8 @@ def archive(docdir): print("ARC", reldir) arcdir = os.path.join(ARCS, reldir) - parentArcdir = os.path.realpath(os.path.join(arcdir, '..')) - parentDocdir = os.path.realpath(os.path.join(docdir, '..')) + parentArcdir = os.path.realpath(os.path.join(arcdir, "..")) + parentDocdir = os.path.realpath(os.path.join(docdir, "..")) linkDest = os.path.relpath(arcdir, parentDocdir) # BULLSHIT @@ -172,11 +178,15 @@ def unarchive(arcdir): if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Place a folder in ~/Documents in ~/Documents/Archives and symlink it") - parser.add_argument('dir', metavar='DIRECTORY', type=str, help="The directory to archive") - parser.add_argument('-d', '--dry', action='store_true') + parser = argparse.ArgumentParser( + description="Place a folder in ~/Documents in ~/Documents/Archives and symlink it" + ) + parser.add_argument( + "dir", metavar="DIRECTORY", type=str, help="The directory to archive" + ) + parser.add_argument("-d", "--dry", action="store_true") args = parser.parse_args() - args.dry = True # DEBUG + args.dry = True # DEBUG # archive(args.dir) ensureLink(args.dir) diff --git a/config/scripts/compressPictureMovies b/config/scripts/compressPictureMovies index da74b42..29df236 100755 --- a/config/scripts/compressPictureMovies +++ b/config/scripts/compressPictureMovies @@ -14,7 +14,7 @@ import json import statistics import datetime -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() # Constants @@ -34,15 +34,15 @@ def videoMetadata(filename): p.check_returncode() metadataRaw = p.stdout data = dict() - for metadataLine in metadataRaw.split(b'\n'): + for metadataLine in metadataRaw.split(b"\n"): # Skip empty lines if not len(metadataLine): continue # Skip comments - if metadataLine.startswith(b';'): + if metadataLine.startswith(b";"): continue # Parse key-value - metadataLineSplit = metadataLine.split(b'=') + metadataLineSplit = metadataLine.split(b"=") if len(metadataLineSplit) != 2: log.warning("Unparsed metadata line: `{}`".format(metadataLine)) continue @@ -52,6 +52,7 @@ def videoMetadata(filename): data[key] = val return data + def videoInfos(filename): assert os.path.isfile(filename) cmd = ["ffprobe", filename, "-print_format", "json", "-show_streams"] @@ -61,7 +62,10 @@ def videoInfos(filename): infos = json.loads(infosRaw) return infos + from pprint import pprint + + def streamDuration(stream): if "duration" in stream: return float(stream["duration"]) @@ -77,13 +81,14 @@ def streamDuration(stream): else: raise KeyError("Can't find duration information in stream") + def videoDuration(filename): # TODO Doesn't work with VP8 / webm infos = videoInfos(filename) durations = [streamDuration(stream) for stream in infos["streams"]] dev = statistics.stdev(durations) assert dev <= DURATION_MAX_DEV, "Too much deviation ({} s)".format(dev) - return sum(durations)/len(durations) + return sum(durations) / len(durations) todos = set() @@ -130,13 +135,12 @@ for root, inputName in progressbar.progressbar(allVideos): meta = videoMetadata(inputFull) # If it has the field with the original file - if 'original' in meta: + if "original" in meta: # Skip file continue else: assert not os.path.isfile(outputFull), outputFull + " exists" - size = os.stat(inputFull).st_size try: duration = videoDuration(inputFull) @@ -151,7 +155,11 @@ for root, inputName in progressbar.progressbar(allVideos): totalSize += size todos.add(todo) -log.info("Converting {} videos ({})".format(len(todos), datetime.timedelta(seconds=totalDuration))) +log.info( + "Converting {} videos ({})".format( + len(todos), datetime.timedelta(seconds=totalDuration) + ) +) # From https://stackoverflow.com/a/3431838 def sha256(fname): @@ -161,17 +169,30 @@ def sha256(fname): hash_sha256.update(chunk) return hash_sha256.hexdigest() + # Progress bar things totalDataSize = progressbar.widgets.DataSize() -totalDataSize.variable = 'max_value' -barWidgets = [progressbar.widgets.DataSize(), ' of ', totalDataSize, ' ', progressbar.widgets.Bar(), ' ', progressbar.widgets.FileTransferSpeed(), ' ', progressbar.widgets.AdaptiveETA()] +totalDataSize.variable = "max_value" +barWidgets = [ + progressbar.widgets.DataSize(), + " of ", + totalDataSize, + " ", + progressbar.widgets.Bar(), + " ", + progressbar.widgets.FileTransferSpeed(), + " ", + progressbar.widgets.AdaptiveETA(), +] bar = progressbar.DataTransferBar(max_value=totalSize, widgets=barWidgets) bar.start() processedSize = 0 for inputFull, originalFull, outputFull, size, duration in todos: - tmpfile = tempfile.mkstemp(prefix="compressPictureMovies", suffix="."+OUTPUT_EXTENSION)[1] + tmpfile = tempfile.mkstemp( + prefix="compressPictureMovies", suffix="." + OUTPUT_EXTENSION + )[1] try: # Calculate the sum of the original file checksum = sha256(inputFull) @@ -180,7 +201,12 @@ for inputFull, originalFull, outputFull, size, duration in todos: originalRel = os.path.relpath(originalFull, ORIGINAL_FOLDER) originalContent = "{} {}".format(originalRel, checksum) metadataCmd = ["-metadata", 'original="{}"'.format(originalContent)] - cmd = ["ffmpeg", "-hide_banner", "-y", "-i", inputFull] + OUTPUT_FFMPEG_PARAMETERS + metadataCmd + [tmpfile] + cmd = ( + ["ffmpeg", "-hide_banner", "-y", "-i", inputFull] + + OUTPUT_FFMPEG_PARAMETERS + + metadataCmd + + [tmpfile] + ) p = subprocess.run(cmd) p.check_returncode() diff --git a/config/scripts/lestrte b/config/scripts/lestrte index 549edcb..f20360d 100755 --- a/config/scripts/lestrte +++ b/config/scripts/lestrte @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import sys import random @@ -21,7 +21,7 @@ for line in sys.stdin: else: wrd = list(word) random.shuffle(wrd) - nl += ''.join(wrd) + nl += "".join(wrd) nl += c word = "" grace = True diff --git a/config/scripts/mediaDuration b/config/scripts/mediaDuration index 153f48d..5775540 100755 --- a/config/scripts/mediaDuration +++ b/config/scripts/mediaDuration @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import os import sys @@ -7,7 +7,7 @@ import logging import coloredlogs -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() @@ -26,7 +26,7 @@ def duration_file(path: str) -> float: ret = run.stdout.decode().strip() if run.returncode != 0: log.warning(f"{path}: unable to get duration") - elif ret == 'N/A': + elif ret == "N/A": log.warning(f"{path}: has no duration") else: try: diff --git a/config/scripts/mel b/config/scripts/mel index ff96bd1..9b0f8c3 100755 --- a/config/scripts/mel +++ b/config/scripts/mel @@ -45,7 +45,7 @@ import notmuch import progressbar import xdg.BaseDirectory -MailLocation = typing.NewType('MailLocation', typing.Tuple[str, str, str]) +MailLocation = typing.NewType("MailLocation", typing.Tuple[str, str, str]) # MessageAction = typing.Callable[[notmuch.Message], None] @@ -106,7 +106,8 @@ class MelEngine: assert not self.database self.log.info("Indexing mails") notmuch_config_file = os.path.expanduser( - "~/.config/notmuch-config") # TODO Better + "~/.config/notmuch-config" + ) # TODO Better cmd = ["notmuch", "--config", notmuch_config_file, "new"] self.log.debug(" ".join(cmd)) subprocess.run(cmd, check=True) @@ -117,7 +118,8 @@ class MelEngine: """ assert self.config storage_path = os.path.realpath( - os.path.expanduser(self.config["GENERAL"]["storage"])) + os.path.expanduser(self.config["GENERAL"]["storage"]) + ) folders = list() for account in self.accounts: storage_path_account = os.path.join(storage_path, account) @@ -125,9 +127,9 @@ class MelEngine: if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs: continue assert root.startswith(storage_path) - path = root[len(storage_path):] - path_split = path.split('/') - if path_split[0] == '': + path = root[len(storage_path) :] + path_split = path.split("/") + if path_split[0] == "": path_split = path_split[1:] folders.append(tuple(path_split)) return folders @@ -138,8 +140,11 @@ class MelEngine: Be sure to require only in the mode you want to avoid deadlocks. """ assert self.config - mode = notmuch.Database.MODE.READ_WRITE if write \ + mode = ( + notmuch.Database.MODE.READ_WRITE + if write else notmuch.Database.MODE.READ_ONLY + ) if self.database: # If the requested mode is the one already present, # or we request read when it's already write, do nothing @@ -149,7 +154,8 @@ class MelEngine: self.close_database() self.log.info("Opening database in mode %s", mode) db_path = os.path.realpath( - os.path.expanduser(self.config["GENERAL"]["storage"])) + os.path.expanduser(self.config["GENERAL"]["storage"]) + ) self.database = notmuch.Database(mode=mode, path=db_path) def close_database(self) -> None: @@ -171,13 +177,13 @@ class MelEngine: assert self.database base = self.database.get_path() assert path.startswith(base) - path = path[len(base):] - path_split = path.split('/') + path = path[len(base) :] + path_split = path.split("/") mailbox = path_split[1] assert mailbox in self.accounts state = path_split[-1] folder = tuple(path_split[2:-1]) - assert state in {'cur', 'tmp', 'new'} + assert state in {"cur", "tmp", "new"} return (mailbox, folder, state) @staticmethod @@ -185,8 +191,11 @@ class MelEngine: """ Tells if the provided string is a valid UID. """ - return isinstance(uid, str) and len(uid) == 12 \ - and bool(re.match('^[a-zA-Z0-9+/]{12}$', uid)) + return ( + isinstance(uid, str) + and len(uid) == 12 + and bool(re.match("^[a-zA-Z0-9+/]{12}$", uid)) + ) @staticmethod def extract_email(field: str) -> str: @@ -197,9 +206,9 @@ class MelEngine: # TODO Can be made better (extract name and email) # Also what happens with multiple dests? try: - sta = field.index('<') - sto = field.index('>') - return field[sta+1:sto] + sta = field.index("<") + sto = field.index(">") + return field[sta + 1 : sto] except ValueError: return field @@ -211,8 +220,9 @@ class MelEngine: # Search-friendly folder name slug_folder_list = list() - for fold_index, fold in [(fold_index, folder[fold_index]) - for fold_index in range(len(folder))]: + for fold_index, fold in [ + (fold_index, folder[fold_index]) for fold_index in range(len(folder)) + ]: if fold_index == 0 and len(folder) > 1 and fold == "INBOX": continue slug_folder_list.append(fold.upper()) @@ -229,14 +239,15 @@ class MelEngine: msg.add_tag(tag) elif not condition and tag in tags: msg.remove_tag(tag) - expeditor = MelEngine.extract_email(msg.get_header('from')) - tag_if('inbox', slug_folder[0] == 'INBOX') - tag_if('spam', slug_folder[0] in ('JUNK', 'SPAM')) - tag_if('deleted', slug_folder[0] == 'TRASH') - tag_if('draft', slug_folder[0] == 'DRAFTS') - tag_if('sent', expeditor in self.aliases) - tag_if('unprocessed', False) + expeditor = MelEngine.extract_email(msg.get_header("from")) + + tag_if("inbox", slug_folder[0] == "INBOX") + tag_if("spam", slug_folder[0] in ("JUNK", "SPAM")) + tag_if("deleted", slug_folder[0] == "TRASH") + tag_if("draft", slug_folder[0] == "DRAFTS") + tag_if("sent", expeditor in self.aliases) + tag_if("unprocessed", False) # UID uid = msg.get_header("X-TUID") @@ -244,17 +255,23 @@ class MelEngine: # TODO Happens to sent mails but should it? print(f"{msg.get_filename()} has no UID!") return - uidtag = 'tuid{}'.format(uid) + uidtag = "tuid{}".format(uid) # Remove eventual others UID for tag in tags: - if tag.startswith('tuid') and tag != uidtag: + if tag.startswith("tuid") and tag != uidtag: msg.remove_tag(tag) msg.add_tag(uidtag) - def apply_msgs(self, query_str: str, action: typing.Callable, - *args: typing.Any, show_progress: bool = False, - write: bool = False, close_db: bool = True, - **kwargs: typing.Any) -> int: + def apply_msgs( + self, + query_str: str, + action: typing.Callable, + *args: typing.Any, + show_progress: bool = False, + write: bool = False, + close_db: bool = True, + **kwargs: typing.Any, + ) -> int: """ Run a function on the messages selected by the given query. """ @@ -267,8 +284,11 @@ class MelEngine: elements = query.search_messages() nb_msgs = query.count_messages() - iterator = progressbar.progressbar(elements, max_value=nb_msgs) \ - if show_progress and nb_msgs else elements + iterator = ( + progressbar.progressbar(elements, max_value=nb_msgs) + if show_progress and nb_msgs + else elements + ) self.log.info("Executing %s", action) for msg in iterator: @@ -296,8 +316,9 @@ class MelOutput: WIDTH_FIXED = 31 WIDTH_RATIO_DEST_SUBJECT = 0.3 - def compute_line_format(self) -> typing.Tuple[typing.Optional[int], - typing.Optional[int]]: + def compute_line_format( + self, + ) -> typing.Tuple[typing.Optional[int], typing.Optional[int]]: """ Based on the terminal width, assign the width of flexible columns. """ @@ -332,12 +353,12 @@ class MelOutput: """ now = datetime.datetime.now() if now - date < datetime.timedelta(days=1): - return date.strftime('%H:%M:%S') + return date.strftime("%H:%M:%S") if now - date < datetime.timedelta(days=28): - return date.strftime('%d %H:%M') + return date.strftime("%d %H:%M") if now - date < datetime.timedelta(days=365): - return date.strftime('%m-%d %H') - return date.strftime('%y-%m-%d') + return date.strftime("%m-%d %H") + return date.strftime("%y-%m-%d") @staticmethod def clip_text(size: typing.Optional[int], text: str) -> str: @@ -352,28 +373,28 @@ class MelOutput: if length == size: return text if length > size: - return text[:size-1] + '…' - return text + ' ' * (size - length) + return text[: size - 1] + "…" + return text + " " * (size - length) @staticmethod def chunks(iterable: str, chunk_size: int) -> typing.Iterable[str]: """Yield successive chunk_size-sized chunks from iterable.""" # From https://stackoverflow.com/a/312464 for i in range(0, len(iterable), chunk_size): - yield iterable[i:i + chunk_size] + yield iterable[i : i + chunk_size] @staticmethod - def sizeof_fmt(num: int, suffix: str = 'B') -> str: + def sizeof_fmt(num: int, suffix: str = "B") -> str: """ Print the given size in a human-readable format. """ remainder = float(num) # From https://stackoverflow.com/a/1094933 - for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(remainder) < 1024.0: return "%3.1f %s%s" % (remainder, unit, suffix) remainder /= 1024.0 - return "%.1f %s%s" % (remainder, 'Yi', suffix) + return "%.1f %s%s" % (remainder, "Yi", suffix) def get_mailbox_color(self, mailbox: str) -> str: """ @@ -381,7 +402,7 @@ class MelOutput: string with ASCII escape codes. """ if not self.is_tty: - return '' + return "" if mailbox not in self.mailbox_colors: # RGB colors (not supported everywhere) # color_str = self.config[mailbox]["color"] @@ -406,21 +427,24 @@ class MelOutput: line = "" tags = set(msg.get_tags()) mailbox, _, _ = self.engine.get_location(msg) - if 'unread' in tags or 'flagged' in tags: + if "unread" in tags or "flagged" in tags: line += colorama.Style.BRIGHT # if 'flagged' in tags: # line += colorama.Style.BRIGHT # if 'unread' not in tags: # line += colorama.Style.DIM - line += colorama.Back.LIGHTBLACK_EX if self.light_background \ + line += ( + colorama.Back.LIGHTBLACK_EX + if self.light_background else colorama.Back.BLACK + ) self.light_background = not self.light_background line += self.get_mailbox_color(mailbox) # UID uid = None for tag in tags: - if tag.startswith('tuid'): + if tag.startswith("tuid"): uid = tag[4:] assert uid, f"No UID for message: {msg}." assert MelEngine.is_uid(uid), f"{uid} {type(uid)} is not a valid UID." @@ -434,8 +458,9 @@ class MelOutput: # Icons line += sep + colorama.Fore.RED - def tags2col1(tag1: str, tag2: str, - characters: typing.Tuple[str, str, str, str]) -> None: + def tags2col1( + tag1: str, tag2: str, characters: typing.Tuple[str, str, str, str] + ) -> None: """ Show the presence/absence of two tags with one character. """ @@ -452,14 +477,14 @@ class MelOutput: else: line += none - tags2col1('spam', 'draft', ('?', 'S', 'D', ' ')) - tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' ')) - tags2col1('unread', 'flagged', ('!', 'U', 'F', ' ')) - tags2col1('sent', 'replied', ('?', '↑', '↪', ' ')) + tags2col1("spam", "draft", ("?", "S", "D", " ")) + tags2col1("attachment", "encrypted", ("E", "A", "E", " ")) + tags2col1("unread", "flagged", ("!", "U", "F", " ")) + tags2col1("sent", "replied", ("?", "↑", "↪", " ")) # Opposed line += sep + colorama.Fore.BLUE - if 'sent' in tags: + if "sent" in tags: dest = msg.get_header("to") else: dest = msg.get_header("from") @@ -483,11 +508,10 @@ class MelOutput: expd = msg.get_header("from") account, _, _ = self.engine.get_location(msg) - summary = '{} ({})'.format(html.escape(expd), account) + summary = "{} ({})".format(html.escape(expd), account) body = html.escape(subject) - cmd = ["notify-send", "-u", "low", "-i", - "mail-message-new", summary, body] - print(' '.join(cmd)) + cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body] + print(" ".join(cmd)) subprocess.run(cmd, check=False) def notify_all(self) -> None: @@ -497,12 +521,26 @@ class MelOutput: since it should be marked as processed right after. """ nb_msgs = self.engine.apply_msgs( - 'tag:unread and tag:unprocessed', self.notify_msg) + "tag:unread and tag:unprocessed", self.notify_msg + ) if nb_msgs > 0: - self.log.info( - "Playing notification sound (%d new message(s))", nb_msgs) - cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5", - "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"] + self.log.info("Playing notification sound (%d new message(s))", nb_msgs) + cmd = [ + "play", + "-n", + "synth", + "sine", + "E4", + "sine", + "A5", + "remix", + "1-2", + "fade", + "0.5", + "1.2", + "0.5", + "2", + ] subprocess.run(cmd, check=False) @staticmethod @@ -510,46 +548,62 @@ class MelOutput: """ Return split header values in a contiguous string. """ - return val.replace('\n', '').replace('\t', '').strip() + return val.replace("\n", "").replace("\t", "").strip() - PART_MULTI_FORMAT = colorama.Fore.BLUE + \ - '{count} {indent}+ {typ}' + colorama.Style.RESET_ALL - PART_LEAF_FORMAT = colorama.Fore.BLUE + \ - '{count} {indent}→ {desc} ({typ}; {size})' + \ - colorama.Style.RESET_ALL + PART_MULTI_FORMAT = ( + colorama.Fore.BLUE + "{count} {indent}+ {typ}" + colorama.Style.RESET_ALL + ) + PART_LEAF_FORMAT = ( + colorama.Fore.BLUE + + "{count} {indent}→ {desc} ({typ}; {size})" + + colorama.Style.RESET_ALL + ) - def show_parts_tree(self, part: email.message.Message, - depth: int = 0, count: int = 1) -> int: + def show_parts_tree( + self, part: email.message.Message, depth: int = 0, count: int = 1 + ) -> int: """ Show a tree of the parts contained in a message. Return the number of parts of the mesage. """ - indent = depth * '\t' + indent = depth * "\t" typ = part.get_content_type() if part.is_multipart(): - print(MelOutput.PART_MULTI_FORMAT.format( - count=count, indent=indent, typ=typ)) + print( + MelOutput.PART_MULTI_FORMAT.format(count=count, indent=indent, typ=typ) + ) payl = part.get_payload() assert isinstance(payl, list) size = 1 for obj in payl: - size += self.show_parts_tree(obj, depth=depth+1, - count=count+size) + size += self.show_parts_tree(obj, depth=depth + 1, count=count + size) return size payl = part.get_payload(decode=True) assert isinstance(payl, bytes) size = len(payl) - desc = part.get('Content-Description', '') - print(MelOutput.PART_LEAF_FORMAT.format( - count=count, indent=indent, typ=typ, desc=desc, - size=MelOutput.sizeof_fmt(size))) + desc = part.get("Content-Description", "") + print( + MelOutput.PART_LEAF_FORMAT.format( + count=count, + indent=indent, + typ=typ, + desc=desc, + size=MelOutput.sizeof_fmt(size), + ) + ) return 1 INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"] - HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \ - '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL + HEADER_FORMAT = ( + colorama.Fore.BLUE + + colorama.Style.BRIGHT + + "{}:" + + colorama.Style.NORMAL + + " {}" + + colorama.Style.RESET_ALL + ) def read_msg(self, msg: notmuch.Message) -> None: """ @@ -558,7 +612,7 @@ class MelOutput: # Parse filename = msg.get_filename() parser = email.parser.BytesParser() - with open(filename, 'rb') as filedesc: + with open(filename, "rb") as filedesc: mail = parser.parse(filedesc) # Defects @@ -591,12 +645,13 @@ class MelOutput: print(payl.decode()) else: # TODO Use nametemplate from mailcap - temp_file = '/tmp/melcap.html' # TODO Real temporary file + temp_file = "/tmp/melcap.html" # TODO Real temporary file # TODO FIFO if possible - with open(temp_file, 'wb') as temp_filedesc: + with open(temp_file, "wb") as temp_filedesc: temp_filedesc.write(payl) command, _ = mailcap.findmatch( - self.caps, part.get_content_type(), key='view', filename=temp_file) + self.caps, part.get_content_type(), key="view", filename=temp_file + ) if command: os.system(command) @@ -611,21 +666,26 @@ class MelOutput: line += arb[0].replace("'", "\\'") line += colorama.Fore.LIGHTBLACK_EX for inter in arb[1:-1]: - line += '/' + inter.replace("'", "\\'") - line += '/' + colorama.Fore.WHITE + arb[-1].replace("'", "\\'") + line += "/" + inter.replace("'", "\\'") + line += "/" + colorama.Fore.WHITE + arb[-1].replace("'", "\\'") line += colorama.Fore.LIGHTBLACK_EX + "'" line += colorama.Style.RESET_ALL print(line) -class MelCLI(): +class MelCLI: """ Handles the user input and run asked operations. """ + VERBOSITY_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"] - def apply_msgs_input(self, argmessages: typing.List[str], - action: typing.Callable, write: bool = False) -> None: + def apply_msgs_input( + self, + argmessages: typing.List[str], + action: typing.Callable, + write: bool = False, + ) -> None: """ Run a function on the message given by the user. """ @@ -633,7 +693,7 @@ class MelCLI(): if not argmessages: from_stdin = not sys.stdin.isatty() if argmessages: - from_stdin = len(argmessages) == 1 and argmessages == '-' + from_stdin = len(argmessages) == 1 and argmessages == "-" messages = list() if from_stdin: @@ -646,9 +706,11 @@ class MelCLI(): else: for uids in argmessages: if len(uids) > 12: - self.log.warning("Might have forgotten some spaces " - "between the UIDs. Don't worry, I'll " - "split them for you") + self.log.warning( + "Might have forgotten some spaces " + "between the UIDs. Don't worry, I'll " + "split them for you" + ) for uid in MelOutput.chunks(uids, 12): if not MelEngine.is_uid(uid): self.log.error("Not an UID: %s", uid) @@ -656,48 +718,52 @@ class MelCLI(): messages.append(uid) for message in messages: - query_str = f'tag:tuid{message}' - nb_msgs = self.engine.apply_msgs(query_str, action, - write=write, close_db=False) + query_str = f"tag:tuid{message}" + nb_msgs = self.engine.apply_msgs( + query_str, action, write=write, close_db=False + ) if nb_msgs < 1: - self.log.error( - "Couldn't execute function for message %s", message) + self.log.error("Couldn't execute function for message %s", message) self.engine.close_database() def operation_default(self) -> None: """ Default operation: list all message in the inbox """ - self.engine.apply_msgs('tag:inbox', self.output.print_msg) + self.engine.apply_msgs("tag:inbox", self.output.print_msg) def operation_inbox(self) -> None: """ Inbox operation: list all message in the inbox, possibly only the unread ones. """ - query_str = 'tag:unread' if self.args.only_unread else 'tag:inbox' + query_str = "tag:unread" if self.args.only_unread else "tag:inbox" self.engine.apply_msgs(query_str, self.output.print_msg) def operation_flag(self) -> None: """ Flag operation: Flag user selected messages. """ + def flag_msg(msg: notmuch.Message) -> None: """ Flag given message. """ - msg.add_tag('flagged') + msg.add_tag("flagged") + self.apply_msgs_input(self.args.message, flag_msg, write=True) def operation_unflag(self) -> None: """ Unflag operation: Flag user selected messages. """ + def unflag_msg(msg: notmuch.Message) -> None: """ Unflag given message. """ - msg.remove_tag('flagged') + msg.remove_tag("flagged") + self.apply_msgs_input(self.args.message, unflag_msg, write=True) def operation_read(self) -> None: @@ -712,8 +778,7 @@ class MelCLI(): """ # Fetch mails self.log.info("Fetching mails") - mbsync_config_file = os.path.expanduser( - "~/.config/mbsyncrc") # TODO Better + mbsync_config_file = os.path.expanduser("~/.config/mbsyncrc") # TODO Better cmd = ["mbsync", "--config", mbsync_config_file, "--all"] subprocess.run(cmd, check=False) @@ -724,8 +789,9 @@ class MelCLI(): self.output.notify_all() # Tag new mails - self.engine.apply_msgs('tag:unprocessed', self.engine.retag_msg, - show_progress=True, write=True) + self.engine.apply_msgs( + "tag:unprocessed", self.engine.retag_msg, show_progress=True, write=True + ) def operation_list(self) -> None: """ @@ -744,14 +810,15 @@ class MelCLI(): Retag operation: Manually retag all the mails in the database. Mostly debug I suppose. """ - self.engine.apply_msgs('*', self.engine.retag_msg, - show_progress=True, write=True) + self.engine.apply_msgs( + "*", self.engine.retag_msg, show_progress=True, write=True + ) def operation_all(self) -> None: """ All operation: list every single message. """ - self.engine.apply_msgs('*', self.output.print_msg) + self.engine.apply_msgs("*", self.output.print_msg) def add_subparsers(self) -> None: """ @@ -766,29 +833,30 @@ class MelCLI(): # inbox (default) parser_inbox = subparsers.add_parser( - "inbox", help="Show unread, unsorted and flagged messages") - parser_inbox.add_argument('-u', '--only-unread', action='store_true', - help="Show unread messages only") + "inbox", help="Show unread, unsorted and flagged messages" + ) + parser_inbox.add_argument( + "-u", "--only-unread", action="store_true", help="Show unread messages only" + ) # TODO Make this more relevant parser_inbox.set_defaults(operation=self.operation_inbox) # list folder [--recurse] # List actions - parser_list = subparsers.add_parser( - "list", help="List all folders") + parser_list = subparsers.add_parser("list", help="List all folders") # parser_list.add_argument('message', nargs='*', help="Messages") parser_list.set_defaults(operation=self.operation_list) # flag msg... - parser_flag = subparsers.add_parser( - "flag", help="Mark messages as flagged") - parser_flag.add_argument('message', nargs='*', help="Messages") + parser_flag = subparsers.add_parser("flag", help="Mark messages as flagged") + parser_flag.add_argument("message", nargs="*", help="Messages") parser_flag.set_defaults(operation=self.operation_flag) # unflag msg... parser_unflag = subparsers.add_parser( - "unflag", help="Mark messages as not-flagged") - parser_unflag.add_argument('message', nargs='*', help="Messages") + "unflag", help="Mark messages as not-flagged" + ) + parser_unflag.add_argument("message", nargs="*", help="Messages") parser_unflag.set_defaults(operation=self.operation_unflag) # delete msg... @@ -799,7 +867,7 @@ class MelCLI(): # read msg [--html] [--plain] [--browser] parser_read = subparsers.add_parser("read", help="Read message") - parser_read.add_argument('message', nargs=1, help="Messages") + parser_read.add_argument("message", nargs=1, help="Messages") parser_read.set_defaults(operation=self.operation_read) # attach msg [id] [--save] (list if no id, xdg-open else) @@ -817,20 +885,23 @@ class MelCLI(): # fetch (mbsync, notmuch new, retag, notify; called by greater gods) parser_fetch = subparsers.add_parser( - "fetch", help="Fetch mail, tag them, and run notifications") + "fetch", help="Fetch mail, tag them, and run notifications" + ) parser_fetch.set_defaults(operation=self.operation_fetch) # Debug # debug (various) parser_debug = subparsers.add_parser( - "debug", help="Who know what this holds...") - parser_debug.set_defaults(verbosity='DEBUG') + "debug", help="Who know what this holds..." + ) + parser_debug.set_defaults(verbosity="DEBUG") parser_debug.set_defaults(operation=self.operation_debug) # retag (all or unprocessed) parser_retag = subparsers.add_parser( - "retag", help="Retag all mails (when you changed configuration)") + "retag", help="Retag all mails (when you changed configuration)" + ) parser_retag.set_defaults(operation=self.operation_retag) # all @@ -842,15 +913,21 @@ class MelCLI(): Create the main parser that will handle the user arguments. """ parser = argparse.ArgumentParser(description="Meh mail client") - parser.add_argument('-v', '--verbosity', - choices=MelCLI.VERBOSITY_LEVELS, default='WARNING', - help="Verbosity of self.log messages") + parser.add_argument( + "-v", + "--verbosity", + choices=MelCLI.VERBOSITY_LEVELS, + default="WARNING", + help="Verbosity of self.log messages", + ) # parser.add_argument('-n', '--dry-run', action='store_true', # help="Don't do anything") # DEBUG default_config_file = os.path.join( - xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf') - parser.add_argument('-c', '--config', default=default_config_file, - help="Accounts config file") + xdg.BaseDirectory.xdg_config_home, "mel", "accounts.conf" + ) + parser.add_argument( + "-c", "--config", default=default_config_file, help="Accounts config file" + ) return parser def __init__(self) -> None: @@ -860,8 +937,9 @@ class MelCLI(): self.add_subparsers() self.args = self.parser.parse_args() - coloredlogs.install(level=self.args.verbosity, - fmt='%(levelname)s %(name)s %(message)s') + coloredlogs.install( + level=self.args.verbosity, fmt="%(levelname)s %(name)s %(message)s" + ) self.engine = MelEngine(self.args.config) self.output = MelOutput(self.engine) diff --git a/config/scripts/melConf b/config/scripts/melConf index 3b78fee..fa795b9 100755 --- a/config/scripts/melConf +++ b/config/scripts/melConf @@ -14,7 +14,7 @@ import sys # TODO Write in .config or .cache /mel # TODO Fix IMAPS with mbsync -configPath = os.path.join(os.path.expanduser('~'), '.config', 'mel', 'accounts.conf') +configPath = os.path.join(os.path.expanduser("~"), ".config", "mel", "accounts.conf") config = configparser.ConfigParser() config.read(configPath) @@ -23,9 +23,9 @@ storageFull = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"])) config["GENERAL"]["storage"] = storageFull SERVER_DEFAULTS = { - "imap": {"port": 143, "starttls": True}, - "smtp": {"port": 587, "starttls": True}, - } + "imap": {"port": 143, "starttls": True}, + "smtp": {"port": 587, "starttls": True}, +} SERVER_ITEMS = {"host", "port", "user", "pass", "starttls"} ACCOUNT_DEFAULTS = { "color": "#FFFFFF", @@ -53,7 +53,11 @@ for name in config.sections(): for item in SERVER_ITEMS: key = server + item try: - val = section.get(key) or section.get(item) or SERVER_DEFAULTS[server][item] + val = ( + section.get(key) + or section.get(item) + or SERVER_DEFAULTS[server][item] + ) except KeyError: raise KeyError("{}.{}".format(name, key)) @@ -71,7 +75,7 @@ for name in config.sections(): continue data[key] = section[key] - for k, v in config['DEFAULT'].items(): + for k, v in config["DEFAULT"].items(): if k not in data: data[k] = v @@ -85,7 +89,7 @@ for name in config.sections(): mails.add(alt) data["account"] = name - data["storage"] = os.path.join(config['GENERAL']['storage'], name) + data["storage"] = os.path.join(config["GENERAL"]["storage"], name) data["storageInbox"] = os.path.join(data["storage"], "INBOX") accounts[name] = data @@ -139,7 +143,7 @@ remotepass = {imappass} """ -offlineIMAPstr = OFFLINEIMAP_BEGIN.format(','.join(accounts), len(accounts)) +offlineIMAPstr = OFFLINEIMAP_BEGIN.format(",".join(accounts), len(accounts)) for name, account in accounts.items(): if account["imapstarttls"]: secconf = "ssl = no" @@ -182,9 +186,11 @@ for name, account in accounts.items(): if "certificate" in account: secconf += "\nCertificateFile {certificate}".format(**account) imappassEscaped = account["imappass"].replace("\\", "\\\\") - mbsyncStr += MBSYNC_ACCOUNT.format(**account, secconf=secconf, imappassEscaped=imappassEscaped) -mbsyncFilepath = os.path.join(os.path.expanduser('~'), '.config/mel/mbsyncrc') -with open(mbsyncFilepath, 'w') as f: + mbsyncStr += MBSYNC_ACCOUNT.format( + **account, secconf=secconf, imappassEscaped=imappassEscaped + ) +mbsyncFilepath = os.path.join(os.path.expanduser("~"), ".config/mel/mbsyncrc") +with open(mbsyncFilepath, "w") as f: f.write(mbsyncStr) # msmtp @@ -208,8 +214,8 @@ tls on msmtpStr = MSMTP_BEGIN for name, account in accounts.items(): msmtpStr += MSMTP_ACCOUNT.format(**account) -mbsyncFilepath = os.path.join(os.path.expanduser('~'), '.config/msmtp/config') -with open(mbsyncFilepath, 'w') as f: +mbsyncFilepath = os.path.join(os.path.expanduser("~"), ".config/msmtp/config") +with open(mbsyncFilepath, "w") as f: f.write(msmtpStr) @@ -241,8 +247,8 @@ other_email = mails.copy() other_email.remove(general["main"]["from"]) other_email = ";".join(other_email) notmuchStr = NOTMUCH_BEGIN.format(**general, other_email=other_email) -mbsyncFilepath = os.path.join(os.path.expanduser('~'), '.config/notmuch-config') -with open(mbsyncFilepath, 'w') as f: +mbsyncFilepath = os.path.join(os.path.expanduser("~"), ".config/notmuch-config") +with open(mbsyncFilepath, "w") as f: f.write(notmuchStr) # mutt (temp) @@ -254,15 +260,15 @@ mailboxesStr = MAILBOXES_BEGIN for name, account in accounts.items(): lines = "-" * (20 - len(name)) mailboxesStr += f' "+{name}{lines}"' - for root, dirs, files in os.walk(account['storage']): + for root, dirs, files in os.walk(account["storage"]): if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs: continue assert root.startswith(storageFull) - path = root[len(storageFull)+1:] + path = root[len(storageFull) + 1 :] mailboxesStr += f' "+{path}"' mailboxesStr += "\n" -mailboxesFilepath = os.path.join(os.path.expanduser('~'), '.mutt/mailboxes') -with open(mailboxesFilepath, 'w') as f: +mailboxesFilepath = os.path.join(os.path.expanduser("~"), ".mutt/mailboxes") +with open(mailboxesFilepath, "w") as f: f.write(mailboxesStr) ## accounts @@ -296,14 +302,14 @@ for name, account in accounts.items(): muttStr = MUTT_ACCOUNT.format(**account) # Config - muttFilepath = os.path.join(os.path.expanduser('~'), f'.mutt/accounts/{name}') - with open(muttFilepath, 'w') as f: + muttFilepath = os.path.join(os.path.expanduser("~"), f".mutt/accounts/{name}") + with open(muttFilepath, "w") as f: f.write(muttStr) # Signature sigStr = account.get("sig", account.get("name", "")) - sigFilepath = os.path.join(os.path.expanduser('~'), f'.mutt/accounts/{name}.sig') - with open(sigFilepath, 'w') as f: + sigFilepath = os.path.join(os.path.expanduser("~"), f".mutt/accounts/{name}.sig") + with open(sigFilepath, "w") as f: f.write(sigStr) MUTT_SELECTOR = """ @@ -324,13 +330,15 @@ hooks = "" for name, account in accounts.items(): hooks += f"folder-hook {name}/* source ~/.mutt/accounts/{name}\n" selectStr += MUTT_SELECTOR.format(**general, hooks=hooks) -selectFilepath = os.path.join(os.path.expanduser('~'), '.mutt/muttrc') -with open(selectFilepath, 'w') as f: +selectFilepath = os.path.join(os.path.expanduser("~"), ".mutt/muttrc") +with open(selectFilepath, "w") as f: f.write(selectStr) ## Color for name, account in accounts.items(): # Config - colorFilepath = os.path.join(os.path.expanduser('~'), f'{general["storage"]}/{name}/color') - with open(colorFilepath, 'w') as f: - f.write(account['color']) + colorFilepath = os.path.join( + os.path.expanduser("~"), f'{general["storage"]}/{name}/color' + ) + with open(colorFilepath, "w") as f: + f.write(account["color"]) diff --git a/config/scripts/music_remove_dashes b/config/scripts/music_remove_dashes index 2a1ba1e..2693a5b 100755 --- a/config/scripts/music_remove_dashes +++ b/config/scripts/music_remove_dashes @@ -16,17 +16,17 @@ def main() -> None: """ Function that executes the script. """ - for root, _, files in os.walk('.'): + for root, _, files in os.walk("."): for filename in files: - match = re.match(r'^(\d+) - (.+)$', filename) + match = re.match(r"^(\d+) - (.+)$", filename) if not match: continue new_filename = f"{match[1]} {match[2]}" old_path = os.path.join(root, filename) new_path = os.path.join(root, new_filename) - print(old_path, '->', new_path) + print(old_path, "->", new_path) os.rename(old_path, new_path) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/config/scripts/musiqueBof b/config/scripts/musiqueBof index b0f9658..44a490d 100755 --- a/config/scripts/musiqueBof +++ b/config/scripts/musiqueBof @@ -6,7 +6,7 @@ import shutil import logging import coloredlogs -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() MUSICS_FOLDER = os.path.join(os.path.expanduser("~"), "Musique") @@ -36,4 +36,3 @@ for f in sys.argv[1:]: log.info("{} → {}".format(src, dst)) os.makedirs(dstFolder, exist_ok=True) shutil.move(src, dst) - diff --git a/config/scripts/ovhcli b/config/scripts/ovhcli index f081737..8565e82 100755 --- a/config/scripts/ovhcli +++ b/config/scripts/ovhcli @@ -10,15 +10,17 @@ import logging import coloredlogs import argparse -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() debug = None -class OvhCli(): + +class OvhCli: ROOT = "https://api.ovh.com/1.0?null" + def __init__(self): - self.cacheDir = os.path.join(xdg.BaseDirectory.xdg_cache_home, 'ovhcli') + self.cacheDir = os.path.join(xdg.BaseDirectory.xdg_cache_home, "ovhcli") # TODO Corner cases: links, cache dir not done, configurable cache if not os.path.isdir(self.cacheDir): assert not os.path.exists(self.cacheDir) @@ -26,18 +28,18 @@ class OvhCli(): def updateCache(self): log.info("Downloading the API description") - rootJsonPath = os.path.join(self.cacheDir, 'root.json') + rootJsonPath = os.path.join(self.cacheDir, "root.json") log.debug(f"{self.ROOT} -> {rootJsonPath}") urllib.request.urlretrieve(self.ROOT, rootJsonPath) - with open(rootJsonPath, 'rt') as rootJson: + with open(rootJsonPath, "rt") as rootJson: root = json.load(rootJson) - basePath = root['basePath'] + basePath = root["basePath"] - for apiRoot in root['apis']: - fmt = 'json' - assert fmt in apiRoot['format'] - path = apiRoot['path'] - schema = apiRoot['schema'].format(format=fmt, path=path) + for apiRoot in root["apis"]: + fmt = "json" + assert fmt in apiRoot["format"] + path = apiRoot["path"] + schema = apiRoot["schema"].format(format=fmt, path=path) apiJsonPath = os.path.join(self.cacheDir, schema[1:]) apiJsonUrl = basePath + schema log.debug(f"{apiJsonUrl} -> {apiJsonPath}") @@ -47,11 +49,11 @@ class OvhCli(): urllib.request.urlretrieve(apiJsonUrl, apiJsonPath) def createParser(self): - parser = argparse.ArgumentParser(description='Access the OVH API') + parser = argparse.ArgumentParser(description="Access the OVH API") return parser -if __name__ == '__main__': +if __name__ == "__main__": cli = OvhCli() # cli.updateCache() parser = cli.createParser() diff --git a/config/scripts/picture_name_date b/config/scripts/picture_name_date index f82a394..55cf027 100755 --- a/config/scripts/picture_name_date +++ b/config/scripts/picture_name_date @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import os import re @@ -9,16 +9,16 @@ import PIL.ExifTags import PIL.Image import progressbar -EXTENSION_PATTERN = re.compile(r'\.JPE?G', re.I) -COMMON_PATTERN = re.compile(r'(IMG|DSC[NF]?|100|P10|f|t)_?\d+', re.I) -EXIF_TAG_NAME = 'DateTimeOriginal' -EXIF_TAG_ID = list(PIL.ExifTags.TAGS.keys())[list( - PIL.ExifTags.TAGS.values()).index(EXIF_TAG_NAME)] -EXIF_DATE_FORMAT = '%Y:%m:%d %H:%M:%S' +EXTENSION_PATTERN = re.compile(r"\.JPE?G", re.I) +COMMON_PATTERN = re.compile(r"(IMG|DSC[NF]?|100|P10|f|t)_?\d+", re.I) +EXIF_TAG_NAME = "DateTimeOriginal" +EXIF_TAG_ID = list(PIL.ExifTags.TAGS.keys())[ + list(PIL.ExifTags.TAGS.values()).index(EXIF_TAG_NAME) +] +EXIF_DATE_FORMAT = "%Y:%m:%d %H:%M:%S" -def get_pictures(directory: str = ".", skip_renamed: bool = True) \ - -> typing.Generator: +def get_pictures(directory: str = ".", skip_renamed: bool = True) -> typing.Generator: for root, _, files in os.walk(directory): for filename in files: filename_trunk, extension = os.path.splitext(filename) @@ -44,9 +44,9 @@ def main() -> None: if exif_data and EXIF_TAG_ID in exif_data: date_raw = exif_data[EXIF_TAG_ID] date = datetime.datetime.strptime(date_raw, EXIF_DATE_FORMAT) - new_name = date.isoformat().replace(':', '-') + '.jpg' # For NTFS + new_name = date.isoformat().replace(":", "-") + ".jpg" # For NTFS print(full_path, new_name) - os.rename(full_path, new_name) # TODO FOLDER + os.rename(full_path, new_name) # TODO FOLDER img.close() diff --git a/config/scripts/pushToTalk b/config/scripts/pushToTalk index bd2dff3..dc6e6e3 100755 --- a/config/scripts/pushToTalk +++ b/config/scripts/pushToTalk @@ -9,14 +9,16 @@ from Xlib.protocol import rq KEY = XK.XK_F7 + def mute(state): - with pulsectl.Pulse('list-source') as pulse: + with pulsectl.Pulse("list-source") as pulse: for source in pulse.source_list(): if source.port_active: if source.mute != state: pulse.mute(source, state) print(f"{source.name} {'un' if not state else ''}muted") + mute(True) local_dpy = display.Display() @@ -36,7 +38,8 @@ def record_callback(reply): data = reply.data while len(data): event, data = rq.EventField(None).parse_binary_value( - data, record_dpy.display, None, None) + data, record_dpy.display, None, None + ) if event.type in [X.KeyPress, X.KeyRelease]: keysym = local_dpy.keycode_to_keysym(event.detail, 0) @@ -45,29 +48,32 @@ def record_callback(reply): if keysym == KEY: mute(event.type == X.KeyRelease) + # Check if the extension is present if not record_dpy.has_extension("RECORD"): print("RECORD extension not found") sys.exit(1) r = record_dpy.record_get_version(0, 0) - print("RECORD extension version %d.%d" % - (r.major_version, r.minor_version)) + print("RECORD extension version %d.%d" % (r.major_version, r.minor_version)) # Create a recording context; we only want key and mouse events ctx = record_dpy.record_create_context( 0, [record.AllClients], - [{ - 'core_requests': (0, 0), - 'core_replies': (0, 0), - 'ext_requests': (0, 0, 0, 0), - 'ext_replies': (0, 0, 0, 0), - 'delivered_events': (0, 0), - 'device_events': (X.KeyPress, X.MotionNotify), - 'errors': (0, 0), - 'client_started': False, - 'client_died': False, - }]) + [ + { + "core_requests": (0, 0), + "core_replies": (0, 0), + "ext_requests": (0, 0, 0, 0), + "ext_replies": (0, 0, 0, 0), + "delivered_events": (0, 0), + "device_events": (X.KeyPress, X.MotionNotify), + "errors": (0, 0), + "client_started": False, + "client_died": False, + } + ], +) # Enable the context; this only returns after a call to record_disable_context, # while calling the callback function in the meantime diff --git a/config/scripts/replayGain b/config/scripts/replayGain index 25545a8..9f48e35 100755 --- a/config/scripts/replayGain +++ b/config/scripts/replayGain @@ -14,15 +14,15 @@ import typing import coloredlogs import r128gain -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() # TODO Remove debug # Constants -FORCE = '-f' in sys.argv +FORCE = "-f" in sys.argv if FORCE: - sys.argv.remove('-f') + sys.argv.remove("-f") if len(sys.argv) >= 2: SOURCE_FOLDER = os.path.realpath(sys.argv[1]) else: @@ -66,6 +66,5 @@ for album in albums: if not musicFiles: continue - r128gain.process(musicFiles, album_gain=True, - skip_tagged=not FORCE, report=True) + r128gain.process(musicFiles, album_gain=True, skip_tagged=not FORCE, report=True) print("==============================") diff --git a/config/scripts/rmf b/config/scripts/rmf index 3828b2c..d90c2b2 100755 --- a/config/scripts/rmf +++ b/config/scripts/rmf @@ -13,7 +13,7 @@ import progressbar import logging progressbar.streams.wrap_stderr() -coloredlogs.install(level='INFO', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="INFO", fmt="%(levelname)s %(message)s") log = logging.getLogger() # 1) Create file list with conflict files @@ -21,21 +21,20 @@ log = logging.getLogger() # 3) Propose what to do -def sizeof_fmt(num, suffix='B'): +def sizeof_fmt(num, suffix="B"): # Stolen from https://stackoverflow.com/a/1094933 - for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.1f %s%s" % (num, unit, suffix) num /= 1024.0 - return "%.1f %s%s" % (num, 'Yi', suffix) + return "%.1f %s%s" % (num, "Yi", suffix) -class Table(): +class Table: def __init__(self, width, height): self.width = width self.height = height - self.data = [['' for _ in range(self.height)] - for _ in range(self.width)] + self.data = [["" for _ in range(self.height)] for _ in range(self.width)] def set(self, x, y, data): self.data[x][y] = str(data) @@ -48,15 +47,15 @@ class Table(): l = len(cell) width = widths[x] if x > 0: - cell = ' | ' + cell - cell = cell + ' ' * (width - l) - print(cell, end='\t') + cell = " | " + cell + cell = cell + " " * (width - l) + print(cell, end="\t") print() -class Database(): +class Database: VERSION = 1 - CONFLICT_PATTERN = re.compile('\.sync-conflict-\d{8}-\d{6}-\w{7}') + CONFLICT_PATTERN = re.compile("\.sync-conflict-\d{8}-\d{6}-\w{7}") def __init__(self, directory): self.version = Database.VERSION @@ -83,18 +82,25 @@ class Database(): return sum(databaseFile.maxSize() for databaseFile in self.data.values()) def totalChecksumSize(self): - return sum(databaseFile.totalChecksumSize() for databaseFile in self.data.values()) + return sum( + databaseFile.totalChecksumSize() for databaseFile in self.data.values() + ) def getList(self): self.prune() log.info("Finding conflict files") widgets = [ - progressbar.AnimatedMarker(), ' ', - progressbar.BouncingBar(), ' ', - progressbar.DynamicMessage('conflicts'), ' ', - progressbar.DynamicMessage('files'), ' ', - progressbar.DynamicMessage('dir', width=20, precision=20), ' ', + progressbar.AnimatedMarker(), + " ", + progressbar.BouncingBar(), + " ", + progressbar.DynamicMessage("conflicts"), + " ", + progressbar.DynamicMessage("files"), + " ", + progressbar.DynamicMessage("dir", width=20, precision=20), + " ", progressbar.Timer(), ] bar = progressbar.ProgressBar(widgets=widgets).start() @@ -104,7 +110,7 @@ class Database(): f += 1 if not Database.CONFLICT_PATTERN.search(conflictFilename): continue - filename = Database.CONFLICT_PATTERN.sub('', conflictFilename) + filename = Database.CONFLICT_PATTERN.sub("", conflictFilename) key = (root, filename) if key in self.data: dataFile = self.data[key] @@ -116,11 +122,13 @@ class Database(): dataFile.addConflict(filename) dataFile.addConflict(conflictFilename) - bar.update(conflicts=len(self.data), files=f, - dir=root[(len(self.directory)+1):]) + bar.update( + conflicts=len(self.data), files=f, dir=root[(len(self.directory) + 1) :] + ) bar.finish() log.info( - f"Found {len(self.data)} conflicts, totalling {self.nbFiles()} conflict files.") + f"Found {len(self.data)} conflicts, totalling {self.nbFiles()} conflict files." + ) def getStats(self): log.info("Getting stats from conflict files") @@ -132,25 +140,38 @@ class Database(): bar.update(f) bar.finish() log.info( - f"Total file size: {sizeof_fmt(self.totalSize())}, possible save: {sizeof_fmt(self.totalSize() - self.maxSize())}") + f"Total file size: {sizeof_fmt(self.totalSize())}, possible save: {sizeof_fmt(self.totalSize() - self.maxSize())}" + ) def getChecksums(self): log.info("Checksumming conflict files") widgets = [ - progressbar.DataSize(), ' of ', progressbar.DataSize('max_value'), - ' (', progressbar.AdaptiveTransferSpeed(), ') ', - progressbar.Bar(), ' ', - progressbar.DynamicMessage('dir', width=20, precision=20), ' ', - progressbar.DynamicMessage('file', width=20, precision=20), ' ', - progressbar.Timer(), ' ', + progressbar.DataSize(), + " of ", + progressbar.DataSize("max_value"), + " (", + progressbar.AdaptiveTransferSpeed(), + ") ", + progressbar.Bar(), + " ", + progressbar.DynamicMessage("dir", width=20, precision=20), + " ", + progressbar.DynamicMessage("file", width=20, precision=20), + " ", + progressbar.Timer(), + " ", progressbar.AdaptiveETA(), ] bar = progressbar.DataTransferBar( - max_value=self.totalChecksumSize(), widgets=widgets).start() + max_value=self.totalChecksumSize(), widgets=widgets + ).start() f = 0 for databaseFile in self.data.values(): - bar.update(f, dir=databaseFile.root[( - len(self.directory)+1):], file=databaseFile.filename) + bar.update( + f, + dir=databaseFile.root[(len(self.directory) + 1) :], + file=databaseFile.filename, + ) f += databaseFile.totalChecksumSize() try: databaseFile.getChecksums() @@ -172,9 +193,9 @@ class Database(): databaseFile.takeAction(execute=execute) -class DatabaseFile(): +class DatabaseFile: BLOCK_SIZE = 4096 - RELEVANT_STATS = ('st_mode', 'st_uid', 'st_gid', 'st_size', 'st_mtime') + RELEVANT_STATS = ("st_mode", "st_uid", "st_gid", "st_size", "st_mtime") def __init__(self, root, filename): self.root = root @@ -260,7 +281,15 @@ class DatabaseFile(): oldChecksum = self.checksums[f] # If it's been already summed, and we have the same inode and same ctime, don't resum - if oldStat is None or not isinstance(oldChecksum, int) or oldStat.st_size != newStat.st_size or oldStat.st_dev != newStat.st_dev or oldStat.st_ino != newStat.st_ino or oldStat.st_ctime != newStat.st_ctime or oldStat.st_dev != newStat.st_dev: + if ( + oldStat is None + or not isinstance(oldChecksum, int) + or oldStat.st_size != newStat.st_size + or oldStat.st_dev != newStat.st_dev + or oldStat.st_ino != newStat.st_ino + or oldStat.st_ctime != newStat.st_ctime + or oldStat.st_dev != newStat.st_dev + ): self.checksums[f] = None self.stats[f] = newStat @@ -270,7 +299,10 @@ class DatabaseFile(): self.checksums = [False] * len(self.conflicts) # If all the files are the same inode, set as same files - if len(set([s.st_ino for s in self.stats])) == 1 and len(set([s.st_dev for s in self.stats])) == 1: + if ( + len(set([s.st_ino for s in self.stats])) == 1 + and len(set([s.st_dev for s in self.stats])) == 1 + ): self.checksums = [True] * len(self.conflicts) def getChecksums(self): @@ -282,7 +314,7 @@ class DatabaseFile(): if self.checksums[f] is not None: continue self.checksums[f] = 1 - filedescs[f] = open(self.getPath(conflict), 'rb') + filedescs[f] = open(self.getPath(conflict), "rb") while len(filedescs): toClose = set() @@ -305,12 +337,13 @@ class DatabaseFile(): def getFeatures(self): features = dict() - features['name'] = self.conflicts - features['sum'] = self.checksums + features["name"] = self.conflicts + features["sum"] = self.checksums for statName in DatabaseFile.RELEVANT_STATS: # Rounding beause I Syncthing also rounds features[statName] = [ - int(stat.__getattribute__(statName)) for stat in self.stats] + int(stat.__getattribute__(statName)) for stat in self.stats + ] return features def getDiffFeatures(self): @@ -327,7 +360,7 @@ class DatabaseFile(): if match: return match[0][15:] else: - return '-' + return "-" def printInfos(self, diff=True): print(os.path.join(self.root, self.filename)) @@ -335,14 +368,13 @@ class DatabaseFile(): features = self.getDiffFeatures() else: features = self.getFeatures() - features['name'] = [DatabaseFile.shortConflict( - c) for c in self.conflicts] - table = Table(len(features), len(self.conflicts)+1) + features["name"] = [DatabaseFile.shortConflict(c) for c in self.conflicts] + table = Table(len(features), len(self.conflicts) + 1) for x, featureName in enumerate(features.keys()): table.set(x, 0, featureName) for x, featureName in enumerate(features.keys()): for y in range(len(self.conflicts)): - table.set(x, y+1, features[featureName][y]) + table.set(x, y + 1, features[featureName][y]) table.print() def decideAction(self, mostRecent=False): @@ -357,10 +389,10 @@ class DatabaseFile(): if len(features) == 1: reason = "same files" self.action = 0 - elif 'st_mtime' in features and mostRecent: - recentTime = features['st_mtime'][0] + elif "st_mtime" in features and mostRecent: + recentTime = features["st_mtime"][0] recentIndex = 0 - for index, time in enumerate(features['st_mtime']): + for index, time in enumerate(features["st_mtime"]): if time > recentTime: recentTime = time recentIndex = 0 @@ -368,11 +400,11 @@ class DatabaseFile(): reason = "most recent" if self.action is None: - log.warning( - f"{self.root}/{self.filename}: skip, cause: {reason}") + log.warning(f"{self.root}/{self.filename}: skip, cause: {reason}") else: log.info( - f"{self.root}/{self.filename}: keep {DatabaseFile.shortConflict(self.conflicts[self.action])}, cause: {reason}") + f"{self.root}/{self.filename}: keep {DatabaseFile.shortConflict(self.conflicts[self.action])}, cause: {reason}" + ) def takeAction(self, execute=False): if self.action is None: @@ -380,7 +412,8 @@ class DatabaseFile(): actionName = self.conflicts[self.action] if actionName != self.filename: log.debug( - f"Rename {self.getPath(actionName)} → {self.getPath(self.filename)}") + f"Rename {self.getPath(actionName)} → {self.getPath(self.filename)}" + ) if execute: os.rename(self.getPath(actionName), self.getPath(self.filename)) for conflict in self.conflicts: @@ -390,22 +423,33 @@ class DatabaseFile(): if execute: os.unlink(self.getPath(conflict)) + if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Handle Syncthing's .sync-conflict files ") + description="Handle Syncthing's .sync-conflict files " + ) # Execution flow - parser.add_argument('directory', metavar='DIRECTORY', - nargs='?', help='Directory to analyse') - parser.add_argument('-d', '--database', - help='Database path for file informations') - parser.add_argument('-r', '--most-recent', action='store_true', - help='Always keep the most recent version') - parser.add_argument('-e', '--execute', action='store_true', - help='Really apply changes') - parser.add_argument('-p', '--print', action='store_true', - help='Only print differences between files') + parser.add_argument( + "directory", metavar="DIRECTORY", nargs="?", help="Directory to analyse" + ) + parser.add_argument("-d", "--database", help="Database path for file informations") + parser.add_argument( + "-r", + "--most-recent", + action="store_true", + help="Always keep the most recent version", + ) + parser.add_argument( + "-e", "--execute", action="store_true", help="Really apply changes" + ) + parser.add_argument( + "-p", + "--print", + action="store_true", + help="Only print differences between files", + ) args = parser.parse_args() @@ -419,13 +463,17 @@ if __name__ == "__main__": if args.database: if os.path.isfile(args.database): try: - with open(args.database, 'rb') as databaseFile: + with open(args.database, "rb") as databaseFile: database = pickle.load(databaseFile) assert isinstance(database, Database) except BaseException as e: raise ValueError("Not a database file") - assert database.version <= Database.VERSION, "Version of the loaded database is too recent" - assert database.directory == args.directory, "Directory of the loaded database doesn't match" + assert ( + database.version <= Database.VERSION + ), "Version of the loaded database is too recent" + assert ( + database.directory == args.directory + ), "Directory of the loaded database doesn't match" if database is None: database = Database(args.directory) @@ -433,7 +481,7 @@ if __name__ == "__main__": def saveDatabase(): if args.database: global database - with open(args.database, 'wb') as databaseFile: + with open(args.database, "wb") as databaseFile: pickle.dump(database, databaseFile) database.getList() diff --git a/config/scripts/smtpdummy b/config/scripts/smtpdummy index af44ad4..f8cd0aa 100755 --- a/config/scripts/smtpdummy +++ b/config/scripts/smtpdummy @@ -25,7 +25,11 @@ if __name__ == "__main__": ) parser.add_argument("-p", "--port", env_var="PORT", default=25) parser.add_argument( - "-S", "--security", env_var="SECURITY", choices=["plain", "ssl", "starttls"], default='plain' + "-S", + "--security", + env_var="SECURITY", + choices=["plain", "ssl", "starttls"], + default="plain", ) parser.add_argument("-l", "--helo", env_var="HELO") @@ -67,7 +71,7 @@ if __name__ == "__main__": args.to = args.receiver if args.password: password = args.password - args.password = '********' + args.password = "********" # Transmission content @@ -163,7 +167,7 @@ Input arguments: # Transmission - if args.security != 'starttls': + if args.security != "starttls": recv() send(f"EHLO {args.helo}") if args.user: diff --git a/config/scripts/spongebob b/config/scripts/spongebob index e285222..3152624 100755 --- a/config/scripts/spongebob +++ b/config/scripts/spongebob @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import sys import random diff --git a/config/scripts/syncthingRestore b/config/scripts/syncthingRestore index e1b3e5f..f6b41f7 100755 --- a/config/scripts/syncthingRestore +++ b/config/scripts/syncthingRestore @@ -3,9 +3,9 @@ import os import shutil -curDir = os.path.realpath('.') -assert '.stversions/' in curDir -tgDir = curDir.replace('.stversions/', '') +curDir = os.path.realpath(".") +assert ".stversions/" in curDir +tgDir = curDir.replace(".stversions/", "") for root, dirs, files in os.walk(curDir): @@ -17,4 +17,3 @@ for root, dirs, files in os.walk(curDir): dstPath = os.path.join(dstRoot, dstF) print(f"{srcPath} → {dstPath}") shutil.copy2(srcPath, dstPath) - diff --git a/config/scripts/tagCreatorPhotos b/config/scripts/tagCreatorPhotos index 0002db7..bb50068 100755 --- a/config/scripts/tagCreatorPhotos +++ b/config/scripts/tagCreatorPhotos @@ -11,7 +11,6 @@ filenames = sys.argv[2:] for filename in filenames: assert os.path.isfile(filename) exifDict = piexif.load(filename) - exifDict['0th'][piexif.ImageIFD.Copyright] = creator.encode() + exifDict["0th"][piexif.ImageIFD.Copyright] = creator.encode() exifBytes = piexif.dump(exifDict) piexif.insert(exifBytes, filename) - diff --git a/config/scripts/ter b/config/scripts/ter index 6185e9f..ec432a5 100755 --- a/config/scripts/ter +++ b/config/scripts/ter @@ -11,22 +11,29 @@ if N < 2: print("Ben reste chez toi alors.") sys.exit(1) + def trajet_str(a, b): return f"{gares[a]} → {gares[b]}" + def chemin_str(stack): - return ", ".join([trajet_str(stack[i], stack[i+1]) for i in range(len(stack)-1)]) + return ", ".join( + [trajet_str(stack[i], stack[i + 1]) for i in range(len(stack) - 1)] + ) + # Demande des prix des trajets prices = dict() for i in range(N): - for j in range(N-1, i, -1): + for j in range(N - 1, i, -1): p = None while not isinstance(p, float): try: - p = float(input(f"Prix du trajet {trajet_str(i, j)} ? ").replace(',', '.')) + p = float( + input(f"Prix du trajet {trajet_str(i, j)} ? ").replace(",", ".") + ) except ValueError: print("C'est pas un prix ça !") if i not in prices: @@ -40,8 +47,9 @@ miniStack = None maxiPrice = -inf maxiStack = None + def register_path(stack): - price = sum([prices[stack[i]][stack[i+1]]for i in range(len(stack)-1)]) + price = sum([prices[stack[i]][stack[i + 1]] for i in range(len(stack) - 1)]) global miniPrice, maxiPrice, miniStack, maxiStack if price < miniPrice: @@ -52,6 +60,7 @@ def register_path(stack): maxiStack = stack.copy() print(f"{chemin_str(stack)} = {price:.2f} €") + stack = [0] while stack[0] == 0: if stack[-1] >= N - 1: @@ -59,7 +68,7 @@ while stack[0] == 0: stack.pop() stack[-1] += 1 else: - stack.append(stack[-1]+1) + stack.append(stack[-1] + 1) print(f"Prix minimum: {chemin_str(miniStack)} = {miniPrice:.2f} €") print(f"Prix maximum: {chemin_str(maxiStack)} = {maxiPrice:.2f} €") diff --git a/config/scripts/tvshow b/config/scripts/tvshow index d090eaa..9e6fd90 100755 --- a/config/scripts/tvshow +++ b/config/scripts/tvshow @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # pylint: disable=C0103,W0621 # pip install tmdbv3api @@ -20,8 +20,8 @@ Episode = typing.Any # TODO # Constants -API_KEY_PASS_PATH = 'http/themoviedb.org' -VIDEO_EXTENSIONS = {'mp4', 'mkv', 'avi', 'webm'} +API_KEY_PASS_PATH = "http/themoviedb.org" +VIDEO_EXTENSIONS = {"mp4", "mkv", "avi", "webm"} # Functions @@ -31,12 +31,12 @@ def get_pass_data(path: str) -> typing.Dict[str, str]: Returns the data stored in the Unix password manager given its path. """ - run = subprocess.run(['pass', path], stdout=subprocess.PIPE, check=True) - lines = run.stdout.decode().split('\n') + run = subprocess.run(["pass", path], stdout=subprocess.PIPE, check=True) + lines = run.stdout.decode().split("\n") data = dict() - data['pass'] = lines[0] + data["pass"] = lines[0] for line in lines[1:]: - match = re.match(r'(\w+): ?(.+)', line) + match = re.match(r"(\w+): ?(.+)", line) if match: data[match[1]] = match[2] return data @@ -44,24 +44,27 @@ def get_pass_data(path: str) -> typing.Dict[str, str]: def confirm(text: str) -> bool: res = input(text + " [yn] ") - while res not in ('y', 'n'): + while res not in ("y", "n"): res = input("Please answer with y or n: ") - return res == 'y' + return res == "y" def episode_identifier(episode: typing.Any) -> str: - return f"S{episode['season_number']:02d}E" + \ - f"{episode['episode_number']:02d} {episode['name']}" + return ( + f"S{episode['season_number']:02d}E" + + f"{episode['episode_number']:02d} {episode['name']}" + ) -dryrun = '-n' in sys.argv + +dryrun = "-n" in sys.argv if dryrun: dryrun = True - sys.argv.remove('-n') + sys.argv.remove("-n") # Connecting to TMBDB tmdb = tmdbv3api.TMDb() -tmdb.api_key = get_pass_data(API_KEY_PASS_PATH)['api'] +tmdb.api_key = get_pass_data(API_KEY_PASS_PATH)["api"] tmdb.language = sys.argv[1] # Searching the TV show name (by current directory name) @@ -71,8 +74,8 @@ if len(sys.argv) >= 3: show_name = sys.argv[2] else: show_name = os.path.split(os.path.realpath(os.path.curdir))[1] - if '(' in show_name: - show_name = show_name.split('(')[0].strip() + if "(" in show_name: + show_name = show_name.split("(")[0].strip() search = tv.search(show_name) @@ -86,15 +89,14 @@ for res in search: break if not show: - print("Could not find a matching " + - f"show on TheMovieDatabase for {show_name}.") + print("Could not find a matching " + f"show on TheMovieDatabase for {show_name}.") sys.exit(1) # Retrieving all the episode of the show episodes: typing.List[Episode] = list() print(f"List of episodes for {show.name}:") -for season_number in range(0, show.number_of_seasons+1): +for season_number in range(0, show.number_of_seasons + 1): season_details = season.details(show.id, season_number) try: @@ -119,22 +121,22 @@ for root, dirs, files in os.walk(os.path.curdir): print(f"- {filename}") -def get_episode(season_number: int, episode_number: int - ) -> typing.Optional[Episode]: +def get_episode(season_number: int, episode_number: int) -> typing.Optional[Episode]: # TODO Make more efficient using indexing for episode in episodes: - if episode['season_number'] == season_number \ - and episode['episode_number'] == episode_number: + if ( + episode["season_number"] == season_number + and episode["episode_number"] == episode_number + ): return episode return None # Matching movie files to episode -associations: typing.List[typing.Tuple[typing.Tuple[str, - str], Episode]] = list() +associations: typing.List[typing.Tuple[typing.Tuple[str, str], Episode]] = list() for video in videos: root, filename = video - match = re.search(r'S(\d+)E(\d+)', filename) + match = re.search(r"S(\d+)E(\d+)", filename) print(f"Treating file: {root}/{filename}") episode = None season_number = 0 @@ -155,7 +157,8 @@ for video in videos: episode = get_episode(season_number, episode_number) if not episode: print( - f" could not find episode S{season_number:02d}E{episode_number:02d} in TMBD") + f" could not find episode S{season_number:02d}E{episode_number:02d} in TMBD" + ) # Skip if not episode: diff --git a/config/scripts/unziptree b/config/scripts/unziptree index 3c5843e..910f4d0 100755 --- a/config/scripts/unziptree +++ b/config/scripts/unziptree @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import os import subprocess diff --git a/config/scripts/updateCompressedMusic b/config/scripts/updateCompressedMusic index 6675f54..a424d8e 100755 --- a/config/scripts/updateCompressedMusic +++ b/config/scripts/updateCompressedMusic @@ -10,7 +10,7 @@ import re import coloredlogs import progressbar -coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s') +coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s") log = logging.getLogger() # Constants @@ -18,8 +18,14 @@ SOURCE_FOLDER = os.path.join(os.path.expanduser("~"), "Musiques") OUTPUT_FOLDER = os.path.join(os.path.expanduser("~"), ".MusiqueCompressed") CONVERSIONS = {"flac": "opus"} FORBIDDEN_EXTENSIONS = ["jpg", "png", "pdf", "ffs_db"] -FORGIVEN_FILENAMES = ["cover.jpg", "front.jpg", "folder.jpg", - "cover.png", "front.png", "folder.png"] +FORGIVEN_FILENAMES = [ + "cover.jpg", + "front.jpg", + "folder.jpg", + "cover.png", + "front.png", + "folder.png", +] IGNORED_EMPTY_FOLDER = [".stfolder"] RESTRICT_CHARACTERS = '[\0\\/:*"<>|]' # FAT32, NTFS # RESTRICT_CHARACTERS = '[:/]' # HFS, HFS+ @@ -55,7 +61,7 @@ def convertPath(path: str) -> typing.Optional[str]: extension = extension[1:].lower() # Remove unwanted characters from filename filename_parts = os.path.normpath(filename).split(os.path.sep) - filename_parts = [re.sub(RESTRICT_CHARACTERS, '_', part) for part in filename_parts] + filename_parts = [re.sub(RESTRICT_CHARACTERS, "_", part) for part in filename_parts] filename = os.path.sep.join(filename_parts) # If the extension isn't allowed if extension in FORBIDDEN_EXTENSIONS: @@ -103,7 +109,7 @@ for sourceFile in remainingConversions: # Converting fullSourceFile = os.path.join(SOURCE_FOLDER, sourceFile) if sourceFile == outputFile: - log.debug('%s → %s', fullSourceFile, fullOutputFile) + log.debug("%s → %s", fullSourceFile, fullOutputFile) if act and os.path.isfile(fullOutputFile): os.remove(fullOutputFile) os.link(fullSourceFile, fullOutputFile) @@ -113,19 +119,33 @@ for sourceFile in remainingConversions: log.info("Removing extra files") for extraFile in extraFiles: fullExtraFile = os.path.join(OUTPUT_FOLDER, extraFile) - log.debug('× %s', fullExtraFile) + log.debug("× %s", fullExtraFile) if act: os.remove(fullExtraFile) log.info("Listing files that will be converted") for fullSourceFile, fullOutputFile in conversions: - log.debug('%s ⇒ %s', fullSourceFile, fullOutputFile) + log.debug("%s ⇒ %s", fullSourceFile, fullOutputFile) log.info("Converting files") for fullSourceFile, fullOutputFile in progressbar.progressbar(conversions): - cmd = ["ffmpeg", "-y", "-i", fullSourceFile, "-c:a", "libopus", - "-movflags", "+faststart", "-b:a", "128k", "-vbr", "on", - "-compression_level", "10", fullOutputFile] + cmd = [ + "ffmpeg", + "-y", + "-i", + fullSourceFile, + "-c:a", + "libopus", + "-movflags", + "+faststart", + "-b:a", + "128k", + "-vbr", + "on", + "-compression_level", + "10", + fullOutputFile, + ] if act: subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) else: diff --git a/config/scripts/vidcmp b/config/scripts/vidcmp index f8e27eb..1dd0d66 100755 --- a/config/scripts/vidcmp +++ b/config/scripts/vidcmp @@ -10,11 +10,9 @@ import subprocess files = sys.argv[1:] remove = False -if '-r' in files: - files.remove('-r') +if "-r" in files: + files.remove("-r") remove = True for f in files: print(os.path.splitext(f)) - - diff --git a/config/scripts/videoQuota b/config/scripts/videoQuota index 4c63016..58af75e 100755 --- a/config/scripts/videoQuota +++ b/config/scripts/videoQuota @@ -1,6 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -import os import sys import subprocess import logging