# Copyright 2016 The Brotli Authors. All rights reserved. # # Distributed under MIT license. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT import brotli import pytest from . import _test_utils MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less. @pytest.mark.parametrize( 'compressed_name, original_name', _test_utils.gather_compressed_inputs() ) def test_decompress(compressed_name, original_name): decompressor = brotli.Decompressor() compressed = _test_utils.take_input(compressed_name) original = _test_utils.take_input(original_name) chunk_size = 1 chunks = _test_utils.chunk_input(compressed, chunk_size) decompressed = b'' for chunk in chunks: decompressed += decompressor.process(chunk) assert decompressor.is_finished() assert original == decompressed @pytest.mark.parametrize( 'compressed_name, original_name', _test_utils.gather_compressed_inputs() ) def test_decompress_with_limit(compressed_name, original_name): decompressor = brotli.Decompressor() compressed = _test_utils.take_input(compressed_name) original = _test_utils.take_input(original_name) chunk_size = 10 * 1024 output_buffer_limit = 10922 chunks = _test_utils.chunk_input(compressed, chunk_size) decompressed = b'' while not decompressor.is_finished(): data = b'' if decompressor.can_accept_more_data() and chunks: data = chunks.pop(0) decompressed_chunk = decompressor.process( data, output_buffer_limit=output_buffer_limit ) assert len(decompressed_chunk) <= MIN_OUTPUT_BUFFER_SIZE decompressed += decompressed_chunk assert not chunks assert original == decompressed def test_too_much_input(): decompressor = brotli.Decompressor() compressed = _test_utils.take_input('zerosukkanooa.compressed') decompressor.process(compressed[:-1], output_buffer_limit=10240) # The following assertion checks whether the test setup is correct. assert not decompressor.can_accept_more_data() with pytest.raises(brotli.error): decompressor.process(compressed[-1:]) def test_changing_limit(): decompressor = brotli.Decompressor() input_name = 'zerosukkanooa' compressed = _test_utils.take_input(input_name + '.compressed') check_output = _test_utils.has_input(input_name) decompressed = decompressor.process( compressed[:-1], output_buffer_limit=10240 ) assert len(decompressed) <= MIN_OUTPUT_BUFFER_SIZE while not decompressor.can_accept_more_data(): decompressed += decompressor.process(b'') decompressed += decompressor.process(compressed[-1:]) if check_output: original = _test_utils.take_input(input_name) assert original == decompressed def test_garbage_appended(): decompressor = brotli.Decompressor() with pytest.raises(brotli.error): decompressor.process(brotli.compress(b'a') + b'a') def test_already_finished(): decompressor = brotli.Decompressor() decompressor.process(brotli.compress(b'a')) with pytest.raises(brotli.error): decompressor.process(b'a')