#!/usr/bin/env python3
"""Script that downloads videos that are linked as an article in a RSS feed.

The common use case would be a feed from an RSS aggregator with the
unread items (non-video links are ignored).
"""

# TODO Distribute this correctly, in the meanwhile please do
# pip install --user coloredlogs ConfigArgParse yt-dlp

import enum
import functools
import logging
import os
import pickle
import sys
import typing
import urllib.parse
import urllib.request
from xml.dom import minidom

import coloredlogs
import configargparse
import yt_dlp as youtube_dl

log = logging.getLogger(__name__)


def configure_logging(args: configargparse.Namespace) -> None:
    """Install colored log handlers according to the --verbosity flag."""
    if args.verbosity:
        coloredlogs.install(
            level=args.verbosity,
        )
    else:
        # Default: show only this script's messages, without decoration.
        coloredlogs.install(
            fmt="%(message)s",
            logger=log,
        )


class RVCommand(enum.Enum):
    """Subcommands accepted on the command line."""

    download = "download"
    list = "list"


class RVElement:
    """One <item> of the RSS feed, usually pointing to a video page."""

    title: str
    link: str
    # creator: str
    # description: str
    # date: datetime.datetime
    guid: int
    parent: "RVDatabase"
    was_downloaded: bool

    def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
        def get_data(tag_name: str) -> str:
            # Expect exactly one tag with exactly one text child.
            nodes = item.getElementsByTagName(tag_name)
            if len(nodes) != 1:
                raise RuntimeError(f"Expected 1 tag `{tag_name}`, got {len(nodes)}.")
            children = nodes[0].childNodes
            if len(children) != 1:
                raise RuntimeError(
                    f"Expected 1 child for tag `{tag_name}`, got {len(children)}."
                )
            return children[0].data

        self.title = get_data("title")
        self.link = get_data("link")
        # self.creator = get_data("dc:creator")
        # self.description = get_data("description")
        # self.date = get_data("pubDate")
        self.guid = int(get_data("guid"))
        self.parent = parent
        self.was_downloaded = False

    def read_cache(self, cache: "RVElement") -> None:
        """Copy the expensive / stateful fields from a cached element."""
        if "ytdl_infos" in cache.__dict__:
            # Write through __dict__ so the cached_property is considered set.
            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
            log.debug(f"From cache: {self}")
        if cache.was_downloaded:
            self.was_downloaded = True

    def __str__(self) -> str:
        return f"{self.title} – {self.link}"

    @property
    def downloaded(self) -> bool:
        """Whether the predicted video file is currently present on disk."""
        if "ytdl_infos" not in self.__dict__:
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        """Metadata extracted by yt-dlp, or None if extraction failed."""
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link)
        except Exception as e:
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        # Apparently that thing is transformed from a LazyList
        # somewhere in the normal yt_dlp process
        if (
            infos
            and "thumbnails" in infos
            and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList)
        ):
            infos["thumbnails"] = infos["thumbnails"].exhaust()
        # Save database once it's been computed
        self.__dict__["ytdl_infos"] = infos
        self.parent.save()
        return infos

    @property
    def skip(self) -> bool:
        """Whether this video should be skipped (longer than --max-duration)."""
        assert self.is_video
        assert self.ytdl_infos
        if (
            self.parent.args.max_duration > 0
            and self.ytdl_infos["duration"] > self.parent.args.max_duration
        ):
            return True
        return False

    @property
    def is_video(self) -> bool:
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @property
    def filepath(self) -> str:
        """Full path of the file yt-dlp would produce."""
        assert self.is_video
        # TODO This doesn't change the extension to mkv when the formats are incomaptible
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def filename(self) -> str:
        """Predicted path without the extension, used to match files on disk."""
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def download(self) -> None:
        """Download the video (honouring --dryrun) and persist that fact."""
        assert self.is_video
        log.info(f"Downloading: {self}")
        if not self.parent.args.dryrun:
            self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {})
        self.was_downloaded = True
        self.parent.save()

    def act(self) -> None:
        """Download the video unless it should be ignored or already was."""
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return
        if self.downloaded:
            log.debug(f"Currently downloaded: {self}")
            return
        if self.was_downloaded:
            log.debug(f"Downloaded previously: {self}")
            return
        if self.skip:
            log.debug(f"Skipped: {self}")
            return
        self.download()


class RVDatabase:
    """Whole application state: feed items plus the on-disk download cache."""

    SAVE_FILE = ".cache.p"

    args: configargparse.Namespace
    elements: list[RVElement]

    def __init__(self, args: configargparse.Namespace) -> None:
        self.args = args

    def save(self) -> None:
        """Pickle the whole database to the cache file (skipped in dry runs)."""
        log.debug("Saving cache")
        if self.args.dryrun:
            return
        with open(self.SAVE_FILE, "wb") as save_file:
            pickle.dump(self, save_file)

    @classmethod
    def load(cls) -> typing.Optional["RVDatabase"]:
        """Unpickle the cache file; None if absent or unusable."""
        try:
            with open(cls.SAVE_FILE, "rb") as save_file:
                return pickle.load(save_file)
        except (TypeError, AttributeError, EOFError):
            # Classes evolved since the cache was written, or truncated file.
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
        except FileNotFoundError:
            pass
        return None

    def read_cache(self, cache: "RVDatabase") -> None:
        """Import the cached data of elements matching the current feed."""
        cache_els = {cache_el.guid: cache_el for cache_el in cache.elements}
        for el in self.elements:
            if el.guid in cache_els:
                el.read_cache(cache_els[el.guid])

    def read_feed(self) -> None:
        """Fetch and parse the RSS feed into self.elements, oldest first."""
        log.info("Fetching RSS feed")
        self.elements = list()
        with urllib.request.urlopen(self.args.feed) as request:
            with minidom.parse(request) as xmldoc:
                for item in xmldoc.getElementsByTagName("item"):
                    element = RVElement(self, item)
                    # Feeds are newest-first; prepend to process oldest first.
                    self.elements.insert(0, element)
                    log.debug(f"Known: {element}")

    def clean(self) -> None:
        """Delete files in the working directory not matching a wanted video."""
        filenames = set()
        for element in self.elements:
            if element.is_video and not element.skip:
                filenames.add(element.filename)
        for file in os.listdir():
            if file == RVDatabase.SAVE_FILE:
                continue
            if not os.path.isfile(file):
                continue
            # Match on the extension-less prefix since the real extension
            # may differ from the predicted one.
            for filename in filenames:
                if file.startswith(filename):
                    break
            else:
                log.info(f"Removing: {file}")
                if not self.args.dryrun:
                    os.unlink(file)

    def act_all(self) -> None:
        """Run act() on every feed element."""
        for element in self.elements:
            element.act()

    @property
    def ytdl_opts(self) -> dict:
        """Options for the downloading yt-dlp instance."""
        return {"format": self.args.format, "allsubtitles": self.args.subtitles}

    @property
    def ytdl_dry_opts(self) -> dict:
        """Same options but simulating, quietly (for metadata extraction)."""
        opts = self.ytdl_opts.copy()
        opts.update({"simulate": True, "quiet": True})
        return opts

    @property
    def ytdl(self) -> youtube_dl.YoutubeDL:
        """A yt-dlp instance that actually downloads."""
        return youtube_dl.YoutubeDL(self.ytdl_opts)

    @property
    def ytdl_dry(self) -> youtube_dl.YoutubeDL:
        """A yt-dlp instance that only simulates."""
        return youtube_dl.YoutubeDL(self.ytdl_dry_opts)


def get_args() -> configargparse.Namespace:
    """Parse arguments from command line, environment and config file."""
    default_config_path = os.path.join(
        os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
    )

    parser = configargparse.ArgParser(
        description="Download videos linked in "
        + "a RSS feed (e.g. an unread feed from "
        + "an RSS aggregator)",
        default_config_files=[default_config_path],
    )
    parser.add(
        "-v",
        "--verbosity",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Verbosity of log messages",
    )
    parser.add(
        "-c", "--config", required=False, is_config_file=True, help="Configuration file"
    )
    parser.add(
        "--feed",
        help="URL of the RSS feed (must be public for now)",
        env_var="RSS_VIDEOS_FEED",
        required=True,
    )
    parser.add(
        "--videos",
        help="Directory to store videos",
        env_var="RSS_VIDEOS_VIDEO_DIR",
        required=True,
    )
    parser.add(
        "-n",
        "--dryrun",
        help="Do not download the videos",
        action="store_const",
        const=True,
        default=False,
    )
    parser.add(
        "--max-duration",
        help="Skip video longer than this amount of seconds",
        env_var="RSS_VIDEOS_MAX_DURATION",
        type=int,
        default=0,
    )
    parser.add(
        "--format",
        help="Use this format to download videos."
        + " See FORMAT SELECTION in youtube-dl(1)",
        env_var="RSS_VIDEOS_FORMAT",
        default="bestvideo+bestaudio/best",
    )
    parser.add(
        "--subtitles",
        help="Download all subtitles",
        env_var="RSS_VIDEOS_SUBTITLES",
        action="store_true",
    )

    # Default to the download subcommand when none is given.
    parser.set_defaults(subcommand=RVCommand.download)
    subparsers = parser.add_subparsers(title="subcommand")

    sc_download = subparsers.add_parser("download")
    sc_download.set_defaults(subcommand=RVCommand.download)

    sc_list = subparsers.add_parser("list")
    sc_list.set_defaults(subcommand=RVCommand.list)

    args = parser.parse_args()
    args.videos = os.path.realpath(os.path.expanduser(args.videos))

    return args


def main() -> None:
    """Entry point: parse arguments and run the selected subcommand."""
    args = get_args()
    configure_logging(args)

    # All file operations happen relative to the videos directory.
    os.makedirs(args.videos, exist_ok=True)
    os.chdir(args.videos)

    if args.subcommand == RVCommand.download:
        database = RVDatabase(args)
        database.read_feed()
        cache = RVDatabase.load()
        if cache:
            database.read_cache(cache)
        database.clean()
        database.act_all()
        database.save()
    elif args.subcommand == RVCommand.list:
        cache = RVDatabase.load()
        if not cache:
            raise FileNotFoundError("This command doesn't work without a cache yet.")
        for element in cache.elements:
            print(element)


if __name__ == "__main__":
    main()