dotfiles/scripts/compressPictureMovies
Geoffrey Frogeye ee178b7d57
nix: Make nix the root
Which means now I'll have to think about real prefixes in commit names.
2023-11-26 23:58:22 +01:00

244 lines
7.4 KiB
Plaintext
Executable file

#!/usr/bin/env nix-shell
#! nix-shell -i python3 --pure
#! nix-shell -p python3 python3Packages.coloredlogs python3Packages.progressbar2 ffmpeg
import datetime
import hashlib
import json
import logging
import os
import shutil
import statistics
import subprocess
import sys
import tempfile
import time
import coloredlogs
import progressbar
# Configure logging through coloredlogs: DEBUG level, "LEVEL message" format
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
# Root logger, used by the whole script
log = logging.getLogger()
# Constants
# Folder walked recursively to find the videos to compress
PICTURES_FOLDER = os.path.join(os.path.expanduser("~"), "Images")
# Folder where the untouched originals are moved once converted,
# under the same path relative to PICTURES_FOLDER
ORIGINAL_FOLDER = os.path.join(os.path.expanduser("~"), ".ImagesOriginaux")
# File extensions (lower case, without the dot) considered as videos
MOVIE_EXTENSIONS = ["mov", "avi", "mp4", "3gp", "webm", "mkv"]
# Extension given to the converted files
OUTPUT_EXTENSION = "webm"
# Arguments given to ffmpeg between the input and the output file
OUTPUT_FFMPEG_PARAMETERS = ["-c:v", "libvpx-vp9", "-crf", "30", "-b:v", "0"]
# Alternative set of arguments (AV1 video, Opus audio), currently disabled:
# OUTPUT_FFMPEG_PARAMETERS = ["-c:v", "libaom-av1", "-crf", "30", "-strict", "experimental", "-c:a", "libopus"]
# Maximum accepted standard deviation between durations, in seconds
DURATION_MAX_DEV = 1
def videoMetadata(filename):
    """
    Read the metadata of a media file using ffmpeg.

    Runs `ffmpeg -i <filename> -f ffmetadata -` and parses its standard
    output as a list of `key=value` lines.

    :param filename: path to an existing regular file
    :return: dict mapping lower-cased keys to their value (both str)
    :raises AssertionError: if `filename` is not a regular file
    :raises subprocess.CalledProcessError: if ffmpeg exits with an error
    """
    assert os.path.isfile(filename)
    cmd = ["ffmpeg", "-i", filename, "-f", "ffmetadata", "-"]
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    p.check_returncode()

    data = dict()
    for metadataLine in p.stdout.split(b"\n"):
        # Skip empty lines
        if not metadataLine:
            continue
        # Skip comments
        if metadataLine.startswith(b";"):
            continue
        # Parse key-value: only the first `=` separates the key from the
        # value, so that values containing `=` themselves are not discarded.
        # Backslash escapes present in the line are kept as they are.
        key, separator, val = metadataLine.partition(b"=")
        if not separator:
            log.warning("Unparsed metadata line: `{}`".format(metadataLine))
            continue
        data[key.decode().lower()] = val.decode()
    return data
def videoInfos(filename):
    """
    Ask ffprobe for the description of the streams of a media file.

    :param filename: path to an existing regular file
    :return: the JSON document printed by
        `ffprobe <filename> -print_format json -show_streams`, decoded
    :raises AssertionError: if `filename` is not a regular file
    :raises subprocess.CalledProcessError: if ffprobe exits with an error
    """
    assert os.path.isfile(filename)
    probe = subprocess.run(
        ["ffprobe", filename, "-print_format", "json", "-show_streams"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    # Raises if ffprobe could not read the file
    probe.check_returncode()
    return json.loads(probe.stdout)
from pprint import pprint
def streamDuration(stream):
    """
    Work out the duration, in seconds, of a single stream description.

    The following sources are tried, in this order:
    - the `duration` field;
    - the `nb_frames` field divided by the `sample_rate` field;
    - the `DURATION` tag, formatted as `hours:minutes:seconds`.

    :param stream: mapping describing the stream
        (presumably an item of the `streams` list given by ffprobe)
    :return: duration in seconds
    :raises KeyError: if none of the sources above is available
    :raises AssertionError: if the `DURATION` tag does not have 3 fields
    """
    # Explicit duration
    if "duration" in stream:
        return float(stream["duration"])

    # Number of frames over rate
    if "sample_rate" in stream and "nb_frames" in stream:
        frameCount = int(stream["nb_frames"])
        rate = int(stream["sample_rate"])
        return frameCount / rate

    # Textual duration stored in the tags
    if "tags" in stream and "DURATION" in stream["tags"]:
        fields = stream["tags"]["DURATION"].split(":")
        assert len(fields) == 3
        hours, minutes, seconds = map(float, fields)
        return (hours * 60 + minutes) * 60 + seconds

    raise KeyError("Can't find duration information in stream")
def videoDuration(filename):
    """
    Compute the duration of a media file, in seconds.

    The duration of every stream of the file is computed, and the mean of
    those is returned, after checking that they roughly agree.

    :param filename: path to an existing regular file
    :return: mean of the durations of the streams, in seconds
    :raises statistics.StatisticsError: if the file has no stream
    :raises AssertionError: if the standard deviation of the durations of
        the streams is over DURATION_MAX_DEV
    :raises KeyError: if the duration of a stream can't be determined
    """
    # TODO Doesn't work with VP8 / webm
    infos = videoInfos(filename)
    durations = [streamDuration(stream) for stream in infos["streams"]]
    if not durations:
        raise statistics.StatisticsError("No stream found in {}".format(filename))
    # statistics.stdev needs at least two values: a file with a single
    # stream (e.g. a video without sound) has nothing to deviate from
    dev = statistics.stdev(durations) if len(durations) > 1 else 0.0
    assert dev <= DURATION_MAX_DEV, "Too much deviation ({} s)".format(dev)
    return sum(durations) / len(durations)
# Work to do, filled while analyzing the videos
todos = set()
totalSize = 0
totalDuration = 0

# Collect every video found under PICTURES_FOLDER as (folder, file name)
log.info("Listing files in {}".format(PICTURES_FOLDER))
allVideos = list()
for root, _dirs, fileNames in os.walk(PICTURES_FOLDER):
    # Folders located in ORIGINAL_FOLDER are left alone
    if root.startswith(ORIGINAL_FOLDER):
        continue

    # Only keep the files with the extension of a video
    allVideos.extend(
        (root, fileName)
        for fileName in fileNames
        if os.path.splitext(fileName)[1][1:].lower() in MOVIE_EXTENSIONS
    )
# Decide, for each video found, whether it has to be converted, and if so
# add (input, original, output, size, duration) to the todos
log.info("Analyzing videos")
for root, inputName in progressbar.progressbar(allVideos):
    inputNameBase, inputExt = os.path.splitext(inputName)
    inputExt = inputExt[1:].lower()

    # Generates all needed filepaths
    ## Found file
    inputFull = os.path.join(root, inputName)
    inputRel = os.path.relpath(inputFull, PICTURES_FOLDER)
    ## Original file
    originalFull = os.path.join(ORIGINAL_FOLDER, inputRel)
    originalRel = inputRel
    # Never overwrite an original that was already put aside
    assert not os.path.isfile(originalFull), originalFull + " exists"
    ## Compressed file
    outputFull = os.path.join(root, inputNameBase + "." + OUTPUT_EXTENSION)

    # If the extension is the same of the output one
    if inputExt == OUTPUT_EXTENSION:
        # Read the metadata of the video
        meta = videoMetadata(inputFull)

        # If it has the field with the original file
        if "original" in meta:
            # Skip file: it was produced by a previous run
            continue
    else:
        # The converted file must not replace an unrelated existing file
        assert not os.path.isfile(outputFull), outputFull + " exists"

    size = os.stat(inputFull).st_size
    try:
        duration = videoDuration(inputFull)
    except Exception as e:
        log.warning("Can't determine duration of {}, skipping".format(inputFull))
        log.debug(e, exc_info=True)
        continue

    todo = (inputFull, originalFull, outputFull, size, duration)

    totalDuration += duration
    totalSize += size
    todos.add(todo)

log.info(
    "Converting {} videos ({})".format(
        len(todos), datetime.timedelta(seconds=totalDuration)
    )
)
# From https://stackoverflow.com/a/3431838
def sha256(fname):
    """
    Compute the SHA-256 sum of the content of a file.

    The file is read by blocks of 128 KiB, so it is never entirely loaded
    in memory.

    :param fname: path of the file to read
    :return: the sum, as a string of hexadecimal digits
    """
    digest = hashlib.sha256()
    with open(fname, "rb") as stream:
        while True:
            block = stream.read(131072)
            # An empty block means the end of the file was reached
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
# Progress bar things
# The bar advances with the size of the input files already processed
totalDataSize = progressbar.widgets.DataSize()
totalDataSize.variable = "max_value"
barWidgets = [
    progressbar.widgets.DataSize(),
    " of ",
    totalDataSize,
    " ",
    progressbar.widgets.Bar(),
    " ",
    progressbar.widgets.FileTransferSpeed(),
    " ",
    progressbar.widgets.AdaptiveETA(),
]
bar = progressbar.DataTransferBar(max_value=totalSize, widgets=barWidgets)
bar.start()
processedSize = 0

# Convert every video of the todos. A failure on one file is logged and
# does not prevent the next ones from being processed.
for inputFull, originalFull, outputFull, size, duration in todos:
    tmpfd, tmpfile = tempfile.mkstemp(
        prefix="compressPictureMovies", suffix="." + OUTPUT_EXTENSION
    )
    # mkstemp returns an open file descriptor along with the path. ffmpeg
    # only needs the path, so close the descriptor right away instead of
    # leaking one per video.
    os.close(tmpfd)
    try:
        # Calculate the sum of the original file
        checksum = sha256(inputFull)

        # Initiate a conversion in a temporary file
        originalRel = os.path.relpath(originalFull, ORIGINAL_FOLDER)
        originalContent = "{} {}".format(originalRel, checksum)
        # NOTE(review): no shell is involved, so the double quotes are given
        # to ffmpeg literally and presumably end up in the stored value.
        # Confirm before relying on the content of this field.
        metadataCmd = ["-metadata", 'original="{}"'.format(originalContent)]
        cmd = (
            ["ffmpeg", "-hide_banner", "-y", "-i", inputFull]
            + OUTPUT_FFMPEG_PARAMETERS
            + metadataCmd
            + [tmpfile]
        )
        p = subprocess.run(cmd)
        p.check_returncode()

        # Verify the duration of the new file
        newDuration = videoDuration(tmpfile)
        dev = statistics.stdev((duration, newDuration))
        assert dev < DURATION_MAX_DEV, "Too much deviation in duration"

        # Move the original to the corresponding original folder
        originalDir = os.path.dirname(originalFull)
        os.makedirs(originalDir, exist_ok=True)
        shutil.move(inputFull, originalFull)

        # Move the converted file in place of the original
        shutil.move(tmpfile, outputFull)
    except Exception as e:
        log.error("Couldn't process file {}".format(inputFull))
        log.error(e, exc_info=True)
        # Best-effort removal of the temporary file
        try:
            os.unlink(tmpfile)
        except OSError as unlinkError:
            log.debug(
                "Couldn't remove temporary file {}: {}".format(tmpfile, unlinkError)
            )
    # Progress bar things
    processedSize += size
    bar.update(processedSize)
bar.finish()
# TODO Iterate over the already compressed videos to assert the originals are
# in their correct place, else move them