Merge branch 'master' into fix-python-docstring

This commit is contained in:
Eugene Kliuchnikov
2025-11-24 12:39:25 +01:00
committed by GitHub
30 changed files with 316 additions and 979 deletions

View File

@@ -137,7 +137,6 @@ jobs:
build_system: python
python_version: "3.10"
# TODO: investigate why win-builds can't run tests
py_setuptools_cmd: build_ext
os: windows-2022
- name: maven
@@ -208,7 +207,7 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
@@ -221,7 +220,7 @@ jobs:
sudo apt install -y ${EXTRA_PACKAGES}
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: false
fetch-depth: 1
@@ -333,40 +332,9 @@ jobs:
run: |
python -VV
python -c "import sys; sys.exit('Invalid python version') if '.'.join(map(str,sys.version_info[0:2])) != '${{ matrix.python_version }}' else True"
pip install setuptools==51.3.3
python setup.py ${{ matrix.py_setuptools_cmd || 'test'}}
build_test_py27:
name: Build and test with Python 2.7
runs-on: ubuntu-latest
container:
image: ubuntu:22.04
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
with:
egress-policy: audit
- name: Install deps
run: |
apt update
apt install -y curl gcc python2.7 python2.7-dev
curl https://bootstrap.pypa.io/pip/2.7/get-pip.py --output get-pip.py
python2.7 get-pip.py
python2.7 -m pip install distutils-pytest==0.1
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
submodules: false
fetch-depth: 1
- name: Build / Test
run: |
python2.7 -VV
python2.7 -c "import sys; sys.exit('Invalid python version') if '.'.join(map(str,sys.version_info[0:2])) != '2.7' else True"
python2.7 setup.py test
pip install setuptools==51.3.3 pytest
python setup.py build_ext --inplace
pytest ./python/tests
build_test_dotnet:
name: Build and test with .NET
@@ -374,7 +342,7 @@ jobs:
steps:
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: false
fetch-depth: 1

View File

@@ -35,11 +35,11 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: true
fetch-depth: 1

View File

@@ -35,16 +35,16 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- name: Checkout repository
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@0499de31b99561a6d14a36a5f662c2a54f91beee # v3.29.5
uses: github/codeql-action/init@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v3.29.5
with:
languages: ${{ matrix.language }}
# CodeQL is currently crashing on files with large lists:
@@ -56,7 +56,7 @@ jobs:
- if: matrix.language == 'cpp'
name: Build CPP
uses: github/codeql-action/autobuild@0499de31b99561a6d14a36a5f662c2a54f91beee # v3.29.5
uses: github/codeql-action/autobuild@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v3.29.5
- if: matrix.language == 'cpp' || matrix.language == 'java'
name: Build Java
@@ -66,7 +66,7 @@ jobs:
- if: matrix.language == 'javascript'
name: Build JS
uses: github/codeql-action/autobuild@0499de31b99561a6d14a36a5f662c2a54f91beee # v3.29.5
uses: github/codeql-action/autobuild@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v3.29.5
- if: matrix.language == 'cpp' || matrix.language == 'python'
name: Build Python
@@ -74,7 +74,7 @@ jobs:
python setup.py build_ext
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@0499de31b99561a6d14a36a5f662c2a54f91beee # v3.29.5
uses: github/codeql-action/analyze@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v3.29.5
with:
category: "/language:${{matrix.language}}"
ref: "${{ github.ref != 'master' && github.ref || '/refs/heads/master' }}"

View File

@@ -22,7 +22,7 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit

View File

@@ -30,21 +30,26 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- name: Checkout repository
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
- name: Install tools
run: |
eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"
brew install buildifier typos-cli
brew install buildifier ruff typos-cli
- name: Check typos
run: |
eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"
./scripts/check_typos.sh
- name: Lint Python code
run: |
eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"
ruff check --extend-select=C4,C90,PERF,RET,SIM,W
# TODO(eustas): run buildifier

View File

@@ -64,12 +64,12 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: false
fetch-depth: 1
@@ -145,12 +145,12 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: false
fetch-depth: 1
@@ -176,7 +176,7 @@ jobs:
steps:
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: false
fetch-depth: 1
@@ -203,12 +203,12 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- name: Checkout the source
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: false
fetch-depth: 1

View File

@@ -37,12 +37,12 @@ jobs:
steps:
- name: Harden Runner
uses: step-security/harden-runner@f4a75cfd619ee5ce8d5b864b0d183aff3c69b55a # v2.13.1
uses: step-security/harden-runner@95d9a5deda9de15063e7595e9719c11c38c90ae2 # v2.13.2
with:
egress-policy: audit
- name: "Checkout code"
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
persist-credentials: false
@@ -77,6 +77,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@0499de31b99561a6d14a36a5f662c2a54f91beee # v2.23.3
uses: github/codeql-action/upload-sarif@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v2.23.3
with:
sarif_file: results.sarif

View File

@@ -386,11 +386,11 @@ endif() # BROTLI_BUNDLED_MODE
if (BROTLI_BUILD_TOOLS)
install(FILES "docs/brotli.1"
DESTINATION "${CMAKE_INSTALL_FULL_MANDIR}/man1")
DESTINATION "${CMAKE_INSTALL_MANDIR}/man1")
endif()
install(FILES docs/constants.h.3 docs/decode.h.3 docs/encode.h.3 docs/types.h.3
DESTINATION "${CMAKE_INSTALL_FULL_MANDIR}/man3")
DESTINATION "${CMAKE_INSTALL_MANDIR}/man3")
if (ENABLE_COVERAGE STREQUAL "yes")
setup_target_for_coverage(coverage test coverage)

View File

@@ -9,7 +9,6 @@ include c/include/brotli/*.h
include LICENSE
include MANIFEST.in
include python/_brotli.cc
include python/bro.py
include python/brotli.py
include python/README.md
include python/tests/*

View File

@@ -433,7 +433,7 @@ static size_t UpdateNodes(
const CompoundDictionary* addon = &params->dictionary.compound;
size_t gap = addon->total_size;
BROTLI_DCHECK(cur_ix_masked + max_len <= ringbuffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_len <= ringbuffer_mask + 1);
EvaluateNode(block_start + stream_offset, pos, max_backward_limit, gap,
starting_dist_cache, model, queue, nodes);

View File

@@ -545,7 +545,7 @@ static BROTLI_INLINE void FindCompoundDictionaryMatch(
source = (const uint8_t*)BROTLI_UNALIGNED_LOAD_PTR((const uint8_t**)tail);
}
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
for (i = 0; i < 4; ++i) {
const size_t distance = (size_t)distance_cache[i];
@@ -656,7 +656,7 @@ static BROTLI_INLINE size_t FindAllCompoundDictionaryMatches(
source = (const uint8_t*)BROTLI_UNALIGNED_LOAD_PTR((const uint8_t**)tail);
}
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
while (item == 0) {
size_t offset;

View File

@@ -213,7 +213,7 @@ static BROTLI_INLINE void FN(FindLongestMatch)(
out->len = 0;
out->len_code_delta = 0;
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
/* Try last distance first. */
for (i = 0; i < NUM_LAST_DISTANCES_TO_CHECK; ++i) {

View File

@@ -178,7 +178,7 @@ static BROTLI_INLINE void FN(FindLongestMatch)(
out->len = 0;
out->len_code_delta = 0;
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
/* Try last distance first. */
for (i = 0; i < (size_t)self->num_last_distances_to_check_; ++i) {

View File

@@ -195,7 +195,7 @@ static BROTLI_INLINE void FN(FindLongestMatch)(
out->len = 0;
out->len_code_delta = 0;
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
/* Try last distance first. */
for (i = 0; i < (size_t)self->num_last_distances_to_check_; ++i) {

View File

@@ -178,7 +178,7 @@ static BROTLI_INLINE void FN(FindLongestMatch)(
out->len = 0;
out->len_code_delta = 0;
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
/* Try last distance first. */
for (i = 0; i < (size_t)self->num_last_distances_to_check_; ++i) {

View File

@@ -165,7 +165,7 @@ static BROTLI_INLINE void FN(FindLongestMatch)(
size_t cached_backward = (size_t)distance_cache[0];
size_t prev_ix = cur_ix - cached_backward;
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
out->len_code_delta = 0;
if (prev_ix < cur_ix) {

View File

@@ -170,7 +170,7 @@ static BROTLI_INLINE void FN(FindLongestMatch)(
out->len = 0;
out->len_code_delta = 0;
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask);
BROTLI_DCHECK(cur_ix_masked + max_length <= ring_buffer_mask + 1);
/* Try last distance first. */
for (i = 0; i < (size_t)self->num_last_distances_to_check_; ++i) {

View File

@@ -1,7 +1,7 @@
This directory contains the code for the Python `brotli` module,
`bro.py` tool, and roundtrip tests.
and roundtrip tests.
Only Python 2.7+ is supported.
Only Python 3.10+ is supported.
We provide a `Makefile` to simplify common development commands.
@@ -17,13 +17,17 @@ following command from this directory:
$ make install
If you already have native Brotli installed on your system and want to use this one instead of the vendored sources, you
should set the `USE_SYSTEM_BROTLI=1` environment variable when building the wheel, like this:
If you already have native Brotli installed on your system and want to use
this one instead of the vendored sources, you should set
the `USE_SYSTEM_BROTLI=1` environment variable when building the wheel,
like this:
$ USE_SYSTEM_BROTLI=1 pip install brotli --no-binary brotli
Brotli is found via the `pkg-config` utility. Moreover, you must build all 3 `brotlicommon`, `brotlienc`, and `brotlidec`
components. If you're installing brotli from the package manager, you need the development package, like this on Fedora:
Brotli is found via the `pkg-config` utility. Moreover, you must build
all 3 `brotlicommon`, `brotlienc`, and `brotlidec` components. If you're
installing brotli from the package manager, you need the development package,
like this on Fedora:
$ dnf install brotli brotli-devel
@@ -45,8 +49,8 @@ able to edit the source files, you can use the `setuptools`
### Code Style
Brotli's code follows the [Google Python Style Guide][]. To
automatically format your code, first install [YAPF][]:
Brotli code follows the [Google Python Style Guide][].
To automatically format your code, first install [YAPF][]:
$ pip install yapf
@@ -56,7 +60,6 @@ Then, to format all files in the project, you can run:
See the [YAPF usage][] documentation for more information.
[PyPI]: https://pypi.org/project/Brotli/
[development mode]: https://setuptools.readthedocs.io/en/latest/setuptools.html#development-mode
[Google Python Style Guide]: https://google.github.io/styleguide/pyguide.html

View File

@@ -12,18 +12,18 @@
#include <brotli/decode.h>
#include <brotli/encode.h>
#if PY_MAJOR_VERSION >= 3
#define PY_GET_TYPE(Obj) (Py_TYPE(Obj))
#else
#define PY_GET_TYPE(Obj) ((Obj)->ob_type)
// 3.9 end-of-life is 2025-10-31.
// 3.10 end-of-life is 2026-10.
// 3.11 end-of-life is 2027-10.
// 3.12 end-of-life is 2028-10.
// 3.13 end-of-life is 2029-10.
// 3.14 end-of-life is 2030-10.
#if PY_MAJOR_VERSION < 3 || (PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION < 10)
#error "Only Python 3.10+ is supported"
#endif
static const char kErrorAttr[] = "error";
#if PY_MAJOR_VERSION >= 3
static const char kModuleAttr[] = "_module";
#else
static PyObject* BrotliError;
#endif
static const char kInvalidBufferError[] =
"brotli: data must be a C-contiguous buffer";
@@ -201,7 +201,6 @@ PyDoc_STRVAR(brotli_doc,
/* clang-format on */
static void set_brotli_exception(PyObject* t, const char* msg) {
#if PY_MAJOR_VERSION >= 3
PyObject* error = NULL;
PyObject* module = NULL;
assert(t != NULL);
@@ -213,13 +212,9 @@ static void set_brotli_exception(PyObject* t, const char* msg) {
if (error == NULL) return; /* AttributeError raised. */
PyErr_SetString(error, msg);
Py_DECREF(error);
#else
PyErr_SetString(BrotliError, msg);
#endif
}
static void set_brotli_exception_from_module(PyObject* m, const char* msg) {
#if PY_MAJOR_VERSION >= 3
PyObject* error = NULL;
assert(m != NULL);
assert(PyModule_Check(m));
@@ -227,9 +222,6 @@ static void set_brotli_exception_from_module(PyObject* m, const char* msg) {
if (error == NULL) return; /* AttributeError raised. */
PyErr_SetString(error, msg);
Py_DECREF(error);
#else
PyErr_SetString(BrotliError, msg);
#endif
}
/*
@@ -354,7 +346,7 @@ static PyObject* Buffer_Finish(Buffer* buffer) {
}
if (len == 0) return result;
out = PyBytes_AS_STRING(result);
out = (uint8_t*)PyBytes_AS_STRING(result);
block = buffer->head;
while (block != buffer->tail) {
memcpy(out + pos, block->payload, block->size);
@@ -389,7 +381,7 @@ static PyObject* brotli_Compressor_new(PyTypeObject* type, PyObject* args,
self->enc = BrotliEncoderCreateInstance(0, 0, 0);
if (self->enc == NULL) {
set_brotli_exception(self_type, kCompressCreateError);
PY_GET_TYPE(self)->tp_free((PyObject*)self);
Py_TYPE(self)->tp_free((PyObject*)self);
return NULL;
}
self->healthy = 1;
@@ -401,7 +393,7 @@ static int brotli_Compressor_init(PyBrotli_Compressor* self, PyObject* args,
PyObject* keywds) {
static const char* kwlist[] = {"mode", "quality", "lgwin", "lgblock", NULL};
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
unsigned char mode = BROTLI_DEFAULT_MODE;
unsigned char quality = BROTLI_DEFAULT_QUALITY;
unsigned char lgwin = BROTLI_DEFAULT_WINDOW;
@@ -454,7 +446,7 @@ static int brotli_Compressor_init(PyBrotli_Compressor* self, PyObject* args,
static void brotli_Compressor_dealloc(PyBrotli_Compressor* self) {
if (self->enc) BrotliEncoderDestroyInstance(self->enc);
PY_GET_TYPE(self)->tp_free((PyObject*)self);
Py_TYPE(self)->tp_free((PyObject*)self);
}
/*
@@ -466,7 +458,7 @@ static void brotli_Compressor_dealloc(PyBrotli_Compressor* self) {
static PyObject* compress_stream(PyBrotli_Compressor* self,
BrotliEncoderOperation op, uint8_t* input,
size_t input_length) {
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
size_t available_in = input_length;
const uint8_t* next_in = input;
Buffer buffer;
@@ -526,7 +518,7 @@ error:
static PyObject* brotli_Compressor_process(PyBrotli_Compressor* self,
PyObject* args) {
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
PyObject* ret = NULL;
PyObject* input_object = NULL;
Py_buffer input;
@@ -556,7 +548,7 @@ static PyObject* brotli_Compressor_process(PyBrotli_Compressor* self,
}
static PyObject* brotli_Compressor_flush(PyBrotli_Compressor* self) {
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
PyObject* ret = NULL;
if (self->healthy == 0) {
@@ -575,7 +567,7 @@ static PyObject* brotli_Compressor_flush(PyBrotli_Compressor* self) {
}
static PyObject* brotli_Compressor_finish(PyBrotli_Compressor* self) {
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
PyObject* ret = NULL;
if (self->healthy == 0) {
@@ -619,7 +611,7 @@ static PyObject* brotli_Decompressor_new(PyTypeObject* type, PyObject* args,
self->dec = BrotliDecoderCreateInstance(0, 0, 0);
if (self->dec == NULL) {
set_brotli_exception(self_type, kDecompressCreateError);
PY_GET_TYPE(self)->tp_free((PyObject*)self);
Py_TYPE(self)->tp_free((PyObject*)self);
return NULL;
}
@@ -653,14 +645,14 @@ static void brotli_Decompressor_dealloc(PyBrotli_Decompressor* self) {
free(self->unconsumed_data);
self->unconsumed_data = NULL;
}
PY_GET_TYPE(self)->tp_free((PyObject*)self);
Py_TYPE(self)->tp_free((PyObject*)self);
}
static PyObject* brotli_Decompressor_process(PyBrotli_Decompressor* self,
PyObject* args, PyObject* keywds) {
static const char* kwlist[] = {"", "output_buffer_limit", NULL};
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
PyObject* ret = NULL;
PyObject* input_object = NULL;
Py_buffer input;
@@ -720,7 +712,7 @@ static PyObject* brotli_Decompressor_process(PyBrotli_Decompressor* self,
if (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) {
assert(buffer.avail_out == 0);
/* All allocated is used -> reached the output length limit. */
if (buffer.total_allocated >= output_buffer_limit) break;
if (buffer.total_allocated >= (uint64_t)output_buffer_limit) break;
if (Buffer_Grow(&buffer) < 0) {
oom = 1;
break;
@@ -782,7 +774,7 @@ finally:
}
static PyObject* brotli_Decompressor_is_finished(PyBrotli_Decompressor* self) {
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
if (self->healthy == 0) {
set_brotli_exception(self_type, kDecompressUnhealthyError);
return NULL;
@@ -800,7 +792,7 @@ static PyObject* brotli_Decompressor_is_finished(PyBrotli_Decompressor* self) {
static PyObject* brotli_Decompressor_can_accept_more_data(
PyBrotli_Decompressor* self) {
PyObject* self_type = (PyObject*)PY_GET_TYPE((PyObject*)self);
PyObject* self_type = (PyObject*)Py_TYPE((PyObject*)self);
if (self->healthy == 0) {
set_brotli_exception(self_type, kDecompressUnhealthyError);
return NULL;
@@ -892,13 +884,6 @@ finally:
/* Module definition */
static int init_brotli_mod(PyObject* m);
static PyMethodDef brotli_methods[] = {
{"decompress", (PyCFunction)brotli_decompress, METH_VARARGS | METH_KEYWORDS,
brotli_decompress__doc__},
{NULL, NULL, 0, NULL}};
static PyMethodDef brotli_Compressor_methods[] = {
{"process", (PyCFunction)brotli_Compressor_process, METH_VARARGS,
brotli_Compressor_process_doc},
@@ -909,44 +894,6 @@ static PyMethodDef brotli_Compressor_methods[] = {
{NULL} /* Sentinel */
};
static PyMethodDef brotli_Decompressor_methods[] = {
{"process", (PyCFunction)brotli_Decompressor_process,
METH_VARARGS | METH_KEYWORDS, brotli_Decompressor_process_doc},
{"is_finished", (PyCFunction)brotli_Decompressor_is_finished, METH_NOARGS,
brotli_Decompressor_is_finished_doc},
{"can_accept_more_data",
(PyCFunction)brotli_Decompressor_can_accept_more_data, METH_NOARGS,
brotli_Decompressor_can_accept_more_data_doc},
{NULL} /* Sentinel */
};
#if PY_MAJOR_VERSION >= 3
#if PY_MINOR_VERSION >= 5
static PyModuleDef_Slot brotli_mod_slots[] = {
{Py_mod_exec, init_brotli_mod},
#if PY_MINOR_VERSION >= 12
{Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
#endif
{0, NULL}};
#endif
static struct PyModuleDef brotli_module = {
PyModuleDef_HEAD_INIT,
"_brotli", /* m_name */
brotli_doc, /* m_doc */
0, /* m_size */
brotli_methods, /* m_methods */
#if PY_MINOR_VERSION >= 5
brotli_mod_slots, /* m_slots */
#else
NULL, /* m_reload */
#endif
NULL, /* m_traverse */
NULL, /* m_clear */
NULL /* m_free */
};
static PyType_Slot brotli_Compressor_slots[] = {
{Py_tp_dealloc, (destructor)brotli_Compressor_dealloc},
{Py_tp_doc, (void*)brotli_Compressor_doc},
@@ -960,6 +907,17 @@ static PyType_Spec brotli_Compressor_spec = {
"brotli.Compressor", sizeof(PyBrotli_Compressor), 0,
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, brotli_Compressor_slots};
static PyMethodDef brotli_Decompressor_methods[] = {
{"process", (PyCFunction)brotli_Decompressor_process,
METH_VARARGS | METH_KEYWORDS, brotli_Decompressor_process_doc},
{"is_finished", (PyCFunction)brotli_Decompressor_is_finished, METH_NOARGS,
brotli_Decompressor_is_finished_doc},
{"can_accept_more_data",
(PyCFunction)brotli_Decompressor_can_accept_more_data, METH_NOARGS,
brotli_Decompressor_can_accept_more_data_doc},
{NULL} /* Sentinel */
};
static PyType_Slot brotli_Decompressor_slots[] = {
{Py_tp_dealloc, (destructor)brotli_Decompressor_dealloc},
{Py_tp_doc, (void*)brotli_Decompressor_doc},
@@ -973,127 +931,17 @@ static PyType_Spec brotli_Decompressor_spec = {
"brotli.Decompressor", sizeof(PyBrotli_Decompressor), 0,
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, brotli_Decompressor_slots};
PyMODINIT_FUNC PyInit__brotli(void) {
#if PY_MINOR_VERSION < 5
PyObject* m = PyModule_Create(&brotli_module);
if (m == NULL) return NULL;
if (init_brotli_mod(m) < 0) {
Py_DECREF(m);
m = NULL;
}
return m;
#else
return PyModuleDef_Init(&brotli_module);
#endif
}
#else
static PyTypeObject brotli_CompressorType = {
PyObject_HEAD_INIT(NULL) 0, /* ob_size */
"brotli.Compressor", /* tp_name */
sizeof(PyBrotli_Compressor), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)brotli_Compressor_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
brotli_Compressor_doc, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
brotli_Compressor_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)brotli_Compressor_init, /* tp_init */
0, /* tp_alloc */
brotli_Compressor_new, /* tp_new */
};
static PyTypeObject brotli_DecompressorType = {
PyObject_HEAD_INIT(NULL) 0, /* ob_size */
"brotli.Decompressor", /* tp_name */
sizeof(PyBrotli_Decompressor), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)brotli_Decompressor_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
brotli_Decompressor_doc, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
brotli_Decompressor_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)brotli_Decompressor_init, /* tp_init */
0, /* tp_alloc */
brotli_Decompressor_new, /* tp_new */
};
PyMODINIT_FUNC init_brotli(void) {
PyObject* m = Py_InitModule3("_brotli", brotli_methods, brotli_doc);
if (m == NULL) return;
init_brotli_mod(m);
}
#endif
/* Emulates PyModule_AddObject */
static int RegisterObject(PyObject* mod, const char* name, PyObject* value) {
assert(value != NULL);
#if PY_MAJOR_VERSION >= 3 && PY_MINOR_VERSION >= 10
int ret = PyModule_AddObjectRef(mod, name, value);
/* Emulates PyModule_AddObject, i.e. decrements the reference count on
success. */
if (ret == 0) Py_DECREF(value);
return ret;
#else
return PyModule_AddObject(mod, name, value);
#endif
}
static int init_brotli_mod(PyObject* m) {
static int brotli_init_mod(PyObject* m) {
PyObject* error_type = NULL;
PyObject* compressor_type = NULL;
PyObject* decompressor_type = NULL;
@@ -1104,34 +952,19 @@ static int init_brotli_mod(PyObject* m) {
if (error_type == NULL) goto error;
if (RegisterObject(m, kErrorAttr, error_type) < 0) goto error;
#if PY_MAJOR_VERSION < 3
/* Assumption: pointer is used only while module is alive and well. */
BrotliError = error_type;
#endif
error_type = NULL;
#if PY_MAJOR_VERSION >= 3
compressor_type = PyType_FromSpec(&brotli_Compressor_spec);
decompressor_type = PyType_FromSpec(&brotli_Decompressor_spec);
#else
compressor_type = (PyObject*)&brotli_CompressorType;
Py_INCREF(compressor_type);
decompressor_type = (PyObject*)&brotli_DecompressorType;
Py_INCREF(decompressor_type);
#endif
if (compressor_type == NULL) goto error;
if (PyType_Ready((PyTypeObject*)compressor_type) < 0) goto error;
#if PY_MAJOR_VERSION >= 3
if (PyObject_SetAttrString(compressor_type, kModuleAttr, m) < 0) goto error;
#endif
if (RegisterObject(m, "Compressor", compressor_type) < 0) goto error;
compressor_type = NULL;
if (decompressor_type == NULL) goto error;
if (PyType_Ready((PyTypeObject*)decompressor_type) < 0) goto error;
#if PY_MAJOR_VERSION >= 3
if (PyObject_SetAttrString(decompressor_type, kModuleAttr, m) < 0) goto error;
#endif
if (RegisterObject(m, "Decompressor", decompressor_type) < 0) goto error;
decompressor_type = NULL;
@@ -1162,3 +995,29 @@ error:
}
return -1;
}
static PyMethodDef brotli_methods[] = {
{"decompress", (PyCFunction)brotli_decompress, METH_VARARGS | METH_KEYWORDS,
brotli_decompress__doc__},
{NULL, NULL, 0, NULL}};
static PyModuleDef_Slot brotli_mod_slots[] = {
{Py_mod_exec, brotli_init_mod},
#if (PY_MAJOR_VERSION > 3) || (PY_MINOR_VERSION >= 12)
{Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
#endif
{0, NULL}};
static struct PyModuleDef brotli_module = {
PyModuleDef_HEAD_INIT,
"_brotli", /* m_name */
brotli_doc, /* m_doc */
0, /* m_size */
brotli_methods, /* m_methods */
brotli_mod_slots, /* m_slots */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL /* m_free */
};
PyMODINIT_FUNC PyInit__brotli(void) { return PyModuleDef_Init(&brotli_module); }

View File

@@ -1,194 +0,0 @@
#! /usr/bin/env python
"""Compression/decompression utility using the Brotli algorithm."""
# Note: Python2 has been deprecated long ago, but some projects out in
# the wide world may still use it nevertheless. This should not
# deprive them from being able to run Brotli.
from __future__ import print_function
import argparse
import os
import platform
import sys
import brotli
# default values of encoder parameters
_DEFAULT_PARAMS = {
'mode': brotli.MODE_GENERIC,
'quality': 11,
'lgwin': 22,
'lgblock': 0,
}
def get_binary_stdio(stream):
"""Return the specified stdin/stdout/stderr stream.
If the stdio stream requested (i.e. sys.(stdin|stdout|stderr))
has been replaced with a stream object that does not have a `.buffer`
attribute, this will return the original stdio stream's buffer, i.e.
`sys.__(stdin|stdout|stderr)__.buffer`.
Args:
stream: One of 'stdin', 'stdout', 'stderr'.
Returns:
The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass
instance such as io.Bufferedreader/io.BufferedWriter), suitable for
reading/writing binary data from/to it.
"""
if stream == 'stdin': stdio = sys.stdin
elif stream == 'stdout': stdio = sys.stdout
elif stream == 'stderr': stdio = sys.stderr
else:
raise ValueError('invalid stream name: %s' % (stream,))
if sys.version_info[0] < 3:
if sys.platform == 'win32':
# set I/O stream binary flag on python2.x (Windows)
runtime = platform.python_implementation()
if runtime == 'PyPy':
# the msvcrt trick doesn't work in pypy, so use fdopen().
mode = 'rb' if stream == 'stdin' else 'wb'
stdio = os.fdopen(stdio.fileno(), mode, 0)
else:
# this works with CPython -- untested on other implementations
import msvcrt
msvcrt.setmode(stdio.fileno(), os.O_BINARY)
return stdio
else:
try:
return stdio.buffer
except AttributeError:
# The Python reference explains
# (-> https://docs.python.org/3/library/sys.html#sys.stdin)
# that the `.buffer` attribute might not exist, since
# the standard streams might have been replaced by something else
# (such as an `io.StringIO()` - perhaps via
# `contextlib.redirect_stdout()`).
# We fall back to the original stdio in these cases.
if stream == 'stdin': return sys.__stdin__.buffer
if stream == 'stdout': return sys.__stdout__.buffer
if stream == 'stderr': return sys.__stderr__.buffer
assert False, 'Impossible Situation.'
def main(args=None):
parser = argparse.ArgumentParser(
prog=os.path.basename(__file__), description=__doc__)
parser.add_argument(
'--version', action='version', version=brotli.version)
parser.add_argument(
'-i',
'--input',
metavar='FILE',
type=str,
dest='infile',
help='Input file',
default=None)
parser.add_argument(
'-o',
'--output',
metavar='FILE',
type=str,
dest='outfile',
help='Output file',
default=None)
parser.add_argument(
'-f',
'--force',
action='store_true',
help='Overwrite existing output file',
default=False)
parser.add_argument(
'-d',
'--decompress',
action='store_true',
help='Decompress input file',
default=False)
params = parser.add_argument_group('optional encoder parameters')
params.add_argument(
'-m',
'--mode',
metavar='MODE',
type=int,
choices=[0, 1, 2],
help='The compression mode can be 0 for generic input, '
'1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. '
'Defaults to 0.')
params.add_argument(
'-q',
'--quality',
metavar='QUALITY',
type=int,
choices=list(range(0, 12)),
help='Controls the compression-speed vs compression-density '
'tradeoff. The higher the quality, the slower the '
'compression. Range is 0 to 11. Defaults to 11.')
params.add_argument(
'--lgwin',
metavar='LGWIN',
type=int,
choices=list(range(10, 25)),
help='Base 2 logarithm of the sliding window size. Range is '
'10 to 24. Defaults to 22.')
params.add_argument(
'--lgblock',
metavar='LGBLOCK',
type=int,
choices=[0] + list(range(16, 25)),
help='Base 2 logarithm of the maximum input block size. '
'Range is 16 to 24. If set to 0, the value will be set based '
'on the quality. Defaults to 0.')
# set default values using global _DEFAULT_PARAMS dictionary
parser.set_defaults(**_DEFAULT_PARAMS)
options = parser.parse_args(args=args)
if options.infile:
try:
with open(options.infile, 'rb') as infile:
data = infile.read()
except OSError:
parser.error('Could not read --infile: %s' % (infile,))
else:
if sys.stdin.isatty():
# interactive console, just quit
parser.error('No input (called from interactive terminal).')
infile = get_binary_stdio('stdin')
data = infile.read()
if options.outfile:
# Caution! If `options.outfile` is a broken symlink, will try to
# redirect the write according to symlink.
if os.path.exists(options.outfile) and not options.force:
parser.error(('Target --outfile=%s already exists, '
'but --force was not requested.') % (options.outfile,))
outfile = open(options.outfile, 'wb')
did_open_outfile = True
else:
outfile = get_binary_stdio('stdout')
did_open_outfile = False
try:
try:
if options.decompress:
data = brotli.decompress(data)
else:
data = brotli.compress(
data,
mode=options.mode,
quality=options.quality,
lgwin=options.lgwin,
lgblock=options.lgblock)
outfile.write(data)
finally:
if did_open_outfile: outfile.close()
except brotli.error as e:
parser.exit(1,
'bro: error: %s: %s' % (e, options.infile or '{stdin}'))
if __name__ == '__main__':
main()

View File

@@ -1,27 +1,21 @@
"""Common utilities for Brotli tests."""
from __future__ import print_function
import filecmp
import glob
import itertools
import os
import pathlib
import sys
import sysconfig
import tempfile
import unittest
# TODO(eustas): use str(pathlib.PurePath(file).parent.parent) for Python 3.4+
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
project_dir = str(pathlib.PurePath(__file__).parent.parent.parent)
runtime_dir = os.getenv('TEST_SRCDIR')
test_dir = os.getenv('BROTLI_TESTS_PATH')
BRO_ARGS = [os.getenv('BROTLI_WRAPPER')]
# Fallbacks
if test_dir is None:
if test_dir and runtime_dir:
test_dir = os.path.join(runtime_dir, test_dir)
elif test_dir is None:
test_dir = os.path.join(project_dir, 'tests')
if BRO_ARGS[0] is None:
python_exe = sys.executable or 'python'
bro_path = os.path.join(project_dir, 'python', 'bro.py')
BRO_ARGS = [python_exe, bro_path]
# Get the platform/version-specific build folder.
# By default, the distutils build base is in the same location as setup.py.
@@ -41,113 +35,49 @@ else:
TESTDATA_DIR = os.path.join(test_dir, 'testdata')
TESTDATA_FILES = [
'empty', # Empty file
'10x10y', # Small text
'alice29.txt', # Large text
'random_org_10k.bin', # Small data
'mapsdatazrh', # Large data
'ukkonooa', # Poem
'cp1251-utf16le', # Codepage 1251 table saved in UTF16-LE encoding
'cp852-utf8', # Codepage 852 table saved in UTF8 encoding
# TODO(eustas): add test on already compressed content
]
# Some files might be missing in a lightweight sources pack.
TESTDATA_PATH_CANDIDATES = [
os.path.join(TESTDATA_DIR, f) for f in TESTDATA_FILES
]
TESTDATA_PATHS = [
path for path in TESTDATA_PATH_CANDIDATES if os.path.isfile(path)
]
TESTDATA_PATHS_FOR_DECOMPRESSION = glob.glob(
os.path.join(TESTDATA_DIR, '*.compressed')
)
TEMP_DIR = tempfile.mkdtemp()
def gather_text_inputs():
"""Discover inputs for decompression tests."""
all_inputs = [
'empty', # Empty file
'10x10y', # Small text
'alice29.txt', # Large text
'random_org_10k.bin', # Small data
'mapsdatazrh', # Large data
'ukkonooa', # Poem
'cp1251-utf16le', # Codepage 1251 table saved in UTF16-LE encoding
'cp852-utf8', # Codepage 852 table saved in UTF8 encoding
# TODO(eustas): add test on already compressed content
]
# Filter out non-existing files; e.g. in lightweight sources pack.
return [
f for f in all_inputs if os.path.isfile(os.path.join(TESTDATA_DIR, f))
]
def get_temp_compressed_name(filename):
return os.path.join(TEMP_DIR, os.path.basename(filename + '.bro'))
def gather_compressed_inputs():
"""Discover inputs for compression tests."""
candidates = glob.glob(os.path.join(TESTDATA_DIR, '*.compressed'))
pairs = [(f, f.split('.compressed')[0]) for f in candidates]
existing = [
pair
for pair in pairs
if os.path.isfile(pair[0]) and os.path.isfile(pair[1])
]
return [
(os.path.basename(pair[0]), (os.path.basename(pair[1])))
for pair in existing
]
def get_temp_uncompressed_name(filename):
return os.path.join(TEMP_DIR, os.path.basename(filename + '.unbro'))
def take_input(input_name):
with open(os.path.join(TESTDATA_DIR, input_name), 'rb') as f:
return f.read()
def bind_method_args(method, *args, **kwargs):
return lambda self: method(self, *args, **kwargs)
def has_input(input_name):
return os.path.isfile(os.path.join(TESTDATA_DIR, input_name))
# TODO(eustas): migrate to absl.testing.parameterized.
def generate_test_methods(
test_case_class, for_decompression=False, variants=None
):
"""Adds test methods for each test data file and each variant.
This makes identifying problems with specific compression scenarios easier.
Args:
test_case_class: The test class to add methods to.
for_decompression: If True, uses compressed test data files.
variants: A dictionary where keys are option names and values are lists of
possible values for that option. Each combination of variants will
generate a separate test method.
"""
if for_decompression:
paths = [
path for path in TESTDATA_PATHS_FOR_DECOMPRESSION
if os.path.exists(path.replace('.compressed', ''))
]
else:
paths = TESTDATA_PATHS
opts = []
if variants:
opts_list = []
for k, v in variants.items():
opts_list.append([r for r in itertools.product([k], v)])
for o in itertools.product(*opts_list):
opts_name = '_'.join([str(i) for i in itertools.chain(*o)])
opts_dict = dict(o)
opts.append([opts_name, opts_dict])
else:
opts.append(['', {}])
for method in [m for m in dir(test_case_class) if m.startswith('_test')]:
for testdata in paths:
for opts_name, opts_dict in opts:
f = os.path.splitext(os.path.basename(testdata))[0]
name = 'test_{method}_{options}_{file}'.format(
method=method, options=opts_name, file=f
)
func = bind_method_args(
getattr(test_case_class, method), testdata, **opts_dict
)
setattr(test_case_class, name, func)
class TestCase(unittest.TestCase):
"""Base class for Brotli test cases.
Provides common setup and teardown logic, including cleaning up temporary
files and a utility for comparing file contents.
"""
def tearDown(self):
for f in TESTDATA_PATHS:
try:
os.unlink(get_temp_compressed_name(f))
except OSError:
pass
try:
os.unlink(get_temp_uncompressed_name(f))
except OSError:
pass
# super().tearDown() # Requires Py3+
def assert_files_match(self, first, second):
self.assertTrue(
filecmp.cmp(first, second, shallow=False),
'File {} differs from {}'.format(first, second),
)
def chunk_input(data, chunk_size):
return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

View File

@@ -1,104 +0,0 @@
# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import subprocess
import unittest
from . import _test_utils
BRO_ARGS = _test_utils.BRO_ARGS
TEST_ENV = _test_utils.TEST_ENV
def _get_original_name(test_data):
return test_data.split('.compressed')[0]
class TestBroDecompress(_test_utils.TestCase):
def _check_decompression(self, test_data):
# Verify decompression matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
original = _get_original_name(test_data)
self.assert_files_match(temp_uncompressed, original)
def _decompress_file(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
args = BRO_ARGS + ['-f', '-d', '-i', test_data, '-o', temp_uncompressed]
subprocess.check_call(args, env=TEST_ENV)
def _decompress_pipe(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
args = BRO_ARGS + ['-d']
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
subprocess.check_call(
args, stdin=in_file, stdout=out_file, env=TEST_ENV
)
def _test_decompress_file(self, test_data):
self._decompress_file(test_data)
self._check_decompression(test_data)
def _test_decompress_pipe(self, test_data):
self._decompress_pipe(test_data)
self._check_decompression(test_data)
_test_utils.generate_test_methods(TestBroDecompress, for_decompression=True)
class TestBroCompress(_test_utils.TestCase):
VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)}
def _check_decompression(self, test_data):
# Write decompression to temp file and verify it matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
original = test_data
args = BRO_ARGS + ['-f', '-d']
args.extend(['-i', temp_compressed, '-o', temp_uncompressed])
subprocess.check_call(args, env=TEST_ENV)
self.assert_files_match(temp_uncompressed, original)
def _compress_file(self, test_data, **kwargs):
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
args = BRO_ARGS + ['-f']
if 'quality' in kwargs:
args.extend(['-q', str(kwargs['quality'])])
if 'lgwin' in kwargs:
args.extend(['--lgwin', str(kwargs['lgwin'])])
args.extend(['-i', test_data, '-o', temp_compressed])
subprocess.check_call(args, env=TEST_ENV)
def _compress_pipe(self, test_data, **kwargs):
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
args = BRO_ARGS
if 'quality' in kwargs:
args.extend(['-q', str(kwargs['quality'])])
if 'lgwin' in kwargs:
args.extend(['--lgwin', str(kwargs['lgwin'])])
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
subprocess.check_call(
args, stdin=in_file, stdout=out_file, env=TEST_ENV
)
def _test_compress_file(self, test_data, **kwargs):
self._compress_file(test_data, **kwargs)
self._check_decompression(test_data)
def _test_compress_pipe(self, test_data, **kwargs):
self._compress_pipe(test_data, **kwargs)
self._check_decompression(test_data)
_test_utils.generate_test_methods(
TestBroCompress, variants=TestBroCompress.VARIANTS
)
if __name__ == '__main__':
unittest.main()

View File

@@ -3,39 +3,17 @@
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import unittest
import brotli
import pytest
from . import _test_utils
class TestCompress(_test_utils.TestCase):
VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)}
def _check_decompression(self, test_data):
# Write decompression to temp file and verify it matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
original = test_data
with open(temp_uncompressed, 'wb') as out_file:
with open(temp_compressed, 'rb') as in_file:
out_file.write(brotli.decompress(in_file.read()))
self.assert_files_match(temp_uncompressed, original)
def _compress(self, test_data, **kwargs):
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
out_file.write(brotli.compress(in_file.read(), **kwargs))
def _test_compress(self, test_data, **kwargs):
self._compress(test_data, **kwargs)
self._check_decompression(test_data)
_test_utils.generate_test_methods(TestCompress, variants=TestCompress.VARIANTS)
if __name__ == '__main__':
unittest.main()
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("lgwin", [10, 15, 20, 24])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_compress(quality, lgwin, text_name):
original = _test_utils.take_input(text_name)
compressed = brotli.compress(original, quality=quality, lgwin=lgwin)
decompressed = brotli.decompress(compressed)
assert original == decompressed

View File

@@ -3,98 +3,49 @@
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import functools
import unittest
import brotli
import pytest
from . import _test_utils
# Do not inherit from TestCase here to ensure that test methods
# are not run automatically and instead are run as part of a specific
# configuration below.
class _TestCompressor(object):
CHUNK_SIZE = 2048
def tearDown(self):
self.compressor = None
# super().tearDown() # Requires Py3+
def _check_decompression(self, test_data):
# Write decompression to temp file and verify it matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
original = test_data
with open(temp_uncompressed, 'wb') as out_file:
with open(temp_compressed, 'rb') as in_file:
out_file.write(brotli.decompress(in_file.read()))
self.assert_files_match(temp_uncompressed, original)
def _test_single_process(self, test_data):
# Write single-shot compression to temp file.
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
out_file.write(self.compressor.process(in_file.read()))
out_file.write(self.compressor.finish())
self._check_decompression(test_data)
def _test_multiple_process(self, test_data):
# Write chunked compression to temp file.
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
for data in iter(read_chunk, b''):
out_file.write(self.compressor.process(data))
out_file.write(self.compressor.finish())
self._check_decompression(test_data)
def _test_multiple_process_and_flush(self, test_data):
# Write chunked and flushed compression to temp file.
temp_compressed = _test_utils.get_temp_compressed_name(test_data)
with open(temp_compressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
for data in iter(read_chunk, b''):
out_file.write(self.compressor.process(data))
out_file.write(self.compressor.flush())
out_file.write(self.compressor.finish())
self._check_decompression(test_data)
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_single_process(quality, text_name):
original = _test_utils.take_input(text_name)
compressor = brotli.Compressor(quality=quality)
compressed = compressor.process(original)
compressed += compressor.finish()
decompressed = brotli.decompress(compressed)
assert original == decompressed
_test_utils.generate_test_methods(_TestCompressor)
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_multiple_process(quality, text_name):
original = _test_utils.take_input(text_name)
chunk_size = 2048
chunks = _test_utils.chunk_input(original, chunk_size)
compressor = brotli.Compressor(quality=quality)
compressed = b''
for chunk in chunks:
compressed += compressor.process(chunk)
compressed += compressor.finish()
decompressed = brotli.decompress(compressed)
assert original == decompressed
class TestCompressorQuality1(_TestCompressor, _test_utils.TestCase):
def setUp(self):
# super().setUp() # Requires Py3+
self.compressor = brotli.Compressor(quality=1)
class TestCompressorQuality6(_TestCompressor, _test_utils.TestCase):
def setUp(self):
# super().setUp() # Requires Py3+
self.compressor = brotli.Compressor(quality=6)
class TestCompressorQuality9(_TestCompressor, _test_utils.TestCase):
def setUp(self):
# super().setUp() # Requires Py3+
self.compressor = brotli.Compressor(quality=9)
class TestCompressorQuality11(_TestCompressor, _test_utils.TestCase):
def setUp(self):
# super().setUp() # Requires Py3+
self.compressor = brotli.Compressor(quality=11)
if __name__ == '__main__':
unittest.main()
@pytest.mark.parametrize("quality", [1, 6, 9, 11])
@pytest.mark.parametrize("text_name", _test_utils.gather_text_inputs())
def test_multiple_process_and_flush(quality, text_name):
original = _test_utils.take_input(text_name)
chunk_size = 2048
chunks = _test_utils.chunk_input(original, chunk_size)
compressor = brotli.Compressor(quality=quality)
compressed = b''
for chunk in chunks:
compressed += compressor.process(chunk)
compressed += compressor.flush()
compressed += compressor.finish()
decompressed = brotli.decompress(compressed)
assert original == decompressed

View File

@@ -3,41 +3,22 @@
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import unittest
import brotli
import pytest
from . import _test_utils
def _get_original_name(test_data):
return test_data.split('.compressed')[0]
@pytest.mark.parametrize(
'compressed_name, original_name', _test_utils.gather_compressed_inputs()
)
def test_decompress(compressed_name, original_name):
compressed = _test_utils.take_input(compressed_name)
original = _test_utils.take_input(original_name)
decompressed = brotli.decompress(compressed)
assert decompressed == original
class TestDecompress(_test_utils.TestCase):
def _check_decompression(self, test_data):
# Verify decompression matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
original = _get_original_name(test_data)
self.assert_files_match(temp_uncompressed, original)
def _decompress(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
out_file.write(brotli.decompress(in_file.read()))
def _test_decompress(self, test_data):
self._decompress(test_data)
self._check_decompression(test_data)
def test_garbage_appended(self):
with self.assertRaises(brotli.error):
brotli.decompress(brotli.compress(b'a') + b'a')
_test_utils.generate_test_methods(TestDecompress, for_decompression=True)
if __name__ == '__main__':
unittest.main()
def test_garbage_appended():
with pytest.raises(brotli.error):
brotli.decompress(brotli.compress(b'a') + b'a')

View File

@@ -3,116 +3,89 @@
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import functools
import os
import unittest
import brotli
import pytest
from . import _test_utils
def _get_original_name(test_data):
return test_data.split('.compressed')[0]
MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less.
class TestDecompressor(_test_utils.TestCase):
@pytest.mark.parametrize(
'compressed_name, original_name', _test_utils.gather_compressed_inputs()
)
def test_decompress(compressed_name, original_name):
decompressor = brotli.Decompressor()
compressed = _test_utils.take_input(compressed_name)
original = _test_utils.take_input(original_name)
chunk_size = 1
chunks = _test_utils.chunk_input(compressed, chunk_size)
decompressed = b''
for chunk in chunks:
decompressed += decompressor.process(chunk)
assert decompressor.is_finished()
assert original == decompressed
CHUNK_SIZE = 1
MIN_OUTPUT_BUFFER_SIZE = 32768 # Actually, several bytes less.
def setUp(self):
# super().setUp() # Requires Py3+
self.decompressor = brotli.Decompressor()
def tearDown(self):
self.decompressor = None
# super().tearDown() # Requires Py3+
def _check_decompression(self, test_data):
# Verify decompression matches the original.
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
original = _get_original_name(test_data)
self.assert_files_match(temp_uncompressed, original)
def _decompress(self, test_data):
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
for data in iter(read_chunk, b''):
out_file.write(self.decompressor.process(data))
self.assertTrue(self.decompressor.is_finished())
def _decompress_with_limit(self, test_data):
output_buffer_limit = 10922
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
while not self.decompressor.is_finished():
data = b''
if self.decompressor.can_accept_more_data():
data = next(chunk_iter, b'')
decompressed_data = self.decompressor.process(
data, output_buffer_limit=output_buffer_limit
)
self.assertLessEqual(
len(decompressed_data), self.MIN_OUTPUT_BUFFER_SIZE
)
out_file.write(decompressed_data)
self.assertIsNone(next(chunk_iter, None))
def _test_decompress(self, test_data):
self._decompress(test_data)
self._check_decompression(test_data)
def _test_decompress_with_limit(self, test_data):
self._decompress_with_limit(test_data)
self._check_decompression(test_data)
def test_too_much_input(self):
with open(
os.path.join(_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'), 'rb'
) as in_file:
compressed = in_file.read()
self.decompressor.process(compressed[:-1], output_buffer_limit=10240)
# the following assertion checks whether the test setup is correct
self.assertFalse(self.decompressor.can_accept_more_data())
with self.assertRaises(brotli.error):
self.decompressor.process(compressed[-1:])
def test_changing_limit(self):
test_data = os.path.join(
_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'
@pytest.mark.parametrize(
'compressed_name, original_name', _test_utils.gather_compressed_inputs()
)
def test_decompress_with_limit(compressed_name, original_name):
decompressor = brotli.Decompressor()
compressed = _test_utils.take_input(compressed_name)
original = _test_utils.take_input(original_name)
chunk_size = 10 * 1024
output_buffer_limit = 10922
chunks = _test_utils.chunk_input(compressed, chunk_size)
decompressed = b''
while not decompressor.is_finished():
data = b''
if decompressor.can_accept_more_data() and chunks:
data = chunks.pop(0)
decompressed_chunk = decompressor.process(
data, output_buffer_limit=output_buffer_limit
)
check_output = os.path.exists(test_data.replace('.compressed', ''))
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
with open(temp_uncompressed, 'wb') as out_file:
with open(test_data, 'rb') as in_file:
compressed = in_file.read()
uncompressed = self.decompressor.process(
compressed[:-1], output_buffer_limit=10240
)
self.assertLessEqual(len(uncompressed), self.MIN_OUTPUT_BUFFER_SIZE)
out_file.write(uncompressed)
while not self.decompressor.can_accept_more_data():
out_file.write(self.decompressor.process(b''))
out_file.write(self.decompressor.process(compressed[-1:]))
if check_output:
self._check_decompression(test_data)
def test_garbage_appended(self):
with self.assertRaises(brotli.error):
self.decompressor.process(brotli.compress(b'a') + b'a')
def test_already_finished(self):
self.decompressor.process(brotli.compress(b'a'))
with self.assertRaises(brotli.error):
self.decompressor.process(b'a')
assert len(decompressed_chunk) <= MIN_OUTPUT_BUFFER_SIZE
decompressed += decompressed_chunk
assert not chunks
assert original == decompressed
_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
def test_too_much_input():
decompressor = brotli.Decompressor()
compressed = _test_utils.take_input('zerosukkanooa.compressed')
decompressor.process(compressed[:-1], output_buffer_limit=10240)
# The following assertion checks whether the test setup is correct.
assert not decompressor.can_accept_more_data()
with pytest.raises(brotli.error):
decompressor.process(compressed[-1:])
if __name__ == '__main__':
unittest.main()
def test_changing_limit():
decompressor = brotli.Decompressor()
input_name = 'zerosukkanooa'
compressed = _test_utils.take_input(input_name + '.compressed')
check_output = _test_utils.has_input(input_name)
decompressed = decompressor.process(
compressed[:-1], output_buffer_limit=10240
)
assert len(decompressed) <= MIN_OUTPUT_BUFFER_SIZE
while not decompressor.can_accept_more_data():
decompressed += decompressor.process(b'')
decompressed += decompressor.process(compressed[-1:])
if check_output:
original = _test_utils.take_input(input_name)
assert original == decompressed
def test_garbage_appended():
decompressor = brotli.Decompressor()
with pytest.raises(brotli.error):
decompressor.process(brotli.compress(b'a') + b'a')
def test_already_finished():
decompressor = brotli.Decompressor()
decompressor.process(brotli.compress(b'a'))
with pytest.raises(brotli.error):
decompressor.process(b'a')

View File

@@ -8,6 +8,7 @@ I found the following issues with the Brotli format:
- The block type code is useless if NBLTYPES==2, you would only need 1 symbol
anyway, so why don't you just switch to "the other" type?
"""
# ruff: noqa
import struct
from operator import itemgetter, methodcaller
from itertools import accumulate, repeat

View File

@@ -18,13 +18,11 @@ for line in lines:
if appendix_a_found:
if re_data_line.match(line) is not None:
data = line.strip()
for i in range(32):
dictionary.append(int(data[2 * i:2 * i + 2], 16))
dictionary.extend(int(data[2 * i:2 * i + 2], 16) for i in range(32))
if len(dictionary) == 122784:
break
else:
if line.startswith("Appendix A."):
appendix_a_found = True
elif line.startswith("Appendix A."):
appendix_a_found = True
bin_path = "dictionary.bin"

View File

@@ -40,13 +40,12 @@ for b in data:
is_skip = False
hi.append(unichr(cntr))
cntr = skip_flip_offset + 1
elif value >= 0x80:
cntr += 1
else:
if value >= 0x80:
cntr += 1
else:
is_skip = True
hi.append(unichr(cntr))
cntr = skip_flip_offset + 1
is_skip = True
hi.append(unichr(cntr))
cntr = skip_flip_offset + 1
hi.append(unichr(cntr))
low0 = low[0:len(low) // 2]
@@ -56,15 +55,15 @@ low1 = low[len(low) // 2:len(low)]
def escape(chars):
result = []
for c in chars:
if "\r" == c:
if c == "\r":
result.append("\\r")
elif "\n" == c:
elif c == "\n":
result.append("\\n")
elif "\t" == c:
elif c == "\t":
result.append("\\t")
elif "\"" == c:
elif c == "\"":
result.append("\\\"")
elif "\\" == c:
elif c == "\\":
result.append("\\\\")
elif ord(c) < 32 or ord(c) >= 127:
result.append("\\u%04X" % ord(c))

View File

@@ -4,16 +4,11 @@
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import os
import platform
import re
import unittest
try:
from setuptools import Extension
from setuptools import setup
except:
from distutils.core import Extension
from distutils.core import setup
from setuptools import Extension
from setuptools import setup
from distutils.command.build_ext import build_ext
from distutils import errors
from distutils import dep_util
@@ -58,8 +53,7 @@ def get_version():
def get_test_suite():
test_loader = unittest.TestLoader()
test_suite = test_loader.discover("python", pattern="*_test.py")
return test_suite
return test_loader.discover("python", pattern="*_test.py")
class BuildExt(build_ext):
@@ -82,13 +76,9 @@ class BuildExt(build_ext):
if not (self.force or dep_util.newer_group(depends, ext_path, "newer")):
log.debug("skipping '%s' extension (up-to-date)", ext.name)
return
else:
log.info("building '%s' extension", ext.name)
log.info("building '%s' extension", ext.name)
c_sources = []
for source in ext.sources:
if source.endswith(".c"):
c_sources.append(source)
c_sources = [source for source in ext.sources if source.endswith(".c")]
extra_args = ext.extra_compile_args or []
objects = []
@@ -166,12 +156,12 @@ CLASSIFIERS = [
"Programming Language :: C",
"Programming Language :: C++",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Programming Language :: Unix Shell",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
@@ -189,7 +179,7 @@ USE_SYSTEM_BROTLI = bool_from_environ("USE_SYSTEM_BROTLI")
if USE_SYSTEM_BROTLI:
import pkgconfig
REQUIRED_BROTLI_SYSTEM_LIBRARIES = ["libbrotlicommon", "libbrotlienc", "libbrotlidec"]
define_macros = []