#!/usr/bin/env python3
# pylint: disable=C0103,W0603,W0621,E1101

"""
Meh mail client
A dumb Python script that leverages notmuch, mbsync, and msmtp
to become a fully-functional extremely-opinionated mail client.
"""

# TODO Features
#   TODO Implement initial command set
#   TODO Lockfiles for write operations on mail files (mbsync,
#   tags→maildir operations)
#     TODO OPTI Lockfile per account and process everything in parallel
#     (if implemented, this should be optional since while it may speed up
#     the mail fetching process, its multi-threading nature would cause a
#     lot of cache flushes and be not very efficient on battery)
#   TODO Handle true character width
#   TODO IMAP IDLE watches?
#   TODO GPG
# TODO (only then) Refactor
#   TODO OOP-based
#   TODO Merge file with melConf
#   TODO Un-ignore pylint warnings

import argparse
import configparser
import datetime
import email.message
import email.parser
import html
import logging
import os
import re
import shutil
import subprocess
import sys
import time
import typing

import colorama
import coloredlogs
import notmuch
import progressbar
import xdg.BaseDirectory

# Root logger, bound at module level so that every function below can log
# even when this file is imported rather than run as a script.
log = logging.getLogger()

PERF_LAST = time.perf_counter()
PERF_DICT: typing.Dict[str, float] = dict()

a: typing.Any = 'DEBUG VARIABLE (empty)'


def perfstep(name: str) -> None:
    """
    DEBUG
    Small performance profiler to measure steps.
    Call with the name of the step when you just finished it.

    The time elapsed since the previous call (or since this module was
    loaded, for the first call) is added to PERF_DICT[name].
    """
    global PERF_LAST
    current_time = time.perf_counter()
    diff = current_time - PERF_LAST
    PERF_DICT[name] = PERF_DICT.get(name, 0.0) + diff
    # Re-read the clock so the bookkeeping above is not billed to the
    # next step.
    PERF_LAST = time.perf_counter()


ACCOUNTS: typing.Dict[str, configparser.SectionProxy] = dict()
ALIASES: typing.Set[str] = set()  # All the emails the user is represented as
# TODO If the user send emails to himself, maybe that wont cut it.
DB = None
CONFIG = None


def notmuch_new() -> None:
    """
    Runs `notmuch new`, which basically updates the database
    to match the mail folder.

    Raises subprocess.CalledProcessError if the command fails.
    """
    # Our own handle is released before the external process runs
    # (presumably so that it can take the database lock — TODO confirm).
    close_database()
    log.info("Indexing mails")
    notmuchConfigPath = os.path.expanduser(
        "~/.config/notmuch-config")  # TODO Better
    cmd = ["notmuch", "--config", notmuchConfigPath, "new"]
    log.debug(" ".join(cmd))
    subprocess.run(cmd, check=True)


def list_folders() -> typing.List[typing.Tuple[str, ...]]:
    """
    List all the folders of the mail dir.

    A directory counts as a folder when it directly contains the three
    `cur`, `new` and `tmp` sub-directories. Each folder is returned as a
    tuple of path components relative to the storage directory, starting
    with the account name.
    """
    assert CONFIG
    storagePath = os.path.realpath(
        os.path.expanduser(CONFIG["GENERAL"]["storage"]))
    maildirParts = {"cur", "new", "tmp"}
    folders = list()
    for account in ACCOUNTS:
        storagePathAccount = os.path.join(storagePath, account)
        for root, dirs, _ in os.walk(storagePathAccount):
            if not maildirParts.issubset(dirs):
                continue
            assert root.startswith(storagePath)
            pathSplit = root[len(storagePath):].split('/')
            # The relative path starts with a separator, which yields an
            # empty first component.
            if pathSplit[0] == '':
                pathSplit = pathSplit[1:]
            folders.append(tuple(pathSplit))
    return folders


def open_database(write: bool = False) -> None:
    """
    Open an access notmuch database in read or read+write mode.
    It is stored in the global DB.
    Be sure to require only in the mode you want to avoid deadlocks.

    If a database is already open in the requested mode it is reused;
    if it is open in the other mode it is closed and reopened.
    """
    assert CONFIG
    global DB
    mode = notmuch.Database.MODE.READ_WRITE if write \
        else notmuch.Database.MODE.READ_ONLY
    if DB:
        if DB.mode == mode:
            return
        log.info("Current database not in mode %s, closing", mode)
        close_database()
    log.info("Opening database in mode %s", mode)
    dbPath = os.path.realpath(os.path.expanduser(CONFIG["GENERAL"]["storage"]))
    DB = notmuch.Database(mode=mode, path=dbPath)


def close_database() -> None:
    """
    Close the access notmuch database in read or read+write mode.
    It is stored in the global DB.
    Does nothing when no database is open.
    """
    global DB
    if DB:
        log.info("Closing database")
        DB.close()
        DB = None


def generate_aliases() -> None:
    """
    Fill ACCOUNTS and ALIASES from the loaded configuration.

    Every section whose name is all lower-case is registered as an
    account; its `from` address and each of its `;`-separated
    `alternatives` are recorded as addresses of the user.
    """
    assert CONFIG
    for name in CONFIG.sections():
        if not name.islower():
            continue
        section = CONFIG[name]
        ALIASES.add(section["from"])
        if "alternatives" in section:
            for alt in section["alternatives"].split(";"):
                # Tolerate "a@b; c@d" and a trailing ";".
                alt = alt.strip()
                if alt:
                    ALIASES.add(alt)
        ACCOUNTS[name] = section


# (mailbox, folder components, maildir state)
MailLocation = typing.NewType(
    'MailLocation', typing.Tuple[str, typing.Tuple[str, ...], str])


def get_location(msg: notmuch.Message) -> MailLocation:
    """
    Return the filesystem location (relative to the mail directory)
    of the given message, as (mailbox, folder, state) where folder is a
    tuple of path components and state is one of cur/tmp/new.
    """
    directory = os.path.dirname(msg.get_filename())
    assert DB
    base = DB.get_path()
    assert directory.startswith(base)
    # NOTE(review): index 1 is the mailbox only if the remainder starts
    # with '/', i.e. if `base` has no trailing slash — confirm.
    pathSplit = directory[len(base):].split('/')
    mailbox = pathSplit[1]
    assert mailbox in ACCOUNTS
    state = pathSplit[-1]
    folder = tuple(pathSplit[2:-1])
    assert state in {'cur', 'tmp', 'new'}
    return MailLocation((mailbox, folder, state))


MAILBOX_COLORS: typing.Dict[str, str] = dict()


def get_mailbox_color(mailbox: str) -> str:
    """
    Return the color of the given mailbox in a ready to print
    string with ANSI escape codes.

    The color is read from the `color` key of the account's config
    section (six hexadecimal digits, with an optional leading `#`) and
    cached in MAILBOX_COLORS.
    """
    # TODO Do not use 256³ colors but 16 colors
    assert CONFIG
    if mailbox not in MAILBOX_COLORS:
        colorStr = CONFIG[mailbox]["color"]
        if colorStr.startswith('#'):
            colorStr = colorStr[1:]
        R = int(colorStr[0:2], 16)
        G = int(colorStr[2:4], 16)
        B = int(colorStr[4:6], 16)
        MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(R, G, B)
    return MAILBOX_COLORS[mailbox]


def format_date(date: datetime.datetime) -> str:
    """
    Format the given date as a fixed-width (8 characters) string.
    Show the time if the mail was received today (after local
    midnight), else show the date.
    """
    # TODO Show the time for mails less than 24h old instead
    now = datetime.datetime.now()
    midnight = datetime.datetime(year=now.year, month=now.month, day=now.day)
    if date > midnight:
        return date.strftime('%H:%M:%S')
    # TODO Use my favourite date system
    return date.strftime('%d/%m/%y')


WIDTH_FIXED = 31
WIDTH_RATIO_DEST_SUBJECT = 0.3
ISATTY = sys.stdout.isatty()
DEST_WIDTH: typing.Optional[int] = None
SUBJECT_WIDTH: typing.Optional[int] = None


def compute_line_format() -> None:
    """
    Based on the terminal width, assign the width of flexible columns.

    When stdout is not a terminal both widths are set to None, which
    means "do not pad nor clip".
    """
    global DEST_WIDTH, SUBJECT_WIDTH
    if not ISATTY:
        DEST_WIDTH = None
        SUBJECT_WIDTH = None
        return
    columns, _ = shutil.get_terminal_size((80, 20))
    # Never go negative on very narrow terminals.
    remain = max(columns - WIDTH_FIXED - 1, 0)
    DEST_WIDTH = int(remain * WIDTH_RATIO_DEST_SUBJECT)
    SUBJECT_WIDTH = remain - DEST_WIDTH


def clip_text(size: typing.Optional[int], text: str) -> str:
    """
    Fit text into the given character size,
    fill with spaces if shorter,
    clip with … if larger.

    A size of None returns the text untouched; a size of zero or less
    returns an empty string.
    """
    if size is None:
        return text
    if size <= 0:
        return ''
    length = len(text)
    if length > size:
        return text[:size-1] + '…'
    return text + ' ' * (size - length)


_UID_PATTERN = re.compile(r'[a-zA-Z0-9+/]{12}')


def isUID(uid: typing.Any) -> bool:
    """
    Tells if the provided string is a valid UID
    (exactly 12 characters among letters, digits, `+` and `/`).
    """
    return isinstance(uid, str) and _UID_PATTERN.fullmatch(uid) is not None


def print_msg(msg: notmuch.Message) -> None:
    """
    Print the given message header on one line.

    On a terminal the columns are space-separated, padded/clipped to the
    terminal width and colored per mailbox; otherwise they are
    tab-separated and left untouched.
    """
    if ISATTY and DEST_WIDTH is None:
        compute_line_format()

    sep = " " if ISATTY else "\t"
    tags = set(msg.get_tags())
    mailbox, _, _ = get_location(msg)

    # UID
    uid = None
    for tag in tags:
        if tag.startswith('tuid'):
            uid = tag[4:]
    assert uid and isUID(uid), f"{uid} ({type(uid)}) is not a valid UID."

    # Date
    date = format_date(datetime.datetime.fromtimestamp(msg.get_date()))

    # Icons
    def tags2col1(tag1: str, tag2: str,
                  characters: typing.Tuple[str, str, str, str]) -> str:
        """
        Return the character showing the presence/absence of two tags.
        """
        both, first, second, none = characters
        if tag1 in tags:
            return both if tag2 in tags else first
        return second if tag2 in tags else none

    icons = ''.join((
        tags2col1('spam', 'draft', ('?', 'S', 'D', ' ')),
        tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' ')),
        tags2col1('unread', 'flagged', ('!', 'U', 'F', ' ')),
        tags2col1('sent', 'replied', ('?', '↑', '↪', ' ')),
    ))

    # Correspondent: the recipient for mails we sent, the sender otherwise
    if 'sent' in tags:
        dest = msg.get_header("to")
    else:
        dest = msg.get_header("from")

    # Subject
    subject = msg.get_header("subject")

    line = sep.join((uid, date, icons,
                     clip_text(DEST_WIDTH, dest),
                     clip_text(SUBJECT_WIDTH, subject)))
    if ISATTY:
        line = get_mailbox_color(mailbox) + line + colorama.Style.RESET_ALL
    print(line)


def extract_email(field: str) -> str:
    """
    Extract the email address from a To: or From: field
    (usually the whole field or between < >)
    """
    try:
        sta = field.index('<')
        # Only a '>' located after the '<' can close it.
        sto = field.index('>', sta)
    except ValueError:
        return field
    return field[sta+1:sto]


def retag_msg(msg: notmuch.Message) -> None:
    """
    Update automatic tags for message.

    The inbox/spam/deleted/draft tags are derived from the folder the
    message is stored in, `sent` from its From: address, `unprocessed`
    is removed, and a `tuid<UID>` tag is set from the X-TUID header.
    """
    _, folder, _ = get_location(msg)

    # Search-friendly folder name: upper-cased, without the leading
    # "INBOX" component when the folder is nested under it
    slugFolder = tuple(
        fold.upper() for idx, fold in enumerate(folder)
        if not (idx == 0 and len(folder) > 1 and fold == "INBOX"))
    # A message stored directly at the account root has no folder at all.
    topFolder = slugFolder[0] if slugFolder else ''

    tags = set(msg.get_tags())

    def tag_if(tag: str, condition: bool) -> None:
        """
        Ensure the presence/absence of tag depending on the condition.
        """
        if condition and tag not in tags:
            msg.add_tag(tag)
        elif not condition and tag in tags:
            msg.remove_tag(tag)

    expeditor = extract_email(msg.get_header('from'))

    tag_if('inbox', topFolder == 'INBOX')
    tag_if('spam', topFolder in ('JUNK', 'SPAM'))
    tag_if('deleted', topFolder == 'TRASH')
    tag_if('draft', topFolder == 'DRAFTS')
    tag_if('sent', expeditor in ALIASES)
    tag_if('unprocessed', False)

    # UID
    uid = msg.get_header("X-TUID")
    if not isUID(uid):
        # TODO Happens to sent mails but should it?
        print(f"{msg.get_filename()} has no UID!")
        return
    uidtag = 'tuid{}'.format(uid)
    # Remove eventual others UID
    for tag in tags:
        if tag.startswith('tuid') and tag != uidtag:
            msg.remove_tag(tag)
    msg.add_tag(uidtag)


def applyMsgs(queryStr: str, action: typing.Callable, *args: typing.Any,
              showProgress: bool = False, write: bool = False,
              closeDb: bool = True, **kwargs: typing.Any) -> int:
    """
    Run a function on the messages selected by the given query.

    `action` is called as action(msg, *args, **kwargs) on each message,
    oldest first. With `write`, the database is opened read+write, each
    message is frozen around the call and its tags are then synced to
    maildir flags. With `closeDb` the database is closed afterwards,
    even if `action` raised.

    Return the number of messages matched by the query.
    """
    open_database(write=write)
    try:
        log.info("Querying %s", queryStr)
        query = notmuch.Query(DB, queryStr)
        query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)

        elements = query.search_messages()
        nbMsgs = query.count_messages()

        iterator = progressbar.progressbar(
            elements, max_value=nbMsgs) if showProgress else elements

        log.info("Executing %s", action)
        for msg in iterator:
            if write:
                msg.freeze()

            action(msg, *args, **kwargs)

            if write:
                msg.thaw()
                msg.tags_to_maildir_flags()
    finally:
        if closeDb:
            close_database()

    return nbMsgs


def notify_msg(msg: notmuch.Message) -> None:
    """
    Send a notification for the given message.

    Uses `notify-send`; a failure of that command is ignored.
    """
    log.info("Sending notification for %s", msg)

    subject = msg.get_header("subject")
    expd = msg.get_header("from")
    account, _, _ = get_location(msg)

    summary = '{} ({})'.format(html.escape(expd), account)
    body = html.escape(subject)
    cmd = ["notify-send", "-u", "low", "-i",
           "mail-message-new", summary, body]
    log.debug(' '.join(cmd))
    subprocess.run(cmd, check=False)


def notify_all() -> None:
    """
    Send a notification for unprocessed and unread message.
    Basically should only send a notification for a given message once
    since it should be marked as processed right after.

    Also plays a sound (through `play`) if at least one message matched.
    """
    # applyMsgs opens the database itself and closes it when done.
    nbMsgs = applyMsgs('tag:unread and tag:unprocessed', notify_msg)
    if nbMsgs > 0:
        log.info("Playing notification sound (%d new message(s))", nbMsgs)
        cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5",
               "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
        subprocess.run(cmd, check=False)


def chunks(l: str, n: int) -> typing.Iterable[str]:
    """Yield successive n-sized chunks from l."""
    # From https://stackoverflow.com/a/312464
    for i in range(0, len(l), n):
        yield l[i:i + n]


def apply_msgs_input(argmessages: typing.List[str], action: typing.Callable,
                     write: bool = False) -> None:
    """
    Run a function on the message given by the user.

    UIDs are taken from `argmessages`; if it is exactly ['-'], or if it
    is empty and stdin is not a terminal, they are read from stdin
    instead (the first 12 characters of each line). Invalid UIDs are
    reported and skipped.
    """
    if argmessages:
        fromStdin = argmessages == ['-']
    else:
        fromStdin = not sys.stdin.isatty()

    messages = list()
    if fromStdin:
        for line in sys.stdin:
            uid = line[:12]
            if not isUID(uid):
                log.error("Not an UID: %s", uid)
                continue
            messages.append(uid)
    else:
        for uids in argmessages:
            if len(uids) > 12:
                log.warning("Might have forgotten some spaces between the " +
                            "UIDs. Don't worry, I'll split them for you")
            for uid in chunks(uids, 12):
                if not isUID(uid):
                    log.error("Not an UID: %s", uid)
                    continue
                messages.append(uid)

    try:
        for message in messages:
            queryStr = f'tag:tuid{message}'
            # The database is kept open between messages and closed once
            # at the end.
            nbMsgs = applyMsgs(queryStr, action, write=write, closeDb=False)
            if nbMsgs < 1:
                log.error("Couldn't execute function for message %s", message)
    finally:
        close_database()


def format_header_value(val: str) -> str:
    """
    Return split header values in a contiguous string.
    """
    return val.replace('\n', '').replace('\t', '').strip()


def sizeof_fmt(num: int, suffix: str = 'B') -> str:
    """
    Print the given size in a human-readable format.
    """
    remainder = float(num)
    # From https://stackoverflow.com/a/1094933
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(remainder) < 1024.0:
            return "%3.1f %s%s" % (remainder, unit, suffix)
        remainder /= 1024.0
    return "%.1f %s%s" % (remainder, 'Yi', suffix)


PART_MULTI_FORMAT = colorama.Fore.BLUE + \
    '{nb} {indent}+ {typ}' + colorama.Style.RESET_ALL
PART_LEAF_FORMAT = colorama.Fore.BLUE + \
    '{nb} {indent}→ {desc} ({typ}; {size})' + \
    colorama.Style.RESET_ALL


def show_parts_tree(part: email.message.Message,
                    depth: int = 0, nb: int = 1) -> int:
    """
    Show a tree of the parts contained in a message.
    Return the number of parts of the message.

    `depth` is the nesting level (one tab of indentation each) and `nb`
    the number printed in front of this part.
    """
    indent = depth * '\t'
    typ = part.get_content_type()

    if part.is_multipart():
        print(PART_MULTI_FORMAT.format(nb=nb, indent=indent, typ=typ))
        payl = part.get_payload()
        assert isinstance(payl, list)
        # The container itself counts as one part.
        size = 1
        for obj in payl:
            size += show_parts_tree(obj, depth=depth+1, nb=nb+size)
        return size

    payl = part.get_payload(decode=True)
    assert isinstance(payl, bytes)
    size = len(payl)
    desc = part.get('Content-Description', '')
    print(PART_LEAF_FORMAT.format(nb=nb, indent=indent, typ=typ,
                                  desc=desc, size=sizeof_fmt(size)))
    return 1


INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \
    '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL


def read_msg(msg: notmuch.Message) -> None:
    """
    Print the given message: parsing defects (as warnings), the
    interesting headers, the tree of its parts, then the content of
    every text/plain part.
    """
    # Parse
    filename = msg.get_filename()
    parser = email.parser.BytesParser()
    with open(filename, 'rb') as f:
        mail = parser.parse(f)

    # Debug
    global a
    a = mail

    # Defects
    if mail.defects:
        log.warning("Defects found in the mail:")
        for defect in mail.defects:
            log.warning(defect)

    # Headers
    for key in INTERESTING_HEADERS:
        val = mail.get(key)
        if val:
            assert isinstance(val, str)
            val = format_header_value(val)
            print(HEADER_FORMAT.format(key, val))
    # TODO Show all headers
    # TODO BONUS Highlight failed verifications

    show_parts_tree(mail)
    print()

    # Show text/plain
    for part in mail.walk():
        if part.get_content_type() == "text/plain":
            payl = part.get_payload(decode=True)
            assert isinstance(payl, bytes)
            # Decode with the charset the part declares rather than
            # assuming UTF-8; never crash on undecodable bytes.
            charset = part.get_content_charset() or 'utf-8'
            try:
                text = payl.decode(charset, errors='replace')
            except LookupError:
                # Charset name unknown to Python
                text = payl.decode('utf-8', errors='replace')
            print(text)


perfstep("definitions")

if __name__ == "__main__":
    # Main arguments
    parser = argparse.ArgumentParser(description="Meh mail client")
    selectedVerbosityLevels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
    parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels,
                        default='WARNING', help="Verbosity of log messages")
    # parser.add_argument('-n', '--dry-run', action='store_true',
    #                     help="Don't do anything") # DEBUG
    defaultConfigFile = os.path.join(
        xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
    parser.add_argument('-c', '--config', default=defaultConfigFile,
                        help="Accounts config file")

    subparsers = parser.add_subparsers(help="Action to execute")

    # List messages

    def func_default(_: argparse.Namespace) -> None:
        """
        Default operation: list all message in the inbox
        """
        applyMsgs('tag:inbox', print_msg)
    parser.set_defaults(func=func_default)

    # inbox (default)
    def func_inbox(args: argparse.Namespace) -> None:
        """
        Inbox operation: list all message in the inbox,
        possibly only the unread ones.
        """
        queryStr = 'tag:unread' if args.only_unread else 'tag:inbox'
        applyMsgs(queryStr, print_msg)

    parserInbox = subparsers.add_parser(
        "inbox", help="Show unread, unsorted and flagged messages")
    parserInbox.add_argument('-u', '--only-unread', action='store_true',
                             help="Show unread messages only")
    # TODO Make this more relevant
    parserInbox.set_defaults(func=func_inbox)

    # list folder [--recurse]
    # List actions

    # flag msg...
    def func_flag(args: argparse.Namespace) -> None:
        """
        Flag operation: Flag user selected messages.
        """
        def flag_msg(msg: notmuch.Message) -> None:
            """
            Flag given message.
            """
            msg.add_tag('flagged')
        apply_msgs_input(args.message, flag_msg, write=True)
    parserFlag = subparsers.add_parser("flag", help="Mark messages as flagged")
    parserFlag.add_argument('message', nargs='*', help="Messages")
    parserFlag.set_defaults(func=func_flag)

    # unflag msg...
    def func_unflag(args: argparse.Namespace) -> None:
        """
        Unflag operation: Unflag user selected messages.
        """
        def unflag_msg(msg: notmuch.Message) -> None:
            """
            Unflag given message.
            """
            msg.remove_tag('flagged')
        apply_msgs_input(args.message, unflag_msg, write=True)
    parserUnflag = subparsers.add_parser(
        "unflag", help="Mark messages as not-flagged")
    parserUnflag.add_argument('message', nargs='*', help="Messages")
    parserUnflag.set_defaults(func=func_unflag)

    # delete msg...
    # spam msg...
    # move dest msg...

    # Read message

    # read msg [--html] [--plain] [--browser]
    def func_read(args: argparse.Namespace) -> None:
        """
        Read operation: show full content of selected message
        """
        apply_msgs_input(args.message, read_msg)
    parserRead = subparsers.add_parser("read", help="Read message")
    parserRead.add_argument('message', nargs=1, help="Messages")
    parserRead.set_defaults(func=func_read)

    # attach msg [id] [--save] (list if no id, xdg-open else)

    # Redaction
    # new account
    # reply msg [--all]

    # Folder management
    # tree [folder]
    # mkdir folder
    # rmdir folder (prevent if folder isn't empty (mail/subfolder))
    # (yeah that should do)

    # Meta
    # setup (interactive thing maybe)

    # fetch (mbsync, notmuch new, retag, notify; called by greater gods)
    def func_fetch(_: argparse.Namespace) -> None:
        """
        Fetch operation: Sync remote databases with the local one.
        """
        # Fetch mails
        log.info("Fetching mails")
        mbsyncConfigPath = os.path.expanduser(
            "~/.config/mbsyncrc")  # TODO Better
        cmd = ["mbsync", "--config", mbsyncConfigPath, "--all"]
        subprocess.run(cmd, check=True)

        # Index new mails
        notmuch_new()

        # Notify (must run before retagging, which drops the
        # `unprocessed` tag the notification query relies on)
        notify_all()

        # Tag new mails
        applyMsgs('tag:unprocessed', retag_msg,
                  showProgress=True, write=True)

    parserFetch = subparsers.add_parser(
        "fetch", help="Fetch mail, tag them, and run notifications")
    parserFetch.set_defaults(func=func_fetch)

    # Debug
    # debug (various)
    def func_expose(_: argparse.Namespace) -> None:
        """
        DEBUG
        """
        # And leave the door open
        def expose_object(msg: typing.Any) -> None:
            """
            DEBUG
            """
            global a
            a = msg
        applyMsgs('tag:tuidyviU45m6flff', expose_object, closeDb=False)

    def func_debug(_: argparse.Namespace) -> None:
        """
        DEBUG
        """
        from pprint import pprint
        pprint(list_folders())
    parserDebug = subparsers.add_parser(
        "debug", help="Who know what this holds...")
    parserDebug.set_defaults(verbosity='DEBUG')
    parserDebug.set_defaults(func=func_debug)

    # retag (all or unprocessed)
    def func_retag(_: argparse.Namespace) -> None:
        """
        Retag operation: Manually retag all the mails in the database.
        Mostly debug I suppose.
        """
        applyMsgs('*', retag_msg, showProgress=True, write=True)
    parserRetag = subparsers.add_parser(
        "retag", help="Retag all mails (when you changed configuration)")
    parserRetag.set_defaults(func=func_retag)

    # all
    def func_all(_: argparse.Namespace) -> None:
        """
        All operation: list every single message.
        """
        applyMsgs('*', print_msg)

    parserAll = subparsers.add_parser("all", help="Show ALL messages")
    parserAll.set_defaults(func=func_all)

    # Init
    args = parser.parse_args()
    perfstep("parse_args")

    colorama.init()
    coloredlogs.install(level=args.verbosity, fmt='%(levelname)s %(message)s')

    log.info("Loading config %s", args.config)
    if not os.path.isfile(args.config):
        log.fatal("config file not found: %s", args.config)
        sys.exit(1)
        # TODO Create it, maybe?
    CONFIG = configparser.ConfigParser()
    CONFIG.read(args.config)

    generate_aliases()
    perfstep("config")

    if args.func:
        log.info("Executing function %s", args.func)
        args.func(args)

    perfstep("exec")

    # DEBUG
    for stepName, duration in sorted(PERF_DICT.items(), key=lambda p: p[1]):
        log.debug("%.6fs %s", duration, stepName)
    sys.exit(0)