summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryenatch <yenatch@gmail.com>2015-02-07 13:06:08 -0800
committeryenatch <yenatch@gmail.com>2015-02-07 13:06:08 -0800
commit36f7ee513e0a2d3fe7278e58a56d05d1a34177f0 (patch)
tree6aad8f603c463e477e3121d2e1837c2f49cafd13
parent78aa4f00be882b1688c5913400ccf32a3454844a (diff)
Refactor the compressor again.
This is a little closer to the target compressor than before.
-rw-r--r--pokemontools/gfx.py411
1 files changed, 241 insertions, 170 deletions
diff --git a/pokemontools/gfx.py b/pokemontools/gfx.py
index 25caeba..25f7ce2 100644
--- a/pokemontools/gfx.py
+++ b/pokemontools/gfx.py
@@ -216,240 +216,311 @@ class Compressed:
c = Compressed()
c.data = data
lz = c.compress()
+
+ There are some issues with reproducing the target compressor.
+ Some notes are listed here:
+ - the criteria for detecting a lookback is inconsistent
+ - sometimes lookbacks that are mostly 0s are pruned, sometimes not
+ - target appears to skip ahead if it can use a lookback soon, stopping the current command short or in some cases truncating it with literals.
+ - this has been implemented, but the specifics are unknown
+ - self.min_scores: It's unknown if blank's minimum score should be 1 or 2. Most likely it's 1, with some other hack to account for edge cases.
+ - may be related to the above
+ - target does not appear to compress backwards
"""
- # The target compressor is not always as efficient as this implementation.
- # To ignore compatibility and spit out a smaller blob, pass in small=True.
- small = False
-
- # BUG: literal [00] is a byte longer than blank 1.
- # In other words, blank's real minimum score is 1.
- # This bug exists in the target compressor as well,
- # so don't fix until we've given up on replicating it.
- min_scores = {
- 'blank': 2,
- 'iterate': 2,
- 'alternate': 3,
- 'repeat': 3,
- 'reverse': 3,
- 'flip': 3,
- }
+ def __init__(self, *args, **kwargs):
- preference = [
- 'repeat',
- 'blank',
- 'reverse',
- 'flip',
- 'iterate',
- 'alternate',
- #'literal',
- ]
-
- data = None
- commands = lz_commands
- debug = False
- literal_only = False
+ self.min_scores = {
+ 'blank': 1,
+ 'iterate': 2,
+ 'alternate': 3,
+ 'repeat': 3,
+ 'reverse': 3,
+ 'flip': 3,
+ }
+
+ self.preference = [
+ 'repeat',
+ 'blank',
+ 'flip',
+ 'reverse',
+ 'iterate',
+ 'alternate',
+ #'literal',
+ ]
- arg_names = 'data', 'commands', 'debug', 'literal_only'
+ self.lookback_methods = 'repeat', 'reverse', 'flip'
+
+ self.__dict__.update({
+ 'data': None,
+ 'commands': lz_commands,
+ 'debug': False,
+ 'literal_only': False,
+ })
+
+ self.arg_names = 'data', 'commands', 'debug', 'literal_only'
- def __init__(self, *args, **kwargs):
- self.__dict__.update(dict(zip(self.arg_names, args)))
self.__dict__.update(kwargs)
+ self.__dict__.update(dict(zip(self.arg_names, args)))
if self.data is not None:
self.compress()
- def read_byte(self, address=None):
- if address is None:
- address = self.address
- if 0 <= address < len(self.data):
- return self.data[address]
- return None
-
- def reset_scores(self):
- self.scores = {}
- self.offsets = {}
- for method in self.min_scores.keys():
- self.scores[method] = 0
+ def compress(self, data=None):
+ if data is not None:
+ self.data = data
- def score_literal(self, method):
- address = self.address
- compare = {
- 'blank': [0],
- 'iterate': [self.read_byte(address)],
- 'alternate': [self.read_byte(address), self.read_byte(address + 1)],
- }[method]
- length = 0
- while self.read_byte(address) == compare[length % len(compare)]:
- length += 1
- address += 1
- self.scores[method] = length
- return compare
+ self.data = list(bytearray(self.data))
- def precompute_repeat_matches(self):
- """
- This is faster than redundantly searching each time repeats are scored.
- """
self.indexes = {}
- for byte in xrange(0x100):
- self.indexes[byte] = []
- index = -1
- while 1:
- try:
- index = self.data.index(byte, index + 1)
- except ValueError:
- break
- self.indexes[byte].append(index)
+ self.lookbacks = {}
+ for method in self.lookback_methods:
+ self.lookbacks[method] = {}
- def score_repeats(self, name, direction=1, mutate=int):
+ self.address = 0
+ self.end = len(self.data)
+ self.output = []
+ self.literal = None
- address = self.address
- byte = mutate(self.data[address])
+ while self.address < self.end:
- for index in self.indexes[byte]:
- if index >= address: break
+ if self.score():
+ self.do_literal()
+ self.do_winner()
- length = 1 # we already know the first byte matches
- while 1:
- byte = self.read_byte(index + length * direction)
- if byte == None or mutate(byte) != self.read_byte(address + length):
- break
- length += 1
+ else:
+ if self.literal == None:
+ self.literal = self.address
+ self.address += 1
- # If repeats are almost entirely zeroes, just keep going and use blank instead.
- if all(x == 0 for x in self.data[ address + 2 : address + length ]):
- if self.read_byte(address + length) == 0:
- # zeroes continue after this chunk
- continue
+ self.do_literal()
- # Adjust the score for two-byte offsets.
- two_byte_index = index < address - 0x7f
- if self.scores[name] >= length - int(two_byte_index):
- continue
+ self.output += [lz_end]
+ return self.output
- self.scores [name] = length
- self.offsets[name] = index
+ def reset_scores(self):
+ self.scores = {}
+ self.offsets = {}
+ self.helpers = {}
+ for method in self.min_scores.iterkeys():
+ self.scores[method] = 0
- def compress(self, data=None):
- """
- This algorithm is greedy.
- It aims to match the compressor it's based on as closely as possible.
- It doesn't, but in the meantime the output is smaller.
- """
+ def bit_flip(self, byte):
+ return bit_flipped[byte]
- if data is not None:
- self.data = data
+ def do_literal(self):
+ if self.literal != None:
+ length = abs(self.address - self.literal)
+ start = min(self.literal, self.address + 1)
+ self.helpers['literal'] = self.data[start:start+length]
+ self.do_cmd('literal', length)
+ self.literal = None
- self.data = list(bytearray(self.data))
+ def score(self):
+ self.reset_scores()
- self.address = 0
- self.end = len(self.data)
- self.output = []
- self.literal = []
- self.precompute_repeat_matches()
+ map(self.score_literal, ['iterate', 'alternate', 'blank'])
- while self.address < self.end:
+ for method in self.lookback_methods:
+ self.scores[method], self.offsets[method] = self.find_lookback(method, self.address)
+
+ # Compatibility:
+ # If a lookback is close, reduce the scores of other commands
+ best_method, best_score = max(
+ self.scores.items(),
+ key = lambda x: (
+ x[1],
+ -self.preference.index(x[0])
+ )
+ )
+ for method in self.lookback_methods:
+ for address in xrange(self.address+1, self.address+min(best_score, 6)):
+ if self.find_lookback(method, address)[0] > max(self.min_scores[method], best_score):
+ # BUG: lookbacks can reduce themselves. This appears to be a bug in the target also.
+ for m, score in self.scores.items():
+ self.scores[m] = min(score, address - self.address)
+
+ return any(
+ score
+ > self.min_scores[method] + int(score > lowmax)
+ for method, score in self.scores.iteritems()
+ )
+
+ def read(self, address=None):
+ if address is None:
+ address = self.address
+ if 0 <= address < len(self.data):
+ return self.data[address]
+ return None
- # Tally up the number of bytes that can be compressed
- # by a single command from the current address.
+ def find_all_lookbacks(self):
+ for method in self.lookback_methods:
+ for address, byte in enumerate(self.data):
+ self.find_lookback(method, address)
- self.reset_scores()
+ def find_lookback(self, method, address=None):
+ if address is None:
+ address = self.address
- # Check for repetition. Alternating bytes are common since graphics data is planar.
+ existing = self.lookbacks.get(method, {}).get(address)
+ if existing != None:
+ return existing
- _, self.iter, self.alts = map(self.score_literal, ['blank', 'iterate', 'alternate'])
+ lookback = 0, None
- # Check if we can repeat any data that the decompressor just output (here, the input data).
- # This includes the current command's output.
+ # Better to not carelessly optimize at the moment.
+ """
+ if address < 2:
+ return lookback
+ """
- for args in [
- ('repeat', 1, int),
- ('reverse', -1, int),
- ('flip', 1, self.bit_flip),
- ]:
- self.score_repeats(*args)
+ byte = self.read(address)
+ if byte is None:
+ return lookback
- # If the scores are too low, try again from the next byte.
+ direction, mutate = {
+ 'repeat': ( 1, int),
+ 'reverse': (-1, int),
+ 'flip': ( 1, self.bit_flip),
+ }[method]
- if self.literal_only or not any(
- self.min_scores.get(name, score)
- + int(score > lowmax)
- < score
- for name, score in self.scores.items()
- ):
- self.literal += [self.read_byte()]
- self.address += 1
+ # Doesn't seem to help
+ """
+ if mutate == self.bit_flip:
+ if byte == 0:
+ self.lookbacks[method][address] = lookback
+ return lookback
+ """
+
+ data_len = len(self.data)
+ is_two_byte_index = lambda index: int(index < address - 0x7f)
+
+ for index in self.get_indexes(mutate(byte)):
+ if index >= address:
+ break
+
+ old_length, old_index = lookback
+ if direction == 1:
+ if old_length > data_len - index: break
else:
- self.do_literal() # payload
- self.do_scored()
+ if old_length > index: continue
- # unload any literals we're sitting on
- self.do_literal()
+ if self.read(index) in [None]: continue
- self.output += [lz_end]
+ length = 1 # we know there's at least one match, or we wouldn't be checking this index
+ while 1:
+ this_byte = self.read(address + length)
+ that_byte = self.read(index + length * direction)
+ if that_byte == None or this_byte != mutate(that_byte):
+ break
+ length += 1
+ """
+ if direction == 1:
+ if not any(self.data[address+2:address+length]): continue
+ """
+ if length - is_two_byte_index(index) >= old_length - is_two_byte_index(old_index): # XXX >?
+ # XXX maybe avoid two-byte indexes when possible
+ lookback = length, index
+
+ self.lookbacks[method][address] = lookback
+ return lookback
+
+ def get_indexes(self, byte):
+ if not self.indexes.has_key(byte):
+ self.indexes[byte] = []
+ index = -1
+ while 1:
+ try:
+ index = self.data.index(byte, index + 1)
+ except ValueError:
+ break
+ self.indexes[byte].append(index)
+ return self.indexes[byte]
- return self.output
+ def score_literal(self, method):
+ address = self.address
- def bit_flip(self, byte):
- return bit_flipped[byte]
+ compare = {
+ 'blank': [0],
+ 'iterate': [self.read(address)],
+ 'alternate': [self.read(address), self.read(address + 1)],
+ }[method]
- def do_literal(self):
- if self.literal:
- length = len(self.literal)
- self.do_cmd('literal', length)
- self.literal = []
+ # XXX may or may not be correct
+ if method == 'alternate' and compare[0] == 0:
+ return
- def do_scored(self):
- # Which command will compress the longest chunk?
- winner, score = sorted(
- self.scores.items(),
- key = lambda (name, score): (
- -(score - self.min_scores[name] - int(score > lowmax)),
- self.preference.index(name)
+ length = 0
+ while self.read(address + length) == compare[length % len(compare)]:
+ length += 1
+
+ self.scores[method] = length
+ self.helpers[method] = compare
+
+ def do_winner(self):
+ winners = filter(
+ lambda (method, score):
+ score
+ > self.min_scores[method] + int(score > lowmax),
+ self.scores.iteritems()
+ )
+ winners.sort(
+ key = lambda (method, score): (
+ -(score - self.min_scores[method] - int(score > lowmax)),
+ self.preference.index(method)
)
- )[0]
- length = self.do_cmd(winner, score)
+ )
+ winner, score = winners[0]
+
+ length = min(score, max_length)
+ self.do_cmd(winner, length)
self.address += length
def do_cmd(self, cmd, length):
- length = min(length, max_length)
+ start_address = self.address
+
cmd_length = length - 1
output = []
if length > lowmax:
- output += [(self.commands['long'] << 5) + (self.commands[cmd] << 2) + (cmd_length >> 8)]
- output += [cmd_length & 0xff]
+ output.append(
+ (self.commands['long'] << 5)
+ + (self.commands[cmd] << 2)
+ + (cmd_length >> 8)
+ )
+ output.append(
+ cmd_length & 0xff
+ )
else:
- output += [(self.commands[cmd] << 5) + cmd_length]
+ output.append(
+ (self.commands[cmd] << 5)
+ + cmd_length
+ )
- output += {
- 'literal': self.literal,
- 'iterate': self.iter,
- 'alternate': self.alts,
- 'blank': [],
- }.get(cmd, [])
+ self.helpers['blank'] = [] # quick hack
+ output += self.helpers.get(cmd, [])
- if cmd in ['repeat', 'reverse', 'flip']:
+ if cmd in self.lookback_methods:
offset = self.offsets[cmd]
# Negative offsets are one byte.
# Positive offsets are two.
- if self.address - offset <= 0x7f:
- offset = self.address - offset + 0x80
- offset -= 1 # this is a hack, but it seems to work
+ if start_address - offset <= 0x7f:
+ offset = start_address - offset + 0x80
+ offset -= 1 # this seems to work
output += [offset]
else:
output += [offset / 0x100, offset % 0x100] # big endian
if self.debug:
- print (
+ print ' '.join(map(str, [
cmd, length, '\t',
- ' '.join(map('{:02x}'.format, output))
- )
+ ' '.join(map('{:02x}'.format, output)),
+ self.data[start_address:start_address+length] if cmd in self.lookback_methods else '',
+ ]))
self.output += output
- return length