wpress-extractor/unpacker.py

210 lines
6.1 KiB
Python

""" Unpacks an `All-in-One WP Migration` package. """
import collections
import errno
import os
import struct
import sys
class Ai1wmUnpacker(tuple):
    """
    Header record of an `All-in-One WP Migration` package, plus the unpacker.

    An instance is a 4-tuple ``(path, name, size, time)`` describing one file
    stored in the package. A package is a sequence of fixed-size binary
    headers, each followed by ``size`` bytes of file content, terminated by an
    all-zero header (``EOF``).
    """

    SIZE = 4377  # Total header size in bytes: 255 + 14 + 12 + 4096
    EOF = b'\x00' * SIZE  # An all-zero header terminates the package

    _Location = collections.namedtuple('_Location', ['offset', 'size'])
    _LOC_NAME = _Location(0, 255)  # File name
    _LOC_SIZE = _Location(255, 14)  # File size
    _LOC_TIME = _Location(269, 12)  # Last modified time
    _LOC_PATH = _Location(281, 4096)  # File path

    def __new__(cls, path=None, name=None, size=None, time=None):
        """
        Returns a new instance of the object.

        Called without any truthy argument it yields an EOF header; otherwise
        every field is validated.

        :raises ValueError: if a field has the wrong type or an invalid value
        """
        if path or name or size or time:
            if not isinstance(path, str) or path == '':
                raise ValueError('<path> must be a nonempty string')
            if not isinstance(name, str) or name == '':
                raise ValueError('<name> must be a nonempty string')
            if not isinstance(size, int) or size < 0:
                raise ValueError('<size> must be a non-negative integer')
            if not isinstance(time, int) or time < 0:
                raise ValueError('<time> must be a non-negative integer')
        return super(Ai1wmUnpacker, cls).__new__(cls, [path, name, size, time])

    @classmethod
    def s__(cls, obj):
        """
        Converts an object to str format.

        :rtype: str
        """
        if isinstance(obj, str):
            return obj
        major = sys.version_info[0]
        if major == 2:
            # `unicode` only exists on Python 2; this branch never runs on 3.
            if isinstance(obj, unicode):  # noqa: F821
                return obj.encode('utf-8')
        elif major == 3:
            if isinstance(obj, bytes):
                return obj.decode('utf-8')
        return str(obj)

    @classmethod
    def b__(cls, obj):
        """
        Converts an object to bytes format.

        :rtype: bytes
        """
        if sys.version_info[0] == 2:
            # On Python 2 `str` already is the bytes type.
            return cls.s__(obj)
        if isinstance(obj, bytes):
            return obj
        if not isinstance(obj, str):
            obj = str(obj)
        return obj.encode('utf-8')

    @classmethod
    def unpack_header(cls, header):
        """
        Unpacks a binary header.

        :param header: exactly ``SIZE`` bytes read from a package
        :returns: the decoded header; an EOF instance for the all-zero header
        :raises Exception: if the header has the wrong size or a bad field
        """
        if len(header) != cls.SIZE:
            raise Exception('invalid header size')
        if header == cls.EOF:
            return cls()
        return cls(
            path=cls.s__(cls.__extract_field(header, cls._LOC_PATH)),
            name=cls.s__(cls.__extract_field(header, cls._LOC_NAME)),
            size=cls.__extract_int(header, cls._LOC_SIZE),
            time=cls.__extract_int(header, cls._LOC_TIME),
        )

    def pack(self):
        """
        Packs to a binary header.

        :returns: ``SIZE`` bytes; the all-zero ``EOF`` block for an EOF header
        :raises Exception: if a field does not fit into its slot
        """
        if self.eof:
            # An EOF header has no real fields; stringifying them would emit
            # b'None' values instead of the terminator block.
            return self.EOF
        # Listed in on-disk order so the concatenated format matches offsets.
        layout = [
            ('name', self._LOC_NAME),
            ('size', self._LOC_SIZE),
            ('time', self._LOC_TIME),
            ('path', self._LOC_PATH),
        ]
        values, fmt = [], ''
        for name, location in layout:
            value = self.b__(getattr(self, name))
            if len(value) > location.size:
                raise Exception('{} is too long to pack: {}'.format(name, getattr(self, name)))
            values.append(value)
            fmt += '{}s'.format(location.size)  # struct pads with NUL bytes
        return struct.pack(fmt, *values)

    @property
    def path(self):
        """ Path of the file. """
        return self[0]

    @property
    def name(self):
        """ Name of the file. """
        return self[1]

    @property
    def size(self):
        """ Size of the file. """
        return self[2]

    @property
    def time(self):
        """ Time of the file. """
        return self[3]

    @property
    def eof(self):
        """ Indicates if this is an EOF header. """
        return not any(self)

    @classmethod
    def __extract_field(cls, header, location):
        """ Extracts a header field, stripped of its trailing NUL padding. """
        try:
            field = struct.unpack_from('{}s'.format(location.size), header, offset=location.offset)[0]
        except struct.error as e:
            raise Exception('error extracting a header field, error: {}'.format(e))
        return field.rstrip(b'\x00')

    @classmethod
    def __extract_int(cls, header, location):
        """ Extracts an integral header field. """
        try:
            return int(cls.__extract_field(header, location))
        except ValueError:
            raise Exception('invalid header field')

    @staticmethod
    def __make_dirs(path, mode=0o777):
        """
        A simple wrapper of os.makedirs(), which does not raise exception if
        the leaf directory already exists.

        :raises Exception: on any other failure, including the case where
            `path` exists but is not a directory
        """
        try:
            os.makedirs(path, mode=mode)
        except OSError as e:
            # EEXIST is also reported when a regular file occupies `path`;
            # only an existing directory is acceptable.
            if e.errno != errno.EEXIST or not os.path.isdir(path):
                raise Exception('error creating a directory: {}, error: {}'.format(path, e))
        return path

    @staticmethod
    def __resolve_inside(root, *parts):
        """
        Joins `parts` onto `root` and returns the resolved path.

        :raises Exception: if the resolved path is not `root` itself or
            located below it (absolute paths, `..` components, symlinks
            leading elsewhere)
        """
        root = os.path.realpath(root)
        resolved = os.path.realpath(os.path.join(root, *parts))
        base = os.path.normcase(root)
        candidate = os.path.normcase(resolved)
        # os.path.join(base, '') appends exactly one trailing separator, so
        # a sibling such as `<root>-other` does not pass the prefix test.
        if candidate != base and not candidate.startswith(os.path.join(base, '')):
            raise Exception('unsafe path in package: {}'.format(os.path.join(*parts)))
        return resolved

    @classmethod
    def __extract_file(cls, stream, path, size):
        """
        Extracts a file from the input stream.

        Copies exactly `size` bytes from `stream` into a new file at `path`.

        :raises Exception: if the stream ends before `size` bytes were read
        """
        block_size = 0x4000
        with open(path, 'wb') as f:
            while size > 0:
                wanted = min(block_size, size)
                block = stream.read(wanted)
                if len(block) != wanted:
                    raise Exception('error extracting a file: {}, error: bad file size'.format(path))
                f.write(block)
                size -= wanted

    @classmethod
    def __unpack(cls, stream, target):
        """
        Unpacks a package.

        Reads header/content pairs from `stream` until the EOF header and
        writes each file below `target`. Paths come from the package and are
        therefore untrusted: every destination is checked to stay inside
        `target` before anything is created.
        """
        while True:
            header = cls.unpack_header(stream.read(cls.SIZE))
            if header.eof:
                break
            directory = cls.__resolve_inside(target, header.path)
            destination = cls.__resolve_inside(target, header.path, header.name)
            cls.__make_dirs(directory)
            cls.__extract_file(stream, destination, header.size)

    @classmethod
    def unpack(cls, source, target):
        """
        Unpacks a package.

        :param source: path of the package file
        :param target: directory to extract into
        :returns: the resolved target directory
        :raises Exception: on any failure, with the source path in the message
        """
        source, target = cls.s__(os.path.realpath(source)), cls.s__(os.path.realpath(target))
        try:
            with open(source, 'rb') as f:
                cls.__unpack(f, target)
        except Exception as ex:
            raise Exception('error unpacking a file: {}, error: {}'.format(source, ex))
        return target