#!/usr/bin/env python3 """ Script that download videos that are linked as an article in a RSS feed. The common use case would be a feed from an RSS aggregator with the unread items (non-video links are ignored). """ import enum import functools import logging import os import pickle import random import re import subprocess import sys import time import typing import urllib.parse import urllib.request import urllib.error from xml.dom import minidom import coloredlogs import configargparse import yt_dlp log = logging.getLogger(__name__) # TODO Lockfile, or a way to parallel watch and download # TODO Save ytdl infos and view info separately def configure_logging(args: configargparse.Namespace) -> None: # Configure logging if args.verbosity: coloredlogs.install( level=args.verbosity, ) else: coloredlogs.install( fmt="%(message)s", logger=log, ) class SaveInfoPP(yt_dlp.postprocessor.common.PostProcessor): """ yt_dlp.process_ie_result() doesn't return a completely updated info dict, notably the extension is still the one before it realizes the files cannot be merged. So we use this PostProcessor to catch the info dict in its final form and save it. """ def __init__(self, rvelement: "RVElement") -> None: self.rvelement = rvelement super().__init__() def run(self, info: dict) -> tuple[list, dict]: self.rvelement.ytdl_infos = info return [], info def parse_duration(string: str) -> int: DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, "": 1} mult_index = string[-1].lower() if mult_index.isdigit(): mult_index = "" else: string = string[:-1] try: multiplier = DURATION_MULTIPLIERS[mult_index] except IndexError: raise ValueError(f"Unknown duration multiplier: {mult_index}") return int(string) * multiplier def compare_duration(compstr: str) -> typing.Callable[[int], bool]: DURATION_COMPARATORS = { "<": int.__lt__, "-": int.__lt__, ">": int.__gt__, "+": int.__gt__, "=": int.__eq__, "": int.__le__, } comp_index = compstr[0] if comp_index.isdigit(): comp_index = "" else: compstr = compstr[1:] try: comparator = DURATION_COMPARATORS[comp_index] except IndexError: raise ValueError(f"Unknown duration comparator: {comp_index}") duration = parse_duration(compstr) return lambda d: comparator(d, duration) def format_duration(duration: int) -> str: return time.strftime("%H:%M:%S", time.gmtime(duration)) class RVElement: parent: "RVDatabase" item: minidom.Element was_downloaded: bool watched: bool def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None: self.parent = parent self.item = item self.was_downloaded = False self.watched = False def get_tag_data(self, tag_name: str) -> str: nodes = self.item.getElementsByTagName(tag_name) if len(nodes) != 1: raise KeyError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.") children = nodes[0].childNodes if len(children) != 1: raise KeyError( f"Exepected 1 children for tag `{tag_name}`, got {len(children)}." ) return children[0].data @property def title(self) -> str: return self.get_tag_data("title") @property def link(self) -> str: return self.get_tag_data("link") @property def creator(self) -> typing.Optional[str]: try: return self.get_tag_data("dc:creator") except KeyError: return None @property def description(self) -> str: # TODO Testing return self.get_tag_data("description") @property def date(self) -> str: # TODO datetime format return self.get_tag_data("pubDate") @property def guid(self) -> int: return int(self.get_tag_data("guid")) @property def is_researched(self) -> bool: return "ytdl_infos" in self.__dict__ def salvage_cache(self, cache: "RVElement") -> None: if not self.parent.args.research and cache.is_researched: self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"] log.debug(f"From cache: {self}") # if cache.was_downloaded: # self.was_downloaded = True if cache.watched: self.watched = True def __str__(self) -> str: str = f"{self.guid}: {self.creator if self.creator else '?'} – {self.title}" if self.is_researched: if self.is_video: str += f" ({format_duration(self.duration)})" else: str += " (N/A)" else: str += " (?)" str += f" – {self.link}" return str @property def downloaded(self) -> bool: if not self.is_researched: return False return os.path.isfile(self.filepath) @functools.cached_property def ytdl_infos(self) -> typing.Optional[dict]: # TODO Sanitize according to documentation log.info(f"Researching: {self}") try: infos = self.parent.ytdl_dry.extract_info(self.link, download=False) except KeyboardInterrupt as e: raise e except yt_dlp.utils.DownloadError as e: # TODO Still raise in case of temporary network issue log.warning(e) infos = None # Apparently that thing is transformed from a LazyList # somewhere in the normal yt_dlp process if ( infos and "thumbnails" in infos and isinstance(infos["thumbnails"], yt_dlp.utils.LazyList) ): infos["thumbnails"] = infos["thumbnails"].exhaust() # Save database once it's been computed self.__dict__["ytdl_infos"] = infos self.parent.save() return infos @property def duration(self) -> int: assert self.is_video assert self.ytdl_infos return self.ytdl_infos["duration"] @property def is_video(self) -> bool: # Duration might be missing in playlists and stuff return self.ytdl_infos is not None and "duration" in self.ytdl_infos @property def filepath(self) -> str: assert self.is_video return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos) @property def filename(self) -> str: assert self.is_video return os.path.splitext(self.filepath)[0] def download(self) -> None: assert self.is_video log.info(f"Downloading: {self}") if not self.parent.args.dryrun: with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl: ydl.add_post_processor(SaveInfoPP(self)) ydl.process_ie_result(self.ytdl_infos, download=True) self.was_downloaded = True self.parent.save() def preload(self) -> None: assert self.is_video if self.downloaded: log.debug(f"Currently downloaded: {self}") return if self.was_downloaded: log.debug(f"Downloaded previously: {self}") return self.download() def matches_filter(self, args: configargparse.Namespace) -> bool: # Inexpensive filters if args.seen != "any" and (args.seen == "seen") != self.watched: log.debug(f"Not {args.seen}: {self}") return False if args.title and not re.search(args.title, self.title): log.debug(f"Title not matching {args.title}: {self}") return False if args.guid and not re.search(args.guid, str(self.guid)): log.debug(f"Guid not matching {args.guid}: {self}") return False if args.link and not re.search(args.link, self.link): log.debug(f"Link not matching {args.link}: {self}") return False if args.creator and ( not self.creator or not re.search(args.creator, self.creator) ): log.debug(f"Creator not matching {args.creator}: {self}") return False # Expensive filters if not self.is_video: log.debug(f"Not a video: {self}") return False if args.duration and not compare_duration(args.duration)(self.duration): log.debug( f"Duration {self.duration} not matching {args.duration}: {self}" ) return False return True def watch(self) -> None: if not self.downloaded: self.download() cmd = ["mpv", self.filepath] log.debug(f"Running {cmd}") if not self.parent.args.dryrun: proc = subprocess.run(cmd) proc.check_returncode() self.watched = True self.parent.save() def clean(self) -> None: assert self.is_video log.info(f"Removing gone video: {self.filename}*") for file in os.listdir(): if file.startswith(self.filename): log.debug(f"Removing file: {file}") if not self.parent.args.dryrun: os.unlink(file) class RVDatabase: SAVE_FILE = ".cache.p" args: configargparse.Namespace elements: list[RVElement] def __init__(self, args: configargparse.Namespace) -> None: self.args = args def save(self) -> None: log.debug("Saving cache") if self.args.dryrun: return with open(self.SAVE_FILE, "wb") as save_file: pickle.dump(self, save_file) @classmethod def load(cls) -> typing.Optional["RVDatabase"]: try: with open(cls.SAVE_FILE, "rb") as save_file: return pickle.load(save_file) except (TypeError, AttributeError, EOFError): log.warning("Corrupt / outdated cache, it will be rebuilt.") except FileNotFoundError: pass return None def salvage_cache(self, cache: "RVDatabase") -> None: log.debug(f"Salvaging cache") cache_els = dict() for cache_el in cache.elements: cache_els[cache_el.guid] = cache_el for el in self.elements: if el.guid in cache_els: el.salvage_cache(cache_els[el.guid]) def clean_cache(self, cache: "RVDatabase") -> None: log.debug(f"Cleaning cache") self_els = dict() for self_el in self.elements: self_els[self_el.guid] = self_el for el in cache.elements: if el.guid not in self_els: if el.is_researched and el.is_video: el.clean() def import_cache(self, cache: "RVDatabase") -> None: log.debug(f"Importing cache") self.feed_xml = cache.feed_xml self.read_feed() @functools.cached_property def feed_xml(self) -> minidom.Document: log.info("Fetching RSS feed") with urllib.request.urlopen(self.args.feed) as request: return minidom.parse(request) def read_feed(self) -> None: self.elements = [] for item in self.feed_xml.getElementsByTagName("item"): element = RVElement(self, item) self.elements.insert(0, element) log.debug(f"Known: {element}") def clean(self) -> None: log.debug("Cleaning") filenames = set() for element in self.elements: if element.is_video: filenames.add(element.filename) for file in os.listdir(): if file == RVDatabase.SAVE_FILE: continue if not os.path.isfile(file): continue for filename in filenames: if file.startswith(filename): break else: log.info(f"Removing unknown file: {file}") if not self.args.dryrun: os.unlink(file) @property def all_researched(self) -> bool: for element in self.elements: if not element.is_researched: return False return True def attempt_clean(self) -> None: if self.all_researched: self.clean() @property def ytdl_opts(self) -> dict: return {"format": self.args.format, "allsubtitles": self.args.subtitles} @property def ytdl_dry_opts(self) -> dict: opts = self.ytdl_opts.copy() opts.update({"quiet": True}) return opts @property def ytdl_dry(self) -> yt_dlp.YoutubeDL: return yt_dlp.YoutubeDL(self.ytdl_dry_opts) def filter(self, args: configargparse.Namespace) -> typing.Iterable[RVElement]: elements: typing.Iterable[RVElement] # Inexpensive sort if args.order == "new": elements = reversed(self.elements) elif args.order == "title": elements = sorted(self.elements, key=lambda el: el.title) elif args.order == "creator": elements = sorted(self.elements, key=lambda el: el.creator or "") elif args.order == "link": elements = sorted(self.elements, key=lambda el: el.link) elif args.order == "random": elements_random = self.elements.copy() random.shuffle(elements_random) elements = elements_random else: elements = self.elements # Possibly expensive filtering elements = filter(lambda el: el.matches_filter(args), elements) # Expensive sort if args.order == "short": elements = sorted( elements, key=lambda el: el.duration if el.is_video else 0 ) elif args.order == "long": elements = sorted( elements, key=lambda el: el.duration if el.is_video else 0, reverse=True ) # Post sorting filtering if args.total_duration: rem = parse_duration(args.total_duration) old_els = list(elements) elements = list() while rem > 0: for el in old_els: if el.duration < rem: elements.append(el) rem -= el.duration old_els.remove(el) break else: break return elements def get_args() -> configargparse.Namespace: defaultConfigPath = os.path.join( os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos" ) parser = configargparse.ArgParser( description="Download videos linked in " + "a RSS feed (e.g. an unread feed from " + "an RSS aggregator", default_config_files=[defaultConfigPath], ) # Runtime settings parser.add_argument( "-v", "--verbosity", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default=None, help="Verbosity of log messages", ) parser.add( "-c", "--config", required=False, is_config_file=True, help="Configuration file" ) parser.add( "-n", "--dryrun", help="Only pretend to do actions", action="store_const", const=True, default=False, ) # Input/Output parser.add( "--feed", help="URL of the RSS feed (must be public for now)", env_var="RSS_VIDEOS_FEED", required=True, ) parser.add( "--research", help="Fetch video info again", action="store_true", ) parser.add( "--no-refresh", dest="refresh", help="Don't fetch feed", action="store_false", ) parser.add( "--videos", help="Directory to store videos", env_var="RSS_VIDEOS_VIDEO_DIR", required=True, ) # Which videos parser.add( "--order", choices=("old", "new", "title", "creator", "link", "short", "long", "random"), default="old", help="Sorting mechanism", ) parser.add("--guid", help="Regex to filter guid") parser.add("--creator", help="Regex to filter by creator") parser.add("--title", help="Regex to filter by title") parser.add("--link", help="Regex to filter by link") parser.add("--duration", help="Comparative to filter by duration") parser.add( "--seen", choices=("seen", "unseen", "any"), default="unseen", help="Only include seen/unseen/any videos", ) parser.add( "--total-duration", help="Use videos that fit under the total given", ) # TODO Envrionment variables parser.add( "--max-duration", help="(Deprecated, use --duration instead)", env_var="RSS_VIDEOS_MAX_DURATION", type=int, default=0, ) # TODO Allow to ask # How to download parser.add( "--format", help="Use this format to download videos." + " See FORMAT SELECTION in youtube-dl(1)", env_var="RSS_VIDEOS_FORMAT", default="bestvideo+bestaudio/best", ) parser.add( "--subtitles", help="Download all subtitles", env_var="RSS_VIDEOS_SUBTITLES", action="store_true", ) parser.add( "action", nargs="?", choices=( "download", "list", "watch", "binge", "clean", "seen", "unseen", ), default="download", ) args = parser.parse_args() args.videos = os.path.realpath(os.path.expanduser(args.videos)) if not args.duration and args.max_duration: args.duration = str(args.max_duration) return args def main() -> None: args = get_args() configure_logging(args) os.makedirs(args.videos, exist_ok=True) os.chdir(args.videos) database = RVDatabase(args) cache = RVDatabase.load() feed_fetched = False if args.refresh: try: database.read_feed() feed_fetched = True except urllib.error.URLError as err: if args.action == "download": raise RuntimeError("Couldn't fetch feed, refusing to download") # This is a quirky failsafe in case of no internet connection, # so the script doesn't go noting that no element is a video. if not feed_fetched: if cache: log.warning("Using cached feed.") database.import_cache(cache) else: raise FileNotFoundError("Feed not fetched and no cached feed.") if cache: database.salvage_cache(cache) database.clean_cache(cache) database.save() log.debug(f"Running action") if args.action == "clean": database.clean() else: database.attempt_clean() duration = 0 for element in database.filter(args): if args.action == "download": element.preload() elif args.action == "list": print(element) elif args.action in ("watch", "binge"): element.watch() if args.action == "watch": break elif args.action == "seen": if not element.watched: log.info(f"Maked as seen: {element}") element.watched = True elif args.action == "unseen": if element.watched: log.info(f"Maked as unseen: {element}") element.watched = False else: raise NotImplementedError(f"Unimplemented action: {args.action}") duration += element.duration if element.is_video else 0 log.info(f"Total duration: {format_duration(duration)}") database.attempt_clean() database.save() if __name__ == "__main__": main()