Move scripts dir inside hm

And remove weird path contraptions
This commit is contained in:
Geoffrey Frogeye 2023-11-30 22:09:44 +01:00
parent 050901da2f
commit edeef96133
Signed by: geoffrey
GPG key ID: C72403E7F82E6AD8
49 changed files with 2 additions and 11 deletions

53
hm/scripts/.bsh/bashrc Normal file
View file

@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Geoffrey's trimmed and condensed shell configuration file,
# with a healthy and safe dose of aliases and config.
# Can be embedded to any server
# Completion for existing commands
# (busybox-compatible definitions first)
alias ls='ls -h --color=auto'
alias mkdir='mkdir -v'
alias cp="cp -i"
alias mv="mv -iv"
alias free='free -h'
alias df='df -h'
# Advanced completions
# (not busybox compatible)
# NOTE(review): cp and free are redefined here, overriding the
# busybox-safe versions above — presumably intentional, confirm.
alias cp="cp -i --reflink=auto"
alias grep="grep --color=auto"
alias dd='dd status=progress'
alias rm='rm -v --one-file-system'
alias free='free -m'
alias diff='diff --color=auto'
alias dmesg='dmesg --ctime'
# Frequent mistakes
alias sl=ls
alias al=la
alias mdkir=mkdir
alias systemclt=systemctl
alias please=sudo
# Shortcuts for commonly used commands
alias ll="ls -l"
alias la="ls -la"
# alias s='sudo -s -E'
# Bash options
shopt -s expand_aliases  # make aliases work in non-interactive shells too
shopt -s histappend      # append to the history file instead of overwriting it
HISTCONTROL=ignoreboth:erasedups
# Program configuration
export TIME_STYLE='+%Y-%m-%d %H:%M:%S'
export LESS=-R
# Colored output in less / man pages
export LESS_TERMCAP_mb=$'\E[1;31m'
export LESS_TERMCAP_md=$'\E[1;36m'
export LESS_TERMCAP_me=$'\E[0m'
export LESS_TERMCAP_so=$'\E[01;44;33m'
export LESS_TERMCAP_se=$'\E[0m'
export LESS_TERMCAP_us=$'\E[1;32m'
export LESS_TERMCAP_ue=$'\E[0m'

View file

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Preferred programs environment variables
export PAGER=less
# Pick the best available vi-like editor
if [ -x "$(command -v nvim)" ]
then
    export EDITOR=nvim
    alias vi=nvim
elif [ -x "$(command -v vim)" ]
then
    export EDITOR=vim
    alias vi=vim
else
    export EDITOR=vi
fi
# Prompt
# Username color: red for root, green for geoffrey, yellow for anyone else
if [[ $USER == 'root' ]]; then
    col=31;
elif [[ $USER == 'geoffrey' ]]; then
    col=32;
else
    col=33;
fi
# [user@host dir]$ prompt, with a window-title escape sequence up front
export PS1="\[\e]2;\u@\H \w\a\]\[\e[0;37m\][\[\e[0;${col}m\]\u\[\e[0;37m\]@\[\e[0;34m\]\h \[\e[0;36m\]\W\[\e[0;37m\]]\$\[\e[0m\] "

50
hm/scripts/.bsh/inputrc Normal file
View file

@ -0,0 +1,50 @@
$include /etc/inputrc
# Readline settings shared with the embedded shell (see bsh)
set bell-style none
set colored-completion-prefix on
set colored-stats on
set completion-ignore-case on
set completion-query-items 200
set editing-mode vi
set history-preserve-point on
set history-size 10000
set horizontal-scroll-mode off
set mark-directories on
set mark-modified-lines off
set mark-symlinked-directories on
set match-hidden-files on
set menu-complete-display-prefix on
set page-completions on
set print-completions-horizontally off
set revert-all-at-newline off
set show-all-if-ambiguous on
set show-all-if-unmodified on
set show-mode-in-prompt on
set skip-completed-text on
set visible-stats off
$if mode=vi
# these are for vi-command mode
set keymap vi-command
"k": history-search-backward
"j": history-search-forward
"\e[A": history-search-backward
"\e[B": history-search-forward
Control-l: clear-screen
# these are for vi-insert mode
set keymap vi-insert
# "jk" leaves insert mode, matching the vimrc imap
"jk": vi-movement-mode
"\e[A": history-search-backward
"\e[B": history-search-forward
Control-l: clear-screen
# Switch between thin cursor and thicc block depending on vi mode
$if term=linux
set vi-ins-mode-string \1\e[?0c\2
set vi-cmd-mode-string \1\e[?8c\2
$else
set vi-ins-mode-string \1\e[6 q\2
set vi-cmd-mode-string \1\e[2 q\2
$endif
$endif

51
hm/scripts/.bsh/vimrc Normal file
View file

@ -0,0 +1,51 @@
" Trimmed vim configuration, embeddable on any server (see bsh).
cmap w!! w !sudo tee > /dev/null %
filetype indent on
filetype on
filetype plugin on
imap jk <Esc>
let g:netrw_fastbrowse = 0
nmap <C-H> :bp<CR>
nmap <C-J> jjjjjjjjjjjjjjjjjjjjj
nmap <C-K> kkkkkkkkkkkkkkkkkkkkk
nmap <C-L> :bn<CR>
nmap <Enter> o<Esc>
nnoremap <Leader>s :%s/\<<C-r><C-w>\>/
set backspace=indent,eol,start
set cursorcolumn
set encoding=utf-8
set expandtab
set gdefault
set hidden
set hlsearch
set ignorecase
set incsearch
set lazyredraw
set list
" was `trail,` with no character: every item but "tab" needs `:{char}`,
" so :set rejected the whole listchars value with E474 and none of
" these markers ever applied
set listchars=tab:╾╌,trail:·,extends:↦,precedes:↤,nbsp:_
set noerrorbells
set number
set ruler
set scrolloff=10
set shiftwidth=4
" NOTE(review): empty value clears 'showbreak'; if a marker character
" was intended here it may have been lost in transfer — confirm
set showbreak=
set showcmd
set smartcase
set splitbelow
set tabstop=4
set title
set updatetime=250
set visualbell
set wildmenu
set wildmode=longest,list
set wrap
syntax enable
vmap <Enter> <Esc>
if has('nvim')
    set inccommand=nosplit
    set relativenumber
    " Absolute line numbers while inserting, relative otherwise
    augroup every
        autocmd!
        au InsertEnter * set norelativenumber
        au InsertLeave * set relativenumber
    augroup END
endif

2
hm/scripts/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
sct
node_modules

193
hm/scripts/archive Executable file
View file

@ -0,0 +1,193 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs
import argparse
import logging
import os
import sys
import coloredlogs
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# Coding conventions:
# No leading or trailing slashes. Let os.path.join do its job
# TODO Config argparse and pass args to the functions. No globals
# Finding directories
assert "HOME" in os.environ, "Home directory unknown"
# DOCS: where linkable documents live; ARCS: where archived copies go.
DOCS = os.path.realpath(os.path.join(os.environ["HOME"], "Documents"))
assert os.path.isdir(DOCS), "Documents folder not found"
ARCS = os.path.realpath(os.path.join(os.environ["HOME"], "Archives"))
assert os.path.isdir(ARCS), "Archives folder not found"
def dirRange(relpath):
    """
    Return (docPath, arcPath) pairs for every prefix of relpath,
    from the shallowest path component down to the full path.
    """
    splits = relpath.split(os.path.sep)
    res = list()
    for p in range(len(splits)):
        partPath = os.path.join(*splits[: p + 1])
        # was os.path.join(os.path.join(X, partPath)): the outer
        # single-argument join was a no-op
        arcPath = os.path.join(ARCS, partPath)
        docPath = os.path.join(DOCS, partPath)
        res.append((docPath, arcPath))
    return res
def travel(relpath):
    """
    Walk every prefix of relpath and reconcile the Documents side with
    the Archives side, printing the shell commands that would link them.

    Exits the process on inconsistent or unhandled states.
    """
    wholeRange = dirRange(relpath)
    for tup in wholeRange:
        isLast = wholeRange[-1] == tup
        docPath, arcPath = tup
        linkPath = os.path.relpath(arcPath, start=docPath)
        log.debug(f"47 {tup}")  # NOTE(review): leftover numbered debug marker
        if not os.path.exists(docPath) and not os.path.exists(arcPath):
            log.error("Not existing")
            sys.exit(1)
        elif os.path.isdir(docPath) and os.path.isdir(arcPath) and not isLast:
            log.debug("Both folder")
            continue
        elif os.path.isdir(docPath) and os.path.isdir(arcPath) and isLast:
            log.error("This should fail for some reason, maybe")
            sys.exit(1)
        elif os.path.islink(docPath) and os.path.exists(arcPath):
            currentLink = os.readlink(docPath)
            if currentLink != linkPath:
                log.warning(
                    f"'{docPath}' is pointing to '{currentLink}' "
                    + f"but should point to '{linkPath}'."
                )
                # TODO Fixing if asked for
                sys.exit(1)
            # was a plain string: the f-prefix was missing, so the
            # placeholders were logged literally instead of the paths
            log.debug(f"Early link already exists {docPath} → {arcPath}")
            return
        elif not os.path.exists(docPath) and os.path.exists(arcPath):
            log.debug("Only existing on archive side, linking")
            print(f"ln -s {linkPath} {docPath}")
        elif os.path.exists(docPath) and not os.path.exists(arcPath) and isLast:
            log.debug("Only existing on doc side, moving and linking")
            print(f"mv {docPath} {arcPath}")
            print(f"ln -s {linkPath} {docPath}")
        elif os.path.exists(docPath) and not os.path.exists(arcPath) and not isLast:
            raise NotImplementedError("Here comes the trouble")
        else:
            log.error("Unhandled case")
            sys.exit(1)
def ensureLink(relpath):
    """
    Ensure that ~/Documents/$relpath points to ~/Archives/$relpath
    """
    # NOTE(review): single-argument os.path.join is a no-op and both
    # names are immediately shadowed by the loop below; only the assert
    # on arcPath has an effect here.
    arcPath = os.path.join(os.path.join(ARCS, relpath))
    docPath = os.path.join(os.path.join(DOCS, relpath))
    assert os.path.exists(arcPath)
    # For each tree element of the path
    for docPath, arcPath in dirRange(relpath):
        linkPath = os.path.relpath(arcPath, start=docPath)

        def installLink():
            # Create the symlink, or just print the command with --dry.
            # Relies on the module-level `args` namespace.
            if args.dry:
                print(f"ln -s {linkPath} {docPath}")
            else:
                os.symlink(linkPath, docPath)

        if os.path.islink(docPath):
            # A link already exists at this level: fix it if it points
            # to the wrong place, then we are done.
            currentLink = os.readlink(docPath)
            if currentLink != linkPath:
                log.warning(
                    f"'{docPath}' is pointing to '{currentLink}' "
                    + f"but should point to '{linkPath}'. Fixing"
                )
                if args.dry:
                    print(f"rm {docPath}")
                else:
                    os.unlink(docPath)
                installLink()
            return
        elif not os.path.exists(docPath):
            # Nothing on the Documents side: link this level and stop.
            installLink()
            return
        elif os.path.isdir(docPath):
            # Real directory: descend one level further.
            continue
        else:
            raise RuntimeError(
                f"'{docPath}' exists and is not a directory "
                + f"or a link. Unable to link it to '{linkPath}'"
            )
    # Loop exhausted: every component, including the last one, is a real
    # directory, so the target itself cannot be replaced by a link.
    raise RuntimeError(
        f"'{docPath}' is a directory. Unable to link it to " + f"'{linkPath}'"
    )
def archive(docdir):
    """
    Move docdir from ~/Documents into ~/Archives and leave a relative
    symlink at its old location. Honors the module-level args.dry flag.
    """
    # was os.path.realpath(args.dir): the parameter was ignored, so the
    # function always operated on the command-line argument
    docdir = os.path.realpath(docdir)
    assert os.path.isdir(docdir), docdir + " must be a directory"
    assert docdir.startswith(DOCS), "Directory is not in the document folder"
    assert not docdir.startswith(ARCS), "Directory is already in the archive folder"
    reldir = os.path.relpath(docdir, DOCS)
    print("ARC", reldir)
    arcdir = os.path.join(ARCS, reldir)
    parentArcdir = os.path.realpath(os.path.join(arcdir, ".."))
    parentDocdir = os.path.realpath(os.path.join(docdir, ".."))
    linkDest = os.path.relpath(arcdir, parentDocdir)
    # BULLSHIT
    # If the directory exists
    if os.path.isdir(arcdir):
        return
        # for f in os.listdir(arcdir):
        #     assert os.path.isdir(f), "Something unknown in Archive dir"
        #     archive(os.path.join(arcdir, f))
    # If the directory doesn't exist, create the directories under it and move all the folder
    else:
        if args.dry:
            print("mkdir -p", parentArcdir)
        else:
            os.makedirs(parentArcdir, exist_ok=True)
        if args.dry:
            print("mv", docdir, arcdir)
        else:
            os.rename(docdir, arcdir)
        if args.dry:
            print("ln -s", linkDest, docdir)
        else:
            os.symlink(linkDest, docdir)
def unarchive(arcdir):
    # Presumably the inverse of archive(); not implemented yet.
    pass
if __name__ == "__main__":
    # Command-line entry point.
    parser = argparse.ArgumentParser(
        description="Place a folder in ~/Documents in ~/Documents/Archives and symlink it"
    )
    parser.add_argument(
        "dir", metavar="DIRECTORY", type=str, help="The directory to archive"
    )
    parser.add_argument("-d", "--dry", action="store_true")
    args = parser.parse_args()
    # NOTE(review): dry-run is force-enabled here, so the script never
    # actually touches the filesystem; drop this once debugging is over.
    args.dry = True  # DEBUG
    # archive(args.dir)
    ensureLink(args.dir)

70
hm/scripts/bsh Executable file
View file

@ -0,0 +1,70 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash openssh coreutils gawk gnused
# Pack a minimal shell configuration (bashrc, inputrc, vimrc) into a
# self-extracting command and open an SSH session that uses it.
# TODO More integrated with current config
CACHE_DIR="${XDG_CACHE_DIR:-$HOME/.cache}/bsh"
FOLDER_NAME="geoffrey"
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
mkdir -p "$CACHE_DIR"
if [ ! -f "${CACHE_DIR}/cmd" ]
then
    # Preparation
    WORK="${CACHE_DIR}/${FOLDER_NAME}"
    DEST="/tmp/${FOLDER_NAME}"
    mkdir "$WORK"
    # TODO Maybe we should just set HOME there…
    # Bashrc generation (sortable then unsortable)
    # $SCRIPT_DIR is quoted everywhere now: it may contain spaces
    grep -o '^\s*[^#]*' "$SCRIPT_DIR/.bsh/bashrc" | sed 's/^\s\+//' > "${WORK}/b"
    echo "alias s='sudo -s -E bash --rcfile ${DEST}/b'" >> "${WORK}/b"
    echo "export VIMINIT='source ${DEST}/v'" >> "${WORK}/b"
    #echo "export TERMINFO=${DEST}/terminfo" >> "${WORK}/b"
    echo "export INPUTRC=${DEST}/i" >> "${WORK}/b"
    # Sort for compression efficiency (saves a whooping 12 bytes)
    sort -u "${WORK}/b" > "${WORK}/b_sorted"
    mv "${WORK}/b_sorted" "${WORK}/b"
    dircolors --sh >> "${WORK}/b"
    grep -o '^[^#]*' "$SCRIPT_DIR/.bsh/bashrc_unsortable" | sed 's/^\s\+//' >> "${WORK}/b"
    # Other files generation
    #mkdir -p "${WORK}/terminfo/${TERM:0:1}"
    #if [ -f "/usr/share/terminfo/${TERM:0:1}/${TERM}" ]
    #then
    #    cp "/usr/share/terminfo/${TERM:0:1}/${TERM}" "${WORK}/terminfo/${TERM:0:1}/${TERM}"
    #elif [ -f "$HOME/.config/terminfo/${TERM:0:1}/${TERM}" ]
    #then
    #    cp "$HOME/.config/terminfo/${TERM:0:1}/${TERM}" "${WORK}/terminfo/${TERM:0:1}/${TERM}"
    #fi
    grep -o '^\s*[^#]*' "$SCRIPT_DIR/.bsh/inputrc" | sed 's/^\s\+//' > "${WORK}/i"
    grep -o '^\s*[^"]*' "$SCRIPT_DIR/.bsh/vimrc" | sed 's/^\s\+//' > "${WORK}/v"
    # Crafting command
    b64="$(cd "$CACHE_DIR"; tar czf - "$FOLDER_NAME" | base64 -w 0)"
    echo "echo $b64|base64 -d|tar xzC /tmp" > "${CACHE_DIR}/cmd"
    echo "bash --rcfile ${DEST}/b" >> "${CACHE_DIR}/cmd"
    echo "rm -rf ${DEST}" >> "${CACHE_DIR}/cmd"
    # TODO Do not remove unless last one connected
    # Cleanup
    rm -rf "$WORK"
fi
# To keep until https://github.com/openssh/openssh-portable/commit/f64f8c00d158acc1359b8a096835849b23aa2e86
# is merged
function _ssh {
    # alacritty's terminfo is often missing on remote hosts
    if [ "${TERM}" = "alacritty" ]
    then
        TERM=xterm-256color ssh "$@"
    else
        ssh "$@"
    fi
}
alias ssh='_ssh'
_ssh -t "$@" "$(cat "${CACHE_DIR}/cmd")"

30
hm/scripts/cached_pass Executable file
View file

@ -0,0 +1,30 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash
#! nix-shell -p bash pass libnotify
# Cache `pass` lookups in /tmp so repeated queries don't prompt for the
# GPG passphrase every time.
# TODO Password changed?
set -euo pipefail
if [[ $# -ne 1 ]]
then
    echo "Usage: $0 pass-name"
    exit 2
fi
name="$1"
BASEDIR="/tmp/cached_pass_${UID}"
mkdir -p "$BASEDIR"
chmod 700 "$BASEDIR"
# base64-encode the entry name so it is a safe flat filename
name_base64="$(echo "$name" | base64)"
file="${BASEDIR}/${name_base64}"
if [ ! -s "${file}" ]
then
    notify-send -u low "cached_pass" "Asking to cache: ${name}"
    # was unquoted: an entry name with spaces or globs got word-split
    pass "${name}" > "${file}"
fi
cat "${file}"

116
hm/scripts/camera_name_date Executable file
View file

@ -0,0 +1,116 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs
"""
Same as picture_name_date
except it's tailored for OpenCamera.
It uses filenames, that way:
- Easier to get metadata
- JPG/DNG, MP4/SRT keep the same filename
"""
import argparse
import logging
import os
import re
import coloredlogs
log = logging.getLogger(__name__)
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s", logger=log)
# Filename patterns of the supported camera/downloader apps.
# Each pattern must capture Y/M/D, h/m/s and ext; dup and spec are optional.
PATTERNS = [
    re.compile(  # OpenCamera
        r"^(?P<type>IMG|VID)_"
        # No TRIM. We don't want those as they're a short copy of an existing VID.
        r"(?P<Y>\d\d\d\d)(?P<M>\d\d)(?P<D>\d\d)_"
        r"(?P<h>\d\d)(?P<m>\d\d)(?P<s>\d\d)"
        r"(?P<dup>_(\d+))?"
        r"(?P<spec>_(PANO|HDR))?"
        r"\.(?P<ext>jpg|dng|mp4|srt)$"
    ),
    re.compile(  # Samsung camera app (?)
        r"(?P<Y>\d\d\d\d)(?P<M>\d\d)(?P<D>\d\d)_"
        r"(?P<h>\d\d)(?P<m>\d\d)(?P<s>\d\d)"
        r"\.(?P<ext>jpg|mp4)$"
    ),
    re.compile(  # Telegram Media Downloader photo
        r"photo_"
        r"(?P<Y>\d\d\d\d)-(?P<M>\d\d)-(?P<D>\d\d)_"
        r"(?P<h>\d\d)-(?P<m>\d\d)-(?P<s>\d\d)"
        r"(?P<dup>_\d{19})"
        r"\.(?P<ext>jpg)$"
    ),  # Time of publication, not time of photo!
    re.compile(  # Telegram Media Downloader video
        r"video_"
        r"(?P<Y>\d\d\d\d)-(?P<M>\d\d)-(?P<D>\d\d)_"
        r"(?P<h>\d\d)-(?P<m>\d\d)-(?P<s>\d\d)"
        r"(?P<dup>_\d{19})"
        r"\.(?P<ext>mp4)$"
    ),  # Time of publication, not time of video!
]
def main(args: argparse.Namespace) -> None:
    """
    Rename every file under args.dir that matches one of PATTERNS to
    YYYY-MM-DD_hh-mm-ss[dup][spec][suffix].ext, placing the result
    directly in args.dir.
    """
    for root, _, files in os.walk(args.dir):
        for filename in files:
            full_path = os.path.join(root, filename)
            for pattern in PATTERNS:
                match = re.match(pattern, filename)
                if match:
                    break
            else:
                # was "doesn't any pattern": the verb was missing
                log.warning(f"{full_path} doesn't match any pattern")
                continue
            # Build new filename
            m = match.groupdict()
            new_filename = (
                f"{m['Y']}-{m['M']}-{m['D']}_"
                f"{m['h']}-{m['m']}-{m['s']}"
                f"{m.get('dup') or ''}"
                f"{m.get('spec') or ''}"
                f"{args.suffix}"
                f".{m['ext']}"
            )
            new_path = os.path.join(args.dir, new_filename)
            # TODO Allow keeping image in same folder
            # Rename file
            if full_path == new_path:
                log.debug(f"{full_path} already at required filename")
                continue
            log.info(f"{full_path} →\t{new_path}")
            if os.path.exists(new_path):
                raise FileExistsError(f"{new_path} already exists!")
            if not args.dry:
                os.rename(full_path, new_path)
if __name__ == "__main__":
    # Command-line entry point.
    parser = argparse.ArgumentParser(
        description="Rename OpenCamera files based on their dates"
    )
    parser.add_argument(
        "dir",
        metavar="DIRECTORY",
        type=str,
        default=".",
        nargs="?",
        help="Directory containing the pictures",
    )
    parser.add_argument(
        "-d",
        "--dry",
        action="store_true",
        help="Do not actually rename, just show old and new path",
    )
    parser.add_argument(
        "-s",
        "--suffix",
        default="",
        help="Text to add before the extension",
    )
    args = parser.parse_args()
    main(args)

18
hm/scripts/cleandev Executable file
View file

@ -0,0 +1,18 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash findutils git gnumake
# Removes files that can be regenerated
# from a dev environment
find . -type d -name bower_components -or -name node_modules -print0 | while read file; do
rm -rf "$file"
done
find . -type f -name Makefile -print0 | while IFS= read -r -d '' file; do
echo "--> $file"
(cd "${file//Makefile}"; make clean)
done
find . -type d -name .git -print0 | while IFS= read -r -d '' dir; do
echo "--> $file"
(cd "$dir"; git gc)
done

243
hm/scripts/compressPictureMovies Executable file
View file

@ -0,0 +1,243 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.progressbar2 ffmpeg
import datetime
import hashlib
import json
import logging
import os
import shutil
import statistics
import subprocess
import sys
import tempfile
import time
import coloredlogs
import progressbar
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# Constants
# Where the pictures/movies live
PICTURES_FOLDER = os.path.join(os.path.expanduser("~"), "Images")
# Where untouched originals are moved after compression
ORIGINAL_FOLDER = os.path.join(os.path.expanduser("~"), ".ImagesOriginaux")
MOVIE_EXTENSIONS = ["mov", "avi", "mp4", "3gp", "webm", "mkv"]
OUTPUT_EXTENSION = "webm"
OUTPUT_FFMPEG_PARAMETERS = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0"]
# OUTPUT_FFMPEG_PARAMETERS = ["-c:v", "libaom-av1", "-crf", "30", "-strict", "experimental", "-c:a", "libopus"]
# Maximum allowed standard deviation (seconds) between stream durations
DURATION_MAX_DEV = 1
def videoMetadata(filename):
    """Return the ffmetadata key/value pairs of a media file as a dict."""
    assert os.path.isfile(filename)
    proc = subprocess.run(
        ["ffmpeg", "-i", filename, "-f", "ffmetadata", "-"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    proc.check_returncode()
    metadata = dict()
    for line in proc.stdout.split(b"\n"):
        # Skip empty lines and ';'-prefixed comments
        if not len(line) or line.startswith(b";"):
            continue
        # Parse key-value; anything without exactly one '=' is reported
        parts = line.split(b"=")
        if len(parts) != 2:
            log.warning("Unparsed metadata line: `{}`".format(line))
            continue
        metadata[parts[0].decode().lower()] = parts[1].decode()
    return metadata
def videoInfos(filename):
    """Return ffprobe's stream information for a media file, parsed from JSON."""
    assert os.path.isfile(filename)
    proc = subprocess.run(
        ["ffprobe", filename, "-print_format", "json", "-show_streams"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    proc.check_returncode()
    return json.loads(proc.stdout)
from pprint import pprint
def streamDuration(stream):
    """
    Best-effort duration (in seconds) of one ffprobe stream dict.

    Tries, in order: the explicit duration field, frame count divided by
    sample rate, then the DURATION tag (HH:MM:SS.ss). Raises KeyError
    when none of them is available.
    """
    if "duration" in stream:
        return float(stream["duration"])
    if "sample_rate" in stream and "nb_frames" in stream:
        return int(stream["nb_frames"]) / int(stream["sample_rate"])
    tags = stream["tags"] if "tags" in stream else {}
    if "DURATION" in tags:
        parts = tags["DURATION"].split(":")
        assert len(parts) == 3
        hours, minutes, seconds = (float(part) for part in parts)
        return (hours * 60 + minutes) * 60 + seconds
    raise KeyError("Can't find duration information in stream")
def videoDuration(filename):
    """
    Average duration (seconds) of all streams of a media file, after
    checking they agree within DURATION_MAX_DEV seconds of each other.
    """
    # TODO Doesn't work with VP8 / webm
    infos = videoInfos(filename)
    durations = [streamDuration(stream) for stream in infos["streams"]]
    if not durations:
        raise KeyError("No stream with duration information")
    # statistics.stdev requires at least two data points; a
    # single-stream file used to raise StatisticsError here
    if len(durations) > 1:
        dev = statistics.stdev(durations)
        assert dev <= DURATION_MAX_DEV, "Too much deviation ({} s)".format(dev)
    return sum(durations) / len(durations)
todos = set()
totalSize = 0
totalDuration = 0
# Walk folders
log.info("Listing files in {}".format(PICTURES_FOLDER))
allVideos = list()
for root, dirs, files in os.walk(PICTURES_FOLDER):
    # If folder is in ORIGINAL_FOLDER, skip it
    if root.startswith(ORIGINAL_FOLDER):
        continue
    # Iterate over files
    for inputName in files:
        # If the file is not a video, skip it
        inputNameBase, inputExt = os.path.splitext(inputName)
        inputExt = inputExt[1:].lower()
        if inputExt not in MOVIE_EXTENSIONS:
            continue
        allVideos.append((root, inputName))
log.info("Analyzing videos")
for root, inputName in progressbar.progressbar(allVideos):
    inputNameBase, inputExt = os.path.splitext(inputName)
    inputExt = inputExt[1:].lower()
    # Generates all needed filepaths
    ## Found file
    inputFull = os.path.join(root, inputName)
    inputRel = os.path.relpath(inputFull, PICTURES_FOLDER)
    ## Original file
    originalFull = os.path.join(ORIGINAL_FOLDER, inputRel)
    originalRel = inputRel
    # was `originalFile` (undefined name): a failing assert would have
    # raised NameError while building the message instead of AssertionError
    assert not os.path.isfile(originalFull), originalFull + " exists"
    ## Compressed file
    outputFull = os.path.join(root, inputNameBase + "." + OUTPUT_EXTENSION)
    # If the extension is the same of the output one
    if inputExt == OUTPUT_EXTENSION:
        # Read the metadata of the video
        meta = videoMetadata(inputFull)
        # If it has the field with the original file, it is already
        # a converted file: skip it
        if "original" in meta:
            continue
    else:
        assert not os.path.isfile(outputFull), outputFull + " exists"
    size = os.stat(inputFull).st_size
    try:
        duration = videoDuration(inputFull)
    except Exception as e:
        log.warning("Can't determine duration of {}, skipping".format(inputFull))
        log.debug(e, exc_info=True)
        continue
    todo = (inputFull, originalFull, outputFull, size, duration)
    totalDuration += duration
    totalSize += size
    todos.add(todo)
log.info(
    "Converting {} videos ({})".format(
        len(todos), datetime.timedelta(seconds=totalDuration)
    )
)
# From https://stackoverflow.com/a/3431838
# From https://stackoverflow.com/a/3431838
def sha256(fname):
    """Hex-encoded SHA-256 digest of a file, read in 128 KiB chunks."""
    digest = hashlib.sha256()
    with open(fname, "rb") as handle:
        while True:
            chunk = handle.read(131072)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
# Progress bar things
totalDataSize = progressbar.widgets.DataSize()
# Show the grand total as the right-hand side of "X of Y"
totalDataSize.variable = "max_value"
barWidgets = [
    progressbar.widgets.DataSize(),
    " of ",
    totalDataSize,
    " ",
    progressbar.widgets.Bar(),
    " ",
    progressbar.widgets.FileTransferSpeed(),
    " ",
    progressbar.widgets.AdaptiveETA(),
]
bar = progressbar.DataTransferBar(max_value=totalSize, widgets=barWidgets)
bar.start()
processedSize = 0
for inputFull, originalFull, outputFull, size, duration in todos:
    # Convert into a temporary file first so a failed or interrupted
    # conversion never leaves a half-written file at the destination.
    tmpfile = tempfile.mkstemp(
        prefix="compressPictureMovies", suffix="." + OUTPUT_EXTENSION
    )[1]
    try:
        # Calculate the sum of the original file
        checksum = sha256(inputFull)
        # Initiate a conversion in a temporary file; the "original"
        # metadata field records the source path and its checksum so
        # already-converted files can be recognized later.
        originalRel = os.path.relpath(originalFull, ORIGINAL_FOLDER)
        originalContent = "{} {}".format(originalRel, checksum)
        metadataCmd = ["-metadata", 'original="{}"'.format(originalContent)]
        cmd = (
            ["ffmpeg", "-hide_banner", "-y", "-i", inputFull]
            + OUTPUT_FFMPEG_PARAMETERS
            + metadataCmd
            + [tmpfile]
        )
        p = subprocess.run(cmd)
        p.check_returncode()
        # Verify the duration of the new file
        newDuration = videoDuration(tmpfile)
        dev = statistics.stdev((duration, newDuration))
        assert dev < DURATION_MAX_DEV, "Too much deviation in duration"
        # Move the original to the corresponding original folder
        originalDir = os.path.dirname(originalFull)
        os.makedirs(originalDir, exist_ok=True)
        shutil.move(inputFull, originalFull)
        # Move the converted file in place of the original
        shutil.move(tmpfile, outputFull)
    except Exception as e:
        # Best effort: log and continue with the next video
        log.error("Couldn't process file {}".format(inputFull))
        log.error(e, exc_info=True)
        try:
            os.unlink(tmpfile)
        except Exception:
            pass
    # Progress bar things
    processedSize += size
    bar.update(processedSize)
bar.finish()
# TODO Iterate over the already compressed videos to assert the originals are
# in their correct place, else move them
12
hm/scripts/crepuscule Executable file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# FIXME
# Dim the backlight (per-host value) and switch to a dark color scheme.
host="$(cat /etc/hostname)"
case "$host" in
    curacao.geoffrey.frogeye.fr)
        echo 10000 | sudo tee /sys/class/backlight/intel_backlight/brightness
        ;;
    pindakaas.geoffrey.frogeye.fr)
        echo 3000 | sudo tee /sys/class/backlight/edp-backlight/brightness
        ;;
esac
automatrop -e base16_scheme=solarized-dark --tags color

13
hm/scripts/docker-image-childs Executable file
View file

@ -0,0 +1,13 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash docker gnugrep coreutils
# Find the dependent child image from an image
# Usage: docker-image-childs PARENT
parent="$1"
# From https://stackoverflow.com/a/41634462
# An image whose history mentions $parent was built on top of it;
# exclude the parent itself and deduplicate.
for i in $(docker images -q)
do
    docker history "$i" | grep -q "$parent" && echo "$i"
done | grep -v "$parent" | sort -u

8
hm/scripts/docker-rm Executable file
View file

@ -0,0 +1,8 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash docker
# Stop and remove all containers, then prune leftover networks.
# NOTE(review): with no running containers `docker ps -q` expands to
# nothing and unpause/kill print a usage error; harmless but noisy.
docker unpause $(docker ps -q)
docker kill $(docker ps -q)
docker container prune -f
docker network prune -f

16
hm/scripts/dummy Executable file
View file

@ -0,0 +1,16 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash tree coreutils
# Replace the content of a directory with a text file
# describing its tree structure
# Anchored with $: the previous 's|/\+||' removed the FIRST run of
# slashes anywhere in the path (e.g. the leading slash of an absolute
# path) instead of the trailing one.
dir="$(echo "$1" | sed 's|/\+$||')"
if [ -d "$dir" ]; then
    TEMP=$(mktemp)
    tree -a -p -D -n "$dir" > "$TEMP"
    mv "$dir" "$dir.bkp"
    mv "$TEMP" "$dir"
else
    echo "$dir n'est pas un dossier"
fi

19
hm/scripts/emergency-clean Executable file
View file

@ -0,0 +1,19 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash
# Clears everything it can to save space
rm -rf $HOME/.cache
if command -v pacman > /dev/null; then
    sudo pacman -Scc
fi
if command -v apt-get > /dev/null; then
    sudo apt-get clean
fi
if command -v nix-store > /dev/null; then
    # was a copy-paste of the journalctl branch below and never
    # actually cleaned the nix store
    sudo nix-store --gc
fi
if command -v journalctl > /dev/null; then
    sudo journalctl --vacuum-size=100M
fi

8
hm/scripts/gitCheckoutModes Executable file
View file

@ -0,0 +1,8 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash git gnugrep coreutils findutils
# Make the working tree's executable bits match what git reports as
# mode changes in `git diff --summary` (field 7+ is the file path).
# From https://stackoverflow.com/a/2083563
git diff --summary | grep --color 'mode change 100755 => 100644' | cut -d' ' -f7- | xargs -d'\n' chmod +x
git diff --summary | grep --color 'mode change 100644 => 100755' | cut -d' ' -f7- | xargs -d'\n' chmod -x

72
hm/scripts/gitghost Executable file
View file

@ -0,0 +1,72 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash coreutils git gnused
# Replace git folders with a placeholder containing the remote and the commit
function prompt { # text
    # Ask a yes/no question until the answer is clear.
    # Returns 1 for yes and 0 for no (mind the inversion!).
    local answer
    while true
    do
        read -p "$1 [yn] " answer
        case "$answer" in
            [Yy]*) return 1;;
            [Nn]*) return 0;;
            *) echo "Please answer yes or no.";;
        esac
    done
}
# Require exactly one folder, plus an optional -y flag
if [[ "$#" == 0 || "$#" > 2 ]]
then
    echo "Usage: $0 gitfolder [-y]"
    exit 1
fi
# Strip trailing slashes
folder="$(echo "$1" | sed 's/\/*$//')"
if [ "$2" == "-y" ]
then
    donotask=true
fi
if [ ! -d "$folder/.git" ]
then
    echo "$folder is not a git repository"
    exit 1
fi
if [ -n "$(git -C "$folder" diff)" ]
then
    echo "WARNING: There are unstaged change. Those will be discarded if you continue."
fi
echo "Be sure that every commit in the repository is backed up somewhere else, since those will be discarded."
TMPFILE=$(mktemp)
# was plain `git log`: it recorded the last commit of the CURRENT
# directory, not of the repository being ghosted (every other git call
# here uses -C "$folder")
(echo "[gitplaceholder]"
echo "lastcommit=$(git -C "$folder" log --format="%H" -n 1)"
echo
echo "[remote]"
git -C "$folder" remote -v
echo
echo "[branch]"
git -C "$folder" branch -v
echo
echo "[diff]"
git -C "$folder" diff -v) > $TMPFILE 2> /dev/null
if [ ! $donotask ]
then
    less $TMPFILE
    echo
    echo "This will be written in place of $folder."
    prompt "Do you want to continue ?"
    if [ "$?" == 0 ]
    then
        echo "Canceled"
        rm $TMPFILE
        exit 0
    fi
fi
echo "Dummying..."
rm -rf "$folder"
mv $TMPFILE $folder

12
hm/scripts/jour Executable file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# FIXME
# Raise the backlight to daytime levels (per-host value) and switch to
# a light color scheme.
host="$(cat /etc/hostname)"
case "$host" in
    curacao.geoffrey.frogeye.fr)
        echo 40000 | sudo tee /sys/class/backlight/intel_backlight/brightness
        ;;
    pindakaas.geoffrey.frogeye.fr)
        echo 3500 | sudo tee /sys/class/backlight/edp-backlight/brightness
        ;;
esac
automatrop -e base16_scheme=solarized-light --tags color

23
hm/scripts/lestrte Executable file
View file

@ -0,0 +1,23 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3
import random
import sys
def _scramble(word):
    """Shuffle all letters of a word except the first one."""
    if len(word) <= 2:
        return word
    rest = list(word)[1:]
    random.shuffle(rest)
    return word[0] + "".join(rest)


# Scramble every word read on stdin, keeping punctuation in place.
for line in sys.stdin:
    out = ""
    word = ""
    for char in line:
        if char.isalpha():
            word += char
        else:
            out += _scramble(word)
            word = ""
            out += char
    # Flush a word left pending when the line has no trailing newline
    # (last line of input): it was silently dropped before.
    out += _scramble(word)
    print(out, end="")

23
hm/scripts/letrtes Executable file
View file

@ -0,0 +1,23 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3
import random
import sys
def _scramble(word):
    """Shuffle the inner letters of a word, keeping first and last in place."""
    if len(word) <= 3:
        return word
    inner = list(word)[1:-1]
    random.shuffle(inner)
    return word[0] + "".join(inner) + word[-1]


# Scramble the interior of every word read on stdin.
for line in sys.stdin:
    out = ""
    word = ""
    for char in line:
        if char.isalpha():
            word += char
        else:
            out += _scramble(word)
            word = ""
            out += char
    # Flush a word left pending when the line has no trailing newline
    # (last line of input): it was silently dropped before.
    out += _scramble(word)
    print(out, end="")

32
hm/scripts/lip Executable file
View file

@ -0,0 +1,32 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash jq curl findutils coreutils
# Look up IP information from ip.frogeye.fr, caching answers for a week.
# Usage: lip [IP [JQ_FILTER...]]
set -euo pipefail
url="https://ip.frogeye.fr/json"
cachedir="$HOME/.cache/lip"
ip="${1:-}"
# Default jq filter: drop the user_agent field from the output
jq_sel="del(.user_agent)"
if [ $# -gt 1 ]
then
    shift
    # Remaining arguments form the jq filter
    jq_sel="$@"
fi
if [ -n "$ip" ]
then
    cachefile="$cachedir/$ip"
    # Refresh the cache when the entry is missing or older than 7 days
    if ! find "$cachefile" -mtime -7 &> /dev/null
    then
        mkdir -p "$cachedir"
        curl --silent "$url?ip=$ip" > "$cachefile"
    fi
    # jq_sel is intentionally unquoted: it may hold several jq arguments
    cat "$cachefile" | jq $jq_sel
else
    curl --silent "$url" | jq $jq_sel
fi

24
hm/scripts/lorem Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash
# Generates Lorem Ipsum
original="Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed non risus. Suspendisse lectus tortor, dignissim sit amet, adipiscing nec, ultricies sed, dolor. Cras elementum ultrices diam. Maecenas ligula massa, varius a, semper congue, euismod non, mi. Proin porttitor, orci nec nonummy molestie, enim est eleifend mi, non fermentum diam nisl sit amet erat. Duis semper. Duis arcu massa, scelerisque vitae, consequat in, pretium a, enim. Pellentesque congue. Ut in risus volutpat libero pharetra tempor. Cras vestibulum bibendum augue. Praesent egestas leo in pede. Praesent blandit odio eu enim. Pellentesque sed dui ut augue blandit sodales. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Aliquam nibh. Mauris ac mauris sed pede pellentesque fermentum. Maecenas adipiscing ante non diam sodales hendrerit.
Ut velit mauris, egestas sed, gravida nec, ornare ut, mi. Aenean ut orci vel massa suscipit pulvinar. Nulla sollicitudin. Fusce varius, ligula non tempus aliquam, nunc turpis ullamcorper nibh, in tempus sapien eros vitae ligula. Pellentesque rhoncus nunc et augue. Integer id felis. Curabitur aliquet pellentesque diam. Integer quis metus vitae elit lobortis egestas. Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Morbi vel erat non mauris convallis vehicula. Nulla et sapien. Integer tortor tellus, aliquam faucibus, convallis id, congue eu, quam. Mauris ullamcorper felis vitae erat. Proin feugiat, augue non elementum posuere, metus purus iaculis lectus, et tristique ligula justo vitae magna.
Aliquam convallis sollicitudin purus. Praesent aliquam, enim at fermentum mollis, ligula massa adipiscing nisl, ac euismod nibh nisl eu lectus. Fusce vulputate sem at sapien. Vivamus leo. Aliquam euismod libero eu enim. Nulla nec felis sed leo placerat imperdiet. Aenean suscipit nulla in justo. Suspendisse cursus rutrum augue. Nulla tincidunt tincidunt mi. Curabitur iaculis, lorem vel rhoncus faucibus, felis magna fermentum augue, et ultricies lacus lorem varius purus. Curabitur eu amet.
"
repet=$1
# Default to one repetition; the expansion is quoted so an empty/unset
# argument doesn't make `[` fail with a missing-operand error.
if [ -z "$repet" ]; then
    repet=1
fi
for i in $(seq 1 "$repet")
do
    echo -e "$original"
done

60
hm/scripts/mediaDuration Executable file
View file

@ -0,0 +1,60 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs ffmpeg
import logging
import os
import subprocess
import sys
import coloredlogs
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
def duration_file(path: str) -> float:
    """Duration of a single media file in seconds; 0 when it can't be read."""
    proc = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            path,
        ],
        stdout=subprocess.PIPE,
    )
    answer = proc.stdout.decode().strip()
    if proc.returncode != 0:
        log.warning(f"{path}: unable to get duration")
    elif answer == "N/A":
        log.warning(f"{path}: has no duration")
    else:
        try:
            return float(answer)
        except ValueError:
            log.error(f"{path}: returned {answer}")
    # Fallback for every failure path above
    return 0
def duration_directory(path: str) -> float:
    """Summed duration (seconds) of every file under path, recursively."""
    return sum(
        duration_file(os.path.join(root, name))
        for root, _, names in os.walk(path)
        for name in names
    )
# Sum the durations of every file/directory given on the command line
# and print the total in seconds.
total = 0.0
for arg in sys.argv[1:]:
    if os.path.isfile(arg):
        total += duration_file(arg)
    elif os.path.isdir(arg):
        total += duration_directory(arg)
    else:
        raise FileNotFoundError(f"No such file or directory: '{arg}'")
print(total)

34
hm/scripts/music_remove_dashes Executable file
View file

@ -0,0 +1,34 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3
"""
Small script to convert music files in the form:
$(tracknumber) - $(title).$(ext)
to the form
$(tracknumber) $(title).$(ext)
(note the absence of dash)
"""
import os
import re
def main(directory: str = ".") -> None:
    """Rename "NN - title.ext" files to "NN title.ext" under *directory*.

    *directory* defaults to the current working directory, preserving the
    original zero-argument behaviour. Each rename is printed.
    """
    for root, _, files in os.walk(directory):
        for filename in files:
            # Track number, a literal " - ", then the rest of the name
            match = re.match(r"^(\d+) - (.+)$", filename)
            if not match:
                continue
            new_filename = f"{match[1]} {match[2]}"
            old_path = os.path.join(root, filename)
            new_path = os.path.join(root, new_filename)
            print(old_path, "->", new_path)
            os.rename(old_path, new_path)


if __name__ == "__main__":
    main()

12
hm/scripts/nuit Executable file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# FIXME
# Night mode: dim the backlight to its per-host minimum, then switch the
# colour scheme to solarized-dark.
case "$(cat /etc/hostname)" in
    curacao.geoffrey.frogeye.fr)
        echo 1 | sudo tee /sys/class/backlight/intel_backlight/brightness
        ;;
    pindakaas.geoffrey.frogeye.fr)
        echo 700 | sudo tee /sys/class/backlight/edp-backlight/brightness
        ;;
esac
automatrop -e base16_scheme=solarized-dark --tags color

84
hm/scripts/o Executable file
View file

@ -0,0 +1,84 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3
#! nix-shell -p python3 python3Packages.magic xdg-utils feh zathura
# pylint: disable=C0103
"""
Find the subjectively best software
to open the file given in arguments with.
Doesn't use that XDG mess (only in last resort).
"""
import os
import subprocess
import sys
import tempfile
import urllib.request

import magic

# Getting what's needed
path = sys.argv[1]
# Getting the MIME type
ishttp = path.startswith("http")
buf = None
if ishttp:
    # For URLs: sniff the first KiB of content and let libmagic decide
    buf = urllib.request.urlopen(path)
    chunk = buf.read(1024)
    fmagic = magic.detect_from_content(chunk)
else:
    assert os.path.isfile(path), f"Not a file: {path}"
    path = os.path.realpath(path)
    fmagic = magic.detect_from_filename(path)
mime = tuple(fmagic.mime_type.split("/"))
assert len(mime) == 2
# Non-empty when an X session is available
graphical = os.environ.get("DISPLAY")
# Some energumens
if mime[0] == "application" and mime[1] in ("json", "javascript"):
    mime = ("text", mime[1])
# Determine stuff
ex = None  # Executable needed to open the file
forcelocal = False  # If we need to copy the file locally before opening it
isterm = False  # Executable should run in a terminal
# NOTE(review): isterm is assigned but never read below ("TODO Launch a new
# terminal window for some" suggests it is future work)
if mime[0] == "text":
    if not ishttp:
        ex = os.environ.get("VISUAL" if graphical else "EDITOR", None)
        isterm = True
elif mime[0] == "image":
    ex = "feh"
elif mime[0] in ("audio", "video"):
    ex = "mpv"
    isterm = True
elif mime == ("application", "pdf"):
    # zathura cannot stream from HTTP, hence the local copy below
    ex = "zathura"
    forcelocal = True
# Open stuff
tmp = None
if ex:
    if forcelocal and ishttp:
        assert buf
        # NOTE(review): the temp file is not flushed before the viewer is
        # spawned — the tail of large downloads may not be on disk yet.
        tmp = tempfile.NamedTemporaryFile(prefix="o")
        tmp.write(chunk)
        tmp.write(buf.read())
        path = tmp.name
else:
    # Last resort: defer to XDG (or $BROWSER for URLs)
    ex = "xdg-open"
    if ishttp:
        ex = os.environ.get("BROWSER", ex)
if buf:
    buf.close()
# TODO Launch a new terminal window for some
assert ex
p = subprocess.run([ex, path])
# The temp file (if any) is deleted on close, after the viewer exits
if tmp:
    tmp.close()
sys.exit(p.returncode)

192
hm/scripts/optimize Executable file
View file

@ -0,0 +1,192 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash coreutils imagemagick libjpeg optipng ffmpeg diffutils
# Optimizes everything the script can find in a folder,
# meaning it will compress files as much as possible,
# without losing any data (verification will be done
# in order to verify that no data has been done)
# (executable)
# TODO Run in parallel
# TODO Lots of dupplicated code there
# TODO Maybe replace part with https://github.com/toy/image_optim?
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# Directory to optimize: first argument, else the current directory
dir=${1:-$PWD}
# Running total of saved bytes, kept in a file so subshells can update it
total=$(mktemp)
echo -n 0 > $total
# Print the grand total and clean up; also used as the signal handler
function showtotal {
	echo "Total saved: $(cat "$total") bytes"
	rm $total
	exit
}
trap showtotal SIGTERM SIGINT SIGFPE
function doReplace { # candidate original
	# Overwrite the original with the candidate and add the byte gain to
	# the running total.
	# NOTE(review): despite the "candidate original" comment, this reads
	# the globals $c/$o/$cs/$os set by replace/replaceImg, not $1/$2.
	mv "$c" "$o"
	saved=$(($os - $cs))
	perc=$((100 * $saved / $os))
	echo "→ $os ⇒ $cs (saved $saved bytes, or ${perc}%)"
	newtotal=$(($(cat $total) + $saved))
	echo -n $newtotal > $total
}
function replace { # candidate original
	# Keep the candidate only if it is a non-empty file strictly smaller
	# than the original; otherwise discard it. Sets the globals
	# $c/$o/$cs/$os consumed by doReplace.
	c="$1"
	o="$2"
	# File verifications
	if [ ! -f "$o" ]; then
		echo "→ Original is inexistant, skipping!"
		return
	fi
	if [ ! -f "$c" ]; then
		echo "→ Candidate is inexistant, skipping!"
		return
	fi
	# Size verifications
	cs=$(wc -c "$c" | cut -d' ' -f1)
	os=$(wc -c "$o" | cut -d' ' -f1)
	if [ $cs -le 0 ]; then
		echo "→ Candidate is empty, skipping!"
		rm "$c"
		return
	fi
	if [ $cs -eq $os ]; then
		echo "→ Candidate weight the same, skipping."
		rm "$c"
		return
	fi
	if [ $cs -gt $os ]; then
		echo "→ Candidate is larger, skipping."
		rm "$c"
		return
	fi
	doReplace "$c" "$o"
}
function replaceImg { # candidate original
	# With bitmap verification
	# Same as replace, but additionally requires the candidate to decode
	# to the exact same pixels as the original (via PPM round-trip).
	c="$1"
	o="$2"
	# File verifications
	if [ ! -f "$o" ]; then
		echo "→ Original is inexistant, skipping!"
		return
	fi
	if [ ! -f "$c" ]; then
		echo "→ Candidate is inexistant, skipping!"
		return
	fi
	# Size verifications
	cs=$(wc -c "$c" | cut -d' ' -f1)
	os=$(wc -c "$o" | cut -d' ' -f1)
	if [ $cs -le 0 ]; then
		echo "→ Candidate is empty, skipping!"
		rm "$c"
		return
	fi
	if [ $cs -eq $os ]; then
		echo "→ Candidate weight the same, skipping."
		rm "$c"
		return
	fi
	if [ $cs -gt $os ]; then
		echo "→ Candidate is larger, skipping."
		rm "$c"
		return
	fi
	# Bitmap verification
	ppmc="$(mktemp --suffix .ppm)"
	ppmo="$(mktemp --suffix .ppm)"
	convert "$c" "$ppmc"
	convert "$o" "$ppmo"
	if cmp --silent "$ppmo" "$ppmc"; then
		doReplace "$c" "$o"
	else
		echo "→ Candidate don't have the same bit map as original, skipping!"
	fi
	rm -f "$ppmc" "$ppmo" "$c"
}
# JPEG (requires jpegtran)
# Losslessly re-encode each JPEG as progressive, keeping all markers; the
# result is only kept when strictly smaller (checked by replace).
while read image
do
	if [ -z "$image" ]; then continue; fi
	echo Processing $image
	prog=$(mktemp --suffix .jpg)
	jpegtran -copy all -progressive "$image" > "$prog"
	echo "→ Progressive done"
	# (removed an unused `progs=$(wc -c ...)` here — replace computes sizes itself)
	replace "$prog" "$image"
done <<< "$(find "$dir/" -type f -iregex ".+.jpe?g$")"
# PNG (requires optipng)
while read image
do
	if [ -z "$image" ]; then continue; fi
	echo Processing $image
	# optipng works in place, so operate on a copy
	temp=$(mktemp --suffix .png)
	cp "$image" "$temp"
	optipng -quiet "$temp"
	echo "→ Optimize done"
	replace "$temp" "$image"
done <<< "$(find "$dir/" -type f -iname "*.png")"
# FLAC (requires ffmpeg)
# Re-encode at the highest FLAC compression level; the output format is
# inferred from the .flac suffix of the temporary file.
while read music
do
	if [ -z "$music" ]; then continue; fi
	echo Processing $music
	temp=$(mktemp --suffix .flac)
	# BUG FIX: was `cp "$music" "$temp"; ffmpeg -8 -o "$temp"`, which is not
	# a valid ffmpeg invocation (no input file, no -8/-o options) and never
	# re-encoded anything. -y overwrites the empty file mktemp created.
	ffmpeg -y -i "$music" -compression_level 8 "$temp"
	echo "→ Optimize done"
	replace "$temp" "$music"
done <<< "$(find "$dir/" -type f -iname "*.flac")"
# # SVG (requires scour)
# while read image
# do
# if [ -z "$image" ]; then continue; fi
# echo Processing $image
#
# temp=$(mktemp --suffix .svg)
# scour --quiet "$image" "$temp" --no-line-breaks
# echo "→ Optimize done"
#
# replaceImg "$temp" "$image"
#
# done <<< "$(find "$dir/" -type f -iname "*.svg")"
# NOTE Explicitely disabled since:
# - I only have ~50 MiB of SVG in TOTAL
# - Most conversions are not image losseless
# - Even when they are losseless, they are mostly worthless
# - I might want to keep editor data and/or ids for some of them
# So rather use scour explicitely when needed
# Run the sibling cleandev script, then report the grand total and exit
${SCRIPT_DIR}/cleandev
showtotal

64
hm/scripts/overpdf Executable file
View file

@ -0,0 +1,64 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash pdftk inkscape gnused coreutils file
# Utility to write over a PDF file pages
# First run: bursts the PDF into per-page files plus editable SVG templates.
# Second run (work dir exists): stamps the edited SVGs back onto the pages
# and merges everything into the original file.
# TODO Inkscape vodoo: Put the original in its own layer and skip when merging
orig_path="$1"
orig_dir="$(dirname "$orig_path")"
orig_file="$(basename "$orig_path")"
orig_ext="${orig_file##*.}"
orig_name="${orig_file%.*}"
wdir_file="${orig_name}_src.${orig_ext}"
wdir_path="${orig_dir}/${wdir_file}"
if [ -d "$wdir_path" ]
then
	echo "Source directory $wdir_path found"
	ls "${wdir_path}/"*"_og.pdf" | while read page_orig_path
	do
		page_stmp_svg="$(echo "$page_orig_path" | sed 's|_og\.pdf$|_fg\.svg|')"
		page_stmp_pdf="$(echo "$page_orig_path" | sed 's|_og\.pdf$|_fg\.pdf|')"
		page_fin_pdf="$(echo "$page_orig_path" | sed 's|_og\.pdf$|_fin\.pdf|')"
		if [ -f "$page_stmp_svg" ]
		then
			echo "Processing $page_orig_path (applying stamp)"
			inkscape "$page_stmp_svg" --export-filename "$page_stmp_pdf"
			pdftk "$page_orig_path" stamp "$page_stmp_pdf" output "$page_fin_pdf"
		else
			# No edited SVG for this page: keep the original page as-is
			echo "Processing $page_orig_path (copying)"
			cp "$page_orig_path" "$page_fin_pdf"
		fi
	done
	echo "Merging everything back to ${orig_path}."
	pdftk "${wdir_path}/"*"_fin.pdf" output "$orig_path"
	echo "Deleting temporary files."
	rm "${wdir_path}/"*"_fin.pdf" "${wdir_path}/"*"_fg.pdf"
	echo "Done."
elif [ -f "$orig_path" ]
then
	if [ "$(file --mime-type --brief "$orig_path")" != "application/pdf" ]
	then
		echo "${orig_path}: not a PDF file"
		exit 1
	fi
	echo "Creating source directory $wdir_path with original pages and template SVGs"
	mkdir "$wdir_path"
	# BUG FIX: was `pdftk "$orig_file"` (basename only), which failed when
	# the script was invoked from outside the document's directory
	pdftk "$orig_path" burst output "${wdir_path}/${orig_name}_%03d_og.pdf"
	ls "${wdir_path}/"*"_og.pdf" | while read page_orig_path
	do
		page_stmp_svg="$(echo "$page_orig_path" | sed 's|_og\.pdf$|_fg\.svg|')"
		echo "Processing $page_orig_path"
		inkscape "$page_orig_path" --export-plain-svg --export-filename "$page_stmp_svg"
	done
	# (typo fix: "in a a new layer" -> "in a new layer")
	echo "Done. Make sure to edit in a new layer in Inkscape and hide the original one."
else
	echo "${orig_path}: no such file or directory"
	exit 1
fi

10
hm/scripts/pdfpages Executable file
View file

@ -0,0 +1,10 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash pdftk gnugrep gawk
# Print "<file>: <number of pages>" for every PDF given as argument.
# From https://stackoverflow.com/a/14736593
for FILE in "$@"
do
	# BUG FIX: was `printf "$FILE: "` — a filename containing % was
	# interpreted as a printf format string; %s passes it through verbatim.
	printf '%s: ' "$FILE"
	pdftk "$FILE" dump_data | grep NumberOfPages | awk '{print $2}'
done

51
hm/scripts/pdfrename Executable file
View file

@ -0,0 +1,51 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash pdftk coreutils
# Change the title (and optionally Creator/Producer tags) of a PDF file
if [[ -z "$1" || -z "$2" ]]; then
	echo "Usage: $0 FILE TITLE [CREATOR [PRODUCER]]"
	echo
	echo "Arguments:"
	echo " FILE      Path to the PDF document"
	echo " TITLE     Content of the Title tag"
	echo " CREATOR   Content of the Creator tag"
	# BUG FIX: help said "Title to give" for PRODUCER (copy-paste leftover)
	echo " PRODUCER  Content of the Producer tag"
	exit 1
fi
file="$1"
title="$2"
creator="$3"
producer="$4"
if [ ! -f "$file" ]; then
	# BUG FIX: was `>> /dev/stderr`; >&2 is the portable redirection
	echo "No such file or directory: $file" >&2
	exit 2
fi
# Build a pdftk update_info instruction file
instructions=$(mktemp)
echo "InfoBegin" >> "$instructions"
echo "InfoKey: Title" >> "$instructions"
echo "InfoValue: $title" >> "$instructions"
if [ -n "$creator" ]; then
	echo "InfoBegin" >> "$instructions"
	echo "InfoKey: Creator" >> "$instructions"
	echo "InfoValue: $creator" >> "$instructions"
fi
if [ -n "$producer" ]; then
	echo "InfoBegin" >> "$instructions"
	echo "InfoKey: Producer" >> "$instructions"
	echo "InfoValue: $producer" >> "$instructions"
fi
# pdftk cannot write in place, so work from a copy
copy=$(mktemp)
cp "$file" "$copy"
pdftk "$copy" update_info "$instructions" output "$file"
rm "$instructions" "$copy"

114
hm/scripts/picture_name_date Executable file
View file

@ -0,0 +1,114 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.exifread
import argparse
import datetime
import logging
import os
import re
import typing
import coloredlogs
import exifread
log = logging.getLogger(__name__)
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s", logger=log)
# Extensions considered to be pictures, and filename stems that still look
# like untouched camera output (IMG_1234, DSCN0042, ...)
EXTENSION_PATTERN = re.compile(r"\.(JPE?G|DNG)", re.I)
COMMON_PATTERN = re.compile(r"(IMG|DSC[NF]?|100|P10|f|t)_?\d+", re.I)
EXIF_TAG_ID = 0x9003  # DateTimeOriginal
EXIF_DATE_FORMAT = "%Y:%m:%d %H:%M:%S"


def get_pictures(directory: str = ".", skip_renamed: bool = True) -> typing.Generator:
    """Yield full paths of picture files found under *directory*.

    With *skip_renamed* (the default), only files whose stem still matches
    a usual camera naming scheme are yielded.
    """
    for parent, _, filenames in os.walk(directory):
        for name in filenames:
            stem, extension = os.path.splitext(name)
            if not EXTENSION_PATTERN.match(extension):
                continue
            if skip_renamed and not COMMON_PATTERN.match(stem):
                continue
            yield os.path.join(parent, name)
def main(args: argparse.Namespace) -> None:
    """Rename pictures under args.dir to their EXIF DateTimeOriginal.

    Target names look like 2020-01-31_12-34-56{suffix}.jpg, with a _N
    counter appended on collisions. With args.dry nothing is renamed.
    """
    log.warning("Counting files...")
    kwargs = {"directory": args.dir, "skip_renamed": args.skip_renamed}
    log.warning("Processing files...")
    for full_path in get_pictures(**kwargs):
        # Find date
        with open(full_path, "rb") as fd:
            exif_data = exifread.process_file(fd)
        if not exif_data:
            # NOTE(review): only warns; the empty dict then falls through to
            # the for/else below, which warns a second time and skips.
            log.warning(f"{full_path} does not have EXIF data")
        for ifd_tag in exif_data.values():
            if ifd_tag.tag == EXIF_TAG_ID:
                date_raw = ifd_tag.values
                break
        else:
            log.warning(f"{full_path} does not have required EXIF tag")
            continue
        date = datetime.datetime.strptime(date_raw, EXIF_DATE_FORMAT)
        # Determine new filename
        ext = os.path.splitext(full_path)[1].lower()
        if ext == ".jpeg":
            ext = ".jpg"
        new_name = date.isoformat().replace(":", "-").replace("T", "_") + args.suffix
        # First substitution is to allow images being sent to a NTFS filesystem
        # Second substitution is for esthetics
        new_path = os.path.join(args.dir, f"{new_name}{ext}")
        # TODO Allow keeping image in same folder
        i = 0
        # Increment a counter until the name is free (or is already ours)
        while os.path.exists(new_path):
            if full_path == new_path:
                break
            log.debug(f"{new_path} already exists, incrementing")
            i += 1
            new_path = os.path.join(args.dir, f"{new_name}_{i}{ext}")
        # Rename file
        if full_path == new_path:
            log.debug(f"{full_path} already at required filename")
            continue
        log.info(f"{full_path} →\t{new_path}")
        if os.path.exists(new_path):
            raise FileExistsError(f"Won't overwrite {new_path}")
        if not args.dry:
            os.rename(full_path, new_path)
if __name__ == "__main__":
    # CLI entry point: parse options then hand over to main()
    parser = argparse.ArgumentParser(description="Rename images based on their dates")
    parser.add_argument(
        "dir",
        metavar="DIRECTORY",
        type=str,
        default=".",
        nargs="?",
        help="Directory containing the pictures",
    )
    parser.add_argument(
        "-d",
        "--dry",
        action="store_true",
        help="Do not actually rename, just show old and new path",
    )
    parser.add_argument(
        "-r",
        "--skip-renamed",
        action="store_true",
        help="Skip images whose filename doesn't match usual camera output filenames.",
    )
    parser.add_argument(
        "-s",
        "--suffix",
        default="",
        help="Text to add before the extension",
    )
    args = parser.parse_args()
    main(args)

89
hm/scripts/pushToTalk Executable file
View file

@ -0,0 +1,89 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.pulsectl python3Packages.xlib
import sys
import pulsectl
from Xlib import XK, X, display
from Xlib.ext import record
from Xlib.protocol import rq
# Push-to-talk key: sources stay muted unless this key is held down
KEY = XK.XK_F7


def mute(state: bool) -> None:
    """Set the mute *state* on every PulseAudio source with an active port."""
    with pulsectl.Pulse("list-source") as pulse:
        for source in pulse.source_list():
            if source.port_active:
                # Only toggle when the state actually changes
                if source.mute != state:
                    pulse.mute(source, state)
                    print(f"{source.name} {'un' if not state else ''}muted")


# Start muted; two X connections: one to record events, one for lookups
mute(True)
local_dpy = display.Display()
record_dpy = display.Display()
def record_callback(reply):
    """X RECORD callback: unmute sources while KEY is pressed, mute on release.

    Filters out non-event replies, then parses every event in the reply's
    binary payload.
    """
    if reply.category != record.FromServer:
        return
    if reply.client_swapped:
        print("* received swapped protocol data, cowardly ignored")
        return
    if not len(reply.data) or reply.data[0] < 2:
        # not an event
        return
    data = reply.data
    while len(data):
        event, data = rq.EventField(None).parse_binary_value(
            data, record_dpy.display, None, None
        )
        if event.type in [X.KeyPress, X.KeyRelease]:
            keysym = local_dpy.keycode_to_keysym(event.detail, 0)
            # (removed a stray debug `print(KEY)` that fired on every key event)
            if keysym == KEY:
                # Unmuted while the key is held, muted again on release
                mute(event.type == X.KeyRelease)
# Check if the extension is present
if not record_dpy.has_extension("RECORD"):
    print("RECORD extension not found")
    sys.exit(1)
r = record_dpy.record_get_version(0, 0)
print("RECORD extension version %d.%d" % (r.major_version, r.minor_version))
# Create a recording context; we only want key and mouse events
ctx = record_dpy.record_create_context(
    0,
    [record.AllClients],
    [
        {
            "core_requests": (0, 0),
            "core_replies": (0, 0),
            "ext_requests": (0, 0, 0, 0),
            "ext_replies": (0, 0, 0, 0),
            "delivered_events": (0, 0),
            "device_events": (X.KeyPress, X.MotionNotify),
            "errors": (0, 0),
            "client_started": False,
            "client_died": False,
        }
    ],
)
# Enable the context; this only returns after a call to record_disable_context,
# while calling the callback function in the meantime
try:
    record_dpy.record_enable_context(ctx, record_callback)
except KeyboardInterrupt:
    # Ctrl-C: stop recording cleanly before exiting
    local_dpy.record_disable_context(ctx)
    local_dpy.flush()
# Finally free the context
record_dpy.record_free_context(ctx)

70
hm/scripts/raw_move_precomp Executable file
View file

@ -0,0 +1,70 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs
"""
Same as picture_name_date
except it's tailored for OpenCamera.
It uses filenames, that way:
- Easier to get metadata
- JPG/DNG, MP4/SRT keep the same filename
"""
import argparse
import logging
import os
import coloredlogs
log = logging.getLogger(__name__)
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s", logger=log)
# RAW extensions, the pre-composed JPEG extensions that may accompany them,
# and the subdirectory the pre-composed files are moved into
RAW_EXTENSIONS = [".dng"]
PRECOMP_EXTENSIONS = [".jpg", ".jpeg"]
PRECOMP_DIRECTORY = ".precomp"


def main(args: argparse.Namespace) -> None:
    """Move pre-composed JPEGs next to a matching RAW into PRECOMP_DIRECTORY."""
    for parent, _, filenames in os.walk(args.dir):
        for name in filenames:
            raw_path = os.path.join(parent, name)
            stem, extension = os.path.splitext(name)
            if extension.lower() not in RAW_EXTENSIONS:
                log.debug(f"{raw_path} isn't a RAW file")
                continue
            # TODO Search for upper case extension
            for precomp_ext in PRECOMP_EXTENSIONS:
                precomp_filename = stem + precomp_ext
                precomp_path = os.path.join(parent, precomp_filename)
                if not os.path.exists(precomp_path):
                    continue
                precomp_dir = os.path.join(parent, PRECOMP_DIRECTORY)
                precomp_dest = os.path.join(precomp_dir, precomp_filename)
                log.info(f"{precomp_path} -> {precomp_dest} because of {raw_path}")
                if not args.dry:
                    os.makedirs(precomp_dir, exist_ok=True)
                    os.rename(precomp_path, precomp_dest)
if __name__ == "__main__":
    # CLI entry point: parse options then hand over to main()
    parser = argparse.ArgumentParser(
        description="Move pre-composed JPEG to a directory "
        "when a matching raw picture is found"
    )
    parser.add_argument(
        "dir",
        metavar="DIRECTORY",
        type=str,
        default=".",
        nargs="?",
        help="Directory containing the pictures",
    )
    parser.add_argument(
        "-d",
        "--dry",
        action="store_true",
        help="Do not actually rename, just show old and new path",
    )
    args = parser.parse_args()
    main(args)

27
hm/scripts/rep Executable file
View file

@ -0,0 +1,27 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash coreutils
# Moves a file to another place and put a symbolic link in place
function rep_help {
	echo "Usage: $0 SOURCE DEST"
	echo
	echo "Arguments:"
	echo " SOURCE File to be moved"
	echo " DEST  Where to be moved"
	return 0
}
# NOTE(review): these two lines run unconditionally BEFORE the command
# dispatch below, so e.g. `rep help` first tries `mv help ""`. Presumably the
# mv/ln should live in a rep_<command> function or the dispatch is a
# leftover — TODO confirm intended behaviour before changing it.
mv "$1" "$2"
ln -s "$2" "$1"
# MAIN
command="$1"
shift
if type "rep_$command" &> /dev/null; then
	"rep_$command" "$@"
else
	rep_help
fi

74
hm/scripts/replayGain Executable file
View file

@ -0,0 +1,74 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.r128gain
# TODO r128gain is not maintainted anymore
# Normalisation is done at the default of each program,
# which is usually -89.0 dB
# TODO The simplifications/fixes I've done makes it consider
# multi-discs albums as multiple albums
import logging
import os
import sys
import typing
import coloredlogs
import r128gain
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# TODO Remove debug
# Constants
# -f forces re-tagging files that already carry ReplayGain tags
FORCE = "-f" in sys.argv
if FORCE:
    sys.argv.remove("-f")
# Optional positional argument: the music folder to process
if len(sys.argv) >= 2:
    SOURCE_FOLDER = os.path.realpath(sys.argv[1])
else:
    SOURCE_FOLDER = os.path.join(os.path.expanduser("~"), "Musiques")
def isMusic(f: str) -> bool:
    """True when the file extension is one r128gain knows as audio."""
    extension = os.path.splitext(f)[1][1:].lower()
    return extension in r128gain.AUDIO_EXTENSIONS
# Get album paths
log.info("Listing albums and tracks")
albums = list()
singleFiles = list()
# A folder counts as an album as soon as it directly contains music files
for root, dirs, files in os.walk(SOURCE_FOLDER):
    folder_has_music = False
    for f in files:
        if isMusic(f):
            folder_has_music = True
            fullPath = os.path.join(root, f)
            singleFiles.append(fullPath)
    if folder_has_music:
        albums.append(root)
# log.info("Processing single files")
# r128gain.process(singleFiles, album_gain=False,
# skip_tagged=not FORCE, report=True)
# Compute track + album gain per album folder
for album in albums:
    albumName = os.path.relpath(album, SOURCE_FOLDER)
    log.info("Processing album {}".format(albumName))
    musicFiles = list()
    # Only direct children: sub-folders are treated as their own albums
    for f in os.listdir(album):
        if isMusic(f):
            fullPath = os.path.join(album, f)
            musicFiles.append(fullPath)
    if not musicFiles:
        continue
    r128gain.process(musicFiles, album_gain=True, skip_tagged=not FORCE, report=True)
    print("==============================")

501
hm/scripts/rmf Executable file
View file

@ -0,0 +1,501 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.progressbar2
# Handles sync-conflict files
import argparse
import logging
import os
import pickle
import re
import sys
import zlib
import coloredlogs
import progressbar
progressbar.streams.wrap_stderr()
coloredlogs.install(level="INFO", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# 1) Create file list with conflict files
# 2) Gather file informations (date, owner, size, checksum)
# 3) Propose what to do
def sizeof_fmt(num, suffix="B"):
    """Human-readable byte count, e.g. 1536 -> '1.5 KiB'."""
    # Stolen from https://stackoverflow.com/a/1094933
    for prefix in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, prefix, suffix)
        num /= 1024.0
    return "%.1f %s%s" % (num, "Yi", suffix)
class Table:
    """Fixed-size grid of strings with column-aligned pretty-printing."""

    def __init__(self, width, height):
        self.width = width
        self.height = height
        # Column-major storage: data[x][y]
        self.data = [["" for _ in range(height)] for _ in range(width)]

    def set(self, x, y, data):
        """Store str(data) in cell (x, y)."""
        self.data[x][y] = str(data)

    def print(self):
        """Print the grid row by row, columns padded and separated by ' | '."""
        col_widths = [max(len(cell) for cell in column) for column in self.data]
        for y in range(self.height):
            for x in range(self.width):
                cell = self.data[x][y]
                padding = " " * (col_widths[x] - len(cell))
                if x > 0:
                    cell = " | " + cell
                print(cell + padding, end="\t")
            print()
class Database:
    """Registry of sync-conflict groups found under one directory.

    Instances are pickled to disk between runs so stats/checksums are
    not recomputed needlessly.
    """

    # Bump when the pickled layout changes
    VERSION = 1
    # Syncthing conflict marker: .sync-conflict-YYYYMMDD-HHMMSS-XXXXXXX
    CONFLICT_PATTERN = re.compile("\.sync-conflict-\d{8}-\d{6}-\w{7}")

    def __init__(self, directory):
        self.version = Database.VERSION
        self.directory = directory
        # (root, original filename) -> DatabaseFile
        self.data = dict()

    def prune(self):
        """Drop vanished conflict files and groups needing no action."""
        toPrune = list()
        for filepath, databaseFile in self.data.items():
            databaseFile.migrate()  # TODO Temp dev stuff
            databaseFile.prune()
            if not databaseFile.isRelevant():
                toPrune.append(filepath)
        for filepath in toPrune:
            del self.data[filepath]

    def nbFiles(self):
        """Total number of conflict files across all groups."""
        return sum(databaseFile.nbFiles() for databaseFile in self.data.values())

    def totalSize(self):
        """Total size in bytes of all conflict files."""
        return sum(databaseFile.totalSize() for databaseFile in self.data.values())

    def maxSize(self):
        """Size in bytes if only the largest file of each group were kept."""
        return sum(databaseFile.maxSize() for databaseFile in self.data.values())

    def totalChecksumSize(self):
        """Bytes that still need to be read to checksum everything."""
        return sum(
            databaseFile.totalChecksumSize() for databaseFile in self.data.values()
        )

    def getList(self):
        """Walk the directory and (re)build the conflict groups."""
        self.prune()
        log.info("Finding conflict files")
        widgets = [
            progressbar.AnimatedMarker(),
            " ",
            progressbar.BouncingBar(),
            " ",
            progressbar.DynamicMessage("conflicts"),
            " ",
            progressbar.DynamicMessage("files"),
            " ",
            progressbar.DynamicMessage("dir", width=20, precision=20),
            " ",
            progressbar.Timer(),
        ]
        bar = progressbar.ProgressBar(widgets=widgets).start()
        f = 0
        for root, dirs, files in os.walk(self.directory):
            for conflictFilename in files:
                f += 1
                if not Database.CONFLICT_PATTERN.search(conflictFilename):
                    continue
                # Strip the marker to recover the original filename
                filename = Database.CONFLICT_PATTERN.sub("", conflictFilename)
                key = (root, filename)
                if key in self.data:
                    dataFile = self.data[key]
                else:
                    dataFile = DatabaseFile(root, filename)
                    self.data[key] = dataFile
                # The non-conflict version, when present, is a candidate too
                if filename in files:
                    dataFile.addConflict(filename)
                dataFile.addConflict(conflictFilename)
                bar.update(
                    conflicts=len(self.data), files=f, dir=root[(len(self.directory) + 1) :]
                )
        bar.finish()
        log.info(
            f"Found {len(self.data)} conflicts, totalling {self.nbFiles()} conflict files."
        )

    def getStats(self):
        """Collect os.stat information for every conflict file."""
        log.info("Getting stats from conflict files")
        bar = progressbar.ProgressBar(max_value=self.nbFiles()).start()
        f = 0
        for databaseFile in self.data.values():
            databaseFile.getStats()
            f += databaseFile.nbFiles()
            bar.update(f)
        bar.finish()
        log.info(
            f"Total file size: {sizeof_fmt(self.totalSize())}, possible save: {sizeof_fmt(self.totalSize() - self.maxSize())}"
        )

    def getChecksums(self):
        """Checksum files that still need it; interruptible with Ctrl-C."""
        log.info("Checksumming conflict files")
        widgets = [
            progressbar.DataSize(),
            " of ",
            progressbar.DataSize("max_value"),
            " (",
            progressbar.AdaptiveTransferSpeed(),
            ") ",
            progressbar.Bar(),
            " ",
            progressbar.DynamicMessage("dir", width=20, precision=20),
            " ",
            progressbar.DynamicMessage("file", width=20, precision=20),
            " ",
            progressbar.Timer(),
            " ",
            progressbar.AdaptiveETA(),
        ]
        bar = progressbar.DataTransferBar(
            max_value=self.totalChecksumSize(), widgets=widgets
        ).start()
        f = 0
        for databaseFile in self.data.values():
            bar.update(
                f,
                dir=databaseFile.root[(len(self.directory) + 1) :],
                file=databaseFile.filename,
            )
            f += databaseFile.totalChecksumSize()
            try:
                databaseFile.getChecksums()
            except KeyboardInterrupt:
                # Partial progress is kept; caller saves the database
                return
            except BaseException as e:
                log.error(e, exc_info=True)
                pass
        bar.finish()

    def printDifferences(self):
        """Print, per group, the features that differ between versions."""
        for databaseFile in self.data.values():
            print()
            databaseFile.printInfos(diff=True)

    def takeAction(self, execute=False, *args, **kwargs):
        """Decide then apply (or merely log, unless *execute*) resolutions."""
        for databaseFile in self.data.values():
            databaseFile.decideAction(*args, **kwargs)
            databaseFile.takeAction(execute=execute)
class DatabaseFile:
    """One group of sync-conflict files sharing the same original filename.

    conflicts / stats / checksums are parallel lists indexed together.
    A checksum slot holds: None (not summed yet), an int (adler32 so far),
    True (known identical) or False (known different).
    """

    # Read granularity for checksumming
    BLOCK_SIZE = 4096
    # Stat fields compared/printed between versions
    RELEVANT_STATS = ("st_mode", "st_uid", "st_gid", "st_size", "st_mtime")

    def __init__(self, root, filename):
        self.root = root
        self.filename = filename
        self.stats = []
        self.conflicts = []
        self.checksums = []
        # Index (into conflicts) of the version to keep, or None
        self.action = None
        log.debug(f"{self.root}/{self.filename} - new")

    def addConflict(self, conflict):
        """Register a conflict filename (idempotent)."""
        if conflict in self.conflicts:
            return
        self.conflicts.append(conflict)
        self.stats.append(None)
        self.checksums.append(None)
        log.debug(f"{self.root}/{self.filename} - add: {conflict}")

    def migrate(self):
        # Temp dev stuff since I don't want to resum that whole 400 GiB dir
        if self.stats is None:
            self.stats = [None] * len(self.conflicts)
        try:
            if self.checksums is None:
                self.checksums = [None] * len(self.conflicts)
        except AttributeError:
            self.checksums = [None] * len(self.conflicts)

    def removeConflict(self, conflict):
        """Forget one conflict and its parallel stat/checksum slots."""
        f = self.conflicts.index(conflict)
        del self.conflicts[f]
        del self.stats[f]
        del self.checksums[f]
        log.debug(f"{self.root}/{self.filename} - del: {conflict}")

    def getPath(self, conflict):
        return os.path.join(self.root, conflict)

    def getPaths(self):
        return [self.getPath(conflict) for conflict in self.conflicts]

    def prune(self):
        """Drop conflicts whose file no longer exists on disk."""
        toPrune = list()
        for conflict in self.conflicts:
            if not os.path.isfile(self.getPath(conflict)):
                toPrune.append(conflict)
        if len(toPrune):
            for conflict in toPrune:
                self.removeConflict(conflict)

    def isRelevant(self):
        """False when nothing is left to resolve (only the original remains)."""
        if len(self.conflicts) == 1:
            if self.conflicts[0] == self.filename:
                return False
        elif len(self.conflicts) < 1:
            return False
        else:
            return True

    def nbFiles(self):
        return len(self.conflicts)

    def totalSize(self):
        return sum((stat.st_size if stat is not None else 0) for stat in self.stats)

    def maxSize(self):
        return max((stat.st_size if stat is not None else 0) for stat in self.stats)

    def totalChecksumSize(self):
        """Bytes still to be read to checksum this group."""
        size = 0
        for f, checksum in enumerate(self.checksums):
            if checksum is None:
                stat = self.stats[f]
                if stat is not None:
                    size += stat.st_size
        return size

    def getStats(self):
        """Refresh stat info, invalidating checksums of changed files."""
        for f, conflict in enumerate(self.conflicts):
            oldStat = self.stats[f]
            newStat = os.stat(self.getPath(conflict))
            oldChecksum = self.checksums[f]
            # If it's been already summed, and we have the same inode and same ctime, don't resum
            if (
                oldStat is None
                or not isinstance(oldChecksum, int)
                or oldStat.st_size != newStat.st_size
                or oldStat.st_dev != newStat.st_dev
                or oldStat.st_ino != newStat.st_ino
                or oldStat.st_ctime != newStat.st_ctime
                or oldStat.st_dev != newStat.st_dev
            ):
                self.checksums[f] = None
            self.stats[f] = newStat
        # If all the file are of different size, set as different files
        if len(self.stats) == len(set([s.st_size for s in self.stats])):
            self.checksums = [False] * len(self.conflicts)
        # If all the files are the same inode, set as same files
        if (
            len(set([s.st_ino for s in self.stats])) == 1
            and len(set([s.st_dev for s in self.stats])) == 1
        ):
            self.checksums = [True] * len(self.conflicts)

    def getChecksums(self):
        # TODO It's not even required to have a sum, this thing is not collision resistant now
        # TODO We might use BTRFS feature to know if conflict files are deduplicated between them
        filedescs = dict()
        for f, conflict in enumerate(self.conflicts):
            if self.checksums[f] is not None:
                continue
            self.checksums[f] = 1
            filedescs[f] = open(self.getPath(conflict), "rb")
        while len(filedescs):
            toClose = set()
            # Compute checksums for next block for all files
            for f, filedesc in filedescs.items():
                data = filedesc.read(DatabaseFile.BLOCK_SIZE)
                self.checksums[f] = zlib.adler32(data, self.checksums[f])
                if len(data) < DatabaseFile.BLOCK_SIZE:
                    toClose.add(f)
            # Stop summing as soon as checksum diverge
            for f in filedescs.keys():
                if self.checksums.count(self.checksums[f]) < 2:
                    toClose.add(f)
            for f in toClose:
                filedescs[f].close()
                del filedescs[f]

    def getFeatures(self):
        """Map of feature name -> per-conflict values (names, sums, stats)."""
        features = dict()
        features["name"] = self.conflicts
        features["sum"] = self.checksums
        for statName in DatabaseFile.RELEVANT_STATS:
            # Rounding beause I Syncthing also rounds
            features[statName] = [
                int(stat.__getattribute__(statName)) for stat in self.stats
            ]
        return features

    def getDiffFeatures(self):
        """Subset of getFeatures() where versions actually differ."""
        features = self.getFeatures()
        diffFeatures = dict()
        for key, vals in features.items():
            if len(set(vals)) > 1:
                diffFeatures[key] = vals
        return diffFeatures

    @staticmethod
    def shortConflict(conflict):
        """Short label for a version: the marker's date part, or '-'."""
        match = Database.CONFLICT_PATTERN.search(conflict)
        if match:
            return match[0][15:]
        else:
            return "-"

    def printInfos(self, diff=True):
        """Print a feature table for this group (only differing ones if *diff*)."""
        print(os.path.join(self.root, self.filename))
        if diff:
            features = self.getDiffFeatures()
        else:
            features = self.getFeatures()
        features["name"] = [DatabaseFile.shortConflict(c) for c in self.conflicts]
        table = Table(len(features), len(self.conflicts) + 1)
        for x, featureName in enumerate(features.keys()):
            table.set(x, 0, featureName)
        for x, featureName in enumerate(features.keys()):
            for y in range(len(self.conflicts)):
                table.set(x, y + 1, features[featureName][y])
        table.print()

    def decideAction(self, mostRecent=False):
        """Pick which version to keep (sets self.action) and log the reason."""
        # TODO More arguments for choosing
        reason = "undecided"
        self.action = None
        if len(self.conflicts) == 1:
            self.action = 0
            reason = "only file"
        else:
            features = self.getDiffFeatures()
            if len(features) == 1:
                # Only the names differ: content is identical
                reason = "same files"
                self.action = 0
            elif "st_mtime" in features and mostRecent:
                recentTime = features["st_mtime"][0]
                recentIndex = 0
                for index, time in enumerate(features["st_mtime"]):
                    if time > recentTime:
                        recentTime = time
                        # BUG FIX: was `recentIndex = 0`, which always kept the
                        # first version whatever its mtime
                        recentIndex = index
                self.action = recentIndex
                reason = "most recent"
        if self.action is None:
            log.warning(f"{self.root}/{self.filename}: skip, cause: {reason}")
        else:
            log.info(
                f"{self.root}/{self.filename}: keep {DatabaseFile.shortConflict(self.conflicts[self.action])}, cause: {reason}"
            )

    def takeAction(self, execute=False):
        """Apply self.action: rename the keeper, delete the rest.

        Dry-run (only logs) unless *execute* is True.
        """
        if self.action is None:
            return
        actionName = self.conflicts[self.action]
        if actionName != self.filename:
            log.debug(
                f"Rename {self.getPath(actionName)} → {self.getPath(self.filename)}"
            )
            if execute:
                os.rename(self.getPath(actionName), self.getPath(self.filename))
        for conflict in self.conflicts:
            # NOTE(review): identity (`is`) works because actionName comes
            # straight out of self.conflicts; `==` would be more defensive
            if conflict is actionName:
                continue
            log.debug(f"Delete {self.getPath(conflict)}")
            if execute:
                os.unlink(self.getPath(conflict))
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Handle Syncthing's .sync-conflict files "
    )
    # Execution flow
    parser.add_argument(
        "directory", metavar="DIRECTORY", nargs="?", help="Directory to analyse"
    )
    parser.add_argument("-d", "--database", help="Database path for file informations")
    parser.add_argument(
        "-r",
        "--most-recent",
        action="store_true",
        help="Always keep the most recent version",
    )
    parser.add_argument(
        "-e", "--execute", action="store_true", help="Really apply changes"
    )
    parser.add_argument(
        "-p",
        "--print",
        action="store_true",
        help="Only print differences between files",
    )
    args = parser.parse_args()
    # Argument default values attribution
    if args.directory is None:
        args.directory = os.curdir
    args.directory = os.path.realpath(args.directory)
    # Create / load the database
    database = None
    if args.database:
        if os.path.isfile(args.database):
            try:
                with open(args.database, "rb") as databaseFile:
                    database = pickle.load(databaseFile)
                    assert isinstance(database, Database)
            # NOTE(review): the original exception is swallowed (`e` unused);
            # `raise ... from e` would preserve the cause
            except BaseException as e:
                raise ValueError("Not a database file")
            assert (
                database.version <= Database.VERSION
            ), "Version of the loaded database is too recent"
            assert (
                database.directory == args.directory
            ), "Directory of the loaded database doesn't match"
    if database is None:
        database = Database(args.directory)

    # Persist the (possibly partial) database after each expensive phase
    def saveDatabase():
        if args.database:
            global database
            with open(args.database, "wb") as databaseFile:
                pickle.dump(database, databaseFile)

    database.getList()
    saveDatabase()
    database.getStats()
    saveDatabase()
    database.getChecksums()
    saveDatabase()
    if args.print:
        database.printDifferences()
    else:
        database.takeAction(mostRecent=args.most_recent, execute=args.execute)

712
hm/scripts/rssVideos Executable file
View file

@ -0,0 +1,712 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.configargparse python3Packages.filelock python3Packages.filelock python3Packages.requests python3Packages.yt-dlp ffmpeg
# Also needs mpv but if I put it there it's not using the configured one
"""
Script that download videos that are linked as an article
in a RSS feed.
The common use case would be a feed from an RSS aggregator
with the unread items (non-video links are ignored).
"""
import datetime
import functools
import logging
import os
import pickle
import random
import re
import subprocess
import sys
import time
import typing
import coloredlogs
import configargparse
import filelock
import requests
import yt_dlp
log = logging.getLogger(__name__)
def configure_logging(args: configargparse.Namespace) -> None:
    """Install coloured logging according to the parsed CLI verbosity."""
    if not args.verbosity:
        # Default mode: terse, message-only output, limited to our logger.
        coloredlogs.install(
            fmt="%(message)s",
            logger=log,
        )
    else:
        # Explicit verbosity: full format at the requested level, globally.
        coloredlogs.install(
            level=args.verbosity,
        )
class SaveInfoPP(yt_dlp.postprocessor.common.PostProcessor):
    """
    yt_dlp.process_ie_result() doesn't return a completely updated info dict,
    notably the extension is still the one before it realizes the files cannot
    be merged. So we use this PostProcessor to catch the info dict in its final
    form and save what we need from it (it's not serializable in this state).
    """

    def __init__(self, rvelement: "RVElement") -> None:
        # Element to notify once the final info dict is available.
        self.rvelement = rvelement
        super().__init__()

    def run(self, info: dict) -> tuple[list, dict]:
        # Forward the final info dict to the element; delete no files
        # (empty list) and pass the dict through unchanged.
        self.rvelement.update_post_download(info)
        return [], info
def parse_duration(string: str) -> int:
    """Parse a duration like "90", "90s", "5m" or "2h" into seconds.

    A trailing unit letter selects the multiplier (s/m/h, case
    insensitive); no letter means seconds.

    Raises ValueError for an unknown unit or a non-numeric value.
    """
    DURATION_MULTIPLIERS = {"s": 1, "m": 60, "h": 3600, "": 1}
    mult_index = string[-1].lower()
    if mult_index.isdigit():
        mult_index = ""
    else:
        string = string[:-1]
    try:
        multiplier = DURATION_MULTIPLIERS[mult_index]
    except KeyError:
        # Dict lookups raise KeyError — the original caught IndexError,
        # which let unknown units escape as a bare KeyError.
        raise ValueError(f"Unknown duration multiplier: {mult_index}")
    return int(string) * multiplier
def compare_duration(compstr: str) -> typing.Callable[[int], bool]:
    """Build a duration predicate from a comparative string.

    The optional leading character selects the comparison ("<"/"-" less
    than, ">"/"+" greater than, "=" equality, none means "at most");
    the remainder is a duration accepted by parse_duration().

    Raises ValueError for an unknown comparator or duration.
    """
    DURATION_COMPARATORS = {
        "<": int.__lt__,
        "-": int.__lt__,
        ">": int.__gt__,
        "+": int.__gt__,
        "=": int.__eq__,
        "": int.__le__,
    }
    comp_index = compstr[0]
    if comp_index.isdigit():
        comp_index = ""
    else:
        compstr = compstr[1:]
    try:
        comparator = DURATION_COMPARATORS[comp_index]
    except KeyError:
        # Dict lookups raise KeyError — the original caught IndexError,
        # which let unknown comparators escape as a bare KeyError.
        raise ValueError(f"Unknown duration comparator: {comp_index}")
    duration = parse_duration(compstr)
    return lambda d: comparator(d, duration)
def format_duration(duration: int) -> str:
    """Render a duration in seconds as HH:MM:SS (wraps past 24 hours)."""
    as_struct = time.gmtime(duration)
    return time.strftime("%H:%M:%S", as_struct)
class RVElement:
    """One feed item, with lazy yt-dlp research and download bookkeeping.

    Per-item state (raw item dict, yt-dlp info, downloaded file path) is
    persisted as pickles named after the item's short id inside the parent
    database's metadata folder.
    """

    parent: "RVDatabase"  # owning database (args, auth, yt-dlp config)
    item: dict  # raw item dict as returned by the reader API

    # Cached yt-dlp metadata older than this is re-fetched before a
    # download attempt (presumably because extracted data goes stale —
    # TODO confirm).
    RERESEARCH_AFTER = datetime.timedelta(hours=1)

    def __init__(self, parent: "RVDatabase", item: dict) -> None:
        self.parent = parent
        self.item = item

    @property
    def id(self) -> str:
        """Full item identifier from the reader API."""
        return self.item["id"]

    @property
    def sid(self) -> str:
        """Short id: the last "/"-separated component of the full id."""
        return self.id.split("/")[-1]

    def metafile(self, extension: str) -> str:
        """Path of this item's metadata file for the given extension."""
        return os.path.join(self.parent.METADATA_FOLDER, f"{self.sid}.{extension}")

    def metafile_read(self, extension: str) -> typing.Any:
        """Unpickle and return this item's metadata file."""
        return self.parent.metafile_read(f"{self.sid}.{extension}")

    def metafile_write(self, extension: str, data: typing.Any) -> None:
        """Pickle data into this item's metadata file (honours dry-run)."""
        return self.parent.metafile_write(f"{self.sid}.{extension}", data)

    def save(self) -> None:
        """Persist the raw feed item for later cache-only runs."""
        self.metafile_write("item", self.item)

    @property
    def title(self) -> str:
        return self.item["title"]

    @property
    def link(self) -> str:
        return self.item["canonical"][0]["href"]

    @property
    def creator(self) -> str:
        return self.item["origin"]["title"]

    @property
    def date(self) -> datetime.datetime:
        """Item date, from the most precise timestamp field available."""
        timestamp = (
            int(self.item.get("timestampUsec", "0")) / 1000000
            or int(self.item.get("crawlTimeMsec", "0")) / 1000
            or self.item["published"]
        )
        return datetime.datetime.fromtimestamp(timestamp)

    @property
    def is_researched(self) -> bool:
        """Whether yt-dlp metadata has already been fetched and cached."""
        metafile = self.metafile("ytdl")
        return os.path.isfile(metafile)

    def __str__(self) -> str:
        # NOTE: `str` shadows the builtin inside this method; kept as-is.
        str = f"{self.date.strftime('%y-%m-%d %H:%M')} ("
        if self.is_researched:
            if self.is_video:
                str += format_duration(self.duration)
            else:
                str += "--:--:--"
        else:
            str += "??:??:??"
        str += (
            f") {self.creator if self.creator else '?'} "
            f" {self.title} "
            f" {self.link}"
        )
        return str

    @property
    def downloaded(self) -> bool:
        """Whether the video file is currently present on disk."""
        if not self.is_researched:
            return False
        return os.path.isfile(self.filepath)

    @functools.cached_property
    def ytdl_infos(self) -> typing.Optional[dict]:
        """yt-dlp metadata for this link, cached on disk (None if extraction failed)."""
        try:
            return self.metafile_read("ytdl")
        except (FileNotFoundError, TypeError, AttributeError, EOFError):
            # Missing or unreadable (e.g. stale pickle) cache: research anew.
            infos = self._ytdl_infos()
            self.metafile_write("ytdl", infos)
            return infos

    def _ytdl_infos(self) -> typing.Optional[dict]:
        """Query yt-dlp for this link; returns None when extraction fails."""
        log.info(f"Researching: {self}")
        try:
            infos = self.parent.ytdl_dry.extract_info(self.link, download=False)
        except KeyboardInterrupt as e:
            raise e
        except yt_dlp.utils.DownloadError as e:
            # TODO Still raise in case of temporary network issue
            log.warning(e)
            infos = None
        if infos:
            # Make the info dict serializable before pickling it.
            infos = self.parent.ytdl_dry.sanitize_info(infos)
        return infos

    @property
    def duration(self) -> int:
        """Video duration in seconds (only valid when is_video)."""
        assert self.is_video
        assert self.ytdl_infos
        return int(self.ytdl_infos["duration"])

    @property
    def is_video(self) -> bool:
        # Duration might be missing in playlists and stuff
        return self.ytdl_infos is not None and "duration" in self.ytdl_infos

    @functools.cached_property
    def downloaded_filepath(self) -> typing.Optional[str]:
        """File path recorded at download time, or None if never downloaded."""
        try:
            return self.metafile_read("path")
        except FileNotFoundError:
            return None

    @property
    def was_downloaded(self) -> bool:
        """Whether a download completed at some point (file may be gone now)."""
        metafile = self.metafile("path")
        return os.path.exists(metafile)

    @property
    def filepath(self) -> str:
        """Recorded path of the video file, or yt-dlp's predicted one."""
        assert self.is_video
        if self.downloaded_filepath:
            return self.downloaded_filepath
        return self.parent.ytdl_dry.prepare_filename(self.ytdl_infos)

    @property
    def basename(self) -> str:
        """File path without extension (also matches auxiliary files)."""
        assert self.is_video
        return os.path.splitext(self.filepath)[0]

    def expire_info(self) -> None:
        """Drop cached yt-dlp metadata when older than RERESEARCH_AFTER."""
        metafile = self.metafile("ytdl")
        if os.path.isfile(metafile):
            stat = os.stat(metafile)
            mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
            diff = datetime.datetime.now() - mtime
            if diff > self.RERESEARCH_AFTER:
                os.unlink(metafile)
                # Invalidate the in-memory cached_property as well.
                del self.ytdl_infos

    def download(self) -> None:
        """Download the video (no-op if present; honours dry-run)."""
        assert self.is_video
        if self.downloaded:
            return
        self.expire_info()
        log.info(f"Downloading: {self}")
        lockfile = self.metafile("lock")
        # File lock so concurrent runs don't fetch the same video twice.
        with filelock.FileLock(lockfile):
            if not self.parent.args.dryrun:
                with yt_dlp.YoutubeDL(self.parent.ytdl_opts) as ydl:
                    # SaveInfoPP feeds the final info dict back to
                    # update_post_download() once merging is resolved.
                    ydl.add_post_processor(SaveInfoPP(self))
                    ydl.process_ie_result(self.ytdl_infos, download=True)

    def update_post_download(self, info: dict) -> None:
        """Record the final file path from yt-dlp's post-processed info."""
        self.downloaded_filepath = self.parent.ytdl_dry.prepare_filename(info)
        assert self.downloaded_filepath
        assert self.downloaded_filepath.startswith(self.basename)
        self.metafile_write("path", self.downloaded_filepath)

    @property
    def watched(self) -> bool:
        """Downloaded at some point but gone now: treated as watched."""
        if not self.is_researched:
            return False
        return self.was_downloaded and not self.downloaded

    def matches_filter(self, args: configargparse.Namespace) -> bool:
        """Whether this element passes the user's filtering options."""
        # Inexpensive filters
        if args.seen != "any" and (args.seen == "seen") != self.watched:
            log.debug(f"Not {args.seen}: {self}")
            return False
        if args.title and not re.search(args.title, self.title):
            log.debug(f"Title not matching {args.title}: {self}")
            return False
        if args.link and not re.search(args.link, self.link):
            log.debug(f"Link not matching {args.link}: {self}")
            return False
        if args.creator and (
            not self.creator or not re.search(args.creator, self.creator)
        ):
            log.debug(f"Creator not matching {args.creator}: {self}")
            return False
        # Expensive filters (these trigger yt-dlp research)
        if not self.is_video:
            log.debug(f"Not a video: {self}")
            return False
        if args.duration and not compare_duration(args.duration)(self.duration):
            log.debug(f"Duration {self.duration} not matching {args.duration}: {self}")
            return False
        return True

    def watch(self) -> None:
        """Download, play in mpv, then delete locally and mark read."""
        self.download()
        cmd = ["mpv", self.filepath]
        log.debug(f"Running {cmd}")
        if not self.parent.args.dryrun:
            proc = subprocess.run(cmd)
            proc.check_returncode()
        self.undownload()
        self.try_mark_read()

    def clean_file(self, folder: str, basename: str) -> None:
        """Delete every file in folder whose name starts with basename."""
        for file in os.listdir(folder):
            if file.startswith(basename):
                path = os.path.join(folder, file)
                log.debug(f"Removing file: {path}")
                if not self.parent.args.dryrun:
                    os.unlink(path)

    def undownload(self) -> None:
        """Remove the video file and same-stem siblings (subs, thumbnails)."""
        assert self.is_video
        log.info(f"Removing gone video: {self.basename}*")
        self.clean_file(".", self.basename)

    def clean(self) -> None:
        """Remove the video (if any) and all metadata for this element."""
        if self.is_researched and self.is_video:
            self.undownload()
        log.info(f"Removing gone metadata: {self.sid}*")
        self.clean_file(self.parent.METADATA_FOLDER, self.sid)

    def mark_read(self) -> None:
        """Mark the item read in the aggregator, then clean local files."""
        log.debug(f"Marking {self} read")
        if self.parent.args.dryrun:
            return
        r = requests.post(
            f"{self.parent.args.url}/reader/api/0/edit-tag",
            data={
                "i": self.id,
                "a": "user/-/state/com.google/read",
                "ac": "edit",
                "token": self.parent.feed_token,
            },
            headers=self.parent.auth_headers,
        )
        r.raise_for_status()
        # The reader API answers a literal "OK" on success.
        if r.text.strip() != "OK":
            raise RuntimeError(f"Couldn't mark {self} as read: {r.text}")
        log.info(f"Marked {self} as read")
        self.clean()

    def try_mark_read(self) -> None:
        """mark_read(), but tolerate the aggregator being unreachable."""
        try:
            self.mark_read()
        except requests.ConnectionError:
            log.warning(f"Couldn't mark {self} as read")
class RVDatabase:
    """Collection of feed elements plus aggregator / yt-dlp plumbing.

    Operates relative to the current directory (the video folder); all
    cached state lives under METADATA_FOLDER.
    """

    METADATA_FOLDER = ".metadata"

    args: configargparse.Namespace
    elements: list[RVElement]

    def __init__(self, args: configargparse.Namespace) -> None:
        self.args = args

    def metafile_read(self, name: str) -> typing.Any:
        """Unpickle a file from the metadata folder."""
        path = os.path.join(self.METADATA_FOLDER, name)
        log.debug(f"Reading {path}")
        with open(path, "rb") as mf:
            return pickle.load(mf)

    def metafile_write(self, name: str, data: typing.Any) -> None:
        """Pickle data into the metadata folder (skipped in dry-run)."""
        path = os.path.join(self.METADATA_FOLDER, name)
        log.debug(f"Writing {path}")
        if not self.args.dryrun:
            with open(path, "wb") as mf:
                pickle.dump(data, mf)

    def clean_cache(self, cache: "RVDatabase") -> None:
        """Drop local data of cached elements absent from this database."""
        log.debug("Cleaning cache")
        fresh_ids = set(el.id for el in self.elements)
        for el in cache.elements:
            if el.id not in fresh_ids:
                el.clean()

    def _auth_headers(self) -> dict[str, str]:
        """Log in to the aggregator and build the Authorization header."""
        r = requests.get(
            f"{self.args.url}/accounts/ClientLogin",
            params={"Email": self.args.email, "Passwd": self.args.passwd},
        )
        r.raise_for_status()
        for line in r.text.split("\n"):
            if line.lower().startswith("auth="):
                # Keep everything after the first "=" (the token itself
                # may contain "=" characters).
                val = "=".join(line.split("=")[1:])
                return {"Authorization": f"GoogleLogin auth={val}"}
        raise RuntimeError("Couldn't find auth= key")

    @functools.cached_property
    def auth_headers(self) -> dict[str, str]:
        """Authorization header, cached on disk across runs."""
        try:
            return self.metafile_read(".auth_headers")
        except FileNotFoundError:
            headers = self._auth_headers()
            self.metafile_write(".auth_headers", headers)
            return headers

    def fetch_feed_elements(self) -> typing.Generator[dict, None, None]:
        """Yield all unread items from the aggregator, following pagination."""
        log.info("Fetching RSS feed")
        continuation: typing.Optional[str] = None
        with requests.Session() as s:

            def next_page() -> typing.Generator[dict, None, None]:
                nonlocal continuation
                r = s.get(
                    f"{self.args.url}/reader/api/0/stream/contents",
                    params={
                        # Exclude items already marked as read.
                        "xt": "user/-/state/com.google/read",
                        "c": continuation,
                    },
                    headers=self.auth_headers,
                )
                r.raise_for_status()
                json = r.json()
                yield from json["items"]
                # Absent "continuation" key means this was the last page.
                continuation = json.get("continuation")

            yield from next_page()
            while continuation:
                yield from next_page()

    def fetch_cache_elements(self) -> typing.Generator[dict, None, None]:
        """Yield the raw items previously saved in the metadata folder."""
        log.info("Fetching from cache")
        for file in os.listdir(self.METADATA_FOLDER):
            if not file.endswith(".item"):
                continue
            yield self.metafile_read(file)

    def build_list(self, items: typing.Iterable[dict], save: bool = False) -> None:
        """Build self.elements from raw items.

        Items are inserted at the front, so the resulting list is in the
        reverse of the input order. With save=True each raw item is also
        persisted to the cache.
        """
        self.elements = []
        for item in items:
            element = RVElement(self, item)
            self.elements.insert(0, element)
            log.debug(f"Known: {element}")
            if save:
                element.save()

    def read_feed(self) -> None:
        """Populate elements from the aggregator, saving items to cache."""
        self.build_list(self.fetch_feed_elements(), save=True)

    def read_cache(self) -> None:
        """Populate elements from the on-disk cache only."""
        self.build_list(self.fetch_cache_elements())

    def clean_folder(self, folder: str, basenames: set[str]) -> None:
        """Delete non-hidden files in folder matching no known basename."""
        for file in os.listdir(folder):
            path = os.path.join(folder, file)
            if not os.path.isfile(path) or file[0] == ".":
                continue
            for basename in basenames:
                if file.startswith(basename):
                    break
            else:
                log.info(f"Removing unknown file: {path}")
                if not self.args.dryrun:
                    os.unlink(path)

    def clean(self) -> None:
        """Remove files that don't belong to any known element."""
        log.debug("Cleaning")
        filenames = set(el.basename for el in self.elements if el.is_video)
        self.clean_folder(".", filenames)
        ids = set(el.sid for el in self.elements)
        self.clean_folder(self.METADATA_FOLDER, ids)

    @property
    def ytdl_opts(self) -> dict:
        """yt-dlp options from the user/system configuration."""
        # Get user/system options
        # yt_dlp.parse_options() reads sys.argv, so swap it out temporarily
        # to avoid it picking up this script's own arguments.
        prev_argv = sys.argv
        sys.argv = ["yt-dlp"]
        _, _, _, ydl_opts = yt_dlp.parse_options()
        sys.argv = prev_argv
        return ydl_opts

    @property
    def ytdl_dry_opts(self) -> dict:
        """Same options, silenced for metadata-only operations."""
        opts = self.ytdl_opts.copy()
        opts.update({"quiet": True})
        return opts

    @property
    def ytdl_dry(self) -> yt_dlp.YoutubeDL:
        """A quiet YoutubeDL instance for research / filename prediction."""
        return yt_dlp.YoutubeDL(self.ytdl_dry_opts)

    def filter(self, args: configargparse.Namespace) -> typing.Iterable[RVElement]:
        """Sort and filter elements according to the CLI options.

        Cheap sorts happen before filtering; duration-based sorts happen
        after, since they require yt-dlp research on every element.
        """
        elements_src = self.elements.copy()
        elements: typing.Iterable[RVElement]
        # Inexpensive sort
        if args.order == "new":
            elements = sorted(elements_src, key=lambda el: el.date, reverse=True)
        elif args.order == "old":
            elements = sorted(elements_src, key=lambda el: el.date)
        elif args.order == "title":
            elements = sorted(elements_src, key=lambda el: el.title)
        elif args.order == "creator":
            elements = sorted(elements_src, key=lambda el: el.creator or "")
        elif args.order == "link":
            elements = sorted(elements_src, key=lambda el: el.link)
        elif args.order == "random":
            elements = elements_src
            random.shuffle(elements)
        # Possibly expensive filtering
        elements = filter(lambda el: el.matches_filter(args), elements)
        # Expensive sort
        if args.order == "short":
            elements = sorted(
                elements, key=lambda el: el.duration if el.is_video else 0
            )
        elif args.order == "long":
            elements = sorted(
                elements, key=lambda el: el.duration if el.is_video else 0, reverse=True
            )
        # Post sorting filtering
        if args.total_duration:
            # Greedy selection: repeatedly take the first element that
            # still fits in the remaining time budget; stop when nothing
            # fits any more.
            rem = parse_duration(args.total_duration)
            old_els = list(elements)
            elements = list()
            while rem > 0:
                for el in old_els:
                    if el.duration < rem:
                        elements.append(el)
                        rem -= el.duration
                        old_els.remove(el)
                        break
                else:
                    break
        return elements

    @functools.cached_property
    def feed_token(self) -> str:
        """Edit token required by the reader API for write operations."""
        r = requests.get(
            f"{self.args.url}/reader/api/0/token",
            headers=self.auth_headers,
        )
        r.raise_for_status()
        return r.text.strip()

    def try_mark_watched_read(self) -> None:
        """Mark every watched (downloaded-then-deleted) element as read."""
        for element in self.elements:
            if element.watched:
                element.try_mark_read()
def get_args() -> configargparse.Namespace:
    """Parse options from CLI, config file and environment variables.

    Also normalises the video directory to an absolute path.
    """
    defaultConfigPath = os.path.join(
        os.path.expanduser(os.getenv("XDG_CONFIG_PATH", "~/.config/")), "rssVideos"
    )
    parser = configargparse.ArgParser(
        description="Download videos in unread articles from a feed aggregator",
        default_config_files=[defaultConfigPath],
    )
    # Runtime settings
    parser.add_argument(
        "-v",
        "--verbosity",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=None,
        help="Verbosity of log messages",
    )
    parser.add(
        "-c", "--config", required=False, is_config_file=True, help="Configuration file"
    )
    parser.add(
        "-n",
        "--dryrun",
        help="Only pretend to do actions",
        action="store_const",
        const=True,
        default=False,
    )
    # Input/Output
    parser.add(
        "--url",
        help="URL of the Google Reader API of the aggregator",
        env_var="RSS_VIDEOS_URL",
        required=True,
    )
    parser.add(
        "--email",
        help="E-mail / user to connect to the aggregator",
        env_var="RSS_VIDEOS_EMAIL",
        required=True,
    )
    parser.add(
        "--passwd",
        help="Password to connect to the aggregator",
        env_var="RSS_VIDEOS_PASSWD",
        required=True,
    )
    parser.add(
        "--no-refresh",
        dest="refresh",
        help="Don't fetch feed",
        action="store_false",
    )
    parser.add(
        "--videos",
        help="Directory to store videos",
        env_var="RSS_VIDEOS_VIDEO_DIR",
        required=True,
    )
    # Which videos
    parser.add(
        "--order",
        choices=("old", "new", "title", "creator", "link", "short", "long", "random"),
        default="old",
        help="Sorting mechanism",
    )
    parser.add("--creator", help="Regex to filter by creator")
    parser.add("--title", help="Regex to filter by title")
    parser.add("--link", help="Regex to filter by link")
    parser.add("--duration", help="Comparative to filter by duration")
    # TODO Date selector
    parser.add(
        "--seen",
        choices=("seen", "unseen", "any"),
        default="unseen",
        help="Only include seen/unseen/any videos",
    )
    parser.add(
        "--total-duration",
        help="Use videos that fit under the total given",
    )
    # TODO Envrionment variables
    # TODO Allow to ask
    parser.add(
        "action",
        nargs="?",
        choices=(
            "download",
            "list",
            "watch",
            "binge",
        ),
        default="download",
    )
    args = parser.parse_args()
    # Work with an absolute, tilde-expanded video directory.
    args.videos = os.path.realpath(os.path.expanduser(args.videos))
    return args
def get_database(args: configargparse.Namespace) -> RVDatabase:
    """Build the element database: cache only, or refreshed from the feed."""
    cached = RVDatabase(args)
    cached.read_cache()
    if not args.refresh:
        return cached
    # Fetch a fresh view of the feed and drop cached items it no longer has.
    refreshed = RVDatabase(args)
    refreshed.read_feed()
    refreshed.clean_cache(cached)
    return refreshed
def main() -> None:
    """Entry point: parse options, sync the database, run the chosen action."""
    args = get_args()
    configure_logging(args)
    # Ensure the video folder and its metadata subfolder exist, then work
    # from the video folder (all paths below are relative to it).
    metadata_dir = os.path.join(args.videos, RVDatabase.METADATA_FOLDER)
    for directory in (args.videos, metadata_dir):
        os.makedirs(directory, exist_ok=True)
    os.chdir(args.videos)
    database = get_database(args)
    log.debug("Running action")
    total = 0
    for element in database.filter(args):
        if element.is_video:
            total += element.duration
        if args.action == "download":
            element.download()
        elif args.action == "list":
            print(element)
        elif args.action in ("watch", "binge"):
            element.watch()
            if args.action == "watch":
                # "watch" plays a single video; "binge" keeps going.
                break
        else:
            raise NotImplementedError(f"Unimplemented action: {args.action}")
    log.info(f"Total duration: {format_duration(total)}")
    database.try_mark_watched_read()
    database.clean()


if __name__ == "__main__":
    main()

225
hm/scripts/smtpdummy Executable file
View file

@ -0,0 +1,225 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3
#! nix-shell -p python3 python3Packages.colorama python3Packages.configargparse
import base64
import datetime
import email.utils
import io
import pprint
import subprocess
import sys
import colorama
import configargparse
if __name__ == "__main__":
    # Interactively drive an SMTP session through socat/openssl (optionally
    # via SSH on another origin host), printing the dialogue in colour.
    parser = configargparse.ArgParser(
        description="Generate SMTP messages to send a mail"
    )
    now = datetime.datetime.now()
    now_email = email.utils.formatdate(now.timestamp(), True)
    # Connection parameters
    parser.add_argument("-o", "--origin", env_var="ORIGIN", default="localhost")
    parser.add_argument(
        "-d", "--destination", env_var="DESTINATION", default="localhost"
    )
    parser.add_argument("-p", "--port", env_var="PORT", default=25)
    parser.add_argument(
        "-S",
        "--security",
        env_var="SECURITY",
        choices=["plain", "ssl", "starttls"],
        default="plain",
    )
    parser.add_argument("-l", "--helo", env_var="HELO")
    parser.add_argument("-L", "--helo-verb", env_var="HELO_VERB", default="EHLO")
    parser.add_argument(
        "-s", "--sender", env_var="SENDER", default="geoffrey@frogeye.fr"
    )
    parser.add_argument(
        "-r",
        "--receiver",
        env_var="RECEIVER",
        default=[],
        action="append",
    )
    # parser.add_argument("-a", "--auth", env_var="AUTH", default="PLAIN")
    parser.add_argument("-u", "--user", env_var="MUSER")
    parser.add_argument("-w", "--password", env_var="PASSWORD")
    # Message fields
    parser.add_argument("-f", "--from", env_var="FROM")
    parser.add_argument("-t", "--to", env_var="TO")
    parser.add_argument("-T", "--reply-to", env_var="REPLYTO")
    parser.add_argument(
        "-j",
        "--subject",
        env_var="SUBJECT",
        default=f"Test message {now.strftime('%H:%M:%S')}",
    )
    parser.add_argument("-8", "--smtputf8", env_var="SMTPUTF8", action="store_true")
    parser.add_argument("-c", "--callout", env_var="CALLOUT", action="store_true")
    parser.add_argument("-b", "--body", env_var="BODY", default="")
    parser.add_argument("-g", "--gtube", env_var="GTUBE", action="store_true")
    parser.add_argument("-m", "--me", env_var="ME", default="Geoffrey")
    parser.add_argument(
        "-H", "--headers", default=[], action="append", env_var="HEADER"
    )
    parser.add_argument("-y", "--dryrun", env_var="DRYRUN", action="store_true")
    parser.add_argument("-q", "--quiet", env_var="QUIET", action="store_true")
    args = parser.parse_args()
    # Default values
    if not args.receiver:
        args.receiver = ["geoffrey@frogeye.fr"]
    if args.helo is None:
        args.helo = args.origin
    if getattr(args, "from") is None:
        # "from" is a Python keyword, hence the getattr/setattr dance.
        setattr(args, "from", args.sender)
    if args.to is None:
        args.to = args.receiver[0]
    if args.reply_to is None:
        args.reply_to = getattr(args, "from")
    if args.password:
        # Keep the real password aside so the args dump in the message
        # body below only ever shows a placeholder.
        password = args.password
        args.password = "********"
    mid = email.utils.make_msgid(domain=args.helo)
    # Transmission content
    headers = ""
    if args.headers:
        headers = "\n" + "\n".join(args.headers)
    gtube = ""
    if args.gtube:
        # GTUBE: the standard spam-filter test pattern.
        gtube = """
XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X"""
    body = ""
    if args.body:
        body = f"\n\n{args.body}"
    text = f"""Date: {now_email}
From: {args.me} <{getattr(args, 'from')}>
Subject: {args.subject}
To: {args.to}
Reply-To: {args.reply_to}
Message-ID: {mid}{headers}
Hello there,
This is a test message, generated from a template.
If you didn't expect to see this message, please contact {args.me}.{gtube}{body}
Greetings,
Input arguments:
{pprint.pformat(args.__dict__, indent=4)}
--
{args.me}
."""
    # Transmission setup
    cmd = []
    if args.origin != "localhost":
        # Run the network client on a remote origin host through SSH.
        cmd += ["ssh", args.origin]
    if args.security == "plain":
        cmd += ["socat", "-", f"tcp:{args.destination}:{args.port}"]
    elif args.security == "ssl":
        cmd += ["socat", "-", f"openssl:{args.destination}:{args.port}"]
    elif args.security == "starttls":
        cmd += [
            "openssl",
            "s_client",
            "-starttls",
            "smtp",
            "-crlf",
            "-connect",
            f"{args.destination}:{args.port}",
            "-quiet",
        ]
    if not args.quiet:
        print(colorama.Fore.MAGENTA + f"# {' '.join(cmd)}" + colorama.Fore.RESET)
    if not args.dryrun:
        p = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )

    def recv() -> None:
        """Read one SMTP reply (possibly multi-line) and print it.

        A reply code >= 400 is a failure: QUIT is sent and the process
        exits with status 1.
        """
        if args.dryrun:
            return
        assert isinstance(p.stdout, io.BufferedReader)
        next = True  # NOTE(review): shadows the builtin `next`; kept as-is
        while next:
            line = p.stdout.readline()
            try:
                code = int(line[:3])
            except ValueError:
                raise ValueError(f"Could not parse line: '{line.decode()}'")
            success = code < 400
            color = colorama.Fore.GREEN if success else colorama.Fore.RED
            if not args.quiet:
                print(color + f"< {line[:-1].decode()}" + colorama.Fore.RESET)
            # A "-" right after the code marks a continuation line.
            next = line[3] == b"-"[0]
            if not next and not success:
                send("QUIT")  # TODO Can loop if QUIT fails
                sys.exit(1)

    def _send(command: str) -> None:
        # Write one command line to the client process (no reply handling).
        if not args.quiet:
            print(colorama.Fore.BLUE + f"> {command}" + colorama.Fore.RESET)
        if args.dryrun:
            return
        assert isinstance(p.stdin, io.BufferedWriter)
        cmd = command.encode() + b"\n"
        p.stdin.write(cmd)
        p.stdin.flush()

    def send(command: str) -> None:
        """Send one command and consume its reply."""
        _send(command)
        recv()

    # Transmission
    if args.security != "starttls":
        # Consume the server greeting (presumably already consumed by
        # openssl itself during STARTTLS negotiation — confirm).
        recv()
    send(f"{args.helo_verb} {args.helo}")
    if args.user:
        # AUTH PLAIN credential: base64("authzid\0authcid\0password").
        encoded = base64.b64encode(
            args.user.encode()
            + b"\x00"
            + args.user.encode()
            + b"\x00"
            + password.encode()
        ).decode()
        send(f"AUTH PLAIN {encoded}")
    send(f"MAIL FROM:<{args.sender}>" + (" SMTPUTF8" if args.smtputf8 else ""))
    for receiver in args.receiver:
        send(f"RCPT TO:<{receiver}>")
    if not args.callout:
        # With --callout we stop before DATA: only recipient verification.
        send("DATA")
        send(text)
    send("QUIT")
    sys.exit(0)
# For reference:
# send("RSET")
# send("VRFY")
# send("NOOP")
# send("QUIT")

20
hm/scripts/spongebob Executable file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3
import random
import sys
# Mock-case filter: randomly upper/lower-case every character from stdin,
# SpOnGeBoB-meme style.
for line in sys.stdin:
    mocked = "".join(
        char.upper() if random.random() > 0.5 else char.lower() for char in line
    )
    print(mocked, end="")

21
hm/scripts/syncthingRestore Executable file
View file

@ -0,0 +1,21 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3
import os
import shutil
# Restore Syncthing-versioned files: copy everything under the current
# .stversions directory back to the matching live directory, keeping
# file metadata (copy2).
curDir = os.path.realpath(".")
assert ".stversions/" in curDir
tgDir = curDir.replace(".stversions/", "")
for root, dirs, files in os.walk(curDir):
    dstRoot = root.replace(curDir, tgDir)
    os.makedirs(dstRoot, exist_ok=True)
    for name in files:
        srcPath = os.path.join(root, name)
        dstPath = os.path.join(dstRoot, name)
        print(f"{srcPath} → {dstPath}")
        shutil.copy2(srcPath, dstPath)

19
hm/scripts/tagCreatorPhotos Executable file
View file

@ -0,0 +1,19 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.piexif
import os
import sys
import piexif
# Stamp the given creator into the EXIF Copyright tag of each photo, in place.
assert len(sys.argv) >= 3, "Usage {} CREATOR FILENAMES...".format(sys.argv[0])
creator = sys.argv[1]
filenames = sys.argv[2:]
copyright_bytes = creator.encode()
for filename in filenames:
    assert os.path.isfile(filename)
    exif_data = piexif.load(filename)
    exif_data["0th"][piexif.ImageIFD.Copyright] = copyright_bytes
    piexif.insert(piexif.dump(exif_data), filename)

76
hm/scripts/ter Executable file
View file

@ -0,0 +1,76 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3
import sys
from math import inf
# Stations to visit, in order, from the command line.
gares = sys.argv[1:]
N = len(gares)
if N < 2:
    # Fewer than two stations: no trip to price.
    print("Ben reste chez toi alors.")
    sys.exit(1)
def trajet_str(a, b):
    """Human-readable label for the leg from station index a to b."""
    return gares[a] + " → " + gares[b]
def chemin_str(stack):
    """Human-readable label for a whole path (list of station indices)."""
    legs = [trajet_str(prev, nxt) for prev, nxt in zip(stack, stack[1:])]
    return ", ".join(legs)
# Demande des prix des trajets
# (interactively ask the price of every pair of stations i < j)
prices = dict()
for i in range(N):
    for j in range(N - 1, i, -1):
        p = None
        while p is None:
            # Accept a comma as decimal separator; re-ask until valid.
            answer = input(f"Prix du trajet {trajet_str(i, j)} ? ").replace(",", ".")
            try:
                p = float(answer)
            except ValueError:
                print("C'est pas un prix ça !")
        prices.setdefault(i, dict())[j] = float(p)
# Calcul des prix des chemins
# Track the cheapest and the most expensive complete path found so far.
miniPrice = +inf
miniStack = None
maxiPrice = -inf
maxiStack = None
def register_path(stack):
    """Price the path in `stack`, update the global extrema, and print it."""
    global miniPrice, maxiPrice, miniStack, maxiStack
    price = sum(prices[a][b] for a, b in zip(stack, stack[1:]))
    if price < miniPrice:
        miniPrice, miniStack = price, stack.copy()
    if price > maxiPrice:
        maxiPrice, maxiStack = price, stack.copy()
    print(f"{chemin_str(stack)} = {price:.2f} €")
# Enumerate every strictly-increasing path from station 0 to station N-1
# with an explicit stack: growing the stack explores a longer path; once
# the last station is reached the path is registered, then the stack
# backtracks by bumping the previous element.
stack = [0]
while stack[0] == 0:
    if stack[-1] >= N - 1:
        register_path(stack)
        stack.pop()
        stack[-1] += 1
    else:
        stack.append(stack[-1] + 1)
print(f"Prix minimum: {chemin_str(miniStack)} = {miniPrice:.2f} €")
print(f"Prix maximum: {chemin_str(maxiStack)} = {maxiPrice:.2f} €")

177
hm/scripts/unziptree Executable file
View file

@ -0,0 +1,177 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.magic unzip p7zip unrar gnutar gzip
import logging
import os
import subprocess
import sys
import typing
import coloredlogs
import magic
# TODO Able to ignore extensions everywhere
class ArchiveType:
    """Description of one archive format and how to extract it.

    Subclasses override the class attributes below; extract() runs the
    external extraction command.
    """

    suffix: str = ""  # lowercase filename suffix selecting this format
    fullname: str = ""
    dest_suffix: str = ""  # appended to the archive name for the destination
    mime: typing.Optional[str] = None  # expected MIME type, if checked
    header: typing.Optional[bytes] = None  # expected magic bytes, if checked
    extract_cmd: typing.Optional[typing.List[str]] = None  # command prefix
    single_file = False  # True when extraction yields one file, not a dir
    append_dest = False  # True when the command takes the destination last

    def __init__(self) -> None:
        self.log = logging.getLogger(self.__class__.__name__)

    def dest_name(self, archive: str) -> str:
        """Destination name for the given archive file name."""
        return archive + self.dest_suffix

    def fits(self, name_lower: str, mime: str, header: bytes) -> bool:
        """Whether this format matches the file's name, MIME type and magic."""
        if not name_lower.endswith(self.suffix):
            return False
        if self.mime is not None and mime != self.mime:
            return False
        if self.header is not None and not header.startswith(self.header):
            return False
        return True

    def _get_cmd(self, archive: str, dest: str) -> typing.List[str]:
        # Assemble the extraction command line.
        assert self.extract_cmd
        cmd = self.extract_cmd + [archive]
        if self.append_dest:
            cmd.append(dest)
        return cmd

    def extract(self, archive: str, dest: str) -> None:
        """Extract archive into dest (a new directory, or a file if single_file).

        Raises CalledProcessError when the external command fails.
        """
        cmd = self._get_cmd(archive, dest)
        if not self.single_file:
            os.mkdir(dest)
        self.log.info("Extracting '%s' into '%s'", archive, dest)
        self.log.debug("%s", cmd)
        if self.single_file:
            r = subprocess.run(cmd)
        else:
            # Multi-file archives are extracted from inside the destination.
            r = subprocess.run(cmd, cwd=dest)
        r.check_returncode()
        if self.single_file:
            assert os.path.isfile(dest)
extract_fun: typing.Optional[typing.Callable[[str, str], None]] = None
class ArchiveZip(ArchiveType):
    # ZIP archives, extracted with unzip.
    suffix = ".zip"
    mime = "application/zip"
    extract_cmd = ["unzip"]
class Archive7z(ArchiveType):
    # 7-Zip archives, extracted with 7z x.
    suffix = ".7z"
    mime = "application/x-7z-compressed"
    extract_cmd = ["7z", "x"]
class ArchiveRar(ArchiveType):
    # RAR archives, extracted with unrar x.
    suffix = ".rar"
    mime = "application/x-rar"
    extract_cmd = ["unrar", "x"]
class ArchiveTar(ArchiveType):
    # Uncompressed tarballs.
    suffix = ".tar"
    mime = "application/x-tar"
    extract_cmd = ["tar", "--extract", "--file"]
class ArchiveTarGz(ArchiveType):
    # Gzip-compressed tarballs (matched before plain .gz by list order).
    suffix = ".tar.gz"
    mime = "application/gzip"
    extract_cmd = ["tar", "--extract", "--gzip", "--file"]
class ArchiveTarXz(ArchiveType):
    # XZ-compressed tarballs.
    suffix = ".tar.xz"
    mime = "application/x-xz"
    extract_cmd = ["tar", "--extract", "--xz", "--file"]
class ArchiveGzip(ArchiveType):
    # Plain gzip files (non-tar), decompressed with gunzip.
    # NOTE(review): gunzip writes `foo` when given `foo.gz`, so the `dest`
    # path the caller passes may not be where the output actually lands —
    # confirm the single_file flow (extract() asserts os.path.isfile(dest)).
    suffix = ".gz"
    mime = "application/gzip"
    single_file = True
    extract_cmd = ["gunzip"]
class TreeExtractor:
    """Recursively extract every known archive found under a directory."""

    # Order matters: ".tar.gz"/".tar.xz" must appear before plain ".gz"
    # so fits() matches the more specific suffix first.
    ARCHIVE_TYPES: typing.List[ArchiveType] = [
        ArchiveZip(),
        Archive7z(),
        ArchiveRar(),
        ArchiveTar(),
        ArchiveTarGz(),
        ArchiveTarXz(),
        ArchiveGzip(),
    ]

    def __init__(self) -> None:
        self.log = logging.getLogger("TreeExtractor")
        # Recognised suffixes, for cheap pre-filtering by file name.
        self.suffixes = set()
        for archive_type in self.ARCHIVE_TYPES:
            self.suffixes.add(archive_type.suffix)

    def extract_tree(self, directory: str = ".") -> None:
        """Walk directory, extract matching archives, recurse into results.

        Successfully extracted archives are deleted. Extraction targets a
        ".tmp" path first so a failed run leaves no final-named result.
        """
        for root, dirs, files in os.walk(directory):
            real_root = os.path.realpath(root)
            for name in files:
                self.log.debug("Handling '%s' '%s'", real_root, name)
                # Initial filtering with suffix
                name_lower = name.lower()
                for suffix in self.suffixes:
                    if name_lower.endswith(suffix):
                        break
                else:
                    self.log.debug("Suffix not matched: %s", name)
                    continue
                filepath = os.path.join(real_root, name)
                # Sniff the MIME type from the first bytes for a stronger match.
                with open(filepath, "rb") as filedesc:
                    header = filedesc.read(1024)
                mime = magic.detect_from_content(header).mime_type
                archive_type = None
                for archtyp in self.ARCHIVE_TYPES:
                    if archtyp.fits(name_lower, mime, header):
                        archive_type = archtyp
                        break
                if not archive_type:
                    self.log.debug("Not matched: %s", filepath)
                    continue
                dest_name = archive_type.dest_name(name)
                dest = os.path.join(real_root, dest_name)
                dest_tmp = dest + ".tmp"
                try:
                    archive_type.extract(filepath, dest_tmp)
                except BaseException as e:
                    # TODO Parameters stop on error
                    self.log.error(e, exc_info=True)
                else:
                    # Replace the archive with its extracted content.
                    os.unlink(filepath)
                    os.rename(dest_tmp, dest)
                    if os.path.isdir(dest):
                        # Extracted content may itself contain archives.
                        self.extract_tree(dest)

    def main(self) -> None:
        """CLI entry point: extract under argv[1], or the current directory."""
        directory = sys.argv[1] if len(sys.argv) > 1 else "."
        self.extract_tree(directory)
if __name__ == "__main__":
    # Full debug logging: every file decision is traced.
    coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
    TreeExtractor().main()

161
hm/scripts/updateCompressedMusic Executable file
View file

@ -0,0 +1,161 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.progresbar2 ffmpeg
# pylint: disable=C0103
import logging
import os
import re
import subprocess
import typing
import coloredlogs
import progressbar
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
log = logging.getLogger()
# Constants
SOURCE_FOLDER = os.path.join(os.path.expanduser("~"), "Musiques")
OUTPUT_FOLDER = os.path.join(os.path.expanduser("~"), ".MusiqueCompressed")
CONVERSIONS = {"flac": "opus"}
FORBIDDEN_EXTENSIONS = ["jpg", "png", "pdf", "ffs_db"]
FORGIVEN_FILENAMES = [
"cover.jpg",
"front.jpg",
"folder.jpg",
"cover.png",
"front.png",
"folder.png",
]
IGNORED_EMPTY_FOLDER = [".stfolder"]
RESTRICT_CHARACTERS = '[\0\\/:*"<>|]' # FAT32, NTFS
# RESTRICT_CHARACTERS = '[:/]' # HFS, HFS+
# RESTRICT_CHARACTERS = '[\0/]' # ext2-4, linux-based?
act = True
# TODO FEAT Make the directory structure the same as the base one and
# remove IGNORED_EMPTY_FOLDER variable
# Listing files
log.info("Listing files")
# Map of source-relative path → ctime for the source tree.
sourceFiles = dict()
for root, dirs, files in os.walk(SOURCE_FOLDER):
    for f in files:
        fullPath = os.path.join(root, f)
        path = os.path.relpath(fullPath, SOURCE_FOLDER)
        sourceFiles[path] = os.path.getctime(fullPath)
# Same map for the already-compressed output tree.
outputFiles = dict()
for root, dirs, files in os.walk(OUTPUT_FOLDER):
    for f in files:
        fullPath = os.path.join(root, f)
        path = os.path.relpath(fullPath, OUTPUT_FOLDER)
        outputFiles[path] = os.path.getctime(fullPath)
# Sorting files
# remainingConversions: source path → output path of actions still to do.
remainingConversions = dict()
# extraFiles: output files with no matching source (candidates for removal).
extraFiles = set(outputFiles.keys())
def convertPath(path: str) -> typing.Optional[str]:
    """Map a source-relative path to its output-relative path.

    Returns None when the file must be skipped entirely, a sanitized
    re-extensioned path when it needs transcoding, or the path unchanged
    when it is a plain copy.
    """
    stem, ext = os.path.splitext(path)
    ext = ext[1:].lower()
    # Strip filesystem-unfriendly characters from every path component
    sanitized = os.path.sep.join(
        re.sub(RESTRICT_CHARACTERS, "_", piece)
        for piece in os.path.normpath(stem).split(os.path.sep)
    )
    if ext in FORBIDDEN_EXTENSIONS:
        # Forbidden extension: skip unless the basename is explicitly forgiven
        if os.path.basename(path) not in FORGIVEN_FILENAMES:
            return None
    elif ext in CONVERSIONS:
        # Needs transcoding: sanitized name with the target extension
        return sanitized + "." + CONVERSIONS[ext]
    # In all other cases this is a simple copy, path kept as-is
    return path
log.info("Determining action over %d files", len(sourceFiles))
for src, srcCtime in sourceFiles.items():
    dst = convertPath(src)
    if not dst:
        # File should neither be copied nor converted
        continue
    if dst in outputFiles:
        # An output already exists, so it is not an extra file
        extraFiles.remove(dst)
        # Output at least as new as the source → nothing to do
        if outputFiles[dst] >= srcCtime:
            continue
    remainingConversions[src] = dst
log.debug("%d actions will need to be taken", len(remainingConversions))
log.info("Copying files that do not require a conversion")
conversions = set()
for sourceFile, outputFile in remainingConversions.items():
    # Creating folder if it doesn't exist
    fullOutputFile = os.path.join(OUTPUT_FOLDER, outputFile)
    fullOutputDir = os.path.dirname(fullOutputFile)
    if act:
        os.makedirs(fullOutputDir, exist_ok=True)
    fullSourceFile = os.path.join(SOURCE_FOLDER, sourceFile)
    if sourceFile == outputFile:
        # Unchanged path → hard-link instead of copying (no data duplicated)
        log.debug("%s → %s", fullSourceFile, fullOutputFile)
        if act:
            # Replace any stale output first, then link.
            # Both mutations are guarded by `act` so a dry run
            # touches nothing on disk.
            if os.path.isfile(fullOutputFile):
                os.remove(fullOutputFile)
            os.link(fullSourceFile, fullOutputFile)
    else:
        # Different path → this file needs an ffmpeg conversion later
        conversions.add((fullSourceFile, fullOutputFile))
log.info("Removing extra files")
# Outputs with no corresponding source file are deleted
for stale in extraFiles:
    stalePath = os.path.join(OUTPUT_FOLDER, stale)
    log.debug("× %s", stalePath)
    if act:
        os.remove(stalePath)

log.info("Listing files that will be converted")
for pendingSource, pendingOutput in conversions:
    log.debug("%s ⇒ %s", pendingSource, pendingOutput)
log.info("Converting files")
for fullSourceFile, fullOutputFile in progressbar.progressbar(conversions):
    # Transcode to Opus 128k VBR at max compression effort;
    # +faststart moves metadata up front for streaming
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        fullSourceFile,
        "-c:a",
        "libopus",
        "-movflags",
        "+faststart",
        "-b:a",
        "128k",
        "-vbr",
        "on",
        "-compression_level",
        "10",
        fullOutputFile,
    ]
    if act:
        # Keep stderr so failures can be reported instead of being
        # silently discarded (the old DEVNULL left broken outputs behind)
        proc = subprocess.run(
            cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
        )
        if proc.returncode != 0:
            log.error(
                "Conversion of %s failed:\n%s",
                fullSourceFile,
                proc.stderr.decode(errors="replace"),
            )
            # Drop the partial output so the next run retries this file
            if os.path.isfile(fullOutputFile):
                os.remove(fullOutputFile)
    else:
        print(cmd)
# Removing empty dirs
# Walk bottom-up so a directory that becomes empty once its (empty)
# children are removed is itself removed; the former top-down walk only
# caught directories that were already empty leaves.
for root, dirs, files in os.walk(OUTPUT_FOLDER, topdown=False):
    dirBasename = os.path.basename(root)
    if dirBasename in IGNORED_EMPTY_FOLDER:
        continue
    # Re-check emptiness on disk: `dirs` may still list children
    # that were just rmdir'd during this same walk
    if act and not os.listdir(root):
        os.rmdir(root)

65
hm/scripts/videoQuota Executable file
View file

@ -0,0 +1,65 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 ffmpeg
# Compress a video to make it fit under a certain size.
# Usage: videoQuota SIZE SRC DST
# SIZE: destination video size in bytes
# SRC: source video file
# DST: destination video file
# Example: videoQuota 20971520 source.mov dest.mp4
# To make a ~20 MiB MP4 of a MOV video
import subprocess
import sys
def duration_file(path: str) -> float:
    """Return the duration of the media file at *path*, in seconds.

    Shells out to ffprobe; raises CalledProcessError if ffprobe fails.
    """
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            path,
        ],
        stdout=subprocess.PIPE,
        check=True,
    )
    return float(probe.stdout.decode().strip())
# Constants
audio_br_bi = 128000  # fixed audio bitrate, in bits per second

# TODO Arguments if you feel like it
# Guard against missing arguments with a usage message instead of an
# IndexError traceback
if len(sys.argv) < 4:
    raise SystemExit("usage: videoQuota SIZE SRC DST [FFMPEG_ARGS...]")
quota_by = int(sys.argv[1])  # target size in bytes
in_file = sys.argv[2]
out_file = sys.argv[3]
filters = sys.argv[4:]  # extra ffmpeg args inserted before the output options

# Total bitrate budget = size quota (bits) spread over the duration;
# whatever the audio doesn't use goes to video
quota_bi = quota_by * 8
duration = duration_file(in_file)
tot_br_bi = quota_bi / duration
video_br_bi = int(tot_br_bi - audio_br_bi)
# Explicit check rather than `assert`, which is stripped under `python -O`
if video_br_bi <= 0:
    raise SystemExit("Not even enough space for audio")

cmd = (
    [
        "ffmpeg",
        "-i",
        in_file,
    ]
    + filters
    + [
        "-b:v",
        str(video_br_bi),
        "-b:a",
        str(audio_br_bi),
        out_file,
    ]
)
print(" ".join(cmd))
subprocess.run(cmd, check=True)

10
hm/scripts/wttr Executable file
View file

@ -0,0 +1,10 @@
#!/usr/bin/env nix-shell
#! nix-shell -i bash --pure
#! nix-shell -p bash curl ncurses
# change Amsterdam to your default location
# ${1:-…} (not ${1-…}) also falls back when $1 is set but empty
request="v2.wttr.in/${1:-Amsterdam}"
# Narrow terminal: ask wttr.in for its narrow output format
[ "$(tput cols)" -lt 125 ] && request+='?n'
curl -H "Accept-Language: ${LANG%_*}" --compressed "$request"