dotfiles/scripts/mel
2018-08-14 19:25:07 +02:00

599 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Meh mail client
A dumb Python script that leverages notmuch, mbsync, and msmtp
to become a fully-functional, extremely-opinionated mail client.
"""
# TODO Features
# TODO Lockfiles for write operations on mail files (mbsync, tags→maildir operations)
# TODO OPTI Lockfile per account and process everything in parallel (if implemented, this
# should be optional since while it may speed up the mail fetching process, its multi-threading
# nature would cause a lot of cache flushes and be not very efficient on battery)
# TODO Handle true character width
# TODO IMAP IDLE watches?
# TODO (only then) Refactor
# TODO OOP-based
# TODO Merge file with melConf
# DEBUG Small perf profiler
import time
# Cumulated time (in seconds) spent in each named step, and the timestamp
# of the last step boundary.
perf_dict = dict()
perf_last = time.perf_counter()


def perfstep(name):
    """
    Charge the time elapsed since the previous step boundary to `name`.

    Successive calls with the same name accumulate. The reference timestamp
    is re-read after the bookkeeping is done.
    """
    global perf_last, perf_dict
    elapsed = time.perf_counter() - perf_last
    perf_dict[name] = perf_dict.get(name, 0) + elapsed
    perf_last = time.perf_counter()
import notmuch
import logging
import coloredlogs
import colorama
import datetime
import os
import progressbar
import argparse
import configparser
import base64
import shutil
import argparse
import xdg.BaseDirectory
import sys
import subprocess
import html
import re
import email.parser
perfstep("import")

# Account name -> configuration section, filled by generate_aliases()
ACCOUNTS = dict()
# Every address (main and alternatives) the user sends mail from,
# filled by generate_aliases()
ALIASES = set()
# Currently opened notmuch database, or None when closed
# (see open_database() / close_database())
db = None
# Parsed accounts configuration, set by the __main__ section
config = None
def open_database(write=False):
    """
    Open the notmuch database and store the handle in the global `db`.

    The database location is read from the `storage` key of the `GENERAL`
    section of the configuration. It is opened read-write when `write` is
    true, read-only otherwise.
    """
    global db
    if write:
        mode = notmuch.Database.MODE.READ_WRITE
    else:
        mode = notmuch.Database.MODE.READ_ONLY
    storage = os.path.expanduser(config["GENERAL"]["storage"])
    db = notmuch.Database(mode=mode, path=os.path.realpath(storage))
def close_database():
    """
    Close the global notmuch database handle, if any, and reset it to None.

    Calling it while no database is open is a no-op, so callers that cannot
    know whether a helper already closed the database can call it safely.
    """
    global db
    if db is None:
        return
    try:
        db.close()
    finally:
        # Never keep a reference to a handle we tried to close
        db = None
def generate_aliases():
    """
    Fill ACCOUNTS and ALIASES from the configuration.

    Every section whose name is entirely lowercase is an account; other
    sections are skipped. The account's `from` address and each of its
    optional `;`-separated `alternatives` are registered as aliases.
    """
    for name in config.sections():
        if not name.islower():
            continue
        section = config[name]
        ALIASES.add(section["from"])
        if "alternatives" in section:
            for alt in section["alternatives"].split(";"):
                # Tolerate "a@b; c@d;" style lists: surrounding blanks would
                # never match an extracted address, and empty entries are noise
                alt = alt.strip()
                if alt:
                    ALIASES.add(alt)
        ACCOUNTS[name] = section
def get_location(msg):
    """
    Locate a message inside the mail storage from its file name.

    Returns a `(mailbox, folder, state)` tuple: `mailbox` is the first path
    component below the database root (it must be a known account),
    `state` is the last directory (`cur`, `tmp` or `new`) and `folder` is
    the tuple of the components in between.
    """
    directory = os.path.dirname(msg.get_filename())
    root = db.get_path()
    assert directory.startswith(root)
    # The relative part starts with '/', so the first item is empty
    parts = directory[len(root):].split('/')
    mailbox = parts[1]
    assert mailbox in ACCOUNTS
    state = parts[-1]
    assert state in {'cur', 'tmp', 'new'}
    return (mailbox, tuple(parts[2:-1]), state)
# Mailbox name -> terminal escape sequence, computed on first use
MAILBOX_COLORS = dict()


def get_mailbox_color(mailbox):
    """
    Return the 24-bit foreground colour escape sequence of a mailbox.

    The colour comes from the `color` key of the mailbox configuration,
    written as six hexadecimal digits with an optional leading `#`.
    The result is cached in MAILBOX_COLORS.
    """
    try:
        return MAILBOX_COLORS[mailbox]
    except KeyError:
        pass
    hexa = config[mailbox]["color"]
    if hexa[0] == '#':
        hexa = hexa[1:]
    red, green, blue = [int(hexa[pos:pos + 2], 16) for pos in (0, 2, 4)]
    MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(red, green, blue)
    return MAILBOX_COLORS[mailbox]
def format_date(date):
    """
    Format a datetime for the message list.

    Dates later than today's midnight are shown as a time (`HH:MM:SS`),
    anything else as a short date (`DD/MM/YY`).
    """
    midnight = datetime.datetime.combine(datetime.date.today(), datetime.time.min)
    pattern = '%H:%M:%S' if date > midnight else '%d/%m/%y'
    return date.strftime(pattern)
# Number of columns reserved for the non-resizable part of a message line
WIDTH_FIXED = 31
# Share of the remaining columns given to the correspondent column,
# the rest going to the subject
WIDTH_RATIO_DEST_SUBJECT = 0.3
ISATTY = sys.stdout.isatty()
# Column widths, None meaning "do not clip"
destWidth = None
subjectWidth = None


def compute_line_format():
    """
    Compute the widths of the correspondent and subject columns.

    On a terminal, the columns left once the fixed part is removed are
    split between both according to WIDTH_RATIO_DEST_SUBJECT. When the
    output is not a terminal, both widths are reset to None.
    """
    global destWidth, subjectWidth
    if not ISATTY:
        destWidth = None
        subjectWidth = None
        return
    columns = shutil.get_terminal_size((80, 20))[0]
    remain = columns - WIDTH_FIXED - 1
    destWidth = int(remain * WIDTH_RATIO_DEST_SUBJECT)
    subjectWidth = remain - destWidth
def clip_text(size, text):
    """
    Make `text` exactly `size` characters wide.

    Shorter text is padded with spaces on the right; longer text is cut and
    its last visible character replaced by an ellipsis so the truncation is
    noticeable and the column keeps its width. A `size` of None disables
    clipping, and a non-positive `size` yields an empty string.
    """
    if size is None:
        return text
    if size <= 0:
        # No room at all (very narrow terminal)
        return ''
    length = len(text)
    if length > size:
        return text[:size - 1] + '…'
    return text + " " * (size - length)
def print_msg(msg):
    """
    Print one line describing a message: UID, date, status icons,
    correspondent and subject.

    On a terminal the line is coloured after the mailbox and columns are
    separated by spaces and clipped to the terminal width; otherwise columns
    are tab-separated and left unclipped.

    Raises AssertionError when the message carries no valid `tuid` tag.
    """
    if not destWidth:
        compute_line_format()
    sep = " " if ISATTY else "\t"
    line = ""
    tags = set(msg.get_tags())
    mailbox, folder, state = get_location(msg)
    if ISATTY:
        line += get_mailbox_color(mailbox)

    # UID
    uid = None
    for tag in tags:
        if tag.startswith('tuid'):
            uid = tag[4:]
    assert isUID(uid), uid
    line += uid

    # Date
    line += sep
    date = datetime.datetime.fromtimestamp(msg.get_date())
    line += format_date(date)

    # Icons
    line += sep

    def tags2col1(tag1, tag2, both, first, second, none):
        # Each call must yield exactly one character, whatever the tags,
        # otherwise the following columns are shifted
        if tag1 in tags:
            return both if tag2 in tags else first
        return second if tag2 in tags else none

    line += tags2col1('spam', 'draft', '?', 'S', 'D', ' ')
    line += tags2col1('attachment', 'encrypted', 'E', 'A', 'E', ' ')
    line += tags2col1('unread', 'flagged', '!', 'U', 'F', ' ')
    # The sent-only and replied-only icons were empty strings, which made
    # those lines one column narrower than the others
    line += tags2col1('sent', 'replied', '?', '↑', '↪', ' ')

    # Correspondent: recipient for sent mails, sender otherwise
    if 'sent' in tags:
        dest = msg.get_header("to")
    else:
        dest = msg.get_header("from")
    line += sep
    line += clip_text(destWidth, dest)

    # Subject
    line += sep
    subject = msg.get_header("subject")
    line += clip_text(subjectWidth, subject)

    if ISATTY:
        line += colorama.Style.RESET_ALL
    print(line)
def retag_msg(msg):
    """
    Recompute the automatic tags of a message from its location and headers.

    The `inbox`, `spam`, `deleted` and `draft` tags follow the top-level
    folder, `sent` is set when the sender is one of the user's aliases,
    `unprocessed` is always removed, and the only `tuid…` tag kept is the
    one matching the `X-TUID` header.

    Raises AssertionError when the `X-TUID` header is not a valid UID.
    """
    mailbox, folder, state = get_location(msg)

    # Search-friendly folder name: upper-cased, without the leading INBOX
    # when it is merely the parent of a subfolder
    slugFolder = tuple(
        fold.upper()
        for depth, fold in enumerate(folder)
        if not (depth == 0 and len(folder) > 1 and fold == "INBOX")
    )
    # A message stored directly at the account root has no folder at all:
    # none of the folder-based tags applies then
    topFolder = slugFolder[0] if slugFolder else None

    tags = set(msg.get_tags())

    def tag_if(tag, condition):
        # Only touch the message when its state actually has to change
        if condition and tag not in tags:
            msg.add_tag(tag)
        elif not condition and tag in tags:
            msg.remove_tag(tag)

    expeditor = extract_email(msg.get_header('from'))

    tag_if('inbox', topFolder == 'INBOX')
    tag_if('spam', topFolder in ('JUNK', 'SPAM'))
    tag_if('deleted', topFolder == 'TRASH')
    tag_if('draft', topFolder == 'DRAFTS')
    tag_if('sent', expeditor in ALIASES)
    tag_if('unprocessed', False)

    # UID
    uid = msg.get_header("X-TUID")
    assert isUID(uid)
    uidtag = 'tuid{}'.format(uid)
    # Remove any other UID tag
    for tag in tags:
        if tag.startswith('tuid') and tag != uidtag:
            msg.remove_tag(tag)
    msg.add_tag(uidtag)
def extract_email(field):
    """
    Extract the bare address from a `Name <address>` header value.

    The text between the first `<` and the first `>` is returned; when
    either bracket is missing, the field is returned unchanged.
    """
    sta = field.find('<')
    if sta == -1:
        return field
    sto = field.find('>')
    if sto == -1:
        return field
    return field[sta + 1:sto]
def applyMsgs(queryStr, action, *args, showProgress=False, write=False, closeDb=True, **kwargs):
    """
    Run `action(msg, *args, **kwargs)` on every message matching a query.

    Messages are visited oldest first. The database is opened if it is not
    already (read-write when `write` is true). With `write`, each message is
    frozen around the action, then its tags are synchronised to the maildir
    flags. With `showProgress`, a progress bar is displayed.

    Unless `closeDb` is false, the database is closed before returning,
    even when the action raised.

    Returns the number of messages matching the query.
    """
    # TODO Verify it's open in the correct mode
    if db is None:
        open_database(write=write)

    try:
        log.info("Querying {}".format(queryStr))
        query = notmuch.Query(db, queryStr)
        query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)

        elements = query.search_messages()
        nbMsgs = query.count_messages()

        iterator = progressbar.progressbar(elements, max_value=nbMsgs) if showProgress else elements

        log.info("Executing {}".format(action))
        for msg in iterator:
            if write:
                msg.freeze()

            action(msg, *args, **kwargs)

            if write:
                msg.thaw()
                msg.tags_to_maildir_flags()
    finally:
        # Do not leave the database open behind a failing action
        if closeDb:
            close_database()

    return nbMsgs
def update_polybar_status(*args, **kwargs):
    """
    Write the per-account unread counts to the status file read by polybar.

    Accounts are visited in alphabetical order; each account with at least
    one unread message contributes its count, wrapped in polybar colour
    tags using the account's configured colour. Arguments are ignored.
    """
    log.info("Updating polybar status")
    accountsList = sorted(ACCOUNTS.keys())
    # Diagnostic only: keep it out of the standard output
    log.debug(accountsList)

    open_database()
    try:
        statusArr = []
        for account in accountsList:
            queryStr = 'folder:/{}/ and tag:unread'.format(account)
            query = notmuch.Query(db, queryStr)
            nbMsgs = query.count_messages()
            if nbMsgs < 1:
                continue
            color = config[account]['color']
            statusAccStr = '%{F' + color + '}' + str(nbMsgs) + '%{F-}'
            statusArr.append(statusAccStr)
    finally:
        # Release the database even if a query failed
        close_database()

    statusStr = ('_' + ' '.join(statusArr)) if len(statusArr) else '\n'
    statusPath = os.path.expanduser("~/.cache/mutt/status")  # TODO Better
    # statusPath = os.path.expanduser("~/.cache/mel/polybarstatus") # TODO Better
    with open(statusPath, 'w') as f:
        f.write(statusStr)
def notify_msg(msg):
    """
    Send a low-urgency desktop notification for a message through
    `notify-send`.

    The summary holds the sender and the account the message belongs to,
    the body holds the subject; both header values are HTML-escaped.
    """
    log.info("Sending notification for {}".format(msg))
    subject = msg.get_header("subject")
    expd = msg.get_header("from")
    account, _, _ = get_location(msg)

    summary = '{} (<i>{}</i>)'.format(html.escape(expd), account)
    body = html.escape(subject)
    cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body]
    # Trace the command through the logger rather than the standard output
    log.debug(' '.join(cmd))
    subprocess.run(cmd)
def notify_all(*args, **kwargs):
    """
    Send a notification for every unread and unprocessed message, then
    play a sound if there was at least one. Arguments are ignored.
    """
    open_database()
    try:
        # Keep ownership of the database: letting applyMsgs close it and
        # closing it again here would fail on the second close
        nbMsgs = applyMsgs('tag:unread and tag:unprocessed', notify_msg, closeDb=False)
    finally:
        close_database()
    if nbMsgs > 0:
        log.info("Playing notification sound ({} new message(s))".format(nbMsgs))
        cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5", "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
        subprocess.run(cmd)
def isUID(uid):
    """
    Tell whether `uid` is a message UID: a string of exactly 12 characters
    taken from the base64 alphabet (letters, digits, `+` and `/`).

    Always returns a real boolean.
    """
    return isinstance(uid, str) and re.fullmatch('[a-zA-Z0-9+/]{12}', uid) is not None
# From https://stackoverflow.com/a/312464
def chunks(l, n):
    """Generate the consecutive slices of `l` that are `n` items long (the last one may be shorter)."""
    yield from (l[start:start + n] for start in range(0, len(l), n))
def apply_msgs_input(argmessages, action, write=False):
    """
    Run `action` on the messages designated by the user.

    UIDs are read from the standard input (the first 12 characters of each
    line) when `argmessages` is the single argument `-`, or when it is empty
    and the standard input is not a terminal. Otherwise they come from
    `argmessages`, where an argument longer than 12 characters is split
    into consecutive UIDs. Invalid UIDs are reported and skipped.
    """
    if not argmessages:
        fromStdin = not sys.stdin.isatty()
    else:
        # argmessages is a list: look at its only element, not at the list
        fromStdin = len(argmessages) == 1 and argmessages[0] == '-'

    messages = list()
    if fromStdin:
        for line in sys.stdin:
            uid = line[:12]
            if not isUID(uid):
                log.error("Not an UID: {}".format(uid))
                continue
            messages.append(uid)
    else:
        for uids in argmessages:
            if len(uids) > 12:
                log.warning("Might have forgotten some spaces between the UIDs. Don't worry, I'll split them for you")
            for uid in chunks(uids, 12):
                if not isUID(uid):
                    log.error("Not an UID: {}".format(uid))
                    continue
                messages.append(uid)

    for message in messages:
        queryStr = 'tag:tuid{}'.format(message)
        nbMsgs = applyMsgs(queryStr, action, write=write, closeDb=False)
        if nbMsgs < 1:
            log.error("Couldn't execute function for message {}".format(message))

    # The database is only opened by applyMsgs: nothing to close when no
    # valid UID was given
    if db is not None:
        close_database()
def format_header_value(val):
    """Flatten a header value: drop every newline and tab, then trim surrounding whitespace."""
    unfolded = val.translate({ord('\n'): None, ord('\t'): None})
    return unfolded.strip()
# From https://stackoverflow.com/a/1094933
def sizeof_fmt(num, suffix='B'):
    """
    Format a quantity with a binary prefix (Ki, Mi, ... up to Yi) and one
    decimal, e.g. `1.5 KiB`.
    """
    units = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    rank = 0
    # Scale down until the value fits below 1024 or prefixes run out
    while rank < len(units) and not abs(num) < 1024.0:
        num /= 1024.0
        rank += 1
    if rank == len(units):
        return "%.1f %s%s" % (num, 'Yi', suffix)
    return "%3.1f %s%s" % (num, units[rank], suffix)
def show_parts_tree(part, lvl=0, nb=1):
    """
    Print the MIME structure of `part` as a numbered, tab-indented tree.

    `lvl` is the depth of the part and `nb` the number it is displayed
    with. Returns how many numbers the part consumed: 1 for a leaf, 1 plus
    the total of its children for a multipart container.
    """
    indent = lvl * '\t'
    typ = part.get_content_type()

    if not part.is_multipart():
        size = len(part.get_payload(decode=True))
        desc = part.get('Content-Description', '<no description>')
        print(PART_LEAF_FORMAT.format(nb=nb, indent=indent, typ=typ, desc=desc, size=sizeof_fmt(size)))
        return 1

    print(PART_MULTI_FORMAT.format(nb=nb, indent=indent, typ=typ))
    used = 1
    for child in part.get_payload():
        # Children are numbered right after what has been printed so far
        used += show_parts_tree(child, lvl=lvl + 1, nb=nb + used)
    return used
# Headers displayed by read_msg(), in display order
INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
# Header line: name in bright blue, value in normal blue
# (positional fields: name, value)
HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL
# Lines of the parts tree printed by show_parts_tree():
# one for multipart containers, one for leaf parts
PART_MULTI_FORMAT = colorama.Fore.BLUE + '{nb} {indent}+ {typ}' + colorama.Style.RESET_ALL
PART_LEAF_FORMAT = colorama.Fore.BLUE + '{nb} {indent}{desc} ({typ}; {size})' + colorama.Style.RESET_ALL
def read_msg(msg):
    """
    Display a message: the interesting headers, the tree of its MIME parts,
    then the content of every `text/plain` part.

    The mail is parsed from the file backing `msg`. Parsing defects are
    reported as warnings. Text parts are decoded with their declared
    charset (UTF-8 when none is declared or when it is unknown);
    undecodable bytes are replaced rather than aborting the display.
    """
    # Parse
    filename = msg.get_filename()
    parser = email.parser.BytesParser()
    with open(filename, 'rb') as f:
        mail = parser.parse(f)

    # Debug: expose the parsed mail for interactive inspection
    global a
    a = mail

    # Defects
    if len(mail.defects):
        log.warning("Defects found in the mail:")
        for defect in mail.defects:
            # Report each defect, not the whole list every time
            log.warning(defect)

    # Headers
    for key in INTERESTING_HEADERS:
        val = mail.get(key)
        if val:
            val = format_header_value(val)
            print(HEADER_FORMAT.format(key, val))
    # TODO Show all headers
    # TODO BONUS Highlight failed verifications

    show_parts_tree(mail)
    print()

    # Show text/plain
    for part in mail.walk():
        if part.get_content_type() == "text/plain":
            payl = part.get_payload(decode=True)
            charset = part.get_content_charset() or 'utf-8'
            try:
                text = payl.decode(charset, errors='replace')
            except LookupError:
                # Charset name unknown to Python
                text = payl.decode('utf-8', errors='replace')
            print(text)
perfstep("definitions")

if __name__ == "__main__":
    # Command-line entry point: every subcommand registers a `func`
    # default which is called with the parsed arguments once logging and
    # configuration are set up.

    # Main arguments
    parser = argparse.ArgumentParser(description="Meh mail client")
    selectedVerbosityLevels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
    parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels, default='WARNING', help="Verbosity of log messages")
    # parser.add_argument('-n', '--dry-run', action='store_true', help="Don't do anything") # DEBUG
    defaultConfigFile = os.path.join(xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
    parser.add_argument('-c', '--config', default=defaultConfigFile, help="Accounts config file")

    subparsers = parser.add_subparsers(help="Action to execute")

    ## List messages

    # Action run when no subcommand is given
    def func_default(args):
        applyMsgs('tag:inbox', print_msg)
    parser.set_defaults(func=func_default)

    # inbox (default)
    def func_inbox(args):
        queryStr = 'tag:unread' if args.only_unread else 'tag:inbox'
        applyMsgs(queryStr, print_msg)

    parserInbox = subparsers.add_parser("inbox", help="Show unread, unsorted and flagged messages")
    parserInbox.add_argument('-u', '--only-unread', action='store_true', help="Show unread messages only")
    # TODO Make this more relevant
    parserInbox.set_defaults(func=func_inbox)

    # list folder [--recurse]

    ## List actions

    # flag msg...
    def func_flag(args):
        def flag_msg(msg):
            msg.add_tag('flagged')
        apply_msgs_input(args.message, flag_msg, write=True)
    parserFlag = subparsers.add_parser("flag", help="Mark messages as flagged")
    parserFlag.add_argument('message', nargs='*', help="Messages")
    parserFlag.set_defaults(func=func_flag)

    # unflag msg...
    def func_unflag(args):
        def unflag_msg(msg):
            msg.remove_tag('flagged')
        apply_msgs_input(args.message, unflag_msg, write=True)
    parserUnflag = subparsers.add_parser("unflag", help="Mark messages as not-flagged")
    parserUnflag.add_argument('message', nargs='*', help="Messages")
    parserUnflag.set_defaults(func=func_unflag)

    # delete msg...
    # spam msg...
    # move dest msg...

    ## Read message

    # read msg [--html] [--plain] [--browser]
    def func_read(args):
        apply_msgs_input(args.message, read_msg)
    parserRead = subparsers.add_parser("read", help="Read message")
    parserRead.add_argument('message', nargs=1, help="Messages")
    parserRead.set_defaults(func=func_read)

    # attach msg [id] [--save] (list if no id, xdg-open else)

    ## Redaction
    # new account
    # reply msg [--all]

    ## Folder management
    # tree [folder]
    # mkdir folder
    # rmdir folder (prevent if folder isn't empty (mail/subfolder))
    # (yeah that should do)

    ## Meta
    # setup (interactive thing maybe)

    # fetch (mbsync, notmuch new, notify, retag; called by greater gods)
    def func_fetch(args):
        # Fetch mails
        log.info("Fetching mails")
        mbsyncConfigPath = os.path.expanduser("~/.mbsyncrc")  # TODO Better
        cmd = ["mbsync", "--config", mbsyncConfigPath, "--all"]
        subprocess.run(cmd)

        # Index new mails
        log.info("Indexing mails")
        log.error("TODO Can't `notmuch new` when database is already open!")
        notmuchConfigPath = os.path.expanduser("~/.notmuchrc")  # TODO Better
        cmd = ["notmuch", "--config", notmuchConfigPath, "new"]
        log.debug(" ".join(cmd))
        subprocess.run(cmd)

        # Notify. This must happen before retagging: notifications select
        # messages on the `unprocessed` tag, which retagging removes
        notify_all()
        update_polybar_status()

        # Tag new mails
        applyMsgs('tag:unprocessed', retag_msg, showProgress=True, write=True)

    parserFetch = subparsers.add_parser("fetch", help="Fetch mail, tag them, and run notifications")
    parserFetch.set_defaults(func=func_fetch)

    ## Debug
    # debug (various)
    def func_expose(args):
        # And leave the door open: the database stays open and the message
        # is stored in a global for inspection
        def expose_msg(a):
            global msg
            msg = a
        applyMsgs('tag:tuidyviU45m6flff', expose_msg, closeDb=False)
    parserDebug = subparsers.add_parser("debug", help="Who know what this holds...")
    parserDebug.set_defaults(verbosity='DEBUG')
    parserDebug.set_defaults(func=func_expose)

    # retag (all or unprocessed)
    def func_retag(args):
        applyMsgs('*', retag_msg, showProgress=True, write=True)
    parserRetag = subparsers.add_parser("retag", help="Retag all mails (when you changed configuration)")
    parserRetag.set_defaults(func=func_retag)

    # all
    def func_all(args):
        applyMsgs('*', print_msg)
    parserAll = subparsers.add_parser("all", help="Show ALL messages")
    parserAll.set_defaults(func=func_all)

    # Init
    args = parser.parse_args()
    perfstep("parse_args")

    colorama.init()
    coloredlogs.install(level=args.verbosity, fmt='%(levelname)s %(message)s')
    # Module-level functions log through this global
    log = logging.getLogger()

    log.info("Loading config {}".format(args.config))
    if not os.path.isfile(args.config):
        log.fatal("Config file not found: {}".format(args.config))
        sys.exit(1)
        # TODO Create it, maybe?
    config = configparser.ConfigParser()
    config.read(args.config)

    generate_aliases()
    perfstep("config")

    if args.func:
        log.info("Executing function {}".format(args.func))
        args.func(args)

    perfstep("exec")

    # DEBUG Profiler report, least expensive step first. It is emitted
    # before exiting (it used to sit after sys.exit() and never ran) and
    # only shows up at DEBUG verbosity.
    for kv in sorted(perf_dict.items(), key=lambda p: p[1]):
        log.debug("{1:.6f} {0}".format(*kv))
    sys.exit(0)