diff --git a/src/SharpCompress/Compressors/BZip2/BZip2Stream.cs b/src/SharpCompress/Compressors/BZip2/BZip2Stream.cs index 5b9884ca..e3325f27 100644 --- a/src/SharpCompress/Compressors/BZip2/BZip2Stream.cs +++ b/src/SharpCompress/Compressors/BZip2/BZip2Stream.cs @@ -1,5 +1,7 @@ using System; using System.IO; +using System.Threading; +using System.Threading.Tasks; using SharpCompress.IO; namespace SharpCompress.Compressors.BZip2; @@ -96,13 +98,37 @@ public sealed class BZip2Stream : Stream, IStreamStack public override void SetLength(long value) => stream.SetLength(value); -#if !NETFRAMEWORK&& !NETSTANDARD2_0 +#if !NETFRAMEWORK && !NETSTANDARD2_0 public override int Read(Span buffer) => stream.Read(buffer); public override void Write(ReadOnlySpan buffer) => stream.Write(buffer); + + public override async ValueTask ReadAsync( + Memory buffer, + CancellationToken cancellationToken = default + ) => await stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + + public override async ValueTask WriteAsync( + ReadOnlyMemory buffer, + CancellationToken cancellationToken = default + ) => await stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); #endif + public override async Task ReadAsync( + byte[] buffer, + int offset, + int count, + CancellationToken cancellationToken = default + ) => await stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + + public override async Task WriteAsync( + byte[] buffer, + int offset, + int count, + CancellationToken cancellationToken = default + ) => await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + public override void Write(byte[] buffer, int offset, int count) => stream.Write(buffer, offset, count); diff --git a/src/SharpCompress/Compressors/BZip2/CBZip2InputStream.cs b/src/SharpCompress/Compressors/BZip2/CBZip2InputStream.cs index abff4973..df164b11 100644 --- a/src/SharpCompress/Compressors/BZip2/CBZip2InputStream.cs +++ b/src/SharpCompress/Compressors/BZip2/CBZip2InputStream.cs @@ -2,6 +2,8 @@ using System; using System.IO; +using System.Threading; +using System.Threading.Tasks; using SharpCompress.IO; /* @@ -1127,6 +1129,28 @@ internal class CBZip2InputStream : Stream, IStreamStack return k; } + public override async Task ReadAsync( + byte[] buffer, + int offset, + int count, + CancellationToken cancellationToken = default + ) + { + var c = -1; + int k; + for (k = 0; k < count; ++k) + { + cancellationToken.ThrowIfCancellationRequested(); + c = ReadByte(); + if (c == -1) + { + break; + } + buffer[k + offset] = (byte)c; + } + return await Task.FromResult(k); + } + public override long Seek(long offset, SeekOrigin origin) => 0; public override void SetLength(long value) { } diff --git a/src/SharpCompress/Compressors/BZip2/CBZip2OutputStream.cs b/src/SharpCompress/Compressors/BZip2/CBZip2OutputStream.cs index 555c6fcb..cebfc6b7 100644 --- a/src/SharpCompress/Compressors/BZip2/CBZip2OutputStream.cs +++ b/src/SharpCompress/Compressors/BZip2/CBZip2OutputStream.cs @@ -1,5 +1,7 @@ using System; using System.IO; +using System.Threading; +using System.Threading.Tasks; using SharpCompress.IO; /* @@ -2022,6 +2024,21 @@ internal sealed class CBZip2OutputStream : Stream, IStreamStack } } + public override async Task WriteAsync( + byte[] buffer, + int offset, + int count, + CancellationToken cancellationToken = default + ) + { + for (var k = 0; k < count; ++k) + { + cancellationToken.ThrowIfCancellationRequested(); + WriteByte(buffer[k + offset]); + } + await Task.CompletedTask; + } + public override bool CanRead => false; public override bool CanSeek => false; diff --git a/tests/SharpCompress.Test/Compressors/BZip2/BZip2StreamAsyncTests.cs b/tests/SharpCompress.Test/Compressors/BZip2/BZip2StreamAsyncTests.cs new file mode 100644 index 00000000..0c10ca64 --- /dev/null +++ b/tests/SharpCompress.Test/Compressors/BZip2/BZip2StreamAsyncTests.cs @@ -0,0 +1,231 @@ +using System; +using System.Buffers; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using SharpCompress.Compressors.BZip2; +using Xunit; + +namespace SharpCompress.Test.BZip2; + +public class BZip2StreamAsyncTests +{ + private byte[] CreateTestData(int size) + { + var data = new byte[size]; + // Create compressible data with repetitive pattern + for (int i = 0; i < size; i++) + { + data[i] = (byte)('A' + (i % 26)); + } + return data; + } + + [Fact] + public async Task BZip2CompressDecompressAsyncTest() + { + var testData = CreateTestData(10000); + byte[] compressed; + + // Compress + using (var memoryStream = new MemoryStream()) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Compress, + false + ) + ) + { + await bzip2Stream.WriteAsync(testData, 0, testData.Length); + (bzip2Stream as BZip2Stream)?.Finish(); + } + compressed = memoryStream.ToArray(); + } + + // Verify compression occurred + Assert.True(compressed.Length > 0); + Assert.True(compressed.Length < testData.Length); + + // Decompress + byte[] decompressed; + using (var memoryStream = new MemoryStream(compressed)) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Decompress, + false + ) + ) + { + decompressed = new byte[testData.Length]; + var totalRead = 0; + int bytesRead; + while ( + ( + bytesRead = await bzip2Stream.ReadAsync( + decompressed, + totalRead, + testData.Length - totalRead + ) + ) > 0 + ) + { + totalRead += bytesRead; + } + } + } + + // Verify decompression + Assert.Equal(testData, decompressed); + } + + [Fact] + public async Task BZip2ReadAsyncWithCancellationTest() + { + var testData = Encoding.ASCII.GetBytes(new string('A', 5000)); // Repetitive data compresses well + byte[] compressed; + + // Compress + using (var memoryStream = new MemoryStream()) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Compress, + false + ) + ) + { + await bzip2Stream.WriteAsync(testData, 0, testData.Length); + (bzip2Stream as BZip2Stream)?.Finish(); + } + compressed = memoryStream.ToArray(); + } + + // Decompress with cancellation support + using (var memoryStream = new MemoryStream(compressed)) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Decompress, + false + ) + ) + { + var buffer = new byte[1024]; + var cts = new System.Threading.CancellationTokenSource(); + + // Read should complete without cancellation + var bytesRead = await bzip2Stream.ReadAsync(buffer, 0, buffer.Length, cts.Token); + Assert.True(bytesRead > 0); + } + } + } + + [Fact] + public async Task BZip2MultipleAsyncWritesTest() + { + using (var memoryStream = new MemoryStream()) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Compress, + false + ) + ) + { + var data1 = Encoding.ASCII.GetBytes("Hello "); + var data2 = Encoding.ASCII.GetBytes("World"); + var data3 = Encoding.ASCII.GetBytes("!"); + + await bzip2Stream.WriteAsync(data1, 0, data1.Length); + await bzip2Stream.WriteAsync(data2, 0, data2.Length); + await bzip2Stream.WriteAsync(data3, 0, data3.Length); + + (bzip2Stream as BZip2Stream)?.Finish(); + } + + var compressed = memoryStream.ToArray(); + Assert.True(compressed.Length > 0); + + // Decompress and verify + using (var readStream = new MemoryStream(compressed)) + { + using ( + var bzip2Stream = new BZip2Stream( + readStream, + SharpCompress.Compressors.CompressionMode.Decompress, + false + ) + ) + { + var result = new StringBuilder(); + var buffer = new byte[256]; + int bytesRead; + while ((bytesRead = await bzip2Stream.ReadAsync(buffer, 0, buffer.Length)) > 0) + { + result.Append(Encoding.ASCII.GetString(buffer, 0, bytesRead)); + } + + Assert.Equal("Hello World!", result.ToString()); + } + } + } + } + + [Fact] + public async Task BZip2LargeDataAsyncTest() + { + var largeData = CreateTestData(100000); + + // Compress + byte[] compressed; + using (var memoryStream = new MemoryStream()) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Compress, + false + ) + ) + { + await bzip2Stream.WriteAsync(largeData, 0, largeData.Length); + (bzip2Stream as BZip2Stream)?.Finish(); + } + compressed = memoryStream.ToArray(); + } + + // Decompress + byte[] decompressed; + using (var memoryStream = new MemoryStream(compressed)) + { + using ( + var bzip2Stream = new BZip2Stream( + memoryStream, + SharpCompress.Compressors.CompressionMode.Decompress, + false + ) + ) + { + decompressed = new byte[largeData.Length]; + var totalRead = 0; + int bytesRead; + var buffer = new byte[4096]; + while ((bytesRead = await bzip2Stream.ReadAsync(buffer, 0, buffer.Length)) > 0) + { + Array.Copy(buffer, 0, decompressed, totalRead, bytesRead); + totalRead += bytesRead; + } + } + } + + // Verify + Assert.Equal(largeData, decompressed); + } +}