#!/usr/bin/env python
"""Recursively extract the archives found in a directory tree.

Each recognised archive is extracted next to itself into a temporary
``<name>.tmp`` location, the archive is deleted, and the result is renamed
to the archive's destination name. Extracted directories are then scanned
in turn, so nested archives get extracted as well.
"""

import logging
import os
import shutil
import subprocess
import sys
import typing

import coloredlogs
import magic

# TODO Able to ignore extensions everywhere


class ArchiveType:
    """Describe one archive format and how to extract it with an external tool.

    Subclasses override the class attributes below; the base class provides
    the matching (`fits`) and extraction (`extract`) logic.
    """

    # Lower-case file name suffix that identifies the format.
    suffix: str = ""
    # Not read anywhere in this file.
    fullname: str = ""
    # Appended to the archive name to build the destination name.
    dest_suffix: str = ""
    # Expected MIME type of the content; None disables the check.
    mime: typing.Optional[str] = None
    # Expected leading bytes of the content; None disables the check.
    header: typing.Optional[bytes] = None
    # Command prefix; the archive path is appended as the next argument.
    extract_cmd: typing.Optional[typing.List[str]] = None
    # True when the tool produces one file rather than filling a directory.
    single_file = False
    # True when the destination path must be appended after the archive path.
    append_dest = False

    def __init__(self) -> None:
        self.log = logging.getLogger(self.__class__.__name__)

    def dest_name(self, archive: str) -> str:
        """Return the destination name for *archive* (name plus `dest_suffix`)."""
        return archive + self.dest_suffix

    def fits(self, name_lower: str, mime: str, header: bytes) -> bool:
        """Tell whether a file matches this archive type.

        Args:
            name_lower: File name, already lower-cased by the caller.
            mime: MIME type detected from the file content.
            header: First bytes of the file.

        Returns:
            True when the suffix matches and every configured check
            (`mime`, `header`) passes.
        """
        if not name_lower.endswith(self.suffix):
            return False
        if self.mime is not None and mime != self.mime:
            return False
        if self.header is not None and not header.startswith(self.header):
            return False
        return True

    def _get_cmd(self, archive: str, dest: str) -> typing.List[str]:
        """Build the argument list used to extract *archive*.

        Raises:
            NotImplementedError: If the type defines no `extract_cmd`.
        """
        # Explicit check instead of `assert`, which is stripped under -O.
        if not self.extract_cmd:
            raise NotImplementedError(
                f"{self.__class__.__name__} does not define extract_cmd"
            )
        # Concatenation builds a new list, so the class attribute is untouched.
        cmd = self.extract_cmd + [archive]
        if self.append_dest:
            cmd.append(dest)
        return cmd

    def extract(self, archive: str, dest: str) -> None:
        """Extract *archive* into *dest* by running the external command.

        For directory archives, *dest* is created and used as the working
        directory of the command. If the command fails, the directory
        created here is removed again so a later run can retry.

        Args:
            archive: Path of the archive; it must stay valid when the
                command runs with *dest* as working directory.
            dest: Path of the directory (or file, for single-file types)
                to produce.

        Raises:
            NotImplementedError: If the type defines no `extract_cmd`.
            OSError: If *dest* cannot be created, or if a single-file
                command did not produce *dest*.
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        cmd = self._get_cmd(archive, dest)
        if not self.single_file:
            os.mkdir(dest)
        self.log.info("Extracting '%s' into '%s'", archive, dest)
        self.log.debug("%s", cmd)

        if self.single_file:
            subprocess.run(cmd, check=True)
            # NOTE(review): plain `gunzip <file>` decompresses in place and
            # does not write `dest`, so this check looks like it always
            # fails for ArchiveGzip as configured - confirm the intended
            # invocation.
            if not os.path.isfile(dest):
                raise FileNotFoundError(
                    f"Extraction of '{archive}' did not produce '{dest}'"
                )
            return

        try:
            subprocess.run(cmd, cwd=dest, check=True)
        except BaseException:
            # Only reached after os.mkdir succeeded, so `dest` was created
            # by this call. Leaving it behind would make every retry fail
            # with FileExistsError.
            shutil.rmtree(dest, ignore_errors=True)
            raise

    # Optional Python-level extraction hook; not used anywhere in this file.
    extract_fun: typing.Optional[typing.Callable[[str, str], None]] = None


class ArchiveZip(ArchiveType):
    """ZIP archives, extracted with `unzip`."""

    suffix = ".zip"
    mime = "application/zip"
    extract_cmd = ["unzip"]


class Archive7z(ArchiveType):
    """7-Zip archives, extracted with `7z x`."""

    suffix = ".7z"
    mime = "application/x-7z-compressed"
    extract_cmd = ["7z", "x"]


class ArchiveRar(ArchiveType):
    """RAR archives, extracted with `unrar x`."""

    suffix = ".rar"
    mime = "application/x-rar"
    extract_cmd = ["unrar", "x"]


class ArchiveTar(ArchiveType):
    """Uncompressed tarballs, extracted with `tar`."""

    suffix = ".tar"
    mime = "application/x-tar"
    extract_cmd = ["tar", "--extract", "--file"]


class ArchiveTarGz(ArchiveType):
    """Gzip-compressed tarballs, extracted with `tar --gzip`."""

    suffix = ".tar.gz"
    mime = "application/gzip"
    extract_cmd = ["tar", "--extract", "--gzip", "--file"]


class ArchiveTarXz(ArchiveType):
    """XZ-compressed tarballs, extracted with `tar --xz`."""

    suffix = ".tar.xz"
    mime = "application/x-xz"
    extract_cmd = ["tar", "--extract", "--xz", "--file"]


class ArchiveGzip(ArchiveType):
    """Single gzip-compressed files, handled with `gunzip`."""

    suffix = ".gz"
    mime = "application/gzip"
    single_file = True
    extract_cmd = ["gunzip"]


class TreeExtractor:
    """Walk a directory tree and extract every recognised archive in place."""

    # Order matters: the first type whose `fits` succeeds wins, so
    # ".tar.gz" must come before the more general ".gz".
    ARCHIVE_TYPES: typing.List[ArchiveType] = [
        ArchiveZip(),
        Archive7z(),
        ArchiveRar(),
        ArchiveTar(),
        ArchiveTarGz(),
        ArchiveTarXz(),
        ArchiveGzip(),
    ]

    def __init__(self) -> None:
        self.log = logging.getLogger("TreeExtractor")
        self.suffixes: typing.Set[str] = {
            archive_type.suffix for archive_type in self.ARCHIVE_TYPES
        }

    def _find_archive_type(
        self, filepath: str, name_lower: str
    ) -> typing.Optional[ArchiveType]:
        """Return the first archive type matching the file, or None."""
        with open(filepath, "rb") as filedesc:
            header = filedesc.read(1024)
        mime = magic.detect_from_content(header).mime_type
        for archive_type in self.ARCHIVE_TYPES:
            if archive_type.fits(name_lower, mime, header):
                return archive_type
        return None

    def extract_tree(self, directory: str = ".") -> None:
        """Extract every archive under *directory*, recursing into results.

        Extraction failures are logged and the walk continues with the next
        file. Interrupts such as Ctrl-C are not swallowed.

        Args:
            directory: Root of the tree to scan.
        """
        # str.endswith accepts a tuple, giving a cheap one-call pre-filter.
        known_suffixes = tuple(self.suffixes)

        for root, _dirs, files in os.walk(directory):
            real_root = os.path.realpath(root)
            for name in files:
                self.log.debug("Handling '%s' '%s'", real_root, name)

                # Initial filtering with suffix
                name_lower = name.lower()
                if not name_lower.endswith(known_suffixes):
                    self.log.debug("Suffix not matched: %s", name)
                    continue

                filepath = os.path.join(real_root, name)
                archive_type = self._find_archive_type(filepath, name_lower)
                if not archive_type:
                    self.log.debug("Not matched: %s", filepath)
                    continue

                dest_name = archive_type.dest_name(name)
                dest = os.path.join(real_root, dest_name)
                # Extract beside the archive first: with an empty
                # dest_suffix, `dest` is the archive's own path.
                dest_tmp = dest + ".tmp"

                try:
                    archive_type.extract(filepath, dest_tmp)
                except Exception as e:
                    # Exception rather than BaseException, so that
                    # KeyboardInterrupt and SystemExit still stop the run.
                    # TODO Parameters stop on error
                    self.log.error(e, exc_info=True)
                else:
                    os.unlink(filepath)
                    os.rename(dest_tmp, dest)

                    # os.walk listed this directory before `dest` existed,
                    # so it will not descend into it on its own.
                    if os.path.isdir(dest):
                        self.extract_tree(dest)

    def main(self) -> None:
        """Run on the directory given as first CLI argument (default: ".")."""
        directory = sys.argv[1] if len(sys.argv) > 1 else "."
        self.extract_tree(directory)


if __name__ == "__main__":
    coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
    TreeExtractor().main()