381 lines
11 KiB
Python
Executable file
381 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
|
||
|
||
"""
|
||
Script that download videos that are linked as an article
|
||
in a RSS feed.
|
||
The common use case would be a feed from an RSS aggregator
|
||
with the unread items (non-video links are ignored).
|
||
"""
|
||
|
||
import enum
|
||
import functools
|
||
import logging
|
||
import os
|
||
import pickle
|
||
import sys
|
||
import typing
|
||
import urllib.parse
|
||
import urllib.request
|
||
from xml.dom import minidom
|
||
|
||
import coloredlogs
|
||
import configargparse
|
||
import yt_dlp as youtube_dl
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
def configure_logging(args: configargparse.Namespace) -> None:
|
||
# Configure logging
|
||
if args.verbosity:
|
||
coloredlogs.install(
|
||
level=args.verbosity,
|
||
)
|
||
else:
|
||
coloredlogs.install(
|
||
fmt="%(message)s",
|
||
logger=log,
|
||
)
|
||
|
||
|
||
class RVCommand(enum.Enum):
|
||
download = "download"
|
||
list = "list"
|
||
|
||
|
||
class RVElement:
|
||
parent: "RVDatabase"
|
||
item: minidom.Element
|
||
was_downloaded: bool
|
||
|
||
def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
|
||
self.parent = parent
|
||
self.item = item
|
||
self.was_downloaded = False
|
||
|
||
def get_tag_data(self, tag_name: str) -> str:
|
||
nodes = self.item.getElementsByTagName(tag_name)
|
||
if len(nodes) != 1:
|
||
raise KeyError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.")
|
||
children = nodes[0].childNodes
|
||
if len(children) != 1:
|
||
raise KeyError(
|
||
f"Exepected 1 children for tag `{tag_name}`, got {len(children)}."
|
||
)
|
||
return children[0].data
|
||
|
||
@property
|
||
def title(self) -> str:
|
||
return self.get_tag_data("title")
|
||
|
||
@property
|
||
def link(self) -> str:
|
||
return self.get_tag_data("link")
|
||
|
||
@property
|
||
def creator(self) -> typing.Optional[str]:
|
||
try:
|
||
return self.get_tag_data("dc:creator")
|
||
except KeyError:
|
||
return None
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
# TODO Testing
|
||
return self.get_tag_data("description")
|
||
|
||
@property
|
||
def date(self) -> str:
|
||
# TODO datetime format
|
||
return self.get_tag_data("pubDate")
|
||
|
||
@property
|
||
def guid(self) -> int:
|
||
return int(self.get_tag_data("guid"))
|
||
|
||
def read_cache(self, cache: "RVElement") -> None:
|
||
if "ytdl_infos" in cache.__dict__:
|
||
self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
|
||
log.debug(f"From cache: {self}")
|
||
if cache.was_downloaded:
|
||
self.was_downloaded = True
|
||
|
||
def __str__(self) -> str:
|
||
return f"{self.guid}: {self.creator} – {self.title} – {self.link}"
|
||
|
||
@property
|
||
def downloaded(self) -> bool:
|
||
if "ytdl_infos" not in self.__dict__:
|
||
return False
|
||
return os.path.isfile(self.filepath)
|
||
|
||
@functools.cached_property
|
||
def ytdl_infos(self) -> typing.Optional[dict]:
|
||
log.info(f"Researching: {self}")
|
||
try:
|
||
infos = self.parent.ytdl_dry.extract_info(self.link)
|
||
except KeyboardInterrupt as e:
|
||
raise e
|
||
except youtube_dl.utils.DownloadError as e:
|
||
# TODO Still raise in case of temporary network issue
|
||
log.warn(e)
|
||
infos = None
|
||
# Apparently that thing is transformed from a LazyList
|
||
# somewhere in the normal yt_dlp process
|
||
if (
|
||
infos
|
||
and "thumbnails" in infos
|
||
and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList)
|
||
):
|
||
infos["thumbnails"] = infos["thumbnails"].exhaust()
|
||
# Save database once it's been computed
|
||
self.__dict__["ytdl_infos"] = infos
|
||
self.parent.save()
|
||
return infos
|
||
|
||
@property
|
||
def skip(self) -> bool:
|
||
assert self.is_video
|
||
assert self.ytdl_infos
|
||
if (
|
||
self.parent.args.max_duration > 0
|
||
and self.ytdl_infos["duration"] > self.parent.args.max_duration
|
||
):
|
||
return True
|
||
return False
|
||
|
||
@property
|
||
def is_video(self) -> bool:
|
||
# Duration might be missing in playlists and stuff
|
||
return self.ytdl_infos is not None and "duration" in self.ytdl_infos
|
||
|
||
@property
|
||
def filepath(self) -> str:
|
||
assert self.is_video
|
||
# TODO This doesn't change the extension to mkv when the formats are incomaptible
|
||
return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)
|
||
|
||
@property
|
||
def filename(self) -> str:
|
||
assert self.is_video
|
||
return os.path.splitext(self.filepath)[0]
|
||
|
||
def download(self) -> None:
|
||
assert self.is_video
|
||
log.info(f"Downloading: {self}")
|
||
if not self.parent.args.dryrun:
|
||
self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {})
|
||
self.was_downloaded = True
|
||
self.parent.save()
|
||
|
||
def act(self) -> None:
|
||
if not self.is_video:
|
||
log.debug(f"Not a video: {self}")
|
||
return
|
||
if self.downloaded:
|
||
log.debug(f"Currently downloaded: {self}")
|
||
return
|
||
if self.was_downloaded:
|
||
log.debug(f"Downloaded previously: {self}")
|
||
return
|
||
if self.skip:
|
||
log.debug(f"Skipped: {self}")
|
||
return
|
||
self.download()
|
||
|
||
|
||
class RVDatabase:
|
||
SAVE_FILE = ".cache.p"
|
||
|
||
args: configargparse.Namespace
|
||
elements: list[RVElement]
|
||
|
||
def __init__(self, args: configargparse.Namespace) -> None:
|
||
self.args = args
|
||
|
||
def save(self) -> None:
|
||
log.debug("Saving cache")
|
||
if self.args.dryrun:
|
||
return
|
||
with open(self.SAVE_FILE, "wb") as save_file:
|
||
pickle.dump(self, save_file)
|
||
|
||
@classmethod
|
||
def load(cls) -> typing.Optional["RVDatabase"]:
|
||
try:
|
||
with open(cls.SAVE_FILE, "rb") as save_file:
|
||
return pickle.load(save_file)
|
||
except (TypeError, AttributeError, EOFError):
|
||
log.warn("Corrupt / outdated cache, it will be rebuilt.")
|
||
except FileNotFoundError:
|
||
pass
|
||
return None
|
||
|
||
def read_cache(self, cache: "RVDatabase") -> None:
|
||
cache_els = dict()
|
||
for cache_el in cache.elements:
|
||
cache_els[cache_el.guid] = cache_el
|
||
for el in self.elements:
|
||
if el.guid in cache_els:
|
||
el.read_cache(cache_els[el.guid])
|
||
|
||
@functools.cached_property
|
||
def feed_xml(self) -> minidom.Document:
|
||
with urllib.request.urlopen(self.args.feed) as request:
|
||
return minidom.parse(request)
|
||
|
||
def read_feed(self) -> None:
|
||
log.info("Fetching RSS feed")
|
||
self.elements = list()
|
||
for item in self.feed_xml.getElementsByTagName("item"):
|
||
element = RVElement(self, item)
|
||
self.elements.insert(0, element)
|
||
log.debug(f"Known: {element}")
|
||
|
||
def clean(self) -> None:
|
||
filenames = set()
|
||
for element in self.elements:
|
||
if element.is_video and not element.skip:
|
||
filenames.add(element.filename)
|
||
for file in os.listdir():
|
||
if file == RVDatabase.SAVE_FILE:
|
||
continue
|
||
if not os.path.isfile(file):
|
||
continue
|
||
for filename in filenames:
|
||
if file.startswith(filename):
|
||
break
|
||
else:
|
||
log.info(f"Removing: {file}")
|
||
if not self.args.dryrun:
|
||
os.unlink(file)
|
||
|
||
def act_all(self) -> None:
|
||
for element in self.elements:
|
||
element.act()
|
||
|
||
@property
|
||
def ytdl_opts(self) -> dict:
|
||
return {"format": self.args.format, "allsubtitles": self.args.subtitles}
|
||
|
||
@property
|
||
def ytdl_dry_opts(self) -> dict:
|
||
opts = self.ytdl_opts.copy()
|
||
opts.update({"simulate": True, "quiet": True})
|
||
return opts
|
||
|
||
@property
|
||
def ytdl(self) -> youtube_dl.YoutubeDL:
|
||
return youtube_dl.YoutubeDL(self.ytdl_opts)
|
||
|
||
@property
|
||
def ytdl_dry(self) -> youtube_dl.YoutubeDL:
|
||
return youtube_dl.YoutubeDL(self.ytdl_dry_opts)
|
||
|
||
|
||
def get_args() -> configargparse.Namespace:
|
||
defaultConfigPath = os.path.join(
|
||
os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
|
||
)
|
||
|
||
parser = configargparse.ArgParser(
|
||
description="Download videos linked in "
|
||
+ "a RSS feed (e.g. an unread feed from "
|
||
+ "an RSS aggregator",
|
||
default_config_files=[defaultConfigPath],
|
||
)
|
||
parser.add_argument(
|
||
"-v",
|
||
"--verbosity",
|
||
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
||
default=None,
|
||
help="Verbosity of log messages",
|
||
)
|
||
parser.add(
|
||
"-c", "--config", required=False, is_config_file=True, help="Configuration file"
|
||
)
|
||
parser.add(
|
||
"--feed",
|
||
help="URL of the RSS feed (must be public for now)",
|
||
env_var="RSS_VIDEOS_FEED",
|
||
required=True,
|
||
)
|
||
parser.add(
|
||
"--videos",
|
||
help="Directory to store videos",
|
||
env_var="RSS_VIDEOS_VIDEO_DIR",
|
||
required=True,
|
||
)
|
||
parser.add(
|
||
"-n",
|
||
"--dryrun",
|
||
help="Do not download the videos",
|
||
action="store_const",
|
||
const=True,
|
||
default=False,
|
||
)
|
||
parser.add(
|
||
"--max-duration",
|
||
help="Skip video longer than this amount of seconds",
|
||
env_var="RSS_VIDEOS_MAX_DURATION",
|
||
type=int,
|
||
default=0,
|
||
)
|
||
parser.add(
|
||
"--format",
|
||
help="Use this format to download videos."
|
||
+ " See FORMAT SELECTION in youtube-dl(1)",
|
||
env_var="RSS_VIDEOS_FORMAT",
|
||
default="bestvideo+bestaudio/best",
|
||
)
|
||
parser.add(
|
||
"--subtitles",
|
||
help="Download all subtitles",
|
||
env_var="RSS_VIDEOS_SUBTITLES",
|
||
action="store_true",
|
||
)
|
||
|
||
parser.set_defaults(subcommand=RVCommand.download)
|
||
subparsers = parser.add_subparsers(title="subcommand")
|
||
|
||
sc_download = subparsers.add_parser("download")
|
||
sc_download.set_defaults(subcommand=RVCommand.download)
|
||
|
||
sc_list = subparsers.add_parser("list")
|
||
sc_list.set_defaults(subcommand=RVCommand.list)
|
||
|
||
args = parser.parse_args()
|
||
args.videos = os.path.realpath(os.path.expanduser(args.videos))
|
||
|
||
return args
|
||
|
||
|
||
def main() -> None:
|
||
args = get_args()
|
||
configure_logging(args)
|
||
|
||
os.makedirs(args.videos, exist_ok=True)
|
||
os.chdir(args.videos)
|
||
|
||
if args.subcommand == RVCommand.download:
|
||
database = RVDatabase(args)
|
||
database.read_feed()
|
||
cache = RVDatabase.load()
|
||
if cache:
|
||
database.read_cache(cache)
|
||
database.clean()
|
||
database.act_all()
|
||
database.save()
|
||
|
||
elif args.subcommand == RVCommand.list:
|
||
cache = RVDatabase.load()
|
||
if not cache:
|
||
raise FileNotFoundError("This command doesn't work without a cache yet.")
|
||
for element in cache.elements:
|
||
print(element)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|