start of implementing zip reading async

This commit is contained in:
Adam Hathcock
2025-12-31 14:53:55 +00:00
parent 44b7955d85
commit 8533b09091
8 changed files with 334 additions and 16 deletions

View File

@@ -1,4 +1,6 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace SharpCompress.Common;
@@ -14,4 +16,8 @@ public abstract class FilePart
internal abstract Stream? GetCompressedStream();
internal abstract Stream? GetRawStream();
internal bool Skipped { get; set; }
internal virtual Task<Stream?> GetCompressedStreamAsync(
CancellationToken cancellationToken = default
) => Task.FromResult(GetCompressedStream());
}

View File

@@ -2,6 +2,8 @@ using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace SharpCompress.Common.Zip.Headers;
@@ -57,6 +59,24 @@ internal abstract class ZipFileEntry(ZipHeaderType type, ArchiveEncoding archive
return encryptionData;
}
internal async Task<PkwareTraditionalEncryptionData> ComposeEncryptionDataAsync(
Stream archiveStream,
CancellationToken cancellationToken = default
)
{
if (archiveStream is null)
{
throw new ArgumentNullException(nameof(archiveStream));
}
var buffer = new byte[12];
await archiveStream.ReadFullyAsync(buffer, 0, 12, cancellationToken).ConfigureAwait(false);
var encryptionData = PkwareTraditionalEncryptionData.ForRead(Password!, this, buffer);
return encryptionData;
}
internal WinzipAesEncryptionData? WinzipAesEncryptionData { get; set; }
/// <summary>

View File

@@ -1,4 +1,6 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using SharpCompress.Common.Zip.Headers;
using SharpCompress.Compressors.Deflate;
using SharpCompress.IO;
@@ -31,6 +33,28 @@ internal sealed class StreamingZipFilePart : ZipFilePart
return _decompressionStream;
}
internal override async Task<Stream?> GetCompressedStreamAsync(
CancellationToken cancellationToken = default
)
{
if (!Header.HasData)
{
return Stream.Null;
}
_decompressionStream = await CreateDecompressionStreamAsync(
await GetCryptoStreamAsync(CreateBaseStream(), cancellationToken)
.ConfigureAwait(false),
Header.CompressionMethod,
cancellationToken
)
.ConfigureAwait(false);
if (LeaveStreamOpen)
{
return SharpCompressStream.Create(_decompressionStream, leaveOpen: true);
}
return _decompressionStream;
}
internal BinaryReader FixStreamedFileLocation(ref SharpCompressStream rewindableStream)
{
if (Header.IsDirectory)

View File

@@ -2,6 +2,8 @@ using System;
using System.Buffers.Binary;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SharpCompress.Common.Zip.Headers;
using SharpCompress.Compressors;
using SharpCompress.Compressors.BZip2;
@@ -264,4 +266,220 @@ internal abstract class ZipFilePart : FilePart
}
return plainStream;
}
internal override async Task<Stream?> GetCompressedStreamAsync(
CancellationToken cancellationToken = default
)
{
if (!Header.HasData)
{
return Stream.Null;
}
var decompressionStream = await CreateDecompressionStreamAsync(
await GetCryptoStreamAsync(CreateBaseStream(), cancellationToken)
.ConfigureAwait(false),
Header.CompressionMethod,
cancellationToken
)
.ConfigureAwait(false);
if (LeaveStreamOpen)
{
return SharpCompressStream.Create(decompressionStream, leaveOpen: true);
}
return decompressionStream;
}
protected async Task<Stream> GetCryptoStreamAsync(
Stream plainStream,
CancellationToken cancellationToken = default
)
{
var isFileEncrypted = FlagUtility.HasFlag(Header.Flags, HeaderFlags.Encrypted);
if (Header.CompressedSize == 0 && isFileEncrypted)
{
throw new NotSupportedException("Cannot encrypt file with unknown size at start.");
}
if (
(
Header.CompressedSize == 0
&& FlagUtility.HasFlag(Header.Flags, HeaderFlags.UsePostDataDescriptor)
) || Header.IsZip64
)
{
plainStream = SharpCompressStream.Create(plainStream, leaveOpen: true); //make sure AES doesn't close
}
else
{
plainStream = new ReadOnlySubStream(plainStream, Header.CompressedSize); //make sure AES doesn't close
}
if (isFileEncrypted)
{
switch (Header.CompressionMethod)
{
case ZipCompressionMethod.None:
case ZipCompressionMethod.Shrink:
case ZipCompressionMethod.Reduce1:
case ZipCompressionMethod.Reduce2:
case ZipCompressionMethod.Reduce3:
case ZipCompressionMethod.Reduce4:
case ZipCompressionMethod.Deflate:
case ZipCompressionMethod.Deflate64:
case ZipCompressionMethod.BZip2:
case ZipCompressionMethod.LZMA:
case ZipCompressionMethod.PPMd:
{
return new PkwareTraditionalCryptoStream(
plainStream,
await Header
.ComposeEncryptionDataAsync(plainStream, cancellationToken)
.ConfigureAwait(false),
CryptoMode.Decrypt
);
}
case ZipCompressionMethod.WinzipAes:
{
if (Header.WinzipAesEncryptionData != null)
{
return new WinzipAesCryptoStream(
plainStream,
Header.WinzipAesEncryptionData,
Header.CompressedSize - 10
);
}
return plainStream;
}
default:
{
throw new InvalidOperationException("Header.CompressionMethod is invalid");
}
}
}
return plainStream;
}
protected async Task<Stream> CreateDecompressionStreamAsync(
Stream stream,
ZipCompressionMethod method,
CancellationToken cancellationToken = default
)
{
switch (method)
{
case ZipCompressionMethod.None:
{
if (Header.CompressedSize is 0)
{
return new DataDescriptorStream(stream);
}
return stream;
}
case ZipCompressionMethod.Shrink:
{
return new ShrinkStream(
stream,
CompressionMode.Decompress,
Header.CompressedSize,
Header.UncompressedSize
);
}
case ZipCompressionMethod.Reduce1:
{
return new ReduceStream(stream, Header.CompressedSize, Header.UncompressedSize, 1);
}
case ZipCompressionMethod.Reduce2:
{
return new ReduceStream(stream, Header.CompressedSize, Header.UncompressedSize, 2);
}
case ZipCompressionMethod.Reduce3:
{
return new ReduceStream(stream, Header.CompressedSize, Header.UncompressedSize, 3);
}
case ZipCompressionMethod.Reduce4:
{
return new ReduceStream(stream, Header.CompressedSize, Header.UncompressedSize, 4);
}
case ZipCompressionMethod.Explode:
{
return new ExplodeStream(
stream,
Header.CompressedSize,
Header.UncompressedSize,
Header.Flags
);
}
case ZipCompressionMethod.Deflate:
{
return new DeflateStream(stream, CompressionMode.Decompress);
}
case ZipCompressionMethod.Deflate64:
{
return new Deflate64Stream(stream, CompressionMode.Decompress);
}
case ZipCompressionMethod.BZip2:
{
return new BZip2Stream(stream, CompressionMode.Decompress, false);
}
case ZipCompressionMethod.LZMA:
{
if (FlagUtility.HasFlag(Header.Flags, HeaderFlags.Encrypted))
{
throw new NotSupportedException("LZMA with pkware encryption.");
}
var buffer = new byte[4];
await stream.ReadFullyAsync(buffer, 0, 4, cancellationToken).ConfigureAwait(false);
var version = BinaryPrimitives.ReadUInt16LittleEndian(buffer.AsSpan(0, 2));
var propsSize = BinaryPrimitives.ReadUInt16LittleEndian(buffer.AsSpan(2, 2));
var props = new byte[propsSize];
await stream
.ReadFullyAsync(props, 0, propsSize, cancellationToken)
.ConfigureAwait(false);
return new LzmaStream(
props,
stream,
Header.CompressedSize > 0 ? Header.CompressedSize - 4 - props.Length : -1,
FlagUtility.HasFlag(Header.Flags, HeaderFlags.Bit1)
? -1
: Header.UncompressedSize
);
}
case ZipCompressionMethod.Xz:
{
return new XZStream(stream);
}
case ZipCompressionMethod.ZStandard:
{
return new DecompressionStream(stream);
}
case ZipCompressionMethod.PPMd:
{
var props = new byte[2];
await stream.ReadFullyAsync(props, 0, 2, cancellationToken).ConfigureAwait(false);
return new PpmdStream(new PpmdProperties(props), stream, false);
}
case ZipCompressionMethod.WinzipAes:
{
var data = Header.Extra.SingleOrDefault(x => x.Type == ExtraDataType.WinZipAes);
if (data is null)
{
throw new InvalidFormatException("No Winzip AES extra data found.");
}
if (data.Length != 7)
{
throw new InvalidFormatException("Winzip data length is not 7.");
}
throw new NotSupportedException("WinzipAes isn't supported for streaming");
}
default:
{
throw new NotSupportedException("CompressionMethod: " + Header.CompressionMethod);
}
}
}
}

View File

@@ -268,11 +268,11 @@ public abstract class AbstractReader<TEntry, TVolume> : IReader
internal async Task WriteAsync(Stream writeStream, CancellationToken cancellationToken)
{
#if NETFRAMEWORK || NETSTANDARD2_0
using Stream s = OpenEntryStream();
using Stream s = await OpenEntryStreamAsync(cancellationToken).ConfigureAwait(false);
var sourceStream = WrapWithProgress(s, Entry);
await sourceStream.CopyToAsync(writeStream, 81920, cancellationToken).ConfigureAwait(false);
#else
await using Stream s = OpenEntryStream();
await using Stream s = await OpenEntryStreamAsync(cancellationToken).ConfigureAwait(false);
var sourceStream = WrapWithProgress(s, Entry);
await sourceStream.CopyToAsync(writeStream, 81920, cancellationToken).ConfigureAwait(false);
#endif
@@ -347,9 +347,16 @@ public abstract class AbstractReader<TEntry, TVolume> : IReader
protected virtual EntryStream GetEntryStream() =>
CreateEntryStream(Entry.Parts.First().GetCompressedStream());
protected virtual Task<EntryStream> GetEntryStreamAsync(
protected virtual async Task<EntryStream> GetEntryStreamAsync(
CancellationToken cancellationToken = default
) => Task.FromResult(GetEntryStream());
)
{
var stream = await Entry
.Parts.First()
.GetCompressedStreamAsync(cancellationToken)
.ConfigureAwait(false);
return CreateEntryStream(stream);
}
#endregion

View File

@@ -406,6 +406,33 @@ internal static class Utility
return (total >= buffer.Length);
}
public static async Task<bool> ReadFullyAsync(
this Stream stream,
byte[] buffer,
int offset,
int count,
CancellationToken cancellationToken = default
)
{
var total = 0;
int read;
while (
(
read = await stream
.ReadAsync(buffer, offset + total, count - total, cancellationToken)
.ConfigureAwait(false)
) > 0
)
{
total += read;
if (total >= count)
{
return true;
}
}
return (total >= count);
}
public static string TrimNulls(this string source) => source.Replace('\0', ' ').Trim();
/// <summary>

View File

@@ -32,21 +32,32 @@ public class AsyncOnlyStream : Stream
throw new NotSupportedException("Synchronous Read is not supported");
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override Task<int> ReadAsync(
byte[] buffer,
int offset,
int count,
CancellationToken cancellationToken
)
{
return _stream.ReadAsync(buffer, offset, count, cancellationToken);
}
#if !NETFRAMEWORK && !NETSTANDARD2_0
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
public override ValueTask<int> ReadAsync(
Memory<byte> buffer,
CancellationToken cancellationToken = default
)
{
return _stream.ReadAsync(buffer, cancellationToken);
}
#endif
public override long Seek(long offset, SeekOrigin origin) => _stream.Seek(offset, origin);
public override void SetLength(long value) => _stream.SetLength(value);
public override void Write(byte[] buffer, int offset, int count) => _stream.Write(buffer, offset, count);
public override void Write(byte[] buffer, int offset, int count) =>
_stream.Write(buffer, offset, count);
protected override void Dispose(bool disposing)
{
@@ -57,4 +68,3 @@ public class AsyncOnlyStream : Stream
base.Dispose(disposing);
}
}

View File

@@ -164,7 +164,9 @@ public class ZipReaderAsyncTests : ReaderTests
public async Task Zip_Reader_Disposal_Test2_Async()
{
using var stream = new TestStream(
new AsyncOnlyStream(File.OpenRead(Path.Combine(TEST_ARCHIVES_PATH, "Zip.deflate.dd.zip")))
new AsyncOnlyStream(
File.OpenRead(Path.Combine(TEST_ARCHIVES_PATH, "Zip.deflate.dd.zip"))
)
);
var reader = ReaderFactory.Open(stream);
while (await reader.MoveToNextEntryAsync())
@@ -185,9 +187,9 @@ public class ZipReaderAsyncTests : ReaderTests
await Assert.ThrowsAsync<NotSupportedException>(async () =>
{
using (
Stream stream = new AsyncOnlyStream(File.OpenRead(
Path.Combine(TEST_ARCHIVES_PATH, "Zip.lzma.WinzipAES.zip")
))
Stream stream = new AsyncOnlyStream(
File.OpenRead(Path.Combine(TEST_ARCHIVES_PATH, "Zip.lzma.WinzipAES.zip"))
)
)
using (var reader = ZipReader.Open(stream, new ReaderOptions { Password = "test" }))
{
@@ -210,9 +212,9 @@ public class ZipReaderAsyncTests : ReaderTests
public async Task Zip_Deflate_WinzipAES_Read_Async()
{
using (
Stream stream = new AsyncOnlyStream(File.OpenRead(
Path.Combine(TEST_ARCHIVES_PATH, "Zip.deflate.WinzipAES.zip")
))
Stream stream = new AsyncOnlyStream(
File.OpenRead(Path.Combine(TEST_ARCHIVES_PATH, "Zip.deflate.WinzipAES.zip"))
)
)
using (var reader = ZipReader.Open(stream, new ReaderOptions { Password = "test" }))
{
@@ -235,7 +237,11 @@ public class ZipReaderAsyncTests : ReaderTests
public async Task Zip_Deflate_ZipCrypto_Read_Async()
{
var count = 0;
using (Stream stream = new AsyncOnlyStream(File.OpenRead(Path.Combine(TEST_ARCHIVES_PATH, "zipcrypto.zip"))))
using (
Stream stream = new AsyncOnlyStream(
File.OpenRead(Path.Combine(TEST_ARCHIVES_PATH, "zipcrypto.zip"))
)
)
using (var reader = ZipReader.Open(stream, new ReaderOptions { Password = "test" }))
{
while (await reader.MoveToNextEntryAsync())