diff --git a/config/scripts/mel b/config/scripts/mel
index 692f238..9c41cfd 100755
--- a/config/scripts/mel
+++ b/config/scripts/mel
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# pylint: disable=C0103,W0603,W0621,E1101
+# pylint: disable=E1101
"""
Meh mail client
@@ -19,9 +19,8 @@ to become a fully-functional extremly-opinonated mail client.
# TODO IMAP IDLE watches?
# TODO GPG
# TODO (only then) Refactor
-# TODO OOP-based
# TODO Merge file with melConf
-# TODO Un-ignore pyling warnings
+# TODO Config system revamp
import argparse
import configparser
@@ -31,11 +30,12 @@ import email.parser
import html
import logging
import os
+import pdb
import re
import shutil
import subprocess
import sys
-import time
+import traceback
import typing
import colorama
@@ -44,609 +44,589 @@ import notmuch
import progressbar
import xdg.BaseDirectory
-PERF_LAST = time.perf_counter()
-PERF_DICT: typing.Dict[str, float] = dict()
-
-a: typing.Any = 'DEBUG VARIABLE (empty)'
-
-
-def perfstep(name: str) -> None:
- """
- DEBUG
- Small performance profiler to measure steps.
- Call with the name of the step when you just finished it.
- """
- current_time = time.perf_counter()
- global PERF_LAST
- global PERF_DICT
- diff = current_time - PERF_LAST
- if name not in PERF_DICT:
- PERF_DICT[name] = 0.0
- PERF_DICT[name] += diff
- PERF_LAST = time.perf_counter()
-
-
-ACCOUNTS: typing.Dict[str, configparser.SectionProxy] = dict()
-ALIASES: typing.Set[str] = set() # All the emails the user is represented as
-# TODO If the user send emails to himself, maybe that wont cut it.
-DB = None
-CONFIG = None
-
-
-def notmuch_new() -> None:
- """
- Runs `notmuch new`, which basically update the database
- to match the mail folder.
- """
- close_database()
- log.info("Indexing mails")
- notmuchConfigPath = os.path.expanduser(
- "~/.config/notmuch-config") # TODO Better
- cmd = ["notmuch", "--config", notmuchConfigPath, "new"]
- log.debug(" ".join(cmd))
- subprocess.run(cmd, check=True)
-
-
-def list_folders() -> typing.List[typing.Tuple[str, ...]]:
- """
- List all the folders of the mail dir.
- """
- assert CONFIG
- storagePath = os.path.realpath(
- os.path.expanduser(CONFIG["GENERAL"]["storage"]))
- folders = list()
- for account in ACCOUNTS:
- storagePathAccount = os.path.join(storagePath, account)
- for root, dirs, _ in os.walk(storagePathAccount):
- if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs:
- continue
- assert root.startswith(storagePath)
- path = root[len(storagePath):]
- pathSplit = path.split('/')
- if pathSplit[0] == '':
- pathSplit = pathSplit[1:]
- folders.append(tuple(pathSplit))
- return folders
-
-
-def open_database(write: bool = False) -> None:
- """
- Open an access notmuch database in read or read+write mode.
- It is stored in the global DB.
- Be sure to require only in the mode you want to avoid deadlocks.
- """
- assert CONFIG
- global DB
- mode = notmuch.Database.MODE.READ_WRITE if write \
- else notmuch.Database.MODE.READ_ONLY
- if DB:
- if DB.mode == mode:
- return
- log.info("Current database not in mode %s, closing", mode)
- close_database()
- log.info("Opening database in mode %s", mode)
- dbPath = os.path.realpath(os.path.expanduser(CONFIG["GENERAL"]["storage"]))
- DB = notmuch.Database(mode=mode, path=dbPath)
-
-
-def close_database() -> None:
- """
- Close the access notmuch database in read or read+write mode.
- It is stored in the global DB.
- """
- global DB
- if DB:
- log.info("Closing database")
- DB.close()
- DB = None
-
-
-def generate_aliases() -> None:
- assert CONFIG
- for name in CONFIG.sections():
- if not name.islower():
- continue
- section = CONFIG[name]
- ALIASES.add(section["from"])
- if "alternatives" in section:
- for alt in section["alternatives"].split(";"):
- ALIASES.add(alt)
- ACCOUNTS[name] = section
-
-
MailLocation = typing.NewType('MailLocation', typing.Tuple[str, str, str])
-def get_location(msg: notmuch.Message) -> MailLocation:
+class MelEngine:
"""
- Return the filesystem location (relative to the mail directory)
- of the given message.
+ Class with all the functions for manipulating the database / mails.
"""
- path = msg.get_filename()
- path = os.path.dirname(path)
- assert DB
- base = DB.get_path()
- assert path.startswith(base)
- path = path[len(base):]
- pathSplit = path.split('/')
- mailbox = pathSplit[1]
- assert mailbox in ACCOUNTS
- state = pathSplit[-1]
- folder = tuple(pathSplit[2:-1])
- assert state in {'cur', 'tmp', 'new'}
- return (mailbox, folder, state)
-
-MAILBOX_COLORS: typing.Dict[str, str] = dict()
-
-
-def get_mailbox_color(mailbox: str) -> str:
- """
- Return the color of the given mailbox in a ready to print
- string with ASCII escape codes.
- """
- # TODO Do not use 256³ colors but 16 colors
- assert CONFIG
- if mailbox not in MAILBOX_COLORS:
- colorStr = CONFIG[mailbox]["color"]
- colorStr = colorStr[1:] if colorStr[0] == '#' else colorStr
- R = int(colorStr[0:2], 16)
- G = int(colorStr[2:4], 16)
- B = int(colorStr[4:6], 16)
- MAILBOX_COLORS[mailbox] = '\x1b[38;2;{};{};{}m'.format(R, G, B)
- return MAILBOX_COLORS[mailbox]
-
-
-def format_date(date: datetime.datetime) -> str:
- """
- Format the given date as a 9-characters width string.
- Show the time if the mail is less than 24h old,
- else show the date.
- """
- # TODO Do as the description say
- now = datetime.datetime.now()
- midnight = datetime.datetime(year=now.year, month=now.month, day=now.day)
- if date > midnight:
- return date.strftime('%H:%M:%S')
- # TODO Use my favourite date system
- return date.strftime('%d/%m/%y')
-
-
-WIDTH_FIXED = 31
-WIDTH_RATIO_DEST_SUBJECT = 0.3
-ISATTY = sys.stdout.isatty()
-DEST_WIDTH: typing.Optional[int] = None
-SUBJECT_WIDTH: typing.Optional[int] = None
-
-
-def compute_line_format() -> None:
- """
- Based on the terminal width, assign the width of flexible columns.
- """
- if ISATTY:
- columns, _ = shutil.get_terminal_size((80, 20))
- remain = columns - WIDTH_FIXED - 1
- global DEST_WIDTH, SUBJECT_WIDTH
- DEST_WIDTH = int(remain * WIDTH_RATIO_DEST_SUBJECT)
- SUBJECT_WIDTH = remain - DEST_WIDTH
- else:
- DEST_WIDTH = None
- SUBJECT_WIDTH = None
-
-
-def clip_text(size: int, text: str) -> str:
- """
- Fit text into the given character size,
- fill with spaces if shorter,
- clip with … if larger.
- """
- if size is None:
- return text
- length = len(text)
- if length == size:
- return text
- if length > size:
- return text[:size-1] + '…'
- return text + ' ' * (size - length)
-
-
-def isUID(uid: typing.Any) -> bool:
- """
- Tells if the provided string is a valid UID.
- """
- return isinstance(uid, str) and len(uid) == 12 \
- and bool(re.match('^[a-zA-Z0-9+/]{12}$', uid))
-
-
-def print_msg(msg: notmuch.Message) -> None:
- """
- Print the given message header on one line.
- """
- if not DEST_WIDTH:
- compute_line_format()
- assert DEST_WIDTH and SUBJECT_WIDTH
-
- sep = " " if ISATTY else "\t"
- line = ""
- tags = set(msg.get_tags())
- mailbox, _, _ = get_location(msg)
- if ISATTY:
- line += get_mailbox_color(mailbox)
-
- # UID
- uid = None
- for tag in tags:
- if tag.startswith('tuid'):
- uid = tag[4:]
- assert uid and isUID(uid), "{uid} ({type(UID)}) is not a valid UID."
- line += uid
-
- # Date
- line += sep
- date = datetime.datetime.fromtimestamp(msg.get_date())
- line += format_date(date)
-
- # Icons
- line += sep
-
- def tags2col1(tag1: str, tag2: str,
- characters: typing.Tuple[str, str, str, str]) -> None:
+ def load_config(self, config_path: str) -> configparser.ConfigParser:
"""
- Show the presence/absence of two tags with one character.
+ Load the configuration file into MelEngine
"""
- nonlocal line
- both, first, second, none = characters
- if tag1 in tags:
- if tag2 in tags:
- line += both
- else:
- line += first
- else:
- if tag2 in tags:
- line += second
- else:
- line += none
+ self.log.info("Loading config file: %s", config_path)
+ if not os.path.isfile(config_path):
+ self.log.fatal("Config file not found!")
+ sys.exit(1)
+ # TODO Create it, maybe?
+ config = configparser.ConfigParser()
+ config.read(config_path)
+ # NOTE An empty/inexistant file while give an empty config
+ return config
- tags2col1('spam', 'draft', ('?', 'S', 'D', ' '))
- tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' '))
- tags2col1('unread', 'flagged', ('!', 'U', 'F', ' '))
- tags2col1('sent', 'replied', ('?', '↑', '↪', ' '))
-
- if 'sent' in tags:
- dest = msg.get_header("to")
- else:
- dest = msg.get_header("from")
- line += sep
- line += clip_text(DEST_WIDTH, dest)
-
- # Subject
- line += sep
- subject = msg.get_header("subject")
- line += clip_text(SUBJECT_WIDTH, subject)
-
- if ISATTY:
- line += colorama.Style.RESET_ALL
- print(line)
-
-
-def extract_email(field: str) -> str:
- """
- Extract the email adress from a To: or From: field
- (usually the whole field or between < >)
- """
- try:
- sta = field.index('<')
- sto = field.index('>')
- return field[sta+1:sto]
- except ValueError:
- return field
-
-
-def retag_msg(msg: notmuch.Message) -> None:
- """
- Update automatic tags for message.
- """
- _, folder, _ = get_location(msg)
-
- # Search-friendly folder name
- slugFolderList = list()
- for f, fold in [(f, folder[f]) for f in range(len(folder))]:
- if f == 0 and len(folder) > 1 and fold == "INBOX":
- continue
- slugFolderList.append(fold.upper())
- slugFolder = tuple(slugFolderList)
-
- tags = set(msg.get_tags())
-
- def tag_if(tag: str, condition: bool) -> None:
+ def generate_aliases(self) -> None:
"""
- Ensure the presence/absence of tag depending on the condition.
+ Populate MelEngine.aliases and MelEngine.accounts
"""
- nonlocal msg
- if condition and tag not in tags:
- msg.add_tag(tag)
- elif not condition and tag in tags:
- msg.remove_tag(tag)
- expeditor = extract_email(msg.get_header('from'))
-
- tag_if('inbox', slugFolder[0] == 'INBOX')
- tag_if('spam', slugFolder[0] in ('JUNK', 'SPAM'))
- tag_if('deleted', slugFolder[0] == 'TRASH')
- tag_if('draft', slugFolder[0] == 'DRAFTS')
- tag_if('sent', expeditor in ALIASES)
- tag_if('unprocessed', False)
-
- # UID
- uid = msg.get_header("X-TUID")
- if not isUID(uid):
- # TODO Happens to sent mails but should it?
- print(f"{msg.get_filename()} has no UID!")
- return
- uidtag = 'tuid{}'.format(uid)
- # Remove eventual others UID
- for tag in tags:
- if tag.startswith('tuid') and tag != uidtag:
- msg.remove_tag(tag)
- msg.add_tag(uidtag)
-
-
-def applyMsgs(queryStr: str, action: typing.Callable, *args: typing.Any,
- showProgress: bool = False, write: bool = False,
- closeDb: bool = True, **kwargs: typing.Any) -> int:
- """
- Run a function on the messages selected by the given query.
- """
- open_database(write=write)
-
- log.info("Querying %s", queryStr)
- query = notmuch.Query(DB, queryStr)
- query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)
-
- elements = query.search_messages()
- nbMsgs = query.count_messages()
-
- iterator = progressbar.progressbar(
- elements, max_value=nbMsgs) if showProgress else elements
-
- log.info("Executing %s", action)
- for msg in iterator:
- if write:
- msg.freeze()
-
- action(msg, *args, **kwargs)
-
- if write:
- msg.thaw()
- msg.tags_to_maildir_flags()
-
- if closeDb:
- close_database()
-
- return nbMsgs
-
-
-def notify_msg(msg: notmuch.Message) -> None:
- """
- Send a notification for the given message.
- """
- log.info("Sending notification for %s", msg)
- subject = msg.get_header("subject")
- expd = msg.get_header("from")
- account, _, _ = get_location(msg)
-
- summary = '{} ({})'.format(html.escape(expd), account)
- body = html.escape(subject)
- cmd = ["notify-send", "-u", "low", "-i", "mail-message-new", summary, body]
- print(' '.join(cmd))
- subprocess.run(cmd, check=False)
-
-
-def notify_all() -> None:
- """
- Send a notification for unprocessed and unread message.
- Basically should only send a notification for a given message once
- since it should be marked as processed right after.
- """
- open_database()
- nbMsgs = applyMsgs('tag:unread and tag:unprocessed', notify_msg)
- if nbMsgs > 0:
- log.info("Playing notification sound (%d new message(s))", nbMsgs)
- cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5",
- "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
- subprocess.run(cmd, check=False)
- close_database()
-
-
-def chunks(l: str, n: int) -> typing.Iterable[str]:
- """Yield successive n-sized chunks from l."""
- # From https://stackoverflow.com/a/312464
- for i in range(0, len(l), n):
- yield l[i:i + n]
-
-
-def apply_msgs_input(argmessages: typing.List[str], action: typing.Callable,
- write: bool = False) -> None:
- """
- Run a function on the message given by the user.
- """
- if not argmessages:
- fromStdin = not sys.stdin.isatty()
- if argmessages:
- fromStdin = len(argmessages) == 1 and argmessages == '-'
-
- messages = list()
- if fromStdin:
- for line in sys.stdin:
- uid = line[:12]
- if not isUID(uid):
- log.error("Not an UID: %s", uid)
+ assert self.config
+ for name in self.config.sections():
+ if not name.islower():
continue
- messages.append(uid)
- else:
- for uids in argmessages:
- if len(uids) > 12:
- log.warning("Might have forgotten some spaces between the " +
- "UIDs. Don't worry, I'll split them for you")
- for uid in chunks(uids, 12):
- if not isUID(uid):
- log.error("Not an UID: %s", uid)
+ section = self.config[name]
+ self.aliases.add(section["from"])
+ if "alternatives" in section:
+ for alt in section["alternatives"].split(";"):
+ self.aliases.add(alt)
+ self.accounts[name] = section
+
+ def __init__(self, config_path: str) -> None:
+ self.log = logging.getLogger("Mel")
+
+ self.config = self.load_config(config_path)
+
+ self.database = None
+
+ # Caches
+ self.accounts: typing.Dict[str, configparser.SectionProxy] = dict()
+ # All the emails the user is represented as:
+ self.aliases: typing.Set[str] = set()
+ # TODO If the user send emails to himself, maybe that wont cut it.
+ self.mailbox_colors: typing.Dict[str, str] = dict()
+
+ self.generate_aliases()
+
+ def notmuch_new(self) -> None:
+ """
+ Runs `notmuch new`, which basically update the database
+ to match the mail folder.
+ """
+ assert not self.database
+ self.log.info("Indexing mails")
+ notmuch_config_file = os.path.expanduser(
+ "~/.config/notmuch-config") # TODO Better
+ cmd = ["notmuch", "--config", notmuch_config_file, "new"]
+ self.log.debug(" ".join(cmd))
+ subprocess.run(cmd, check=True)
+
+ def list_folders(self) -> typing.List[typing.Tuple[str, ...]]:
+ """
+ List all the folders of the mail dir.
+ """
+ assert self.config
+ storage_path = os.path.realpath(
+ os.path.expanduser(self.config["GENERAL"]["storage"]))
+ folders = list()
+ for account in self.accounts:
+ storage_path_account = os.path.join(storage_path, account)
+ for root, dirs, _ in os.walk(storage_path_account):
+ if "cur" not in dirs or "new" not in dirs or "tmp" not in dirs:
+ continue
+ assert root.startswith(storage_path)
+ path = root[len(storage_path):]
+ path_split = path.split('/')
+ if path_split[0] == '':
+ path_split = path_split[1:]
+ folders.append(tuple(path_split))
+ return folders
+
+ def open_database(self, write: bool = False) -> None:
+ """
+ Open an access notmuch database in read or read+write mode.
+ Be sure to require only in the mode you want to avoid deadlocks.
+ """
+ assert self.config
+ mode = notmuch.Database.MODE.READ_WRITE if write \
+ else notmuch.Database.MODE.READ_ONLY
+ if self.database:
+ # If the requested mode is the one already present,
+ # or we request read when it's already write, do nothing
+ if mode in (self.database.mode, notmuch.Database.MODE.READ_ONLY):
+ return
+ self.log.info("Current database not in mode %s, closing", mode)
+ self.close_database()
+ self.log.info("Opening database in mode %s", mode)
+ db_path = os.path.realpath(
+ os.path.expanduser(self.config["GENERAL"]["storage"]))
+ self.database = notmuch.Database(mode=mode, path=db_path)
+
+ def close_database(self) -> None:
+ """
+ Close the access notmuch database.
+ """
+ if self.database:
+ self.log.info("Closing database")
+ self.database.close()
+ self.database = None
+
+ def get_location(self, msg: notmuch.Message) -> MailLocation:
+ """
+ Return the filesystem location (relative to the mail directory)
+ of the given message.
+ """
+ path = msg.get_filename()
+ path = os.path.dirname(path)
+ assert self.database
+ base = self.database.get_path()
+ assert path.startswith(base)
+ path = path[len(base):]
+ path_split = path.split('/')
+ mailbox = path_split[1]
+ assert mailbox in self.accounts
+ state = path_split[-1]
+ folder = tuple(path_split[2:-1])
+ assert state in {'cur', 'tmp', 'new'}
+ return (mailbox, folder, state)
+
+ def get_mailbox_color(self, mailbox: str) -> str:
+ """
+ Return the color of the given mailbox in a ready to print
+ string with ASCII escape codes.
+ """
+ assert self.config
+ if mailbox not in self.mailbox_colors:
+ # RGB colors (not supported everywhere)
+ # color_str = self.config[mailbox]["color"]
+ # color_str = color_str[1:] if color_str[0] == '#' else color_str
+ # R = int(color_str[0:2], 16)
+ # G = int(color_str[2:4], 16)
+ # B = int(color_str[4:6], 16)
+ # self.mailbox_colors[mailbox] = f"\x1b[38;2;{R};{G};{B}m"
+ color_int = int(self.config[mailbox]["color16"])
+
+ self.mailbox_colors[mailbox] = f"\x1b[38;5;{color_int}m"
+ return self.mailbox_colors[mailbox]
+
+ @staticmethod
+ def is_uid(uid: typing.Any) -> bool:
+ """
+ Tells if the provided string is a valid UID.
+ """
+ return isinstance(uid, str) and len(uid) == 12 \
+ and bool(re.match('^[a-zA-Z0-9+/]{12}$', uid))
+
+ @staticmethod
+ def extract_email(field: str) -> str:
+ """
+ Extract the email adress from a To: or From: field
+ (usually the whole field or between < >)
+ """
+ # TODO Can be made better (extract name and email)
+ # Also what happens with multiple dests?
+ try:
+ sta = field.index('<')
+ sto = field.index('>')
+ return field[sta+1:sto]
+ except ValueError:
+ return field
+
+ def retag_msg(self, msg: notmuch.Message) -> None:
+ """
+ Update automatic tags for message.
+ """
+ _, folder, _ = self.get_location(msg)
+
+ # Search-friendly folder name
+ slug_folder_list = list()
+ for fold_index, fold in [(fold_index, folder[fold_index])
+ for fold_index in range(len(folder))]:
+ if fold_index == 0 and len(folder) > 1 and fold == "INBOX":
+ continue
+ slug_folder_list.append(fold.upper())
+ slug_folder = tuple(slug_folder_list)
+
+ tags = set(msg.get_tags())
+
+ def tag_if(tag: str, condition: bool) -> None:
+ """
+ Ensure the presence/absence of tag depending on the condition.
+ """
+ nonlocal msg
+ if condition and tag not in tags:
+ msg.add_tag(tag)
+ elif not condition and tag in tags:
+ msg.remove_tag(tag)
+ expeditor = MelEngine.extract_email(msg.get_header('from'))
+
+ tag_if('inbox', slug_folder[0] == 'INBOX')
+ tag_if('spam', slug_folder[0] in ('JUNK', 'SPAM'))
+ tag_if('deleted', slug_folder[0] == 'TRASH')
+ tag_if('draft', slug_folder[0] == 'DRAFTS')
+ tag_if('sent', expeditor in self.aliases)
+ tag_if('unprocessed', False)
+
+ # UID
+ uid = msg.get_header("X-TUID")
+ if not MelEngine.is_uid(uid):
+ # TODO Happens to sent mails but should it?
+ print(f"{msg.get_filename()} has no UID!")
+ return
+ uidtag = 'tuid{}'.format(uid)
+ # Remove eventual others UID
+ for tag in tags:
+ if tag.startswith('tuid') and tag != uidtag:
+ msg.remove_tag(tag)
+ msg.add_tag(uidtag)
+
+ def apply_msgs(self, query_str: str, action: typing.Callable,
+ *args: typing.Any, show_progress: bool = False,
+ write: bool = False, close_db: bool = True,
+ **kwargs: typing.Any) -> int:
+ # TODO Detail the typing.Callable
+ """
+ Run a function on the messages selected by the given query.
+ """
+ self.open_database(write=write)
+
+ self.log.info("Querying %s", query_str)
+ query = notmuch.Query(self.database, query_str)
+ query.set_sort(notmuch.Query.SORT.OLDEST_FIRST)
+
+ elements = query.search_messages()
+ nb_msgs = query.count_messages()
+
+ iterator = progressbar.progressbar(
+ elements, max_value=nb_msgs) if show_progress else elements
+
+ self.log.info("Executing %s", action)
+ for msg in iterator:
+ if write:
+ msg.freeze()
+
+ action(msg, *args, **kwargs)
+
+ if write:
+ msg.thaw()
+ msg.tags_to_maildir_flags()
+
+ if close_db:
+ self.close_database()
+
+ return nb_msgs
+
+
+class MelOutput:
+ """
+ All functions that print mail stuff onto the screen.
+ """
+
+ WIDTH_FIXED = 31
+ WIDTH_RATIO_DEST_SUBJECT = 0.3
+
+ def compute_line_format(self) -> typing.Tuple[typing.Optional[int],
+ typing.Optional[int]]:
+ """
+ Based on the terminal width, assign the width of flexible columns.
+ """
+ if self.is_tty:
+ columns, _ = shutil.get_terminal_size((80, 20))
+ remain = columns - MelOutput.WIDTH_FIXED - 1
+ dest_width = int(remain * MelOutput.WIDTH_RATIO_DEST_SUBJECT)
+ subject_width = remain - dest_width
+ return (dest_width, subject_width)
+ return (None, None)
+
+ def __init__(self, engine: MelEngine) -> None:
+ self.log = logging.getLogger("MelOutput")
+
+ self.engine = engine
+
+ self.is_tty = sys.stdout.isatty()
+ self.dest_width, self.subject_width = self.compute_line_format()
+
+ @staticmethod
+ def format_date(date: datetime.datetime) -> str:
+ """
+ Format the given date as a 9-characters width string.
+ Show the time if the mail is less than 24h old,
+ else show the date.
+ """
+ now = datetime.datetime.now()
+ if now - date < datetime.timedelta(days=1):
+ return date.strftime('%H:%M:%S')
+ return date.strftime('%y-%m-%d')
+
+ @staticmethod
+ def clip_text(size: typing.Optional[int], text: str) -> str:
+ """
+ Fit text into the given character size,
+ fill with spaces if shorter,
+ clip with … if larger.
+ """
+ if size is None:
+ return text
+ length = len(text)
+ if length == size:
+ return text
+ if length > size:
+ return text[:size-1] + '…'
+ return text + ' ' * (size - length)
+
+ @staticmethod
+ def chunks(iterable: str, chunk_size: int) -> typing.Iterable[str]:
+ """Yield successive chunk_size-sized chunks from iterable."""
+ # From https://stackoverflow.com/a/312464
+ for i in range(0, len(iterable), chunk_size):
+ yield iterable[i:i + chunk_size]
+
+ @staticmethod
+ def sizeof_fmt(num: int, suffix: str = 'B') -> str:
+ """
+ Print the given size in a human-readable format.
+ """
+ remainder = float(num)
+ # From https://stackoverflow.com/a/1094933
+ for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
+ if abs(remainder) < 1024.0:
+ return "%3.1f %s%s" % (remainder, unit, suffix)
+ remainder /= 1024.0
+ return "%.1f %s%s" % (remainder, 'Yi', suffix)
+
+ def print_msg(self, msg: notmuch.Message) -> None:
+ """
+ Print the given message header on one line.
+ """
+ if not self.dest_width:
+ self.compute_line_format()
+
+ sep = " " if self.is_tty else "\t"
+ line = ""
+ tags = set(msg.get_tags())
+ mailbox, _, _ = self.engine.get_location(msg)
+ if self.is_tty:
+ line += self.engine.get_mailbox_color(mailbox)
+
+ # UID
+ uid = None
+ for tag in tags:
+ if tag.startswith('tuid'):
+ uid = tag[4:]
+ assert uid and MelEngine.is_uid(
+ uid), "{uid} ({type(UID)}) is not a valid UID."
+ line += uid
+
+ # Date
+ line += sep
+ date = datetime.datetime.fromtimestamp(msg.get_date())
+ line += self.format_date(date)
+
+ # Icons
+ line += sep
+
+ def tags2col1(tag1: str, tag2: str,
+ characters: typing.Tuple[str, str, str, str]) -> None:
+ """
+ Show the presence/absence of two tags with one character.
+ """
+ nonlocal line
+ both, first, second, none = characters
+ if tag1 in tags:
+ if tag2 in tags:
+ line += both
+ else:
+ line += first
+ else:
+ if tag2 in tags:
+ line += second
+ else:
+ line += none
+
+ tags2col1('spam', 'draft', ('?', 'S', 'D', ' '))
+ tags2col1('attachment', 'encrypted', ('E', 'A', 'E', ' '))
+ tags2col1('unread', 'flagged', ('!', 'U', 'F', ' '))
+ tags2col1('sent', 'replied', ('?', '↑', '↪', ' '))
+
+ if 'sent' in tags:
+ dest = msg.get_header("to")
+ else:
+ dest = msg.get_header("from")
+ line += sep
+ line += MelOutput.clip_text(self.dest_width, dest)
+
+ # Subject
+ line += sep
+ subject = msg.get_header("subject")
+ line += MelOutput.clip_text(self.subject_width, subject)
+
+ if self.is_tty:
+ line += colorama.Style.RESET_ALL
+ print(line)
+
+ def notify_msg(self, msg: notmuch.Message) -> None:
+ """
+ Send a notification for the given message.
+ """
+ self.log.info("Sending notification for %s", msg)
+ subject = msg.get_header("subject")
+ expd = msg.get_header("from")
+ account, _, _ = self.engine.get_location(msg)
+
+ summary = '{} ({})'.format(html.escape(expd), account)
+ body = html.escape(subject)
+ cmd = ["notify-send", "-u", "low", "-i",
+ "mail-message-new", summary, body]
+ print(' '.join(cmd))
+ subprocess.run(cmd, check=False)
+
+ def notify_all(self) -> None:
+ """
+ Send a notification for unprocessed and unread message.
+ Basically should only send a notification for a given message once
+ since it should be marked as processed right after.
+ """
+ nb_msgs = self.engine.apply_msgs(
+ 'tag:unread and tag:unprocessed', self.notify_msg)
+ if nb_msgs > 0:
+ self.log.info(
+ "Playing notification sound (%d new message(s))", nb_msgs)
+ cmd = ["play", "-n", "synth", "sine", "E4", "sine", "A5",
+ "remix", "1-2", "fade", "0.5", "1.2", "0.5", "2"]
+ subprocess.run(cmd, check=False)
+
+ @staticmethod
+ def format_header_value(val: str) -> str:
+ """
+ Return split header values in a contiguous string.
+ """
+ return val.replace('\n', '').replace('\t', '').strip()
+
+ PART_MULTI_FORMAT = colorama.Fore.BLUE + \
+ '{count} {indent}+ {typ}' + colorama.Style.RESET_ALL
+ PART_LEAF_FORMAT = colorama.Fore.BLUE + \
+ '{count} {indent}→ {desc} ({typ}; {size})' + \
+ colorama.Style.RESET_ALL
+
+ def show_parts_tree(self, part: email.message.Message,
+ depth: int = 0, count: int = 1) -> int:
+ """
+ Show a tree of the parts contained in a message.
+ Return the number of parts of the mesage.
+ """
+ indent = depth * '\t'
+ typ = part.get_content_type()
+
+ if part.is_multipart():
+ print(MelOutput.PART_MULTI_FORMAT.format(
+ count=count, indent=indent, typ=typ))
+ payl = part.get_payload()
+ assert isinstance(payl, list)
+ size = 1
+ for obj in payl:
+ size += self.show_parts_tree(obj, depth=depth+1,
+ count=count+size)
+ return size
+
+ payl = part.get_payload(decode=True)
+ assert isinstance(payl, bytes)
+ size = len(payl)
+ desc = part.get('Content-Description', '')
+ print(MelOutput.PART_LEAF_FORMAT.format(
+ count=count, indent=indent, typ=typ, desc=desc,
+ size=MelOutput.sizeof_fmt(size)))
+ return 1
+
+ INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
+ HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \
+ '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL
+
+ def read_msg(self, msg: notmuch.Message) -> None:
+ """
+ Display the content of a mail.
+ """
+ # Parse
+ filename = msg.get_filename()
+ parser = email.parser.BytesParser()
+ with open(filename, 'rb') as filedesc:
+ mail = parser.parse(filedesc)
+
+ # Defects
+ if mail.defects:
+ self.log.warning("Defects found in the mail:")
+ for defect in mail.defects:
+ self.log.warning(defect)
+
+ # Headers
+ for key in MelOutput.INTERESTING_HEADERS:
+ val = mail.get(key)
+ if val:
+ assert isinstance(val, str)
+ val = self.format_header_value(val)
+ print(MelOutput.HEADER_FORMAT.format(key, val))
+ # TODO Show all headers
+ # TODO BONUS Highlight failed verifications
+
+ self.show_parts_tree(mail)
+ print()
+
+ # Show text/plain
+ for part in mail.walk():
+ if part.get_content_type() == "text/plain":
+ payl = part.get_payload(decode=True)
+ assert isinstance(payl, bytes)
+ print(payl.decode())
+
+
+class MelCLI():
+ """
+ Handles the user input and run asked operations.
+ """
+ VERBOSITY_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
+
+ def apply_msgs_input(self, argmessages: typing.List[str],
+ action: typing.Callable, write: bool = False) -> None:
+ """
+ Run a function on the message given by the user.
+ """
+ # TODO First argument might be unecessary
+ if not argmessages:
+ from_stdin = not sys.stdin.isatty()
+ if argmessages:
+ from_stdin = len(argmessages) == 1 and argmessages == '-'
+
+ messages = list()
+ if from_stdin:
+ for line in sys.stdin:
+ uid = line[:12]
+ if not MelEngine.is_uid(uid):
+ self.log.error("Not an UID: %s", uid)
continue
messages.append(uid)
+ else:
+ for uids in argmessages:
+ if len(uids) > 12:
+ self.log.warning("Might have forgotten some spaces "
+ "between the UIDs. Don't worry, I'll "
+ "split them for you")
+ for uid in MelOutput.chunks(uids, 12):
+ if not MelEngine.is_uid(uid):
+ self.log.error("Not an UID: %s", uid)
+ continue
+ messages.append(uid)
- for message in messages:
- queryStr = f'tag:tuid{message}'
- nbMsgs = applyMsgs(queryStr, action, write=write, closeDb=False)
- if nbMsgs < 1:
- log.error("Couldn't execute function for message %s", message)
- close_database()
+ for message in messages:
+ query_str = f'tag:tuid{message}'
+ nb_msgs = self.engine.apply_msgs(
+ query_str, action, write=write, close_db=False)
+ if nb_msgs < 1:
+ self.log.error(
+ "Couldn't execute function for message %s", message)
-
-def format_header_value(val: str) -> str:
- """
- Return split header values in a contiguous string.
- """
- return val.replace('\n', '').replace('\t', '').strip()
-
-
-def sizeof_fmt(num: int, suffix: str = 'B') -> str:
- """
- Print the given size in a human-readable format.
- """
- remainder = float(num)
- # From https://stackoverflow.com/a/1094933
- for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
- if abs(remainder) < 1024.0:
- return "%3.1f %s%s" % (remainder, unit, suffix)
- remainder /= 1024.0
- return "%.1f %s%s" % (remainder, 'Yi', suffix)
-
-
-PART_MULTI_FORMAT = colorama.Fore.BLUE + \
- '{nb} {indent}+ {typ}' + colorama.Style.RESET_ALL
-PART_LEAF_FORMAT = colorama.Fore.BLUE + \
- '{nb} {indent}→ {desc} ({typ}; {size})' + \
- colorama.Style.RESET_ALL
-
-
-def show_parts_tree(part: email.message.Message,
- depth: int = 0, nb: int = 1) -> int:
- """
- Show a tree of the parts contained in a message.
- Return the number of parts of the mesage.
- """
- indent = depth * '\t'
- typ = part.get_content_type()
-
- if part.is_multipart():
- print(PART_MULTI_FORMAT.format(nb=nb, indent=indent, typ=typ))
- payl = part.get_payload()
- assert isinstance(payl, list)
- size = 1
- for obj in payl:
- size += show_parts_tree(obj, depth=depth+1, nb=nb+size)
- return size
-
- # size = len(part.get_payload(decode=True))
- payl = part.get_payload(decode=True)
- assert isinstance(payl, bytes)
- size = len(payl)
- desc = part.get('Content-Description', '')
- print(PART_LEAF_FORMAT.format(nb=nb, indent=indent, typ=typ,
- desc=desc, size=sizeof_fmt(size)))
- return 1
-
-
-INTERESTING_HEADERS = ["Date", "From", "Subject", "To", "Cc", "Message-Id"]
-HEADER_FORMAT = colorama.Fore.BLUE + colorama.Style.BRIGHT + \
- '{}:' + colorama.Style.NORMAL + ' {}' + colorama.Style.RESET_ALL
-
-
-def read_msg(msg: notmuch.Message) -> None:
- # Parse
- filename = msg.get_filename()
- parser = email.parser.BytesParser()
- with open(filename, 'rb') as f:
- mail = parser.parse(f)
-
- # Debug
- global a
- a = mail
-
- # Defects
- if mail.defects:
- log.warning("Defects found in the mail:")
- for defect in mail.defects:
- log.warning(defect)
-
- # Headers
- for key in INTERESTING_HEADERS:
- val = mail.get(key)
- if val:
- assert isinstance(val, str)
- val = format_header_value(val)
- print(HEADER_FORMAT.format(key, val))
- # TODO Show all headers
- # TODO BONUS Highlight failed verifications
-
- show_parts_tree(mail)
- print()
-
- # Show text/plain
- for part in mail.walk():
- if part.get_content_type() == "text/plain":
- payl = part.get_payload(decode=True)
- assert isinstance(payl, bytes)
- print(payl.decode())
-
-
-perfstep("definitions")
-
-if __name__ == "__main__":
- # Main arguments
- parser = argparse.ArgumentParser(description="Meh mail client")
- selectedVerbosityLevels = ["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"]
- parser.add_argument('-v', '--verbosity', choices=selectedVerbosityLevels,
- default='WARNING', help="Verbosity of log messages")
- # parser.add_argument('-n', '--dry-run', action='store_true',
- # help="Don't do anything") # DEBUG
- defaultConfigFile = os.path.join(
- xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
- parser.add_argument('-c', '--config', default=defaultConfigFile,
- help="Accounts config file")
-
- subparsers = parser.add_subparsers(help="Action to execute")
-
- # List messages
-
- def func_default(_: argparse.Namespace) -> None:
+ def operation_default(self) -> None:
"""
Default operation: list all message in the inbox
"""
- applyMsgs('tag:inbox', print_msg)
- parser.set_defaults(func=func_default)
+ self.engine.apply_msgs('tag:inbox', self.output.print_msg)
- # inbox (default)
- def func_inbox(args: argparse.Namespace) -> None:
+ def operation_inbox(self) -> None:
"""
Inbox operation: list all message in the inbox,
possibly only the unread ones.
"""
- queryStr = 'tag:unread' if args.only_unread else 'tag:inbox'
- applyMsgs(queryStr, print_msg)
+ query_str = 'tag:unread' if self.args.only_unread else 'tag:inbox'
+ self.engine.apply_msgs(query_str, self.output.print_msg)
- parserInbox = subparsers.add_parser(
- "inbox", help="Show unread, unsorted and flagged messages")
- parserInbox.add_argument('-u', '--only-unread', action='store_true',
- help="Show unread messages only")
- # TODO Make this more relevant
- parserInbox.set_defaults(func=func_inbox)
-
- # list folder [--recurse]
- # List actions
-
- # flag msg...
-
- def func_flag(args: argparse.Namespace) -> None:
+ def operation_flag(self) -> None:
"""
Flag operation: Flag user selected messages.
"""
@@ -655,14 +635,9 @@ if __name__ == "__main__":
Flag given message.
"""
msg.add_tag('flagged')
- apply_msgs_input(args.message, flag_msg, write=True)
- parserFlag = subparsers.add_parser("flag", help="Mark messages as flagged")
- parserFlag.add_argument('message', nargs='*', help="Messages")
- parserFlag.set_defaults(func=func_flag)
+ self.apply_msgs_input(self.args.message, flag_msg, write=True)
- # unflag msg...
-
- def func_unflag(args: argparse.Namespace) -> None:
+ def operation_unflag(self) -> None:
"""
Unflag operation: Flag user selected messages.
"""
@@ -671,140 +646,175 @@ if __name__ == "__main__":
Unflag given message.
"""
msg.remove_tag('flagged')
- apply_msgs_input(args.message, unflag_msg, write=True)
- parserUnflag = subparsers.add_parser(
- "unflag", help="Mark messages as not-flagged")
- parserUnflag.add_argument('message', nargs='*', help="Messages")
- parserUnflag.set_defaults(func=func_unflag)
+ self.apply_msgs_input(self.args.message, unflag_msg, write=True)
- # delete msg...
- # spam msg...
- # move dest msg...
- # Read message
-
- # read msg [--html] [--plain] [--browser]
-
- def func_read(args: argparse.Namespace) -> None:
+ def operation_read(self) -> None:
"""
Read operation: show full content of selected message
"""
- apply_msgs_input(args.message, read_msg)
- parserRead = subparsers.add_parser("read", help="Read message")
- parserRead.add_argument('message', nargs=1, help="Messages")
- parserRead.set_defaults(func=func_read)
+ self.apply_msgs_input(self.args.message, self.output.read_msg)
- # attach msg [id] [--save] (list if no id, xdg-open else)
- # Redaction
- # new account
- # reply msg [--all]
- # Folder management
- # tree [folder]
- # mkdir folder
- # rmdir folder (prevent if folder isn't empty (mail/subfolder))
- # (yeah that should do)
- # Meta
- # setup (interactive thing maybe)
-
- # fetch (mbsync, notmuch new, retag, notify; called by greater gods)
-
- def func_fetch(args: argparse.Namespace) -> None:
+ def operation_fetch(self) -> None:
"""
Fetch operation: Sync remote databases with the local one.
"""
# Fetch mails
- log.info("Fetching mails")
- mbsyncConfigPath = os.path.expanduser(
+ self.log.info("Fetching mails")
+ mbsync_config_file = os.path.expanduser(
"~/.config/mbsyncrc") # TODO Better
- cmd = ["mbsync", "--config", mbsyncConfigPath, "--all"]
+ cmd = ["mbsync", "--config", mbsync_config_file, "--all"]
subprocess.run(cmd, check=True)
# Index new mails
- notmuch_new()
+ self.engine.notmuch_new()
# Notify
- notify_all()
+ self.output.notify_all()
# Tag new mails
- applyMsgs('tag:unprocessed', retag_msg, showProgress=True, write=True)
+ self.engine.apply_msgs('tag:unprocessed', self.engine.retag_msg,
+ show_progress=True, write=True)
- parserFetch = subparsers.add_parser(
- "fetch", help="Fetch mail, tag them, and run notifications")
- parserFetch.set_defaults(func=func_fetch)
-
- # Debug
- # debug (various)
-
- def func_expose(_: argparse.Namespace) -> None:
- """
- DEBUG
- """
- # And leave the door open
- def expose_object(msg: typing.Any) -> None:
- """
- DEBUG
- """
- global a
- a = msg
- applyMsgs('tag:tuidyviU45m6flff', expose_object, closeDb=False)
-
- def func_debug(_: argparse.Namespace) -> None:
+ def operation_debug(self) -> None:
"""
DEBUG
"""
from pprint import pprint
- pprint(list_folders())
- parserDebug = subparsers.add_parser(
- "debug", help="Who know what this holds...")
- parserDebug.set_defaults(verbosity='DEBUG')
- parserDebug.set_defaults(func=func_debug)
+ pprint(self.engine.list_folders())
- # retag (all or unprocessed)
- def func_retag(_: argparse.Namespace) -> None:
+ def operation_retag(self) -> None:
"""
Retag operation: Manually retag all the mails in the database.
Mostly debug I suppose.
"""
- applyMsgs('*', retag_msg, showProgress=True, write=True)
- parserRetag = subparsers.add_parser(
- "retag", help="Retag all mails (when you changed configuration)")
- parserRetag.set_defaults(func=func_retag)
+ self.engine.apply_msgs('*', self.engine.retag_msg,
+ show_progress=True, write=True)
- # all
- def func_all(_: argparse.Namespace) -> None:
+ def operation_all(self) -> None:
"""
All operation: list every single message.
"""
- applyMsgs('*', print_msg)
+ self.engine.apply_msgs('*', self.output.print_msg)
- parserAll = subparsers.add_parser("all", help="Show ALL messages")
- parserAll.set_defaults(func=func_all)
+ def add_subparsers(self) -> None:
+ """
+ Add the operation parser to the main parser.
+ """
+ # TODO If the only operation to the parser done are adding argument,
+ # we should automate this.
+ subparsers = self.parser.add_subparsers(help="Action to execute")
- # Init
- args = parser.parse_args()
- perfstep("parse_args")
+ # List messages
+ self.parser.set_defaults(operation=self.operation_default)
+ # inbox (default)
+ parser_inbox = subparsers.add_parser(
+ "inbox", help="Show unread, unsorted and flagged messages")
+ parser_inbox.add_argument('-u', '--only-unread', action='store_true',
+ help="Show unread messages only")
+ # TODO Make this more relevant
+ parser_inbox.set_defaults(operation=self.operation_inbox)
+
+ # list folder [--recurse]
+ # List actions
+
+ # flag msg...
+ parser_flag = subparsers.add_parser(
+ "flag", help="Mark messages as flagged")
+ parser_flag.add_argument('message', nargs='*', help="Messages")
+ parser_flag.set_defaults(operation=self.operation_flag)
+
+ # unflag msg...
+ parser_unflag = subparsers.add_parser(
+ "unflag", help="Mark messages as not-flagged")
+ parser_unflag.add_argument('message', nargs='*', help="Messages")
+ parser_unflag.set_defaults(operation=self.operation_unflag)
+
+ # delete msg...
+ # spam msg...
+ # move dest msg...
+ # Read message
+
+ # read msg [--html] [--plain] [--browser]
+
+ parser_read = subparsers.add_parser("read", help="Read message")
+ parser_read.add_argument('message', nargs=1, help="Messages")
+ parser_read.set_defaults(operation=self.operation_read)
+
+ # attach msg [id] [--save] (list if no id, xdg-open else)
+ # Redaction
+ # new account
+ # reply msg [--all]
+ # Folder management
+ # tree [folder]
+ # mkdir folder
+ # rmdir folder (prevent if folder isn't empty (mail/subfolder))
+ # (yeah that should do)
+ # Meta
+ # setup (interactive thing maybe)
+
+ # fetch (mbsync, notmuch new, retag, notify; called by greater gods)
+
+ parser_fetch = subparsers.add_parser(
+ "fetch", help="Fetch mail, tag them, and run notifications")
+ parser_fetch.set_defaults(operation=self.operation_fetch)
+
+ # Debug
+
+ # debug (various)
+ parser_debug = subparsers.add_parser(
+ "debug", help="Who know what this holds...")
+ parser_debug.set_defaults(verbosity='DEBUG')
+ parser_debug.set_defaults(operation=self.operation_debug)
+
+ # retag (all or unprocessed)
+ parser_retag = subparsers.add_parser(
+ "retag", help="Retag all mails (when you changed configuration)")
+ parser_retag.set_defaults(operation=self.operation_retag)
+
+ # all
+ parser_all = subparsers.add_parser("all", help="Show ALL messages")
+ parser_all.set_defaults(operation=self.operation_all)
+
+ def create_parser(self) -> argparse.ArgumentParser:
+ """
+ Create the main parser that will handle the user arguments.
+ """
+ parser = argparse.ArgumentParser(description="Meh mail client")
+ parser.add_argument('-v', '--verbosity',
+ choices=MelCLI.VERBOSITY_LEVELS, default='WARNING',
+ help="Verbosity of self.log messages")
+ # parser.add_argument('-n', '--dry-run', action='store_true',
+ # help="Don't do anything") # DEBUG
+ default_config_file = os.path.join(
+ xdg.BaseDirectory.xdg_config_home, 'mel', 'accounts.conf')
+ parser.add_argument('-c', '--config', default=default_config_file,
+ help="Accounts config file")
+ return parser
+
+ def __init__(self) -> None:
+ self.log = logging.getLogger("MelCLI")
+
+ self.parser = self.create_parser()
+ self.add_subparsers()
+
+ self.args = self.parser.parse_args()
+ coloredlogs.install(level=self.args.verbosity,
+ fmt='%(levelname)s %(name)s %(message)s')
+
+ self.engine = MelEngine(self.args.config)
+ self.output = MelOutput(self.engine)
+
+ if self.args.operation:
+ self.log.info("Executing operation %s", self.args.operation)
+ self.args.operation()
+
+
+if __name__ == "__main__":
colorama.init()
- coloredlogs.install(level=args.verbosity, fmt='%(levelname)s %(message)s')
- log = logging.getLogger()
-
- log.info("Loading config %s", args.config)
- if not os.path.isfile(args.config):
- log.fatal("config file not found: %s", args.config)
- sys.exit(1)
- # TODO Create it, maybe?
- CONFIG = configparser.ConfigParser()
- CONFIG.read(args.config)
-
- generate_aliases()
- perfstep("config")
-
- if args.func:
- log.info("Executing function %s", args.func)
- args.func(args)
-
- perfstep("exec")
-
- # DEBUG
- for kv in sorted(PERF_DICT.items(), key=lambda p: p[1]):
- log.debug("{1:.6f}s {0}".format(*kv))
- sys.exit(0)
+ try:
+ CLI = MelCLI()
+ except:
+ EXTYPE, VALUE, TB = sys.exc_info()
+ traceback.print_exc()
+ pdb.post_mortem(TB)