#!/usr/bin/env python3 # pylint: disable=E1101 """ Meh mail client A dumb Python scripts that leverages notmuch, mbsync, and msmtp to become a fully-functional extremly-opinonated mail client. """ # TODO Features # TODO Implement initial command set # TODO Lockfiles for write operations on mail files (mbsync, # tags→maildir operations) # TODO OPTI Lockfile per account and process everything in parallel # (if implemented, this should be optional since while it may speed up # the mail fetching process, its multi-threading nature would cause a # lot of cache flushes and be not very efficient on battery) # TODO Handle true character width # TODO IMAP IDLE watches? # TODO GPG # TODO (only then) Refactor # TODO Merge file with melConf # TODO Config system revamp import argparse import configparser import datetime import email.message import email.parser import html import logging import mailcap import os import pdb import re import shutil import subprocess import sys import traceback import typing import colorama import coloredlogs import notmuch import progressbar import xdg.BaseDirectory MailLocation = typing.NewType("MailLocation", typing.Tuple[str, str, str]) # MessageAction = typing.Callable[[notmuch.Message], None] class MelEngine: """ Class with all the functions for manipulating the database / mails. """ def load_config(self, config_path: str) -> configparser.ConfigParser: """ Load the configuration file into MelEngine """ self.log.info("Loading config file: %s", config_path) if not os.path.isfile(config_path): self.log.fatal("Config file not found!") sys.exit(1) # TODO Create it, maybe? config = configparser.ConfigParser() config.read(config_path) # NOTE An empty/inexistant file while give an empty config return config def generate_aliases(self) -> None: """ Populate MelEngine.aliases and MelEngine.accounts """ assert self.config for name in self.config.sections(): if not name.islower(): continue section = self.config[name] self.aliases.add(section["from"]) if "alternatives" in section: for alt in section["alternatives"].split(";"): self.aliases.add(alt) self.accounts[name] = section def __init__(self, config_path: str) -> None: self.log = logging.getLogger("MelEngine") self.config = self.load_config(config_path) self.database = None # Caches self.accounts: typing.Dict[str, configparser.SectionProxy] = dict() # All the emails the user is represented as: self.aliases: typing.Set[str] = set() # TODO If the user send emails to himself, maybe that wont cut it. self.generate_aliases() def notmuch_new(self) -> None: """ Runs `notmuch new`, which basically update the database to match the mail folder. """ assert not self.database self.log.info("Indexing mails") notmuch_config_file = os.path.expanduser( "~/.config/notmuch-config" ) # TODO Better cmd = ["notmuch", "--config", notmuch_config_file, "new"] self.log.debug(" ".join(cmd)) subprocess.run(cmd, check=True) def list_folders(self) -> typing.List[typing.Tuple[str, ...]]: """ List all the folders of the mail dir. """ assert self.config storage_path = os.path.realpath( os.path.expanduser(self.config["GENERAL"]["storage"]) ) folders = list() for account in self.accounts: storage_path_account = os.path.join(storage_path, account) for root, dirs, _ in os.walk(storage_path_account): if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs: continue assert root.startswith(storage_path) path = root[len(storage_path) :] path_split = path.split("/") if path_split[0] == "": path_split = path_split[1:] folders.append(tuple(path_split)) return folders def open_database(self, write: bool = False) -> None: """ Open an access notmuch database in read or read+write mode. Be sure to require only in the mode you want to avoid deadlocks. """ assert self.config mode = ( notmuch.Database.MODE.READ_WRITE if write else notmuch.Database.MODE.READ_ONLY ) if self.database: # If the requested mode is the one already present, # or we request read when it's already write, do nothing if mode in (self.database.mode, notmuch.Database.MODE.READ_ONLY): return self.log.info("Current database not in mode %s, closing", mode) self.close_database() self.log.info("Opening database in mode %s", mode) db_path = os.path.realpath( os.path.expanduser(self.config["GENERAL"]["storage"]) ) self.database = notmuch.Database(mode=mode, path=db_path) def close_database(self) -> None: """ Close the access notmuch database. """ if self.database: self.log.info("Closing database") self.database.close() self.database = None def get_location(self, msg: notmuch.Message) -> MailLocation: """ Return the filesystem location (relative to the mail directory) of the given message. """ path = msg.get_filename() path = os.path.dirname(path) assert self.database base = self.database.get_path() assert path.startswith(base) path = path[len(base) :] path_split = path.split("/") mailbox = path_split[1] assert mailbox in self.accounts state = path_split[-1] folder = tuple(path_split[2:-1]) assert state in {"cur", "tmp", "new"} return (mailbox, folder, state) @staticmethod def is_uid(uid: typing.Any) -> bool: """ Tells if the provided string is a valid UID. """ return ( isinstance(uid, str) and len(uid) == 12 and bool(re.match("^[a-zA-Z0-9+/]{12}$", uid)) ) @staticmethod def extract_email(field: str) -> str: """ Extract the email adress from a To: or From: field (usually the whole field or between < >) """ # TODO Can be made better (extract name and email) # Also what happens with multiple dests? try: sta = field.index("<") sto = field.index(">") return field[sta + 1 : sto] except ValueError: return field def retag_msg(self, msg: notmuch.Message) -> None: """ Update automatic tags for message. """ _, folder, _ = self.get_location(msg) # Search-friendly folder name slug_folder_list = list() for fold_index, fold in [ (fold_index, folder[fold_index]) for fold_index in range(len(folder)) ]: if fold_index == 0 and len(folder) > 1 and fold == "INBOX": continue slug_folder_list.append(fold.upper()) slug_folder = tuple(slug_folder_list) tags = set(msg.get_tags()) def tag_if(tag: str, condition: bool) -> None: """ Ensure the presence/absence of tag depending on the condition. """ nonlocal msg if condition and tag not in tags: msg.add_tag(tag) elif not condition and tag in tags: msg.remove_tag(tag) expeditor = MelEngine.extract_email(msg.get_header("from")) tag_if("inbox", slug_folder[0] == "INBOX") tag_if("spam", slug_folder[0] in ("JUNK", "SPAM")) tag_if("deleted", slug_folder[0] == "TRASH") tag_if("draft", slug_folder[0] == "DRAFTS") tag_if("sent", expeditor in self.aliases) tag_if("unprocessed", False) # UID uid = msg.get_header("X-TUID") if not MelEngine.is_uid(uid): # TODO Happens to sent mails but should it? print(f"{msg.get_filename()} has no UID!") return uidtag = "tuid{}".format(uid) # Remove eventual others UID for tag in tags: if tag.startswith("tuid") and tag != uidtag: msg.remove_tag(tag) msg.add_tag(uidtag) def apply_msgs( self, query_str: str, action: typing.Callable, *args: typing.Any, show_progress: bool = False, write: bool = False, close_db: bool = True, **kwargs: typing.Any, ) -> int: """ Run a function on the messages selected by the given query. """ self.open_database(write=write) self.log.info("Querying %s", query_str) query = notmuch.Query(self.database, query_str) query.set_sort(notmuch.Query.SORT.OLDEST_FIRST) elements = query.search_messages() nb_msgs = query.count_messages() iterator = ( progressbar.progressbar(elements, max_value=nb_msgs) if show_progress and nb_msgs else elements ) self.log.info("Executing %s", action) for msg in iterator: self.log.debug("On mail %s", msg) if write: msg.freeze() action(msg, *args, **kwargs) if write: msg.thaw() msg.tags_to_maildir_flags() if close_db: self.close_database() return nb_msgs class MelOutput: """ All functions that print mail stuff onto the screen. """ WIDTH_FIXED = 31 WIDTH_RATIO_DEST_SUBJECT = 0.3 def compute_line_format( self, ) -> typing.Tuple[typing.Optional[int], typing.Optional[int]]: """ Based on the terminal width, assign the width of flexible columns. """ if self.is_tty: columns, _ = shutil.get_terminal_size((80, 20)) remain = columns - MelOutput.WIDTH_FIXED - 1 dest_width = int(remain * MelOutput.WIDTH_RATIO_DEST_SUBJECT) subject_width = remain - dest_width return (dest_width, subject_width) return (None, None) def __init__(self, engine: MelEngine) -> None: colorama.init() self.log = logging.getLogger("MelOutput") self.engine = engine self.light_background = True self.is_tty = sys.stdout.isatty() self.dest_width, self.subject_width = self.compute_line_format() self.mailbox_colors: typing.Dict[str, str] = dict() # TODO Allow custom path self.caps = mailcap.getcaps() @staticmethod def format_date(date: datetime.datetime) -> str: """ Format the given date as a 9-characters width string. Show the time if the mail is less than 24h old, else show the date. """ now = datetime.datetime.now() if now - date < datetime.timedelta(days=1): return date.strftime("%H:%M:%S") if now - date < datetime.timedelta(days=28): return date.strftime("%d %H:%M") if now - date < datetime.timedelta(days=365): return date.strftime("%m-%d %H") return date.strftime("%y-%m-%d") @staticmethod def clip_text(size: typing.Optional[int], text: str) -> str: """ Fit text into the given character size, fill with spaces if shorter, clip with … if larger. """ if size is None: return text length = len(text) if length == size: return text if length > size: return text[: size - 1] + "…" return text + " " * (size - length) @staticmethod def chunks(iterable: str, chunk_size: int) -> typing.Iterable[str]: """Yield successive chunk_size-sized chunks from iterable.""" # From https://stackoverflow.com/a/312464 for i in range(0, len(iterable), chunk_size): yield iterable[i : i + chunk_size] @staticmethod def sizeof_fmt(num: int, suffix: str = "B") -> str: """ Print the given size in a human-readable format. """ remainder = float(num) # From https://stackoverflow.com/a/1094933 for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(remainder) < 1024.0: return "%3.1f %s%s" % (remainder, unit, suffix) remainder /= 1024.0 return "%.1f %s%s" % (remainder, "Yi", suffix) def get_mailbox_color(self, mailbox: str) -> str: """ Return the color of the given mailbox in a ready to print string with ASCII escape codes. """ if not self.is_tty: return "" if mailbox not in self.mailbox_colors: # RGB colors (not supported everywhere) # color_str = self.config[mailbox]["color"] # color_str = color_str[1:] if color_str[0] == '#' else color_str # R = int(color_str[0:2], 16) # G = int(color_str[2:4], 16) # B = int(color_str[4:6], 16) # self.mailbox_colors[mailbox] = f"\x1b[38;2;{R};{G};{B}m" color_int = int(self.engine.config[mailbox]["color16"]) self.mailbox_colors[mailbox] = f"\x1b[38;5;{color_int}m" return self.mailbox_colors[mailbox] def print_msg(self, msg: notmuch.Message) -> None: """ Print the given message header on one line. """ if not self.dest_width: self.compute_line_format() sep = " " if self.is_tty else "\t" line = "" tags = set(msg.get_tags()) mailbox, _, _ = self.engine.get_location(msg) if "unread" in tags or "flagged" in tags: line += colorama.Style.BRIGHT # if 'flagged' in tags: # line += colorama.Style.BRIGHT # if 'unread' not in tags: # line += colorama.Style.DIM line += ( colorama.Back.LIGHTBLACK_EX if self.light_background else colorama.Back.BLACK ) self.light_background = not self.light_background line += self.get_mailbox_color(mailbox) # UID uid = None for tag in tags: if tag.startswith("tuid"): uid = tag[4:] assert uid, f"No UID for message: {msg}." assert MelEngine.is_uid(uid), f"{uid} {type(uid)} is not a valid UID." line += uid # Date line += sep + colorama.Fore.MAGENTA date = datetime.datetime.fromtimestamp(msg.get_date()) line += self.format_date(date) # Icons line += sep + colorama.Fore.RED def tags2col1( tag1: str, tag2: str, characters: typing.Tuple[str, str, str, str] ) -> None: """ Show the presence/absence of two tags with one character. """ nonlocal line both, first, second, none = characters if tag1 in tags: if tag2 in tags: line += both else: line += first else: if tag2 in tags: line += second else: line += none tags2col1("spam", "draft", ("?", "S", "D", " ")) tags2col1("attachment", "encrypted", ("E", "A", "E", " ")) tags2col1("unread", "flagged", ("!", "U", "F", " ")) tags2col1("sent", "replied", ("?", "↑", "↪", " ")) # Opposed line += sep + colorama.Fore.BLUE if "sent" in tags: dest = msg.get_header("to") else: dest = msg.get_header("from") line += MelOutput.clip_text(self.dest_width, dest) # Subject line += sep + colorama.Fore.WHITE subject = msg.get_header("subject") line += MelOutput.clip_text(self.subject_width, subject) if self.is_tty: line += colorama.Style.RESET_ALL print(line) def notify_msg(self, msg: notmuch.Message) -> None: """ Send a notification for the given message. """ self.log.info("Sending notification for %s", msg) subject = msg.get_header("subject") expd = msg.get_header("from") account, _, _ = self.engine.get_location(msg) summary = "{} (<i>{}</i>)".format(html.escape(expd), account) body = html.escape(subject) cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body] print(" ".join(cmd)) subprocess.run(cmd, check=False) def notify_all(self) -> None: """ Send a notification for unprocessed and unread message. Basically should only send a notification for a given message once since it should be marked as processed right after. """ nb_msgs = self.engine.apply_msgs( "tag:unread and tag:unprocessed", self.notify_msg ) if nb_msgs > 0: self.log.info("Playing notification sound (%d new message(s))", nb_msgs) cmd = [ "play", "-n", "synth", "sine", "E4", "sine", "A5", "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2", ] subprocess.run(cmd, check=False) @staticmethod def format_header_value(val: str) -> str: """ Return split header values in a contiguous string. """ return val.replace("\n", "").replace("\t", "").strip() PART_MULTI_FORMAT = ( colorama.Fore.BLUE + "{count} {indent}+ {typ}" + colorama.Style.RESET_ALL ) PART_LEAF_FORMAT = ( colorama.Fore.BLUE + "{count} {indent}→ {desc} ({typ}; {size})" + colorama.Style.RESET_ALL ) def show_parts_tree( self, part: email.message.Message, depth: int = 0, count: int = 1 ) -> int: """ Show a tree of the parts contained in a message. Return the number of parts of the mesage. """ indent = depth * "\t" typ = part.get_content_type() if part.is_multipart(): print( MelOutput.PART_MULTI_FORMAT.format(count=count, indent=indent, typ=typ) ) payl = part.get_payload() assert isinstance(payl, list) size = 1 for obj in payl: size += self.show_parts_tree(obj, depth=depth + 1, count=count + size) return size payl = part.get_payload(decode=True) assert isinstance(payl, bytes) size = len(payl) desc = part.get("Content-Description", "<no description>") print( MelOutput.PART_LEAF_FORMAT.format( count=count, indent=indent, typ=typ, desc=desc, size=MelOutput.sizeof_fmt(size), ) ) return 1 INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"] HEADER_FORMAT = ( colorama.Fore.BLUE + colorama.Style.BRIGHT + "{}:" + colorama.Style.NORMAL + " {}" + colorama.Style.RESET_ALL ) def read_msg(self, msg: notmuch.Message) -> None: """ Display the content of a mail. """ # Parse filename = msg.get_filename() parser = email.parser.BytesParser() with open(filename, "rb") as filedesc: mail = parser.parse(filedesc) # Defects if mail.defects: self.log.warning("Defects found in the mail:") for defect in mail.defects: self.log.warning(defect) # Headers for key in MelOutput.INTERESTING_HEADERS: val = mail.get(key) if val: assert isinstance(val, str) val = self.format_header_value(val) print(MelOutput.HEADER_FORMAT.format(key, val)) # TODO Show all headers # TODO BONUS Highlight failed verifications self.show_parts_tree(mail) print() # Show text/plain # TODO Consider alternative for part in mail.walk(): if part.is_multipart(): continue payl = part.get_payload(decode=True) assert isinstance(payl, bytes) if part.get_content_type() == "text/plain": print(payl.decode()) else: # TODO Use nametemplate from mailcap temp_file = "/tmp/melcap.html" # TODO Real temporary file # TODO FIFO if possible with open(temp_file, "wb") as temp_filedesc: temp_filedesc.write(payl) command, _ = mailcap.findmatch( self.caps, part.get_content_type(), key="view", filename=temp_file ) if command: os.system(command) def print_dir_list(self) -> None: """ Print a colored directory list. Every line is easilly copiable. """ for arb in self.engine.list_folders(): line = colorama.Fore.LIGHTBLACK_EX + "'" line += self.get_mailbox_color(arb[0]) line += arb[0].replace("'", "\\'") line += colorama.Fore.LIGHTBLACK_EX for inter in arb[1:-1]: line += "/" + inter.replace("'", "\\'") line += "/" + colorama.Fore.WHITE + arb[-1].replace("'", "\\'") line += colorama.Fore.LIGHTBLACK_EX + "'" line += colorama.Style.RESET_ALL print(line) class MelCLI: """ Handles the user input and run asked operations. """ VERBOSITY_LEVELS = ["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] def apply_msgs_input( self, argmessages: typing.List[str], action: typing.Callable, write: bool = False, ) -> None: """ Run a function on the message given by the user. """ # TODO First argument might be unecessary if not argmessages: from_stdin = not sys.stdin.isatty() if argmessages: from_stdin = len(argmessages) == 1 and argmessages == "-" messages = list() if from_stdin: for line in sys.stdin: uid = line[:12] if not MelEngine.is_uid(uid): self.log.error("Not an UID: %s", uid) continue messages.append(uid) else: for uids in argmessages: if len(uids) > 12: self.log.warning( "Might have forgotten some spaces " "between the UIDs. Don't worry, I'll " "split them for you" ) for uid in MelOutput.chunks(uids, 12): if not MelEngine.is_uid(uid): self.log.error("Not an UID: %s", uid) continue messages.append(uid) for message in messages: query_str = f"tag:tuid{message}" nb_msgs = self.engine.apply_msgs( query_str, action, write=write, close_db=False ) if nb_msgs < 1: self.log.error("Couldn't execute function for message %s", message) self.engine.close_database() def operation_default(self) -> None: """ Default operation: list all message in the inbox """ self.engine.apply_msgs("tag:inbox", self.output.print_msg) def operation_inbox(self) -> None: """ Inbox operation: list all message in the inbox, possibly only the unread ones. """ query_str = "tag:unread" if self.args.only_unread else "tag:inbox" self.engine.apply_msgs(query_str, self.output.print_msg) def operation_flag(self) -> None: """ Flag operation: Flag user selected messages. """ def flag_msg(msg: notmuch.Message) -> None: """ Flag given message. """ msg.add_tag("flagged") self.apply_msgs_input(self.args.message, flag_msg, write=True) def operation_unflag(self) -> None: """ Unflag operation: Flag user selected messages. """ def unflag_msg(msg: notmuch.Message) -> None: """ Unflag given message. """ msg.remove_tag("flagged") self.apply_msgs_input(self.args.message, unflag_msg, write=True) def operation_read(self) -> None: """ Read operation: show full content of selected message """ self.apply_msgs_input(self.args.message, self.output.read_msg) def operation_fetch(self) -> None: """ Fetch operation: Sync remote databases with the local one. """ # Fetch mails self.log.info("Fetching mails") mbsync_config_file = os.path.expanduser("~/.config/mbsyncrc") # TODO Better cmd = ["mbsync", "--config", mbsync_config_file, "--all"] subprocess.run(cmd, check=False) # Index new mails self.engine.notmuch_new() # Notify self.output.notify_all() # Tag new mails self.engine.apply_msgs( "tag:unprocessed", self.engine.retag_msg, show_progress=True, write=True ) def operation_list(self) -> None: """ List operation: Print all folders. """ self.output.print_dir_list() def operation_debug(self) -> None: """ DEBUG """ print("UwU") def operation_retag(self) -> None: """ Retag operation: Manually retag all the mails in the database. Mostly debug I suppose. """ self.engine.apply_msgs( "*", self.engine.retag_msg, show_progress=True, write=True ) def operation_all(self) -> None: """ All operation: list every single message. """ self.engine.apply_msgs("*", self.output.print_msg) def add_subparsers(self) -> None: """ Add the operation parser to the main parser. """ # TODO If the only operation to the parser done are adding argument, # we should automate this. subparsers = self.parser.add_subparsers(help="Action to execute") # List messages self.parser.set_defaults(operation=self.operation_default) # inbox (default) parser_inbox = subparsers.add_parser( "inbox", help="Show unread, unsorted and flagged messages" ) parser_inbox.add_argument( "-u", "--only-unread", action="store_true", help="Show unread messages only" ) # TODO Make this more relevant parser_inbox.set_defaults(operation=self.operation_inbox) # list folder [--recurse] # List actions parser_list = subparsers.add_parser("list", help="List all folders") # parser_list.add_argument('message', nargs='*', help="Messages") parser_list.set_defaults(operation=self.operation_list) # flag msg... parser_flag = subparsers.add_parser("flag", help="Mark messages as flagged") parser_flag.add_argument("message", nargs="*", help="Messages") parser_flag.set_defaults(operation=self.operation_flag) # unflag msg... parser_unflag = subparsers.add_parser( "unflag", help="Mark messages as not-flagged" ) parser_unflag.add_argument("message", nargs="*", help="Messages") parser_unflag.set_defaults(operation=self.operation_unflag) # delete msg... # spam msg... # move dest msg... # Read message # read msg [--html] [--plain] [--browser] parser_read = subparsers.add_parser("read", help="Read message") parser_read.add_argument("message", nargs=1, help="Messages") parser_read.set_defaults(operation=self.operation_read) # attach msg [id] [--save] (list if no id, xdg-open else) # Redaction # new account # reply msg [--all] # Folder management # tree [folder] # mkdir folder # rmdir folder (prevent if folder isn't empty (mail/subfolder)) # (yeah that should do) # Meta # setup (interactive thing maybe) # fetch (mbsync, notmuch new, retag, notify; called by greater gods) parser_fetch = subparsers.add_parser( "fetch", help="Fetch mail, tag them, and run notifications" ) parser_fetch.set_defaults(operation=self.operation_fetch) # Debug # debug (various) parser_debug = subparsers.add_parser( "debug", help="Who know what this holds..." ) parser_debug.set_defaults(verbosity="DEBUG") parser_debug.set_defaults(operation=self.operation_debug) # retag (all or unprocessed) parser_retag = subparsers.add_parser( "retag", help="Retag all mails (when you changed configuration)" ) parser_retag.set_defaults(operation=self.operation_retag) # all parser_all = subparsers.add_parser("all", help="Show ALL messages") parser_all.set_defaults(operation=self.operation_all) def create_parser(self) -> argparse.ArgumentParser: """ Create the main parser that will handle the user arguments. """ parser = argparse.ArgumentParser(description="Meh mail client") parser.add_argument( "-v", "--verbosity", choices=MelCLI.VERBOSITY_LEVELS, default="WARNING", help="Verbosity of self.log messages", ) # parser.add_argument('-n', '--dry-run', action='store_true', # help="Don't do anything") # DEBUG default_config_file = os.path.join( xdg.BaseDirectory.xdg_config_home, "mel", "accounts.conf" ) parser.add_argument( "-c", "--config", default=default_config_file, help="Accounts config file" ) return parser def __init__(self) -> None: self.log = logging.getLogger("MelCLI") self.parser = self.create_parser() self.add_subparsers() self.args = self.parser.parse_args() coloredlogs.install( level=self.args.verbosity, fmt="%(levelname)s %(name)s %(message)s" ) self.engine = MelEngine(self.args.config) self.output = MelOutput(self.engine) if self.args.operation: self.log.info("Executing operation %s", self.args.operation) self.args.operation() if __name__ == "__main__": if not os.environ.get("MEL_DEBUG"): CLI = MelCLI() else: try: CLI = MelCLI() except: EXTYPE, VALUE, TB = sys.exc_info() traceback.print_exc() pdb.post_mortem(TB)