""" Unpacks an `All-in-One WP Migration` package. """ import collections import errno import os import struct import sys class Ai1wmUnpacker(tuple): """ Unpacks an `All-in-One WP Migration` package. """ SIZE = 4377 EOF = b'\x00' * SIZE _Location = collections.namedtuple('_Location', ['offset', 'size']) _LOC_NAME = _Location(0, 255) # File name _LOC_SIZE = _Location(255, 14) # File size _LOC_TIME = _Location(269, 12) # Last modified time _LOC_PATH = _Location(281, 4096) # File path def __new__(cls, path=None, name=None, size=None, time=None): """ Returns a new instance of the object. """ if path or name or size or time: if not isinstance(path, str) or path == '': raise ValueError('<path> must be a nonempty string') if not isinstance(name, str) or name == '': raise ValueError('<name> must be a nonempty string') if not isinstance(size, int) or size < 0: raise ValueError('<size> must be a non-negative integer') if not isinstance(time, int) or time < 0: raise ValueError('<time> must be a non-negative integer') return super(Ai1wmUnpacker, cls).__new__(cls, [path, name, size, time]) @classmethod def s__(cls, obj): """ Converts an object to str format. :rtype: str """ if isinstance(obj, str): return obj v = sys.version_info[0] if v == 2: if isinstance(obj, unicode): return obj.encode('utf-8') elif v == 3: if isinstance(obj, bytes): return obj.decode('utf-8') return str(obj) @classmethod def b__(cls, obj): """ Converts an object to bytes format. :rtype: bytes """ v = sys.version_info[0] if v == 2: return cls.s__(obj) if isinstance(obj, bytes): return obj if not isinstance(obj, str): obj = str(obj) return obj.encode('utf-8') @classmethod def unpack_header(cls, header): """ Unpacks a binary header. """ if len(header) != cls.SIZE: raise Exception('invalid header size') if header == cls.EOF: return cls() return cls( path=cls.s__(cls.__extract_field(header, cls._LOC_PATH)), name=cls.s__(cls.__extract_field(header, cls._LOC_NAME)), size=cls.__extract_int(header, cls._LOC_SIZE), time=cls.__extract_int(header, cls._LOC_TIME), ) def pack(self): """ Packs to a binary header. """ attributes, formats, locations = [], '', [ ('name', self._LOC_NAME), ('size', self._LOC_SIZE), ('time', self._LOC_TIME), ('path', self._LOC_PATH), ] for name, location in locations: attribute = self.b__(getattr(self, name)) if len(attribute) > location.size: raise Exception('{} is too long to pack: {}'.format(name, getattr(self, name))) attributes.append(attribute) formats += '{}s'.format(location.size) return struct.pack(formats, *attributes) @property def path(self): """ Path of the file. """ return self[0] @property def name(self): """ Name of the file. """ return self[1] @property def size(self): """ Size of the file. """ return self[2] @property def time(self): """ Time of the file. """ return self[3] @property def eof(self): """ Indicates if this is an EOF header. """ return not any(self) @classmethod def __extract_field(cls, header, location): """ Extracts a header field. """ try: field = struct.unpack_from('{}s'.format(location.size), header, offset=location.offset)[0] except struct.error as e: raise Exception('error extracting a header field, error: {}'.format(e)) return field.rstrip(b'\x00') @classmethod def __extract_int(cls, header, location): """ Extracts an integral header field. """ try: return int(cls.__extract_field(header, location)) except ValueError: raise Exception('invalid header field') @staticmethod def __make_dirs(path, mode=0o777): """ A simple wrapper of os.makedirs(), which does not raise exception if the leaf directory already exists. """ try: os.makedirs(path, mode=mode) except OSError as e: if e.errno != errno.EEXIST: raise Exception('error creating a directory: {}, error: {}'.format(path, e)) return path @classmethod def __extract_file(cls, stream, path, size): """ Extracts a file from the input stream. """ block_size = 0x4000 with open(path, 'wb') as f: while size > 0: if block_size > size: block_size = size block = stream.read(block_size) if len(block) != block_size: raise Exception('error extracting a file: {}, error: bad file size'.format(path)) f.write(block) size -= len(block) @classmethod def __unpack(cls, stream, target): """ Unpacks a package. """ while True: header = cls.unpack_header(stream.read(Ai1wmUnpacker.SIZE)) if header.eof: break path = os.path.join(target, header.path) cls.__make_dirs(path) path = os.path.join(path, header.name) cls.__extract_file(stream, path, header.size) @classmethod def unpack(cls, source, target): """ Unpacks a package. """ source, target = cls.s__(os.path.realpath(source)), cls.s__(os.path.realpath(target)) try: with open(source, 'rb') as f: cls.__unpack(f, target) except Exception as ex: raise Exception('error unpacking a file: {}, error: {}'.format(source, ex)) return target