233 lines
6.1 KiB
Python
Executable file
233 lines
6.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
"""
|
|
Meh mail client
|
|
"""
|
|
|
|
# TODO Features
|
|
# TODO (only then) Refactor
|
|
|
|
import notmuch
|
|
import logging
|
|
import coloredlogs
|
|
import colorama
|
|
import datetime
|
|
import os
|
|
import progressbar
|
|
import time
|
|
import argparse
|
|
import configparser
|
|
import base64
|
|
import shutil
|
|
|
|
colorama.init()
|
|
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
|
|
log = logging.getLogger()
|
|
|
|
log.debug("Loading config")
|
|
|
|
# TODO XDG
|
|
configPath = os.path.join(os.path.expanduser('~'), '.config', 'mel', 'accounts.conf')
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read(configPath)
|
|
|
|
# Reading config a bit
|
|
accounts = dict()
|
|
mails = set()
|
|
for name in config.sections():
|
|
if not name.islower():
|
|
continue
|
|
section = config[name]
|
|
mails.add(section["from"])
|
|
if "alternatives" in section:
|
|
for alt in section["alternatives"].split(";"):
|
|
mails.add(alt)
|
|
accounts[name] = section
|
|
|
|
log.debug("Loading database")
|
|
|
|
dbPath = os.path.realpath(os.path.expanduser(config["GENERAL"]["storage"]))
|
|
db = notmuch.Database(mode=notmuch.Database.MODE.READ_WRITE, path=dbPath)
|
|
# TODO Open read-only when needed
|
|
|
|
log.debug("Database loaded")
|
|
|
|
def get_location(msg):
|
|
path = msg.get_filename()
|
|
path = os.path.dirname(path)
|
|
base = db.get_path()
|
|
assert path.startswith(base)
|
|
path = path[len(base):]
|
|
pathSplit = path.split('/')
|
|
mailbox = pathSplit[1]
|
|
state = pathSplit[-1]
|
|
folder = tuple(pathSplit[2:-1])
|
|
assert state in {'cur', 'tmp', 'new'}
|
|
return (mailbox, folder, state)
|
|
|
|
MAILBOX_COLORS = dict()
|
|
|
|
def get_mailbox_color(mailbox):
|
|
if mailbox not in MAILBOX_COLORS:
|
|
colorStr = config[mailbox]["color"]
|
|
colorStr = colorStr[1:] if colorStr[0] == '#' else colorStr
|
|
R = int(colorStr[0:2], 16)
|
|
G = int(colorStr[2:4], 16)
|
|
B = int(colorStr[4:6], 16)
|
|
MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(R, G, B)
|
|
return MAILBOX_COLORS[mailbox]
|
|
|
|
def format_date(date):
|
|
now = datetime.datetime.now()
|
|
midnight = datetime.datetime(year=now.year, month=now.month, day=now.day)
|
|
if date > midnight:
|
|
return date.strftime('%H:%M:%S')
|
|
else:
|
|
return date.strftime('%d/%m/%y')
|
|
|
|
def threadIdToB64(tid):
|
|
assert len(tid) == 16
|
|
tidInt = int(tid, 16)
|
|
tidBytes = tidInt.to_bytes(8, 'big')
|
|
tidB64 = base64.b64encode(tidBytes)
|
|
assert len(tidB64) == 12
|
|
return tidB64.decode()
|
|
|
|
WIDTH_FIXED = 30
|
|
WIDTH_RATIO_DEST_SUBJECT = 0.3
|
|
destWidth = None
|
|
subjectWidth = None
|
|
def compute_line_format():
|
|
columns, rows = shutil.get_terminal_size((80, 20))
|
|
remain = columns - WIDTH_FIXED - 1
|
|
global destWidth, subjectWidth
|
|
destWidth = int(remain * WIDTH_RATIO_DEST_SUBJECT)
|
|
subjectWidth = remain - destWidth
|
|
|
|
def clip_text(size, text):
|
|
l = len(text)
|
|
if l == size:
|
|
return text
|
|
elif l > size:
|
|
return text[:size-1] + '…'
|
|
elif l < size:
|
|
return text + " " * (size - l)
|
|
|
|
|
|
def print_msg(msg):
|
|
if not destWidth:
|
|
compute_line_format()
|
|
|
|
line = ""
|
|
tags = set(msg.get_tags())
|
|
mailbox, folder, state = get_location(msg)
|
|
line += get_mailbox_color(mailbox)
|
|
|
|
# ID
|
|
line += threadIdToB64(msg.get_thread_id())
|
|
# line += str(int(msg.get_thread_id(), 16))
|
|
|
|
# Date
|
|
line += " "
|
|
date = datetime.datetime.fromtimestamp(msg.get_date())
|
|
line += format_date(date)
|
|
|
|
# Icons
|
|
line += " "
|
|
def tags2col1(tag1, tag2, both, first, second, none):
|
|
nonlocal line
|
|
if tag1 in tags:
|
|
if tag2 in tags:
|
|
line += both
|
|
else:
|
|
line += first
|
|
else:
|
|
if tag2 in tags:
|
|
line += second
|
|
else:
|
|
line += none
|
|
|
|
tags2col1('spam', 'draft', '?', 'S', 'D', ' ')
|
|
tags2col1('attachment', 'encrypted', 'E', 'A', 'E', ' ')
|
|
tags2col1('unread', 'flagged', '!', 'U', 'F', ' ')
|
|
tags2col1('sent', 'replied', '?', '↑', '↪', ' ')
|
|
|
|
if 'sent' in tags:
|
|
dest = msg.get_header("to")
|
|
else:
|
|
dest = msg.get_header("from")
|
|
line += " "
|
|
line += clip_text(destWidth, dest)
|
|
|
|
# Subject
|
|
line += " "
|
|
subject = msg.get_header("subject")
|
|
line += clip_text(subjectWidth, subject)
|
|
|
|
line += colorama.Style.RESET_ALL
|
|
print(line)
|
|
|
|
|
|
def retag_msg(msg):
|
|
msg.freeze()
|
|
mailbox, folder, state = get_location(msg)
|
|
|
|
# Search-friendly folder name
|
|
slugFolderList = list()
|
|
for f, fold in [(f, folder[f]) for f in range(len(folder))]:
|
|
if f == 0 and len(folder) > 1 and fold == "INBOX":
|
|
continue
|
|
slugFolderList.append(fold.upper())
|
|
slugFolder = tuple(slugFolderList)
|
|
|
|
tags = set(msg.get_tags())
|
|
def tag_if(tag, condition):
|
|
if condition and tag not in tags:
|
|
msg.add_tag(tag)
|
|
elif not condition and tag in tags:
|
|
msg.remove_tag(tag)
|
|
expeditor = extract_email(msg.get_header('from'))
|
|
|
|
tag_if('inbox', slugFolder[0] == 'INBOX')
|
|
tag_if('spam', slugFolder[0] == 'JUNK' or slugFolder[0] == 'SPAM')
|
|
tag_if('deleted', slugFolder[0] == 'TRASH')
|
|
tag_if('draft', slugFolder[0] == 'DRAFTS')
|
|
tag_if('sent', expeditor in mails)
|
|
|
|
# Save
|
|
msg.thaw()
|
|
msg.tags_to_maildir_flags()
|
|
|
|
|
|
def applyMsgs(queryStr, function, *args, useProgressbar=False, **kwargs):
|
|
query = notmuch.Query(db, queryStr)
|
|
query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)
|
|
msgs = query.search_messages()
|
|
if useProgressbar:
|
|
nbMsgs = query.count_messages()
|
|
iterator = progressbar.progressbar(msgs, max_value=nbMsgs)
|
|
else:
|
|
iterator = msgs
|
|
for msg in iterator:
|
|
function(msg, *args, **kwargs)
|
|
|
|
def extract_email(field):
|
|
try:
|
|
sta = field.index('<')
|
|
sto = field.index('>')
|
|
return field[sta+1:sto]
|
|
except ValueError:
|
|
return field
|
|
|
|
# applyMsgs('*', print_msg)
|
|
applyMsgs('tag:inbox', print_msg)
|
|
# applyMsgs('tag:spam', print_msg)
|
|
# applyMsgs('tag:unread', print_msg)
|
|
# applyMsgs('tag:unprocessed', print_msg)
|
|
# applyMsgs('from:geoffrey@frogeye.fr', print_msg)
|
|
|
|
# applyMsgs('tag:unprocessed', retag_msg, useProgressbar=True)
|
|
# applyMsgs('*', retag_msg, useProgressbar=True)
|
|
|