This commit is contained in:
Bryson Shepard 2024-03-03 22:21:59 -06:00
parent 5685c56087
commit d314c0fd07
8 changed files with 272 additions and 0 deletions

16
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File with Arguments",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"args": "${command:pickArgs}"
}
]
}

6
error.py Normal file
View File

@ -0,0 +1,6 @@
""" Package specific exceptions. """
class Ai1wmError(Exception):
""" Exceptions raised from this package. """
pass

120
header.py Normal file
View File

@ -0,0 +1,120 @@
""" Parses ai1wm file header. """
import collections
import struct
from error import Ai1wmError
from tools import b__, s__
class Ai1wmHeader(tuple):
""" Parses an `All-in-One WP Migration` header. """
SIZE = 4377
EOF = b'\x00' * SIZE
_Location = collections.namedtuple('_Location', ['offset', 'size'])
_LOC_NAME = _Location(0, 255) # File name
_LOC_SIZE = _Location(255, 14) # File size
_LOC_TIME = _Location(269, 12) # Last modified time
_LOC_PATH = _Location(281, 4096) # File path
def __new__(cls, path=None, name=None, size=None, time=None):
""" Returns a new instance of the object. """
if path or name or size or time:
if not isinstance(path, str) or path == '':
raise ValueError('<path> must be a nonempty string')
if not isinstance(name, str) or name == '':
raise ValueError('<name> must be a nonempty string')
if not isinstance(size, int) or size < 0:
raise ValueError('<size> must be a non-negative integer')
if not isinstance(time, int) or time < 0:
raise ValueError('<time> must be a non-negative integer')
return super(Ai1wmHeader, cls).__new__(cls, [path, name, size, time])
@classmethod
def unpack(cls, header):
""" Unpacks a binary header. """
if len(header) != cls.SIZE:
raise Ai1wmError('invalid header size')
if header == cls.EOF:
return cls()
return cls(
path=s__(cls.__extract_field(header, cls._LOC_PATH)),
name=s__(cls.__extract_field(header, cls._LOC_NAME)),
size=cls.__extract_int(header, cls._LOC_SIZE),
time=cls.__extract_int(header, cls._LOC_TIME),
)
def pack(self):
""" Packs to a binary header. """
attributes, formats, locations = [], '', [
('name', self._LOC_NAME),
('size', self._LOC_SIZE),
('time', self._LOC_TIME),
('path', self._LOC_PATH),
]
for name, location in locations:
attribute = b__(getattr(self, name))
if len(attribute) > location.size:
raise Ai1wmError('{} is too long to pack: {}'.format(name, getattr(self, name)))
attributes.append(attribute)
formats += '{}s'.format(location.size)
return struct.pack(formats, *attributes)
@property
def path(self):
""" Path of the file. """
return self[0]
@property
def name(self):
""" Name of the file. """
return self[1]
@property
def size(self):
""" Size of the file. """
return self[2]
@property
def time(self):
""" Time of the file. """
return self[3]
@property
def eof(self):
""" Indicates if this is an EOF header. """
return not any(self)
@classmethod
def __extract_field(cls, header, location):
""" Extracts a header field. """
try:
field = struct.unpack_from('{}s'.format(location.size), header, offset=location.offset)[0]
except struct.error as e:
raise Ai1wmError('error extracting a header field, error: {}'.format(e))
return field.rstrip(b'\x00')
@classmethod
def __extract_int(cls, header, location):
""" Extracts an integral header field. """
try:
return int(cls.__extract_field(header, location))
except ValueError:
raise Ai1wmError('invalid header field')

29
main.py Normal file
View File

@ -0,0 +1,29 @@
import argparse
import os
import sys
import unpacker
from error import Ai1wmError
'''
check file for proper format
extract info file
gather details
extract files to new folder with permissions
'''
if __name__ == '__main__':
""" Entry of the ai1wm program. """
parser = argparse.ArgumentParser(prog='ai1wm', description='Pack/Unpack All-in-One WP Migration Packages')
parser.add_argument('source', help='source path')
parser.add_argument('target', help='target path')
args = parser.parse_args()
try:
if os.path.isfile(args.source):
unpacker.Ai1wmUnpacker(args.target).unpack(args.source)
except Ai1wmError as e:
print(e)
sys.exit(-1)
sys.exit(0)

36
tools.py Normal file
View File

@ -0,0 +1,36 @@
import sys
def s__(obj):
"""
Converts an object to str format.
:rtype: str
"""
if isinstance(obj, str):
return obj
v = sys.version_info[0]
if v == 2:
if isinstance(obj, unicode):
return obj.encode('utf-8')
elif v == 3:
if isinstance(obj, bytes):
return obj.decode('utf-8')
return str(obj)
def b__(obj):
"""
Converts an object to bytes format.
:rtype: bytes
"""
v = sys.version_info[0]
if v == 2:
return s__(obj)
if isinstance(obj, bytes):
return obj
if not isinstance(obj, str):
obj = str(obj)
return obj.encode('utf-8')

65
unpacker.py Normal file
View File

@ -0,0 +1,65 @@
""" Unpacks an `All-in-One WP Migration` package. """
import errno
import os
from error import Ai1wmError
from header import Ai1wmHeader
from tools import s__
class Ai1wmUnpacker(object):
""" Unpacks an `All-in-One WP Migration` package. """
@staticmethod
def __make_dirs(path, mode=0o777):
""" A simple wrapper of os.makedirs(), which does not raise exception if the leaf directory already exists. """
try:
os.makedirs(path, mode=mode)
except OSError as e:
if e.errno != errno.EEXIST:
raise Ai1wmError('error creating a directory: {}, error: {}'.format(path, e))
return path
@classmethod
def __extract_file(cls, stream, path, size):
""" Extracts a file from the input stream. """
block_size = 0x4000
with open(path, 'wb') as f:
while size > 0:
if block_size > size:
block_size = size
block = stream.read(block_size)
if len(block) != block_size:
raise Ai1wmError('error extracting a file: {}, error: bad file size'.format(path))
f.write(block)
size -= len(block)
@classmethod
def __unpack(cls, stream, target):
""" Unpacks a package. """
while True:
header = Ai1wmHeader.unpack(stream.read(Ai1wmHeader.SIZE))
if header.eof:
break
path = os.path.join(target, header.path)
cls.__make_dirs(path)
path = os.path.join(path, header.name)
cls.__extract_file(stream, path, header.size)
@classmethod
def unpack(cls, source, target):
""" Unpacks a package. """
source, target = s__(os.path.realpath(source)), s__(os.path.realpath(target))
try:
with open(source, 'rb') as f:
cls.__unpack(f, target)
except Exception as ex:
raise Ai1wmError('error unpacking a file: {}, error: {}'.format(source, ex))
return target