dotfiles/config/scripts/rssVideos

444 lines
13 KiB
Plaintext
Raw Normal View History

2019-04-30 08:22:27 +02:00
#!/usr/bin/env python3
2019-04-30 08:22:27 +02:00
"""
Script that downloads videos that are linked as articles
in an RSS feed.
The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""
import enum
import functools
import logging
import os
import pickle
2021-12-17 22:13:46 +01:00
import re
2020-12-27 14:20:44 +01:00
import sys
import typing
2019-04-30 08:22:27 +02:00
import urllib.parse
import urllib.request
2019-04-30 08:22:27 +02:00
from xml.dom import minidom
import coloredlogs
2019-04-30 08:22:27 +02:00
import configargparse
import yt_dlp as youtube_dl
log = logging.getLogger(__name__)
def configure_logging(args: configargparse.Namespace) -> None:
    """Install the coloredlogs handler according to the requested verbosity.

    With an explicit ``args.verbosity`` the handler is installed with that
    level and coloredlogs' defaults.  Otherwise the handler is attached to
    this module's logger only, with a format showing just the message.
    """
    level = args.verbosity
    if not level:
        # No verbosity requested: bare messages from this script's logger.
        coloredlogs.install(fmt="%(message)s", logger=log)
        return
    coloredlogs.install(level=level)
class RVCommand(enum.Enum):
    """Subcommands understood by the command line interface."""

    # Fetch the feed and download the videos it links to.
    download = "download"
    # Print the cached elements matching the search filters.
    list = "list"
class RVElement:
    """A single ``<item>`` of the RSS feed, which may or may not link to a video.

    The element wraps the raw XML node and lazily asks yt-dlp (through the
    parent database) for information about the linked page.  The result is
    cached in the instance ``__dict__`` under ``ytdl_infos``.
    """

    parent: "RVDatabase"
    item: minidom.Element
    was_downloaded: bool

    def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
        self.parent = parent
        self.item = item
        self.was_downloaded = False

    def get_tag_data(self, tag_name: str) -> str:
        """Return the text of the unique child tag named ``tag_name``.

        Raises:
            KeyError: if the tag is not present exactly once, or if it does
                not contain exactly one child node.
        """
        nodes = self.item.getElementsByTagName(tag_name)
        if len(nodes) != 1:
            raise KeyError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.")
        children = nodes[0].childNodes
        if len(children) != 1:
            raise KeyError(
                f"Exepected 1 children for tag `{tag_name}`, got {len(children)}."
            )
        return children[0].data

    @property
    def title(self) -> str:
        return self.get_tag_data("title")

    @property
    def link(self) -> str:
        return self.get_tag_data("link")

    @property
    def creator(self) -> typing.Optional[str]:
        """Content of ``dc:creator``, or None when it cannot be read."""
        try:
            return self.get_tag_data("dc:creator")
        except KeyError:
            return None

    @property
    def description(self) -> str:
        # TODO Testing
        return self.get_tag_data("description")

    @property
    def date(self) -> str:
        # TODO datetime format
        return self.get_tag_data("pubDate")

    @property
    def guid(self) -> int:
        return int(self.get_tag_data("guid"))

    def read_cache(self, cache: "RVElement") -> None:
        """Copy the yt-dlp lookup and download state from a cached element."""
        if "ytdl_infos" in cache.__dict__:
            # Go through __dict__ so the cached_property is not triggered.
            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
            log.debug(f"From cache: {self}")
        if cache.was_downloaded:
            self.was_downloaded = True

    def __str__(self) -> str:
        return f"{self.guid}: {self.creator} {self.title} {self.link}"

    @property
    def downloaded(self) -> bool:
        """Whether the video file currently exists on disk.

        Never triggers a yt-dlp lookup: without cached information the answer
        is False.
        """
        if "ytdl_infos" not in self.__dict__:
            return False
        if not self.is_video:
            # A cached "not a video" result has no file path to check.
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        """Information extracted by yt-dlp for the link, None on download error.

        The database is saved as soon as the value is known.
        """
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link)
        except KeyboardInterrupt:
            raise
        except youtube_dl.utils.DownloadError as e:
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        # Apparently that thing is transformed from a LazyList
        # somewhere in the normal yt_dlp process
        if (
            infos
            and "thumbnails" in infos
            and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList)
        ):
            infos["thumbnails"] = infos["thumbnails"].exhaust()
        # Save database once it's been computed
        self.__dict__["ytdl_infos"] = infos
        self.parent.save()
        return infos

    @property
    def duration(self) -> typing.Union[int, float]:
        """Duration reported by yt-dlp (only valid for videos)."""
        assert self.is_video
        assert self.ytdl_infos
        return self.ytdl_infos["duration"]

    @property
    def skip(self) -> bool:
        """Whether the video exceeds the configured maximum duration."""
        assert self.is_video
        max_duration = self.parent.args.max_duration
        # A max duration of 0 (or less) disables the limit.
        return bool(max_duration > 0 and self.duration > max_duration)

    @property
    def is_video(self) -> bool:
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @property
    def filepath(self) -> str:
        assert self.is_video
        # TODO This doesn't change the extension to mkv when the formats are incomaptible
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def filename(self) -> str:
        """File path without its extension."""
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def download(self) -> None:
        """Download the video (unless in dry-run mode) and record it as done."""
        assert self.is_video
        log.info(f"Downloading: {self}")
        if not self.parent.args.dryrun:
            self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {})
        self.was_downloaded = True
        self.parent.save()

    def act(self) -> None:
        """Download the element unless there is a reason not to."""
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return
        if self.downloaded:
            log.debug(f"Currently downloaded: {self}")
            return
        if self.was_downloaded:
            log.debug(f"Downloaded previously: {self}")
            return
        if self.skip:
            log.debug(f"Skipped: {self}")
            return
        self.download()

    # Suffix of a duration filter -> multiplier applied to the number.
    MATCHES_DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, None: 1}
    # Prefix of a duration filter -> comparison, called as
    # ``comparator(video_duration, requested_duration)``.  Plain functions are
    # used rather than ``int.__lt__`` & co. because those unbound methods
    # reject non-int operands, and yt-dlp may report float durations.
    MATCHES_DURATION_COMPARATORS = {
        "<": lambda actual, bound: actual < bound,
        "-": lambda actual, bound: actual < bound,
        ">": lambda actual, bound: actual > bound,
        "+": lambda actual, bound: actual > bound,
        "=": lambda actual, bound: actual == bound,
        None: lambda actual, bound: actual <= bound,
    }

    @classmethod
    def _parse_duration_filter(
        cls, expression: str
    ) -> typing.Tuple[typing.Callable[[typing.Any, typing.Any], bool], int]:
        """Split a filter like ``<10m`` into ``(comparator, bound)``.

        The expression is ``[comparator]number[unit]``; a missing comparator
        means "at most" and a missing unit applies a multiplier of 1.

        Raises:
            ValueError: on an unknown comparator or unit, or when the numeric
                part is missing or not an integer.
        """
        dur = expression
        mult_index: typing.Optional[str] = dur[-1].lower()
        if mult_index.isdigit():
            mult_index = None
        else:
            dur = dur[:-1]
        try:
            multiplier = cls.MATCHES_DURATION_MULTIPLIERS[mult_index]
        except KeyError:
            raise ValueError(f"Unknown duration multiplier: {mult_index}") from None

        if not dur:
            # Only a unit was given (e.g. "m"): nothing left to parse.
            raise ValueError(f"Invalid duration filter: {expression}")

        comp_index: typing.Optional[str] = dur[0]
        if comp_index.isdigit():
            comp_index = None
        else:
            dur = dur[1:]
        try:
            comparator = cls.MATCHES_DURATION_COMPARATORS[comp_index]
        except KeyError:
            raise ValueError(f"Unknown duration comparator: {comp_index}") from None

        return comparator, int(dur) * multiplier

    def matches_search(self, args: "configargparse.Namespace") -> bool:
        """Tell whether the element satisfies every filter set in ``args``.

        ``title``, ``creator``, ``guid`` and ``link`` are regular expressions
        searched in the corresponding field; ``duration`` is a comparative
        expression (see ``_parse_duration_filter``).  Unset filters are
        ignored.  Elements that are not videos never match.

        Raises:
            ValueError: if ``args.duration`` is malformed.
        """
        if not self.is_video:
            return False
        if args.title and not re.search(args.title, self.title):
            return False
        if args.creator:
            creator = self.creator
            # An element without creator cannot match a creator filter.
            if creator is None or not re.search(args.creator, creator):
                return False
        if args.guid and not re.search(args.guid, str(self.guid)):
            return False
        if args.link and not re.search(args.link, self.link):
            return False
        if args.duration:
            comparator, bound = self._parse_duration_filter(args.duration)
            if not comparator(self.duration, bound):
                return False
        return True
class RVDatabase:
    """The feed elements together with the options needed to process them.

    The whole object is pickled to ``SAVE_FILE`` (a path relative to the
    current working directory) so that information gathered during one run
    can be reused by the next one.
    """

    SAVE_FILE = ".cache.p"

    args: "configargparse.Namespace"
    elements: "list[RVElement]"

    def __init__(self, args: "configargparse.Namespace") -> None:
        self.args = args

    def save(self) -> None:
        """Pickle the database to ``SAVE_FILE`` (no-op in dry-run mode)."""
        log.debug("Saving cache")
        if self.args.dryrun:
            return
        with open(self.SAVE_FILE, "wb") as save_file:
            pickle.dump(self, save_file)

    @classmethod
    def load(cls) -> typing.Optional["RVDatabase"]:
        """Load the pickled database, or return None if there is no usable one.

        A missing file is silently ignored; an unreadable one is reported and
        ignored so that the cache gets rebuilt.
        """
        # NOTE(review): pickle executes code from the file it loads; this is
        # only acceptable as long as the cache file is written by this script.
        try:
            with open(cls.SAVE_FILE, "rb") as save_file:
                return pickle.load(save_file)
        except (TypeError, AttributeError, EOFError, pickle.UnpicklingError):
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
        except FileNotFoundError:
            pass
        return None

    def read_cache(self, cache: "RVDatabase") -> None:
        """Import cached state for every element also present in ``cache``.

        Elements are matched by guid.
        """
        cache_els = {cache_el.guid: cache_el for cache_el in cache.elements}
        for el in self.elements:
            cached = cache_els.get(el.guid)
            if cached is not None:
                el.read_cache(cached)

    @functools.cached_property
    def feed_xml(self) -> minidom.Document:
        """Parsed XML of the feed at ``args.feed`` (fetched once)."""
        with urllib.request.urlopen(self.args.feed) as request:
            return minidom.parse(request)

    def read_feed(self) -> None:
        """Build ``elements`` from the feed items, in reverse feed order."""
        log.info("Fetching RSS feed")
        self.elements = list()
        for item in self.feed_xml.getElementsByTagName("item"):
            element = RVElement(self, item)
            self.elements.append(element)
            log.debug(f"Known: {element}")
        # Appending then reversing once gives the same order as inserting
        # each element at the front, without the quadratic cost.
        self.elements.reverse()

    def clean(self) -> None:
        """Remove files of the working directory not belonging to a wanted video.

        A file is kept when its name starts with the extension-less file name
        of a non-skipped video element.  The cache file and anything that is
        not a regular file are never touched.  In dry-run mode removals are
        only logged.
        """
        filenames = {
            element.filename
            for element in self.elements
            if element.is_video and not element.skip
        }
        # str.startswith accepts a tuple; an empty tuple matches nothing.
        prefixes = tuple(filenames)
        for file in os.listdir():
            if file == RVDatabase.SAVE_FILE:
                continue
            if not os.path.isfile(file):
                continue
            if file.startswith(prefixes):
                continue
            log.info(f"Removing: {file}")
            if not self.args.dryrun:
                os.unlink(file)

    def act_all(self) -> None:
        """Process every element (see ``RVElement.act``)."""
        for element in self.elements:
            element.act()

    @property
    def ytdl_opts(self) -> dict:
        """Options for the yt-dlp instance performing real downloads."""
        return {"format": self.args.format, "allsubtitles": self.args.subtitles}

    @property
    def ytdl_dry_opts(self) -> dict:
        """Same options as ``ytdl_opts``, plus ``simulate`` and ``quiet``."""
        opts = self.ytdl_opts.copy()
        opts.update({"simulate": True, "quiet": True})
        return opts

    @property
    def ytdl(self) -> "youtube_dl.YoutubeDL":
        return youtube_dl.YoutubeDL(self.ytdl_opts)

    @property
    def ytdl_dry(self) -> "youtube_dl.YoutubeDL":
        return youtube_dl.YoutubeDL(self.ytdl_dry_opts)
2019-04-30 08:22:27 +02:00
2020-12-27 14:20:44 +01:00
def get_args() -> configargparse.Namespace:
    """Parse command line, environment and configuration file options.

    The default configuration file is ``rssVideos`` inside the user
    configuration directory: ``$XDG_CONFIG_HOME`` when set and non-empty,
    else the legacy ``$XDG_CONFIG_PATH`` variable this script used to read,
    else ``~/.config/``.

    Returns:
        The parsed namespace.  ``videos`` is expanded to a real path and
        ``subcommand`` is a ``RVCommand`` (``download`` when none is given).
    """
    # XDG_CONFIG_HOME is the variable defined by the XDG Base Directory
    # specification; XDG_CONFIG_PATH is kept as a fallback so that existing
    # setups relying on it keep working.
    config_home = (
        os.getenv("XDG_CONFIG_HOME") or os.getenv("XDG_CONFIG_PATH") or "~/.config/"
    )
    defaultConfigPath = os.path.join(os.path.expanduser(config_home), "rssVideos")

    parser = configargparse.ArgParser(
        description="Download videos linked in "
        + "a RSS feed (e.g. an unread feed from "
        + "an RSS aggregator)",
        default_config_files=[defaultConfigPath],
    )
    parser.add_argument(
        "-v",
        "--verbosity",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Verbosity of log messages",
    )
    parser.add(
        "-c", "--config", required=False, is_config_file=True, help="Configuration file"
    )
    parser.add(
        "--feed",
        help="URL of the RSS feed (must be public for now)",
        env_var="RSS_VIDEOS_FEED",
        required=True,
    )
    parser.add(
        "--videos",
        help="Directory to store videos",
        env_var="RSS_VIDEOS_VIDEO_DIR",
        required=True,
    )
    parser.add(
        "-n",
        "--dryrun",
        help="Do not download the videos",
        action="store_const",
        const=True,
        default=False,
    )
    parser.add(
        "--max-duration",
        help="Skip video longer than this amount of seconds",
        env_var="RSS_VIDEOS_MAX_DURATION",
        type=int,
        default=0,
    )
    parser.add(
        "--format",
        help="Use this format to download videos."
        + " See FORMAT SELECTION in youtube-dl(1)",
        env_var="RSS_VIDEOS_FORMAT",
        default="bestvideo+bestaudio/best",
    )
    parser.add(
        "--subtitles",
        help="Download all subtitles",
        env_var="RSS_VIDEOS_SUBTITLES",
        action="store_true",
    )

    # Without an explicit subcommand, behave as "download".
    parser.set_defaults(subcommand=RVCommand.download)
    subparsers = parser.add_subparsers(title="subcommand")

    sc_download = subparsers.add_parser("download")
    sc_download.set_defaults(subcommand=RVCommand.download)

    sc_list = subparsers.add_parser("list")
    sc_list.set_defaults(subcommand=RVCommand.list)
    sc_list.add("--guid", help="Regex to filter guid")
    sc_list.add("--creator", help="Regex to filter by creator")
    sc_list.add("--title", help="Regex to filter by title")
    sc_list.add("--link", help="Regex to filter by link")
    sc_list.add("--duration", help="Comparative to filter by duration")

    args = parser.parse_args()
    args.videos = os.path.realpath(os.path.expanduser(args.videos))

    return args
2019-04-30 08:22:27 +02:00
2020-12-27 14:20:44 +01:00
def main() -> None:
    """Entry point: parse options, enter the videos directory, run the subcommand.

    ``download`` refreshes the feed, merges the previous cache, removes
    unwanted files, downloads what is missing and saves the cache.
    ``list`` prints the cached elements matching the search options.

    Raises:
        FileNotFoundError: for ``list`` when no cache could be loaded.
    """
    args = get_args()
    configure_logging(args)

    # Everything below uses paths relative to the videos directory.
    os.makedirs(args.videos, exist_ok=True)
    os.chdir(args.videos)

    command = args.subcommand

    if command == RVCommand.download:
        database = RVDatabase(args)
        database.read_feed()
        previous = RVDatabase.load()
        if previous:
            database.read_cache(previous)
        database.clean()
        database.act_all()
        database.save()
        return

    if command == RVCommand.list:
        cached = RVDatabase.load()
        if not cached:
            raise FileNotFoundError("This command doesn't work without a cache yet.")
        # Lazy filter: each element is tested, then printed, in turn.
        for match in (el for el in cached.elements if el.matches_search(args)):
            print(match)
2020-12-27 14:20:44 +01:00
if __name__ == "__main__":
    # Only run the command line interface when executed as a script.
    main()