Run black on all Python scripts!

This commit is contained in:
Geoffrey Frogeye 2021-06-13 11:49:21 +02:00
parent fb6cfce656
commit cd9cbcaa28
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8
30 changed files with 1027 additions and 704 deletions

View file

@ -6,7 +6,7 @@ import logging
import os
import sys
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# Coding conventions:
@ -15,10 +15,10 @@ log = logging.getLogger()
# TODO Config arparse and pass args to the functions. No globals
# Finding directories
assert 'HOME' in os.environ, "Home directory unknown"
DOCS = os.path.realpath(os.path.join(os.environ['HOME'], 'Documents'))
assert "HOME" in os.environ, "Home directory unknown"
DOCS = os.path.realpath(os.path.join(os.environ["HOME"], "Documents"))
assert os.path.isdir(DOCS), "Documents folder not found"
ARCS = os.path.realpath(os.path.join(os.environ['HOME'], 'Archives'))
ARCS = os.path.realpath(os.path.join(os.environ["HOME"], "Archives"))
assert os.path.isdir(ARCS), "Archives folder not found"
@ -27,7 +27,7 @@ def dirRange(relpath):
res = list()
for p in range(len(splits)):
partPath = os.path.join(*splits[:p+1])
partPath = os.path.join(*splits[: p + 1])
arcPath = os.path.join(os.path.join(ARCS, partPath))
docPath = os.path.join(os.path.join(DOCS, partPath))
@ -36,6 +36,7 @@ def dirRange(relpath):
return res
def travel(relpath):
"""
Dunno what this will do, let's write code and see.
@ -60,8 +61,10 @@ def travel(relpath):
elif os.path.islink(docPath) and os.path.exists(arcPath):
currentLink = os.readlink(docPath)
if currentLink != linkPath:
log.warning(f"'{docPath}' is pointing to '{currentLink}' " +
f"but should point to '{linkPath}'.")
log.warning(
f"'{docPath}' is pointing to '{currentLink}' "
+ f"but should point to '{linkPath}'."
)
# TODO Fixing if asked for
sys.exit(1)
log.debug("Early link already exists {docPath} → {arcPath}")
@ -69,13 +72,11 @@ def travel(relpath):
elif not os.path.exists(docPath) and os.path.exists(arcPath):
log.debug("Only existing on archive side, linking")
print(f"ln -s {linkPath} {docPath}")
elif os.path.exists(docPath) and not os.path.exists(arcPath) \
and isLast:
elif os.path.exists(docPath) and not os.path.exists(arcPath) and isLast:
log.debug("Only existing on doc side, moving and linking")
print(f"mv {docPath} {arcPath}")
print(f"ln -s {linkPath} {docPath}")
elif os.path.exists(docPath) and not os.path.exists(arcPath) \
and not isLast:
elif os.path.exists(docPath) and not os.path.exists(arcPath) and not isLast:
raise NotImplementedError("Here comes the trouble")
else:
log.error("Unhandled case")
@ -103,8 +104,10 @@ def ensureLink(relpath):
if os.path.islink(docPath):
currentLink = os.readlink(docPath)
if currentLink != linkPath:
log.warning(f"'{docPath}' is pointing to '{currentLink}' " +
f"but should point to '{linkPath}'. Fixing")
log.warning(
f"'{docPath}' is pointing to '{currentLink}' "
+ f"but should point to '{linkPath}'. Fixing"
)
if args.dry:
print(f"rm {docPath}")
else:
@ -117,10 +120,13 @@ def ensureLink(relpath):
elif os.path.isdir(docPath):
continue
else:
raise RuntimeError(f"'{docPath}' exists and is not a directory " +
f"or a link. Unable to link it to '{linkPath}'")
raise RuntimeError(f"'{docPath}' is a directory. Unable to link it to " +
f"'{linkPath}'")
raise RuntimeError(
f"'{docPath}' exists and is not a directory "
+ f"or a link. Unable to link it to '{linkPath}'"
)
raise RuntimeError(
f"'{docPath}' is a directory. Unable to link it to " + f"'{linkPath}'"
)
def archive(docdir):
@ -134,8 +140,8 @@ def archive(docdir):
print("ARC", reldir)
arcdir = os.path.join(ARCS, reldir)
parentArcdir = os.path.realpath(os.path.join(arcdir, '..'))
parentDocdir = os.path.realpath(os.path.join(docdir, '..'))
parentArcdir = os.path.realpath(os.path.join(arcdir, ".."))
parentDocdir = os.path.realpath(os.path.join(docdir, ".."))
linkDest = os.path.relpath(arcdir, parentDocdir)
# BULLSHIT
@ -172,11 +178,15 @@ def unarchive(arcdir):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Place a folder in ~/Documents in ~/Documents/Archives and symlink it")
parser.add_argument('dir', metavar='DIRECTORY', type=str, help="The directory to archive")
parser.add_argument('-d', '--dry', action='store_true')
parser = argparse.ArgumentParser(
description="Place a folder in ~/Documents in ~/Documents/Archives and symlink it"
)
parser.add_argument(
"dir", metavar="DIRECTORY", type=str, help="The directory to archive"
)
parser.add_argument("-d", "--dry", action="store_true")
args = parser.parse_args()
args.dry = True # DEBUG
args.dry = True # DEBUG
# archive(args.dir)
ensureLink(args.dir)

View file

@ -14,7 +14,7 @@ import json
import statistics
import datetime
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# Constants
@ -34,15 +34,15 @@ def videoMetadata(filename):
p.check_returncode()
metadataRaw = p.stdout
data = dict()
for metadataLine in metadataRaw.split(b'\n'):
for metadataLine in metadataRaw.split(b"\n"):
# Skip empty lines
if not len(metadataLine):
continue
# Skip comments
if metadataLine.startswith(b';'):
if metadataLine.startswith(b";"):
continue
# Parse key-value
metadataLineSplit = metadataLine.split(b'=')
metadataLineSplit = metadataLine.split(b"=")
if len(metadataLineSplit) != 2:
log.warning("Unparsed metadata line: `{}`".format(metadataLine))
continue
@ -52,6 +52,7 @@ def videoMetadata(filename):
data[key] = val
return data
def videoInfos(filename):
assert os.path.isfile(filename)
cmd = ["ffprobe", filename, "-print_format", "json", "-show_streams"]
@ -61,7 +62,10 @@ def videoInfos(filename):
infos = json.loads(infosRaw)
return infos
from pprint import pprint
def streamDuration(stream):
if "duration" in stream:
return float(stream["duration"])
@ -77,13 +81,14 @@ def streamDuration(stream):
else:
raise KeyError("Can't find duration information in stream")
def videoDuration(filename):
# TODO Doesn't work with VP8 / webm
infos = videoInfos(filename)
durations = [streamDuration(stream) for stream in infos["streams"]]
dev = statistics.stdev(durations)
assert dev <= DURATION_MAX_DEV, "Too much deviation ({} s)".format(dev)
return sum(durations)/len(durations)
return sum(durations) / len(durations)
todos = set()
@ -130,13 +135,12 @@ for root, inputName in progressbar.progressbar(allVideos):
meta = videoMetadata(inputFull)
# If it has the field with the original file
if 'original' in meta:
if "original" in meta:
# Skip file
continue
else:
assert not os.path.isfile(outputFull), outputFull + " exists"
size = os.stat(inputFull).st_size
try:
duration = videoDuration(inputFull)
@ -151,7 +155,11 @@ for root, inputName in progressbar.progressbar(allVideos):
totalSize += size
todos.add(todo)
log.info("Converting {} videos ({})".format(len(todos), datetime.timedelta(seconds=totalDuration)))
log.info(
"Converting {} videos ({})".format(
len(todos), datetime.timedelta(seconds=totalDuration)
)
)
# From https://stackoverflow.com/a/3431838
def sha256(fname):
@ -161,17 +169,30 @@ def sha256(fname):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
# Progress bar things
totalDataSize = progressbar.widgets.DataSize()
totalDataSize.variable = 'max_value'
barWidgets = [progressbar.widgets.DataSize(), ' of ', totalDataSize, ' ', progressbar.widgets.Bar(), ' ', progressbar.widgets.FileTransferSpeed(), ' ', progressbar.widgets.AdaptiveETA()]
totalDataSize.variable = "max_value"
barWidgets = [
progressbar.widgets.DataSize(),
" of ",
totalDataSize,
" ",
progressbar.widgets.Bar(),
" ",
progressbar.widgets.FileTransferSpeed(),
" ",
progressbar.widgets.AdaptiveETA(),
]
bar = progressbar.DataTransferBar(max_value=totalSize, widgets=barWidgets)
bar.start()
processedSize = 0
for inputFull, originalFull, outputFull, size, duration in todos:
tmpfile = tempfile.mkstemp(prefix="compressPictureMovies", suffix="."+OUTPUT_EXTENSION)[1]
tmpfile = tempfile.mkstemp(
prefix="compressPictureMovies", suffix="." + OUTPUT_EXTENSION
)[1]
try:
# Calculate the sum of the original file
checksum = sha256(inputFull)
@ -180,7 +201,12 @@ for inputFull, originalFull, outputFull, size, duration in todos:
originalRel = os.path.relpath(originalFull, ORIGINAL_FOLDER)
originalContent = "{} {}".format(originalRel, checksum)
metadataCmd = ["-metadata", 'original="{}"'.format(originalContent)]
cmd = ["ffmpeg", "-hide_banner", "-y", "-i", inputFull] + OUTPUT_FFMPEG_PARAMETERS + metadataCmd + [tmpfile]
cmd = (
["ffmpeg", "-hide_banner", "-y", "-i", inputFull]
+ OUTPUT_FFMPEG_PARAMETERS
+ metadataCmd
+ [tmpfile]
)
p = subprocess.run(cmd)
p.check_returncode()

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import sys
import random
@ -21,7 +21,7 @@ for line in sys.stdin:
else:
wrd = list(word)
random.shuffle(wrd)
nl += ''.join(wrd)
nl += "".join(wrd)
nl += c
word = ""
grace = True

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import os
import sys
@ -7,7 +7,7 @@ import logging
import coloredlogs
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
@ -26,7 +26,7 @@ def duration_file(path: str) -> float:
ret = run.stdout.decode().strip()
if run.returncode != 0:
log.warning(f"{path}: unable to get duration")
elif ret == 'N/A':
elif ret == "N/A":
log.warning(f"{path}: has no duration")
else:
try:

View file

@ -45,7 +45,7 @@ import notmuch
import progressbar
import xdg.BaseDirectory
MailLocation = typing.NewType('MailLocation', typing.Tuple[str, str, str])
MailLocation = typing.NewType("MailLocation", typing.Tuple[str, str, str])
# MessageAction = typing.Callable[[notmuch.Message], None]
@ -106,7 +106,8 @@ class MelEngine:
assert not self.database
self.log.info("Indexing mails")
notmuch_config_file = os.path.expanduser(
"~/.config/notmuch-config") # TODO Better
"~/.config/notmuch-config"
) # TODO Better
cmd = ["notmuch", "--config", notmuch_config_file, "new"]
self.log.debug(" ".join(cmd))
subprocess.run(cmd, check=True)
@ -117,7 +118,8 @@ class MelEngine:
"""
assert self.config
storage_path = os.path.realpath(
os.path.expanduser(self.config["GENERAL"]["storage"]))
os.path.expanduser(self.config["GENERAL"]["storage"])
)
folders = list()
for account in self.accounts:
storage_path_account = os.path.join(storage_path, account)
@ -125,9 +127,9 @@ class MelEngine:
if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs:
continue
assert root.startswith(storage_path)
path = root[len(storage_path):]
path_split = path.split('/')
if path_split[0] == '':
path = root[len(storage_path) :]
path_split = path.split("/")
if path_split[0] == "":
path_split = path_split[1:]
folders.append(tuple(path_split))
return folders
@ -138,8 +140,11 @@ class MelEngine:
Be sure to require only in the mode you want to avoid deadlocks.
"""
assert self.config
mode = notmuch.Database.MODE.READ_WRITE if write \
mode = (
notmuch.Database.MODE.READ_WRITE
if write
else notmuch.Database.MODE.READ_ONLY
)
if self.database:
# If the requested mode is the one already present,
# or we request read when it's already write, do nothing
@ -149,7 +154,8 @@ class MelEngine:
self.close_database()
self.log.info("Opening database in mode %s", mode)
db_path = os.path.realpath(
os.path.expanduser(self.config["GENERAL"]["storage"]))
os.path.expanduser(self.config["GENERAL"]["storage"])
)
self.database = notmuch.Database(mode=mode, path=db_path)
def close_database(self) -> None:
@ -171,13 +177,13 @@ class MelEngine:
assert self.database
base = self.database.get_path()
assert path.startswith(base)
path = path[len(base):]
path_split = path.split('/')
path = path[len(base) :]
path_split = path.split("/")
mailbox = path_split[1]
assert mailbox in self.accounts
state = path_split[-1]
folder = tuple(path_split[2:-1])
assert state in {'cur', 'tmp', 'new'}
assert state in {"cur", "tmp", "new"}
return (mailbox, folder, state)
@staticmethod
@ -185,8 +191,11 @@ class MelEngine:
"""
Tells if the provided string is a valid UID.
"""
return isinstance(uid, str) and len(uid) == 12 \
and bool(re.match('^[a-zA-Z0-9+/]{12}$', uid))
return (
isinstance(uid, str)
and len(uid) == 12
and bool(re.match("^[a-zA-Z0-9+/]{12}$", uid))
)
@staticmethod
def extract_email(field: str) -> str:
@ -197,9 +206,9 @@ class MelEngine:
# TODO Can be made better (extract name and email)
# Also what happens with multiple dests?
try:
sta = field.index('<')
sto = field.index('>')
return field[sta+1:sto]
sta = field.index("<")
sto = field.index(">")
return field[sta + 1 : sto]
except ValueError:
return field
@ -211,8 +220,9 @@ class MelEngine:
# Search-friendly folder name
slug_folder_list = list()
for fold_index, fold in [(fold_index, folder[fold_index])
for fold_index in range(len(folder))]:
for fold_index, fold in [
(fold_index, folder[fold_index]) for fold_index in range(len(folder))
]:
if fold_index == 0 and len(folder) > 1 and fold == "INBOX":
continue
slug_folder_list.append(fold.upper())
@ -229,14 +239,15 @@ class MelEngine:
msg.add_tag(tag)
elif not condition and tag in tags:
msg.remove_tag(tag)
expeditor = MelEngine.extract_email(msg.get_header('from'))
tag_if('inbox', slug_folder[0] == 'INBOX')
tag_if('spam', slug_folder[0] in ('JUNK', 'SPAM'))
tag_if('deleted', slug_folder[0] == 'TRASH')
tag_if('draft', slug_folder[0] == 'DRAFTS')
tag_if('sent', expeditor in self.aliases)
tag_if('unprocessed', False)
expeditor = MelEngine.extract_email(msg.get_header("from"))
tag_if("inbox", slug_folder[0] == "INBOX")
tag_if("spam", slug_folder[0] in ("JUNK", "SPAM"))
tag_if("deleted", slug_folder[0] == "TRASH")
tag_if("draft", slug_folder[0] == "DRAFTS")
tag_if("sent", expeditor in self.aliases)
tag_if("unprocessed", False)
# UID
uid = msg.get_header("X-TUID")
@ -244,17 +255,23 @@ class MelEngine:
# TODO Happens to sent mails but should it?
print(f"{msg.get_filename()} has no UID!")
return
uidtag = 'tuid{}'.format(uid)
uidtag = "tuid{}".format(uid)
# Remove eventual others UID
for tag in tags:
if tag.startswith('tuid') and tag != uidtag:
if tag.startswith("tuid") and tag != uidtag:
msg.remove_tag(tag)
msg.add_tag(uidtag)
def apply_msgs(self, query_str: str, action: typing.Callable,
*args: typing.Any, show_progress: bool = False,
write: bool = False, close_db: bool = True,
**kwargs: typing.Any) -> int:
def apply_msgs(
self,
query_str: str,
action: typing.Callable,
*args: typing.Any,
show_progress: bool = False,
write: bool = False,
close_db: bool = True,
**kwargs: typing.Any,
) -> int:
"""
Run a function on the messages selected by the given query.
"""
@ -267,8 +284,11 @@ class MelEngine:
elements = query.search_messages()
nb_msgs = query.count_messages()
iterator = progressbar.progressbar(elements, max_value=nb_msgs) \
if show_progress and nb_msgs else elements
iterator = (
progressbar.progressbar(elements, max_value=nb_msgs)
if show_progress and nb_msgs
else elements
)
self.log.info("Executing %s", action)
for msg in iterator:
@ -296,8 +316,9 @@ class MelOutput:
WIDTH_FIXED = 31
WIDTH_RATIO_DEST_SUBJECT = 0.3
def compute_line_format(self) -> typing.Tuple[typing.Optional[int],
typing.Optional[int]]:
def compute_line_format(
self,
) -> typing.Tuple[typing.Optional[int], typing.Optional[int]]:
"""
Based on the terminal width, assign the width of flexible columns.
"""
@ -332,12 +353,12 @@ class MelOutput:
"""
now = datetime.datetime.now()
if now - date < datetime.timedelta(days=1):
return date.strftime('%H:%M:%S')
return date.strftime("%H:%M:%S")
if now - date < datetime.timedelta(days=28):
return date.strftime('%d %H:%M')
return date.strftime("%d %H:%M")
if now - date < datetime.timedelta(days=365):
return date.strftime('%m-%d %H')
return date.strftime('%y-%m-%d')
return date.strftime("%m-%d %H")
return date.strftime("%y-%m-%d")
@staticmethod
def clip_text(size: typing.Optional[int], text: str) -> str:
@ -352,28 +373,28 @@ class MelOutput:
if length == size:
return text
if length > size:
return text[:size-1] + '…'
return text + ' ' * (size - length)
return text[: size - 1] + "…"
return text + " " * (size - length)
@staticmethod
def chunks(iterable: str, chunk_size: int) -> typing.Iterable[str]:
"""Yield successive chunk_size-sized chunks from iterable."""
# From https://stackoverflow.com/a/312464
for i in range(0, len(iterable), chunk_size):
yield iterable[i:i + chunk_size]
yield iterable[i : i + chunk_size]
@staticmethod
def sizeof_fmt(num: int, suffix: str = 'B') -> str:
def sizeof_fmt(num: int, suffix: str = "B") -> str:
"""
Print the given size in a human-readable format.
"""
remainder = float(num)
# From https://stackoverflow.com/a/1094933
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(remainder) < 1024.0:
return "%3.1f %s%s" % (remainder, unit, suffix)
remainder /= 1024.0
return "%.1f %s%s" % (remainder, 'Yi', suffix)
return "%.1f %s%s" % (remainder, "Yi", suffix)
def get_mailbox_color(self, mailbox: str) -> str:
"""
@ -381,7 +402,7 @@ class MelOutput:
string with ASCII escape codes.
"""
if not self.is_tty:
return ''
return ""
if mailbox not in self.mailbox_colors:
# RGB colors (not supported everywhere)
# color_str = self.config[mailbox]["color"]
@ -406,21 +427,24 @@ class MelOutput:
line = ""
tags = set(msg.get_tags())
mailbox, _, _ = self.engine.get_location(msg)
if 'unread' in tags or 'flagged' in tags:
if "unread" in tags or "flagged" in tags:
line += colorama.Style.BRIGHT
# if 'flagged' in tags:
# line += colorama.Style.BRIGHT
# if 'unread' not in tags:
# line += colorama.Style.DIM
line += colorama.Back.LIGHTBLACK_EX if self.light_background \
line += (
colorama.Back.LIGHTBLACK_EX
if self.light_background
else colorama.Back.BLACK
)
self.light_background = not self.light_background
line += self.get_mailbox_color(mailbox)
# UID
uid = None
for tag in tags:
if tag.startswith('tuid'):
if tag.startswith("tuid"):
uid = tag[4:]
assert uid, f"No UID for message: {msg}."
assert MelEngine.is_uid(uid), f"{uid} {type(uid)} is not a valid UID."
@ -434,8 +458,9 @@ class MelOutput:
# Icons
line += sep + colorama.Fore.RED
def tags2col1(tag1: str, tag2: str,
characters: typing.Tuple[str, str, str, str]) -> None:
def tags2col1(
tag1: str, tag2: str, characters: typing.Tuple[str, str, str, str]
) -> None:
"""
Show the presence/absence of two tags with one character.
"""
@ -452,14 +477,14 @@ class MelOutput:
else:
line += none
tags2col1('spam', 'draft', ('?', 'S', 'D', ' '))
tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' '))
tags2col1('unread', 'flagged', ('!', 'U', 'F', ' '))
tags2col1('sent', 'replied', ('?', '↑', '↪', ' '))
tags2col1("spam", "draft", ("?", "S", "D", " "))
tags2col1("attachment", "encrypted", ("E", "A", "E", " "))
tags2col1("unread", "flagged", ("!", "U", "F", " "))
tags2col1("sent", "replied", ("?", "↑", "↪", " "))
# Opposed
line += sep + colorama.Fore.BLUE
if 'sent' in tags:
if "sent" in tags:
dest = msg.get_header("to")
else:
dest = msg.get_header("from")
@ -483,11 +508,10 @@ class MelOutput:
expd = msg.get_header("from")
account, _, _ = self.engine.get_location(msg)
summary = '{} (<i>{}</i>)'.format(html.escape(expd), account)
summary = "{} (<i>{}</i>)".format(html.escape(expd), account)
body = html.escape(subject)
cmd = ["notify-send", "-u", "low", "-i",
"mail-message-new", summary, body]
print(' '.join(cmd))
cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body]
print(" ".join(cmd))
subprocess.run(cmd, check=False)
def notify_all(self) -> None:
@ -497,12 +521,26 @@ class MelOutput:
since it should be marked as processed right after.
"""
nb_msgs = self.engine.apply_msgs(
'tag:unread and tag:unprocessed', self.notify_msg)
"tag:unread and tag:unprocessed", self.notify_msg
)
if nb_msgs > 0:
self.log.info(
"Playing notification sound (%d new message(s))", nb_msgs)
cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5",
"remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
self.log.info("Playing notification sound (%d new message(s))", nb_msgs)
cmd = [
"play",
"-n",
"synth",
"sine",
"E4",
"sine",
"A5",
"remix",
"1-2",
"fade",
"0.5",
"1.2",
"0.5",
"2",
]
subprocess.run(cmd, check=False)
@staticmethod
@ -510,46 +548,62 @@ class MelOutput:
"""
Return split header values in a contiguous string.
"""
return val.replace('\n', '').replace('\t', '').strip()
return val.replace("\n", "").replace("\t", "").strip()
PART_MULTI_FORMAT = colorama.Fore.BLUE + \
'{count} {indent}+ {typ}' + colorama.Style.RESET_ALL
PART_LEAF_FORMAT = colorama.Fore.BLUE + \
'{count} {indent}→ {desc} ({typ}; {size})' + \
colorama.Style.RESET_ALL
PART_MULTI_FORMAT = (
colorama.Fore.BLUE + "{count} {indent}+ {typ}" + colorama.Style.RESET_ALL
)
PART_LEAF_FORMAT = (
colorama.Fore.BLUE
+ "{count} {indent}→ {desc} ({typ}; {size})"
+ colorama.Style.RESET_ALL
)
def show_parts_tree(self, part: email.message.Message,
depth: int = 0, count: int = 1) -> int:
def show_parts_tree(
self, part: email.message.Message, depth: int = 0, count: int = 1
) -> int:
"""
Show a tree of the parts contained in a message.
Return the number of parts of the mesage.
"""
indent = depth * '\t'
indent = depth * "\t"
typ = part.get_content_type()
if part.is_multipart():
print(MelOutput.PART_MULTI_FORMAT.format(
count=count, indent=indent, typ=typ))
print(
MelOutput.PART_MULTI_FORMAT.format(count=count, indent=indent, typ=typ)
)
payl = part.get_payload()
assert isinstance(payl, list)
size = 1
for obj in payl:
size += self.show_parts_tree(obj, depth=depth+1,
count=count+size)
size += self.show_parts_tree(obj, depth=depth + 1, count=count + size)
return size
payl = part.get_payload(decode=True)
assert isinstance(payl, bytes)
size = len(payl)
desc = part.get('Content-Description', '<no description>')
print(MelOutput.PART_LEAF_FORMAT.format(
count=count, indent=indent, typ=typ, desc=desc,
size=MelOutput.sizeof_fmt(size)))
desc = part.get("Content-Description", "<no description>")
print(
MelOutput.PART_LEAF_FORMAT.format(
count=count,
indent=indent,
typ=typ,
desc=desc,
size=MelOutput.sizeof_fmt(size),
)
)
return 1
INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \
'{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL
HEADER_FORMAT = (
colorama.Fore.BLUE
+ colorama.Style.BRIGHT
+ "{}:"
+ colorama.Style.NORMAL
+ " {}"
+ colorama.Style.RESET_ALL
)
def read_msg(self, msg: notmuch.Message) -> None:
"""
@ -558,7 +612,7 @@ class MelOutput:
# Parse
filename = msg.get_filename()
parser = email.parser.BytesParser()
with open(filename, 'rb') as filedesc:
with open(filename, "rb") as filedesc:
mail = parser.parse(filedesc)
# Defects
@ -591,12 +645,13 @@ class MelOutput:
print(payl.decode())
else:
# TODO Use nametemplate from mailcap
temp_file = '/tmp/melcap.html' # TODO Real temporary file
temp_file = "/tmp/melcap.html" # TODO Real temporary file
# TODO FIFO if possible
with open(temp_file, 'wb') as temp_filedesc:
with open(temp_file, "wb") as temp_filedesc:
temp_filedesc.write(payl)
command, _ = mailcap.findmatch(
self.caps, part.get_content_type(), key='view', filename=temp_file)
self.caps, part.get_content_type(), key="view", filename=temp_file
)
if command:
os.system(command)
@ -611,21 +666,26 @@ class MelOutput:
line += arb[0].replace("'", "\\'")
line += colorama.Fore.LIGHTBLACK_EX
for inter in arb[1:-1]:
line += '/' + inter.replace("'", "\\'")
line += '/' + colorama.Fore.WHITE + arb[-1].replace("'", "\\'")
line += "/" + inter.replace("'", "\\'")
line += "/" + colorama.Fore.WHITE + arb[-1].replace("'", "\\'")
line += colorama.Fore.LIGHTBLACK_EX + "'"
line += colorama.Style.RESET_ALL
print(line)
class MelCLI():
class MelCLI:
"""
Handles the user input and run asked operations.
"""
VERBOSITY_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
def apply_msgs_input(self, argmessages: typing.List[str],
action: typing.Callable, write: bool = False) -> None:
def apply_msgs_input(
self,
argmessages: typing.List[str],
action: typing.Callable,
write: bool = False,
) -> None:
"""
Run a function on the message given by the user.
"""
@ -633,7 +693,7 @@ class MelCLI():
if not argmessages:
from_stdin = not sys.stdin.isatty()
if argmessages:
from_stdin = len(argmessages) == 1 and argmessages == '-'
from_stdin = len(argmessages) == 1 and argmessages == "-"
messages = list()
if from_stdin:
@ -646,9 +706,11 @@ class MelCLI():
else:
for uids in argmessages:
if len(uids) > 12:
self.log.warning("Might have forgotten some spaces "
"between the UIDs. Don't worry, I'll "
"split them for you")
self.log.warning(
"Might have forgotten some spaces "
"between the UIDs. Don't worry, I'll "
"split them for you"
)
for uid in MelOutput.chunks(uids, 12):
if not MelEngine.is_uid(uid):
self.log.error("Not an UID: %s", uid)
@ -656,48 +718,52 @@ class MelCLI():
messages.append(uid)
for message in messages:
query_str = f'tag:tuid{message}'
nb_msgs = self.engine.apply_msgs(query_str, action,
write=write, close_db=False)
query_str = f"tag:tuid{message}"
nb_msgs = self.engine.apply_msgs(
query_str, action, write=write, close_db=False
)
if nb_msgs < 1:
self.log.error(
"Couldn't execute function for message %s", message)
self.log.error("Couldn't execute function for message %s", message)
self.engine.close_database()
def operation_default(self) -> None:
"""
Default operation: list all message in the inbox
"""
self.engine.apply_msgs('tag:inbox', self.output.print_msg)
self.engine.apply_msgs("tag:inbox", self.output.print_msg)
def operation_inbox(self) -> None:
"""
Inbox operation: list all message in the inbox,
possibly only the unread ones.
"""
query_str = 'tag:unread' if self.args.only_unread else 'tag:inbox'
query_str = "tag:unread" if self.args.only_unread else "tag:inbox"
self.engine.apply_msgs(query_str, self.output.print_msg)
def operation_flag(self) -> None:
"""
Flag operation: Flag user selected messages.
"""
def flag_msg(msg: notmuch.Message) -> None:
"""
Flag given message.
"""
msg.add_tag('flagged')
msg.add_tag("flagged")
self.apply_msgs_input(self.args.message, flag_msg, write=True)
def operation_unflag(self) -> None:
"""
Unflag operation: Flag user selected messages.
"""
def unflag_msg(msg: notmuch.Message) -> None:
"""
Unflag given message.
"""
msg.remove_tag('flagged')
msg.remove_tag("flagged")
self.apply_msgs_input(self.args.message, unflag_msg, write=True)
def operation_read(self) -> None:
@ -712,8 +778,7 @@ class MelCLI():
"""
# Fetch mails
self.log.info("Fetching mails")
mbsync_config_file = os.path.expanduser(
"~/.config/mbsyncrc") # TODO Better
mbsync_config_file = os.path.expanduser("~/.config/mbsyncrc") # TODO Better
cmd = ["mbsync", "--config", mbsync_config_file, "--all"]
subprocess.run(cmd, check=False)
@ -724,8 +789,9 @@ class MelCLI():
self.output.notify_all()
# Tag new mails
self.engine.apply_msgs('tag:unprocessed', self.engine.retag_msg,
show_progress=True, write=True)
self.engine.apply_msgs(
"tag:unprocessed", self.engine.retag_msg, show_progress=True, write=True
)
def operation_list(self) -> None:
"""
@ -744,14 +810,15 @@ class MelCLI():
Retag operation: Manually retag all the mails in the database.
Mostly debug I suppose.
"""
self.engine.apply_msgs('*', self.engine.retag_msg,
show_progress=True, write=True)
self.engine.apply_msgs(
"*", self.engine.retag_msg, show_progress=True, write=True
)
def operation_all(self) -> None:
"""
All operation: list every single message.
"""
self.engine.apply_msgs('*', self.output.print_msg)
self.engine.apply_msgs("*", self.output.print_msg)
def add_subparsers(self) -> None:
"""
@ -766,29 +833,30 @@ class MelCLI():
# inbox (default)
parser_inbox = subparsers.add_parser(
"inbox", help="Show unread, unsorted and flagged messages")
parser_inbox.add_argument('-u', '--only-unread', action='store_true',
help="Show unread messages only")
"inbox", help="Show unread, unsorted and flagged messages"
)
parser_inbox.add_argument(
"-u", "--only-unread", action="store_true", help="Show unread messages only"
)
# TODO Make this more relevant
parser_inbox.set_defaults(operation=self.operation_inbox)
# list folder [--recurse]
# List actions
parser_list = subparsers.add_parser(
"list", help="List all folders")
parser_list = subparsers.add_parser("list", help="List all folders")
# parser_list.add_argument('message', nargs='*', help="Messages")
parser_list.set_defaults(operation=self.operation_list)
# flag msg...
parser_flag = subparsers.add_parser(
"flag", help="Mark messages as flagged")
parser_flag.add_argument('message', nargs='*', help="Messages")
parser_flag = subparsers.add_parser("flag", help="Mark messages as flagged")
parser_flag.add_argument("message", nargs="*", help="Messages")
parser_flag.set_defaults(operation=self.operation_flag)
# unflag msg...
parser_unflag = subparsers.add_parser(
"unflag", help="Mark messages as not-flagged")
parser_unflag.add_argument('message', nargs='*', help="Messages")
"unflag", help="Mark messages as not-flagged"
)
parser_unflag.add_argument("message", nargs="*", help="Messages")
parser_unflag.set_defaults(operation=self.operation_unflag)
# delete msg...
@ -799,7 +867,7 @@ class MelCLI():
# read msg [--html] [--plain] [--browser]
parser_read = subparsers.add_parser("read", help="Read message")
parser_read.add_argument('message', nargs=1, help="Messages")
parser_read.add_argument("message", nargs=1, help="Messages")
parser_read.set_defaults(operation=self.operation_read)
# attach msg [id] [--save] (list if no id, xdg-open else)
@ -817,20 +885,23 @@ class MelCLI():
# fetch (mbsync, notmuch new, retag, notify; called by greater gods)
parser_fetch = subparsers.add_parser(
"fetch", help="Fetch mail, tag them, and run notifications")
"fetch", help="Fetch mail, tag them, and run notifications"
)
parser_fetch.set_defaults(operation=self.operation_fetch)
# Debug
# debug (various)
parser_debug = subparsers.add_parser(
"debug", help="Who know what this holds...")
parser_debug.set_defaults(verbosity='DEBUG')
"debug", help="Who know what this holds..."
)
parser_debug.set_defaults(verbosity="DEBUG")
parser_debug.set_defaults(operation=self.operation_debug)
# retag (all or unprocessed)
parser_retag = subparsers.add_parser(
"retag", help="Retag all mails (when you changed configuration)")
"retag", help="Retag all mails (when you changed configuration)"
)
parser_retag.set_defaults(operation=self.operation_retag)
# all
@ -842,15 +913,21 @@ class MelCLI():
Create the main parser that will handle the user arguments.
"""
parser = argparse.ArgumentParser(description="Meh mail client")
parser.add_argument('-v', '--verbosity',
choices=MelCLI.VERBOSITY_LEVELS, default='WARNING',
help="Verbosity of self.log messages")
parser.add_argument(
"-v",
"--verbosity",
choices=MelCLI.VERBOSITY_LEVELS,
default="WARNING",
help="Verbosity of self.log messages",
)
# parser.add_argument('-n', '--dry-run', action='store_true',
# help="Don't do anything") # DEBUG
default_config_file = os.path.join(
xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
parser.add_argument('-c', '--config', default=default_config_file,
help="Accounts config file")
xdg.BaseDirectory.xdg_config_home, "mel", "accounts.conf"
)
parser.add_argument(
"-c", "--config", default=default_config_file, help="Accounts config file"
)
return parser
def __init__(self) -> None:
@ -860,8 +937,9 @@ class MelCLI():
self.add_subparsers()
self.args = self.parser.parse_args()
coloredlogs.install(level=self.args.verbosity,
fmt='%(levelname)s %(name)s %(message)s')
coloredlogs.install(
level=self.args.verbosity, fmt="%(levelname)s %(name)s %(message)s"
)
self.engine = MelEngine(self.args.config)
self.output = MelOutput(self.engine)

View file

@ -14,7 +14,7 @@ import sys
# TODO Write in .config or .cache /mel
# TODO Fix IMAPS with mbsync
configPath = os.path.join(os.path.expanduser('~'), '.config', 'mel', 'accounts.conf')
configPath = os.path.join(os.path.expanduser("~"), ".config", "mel", "accounts.conf")
config = configparser.ConfigParser()
config.read(configPath)
@ -23,9 +23,9 @@ storageFull = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"]))
config["GENERAL"]["storage"] = storageFull
SERVER_DEFAULTS = {
"imap": {"port": 143, "starttls": True},
"smtp": {"port": 587, "starttls": True},
}
"imap": {"port": 143, "starttls": True},
"smtp": {"port": 587, "starttls": True},
}
SERVER_ITEMS = {"host", "port", "user", "pass", "starttls"}
ACCOUNT_DEFAULTS = {
"color": "#FFFFFF",
@ -53,7 +53,11 @@ for name in config.sections():
for item in SERVER_ITEMS:
key = server + item
try:
val = section.get(key) or section.get(item) or SERVER_DEFAULTS[server][item]
val = (
section.get(key)
or section.get(item)
or SERVER_DEFAULTS[server][item]
)
except KeyError:
raise KeyError("{}.{}".format(name, key))
@ -71,7 +75,7 @@ for name in config.sections():
continue
data[key] = section[key]
for k, v in config['DEFAULT'].items():
for k, v in config["DEFAULT"].items():
if k not in data:
data[k] = v
@ -85,7 +89,7 @@ for name in config.sections():
mails.add(alt)
data["account"] = name
data["storage"] = os.path.join(config['GENERAL']['storage'], name)
data["storage"] = os.path.join(config["GENERAL"]["storage"], name)
data["storageInbox"] = os.path.join(data["storage"], "INBOX")
accounts[name] = data
@ -139,7 +143,7 @@ remotepass = {imappass}
"""
offlineIMAPstr = OFFLINEIMAP_BEGIN.format(','.join(accounts), len(accounts))
offlineIMAPstr = OFFLINEIMAP_BEGIN.format(",".join(accounts), len(accounts))
for name, account in accounts.items():
if account["imapstarttls"]:
secconf = "ssl = no"
@ -182,9 +186,11 @@ for name, account in accounts.items():
if "certificate" in account:
secconf += "\nCertificateFile {certificate}".format(**account)
imappassEscaped = account["imappass"].replace("\\", "\\\\")
mbsyncStr += MBSYNC_ACCOUNT.format(**account, secconf=secconf, imappassEscaped=imappassEscaped)
mbsyncFilepath = os.path.join(os.path.expanduser('~'), '.config/mel/mbsyncrc')
with open(mbsyncFilepath, 'w') as f:
mbsyncStr += MBSYNC_ACCOUNT.format(
**account, secconf=secconf, imappassEscaped=imappassEscaped
)
mbsyncFilepath = os.path.join(os.path.expanduser("~"), ".config/mel/mbsyncrc")
with open(mbsyncFilepath, "w") as f:
f.write(mbsyncStr)
# msmtp
@ -208,8 +214,8 @@ tls on
msmtpStr = MSMTP_BEGIN
for name, account in accounts.items():
msmtpStr += MSMTP_ACCOUNT.format(**account)
mbsyncFilepath = os.path.join(os.path.expanduser('~'), '.config/msmtp/config')
with open(mbsyncFilepath, 'w') as f:
mbsyncFilepath = os.path.join(os.path.expanduser("~"), ".config/msmtp/config")
with open(mbsyncFilepath, "w") as f:
f.write(msmtpStr)
@ -241,8 +247,8 @@ other_email = mails.copy()
other_email.remove(general["main"]["from"])
other_email = ";".join(other_email)
notmuchStr = NOTMUCH_BEGIN.format(**general, other_email=other_email)
mbsyncFilepath = os.path.join(os.path.expanduser('~'), '.config/notmuch-config')
with open(mbsyncFilepath, 'w') as f:
mbsyncFilepath = os.path.join(os.path.expanduser("~"), ".config/notmuch-config")
with open(mbsyncFilepath, "w") as f:
f.write(notmuchStr)
# mutt (temp)
@ -254,15 +260,15 @@ mailboxesStr = MAILBOXES_BEGIN
for name, account in accounts.items():
lines = "-" * (20 - len(name))
mailboxesStr += f' "+{name}{lines}"'
for root, dirs, files in os.walk(account['storage']):
for root, dirs, files in os.walk(account["storage"]):
if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs:
continue
assert root.startswith(storageFull)
path = root[len(storageFull)+1:]
path = root[len(storageFull) + 1 :]
mailboxesStr += f' "+{path}"'
mailboxesStr += "\n"
mailboxesFilepath = os.path.join(os.path.expanduser('~'), '.mutt/mailboxes')
with open(mailboxesFilepath, 'w') as f:
mailboxesFilepath = os.path.join(os.path.expanduser("~"), ".mutt/mailboxes")
with open(mailboxesFilepath, "w") as f:
f.write(mailboxesStr)
## accounts
@ -296,14 +302,14 @@ for name, account in accounts.items():
muttStr = MUTT_ACCOUNT.format(**account)
# Config
muttFilepath = os.path.join(os.path.expanduser('~'), f'.mutt/accounts/{name}')
with open(muttFilepath, 'w') as f:
muttFilepath = os.path.join(os.path.expanduser("~"), f".mutt/accounts/{name}")
with open(muttFilepath, "w") as f:
f.write(muttStr)
# Signature
sigStr = account.get("sig", account.get("name", ""))
sigFilepath = os.path.join(os.path.expanduser('~'), f'.mutt/accounts/{name}.sig')
with open(sigFilepath, 'w') as f:
sigFilepath = os.path.join(os.path.expanduser("~"), f".mutt/accounts/{name}.sig")
with open(sigFilepath, "w") as f:
f.write(sigStr)
MUTT_SELECTOR = """
@ -324,13 +330,15 @@ hooks = ""
for name, account in accounts.items():
hooks += f"folder-hook {name}/* source ~/.mutt/accounts/{name}\n"
selectStr += MUTT_SELECTOR.format(**general, hooks=hooks)
selectFilepath = os.path.join(os.path.expanduser('~'), '.mutt/muttrc')
with open(selectFilepath, 'w') as f:
selectFilepath = os.path.join(os.path.expanduser("~"), ".mutt/muttrc")
with open(selectFilepath, "w") as f:
f.write(selectStr)
## Color
for name, account in accounts.items():
# Config
colorFilepath = os.path.join(os.path.expanduser('~'), f'{general["storage"]}/{name}/color')
with open(colorFilepath, 'w') as f:
f.write(account['color'])
colorFilepath = os.path.join(
os.path.expanduser("~"), f'{general["storage"]}/{name}/color'
)
with open(colorFilepath, "w") as f:
f.write(account["color"])

View file

@ -16,17 +16,17 @@ def main() -> None:
"""
Function that executes the script.
"""
for root, _, files in os.walk('.'):
for root, _, files in os.walk("."):
for filename in files:
match = re.match(r'^(\d+) - (.+)$', filename)
match = re.match(r"^(\d+) - (.+)$", filename)
if not match:
continue
new_filename = f"{match[1]} {match[2]}"
old_path = os.path.join(root, filename)
new_path = os.path.join(root, new_filename)
print(old_path, '->', new_path)
print(old_path, "->", new_path)
os.rename(old_path, new_path)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -6,7 +6,7 @@ import shutil
import logging
import coloredlogs
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
MUSICS_FOLDER = os.path.join(os.path.expanduser("~"), "Musique")
@ -36,4 +36,3 @@ for f in sys.argv[1:]:
log.info("{} → {}".format(src, dst))
os.makedirs(dstFolder, exist_ok=True)
shutil.move(src, dst)

View file

@ -10,15 +10,17 @@ import logging
import coloredlogs
import argparse
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
debug = None
class OvhCli():
class OvhCli:
ROOT = "https://api.ovh.com/1.0?null"
def __init__(self):
self.cacheDir = os.path.join(xdg.BaseDirectory.xdg_cache_home, 'ovhcli')
self.cacheDir = os.path.join(xdg.BaseDirectory.xdg_cache_home, "ovhcli")
# TODO Corner cases: links, cache dir not done, configurable cache
if not os.path.isdir(self.cacheDir):
assert not os.path.exists(self.cacheDir)
@ -26,18 +28,18 @@ class OvhCli():
def updateCache(self):
log.info("Downloading the API description")
rootJsonPath = os.path.join(self.cacheDir, 'root.json')
rootJsonPath = os.path.join(self.cacheDir, "root.json")
log.debug(f"{self.ROOT} -> {rootJsonPath}")
urllib.request.urlretrieve(self.ROOT, rootJsonPath)
with open(rootJsonPath, 'rt') as rootJson:
with open(rootJsonPath, "rt") as rootJson:
root = json.load(rootJson)
basePath = root['basePath']
basePath = root["basePath"]
for apiRoot in root['apis']:
fmt = 'json'
assert fmt in apiRoot['format']
path = apiRoot['path']
schema = apiRoot['schema'].format(format=fmt, path=path)
for apiRoot in root["apis"]:
fmt = "json"
assert fmt in apiRoot["format"]
path = apiRoot["path"]
schema = apiRoot["schema"].format(format=fmt, path=path)
apiJsonPath = os.path.join(self.cacheDir, schema[1:])
apiJsonUrl = basePath + schema
log.debug(f"{apiJsonUrl} -> {apiJsonPath}")
@ -47,11 +49,11 @@ class OvhCli():
urllib.request.urlretrieve(apiJsonUrl, apiJsonPath)
def createParser(self):
parser = argparse.ArgumentParser(description='Access the OVH API')
parser = argparse.ArgumentParser(description="Access the OVH API")
return parser
if __name__ == '__main__':
if __name__ == "__main__":
cli = OvhCli()
# cli.updateCache()
parser = cli.createParser()

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import os
import re
@ -9,16 +9,16 @@ import PIL.ExifTags
import PIL.Image
import progressbar
EXTENSION_PATTERN = re.compile(r'\.JPE?G', re.I)
COMMON_PATTERN = re.compile(r'(IMG|DSC[NF]?|100|P10|f|t)_?\d+', re.I)
EXIF_TAG_NAME = 'DateTimeOriginal'
EXIF_TAG_ID = list(PIL.ExifTags.TAGS.keys())[list(
PIL.ExifTags.TAGS.values()).index(EXIF_TAG_NAME)]
EXIF_DATE_FORMAT = '%Y:%m:%d %H:%M:%S'
EXTENSION_PATTERN = re.compile(r"\.JPE?G", re.I)
COMMON_PATTERN = re.compile(r"(IMG|DSC[NF]?|100|P10|f|t)_?\d+", re.I)
EXIF_TAG_NAME = "DateTimeOriginal"
EXIF_TAG_ID = list(PIL.ExifTags.TAGS.keys())[
list(PIL.ExifTags.TAGS.values()).index(EXIF_TAG_NAME)
]
EXIF_DATE_FORMAT = "%Y:%m:%d %H:%M:%S"
def get_pictures(directory: str = ".", skip_renamed: bool = True) \
-> typing.Generator:
def get_pictures(directory: str = ".", skip_renamed: bool = True) -> typing.Generator:
for root, _, files in os.walk(directory):
for filename in files:
filename_trunk, extension = os.path.splitext(filename)
@ -44,9 +44,9 @@ def main() -> None:
if exif_data and EXIF_TAG_ID in exif_data:
date_raw = exif_data[EXIF_TAG_ID]
date = datetime.datetime.strptime(date_raw, EXIF_DATE_FORMAT)
new_name = date.isoformat().replace(':', '-') + '.jpg' # For NTFS
new_name = date.isoformat().replace(":", "-") + ".jpg" # For NTFS
print(full_path, new_name)
os.rename(full_path, new_name) # TODO FOLDER
os.rename(full_path, new_name) # TODO FOLDER
img.close()

View file

@ -9,14 +9,16 @@ from Xlib.protocol import rq
KEY = XK.XK_F7
def mute(state):
with pulsectl.Pulse('list-source') as pulse:
with pulsectl.Pulse("list-source") as pulse:
for source in pulse.source_list():
if source.port_active:
if source.mute != state:
pulse.mute(source, state)
print(f"{source.name} {'un' if not state else ''}muted")
mute(True)
local_dpy = display.Display()
@ -36,7 +38,8 @@ def record_callback(reply):
data = reply.data
while len(data):
event, data = rq.EventField(None).parse_binary_value(
data, record_dpy.display, None, None)
data, record_dpy.display, None, None
)
if event.type in [X.KeyPress, X.KeyRelease]:
keysym = local_dpy.keycode_to_keysym(event.detail, 0)
@ -45,29 +48,32 @@ def record_callback(reply):
if keysym == KEY:
mute(event.type == X.KeyRelease)
# Check if the extension is present
if not record_dpy.has_extension("RECORD"):
print("RECORD extension not found")
sys.exit(1)
r = record_dpy.record_get_version(0, 0)
print("RECORD extension version %d.%d" %
(r.major_version, r.minor_version))
print("RECORD extension version %d.%d" % (r.major_version, r.minor_version))
# Create a recording context; we only want key and mouse events
ctx = record_dpy.record_create_context(
0,
[record.AllClients],
[{
'core_requests': (0, 0),
'core_replies': (0, 0),
'ext_requests': (0, 0, 0, 0),
'ext_replies': (0, 0, 0, 0),
'delivered_events': (0, 0),
'device_events': (X.KeyPress, X.MotionNotify),
'errors': (0, 0),
'client_started': False,
'client_died': False,
}])
[
{
"core_requests": (0, 0),
"core_replies": (0, 0),
"ext_requests": (0, 0, 0, 0),
"ext_replies": (0, 0, 0, 0),
"delivered_events": (0, 0),
"device_events": (X.KeyPress, X.MotionNotify),
"errors": (0, 0),
"client_started": False,
"client_died": False,
}
],
)
# Enable the context; this only returns after a call to record_disable_context,
# while calling the callback function in the meantime

View file

@ -14,15 +14,15 @@ import typing
import coloredlogs
import r128gain
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# TODO Remove debug
# Constants
FORCE = '-f' in sys.argv
FORCE = "-f" in sys.argv
if FORCE:
sys.argv.remove('-f')
sys.argv.remove("-f")
if len(sys.argv) >= 2:
SOURCE_FOLDER = os.path.realpath(sys.argv[1])
else:
@ -66,6 +66,5 @@ for album in albums:
if not musicFiles:
continue
r128gain.process(musicFiles, album_gain=True,
skip_tagged=not FORCE, report=True)
r128gain.process(musicFiles, album_gain=True, skip_tagged=not FORCE, report=True)
print("==============================")

View file

@ -13,7 +13,7 @@ import progressbar
import logging
progressbar.streams.wrap_stderr()
coloredlogs.install(level='INFO', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="INFO", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# 1) Create file list with conflict files
@ -21,21 +21,20 @@ log = logging.getLogger()
# 3) Propose what to do
def sizeof_fmt(num, suffix='B'):
def sizeof_fmt(num, suffix="B"):
# Stolen from https://stackoverflow.com/a/1094933
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.1f %s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f %s%s" % (num, 'Yi', suffix)
return "%.1f %s%s" % (num, "Yi", suffix)
class Table():
class Table:
def __init__(self, width, height):
self.width = width
self.height = height
self.data = [['' for _ in range(self.height)]
for _ in range(self.width)]
self.data = [["" for _ in range(self.height)] for _ in range(self.width)]
def set(self, x, y, data):
self.data[x][y] = str(data)
@ -48,15 +47,15 @@ class Table():
l = len(cell)
width = widths[x]
if x > 0:
cell = ' | ' + cell
cell = cell + ' ' * (width - l)
print(cell, end='\t')
cell = " | " + cell
cell = cell + " " * (width - l)
print(cell, end="\t")
print()
class Database():
class Database:
VERSION = 1
CONFLICT_PATTERN = re.compile('\.sync-conflict-\d{8}-\d{6}-\w{7}')
CONFLICT_PATTERN = re.compile("\.sync-conflict-\d{8}-\d{6}-\w{7}")
def __init__(self, directory):
self.version = Database.VERSION
@ -83,18 +82,25 @@ class Database():
return sum(databaseFile.maxSize() for databaseFile in self.data.values())
def totalChecksumSize(self):
return sum(databaseFile.totalChecksumSize() for databaseFile in self.data.values())
return sum(
databaseFile.totalChecksumSize() for databaseFile in self.data.values()
)
def getList(self):
self.prune()
log.info("Finding conflict files")
widgets = [
progressbar.AnimatedMarker(), ' ',
progressbar.BouncingBar(), ' ',
progressbar.DynamicMessage('conflicts'), ' ',
progressbar.DynamicMessage('files'), ' ',
progressbar.DynamicMessage('dir', width=20, precision=20), ' ',
progressbar.AnimatedMarker(),
" ",
progressbar.BouncingBar(),
" ",
progressbar.DynamicMessage("conflicts"),
" ",
progressbar.DynamicMessage("files"),
" ",
progressbar.DynamicMessage("dir", width=20, precision=20),
" ",
progressbar.Timer(),
]
bar = progressbar.ProgressBar(widgets=widgets).start()
@ -104,7 +110,7 @@ class Database():
f += 1
if not Database.CONFLICT_PATTERN.search(conflictFilename):
continue
filename = Database.CONFLICT_PATTERN.sub('', conflictFilename)
filename = Database.CONFLICT_PATTERN.sub("", conflictFilename)
key = (root, filename)
if key in self.data:
dataFile = self.data[key]
@ -116,11 +122,13 @@ class Database():
dataFile.addConflict(filename)
dataFile.addConflict(conflictFilename)
bar.update(conflicts=len(self.data), files=f,
dir=root[(len(self.directory)+1):])
bar.update(
conflicts=len(self.data), files=f, dir=root[(len(self.directory) + 1) :]
)
bar.finish()
log.info(
f"Found {len(self.data)} conflicts, totalling {self.nbFiles()} conflict files.")
f"Found {len(self.data)} conflicts, totalling {self.nbFiles()} conflict files."
)
def getStats(self):
log.info("Getting stats from conflict files")
@ -132,25 +140,38 @@ class Database():
bar.update(f)
bar.finish()
log.info(
f"Total file size: {sizeof_fmt(self.totalSize())}, possible save: {sizeof_fmt(self.totalSize() - self.maxSize())}")
f"Total file size: {sizeof_fmt(self.totalSize())}, possible save: {sizeof_fmt(self.totalSize() - self.maxSize())}"
)
def getChecksums(self):
log.info("Checksumming conflict files")
widgets = [
progressbar.DataSize(), ' of ', progressbar.DataSize('max_value'),
' (', progressbar.AdaptiveTransferSpeed(), ') ',
progressbar.Bar(), ' ',
progressbar.DynamicMessage('dir', width=20, precision=20), ' ',
progressbar.DynamicMessage('file', width=20, precision=20), ' ',
progressbar.Timer(), ' ',
progressbar.DataSize(),
" of ",
progressbar.DataSize("max_value"),
" (",
progressbar.AdaptiveTransferSpeed(),
") ",
progressbar.Bar(),
" ",
progressbar.DynamicMessage("dir", width=20, precision=20),
" ",
progressbar.DynamicMessage("file", width=20, precision=20),
" ",
progressbar.Timer(),
" ",
progressbar.AdaptiveETA(),
]
bar = progressbar.DataTransferBar(
max_value=self.totalChecksumSize(), widgets=widgets).start()
max_value=self.totalChecksumSize(), widgets=widgets
).start()
f = 0
for databaseFile in self.data.values():
bar.update(f, dir=databaseFile.root[(
len(self.directory)+1):], file=databaseFile.filename)
bar.update(
f,
dir=databaseFile.root[(len(self.directory) + 1) :],
file=databaseFile.filename,
)
f += databaseFile.totalChecksumSize()
try:
databaseFile.getChecksums()
@ -172,9 +193,9 @@ class Database():
databaseFile.takeAction(execute=execute)
class DatabaseFile():
class DatabaseFile:
BLOCK_SIZE = 4096
RELEVANT_STATS = ('st_mode', 'st_uid', 'st_gid', 'st_size', 'st_mtime')
RELEVANT_STATS = ("st_mode", "st_uid", "st_gid", "st_size", "st_mtime")
def __init__(self, root, filename):
self.root = root
@ -260,7 +281,15 @@ class DatabaseFile():
oldChecksum = self.checksums[f]
# If it's been already summed, and we have the same inode and same ctime, don't resum
if oldStat is None or not isinstance(oldChecksum, int) or oldStat.st_size != newStat.st_size or oldStat.st_dev != newStat.st_dev or oldStat.st_ino != newStat.st_ino or oldStat.st_ctime != newStat.st_ctime or oldStat.st_dev != newStat.st_dev:
if (
oldStat is None
or not isinstance(oldChecksum, int)
or oldStat.st_size != newStat.st_size
or oldStat.st_dev != newStat.st_dev
or oldStat.st_ino != newStat.st_ino
or oldStat.st_ctime != newStat.st_ctime
or oldStat.st_dev != newStat.st_dev
):
self.checksums[f] = None
self.stats[f] = newStat
@ -270,7 +299,10 @@ class DatabaseFile():
self.checksums = [False] * len(self.conflicts)
# If all the files are the same inode, set as same files
if len(set([s.st_ino for s in self.stats])) == 1 and len(set([s.st_dev for s in self.stats])) == 1:
if (
len(set([s.st_ino for s in self.stats])) == 1
and len(set([s.st_dev for s in self.stats])) == 1
):
self.checksums = [True] * len(self.conflicts)
def getChecksums(self):
@ -282,7 +314,7 @@ class DatabaseFile():
if self.checksums[f] is not None:
continue
self.checksums[f] = 1
filedescs[f] = open(self.getPath(conflict), 'rb')
filedescs[f] = open(self.getPath(conflict), "rb")
while len(filedescs):
toClose = set()
@ -305,12 +337,13 @@ class DatabaseFile():
def getFeatures(self):
features = dict()
features['name'] = self.conflicts
features['sum'] = self.checksums
features["name"] = self.conflicts
features["sum"] = self.checksums
for statName in DatabaseFile.RELEVANT_STATS:
# Rounding beause I Syncthing also rounds
features[statName] = [
int(stat.__getattribute__(statName)) for stat in self.stats]
int(stat.__getattribute__(statName)) for stat in self.stats
]
return features
def getDiffFeatures(self):
@ -327,7 +360,7 @@ class DatabaseFile():
if match:
return match[0][15:]
else:
return '-'
return "-"
def printInfos(self, diff=True):
print(os.path.join(self.root, self.filename))
@ -335,14 +368,13 @@ class DatabaseFile():
features = self.getDiffFeatures()
else:
features = self.getFeatures()
features['name'] = [DatabaseFile.shortConflict(
c) for c in self.conflicts]
table = Table(len(features), len(self.conflicts)+1)
features["name"] = [DatabaseFile.shortConflict(c) for c in self.conflicts]
table = Table(len(features), len(self.conflicts) + 1)
for x, featureName in enumerate(features.keys()):
table.set(x, 0, featureName)
for x, featureName in enumerate(features.keys()):
for y in range(len(self.conflicts)):
table.set(x, y+1, features[featureName][y])
table.set(x, y + 1, features[featureName][y])
table.print()
def decideAction(self, mostRecent=False):
@ -357,10 +389,10 @@ class DatabaseFile():
if len(features) == 1:
reason = "same files"
self.action = 0
elif 'st_mtime' in features and mostRecent:
recentTime = features['st_mtime'][0]
elif "st_mtime" in features and mostRecent:
recentTime = features["st_mtime"][0]
recentIndex = 0
for index, time in enumerate(features['st_mtime']):
for index, time in enumerate(features["st_mtime"]):
if time > recentTime:
recentTime = time
recentIndex = 0
@ -368,11 +400,11 @@ class DatabaseFile():
reason = "most recent"
if self.action is None:
log.warning(
f"{self.root}/{self.filename}: skip, cause: {reason}")
log.warning(f"{self.root}/{self.filename}: skip, cause: {reason}")
else:
log.info(
f"{self.root}/{self.filename}: keep {DatabaseFile.shortConflict(self.conflicts[self.action])}, cause: {reason}")
f"{self.root}/{self.filename}: keep {DatabaseFile.shortConflict(self.conflicts[self.action])}, cause: {reason}"
)
def takeAction(self, execute=False):
if self.action is None:
@ -380,7 +412,8 @@ class DatabaseFile():
actionName = self.conflicts[self.action]
if actionName != self.filename:
log.debug(
f"Rename {self.getPath(actionName)} → {self.getPath(self.filename)}")
f"Rename {self.getPath(actionName)} → {self.getPath(self.filename)}"
)
if execute:
os.rename(self.getPath(actionName), self.getPath(self.filename))
for conflict in self.conflicts:
@ -390,22 +423,33 @@ class DatabaseFile():
if execute:
os.unlink(self.getPath(conflict))
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Handle Syncthing's .sync-conflict files ")
description="Handle Syncthing's .sync-conflict files "
)
# Execution flow
parser.add_argument('directory', metavar='DIRECTORY',
nargs='?', help='Directory to analyse')
parser.add_argument('-d', '--database',
help='Database path for file informations')
parser.add_argument('-r', '--most-recent', action='store_true',
help='Always keep the most recent version')
parser.add_argument('-e', '--execute', action='store_true',
help='Really apply changes')
parser.add_argument('-p', '--print', action='store_true',
help='Only print differences between files')
parser.add_argument(
"directory", metavar="DIRECTORY", nargs="?", help="Directory to analyse"
)
parser.add_argument("-d", "--database", help="Database path for file informations")
parser.add_argument(
"-r",
"--most-recent",
action="store_true",
help="Always keep the most recent version",
)
parser.add_argument(
"-e", "--execute", action="store_true", help="Really apply changes"
)
parser.add_argument(
"-p",
"--print",
action="store_true",
help="Only print differences between files",
)
args = parser.parse_args()
@ -419,13 +463,17 @@ if __name__ == "__main__":
if args.database:
if os.path.isfile(args.database):
try:
with open(args.database, 'rb') as databaseFile:
with open(args.database, "rb") as databaseFile:
database = pickle.load(databaseFile)
assert isinstance(database, Database)
except BaseException as e:
raise ValueError("Not a database file")
assert database.version <= Database.VERSION, "Version of the loaded database is too recent"
assert database.directory == args.directory, "Directory of the loaded database doesn't match"
assert (
database.version <= Database.VERSION
), "Version of the loaded database is too recent"
assert (
database.directory == args.directory
), "Directory of the loaded database doesn't match"
if database is None:
database = Database(args.directory)
@ -433,7 +481,7 @@ if __name__ == "__main__":
def saveDatabase():
if args.database:
global database
with open(args.database, 'wb') as databaseFile:
with open(args.database, "wb") as databaseFile:
pickle.dump(database, databaseFile)
database.getList()

View file

@ -25,7 +25,11 @@ if __name__ == "__main__":
)
parser.add_argument("-p", "--port", env_var="PORT", default=25)
parser.add_argument(
"-S", "--security", env_var="SECURITY", choices=["plain", "ssl", "starttls"], default='plain'
"-S",
"--security",
env_var="SECURITY",
choices=["plain", "ssl", "starttls"],
default="plain",
)
parser.add_argument("-l", "--helo", env_var="HELO")
@ -67,7 +71,7 @@ if __name__ == "__main__":
args.to = args.receiver
if args.password:
password = args.password
args.password = '********'
args.password = "********"
# Transmission content
@ -163,7 +167,7 @@ Input arguments:
# Transmission
if args.security != 'starttls':
if args.security != "starttls":
recv()
send(f"EHLO {args.helo}")
if args.user:

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import sys
import random

View file

@ -3,9 +3,9 @@
import os
import shutil
curDir = os.path.realpath('.')
assert '.stversions/' in curDir
tgDir = curDir.replace('.stversions/', '')
curDir = os.path.realpath(".")
assert ".stversions/" in curDir
tgDir = curDir.replace(".stversions/", "")
for root, dirs, files in os.walk(curDir):
@ -17,4 +17,3 @@ for root, dirs, files in os.walk(curDir):
dstPath = os.path.join(dstRoot, dstF)
print(f"{srcPath} → {dstPath}")
shutil.copy2(srcPath, dstPath)

View file

@ -11,7 +11,6 @@ filenames = sys.argv[2:]
for filename in filenames:
assert os.path.isfile(filename)
exifDict = piexif.load(filename)
exifDict['0th'][piexif.ImageIFD.Copyright] = creator.encode()
exifDict["0th"][piexif.ImageIFD.Copyright] = creator.encode()
exifBytes = piexif.dump(exifDict)
piexif.insert(exifBytes, filename)

View file

@ -11,22 +11,29 @@ if N < 2:
print("Ben reste chez toi alors.")
sys.exit(1)
def trajet_str(a, b):
return f"{gares[a]} → {gares[b]}"
def chemin_str(stack):
return ", ".join([trajet_str(stack[i], stack[i+1]) for i in range(len(stack)-1)])
return ", ".join(
[trajet_str(stack[i], stack[i + 1]) for i in range(len(stack) - 1)]
)
# Demande des prix des trajets
prices = dict()
for i in range(N):
for j in range(N-1, i, -1):
for j in range(N - 1, i, -1):
p = None
while not isinstance(p, float):
try:
p = float(input(f"Prix du trajet {trajet_str(i, j)} ? ").replace(',', '.'))
p = float(
input(f"Prix du trajet {trajet_str(i, j)} ? ").replace(",", ".")
)
except ValueError:
print("C'est pas un prix ça !")
if i not in prices:
@ -40,8 +47,9 @@ miniStack = None
maxiPrice = -inf
maxiStack = None
def register_path(stack):
price = sum([prices[stack[i]][stack[i+1]]for i in range(len(stack)-1)])
price = sum([prices[stack[i]][stack[i + 1]] for i in range(len(stack) - 1)])
global miniPrice, maxiPrice, miniStack, maxiStack
if price < miniPrice:
@ -52,6 +60,7 @@ def register_path(stack):
maxiStack = stack.copy()
print(f"{chemin_str(stack)} = {price:.2f} €")
stack = [0]
while stack[0] == 0:
if stack[-1] >= N - 1:
@ -59,7 +68,7 @@ while stack[0] == 0:
stack.pop()
stack[-1] += 1
else:
stack.append(stack[-1]+1)
stack.append(stack[-1] + 1)
print(f"Prix minimum: {chemin_str(miniStack)} = {miniPrice:.2f} €")
print(f"Prix maximum: {chemin_str(maxiStack)} = {maxiPrice:.2f} €")

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# pylint: disable=C0103,W0621
# pip install tmdbv3api
@ -20,8 +20,8 @@ Episode = typing.Any # TODO
# Constants
API_KEY_PASS_PATH = 'http/themoviedb.org'
VIDEO_EXTENSIONS = {'mp4', 'mkv', 'avi', 'webm'}
API_KEY_PASS_PATH = "http/themoviedb.org"
VIDEO_EXTENSIONS = {"mp4", "mkv", "avi", "webm"}
# Functions
@ -31,12 +31,12 @@ def get_pass_data(path: str) -> typing.Dict[str, str]:
Returns the data stored in the Unix password manager
given its path.
"""
run = subprocess.run(['pass', path], stdout=subprocess.PIPE, check=True)
lines = run.stdout.decode().split('\n')
run = subprocess.run(["pass", path], stdout=subprocess.PIPE, check=True)
lines = run.stdout.decode().split("\n")
data = dict()
data['pass'] = lines[0]
data["pass"] = lines[0]
for line in lines[1:]:
match = re.match(r'(\w+): ?(.+)', line)
match = re.match(r"(\w+): ?(.+)", line)
if match:
data[match[1]] = match[2]
return data
@ -44,24 +44,27 @@ def get_pass_data(path: str) -> typing.Dict[str, str]:
def confirm(text: str) -> bool:
res = input(text + " [yn] ")
while res not in ('y', 'n'):
while res not in ("y", "n"):
res = input("Please answer with y or n: ")
return res == 'y'
return res == "y"
def episode_identifier(episode: typing.Any) -> str:
return f"S{episode['season_number']:02d}E" + \
f"{episode['episode_number']:02d} {episode['name']}"
return (
f"S{episode['season_number']:02d}E"
+ f"{episode['episode_number']:02d} {episode['name']}"
)
dryrun = '-n' in sys.argv
dryrun = "-n" in sys.argv
if dryrun:
dryrun = True
sys.argv.remove('-n')
sys.argv.remove("-n")
# Connecting to TMBDB
tmdb = tmdbv3api.TMDb()
tmdb.api_key = get_pass_data(API_KEY_PASS_PATH)['api']
tmdb.api_key = get_pass_data(API_KEY_PASS_PATH)["api"]
tmdb.language = sys.argv[1]
# Searching the TV show name (by current directory name)
@ -71,8 +74,8 @@ if len(sys.argv) >= 3:
show_name = sys.argv[2]
else:
show_name = os.path.split(os.path.realpath(os.path.curdir))[1]
if '(' in show_name:
show_name = show_name.split('(')[0].strip()
if "(" in show_name:
show_name = show_name.split("(")[0].strip()
search = tv.search(show_name)
@ -86,15 +89,14 @@ for res in search:
break
if not show:
print("Could not find a matching " +
f"show on TheMovieDatabase for {show_name}.")
print("Could not find a matching " + f"show on TheMovieDatabase for {show_name}.")
sys.exit(1)
# Retrieving all the episode of the show
episodes: typing.List[Episode] = list()
print(f"List of episodes for {show.name}:")
for season_number in range(0, show.number_of_seasons+1):
for season_number in range(0, show.number_of_seasons + 1):
season_details = season.details(show.id, season_number)
try:
@ -119,22 +121,22 @@ for root, dirs, files in os.walk(os.path.curdir):
print(f"- {filename}")
def get_episode(season_number: int, episode_number: int
) -> typing.Optional[Episode]:
def get_episode(season_number: int, episode_number: int) -> typing.Optional[Episode]:
# TODO Make more efficient using indexing
for episode in episodes:
if episode['season_number'] == season_number \
and episode['episode_number'] == episode_number:
if (
episode["season_number"] == season_number
and episode["episode_number"] == episode_number
):
return episode
return None
# Matching movie files to episode
associations: typing.List[typing.Tuple[typing.Tuple[str,
str], Episode]] = list()
associations: typing.List[typing.Tuple[typing.Tuple[str, str], Episode]] = list()
for video in videos:
root, filename = video
match = re.search(r'S(\d+)E(\d+)', filename)
match = re.search(r"S(\d+)E(\d+)", filename)
print(f"Treating file: {root}/{filename}")
episode = None
season_number = 0
@ -155,7 +157,8 @@ for video in videos:
episode = get_episode(season_number, episode_number)
if not episode:
print(
f" could not find episode S{season_number:02d}E{episode_number:02d} in TMBD")
f" could not find episode S{season_number:02d}E{episode_number:02d} in TMBD"
)
# Skip
if not episode:

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import os
import subprocess

View file

@ -10,7 +10,7 @@ import re
import coloredlogs
import progressbar
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# Constants
@ -18,8 +18,14 @@ SOURCE_FOLDER = os.path.join(os.path.expanduser("~"), "Musiques")
OUTPUT_FOLDER = os.path.join(os.path.expanduser("~"), ".MusiqueCompressed")
CONVERSIONS = {"flac": "opus"}
FORBIDDEN_EXTENSIONS = ["jpg", "png", "pdf", "ffs_db"]
FORGIVEN_FILENAMES = ["cover.jpg", "front.jpg", "folder.jpg",
"cover.png", "front.png", "folder.png"]
FORGIVEN_FILENAMES = [
"cover.jpg",
"front.jpg",
"folder.jpg",
"cover.png",
"front.png",
"folder.png",
]
IGNORED_EMPTY_FOLDER = [".stfolder"]
RESTRICT_CHARACTERS = '[\0\\/:*"<>|]' # FAT32, NTFS
# RESTRICT_CHARACTERS = '[:/]' # HFS, HFS+
@ -55,7 +61,7 @@ def convertPath(path: str) -> typing.Optional[str]:
extension = extension[1:].lower()
# Remove unwanted characters from filename
filename_parts = os.path.normpath(filename).split(os.path.sep)
filename_parts = [re.sub(RESTRICT_CHARACTERS, '_', part) for part in filename_parts]
filename_parts = [re.sub(RESTRICT_CHARACTERS, "_", part) for part in filename_parts]
filename = os.path.sep.join(filename_parts)
# If the extension isn't allowed
if extension in FORBIDDEN_EXTENSIONS:
@ -103,7 +109,7 @@ for sourceFile in remainingConversions:
# Converting
fullSourceFile = os.path.join(SOURCE_FOLDER, sourceFile)
if sourceFile == outputFile:
log.debug('%s → %s', fullSourceFile, fullOutputFile)
log.debug("%s → %s", fullSourceFile, fullOutputFile)
if act and os.path.isfile(fullOutputFile):
os.remove(fullOutputFile)
os.link(fullSourceFile, fullOutputFile)
@ -113,19 +119,33 @@ for sourceFile in remainingConversions:
log.info("Removing extra files")
for extraFile in extraFiles:
fullExtraFile = os.path.join(OUTPUT_FOLDER, extraFile)
log.debug('× %s', fullExtraFile)
log.debug("× %s", fullExtraFile)
if act:
os.remove(fullExtraFile)
log.info("Listing files that will be converted")
for fullSourceFile, fullOutputFile in conversions:
log.debug('%s ⇒ %s', fullSourceFile, fullOutputFile)
log.debug("%s ⇒ %s", fullSourceFile, fullOutputFile)
log.info("Converting files")
for fullSourceFile, fullOutputFile in progressbar.progressbar(conversions):
cmd = ["ffmpeg", "-y", "-i", fullSourceFile, "-c:a", "libopus",
"-movflags", "+faststart", "-b:a", "128k", "-vbr", "on",
"-compression_level", "10", fullOutputFile]
cmd = [
"ffmpeg",
"-y",
"-i",
fullSourceFile,
"-c:a",
"libopus",
"-movflags",
"+faststart",
"-b:a",
"128k",
"-vbr",
"on",
"-compression_level",
"10",
fullOutputFile,
]
if act:
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:

View file

@ -10,11 +10,9 @@ import subprocess
files = sys.argv[1:]
remove = False
if '-r' in files:
files.remove('-r')
if "-r" in files:
files.remove("-r")
remove = True
for f in files:
print(os.path.splitext(f))

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import os
import sys
import subprocess
import logging