177 lines
		
	
	
	
		
			5.2 KiB
		
	
	
	
		
			Text
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			177 lines
		
	
	
	
		
			5.2 KiB
		
	
	
	
		
			Text
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env nix-shell
 | |
| #! nix-shell -i python3 --pure
 | |
| #! nix-shell -p python3 python3Packages.coloredlogs python3Packages.magic unzip p7zip unrar gnutar gzip
 | |
| 
 | |
| import logging
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| import typing
 | |
| 
 | |
| import coloredlogs
 | |
| import magic
 | |
| 
 | |
# TODO: Make it possible to ignore file extensions everywhere.
 | |
| 
 | |
| 
 | |
class ArchiveType:
    """Describe one archive format and how to unpack it with an external tool.

    Subclasses customise behaviour by overriding the class attributes below;
    this base class supplies the matching logic (:meth:`fits`) and the
    extraction driver (:meth:`extract`).
    """

    # Suffix the (lower-cased) file name must end with for fits() to accept it.
    suffix: str = ""
    # Not read by any method of this class.
    fullname: str = ""
    # Text appended to the archive name by dest_name().
    dest_suffix: str = ""
    # MIME type that fits() requires to match exactly; None disables the check.
    mime: typing.Optional[str] = None
    # Byte prefix that fits() requires at the start of the file header;
    # None disables the check.
    header: typing.Optional[bytes] = None
    # Command prefix; the archive path is appended as the next argument.
    extract_cmd: typing.Optional[typing.List[str]] = None
    # True when extraction is expected to produce a single file at ``dest``
    # instead of populating a directory.
    single_file: bool = False
    # True when the destination path must be passed to the command as well.
    append_dest: bool = False

    def __init__(self) -> None:
        # One logger per concrete archive type, named after the class.
        self.log = logging.getLogger(self.__class__.__name__)

    def dest_name(self, archive: str) -> str:
        """Return the destination name for *archive* (name plus ``dest_suffix``)."""
        return archive + self.dest_suffix

    def fits(self, name_lower: str, mime: str, header: bytes) -> bool:
        """Tell whether a file looks like this archive type.

        Args:
            name_lower: File name, already lower-cased by the caller.
            mime: MIME type detected for the file.
            header: First bytes of the file.

        Returns:
            True when the suffix matches and every configured check
            (``mime``, ``header``) passes.
        """
        if not name_lower.endswith(self.suffix):
            return False
        if self.mime is not None and mime != self.mime:
            return False
        if self.header is not None and not header.startswith(self.header):
            return False
        return True

    def _get_cmd(self, archive: str, dest: str) -> typing.List[str]:
        """Build the argument list used to extract *archive*.

        Raises:
            NotImplementedError: If the class defines no ``extract_cmd``.
        """
        # Explicit check instead of ``assert``: assertions vanish under -O.
        if not self.extract_cmd:
            raise NotImplementedError(
                "%s does not define extract_cmd" % self.__class__.__name__
            )
        # Concatenation creates a new list, so the class attribute is never
        # mutated by the append below.
        cmd = self.extract_cmd + [archive]
        if self.append_dest:
            cmd.append(dest)
        return cmd

    def extract(self, archive: str, dest: str) -> None:
        """Extract *archive* into *dest* by running the external command.

        For directory-style archives, *dest* is created and used as the
        working directory of the command. For ``single_file`` types the
        command runs in the current directory and must leave a file at *dest*.

        Raises:
            NotImplementedError: If the class defines no ``extract_cmd``.
            FileExistsError: If *dest* already exists (directory-style types).
            subprocess.CalledProcessError: If the command exits non-zero.
            FileNotFoundError: If a ``single_file`` extraction produced no
                file at *dest*.
        """
        # The command may run with cwd=dest, which would silently re-anchor
        # relative paths; resolve them against the caller's cwd first.
        archive = os.path.abspath(archive)
        dest = os.path.abspath(dest)

        # Build the command before touching the filesystem so that a
        # misconfigured type fails without leaving an empty directory behind.
        cmd = self._get_cmd(archive, dest)
        if not self.single_file:
            os.mkdir(dest)
        self.log.info("Extracting '%s' into '%s'", archive, dest)
        self.log.debug("%s", cmd)

        working_dir = None if self.single_file else dest
        subprocess.run(cmd, cwd=working_dir, check=True)

        if self.single_file and not os.path.isfile(dest):
            raise FileNotFoundError(
                "Extraction did not produce the expected file: %s" % dest
            )

    # Optional callable taking (archive, dest); not used by any method of
    # this class.
    extract_fun: typing.Optional[typing.Callable[[str, str], None]] = None
 | |
| 
 | |
| 
 | |
class ArchiveZip(ArchiveType):
    """ZIP archives: ``.zip`` files of type ``application/zip``, unpacked with ``unzip``."""

    extract_cmd = ["unzip"]
    mime = "application/zip"
    suffix = ".zip"
 | |
| 
 | |
| 
 | |
class Archive7z(ArchiveType):
    """7-Zip archives: ``.7z`` files of type ``application/x-7z-compressed``, unpacked with ``7z x``."""

    extract_cmd = ["7z", "x"]
    mime = "application/x-7z-compressed"
    suffix = ".7z"
 | |
| 
 | |
| 
 | |
class ArchiveRar(ArchiveType):
    """RAR archives: ``.rar`` files of type ``application/x-rar``, unpacked with ``unrar x``."""

    extract_cmd = ["unrar", "x"]
    mime = "application/x-rar"
    suffix = ".rar"
 | |
| 
 | |
| 
 | |
class ArchiveTar(ArchiveType):
    """Uncompressed tarballs: ``.tar`` files of type ``application/x-tar``, unpacked with ``tar``."""

    extract_cmd = ["tar", "--extract", "--file"]
    mime = "application/x-tar"
    suffix = ".tar"
 | |
| 
 | |
| 
 | |
class ArchiveTarGz(ArchiveType):
    """Gzip-compressed tarballs: ``.tar.gz`` files of type ``application/gzip``, unpacked with ``tar --gzip``."""

    extract_cmd = ["tar", "--extract", "--gzip", "--file"]
    mime = "application/gzip"
    suffix = ".tar.gz"
 | |
| 
 | |
| 
 | |
class ArchiveTarXz(ArchiveType):
    """XZ-compressed tarballs: ``.tar.xz`` files of type ``application/x-xz``, unpacked with ``tar --xz``."""

    extract_cmd = ["tar", "--extract", "--xz", "--file"]
    mime = "application/x-xz"
    suffix = ".tar.xz"
 | |
| 
 | |
| 
 | |
class ArchiveGzip(ArchiveType):
    """Single gzip-compressed files: ``.gz`` files of type ``application/gzip``.

    The decompressed data is streamed into the requested destination file.
    Running plain ``gunzip <archive>`` would decompress in place and delete
    the archive, so nothing would ever appear at ``dest``.
    """

    suffix = ".gz"
    mime = "application/gzip"
    single_file = True
    # --stdout keeps the archive untouched and emits the data on stdout,
    # which extract() redirects into the destination file.
    extract_cmd = ["gunzip", "--stdout"]

    def extract(self, archive: str, dest: str) -> None:
        """Decompress *archive* into the file *dest*.

        Raises:
            subprocess.CalledProcessError: If ``gunzip`` exits non-zero.
            OSError: If *dest* cannot be opened for writing.
        """
        cmd = self._get_cmd(archive, dest)
        self.log.info("Extracting '%s' into '%s'", archive, dest)
        self.log.debug("%s", cmd)
        with open(dest, "wb") as output:
            subprocess.run(cmd, stdout=output, check=True)
 | |
| 
 | |
| 
 | |
class TreeExtractor:
    """Walk a directory tree and replace every recognised archive by its contents."""

    # Tried in order; the first type whose fits() accepts a file wins.
    # ArchiveTarGz is listed before ArchiveGzip, so ``.tar.gz`` files are
    # claimed by the tarball type first.
    ARCHIVE_TYPES: typing.List[ArchiveType] = [
        ArchiveZip(),
        Archive7z(),
        ArchiveRar(),
        ArchiveTar(),
        ArchiveTarGz(),
        ArchiveTarXz(),
        ArchiveGzip(),
    ]

    def __init__(self) -> None:
        self.log = logging.getLogger("TreeExtractor")
        # All known suffixes, used as a cheap pre-filter before reading files.
        self.suffixes: typing.Set[str] = {
            archive_type.suffix for archive_type in self.ARCHIVE_TYPES
        }

    def _detect_type(
        self, filepath: str, name_lower: str
    ) -> typing.Optional[ArchiveType]:
        """Return the first archive type matching *filepath*, or None.

        Reads the first 1024 bytes of the file, detects their MIME type and
        asks each entry of ``ARCHIVE_TYPES`` in turn. A file that cannot be
        read is logged and reported as unmatched instead of aborting the walk.
        """
        try:
            with open(filepath, "rb") as filedesc:
                header = filedesc.read(1024)
        except OSError as err:
            self.log.warning("Cannot read '%s': %s", filepath, err)
            return None
        mime = magic.detect_from_content(header).mime_type

        for candidate in self.ARCHIVE_TYPES:
            if candidate.fits(name_lower, mime, header):
                return candidate
        return None

    def extract_tree(self, directory: str = ".") -> None:
        """Extract every recognised archive found under *directory*.

        Each archive is unpacked into ``<dest>.tmp``. On success the archive
        is deleted and the temporary path renamed to its final name. Freshly
        extracted directories are processed recursively. Extraction errors
        are logged and the walk continues with the next file.
        """
        suffixes = tuple(self.suffixes)
        for root, _dirs, files in os.walk(directory):
            real_root = os.path.realpath(root)
            for name in files:
                self.log.debug("Handling '%s' '%s'", real_root, name)

                # Initial filtering with suffix
                name_lower = name.lower()
                if not name_lower.endswith(suffixes):
                    self.log.debug("Suffix not matched: %s", name)
                    continue

                filepath = os.path.join(real_root, name)
                archive_type = self._detect_type(filepath, name_lower)
                if archive_type is None:
                    self.log.debug("Not matched: %s", filepath)
                    continue

                dest_name = archive_type.dest_name(name)
                dest = os.path.join(real_root, dest_name)
                # Extract next to the final location first so that the
                # archive is only removed once extraction has succeeded.
                dest_tmp = dest + ".tmp"
                try:
                    archive_type.extract(filepath, dest_tmp)
                except Exception as e:
                    # Exception rather than BaseException: Ctrl-C and
                    # SystemExit must still be able to stop the run.
                    # TODO Parameters stop on error
                    self.log.error(e, exc_info=True)
                else:
                    os.unlink(filepath)
                    os.rename(dest_tmp, dest)

                # dest was created after os.walk listed this directory, so
                # the ongoing walk will not visit it; descend explicitly.
                if os.path.isdir(dest):
                    self.extract_tree(dest)

    def main(self) -> None:
        """Run on the directory given as first CLI argument (default: ``.``)."""
        directory = sys.argv[1] if len(sys.argv) > 1 else "."
        self.extract_tree(directory)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Script entry point: enable colored debug logging, then process the tree.
    coloredlogs.install(fmt="%(levelname)s %(message)s", level="DEBUG")
    extractor = TreeExtractor()
    extractor.main()
 |