dotfiles/config/scripts/rssVideos

537 lines
16 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Script that download videos that are linked as an article
in a RSS feed.
The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""
import enum
import functools
import logging
import os
import pickle
import random
import re
import subprocess
import sys
import typing
import urllib.parse
import urllib.request
import urllib.error
from xml.dom import minidom
import coloredlogs
import configargparse
import yt_dlp as youtube_dl
log = logging.getLogger(__name__)
# TODO Lockfile, or a way to parallel watch and download
def configure_logging(args: configargparse.Namespace) -> None:
# Configure logging
if args.verbosity:
coloredlogs.install(
level=args.verbosity,
)
else:
coloredlogs.install(
fmt="%(message)s",
logger=log,
)
class RVElement:
parent: "RVDatabase"
item: minidom.Element
was_downloaded: bool
watched: bool
def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
self.parent = parent
self.item = item
self.was_downloaded = False
self.watched = False
def get_tag_data(self, tag_name: str) -> str:
nodes = self.item.getElementsByTagName(tag_name)
if len(nodes) != 1:
raise KeyError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.")
children = nodes[0].childNodes
if len(children) != 1:
raise KeyError(
f"Exepected 1 children for tag `{tag_name}`, got {len(children)}."
)
return children[0].data
@property
def title(self) -> str:
return self.get_tag_data("title")
@property
def link(self) -> str:
return self.get_tag_data("link")
@property
def creator(self) -> typing.Optional[str]:
try:
return self.get_tag_data("dc:creator")
except KeyError:
return None
@property
def description(self) -> str:
# TODO Testing
return self.get_tag_data("description")
@property
def date(self) -> str:
# TODO datetime format
return self.get_tag_data("pubDate")
@property
def guid(self) -> int:
return int(self.get_tag_data("guid"))
@property
def is_researched(self) -> bool:
return "ytdl_infos" in self.__dict__
def salvage_cache(self, cache: "RVElement") -> None:
if cache.is_researched:
self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
log.debug(f"From cache: {self}")
if cache.was_downloaded:
self.was_downloaded = True
if cache.watched:
self.watched = True
def __str__(self) -> str:
return f"{self.guid}: {self.creator} {self.title} {self.link}"
@property
def downloaded(self) -> bool:
if not self.is_researched:
return False
return os.path.isfile(self.filepath)
@functools.cached_property
def ytdl_infos(self) -> typing.Optional[dict]:
log.info(f"Researching: {self}")
try:
infos = self.parent.ytdl_dry.extract_info(self.link)
except KeyboardInterrupt as e:
raise e
except youtube_dl.utils.DownloadError as e:
# TODO Still raise in case of temporary network issue
log.warning(e)
infos = None
# Apparently that thing is transformed from a LazyList
# somewhere in the normal yt_dlp process
if (
infos
and "thumbnails" in infos
and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList)
):
infos["thumbnails"] = infos["thumbnails"].exhaust()
# Save database once it's been computed
self.__dict__["ytdl_infos"] = infos
self.parent.save()
return infos
@property
def duration(self) -> int:
assert self.is_video
assert self.ytdl_infos
return self.ytdl_infos["duration"]
@property
def is_video(self) -> bool:
# Duration might be missing in playlists and stuff
return self.ytdl_infos is not None and "duration" in self.ytdl_infos
@property
def filepath(self) -> str:
assert self.is_video
# TODO This doesn't change the extension to mkv when the formats are incomaptible
return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)
@property
def filename(self) -> str:
assert self.is_video
return os.path.splitext(self.filepath)[0]
def download(self) -> None:
assert self.is_video
log.info(f"Downloading: {self}")
if not self.parent.args.dryrun:
self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {})
self.was_downloaded = True
self.parent.save()
def preload(self) -> None:
assert self.is_video
if self.downloaded:
log.debug(f"Currently downloaded: {self}")
return
if self.was_downloaded:
log.debug(f"Downloaded previously: {self}")
return
self.download()
MATCHES_DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, None: 1}
MATCHES_DURATION_COMPARATORS = {
"<": int.__lt__,
"-": int.__lt__,
">": int.__gt__,
"+": int.__gt__,
"=": int.__eq__,
None: int.__le__,
}
def matches_filter(self, args: configargparse.Namespace) -> bool:
if args.seen != "any" and (args.seen == "seen") != self.watched:
log.debug(f"Not {args.seen}: {self}")
return False
if args.title and not re.search(args.title, self.title):
log.debug(f"Title not matching {args.title}: {self}")
return False
if args.guid and not re.search(args.guid, str(self.guid)):
log.debug(f"Guid not matching {args.guid}: {self}")
return False
if args.link and not re.search(args.link, self.link):
log.debug(f"Link not matching {args.link}: {self}")
return False
if args.creator and (not self.creator or not re.search(args.creator, self.creator)):
log.debug(f"Creator not matching {args.creator}: {self}")
return False
if not self.is_video:
log.debug(f"Not a video: {self}")
return False
if args.duration:
dur = args.duration
mult_index = dur[-1].lower()
if mult_index.isdigit():
mult_index = None
else:
dur = dur[:-1]
try:
multiplier = self.MATCHES_DURATION_MULTIPLIERS[mult_index]
except IndexError:
raise ValueError(f"Unknown duration multiplier: {mult_index}")
comp_index = dur[0]
if comp_index.isdigit():
comp_index = None
else:
dur = dur[1:]
try:
comparator = self.MATCHES_DURATION_COMPARATORS[comp_index]
except IndexError:
raise ValueError(f"Unknown duration comparator: {comp_index}")
duration = int(dur)
if not comparator(self.duration, duration * multiplier):
log.debug(f"Duration {self.duration} not matching {args.duration}: {self}")
return False
return True
def watch(self) -> None:
if not self.downloaded:
self.download()
cmd = ["mpv", self.filepath]
log.debug(f"Running {cmd}")
if not self.parent.args.dryrun:
proc = subprocess.run(cmd)
proc.check_returncode()
self.watched = True
self.parent.save()
def clean(self) -> None:
assert self.is_video
log.info(f"Removing gone video: {self.filename}*")
for file in os.listdir():
if file.startswith(self.filename):
log.debug(f"Removing file: {file}")
if not self.parent.args.dryrun:
os.unlink(file)
class RVDatabase:
SAVE_FILE = ".cache.p"
args: configargparse.Namespace
elements: list[RVElement]
def __init__(self, args: configargparse.Namespace) -> None:
self.args = args
def save(self) -> None:
log.debug("Saving cache")
if self.args.dryrun:
return
with open(self.SAVE_FILE, "wb") as save_file:
pickle.dump(self, save_file)
@classmethod
def load(cls) -> typing.Optional["RVDatabase"]:
try:
with open(cls.SAVE_FILE, "rb") as save_file:
return pickle.load(save_file)
except (TypeError, AttributeError, EOFError):
log.warning("Corrupt / outdated cache, it will be rebuilt.")
except FileNotFoundError:
pass
return None
def salvage_cache(self, cache: "RVDatabase") -> None:
log.debug(f"Salvaging cache")
cache_els = dict()
for cache_el in cache.elements:
cache_els[cache_el.guid] = cache_el
for el in self.elements:
if el.guid in cache_els:
el.salvage_cache(cache_els[el.guid])
def clean_cache(self, cache: "RVDatabase") -> None:
log.debug(f"Cleaning cache")
self_els = dict()
for self_el in self.elements:
self_els[self_el.guid] = self_el
for el in cache.elements:
if el.guid not in self_els:
if el.is_researched and el.is_video:
el.clean()
def import_cache(self, cache: "RVDatabase") -> None:
log.debug(f"Importing cache")
self.feed_xml = cache.feed_xml
self.read_feed()
@functools.cached_property
def feed_xml(self) -> minidom.Document:
log.info("Fetching RSS feed")
with urllib.request.urlopen(self.args.feed) as request:
return minidom.parse(request)
def read_feed(self) -> None:
self.elements = []
for item in self.feed_xml.getElementsByTagName("item"):
element = RVElement(self, item)
self.elements.insert(0, element)
log.debug(f"Known: {element}")
def clean(self) -> None:
log.debug("Cleaning")
filenames = set()
for element in self.elements:
if element.is_video:
filenames.add(element.filename)
for file in os.listdir():
if file == RVDatabase.SAVE_FILE:
continue
if not os.path.isfile(file):
continue
for filename in filenames:
if file.startswith(filename):
break
else:
log.info(f"Removing unknown file: {file}")
if not self.args.dryrun:
os.unlink(file)
@property
def all_researched(self) -> bool:
for element in self.elements:
if not element.is_researched:
return False
return True
def attempt_clean(self) -> None:
if self.all_researched:
self.clean()
@property
def ytdl_opts(self) -> dict:
return {"format": self.args.format, "allsubtitles": self.args.subtitles}
@property
def ytdl_dry_opts(self) -> dict:
opts = self.ytdl_opts.copy()
opts.update({"simulate": True, "quiet": True})
return opts
@property
def ytdl(self) -> youtube_dl.YoutubeDL:
return youtube_dl.YoutubeDL(self.ytdl_opts)
@property
def ytdl_dry(self) -> youtube_dl.YoutubeDL:
return youtube_dl.YoutubeDL(self.ytdl_dry_opts)
def filter(self, args: configargparse.Namespace) -> typing.Iterable[RVElement]:
elements: typing.Iterable[RVElement]
if args.order == "old":
elements = self.elements
elif args.order == "new":
elements = reversed(self.elements)
elif args.order == "random":
elements_random = self.elements.copy()
random.shuffle(elements_random)
elements = elements_random
return filter(lambda el: el.matches_filter(args), elements)
def get_args() -> configargparse.Namespace:
defaultConfigPath = os.path.join(
os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
)
parser = configargparse.ArgParser(
description="Download videos linked in "
+ "a RSS feed (e.g. an unread feed from "
+ "an RSS aggregator",
default_config_files=[defaultConfigPath],
)
# Runtime settings
parser.add_argument(
"-v",
"--verbosity",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default=None,
help="Verbosity of log messages",
)
parser.add(
"-c", "--config", required=False, is_config_file=True, help="Configuration file"
)
parser.add(
"-n",
"--dryrun",
help="Only pretend to do actions",
action="store_const",
const=True,
default=False,
)
# Input/Output
parser.add(
"--feed",
help="URL of the RSS feed (must be public for now)",
env_var="RSS_VIDEOS_FEED",
required=True,
)
parser.add(
"--videos",
help="Directory to store videos",
env_var="RSS_VIDEOS_VIDEO_DIR",
required=True,
)
# Which videos
parser.add(
"--order",
choices=("old", "new", "random"),
default="old",
help="Sorting mechanism",
)
parser.add("--guid", help="Regex to filter guid")
parser.add("--creator", help="Regex to filter by creator")
parser.add("--title", help="Regex to filter by title")
parser.add("--link", help="Regex to filter by link")
parser.add("--duration", help="Comparative to filter by duration")
parser.add("--seen", choices=("seen","unseen","any"), default="unseen", help="Only include seen/unseen/any videos")
# TODO Envrionment variables
parser.add(
"--max-duration",
help="(Deprecated, use --duration instead)",
env_var="RSS_VIDEOS_MAX_DURATION",
type=int,
default=0,
)
# TODO Allow to ask
# How to download
parser.add(
"--format",
help="Use this format to download videos."
+ " See FORMAT SELECTION in youtube-dl(1)",
env_var="RSS_VIDEOS_FORMAT",
default="bestvideo+bestaudio/best",
)
parser.add(
"--subtitles",
help="Download all subtitles",
env_var="RSS_VIDEOS_SUBTITLES",
action="store_true",
)
parser.add(
"action",
nargs="?",
choices=("download", "list", "watch", "binge", "clean", "seen", "unseen"),
default="download",
)
args = parser.parse_args()
args.videos = os.path.realpath(os.path.expanduser(args.videos))
if not args.duration and args.max_duration:
args.duration = str(args.max_duration)
return args
def main() -> None:
args = get_args()
configure_logging(args)
os.makedirs(args.videos, exist_ok=True)
os.chdir(args.videos)
database = RVDatabase(args)
cache = RVDatabase.load()
try:
database.read_feed()
except urllib.error.URLError as err:
if args.action == "download" or not cache:
raise err
else:
log.warning("Cannot fetch RSS feed, using cached feed.", err)
database.import_cache(cache)
if cache:
database.salvage_cache(cache)
database.clean_cache(cache)
database.save()
log.debug(f"Running action")
if args.action == "clean":
database.clean()
else:
database.attempt_clean()
for element in database.filter(args):
if args.action == "download":
element.preload()
elif args.action == "list":
print(element)
elif args.action in ("watch", "binge"):
element.watch()
elif args.action == "seen":
element.watched = True
elif args.action == "unseen":
element.watched = False
if args.action == "watch":
break
database.attempt_clean()
database.save()
if __name__ == "__main__":
main()