From d88520552b704e2be6740ca4b234df11d3951b22 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?=
Date: Wed, 23 Mar 2022 18:54:05 +0100
Subject: [PATCH] rssVideos: Now thread-safe (kinda)

---
 config/scripts/rssVideos | 343 +++++++++++++++++++--------------------
 1 file changed, 167 insertions(+), 176 deletions(-)

diff --git a/config/scripts/rssVideos b/config/scripts/rssVideos
index 2154000..e742f00 100755
--- a/config/scripts/rssVideos
+++ b/config/scripts/rssVideos
@@ -9,6 +9,7 @@ with the unread items (non-video links are ignored).
 """
 
 import datetime
+import filelock
 import functools
 import logging
 import os
@@ -26,8 +27,6 @@ import yt_dlp
 
 log = logging.getLogger(__name__)
 
-# TODO Lockfile, or a way to parallel watch and download
-
 
 def configure_logging(args: configargparse.Namespace) -> None:
     # Configure logging
@@ -107,17 +106,33 @@ def format_duration(duration: int) -> str:
 class RVElement:
     parent: "RVDatabase"
     item: dict
-    downloaded_filepath: typing.Optional[str]
+
+    RERESEARCH_AFTER = datetime.timedelta(hours=1)
 
     def __init__(self, parent: "RVDatabase", item: dict) -> None:
         self.parent = parent
         self.item = item
-        self.downloaded_filepath = None
 
     @property
     def id(self) -> str:
         return self.item["id"]
 
+    @property
+    def sid(self) -> str:
+        return self.id.split("/")[-1]
+
+    def metafile(self, extension: str) -> str:
+        return os.path.join(self.parent.METADATA_FOLDER, f"{self.sid}.{extension}")
+
+    def metafile_read(self, extension: str) -> typing.Any:
+        return self.parent.metafile_read(f"{self.sid}.{extension}")
+
+    def metafile_write(self, extension: str, data: typing.Any) -> None:
+        return self.parent.metafile_write(f"{self.sid}.{extension}", data)
+
+    def save(self) -> None:
+        self.metafile_write("item", self.item)
+
     @property
     def title(self) -> str:
         return self.item["title"]
@@ -136,14 +151,8 @@ class RVElement:
 
     @property
     def is_researched(self) -> bool:
-        return "ytdl_infos" in self.__dict__
-
-    def salvage_cache(self, cache: "RVElement") -> None:
-        if cache.is_researched:
-            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
-            log.debug(f"From cache: {self}")
-        if cache.downloaded_filepath:
-            self.downloaded_filepath = cache.downloaded_filepath
+        metafile = self.metafile("ytdl")
+        return os.path.isfile(metafile)
 
     def __str__(self) -> str:
         str = f"{self.date.strftime('%y-%m-%d %H:%M')} ("
@@ -169,6 +178,14 @@ class RVElement:
 
     @functools.cached_property
     def ytdl_infos(self) -> typing.Optional[dict]:
+        try:
+            return self.metafile_read("ytdl")
+        except (FileNotFoundError, TypeError, AttributeError, EOFError):
+            infos = self._ytdl_infos()
+            self.metafile_write("ytdl", infos)
+            return infos
+
+    def _ytdl_infos(self) -> typing.Optional[dict]:
         log.info(f"Researching: {self}")
         try:
             infos = self.parent.ytdl_dry.extract_info(self.link, download=False)
@@ -180,9 +197,6 @@ class RVElement:
             infos = None
         if infos:
             infos = self.parent.ytdl_dry.sanitize_info(infos)
-        # Save database once it's been computed
-        self.__dict__["ytdl_infos"] = infos
-        self.parent.save()
         return infos
 
     @property
@@ -196,6 +210,18 @@ class RVElement:
         # Duration might be missing in playlists and stuff
         return self.ytdl_infos is not None and "duration" in self.ytdl_infos
 
+    @functools.cached_property
+    def downloaded_filepath(self) -> typing.Optional[str]:
+        try:
+            return self.metafile_read("path")
+        except FileNotFoundError:
+            return None
+
+    @property
+    def was_downloaded(self) -> bool:
+        metafile = self.metafile("path")
+        return os.path.exists(metafile)
+
     @property
     def filepath(self) -> str:
         assert self.is_video
@@ -204,37 +230,36 @@ class RVElement:
         return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)
 
     @property
-    def filename(self) -> str:
+    def basename(self) -> str:
         assert self.is_video
         return os.path.splitext(self.filepath)[0]
 
+    def expire_info(self) -> None:
+        metafile = self.metafile("ytdl")
+        if os.path.isfile(metafile):
+            stat = os.stat(metafile)
+            mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
+            diff = datetime.datetime.now() - mtime
+            if diff > self.RERESEARCH_AFTER:
+                os.unlink(metafile)
+                del self.ytdl_infos
+
     def download(self) -> None:
         assert self.is_video
+        if self.downloaded:
+            return
+        self.expire_info()
         log.info(f"Downloading: {self}")
-        if self.parent.args.research:
-            del self.ytdl_infos
-        if not self.parent.args.dryrun:
-            with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl:
-                ydl.add_post_processor(SaveInfoPP(self))
-                ydl.process_ie_result(self.ytdl_infos, download=True)
-        self.parent.save()
+        lockfile = self.metafile("lock")
+        with filelock.FileLock(lockfile):
+            if not self.parent.args.dryrun:
+                with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl:
+                    ydl.add_post_processor(SaveInfoPP(self))
+                    ydl.process_ie_result(self.ytdl_infos, download=True)
 
     def update_post_download(self, info: dict) -> None:
         self.downloaded_filepath = self.parent.ytdl_dry.prepare_filename(info)
-
-    @property
-    def was_downloaded(self) -> bool:
-        return self.downloaded_filepath is not None
-
-    def preload(self) -> None:
-        assert self.is_video
-        if self.downloaded:
-            log.debug(f"Currently downloaded: {self}")
-            return
-        if self.was_downloaded:
-            log.debug(f"Downloaded previously: {self}")
-            return
-        self.download()
+        self.metafile_write("path", self.downloaded_filepath)
 
     @property
     def watched(self) -> bool:
@@ -270,8 +295,7 @@ class RVElement:
         return True
 
     def watch(self) -> None:
-        if not self.downloaded:
-            self.download()
+        self.download()
 
         cmd = ["mpv", self.filepath]
         log.debug(f"Running {cmd}")
@@ -279,17 +303,27 @@ class RVElement:
             proc = subprocess.run(cmd)
             proc.check_returncode()
 
-        self.clean()
+        self.undownload()
         self.try_mark_read()
 
-    def clean(self) -> None:
-        assert self.is_video
-        log.info(f"Removing gone video: {self.filename}*")
-        for file in os.listdir():
-            if file.startswith(self.filename):
-                log.debug(f"Removing file: {file}")
+    def clean_file(self, folder: str, basename: str) -> None:
+        for file in os.listdir(folder):
+            if file.startswith(basename):
+                path = os.path.join(folder, file)
+                log.debug(f"Removing file: {path}")
                 if not self.parent.args.dryrun:
-                    os.unlink(file)
+                    os.unlink(path)
+
+    def undownload(self) -> None:
+        assert self.is_video
+        log.info(f"Removing gone video: {self.basename}*")
+        self.clean_file(".", self.basename)
+
+    def clean(self) -> None:
+        if self.is_video:
+            self.undownload()
+        log.info(f"Removing gone metadata: {self.sid}*")
+        self.clean_file(self.parent.METADATA_FOLDER, self.sid)
 
     def mark_read(self) -> None:
         log.debug(f"Marking {self} read")
@@ -309,7 +343,7 @@ class RVElement:
         if r.text.strip() != "OK":
             raise RuntimeError(f"Couldn't mark {self} as read: {r.text}")
         log.info(f"Marked {self} as read")
-        self.parent.elements.remove(self)
+        self.clean()
 
     def try_mark_read(self) -> None:
         try:
@@ -319,7 +353,7 @@ class RVElement:
 
 
 class RVDatabase:
-    SAVE_FILE = ".cache.p"
+    METADATA_FOLDER = ".metadata"
 
     args: configargparse.Namespace
     elements: list[RVElement]
@@ -327,53 +361,27 @@ class RVDatabase:
     def __init__(self, args: configargparse.Namespace) -> None:
         self.args = args
 
-    def save(self) -> None:
-        log.debug("Saving cache")
-        if self.args.dryrun:
-            return
-        with open(self.SAVE_FILE, "wb") as save_file:
-            pickle.dump(self, save_file)
+    def metafile_read(self, name: str) -> typing.Any:
+        path = os.path.join(self.METADATA_FOLDER, name)
+        log.debug(f"Reading {path}")
+        with open(path, "rb") as mf:
+            return pickle.load(mf)
 
-    @classmethod
-    def load(cls) -> typing.Optional["RVDatabase"]:
-        try:
-            with open(cls.SAVE_FILE, "rb") as save_file:
-                return pickle.load(save_file)
-        except (TypeError, AttributeError, EOFError):
-            log.warning("Corrupt / outdated cache, it will be rebuilt.")
-        except FileNotFoundError:
-            pass
-        return None
-
-    def salvage_cache_pre(self, cache: "RVDatabase") -> None:
-        if "auth_headers" in cache.__dict__:
-            self.auth_headers = cache.auth_headers
-
-    def salvage_cache(self, cache: "RVDatabase") -> None:
-        log.debug("Salvaging cache")
-        cache_els = dict()
-        for cache_el in cache.elements:
-            cache_els[cache_el.id] = cache_el
-        for el in self.elements:
-            if el.id in cache_els:
-                el.salvage_cache(cache_els[el.id])
+    def metafile_write(self, name: str, data: typing.Any) -> None:
+        path = os.path.join(self.METADATA_FOLDER, name)
+        log.debug(f"Writing {path}")
+        if not self.args.dryrun:
+            with open(path, "wb") as mf:
+                pickle.dump(data, mf)
 
     def clean_cache(self, cache: "RVDatabase") -> None:
         log.debug("Cleaning cache")
-        self_els = dict()
-        for self_el in self.elements:
-            self_els[self_el.id] = self_el
+        fresh_ids = set(el.id for el in self.elements)
         for el in cache.elements:
-            if el.id not in self_els:
-                if el.is_researched and el.is_video:
-                    el.clean()
+            if el.id not in fresh_ids:
+                el.clean()
 
-    def import_cache(self, cache: "RVDatabase") -> None:
-        log.debug("Importing cache")
-        self.build_list([element.item for element in cache.elements])
-
-    @functools.cached_property
-    def auth_headers(self) -> dict[str, str]:
+    def _auth_headers(self) -> dict[str, str]:
         r = requests.get(
             f"{self.args.url}/accounts/ClientLogin",
             params={"Email": self.args.email, "Passwd": self.args.passwd},
@@ -385,6 +393,15 @@ class RVDatabase:
                 return {"Authorization": f"GoogleLogin auth={val}"}
         raise RuntimeError("Couldn't find auth= key")
 
+    @functools.cached_property
+    def auth_headers(self) -> dict[str, str]:
+        try:
+            return self.metafile_read(".auth_headers")
+        except FileNotFoundError:
+            headers = self._auth_headers()
+            self.metafile_write(".auth_headers", headers)
+            return headers
+
     def fetch_feed_elements(self) -> typing.Generator[dict, None, None]:
         log.info("Fetching RSS feed")
         continuation: typing.Optional[str] = None
@@ -409,45 +426,47 @@ class RVDatabase:
         while continuation:
             yield from next_page()
 
-    def build_list(self, items: typing.Iterable[dict]) -> None:
+    def fetch_cache_elements(self) -> typing.Generator[dict, None, None]:
+        log.info("Fetching from cache")
+        for file in os.listdir(self.METADATA_FOLDER):
+            if not file.endswith(".item"):
+                continue
+            yield self.metafile_read(file)
+
+    def build_list(self, items: typing.Iterable[dict], save: bool = False) -> None:
         self.elements = []
         for item in items:
            element = RVElement(self, item)
            self.elements.insert(0, element)
            log.debug(f"Known: {element}")
+            if save:
+                element.save()
 
     def read_feed(self) -> None:
-        self.build_list(self.fetch_feed_elements())
+        self.build_list(self.fetch_feed_elements(), save=True)
+
+    def read_cache(self) -> None:
+        self.build_list(self.fetch_cache_elements())
+
+    def clean_folder(self, folder: str, basenames: set[str]) -> None:
+        for file in os.listdir(folder):
+            path = os.path.join(folder, file)
+            if not os.path.isfile(path) or file[0] == ".":
+                continue
+            for basename in basenames:
+                if file.startswith(basename):
+                    break
+            else:
+                log.info(f"Removing unknown file: {path}")
+                if not self.args.dryrun:
+                    os.unlink(path)
 
     def clean(self) -> None:
         log.debug("Cleaning")
-        filenames = set()
-        for element in self.elements:
-            if element.is_video:
-                filenames.add(element.filename)
-        for file in os.listdir():
-            if file == RVDatabase.SAVE_FILE:
-                continue
-            if not os.path.isfile(file):
-                continue
-            for filename in filenames:
-                if file.startswith(filename):
-                    break
-            else:
-                log.info(f"Removing unknown file: {file}")
-                if not self.args.dryrun:
-                    os.unlink(file)
-
-    @property
-    def all_researched(self) -> bool:
-        for element in self.elements:
-            if not element.is_researched:
-                return False
-        return True
-
-    def attempt_clean(self) -> None:
-        if self.all_researched:
-            self.clean()
+        filenames = set(el.basename for el in self.elements if el.is_video)
+        self.clean_folder(".", filenames)
+        ids = set(el.sid for el in self.elements)
+        self.clean_folder(self.METADATA_FOLDER, ids)
 
     @property
     def ytdl_opts(self) -> dict:
@@ -468,7 +487,9 @@ class RVDatabase:
         elements: typing.Iterable[RVElement]
         # Inexpensive sort
         if args.order == "new":
-            elements = reversed(elements_src)
+            elements = sorted(elements_src, key=lambda el: el.date, reverse=True)
+        elif args.order == "old":
+            elements = sorted(elements_src, key=lambda el: el.date)
         elif args.order == "title":
             elements = sorted(elements_src, key=lambda el: el.title)
         elif args.order == "creator":
@@ -478,8 +499,6 @@ class RVDatabase:
         elif args.order == "random":
             elements = elements_src
             random.shuffle(elements)
-        else:
-            elements = elements_src
 
         # Possibly expensive filtering
         elements = filter(lambda el: el.matches_filter(args), elements)
@@ -575,11 +594,6 @@ def get_args() -> configargparse.Namespace:
         env_var="RSS_VIDEOS_PASSWD",
         required=True,
    )
-    parser.add(
-        "--research",
-        help="Fetch video info again",
-        action="store_true",
-    )
     parser.add(
         "--no-refresh",
         dest="refresh",
@@ -641,79 +655,56 @@ def get_args() -> configargparse.Namespace:
             "list",
             "watch",
             "binge",
-            "clean",
         ),
         default="download",
     )
 
     args = parser.parse_args()
     args.videos = os.path.realpath(os.path.expanduser(args.videos))
-    if not args.duration and args.max_duration:
-        args.duration = str(args.max_duration)
 
     return args
 
 
 def get_database(args: configargparse.Namespace) -> RVDatabase:
-    database = RVDatabase(args)
-    cache = RVDatabase.load()
-    feed_fetched = False
-    if cache:
-        database.salvage_cache_pre(cache)
-    if args.refresh:
-        try:
-            database.read_feed()
-            feed_fetched = True
-        except requests.ConnectionError as err:
-            if args.action == "download":
-                raise RuntimeError("Couldn't fetch feed, refusing to download")
-            # This is a quirky failsafe in case of no internet connection,
-            # so the script doesn't go noting that no element is a video.
-            log.warning(f"Couldn't fetch feed: {err}")
-    if not feed_fetched:
-        if cache:
-            log.warning("Using cached feed.")
-            database.import_cache(cache)
-        else:
-            raise FileNotFoundError("Feed not fetched and no cached feed.")
-    if cache:
-        database.salvage_cache(cache)
-        database.clean_cache(cache)
-    database.save()
+    cache = RVDatabase(args)
+    cache.read_cache()
+    if not args.refresh:
+        return cache
 
-    return database
+    fresh = RVDatabase(args)
+    fresh.read_feed()
+    fresh.clean_cache(cache)
+    return fresh
 
 
 def main() -> None:
     args = get_args()
     configure_logging(args)
 
-    os.makedirs(args.videos, exist_ok=True)
+    metadata_dir = os.path.join(args.videos, RVDatabase.METADATA_FOLDER)
+    for dir in (args.videos, metadata_dir):
+        os.makedirs(dir, exist_ok=True)
     os.chdir(args.videos)
 
     database = get_database(args)
+    database.clean()
 
     log.debug("Running action")
-    if args.action == "clean":
-        database.clean()
-    else:
-        duration = 0
-        for element in database.filter(args):
-            duration += element.duration if element.is_video else 0
-            if args.action == "download":
-                element.preload()
-            elif args.action == "list":
-                print(element)
-            elif args.action in ("watch", "binge"):
-                element.watch()
-                if args.action == "watch":
-                    break
-            else:
-                raise NotImplementedError(f"Unimplemented action: {args.action}")
-        log.info(f"Total duration: {format_duration(duration)}")
-        database.attempt_clean()
+    duration = 0
+    for element in database.filter(args):
+        duration += element.duration if element.is_video else 0
+        if args.action == "download":
+            element.download()
+        elif args.action == "list":
+            print(element)
+        elif args.action in ("watch", "binge"):
+            element.watch()
+            if args.action == "watch":
+                break
+        else:
+            raise NotImplementedError(f"Unimplemented action: {args.action}")
    log.info(f"Total duration: {format_duration(duration)}")
     database.try_mark_watched_read()
-    database.save()
 
 
 if __name__ == "__main__":
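Note on the scope of "thread-safe (kinda)": the only critical section this
patch serializes is the yt-dlp download itself, through a per-video lock
file stored next to the video's other metadata in .metadata/. A minimal
standalone sketch of that pattern, assuming the third-party filelock
package named in the new import (the id literal below is illustrative, not
from the script):

    import filelock

    sid = "example-video-id"  # illustrative; the script uses RVElement.sid
    lockfile = f".metadata/{sid}.lock"

    # FileLock takes an OS-level advisory lock on the given path: the first
    # process to enter the block holds it, and any concurrent process using
    # the same path blocks here until release, so two runs cannot download
    # the same video at the same time.
    with filelock.FileLock(lockfile):
        pass  # fetch the video, as RVElement.download() does under this lock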
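This is also what allows dropping the removed TODO ("Lockfile, or a way to
parallel watch and download"): with state kept in one pickle per item under
.metadata/ instead of the single shared .cache.p, a download pass and a
watch session can operate on the same videos directory concurrently.
Assuming the script is on PATH and the action is the positional argument
whose choices appear in get_args() (its exact name is not visible in this
diff), that would look like:

    rssVideos download &    # batch-download in the background
    rssVideos watch         # start watching what is already fetched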