diff options
author | yenatch <yenatch@gmail.com> | 2014-10-28 22:00:52 -0700 |
---|---|---|
committer | yenatch <yenatch@gmail.com> | 2014-10-28 22:00:52 -0700 |
commit | 85e0e2af68301288f394631196a8dc5af10eab27 (patch) | |
tree | 52ba43b82f484710fd1fa61693879f8e7134e5bd | |
parent | 074f8dfdf4686d973234162ab6ebdc8a91e01550 (diff) |
Refactor the de/compressor.
Speed, readability, accuracy and compression are all improved.
Also fix an off-by-one error in repeat commands in the compressor.
-rw-r--r-- | pokemontools/gfx.py | 507 |
1 files changed, 290 insertions, 217 deletions
diff --git a/pokemontools/gfx.py b/pokemontools/gfx.py index bfd6745..2bc39d6 100644 --- a/pokemontools/gfx.py +++ b/pokemontools/gfx.py @@ -186,222 +186,258 @@ lz_end = 0xff class Compressed: + """ + Usage: + lz = Compressed(data).output + or + lz = Compressed().compress(data) + or + c = Compressed() + c.data = data + lz = c.compress() + """ + + # The target compressor is not always as efficient as this implementation. + # To ignore compatibility and spit out a smaller blob, pass in small=True. + small = False + + # BUG: literal [00] is a byte longer than blank 1. + # This bug exists in the target compressor as well, + # so don't fix until we've given up on replicating it. + min_scores = { + 'blank': 2, + 'iterate': 2, + 'alternate': 3, + 'repeat': 3, + 'reverse': 3, + 'flip': 3, + } + + preference = [ + 'repeat', + 'blank', + 'reverse', + 'flip', + 'iterate', + 'alternate', + #'literal', + ] + def __init__(self, data=None, commands=lz_commands, debug=False): self.data = list(bytearray(data)) self.commands = commands self.debug = debug - self.compress() - def byte_at(self, address): - if address < len(self.data): + if self.data is not None: + self.compress() + + def read_byte(self, address=None): + if address is None: + address = self.address + if 0 <= address < len(self.data): return self.data[address] return None - def compress(self): + def reset_scores(self): + self.scores = {} + self.offsets = {} + for method in self.min_scores.keys(): + self.scores[method] = 0 + + def score_literal(self, method): + address = self.address + compare = { + 'blank': [0], + 'iterate': [self.read_byte(address)], + 'alternate': [self.read_byte(address), self.read_byte(address + 1)], + }[method] + length = 0 + while self.read_byte(address) == compare[length % len(compare)]: + length += 1 + address += 1 + self.scores[method] = length + return compare + + def precompute_repeat_matches(self): + """This is faster than redundantly searching each time repeats are scored.""" + self.indexes = {} + for byte in xrange(0x100): + self.indexes[byte] = [] + index = -1 + while 1: + try: + index = self.data.index(byte, index + 1) + except ValueError: + break + self.indexes[byte].append(index) + + def score_repeats(self, name, direction=1, mutate=int): + + address = self.address + byte = mutate(self.data[address]) + + for index in self.indexes[byte]: + if index >= address: break + + length = 1 # we already know the first byte matches + while 1: + byte = self.read_byte(index + length * direction) + if byte == None or mutate(byte) != self.read_byte(address + length): + break + length += 1 + + # If repeats are almost entirely zeroes, just keep going and use blank instead. + if all(x == 0 for x in self.data[ address + 2 : address + length ]): + if self.read_byte(address + length) == 0: + # zeroes continue after this chunk + continue + + # Adjust the score for two-byte offsets. + two_byte_index = index < address - 0x7f + if self.scores[name] >= length - int(two_byte_index): + continue + + self.scores [name] = length + self.offsets[name] = index + + def compress(self, data=None): """ This algorithm is greedy. It aims to match the compressor it's based on as closely as possible. It doesn't, but in the meantime the output is smaller. """ + + if data is not None: + self.data = data + self.address = 0 self.end = len(self.data) self.output = [] self.literal = [] + self.precompute_repeat_matches() while self.address < self.end: + # Tally up the number of bytes that can be compressed # by a single command from the current address. - self.scores = {} - for method in self.commands.keys(): - self.scores[method] = 0 - # The most common byte by far is 0 (whitespace in - # images and padding in tilemaps and regular data). - address = self.address - while self.byte_at(address) == 0x00: - self.scores['blank'] += 1 - address += 1 + self.reset_scores() - # In the same vein, see how long the same byte repeats for. - address = self.address - self.iter = self.byte_at(address) - while self.byte_at(address) == self.iter: - self.scores['iterate'] += 1 - address += 1 + # Check for repetition. Alternating bytes are common since graphics data is planar. - # Do it again, but for alternating bytes. - address = self.address - self.alts = [] - self.alts += [self.byte_at(address)] - self.alts += [self.byte_at(address + 1)] - while self.byte_at(address) == self.alts[(address - self.address) % 2]: - self.scores['alternate'] += 1 - address += 1 + _, self.iter, self.alts = map(self.score_literal, ['blank', 'iterate', 'alternate']) - # Check if we can repeat any data that the - # decompressor just output (here, the input data). - # TODO this includes the current command's output - self.matches = {} - last_matches = {} - address = self.address - min_length = 4 # minimum worthwhile length - max_length = 9 # any further and the time loss is too significant - for length in xrange(min_length, min(len(self.data) - address, max_length)): - keyword = self.data[address:address+length] - for offset, byte in enumerate(self.data[:address]): - # offset ranges are -0x80:-1 and 0:0x7fff - if offset > 0x7fff and offset < address - 0x80: - continue - if byte == keyword[0]: - # Straight repeat... - if self.data[offset:offset+length] == keyword: - if self.scores['repeat'] < length: - self.scores['repeat'] = length - self.matches['repeat'] = offset - # In reverse... - if self.data[offset-1:offset-length-1:-1] == keyword: - if self.scores['reverse'] < length: - self.scores['reverse'] = length - self.matches['reverse'] = offset - # Or bitflipped - if self.bit_flip([byte]) == self.bit_flip([keyword[0]]): - if self.bit_flip(self.data[offset:offset+length]) == self.bit_flip(keyword): - if self.scores['flip'] < length: - self.scores['flip'] = length - self.matches['flip'] = offset - if self.matches == last_matches: - break - last_matches = list(self.matches) + # Check if we can repeat any data that the decompressor just output (here, the input data). + # This includes the current command's output. + + for args in [ + ('repeat', 1, int), + ('reverse', -1, int), + ('flip', 1, self.bit_flip), + ]: + self.score_repeats(*args) # If the scores are too low, try again from the next byte. - if not any(map(lambda x: { - 'blank': 1, - 'iterate': 2, - 'alternate': 3, - 'repeat': 3, - 'reverse': 3, - 'flip': 3, - }.get(x[0], 10000) < x[1], self.scores.items())): - self.literal += [self.data[self.address]] + if not any( + self.min_scores.get(name, score) + int(self.scores[name] > lowmax) < score + for name, score in self.scores.items() + ): + self.literal += [self.read_byte()] self.address += 1 - else: # payload - # bug: literal [00] is a byte longer than blank 1. - # this bug exists in the target compressor as well, - # so don't fix until we've given up on replicating it. - self.do_literal() + else: + self.do_literal() # payload self.do_scored() # unload any literals we're sitting on self.do_literal() + self.output += [lz_end] - def bit_flip(self, data): - return [sum(((byte >> i) & 1) << (7 - i) for i in xrange(8)) for byte in data] + return self.output + + def bit_flip(self, byte): + return sum(((byte >> i) & 1) << (7 - i) for i in xrange(8)) def do_literal(self): if self.literal: - cmd = self.commands['literal'] length = len(self.literal) - self.do_cmd(cmd, length) - # self.address has already been - # incremented in the main loop + self.do_cmd('literal', length) self.literal = [] - def do_cmd(self, cmd, length): - if length > max_length: - length = max_length + def do_scored(self): + # Which command did the best? + winner, score = sorted( + self.scores.items(), + key = lambda (name, score): ( + -(score - self.min_scores[name] - int(score > lowmax)), + self.preference.index(name) + ) + )[0] + length = self.do_cmd(winner, score) + self.address += length + def do_cmd(self, cmd, length): + length = min(length, max_length) cmd_length = length - 1 + output = [] + if length > lowmax: - output = [(self.commands['long'] << 5) + (cmd << 2) + (cmd_length >> 8)] + output += [(self.commands['long'] << 5) + (self.commands[cmd] << 2) + (cmd_length >> 8)] output += [cmd_length & 0xff] else: - output = [(cmd << 5) + cmd_length] - - if cmd == self.commands['literal']: - output += self.literal - elif cmd == self.commands['iterate']: - output += [self.iter] - elif cmd == self.commands['alternate']: - output += self.alts - else: - for command in ['repeat', 'reverse', 'flip']: - if cmd == self.commands[command]: - offset = self.matches[command] - # negative offsets are a byte shorter - if self.address - offset <= 0x80: - offset = self.address - offset + 0x80 - if cmd == self.commands['repeat']: - offset -= 1 # this is a hack, but it seems to work - output += [offset] - else: - output += [offset / 0x100, offset % 0x100] + output += [(self.commands[cmd] << 5) + cmd_length] + + output += { + 'literal': self.literal, + 'iterate': self.iter, + 'alternate': self.alts, + 'blank': [], + }.get(cmd, []) + + if cmd in ['repeat', 'reverse', 'flip']: + offset = self.offsets[cmd] + # Negative offsets are one byte. + # Positive offsets are two. + if self.address - offset <= 0x7f: + offset = self.address - offset + 0x80 + offset -= 1 # this is a hack, but it seems to work + output += [offset] + else: + output += [offset / 0x100, offset % 0x100] # big endian if self.debug: print ( - dict(map(reversed, self.commands.items()))[cmd], - length, '\t', + cmd, length, '\t', ' '.join(map('{:02x}'.format, output)) ) self.output += output return length - def do_scored(self): - # Which command did the best? - winner, score = sorted( - self.scores.items(), - key=lambda x:(-x[1], [ - 'blank', - 'repeat', - 'reverse', - 'flip', - 'iterate', - 'alternate', - 'literal', - 'long', # hack - ].index(x[0])) - )[0] - cmd = self.commands[winner] - length = self.do_cmd(cmd, score) - self.address += length - class Decompressed: """ Parse compressed data, usually 2bpp. - parameters: - [compressed data] - [tile arrangement] default: 'vert' - [size of pic] default: None - [start] (optional) - - splits output into pic [size] and animation tiles if applicable - data can be fed in from rom if [start] is specified + To decompress from an offset (i.e. in a rom), pass in <start>. """ - def __init__(self, lz=None, start=0, debug=False): - # todo: play nice with Compressed + def __init__(self, lz=None, start=0, commands=lz_commands, debug=False): - assert lz, 'need something to decompress!' self.lz = bytearray(lz) + self.commands = commands + self.command_names = dict(map(reversed, self.commands.items())) - self.byte = None - self.address = 0 - self.start = start - - self.output = [] + self.address = start + self.start = start self.decompress() + self.compressed_data = self.lz[self.start : self.address] - self.compressed_data = self.lz[self.start : self.start + self.address] - - # print tuple containing start and end address - if debug: print '(' + hex(self.start) + ', ' + hex(self.start + self.address+1) + '),' + if debug: print '({:x), {:x})'.format(self.start, self.address) def command_list(self): @@ -409,134 +445,163 @@ class Decompressed: Print a list of commands that were used. Useful for debugging. """ - data = bytearray(self.lz) - address = self.address + data = bytearray(self.compressed_data) + data_list = list(data) + + text = '' + address = 0 + head = 0 + while 1: + offset = 0 + cmd_addr = address byte = data[address] address += 1 - if byte == lz_end: break + + if byte == lz_end: + break + cmd = (byte >> 5) & 0b111 - if cmd == lz_commands['long']: + + if cmd == self.commands['long']: cmd = (byte >> 2) & 0b111 - length = (byte & 0b11) << 8 + length = (byte & 0b11) * 0x100 length += data[address] address += 1 else: length = byte & 0b11111 + length += 1 - name = dict(map(reversed, lz_commands.items()))[cmd] + + name = self.command_names[cmd] + if name == 'iterate': address += 1 + elif name == 'alternate': address += 2 + elif name in ['repeat', 'reverse', 'flip']: if data[address] < 0x80: + offset = data[address] * 0x100 + data[address + 1] address += 2 else: + offset = head - (data[address] & 0x7f) - 1 address += 1 + elif name == 'literal': address += length - print name, length, '\t', ' '.join(map('{:02x}'.format, list(data)[cmd_addr:address])) + + text += '{0}: {1}'.format(name, length) + text += '\t' + ' '.join(map('{:02x}'.format, data_list[cmd_addr:address])) + + if name in ['repeat', 'reverse', 'flip']: + + bites = self.output[ offset : offset + length ] + if name == 'reverse': + bites = self.output[ offset : offset - length : -1 ] + + text += ' [' + ' '.join(map('{:02x}'.format, bites)) + ']' + + text += '\n' + + head += length + + + return text def decompress(self): - """ - Replica of crystal's decompression. - """ self.output = [] - while True: - self.getCurByte() + while 1: if (self.byte == lz_end): - self.address += 1 + self.next() break self.cmd = (self.byte & 0b11100000) >> 5 - if self.cmd == lz_commands['long']: # 10-bit param + if self.cmd_name == 'long': + # 10-bit length self.cmd = (self.byte & 0b00011100) >> 2 - self.length = (self.byte & 0b00000011) << 8 - self.next() - self.length += self.byte + 1 - else: # 5-bit param - self.length = (self.byte & 0b00011111) + 1 - - # literals - if self.cmd == lz_commands['literal']: - self.doLiteral() - elif self.cmd == lz_commands['iterate']: - self.doIter() - elif self.cmd == lz_commands['alternate']: - self.doAlt() - elif self.cmd == lz_commands['blank']: - self.doZeros() - - else: # repeaters - self.next() - if self.byte > 0x7f: # negative - self.displacement = self.byte & 0x7f - self.displacement = len(self.output) - self.displacement - 1 - else: # positive - self.displacement = self.byte * 0x100 - self.next() - self.displacement += self.byte + self.length = (self.next() & 0b00000011) * 0x100 + self.length += self.next() + 1 + else: + # 5-bit length + self.length = (self.next() & 0b00011111) + 1 - if self.cmd == lz_commands['flip']: - self.doFlip() - elif self.cmd == lz_commands['reverse']: - self.doReverse() - else: # lz_commands['repeat'] - self.doRepeat() + do = { + 'literal': self.doLiteral, + 'iterate': self.doIter, + 'alternate': self.doAlt, + 'blank': self.doZeros, + 'flip': self.doFlip, + 'reverse': self.doReverse, + 'repeat': self.doRepeat, + }[ self.cmd_name ] - self.address += 1 - #self.next() # somewhat of a hack + do() - def getCurByte(self): - self.byte = self.lz[self.start+self.address] + @property + def byte(self): + return self.lz[ self.address ] def next(self): + byte = self.byte self.address += 1 - self.getCurByte() + return byte + + @property + def cmd_name(self): + return self.command_names.get(self.cmd) + + + def get_offset(self): + + if self.byte >= 0x80: # negative + # negative + offset = self.next() & 0x7f + offset = len(self.output) - offset - 1 + else: + # positive + offset = self.next() * 0x100 + offset += self.next() + + self.offset = offset + def doLiteral(self): """ Copy data directly. """ - for byte in range(self.length): - self.next() - self.output.append(self.byte) + self.output += self.lz[ self.address : self.address + self.length ] + self.address += self.length def doIter(self): """ Write one byte repeatedly. """ - self.next() - for byte in range(self.length): - self.output.append(self.byte) + self.output += [self.next()] * self.length def doAlt(self): """ Write alternating bytes. """ - self.alts = [] - self.next() - self.alts.append(self.byte) - self.next() - self.alts.append(self.byte) + alts = [self.next(), self.next()] + self.output += [ alts[x & 1] for x in xrange(self.length) ] - for byte in range(self.length): - self.output.append(self.alts[byte&1]) + #alts = [self.next(), self.next()] * (self.length / 2 + 1) + #self.output += alts[:self.length] def doZeros(self): """ Write zeros. """ - for byte in range(self.length): - self.output.append(0x00) + self.output += [0] * self.length def doFlip(self): """ @@ -545,23 +610,31 @@ class Decompressed: eg 11100100 -> 00100111 quat 3 2 1 0 -> 0 2 1 3 """ - for byte in range(self.length): - flipped = sum(1<<(7-i) for i in range(8) if self.output[self.displacement+byte]>>i&1) + self.get_offset() + # Note: appends must be one at a time (this way, repeats can draw from themselves if required) + for i in xrange(self.length): + byte = self.output[ self.offset + i ] + flipped = sum( 1 << (7 - j) for j in xrange(8) if (byte >> j) & 1) self.output.append(flipped) + self.output.append( self.bit_flip( self.output[ self.offset + i ] ) ) def doReverse(self): """ Repeat reversed bytes from output. """ - for byte in range(self.length): - self.output.append(self.output[self.displacement-byte]) + self.get_offset() + # Note: appends must be one at a time (this way, repeats can draw from themselves if required) + for i in xrange(self.length): + self.output.append( self.output[ self.offset - i ] ) def doRepeat(self): """ Repeat bytes from output. """ - for byte in range(self.length): - self.output.append(self.output[self.displacement+byte]) + self.get_offset() + # Note: appends must be one at a time (this way, repeats can draw from themselves if required) + for i in xrange(self.length): + self.output.append( self.output[ self.offset + i ] ) |