summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryenatch <yenatch@gmail.com>2014-10-28 22:00:52 -0700
committeryenatch <yenatch@gmail.com>2014-10-28 22:00:52 -0700
commit85e0e2af68301288f394631196a8dc5af10eab27 (patch)
tree52ba43b82f484710fd1fa61693879f8e7134e5bd
parent074f8dfdf4686d973234162ab6ebdc8a91e01550 (diff)
Refactor the de/compressor.
Speed, readability, accuracy and compression are all improved. Also fix an off-by-one error in repeat commands in the compressor.
-rw-r--r--pokemontools/gfx.py507
1 files changed, 290 insertions, 217 deletions
diff --git a/pokemontools/gfx.py b/pokemontools/gfx.py
index bfd6745..2bc39d6 100644
--- a/pokemontools/gfx.py
+++ b/pokemontools/gfx.py
@@ -186,222 +186,258 @@ lz_end = 0xff
class Compressed:
+ """
+ Usage:
+ lz = Compressed(data).output
+ or
+ lz = Compressed().compress(data)
+ or
+ c = Compressed()
+ c.data = data
+ lz = c.compress()
+ """
+
+ # The target compressor is not always as efficient as this implementation.
+ # To ignore compatibility and spit out a smaller blob, pass in small=True.
+ small = False
+
+ # BUG: literal [00] is a byte longer than blank 1.
+ # This bug exists in the target compressor as well,
+ # so don't fix until we've given up on replicating it.
+ min_scores = {
+ 'blank': 2,
+ 'iterate': 2,
+ 'alternate': 3,
+ 'repeat': 3,
+ 'reverse': 3,
+ 'flip': 3,
+ }
+
+ preference = [
+ 'repeat',
+ 'blank',
+ 'reverse',
+ 'flip',
+ 'iterate',
+ 'alternate',
+ #'literal',
+ ]
+
def __init__(self, data=None, commands=lz_commands, debug=False):
self.data = list(bytearray(data))
self.commands = commands
self.debug = debug
- self.compress()
- def byte_at(self, address):
- if address < len(self.data):
+ if self.data is not None:
+ self.compress()
+
+ def read_byte(self, address=None):
+ if address is None:
+ address = self.address
+ if 0 <= address < len(self.data):
return self.data[address]
return None
- def compress(self):
+ def reset_scores(self):
+ self.scores = {}
+ self.offsets = {}
+ for method in self.min_scores.keys():
+ self.scores[method] = 0
+
+ def score_literal(self, method):
+ address = self.address
+ compare = {
+ 'blank': [0],
+ 'iterate': [self.read_byte(address)],
+ 'alternate': [self.read_byte(address), self.read_byte(address + 1)],
+ }[method]
+ length = 0
+ while self.read_byte(address) == compare[length % len(compare)]:
+ length += 1
+ address += 1
+ self.scores[method] = length
+ return compare
+
+ def precompute_repeat_matches(self):
+ """This is faster than redundantly searching each time repeats are scored."""
+ self.indexes = {}
+ for byte in xrange(0x100):
+ self.indexes[byte] = []
+ index = -1
+ while 1:
+ try:
+ index = self.data.index(byte, index + 1)
+ except ValueError:
+ break
+ self.indexes[byte].append(index)
+
+ def score_repeats(self, name, direction=1, mutate=int):
+
+ address = self.address
+ byte = mutate(self.data[address])
+
+ for index in self.indexes[byte]:
+ if index >= address: break
+
+ length = 1 # we already know the first byte matches
+ while 1:
+ byte = self.read_byte(index + length * direction)
+ if byte == None or mutate(byte) != self.read_byte(address + length):
+ break
+ length += 1
+
+ # If repeats are almost entirely zeroes, just keep going and use blank instead.
+ if all(x == 0 for x in self.data[ address + 2 : address + length ]):
+ if self.read_byte(address + length) == 0:
+ # zeroes continue after this chunk
+ continue
+
+ # Adjust the score for two-byte offsets.
+ two_byte_index = index < address - 0x7f
+ if self.scores[name] >= length - int(two_byte_index):
+ continue
+
+ self.scores [name] = length
+ self.offsets[name] = index
+
+ def compress(self, data=None):
"""
This algorithm is greedy.
It aims to match the compressor it's based on as closely as possible.
It doesn't, but in the meantime the output is smaller.
"""
+
+ if data is not None:
+ self.data = data
+
self.address = 0
self.end = len(self.data)
self.output = []
self.literal = []
+ self.precompute_repeat_matches()
while self.address < self.end:
+
# Tally up the number of bytes that can be compressed
# by a single command from the current address.
- self.scores = {}
- for method in self.commands.keys():
- self.scores[method] = 0
- # The most common byte by far is 0 (whitespace in
- # images and padding in tilemaps and regular data).
- address = self.address
- while self.byte_at(address) == 0x00:
- self.scores['blank'] += 1
- address += 1
+ self.reset_scores()
- # In the same vein, see how long the same byte repeats for.
- address = self.address
- self.iter = self.byte_at(address)
- while self.byte_at(address) == self.iter:
- self.scores['iterate'] += 1
- address += 1
+ # Check for repetition. Alternating bytes are common since graphics data is planar.
- # Do it again, but for alternating bytes.
- address = self.address
- self.alts = []
- self.alts += [self.byte_at(address)]
- self.alts += [self.byte_at(address + 1)]
- while self.byte_at(address) == self.alts[(address - self.address) % 2]:
- self.scores['alternate'] += 1
- address += 1
+ _, self.iter, self.alts = map(self.score_literal, ['blank', 'iterate', 'alternate'])
- # Check if we can repeat any data that the
- # decompressor just output (here, the input data).
- # TODO this includes the current command's output
- self.matches = {}
- last_matches = {}
- address = self.address
- min_length = 4 # minimum worthwhile length
- max_length = 9 # any further and the time loss is too significant
- for length in xrange(min_length, min(len(self.data) - address, max_length)):
- keyword = self.data[address:address+length]
- for offset, byte in enumerate(self.data[:address]):
- # offset ranges are -0x80:-1 and 0:0x7fff
- if offset > 0x7fff and offset < address - 0x80:
- continue
- if byte == keyword[0]:
- # Straight repeat...
- if self.data[offset:offset+length] == keyword:
- if self.scores['repeat'] < length:
- self.scores['repeat'] = length
- self.matches['repeat'] = offset
- # In reverse...
- if self.data[offset-1:offset-length-1:-1] == keyword:
- if self.scores['reverse'] < length:
- self.scores['reverse'] = length
- self.matches['reverse'] = offset
- # Or bitflipped
- if self.bit_flip([byte]) == self.bit_flip([keyword[0]]):
- if self.bit_flip(self.data[offset:offset+length]) == self.bit_flip(keyword):
- if self.scores['flip'] < length:
- self.scores['flip'] = length
- self.matches['flip'] = offset
- if self.matches == last_matches:
- break
- last_matches = list(self.matches)
+ # Check if we can repeat any data that the decompressor just output (here, the input data).
+ # This includes the current command's output.
+
+ for args in [
+ ('repeat', 1, int),
+ ('reverse', -1, int),
+ ('flip', 1, self.bit_flip),
+ ]:
+ self.score_repeats(*args)
# If the scores are too low, try again from the next byte.
- if not any(map(lambda x: {
- 'blank': 1,
- 'iterate': 2,
- 'alternate': 3,
- 'repeat': 3,
- 'reverse': 3,
- 'flip': 3,
- }.get(x[0], 10000) < x[1], self.scores.items())):
- self.literal += [self.data[self.address]]
+ if not any(
+ self.min_scores.get(name, score) + int(self.scores[name] > lowmax) < score
+ for name, score in self.scores.items()
+ ):
+ self.literal += [self.read_byte()]
self.address += 1
- else: # payload
- # bug: literal [00] is a byte longer than blank 1.
- # this bug exists in the target compressor as well,
- # so don't fix until we've given up on replicating it.
- self.do_literal()
+ else:
+ self.do_literal() # payload
self.do_scored()
# unload any literals we're sitting on
self.do_literal()
+
self.output += [lz_end]
- def bit_flip(self, data):
- return [sum(((byte >> i) & 1) << (7 - i) for i in xrange(8)) for byte in data]
+ return self.output
+
+ def bit_flip(self, byte):
+ return sum(((byte >> i) & 1) << (7 - i) for i in xrange(8))
def do_literal(self):
if self.literal:
- cmd = self.commands['literal']
length = len(self.literal)
- self.do_cmd(cmd, length)
- # self.address has already been
- # incremented in the main loop
+ self.do_cmd('literal', length)
self.literal = []
- def do_cmd(self, cmd, length):
- if length > max_length:
- length = max_length
+ def do_scored(self):
+ # Which command did the best?
+ winner, score = sorted(
+ self.scores.items(),
+ key = lambda (name, score): (
+ -(score - self.min_scores[name] - int(score > lowmax)),
+ self.preference.index(name)
+ )
+ )[0]
+ length = self.do_cmd(winner, score)
+ self.address += length
+ def do_cmd(self, cmd, length):
+ length = min(length, max_length)
cmd_length = length - 1
+ output = []
+
if length > lowmax:
- output = [(self.commands['long'] << 5) + (cmd << 2) + (cmd_length >> 8)]
+ output += [(self.commands['long'] << 5) + (self.commands[cmd] << 2) + (cmd_length >> 8)]
output += [cmd_length & 0xff]
else:
- output = [(cmd << 5) + cmd_length]
-
- if cmd == self.commands['literal']:
- output += self.literal
- elif cmd == self.commands['iterate']:
- output += [self.iter]
- elif cmd == self.commands['alternate']:
- output += self.alts
- else:
- for command in ['repeat', 'reverse', 'flip']:
- if cmd == self.commands[command]:
- offset = self.matches[command]
- # negative offsets are a byte shorter
- if self.address - offset <= 0x80:
- offset = self.address - offset + 0x80
- if cmd == self.commands['repeat']:
- offset -= 1 # this is a hack, but it seems to work
- output += [offset]
- else:
- output += [offset / 0x100, offset % 0x100]
+ output += [(self.commands[cmd] << 5) + cmd_length]
+
+ output += {
+ 'literal': self.literal,
+ 'iterate': self.iter,
+ 'alternate': self.alts,
+ 'blank': [],
+ }.get(cmd, [])
+
+ if cmd in ['repeat', 'reverse', 'flip']:
+ offset = self.offsets[cmd]
+ # Negative offsets are one byte.
+ # Positive offsets are two.
+ if self.address - offset <= 0x7f:
+ offset = self.address - offset + 0x80
+ offset -= 1 # this is a hack, but it seems to work
+ output += [offset]
+ else:
+ output += [offset / 0x100, offset % 0x100] # big endian
if self.debug:
print (
- dict(map(reversed, self.commands.items()))[cmd],
- length, '\t',
+ cmd, length, '\t',
' '.join(map('{:02x}'.format, output))
)
self.output += output
return length
- def do_scored(self):
- # Which command did the best?
- winner, score = sorted(
- self.scores.items(),
- key=lambda x:(-x[1], [
- 'blank',
- 'repeat',
- 'reverse',
- 'flip',
- 'iterate',
- 'alternate',
- 'literal',
- 'long', # hack
- ].index(x[0]))
- )[0]
- cmd = self.commands[winner]
- length = self.do_cmd(cmd, score)
- self.address += length
-
class Decompressed:
"""
Parse compressed data, usually 2bpp.
- parameters:
- [compressed data]
- [tile arrangement] default: 'vert'
- [size of pic] default: None
- [start] (optional)
-
- splits output into pic [size] and animation tiles if applicable
- data can be fed in from rom if [start] is specified
+ To decompress from an offset (i.e. in a rom), pass in <start>.
"""
- def __init__(self, lz=None, start=0, debug=False):
- # todo: play nice with Compressed
+ def __init__(self, lz=None, start=0, commands=lz_commands, debug=False):
- assert lz, 'need something to decompress!'
self.lz = bytearray(lz)
+ self.commands = commands
+ self.command_names = dict(map(reversed, self.commands.items()))
- self.byte = None
- self.address = 0
- self.start = start
-
- self.output = []
+ self.address = start
+ self.start = start
self.decompress()
+ self.compressed_data = self.lz[self.start : self.address]
- self.compressed_data = self.lz[self.start : self.start + self.address]
-
- # print tuple containing start and end address
- if debug: print '(' + hex(self.start) + ', ' + hex(self.start + self.address+1) + '),'
+ if debug: print '({:x), {:x})'.format(self.start, self.address)
def command_list(self):
@@ -409,134 +445,163 @@ class Decompressed:
Print a list of commands that were used. Useful for debugging.
"""
- data = bytearray(self.lz)
- address = self.address
+ data = bytearray(self.compressed_data)
+ data_list = list(data)
+
+ text = ''
+ address = 0
+ head = 0
+
while 1:
+ offset = 0
+
cmd_addr = address
byte = data[address]
address += 1
- if byte == lz_end: break
+
+ if byte == lz_end:
+ break
+
cmd = (byte >> 5) & 0b111
- if cmd == lz_commands['long']:
+
+ if cmd == self.commands['long']:
cmd = (byte >> 2) & 0b111
- length = (byte & 0b11) << 8
+ length = (byte & 0b11) * 0x100
length += data[address]
address += 1
else:
length = byte & 0b11111
+
length += 1
- name = dict(map(reversed, lz_commands.items()))[cmd]
+
+ name = self.command_names[cmd]
+
if name == 'iterate':
address += 1
+
elif name == 'alternate':
address += 2
+
elif name in ['repeat', 'reverse', 'flip']:
if data[address] < 0x80:
+ offset = data[address] * 0x100 + data[address + 1]
address += 2
else:
+ offset = head - (data[address] & 0x7f) - 1
address += 1
+
elif name == 'literal':
address += length
- print name, length, '\t', ' '.join(map('{:02x}'.format, list(data)[cmd_addr:address]))
+
+ text += '{0}: {1}'.format(name, length)
+ text += '\t' + ' '.join(map('{:02x}'.format, data_list[cmd_addr:address]))
+
+ if name in ['repeat', 'reverse', 'flip']:
+
+ bites = self.output[ offset : offset + length ]
+ if name == 'reverse':
+ bites = self.output[ offset : offset - length : -1 ]
+
+ text += ' [' + ' '.join(map('{:02x}'.format, bites)) + ']'
+
+ text += '\n'
+
+ head += length
+
+
+ return text
def decompress(self):
- """
- Replica of crystal's decompression.
- """
self.output = []
- while True:
- self.getCurByte()
+ while 1:
if (self.byte == lz_end):
- self.address += 1
+ self.next()
break
self.cmd = (self.byte & 0b11100000) >> 5
- if self.cmd == lz_commands['long']: # 10-bit param
+ if self.cmd_name == 'long':
+ # 10-bit length
self.cmd = (self.byte & 0b00011100) >> 2
- self.length = (self.byte & 0b00000011) << 8
- self.next()
- self.length += self.byte + 1
- else: # 5-bit param
- self.length = (self.byte & 0b00011111) + 1
-
- # literals
- if self.cmd == lz_commands['literal']:
- self.doLiteral()
- elif self.cmd == lz_commands['iterate']:
- self.doIter()
- elif self.cmd == lz_commands['alternate']:
- self.doAlt()
- elif self.cmd == lz_commands['blank']:
- self.doZeros()
-
- else: # repeaters
- self.next()
- if self.byte > 0x7f: # negative
- self.displacement = self.byte & 0x7f
- self.displacement = len(self.output) - self.displacement - 1
- else: # positive
- self.displacement = self.byte * 0x100
- self.next()
- self.displacement += self.byte
+ self.length = (self.next() & 0b00000011) * 0x100
+ self.length += self.next() + 1
+ else:
+ # 5-bit length
+ self.length = (self.next() & 0b00011111) + 1
- if self.cmd == lz_commands['flip']:
- self.doFlip()
- elif self.cmd == lz_commands['reverse']:
- self.doReverse()
- else: # lz_commands['repeat']
- self.doRepeat()
+ do = {
+ 'literal': self.doLiteral,
+ 'iterate': self.doIter,
+ 'alternate': self.doAlt,
+ 'blank': self.doZeros,
+ 'flip': self.doFlip,
+ 'reverse': self.doReverse,
+ 'repeat': self.doRepeat,
+ }[ self.cmd_name ]
- self.address += 1
- #self.next() # somewhat of a hack
+ do()
- def getCurByte(self):
- self.byte = self.lz[self.start+self.address]
+ @property
+ def byte(self):
+ return self.lz[ self.address ]
def next(self):
+ byte = self.byte
self.address += 1
- self.getCurByte()
+ return byte
+
+ @property
+ def cmd_name(self):
+ return self.command_names.get(self.cmd)
+
+
+ def get_offset(self):
+
+ if self.byte >= 0x80: # negative
+ # negative
+ offset = self.next() & 0x7f
+ offset = len(self.output) - offset - 1
+ else:
+ # positive
+ offset = self.next() * 0x100
+ offset += self.next()
+
+ self.offset = offset
+
def doLiteral(self):
"""
Copy data directly.
"""
- for byte in range(self.length):
- self.next()
- self.output.append(self.byte)
+ self.output += self.lz[ self.address : self.address + self.length ]
+ self.address += self.length
def doIter(self):
"""
Write one byte repeatedly.
"""
- self.next()
- for byte in range(self.length):
- self.output.append(self.byte)
+ self.output += [self.next()] * self.length
def doAlt(self):
"""
Write alternating bytes.
"""
- self.alts = []
- self.next()
- self.alts.append(self.byte)
- self.next()
- self.alts.append(self.byte)
+ alts = [self.next(), self.next()]
+ self.output += [ alts[x & 1] for x in xrange(self.length) ]
- for byte in range(self.length):
- self.output.append(self.alts[byte&1])
+ #alts = [self.next(), self.next()] * (self.length / 2 + 1)
+ #self.output += alts[:self.length]
def doZeros(self):
"""
Write zeros.
"""
- for byte in range(self.length):
- self.output.append(0x00)
+ self.output += [0] * self.length
def doFlip(self):
"""
@@ -545,23 +610,31 @@ class Decompressed:
eg 11100100 -> 00100111
quat 3 2 1 0 -> 0 2 1 3
"""
- for byte in range(self.length):
- flipped = sum(1<<(7-i) for i in range(8) if self.output[self.displacement+byte]>>i&1)
+ self.get_offset()
+ # Note: appends must be one at a time (this way, repeats can draw from themselves if required)
+ for i in xrange(self.length):
+ byte = self.output[ self.offset + i ]
+ flipped = sum( 1 << (7 - j) for j in xrange(8) if (byte >> j) & 1)
self.output.append(flipped)
+ self.output.append( self.bit_flip( self.output[ self.offset + i ] ) )
def doReverse(self):
"""
Repeat reversed bytes from output.
"""
- for byte in range(self.length):
- self.output.append(self.output[self.displacement-byte])
+ self.get_offset()
+ # Note: appends must be one at a time (this way, repeats can draw from themselves if required)
+ for i in xrange(self.length):
+ self.output.append( self.output[ self.offset - i ] )
def doRepeat(self):
"""
Repeat bytes from output.
"""
- for byte in range(self.length):
- self.output.append(self.output[self.displacement+byte])
+ self.get_offset()
+ # Note: appends must be one at a time (this way, repeats can draw from themselves if required)
+ for i in xrange(self.length):
+ self.output.append( self.output[ self.offset + i ] )