Bloodsucker

This commit is contained in:
Geoffrey Frogeye 2020-08-08 11:19:48 +02:00
parent 6cdd924613
commit f03d13f7db
8 changed files with 140 additions and 811 deletions

View file

@ -11,9 +11,11 @@ import enum
# TODO Able to ignore extensions everywhere
class ArchiveType():
suffix: str = ''
dest_suffix: str = ''
class ArchiveType:
suffix: str = ""
fullname: str = ""
dest_suffix: str = ""
mime: typing.Optional[str] = None
header: typing.Optional[bytes] = None
extract_cmd: typing.Optional[typing.List[str]] = None
@ -24,7 +26,7 @@ class ArchiveType():
self.log = logging.getLogger(self.__class__.__name__)
def dest_name(self, archive: str) -> str:
return archive[:-len(self.suffix)] + self.dest_suffix
return archive + self.dest_suffix
def fits(self, name_lower: str, mime: str, header: bytes) -> bool:
if not name_lower.endswith(self.suffix):
@ -55,81 +57,87 @@ class ArchiveType():
r.check_returncode()
if self.single_file:
assert os.path.isfile(dest)
os.unlink(archive)
extract_fun: typing.Optional[typing.Callable[[str, str], None]] = None
class ArchiveZip(ArchiveType):
suffix = '.zip'
mime = 'application/zip'
extract_cmd = ['unzip']
suffix = ".zip"
mime = "application/zip"
extract_cmd = ["unzip"]
class Archive7z(ArchiveType):
suffix = '.7z'
mime = 'application/x-7z-compressed'
extract_cmd = ['7z', 'x']
suffix = ".7z"
mime = "application/x-7z-compressed"
extract_cmd = ["7z", "x"]
class ArchiveRar(ArchiveType):
suffix = '.rar'
mime = 'application/x-rar'
extract_cmd = ['unrar', 'x']
suffix = ".rar"
mime = "application/x-rar"
extract_cmd = ["unrar", "x"]
class ArchiveTar(ArchiveType):
suffix = '.tar'
mime = 'application/x-tar'
extract_cmd = ['tar', '--extract', '--file']
suffix = ".tar"
mime = "application/x-tar"
extract_cmd = ["tar", "--extract", "--file"]
class ArchiveTarGz(ArchiveType):
suffix = '.tar.gz'
mime = 'application/gzip'
extract_cmd = ['tar', '--extract', '--gzip', '--file']
suffix = ".tar.gz"
mime = "application/gzip"
extract_cmd = ["tar", "--extract", "--gzip", "--file"]
class ArchiveTarXz(ArchiveType):
suffix = '.tar.xz'
mime = 'application/x-xz'
extract_cmd = ['tar', '--extract', '--xz', '--file']
suffix = ".tar.xz"
mime = "application/x-xz"
extract_cmd = ["tar", "--extract", "--xz", "--file"]
class ArchiveGzip(ArchiveType):
suffix = '.gz'
mime = 'application/gzip'
suffix = ".gz"
mime = "application/gzip"
single_file = True
extract_cmd = ['gunzip']
extract_cmd = ["gunzip"]
class TreeExtractor():
class TreeExtractor:
ARCHIVE_TYPES: typing.List[ArchiveType] = [
ArchiveZip(),
Archive7z(),
ArchiveRar(),
ArchiveTar(),
ArchiveTarGz(),
ArchiveTarXz(),
ArchiveGzip(),
ArchiveZip(),
Archive7z(),
ArchiveRar(),
ArchiveTar(),
ArchiveTarGz(),
ArchiveTarXz(),
ArchiveGzip(),
]
def __init__(self) -> None:
self.log = logging.getLogger('TreeExtractor')
self.extensions = set()
self.log = logging.getLogger("TreeExtractor")
self.suffixes = set()
for archive_type in self.ARCHIVE_TYPES:
suffixes = archive_type.suffix.split('.')
self.extensions.add('.' + suffixes[-1])
self.suffixes.add(archive_type.suffix)
def extract_tree(self, directory: str = '.') -> None:
def extract_tree(self, directory: str = ".") -> None:
for root, dirs, files in os.walk(directory):
real_root = os.path.realpath(root)
for name in files:
self.log.debug("Handling '%s' '%s'", real_root, name)
# Initial filtering with extensions
extension = os.path.splitext(name)[1].lower()
if extension not in self.extensions:
self.log.debug("Extension not matched: %s", name)
# Initial filtering with suffix
name_lower = name.lower()
for suffix in self.suffixes:
if name_lower.endswith(suffix):
break
else:
self.log.debug("Suffix not matched: %s", name)
continue
name_lower = name.lower()
filepath = os.path.join(real_root, name)
with open(filepath, 'rb') as filedesc:
with open(filepath, "rb") as filedesc:
header = filedesc.read(1024)
mime = magic.from_buffer(header, mime=True)
@ -144,19 +152,24 @@ class TreeExtractor():
dest_name = archive_type.dest_name(name)
dest = os.path.join(real_root, dest_name)
dest_tmp = dest + ".tmp"
try:
archive_type.extract(filepath, dest)
archive_type.extract(filepath, dest_tmp)
except BaseException as e:
# TODO Parameters stop on error
self.log.error(e, exc_info=True)
else:
os.unlink(filepath)
os.rename(dest_tmp, dest)
if os.path.isdir(dest):
self.extract_tree(dest)
def main(self) -> None:
directory = sys.argv[1] if len(sys.argv) > 1 else '.'
directory = sys.argv[1] if len(sys.argv) > 1 else "."
self.extract_tree(directory)
if __name__ == '__main__':
coloredlogs.install(level='DEBUG', fmt='%(levelname)s %(message)s')
if __name__ == "__main__":
coloredlogs.install(level="DEBUG", fmt="%(levelname)s %(message)s")
TreeExtractor().main()