#!/usr/bin/env python3

"""
Script that download videos that are linked as an article in a RSS feed.
The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""

import enum
import functools
import logging
import os
import pickle
import random
import re
import subprocess
import sys
import typing
import urllib.parse
import urllib.request
from xml.dom import minidom

import coloredlogs
import configargparse
import yt_dlp as youtube_dl

log = logging.getLogger(__name__)


def configure_logging(args: configargparse.Namespace) -> None:
    """Install colored log handlers according to the --verbosity argument."""
    if args.verbosity:
        coloredlogs.install(
            level=args.verbosity,
        )
    else:
        # Default: plain messages from this script's logger only
        coloredlogs.install(
            fmt="%(message)s",
            logger=log,
        )


class RVCommand(enum.Enum):
    """Sub-commands accepted on the command line."""

    download = "download"
    list = "list"
    watch = "watch"


class RVElement:
    """One <item> of the RSS feed, i.e. one article / potential video."""

    parent: "RVDatabase"
    item: minidom.Element
    # True once the video has been downloaded at least once, so it is not
    # re-downloaded after its file has been cleaned up.
    was_downloaded: bool
    # True once the video has been played through the `watch` sub-command.
    watched: bool

    def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
        self.parent = parent
        self.item = item
        self.was_downloaded = False
        self.watched = False

    def get_tag_data(self, tag_name: str) -> str:
        """Return the text content of the item's unique tag `tag_name`.

        Raises:
            KeyError: if the tag is missing, duplicated, or doesn't hold
                exactly one text node.
        """
        nodes = self.item.getElementsByTagName(tag_name)
        if len(nodes) != 1:
            raise KeyError(f"Expected 1 tag `{tag_name}`, got {len(nodes)}.")
        children = nodes[0].childNodes
        if len(children) != 1:
            raise KeyError(
                f"Expected 1 child for tag `{tag_name}`, got {len(children)}."
            )
        return children[0].data

    @property
    def title(self) -> str:
        return self.get_tag_data("title")

    @property
    def link(self) -> str:
        return self.get_tag_data("link")

    @property
    def creator(self) -> typing.Optional[str]:
        """Author of the article, or None when the feed doesn't provide one."""
        try:
            return self.get_tag_data("dc:creator")
        except KeyError:
            return None

    @property
    def description(self) -> str:
        # TODO Testing
        return self.get_tag_data("description")

    @property
    def date(self) -> str:
        # TODO datetime format
        return self.get_tag_data("pubDate")

    @property
    def guid(self) -> int:
        return int(self.get_tag_data("guid"))

    def read_cache(self, cache: "RVElement") -> None:
        """Copy the expensive/stateful fields from the cached version of this item."""
        if "ytdl_infos" in cache.__dict__:
            # Pre-seed the cached_property by writing straight into __dict__
            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
            log.debug(f"From cache: {self}")
        if cache.was_downloaded:
            self.was_downloaded = True
        if cache.watched:
            self.watched = True

    def __str__(self) -> str:
        return f"{self.guid}: {self.creator} – {self.title} – {self.link}"

    @property
    def downloaded(self) -> bool:
        """Whether the video file is currently present on disk."""
        if "ytdl_infos" not in self.__dict__:
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        """Metadata extracted by yt-dlp, or None when the link is not a video."""
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link)
        except KeyboardInterrupt:
            raise
        except youtube_dl.utils.DownloadError as e:
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        # Apparently that thing is transformed from a LazyList
        # somewhere in the normal yt_dlp process
        if (
            infos
            and "thumbnails" in infos
            and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList)
        ):
            infos["thumbnails"] = infos["thumbnails"].exhaust()
        # Save database once it's been computed
        self.__dict__["ytdl_infos"] = infos
        self.parent.save()
        return infos

    @property
    def duration(self) -> int:
        """Duration of the video, in seconds."""
        assert self.is_video
        assert self.ytdl_infos
        return self.ytdl_infos["duration"]

    @property
    def skip(self) -> bool:
        """Whether this video should not be downloaded (exceeds --max-duration)."""
        assert self.is_video
        if (
            self.parent.args.max_duration > 0
            and self.duration > self.parent.args.max_duration
        ):
            return True
        return False

    @property
    def is_video(self) -> bool:
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @property
    def filepath(self) -> str:
        """Path the video file has (or would have) on disk."""
        assert self.is_video
        # TODO This doesn't change the extension to mkv when the formats are incomaptible
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def filename(self) -> str:
        """Video file path without its extension."""
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def download(self) -> None:
        """Download the video (unless --dryrun) and persist the new state."""
        assert self.is_video
        log.info(f"Downloading: {self}")
        if not self.parent.args.dryrun:
            self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {})
        self.was_downloaded = True
        self.parent.save()

    def act(self) -> None:
        """Download this element if it is a video that wasn't already handled."""
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return
        if self.downloaded:
            log.debug(f"Currently downloaded: {self}")
            return
        if self.was_downloaded:
            log.debug(f"Downloaded previously: {self}")
            return
        if self.skip:
            log.debug(f"Skipped: {self}")
            return
        self.download()

    # Suffix (unit) and prefix (comparison) understood by the --duration filter
    MATCHES_DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, None: 1}
    MATCHES_DURATION_COMPARATORS = {
        "<": int.__lt__,
        "-": int.__lt__,
        ">": int.__gt__,
        "+": int.__gt__,
        "=": int.__eq__,
        None: int.__le__,
    }

    def matches_search(self, args: configargparse.Namespace) -> bool:
        """Whether this element matches the command-line filters.

        Only unwatched videos can ever match.  `args.duration` is a
        comparative such as `+1h`, `-30m` or `=300`.

        Raises:
            ValueError: when the duration comparative cannot be parsed.
        """
        if not self.is_video:
            return False
        if self.watched:
            return False
        if args.title and not re.search(args.title, self.title):
            return False
        if args.creator and (
            # An item without a creator cannot match a creator filter
            self.creator is None
            or not re.search(args.creator, self.creator)
        ):
            return False
        if args.guid and not re.search(args.guid, str(self.guid)):
            return False
        if args.link and not re.search(args.link, self.link):
            return False
        if args.duration:
            dur = args.duration
            # Optional trailing unit letter (s/m/h)
            mult_index = dur[-1].lower()
            if mult_index.isdigit():
                mult_index = None
            else:
                dur = dur[:-1]
            try:
                multiplier = self.MATCHES_DURATION_MULTIPLIERS[mult_index]
            except KeyError:
                # dict subscripting raises KeyError, not IndexError
                raise ValueError(f"Unknown duration multiplier: {mult_index}")
            # Optional leading comparator character (< - > + =)
            comp_index = dur[0]
            if comp_index.isdigit():
                comp_index = None
            else:
                dur = dur[1:]
            try:
                comparator = self.MATCHES_DURATION_COMPARATORS[comp_index]
            except KeyError:
                raise ValueError(f"Unknown duration comparator: {comp_index}")
            duration = int(dur)
            if not comparator(self.duration, duration * multiplier):
                return False
        return True

    def watch(self) -> None:
        """Play the video with mpv, downloading it first if needed."""
        if not self.downloaded:
            self.download()
        # check=True raises CalledProcessError, same as check_returncode() did
        subprocess.run(["mpv", self.filepath], check=True)
        self.watched = True
        self.parent.save()


class RVDatabase:
    """All the feed items, plus the logic to persist them with pickle."""

    SAVE_FILE = ".cache.p"

    args: configargparse.Namespace
    elements: list[RVElement]

    def __init__(self, args: configargparse.Namespace) -> None:
        self.args = args

    def save(self) -> None:
        """Pickle the whole database to the save file (skipped with --dryrun)."""
        log.debug("Saving cache")
        if self.args.dryrun:
            return
        with open(self.SAVE_FILE, "wb") as save_file:
            pickle.dump(self, save_file)

    @classmethod
    def load(cls) -> typing.Optional["RVDatabase"]:
        """Unpickle the database from the save file; None when unavailable."""
        try:
            with open(cls.SAVE_FILE, "rb") as save_file:
                return pickle.load(save_file)
        except (TypeError, AttributeError, EOFError):
            # Raised when the pickled classes no longer match this code
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
        except FileNotFoundError:
            pass
        return None

    def read_cache(self, cache: "RVDatabase") -> None:
        """Transfer cached state into the freshly fetched elements, keyed by guid."""
        cache_els = {cache_el.guid: cache_el for cache_el in cache.elements}
        for el in self.elements:
            if el.guid in cache_els:
                el.read_cache(cache_els[el.guid])

    @functools.cached_property
    def feed_xml(self) -> minidom.Document:
        with urllib.request.urlopen(self.args.feed) as request:
            return minidom.parse(request)

    def read_feed(self) -> None:
        """Populate self.elements from the RSS feed items."""
        log.info("Fetching RSS feed")
        self.elements = []
        for item in self.feed_xml.getElementsByTagName("item"):
            element = RVElement(self, item)
            # Prepending reverses the feed order (presumably newest-first,
            # yielding an oldest-first list — TODO confirm with the feed used)
            self.elements.insert(0, element)
            log.debug(f"Known: {element}")

    def clean(self) -> None:
        """Delete files in the current directory not belonging to a wanted video."""
        filenames = set()
        for element in self.elements:
            if element.is_video and not element.skip:
                filenames.add(element.filename)
        for file in os.listdir():
            if file == RVDatabase.SAVE_FILE:
                continue
            if not os.path.isfile(file):
                continue
            # Keep any file (video, subtitles, thumbnail…) whose name starts
            # with the extension-less name of a wanted video
            for filename in filenames:
                if file.startswith(filename):
                    break
            else:
                log.info(f"Removing: {file}")
                if not self.args.dryrun:
                    os.unlink(file)

    def act_all(self) -> None:
        """Run the download behaviour on every element."""
        for element in self.elements:
            element.act()

    @property
    def ytdl_opts(self) -> dict:
        return {"format": self.args.format, "allsubtitles": self.args.subtitles}

    @property
    def ytdl_dry_opts(self) -> dict:
        opts = self.ytdl_opts.copy()
        opts.update({"simulate": True, "quiet": True})
        return opts

    @property
    def ytdl(self) -> youtube_dl.YoutubeDL:
        """yt-dlp instance used to actually download videos."""
        return youtube_dl.YoutubeDL(self.ytdl_opts)

    @property
    def ytdl_dry(self) -> youtube_dl.YoutubeDL:
        """yt-dlp instance that only simulates, used to fetch metadata."""
        return youtube_dl.YoutubeDL(self.ytdl_dry_opts)


def get_args() -> configargparse.Namespace:
    """Parse the command line, config file and environment variables."""
    defaultConfigPath = os.path.join(
        os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
    )

    parser = configargparse.ArgParser(
        description="Download videos linked in "
        + "a RSS feed (e.g. an unread feed from "
        + "an RSS aggregator",
        default_config_files=[defaultConfigPath],
    )
    parser.add_argument(
        "-v",
        "--verbosity",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Verbosity of log messages",
    )
    parser.add(
        "-c", "--config", required=False, is_config_file=True, help="Configuration file"
    )
    parser.add(
        "--feed",
        help="URL of the RSS feed (must be public for now)",
        env_var="RSS_VIDEOS_FEED",
        required=True,
    )
    parser.add(
        "--videos",
        help="Directory to store videos",
        env_var="RSS_VIDEOS_VIDEO_DIR",
        required=True,
    )
    parser.add(
        "-n",
        "--dryrun",
        help="Do not download the videos",
        action="store_const",
        const=True,
        default=False,
    )
    parser.add(
        "--max-duration",
        help="Skip video longer than this amount of seconds",
        env_var="RSS_VIDEOS_MAX_DURATION",
        type=int,
        default=0,
    )
    parser.add(
        "--format",
        help="Use this format to download videos."
        + " See FORMAT SELECTION in youtube-dl(1)",
        env_var="RSS_VIDEOS_FORMAT",
        default="bestvideo+bestaudio/best",
    )
    parser.add(
        "--subtitles",
        help="Download all subtitles",
        env_var="RSS_VIDEOS_SUBTITLES",
        action="store_true",
    )

    # Sub-command used when none is given on the command line
    parser.set_defaults(subcommand=RVCommand.download)
    subparsers = parser.add_subparsers(title="subcommand")

    sc_download = subparsers.add_parser("download")
    sc_download.set_defaults(subcommand=RVCommand.download)

    sc_list = subparsers.add_parser("list")
    sc_list.set_defaults(subcommand=RVCommand.list)

    sc_watch = subparsers.add_parser("watch")
    sc_watch.set_defaults(subcommand=RVCommand.watch)
    sc_watch.add(
        "order",
        choices=("old", "new", "random"),
        nargs="?",
        default="old",
        help="Watch X first",
    )
    # TODO Command to watch multiple

    # Common arguments for filtering
    for sc in (sc_list, sc_watch):
        sc.add("--guid", help="Regex to filter guid")
        sc.add("--creator", help="Regex to filter by creator")
        sc.add("--title", help="Regex to filter by title")
        sc.add("--link", help="Regex to filter by link")
        sc.add("--duration", help="Comparative to filter by duration")

    args = parser.parse_args()
    args.videos = os.path.realpath(os.path.expanduser(args.videos))

    return args


def main() -> None:
    args = get_args()
    configure_logging(args)

    os.makedirs(args.videos, exist_ok=True)
    os.chdir(args.videos)

    # TODO Abstract a bit
    if args.subcommand == RVCommand.download:
        database = RVDatabase(args)
        database.read_feed()
        cache = RVDatabase.load()
        if cache:
            database.read_cache(cache)
        database.clean()
        database.act_all()
        database.save()

    elif args.subcommand == RVCommand.list:
        cache = RVDatabase.load()
        if not cache:
            raise FileNotFoundError("This command doesn't work without a cache yet.")
        for element in cache.elements:
            if not element.matches_search(args):
                continue
            print(element)

    elif args.subcommand == RVCommand.watch:
        cache = RVDatabase.load()
        if not cache:
            raise FileNotFoundError("This command doesn't work without a cache yet.")
        elements = cache.elements.copy()
        if args.order == "new":
            elements.reverse()
        elif args.order == "random":
            random.shuffle(elements)
        # Watch only the first matching element
        for element in elements:
            if not element.matches_search(args):
                continue
            element.watch()
            break


if __name__ == "__main__":
    main()