dotfiles/config/scripts/mel
Geoffrey Frogeye 5258e016ef mel: OOP
I said I would do it later but it's currently a mess so...
2019-10-28 11:49:06 +01:00

821 lines
28 KiB
Python
Executable file

#!/usr/bin/env python3
# pylint: disable=E1101
"""
Meh mail client
A dumb Python script that leverages notmuch, mbsync, and msmtp
to become a fully-functional, extremely opinionated mail client.
"""
# TODO Features
# TODO Implement initial command set
# TODO Lockfiles for write operations on mail files (mbsync,
# tags→maildir operations)
# TODO OPTI Lockfile per account and process everything in parallel
# (if implemented, this should be optional since while it may speed up
# the mail fetching process, its multi-threading nature would cause a
# lot of cache flushes and be not very efficient on battery)
# TODO Handle true character width
# TODO IMAP IDLE watches?
# TODO GPG
# TODO (only then) Refactor
# TODO Merge file with melConf
# TODO Config system revamp
import argparse
import configparser
import datetime
import email.message
import email.parser
import html
import logging
import os
import pdb
import re
import shutil
import subprocess
import sys
import traceback
import typing
import colorama
import coloredlogs
import notmuch
import progressbar
import xdg.BaseDirectory
# Location of a mail relative to the mail directory, as returned by
# MelEngine.get_location: (account name, folder path components, state),
# where state is one of the maildir sub-directories 'cur', 'new' or 'tmp'.
MailLocation = typing.NewType(
    'MailLocation', typing.Tuple[str, typing.Tuple[str, ...], str])
class MelEngine:
    """
    Class with all the functions for manipulating the database / mails.
    """

    def load_config(self, config_path: str) -> configparser.ConfigParser:
        """
        Load the configuration file into MelEngine.

        Logs a fatal error and exits with status 1 if config_path
        is not an existing file.
        """
        self.log.info("Loading config file: %s", config_path)
        if not os.path.isfile(config_path):
            self.log.fatal("Config file not found!")
            sys.exit(1)
            # TODO Create it, maybe?
        config = configparser.ConfigParser()
        config.read(config_path)
        # NOTE An empty file will give an empty config
        return config

    def generate_aliases(self) -> None:
        """
        Populate MelEngine.aliases and MelEngine.accounts
        from the sections of the configuration.
        """
        assert self.config
        for name in self.config.sections():
            # Sections whose name is not all lower-case (such as GENERAL)
            # are not treated as accounts
            if not name.islower():
                continue
            section = self.config[name]
            self.aliases.add(section["from"])
            if "alternatives" in section:
                for alt in section["alternatives"].split(";"):
                    # Tolerate "a@x; b@y;": ignore the padding around
                    # addresses and the empty items left by the separators
                    alt = alt.strip()
                    if alt:
                        self.aliases.add(alt)
            self.accounts[name] = section

    def __init__(self, config_path: str) -> None:
        self.log = logging.getLogger("Mel")
        self.config = self.load_config(config_path)
        # No database is opened until open_database() is called
        self.database = None
        # Caches
        self.accounts: typing.Dict[str, configparser.SectionProxy] = dict()
        # All the emails the user is represented as:
        self.aliases: typing.Set[str] = set()
        # TODO If the user send emails to himself, maybe that wont cut it.
        self.mailbox_colors: typing.Dict[str, str] = dict()
        self.generate_aliases()

    def notmuch_new(self) -> None:
        """
        Runs `notmuch new`, which basically update the database
        to match the mail folder.

        Must be called while no database is opened by this engine.
        Raises subprocess.CalledProcessError if the command fails.
        """
        assert not self.database
        self.log.info("Indexing mails")
        notmuch_config_file = os.path.expanduser(
            "~/.config/notmuch-config")  # TODO Better
        cmd = ["notmuch", "--config", notmuch_config_file, "new"]
        self.log.debug(" ".join(cmd))
        subprocess.run(cmd, check=True)

    def list_folders(self) -> typing.List[typing.Tuple[str, ...]]:
        """
        List all the folders of the mail dir.

        Each folder is returned as the tuple of its path components
        relative to the storage directory (the account name comes first).
        """
        assert self.config
        storage_path = os.path.realpath(
            os.path.expanduser(self.config["GENERAL"]["storage"]))
        folders = list()
        for account in self.accounts:
            storage_path_account = os.path.join(storage_path, account)
            for root, dirs, _ in os.walk(storage_path_account):
                # Only directories holding cur/, new/ and tmp/ are maildirs
                if not {"cur", "new", "tmp"}.issubset(dirs):
                    continue
                relative = os.path.relpath(root, storage_path)
                folders.append(tuple(relative.split(os.sep)))
        return folders

    def open_database(self, write: bool = False) -> None:
        """
        Open the notmuch database in read or read+write mode.
        Be sure to require only the mode you want to avoid deadlocks.

        If a database is already opened in a sufficient mode it is reused,
        otherwise it is closed and reopened in the requested mode.
        """
        assert self.config
        mode = notmuch.Database.MODE.READ_WRITE if write \
            else notmuch.Database.MODE.READ_ONLY
        if self.database:
            # If the requested mode is the one already present,
            # or we request read when it's already write, do nothing
            if mode in (self.database.mode, notmuch.Database.MODE.READ_ONLY):
                return
            self.log.info("Current database not in mode %s, closing", mode)
            self.close_database()
        self.log.info("Opening database in mode %s", mode)
        db_path = os.path.realpath(
            os.path.expanduser(self.config["GENERAL"]["storage"]))
        self.database = notmuch.Database(mode=mode, path=db_path)

    def close_database(self) -> None:
        """
        Close the notmuch database, if one is opened.
        """
        if self.database:
            self.log.info("Closing database")
            self.database.close()
            self.database = None

    def get_location(self, msg: "notmuch.Message") -> "MailLocation":
        """
        Return the filesystem location (relative to the mail directory)
        of the given message, as (account, folder components, state).

        Requires the database to be opened.
        """
        path = os.path.dirname(msg.get_filename())
        assert self.database
        base = self.database.get_path()
        assert path.startswith(base)
        # The relative path starts with '/', so the first item is empty
        path_split = path[len(base):].split('/')
        mailbox = path_split[1]
        assert mailbox in self.accounts
        state = path_split[-1]
        folder = tuple(path_split[2:-1])
        assert state in {'cur', 'tmp', 'new'}
        return (mailbox, folder, state)

    def get_mailbox_color(self, mailbox: str) -> str:
        """
        Return the color of the given mailbox as a ready to print
        string holding an ANSI escape code.
        """
        assert self.config
        if mailbox not in self.mailbox_colors:
            # The 256-colors palette index ("color16" option) is used
            # since RGB escape codes are not supported everywhere.
            # TODO Use the RGB "color" option (#RRGGBB) where supported
            color_int = int(self.config[mailbox]["color16"])
            self.mailbox_colors[mailbox] = f"\x1b[38;5;{color_int}m"
        return self.mailbox_colors[mailbox]

    @staticmethod
    def is_uid(uid: typing.Any) -> bool:
        """
        Tells if the provided value is a valid UID:
        a string of exactly 12 characters among a-z, A-Z, 0-9, + and /.
        """
        return isinstance(uid, str) \
            and re.fullmatch(r'[a-zA-Z0-9+/]{12}', uid) is not None

    @staticmethod
    def extract_email(field: str) -> str:
        """
        Extract the email address from a To: or From: field
        (usually the whole field or between < >).

        The field is returned untouched if it contains no <...> pair.
        """
        # TODO Can be made better (extract name and email)
        # Also what happens with multiple dests?
        sta = field.find('<')
        # Only a '>' located after the '<' can close the address
        sto = field.find('>', sta + 1)
        if sta == -1 or sto == -1:
            return field
        return field[sta+1:sto]

    def retag_msg(self, msg: "notmuch.Message") -> None:
        """
        Update automatic tags for message.

        Sets or clears the inbox/spam/deleted/draft/sent tags according to
        the folder and sender of the message, clears the unprocessed tag,
        and makes sure the only tuid* tag matches the X-TUID header.
        """
        _, folder, _ = self.get_location(msg)

        # Search-friendly folder name: upper-cased components, without the
        # leading INBOX when the mail is stored in one of its subfolders
        slug_folder = tuple(
            fold.upper() for fold_index, fold in enumerate(folder)
            if not (fold_index == 0 and len(folder) > 1 and fold == "INBOX"))
        # A mail stored directly at the root of the account has no folder
        top_folder = slug_folder[0] if slug_folder else ''

        tags = set(msg.get_tags())

        def tag_if(tag: str, condition: bool) -> None:
            """
            Ensure the presence/absence of tag depending on the condition.
            """
            if condition and tag not in tags:
                msg.add_tag(tag)
            elif not condition and tag in tags:
                msg.remove_tag(tag)

        expeditor = MelEngine.extract_email(msg.get_header('from'))

        tag_if('inbox', top_folder == 'INBOX')
        tag_if('spam', top_folder in ('JUNK', 'SPAM'))
        tag_if('deleted', top_folder == 'TRASH')
        tag_if('draft', top_folder == 'DRAFTS')
        tag_if('sent', expeditor in self.aliases)
        tag_if('unprocessed', False)

        # UID
        uid = msg.get_header("X-TUID")
        if not MelEngine.is_uid(uid):
            # TODO Happens to sent mails but should it?
            print(f"{msg.get_filename()} has no UID!")
            return
        uidtag = 'tuid{}'.format(uid)
        # Remove eventual others UID
        for tag in tags:
            if tag.startswith('tuid') and tag != uidtag:
                msg.remove_tag(tag)
        msg.add_tag(uidtag)

    def apply_msgs(self, query_str: str, action: typing.Callable,
                   *args: typing.Any, show_progress: bool = False,
                   write: bool = False, close_db: bool = True,
                   **kwargs: typing.Any) -> int:
        # TODO Detail the typing.Callable
        """
        Run a function on the messages selected by the given query.

        action is called as action(msg, *args, **kwargs) on each message,
        oldest first. With write=True the database is opened read+write,
        each message is frozen around the call, and its tags are
        synchronized to the maildir flags afterwards.
        show_progress displays a progress bar, and close_db tells whether
        the database is closed once done.

        Return the number of messages matched by the query.
        """
        self.open_database(write=write)

        self.log.info("Querying %s", query_str)
        query = notmuch.Query(self.database, query_str)
        query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)

        elements = query.search_messages()
        nb_msgs = query.count_messages()

        iterator = progressbar.progressbar(
            elements, max_value=nb_msgs) if show_progress else elements

        self.log.info("Executing %s", action)
        for msg in iterator:
            if write:
                msg.freeze()

            action(msg, *args, **kwargs)

            if write:
                msg.thaw()
                msg.tags_to_maildir_flags()

        if close_db:
            self.close_database()

        return nb_msgs
class MelOutput:
    """
    All functions that print mail stuff onto the screen.
    """

    # Number of columns reserved for the fixed-width part of a line
    WIDTH_FIXED = 31
    # Share of the remaining width given to the sender/recipient column
    WIDTH_RATIO_DEST_SUBJECT = 0.3

    def compute_line_format(self) -> typing.Tuple[typing.Optional[int],
                                                  typing.Optional[int]]:
        """
        Based on the terminal width, compute the width of flexible columns.

        Return (dest_width, subject_width), or (None, None) when the
        output is not a terminal, in which case columns are not padded.
        """
        if self.is_tty:
            columns, _ = shutil.get_terminal_size((80, 20))
            remain = columns - MelOutput.WIDTH_FIXED - 1
            dest_width = int(remain * MelOutput.WIDTH_RATIO_DEST_SUBJECT)
            subject_width = remain - dest_width
            return (dest_width, subject_width)
        return (None, None)

    def __init__(self, engine: MelEngine) -> None:
        self.log = logging.getLogger("MelOutput")
        self.engine = engine

        self.is_tty = sys.stdout.isatty()
        self.dest_width, self.subject_width = self.compute_line_format()

    @staticmethod
    def format_date(date: datetime.datetime) -> str:
        """
        Format the given date as a 8-characters width string.
        Show the time if the mail is less than 24h old,
        else show the date.
        """
        now = datetime.datetime.now()
        if now - date < datetime.timedelta(days=1):
            return date.strftime('%H:%M:%S')
        return date.strftime('%y-%m-%d')

    @staticmethod
    def clip_text(size: typing.Optional[int], text: str) -> str:
        """
        Fit text into the given character size,
        fill with spaces if shorter,
        clip with … if larger.

        The text is returned untouched if size is None.
        """
        if size is None:
            return text
        if size <= 0:
            # No room at all, not even for the ellipsis
            return ''
        length = len(text)
        if length > size:
            # The ellipsis takes the last column, so that the result
            # is exactly `size` characters wide
            return text[:size-1] + '…'
        return text + ' ' * (size - length)

    @staticmethod
    def chunks(iterable: str, chunk_size: int) -> typing.Iterable[str]:
        """Yield successive chunk_size-sized chunks from iterable."""
        # From https://stackoverflow.com/a/312464
        for i in range(0, len(iterable), chunk_size):
            yield iterable[i:i + chunk_size]

    @staticmethod
    def sizeof_fmt(num: int, suffix: str = 'B') -> str:
        """
        Return the given size in a human-readable format
        (binary prefixes, one decimal).
        """
        remainder = float(num)
        # From https://stackoverflow.com/a/1094933
        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
            if abs(remainder) < 1024.0:
                return "%3.1f %s%s" % (remainder, unit, suffix)
            remainder /= 1024.0
        return "%.1f %s%s" % (remainder, 'Yi', suffix)

    def print_msg(self, msg: notmuch.Message) -> None:
        """
        Print the given message header on one line:
        UID, date, status icons, sender (or recipient for sent mails)
        and subject. Columns are colored and padded on a terminal,
        tab-separated otherwise.
        """
        if not self.dest_width:
            self.dest_width, self.subject_width = self.compute_line_format()

        sep = " " if self.is_tty else "\t"
        line = ""
        tags = set(msg.get_tags())
        mailbox, _, _ = self.engine.get_location(msg)
        if self.is_tty:
            line += self.engine.get_mailbox_color(mailbox)

        # UID
        uid = None
        for tag in tags:
            if tag.startswith('tuid'):
                uid = tag[4:]
        assert uid and MelEngine.is_uid(
            uid), f"{uid!r} ({type(uid)}) is not a valid UID."
        line += uid

        # Date
        line += sep
        date = datetime.datetime.fromtimestamp(msg.get_date())
        line += self.format_date(date)

        # Icons
        line += sep

        def tags2col1(tag1: str, tag2: str,
                      characters: typing.Tuple[str, str, str, str]) -> None:
            """
            Show the presence/absence of two tags with one character.
            characters holds the character to show when respectively
            both, only the first, only the second, or no tag is present.
            """
            nonlocal line
            both, first, second, none = characters
            if tag1 in tags:
                line += both if tag2 in tags else first
            else:
                line += second if tag2 in tags else none

        tags2col1('spam', 'draft', ('?', 'S', 'D', ' '))
        tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' '))
        tags2col1('unread', 'flagged', ('!', 'U', 'F', ' '))
        # NOTE(review): the sent/replied characters were empty strings,
        # which shifted every following column; '>' and 'R' are ASCII
        # stand-ins, adjust them if other glyphs were intended
        tags2col1('sent', 'replied', ('?', '>', 'R', ' '))

        # Sent mails show who they were sent to, others who they come from
        if 'sent' in tags:
            dest = msg.get_header("to")
        else:
            dest = msg.get_header("from")
        line += sep
        line += MelOutput.clip_text(self.dest_width, dest)

        # Subject
        line += sep
        subject = msg.get_header("subject")
        line += MelOutput.clip_text(self.subject_width, subject)

        if self.is_tty:
            line += colorama.Style.RESET_ALL
        print(line)

    def notify_msg(self, msg: notmuch.Message) -> None:
        """
        Send a desktop notification (through notify-send)
        for the given message.
        """
        self.log.info("Sending notification for %s", msg)
        subject = msg.get_header("subject")
        expd = msg.get_header("from")
        account, _, _ = self.engine.get_location(msg)

        summary = '{} (<i>{}</i>)'.format(html.escape(expd), account)
        body = html.escape(subject)
        cmd = ["notify-send", "-u", "low", "-i",
               "mail-message-new", summary, body]
        self.log.debug(' '.join(cmd))
        # Best effort: a failing notification must not abort the fetch
        subprocess.run(cmd, check=False)

    def notify_all(self) -> None:
        """
        Send a notification for unprocessed and unread message.
        Basically should only send a notification for a given message once
        since it should be marked as processed right after.
        """
        nb_msgs = self.engine.apply_msgs(
            'tag:unread and tag:unprocessed', self.notify_msg)
        if nb_msgs > 0:
            self.log.info(
                "Playing notification sound (%d new message(s))", nb_msgs)
            cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5",
                   "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
            subprocess.run(cmd, check=False)

    @staticmethod
    def format_header_value(val: str) -> str:
        """
        Return split header values in a contiguous string.
        """
        return val.replace('\n', '').replace('\t', '').strip()

    PART_MULTI_FORMAT = colorama.Fore.BLUE + \
        '{count} {indent}+ {typ}' + colorama.Style.RESET_ALL
    PART_LEAF_FORMAT = colorama.Fore.BLUE + \
        '{count} {indent}{desc} ({typ}; {size})' + \
        colorama.Style.RESET_ALL

    def show_parts_tree(self, part: email.message.Message,
                        depth: int = 0, count: int = 1) -> int:
        """
        Show a tree of the parts contained in a message.
        depth is the nesting level of the part (used for indentation)
        and count the number shown in front of it.
        Return the number of parts of the message.
        """
        indent = depth * '\t'
        typ = part.get_content_type()

        if part.is_multipart():
            print(MelOutput.PART_MULTI_FORMAT.format(
                count=count, indent=indent, typ=typ))
            payl = part.get_payload()
            assert isinstance(payl, list)
            # The container itself counts as one part
            size = 1
            for obj in payl:
                size += self.show_parts_tree(obj, depth=depth+1,
                                             count=count+size)
            return size

        payl = part.get_payload(decode=True)
        assert isinstance(payl, bytes)
        size = len(payl)
        desc = part.get('Content-Description', '<no description>')
        print(MelOutput.PART_LEAF_FORMAT.format(
            count=count, indent=indent, typ=typ, desc=desc,
            size=MelOutput.sizeof_fmt(size)))
        return 1

    INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
    HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \
        '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL

    def read_msg(self, msg: notmuch.Message) -> None:
        """
        Display the content of a mail: parsing defects, interesting
        headers, tree of the parts, then every text/plain part.
        """
        # Parse
        filename = msg.get_filename()
        parser = email.parser.BytesParser()
        with open(filename, 'rb') as filedesc:
            mail = parser.parse(filedesc)

        # Defects
        if mail.defects:
            self.log.warning("Defects found in the mail:")
            for defect in mail.defects:
                self.log.warning(defect)

        # Headers
        for key in MelOutput.INTERESTING_HEADERS:
            val = mail.get(key)
            if val:
                # Non-ASCII raw headers may come back as Header objects
                val = self.format_header_value(str(val))
                print(MelOutput.HEADER_FORMAT.format(key, val))
        # TODO Show all headers
        # TODO BONUS Highlight failed verifications

        self.show_parts_tree(mail)
        print()

        # Show text/plain
        for part in mail.walk():
            if part.get_content_type() != "text/plain":
                continue
            payl = part.get_payload(decode=True)
            assert isinstance(payl, bytes)
            # Honor the charset declared by the part instead of assuming
            # UTF-8, and never crash on undecodable bytes
            charset = part.get_content_charset() or 'utf-8'
            try:
                text = payl.decode(charset, errors='replace')
            except LookupError:
                # Charset unknown to Python
                text = payl.decode('utf-8', errors='replace')
            print(text)
class MelCLI:
    """
    Handles the user input and run asked operations.
    """

    VERBOSITY_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]

    def apply_msgs_input(self, argmessages: typing.List[str],
                         action: typing.Callable, write: bool = False) -> None:
        """
        Run a function on the messages given by the user.

        The UIDs are read from argmessages, or from the standard input
        (one per line, at the beginning of the line) when argmessages is
        the single item '-', or is empty while the standard input is not
        a terminal. Invalid UIDs are reported and skipped.
        """
        # TODO First argument might be unecessary
        if argmessages:
            from_stdin = argmessages == ['-']
        else:
            from_stdin = not sys.stdin.isatty()

        messages = list()
        if from_stdin:
            for line in sys.stdin:
                uid = line[:12]
                if not MelEngine.is_uid(uid):
                    self.log.error("Not an UID: %s", uid)
                    continue
                messages.append(uid)
        else:
            for uids in argmessages:
                if len(uids) > 12:
                    self.log.warning("Might have forgotten some spaces "
                                     "between the UIDs. Don't worry, I'll "
                                     "split them for you")
                for uid in MelOutput.chunks(uids, 12):
                    if not MelEngine.is_uid(uid):
                        self.log.error("Not an UID: %s", uid)
                        continue
                    messages.append(uid)

        try:
            for message in messages:
                query_str = f'tag:tuid{message}'
                # The database is kept open between the messages
                nb_msgs = self.engine.apply_msgs(
                    query_str, action, write=write, close_db=False)
                if nb_msgs < 1:
                    self.log.error(
                        "Couldn't execute function for message %s", message)
        finally:
            # ... so it has to be closed once all of them are processed
            self.engine.close_database()

    def operation_default(self) -> None:
        """
        Default operation: list all message in the inbox
        """
        self.engine.apply_msgs('tag:inbox', self.output.print_msg)

    def operation_inbox(self) -> None:
        """
        Inbox operation: list all message in the inbox,
        possibly only the unread ones.
        """
        query_str = 'tag:unread' if self.args.only_unread else 'tag:inbox'
        self.engine.apply_msgs(query_str, self.output.print_msg)

    def operation_flag(self) -> None:
        """
        Flag operation: Flag user selected messages.
        """
        def flag_msg(msg: notmuch.Message) -> None:
            """
            Flag given message.
            """
            msg.add_tag('flagged')

        self.apply_msgs_input(self.args.message, flag_msg, write=True)

    def operation_unflag(self) -> None:
        """
        Unflag operation: Unflag user selected messages.
        """
        def unflag_msg(msg: notmuch.Message) -> None:
            """
            Unflag given message.
            """
            msg.remove_tag('flagged')

        self.apply_msgs_input(self.args.message, unflag_msg, write=True)

    def operation_read(self) -> None:
        """
        Read operation: show full content of selected message
        """
        self.apply_msgs_input(self.args.message, self.output.read_msg)

    def operation_fetch(self) -> None:
        """
        Fetch operation: Sync remote databases with the local one,
        index the new mails, send notifications, then tag the new mails.
        """
        # Fetch mails
        self.log.info("Fetching mails")
        mbsync_config_file = os.path.expanduser(
            "~/.config/mbsyncrc")  # TODO Better
        cmd = ["mbsync", "--config", mbsync_config_file, "--all"]
        subprocess.run(cmd, check=True)

        # Index new mails
        self.engine.notmuch_new()

        # Notify (before retagging, which clears the unprocessed tag
        # the notifications are selected with)
        self.output.notify_all()

        # Tag new mails
        self.engine.apply_msgs('tag:unprocessed', self.engine.retag_msg,
                               show_progress=True, write=True)

    def operation_debug(self) -> None:
        """
        DEBUG
        """
        from pprint import pprint
        pprint(self.engine.list_folders())

    def operation_retag(self) -> None:
        """
        Retag operation: Manually retag all the mails in the database.
        Mostly debug I suppose.
        """
        self.engine.apply_msgs('*', self.engine.retag_msg,
                               show_progress=True, write=True)

    def operation_all(self) -> None:
        """
        All operation: list every single message.
        """
        self.engine.apply_msgs('*', self.output.print_msg)

    def add_subparsers(self) -> None:
        """
        Add the operation parsers to the main parser.
        Each sub-command stores the method to run as the `operation`
        default of its parser.
        """
        # TODO If the only operation to the parser done are adding argument,
        # we should automate this.
        subparsers = self.parser.add_subparsers(help="Action to execute")

        # List messages

        # Operation run when no sub-command is given
        self.parser.set_defaults(operation=self.operation_default)

        # inbox (default)
        parser_inbox = subparsers.add_parser(
            "inbox", help="Show unread, unsorted and flagged messages")
        parser_inbox.add_argument('-u', '--only-unread', action='store_true',
                                  help="Show unread messages only")
        # TODO Make this more relevant
        parser_inbox.set_defaults(operation=self.operation_inbox)

        # list folder [--recurse]
        # List actions

        # flag msg...
        parser_flag = subparsers.add_parser(
            "flag", help="Mark messages as flagged")
        parser_flag.add_argument('message', nargs='*', help="Messages")
        parser_flag.set_defaults(operation=self.operation_flag)

        # unflag msg...
        parser_unflag = subparsers.add_parser(
            "unflag", help="Mark messages as not-flagged")
        parser_unflag.add_argument('message', nargs='*', help="Messages")
        parser_unflag.set_defaults(operation=self.operation_unflag)

        # delete msg...
        # spam msg...
        # move dest msg...
        # Read message

        # read msg [--html] [--plain] [--browser]
        parser_read = subparsers.add_parser("read", help="Read message")
        parser_read.add_argument('message', nargs=1, help="Messages")
        parser_read.set_defaults(operation=self.operation_read)

        # attach msg [id] [--save] (list if no id, xdg-open else)
        # Redaction
        # new account
        # reply msg [--all]
        # Folder management
        # tree [folder]
        # mkdir folder
        # rmdir folder (prevent if folder isn't empty (mail/subfolder))
        # (yeah that should do)
        # Meta
        # setup (interactive thing maybe)

        # fetch (mbsync, notmuch new, retag, notify; called by greater gods)
        parser_fetch = subparsers.add_parser(
            "fetch", help="Fetch mail, tag them, and run notifications")
        parser_fetch.set_defaults(operation=self.operation_fetch)

        # Debug

        # debug (various)
        parser_debug = subparsers.add_parser(
            "debug", help="Who know what this holds...")
        parser_debug.set_defaults(verbosity='DEBUG')
        parser_debug.set_defaults(operation=self.operation_debug)

        # retag (all or unprocessed)
        parser_retag = subparsers.add_parser(
            "retag", help="Retag all mails (when you changed configuration)")
        parser_retag.set_defaults(operation=self.operation_retag)

        # all
        parser_all = subparsers.add_parser("all", help="Show ALL messages")
        parser_all.set_defaults(operation=self.operation_all)

    def create_parser(self) -> argparse.ArgumentParser:
        """
        Create the main parser that will handle the user arguments
        (verbosity and accounts configuration file).
        """
        parser = argparse.ArgumentParser(description="Meh mail client")
        parser.add_argument('-v', '--verbosity',
                            choices=MelCLI.VERBOSITY_LEVELS, default='WARNING',
                            help="Verbosity of log messages")
        # parser.add_argument('-n', '--dry-run', action='store_true',
        #                     help="Don't do anything") # DEBUG
        default_config_file = os.path.join(
            xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
        parser.add_argument('-c', '--config', default=default_config_file,
                            help="Accounts config file")
        return parser

    def __init__(self) -> None:
        """
        Parse the command line, set up logging, build the engine and
        the output, then run the requested operation.
        """
        self.log = logging.getLogger("MelCLI")
        self.parser = self.create_parser()
        self.add_subparsers()

        self.args = self.parser.parse_args()
        coloredlogs.install(level=self.args.verbosity,
                            fmt='%(levelname)s %(name)s %(message)s')

        self.engine = MelEngine(self.args.config)
        self.output = MelOutput(self.engine)

        if self.args.operation:
            self.log.info("Executing operation %s", self.args.operation)
            self.args.operation()
if __name__ == "__main__":
    colorama.init()
    try:
        CLI = MelCLI()
    except Exception:  # pylint: disable=broad-except
        # Unexpected error: show it and open a post-mortem debugger.
        # SystemExit (--help, usage errors, missing config file) and
        # KeyboardInterrupt are deliberately not caught here, so that
        # they terminate the program normally instead of starting pdb.
        traceback.print_exc()
        pdb.post_mortem(sys.exc_info()[2])