#!/usr/bin/env python
"""Recursively extract every archive found in a directory tree.

The tree is walked, each file whose extension looks like a known archive
format is probed (file name suffix + MIME type + optional magic header) and,
when a format matches, it is unpacked next to the archive with the matching
command line tool.  The archive is deleted once extraction succeeded and the
extracted directory is walked in turn, so nested archives are handled too.

Usage: pass the directory to process as first argument (defaults to '.').
"""

import enum
import logging
import os
import subprocess
import sys
import typing

import coloredlogs
import magic

# TODO Able to ignore extensions everywhere


class ArchiveType:
    """Description of one archive format and of the command that unpacks it.

    Subclasses only override the class attributes below; the matching and
    extraction logic lives here.
    """

    # File name suffix identifying the format. It is compared against a
    # lower-cased file name, so it must be written in lower case.
    suffix: str = ''
    # Text appended to the archive name once `suffix` has been stripped.
    dest_suffix: str = ''
    # Expected MIME type, or None to skip the MIME check.
    mime: typing.Optional[str] = None
    # Expected leading bytes of the file, or None to skip the header check.
    header: typing.Optional[bytes] = None
    # Command (argv list) to which the archive path is appended.
    extract_cmd: typing.Optional[typing.List[str]] = None
    # True when the tool yields a single file instead of filling a directory.
    single_file = False
    # True when the destination path must be passed as last argument.
    append_dest = False
    # Not referenced anywhere in this file; kept so the class attributes
    # stay unchanged for any external user.
    extract_fun: typing.Optional[typing.Callable[[str, str], None]] = None

    def __init__(self) -> None:
        self.log = logging.getLogger(self.__class__.__name__)

    def dest_name(self, archive: str) -> str:
        """Return the name of the extraction destination for `archive`.

        The format suffix is stripped from the end of `archive` (assumed to
        end with it, whatever the case) and `dest_suffix` is appended.
        """
        if not self.suffix:
            # archive[:-0] would be the empty string: nothing to strip here.
            return archive + self.dest_suffix
        return archive[:-len(self.suffix)] + self.dest_suffix

    def fits(self, name_lower: str, mime: str, header: bytes) -> bool:
        """Tell whether a file looks like an archive of this type.

        name_lower: file name, already lower-cased by the caller.
        mime: MIME type detected for the file.
        header: first bytes of the file.

        The suffix must always match; the MIME type and the header are only
        checked when the type declares them.
        """
        if not name_lower.endswith(self.suffix):
            return False
        if self.mime is not None and mime != self.mime:
            return False
        if self.header is not None and not header.startswith(self.header):
            return False
        return True

    def _get_cmd(self, archive: str, dest: str) -> typing.List[str]:
        """Build the argv list extracting `archive` (optionally to `dest`)."""
        assert self.extract_cmd
        # Concatenation creates a new list: the class attribute is untouched.
        cmd = self.extract_cmd + [archive]
        if self.append_dest:
            cmd.append(dest)
        return cmd

    def extract(self, archive: str, dest: str) -> None:
        """Extract `archive` into `dest`, then delete the archive.

        For directory-producing formats `dest` is created (it must not exist
        yet) and the tool is run from inside it. For single-file formats the
        tool is run from the current directory and `dest` must be a regular
        file afterwards.

        Raises OSError if `dest` cannot be created and
        subprocess.CalledProcessError if the tool exits with a non-zero
        status; in both cases the archive is left in place.
        """
        cmd = self._get_cmd(archive, dest)
        if not self.single_file:
            os.mkdir(dest)
        self.log.info("Extracting '%s' into '%s'", archive, dest)
        self.log.debug("%s", cmd)

        workdir = None if self.single_file else dest
        subprocess.run(cmd, cwd=workdir, check=True)

        if self.single_file:
            assert os.path.isfile(dest)

        try:
            os.unlink(archive)
        except FileNotFoundError:
            # Some tools (gunzip by default) remove their input themselves
            # once done: the goal is reached, this is not an error.
            pass


class ArchiveZip(ArchiveType):
    """ZIP archives, unpacked with `unzip`."""
    suffix = '.zip'
    mime = 'application/zip'
    extract_cmd = ['unzip']


class Archive7z(ArchiveType):
    """7-Zip archives, unpacked with `7z x`."""
    suffix = '.7z'
    mime = 'application/x-7z-compressed'
    extract_cmd = ['7z', 'x']


class ArchiveRar(ArchiveType):
    """RAR archives, unpacked with `unrar x`."""
    suffix = '.rar'
    mime = 'application/x-rar'
    extract_cmd = ['unrar', 'x']


class ArchiveTar(ArchiveType):
    """Uncompressed tarballs."""
    suffix = '.tar'
    mime = 'application/x-tar'
    extract_cmd = ['tar', '--extract', '--file']


class ArchiveTarGz(ArchiveType):
    """Gzip-compressed tarballs."""
    suffix = '.tar.gz'
    mime = 'application/gzip'
    extract_cmd = ['tar', '--extract', '--gzip', '--file']


class ArchiveTarXz(ArchiveType):
    """XZ-compressed tarballs."""
    suffix = '.tar.xz'
    mime = 'application/x-xz'
    extract_cmd = ['tar', '--extract', '--xz', '--file']


class ArchiveGzip(ArchiveType):
    """Single gzip-compressed files, decompressed in place with `gunzip`."""
    suffix = '.gz'
    mime = 'application/gzip'
    single_file = True
    extract_cmd = ['gunzip']


class TreeExtractor:
    """Walk a directory tree and extract every recognised archive in it."""

    # The first type whose fits() accepts a file wins, so longer suffixes
    # ('.tar.gz') must be listed before the shorter ones they end with ('.gz').
    ARCHIVE_TYPES: typing.List[ArchiveType] = [
        ArchiveZip(),
        Archive7z(),
        ArchiveRar(),
        ArchiveTar(),
        ArchiveTarGz(),
        ArchiveTarXz(),
        ArchiveGzip(),
    ]

    def __init__(self) -> None:
        self.log = logging.getLogger('TreeExtractor')
        # Last extension of every known suffix ('.tar.gz' -> '.gz'); used as
        # a cheap filter before a file is opened at all.
        self.extensions: typing.Set[str] = set()
        for archive_type in self.ARCHIVE_TYPES:
            suffixes = archive_type.suffix.split('.')
            self.extensions.add('.' + suffixes[-1])

    def _match_archive_type(
            self, filepath: str,
            name_lower: str) -> typing.Optional[ArchiveType]:
        """Return the first archive type fitting the file, or None.

        Only the first 1024 bytes of the file are read; they feed both the
        MIME detection and the header check. Raises OSError if the file
        cannot be read.
        """
        with open(filepath, 'rb') as filedesc:
            header = filedesc.read(1024)
        mime = magic.from_buffer(header, mime=True)

        for archtyp in self.ARCHIVE_TYPES:
            if archtyp.fits(name_lower, mime, header):
                return archtyp
        return None

    def extract_tree(self, directory: str = '.') -> None:
        """Extract, in place, all the archives found under `directory`.

        Errors on a given file are logged and do not stop the walk. Freshly
        extracted directories are processed recursively.
        """
        for root, _dirs, files in os.walk(directory):
            real_root = os.path.realpath(root)
            for name in files:
                self.log.debug("Handling '%s' '%s'", real_root, name)

                # Initial filtering with extensions
                extension = os.path.splitext(name)[1].lower()
                if extension not in self.extensions:
                    self.log.debug("Extension not matched: %s", name)
                    continue

                name_lower = name.lower()
                filepath = os.path.join(real_root, name)
                try:
                    archive_type = self._match_archive_type(filepath,
                                                            name_lower)
                except OSError as e:
                    # Unreadable file or broken symlink: same policy as for
                    # extraction failures, report it and carry on.
                    self.log.error(e, exc_info=True)
                    continue

                if not archive_type:
                    self.log.debug("Not matched: %s", filepath)
                    continue

                dest_name = archive_type.dest_name(name)
                dest = os.path.join(real_root, dest_name)
                try:
                    archive_type.extract(filepath, dest)
                except Exception as e:
                    # Exception rather than BaseException, so that Ctrl-C
                    # and sys.exit() can still interrupt the walk.
                    # TODO Parameters stop on error
                    self.log.error(e, exc_info=True)

                # The new directory was created after os.walk listed this
                # level, so it has to be visited explicitly.
                if os.path.isdir(dest):
                    self.extract_tree(dest)

    def main(self) -> None:
        """Process the directory given as first CLI argument (default '.')."""
        directory = sys.argv[1] if len(sys.argv) > 1 else '.'
        self.extract_tree(directory)


if __name__ == '__main__':
    coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
    TreeExtractor().main()