mirror of
https://github.com/google/brotli.git
synced 2026-02-06 13:44:45 +00:00
Avoid file IO. Drive-by: drop bro.py and bro_test.py; we do not support it well and likely no one uses it. PiperOrigin-RevId: 834206605
92 lines
3.0 KiB
Python
92 lines
3.0 KiB
Python
# Copyright 2016 The Brotli Authors. All rights reserved.
|
|
#
|
|
# Distributed under MIT license.
|
|
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
|
|
|
|
import brotli
|
|
import pytest
|
|
|
|
from . import _test_utils
|
|
|
|
MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less.
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'compressed_name, original_name', _test_utils.gather_compressed_inputs()
|
|
)
|
|
def test_decompress(compressed_name, original_name):
|
|
decompressor = brotli.Decompressor()
|
|
compressed = _test_utils.take_input(compressed_name)
|
|
original = _test_utils.take_input(original_name)
|
|
chunk_size = 1
|
|
chunks = _test_utils.chunk_input(compressed, chunk_size)
|
|
decompressed = b''
|
|
for chunk in chunks:
|
|
decompressed += decompressor.process(chunk)
|
|
assert decompressor.is_finished()
|
|
assert original == decompressed
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'compressed_name, original_name', _test_utils.gather_compressed_inputs()
|
|
)
|
|
def test_decompress_with_limit(compressed_name, original_name):
|
|
decompressor = brotli.Decompressor()
|
|
compressed = _test_utils.take_input(compressed_name)
|
|
original = _test_utils.take_input(original_name)
|
|
chunk_size = 10 * 1024
|
|
output_buffer_limit = 10922
|
|
chunks = _test_utils.chunk_input(compressed, chunk_size)
|
|
decompressed = b''
|
|
while not decompressor.is_finished():
|
|
data = b''
|
|
if decompressor.can_accept_more_data() and chunks:
|
|
data = chunks.pop(0)
|
|
decompressed_chunk = decompressor.process(
|
|
data, output_buffer_limit=output_buffer_limit
|
|
)
|
|
assert len(decompressed_chunk) <= MIN_OUTPUT_BUFFER_SIZE
|
|
decompressed += decompressed_chunk
|
|
assert not chunks
|
|
assert original == decompressed
|
|
|
|
|
|
def test_too_much_input():
|
|
decompressor = brotli.Decompressor()
|
|
compressed = _test_utils.take_input('zerosukkanooa.compressed')
|
|
decompressor.process(compressed[:-1], output_buffer_limit=10240)
|
|
# The following assertion checks whether the test setup is correct.
|
|
assert not decompressor.can_accept_more_data()
|
|
with pytest.raises(brotli.error):
|
|
decompressor.process(compressed[-1:])
|
|
|
|
|
|
def test_changing_limit():
|
|
decompressor = brotli.Decompressor()
|
|
input_name = 'zerosukkanooa'
|
|
compressed = _test_utils.take_input(input_name + '.compressed')
|
|
check_output = _test_utils.has_input(input_name)
|
|
decompressed = decompressor.process(
|
|
compressed[:-1], output_buffer_limit=10240
|
|
)
|
|
assert len(decompressed) <= MIN_OUTPUT_BUFFER_SIZE
|
|
while not decompressor.can_accept_more_data():
|
|
decompressed += decompressor.process(b'')
|
|
decompressed += decompressor.process(compressed[-1:])
|
|
if check_output:
|
|
original = _test_utils.take_input(input_name)
|
|
assert original == decompressed
|
|
|
|
|
|
def test_garbage_appended():
|
|
decompressor = brotli.Decompressor()
|
|
with pytest.raises(brotli.error):
|
|
decompressor.process(brotli.compress(b'a') + b'a')
|
|
|
|
|
|
def test_already_finished():
|
|
decompressor = brotli.Decompressor()
|
|
decompressor.process(brotli.compress(b'a'))
|
|
with pytest.raises(brotli.error):
|
|
decompressor.process(b'a')
|