mirror of
https://github.com/google/brotli.git
synced 2026-02-04 05:35:09 +00:00
119 lines
4.2 KiB
Python
119 lines
4.2 KiB
Python
# Copyright 2016 The Brotli Authors. All rights reserved.
|
|
#
|
|
# Distributed under MIT license.
|
|
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
|
|
|
|
import functools
|
|
import os
|
|
import unittest
|
|
|
|
import brotli
|
|
|
|
from . import _test_utils
|
|
|
|
|
|
def _get_original_name(test_data):
|
|
return test_data.split('.compressed')[0]
|
|
|
|
|
|
class TestDecompressor(_test_utils.TestCase):
|
|
|
|
CHUNK_SIZE = 1
|
|
MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less.
|
|
|
|
def setUp(self):
|
|
# super().setUp() # Requires Py3+
|
|
self.decompressor = brotli.Decompressor()
|
|
|
|
def tearDown(self):
|
|
self.decompressor = None
|
|
# super().tearDown() # Requires Py3+
|
|
|
|
def _check_decompression(self, test_data):
|
|
# Verify decompression matches the original.
|
|
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
|
|
original = _get_original_name(test_data)
|
|
self.assert_files_match(temp_uncompressed, original)
|
|
|
|
def _decompress(self, test_data):
|
|
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
|
|
with open(temp_uncompressed, 'wb') as out_file:
|
|
with open(test_data, 'rb') as in_file:
|
|
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
|
|
for data in iter(read_chunk, b''):
|
|
out_file.write(self.decompressor.process(data))
|
|
self.assertTrue(self.decompressor.is_finished())
|
|
|
|
def _decompress_with_limit(self, test_data):
|
|
output_buffer_limit = 10922
|
|
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
|
|
with open(temp_uncompressed, 'wb') as out_file:
|
|
with open(test_data, 'rb') as in_file:
|
|
chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
|
|
while not self.decompressor.is_finished():
|
|
data = b''
|
|
if self.decompressor.can_accept_more_data():
|
|
data = next(chunk_iter, b'')
|
|
decompressed_data = self.decompressor.process(
|
|
data, output_buffer_limit=output_buffer_limit
|
|
)
|
|
self.assertLessEqual(
|
|
len(decompressed_data), self.MIN_OUTPUT_BUFFER_SIZE
|
|
)
|
|
out_file.write(decompressed_data)
|
|
self.assertIsNone(next(chunk_iter, None))
|
|
|
|
def _test_decompress(self, test_data):
|
|
self._decompress(test_data)
|
|
self._check_decompression(test_data)
|
|
|
|
def _test_decompress_with_limit(self, test_data):
|
|
self._decompress_with_limit(test_data)
|
|
self._check_decompression(test_data)
|
|
|
|
def test_too_much_input(self):
|
|
with open(
|
|
os.path.join(_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'), 'rb'
|
|
) as in_file:
|
|
compressed = in_file.read()
|
|
self.decompressor.process(compressed[:-1], output_buffer_limit=10240)
|
|
# the following assertion checks whether the test setup is correct
|
|
self.assertFalse(self.decompressor.can_accept_more_data())
|
|
with self.assertRaises(brotli.error):
|
|
self.decompressor.process(compressed[-1:])
|
|
|
|
def test_changing_limit(self):
|
|
test_data = os.path.join(
|
|
_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'
|
|
)
|
|
check_output = os.path.exists(test_data.replace('.compressed', ''))
|
|
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
|
|
with open(temp_uncompressed, 'wb') as out_file:
|
|
with open(test_data, 'rb') as in_file:
|
|
compressed = in_file.read()
|
|
uncompressed = self.decompressor.process(
|
|
compressed[:-1], output_buffer_limit=10240
|
|
)
|
|
self.assertLessEqual(len(uncompressed), self.MIN_OUTPUT_BUFFER_SIZE)
|
|
out_file.write(uncompressed)
|
|
while not self.decompressor.can_accept_more_data():
|
|
out_file.write(self.decompressor.process(b''))
|
|
out_file.write(self.decompressor.process(compressed[-1:]))
|
|
if check_output:
|
|
self._check_decompression(test_data)
|
|
|
|
def test_garbage_appended(self):
|
|
with self.assertRaises(brotli.error):
|
|
self.decompressor.process(brotli.compress(b'a') + b'a')
|
|
|
|
def test_already_finished(self):
|
|
self.decompressor.process(brotli.compress(b'a'))
|
|
with self.assertRaises(brotli.error):
|
|
self.decompressor.process(b'a')
|
|
|
|
|
|
_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|