Best way to achieve parallel processing of individual entries inside a tarball #520

Closed
opened 2026-01-29 22:13:12 +00:00 by claunia · 2 comments
Owner

Originally created by @arvindshmicrosoft on GitHub (May 22, 2022).

My goal is to use separate threads to extract and process individual files from the tarball. Each thread does some CPU-intensive post-processing on the data read from its file, hence the motivation to keep this multi-threaded. Given that individual TarReader instances are assumed not to be thread-safe, I took the approach of opening separate TarReader instances and then starting a separate task for each EntryStream.

The approach seems to work, but performance rapidly becomes a concern for larger files. The logic I use to skip entries that already have a Task queued up is wasteful in that it starts from scratch each time, so later entries in the tarball take (by definition) longer to even start processing. For example, when SkipEntry() is called for larger files (several tens of GiB each), it looks like the stream is still read all the way to the next entry, and that is painfully slow. This is despite setting a larger buffer size on the underlying FileStream and using an SSD for the source tarball:

```
System.Private.CoreLib.dll!Interop.Kernel32.ReadFile(System.Runtime.InteropServices.SafeHandle handle, byte* bytes, int numBytesToRead, out int numBytesRead, System.Threading.NativeOverlapped* overlapped)
System.Private.CoreLib.dll!System.IO.Strategies.OSFileStreamStrategy.Read(System.Span<byte> buffer)
System.Private.CoreLib.dll!System.IO.Strategies.BufferedFileStreamStrategy.ReadSpan(System.Span<byte> destination, System.ArraySegment<byte> arraySegment)
System.Private.CoreLib.dll!System.IO.FileStream.Read(byte[] buffer, int offset, int count)
SharpCompress.dll!SharpCompress.IO.RewindableStream.Read(byte[] buffer, int offset, int count)
SharpCompress.dll!SharpCompress.Common.Tar.TarReadOnlySubStream.Read(byte[] buffer, int offset, int count)
SharpCompress.dll!SharpCompress.Common.EntryStream.Read(byte[] buffer, int offset, int count)
SharpCompress.dll!SharpCompress.Utility.Skip(System.IO.Stream source)
SharpCompress.dll!SharpCompress.Common.EntryStream.SkipEntry()
```

The whole sample is below. I would appreciate any advice on doing this in a more performant manner. For example, is there a way I can start reading from a later position in the stream on subsequent executions of the outer loop? When opening the same tarball with 7-Zip, for example, it is very quick to extract entries that are much "later" in the file. So, I am hoping there is a more performant way to achieve my requirement using SharpCompress.

```csharp
using SharpCompress.Readers.Tar;

TarReader GetTarReader(string sourceFile)
{
    var srcFs = new FileStream(sourceFile, FileMode.Open, FileAccess.Read, FileShare.Read, 100000000, true);
    return TarReader.Open(srcFs, new SharpCompress.Readers.ReaderOptions());
}

var extractTasks = new Dictionary<string, Task>();

// Each pass of this outer loop reopens the tarball from the start
// and schedules at most one new task.
while (true)
{
    var wasTaskAdded = false;
    lock (extractTasks)
    {
        var reader = GetTarReader(@"c:\temp\very-large-tarball.tar");
        {
            while (reader.MoveToNextEntry())
            {
                if (!reader.Entry.IsDirectory)
                {
                    var entryStream = reader.OpenEntryStream();
                    {
                        string file = Path.GetFileName(reader.Entry.Key);
                        string folder = Path.GetDirectoryName(reader.Entry.Key);
                        string destdir = Path.GetTempPath();
                        string destinationFileName = Path.Combine(destdir, file);

                        if (extractTasks.ContainsKey(destinationFileName))
                        {
                            entryStream.SkipEntry();
                            entryStream.Dispose();
                            continue;
                        }

                        if (file.ToLower().EndsWith(".csv"))
                        {
                            Console.WriteLine($"Will process {destinationFileName}");
                            extractTasks.Add(destinationFileName, Task.Run(() => ProcessEntry(entryStream, destinationFileName)));
                            wasTaskAdded = true;
                            break;
                        }
                        else
                        {
                            entryStream.SkipEntry();
                            entryStream.Dispose();
                        }
                    }
                }
            }
        }
    }

    if (!wasTaskAdded) break;
}

var allTasks = extractTasks.Values.ToArray();
await Task.WhenAll(allTasks);

async Task ProcessEntry(Stream entryStream, string destinationFileName)
{
    // some CPU-intensive processing with the entryStream
}
```
Author
Owner

@adamhathcock commented on GitHub (May 23, 2022):

If you use TarArchive (which you should, because it's a file that can be randomly accessed?), then skipping will be faster: the code should use `Seek` to skip entries instead of reading bytes into the Null stream (Reader is forward-only, so we can't seek in that scenario).

If you use multiple TarArchives to process, you might be able to achieve what you want. Maybe also memory-mapped file streams.
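To make the "multiple TarArchives" suggestion concrete, here is a minimal sketch (not code from this thread; `tarPath` and `csvKeys` are illustrative names): enumerate the entry keys once, which is cheap on a seekable stream because entry bodies can be skipped with `Seek`, then give each worker task a private archive instance, since archives are not thread-safe. It assumes .NET 6+ top-level statements with implicit usings.

```csharp
using SharpCompress.Archives.Tar;

const string tarPath = @"c:\temp\very-large-tarball.tar";

// Pass 1: collect the keys of the entries worth processing.
// On a seekable stream, enumerating entries reads headers and seeks
// past the bodies, so this pass is cheap even for multi-GiB entries.
List<string> csvKeys;
using (var archive = TarArchive.Open(tarPath))
{
    csvKeys = archive.Entries
        .Where(e => !e.IsDirectory &&
                    e.Key.EndsWith(".csv", StringComparison.OrdinalIgnoreCase))
        .Select(e => e.Key)
        .ToList();
}

// Pass 2: one task per entry, each with its own TarArchive instance
// (archives are not thread-safe, so nothing is shared across tasks).
await Task.WhenAll(csvKeys.Select(key => Task.Run(() =>
{
    using var archive = TarArchive.Open(tarPath);
    var entry = archive.Entries.First(e => e.Key == key);
    using var entryStream = entry.OpenEntryStream();
    // CPU-intensive processing of entryStream goes here.
})));
```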

Author
Owner

@arvindshmicrosoft commented on GitHub (May 23, 2022):

Thank you @adamhathcock for your precise suggestion. Using TarArchive worked like a charm, and only minimal changes were required to my code. For completeness, the refactored code is below.

```csharp
using SharpCompress.Archives.Tar;

TarArchive GetTarArchive(string sourceFile)
{
    var srcFs = new FileStream(sourceFile, FileMode.Open, FileAccess.Read, FileShare.Read, 100000000, true);
    return TarArchive.Open(srcFs, new SharpCompress.Readers.ReaderOptions());
}

var extractTasks = new Dictionary<string, Task>();

// As before, each pass reopens the archive from the start and
// schedules at most one new task.
while (true)
{
    var wasTaskAdded = false;
    lock (extractTasks)
    {
        var reader = GetTarArchive(@"c:\temp\very-large-tarball.tar");
        {
            foreach (var entry in reader.Entries)
            {
                if (!entry.IsDirectory)
                {
                    var entryStream = entry.OpenEntryStream();
                    {
                        string file = Path.GetFileName(entry.Key);
                        string folder = Path.GetDirectoryName(entry.Key);
                        string destdir = Path.GetTempPath();
                        string destinationFileName = Path.Combine(destdir, file);

                        if (extractTasks.ContainsKey(destinationFileName))
                        {
                            entryStream.Dispose();
                            continue;
                        }

                        if (file.ToLower().EndsWith(".csv"))
                        {
                            Console.WriteLine($"Will process {destinationFileName}");
                            extractTasks.Add(destinationFileName, Task.Run(() => ProcessEntry(entryStream, destinationFileName)));
                            wasTaskAdded = true;
                            break;
                        }
                        else
                        {
                            entryStream.Dispose();
                        }
                    }
                }
            }
        }
    }

    if (!wasTaskAdded) break;
}

var allTasks = extractTasks.Values.ToArray();
Task.WaitAll(allTasks);

async Task ProcessEntry(Stream entryStream, string destinationFileName)
{
    // nothing to do for this sample
}
```
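For the "memory-mapped file streams" idea mentioned above, a hedged sketch along the same two-pass lines (again illustrative, not code from this thread): a single read-only MemoryMappedFile is shared, and each task opens its own seekable view stream over it, so TarArchive can still `Seek` past entry bodies while the tasks avoid opening one large-buffer FileStream apiece.

```csharp
using System.IO.MemoryMappedFiles;
using SharpCompress.Archives.Tar;

const string tarPath = @"c:\temp\very-large-tarball.tar";

// One shared read-only mapping of the whole tarball.
using var mmf = MemoryMappedFile.CreateFromFile(
    tarPath, FileMode.Open, mapName: null, capacity: 0,
    MemoryMappedFileAccess.Read);

// Pass 1: index the interesting entries through a private view stream.
List<string> csvKeys;
using (var indexView = mmf.CreateViewStream(0, 0, MemoryMappedFileAccess.Read))
using (var index = TarArchive.Open(indexView))
{
    csvKeys = index.Entries
        .Where(e => !e.IsDirectory &&
                    e.Key.EndsWith(".csv", StringComparison.OrdinalIgnoreCase))
        .Select(e => e.Key)
        .ToList();
}

// Pass 2: each task gets its own seekable view over the shared mapping,
// so its TarArchive can seek past entry bodies without re-reading the file.
await Task.WhenAll(csvKeys.Select(key => Task.Run(() =>
{
    using var view = mmf.CreateViewStream(0, 0, MemoryMappedFileAccess.Read);
    using var archive = TarArchive.Open(view);
    using var entryStream = archive.Entries.First(e => e.Key == key).OpenEntryStream();
    // CPU-intensive processing of entryStream goes here.
})));
```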