dotfiles/config/scripts/mel

884 lines
31 KiB
Plaintext
Raw Normal View History

2018-08-13 12:20:09 +02:00
#!/usr/bin/env python3
# pylint: disable=E1101
2018-08-13 12:20:09 +02:00
"""
Meh mail client
2018-08-14 17:23:57 +02:00
A dumb Python script that leverages notmuch, mbsync, and msmtp
to become a fully-functional, extremely-opinionated mail client.
2018-08-13 12:20:09 +02:00
"""
2018-08-13 17:59:40 +02:00
# TODO Features
2019-10-26 17:09:22 +02:00
# TODO Implement initial command set
# TODO Lockfiles for write operations on mail files (mbsync,
# tags→maildir operations)
# TODO OPTI Lockfile per account and process everything in parallel
# (if implemented, this should be optional since while it may speed up
# the mail fetching process, its multi-threading nature would cause a
# lot of cache flushes and be not very efficient on battery)
# TODO Handle true character width
# TODO IMAP IDLE watches?
# TODO GPG
2018-08-13 17:59:40 +02:00
# TODO (only then) Refactor
2019-10-26 17:09:22 +02:00
# TODO Merge file with melConf
# TODO Config system revamp
2018-08-14 17:23:57 +02:00
2019-10-26 17:09:22 +02:00
import argparse
import configparser
import datetime
import email.message
import email.parser
import html
import logging
2019-11-01 18:34:45 +01:00
import mailcap
2019-10-26 17:09:22 +02:00
import os
import pdb
2019-10-26 17:09:22 +02:00
import re
import shutil
import subprocess
import sys
import traceback
2019-10-26 17:09:22 +02:00
import typing
2018-08-13 17:59:40 +02:00
2018-08-13 12:20:09 +02:00
import colorama
2019-10-26 17:09:22 +02:00
import coloredlogs
import notmuch
2018-08-13 12:20:09 +02:00
import progressbar
2018-08-14 10:08:59 +02:00
import xdg.BaseDirectory
2018-08-14 17:23:57 +02:00
# (mailbox, folder, state) triple as built by MelEngine.get_location:
# the folder is itself a tuple of path components, not a single string.
MailLocation = typing.NewType(
    'MailLocation',
    typing.Tuple[str, typing.Tuple[str, ...], str])
2019-11-01 18:34:45 +01:00
# MessageAction = typing.Callable[[notmuch.Message], None]
2019-10-26 17:09:22 +02:00
class MelEngine:
2019-10-26 17:09:22 +02:00
"""
Class with all the functions for manipulating the database / mails.
2019-10-26 17:09:22 +02:00
"""
2018-08-17 15:08:40 +02:00
def load_config(self, config_path: str) -> configparser.ConfigParser:
    """
    Read the configuration file and return the parsed result.

    Logs a fatal message and exits the process with status 1 when
    config_path is not an existing regular file.
    """
    self.log.info("Loading config file: %s", config_path)
    if os.path.isfile(config_path):
        parsed = configparser.ConfigParser()
        # NOTE A file that turns out to be empty or unreadable
        # gives an empty config
        parsed.read(config_path)
        return parsed
    # TODO Create it, maybe?
    self.log.fatal("Config file not found!")
    sys.exit(1)
def generate_aliases(self) -> None:
    """
    Populate MelEngine.aliases and MelEngine.accounts from the config.

    Only the sections with an all-lowercase name are treated as
    accounts. Each account contributes its "from" address and every
    entry of its optional ";"-separated "alternatives" list to the
    aliases.
    """
    assert self.config
    for name in self.config.sections():
        if not name.islower():
            continue
        section = self.config[name]
        self.aliases.add(section["from"])
        # Tolerate spaces around the separators and a trailing ";",
        # neither of which should end up in an alias
        alternatives = section.get("alternatives", "")
        self.aliases.update(
            alt.strip() for alt in alternatives.split(";") if alt.strip())
        self.accounts[name] = section
2019-10-26 17:09:22 +02:00
def __init__(self, config_path: str) -> None:
    """
    Set up the logger, load the configuration found at config_path
    and fill the account caches.
    """
    self.log = logging.getLogger("MelEngine")
    self.config = self.load_config(config_path)
    # No database is open until open_database() is called
    self.database = None

    # Caches
    # Account name -> its configuration section
    self.accounts: typing.Dict[str, configparser.SectionProxy] = {}
    # All the emails the user is represented as:
    self.aliases: typing.Set[str] = set()
    # TODO If the user sends emails to himself, maybe that won't cut it.
    self.generate_aliases()
2019-10-26 17:09:22 +02:00
def notmuch_new(self) -> None:
    """
    Run `notmuch new`, which basically brings the database up to date
    with the content of the mail folder.

    Must be called while no database is open. Raises
    subprocess.CalledProcessError when the command fails.
    """
    assert not self.database
    self.log.info("Indexing mails")
    # TODO Better
    config_file = os.path.expanduser("~/.config/notmuch-config")
    command = ["notmuch", "--config", config_file, "new"]
    self.log.debug(" ".join(command))
    subprocess.run(command, check=True)
2019-10-26 17:09:22 +02:00
def list_folders(self) -> typing.List[typing.Tuple[str, ...]]:
    """
    List all the folders of the mail dir.

    A directory counts as a folder when it directly holds the cur, new
    and tmp subdirectories. Every folder is returned as the tuple of
    its path components below the storage directory, starting with
    the account name.
    """
    assert self.config
    storage = os.path.realpath(
        os.path.expanduser(self.config["GENERAL"]["storage"]))
    required = {"cur", "new", "tmp"}
    found = []
    for account in self.accounts:
        for root, dirs, _ in os.walk(os.path.join(storage, account)):
            if not required.issubset(dirs):
                continue
            assert root.startswith(storage)
            components = root[len(storage):].split('/')
            # Drop the empty item produced by the leading '/'
            if components[0] == '':
                del components[0]
            found.append(tuple(components))
    return found
def open_database(self, write: bool = False) -> None:
    """
    Make sure a notmuch database is open in a mode fitting the
    requested access: read-only, or read+write when write is True.

    An already open database is kept when its mode fits, and is
    closed then reopened otherwise.
    Be sure to only ask for the mode you need, to avoid deadlocks.
    """
    assert self.config
    if write:
        mode = notmuch.Database.MODE.READ_WRITE
    else:
        mode = notmuch.Database.MODE.READ_ONLY

    if self.database:
        # Keep what is open if it already has the requested mode,
        # or if read access is all that is requested
        same_mode = mode == self.database.mode
        read_requested = mode == notmuch.Database.MODE.READ_ONLY
        if same_mode or read_requested:
            return
        self.log.info("Current database not in mode %s, closing", mode)
        self.close_database()

    self.log.info("Opening database in mode %s", mode)
    storage = os.path.expanduser(self.config["GENERAL"]["storage"])
    db_path = os.path.realpath(storage)
    self.database = notmuch.Database(mode=mode, path=db_path)
def close_database(self) -> None:
    """
    Close the notmuch database, if one is currently open.
    """
    if not self.database:
        return
    self.log.info("Closing database")
    self.database.close()
    self.database = None
2018-08-13 12:20:09 +02:00
def get_location(self, msg: notmuch.Message) -> MailLocation:
    """
    Return the filesystem location (relative to the mail directory)
    of the given message, as a (mailbox, folder, state) tuple:
    the account name, the tuple of folder components, and the
    directory (cur, new or tmp) holding the message file.
    """
    directory = os.path.dirname(msg.get_filename())
    assert self.database
    base = self.database.get_path()
    assert directory.startswith(base)
    # parts[0] is what precedes the first '/' of the relative path
    # (presumably empty, i.e. the database path has no trailing '/')
    parts = directory[len(base):].split('/')
    mailbox, state = parts[1], parts[-1]
    assert mailbox in self.accounts
    folder = tuple(parts[2:-1])
    assert state in {'cur', 'tmp', 'new'}
    return (mailbox, folder, state)
@staticmethod
def is_uid(uid: typing.Any) -> bool:
    """
    Tell whether the given value is a valid UID, that is a string of
    exactly 12 characters taken among letters, digits, '+' and '/'.
    """
    if not isinstance(uid, str) or len(uid) != 12:
        return False
    return re.match('^[a-zA-Z0-9+/]{12}$', uid) is not None
2019-10-26 17:09:22 +02:00
@staticmethod
def extract_email(field: str) -> str:
    """
    Extract the email address from a To: or From: field
    (usually the whole field or between < >).

    Return the text between the first '<' and the first '>' that
    follows it, or the whole field when there is no such pair.
    """
    # TODO Can be made better (extract name and email)
    # Also what happens with multiple dests?
    start = field.find('<')
    if start == -1:
        return field
    # Only look for the closing bracket after the opening one, so that
    # a '>' in the display name does not produce a bogus slice
    stop = field.find('>', start + 1)
    if stop == -1:
        return field
    return field[start + 1:stop]
def retag_msg(self, msg: notmuch.Message) -> None:
    """
    Update automatic tags for message.

    The inbox, spam, deleted and draft tags follow the folder holding
    the message, sent is set when the sender is one of the user
    aliases, and unprocessed is always removed. The tuid<UID> tag is
    then synced with the X-TUID header; a message without a valid UID
    only gets a notice printed.
    """
    _, folder, _ = self.get_location(msg)

    # Search-friendly folder name: upper-cased, without the leading
    # INBOX component for the subfolders of the inbox
    slug_folder = tuple(
        fold.upper() for index, fold in enumerate(folder)
        if not (index == 0 and len(folder) > 1 and fold == "INBOX"))
    # A message stored at the root of the account has no folder at all
    top_folder = slug_folder[0] if slug_folder else ''

    tags = set(msg.get_tags())

    def tag_if(tag: str, condition: bool) -> None:
        """
        Ensure the presence/absence of tag depending on the condition.
        """
        if condition and tag not in tags:
            msg.add_tag(tag)
        elif not condition and tag in tags:
            msg.remove_tag(tag)

    expeditor = self.extract_email(msg.get_header('from'))

    tag_if('inbox', top_folder == 'INBOX')
    tag_if('spam', top_folder in ('JUNK', 'SPAM'))
    tag_if('deleted', top_folder == 'TRASH')
    tag_if('draft', top_folder == 'DRAFTS')
    tag_if('sent', expeditor in self.aliases)
    tag_if('unprocessed', False)

    # UID
    uid = msg.get_header("X-TUID")
    if not self.is_uid(uid):
        # TODO Happens to sent mails but should it?
        print(f"{msg.get_filename()} has no UID!")
        return
    uidtag = 'tuid{}'.format(uid)
    # Remove any other UID tag left over
    for tag in tags:
        if tag.startswith('tuid') and tag != uidtag:
            msg.remove_tag(tag)
    msg.add_tag(uidtag)
def apply_msgs(self, query_str: str, action: typing.Callable,
               *args: typing.Any, show_progress: bool = False,
               write: bool = False, close_db: bool = True,
               **kwargs: typing.Any) -> int:
    """
    Run a function on the messages selected by the given query.

    action is called as action(msg, *args, **kwargs) on every matching
    message, oldest first. With write=True the database is opened in
    read+write mode, each message is frozen before the call and thawed
    after it, then its tags are synced to the maildir flags.
    show_progress displays a progress bar while iterating.

    Unless close_db is False, the database is closed at the end, even
    when action raises.

    Return the number of messages matched by the query.
    """
    self.open_database(write=write)
    try:
        self.log.info("Querying %s", query_str)
        query = notmuch.Query(self.database, query_str)
        query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)

        elements = query.search_messages()
        nb_msgs = query.count_messages()

        iterator = progressbar.progressbar(elements, max_value=nb_msgs) \
            if show_progress and nb_msgs else elements

        self.log.info("Executing %s", action)
        for msg in iterator:
            self.log.debug("On mail %s", msg)
            if write:
                msg.freeze()

            action(msg, *args, **kwargs)

            if write:
                msg.thaw()
                msg.tags_to_maildir_flags()
    finally:
        # Do not keep the database open after a failure
        if close_db:
            self.close_database()

    return nb_msgs
2019-10-26 17:09:22 +02:00
class MelOutput:
2019-10-26 17:09:22 +02:00
"""
All functions that print mail stuff onto the screen.
2019-10-26 17:09:22 +02:00
"""
2018-08-14 17:23:57 +02:00
# Number of columns subtracted from the terminal width before sharing
# the rest between the dest and subject columns
WIDTH_FIXED = 31
# Share of the remaining columns given to the dest column,
# the subject column getting what is left
WIDTH_RATIO_DEST_SUBJECT = 0.3
2018-08-14 10:08:59 +02:00
def compute_line_format(self) -> typing.Tuple[typing.Optional[int],
                                              typing.Optional[int]]:
    """
    Split the terminal width between the flexible columns.

    Return a (dest_width, subject_width) tuple, or (None, None) when
    the output is not a terminal.
    """
    if not self.is_tty:
        return (None, None)
    columns = shutil.get_terminal_size((80, 20)).columns
    remain = columns - MelOutput.WIDTH_FIXED - 1
    dest_width = int(remain * MelOutput.WIDTH_RATIO_DEST_SUBJECT)
    return (dest_width, remain - dest_width)
2018-08-14 10:08:59 +02:00
def __init__(self, engine: MelEngine) -> None:
    """
    Prepare the terminal output of the mails handled by engine.
    """
    colorama.init()
    self.log = logging.getLogger("MelOutput")
    self.engine = engine

    # Flipped by print_msg after every line it builds
    self.light_background = True
    self.is_tty = sys.stdout.isatty()
    widths = self.compute_line_format()
    self.dest_width, self.subject_width = widths

    # Mailbox name -> escape sequence, filled by get_mailbox_color
    self.mailbox_colors: typing.Dict[str, str] = {}
    # TODO Allow custom path
    self.caps = mailcap.getcaps()
2018-08-14 17:23:57 +02:00
@staticmethod
def format_date(date: datetime.datetime) -> str:
    """
    Format the given date as an 8-characters width string.
    The older the mail, the coarser the format: time only under a day,
    day and time under 28 days, month, day and hour under a year,
    and the full date beyond that.
    """
    age = datetime.datetime.now() - date
    formats = (
        (1, '%H:%M:%S'),
        (28, '%d %H:%M'),
        (365, '%m-%d %H'),
    )
    for max_days, fmt in formats:
        if age < datetime.timedelta(days=max_days):
            return date.strftime(fmt)
    return date.strftime('%y-%m-%d')
2019-10-26 17:09:22 +02:00
@staticmethod
def clip_text(size: typing.Optional[int], text: str) -> str:
    """
    Fit text into the given character size,
    fill with spaces if shorter,
    clip with … if larger.

    The text is returned untouched when size is None, and an empty
    string is returned when there is no room at all (size <= 0).
    """
    if size is None:
        return text
    if size <= 0:
        # Slicing with size - 1 would count from the end of the text
        # and return more characters than allowed
        return ''
    if len(text) > size:
        return text[:size - 1] + '…'
    return text.ljust(size)
@staticmethod
def chunks(iterable: str, chunk_size: int) -> typing.Iterable[str]:
    """Yield the consecutive slices of iterable, chunk_size items each
    (the last one may be shorter)."""
    # From https://stackoverflow.com/a/312464
    starts = range(0, len(iterable), chunk_size)
    yield from (iterable[start:start + chunk_size] for start in starts)
@staticmethod
def sizeof_fmt(num: int, suffix: str = 'B') -> str:
    """
    Return the given size in a human-readable format,
    using binary prefixes (Ki, Mi, ...) and one decimal.
    """
    # From https://stackoverflow.com/a/1094933
    units = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    value = float(num)
    index = 0
    while index < len(units) and not abs(value) < 1024.0:
        value /= 1024.0
        index += 1
    if index < len(units):
        return "%3.1f %s%s" % (value, units[index], suffix)
    return "%.1f %s%s" % (value, 'Yi', suffix)
2019-11-01 18:34:45 +01:00
def get_mailbox_color(self, mailbox: str) -> str:
    """
    Return the escape sequence selecting the color of the given
    mailbox, ready to be printed, or an empty string when the output
    is not a terminal.

    The sequence is built from the "color16" option of the mailbox
    configuration section, and cached for the next calls.
    """
    if not self.is_tty:
        return ''
    try:
        return self.mailbox_colors[mailbox]
    except KeyError:
        pass
    # NOTE RGB colors (\x1b[38;2;R;G;Bm, from a "color" option) are not
    # supported everywhere, hence the indexed color
    color_int = int(self.engine.config[mailbox]["color16"])
    sequence = f"\x1b[38;5;{color_int}m"
    self.mailbox_colors[mailbox] = sequence
    return sequence
def print_msg(self, msg: notmuch.Message) -> None:
    """
    Print the given message header on one line.

    The line holds the UID, the date, four status icons, the sender
    (the recipient for sent mails) and the subject. Columns are
    separated by a space on a terminal and by a tab otherwise.
    Fails with an AssertionError if the message has no valid tuid tag.
    """
    if not self.dest_width:
        # Keep the recomputed widths instead of discarding them
        self.dest_width, self.subject_width = self.compute_line_format()

    sep = " " if self.is_tty else "\t"
    tags = set(msg.get_tags())
    mailbox, _, _ = self.engine.get_location(msg)

    line = ""
    if 'unread' in tags or 'flagged' in tags:
        line += colorama.Style.BRIGHT
    # Alternate the background from one printed line to the next
    line += colorama.Back.LIGHTBLACK_EX if self.light_background \
        else colorama.Back.BLACK
    self.light_background = not self.light_background
    line += self.get_mailbox_color(mailbox)

    # UID
    uid = None
    for tag in tags:
        if tag.startswith('tuid'):
            uid = tag[4:]
    assert uid, f"No UID for message: {msg}."
    assert MelEngine.is_uid(uid), f"{uid} {type(uid)} is not a valid UID."
    line += uid

    # Date
    line += sep + colorama.Fore.MAGENTA
    date = datetime.datetime.fromtimestamp(msg.get_date())
    line += self.format_date(date)

    # Icons
    line += sep + colorama.Fore.RED

    def tags2col1(tag1: str, tag2: str,
                  characters: typing.Tuple[str, str, str, str]) -> str:
        """
        Return the character showing the presence/absence of two tags.
        characters is ordered as (both, only tag1, only tag2, none).
        """
        both, first, second, none = characters
        if tag1 in tags:
            return both if tag2 in tags else first
        return second if tag2 in tags else none

    line += tags2col1('spam', 'draft', ('?', 'S', 'D', ' '))
    line += tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' '))
    line += tags2col1('unread', 'flagged', ('!', 'U', 'F', ' '))
    line += tags2col1('sent', 'replied', ('?', '↑', '↪', ' '))

    # Opposed
    line += sep + colorama.Fore.BLUE
    if 'sent' in tags:
        dest = msg.get_header("to")
    else:
        dest = msg.get_header("from")
    line += MelOutput.clip_text(self.dest_width, dest)

    # Subject
    line += sep + colorama.Fore.WHITE
    subject = msg.get_header("subject")
    line += MelOutput.clip_text(self.subject_width, subject)

    if self.is_tty:
        line += colorama.Style.RESET_ALL
    print(line)
2018-08-14 17:23:57 +02:00
def notify_msg(self, msg: notmuch.Message) -> None:
    """
    Send a desktop notification for the given message through
    notify-send: sender and account as summary, subject as body.
    The command line is printed before being run.
    """
    self.log.info("Sending notification for %s", msg)
    subject = msg.get_header("subject")
    sender = msg.get_header("from")
    account, _, _ = self.engine.get_location(msg)
    summary = '{} (<i>{}</i>)'.format(html.escape(sender), account)
    cmd = ["notify-send", "-u", "low", "-i", "mail-message-new"]
    cmd += [summary, html.escape(subject)]
    print(' '.join(cmd))
    # The exit status of notify-send is not checked
    subprocess.run(cmd, check=False)
2018-08-14 17:23:57 +02:00
def notify_all(self) -> None:
    """
    Send a notification for every unprocessed and unread message,
    then play a sound if there was at least one.
    Basically a given message should only be notified once,
    since it should be marked as processed right after.
    """
    query_str = 'tag:unread and tag:unprocessed'
    nb_msgs = self.engine.apply_msgs(query_str, self.notify_msg)
    if nb_msgs <= 0:
        return
    self.log.info(
        "Playing notification sound (%d new message(s))", nb_msgs)
    cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5",
           "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
    subprocess.run(cmd, check=False)
@staticmethod
def format_header_value(val: str) -> str:
    """
    Return a header value split over several lines as one contiguous
    string: newlines and tabs are removed, then the ends are stripped.
    """
    unfolded = val.translate({ord('\n'): None, ord('\t'): None})
    return unfolded.strip()
2018-08-14 17:23:57 +02:00
# Line templates used by show_parts_tree, both printed in blue:
# one for multipart containers, one for leaf parts
# (which also show their description and size)
PART_MULTI_FORMAT = colorama.Fore.BLUE + \
'{count} {indent}+ {typ}' + colorama.Style.RESET_ALL
PART_LEAF_FORMAT = colorama.Fore.BLUE + \
'{count} {indent}→ {desc} ({typ}; {size})' + \
colorama.Style.RESET_ALL
2019-10-26 17:09:22 +02:00
def show_parts_tree(self, part: email.message.Message,
                    depth: int = 0, count: int = 1) -> int:
    """
    Print the tree of the parts contained in a message, one per line.

    depth is the nesting level of the part (one tab of indentation
    each) and count the number printed in front of it.
    Return the number of parts of the subtree, the part included.
    """
    indent = depth * '\t'
    typ = part.get_content_type()

    if not part.is_multipart():
        content = part.get_payload(decode=True)
        assert isinstance(content, bytes)
        desc = part.get('Content-Description', '<no description>')
        print(MelOutput.PART_LEAF_FORMAT.format(
            count=count, indent=indent, typ=typ, desc=desc,
            size=MelOutput.sizeof_fmt(len(content))))
        return 1

    print(MelOutput.PART_MULTI_FORMAT.format(
        count=count, indent=indent, typ=typ))
    children = part.get_payload()
    assert isinstance(children, list)
    # Numbers used so far by this part and its descendants
    used = 1
    for child in children:
        used += self.show_parts_tree(child, depth=depth+1,
                                     count=count+used)
    return used
# Headers printed by read_msg, in this order
INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
# Template of a printed header: the name in bright, the value in normal
# intensity, both in blue
HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \
'{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL
def read_msg(self, msg: notmuch.Message) -> None:
    """
    Display the content of a mail.

    Prints the interesting headers and the tree of parts, then every
    text/plain part. Any other leaf part is written to a temporary
    file and handed to the viewer mailcap associates with its type,
    if there is one.
    """
    import tempfile

    # Parse
    with open(msg.get_filename(), 'rb') as filedesc:
        mail = email.parser.BytesParser().parse(filedesc)

    # Defects
    if mail.defects:
        self.log.warning("Defects found in the mail:")
        for defect in mail.defects:
            self.log.warning(defect)

    # Headers
    for key in MelOutput.INTERESTING_HEADERS:
        val = mail.get(key)
        if val:
            assert isinstance(val, str)
            val = self.format_header_value(val)
            print(MelOutput.HEADER_FORMAT.format(key, val))
    # TODO Show all headers
    # TODO BONUS Highlight failed verifications

    self.show_parts_tree(mail)
    print()

    # Show text/plain
    # TODO Consider alternative
    for part in mail.walk():
        if part.is_multipart():
            continue
        payl = part.get_payload(decode=True)
        assert isinstance(payl, bytes)
        content_type = part.get_content_type()

        if content_type == "text/plain":
            # Honor the declared charset rather than assuming UTF-8,
            # and never crash on undecodable bytes
            charset = part.get_content_charset() or 'utf-8'
            try:
                text = payl.decode(charset, errors='replace')
            except LookupError:
                # Charset unknown to Python
                text = payl.decode('utf-8', errors='replace')
            print(text)
            continue

        # TODO Use nametemplate from mailcap
        # TODO FIFO if possible
        # Unpredictable name, so that nobody can plant a file or a
        # symlink at a well-known path in the shared /tmp
        with tempfile.NamedTemporaryFile(
                prefix='melcap-', suffix='.html',
                delete=False) as temp_filedesc:
            temp_filedesc.write(payl)
            temp_file = temp_filedesc.name
        try:
            command, _ = mailcap.findmatch(
                self.caps, content_type, key='view', filename=temp_file)
            if command:
                os.system(command)
        finally:
            # NOTE A viewer that returns before having read the file
            # will not find it anymore
            os.unlink(temp_file)
def print_dir_list(self) -> None:
    """
    Print a colored directory list.
    Every line is a folder path between single quotes,
    so that it is easily copiable.
    """
    def escape(name: str) -> str:
        """
        Backslash-escape the single quotes of a path component.
        """
        return name.replace("'", "\\'")

    for arb in self.engine.list_folders():
        line = colorama.Fore.LIGHTBLACK_EX + "'"
        line += self.get_mailbox_color(arb[0])
        line += escape(arb[0])
        line += colorama.Fore.LIGHTBLACK_EX
        for inter in arb[1:-1]:
            line += '/' + escape(inter)
        # A folder made of the account alone has a single component,
        # which must not be printed a second time as the leaf
        if len(arb) > 1:
            line += '/' + colorama.Fore.WHITE + escape(arb[-1])
        line += colorama.Fore.LIGHTBLACK_EX + "'"
        line += colorama.Style.RESET_ALL
        print(line)
class MelCLI():
"""
Handles the user input and run asked operations.
"""
# Values accepted by the --verbosity option
VERBOSITY_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
def apply_msgs_input(self, argmessages: typing.List[str],
                     action: typing.Callable, write: bool = False) -> None:
    """
    Run a function on the messages given by the user.

    The UIDs are read from the standard input (first 12 characters of
    each line) when argmessages is the single item '-', or when it is
    empty and the standard input is not a terminal. Otherwise they are
    taken from argmessages, each item holding one or several UIDs
    stuck together. Invalid UIDs are reported and skipped.

    The database is closed once all the messages have been handled,
    even when action raises.
    """
    # TODO First argument might be unnecessary
    if argmessages:
        from_stdin = argmessages == ['-']
    else:
        from_stdin = not sys.stdin.isatty()

    if from_stdin:
        candidates = [line[:12] for line in sys.stdin]
    else:
        candidates = []
        for uids in argmessages:
            if len(uids) > 12:
                self.log.warning("Might have forgotten some spaces "
                                 "between the UIDs. Don't worry, I'll "
                                 "split them for you")
            candidates.extend(
                uids[start:start + 12] for start in range(0, len(uids), 12))

    messages = []
    for uid in candidates:
        if not self.engine.is_uid(uid):
            self.log.error("Not an UID: %s", uid)
            continue
        messages.append(uid)

    try:
        for message in messages:
            query_str = f'tag:tuid{message}'
            nb_msgs = self.engine.apply_msgs(query_str, action,
                                             write=write, close_db=False)
            if nb_msgs < 1:
                self.log.error(
                    "Couldn't execute function for message %s", message)
    finally:
        self.engine.close_database()
def operation_default(self) -> None:
    """
    Default operation: print every message tagged as inbox.
    """
    printer = self.output.print_msg
    self.engine.apply_msgs('tag:inbox', printer)
2018-08-14 10:08:59 +02:00
def operation_inbox(self) -> None:
    """
    Inbox operation: print the messages tagged as inbox, or the
    messages tagged as unread when --only-unread is given.
    """
    if self.args.only_unread:
        query_str = 'tag:unread'
    else:
        query_str = 'tag:inbox'
    self.engine.apply_msgs(query_str, self.output.print_msg)
2018-08-14 17:23:57 +02:00
def operation_flag(self) -> None:
    """
    Flag operation: add the flagged tag to the messages
    selected by the user.
    """
    def flag_msg(msg: notmuch.Message) -> None:
        """
        Add the flagged tag to the given message.
        """
        msg.add_tag('flagged')

    selection = self.args.message
    self.apply_msgs_input(selection, flag_msg, write=True)
2019-10-26 17:09:22 +02:00
def operation_unflag(self) -> None:
    """
    Unflag operation: remove the flagged tag from the messages
    selected by the user.
    """
    def unflag_msg(msg: notmuch.Message) -> None:
        """
        Remove the flagged tag from the given message.
        """
        msg.remove_tag('flagged')

    selection = self.args.message
    self.apply_msgs_input(selection, unflag_msg, write=True)
2018-08-14 19:25:07 +02:00
def operation_read(self) -> None:
    """
    Read operation: show the full content of the selected message.
    """
    reader = self.output.read_msg
    self.apply_msgs_input(self.args.message, reader)
2018-08-14 10:08:59 +02:00
def operation_fetch(self) -> None:
    """
    Fetch operation: Sync remote databases with the local one.

    Runs mbsync with --all, indexes the new mails, sends the
    notifications, then retags the mails tagged as unprocessed.
    """
    # Fetch mails
    self.log.info("Fetching mails")
    # TODO Better
    mbsync_config_file = os.path.expanduser("~/.config/mbsyncrc")
    # The exit status of mbsync is not checked
    subprocess.run(
        ["mbsync", "--config", mbsync_config_file, "--all"], check=False)

    # Index new mails
    self.engine.notmuch_new()

    # Notify
    self.output.notify_all()

    # Tag new mails
    retag = self.engine.retag_msg
    self.engine.apply_msgs('tag:unprocessed', retag,
                           show_progress=True, write=True)
2019-10-26 17:09:22 +02:00
2019-11-01 18:34:45 +01:00
def operation_list(self) -> None:
    """
    List operation: print the list of all the folders.
    """
    output = self.output
    output.print_dir_list()
def operation_debug(self) -> None:
    """
    Debug operation: placeholder that only prints a fixed string.
    """
    message = "UwU"
    print(message)
2018-08-14 17:23:57 +02:00
def operation_retag(self) -> None:
    """
    Retag operation: manually retag all the mails in the database,
    showing a progress bar.
    Mostly debug I suppose.
    """
    retag = self.engine.retag_msg
    self.engine.apply_msgs('*', retag, show_progress=True, write=True)
2018-08-14 17:23:57 +02:00
def operation_all(self) -> None:
    """
    All operation: print every single message of the database.
    """
    printer = self.output.print_msg
    self.engine.apply_msgs('*', printer)
def add_subparsers(self) -> None:
    """
    Add the operation parsers to the main parser.

    Every subcommand stores the method implementing it as the
    `operation` attribute of the parsed arguments;
    operation_default is used when no subcommand is given.
    """
    # TODO If the only operation to the parser done are adding argument,
    # we should automate this.
    subparsers = self.parser.add_subparsers(help="Action to execute")

    def register(name: str, operation: typing.Callable, help_text: str,
                 **defaults: typing.Any) -> argparse.ArgumentParser:
        """
        Create the parser of a subcommand and bind it to its operation.
        """
        subparser = subparsers.add_parser(name, help=help_text)
        subparser.set_defaults(operation=operation, **defaults)
        return subparser

    # List messages
    self.parser.set_defaults(operation=self.operation_default)

    # inbox (default)
    parser_inbox = register(
        "inbox", self.operation_inbox,
        "Show unread, unsorted and flagged messages")
    # TODO Make this more relevant
    parser_inbox.add_argument('-u', '--only-unread', action='store_true',
                              help="Show unread messages only")

    # list folder [--recurse]
    # List actions
    register("list", self.operation_list, "List all folders")

    # flag msg...
    parser_flag = register(
        "flag", self.operation_flag, "Mark messages as flagged")
    parser_flag.add_argument('message', nargs='*', help="Messages")

    # unflag msg...
    parser_unflag = register(
        "unflag", self.operation_unflag, "Mark messages as not-flagged")
    parser_unflag.add_argument('message', nargs='*', help="Messages")

    # delete msg...
    # spam msg...
    # move dest msg...

    # Read message
    # read msg [--html] [--plain] [--browser]
    parser_read = register("read", self.operation_read, "Read message")
    parser_read.add_argument('message', nargs=1, help="Messages")

    # attach msg [id] [--save] (list if no id, xdg-open else)

    # Redaction
    # new account
    # reply msg [--all]

    # Folder management
    # tree [folder]
    # mkdir folder
    # rmdir folder (prevent if folder isn't empty (mail/subfolder))
    # (yeah that should do)

    # Meta
    # setup (interactive thing maybe)

    # fetch (mbsync, notmuch new, retag, notify; called by greater gods)
    register("fetch", self.operation_fetch,
             "Fetch mail, tag them, and run notifications")

    # Debug
    # debug (various)
    register("debug", self.operation_debug,
             "Who know what this holds...", verbosity='DEBUG')

    # retag (all or unprocessed)
    register("retag", self.operation_retag,
             "Retag all mails (when you changed configuration)")

    # all
    register("all", self.operation_all, "Show ALL messages")
def create_parser(self) -> argparse.ArgumentParser:
    """
    Create the main parser that will handle the user arguments,
    with the options shared by every operation: the verbosity and
    the location of the accounts config file.
    """
    # The config file is looked up in the XDG config directory by default
    default_config_file = os.path.join(
        xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')

    parser = argparse.ArgumentParser(description="Meh mail client")
    parser.add_argument(
        '-v', '--verbosity', default='WARNING',
        choices=MelCLI.VERBOSITY_LEVELS,
        help="Verbosity of self.log messages")
    # parser.add_argument('-n', '--dry-run', action='store_true',
    #                     help="Don't do anything") # DEBUG
    parser.add_argument(
        '-c', '--config', default=default_config_file,
        help="Accounts config file")
    return parser
def __init__(self) -> None:
    """
    Parse the command line, set up logging, the engine and the output,
    then run the requested operation.
    """
    self.log = logging.getLogger("MelCLI")

    self.parser = self.create_parser()
    self.add_subparsers()
    self.args = self.parser.parse_args()

    coloredlogs.install(fmt='%(levelname)s %(name)s %(message)s',
                        level=self.args.verbosity)

    self.engine = MelEngine(self.args.config)
    self.output = MelOutput(self.engine)

    operation = self.args.operation
    if operation:
        self.log.info("Executing operation %s", operation)
        operation()
if __name__ == "__main__":
    if os.environ.get("MEL_DEBUG"):
        # Debug mode: open the debugger where an error happened
        try:
            CLI = MelCLI()
        except Exception:  # pylint: disable=broad-except
            # Errors only: SystemExit (--help, sys.exit) and
            # KeyboardInterrupt must still end the program normally
            traceback.print_exc()
            pdb.post_mortem(sys.exc_info()[2])
    else:
        CLI = MelCLI()