Refactored rssVideos

- Has a cache for yt-dlp research (save time on reruns)
- Simplified logic for cleanup / continue downloading
- Using OOP / functional programming (?)
- Removed tracking logic (unused)
This commit is contained in:
Geoffrey Frogeye 2021-12-10 22:59:39 +01:00
parent a01b369bc8
commit 814b316584
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""
Script that downloads videos that are linked as an article
in an RSS feed.
@ -8,17 +9,235 @@ with the unread items (non-video links are ignored).
"""
# TODO Distribute this correctly, in the meanwhile please do
# pip install --user yt-dlp ConfigArgParse
# pip install --user coloredlogs ConfigArgParse yt-dlp
# TODO Better logging (youtube-dl allow to pass loggers)
import sys
import urllib.request
import urllib.parse
import enum
import functools
import logging
import os
import pickle
import sys
import typing
import urllib.parse
import urllib.request
from xml.dom import minidom
import yt_dlp as youtube_dl
import coloredlogs
import configargparse
import yt_dlp as youtube_dl
log = logging.getLogger(__name__)
def configure_logging(args: configargparse.Namespace) -> None:
    """
    Install the coloredlogs handler according to the parsed arguments.

    When `args.verbosity` is set, coloredlogs is installed with that level.
    Otherwise it is installed on this module's logger only, with a format
    that outputs nothing but the message itself.
    """
    if not args.verbosity:
        # No explicit verbosity: bare messages, restricted to our own logger
        coloredlogs.install(fmt="%(message)s", logger=log)
        return

    coloredlogs.install(level=args.verbosity)
class RVElement:
    """
    One `<item>` of the RSS feed, and the video it may link to.

    The title, link and guid are read from the item XML at construction.
    The yt-dlp information about the link is looked up lazily through the
    parent database (see `ytdl_infos`).
    """

    title: str
    link: str
    # creator: str
    # description: str
    # date: datetime.datetime
    guid: int
    parent: "RVDatabase"

    def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
        """
        Build the element from a feed item.

        Raises RuntimeError if the `title`, `link` or `guid` tag is not
        present exactly once in the item, or does not have exactly one
        child node. Raises ValueError if the guid is not an integer.
        """

        def get_data(tag_name: str) -> str:
            # Return the data of the single child of the single `tag_name` tag
            nodes = item.getElementsByTagName(tag_name)
            if len(nodes) != 1:
                raise RuntimeError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.")
            children = nodes[0].childNodes
            if len(children) != 1:
                raise RuntimeError(
                    f"Exepected 1 children for tag `{tag_name}`, got {len(children)}."
                )
            return children[0].data

        self.title = get_data("title")
        self.link = get_data("link")
        # self.creator = get_data("dc:creator")
        # self.description = get_data("description")
        # self.date = get_data("pubDate")
        self.guid = int(get_data("guid"))

        self.parent = parent

    def read_cache(self, cache: "RVElement") -> None:
        """
        Copy the yt-dlp information from a cached element, if it has any.
        """
        if "ytdl_infos" in cache.__dict__:
            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
            log.debug(f"From cache: {self}")

    def __str__(self) -> str:
        return f"{self.title} {self.link}"

    @property
    def downloaded(self) -> bool:
        """
        Whether the video file is present on disk.

        Never triggers a yt-dlp lookup: without information already stored
        on the instance, the element is reported as not downloaded.
        """
        if "ytdl_infos" not in self.__dict__:
            return False
        # A cached lookup that is not a video has no file path to check
        if not self.is_video:
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        """
        Information extracted by yt-dlp for the link, None if that failed.

        Computed on first access only. Once computed, the parent database
        is saved so the result can be reused on later runs.
        """
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link)
        except Exception as e:
            # Exception rather than BaseException: an interruption
            # (Ctrl-C, sys.exit) must not be recorded as "not a video"
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        # Apparently that thing is transformed from a LazyList
        # somewhere in the normal yt_dlp process
        if (
            infos
            and "thumbnails" in infos
            and isinstance(infos["thumbnails"], youtube_dl.utils.LazyList)
        ):
            infos["thumbnails"] = infos["thumbnails"].exhaust()
        # Save database once it's been computed
        # cached_property only stores the value after we return, so it is
        # stored by hand here for the save to include it
        self.__dict__["ytdl_infos"] = infos
        self.parent.save()
        return infos

    @property
    def skip(self) -> bool:
        """
        Whether the video is longer than the configured maximum duration.

        A maximum duration of 0 or less disables the check. A video whose
        duration is None is never skipped.
        """
        assert self.is_video
        assert self.ytdl_infos
        max_duration = self.parent.args.max_duration
        duration = self.ytdl_infos["duration"]
        if max_duration > 0 and duration is not None and duration > max_duration:
            return True
        return False

    @property
    def is_video(self) -> bool:
        """
        Whether yt-dlp returned information that includes a duration.
        """
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @property
    def filepath(self) -> str:
        """
        File name yt-dlp prepares for this video.
        """
        assert self.is_video
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def filename(self) -> str:
        """
        `filepath` without its extension.
        """
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def download(self) -> None:
        """
        Download the video, or only log it when in dry-run mode.
        """
        assert self.is_video
        log.info(f"Downloading: {self}")
        if self.parent.args.dryrun:
            return
        self.parent.ytdl.process_ie_result(self.ytdl_infos, True, {})

    def act(self) -> None:
        """
        Download the element unless it is not a video, is already
        downloaded, or is to be skipped.
        """
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return
        if self.downloaded:
            log.debug(f"Already downloaded: {self}")
            return
        if self.skip:
            log.debug(f"Skipped: {self}")
            return
        self.download()
class RVDatabase:
    """
    The elements of the feed, together with the program arguments.

    The whole object is pickled to `SAVE_FILE` in the current directory,
    so that the information gathered on the elements can be reused on the
    next run.
    """

    SAVE_FILE = ".cache.p"

    args: configargparse.Namespace
    elements: list[RVElement]

    def __init__(self, args: configargparse.Namespace) -> None:
        self.args = args

    def save(self) -> None:
        """
        Pickle the database to `SAVE_FILE`. Does nothing in dry-run mode.
        """
        if self.args.dryrun:
            return
        with open(self.SAVE_FILE, "wb") as save_file:
            pickle.dump(self, save_file)

    @classmethod
    def load(cls) -> typing.Optional["RVDatabase"]:
        """
        Load the database previously written by `save`.

        Returns None when there is no cache file, or when it cannot be
        read back as a database (a warning is logged in that case).
        """
        # NOTE: unpickling can execute arbitrary code, the cache file must
        # only ever come from this program
        try:
            with open(cls.SAVE_FILE, "rb") as save_file:
                cache = pickle.load(save_file)
        except FileNotFoundError:
            return None
        except (
            TypeError,
            AttributeError,
            EOFError,
            ImportError,
            IndexError,
            pickle.UnpicklingError,
        ):
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
            return None
        if not isinstance(cache, cls):
            # A valid pickle, but of something else than a database
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
            return None
        return cache

    def read_cache(self, cache: "RVDatabase") -> None:
        """
        Hand every element the cached element that has the same guid.
        """
        cache_els = {cache_el.guid: cache_el for cache_el in cache.elements}
        for el in self.elements:
            if el.guid in cache_els:
                el.read_cache(cache_els[el.guid])

    def read_feed(self) -> None:
        """
        Fetch the feed at `args.feed` and rebuild the list of elements,
        stored in the reverse of their order in the feed.
        """
        log.info("Fetching RSS feed")
        self.elements = list()
        with urllib.request.urlopen(self.args.feed) as request:
            with minidom.parse(request) as xmldoc:
                for item in xmldoc.getElementsByTagName("item"):
                    element = RVElement(self, item)
                    self.elements.append(element)
                    log.debug(f"Known: {element}")
        # Reversed once at the end rather than inserting every element
        # at the front of the list
        self.elements.reverse()

    def clean(self) -> None:
        """
        Remove the files of the current directory that do not belong to a
        video of the feed.

        A file is kept when its name starts with the extension-less file
        name of one of the videos. The cache file and anything that is not
        a regular file are left alone. In dry-run mode, removals are only
        logged.
        """
        prefixes = tuple(
            {element.filename for element in self.elements if element.is_video}
        )
        for file in os.listdir():
            if file == RVDatabase.SAVE_FILE:
                continue
            if not os.path.isfile(file):
                continue
            # str.startswith tries every prefix; False for an empty tuple
            if file.startswith(prefixes):
                continue
            log.info(f"Removing: {file}")
            if not self.args.dryrun:
                os.unlink(file)

    def act_all(self) -> None:
        """
        Run `act` on every element, in order.
        """
        for element in self.elements:
            element.act()

    @property
    def ytdl_opts(self) -> dict:
        """
        yt-dlp options built from the arguments.
        """
        return {"format": self.args.format, "allsubtitles": self.args.subtitles}

    @property
    def ytdl_dry_opts(self) -> dict:
        """
        Same as `ytdl_opts`, with `simulate` and `quiet` enabled.
        """
        opts = self.ytdl_opts.copy()
        opts.update({"simulate": True, "quiet": True})
        return opts

    # A new YoutubeDL object is built on every access of the two properties
    # below.
    # NOTE(review): presumably not kept on the instance because `save`
    # pickles the whole object — confirm before caching them.
    @property
    def ytdl(self) -> youtube_dl.YoutubeDL:
        return youtube_dl.YoutubeDL(self.ytdl_opts)

    @property
    def ytdl_dry(self) -> youtube_dl.YoutubeDL:
        return youtube_dl.YoutubeDL(self.ytdl_dry_opts)
def get_args() -> configargparse.Namespace:
@ -32,6 +251,13 @@ def get_args() -> configargparse.Namespace:
+ "an RSS aggregator",
default_config_files=[defaultConfigPath],
)
parser.add_argument(
"-v",
"--verbosity",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default=None,
help="Verbosity of log messages",
)
parser.add(
"-c", "--config", required=False, is_config_file=True, help="Configuration file"
)
@ -55,15 +281,6 @@ def get_args() -> configargparse.Namespace:
const=True,
default=False,
)
# TODO This feature might require additional documentation and an on/off switch
parser.add(
"--track",
help="Directory where download videos are marked "
+ "to not download them after deletion.",
env_var="RSS_VIDEOS_TRACK",
required=False,
default=".rssVideos",
)
parser.add(
"--max-duration",
help="Skip video longer than this amount of seconds",
@ -87,207 +304,25 @@ def get_args() -> configargparse.Namespace:
args = parser.parse_args()
args.videos = os.path.realpath(os.path.expanduser(args.videos))
args.track = os.path.expanduser(args.track)
if not os.path.isabs(args.track):
args.track = os.path.realpath(os.path.join(args.videos, args.track))
return args
def get_links(args: configargparse.Namespace) -> list[str]:
    """
    Read the feed XML, get the links

    Returns the text of the first `link` tag of every `item` of the feed
    at `args.feed`, in feed order and without duplicates. Items from which
    no link can be read are reported on the standard output and ignored.
    """
    links: list[str] = []
    # Same content as `links`, for constant-time duplicate detection
    seen: set[str] = set()
    with urllib.request.urlopen(args.feed) as request:
        with minidom.parse(request) as xmldoc:
            for item in xmldoc.getElementsByTagName("item"):
                try:
                    link_node = item.getElementsByTagName("link")[0]
                    link: str = link_node.childNodes[0].data
                except (IndexError, AttributeError) as e:
                    # IndexError: no `link` tag, or an empty one
                    # AttributeError: its first child carries no data
                    print("Error while getting link from item:", e)
                    continue
                if link not in seen:
                    seen.add(link)
                    links.append(link)
    return links
def get_video_infos(
    args: configargparse.Namespace, ydl_opts: dict, links: list[str]
) -> dict[str, dict]:
    """
    Filter out non-video links and store video download info
    and associated filename

    Every link is looked up with a simulating, quiet YoutubeDL built from
    `ydl_opts`. Returns the information of each retained link, keyed by
    the extension-less file name yt-dlp prepares for it.

    A link is left out when its duration exceeds `args.max_duration` (if
    greater than 0), or when anything goes wrong while processing it, in
    which case the error is printed.
    """
    videosInfos = dict()

    dry_ydl_opts = ydl_opts.copy()
    dry_ydl_opts.update({"simulate": True, "quiet": True})
    with youtube_dl.YoutubeDL(dry_ydl_opts) as ydl:
        for link in links:
            print(f"Researching {link}...")
            try:
                infos = ydl.extract_info(link)
                if args.max_duration > 0 and infos["duration"] > args.max_duration:
                    print(
                        f"{infos['title']}: Skipping as longer than max duration: "
                        f"{infos['duration']} > {args.max_duration}"
                    )
                    continue
                filepath = ydl.prepare_filename(infos)
                filename = os.path.splitext(filepath)[0]
                videosInfos[filename] = infos
                print(f"{infos['title']}: Added")

            except Exception as e:
                # Exception rather than BaseException: an interruption
                # (Ctrl-C, sys.exit) must stop the program, not merely
                # skip the current link
                print(e)
                continue

    return videosInfos
def get_downloaded_videos(
    args: configargparse.Namespace, videosInfos: dict[str, dict]
) -> tuple[set[str], set[str]]:
    """
    Read the directory content, delete everything that's not a
    video on the download list or already downloaded

    Only the regular files directly in `args.videos` are considered.

    Returns two sets of keys of `videosInfos`: the videos which are
    completely downloaded, and the ones which are partially downloaded.
    """
    videosDownloaded: set[str] = set()
    videosPartiallyDownloaded: set[str] = set()

    for filepath in os.listdir(args.videos):
        fullpath = os.path.join(args.videos, filepath)
        if not os.path.isfile(fullpath):
            continue
        filename, extension = os.path.splitext(filepath)

        for onlineFilename in videosInfos:
            # Full name already there: completely downloaded
            # → remove from the download list
            if filename == onlineFilename:
                videosDownloaded.add(onlineFilename)
                break
            if filename.startswith(onlineFilename):
                # Subtitle file
                # → ignore
                # splitext moved a trailing ".vtt" into `extension`;
                # `filename` is still checked for names such as
                # "video.en.vtt.part"
                if extension == ".vtt" or filename.endswith(".vtt"):
                    break

                # Partial name already there: not completely downloaded
                # → keep on the download list
                videosPartiallyDownloaded.add(onlineFilename)
                break
        # Unrelated filename: delete
        else:
            print(f"Deleting: {filename}")
            os.unlink(fullpath)

    return videosDownloaded, videosPartiallyDownloaded
def get_tracked_videos(args: configargparse.Namespace, known: set[str]) -> set[str]:
    """
    Tell which of the known videos have a marker in the tracking directory.

    A marker is a regular file of `args.track` named after the video.
    Markers whose name is in `known` are returned; every other regular
    file of the directory is deleted. Non-regular entries are left alone.
    """
    tracked = set()

    for marker in os.listdir(args.track):
        marker_path = os.path.join(args.track, marker)
        if os.path.isfile(marker_path):
            # The marker name is compared as is, it has no extension
            if marker not in known:
                os.unlink(marker_path)
                continue
            tracked.add(marker)

    return tracked
def main() -> None:
# Entry point: parses the arguments, configures logging, then processes
# the feed.
# NOTE(review): the statements below have no indentation and seem to hold
# two successive flows: one based on tracking markers that ends with
# sys.exit(exit_code), then one based on RVDatabase. Confirm against the
# actual file which of them is meant to run.
args = get_args()
configure_logging(args)
os.makedirs(args.videos, exist_ok=True)
os.makedirs(args.track, exist_ok=True)
ydl_opts = {"format": args.format, "allsubtitles": args.subtitles}
print("→ Retrieveing RSS feed")
links = get_links(args)
# Oldest first
links = links[::-1]
print(f"→ Getting infos on {len(links)} unread articles")
videosInfos = get_video_infos(args, ydl_opts, links)
print(f"→ Deciding on what to do for {len(videosInfos)} videos")
videosDownloaded, videosPartiallyDownloaded = get_downloaded_videos(
args, videosInfos
)
videosTracked = get_tracked_videos(args, set(videosInfos.keys()))
# Deciding for the rest based on the informations
# NOTE(review): markTracked does not use its `filename` parameter, the
# marker path is built from `onlineFilename` of the enclosing scope.
# Every call visible here passes `onlineFilename` — confirm it is intended.
def markTracked(filename: str) -> None:
markerPath = os.path.join(args.track, onlineFilename)
# Opening in append mode creates the marker if it is missing
open(markerPath, "a").close()
videosToDownload: set[str] = set()
videosReads: set[str] = set()
for onlineFilename in videosInfos.keys():
# If the video was once downloaded but manually deleted,
# the marker should be left
if onlineFilename in videosTracked:
print(f"Should be marked as read: {onlineFilename}")
# TODO Automatically do that one day maybe?
# Need to login to the FreshRSS API and keep track of
# the item id along the process
videosReads.add(onlineFilename)
elif onlineFilename in videosDownloaded:
markTracked(onlineFilename)
print(f"Already downloaded: {onlineFilename}")
else:
if onlineFilename in videosPartiallyDownloaded:
print(f"Will be continued: {onlineFilename}")
else:
print(f"Will be downloaded: {onlineFilename}")
videosToDownload.add(onlineFilename)
# Download the missing videos
print(f"→ Downloading {len(videosToDownload)} videos")
os.chdir(args.videos)
# Set to 1 as soon as one download fails, passed to sys.exit below
exit_code = 0
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
for onlineFilename, infos in videosInfos.items():
if onlineFilename not in videosToDownload:
continue
# Really download
if args.dryrun:
print(f"Would download {onlineFilename}")
else:
# Apparently that thing is transformed from a LazyList
# somewhere in the normal yt_dlp process
if isinstance(infos["thumbnails"], youtube_dl.utils.LazyList):
infos["thumbnails"] = infos["thumbnails"].exhaust()
try:
ydl.process_ie_result(infos, True, {})
markTracked(onlineFilename)
except BaseException as e:
print(e)
exit_code = 1
continue
sys.exit(exit_code)
# Read the feed, reuse the cached information if there is any, remove the
# files that are not wanted anymore, download, then write the cache.
database = RVDatabase(args)
database.read_feed()
cache = RVDatabase.load()
if cache:
database.read_cache(cache)
database.clean()
database.act_all()
database.save()
if __name__ == "__main__":