Merge remote-tracking branch 'origin/master'

This commit is contained in:
Geoffrey Frogeye 2021-12-27 16:46:08 +01:00
commit c0dfca5831
30 changed files with 719 additions and 274 deletions

1
config/automatrop/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
self_name

View file

@ -14,3 +14,6 @@ has_forge_access: yes
extensions:
- g
- gh
x11_screens:
- HDMI-0
- eDP-1-1

View file

@ -0,0 +1,12 @@
root_access: no
display_server: "x11"
dev_stuffs:
- shell
- network
- ansible
- python
extensions:
- gh
x11_screens:
- HDMI-1
- HDMI-2

View file

@ -4,8 +4,12 @@ dev_stuffs:
- shell
- network
- ansible
- python
has_battery: yes
encrypt_home_stacked_fs: yes
extensions:
- g
- gh
x11_screens:
- DP-1
- eDP-1

View file

@ -1,3 +1,4 @@
curacao.geoffrey.frogeye.fr
# triffle.geoffrey.frogeye.fr
pindakaas.geoffrey.frogeye.fr
gho.geoffrey.frogeye.fr ansible_host=localhost ansible_port=2222

View file

@ -113,11 +113,11 @@
content: "{{ base16_schemes['schemes'][base16_scheme]['rofi']['themes']['base16-' + base16_scheme + '.' + item] }}"
dest: "{{ ansible_env.HOME }}/.config/rofi/theme.{{ item }}"
mode: "u=rw,g=r,o=r"
with_items:
- rasi
- config
tags:
- color
loop:
- config
- rasi
- name: Configure Dunst
template:
@ -167,6 +167,7 @@
loop:
- pulseaudio
- mpd
when: has_systemd
# TODO bar (might change bar in the future, so...)
# TODO highlight (there IS a template but the colors look different from vim and mostly the same from when there's no config)

View file

@ -14,8 +14,9 @@
git:
repo: "{% if has_forge_access %}git@git.frogeye.fr:{% else %}https://git.frogeye.fr/{% endif %}geoffrey/dotfiles.git"
dest: "{{ ansible_user_dir }}/.dotfiles"
update: "{{ not has_forge_access }}"
update: yes
notify: install dotfiles
tags: dotfiles_repo
# TODO Put actual dotfiles in a subdirectory of the repo, so we don't have to put everything in config
- name: Register as Ansible collection

View file

@ -1,2 +0,0 @@
dependencies:
- role: system

View file

@ -4,9 +4,17 @@
arch: "{{ ansible_lsb.id == 'Arch' }}"
manjaro: "{{ ansible_lsb.id == 'Manjaro' or ansible_lsb.id == 'Manjaro-ARM' }}"
termux: "{{ ansible_distribution == 'OtherLinux' and ansible_python.executable == '/data/data/com.termux/files/usr/bin/python' }}"
debian_based: "{{ ansible_distribution == 'Debian' or ansible_distribution == 'Ubuntu' }}"
debian: "{{ ansible_distribution == 'Debian' }}"
ubuntu: "{{ ansible_distribution == 'Ubuntu' }}"
junest: "{{ ansible_distribution == 'Archlinux' and ansible_is_chroot }}" # TODO Check if /etc/junest exists
tags:
- always
- name: Set composed facts
set_fact:
debian_based: "{{ debian or ubuntu }}"
can_chown: "{{ not junest }}"
has_systemd: "{{ not junest }}"
tags:
- always
# TODO Make this a real Ansible fact maybe?

View file

@ -48,3 +48,4 @@
- name: Install Geoffrey Frogeye's key
gpg_key:
fpr: 4FBA930D314A03215E2CDB0A8312C8CAC1BAC289
trust: 5

View file

@ -15,3 +15,9 @@
listen: "software changed"
when: root_access
when: arch_based
- name: update pacman cache
pacman:
update_cache: yes
become: yes
when: arch_based

View file

@ -59,6 +59,30 @@
# Arch configuration
# TODO Patch sudo-fake so it allows using -u so `become` works
- name: Enable multilib repo
lineinfile:
path: /etc/pacman.conf
regexp: '^#?\s*\[multilib\]$'
line: '[multilib]'
become: yes
when: arch_based and ansible_architecture == "x86_64"
notify: udpate pacman cache
- name: Configure multilib repo
lineinfile:
path: /etc/pacman.conf
regexp: '^#?\s*Include\s*=\s*/etc/pacman.d/mirrorlist'
line: 'Include = /etc/pacman.d/mirrorlist'
insertafter: '^\[multilib\]$'
become: yes
when: arch_based and ansible_architecture == "x86_64"
notify: udpate pacman cache
- name: Update cache if needed
meta: flush_handlers
- name: Install ccache
pacman:
name: ccache
@ -90,7 +114,6 @@
replace: "CFLAGS=\\1\\2"
become: yes
when: arch_based
tags: g
- name: Change -march to native from makepkg CFLAGS
replace:
@ -99,7 +122,6 @@
replace: "CFLAGS=\\1-march=native\\2\\3"
become: yes
when: arch_based
tags: g
- name: Set makepkg MAKEFLAGS
replace:
@ -140,24 +162,30 @@
# Install alternative package managers
- name: List packages from base-devel
command: pacman -Sqg base-devel
register: base_devel_packages
changed_when: no
check_mode: no
- name: Install dependencies for AUR helpers
pacman:
name:
- fakeroot
- base-devel
name: "{{ (base_devel_packages.stdout | split('\n') | reject('eq', 'sudo')) + ['fakeroot'] }}"
become: yes
when: arch_based and root_access
when: arch_based
# Do not install sudo because maybe sudo-fake is installed (otherwise it conflicts)
# It should already be installed already anyway
- name: Install AUR package manager (Arch)
aur:
name: yay-bin
when: arch and root_access
when: arch
- name: Install AUR package manager (Manjaro)
pacman:
name: yay
become: yes
when: manjaro and root_access
when: manjaro
# Not sure if regular Manjaro has yay in its community packages,
# but Manjaro-ARM sure does
@ -172,16 +200,6 @@
packages: "{{ query('template', 'package_manager.j2')[0].split('\n')[:-1]|sort|unique }}"
tags: softwarelist
- name: Check if list of packages changed
copy:
content: "{% for package in packages %}{{ package }}\n{% endfor %}"
dest: "{{ ansible_user_dir }}/.cache/automatrop/package_manager"
notify: "software changed"
tags: softwarelist
- debug:
msg: "{{ packages }}"
- name: Install packages (Arch-based)
aur:
name: "{{ packages }}"
@ -192,7 +210,14 @@
use: yay
notify: "software changed"
tags: softwarelist
when: arch_based and root_access
when: arch_based
- name: Check if list of packages changed
copy:
content: "{% for package in packages %}{{ package }}\n{% endfor %}"
dest: "{{ ansible_user_dir }}/.cache/automatrop/package_manager"
notify: "software changed"
tags: softwarelist
# translate-shell
# $ curl -L git.io/trans > ~/.local/bin/trans

View file

@ -5,7 +5,7 @@ rsync
borg
syncthing
{% if arch_based %}
{% if ansible_architecture == 'x86_64' %}
{% if ansible_architecture == 'x86_64' and can_chown %}
freefilesync-bin
{# Not worth the compilation if you can't have the binaries #}
{% endif %}

View file

@ -11,4 +11,6 @@ highlight
{% endif %}
{# For nvim's :Telescope live_grep #}
ripgrep
{# Offline documentation #}
zeal
{# EOF #}

View file

@ -8,3 +8,4 @@ python-lsp-server
python-mypy-ls
python-lsp-black
{% endif %}
ipython

View file

@ -1,7 +1,9 @@
moreutils
man
visidata
{% if can_chown or not arch_based %}
insect
{% endif %}
translate-shell
gnupg
{# Editor #}

View file

@ -3,7 +3,6 @@
vim_variants:
- vim
- nvim
tags: g
# TODO vim-minimal for bsh
# TODO Select those in a clever way
@ -25,7 +24,6 @@
src: loader.j2
dest: "{{ ansible_user_dir }}/.config/vim/loader.vim"
mode: "u=rw,g=r,o=r"
tags: g
- name: Install theme
template:
@ -54,4 +52,3 @@
loop: "{{ vim_variants }}"
loop_control:
loop_var: variant
tags: g

View file

@ -28,8 +28,8 @@ require('feline').setup({
base0F = base16_colors.base0F,
},
components = {
left = {
active = {
active = {
{
{
provider = function() return string.format(' %d ', vim.fn.line('$')) end,
-- If you can, make it depend on the actual bar size
@ -95,10 +95,8 @@ require('feline').setup({
provider='',
hl = { bg = 'base01', fg = 'base02' },
},
}
},
right = {
active = {
},
{
{
provider='',
hl = { bg = 'base03', fg = 'base01', },

View file

@ -50,7 +50,7 @@ bindsym $mod+Shift+d exec --no-startup-id rofi -modi drun -show drun
# Start Applications
# bindsym $mod+Return exec urxvtc
bindsym $mod+Return exec alacritty
bindsym $mod+Return exec alacritty -e zsh
bindsym $mod+Shift+Return exec urxvt
bindsym $mod+p exec thunar
bindsym $mod+m exec qutebrowser --override-restore --backend=webengine
@ -74,9 +74,9 @@ bindsym XF86MonBrightnessDown exec xbacklight -dec 5 -time 0
bindsym XF86MonBrightnessUp exec xbacklight -inc 5 -time 0
# Screenshots
bindsym Print exec scrot -ue 'mv $f ~/Screenshots/ && optipng ~/Screenshots/$f'
bindsym $mod+Print exec scrot -e 'mv $f ~/Screenshots/ && optipng ~/Screenshots/$f'
bindsym Ctrl+Print exec sleep 1 && scrot -se 'mv $f ~/Screenshots/ && optipng ~/Screenshots/$f'
bindsym Print exec scrot --focused --exec 'mv $f ~/Screenshots/ && optipng ~/Screenshots/$f'
bindsym $mod+Print exec scrot --exec 'mv $f ~/Screenshots/ && optipng ~/Screenshots/$f'
bindsym Ctrl+Print exec sleep 1 && scrot --select --exec 'mv $f ~/Screenshots/ && optipng ~/Screenshots/$f'
focus_follows_mouse no
mouse_warping output
@ -149,7 +149,7 @@ set $WS9 9
set $WS10 10
# Workspace output
{% set screens = ["HDMI-0", "eDP-1-1"] %}
{% set screens = x11_screens | default(['DEFAULT']) %}
{% for i in range(1, 11) %}
workspace "$WS{{ i }}" output {{ screens[(i - 1) % (screens | length)] }}
{% endfor %}

View file

@ -1,3 +1,2 @@
theme.config
theme.rasi

View file

@ -1,8 +1,4 @@
#include "theme.config"
rofi.theme: theme
rofi.cycle: true
rofi.case-sensitive: false
rofi.scroll-method: 0
rofi.show-match: true
rofi.lazy-grab: false
rofi.matching: regex

5
config/rofi/config.rasi Normal file
View file

@ -0,0 +1,5 @@
configuration {
lazy-grab: false;
matching: "regex";
}
@theme "theme"

View file

@ -1,4 +1,10 @@
#!/usr/bin/env bash
cd ~/.dotfiles/config/automatrop
ansible-playbook --diff playbooks/default.yml --limit $HOSTNAME --connection local "$@"
if [ -f ~/.config/automatrop/self_name ]
then
hostname=$(cat ~/.config/automatrop/self_name)
else
hostname="$HOSTNAME"
fi
ansible-playbook --diff playbooks/default.yml --limit $hostname --connection local "$@"

View file

@ -1,3 +1,4 @@
coloredlogs>=10.0<11
progressbar2>=3.47.0<4
youtube-dl>=2021.6.6
yt-dlp>=2021.10.22
ConfigArgParse>=1.5<2

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""
Script that download videos that are linked as an article
in a RSS feed.
@ -7,18 +8,466 @@ The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""
# TODO Distribute this correclty, in the meanwhile please do
# pip install --user yt-dlp ConfigArgParse
# TODO Better logging (youtube-dl allow to pass loggers)
import sys
import urllib.request
import urllib.parse
import enum
import functools
import logging
import os
import pickle
import random
import re
import subprocess
import sys
import time
import typing
import urllib.parse
import urllib.request
import urllib.error
from xml.dom import minidom
import yt_dlp as youtube_dl
import coloredlogs
import configargparse
import yt_dlp
log = logging.getLogger(__name__)
# TODO Lockfile, or a way to parallel watch and download
# TODO Save ytdl infos and view info separately
def configure_logging(args: configargparse.Namespace) -> None:
# Configure logging
if args.verbosity:
coloredlogs.install(
level=args.verbosity,
)
else:
coloredlogs.install(
fmt="%(message)s",
logger=log,
)
class SaveInfoPP(yt_dlp.postprocessor.common.PostProcessor):
"""
yt_dlp.process_ie_result() doesn't return a completely updated info dict,
notably the extension is still the one before it realizes the files cannot
be merged. So we use this PostProcessor to catch the info dict in its final
form and save what we need from it (it's not serializable in this state).
"""
def __init__(self, rvelement: "RVElement") -> None:
self.rvelement = rvelement
super().__init__()
def run(self, info: dict) -> tuple[list, dict]:
self.rvelement.update_post_download(info)
return [], info
def parse_duration(string: str) -> int:
DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, "": 1}
mult_index = string[-1].lower()
if mult_index.isdigit():
mult_index = ""
else:
string = string[:-1]
try:
multiplier = DURATION_MULTIPLIERS[mult_index]
except IndexError:
raise ValueError(f"Unknown duration multiplier: {mult_index}")
return int(string) * multiplier
def compare_duration(compstr: str) -> typing.Callable[[int], bool]:
DURATION_COMPARATORS = {
"<": int.__lt__,
"-": int.__lt__,
">": int.__gt__,
"+": int.__gt__,
"=": int.__eq__,
"": int.__le__,
}
comp_index = compstr[0]
if comp_index.isdigit():
comp_index = ""
else:
compstr = compstr[1:]
try:
comparator = DURATION_COMPARATORS[comp_index]
except IndexError:
raise ValueError(f"Unknown duration comparator: {comp_index}")
duration = parse_duration(compstr)
return lambda d: comparator(d, duration)
def format_duration(duration: int) -> str:
return time.strftime("%H:%M:%S", time.gmtime(duration))
class RVElement:
parent: "RVDatabase"
item: minidom.Element
downloaded_filepath: typing.Optional[str]
watched: bool
def __init__(self, parent: "RVDatabase", item: minidom.Element) -> None:
self.parent = parent
self.item = item
self.downloaded_filepath = None
self.watched = False
def get_tag_data(self, tag_name: str) -> str:
nodes = self.item.getElementsByTagName(tag_name)
if len(nodes) != 1:
raise KeyError(f"Exepected 1 tag `{tag_name}`, got {len(nodes)}.")
children = nodes[0].childNodes
if len(children) != 1:
raise KeyError(
f"Exepected 1 children for tag `{tag_name}`, got {len(children)}."
)
return children[0].data
@property
def title(self) -> str:
return self.get_tag_data("title")
@property
def link(self) -> str:
return self.get_tag_data("link")
@property
def creator(self) -> typing.Optional[str]:
try:
return self.get_tag_data("dc:creator")
except KeyError:
return None
@property
def description(self) -> str:
# TODO Testing
return self.get_tag_data("description")
@property
def date(self) -> str:
# TODO datetime format
return self.get_tag_data("pubDate")
@property
def guid(self) -> int:
return int(self.get_tag_data("guid"))
@property
def is_researched(self) -> bool:
return "ytdl_infos" in self.__dict__
def salvage_cache(self, cache: "RVElement") -> None:
if cache.is_researched:
self.__dict__["ytdl_infos"] = cache.__dict__["ytdl_infos"]
log.debug(f"From cache: {self}")
if cache.downloaded_filepath:
self.downloaded_filepath = cache.downloaded_filepath
if cache.watched:
self.watched = True
def __str__(self) -> str:
str = f"{self.guid}: {self.creator if self.creator else '?'} {self.title}"
if self.is_researched:
if self.is_video:
str += f" ({format_duration(self.duration)})"
else:
str += " (N/A)"
else:
str += " (?)"
str += f" {self.link}"
return str
@property
def downloaded(self) -> bool:
if not self.is_researched:
return False
return os.path.isfile(self.filepath)
@functools.cached_property
def ytdl_infos(self) -> typing.Optional[dict]:
log.info(f"Researching: {self}")
try:
infos = self.parent.ytdl_dry.extract_info(self.link, download=False)
except KeyboardInterrupt as e:
raise e
except yt_dlp.utils.DownloadError as e:
# TODO Still raise in case of temporary network issue
log.warning(e)
infos = None
if infos:
infos = self.parent.ytdl_dry.sanitize_info(infos)
# Save database once it's been computed
self.__dict__["ytdl_infos"] = infos
self.parent.save()
return infos
@property
def duration(self) -> int:
assert self.is_video
assert self.ytdl_infos
return self.ytdl_infos["duration"]
@property
def is_video(self) -> bool:
# Duration might be missing in playlists and stuff
return self.ytdl_infos is not None and "duration" in self.ytdl_infos
@property
def filepath(self) -> str:
assert self.is_video
if self.downloaded_filepath:
return self.downloaded_filepath
return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)
@property
def filename(self) -> str:
assert self.is_video
return os.path.splitext(self.filepath)[0]
def download(self) -> None:
assert self.is_video
log.info(f"Downloading: {self}")
if self.parent.args.research:
del self.ytdl_infos
if not self.parent.args.dryrun:
with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl:
ydl.add_post_processor(SaveInfoPP(self))
ydl.process_ie_result(self.ytdl_infos, download=True)
self.parent.save()
def update_post_download(self, info: dict) -> None:
self.downloaded_filepath = self.parent.ytdl_dry.prepare_filename(info)
@property
def was_downloaded(self) -> bool:
return self.downloaded_filepath is not None
def preload(self) -> None:
assert self.is_video
if self.downloaded:
log.debug(f"Currently downloaded: {self}")
return
if self.was_downloaded:
log.debug(f"Downloaded previously: {self}")
return
self.download()
def matches_filter(self, args: configargparse.Namespace) -> bool:
# Inexpensive filters
if args.seen != "any" and (args.seen == "seen") != self.watched:
log.debug(f"Not {args.seen}: {self}")
return False
if args.title and not re.search(args.title, self.title):
log.debug(f"Title not matching {args.title}: {self}")
return False
if args.guid and not re.search(args.guid, str(self.guid)):
log.debug(f"Guid not matching {args.guid}: {self}")
return False
if args.link and not re.search(args.link, self.link):
log.debug(f"Link not matching {args.link}: {self}")
return False
if args.creator and (
not self.creator or not re.search(args.creator, self.creator)
):
log.debug(f"Creator not matching {args.creator}: {self}")
return False
# Expensive filters
if not self.is_video:
log.debug(f"Not a video: {self}")
return False
if args.duration and not compare_duration(args.duration)(self.duration):
log.debug(
f"Duration {self.duration} not matching {args.duration}: {self}"
)
return False
return True
def watch(self) -> None:
if not self.downloaded:
self.download()
cmd = ["mpv", self.filepath]
log.debug(f"Running {cmd}")
if not self.parent.args.dryrun:
proc = subprocess.run(cmd)
proc.check_returncode()
self.watched = True
self.parent.save()
def clean(self) -> None:
assert self.is_video
log.info(f"Removing gone video: {self.filename}*")
for file in os.listdir():
if file.startswith(self.filename):
log.debug(f"Removing file: {file}")
if not self.parent.args.dryrun:
os.unlink(file)
class RVDatabase:
SAVE_FILE = ".cache.p"
args: configargparse.Namespace
elements: list[RVElement]
def __init__(self, args: configargparse.Namespace) -> None:
self.args = args
def save(self) -> None:
log.debug("Saving cache")
if self.args.dryrun:
return
with open(self.SAVE_FILE, "wb") as save_file:
pickle.dump(self, save_file)
@classmethod
def load(cls) -> typing.Optional["RVDatabase"]:
try:
with open(cls.SAVE_FILE, "rb") as save_file:
return pickle.load(save_file)
except (TypeError, AttributeError, EOFError):
log.warning("Corrupt / outdated cache, it will be rebuilt.")
except FileNotFoundError:
pass
return None
def salvage_cache(self, cache: "RVDatabase") -> None:
log.debug(f"Salvaging cache")
cache_els = dict()
for cache_el in cache.elements:
cache_els[cache_el.guid] = cache_el
for el in self.elements:
if el.guid in cache_els:
el.salvage_cache(cache_els[el.guid])
def clean_cache(self, cache: "RVDatabase") -> None:
log.debug(f"Cleaning cache")
self_els = dict()
for self_el in self.elements:
self_els[self_el.guid] = self_el
for el in cache.elements:
if el.guid not in self_els:
if el.is_researched and el.is_video:
el.clean()
def import_cache(self, cache: "RVDatabase") -> None:
log.debug(f"Importing cache")
self.feed_xml = cache.feed_xml
self.read_feed()
@functools.cached_property
def feed_xml(self) -> minidom.Document:
log.info("Fetching RSS feed")
with urllib.request.urlopen(self.args.feed) as request:
return minidom.parse(request)
def read_feed(self) -> None:
self.elements = []
for item in self.feed_xml.getElementsByTagName("item"):
element = RVElement(self, item)
self.elements.insert(0, element)
log.debug(f"Known: {element}")
def clean(self) -> None:
log.debug("Cleaning")
filenames = set()
for element in self.elements:
if element.is_video:
filenames.add(element.filename)
for file in os.listdir():
if file == RVDatabase.SAVE_FILE:
continue
if not os.path.isfile(file):
continue
for filename in filenames:
if file.startswith(filename):
break
else:
log.info(f"Removing unknown file: {file}")
if not self.args.dryrun:
os.unlink(file)
@property
def all_researched(self) -> bool:
for element in self.elements:
if not element.is_researched:
return False
return True
def attempt_clean(self) -> None:
if self.all_researched:
self.clean()
@property
def ytdl_opts(self) -> dict:
return {"format": self.args.format, "allsubtitles": self.args.subtitles}
@property
def ytdl_dry_opts(self) -> dict:
opts = self.ytdl_opts.copy()
opts.update({"quiet": True})
return opts
@property
def ytdl_dry(self) -> yt_dlp.YoutubeDL:
return yt_dlp.YoutubeDL(self.ytdl_dry_opts)
def filter(self, args: configargparse.Namespace) -> typing.Iterable[RVElement]:
elements: typing.Iterable[RVElement]
# Inexpensive sort
if args.order == "new":
elements = reversed(self.elements)
elif args.order == "title":
elements = sorted(self.elements, key=lambda el: el.title)
elif args.order == "creator":
elements = sorted(self.elements, key=lambda el: el.creator or "")
elif args.order == "link":
elements = sorted(self.elements, key=lambda el: el.link)
elif args.order == "random":
elements_random = self.elements.copy()
random.shuffle(elements_random)
elements = elements_random
else:
elements = self.elements
# Possibly expensive filtering
elements = filter(lambda el: el.matches_filter(args), elements)
# Expensive sort
if args.order == "short":
elements = sorted(
elements, key=lambda el: el.duration if el.is_video else 0
)
elif args.order == "long":
elements = sorted(
elements, key=lambda el: el.duration if el.is_video else 0, reverse=True
)
# Post sorting filtering
if args.total_duration:
rem = parse_duration(args.total_duration)
old_els = list(elements)
elements = list()
while rem > 0:
for el in old_els:
if el.duration < rem:
elements.append(el)
rem -= el.duration
old_els.remove(el)
break
else:
break
return elements
def get_args() -> configargparse.Namespace:
@ -32,45 +481,85 @@ def get_args() -> configargparse.Namespace:
+ "an RSS aggregator",
default_config_files=[defaultConfigPath],
)
# Runtime settings
parser.add_argument(
"-v",
"--verbosity",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default=None,
help="Verbosity of log messages",
)
parser.add(
"-c", "--config", required=False, is_config_file=True, help="Configuration file"
)
parser.add(
"-n",
"--dryrun",
help="Only pretend to do actions",
action="store_const",
const=True,
default=False,
)
# Input/Output
parser.add(
"--feed",
help="URL of the RSS feed (must be public for now)",
env_var="RSS_VIDEOS_FEED",
required=True,
)
parser.add(
"--research",
help="Fetch video info again",
action="store_true",
)
parser.add(
"--no-refresh",
dest="refresh",
help="Don't fetch feed",
action="store_false",
)
parser.add(
"--videos",
help="Directory to store videos",
env_var="RSS_VIDEOS_VIDEO_DIR",
required=True,
)
# Which videos
parser.add(
"-n",
"--dryrun",
help="Do not download the videos",
action="store_const",
const=True,
default=False,
"--order",
choices=("old", "new", "title", "creator", "link", "short", "long", "random"),
default="old",
help="Sorting mechanism",
)
# TODO This feature might require additional documentation and an on/off switch
parser.add("--guid", help="Regex to filter guid")
parser.add("--creator", help="Regex to filter by creator")
parser.add("--title", help="Regex to filter by title")
parser.add("--link", help="Regex to filter by link")
parser.add("--duration", help="Comparative to filter by duration")
parser.add(
"--track",
help="Directory where download videos are marked "
+ "to not download them after deletion.",
env_var="RSS_VIDEOS_TRACK",
required=False,
default=".rssVideos",
"--seen",
choices=("seen", "unseen", "any"),
default="unseen",
help="Only include seen/unseen/any videos",
)
parser.add(
"--total-duration",
help="Use videos that fit under the total given",
)
# TODO Envrionment variables
parser.add(
"--max-duration",
help="Skip video longer than this amount of seconds",
help="(Deprecated, use --duration instead)",
env_var="RSS_VIDEOS_MAX_DURATION",
type=int,
default=0,
)
# TODO Allow to ask
# How to download
parser.add(
"--format",
help="Use this format to download videos."
@ -85,209 +574,87 @@ def get_args() -> configargparse.Namespace:
action="store_true",
)
parser.add(
"action",
nargs="?",
choices=(
"download",
"list",
"watch",
"binge",
"clean",
"seen",
"unseen",
),
default="download",
)
args = parser.parse_args()
args.videos = os.path.realpath(os.path.expanduser(args.videos))
args.track = os.path.expanduser(args.track)
if not os.path.isabs(args.track):
args.track = os.path.realpath(os.path.join(args.videos, args.track))
if not args.duration and args.max_duration:
args.duration = str(args.max_duration)
return args
def get_links(args: configargparse.Namespace) -> list[str]:
"""
Read the feed XML, get the links
"""
links = list()
with urllib.request.urlopen(args.feed) as request:
with minidom.parse(request) as xmldoc:
for item in xmldoc.getElementsByTagName("item"):
try:
linkNode = item.getElementsByTagName("link")[0]
link: str = linkNode.childNodes[0].data
if link not in links:
links.append(link)
except BaseException as e:
print("Error while getting link from item:", e)
continue
return links
def get_video_infos(
args: configargparse.Namespace, ydl_opts: dict, links: list[str]
) -> dict[str, dict]:
"""
Filter out non-video links and store video download info
and associated filename
"""
videosInfos = dict()
dry_ydl_opts = ydl_opts.copy()
dry_ydl_opts.update({"simulate": True, "quiet": True})
with youtube_dl.YoutubeDL(dry_ydl_opts) as ydl:
for link in links:
print(f"Researching {link}...")
try:
infos = ydl.extract_info(link)
if args.max_duration > 0 and infos["duration"] > args.max_duration:
print(
f"{infos['title']}: Skipping as longer than max duration: "
f"{infos['duration']} > {args.max_duration}"
)
continue
filepath = ydl.prepare_filename(infos)
filename, extension = os.path.splitext(filepath)
videosInfos[filename] = infos
print(f"{infos['title']}: Added")
except BaseException as e:
print(e)
continue
return videosInfos
def get_downloaded_videos(
args: configargparse.Namespace, videosInfos: dict[str, dict]
) -> tuple[set[str], set[str]]:
videosDownloaded = set()
videosPartiallyDownloaded = set()
"""
Read the directory content, delete everything that's not a
video on the download list or already downloaded
"""
for filepath in os.listdir(args.videos):
fullpath = os.path.join(args.videos, filepath)
if not os.path.isfile(fullpath):
continue
filename, extension = os.path.splitext(filepath)
for onlineFilename in videosInfos.keys():
# Full name already there: completly downloaded
# → remove from the download list
if filename == onlineFilename:
videosDownloaded.add(onlineFilename)
break
elif filename.startswith(onlineFilename):
# Subtitle file
# → ignore
if filename.endswith(".vtt"):
break
# Partial name already there: not completly downloaded
# → keep on the download list
videosPartiallyDownloaded.add(onlineFilename)
break
# Unrelated filename: delete
else:
print(f"Deleting: {filename}")
os.unlink(fullpath)
return videosDownloaded, videosPartiallyDownloaded
def get_tracked_videos(args: configargparse.Namespace, known: set[str]) -> set[str]:
"""
Return videos previously downloaded (=tracked) amongst the unread videos.
This is stored in the tracking directory as empty extension-less files.
Other tracking markers (e.g. for now read videos) are deleted.
"""
videosTracked = set()
for filepath in os.listdir(args.track):
fullpath = os.path.join(args.track, filepath)
if not os.path.isfile(fullpath):
continue
# Here filename is a filepath as no extension
if filepath in known:
videosTracked.add(filepath)
else:
os.unlink(fullpath)
return videosTracked
def main() -> None:
args = get_args()
configure_logging(args)
os.makedirs(args.videos, exist_ok=True)
os.makedirs(args.track, exist_ok=True)
ydl_opts = {"format": args.format, "allsubtitles": args.subtitles}
print("→ Retrieveing RSS feed")
links = get_links(args)
# Oldest first
links = links[::-1]
print(f"→ Getting infos on {len(links)} unread articles")
videosInfos = get_video_infos(args, ydl_opts, links)
print(f"→ Deciding on what to do for {len(videosInfos)} videos")
videosDownloaded, videosPartiallyDownloaded = get_downloaded_videos(
args, videosInfos
)
videosTracked = get_tracked_videos(args, set(videosInfos.keys()))
# Deciding for the rest based on the informations
def markTracked(filename: str) -> None:
markerPath = os.path.join(args.track, onlineFilename)
open(markerPath, "a").close()
videosToDownload: set[str] = set()
videosReads: set[str] = set()
for onlineFilename in videosInfos.keys():
# If the video was once downloaded but manually deleted,
# the marker should be left
if onlineFilename in videosTracked:
print(f"Should be marked as read: {onlineFilename}")
# TODO Automatically do that one day maybe?
# Need to login to the FreshRSS API and keep track of
# the item id along the process
videosReads.add(onlineFilename)
elif onlineFilename in videosDownloaded:
markTracked(onlineFilename)
print(f"Already downloaded: {onlineFilename}")
else:
if onlineFilename in videosPartiallyDownloaded:
print(f"Will be continued: {onlineFilename}")
else:
print(f"Will be downloaded: {onlineFilename}")
videosToDownload.add(onlineFilename)
# Download the missing videos
print(f"→ Downloading {len(videosToDownload)} videos")
os.chdir(args.videos)
exit_code = 0
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
for onlineFilename, infos in videosInfos.items():
if onlineFilename not in videosToDownload:
continue
database = RVDatabase(args)
cache = RVDatabase.load()
feed_fetched = False
if args.refresh:
try:
database.read_feed()
feed_fetched = True
except urllib.error.URLError as err:
if args.action == "download":
raise RuntimeError("Couldn't fetch feed, refusing to download")
# This is a quirky failsafe in case of no internet connection,
# so the script doesn't go noting that no element is a video.
if not feed_fetched:
if cache:
log.warning("Using cached feed.")
database.import_cache(cache)
else:
raise FileNotFoundError("Feed not fetched and no cached feed.")
if cache:
database.salvage_cache(cache)
database.clean_cache(cache)
database.save()
# Really download
if args.dryrun:
print(f"Would download {onlineFilename}")
log.debug(f"Running action")
if args.action == "clean":
database.clean()
else:
duration = 0
for element in database.filter(args):
if args.action == "download":
element.preload()
elif args.action == "list":
print(element)
elif args.action in ("watch", "binge"):
element.watch()
if args.action == "watch":
break
elif args.action == "seen":
if not element.watched:
log.info(f"Maked as seen: {element}")
element.watched = True
elif args.action == "unseen":
if element.watched:
log.info(f"Maked as unseen: {element}")
element.watched = False
else:
# Apparently that thing is transformed from a LazyList
# somewhere in the normal yt_dlp process
if isinstance(infos["thumbnails"], youtube_dl.utils.LazyList):
infos["thumbnails"] = infos["thumbnails"].exhaust()
try:
ydl.process_ie_result(infos, True, {})
markTracked(onlineFilename)
except BaseException as e:
print(e)
exit_code = 1
continue
sys.exit(exit_code)
raise NotImplementedError(f"Unimplemented action: {args.action}")
duration += element.duration if element.is_video else 0
log.info(f"Total duration: {format_duration(duration)}")
database.attempt_clean()
database.save()
if __name__ == "__main__":

View file

@ -94,7 +94,7 @@ XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X"""
body = f"\n\n{args.body}"
text = f"""Date: {now_email}
From: {getattr(args, 'from')}
From: {args.me} <{getattr(args, 'from')}>
Subject: {args.subject}
To: {args.to}
Message-ID: {mid}
@ -114,7 +114,9 @@ Input arguments:
."""
# Transmission setup
cmd = ["ssh", args.origin]
cmd = []
if args.origin != "localhost":
cmd += ["ssh", args.origin]
if args.security == "plain":
cmd += ["socat", "-", f"tcp:{args.destination}:{args.port}"]
elif args.security == "ssl":

View file

@ -33,6 +33,7 @@ audio_br_bi = 128000
quota_by = int(sys.argv[1])
in_file = sys.argv[2]
out_file = sys.argv[3]
filters = sys.argv[4:]
quota_bi = quota_by * 8
duration = duration_file(in_file)
@ -40,15 +41,21 @@ tot_br_bi = quota_bi / duration
video_br_bi = int(tot_br_bi - audio_br_bi)
assert video_br_bi > 0, "Not even enough space for audio"
cmd = [
"ffmpeg",
"-i",
in_file,
"-b:v",
str(video_br_bi),
"-b:a",
str(audio_br_bi),
out_file,
]
cmd = (
[
"ffmpeg",
"-i",
in_file,
]
+ filters
+ [
"-b:v",
str(video_br_bi),
"-b:a",
str(audio_br_bi),
out_file,
]
)
print(" ".join(cmd))
subprocess.run(cmd, check=True)

View file

@ -1,2 +1,3 @@
extrc
*.zwc
.zcompdump

View file

@ -30,7 +30,7 @@ then
fi
# If we have junest installed DE try them before all others
sessions_dir_junest="$HOME/.local/share/xsessions"
sessions_dir_junest="$HOME/.junest/usr/share/xsessions"
if [ -d "$sessions_dir_junest" ]
then
sessions_dirs="$sessions_dir_junest $sessions_dirs"

View file

@ -6,7 +6,7 @@
# Sourced by display managers
#
. ~/.xprofile
[ -f ~/.xprofile ] && . ~/.xprofile
if [ -f ~/.config/override_dm_choice ]
then
. ~/.config/xinitrc