dotfiles/config/scripts/smtpdummy

224 lines
6.2 KiB
Python
Executable file

#!/usr/bin/env python3
import base64
import datetime
import email.utils
import io
import pprint
import subprocess
import sys
import colorama
import configargparse
if __name__ == "__main__":
parser = configargparse.ArgParser(
description="Generate SMTP messages to send a mail"
)
now = datetime.datetime.now()
now_email = email.utils.formatdate(now.timestamp(), True)
parser.add_argument("-o", "--origin", env_var="ORIGIN", default="localhost")
parser.add_argument(
"-d", "--destination", env_var="DESTINATION", default="localhost"
)
parser.add_argument("-p", "--port", env_var="PORT", default=25)
parser.add_argument(
"-S",
"--security",
env_var="SECURITY",
choices=["plain", "ssl", "starttls"],
default="plain",
)
parser.add_argument("-l", "--helo", env_var="HELO")
parser.add_argument("-L", "--helo-verb", env_var="HELO_VERB", default="EHLO")
parser.add_argument(
"-s", "--sender", env_var="SENDER", default="geoffrey@frogeye.fr"
)
parser.add_argument(
"-r",
"--receiver",
env_var="RECEIVER",
default=[],
action="append",
)
# parser.add_argument("-a", "--auth", env_var="AUTH", default="PLAIN")
parser.add_argument("-u", "--user", env_var="MUSER")
parser.add_argument("-w", "--password", env_var="PASSWORD")
parser.add_argument("-f", "--from", env_var="FROM")
parser.add_argument("-t", "--to", env_var="TO")
parser.add_argument("-T", "--reply-to", env_var="REPLYTO")
parser.add_argument(
"-j",
"--subject",
env_var="SUBJECT",
default=f"Test message {now.strftime('%H:%M:%S')}",
)
parser.add_argument("-8", "--smtputf8", env_var="SMTPUTF8", action="store_true")
parser.add_argument("-c", "--callout", env_var="CALLOUT", action="store_true")
parser.add_argument("-b", "--body", env_var="BODY", default="")
parser.add_argument("-g", "--gtube", env_var="GTUBE", action="store_true")
parser.add_argument("-m", "--me", env_var="ME", default="Geoffrey")
parser.add_argument(
"-H", "--headers", default=[], action="append", env_var="HEADER"
)
parser.add_argument("-y", "--dryrun", env_var="DRYRUN", action="store_true")
parser.add_argument("-q", "--quiet", env_var="QUIET", action="store_true")
args = parser.parse_args()
# Default values
if not args.receiver:
args.receiver = ["geoffrey@frogeye.fr"]
if args.helo is None:
args.helo = args.origin
if getattr(args, "from") is None:
setattr(args, "from", args.sender)
if args.to is None:
args.to = args.receiver[0]
if args.reply_to is None:
args.reply_to = getattr(args, "from")
if args.password:
password = args.password
args.password = "********"
mid = email.utils.make_msgid(domain=args.helo)
# Transmission content
headers = ""
if args.headers:
headers = "\n" + "\n".join(args.headers)
gtube = ""
if args.gtube:
gtube = """
XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X"""
body = ""
if args.body:
body = f"\n\n{args.body}"
text = f"""Date: {now_email}
From: {args.me} <{getattr(args, 'from')}>
Subject: {args.subject}
To: {args.to}
Reply-To: {args.reply_to}
Message-ID: {mid}{headers}
Hello there,
This is a test message, generated from a template.
If you didn't expect to see this message, please contact {args.me}.{gtube}{body}
Greetings,
Input arguments:
{pprint.pformat(args.__dict__, indent=4)}
--
{args.me}
."""
# Transmission setup
cmd = []
if args.origin != "localhost":
cmd += ["ssh", args.origin]
if args.security == "plain":
cmd += ["socat", "-", f"tcp:{args.destination}:{args.port}"]
elif args.security == "ssl":
cmd += ["socat", "-", f"openssl:{args.destination}:{args.port}"]
elif args.security == "starttls":
cmd += [
"openssl",
"s_client",
"-starttls",
"smtp",
"-crlf",
"-connect",
f"{args.destination}:{args.port}",
"-quiet",
]
if not args.quiet:
print(colorama.Fore.MAGENTA + f"# {' '.join(cmd)}" + colorama.Fore.RESET)
if not args.dryrun:
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
def recv() -> None:
if args.dryrun:
return
assert isinstance(p.stdout, io.BufferedReader)
next = True
while next:
line = p.stdout.readline()
try:
code = int(line[:3])
except ValueError:
raise ValueError(f"Could not parse line: '{line.decode()}'")
success = code < 400
color = colorama.Fore.GREEN if success else colorama.Fore.RED
if not args.quiet:
print(color + f"< {line[:-1].decode()}" + colorama.Fore.RESET)
next = line[3] == b"-"[0]
if not next and not success:
send("QUIT") # TODO Can loop if QUIT fails
sys.exit(1)
def _send(command: str) -> None:
if not args.quiet:
print(colorama.Fore.BLUE + f"> {command}" + colorama.Fore.RESET)
if args.dryrun:
return
assert isinstance(p.stdin, io.BufferedWriter)
cmd = command.encode() + b"\n"
p.stdin.write(cmd)
p.stdin.flush()
def send(command: str) -> None:
_send(command)
recv()
# Transmission
if args.security != "starttls":
recv()
send(f"{args.helo_verb} {args.helo}")
if args.user:
encoded = base64.b64encode(
args.user.encode()
+ b"\x00"
+ args.user.encode()
+ b"\x00"
+ password.encode()
).decode()
send(f"AUTH PLAIN {encoded}")
send(f"MAIL FROM:<{args.sender}>" + (" SMTPUTF8" if args.smtputf8 else ""))
for receiver in args.receiver:
send(f"RCPT TO:<{receiver}>")
if not args.callout:
send("DATA")
send(text)
send("QUIT")
sys.exit(0)
# For reference:
# send("RSET")
# send("VRFY")
# send("NOOP")
# send("QUIT")