diff --git a/config/scripts/rssVideos b/config/scripts/rssVideos index fc5f80a..b88439c 100755 --- a/config/scripts/rssVideos +++ b/config/scripts/rssVideos @@ -1,5 +1,6 @@ #!/usr/bin/env python3 + """ Script that download videos that are linked as an article in a RSS feed. @@ -8,17 +9,235 @@ with the unread items (non-video links are ignored). """ # TODO Distribute this correclty, in the meanwhile please do -# pip install --user yt-dlp ConfigArgParse +# pip install --user coloredlogs ConfigArgParse yt-dlp -# TODO Better logging (youtube-dl allow to pass loggers) - -import sys -import urllib.request -import urllib.parse +import enum +import functools +import logging import os +import pickle +import sys +import typing +import urllib.parse +import urllib.request from xml.dom import minidom -import yt_dlp as youtube_dl + +import coloredlogs import configargparse +import yt_dlp as youtube_dl + +log = logging.getLogger(__name__) + + +def configure_logging(args: configargparse.Namespace) -> None: + # Configure logging + if args.verbosity: + coloredlogs.install( + level=args.verbosity, + ) + else: + coloredlogs.install( + fmt="%(message)s", + logger=log, + ) + + +class RVElement: + title: str + link: str + # creator: str + # description: str + # date: datetime.datetime + guid: int + + parent: "RVDatabase" + + def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None: + def get_data(tag_name: str) -> str: + nodes = item.getElementsByTagName(tag_name) + if len(nodes) != 1: + raise RuntimeError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.") + children = nodes[0].childNodes + if len(children) != 1: + raise RuntimeError( + f"Exepected 1 children for tag `{tag_name}`, got {len(children)}." + ) + return children[0].data + + self.title = get_data("title") + self.link = get_data("link") + # self.creator = get_data("dc:creator") + # self.description = get_data("description") + # self.date = get_data("pubDate") + self.guid = int(get_data("guid")) + + self.parent = parent + + def read_cache(self, cache: "RVElement") -> None: + if "ytdl_infos" in cache.__dict__: + self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"] + log.debug(f"From cache: {self}") + + def __str__(self) -> str: + return f"{self.title} – {self.link}" + + @property + def downloaded(self) -> bool: + if "ytdl_infos" not in self.__dict__: + return False + return os.path.isfile(self.filepath) + + @functools.cached_property + def ytdl_infos(self) -> typing.Optional[dict]: + log.info(f"Researching: {self}") + try: + infos = self.parent.ytdl_dry.extract_info(self.link) + except BaseException as e: + # TODO Still raise in case of temporary network issue + log.warn(e) + infos = None + # Apparently that thing is transformed from a LazyList + # somewhere in the normal yt_dlp process + if ( + infos + and "thumbnails" in infos + and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList) + ): + infos["thumbnails"] = infos["thumbnails"].exhaust() + # Save database once it's been computed + self.__dict__["ytdl_infos"] = infos + self.parent.save() + return infos + + @property + def skip(self) -> bool: + assert self.is_video + assert self.ytdl_infos + if ( + self.parent.args.max_duration > 0 + and self.ytdl_infos["duration"] > self.parent.args.max_duration + ): + return True + return False + + @property + def is_video(self) -> bool: + # Duration might be missing in playlists and stuff + return self.ytdl_infos is not None and "duration" in self.ytdl_infos + + @property + def filepath(self) -> str: + assert self.is_video + return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos) + + @property + def filename(self) -> str: + assert self.is_video + return os.path.splitext(self.filepath)[0] + + def download(self) -> None: + assert self.is_video + log.info(f"Downloading: {self}") + if self.parent.args.dryrun: + return + self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {}) + + def act(self) -> None: + if not self.is_video: + log.debug(f"Not a video: {self}") + return + if self.downloaded: + log.debug(f"Already downloaded: {self}") + return + if self.skip: + log.debug(f"Skipped: {self}") + return + self.download() + + +class RVDatabase: + SAVE_FILE = ".cache.p" + + args: configargparse.Namespace + elements: list[RVElement] + + def __init__(self, args: configargparse.Namespace) -> None: + self.args = args + + def save(self) -> None: + if self.args.dryrun: + return + with open(self.SAVE_FILE, "wb") as save_file: + pickle.dump(self, save_file) + + @classmethod + def load(cls) -> typing.Optional["RVDatabase"]: + try: + with open(cls.SAVE_FILE, "rb") as save_file: + return pickle.load(save_file) + except (TypeError, AttributeError, EOFError): + log.warn("Corrupt / outdated cache, it will be rebuilt.") + except FileNotFoundError: + pass + return None + + def read_cache(self, cache: "RVDatabase") -> None: + cache_els = dict() + for cache_el in cache.elements: + cache_els[cache_el.guid] = cache_el + for el in self.elements: + if el.guid in cache_els: + el.read_cache(cache_els[el.guid]) + + def read_feed(self) -> None: + log.info("Fetching RSS feed") + self.elements = list() + with urllib.request.urlopen(self.args.feed) as request: + with minidom.parse(request) as xmldoc: + for item in xmldoc.getElementsByTagName("item"): + element = RVElement(self, item) + self.elements.insert(0, element) + log.debug(f"Known: {element}") + + def clean(self) -> None: + filenames = set() + for element in self.elements: + if element.is_video: + filenames.add(element.filename) + for file in os.listdir(): + if file == RVDatabase.SAVE_FILE: + continue + if not os.path.isfile(file): + continue + for filename in filenames: + if file.startswith(filename): + break + else: + log.info(f"Removing: {file}") + if not self.args.dryrun: + os.unlink(file) + + def act_all(self) -> None: + for element in self.elements: + element.act() + + @property + def ytdl_opts(self) -> dict: + return {"format": self.args.format, "allsubtitles": self.args.subtitles} + + @property + def ytdl_dry_opts(self) -> dict: + opts = self.ytdl_opts.copy() + opts.update({"simulate": True, "quiet": True}) + return opts + + @property + def ytdl(self) -> youtube_dl.YoutubeDL: + return youtube_dl.YoutubeDL(self.ytdl_opts) + + @property + def ytdl_dry(self) -> youtube_dl.YoutubeDL: + return youtube_dl.YoutubeDL(self.ytdl_dry_opts) def get_args() -> configargparse.Namespace: @@ -32,6 +251,13 @@ def get_args() -> configargparse.Namespace: + "an RSS aggregator", default_config_files=[defaultConfigPath], ) + parser.add_argument( + "-v", + "--verbosity", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default=None, + help="Verbosity of log messages", + ) parser.add( "-c", "--config", required=False, is_config_file=True, help="Configuration file" ) @@ -55,15 +281,6 @@ def get_args() -> configargparse.Namespace: const=True, default=False, ) - # TODO This feature might require additional documentation and an on/off switch - parser.add( - "--track", - help="Directory where download videos are marked " - + "to not download them after deletion.", - env_var="RSS_VIDEOS_TRACK", - required=False, - default=".rssVideos", - ) parser.add( "--max-duration", help="Skip video longer than this amount of seconds", @@ -87,207 +304,25 @@ def get_args() -> configargparse.Namespace: args = parser.parse_args() args.videos = os.path.realpath(os.path.expanduser(args.videos)) - args.track = os.path.expanduser(args.track) - if not os.path.isabs(args.track): - args.track = os.path.realpath(os.path.join(args.videos, args.track)) return args -def get_links(args: configargparse.Namespace) -> list[str]: - """ - Read the feed XML, get the links - """ - links = list() - with urllib.request.urlopen(args.feed) as request: - with minidom.parse(request) as xmldoc: - for item in xmldoc.getElementsByTagName("item"): - try: - linkNode = item.getElementsByTagName("link")[0] - link: str = linkNode.childNodes[0].data - if link not in links: - links.append(link) - except BaseException as e: - print("Error while getting link from item:", e) - continue - return links - - -def get_video_infos( - args: configargparse.Namespace, ydl_opts: dict, links: list[str] -) -> dict[str, dict]: - """ - Filter out non-video links and store video download info - and associated filename - """ - videosInfos = dict() - - dry_ydl_opts = ydl_opts.copy() - dry_ydl_opts.update({"simulate": True, "quiet": True}) - with youtube_dl.YoutubeDL(dry_ydl_opts) as ydl: - for link in links: - print(f"Researching {link}...") - try: - infos = ydl.extract_info(link) - if args.max_duration > 0 and infos["duration"] > args.max_duration: - print( - f"{infos['title']}: Skipping as longer than max duration: " - f"{infos['duration']} > {args.max_duration}" - ) - continue - filepath = ydl.prepare_filename(infos) - filename, extension = os.path.splitext(filepath) - videosInfos[filename] = infos - print(f"{infos['title']}: Added") - - except BaseException as e: - print(e) - continue - - return videosInfos - - -def get_downloaded_videos( - args: configargparse.Namespace, videosInfos: dict[str, dict] -) -> tuple[set[str], set[str]]: - videosDownloaded = set() - videosPartiallyDownloaded = set() - """ - Read the directory content, delete everything that's not a - video on the download list or already downloaded - """ - - for filepath in os.listdir(args.videos): - fullpath = os.path.join(args.videos, filepath) - if not os.path.isfile(fullpath): - continue - filename, extension = os.path.splitext(filepath) - - for onlineFilename in videosInfos.keys(): - # Full name already there: completly downloaded - # → remove from the download list - if filename == onlineFilename: - videosDownloaded.add(onlineFilename) - break - elif filename.startswith(onlineFilename): - # Subtitle file - # → ignore - if filename.endswith(".vtt"): - break - - # Partial name already there: not completly downloaded - # → keep on the download list - videosPartiallyDownloaded.add(onlineFilename) - break - # Unrelated filename: delete - else: - print(f"Deleting: {filename}") - os.unlink(fullpath) - - return videosDownloaded, videosPartiallyDownloaded - - -def get_tracked_videos(args: configargparse.Namespace, known: set[str]) -> set[str]: - """ - Return videos previously downloaded (=tracked) amongst the unread videos. - This is stored in the tracking directory as empty extension-less files. - Other tracking markers (e.g. for now read videos) are deleted. - """ - - videosTracked = set() - - for filepath in os.listdir(args.track): - fullpath = os.path.join(args.track, filepath) - if not os.path.isfile(fullpath): - continue - # Here filename is a filepath as no extension - - if filepath in known: - videosTracked.add(filepath) - else: - os.unlink(fullpath) - - return videosTracked - - def main() -> None: - args = get_args() + configure_logging(args) os.makedirs(args.videos, exist_ok=True) - os.makedirs(args.track, exist_ok=True) - ydl_opts = {"format": args.format, "allsubtitles": args.subtitles} - - print("→ Retrieveing RSS feed") - links = get_links(args) - # Oldest first - links = links[::-1] - - print(f"→ Getting infos on {len(links)} unread articles") - videosInfos = get_video_infos(args, ydl_opts, links) - - print(f"→ Deciding on what to do for {len(videosInfos)} videos") - videosDownloaded, videosPartiallyDownloaded = get_downloaded_videos( - args, videosInfos - ) - videosTracked = get_tracked_videos(args, set(videosInfos.keys())) - - # Deciding for the rest based on the informations - - def markTracked(filename: str) -> None: - markerPath = os.path.join(args.track, onlineFilename) - open(markerPath, "a").close() - - videosToDownload: set[str] = set() - videosReads: set[str] = set() - for onlineFilename in videosInfos.keys(): - # If the video was once downloaded but manually deleted, - # the marker should be left - if onlineFilename in videosTracked: - print(f"Should be marked as read: {onlineFilename}") - # TODO Automatically do that one day maybe? - # Need to login to the FreshRSS API and keep track of - # the item id along the process - videosReads.add(onlineFilename) - elif onlineFilename in videosDownloaded: - markTracked(onlineFilename) - print(f"Already downloaded: {onlineFilename}") - else: - if onlineFilename in videosPartiallyDownloaded: - print(f"Will be continued: {onlineFilename}") - else: - print(f"Will be downloaded: {onlineFilename}") - videosToDownload.add(onlineFilename) - - # Download the missing videos - print(f"→ Downloading {len(videosToDownload)} videos") - os.chdir(args.videos) - exit_code = 0 - with youtube_dl.YoutubeDL(ydl_opts) as ydl: - for onlineFilename, infos in videosInfos.items(): - if onlineFilename not in videosToDownload: - continue - - # Really download - if args.dryrun: - print(f"Would download {onlineFilename}") - else: - # Apparently that thing is transformed from a LazyList - # somewhere in the normal yt_dlp process - if isinstance(infos["thumbnails"], youtube_dl.utils.LazyList): - infos["thumbnails"] = infos["thumbnails"].exhaust() - try: - ydl.process_ie_result(infos, True, {}) - - markTracked(onlineFilename) - except BaseException as e: - print(e) - exit_code = 1 - continue - - sys.exit(exit_code) + database = RVDatabase(args) + database.read_feed() + cache = RVDatabase.load() + if cache: + database.read_cache(cache) + database.clean() + database.act_all() + database.save() if __name__ == "__main__":