dotfiles/config/scripts/rssVideos

721 lines
21 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Script that downloads videos that are linked as an article
in an RSS feed.
The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""
import datetime
import functools
import logging
import os
import pickle
import random
import requests
import re
import subprocess
import time
import typing
import coloredlogs
import configargparse
import yt_dlp
# Module-level logger shared by every function and class in this script;
# its handler/format is installed by configure_logging().
log = logging.getLogger(__name__)
# TODO Lockfile, or a way to parallel watch and download
def configure_logging(args: configargparse.Namespace) -> None:
    """
    Install coloredlogs according to ``args.verbosity``.

    Without a verbosity, only this module's logger is set up and it prints
    bare messages. With one, coloredlogs is installed at that level using
    its default logger and format.
    """
    verbosity = args.verbosity
    if not verbosity:
        # Quiet mode: message text only, restricted to our own logger
        coloredlogs.install(fmt="%(message)s", logger=log)
        return
    coloredlogs.install(level=verbosity)
class SaveInfoPP(yt_dlp.postprocessor.common.PostProcessor):
    """
    Post-processor that hands the final info dict back to an RVElement.

    yt_dlp.process_ie_result() doesn't return a completely updated info dict,
    notably the extension is still the one before it realizes the files cannot
    be merged. Hooking in as a PostProcessor lets us see the info dict in its
    final form and keep what we need from it (it's not serializable in this
    state).
    """

    def __init__(self, rvelement: "RVElement") -> None:
        # Element that gets notified once the download is post-processed
        self.rvelement = rvelement
        super().__init__()

    def run(self, info: dict) -> tuple[list, dict]:
        """Notify the element, then return an empty list and `info` untouched."""
        element = self.rvelement
        element.update_post_download(info)
        return ([], info)
def parse_duration(string: str) -> int:
    """
    Convert a duration such as ``"90"``, ``"30s"``, ``"15m"`` or ``"2h"``
    into a number of seconds.

    The last character selects the unit (``s``, ``m`` or ``h``, in any case).
    A string ending with a digit is read as seconds.

    Raises:
        ValueError: if the string is empty, the unit is unknown, or the
            numeric part is not an integer.
    """
    DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, "": 1}

    if not string:
        raise ValueError("Empty duration")

    mult_index = string[-1].lower()
    if mult_index.isdigit():
        mult_index = ""
    else:
        string = string[:-1]

    try:
        multiplier = DURATION_MULTIPLIERS[mult_index]
    except KeyError:
        # A failed dict lookup raises KeyError (not IndexError)
        raise ValueError(f"Unknown duration multiplier: {mult_index}") from None

    return int(string) * multiplier
def compare_duration(compstr: str) -> typing.Callable[[int], bool]:
    """
    Build a predicate from a comparative duration such as ``"<10m"``.

    An optional first character picks the comparison: ``<`` or ``-`` for
    shorter than, ``>`` or ``+`` for longer than, ``=`` for exactly. A string
    starting with a digit means "at most". The remainder is handed to
    parse_duration().

    Returns:
        A function taking a duration in seconds and telling whether it
        satisfies the comparison.

    Raises:
        ValueError: if the string is empty, the comparator is unknown, or
            the duration part cannot be parsed.
    """
    # Local import: the operator functions also accept float durations,
    # unlike the unbound int.__lt__ & co. which raise TypeError on them.
    import operator

    DURATION_COMPARATORS = {
        "<": operator.lt,
        "-": operator.lt,
        ">": operator.gt,
        "+": operator.gt,
        "=": operator.eq,
        "": operator.le,
    }

    if not compstr:
        raise ValueError("Empty duration comparator")

    comp_index = compstr[0]
    if comp_index.isdigit():
        comp_index = ""
    else:
        compstr = compstr[1:]

    try:
        comparator = DURATION_COMPARATORS[comp_index]
    except KeyError:
        # A failed dict lookup raises KeyError (not IndexError)
        raise ValueError(f"Unknown duration comparator: {comp_index}") from None

    duration = parse_duration(compstr)
    return lambda d: comparator(d, duration)
def format_duration(duration: int) -> str:
    """
    Render a number of seconds as ``HH:MM:SS``.

    The hour field is not capped at 23, so totals of a day or more are shown
    as e.g. ``25:00:00`` instead of wrapping around to ``01:00:00``.
    Fractional seconds are truncated.
    """
    minutes, seconds = divmod(int(duration), 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
class RVElement:
    """
    One article of the feed, plus what yt-dlp found out about its link.

    `item` is the raw dict describing the article and `parent` the
    RVDatabase holding this element. Information from yt-dlp is fetched
    lazily through `ytdl_infos` and then kept in the instance `__dict__`.
    """

    parent: "RVDatabase"
    item: dict
    # Path recorded after a successful download, None when never downloaded
    downloaded_filepath: typing.Optional[str]

    def __init__(self, parent: "RVDatabase", item: dict) -> None:
        self.parent = parent
        self.item = item
        self.downloaded_filepath = None

    @property
    def id(self) -> str:
        """Identifier of the article in the feed."""
        return self.item["id"]

    @property
    def title(self) -> str:
        """Title of the article."""
        return self.item["title"]

    @property
    def link(self) -> str:
        """URL of the article (first canonical link)."""
        return self.item["canonical"][0]["href"]

    @property
    def creator(self) -> str:
        """Title of the feed the article originates from."""
        return self.item["origin"]["title"]

    @property
    def date(self) -> datetime.datetime:
        """Publication date, converted from the item's timestamp."""
        return datetime.datetime.fromtimestamp(self.item["published"])

    @property
    def is_researched(self) -> bool:
        """Whether `ytdl_infos` has already been computed (or salvaged)."""
        return "ytdl_infos" in self.__dict__

    def salvage_cache(self, cache: "RVElement") -> None:
        """Copy research results and download path from a cached element."""
        if cache.is_researched:
            self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
            log.debug(f"From cache: {self}")
        if cache.downloaded_filepath:
            self.downloaded_filepath = cache.downloaded_filepath

    def __str__(self) -> str:
        # Never triggers research: the duration is only shown when known
        if not self.is_researched:
            duration = "??:??:??"
        elif self.is_video:
            duration = format_duration(self.duration)
        else:
            duration = "--:--:--"
        return (
            f"{self.date.strftime('%y-%m-%d %H:%M')} ({duration}"
            f") {self.creator if self.creator else '?'} "
            f" {self.title} "
            f" {self.link}"
        )

    @property
    def downloaded(self) -> bool:
        """
        Whether the video file currently exists on disk.

        False for elements that are not researched yet or that turned out
        not to be videos (those have no file path at all).
        """
        if not self.is_researched or not self.is_video:
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        """
        Information yt-dlp extracts for the link, None if extraction failed.

        Computed on first access, then the database is saved so the result
        lands in the cache.
        """
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link, download=False)
        except yt_dlp.utils.DownloadError as e:
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        if infos:
            infos = self.parent.ytdl_dry.sanitize_info(infos)
        # Save database once it's been computed
        self.__dict__["ytdl_infos"] = infos
        self.parent.save()
        return infos

    @property
    def duration(self) -> int:
        """Duration of the video, as reported by yt-dlp."""
        assert self.is_video
        assert self.ytdl_infos
        return self.ytdl_infos["duration"]

    @property
    def is_video(self) -> bool:
        """Whether yt-dlp found something with a duration (may research)."""
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @property
    def filepath(self) -> str:
        """Path of the video file: the recorded one, else the expected one."""
        assert self.is_video
        if self.downloaded_filepath:
            return self.downloaded_filepath
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def filename(self) -> str:
        """`filepath` without its extension."""
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def download(self) -> None:
        """Download the video (only logged when in dry-run mode)."""
        assert self.is_video
        log.info(f"Downloading: {self}")
        if self.parent.args.research:
            # Drop the cached value so the next access researches again
            del self.ytdl_infos
        if not self.parent.args.dryrun:
            with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl:
                ydl.add_post_processor(SaveInfoPP(self))
                ydl.process_ie_result(self.ytdl_infos, download=True)
            self.parent.save()

    def update_post_download(self, info: dict) -> None:
        """Record where the file ended up, from the final info dict."""
        self.downloaded_filepath = self.parent.ytdl_dry.prepare_filename(info)

    @property
    def was_downloaded(self) -> bool:
        """Whether a download was ever recorded for this element."""
        return self.downloaded_filepath is not None

    def preload(self) -> None:
        """Download the video unless it is, or once was, downloaded."""
        assert self.is_video
        if self.downloaded:
            log.debug(f"Currently downloaded: {self}")
            return
        if self.was_downloaded:
            log.debug(f"Downloaded previously: {self}")
            return
        self.download()

    @property
    def watched(self) -> bool:
        """Whether the video was downloaded once but its file is now gone."""
        if not self.is_researched:
            return False
        return self.was_downloaded and not self.downloaded

    def matches_filter(self, args: "configargparse.Namespace") -> bool:
        """
        Tell whether the element passes the filters given on the command line.

        Filters needing no research are checked first so most elements can
        be rejected without querying yt-dlp.
        """
        # Inexpensive filters
        if args.seen != "any" and (args.seen == "seen") != self.watched:
            log.debug(f"Not {args.seen}: {self}")
            return False
        if args.title and not re.search(args.title, self.title):
            log.debug(f"Title not matching {args.title}: {self}")
            return False
        if args.link and not re.search(args.link, self.link):
            log.debug(f"Link not matching {args.link}: {self}")
            return False
        if args.creator and (
            not self.creator or not re.search(args.creator, self.creator)
        ):
            log.debug(f"Creator not matching {args.creator}: {self}")
            return False

        # Expensive filters
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return False
        if args.duration and not compare_duration(args.duration)(self.duration):
            log.debug(f"Duration {self.duration} not matching {args.duration}: {self}")
            return False

        return True

    def watch(self) -> None:
        """Play the video with mpv, then remove it and mark the article read."""
        if not self.downloaded:
            self.download()

        cmd = ["mpv", self.filepath]
        log.debug(f"Running {cmd}")
        if not self.parent.args.dryrun:
            # check=True raises CalledProcessError on a non-zero exit status
            subprocess.run(cmd, check=True)

        self.clean()
        self.try_mark_read()

    def clean(self) -> None:
        """Remove every file of the working directory belonging to the video."""
        assert self.is_video
        # Computed once: resolving the file name goes through yt-dlp
        prefix = self.filename
        log.info(f"Removing gone video: {prefix}*")
        for file in os.listdir():
            if file.startswith(prefix):
                log.debug(f"Removing file: {file}")
                if not self.parent.args.dryrun:
                    os.unlink(file)

    def mark_read(self) -> None:
        """
        Mark the article as read on the aggregator and forget the element.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            RuntimeError: if the server answers anything but "OK".
        """
        log.debug(f"Marking {self} read")
        if self.parent.args.dryrun:
            return
        r = requests.post(
            f"{self.parent.args.url}/reader/api/0/edit-tag",
            data={
                "i": self.id,
                "a": "user/-/state/com.google/read",
                "ac": "edit",
                "token": self.parent.feed_token,
            },
            headers=self.parent.auth_headers,
        )
        r.raise_for_status()
        if r.text.strip() != "OK":
            raise RuntimeError(f"Couldn't mark {self} as read: {r.text}")
        log.info(f"Marked {self} as read")
        # NOTE: mutates the parent's list; callers iterating over it must
        # work on a copy.
        self.parent.elements.remove(self)

    def try_mark_read(self) -> None:
        """Like mark_read(), but a connection failure is only logged."""
        try:
            self.mark_read()
        except requests.ConnectionError:
            log.warning(f"Couldn't mark {self} as read")
class RVDatabase:
    """
    The list of feed elements, with the helpers to fetch, cache and filter it.

    The whole object is pickled to SAVE_FILE in the current directory, which
    is how research results and credentials survive between runs.
    """

    SAVE_FILE = ".cache.p"

    args: "configargparse.Namespace"
    # Kept in reverse feed order (see build_list)
    elements: "list[RVElement]"

    def __init__(self, args: "configargparse.Namespace") -> None:
        self.args = args

    def save(self) -> None:
        """Pickle the database to SAVE_FILE (skipped in dry-run mode)."""
        log.debug("Saving cache")
        if self.args.dryrun:
            return
        with open(self.SAVE_FILE, "wb") as save_file:
            pickle.dump(self, save_file)

    @classmethod
    def load(cls) -> typing.Optional["RVDatabase"]:
        """
        Load the database pickled by a previous run.

        Returns:
            The cached database, or None if there is none or it is unusable.
        """
        # NOTE: unpickling runs code chosen by the file's author; SAVE_FILE
        # must only ever be a cache written by this script.
        try:
            with open(cls.SAVE_FILE, "rb") as save_file:
                return pickle.load(save_file)
        except (
            TypeError,
            AttributeError,
            EOFError,
            ImportError,
            IndexError,
            pickle.UnpicklingError,
        ):
            log.warning("Corrupt / outdated cache, it will be rebuilt.")
        except FileNotFoundError:
            pass
        return None

    def salvage_cache_pre(self, cache: "RVDatabase") -> None:
        """Reuse the cached credentials, needed before the feed is fetched."""
        if "auth_headers" in cache.__dict__:
            self.auth_headers = cache.auth_headers

    def salvage_cache(self, cache: "RVDatabase") -> None:
        """Let each element take over what the cache knows about it."""
        log.debug("Salvaging cache")
        cache_els = {cache_el.id: cache_el for cache_el in cache.elements}
        for el in self.elements:
            cached = cache_els.get(el.id)
            if cached is not None:
                el.salvage_cache(cached)

    def clean_cache(self, cache: "RVDatabase") -> None:
        """Remove the files of cached videos that left the feed."""
        log.debug("Cleaning cache")
        # Cached elements consult their own (pickled) database for settings.
        # A cache is never written in dry-run mode, so without this the
        # files would be deleted even during a dry run.
        cache.args = self.args
        known_ids = {el.id for el in self.elements}
        for el in cache.elements:
            if el.id not in known_ids and el.is_researched and el.is_video:
                el.clean()

    def import_cache(self, cache: "RVDatabase") -> None:
        """Rebuild the element list from the cached feed items."""
        log.debug("Importing cache")
        # build_list() reverses what it is given and the cached list is
        # already reversed: undo that so the order survives the round trip.
        self.build_list(element.item for element in reversed(cache.elements))

    @functools.cached_property
    def auth_headers(self) -> dict[str, str]:
        """
        Authorization header obtained by logging in to the aggregator.

        Raises:
            requests.HTTPError: if the login request fails.
            RuntimeError: if the answer holds no ``auth=`` line.
        """
        r = requests.get(
            f"{self.args.url}/accounts/ClientLogin",
            params={"Email": self.args.email, "Passwd": self.args.passwd},
        )
        r.raise_for_status()
        for line in r.text.split("\n"):
            if line.lower().startswith("auth="):
                # Everything after the first "=", the value may hold more
                val = line.split("=", 1)[1]
                return {"Authorization": f"GoogleLogin auth={val}"}
        raise RuntimeError("Couldn't find auth= key")

    def fetch_feed_elements(self) -> typing.Generator[dict, None, None]:
        """Yield the unread items of the feed, following continuation pages."""
        log.info("Fetching RSS feed")
        continuation: typing.Optional[str] = None
        with requests.Session() as s:
            while True:
                r = s.get(
                    f"{self.args.url}/reader/api/0/stream/contents",
                    params={
                        "xt": "user/-/state/com.google/read",
                        "c": continuation,
                    },
                    headers=self.auth_headers,
                )
                r.raise_for_status()
                page = r.json()
                yield from page["items"]
                continuation = page.get("continuation")
                if not continuation:
                    break

    def build_list(self, items: typing.Iterable[dict]) -> None:
        """Set `elements` from feed items, in the reverse of the given order."""
        self.elements = []
        elements = []
        for item in items:
            element = RVElement(self, item)
            elements.append(element)
            log.debug(f"Known: {element}")
        # One reversal at the end rather than an O(n) insert per item
        elements.reverse()
        self.elements = elements

    def read_feed(self) -> None:
        """Fetch the feed and build the element list from it."""
        self.build_list(self.fetch_feed_elements())

    def clean(self) -> None:
        """Remove the files of the directory not belonging to a known video."""
        log.debug("Cleaning")
        # str.startswith accepts a tuple of prefixes
        filenames = tuple(
            {element.filename for element in self.elements if element.is_video}
        )
        for file in os.listdir():
            if file == RVDatabase.SAVE_FILE:
                continue
            if not os.path.isfile(file):
                continue
            if file.startswith(filenames):
                continue
            log.info(f"Removing unknown file: {file}")
            if not self.args.dryrun:
                os.unlink(file)

    @property
    def all_researched(self) -> bool:
        """Whether every element has been researched."""
        return all(element.is_researched for element in self.elements)

    def attempt_clean(self) -> None:
        """Run clean(), but only if it won't trigger any research."""
        if self.all_researched:
            self.clean()

    @property
    def ytdl_opts(self) -> dict:
        """Options passed to yt-dlp for downloading."""
        return {"format": self.args.format, "allsubtitles": self.args.subtitles}

    @property
    def ytdl_dry_opts(self) -> dict:
        """Same as `ytdl_opts`, with yt-dlp's output silenced."""
        opts = self.ytdl_opts.copy()
        opts.update({"quiet": True})
        return opts

    @property
    def ytdl_dry(self) -> "yt_dlp.YoutubeDL":
        """A new quiet YoutubeDL instance, built on every access."""
        return yt_dlp.YoutubeDL(self.ytdl_dry_opts)

    def filter(self, args: "configargparse.Namespace") -> "typing.Iterable[RVElement]":
        """
        Select and order the elements according to the command line.

        The result is computed from a snapshot of `elements`, so the caller
        may remove elements (e.g. by marking them read) while iterating.
        Unless a sort or `--total-duration` requires every element, the
        filtering itself stays lazy: elements are only researched as they
        are consumed.
        """
        elements: typing.Iterable[RVElement]
        # Inexpensive sort
        if args.order == "new":
            elements = list(reversed(self.elements))
        elif args.order == "title":
            elements = sorted(self.elements, key=lambda el: el.title)
        elif args.order == "creator":
            elements = sorted(self.elements, key=lambda el: el.creator or "")
        elif args.order == "link":
            elements = sorted(self.elements, key=lambda el: el.link)
        elif args.order == "random":
            elements_random = self.elements.copy()
            random.shuffle(elements_random)
            elements = elements_random
        else:
            elements = list(self.elements)

        # Possibly expensive filtering
        elements = filter(lambda el: el.matches_filter(args), elements)

        # Expensive sort
        if args.order == "short":
            elements = sorted(
                elements, key=lambda el: el.duration if el.is_video else 0
            )
        elif args.order == "long":
            elements = sorted(
                elements, key=lambda el: el.duration if el.is_video else 0, reverse=True
            )

        # Post sorting filtering
        if args.total_duration:
            rem = parse_duration(args.total_duration)
            # Greedy first-fit. One pass is enough: an element that did not
            # fit cannot fit later, as the remaining time only shrinks.
            fitting = []
            for el in elements:
                if el.duration < rem:
                    fitting.append(el)
                    rem -= el.duration
            elements = fitting

        return elements

    @functools.cached_property
    def feed_token(self) -> str:
        """Token requested from the aggregator, needed to edit tags."""
        r = requests.get(
            f"{self.args.url}/reader/api/0/token",
            headers=self.auth_headers,
        )
        r.raise_for_status()
        return r.text.strip()

    def try_mark_watched_read(self) -> None:
        """Try to mark every watched element as read."""
        # Iterate over a copy: marking an element read removes it from
        # self.elements, which would make the loop skip its successor.
        for element in list(self.elements):
            if element.watched:
                element.try_mark_read()
def get_args() -> configargparse.Namespace:
    """
    Parse settings from the command line, the environment and config files.

    Returns:
        The parsed namespace, with `videos` turned into an absolute path.
    """
    # NOTE(review): the XDG Base Directory specification names this variable
    # XDG_CONFIG_HOME; XDG_CONFIG_PATH is kept as found - confirm the intent.
    defaultConfigPath = os.path.join(
        os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
    )

    parser = configargparse.ArgParser(
        description="Download videos in unread articles from a feed aggregator",
        default_config_files=[defaultConfigPath],
    )

    # Runtime settings
    parser.add_argument(
        "-v",
        "--verbosity",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Verbosity of log messages",
    )
    parser.add(
        "-c", "--config", required=False, is_config_file=True, help="Configuration file"
    )
    parser.add(
        "-n",
        "--dryrun",
        help="Only pretend to do actions",
        action="store_const",
        const=True,
        default=False,
    )

    # Input/Output
    parser.add(
        "--url",
        help="URL of the Google Reader API of the aggregator",
        env_var="RSS_VIDEOS_URL",
        required=True,
    )
    parser.add(
        "--email",
        help="E-mail / user to connect to the aggregator",
        env_var="RSS_VIDEOS_EMAIL",
        required=True,
    )
    parser.add(
        "--passwd",
        help="Password to connect to the aggregator",
        env_var="RSS_VIDEOS_PASSWD",
        required=True,
    )
    parser.add(
        "--research",
        help="Fetch video info again",
        action="store_true",
    )
    parser.add(
        "--no-refresh",
        dest="refresh",
        help="Don't fetch feed",
        action="store_false",
    )
    parser.add(
        "--videos",
        help="Directory to store videos",
        env_var="RSS_VIDEOS_VIDEO_DIR",
        required=True,
    )

    # Which videos
    parser.add(
        "--order",
        choices=("old", "new", "title", "creator", "link", "short", "long", "random"),
        default="old",
        help="Sorting mechanism",
    )
    parser.add("--creator", help="Regex to filter by creator")
    parser.add("--title", help="Regex to filter by title")
    parser.add("--link", help="Regex to filter by link")
    parser.add("--duration", help="Comparative to filter by duration")
    # TODO Date selector
    parser.add(
        "--seen",
        choices=("seen", "unseen", "any"),
        default="unseen",
        help="Only include seen/unseen/any videos",
    )
    parser.add(
        "--total-duration",
        help="Use videos that fit under the total given",
    )
    # TODO Environment variables
    # TODO Allow to ask

    # How to download
    parser.add(
        "--format",
        help="Use this format to download videos."
        + " See FORMAT SELECTION in youtube-dl(1)",
        env_var="RSS_VIDEOS_FORMAT",
        default="bestvideo+bestaudio/best",
    )
    parser.add(
        "--subtitles",
        help="Download all subtitles",
        env_var="RSS_VIDEOS_SUBTITLES",
        action="store_true",
    )

    parser.add(
        "action",
        nargs="?",
        choices=(
            "download",
            "list",
            "watch",
            "binge",
            "clean",
        ),
        default="download",
    )

    args = parser.parse_args()
    args.videos = os.path.realpath(os.path.expanduser(args.videos))
    # No option defines `max_duration` here, so a plain attribute access
    # raised AttributeError whenever --duration was omitted; only honour the
    # fallback if the attribute happens to exist.
    max_duration = getattr(args, "max_duration", None)
    if not args.duration and max_duration:
        args.duration = str(max_duration)

    return args
def get_database(args: configargparse.Namespace) -> RVDatabase:
    """
    Build the database for this run, from the feed and/or the cache.

    The feed is fetched unless refreshing is disabled. When it could not be
    fetched, the cached feed is used instead. Whatever the cache knows is
    then salvaged, files of videos gone from the feed are cleaned, and the
    result is saved.

    Raises:
        RuntimeError: if the feed can't be reached while the action is
            "download".
        FileNotFoundError: if there is neither a fetched feed nor a cache.
    """
    database = RVDatabase(args)
    cache = RVDatabase.load()
    if cache:
        # Credentials first, fetching the feed needs them
        database.salvage_cache_pre(cache)

    feed_fetched = False
    if args.refresh:
        try:
            database.read_feed()
        except requests.ConnectionError as err:
            if args.action == "download":
                raise RuntimeError("Couldn't fetch feed, refusing to download")
            # This is a quirky failsafe in case of no internet connection,
            # so the script doesn't go noting that no element is a video.
            log.warning(f"Couldn't fetch feed: {err}")
        else:
            feed_fetched = True

    if not feed_fetched:
        if not cache:
            raise FileNotFoundError("Feed not fetched and no cached feed.")
        log.warning("Using cached feed.")
        database.import_cache(cache)

    if cache:
        database.salvage_cache(cache)
        database.clean_cache(cache)

    database.save()
    return database
def main() -> None:
    """
    Entry point: run the requested action from inside the videos directory.

    "clean" cleans the directory; the other actions go through the filtered
    elements, summing their durations on the way. In every case the run ends
    with a cleaning attempt, marking watched videos read, and a save.
    """
    args = get_args()
    configure_logging(args)

    # Everything below works with paths relative to the videos directory
    os.makedirs(args.videos, exist_ok=True)
    os.chdir(args.videos)

    database = get_database(args)

    log.debug("Running action")
    action = args.action
    if action != "clean":
        total = 0
        for element in database.filter(args):
            if element.is_video:
                total += element.duration
            if action == "download":
                element.preload()
            elif action == "list":
                print(element)
            elif action in ("watch", "binge"):
                element.watch()
                if action == "watch":
                    # A single video for "watch", "binge" keeps going
                    break
            else:
                raise NotImplementedError(f"Unimplemented action: {args.action}")
        log.info(f"Total duration: {format_duration(total)}")
    else:
        database.clean()

    database.attempt_clean()
    database.try_mark_watched_read()
    database.save()


if __name__ == "__main__":
    main()