diff --git a/config/scripts/mel b/config/scripts/mel index 693d6c2..692f238 100755 --- a/config/scripts/mel +++ b/config/scripts/mel @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +# pylint: disable=C0103,W0603,W0621,E1101 """ Meh mail client @@ -7,73 +8,96 @@ to become a fully-functional extremly-opinonated mail client. """ # TODO Features - # TODO Implement initial command set - # TODO Lockfiles for write operations on mail files (mbsync, tags→maildir operations) - # TODO OPTI Lockfile per account and process everything in parallel (if implemented, this - # should be optional since while it may speed up the mail fetching process, its multi-threading - # nature would cause a lot of cache flushes and be not very efficient on battery) - # TODO Handle true character width - # TODO IMAP IDLE watches? - # TODO GPG +# TODO Implement initial command set +# TODO Lockfiles for write operations on mail files (mbsync, +# tags→maildir operations) +# TODO OPTI Lockfile per account and process everything in parallel +# (if implemented, this should be optional since while it may speed up +# the mail fetching process, its multi-threading nature would cause a +# lot of cache flushes and be not very efficient on battery) +# TODO Handle true character width +# TODO IMAP IDLE watches? +# TODO GPG # TODO (only then) Refactor - # TODO OOP-based - # TODO Merge file with melConf +# TODO OOP-based +# TODO Merge file with melConf +# TODO Un-ignore pyling warnings -# DEBUG Small perf profiler -import time -perf_dict = dict() -perf_last = time.perf_counter() -def perfstep(name): - t = time.perf_counter() - global perf_last - global perf_dict - diff = t - perf_last - if name not in perf_dict: - perf_dict[name] = 0 - perf_dict[name] += diff - perf_last = time.perf_counter() - - -import notmuch -import logging -import coloredlogs -import colorama -import datetime -import os -import progressbar import argparse import configparser -import base64 -import shutil -import argparse -import xdg.BaseDirectory -import sys -import subprocess -import html -import re +import datetime +import email.message import email.parser +import html +import logging +import os +import re +import shutil +import subprocess +import sys +import time +import typing -perfstep("import") +import colorama +import coloredlogs +import notmuch +import progressbar +import xdg.BaseDirectory -ACCOUNTS = dict() -ALIASES = set() -db = None -config = None +PERF_LAST = time.perf_counter() +PERF_DICT: typing.Dict[str, float] = dict() -def notmuch_new(): +a: typing.Any = 'DEBUG VARIABLE (empty)' + + +def perfstep(name: str) -> None: + """ + DEBUG + Small performance profiler to measure steps. + Call with the name of the step when you just finished it. + """ + current_time = time.perf_counter() + global PERF_LAST + global PERF_DICT + diff = current_time - PERF_LAST + if name not in PERF_DICT: + PERF_DICT[name] = 0.0 + PERF_DICT[name] += diff + PERF_LAST = time.perf_counter() + + +ACCOUNTS: typing.Dict[str, configparser.SectionProxy] = dict() +ALIASES: typing.Set[str] = set() # All the emails the user is represented as +# TODO If the user send emails to himself, maybe that wont cut it. +DB = None +CONFIG = None + + +def notmuch_new() -> None: + """ + Runs `notmuch new`, which basically update the database + to match the mail folder. + """ close_database() log.info("Indexing mails") - notmuchConfigPath = os.path.expanduser("~/.config/notmuch-config") # TODO Better + notmuchConfigPath = os.path.expanduser( + "~/.config/notmuch-config") # TODO Better cmd = ["notmuch", "--config", notmuchConfigPath, "new"] log.debug(" ".join(cmd)) - subprocess.run(cmd) + subprocess.run(cmd, check=True) -def list_folders(): - storagePath = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"])) + +def list_folders() -> typing.List[typing.Tuple[str, ...]]: + """ + List all the folders of the mail dir. + """ + assert CONFIG + storagePath = os.path.realpath( + os.path.expanduser(CONFIG["GENERAL"]["storage"])) folders = list() - for account in ACCOUNTS.keys(): + for account in ACCOUNTS: storagePathAccount = os.path.join(storagePath, account) - for root, dirs, files in os.walk(storagePathAccount): + for root, dirs, _ in os.walk(storagePathAccount): if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs: continue assert root.startswith(storagePath) @@ -84,31 +108,45 @@ def list_folders(): folders.append(tuple(pathSplit)) return folders -def open_database(write=False): - global db - mode = notmuch.Database.MODE.READ_WRITE if write else notmuch.Database.MODE.READ_ONLY - if db: - if db.mode == mode: + +def open_database(write: bool = False) -> None: + """ + Open an access notmuch database in read or read+write mode. + It is stored in the global DB. + Be sure to require only in the mode you want to avoid deadlocks. + """ + assert CONFIG + global DB + mode = notmuch.Database.MODE.READ_WRITE if write \ + else notmuch.Database.MODE.READ_ONLY + if DB: + if DB.mode == mode: return - else: - log.info("Current database not in required mode, closing") - close_database() - log.info("Opening database in mode {}".format(mode)) - dbPath = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"])) - db = notmuch.Database(mode=mode, path=dbPath) + log.info("Current database not in mode %s, closing", mode) + close_database() + log.info("Opening database in mode %s", mode) + dbPath = os.path.realpath(os.path.expanduser(CONFIG["GENERAL"]["storage"])) + DB = notmuch.Database(mode=mode, path=dbPath) -def close_database(): - global db - if db: + +def close_database() -> None: + """ + Close the access notmuch database in read or read+write mode. + It is stored in the global DB. + """ + global DB + if DB: log.info("Closing database") - db.close() - db = None + DB.close() + DB = None -def generate_aliases(): - for name in config.sections(): + +def generate_aliases() -> None: + assert CONFIG + for name in CONFIG.sections(): if not name.islower(): continue - section = config[name] + section = CONFIG[name] ALIASES.add(section["from"]) if "alternatives" in section: for alt in section["alternatives"].split(";"): @@ -116,10 +154,18 @@ def generate_aliases(): ACCOUNTS[name] = section -def get_location(msg): +MailLocation = typing.NewType('MailLocation', typing.Tuple[str, str, str]) + + +def get_location(msg: notmuch.Message) -> MailLocation: + """ + Return the filesystem location (relative to the mail directory) + of the given message. + """ path = msg.get_filename() path = os.path.dirname(path) - base = db.get_path() + assert DB + base = DB.get_path() assert path.startswith(base) path = path[len(base):] pathSplit = path.split('/') @@ -130,11 +176,19 @@ def get_location(msg): assert state in {'cur', 'tmp', 'new'} return (mailbox, folder, state) -MAILBOX_COLORS = dict() -def get_mailbox_color(mailbox): +MAILBOX_COLORS: typing.Dict[str, str] = dict() + + +def get_mailbox_color(mailbox: str) -> str: + """ + Return the color of the given mailbox in a ready to print + string with ASCII escape codes. + """ + # TODO Do not use 256³ colors but 16 colors + assert CONFIG if mailbox not in MAILBOX_COLORS: - colorStr = config[mailbox]["color"] + colorStr = CONFIG[mailbox]["color"] colorStr = colorStr[1:] if colorStr[0] == '#' else colorStr R = int(colorStr[0:2], 16) G = int(colorStr[2:4], 16) @@ -142,50 +196,80 @@ def get_mailbox_color(mailbox): MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(R, G, B) return MAILBOX_COLORS[mailbox] -def format_date(date): + +def format_date(date: datetime.datetime) -> str: + """ + Format the given date as a 9-characters width string. + Show the time if the mail is less than 24h old, + else show the date. + """ + # TODO Do as the description say now = datetime.datetime.now() midnight = datetime.datetime(year=now.year, month=now.month, day=now.day) if date > midnight: return date.strftime('%H:%M:%S') - else: - return date.strftime('%d/%m/%y') + # TODO Use my favourite date system + return date.strftime('%d/%m/%y') + WIDTH_FIXED = 31 WIDTH_RATIO_DEST_SUBJECT = 0.3 ISATTY = sys.stdout.isatty() -destWidth = None -subjectWidth = None -def compute_line_format(): - if ISATTY: - columns, rows = shutil.get_terminal_size((80, 20)) - remain = columns - WIDTH_FIXED - 1 - global destWidth, subjectWidth - destWidth = int(remain * WIDTH_RATIO_DEST_SUBJECT) - subjectWidth = remain - destWidth - else: - destWidth = None - subjectWidth = None +DEST_WIDTH: typing.Optional[int] = None +SUBJECT_WIDTH: typing.Optional[int] = None -def clip_text(size, text): + +def compute_line_format() -> None: + """ + Based on the terminal width, assign the width of flexible columns. + """ + if ISATTY: + columns, _ = shutil.get_terminal_size((80, 20)) + remain = columns - WIDTH_FIXED - 1 + global DEST_WIDTH, SUBJECT_WIDTH + DEST_WIDTH = int(remain * WIDTH_RATIO_DEST_SUBJECT) + SUBJECT_WIDTH = remain - DEST_WIDTH + else: + DEST_WIDTH = None + SUBJECT_WIDTH = None + + +def clip_text(size: int, text: str) -> str: + """ + Fit text into the given character size, + fill with spaces if shorter, + clip with … if larger. + """ if size is None: return text - l = len(text) - if l == size: + length = len(text) + if length == size: return text - elif l > size: + if length > size: return text[:size-1] + '…' - elif l < size: - return text + " " * (size - l) + return text + ' ' * (size - length) -def print_msg(msg): - if not destWidth: +def isUID(uid: typing.Any) -> bool: + """ + Tells if the provided string is a valid UID. + """ + return isinstance(uid, str) and len(uid) == 12 \ + and bool(re.match('^[a-zA-Z0-9+/]{12}$', uid)) + + +def print_msg(msg: notmuch.Message) -> None: + """ + Print the given message header on one line. + """ + if not DEST_WIDTH: compute_line_format() + assert DEST_WIDTH and SUBJECT_WIDTH sep = " " if ISATTY else "\t" line = "" tags = set(msg.get_tags()) - mailbox, folder, state = get_location(msg) + mailbox, _, _ = get_location(msg) if ISATTY: line += get_mailbox_color(mailbox) @@ -194,7 +278,7 @@ def print_msg(msg): for tag in tags: if tag.startswith('tuid'): uid = tag[4:] - assert isUID(uid), uid + assert uid and isUID(uid), "{uid} ({type(UID)}) is not a valid UID." line += uid # Date @@ -204,8 +288,14 @@ def print_msg(msg): # Icons line += sep - def tags2col1(tag1, tag2, both, first, second, none): + + def tags2col1(tag1: str, tag2: str, + characters: typing.Tuple[str, str, str, str]) -> None: + """ + Show the presence/absence of two tags with one character. + """ nonlocal line + both, first, second, none = characters if tag1 in tags: if tag2 in tags: line += both @@ -217,30 +307,46 @@ def print_msg(msg): else: line += none - tags2col1('spam', 'draft', '?', 'S', 'D', ' ') - tags2col1('attachment', 'encrypted', 'E', 'A', 'E', ' ') - tags2col1('unread', 'flagged', '!', 'U', 'F', ' ') - tags2col1('sent', 'replied', '?', '↑', '↪', ' ') + tags2col1('spam', 'draft', ('?', 'S', 'D', ' ')) + tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' ')) + tags2col1('unread', 'flagged', ('!', 'U', 'F', ' ')) + tags2col1('sent', 'replied', ('?', '↑', '↪', ' ')) if 'sent' in tags: dest = msg.get_header("to") else: dest = msg.get_header("from") line += sep - line += clip_text(destWidth, dest) + line += clip_text(DEST_WIDTH, dest) # Subject line += sep subject = msg.get_header("subject") - line += clip_text(subjectWidth, subject) + line += clip_text(SUBJECT_WIDTH, subject) if ISATTY: line += colorama.Style.RESET_ALL print(line) -def retag_msg(msg): - mailbox, folder, state = get_location(msg) +def extract_email(field: str) -> str: + """ + Extract the email adress from a To: or From: field + (usually the whole field or between < >) + """ + try: + sta = field.index('<') + sto = field.index('>') + return field[sta+1:sto] + except ValueError: + return field + + +def retag_msg(msg: notmuch.Message) -> None: + """ + Update automatic tags for message. + """ + _, folder, _ = get_location(msg) # Search-friendly folder name slugFolderList = list() @@ -252,7 +358,11 @@ def retag_msg(msg): tags = set(msg.get_tags()) - def tag_if(tag, condition): + def tag_if(tag: str, condition: bool) -> None: + """ + Ensure the presence/absence of tag depending on the condition. + """ + nonlocal msg if condition and tag not in tags: msg.add_tag(tag) elif not condition and tag in tags: @@ -260,7 +370,7 @@ def retag_msg(msg): expeditor = extract_email(msg.get_header('from')) tag_if('inbox', slugFolder[0] == 'INBOX') - tag_if('spam', slugFolder[0] == 'JUNK' or slugFolder[0] == 'SPAM') + tag_if('spam', slugFolder[0] in ('JUNK', 'SPAM')) tag_if('deleted', slugFolder[0] == 'TRASH') tag_if('draft', slugFolder[0] == 'DRAFTS') tag_if('sent', expeditor in ALIASES) @@ -269,7 +379,7 @@ def retag_msg(msg): # UID uid = msg.get_header("X-TUID") if not isUID(uid): - # TODO Happens to sent mails but should it?k + # TODO Happens to sent mails but should it? print(f"{msg.get_filename()} has no UID!") return uidtag = 'tuid{}'.format(uid) @@ -280,28 +390,25 @@ def retag_msg(msg): msg.add_tag(uidtag) - -def extract_email(field): - try: - sta = field.index('<') - sto = field.index('>') - return field[sta+1:sto] - except ValueError: - return field - -def applyMsgs(queryStr, action, *args, showProgress=False, write=False, closeDb=True, **kwargs): +def applyMsgs(queryStr: str, action: typing.Callable, *args: typing.Any, + showProgress: bool = False, write: bool = False, + closeDb: bool = True, **kwargs: typing.Any) -> int: + """ + Run a function on the messages selected by the given query. + """ open_database(write=write) - log.info("Querying {}".format(queryStr)) - query = notmuch.Query(db, queryStr) + log.info("Querying %s", queryStr) + query = notmuch.Query(DB, queryStr) query.set_sort(notmuch.Query.SORT.OLDEST_FIRST) elements = query.search_messages() nbMsgs = query.count_messages() - iterator = progressbar.progressbar(elements, max_value=nbMsgs) if showProgress else elements + iterator = progressbar.progressbar( + elements, max_value=nbMsgs) if showProgress else elements - log.info("Executing {}".format(action)) + log.info("Executing %s", action) for msg in iterator: if write: msg.freeze() @@ -317,8 +424,12 @@ def applyMsgs(queryStr, action, *args, showProgress=False, write=False, closeDb= return nbMsgs -def notify_msg(msg): - log.info("Sending notification for {}".format(msg)) + +def notify_msg(msg: notmuch.Message) -> None: + """ + Send a notification for the given message. + """ + log.info("Sending notification for %s", msg) subject = msg.get_header("subject") expd = msg.get_header("from") account, _, _ = get_location(msg) @@ -327,31 +438,40 @@ def notify_msg(msg): body = html.escape(subject) cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body] print(' '.join(cmd)) - subprocess.run(cmd) + subprocess.run(cmd, check=False) -def notify_all(*args, **kwargs): +def notify_all() -> None: + """ + Send a notification for unprocessed and unread message. + Basically should only send a notification for a given message once + since it should be marked as processed right after. + """ open_database() nbMsgs = applyMsgs('tag:unread and tag:unprocessed', notify_msg) if nbMsgs > 0: - log.info("Playing notification sound ({} new message(s))".format(nbMsgs)) - cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5", "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"] - subprocess.run(cmd) + log.info("Playing notification sound (%d new message(s))", nbMsgs) + cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5", + "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"] + subprocess.run(cmd, check=False) close_database() -def isUID(uid): - return isinstance(uid, str) and len(uid) == 12 and re.match('^[a-zA-Z0-9+/]{12}$', uid) -# From https://stackoverflow.com/a/312464 -def chunks(l, n): +def chunks(l: str, n: int) -> typing.Iterable[str]: """Yield successive n-sized chunks from l.""" + # From https://stackoverflow.com/a/312464 for i in range(0, len(l), n): yield l[i:i + n] -def apply_msgs_input(argmessages, action, write=False): - if not len(argmessages): + +def apply_msgs_input(argmessages: typing.List[str], action: typing.Callable, + write: bool = False) -> None: + """ + Run a function on the message given by the user. + """ + if not argmessages: fromStdin = not sys.stdin.isatty() - else: + if argmessages: fromStdin = len(argmessages) == 1 and argmessages == '-' messages = list() @@ -359,58 +479,89 @@ def apply_msgs_input(argmessages, action, write=False): for line in sys.stdin: uid = line[:12] if not isUID(uid): - log.error("Not an UID: {}".format(uid)) + log.error("Not an UID: %s", uid) continue messages.append(uid) else: for uids in argmessages: if len(uids) > 12: - log.warn("Might have forgotten some spaces between the UIDs. Don't worry, I'll split them for you") + log.warning("Might have forgotten some spaces between the " + + "UIDs. Don't worry, I'll split them for you") for uid in chunks(uids, 12): if not isUID(uid): - log.error("Not an UID: {}".format(uid)) + log.error("Not an UID: %s", uid) continue messages.append(uid) for message in messages: - queryStr = 'tag:tuid{}'.format(message) + queryStr = f'tag:tuid{message}' nbMsgs = applyMsgs(queryStr, action, write=write, closeDb=False) if nbMsgs < 1: - log.error("Couldn't execute function for message {}".format(message)) + log.error("Couldn't execute function for message %s", message) close_database() -def format_header_value(val): + +def format_header_value(val: str) -> str: + """ + Return split header values in a contiguous string. + """ return val.replace('\n', '').replace('\t', '').strip() -# From https://stackoverflow.com/a/1094933 -def sizeof_fmt(num, suffix='B'): - for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']: - if abs(num) < 1024.0: - return "%3.1f %s%s" % (num, unit, suffix) - num /= 1024.0 - return "%.1f %s%s" % (num, 'Yi', suffix) -PART_MULTI_FORMAT = colorama.Fore.BLUE + '{nb} {indent}+ {typ}' + colorama.Style.RESET_ALL -PART_LEAF_FORMAT = colorama.Fore.BLUE + '{nb} {indent}→ {desc} ({typ}; {size})' + colorama.Style.RESET_ALL -def show_parts_tree(part, lvl=0, nb=1): - indent = lvl * '\t' +def sizeof_fmt(num: int, suffix: str = 'B') -> str: + """ + Print the given size in a human-readable format. + """ + remainder = float(num) + # From https://stackoverflow.com/a/1094933 + for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + if abs(remainder) < 1024.0: + return "%3.1f %s%s" % (remainder, unit, suffix) + remainder /= 1024.0 + return "%.1f %s%s" % (remainder, 'Yi', suffix) + + +PART_MULTI_FORMAT = colorama.Fore.BLUE + \ + '{nb} {indent}+ {typ}' + colorama.Style.RESET_ALL +PART_LEAF_FORMAT = colorama.Fore.BLUE + \ + '{nb} {indent}→ {desc} ({typ}; {size})' + \ + colorama.Style.RESET_ALL + + +def show_parts_tree(part: email.message.Message, + depth: int = 0, nb: int = 1) -> int: + """ + Show a tree of the parts contained in a message. + Return the number of parts of the mesage. + """ + indent = depth * '\t' typ = part.get_content_type() + if part.is_multipart(): print(PART_MULTI_FORMAT.format(nb=nb, indent=indent, typ=typ)) payl = part.get_payload() + assert isinstance(payl, list) size = 1 for obj in payl: - size += show_parts_tree(obj, lvl=lvl+1, nb=nb+size) + size += show_parts_tree(obj, depth=depth+1, nb=nb+size) return size - else: - size = len(part.get_payload(decode=True)) - desc = part.get('Content-Description', '') - print(PART_LEAF_FORMAT.format(nb=nb, indent=indent, typ=typ, desc=desc, size=sizeof_fmt(size))) - return 1 + + # size = len(part.get_payload(decode=True)) + payl = part.get_payload(decode=True) + assert isinstance(payl, bytes) + size = len(payl) + desc = part.get('Content-Description', '') + print(PART_LEAF_FORMAT.format(nb=nb, indent=indent, typ=typ, + desc=desc, size=sizeof_fmt(size))) + return 1 + INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"] -HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL -def read_msg(msg): +HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \ + '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL + + +def read_msg(msg: notmuch.Message) -> None: # Parse filename = msg.get_filename() parser = email.parser.BytesParser() @@ -422,16 +573,16 @@ def read_msg(msg): a = mail # Defects - if len(mail.defects): - log.warn("Defects found in the mail:") + if mail.defects: + log.warning("Defects found in the mail:") for defect in mail.defects: - log.warn(mail.defects) - + log.warning(defect) # Headers for key in INTERESTING_HEADERS: val = mail.get(key) if val: + assert isinstance(val, str) val = format_header_value(val) print(HEADER_FORMAT.format(key, val)) # TODO Show all headers @@ -444,6 +595,7 @@ def read_msg(msg): for part in mail.walk(): if part.get_content_type() == "text/plain": payl = part.get_payload(decode=True) + assert isinstance(payl, bytes) print(payl.decode()) @@ -453,88 +605,118 @@ if __name__ == "__main__": # Main arguments parser = argparse.ArgumentParser(description="Meh mail client") selectedVerbosityLevels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"] - parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels, default='WARNING', help="Verbosity of log messages") - # parser.add_argument('-n', '--dry-run', action='store_true', help="Don't do anything") # DEBUG - defaultConfigFile = os.path.join(xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf') - parser.add_argument('-c', '--config', default=defaultConfigFile, help="Accounts config file") + parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels, + default='WARNING', help="Verbosity of log messages") + # parser.add_argument('-n', '--dry-run', action='store_true', + # help="Don't do anything") # DEBUG + defaultConfigFile = os.path.join( + xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf') + parser.add_argument('-c', '--config', default=defaultConfigFile, + help="Accounts config file") subparsers = parser.add_subparsers(help="Action to execute") - ## List messages + # List messages - def func_default(args): + def func_default(_: argparse.Namespace) -> None: + """ + Default operation: list all message in the inbox + """ applyMsgs('tag:inbox', print_msg) parser.set_defaults(func=func_default) # inbox (default) - def func_inbox(args): + def func_inbox(args: argparse.Namespace) -> None: + """ + Inbox operation: list all message in the inbox, + possibly only the unread ones. + """ queryStr = 'tag:unread' if args.only_unread else 'tag:inbox' applyMsgs(queryStr, print_msg) - parserInbox = subparsers.add_parser("inbox", help="Show unread, unsorted and flagged messages") - parserInbox.add_argument('-u', '--only-unread', action='store_true', help="Show unread messages only") + parserInbox = subparsers.add_parser( + "inbox", help="Show unread, unsorted and flagged messages") + parserInbox.add_argument('-u', '--only-unread', action='store_true', + help="Show unread messages only") # TODO Make this more relevant parserInbox.set_defaults(func=func_inbox) - # list folder [--recurse] - ## List actions - + # List actions # flag msg... - def func_flag(args): - def flag_msg(msg): + + def func_flag(args: argparse.Namespace) -> None: + """ + Flag operation: Flag user selected messages. + """ + def flag_msg(msg: notmuch.Message) -> None: + """ + Flag given message. + """ msg.add_tag('flagged') apply_msgs_input(args.message, flag_msg, write=True) parserFlag = subparsers.add_parser("flag", help="Mark messages as flagged") parserFlag.add_argument('message', nargs='*', help="Messages") parserFlag.set_defaults(func=func_flag) - # unflag msg... - def func_unflag(args): - def unflag_msg(msg): + + def func_unflag(args: argparse.Namespace) -> None: + """ + Unflag operation: Flag user selected messages. + """ + def unflag_msg(msg: notmuch.Message) -> None: + """ + Unflag given message. + """ msg.remove_tag('flagged') apply_msgs_input(args.message, unflag_msg, write=True) - parserUnflag = subparsers.add_parser("unflag", help="Mark messages as not-flagged") + parserUnflag = subparsers.add_parser( + "unflag", help="Mark messages as not-flagged") parserUnflag.add_argument('message', nargs='*', help="Messages") parserUnflag.set_defaults(func=func_unflag) - # delete msg... # spam msg... # move dest msg... - ## Read message - + # Read message # read msg [--html] [--plain] [--browser] - def func_read(args): + + def func_read(args: argparse.Namespace) -> None: + """ + Read operation: show full content of selected message + """ apply_msgs_input(args.message, read_msg) parserRead = subparsers.add_parser("read", help="Read message") parserRead.add_argument('message', nargs=1, help="Messages") parserRead.set_defaults(func=func_read) - # attach msg [id] [--save] (list if no id, xdg-open else) - ## Redaction + # Redaction # new account # reply msg [--all] - ## Folder management + # Folder management # tree [folder] # mkdir folder # rmdir folder (prevent if folder isn't empty (mail/subfolder)) # (yeah that should do) - ## Meta + # Meta # setup (interactive thing maybe) - # fetch (mbsync, notmuch new, retag, notify; called by greater gods) - def func_fetch(args): + + def func_fetch(args: argparse.Namespace) -> None: + """ + Fetch operation: Sync remote databases with the local one. + """ # Fetch mails log.info("Fetching mails") - mbsyncConfigPath = os.path.expanduser("~/.config/mbsyncrc") # TODO Better + mbsyncConfigPath = os.path.expanduser( + "~/.config/mbsyncrc") # TODO Better cmd = ["mbsync", "--config", mbsyncConfigPath, "--all"] - subprocess.run(cmd) + subprocess.run(cmd, check=True) # Index new mails notmuch_new() @@ -545,33 +727,53 @@ if __name__ == "__main__": # Tag new mails applyMsgs('tag:unprocessed', retag_msg, showProgress=True, write=True) - parserFetch = subparsers.add_parser("fetch", help="Fetch mail, tag them, and run notifications") + parserFetch = subparsers.add_parser( + "fetch", help="Fetch mail, tag them, and run notifications") parserFetch.set_defaults(func=func_fetch) - - ## Debug + # Debug # debug (various) - def func_expose(args): + + def func_expose(_: argparse.Namespace) -> None: + """ + DEBUG + """ # And leave the door open - def expose_msg(a): - global msg - msg = a - applyMsgs('tag:tuidyviU45m6flff', expose_msg, closeDb=False) - def func_debug(args): + def expose_object(msg: typing.Any) -> None: + """ + DEBUG + """ + global a + a = msg + applyMsgs('tag:tuidyviU45m6flff', expose_object, closeDb=False) + + def func_debug(_: argparse.Namespace) -> None: + """ + DEBUG + """ from pprint import pprint pprint(list_folders()) - parserDebug = subparsers.add_parser("debug", help="Who know what this holds...") + parserDebug = subparsers.add_parser( + "debug", help="Who know what this holds...") parserDebug.set_defaults(verbosity='DEBUG') parserDebug.set_defaults(func=func_debug) # retag (all or unprocessed) - def func_retag(args): + def func_retag(_: argparse.Namespace) -> None: + """ + Retag operation: Manually retag all the mails in the database. + Mostly debug I suppose. + """ applyMsgs('*', retag_msg, showProgress=True, write=True) - parserRetag = subparsers.add_parser("retag", help="Retag all mails (when you changed configuration)") + parserRetag = subparsers.add_parser( + "retag", help="Retag all mails (when you changed configuration)") parserRetag.set_defaults(func=func_retag) # all - def func_all(args): + def func_all(_: argparse.Namespace) -> None: + """ + All operation: list every single message. + """ applyMsgs('*', print_msg) parserAll = subparsers.add_parser("all", help="Show ALL messages") @@ -585,24 +787,24 @@ if __name__ == "__main__": coloredlogs.install(level=args.verbosity, fmt='%(levelname)s %(message)s') log = logging.getLogger() - log.info("Loading config {}".format(args.config)) + log.info("Loading config %s", args.config) if not os.path.isfile(args.config): - log.fatal("Config file not found: {}".format(args.config)) + log.fatal("config file not found: %s", args.config) sys.exit(1) # TODO Create it, maybe? - config = configparser.ConfigParser() - config.read(args.config) + CONFIG = configparser.ConfigParser() + CONFIG.read(args.config) generate_aliases() perfstep("config") if args.func: - log.info("Executing function {}".format(args.func)) + log.info("Executing function %s", args.func) args.func(args) perfstep("exec") # DEBUG - for kv in sorted(perf_dict.items(), key=lambda p: p[1]): - log.debug("{1:.6f} {0}".format(*kv)) + for kv in sorted(PERF_DICT.items(), key=lambda p: p[1]): + log.debug("{1:.6f}s {0}".format(*kv)) sys.exit(0)