// Files
// cuetools.net/CUETools.Codecs/CyclicBuffer.cs
// 2011-10-24 00:13:35 +00:00
//
// 232 lines
// 6.7 KiB
// C#

using System;
using System.IO;
using System.Threading;
namespace CUETools.Codecs
{
/// <summary>
/// Fixed-capacity ring buffer of bytes that hands data from a producer to a consumer.
/// The producer side is the pair of <c>Read</c> overloads (they read from a source
/// and fill the ring); the consumer side is <see cref="Write"/> or the background
/// flush thread started by <c>WriteTo</c> (they drain the ring).
/// </summary>
/// <remarks>
/// Bytes are copied into and out of the ring outside the lock, on a region that was
/// reserved under the lock. That is only safe with at most one active producer and
/// at most one active consumer at any time.
/// </remarks>
public class CyclicBuffer
{
    // Private monitor object: locking on "this" would let external code that locks
    // the instance interfere with the Wait/Pulse hand-off below.
    private readonly object _gate = new object();
    private readonly byte[] _buffer;
    private readonly int _size;

    // Running byte counters (not wrapped indices). They are 64-bit so that they do
    // not overflow once more than int.MaxValue bytes have passed through the ring.
    private long _start = 0; // consumer position; advanced only by Write / FlushThread
    private long _end = 0; // producer position; advanced only by the Read overloads
    private bool _eof = false;
    private Thread _readThread = null, _writeThread = null;
    private Exception _ex = null; // failure captured on the flush thread, rethrown to the producer

    /// <summary>Number of bytes produced but not yet consumed. Call with the gate held.</summary>
    private int DataAvailable
    {
        get { return (int)(_end - _start); }
    }

    /// <summary>Number of bytes the producer may still add. Call with the gate held.</summary>
    private int FreeSpace
    {
        get { return _size - DataAvailable; }
    }

    /// <summary>Creates a ring of <paramref name="len"/> bytes with no pump threads attached.</summary>
    /// <param name="len">Capacity in bytes; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="len"/> is zero or negative.</exception>
    public CyclicBuffer(int len)
    {
        // A zero-length ring is permanently both full and empty, so every producer
        // and consumer call would block forever.
        if (len <= 0)
        {
            throw new ArgumentOutOfRangeException("len", "Buffer length must be positive.");
        }

        _size = len;
        _buffer = new byte[len];
    }

    /// <summary>
    /// Creates a ring of <paramref name="len"/> bytes and immediately starts pumping
    /// <paramref name="input"/> into it and draining it into <paramref name="output"/>.
    /// </summary>
    /// <param name="len">Capacity in bytes; must be positive.</param>
    /// <param name="input">Stream the background reader pulls from.</param>
    /// <param name="output">Stream the background writer pushes to; it is closed once the ring is drained after EOF.</param>
    public CyclicBuffer(int len, Stream input, Stream output)
        : this(len)
    {
        ReadFrom(input);
        WriteTo(output);
    }

    /// <summary>
    /// Starts a highest-priority background thread that fills the ring from
    /// <paramref name="input"/> until the stream reports end of data, then signals EOF.
    /// </summary>
    public void ReadFrom(Stream input)
    {
        _readThread = new Thread(PumpRead);
        _readThread.Priority = ThreadPriority.Highest;
        _readThread.IsBackground = true;
        _readThread.Start(input);
    }

    /// <summary>
    /// Starts a highest-priority background thread that drains the ring into
    /// <paramref name="output"/> and closes the stream once the ring is empty after EOF.
    /// </summary>
    public void WriteTo(Stream output)
    {
        WriteTo(flushOutputToStream, closeOutputToStream, ThreadPriority.Highest, output);
    }

    /// <summary>
    /// Subscribes the given delegates (when non-null) to <see cref="flushOutput"/> and
    /// <see cref="closeOutput"/> and starts a background thread that drains the ring
    /// through them.
    /// </summary>
    /// <param name="flushOutputDelegate">Receives each contiguous chunk of buffered data; may be null.</param>
    /// <param name="closeOutputDelegate">Invoked once after the ring is drained at EOF; may be null.</param>
    /// <param name="priority">Priority of the drain thread.</param>
    /// <param name="to">Opaque state passed through to both delegates.</param>
    public void WriteTo(FlushOutput flushOutputDelegate, CloseOutput closeOutputDelegate, ThreadPriority priority, object to)
    {
        if (flushOutputDelegate != null)
        {
            flushOutput += flushOutputDelegate;
        }
        if (closeOutputDelegate != null)
        {
            closeOutput += closeOutputDelegate;
        }

        _writeThread = new Thread(FlushThread);
        _writeThread.Priority = priority;
        _writeThread.IsBackground = true;
        _writeThread.Start(to);
    }

    // Default close handler used by WriteTo(Stream): closes the target stream.
    private void closeOutputToStream(object to)
    {
        ((Stream)to).Close();
    }

    // Default flush handler used by WriteTo(Stream): writes the chunk to the target stream.
    private void flushOutputToStream(byte[] buffer, int pos, int chunk, object to)
    {
        ((Stream)to).Write(buffer, pos, chunk);
    }

    // Body of the background reader thread started by ReadFrom.
    private void PumpRead(object o)
    {
        Stream source = (Stream)o;
        try
        {
            while (Read(source))
            {
            }
        }
        finally
        {
            // Signal EOF even when Read throws, so a consumer blocked waiting for
            // data is released instead of waiting on a producer that is gone.
            SetEOF();
        }
    }

    /// <summary>
    /// Waits for the background reader (if any) to finish, signals EOF, then waits
    /// for the background writer (if any) to drain the ring and finish.
    /// </summary>
    public void Close()
    {
        if (_readThread != null)
        {
            _readThread.Join();
            _readThread = null;
        }

        SetEOF();

        if (_writeThread != null)
        {
            _writeThread.Join();
            _writeThread = null;
        }
    }

    /// <summary>
    /// Marks the producer side as finished and wakes a waiting consumer. Data already
    /// in the ring can still be consumed afterwards.
    /// </summary>
    public void SetEOF()
    {
        lock (_gate)
        {
            _eof = true;
            Monitor.Pulse(_gate);
        }
    }

    /// <summary>
    /// Producer: performs one <see cref="Stream.Read(byte[], int, int)"/> from
    /// <paramref name="input"/> directly into the ring, blocking while the ring is full.
    /// </summary>
    /// <returns><c>false</c> when the stream returned zero bytes; otherwise <c>true</c>.</returns>
    /// <exception cref="Exception">Rethrows the exception captured from a failed flush delegate.</exception>
    public bool Read(Stream input)
    {
        int pos;
        int room = ReserveFreeRegion(int.MaxValue, out pos);

        int got = input.Read(_buffer, pos, room);
        if (got == 0)
        {
            return false;
        }

        CommitProduced(got);
        return true;
    }

    /// <summary>
    /// Producer: copies <paramref name="count"/> bytes from <paramref name="array"/>,
    /// starting at <paramref name="offset"/>, into the ring, blocking whenever the ring is full.
    /// </summary>
    /// <exception cref="Exception">Rethrows the exception captured from a failed flush delegate.</exception>
    public void Read(byte[] array, int offset, int count)
    {
        while (count > 0)
        {
            int pos;
            int chunk = ReserveFreeRegion(count, out pos);

            Array.Copy(array, offset, _buffer, pos, chunk);
            CommitProduced(chunk);

            count -= chunk;
            offset += chunk;
        }
    }

    /// <summary>
    /// Consumer: copies up to <paramref name="count"/> buffered bytes into
    /// <paramref name="buff"/> starting at <paramref name="offs"/>. Blocks while the
    /// ring is empty and EOF has not been signalled; returns early, leaving the rest
    /// of <paramref name="buff"/> untouched, once the ring is empty after EOF.
    /// </summary>
    public void Write(byte[] buff, int offs, int count)
    {
        while (count > 0)
        {
            int pos;
            // Clamp to the caller's remaining request so buff is never overrun.
            int chunk = AcquireDataRegion(count, out pos);
            if (chunk == 0)
            {
                break; // drained and EOF signalled
            }

            // Copy unconditionally: the data goes to the caller's array and has
            // nothing to do with whether a flushOutput handler is subscribed.
            Array.Copy(_buffer, pos, buff, offs, chunk);
            offs += chunk;
            count -= chunk;

            CommitConsumed(chunk);
        }
    }

    // Body of the background writer thread started by WriteTo: hands every
    // contiguous chunk to flushOutput, then calls closeOutput once drained at EOF.
    private void FlushThread(object to)
    {
        while (true)
        {
            int pos;
            int chunk = AcquireDataRegion(int.MaxValue, out pos);
            if (chunk == 0)
            {
                break;
            }

            // Snapshot the event so the null check and the call use the same delegate.
            FlushOutput flush = flushOutput;
            if (flush != null)
            {
                try
                {
                    flush(_buffer, pos, chunk, to);
                }
                catch (Exception ex)
                {
                    // Publish the failure for the producer (its next Read rethrows it)
                    // and stop draining. closeOutput is intentionally not invoked here,
                    // matching the established behaviour.
                    lock (_gate)
                    {
                        _ex = ex;
                        Monitor.Pulse(_gate);
                    }
                    return;
                }
            }

            CommitConsumed(chunk);
        }

        CloseOutput close = closeOutput;
        if (close != null)
        {
            close(to);
        }
    }

    // Producer helper: blocks until there is free space (or a flush failure was
    // recorded, which is rethrown), then returns the length of the contiguous free
    // region starting at pos, capped at limit.
    private int ReserveFreeRegion(int limit, out int pos)
    {
        lock (_gate)
        {
            while (FreeSpace == 0 && _ex == null)
            {
                Monitor.Wait(_gate);
            }
            if (_ex != null)
            {
                throw _ex;
            }

            pos = (int)(_end % _size);
            return Math.Min(limit, Math.Min(FreeSpace, _size - pos));
        }
    }

    // Producer helper: publishes chunk freshly written bytes and wakes the consumer.
    private void CommitProduced(int chunk)
    {
        lock (_gate)
        {
            _end += chunk;
            Monitor.Pulse(_gate);
        }
    }

    // Consumer helper: blocks until data is available or EOF is signalled, then
    // returns the length of the contiguous data region starting at pos, capped at
    // limit. Returns 0 only when the ring is empty after EOF.
    private int AcquireDataRegion(int limit, out int pos)
    {
        lock (_gate)
        {
            while (DataAvailable == 0 && !_eof)
            {
                Monitor.Wait(_gate);
            }

            pos = (int)(_start % _size);
            return Math.Min(limit, Math.Min(DataAvailable, _size - pos));
        }
    }

    // Consumer helper: releases chunk consumed bytes and wakes the producer.
    private void CommitConsumed(int chunk)
    {
        lock (_gate)
        {
            _start += chunk;
            Monitor.Pulse(_gate);
        }
    }

    /// <summary>Receives a contiguous chunk of buffered data on the flush thread.</summary>
    public delegate void FlushOutput(byte[] buffer, int pos, int chunk, object to);

    /// <summary>Invoked on the flush thread after the ring has been drained at EOF.</summary>
    public delegate void CloseOutput(object to);

    /// <summary>Raised by the flush thread for every chunk of buffered data.</summary>
    public event FlushOutput flushOutput;

    /// <summary>Raised by the flush thread once the ring is empty after EOF.</summary>
    public event CloseOutput closeOutput;
}
}