#!/usr/bin/env python3 """ Meh mail client """ # TODO Features # TODO (only then) Refactor import notmuch import logging import coloredlogs import colorama import datetime import os import progressbar import time import argparse import configparser import base64 import shutil import argparse import xdg.BaseDirectory import sys import subprocess ACCOUNTS = dict() ALIASES = set() def generate_aliases(): for name in config.sections(): if not name.islower(): continue section = config[name] ALIASES.add(section["from"]) if "alternatives" in section: for alt in section["alternatives"].split(";"): ALIASES.add(alt) ACCOUNTS[name] = section def get_location(msg): path = msg.get_filename() path = os.path.dirname(path) base = db.get_path() assert path.startswith(base) path = path[len(base):] pathSplit = path.split('/') mailbox = pathSplit[1] assert mailbox in ACCOUNTS state = pathSplit[-1] folder = tuple(pathSplit[2:-1]) assert state in {'cur', 'tmp', 'new'} return (mailbox, folder, state) MAILBOX_COLORS = dict() def get_mailbox_color(mailbox): if mailbox not in MAILBOX_COLORS: colorStr = config[mailbox]["color"] colorStr = colorStr[1:] if colorStr[0] == '#' else colorStr R = int(colorStr[0:2], 16) G = int(colorStr[2:4], 16) B = int(colorStr[4:6], 16) MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(R, G, B) return MAILBOX_COLORS[mailbox] def format_date(date): now = datetime.datetime.now() midnight = datetime.datetime(year=now.year, month=now.month, day=now.day) if date > midnight: return date.strftime('%H:%M:%S') else: return date.strftime('%d/%m/%y') def threadIdToB64(tid): assert len(tid) == 16 tidInt = int(tid, 16) tidBytes = tidInt.to_bytes(8, 'big') tidB64 = base64.b64encode(tidBytes) assert len(tidB64) == 12 return tidB64.decode() WIDTH_FIXED = 30 WIDTH_RATIO_DEST_SUBJECT = 0.3 destWidth = None subjectWidth = None def compute_line_format(): columns, rows = shutil.get_terminal_size((80, 20)) remain = columns - WIDTH_FIXED - 1 global destWidth, subjectWidth destWidth = int(remain * WIDTH_RATIO_DEST_SUBJECT) subjectWidth = remain - destWidth def clip_text(size, text): l = len(text) if l == size: return text elif l > size: return text[:size-1] + '…' elif l < size: return text + " " * (size - l) def print_msg(msg): if not destWidth: compute_line_format() line = "" tags = set(msg.get_tags()) mailbox, folder, state = get_location(msg) line += get_mailbox_color(mailbox) # ID line += threadIdToB64(msg.get_thread_id()) # line += str(int(msg.get_thread_id(), 16)) # Date line += " " date = datetime.datetime.fromtimestamp(msg.get_date()) line += format_date(date) # Icons line += " " def tags2col1(tag1, tag2, both, first, second, none): nonlocal line if tag1 in tags: if tag2 in tags: line += both else: line += first else: if tag2 in tags: line += second else: line += none tags2col1('spam', 'draft', '?', 'S', 'D', ' ') tags2col1('attachment', 'encrypted', 'E', 'A', 'E', ' ') tags2col1('unread', 'flagged', '!', 'U', 'F', ' ') tags2col1('sent', 'replied', '?', '↑', '↪', ' ') if 'sent' in tags: dest = msg.get_header("to") else: dest = msg.get_header("from") line += " " line += clip_text(destWidth, dest) # Subject line += " " subject = msg.get_header("subject") line += clip_text(subjectWidth, subject) line += colorama.Style.RESET_ALL print(line) def retag_msg(msg): msg.freeze() mailbox, folder, state = get_location(msg) # Search-friendly folder name slugFolderList = list() for f, fold in [(f, folder[f]) for f in range(len(folder))]: if f == 0 and len(folder) > 1 and fold == "INBOX": continue slugFolderList.append(fold.upper()) slugFolder = tuple(slugFolderList) tags = set(msg.get_tags()) def tag_if(tag, condition): if condition and tag not in tags: msg.add_tag(tag) elif not condition and tag in tags: msg.remove_tag(tag) expeditor = extract_email(msg.get_header('from')) tag_if('inbox', slugFolder[0] == 'INBOX') tag_if('spam', slugFolder[0] == 'JUNK' or slugFolder[0] == 'SPAM') tag_if('deleted', slugFolder[0] == 'TRASH') tag_if('draft', slugFolder[0] == 'DRAFTS') tag_if('sent', expeditor in ALIASES) # TODO remove unprocessed # Save msg.thaw() msg.tags_to_maildir_flags() def extract_email(field): try: sta = field.index('<') sto = field.index('>') return field[sta+1:sto] except ValueError: return field def applyMsgs(queryStr, action, *args, showProgress=False, **kwargs): log.info("Querying {}".format(queryStr)) query = notmuch.Query(db, queryStr) query.set_sort(notmuch.Query.SORT.OLDEST_FIRST) elements = query.search_messages() if showProgress: nbMsgs = query.count_messages() iterator = progressbar.progressbar(elements, max_value=nbMsgs) else: iterator = elements log.info("Executing {}".format(action)) for msg in iterator: action(msg, *args, **kwargs) # applyMsgs('*', print_msg) # applyMsgs('tag:inbox', print_msg) # applyMsgs('tag:spam', print_msg) # applyMsgs('tag:unread', print_msg) # applyMsgs('tag:unprocessed', print_msg) # applyMsgs('from:geoffrey@frogeye.fr', print_msg) # applyMsgs('tag:unprocessed', retag_msg, useProgressbar=True) # applyMsgs('*', retag_msg, useProgressbar=True) if __name__ == "__main__": # Main arguments parser = argparse.ArgumentParser(description="Meh mail client") selectedVerbosityLevels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"] parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels, default='WARNING', help="Verbosity of log messages") # parser.add_argument('-n', '--dry-run', action='store_true', help="Don't do anything") # DEBUG defaultConfigFile = os.path.join(xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf') parser.add_argument('-c', '--config', default=defaultConfigFile, help="Accounts config file") parser.set_defaults(dbmode=notmuch.Database.MODE.READ_ONLY) parser.set_defaults(showProgress=False) parser.set_defaults(useThreads=False) parser.set_defaults(actionBefore=None) parser.set_defaults(actionAfter=None) parser.set_defaults(action=None) subparsers = parser.add_subparsers(help="Action to execute", required=True) ## List messages # inbox (default) def func_inbox(args): queryStr = 'tag:unread' if args.only_unread else 'tag:inbox' applyMsgs(queryStr, print_msg) parserInbox = subparsers.add_parser("inbox", help="Show unread, unsorted and flagged messages") parserInbox.add_argument('-u', '--only-unread', action='store_true', help="Show unread messages only") # TODO Make this more relevant parserInbox.set_defaults(func=func_inbox) # list folder [--recurse] ## List actions # delete msg... # spam msg... # flag msg... # ↑un* equivalents # move dest msg... ## Read message # read msg [--html] [--plain] [--browser] # attach msg [id] [--save] (list if no id, xdg-open else) ## Redaction # new account # reply msg [--all] ## Folder management # mkdir folder # rmdir folder (prevent if folder isn't empty (mail/subfolder)) # (yeah that should do) ## Meta # setup (interactive thing maybe) # fetch (mbsync, notmuch new, retag, notify; called by greater gods) def func_fetch(args): # Fetch mails log.info("Fetching mails") mbsyncConfigPath = os.path.expanduser("~/.mbsyncrc") # TODO Better cmd = ["mbsync", "--config", mbsyncConfigPath, "--all"] subprocess.run(cmd) # Index new mails log.info("Indexing mails") log.error("TODO Can't `notmuch new` when database is already open!") notmuchConfigPath = os.path.expanduser("~/.notmuchrc") # TODO Better cmd = ["notmuch", "--config", notmuchConfigPath, "new"] log.debug(" ".join(cmd)) subprocess.run(cmd) # Tag new mails applyMsgs('tag:unprocessed', retag_msg, showProgress=True) # Notify log.info("Notifying new mails") # TODO Maybe before retag, notify unprocessed && unread parserFetch = subparsers.add_parser("fetch", help="Fetch mail, tag them, and run notifications") parserFetch.set_defaults(dbmode=notmuch.Database.MODE.READ_WRITE) parserFetch.set_defaults(func=func_fetch) ## Debug # process (all or unprocessed) args = parser.parse_args() print(args) # Installing logs colorama.init() coloredlogs.install(level=args.verbosity, fmt='%(levelname)s %(message)s') log = logging.getLogger() log.info("Loading config {}".format(args.config)) if not os.path.isfile(args.config): log.fatal("Config file not found: {}".format(args.config)) sys.exit(1) # TODO Create it, maybe? config = configparser.ConfigParser() config.read(args.config) generate_aliases() if args.dbmode is not None: log.info("Loading database") dbPath = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"])) db = notmuch.Database(mode=args.dbmode, path=dbPath) if args.func: log.info("Executing function {}".format(args.func)) args.func(args)