#!/usr/bin/env python3
"""
Meh mail client

A dumb Python script that leverages notmuch, mbsync, and msmtp
to become a fully-functional, extremely-opinionated mail client.
"""

# TODO Features
#   TODO Implement initial command set
#   TODO Lockfiles for write operations on mail files (mbsync, tags→maildir operations)
#   TODO OPTI Lockfile per account and process everything in parallel (if implemented, this
#   should be optional since while it may speed up the mail fetching process, its multi-threading
#   nature would cause a lot of cache flushes and be not very efficient on battery)
#   TODO Handle true character width
#   TODO IMAP IDLE watches?
#   TODO GPG
# TODO (only then) Refactor
#   TODO OOP-based
#   TODO Merge file with melConf

# DEBUG Small perf profiler
# `time` is imported first, on purpose, so the cost of the remaining imports
# can be measured by perfstep("import") below.
import time

perf_dict = dict()
perf_last = time.perf_counter()


def perfstep(name):
    """Add the time elapsed since the previous perfstep() call to bucket `name`."""
    global perf_last
    global perf_dict
    t = time.perf_counter()
    diff = t - perf_last
    if name not in perf_dict:
        perf_dict[name] = 0
    perf_dict[name] += diff
    # Re-read the clock so the bookkeeping above is not billed to the next step
    perf_last = time.perf_counter()


import notmuch
import logging
import coloredlogs
import colorama
import datetime
import os
import progressbar
import argparse
import configparser
import base64
import shutil
import argparse
import xdg.BaseDirectory
import sys
import subprocess
import html
import re
import email.parser

perfstep("import")

# Root logger, bound at module level so the functions below also work when
# this file is imported instead of being run as a script.
log = logging.getLogger()

# Account name -> config section; filled by generate_aliases()
ACCOUNTS = dict()
# Every address considered as "ours"; filled by generate_aliases()
ALIASES = set()
# Currently opened notmuch database (None when closed)
db = None
# configparser.ConfigParser instance, set by the entry point
config = None


def notmuch_new():
    """Close the database, then run `notmuch new` with ~/.notmuchrc as config."""
    close_database()
    log.info("Indexing mails")
    notmuchConfigPath = os.path.expanduser("~/.notmuchrc")  # TODO Better
    cmd = ["notmuch", "--config", notmuchConfigPath, "new"]
    log.debug(" ".join(cmd))
    subprocess.run(cmd)


def list_folders():
    """
    Return the list of maildir folders found under the storage directory.

    A directory counts as a folder when it contains the three `cur`, `new`
    and `tmp` sub-directories. Each folder is returned as a tuple of path
    components relative to the storage directory (account name first).
    """
    storagePath = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"]))
    folders = list()
    for account in ACCOUNTS:
        storagePathAccount = os.path.join(storagePath, account)
        for root, dirs, files in os.walk(storagePathAccount):
            if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs:
                continue
            assert root.startswith(storagePath)
            path = root[len(storagePath):]
            pathSplit = path.split('/')
            # The relative path starts with '/', which yields an empty first item
            if pathSplit[0] == '':
                pathSplit = pathSplit[1:]
            folders.append(tuple(pathSplit))
    return folders


def open_database(write=False):
    """
    Make sure the global database is opened in the requested mode.

    If a database is already opened in the right mode nothing happens;
    if it is opened in the other mode it is closed and reopened.
    """
    global db
    mode = notmuch.Database.MODE.READ_WRITE if write else notmuch.Database.MODE.READ_ONLY
    if db:
        if db.mode == mode:
            return
        log.info("Current database not in required mode, closing")
        close_database()
    log.info("Opening database in mode {}".format(mode))
    dbPath = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"]))
    db = notmuch.Database(mode=mode, path=dbPath)


def close_database():
    """Close the global database if it is opened."""
    global db
    if db:
        log.info("Closing database")
        db.close()
        db = None


def generate_aliases():
    """
    Fill ACCOUNTS and ALIASES from the configuration.

    Only sections whose name is entirely lowercase are treated as accounts.
    Their `from` address and every `;`-separated entry of the optional
    `alternatives` key are added to ALIASES.
    """
    for name in config.sections():
        if not name.islower():
            continue
        section = config[name]
        ALIASES.add(section["from"])
        if "alternatives" in section:
            for alt in section["alternatives"].split(";"):
                ALIASES.add(alt)
        ACCOUNTS[name] = section


def get_location(msg):
    """
    Return `(mailbox, folder, state)` for a message, from its file path.

    `mailbox` is the first path component under the database path (must be a
    known account), `state` the last directory (`cur`, `tmp` or `new`) and
    `folder` the tuple of components in between.
    """
    path = msg.get_filename()
    path = os.path.dirname(path)
    base = db.get_path()
    assert path.startswith(base)
    path = path[len(base):]
    pathSplit = path.split('/')
    # pathSplit[0] is the empty string before the leading '/'
    mailbox = pathSplit[1]
    assert mailbox in ACCOUNTS
    state = pathSplit[-1]
    folder = tuple(pathSplit[2:-1])
    assert state in {'cur', 'tmp', 'new'}
    return (mailbox, folder, state)


# Cache: mailbox name -> terminal escape sequence
MAILBOX_COLORS = dict()


def get_mailbox_color(mailbox):
    """
    Return the 24-bit foreground color escape sequence of a mailbox.

    The color is read from the `color` key of the mailbox config section,
    as 6 hexadecimal digits with an optional leading `#`, and cached.
    """
    if mailbox not in MAILBOX_COLORS:
        colorStr = config[mailbox]["color"]
        colorStr = colorStr[1:] if colorStr[0] == '#' else colorStr
        R = int(colorStr[0:2], 16)
        G = int(colorStr[2:4], 16)
        B = int(colorStr[4:6], 16)
        MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(R, G, B)
    return MAILBOX_COLORS[mailbox]


def format_date(date):
    """Format `date` as a time if it is later than today's midnight, else as a date."""
    now = datetime.datetime.now()
    midnight = datetime.datetime(year=now.year, month=now.month, day=now.day)
    if date > midnight:
        return date.strftime('%H:%M:%S')
    return date.strftime('%d/%m/%y')


WIDTH_FIXED = 31
WIDTH_RATIO_DEST_SUBJECT = 0.3
ISATTY = sys.stdout.isatty()
destWidth = None
subjectWidth = None


def compute_line_format():
    """
    Compute the widths of the correspondent and subject columns.

    On a terminal, the columns left after WIDTH_FIXED (plus one) are shared
    between both according to WIDTH_RATIO_DEST_SUBJECT. When the output is
    not a terminal both widths are None, meaning "do not clip".
    """
    global destWidth, subjectWidth
    if ISATTY:
        columns, rows = shutil.get_terminal_size((80, 20))
        remain = columns - WIDTH_FIXED - 1
        destWidth = int(remain * WIDTH_RATIO_DEST_SUBJECT)
        subjectWidth = remain - destWidth
    else:
        destWidth = None
        subjectWidth = None


def clip_text(size, text):
    """
    Return `text` made exactly `size` characters long.

    Longer text is cut and ended with an ellipsis, shorter text is padded
    with spaces. When `size` is None the text is returned untouched.
    """
    if size is None:
        return text
    length = len(text)
    if length > size:
        return text[:size-1] + '…'
    return text + " " * (size - length)


def print_msg(msg):
    """
    Print a one-line summary of a message.

    The line holds the UID, the date, four status icons, the correspondent
    and the subject. On a terminal the line is colored with the mailbox
    color and fields are space-separated; otherwise they are tab-separated.
    """
    if not destWidth:
        compute_line_format()

    sep = " " if ISATTY else "\t"
    line = ""
    tags = set(msg.get_tags())
    mailbox, folder, state = get_location(msg)
    if ISATTY:
        line += get_mailbox_color(mailbox)

    # UID
    uid = None
    for tag in tags:
        if tag.startswith('tuid'):
            uid = tag[4:]
    assert isUID(uid), uid
    line += uid

    # Date
    line += sep
    date = datetime.datetime.fromtimestamp(msg.get_date())
    line += format_date(date)

    # Icons
    line += sep

    def tags2col1(tag1, tag2, both, first, second, none):
        """Append one icon chosen by the presence of tag1 and/or tag2."""
        nonlocal line
        if tag1 in tags:
            line += both if tag2 in tags else first
        else:
            line += second if tag2 in tags else none

    tags2col1('spam', 'draft', '?', 'S', 'D', ' ')
    tags2col1('attachment', 'encrypted', 'E', 'A', 'E', ' ')
    tags2col1('unread', 'flagged', '!', 'U', 'F', ' ')
    tags2col1('sent', 'replied', '?', '↑', '↪', ' ')

    # Correspondent: the recipient for mails we sent, the sender otherwise
    if 'sent' in tags:
        dest = msg.get_header("to")
    else:
        dest = msg.get_header("from")
    line += sep
    line += clip_text(destWidth, dest)

    # Subject
    line += sep
    subject = msg.get_header("subject")
    line += clip_text(subjectWidth, subject)

    if ISATTY:
        line += colorama.Style.RESET_ALL
    print(line)


def retag_msg(msg):
    """
    Recompute the automatic tags of a message.

    The `inbox`, `spam`, `deleted` and `draft` tags are derived from the
    top-level folder, `sent` from the sender address, `unprocessed` is
    always removed, and a single `tuid<UID>` tag is set from the `X-TUID`
    header.
    """
    mailbox, folder, state = get_location(msg)

    # Search-friendly folder name: upper-cased, without the leading INBOX
    # component when it only is the parent of a sub-folder
    slugFolder = tuple(
        fold.upper()
        for f, fold in enumerate(folder)
        if not (f == 0 and len(folder) > 1 and fold == "INBOX")
    )
    # A mail stored directly at the root of the account has no folder at all
    topFolder = slugFolder[0] if slugFolder else None

    tags = set(msg.get_tags())

    def tag_if(tag, condition):
        """Add or remove `tag` so that its presence matches `condition`."""
        if condition and tag not in tags:
            msg.add_tag(tag)
        elif not condition and tag in tags:
            msg.remove_tag(tag)

    expeditor = extract_email(msg.get_header('from'))

    tag_if('inbox', topFolder == 'INBOX')
    tag_if('spam', topFolder == 'JUNK' or topFolder == 'SPAM')
    tag_if('deleted', topFolder == 'TRASH')
    tag_if('draft', topFolder == 'DRAFTS')
    tag_if('sent', expeditor in ALIASES)
    tag_if('unprocessed', False)

    # UID
    uid = msg.get_header("X-TUID")
    assert isUID(uid)
    uidtag = 'tuid{}'.format(uid)
    # Remove eventual others UID
    for tag in tags:
        if tag.startswith('tuid') and tag != uidtag:
            msg.remove_tag(tag)
    msg.add_tag(uidtag)


def extract_email(field):
    """Return what is between `<` and `>` in `field`, or `field` if either is missing."""
    try:
        sta = field.index('<')
        sto = field.index('>')
        return field[sta+1:sto]
    except ValueError:
        return field


def applyMsgs(queryStr, action, *args, showProgress=False, write=False, closeDb=True, **kwargs):
    """
    Call `action(msg, *args, **kwargs)` on every message matching a query.

    Messages are visited oldest first. When `write` is true the database is
    opened read-write and each message is frozen before the action, then
    thawed and has `tags_to_maildir_flags()` called on it afterwards.

    showProgress: wrap the iteration in a progress bar.
    closeDb: close the database once done.
    Returns the number of messages reported by the query.
    """
    open_database(write=write)

    log.info("Querying {}".format(queryStr))
    query = notmuch.Query(db, queryStr)
    query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)

    elements = query.search_messages()
    nbMsgs = query.count_messages()

    iterator = progressbar.progressbar(elements, max_value=nbMsgs) if showProgress else elements

    log.info("Executing {}".format(action))
    for msg in iterator:
        if write:
            msg.freeze()

        action(msg, *args, **kwargs)

        if write:
            msg.thaw()
            msg.tags_to_maildir_flags()

    if closeDb:
        close_database()

    return nbMsgs


# def update_polybar_status():
def update_polybar_status(*args, **kwargs):
    """
    Write the per-account unread counters to the status file.

    For each account having unread messages, the count is written wrapped
    in `%{F<color>}`...`%{F-}` markers. Positional and keyword arguments
    are accepted and ignored.
    """
    log.info("Updating polybar status")
    accountsList = sorted(ACCOUNTS.keys())
    log.debug(accountsList)

    open_database()

    statusArr = []
    for account in accountsList:
        queryStr = 'folder:/{}/ and tag:unread'.format(account)
        query = notmuch.Query(db, queryStr)
        nbMsgs = query.count_messages()
        if nbMsgs < 1:
            continue
        color = config[account]['color']
        statusAccStr = '%{F' + color + '}' + str(nbMsgs) + '%{F-}'
        statusArr.append(statusAccStr)

    close_database()

    statusStr = ('_' + ' '.join(statusArr)) if len(statusArr) else '\n'
    statusPath = os.path.expanduser("~/.cache/mutt/status")  # TODO Better
    # statusPath = os.path.expanduser("~/.cache/mel/polybarstatus") # TODO Better
    # The cache directory might not exist yet on a fresh setup
    os.makedirs(os.path.dirname(statusPath), exist_ok=True)
    with open(statusPath, 'w') as f:
        f.write(statusStr)


def notify_msg(msg):
    """Send a desktop notification (through `notify-send`) for a message."""
    log.info("Sending notification for {}".format(msg))
    subject = msg.get_header("subject")
    expd = msg.get_header("from")
    account, _, _ = get_location(msg)

    summary = '{} ({})'.format(html.escape(expd), account)
    body = html.escape(subject)
    cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body]
    log.debug(' '.join(cmd))
    subprocess.run(cmd)


def notify_all(*args, **kwargs):
    """
    Notify every message tagged both `unread` and `unprocessed`.

    A sound is played through `play` if at least one message was notified.
    Positional and keyword arguments are accepted and ignored.
    """
    open_database()
    nbMsgs = applyMsgs('tag:unread and tag:unprocessed', notify_msg)
    if nbMsgs > 0:
        log.info("Playing notification sound ({} new message(s))".format(nbMsgs))
        cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5", "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
        subprocess.run(cmd)
    close_database()


_UID_RE = re.compile('[a-zA-Z0-9+/]{12}')


def isUID(uid):
    """Return True if `uid` is a string of exactly 12 characters among `a-zA-Z0-9+/`."""
    return isinstance(uid, str) and _UID_RE.fullmatch(uid) is not None


# From https://stackoverflow.com/a/312464
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


def apply_msgs_input(argmessages, action, write=False):
    """
    Run `action` on the messages designated by their UID.

    UIDs are taken from `argmessages` (several UIDs glued together in one
    argument are split every 12 characters). They are read from the first
    12 characters of each line of stdin instead when `argmessages` is the
    single item `-`, or when it is empty and stdin is not a terminal.
    Invalid UIDs are reported and skipped.
    """
    if not len(argmessages):
        fromStdin = not sys.stdin.isatty()
    else:
        # Compare the item, not the list: a list never equals a string
        fromStdin = len(argmessages) == 1 and argmessages[0] == '-'

    messages = list()
    if fromStdin:
        for line in sys.stdin:
            uid = line[:12]
            if not isUID(uid):
                log.error("Not an UID: {}".format(uid))
                continue
            messages.append(uid)
    else:
        for uids in argmessages:
            if len(uids) > 12:
                log.warning("Might have forgotten some spaces between the UIDs. Don't worry, I'll split them for you")
            for uid in chunks(uids, 12):
                if not isUID(uid):
                    log.error("Not an UID: {}".format(uid))
                    continue
                messages.append(uid)

    for message in messages:
        queryStr = 'tag:tuid{}'.format(message)
        # Keep the database opened between messages, it is closed once below
        nbMsgs = applyMsgs(queryStr, action, write=write, closeDb=False)
        if nbMsgs < 1:
            log.error("Couldn't execute function for message {}".format(message))
    close_database()


def format_header_value(val):
    """Remove newlines, tabs and surrounding whitespace from a header value."""
    return val.replace('\n', '').replace('\t', '').strip()


# From https://stackoverflow.com/a/1094933
def sizeof_fmt(num, suffix='B'):
    """Return `num` as a human-readable size using binary (1024-based) prefixes."""
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f %s%s" % (num, 'Yi', suffix)


PART_MULTI_FORMAT = colorama.Fore.BLUE + '{nb} {indent}+ {typ}' + colorama.Style.RESET_ALL
PART_LEAF_FORMAT = colorama.Fore.BLUE + '{nb} {indent}→ {desc} ({typ}; {size})' + colorama.Style.RESET_ALL


def show_parts_tree(part, lvl=0, nb=1):
    """
    Print the tree of MIME parts under `part`, one numbered line per part.

    lvl: depth of `part`, used for indentation.
    nb: number given to `part`.
    Returns how many numbers were used by `part` and its children.
    """
    indent = lvl * '\t'
    typ = part.get_content_type()
    if part.is_multipart():
        print(PART_MULTI_FORMAT.format(nb=nb, indent=indent, typ=typ))
        payl = part.get_payload()
        size = 1
        for obj in payl:
            size += show_parts_tree(obj, lvl=lvl+1, nb=nb+size)
        return size
    # `or b''` guards against a payload that could not be decoded
    size = len(part.get_payload(decode=True) or b'')
    desc = part.get('Content-Description', '')
    print(PART_LEAF_FORMAT.format(nb=nb, indent=indent, typ=typ, desc=desc, size=sizeof_fmt(size)))
    return 1


INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL


def read_msg(msg):
    """
    Display a message: main headers, parts tree, then every text/plain part.

    The mail file is parsed again from disk; defects found by the parser
    are logged as warnings.
    """
    # Debug: expose the parsed mail as a global
    global a

    # Parse
    filename = msg.get_filename()
    parser = email.parser.BytesParser()
    with open(filename, 'rb') as f:
        mail = parser.parse(f)

    # Debug
    a = mail

    # Defects
    if len(mail.defects):
        log.warning("Defects found in the mail:")
        for defect in mail.defects:
            log.warning(defect)

    # Headers
    for key in INTERESTING_HEADERS:
        val = mail.get(key)
        if val:
            # str(): the parser may hand back a Header object rather than a str
            val = format_header_value(str(val))
            print(HEADER_FORMAT.format(key, val))
    # TODO Show all headers
    # TODO BONUS Highlight failed verifications

    show_parts_tree(mail)
    print()

    # Show text/plain
    for part in mail.walk():
        if part.get_content_type() == "text/plain":
            payl = part.get_payload(decode=True)
            if payl is None:
                continue
            # Honor the charset declared by the part instead of assuming UTF-8
            charset = part.get_content_charset() or 'utf-8'
            try:
                text = payl.decode(charset, errors='replace')
            except LookupError:
                # Charset unknown to Python
                text = payl.decode('utf-8', errors='replace')
            print(text)


perfstep("definitions")

if __name__ == "__main__":
    # Main arguments
    parser = argparse.ArgumentParser(description="Meh mail client")
    selectedVerbosityLevels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
    parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels, default='WARNING', help="Verbosity of log messages")
    # parser.add_argument('-n', '--dry-run', action='store_true', help="Don't do anything") # DEBUG
    defaultConfigFile = os.path.join(xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
    parser.add_argument('-c', '--config', default=defaultConfigFile, help="Accounts config file")

    subparsers = parser.add_subparsers(help="Action to execute")

    ## List messages

    def func_default(args):
        """Action used when no sub-command is given: list the inbox."""
        applyMsgs('tag:inbox', print_msg)
    parser.set_defaults(func=func_default)

    # inbox (default)
    def func_inbox(args):
        """List messages tagged `inbox`, or `unread` with --only-unread."""
        queryStr = 'tag:unread' if args.only_unread else 'tag:inbox'
        applyMsgs(queryStr, print_msg)

    parserInbox = subparsers.add_parser("inbox", help="Show unread, unsorted and flagged messages")
    parserInbox.add_argument('-u', '--only-unread', action='store_true', help="Show unread messages only")
    # TODO Make this more relevant
    parserInbox.set_defaults(func=func_inbox)

    # list folder [--recurse]

    ## List actions

    # flag msg...
    def func_flag(args):
        """Add the `flagged` tag to the given messages."""
        def flag_msg(msg):
            msg.add_tag('flagged')
        apply_msgs_input(args.message, flag_msg, write=True)
    parserFlag = subparsers.add_parser("flag", help="Mark messages as flagged")
    parserFlag.add_argument('message', nargs='*', help="Messages")
    parserFlag.set_defaults(func=func_flag)

    # unflag msg...
    def func_unflag(args):
        """Remove the `flagged` tag from the given messages."""
        def unflag_msg(msg):
            msg.remove_tag('flagged')
        apply_msgs_input(args.message, unflag_msg, write=True)
    parserUnflag = subparsers.add_parser("unflag", help="Mark messages as not-flagged")
    parserUnflag.add_argument('message', nargs='*', help="Messages")
    parserUnflag.set_defaults(func=func_unflag)

    # delete msg...
    # spam msg...
    # move dest msg...

    ## Read message

    # read msg [--html] [--plain] [--browser]
    def func_read(args):
        """Display the given message."""
        apply_msgs_input(args.message, read_msg)
    parserRead = subparsers.add_parser("read", help="Read message")
    parserRead.add_argument('message', nargs=1, help="Messages")
    parserRead.set_defaults(func=func_read)

    # attach msg [id] [--save] (list if no id, xdg-open else)

    ## Redaction

    # new account
    # reply msg [--all]

    ## Folder management

    # tree [folder]
    # mkdir folder
    # rmdir folder (prevent if folder isn't empty (mail/subfolder))
    # (yeah that should do)

    ## Meta

    # setup (interactive thing maybe)

    # fetch (mbsync, notmuch new, retag, notify; called by greater gods)
    def func_fetch(args):
        """Fetch mails with mbsync, index them, notify, then retag them."""
        # Fetch mails
        log.info("Fetching mails")
        mbsyncConfigPath = os.path.expanduser("~/.mbsyncrc")  # TODO Better
        cmd = ["mbsync", "--config", mbsyncConfigPath, "--all"]
        subprocess.run(cmd)

        # Index new mails
        notmuch_new()

        # Notify (before retagging, which removes the `unprocessed` tag)
        notify_all()
        update_polybar_status()

        # Tag new mails
        applyMsgs('tag:unprocessed', retag_msg, showProgress=True, write=True)

    parserFetch = subparsers.add_parser("fetch", help="Fetch mail, tag them, and run notifications")
    parserFetch.set_defaults(func=func_fetch)

    ## Debug

    # debug (various)
    def func_expose(args):
        """Debug helper: store one hard-coded message in the global `msg`."""
        # And leave the door open
        def expose_msg(a):
            global msg
            msg = a
        applyMsgs('tag:tuidyviU45m6flff', expose_msg, closeDb=False)

    def func_debug(args):
        """Debug helper: pretty-print the list of folders."""
        from pprint import pprint
        pprint(list_folders())
    parserDebug = subparsers.add_parser("debug", help="Who know what this holds...")
    parserDebug.set_defaults(verbosity='DEBUG')
    parserDebug.set_defaults(func=func_debug)

    # retag (all or unprocessed)
    def func_retag(args):
        """Retag every message of the database."""
        applyMsgs('*', retag_msg, showProgress=True, write=True)
    parserRetag = subparsers.add_parser("retag", help="Retag all mails (when you changed configuration)")
    parserRetag.set_defaults(func=func_retag)

    # all
    def func_all(args):
        """List every message of the database."""
        applyMsgs('*', print_msg)
    parserAll = subparsers.add_parser("all", help="Show ALL messages")
    parserAll.set_defaults(func=func_all)

    # Init
    args = parser.parse_args()
    perfstep("parse_args")

    colorama.init()
    coloredlogs.install(level=args.verbosity, fmt='%(levelname)s %(message)s')
    log = logging.getLogger()

    log.info("Loading config {}".format(args.config))
    if not os.path.isfile(args.config):
        log.fatal("Config file not found: {}".format(args.config))
        sys.exit(1)
        # TODO Create it, maybe?
    config = configparser.ConfigParser()
    config.read(args.config)

    generate_aliases()
    perfstep("config")

    if args.func:
        log.info("Executing function {}".format(args.func))
        args.func(args)

    perfstep("exec")

    # DEBUG
    # NOTE(review): this exit makes the perf report below unreachable; it
    # looks like a deliberate way to switch the report off — confirm.
    sys.exit(0)
    for kv in sorted(perf_dict.items(), key=lambda p: p[1]):
        log.debug("{1:.6f} {0}".format(*kv))