#!/usr/bin/env python3

"""
Script that downloads videos that are linked as articles
in an RSS feed.
The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""

import functools
import logging
import os
import pickle
import random
import requests  # NOTE(review): third-party import grouped with stdlib
import re
import subprocess
import time
import typing

import coloredlogs
import configargparse
import yt_dlp

# Module-level logger; handlers are attached by configure_logging().
log = logging.getLogger(__name__)

# TODO Lockfile, or a way to parallel watch and download
# TODO Save ytdl infos and view info separately
def configure_logging(args: configargparse.Namespace) -> None:
    """Install colored log handlers according to the parsed arguments."""
    if args.verbosity:
        # An explicit level was requested: apply it to the root logger.
        coloredlogs.install(level=args.verbosity)
    else:
        # Default: bare messages, restricted to this module's logger.
        coloredlogs.install(fmt="%(message)s", logger=log)
class SaveInfoPP(yt_dlp.postprocessor.common.PostProcessor):
    """Postprocessor that captures the final info dict for an element.

    yt_dlp.process_ie_result() doesn't return a completely updated info
    dict — e.g. the extension may still be the pre-merge one — so this
    hook grabs the dict in its final form and forwards what we need to
    the RVElement (the dict isn't serializable in this state).
    """

    def __init__(self, rvelement: "RVElement") -> None:
        super().__init__()
        self.rvelement = rvelement

    def run(self, info: dict) -> tuple[list, dict]:
        # Record the final metadata on the element; delete no files.
        self.rvelement.update_post_download(info)
        return [], info
def parse_duration(string: str) -> int:
    """Parse a duration like ``"30"``, ``"45s"``, ``"2m"`` or ``"1h"``.

    A trailing (case-insensitive) unit letter multiplies the number;
    no letter means seconds.

    Returns the duration in seconds.
    Raises ValueError for an unknown unit or a non-numeric body.
    """
    DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, "": 1}

    mult_index = string[-1].lower()
    if mult_index.isdigit():
        mult_index = ""
    else:
        string = string[:-1]
    try:
        multiplier = DURATION_MULTIPLIERS[mult_index]
    except KeyError:
        # Bug fix: dict lookups raise KeyError, not IndexError; the
        # original `except IndexError` let the KeyError escape instead
        # of raising the intended ValueError.
        raise ValueError(f"Unknown duration multiplier: {mult_index}")

    return int(string) * multiplier


def compare_duration(compstr: str) -> typing.Callable[[int], bool]:
    """Parse a comparative duration string into a predicate on seconds.

    A leading ``<``/``-`` means strictly shorter, ``>``/``+`` strictly
    longer, ``=`` exactly equal; no prefix means "at most" (<=).  The
    rest of the string is parsed by parse_duration().

    Returns a function mapping a duration (seconds) to bool.
    Raises ValueError for an unknown comparator or duration.
    """
    DURATION_COMPARATORS = {
        "<": int.__lt__,
        "-": int.__lt__,
        ">": int.__gt__,
        "+": int.__gt__,
        "=": int.__eq__,
        "": int.__le__,
    }

    comp_index = compstr[0]
    if comp_index.isdigit():
        comp_index = ""
    else:
        compstr = compstr[1:]
    try:
        comparator = DURATION_COMPARATORS[comp_index]
    except KeyError:
        # Bug fix: same KeyError/IndexError confusion as parse_duration().
        raise ValueError(f"Unknown duration comparator: {comp_index}")

    duration = parse_duration(compstr)

    return lambda d: comparator(d, duration)
def format_duration(duration: int) -> str:
    """Render a duration in seconds as an HH:MM:SS string."""
    as_struct = time.gmtime(duration)
    return time.strftime("%H:%M:%S", as_struct)
class RVElement:
    """A single RSS item, possibly pointing at a downloadable video.

    yt-dlp metadata is cached in ``self.__dict__["ytdl_infos"]`` via
    ``functools.cached_property``; since instances are pickled along with
    the database, researched state persists across runs.
    """

    # Owning database (gives access to args, yt-dlp helpers and save()).
    parent: "RVDatabase"
    # Raw Google-Reader-API item dict for this entry.
    item: dict
    # Final file path reported by yt-dlp after a download, if any.
    downloaded_filepath: typing.Optional[str]
    # Whether the user has watched / marked this element as seen.
    watched: bool

    def __init__(self, parent: "RVDatabase", item: dict) -> None:
        self.parent = parent
        self.item = item
        self.downloaded_filepath = None
        self.watched = False

    @property
    def title(self) -> str:
        return self.item["title"]

    @property
    def link(self) -> str:
        # First canonical URL of the article.
        return self.item["canonical"][0]["href"]

    @property
    def creator(self) -> str:
        # The feed (origin) title serves as the creator's name.
        return self.item["origin"]["title"]

    @property
    def guid(self) -> int:
        # The item timestamp in microseconds doubles as a unique identifier.
        return int(self.item["timestampUsec"])

    @property
    def is_researched(self) -> bool:
        # True once ytdl_infos has been computed; checking __dict__ avoids
        # triggering the (expensive) cached_property itself.
        return "ytdl_infos" in self.__dict__

    def salvage_cache(self, cache: "RVElement") -> None:
        # Copy previously computed state over from this item's cached twin.
        if cache.is_researched:
            # Write straight into __dict__ so cached_property sees it as set.
            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
            log.debug(f"From cache: {self}")
        if cache.downloaded_filepath:
            self.downloaded_filepath = cache.downloaded_filepath
        if cache.watched:
            self.watched = True

    def __str__(self) -> str:
        # NOTE(review): local name `str` shadows the builtin (harmless here,
        # the builtin is not needed inside this method).
        str = f"{self.guid}: {self.creator if self.creator else '?'} – {self.title}"
        if self.is_researched:
            if self.is_video:
                str += f" ({format_duration(self.duration)})"
            else:
                str += " (N/A)"
        else:
            str += " (?)"
        str += f" – {self.link}"
        return str

    @property
    def downloaded(self) -> bool:
        # Whether the video file currently exists on disk (cwd is the
        # videos directory, set by main()).
        if not self.is_researched:
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        # Sanitized yt-dlp info dict, or None when the link is not
        # downloadable; computed lazily and persisted via the database.
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link, download=False)
        except KeyboardInterrupt as e:
            raise e
        except yt_dlp.utils.DownloadError as e:
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        if infos:
            infos = self.parent.ytdl_dry.sanitize_info(infos)
        # Save database once it's been computed
        self.__dict__["ytdl_infos"] = infos
        self.parent.save()
        return infos

    @property
    def duration(self) -> int:
        # Duration in seconds; only valid for actual videos.
        assert self.is_video
        assert self.ytdl_infos
        return self.ytdl_infos["duration"]

    @property
    def is_video(self) -> bool:
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @property
    def filepath(self) -> str:
        # Path the video was (or would be) downloaded to.
        assert self.is_video
        if self.downloaded_filepath:
            return self.downloaded_filepath
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def filename(self) -> str:
        # File path without its extension; used as a prefix during cleanup.
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def download(self) -> None:
        # Download the video, capturing the final info dict via SaveInfoPP.
        assert self.is_video
        log.info(f"Downloading: {self}")
        if self.parent.args.research:
            # Drop the cached info so extraction happens afresh.
            del self.ytdl_infos
        if not self.parent.args.dryrun:
            with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl:
                ydl.add_post_processor(SaveInfoPP(self))
                ydl.process_ie_result(self.ytdl_infos, download=True)
        self.parent.save()

    def update_post_download(self, info: dict) -> None:
        # Called back by SaveInfoPP with the final (post-merge) info dict.
        self.downloaded_filepath = self.parent.ytdl_dry.prepare_filename(info)

    @property
    def was_downloaded(self) -> bool:
        # Whether a download completed at some point (file may be gone now).
        return self.downloaded_filepath is not None

    def preload(self) -> None:
        # Download unless the file is already (or was previously) there.
        assert self.is_video
        if self.downloaded:
            log.debug(f"Currently downloaded: {self}")
            return
        if self.was_downloaded:
            log.debug(f"Downloaded previously: {self}")
            return
        self.download()

    def matches_filter(self, args: configargparse.Namespace) -> bool:
        # Decide whether this element passes the user-supplied filters.
        # Cheap metadata checks run first; anything touching ytdl_infos
        # (is_video, duration) may trigger a network round-trip.
        # Inexpensive filters
        if args.seen != "any" and (args.seen == "seen") != self.watched:
            log.debug(f"Not {args.seen}: {self}")
            return False
        if args.title and not re.search(args.title, self.title):
            log.debug(f"Title not matching {args.title}: {self}")
            return False
        if args.guid and not re.search(args.guid, str(self.guid)):
            log.debug(f"Guid not matching {args.guid}: {self}")
            return False
        if args.link and not re.search(args.link, self.link):
            log.debug(f"Link not matching {args.link}: {self}")
            return False
        if args.creator and (
            not self.creator or not re.search(args.creator, self.creator)
        ):
            log.debug(f"Creator not matching {args.creator}: {self}")
            return False

        # Expensive filters
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return False
        if args.duration and not compare_duration(args.duration)(self.duration):
            log.debug(f"Duration {self.duration} not matching {args.duration}: {self}")
            return False

        return True

    def watch(self) -> None:
        # Play with mpv, downloading first if needed; marks as watched
        # afterwards and persists that state.
        if not self.downloaded:
            self.download()

        cmd = ["mpv", self.filepath]
        log.debug(f"Running {cmd}")
        if not self.parent.args.dryrun:
            proc = subprocess.run(cmd)
            proc.check_returncode()

        self.watched = True
        self.parent.save()

    def clean(self) -> None:
        # Remove every file (video, subtitles, …) sharing this video's stem.
        assert self.is_video
        log.info(f"Removing gone video: {self.filename}*")
        for file in os.listdir():
            if file.startswith(self.filename):
                log.debug(f"Removing file: {file}")
                if not self.parent.args.dryrun:
                    os.unlink(file)
||
class RVDatabase:
    """Collection of RVElements, persisted to disk via pickle.

    The whole object (elements, args, cached auth headers) is pickled to
    SAVE_FILE; on the next run a fresh database salvages computed state
    from the unpickled previous one.
    """

    # Pickle file holding the cached database, inside the videos directory.
    SAVE_FILE = ".cache.p"

    # Parsed command-line arguments for this run.
    args: configargparse.Namespace
    # Feed elements, oldest first (see build_list).
    elements: list[RVElement]

    def __init__(self, args: configargparse.Namespace) -> None:
        self.args = args

    def save(self) -> None:
        # Persist the whole database, unless running dry.
        log.debug("Saving cache")
        if self.args.dryrun:
            return
        with open(self.SAVE_FILE, "wb") as save_file:
            pickle.dump(self, save_file)

    @classmethod
    def load(cls) -> typing.Optional["RVDatabase"]:
        # Unpickle a previous run's database; None when missing or corrupt.
        try:
            with open(cls.SAVE_FILE, "rb") as save_file:
                return pickle.load(save_file)
        except (TypeError, AttributeError, EOFError):
            # Incompatible class layout or truncated file.
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
        except FileNotFoundError:
            pass
        return None

    def salvage_cache_pre(self, cache: "RVDatabase") -> None:
        # Reuse the cached authentication token before any network access.
        if "auth_headers" in cache.__dict__:
            self.auth_headers = cache.auth_headers

    def salvage_cache(self, cache: "RVDatabase") -> None:
        # Copy computed per-element state from the cached elements, by guid.
        log.debug("Salvaging cache")
        cache_els = dict()
        for cache_el in cache.elements:
            cache_els[cache_el.guid] = cache_el
        for el in self.elements:
            if el.guid in cache_els:
                el.salvage_cache(cache_els[el.guid])

    def clean_cache(self, cache: "RVDatabase") -> None:
        # Delete files of videos that vanished from the current feed.
        log.debug("Cleaning cache")
        self_els = dict()
        for self_el in self.elements:
            self_els[self_el.guid] = self_el
        for el in cache.elements:
            if el.guid not in self_els:
                if el.is_researched and el.is_video:
                    el.clean()

    def import_cache(self, cache: "RVDatabase") -> None:
        # Rebuild the element list from cached raw items (offline fallback).
        log.debug("Importing cache")
        self.build_list([element.item for element in cache.elements])

    @functools.cached_property
    def auth_headers(self) -> dict[str, str]:
        # Authenticate against the aggregator's Google-Reader-compatible
        # ClientLogin endpoint; cached for the lifetime of the database
        # (and salvaged across runs by salvage_cache_pre).
        r = requests.get(
            f"{self.args.url}/accounts/ClientLogin",
            params={"Email": self.args.email, "Passwd": self.args.passwd},
        )
        r.raise_for_status()
        for line in r.text.split("\n"):
            if line.lower().startswith("auth="):
                # The token may itself contain '=' signs; keep everything
                # after the first one.
                val = "=".join(line.split("=")[1:])
                return {"Authorization": f"GoogleLogin auth={val}"}
        raise RuntimeError("Couldn't find auth= key")

    def fetch_feed_elements(self) -> typing.Generator[dict, None, None]:
        # Yield all unread feed items, following pagination continuations.
        log.info("Fetching RSS feed")
        continuation: typing.Optional[str] = None
        with requests.Session() as s:

            def next_page() -> typing.Generator[dict, None, None]:
                nonlocal continuation
                r = s.get(
                    f"{self.args.url}/reader/api/0/stream/contents",
                    params={
                        # Exclude items already marked as read.
                        "xt": "user/-/state/com.google/read",
                        "c": continuation,
                    },
                    headers=self.auth_headers,
                )
                r.raise_for_status()
                json = r.json()
                yield from json["items"]
                continuation = json.get("continuation")

            yield from next_page()
            while continuation:
                yield from next_page()

    def build_list(self, items: typing.Iterable[dict]) -> None:
        # Store elements oldest-first: the feed arrives newest-first,
        # hence the insert(0).
        self.elements = []
        for item in items:
            element = RVElement(self, item)
            self.elements.insert(0, element)
            log.debug(f"Known: {element}")

    def read_feed(self) -> None:
        # Fetch the live feed and build the element list from it.
        self.build_list(self.fetch_feed_elements())

    def clean(self) -> None:
        # Remove files in the videos directory not claimed by any element.
        log.debug("Cleaning")
        filenames = set()
        for element in self.elements:
            if element.is_video:
                filenames.add(element.filename)
        for file in os.listdir():
            if file == RVDatabase.SAVE_FILE:
                continue
            if not os.path.isfile(file):
                continue
            for filename in filenames:
                if file.startswith(filename):
                    break
            else:
                # No known video claims this file.
                log.info(f"Removing unknown file: {file}")
                if not self.args.dryrun:
                    os.unlink(file)

    @property
    def all_researched(self) -> bool:
        # Whether every element's yt-dlp info has been computed.
        for element in self.elements:
            if not element.is_researched:
                return False
        return True

    def attempt_clean(self) -> None:
        # Only clean when everything is researched: otherwise "unknown"
        # files might belong to not-yet-researched videos.
        if self.all_researched:
            self.clean()

    @property
    def ytdl_opts(self) -> dict:
        # Options used for actual downloads.
        return {"format": self.args.format, "allsubtitles": self.args.subtitles}

    @property
    def ytdl_dry_opts(self) -> dict:
        # Same options, but quiet: for metadata-only operations.
        opts = self.ytdl_opts.copy()
        opts.update({"quiet": True})
        return opts

    @property
    def ytdl_dry(self) -> yt_dlp.YoutubeDL:
        # Fresh yt-dlp instance for dry (no-download) operations.
        return yt_dlp.YoutubeDL(self.ytdl_dry_opts)

    def filter(self, args: configargparse.Namespace) -> typing.Iterable[RVElement]:
        # Return elements selected by args, in the requested order.
        elements: typing.Iterable[RVElement]
        # Inexpensive sort
        if args.order == "new":
            elements = reversed(self.elements)
        elif args.order == "title":
            elements = sorted(self.elements, key=lambda el: el.title)
        elif args.order == "creator":
            elements = sorted(self.elements, key=lambda el: el.creator or "")
        elif args.order == "link":
            elements = sorted(self.elements, key=lambda el: el.link)
        elif args.order == "random":
            elements_random = self.elements.copy()
            random.shuffle(elements_random)
            elements = elements_random
        else:
            elements = self.elements

        # Possibly expensive filtering
        elements = filter(lambda el: el.matches_filter(args), elements)

        # Expensive sort
        if args.order == "short":
            elements = sorted(
                elements, key=lambda el: el.duration if el.is_video else 0
            )
        elif args.order == "long":
            elements = sorted(
                elements, key=lambda el: el.duration if el.is_video else 0, reverse=True
            )

        # Post sorting filtering
        if args.total_duration:
            # Greedily pick videos that still fit in the remaining time,
            # preserving the order chosen above.
            rem = parse_duration(args.total_duration)
            old_els = list(elements)
            elements = list()
            while rem > 0:
                for el in old_els:
                    if el.duration < rem:
                        elements.append(el)
                        rem -= el.duration
                        old_els.remove(el)
                        break
                else:
                    # Nothing fits anymore.
                    break

        return elements
def get_args() -> configargparse.Namespace:
    """Parse configuration from CLI arguments, env vars and config file.

    Returns the parsed namespace, with ``videos`` expanded to a real path
    and ``duration`` backfilled from a legacy ``max_duration`` setting.
    """
    defaultConfigPath = os.path.join(
        os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
    )

    parser = configargparse.ArgParser(
        description="Download videos in unread articles from a feed aggregator",
        default_config_files=[defaultConfigPath],
    )

    # Runtime settings
    parser.add_argument(
        "-v",
        "--verbosity",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Verbosity of log messages",
    )
    parser.add(
        "-c", "--config", required=False, is_config_file=True, help="Configuration file"
    )
    parser.add(
        "-n",
        "--dryrun",
        help="Only pretend to do actions",
        action="store_const",
        const=True,
        default=False,
    )

    # Input/Output
    parser.add(
        "--url",
        help="URL of the Google Reader API of the aggregator",
        env_var="RSS_VIDEOS_URL",
        required=True,
    )
    parser.add(
        "--email",
        help="E-mail / user to connect to the aggregator",
        env_var="RSS_VIDEOS_EMAIL",
        required=True,
    )
    parser.add(
        "--passwd",
        help="Password to connect to the aggregator",
        env_var="RSS_VIDEOS_PASSWD",
        required=True,
    )
    parser.add(
        "--research",
        help="Fetch video info again",
        action="store_true",
    )
    parser.add(
        "--no-refresh",
        dest="refresh",
        help="Don't fetch feed",
        action="store_false",
    )
    parser.add(
        "--videos",
        help="Directory to store videos",
        env_var="RSS_VIDEOS_VIDEO_DIR",
        required=True,
    )

    # Which videos
    parser.add(
        "--order",
        choices=("old", "new", "title", "creator", "link", "short", "long", "random"),
        default="old",
        help="Sorting mechanism",
    )
    parser.add("--guid", help="Regex to filter guid")
    parser.add("--creator", help="Regex to filter by creator")
    parser.add("--title", help="Regex to filter by title")
    parser.add("--link", help="Regex to filter by link")
    parser.add("--duration", help="Comparative to filter by duration")
    parser.add(
        "--seen",
        choices=("seen", "unseen", "any"),
        default="unseen",
        help="Only include seen/unseen/any videos",
    )
    parser.add(
        "--total-duration",
        help="Use videos that fit under the total given",
    )
    # TODO Envrionment variables
    # TODO Allow to ask

    # How to download
    parser.add(
        "--format",
        help="Use this format to download videos."
        + " See FORMAT SELECTION in youtube-dl(1)",
        env_var="RSS_VIDEOS_FORMAT",
        default="bestvideo+bestaudio/best",
    )
    parser.add(
        "--subtitles",
        help="Download all subtitles",
        env_var="RSS_VIDEOS_SUBTITLES",
        action="store_true",
    )

    parser.add(
        "action",
        nargs="?",
        choices=(
            "download",
            "list",
            "watch",
            "binge",
            "clean",
            "seen",
            "unseen",
        ),
        default="download",
    )

    args = parser.parse_args()
    args.videos = os.path.realpath(os.path.expanduser(args.videos))
    # Bug fix: no --max-duration option is declared by this parser, so the
    # original plain attribute access (`args.max_duration`) raised
    # AttributeError whenever --duration was unset.  Treat max_duration as
    # an optional legacy attribute instead.
    max_duration = getattr(args, "max_duration", None)
    if not args.duration and max_duration:
        args.duration = str(max_duration)

    return args
|
||
def get_database(args: configargparse.Namespace) -> RVDatabase:
    """Build the run's database: refresh from the feed, salvage the cache."""
    database = RVDatabase(args)
    cache = RVDatabase.load()
    if cache:
        database.salvage_cache_pre(cache)

    fetched = False
    if args.refresh:
        try:
            database.read_feed()
            fetched = True
        except requests.ConnectionError as err:
            if args.action == "download":
                raise RuntimeError("Couldn't fetch feed, refusing to download")
            # Quirky failsafe when offline: don't let the script conclude
            # that no element is a video just because the feed is down.
            log.warning(f"Couldn't fetch feed: {err}")

    if not fetched:
        if not cache:
            raise FileNotFoundError("Feed not fetched and no cached feed.")
        log.warning("Using cached feed.")
        database.import_cache(cache)

    if cache:
        database.salvage_cache(cache)
        database.clean_cache(cache)
    database.save()

    return database
||
def main() -> None:
    """Entry point: parse args, sync the feed, run the requested action."""
    args = get_args()
    configure_logging(args)

    # All file operations below happen relative to the videos directory.
    os.makedirs(args.videos, exist_ok=True)
    os.chdir(args.videos)

    database = get_database(args)

    log.debug("Running action")
    if args.action == "clean":
        database.clean()
    else:
        duration = 0
        for element in database.filter(args):
            if args.action == "download":
                element.preload()
            elif args.action == "list":
                print(element)
            elif args.action in ("watch", "binge"):
                element.watch()
                if args.action == "watch":
                    # "watch" plays a single video, "binge" keeps going.
                    break
            elif args.action == "seen":
                if not element.watched:
                    # Bug fix: message typo "Maked" -> "Marked".
                    log.info(f"Marked as seen: {element}")
                    element.watched = True
            elif args.action == "unseen":
                if element.watched:
                    # Bug fix: message typo "Maked" -> "Marked".
                    log.info(f"Marked as unseen: {element}")
                    element.watched = False
            else:
                raise NotImplementedError(f"Unimplemented action: {args.action}")
            duration += element.duration if element.is_video else 0
        log.info(f"Total duration: {format_duration(duration)}")
    database.attempt_clean()
    database.save()
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()