226 lines
6.3 KiB
Plaintext
Executable file
226 lines
6.3 KiB
Plaintext
Executable file
#!/usr/bin/env nix-shell
#! nix-shell -i python3
#! nix-shell -p python3 python3Packages.colorama python3Packages.configargparse
|
|
|
|
import base64
|
|
import datetime
|
|
import email.utils
|
|
import io
|
|
import pprint
|
|
import subprocess
|
|
import sys
|
|
|
|
import colorama
|
|
import configargparse
|
|
|
|
def build_command(origin: str, destination: str, port: object, security: str) -> list:
    """Return the argv of the process that carries the SMTP dialogue.

    Args:
        origin: Host the connection is made from. Anything other than
            ``"localhost"`` prefixes the command with ``ssh <origin>``.
        destination: Host name or address of the SMTP server.
        port: Port of the SMTP server (int or str, it is only interpolated).
        security: One of ``"plain"``, ``"ssl"`` or ``"starttls"``.

    Returns:
        The command as a list of arguments, suitable for ``subprocess.Popen``.

    Raises:
        ValueError: If ``security`` is not one of the supported modes.
    """
    command = []
    if origin != "localhost":
        command += ["ssh", origin]

    target = f"{destination}:{port}"
    if security == "plain":
        command += ["socat", "-", f"tcp:{target}"]
    elif security == "ssl":
        command += ["socat", "-", f"openssl:{target}"]
    elif security == "starttls":
        command += [
            "openssl",
            "s_client",
            "-starttls",
            "smtp",
            "-crlf",
            "-connect",
            target,
            "-quiet",
        ]
    else:
        # Unreachable through the CLI (argparse `choices`), but an empty
        # transport command would otherwise fail in a confusing way later.
        raise ValueError(f"Unsupported security mode: {security!r}")
    return command


def encode_auth_plain(user: str, password: str) -> str:
    """Return the base64 argument of an ``AUTH PLAIN`` command.

    The encoded payload is ``user NUL user NUL password``, i.e. the same
    identity is used for both fields preceding the password.
    """
    payload = b"\x00".join((user.encode(), user.encode(), password.encode()))
    return base64.b64encode(payload).decode()


if __name__ == "__main__":
    parser = configargparse.ArgParser(
        description="Generate SMTP messages to send a mail"
    )

    now = datetime.datetime.now()
    now_email = email.utils.formatdate(now.timestamp(), True)

    # Connection
    parser.add_argument("-o", "--origin", env_var="ORIGIN", default="localhost")
    parser.add_argument(
        "-d", "--destination", env_var="DESTINATION", default="localhost"
    )
    parser.add_argument("-p", "--port", env_var="PORT", default=25)
    parser.add_argument(
        "-S",
        "--security",
        env_var="SECURITY",
        choices=["plain", "ssl", "starttls"],
        default="plain",
    )

    # Envelope
    parser.add_argument("-l", "--helo", env_var="HELO")
    parser.add_argument("-L", "--helo-verb", env_var="HELO_VERB", default="EHLO")
    parser.add_argument(
        "-s", "--sender", env_var="SENDER", default="geoffrey@frogeye.fr"
    )
    parser.add_argument(
        "-r",
        "--receiver",
        env_var="RECEIVER",
        default=[],
        action="append",
    )
    # parser.add_argument("-a", "--auth", env_var="AUTH", default="PLAIN")
    parser.add_argument("-u", "--user", env_var="MUSER")
    parser.add_argument("-w", "--password", env_var="PASSWORD")

    # Message headers
    parser.add_argument("-f", "--from", env_var="FROM")
    parser.add_argument("-t", "--to", env_var="TO")
    parser.add_argument("-T", "--reply-to", env_var="REPLYTO")
    parser.add_argument(
        "-j",
        "--subject",
        env_var="SUBJECT",
        default=f"Test message {now.strftime('%H:%M:%S')}",
    )
    parser.add_argument("-8", "--smtputf8", env_var="SMTPUTF8", action="store_true")
    parser.add_argument("-c", "--callout", env_var="CALLOUT", action="store_true")

    # Message content
    parser.add_argument("-b", "--body", env_var="BODY", default="")
    parser.add_argument("-g", "--gtube", env_var="GTUBE", action="store_true")
    parser.add_argument("-m", "--me", env_var="ME", default="Geoffrey")
    parser.add_argument(
        "-H", "--headers", default=[], action="append", env_var="HEADER"
    )

    # Behaviour of this script
    parser.add_argument("-y", "--dryrun", env_var="DRYRUN", action="store_true")
    parser.add_argument("-q", "--quiet", env_var="QUIET", action="store_true")

    args = parser.parse_args()

    # Default values
    if not args.receiver:
        args.receiver = ["geoffrey@frogeye.fr"]
    if args.helo is None:
        args.helo = args.origin
    # `from` is a keyword, hence getattr/setattr instead of attribute access.
    if getattr(args, "from") is None:
        setattr(args, "from", args.sender)
    if args.to is None:
        args.to = args.receiver[0]
    if args.reply_to is None:
        args.reply_to = getattr(args, "from")
    # Keep the real password aside and mask it in `args`, because `args` is
    # dumped verbatim into the message body below. `password` is always bound
    # (empty when none was given) so that --user without --password does not
    # crash with a NameError at the AUTH step.
    password = args.password or ""
    if args.password:
        args.password = "********"
    mid = email.utils.make_msgid(domain=args.helo)

    # Transmission content

    headers = ""
    if args.headers:
        headers = "\n" + "\n".join(args.headers)

    gtube = ""
    if args.gtube:
        gtube = (
            "\n\n"
            "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X"
        )

    body = ""
    if args.body:
        body = f"\n\n{args.body}"

    # The template ends with a lone "." line, which terminates the DATA phase.
    text = f"""Date: {now_email}
From: {args.me} <{getattr(args, 'from')}>
Subject: {args.subject}
To: {args.to}
Reply-To: {args.reply_to}
Message-ID: {mid}{headers}

Hello there,

This is a test message, generated from a template.
If you didn't expect to see this message, please contact {args.me}.{gtube}{body}

Greetings,

Input arguments:
{pprint.pformat(args.__dict__, indent=4)}

--
{args.me}
."""

    # Transmission setup
    cmd = build_command(args.origin, args.destination, args.port, args.security)

    if not args.quiet:
        print(colorama.Fore.MAGENTA + f"# {' '.join(cmd)}" + colorama.Fore.RESET)

    if not args.dryrun:
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )

    def recv(quit_on_failure: bool = True) -> None:
        """Read one (possibly multi-line) SMTP reply and echo it.

        Does nothing in dry-run mode. Reply codes below 400 are treated as
        success; on a failing final line the script exits with status 1.

        Args:
            quit_on_failure: When true, send QUIT and read its reply once
                before exiting. The nested read passes ``False`` so that a
                failing reply to QUIT cannot trigger another QUIT.

        Raises:
            ValueError: If the stream ends or a line does not start with a
                numeric reply code.
        """
        if args.dryrun:
            return

        assert isinstance(p.stdout, io.BufferedReader)
        more = True
        while more:
            line = p.stdout.readline()
            if not line:
                raise ValueError(
                    "Connection closed before a complete reply was received"
                )
            shown = line.rstrip(b"\r\n").decode(errors="replace")
            try:
                code = int(line[:3])
            except ValueError as err:
                raise ValueError(f"Could not parse line: '{shown}'") from err
            success = code < 400
            color = colorama.Fore.GREEN if success else colorama.Fore.RED
            if not args.quiet:
                print(color + f"< {shown}" + colorama.Fore.RESET)
            # A "-" right after the code marks a continuation line; slicing
            # (rather than indexing) tolerates replies shorter than 4 bytes.
            more = line[3:4] == b"-"
            if not more and not success:
                if quit_on_failure:
                    _send("QUIT")
                    recv(quit_on_failure=False)
                sys.exit(1)

    def _send(command: str) -> None:
        """Echo ``command`` and write it, newline-terminated, to the peer.

        In dry-run mode the command is only echoed.
        """
        if not args.quiet:
            print(colorama.Fore.BLUE + f"> {command}" + colorama.Fore.RESET)

        if args.dryrun:
            return

        assert isinstance(p.stdin, io.BufferedWriter)
        payload = command.encode() + b"\n"
        p.stdin.write(payload)
        p.stdin.flush()

    def send(command: str) -> None:
        """Send ``command`` and wait for the server's reply."""
        _send(command)
        recv()

    # Transmission

    # NOTE(review): presumably the greeting is skipped for starttls because
    # `openssl s_client -starttls smtp` consumes it itself -- confirm.
    if args.security != "starttls":
        recv()
    send(f"{args.helo_verb} {args.helo}")
    if args.user:
        send(f"AUTH PLAIN {encode_auth_plain(args.user, password)}")
    send(f"MAIL FROM:<{args.sender}>" + (" SMTPUTF8" if args.smtputf8 else ""))
    for receiver in args.receiver:
        send(f"RCPT TO:<{receiver}>")
    # In callout mode the envelope is submitted but no message is sent.
    if not args.callout:
        send("DATA")
        send(text)
    send("QUIT")
    sys.exit(0)

    # For reference:
    # send("RSET")
    # send("VRFY")
    # send("NOOP")
    # send("QUIT")
|