support log size / processing report for gzip streams

13xforever
2019-03-16 18:53:22 +05:00
parent 0abc0595af
commit 14f317ffdc
15 changed files with 148 additions and 10 deletions

View File

@@ -3,12 +3,14 @@ using System.IO;
using System.IO.Compression;
using System.IO.Pipelines;
using System.Threading.Tasks;
using CompatBot.Utils;
namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
{
internal sealed class GzipHandler: IArchiveHandler
{
public long LogSize { get; private set; }
public long SourcePosition { get; private set; }
public bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header)
{
@@ -18,11 +20,11 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer)
{
-using (var gzipStream = new GZipStream(sourceStream, CompressionMode.Decompress))
+using (var statsStream = new BufferCopyStream(sourceStream))
+using (var gzipStream = new GZipStream(statsStream, CompressionMode.Decompress))
{
try
{
LogSize = -1;
int read;
FlushResult flushed;
do
@@ -30,8 +32,14 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
var memory = writer.GetMemory(Config.MinimumBufferSize);
read = await gzipStream.ReadAsync(memory, Config.Cts.Token);
writer.Advance(read);
SourcePosition = statsStream.Position;
flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
SourcePosition = statsStream.Position;
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested));
var buf = statsStream.GetBufferedBytes();
// the last 4 bytes of a gzip member are ISIZE: the uncompressed size mod 2^32 (RFC 1952)
if (buf.Length > 3)
LogSize = BitConverter.ToInt32(buf.AsSpan(buf.Length - 4, 4));
}
catch (Exception e)
{
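For reference, the LogSize recovery above relies on the gzip container format: every gzip member ends with an 8-byte trailer holding CRC32 followed by ISIZE, the uncompressed length modulo 2^32, stored little-endian (RFC 1952). Because BufferCopyStream keeps the bytes of the most recent source read, once GZipStream has consumed the whole file the last four buffered bytes are normally that ISIZE field. A minimal standalone sketch of the same trick against a seekable file (the method name and path handling are illustrative, not part of this commit; assumes the usual System and System.IO usings):

// Read ISIZE from the end of a .gz file; only meaningful for single-member
// archives smaller than 4 GiB, since ISIZE wraps at 2^32.
static long GetGzipUncompressedSize(string path)
{
    using (var fs = new FileStream(path, FileMode.Open, FileAccess.Read))
    {
        if (fs.Length < 20) // smallest possible gzip member is 20 bytes
            return -1;
        var isize = new byte[4];
        fs.Seek(-4, SeekOrigin.End);
        if (fs.Read(isize, 0, 4) != 4)
            return -1;
        // ToUInt32 avoids a negative result for logs between 2 and 4 GiB
        return BitConverter.ToUInt32(isize, 0);
    }
}

The value is best-effort: it wraps at 4 GiB and, for multi-member archives, describes only the last member.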

View File

@@ -10,5 +10,6 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header);
Task FillPipeAsync(Stream sourceStream, PipeWriter writer);
long LogSize { get; }
long SourcePosition { get; }
}
}

View File

@@ -8,6 +8,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
internal sealed class PlainTextHandler: IArchiveHandler
{
public long LogSize { get; private set; }
public long SourcePosition { get; private set; }
public bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header)
{

View File

@@ -10,6 +10,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
internal sealed class RarHandler: IArchiveHandler
{
public long LogSize { get; private set; }
public long SourcePosition { get; private set; }
public bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header)
{

View File

@@ -9,6 +9,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
internal sealed class SevenZipHandler: IArchiveHandler
{
public long LogSize { get; private set; }
public long SourcePosition { get; private set; }
public bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header)
{

View File

@@ -11,6 +11,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
internal sealed class ZipHandler: IArchiveHandler
{
public long LogSize { get; private set; }
public long SourcePosition { get; private set; }
public bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header)
{

View File

@@ -8,7 +8,7 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
{
internal abstract class BaseSourceHandler: ISourceHandler
{
-protected static readonly ArrayPool<byte> bufferPool = ArrayPool<byte>.Create(1024, 64);
+internal static readonly ArrayPool<byte> bufferPool = ArrayPool<byte>.Create(1024, 64);
public abstract Task<ISource> FindHandlerAsync(DiscordMessage message, ICollection<IArchiveHandler> handlers);
}

View File

@@ -50,6 +50,7 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
public string SourceType => "Discord attachment";
public string FileName { get; }
public long SourceFileSize { get; }
public long SourceFilePosition => handler.SourcePosition;
public long LogFileSize => handler.LogSize;
internal DiscordAttachmentSource(DiscordAttachment attachment, IArchiveHandler handler, string fileName, int fileSize)

View File

@@ -95,7 +95,8 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
{
public string SourceType => "Google Drive";
public string FileName => fileMeta.Name;
-public long SourceFileSize => (int)fileMeta.Size;
+public long SourceFileSize => fileMeta.Size ?? 0;
public long SourceFilePosition => handler.SourcePosition;
public long LogFileSize => handler.LogSize;
private FilesResource.GetRequest fileInfoRequest;

View File

@@ -16,6 +16,7 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
string SourceType { get; }
string FileName { get; }
long SourceFileSize { get; }
long SourceFilePosition { get; }
long LogFileSize { get; }
Task FillPipeAsync(PipeWriter writer);
}

View File

@@ -72,7 +72,8 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
public string SourceType => "Mega";
public string FileName => node.Name;
-public long SourceFileSize => (int)node.Size;
+public long SourceFileSize => node.Size;
public long SourceFilePosition => handler.SourcePosition;
public long LogFileSize => handler.LogSize;
internal MegaSource(IMegaApiClient client, Uri uri, INodeInfo node, IArchiveHandler handler)

View File

@@ -64,6 +64,7 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
{
private Uri uri;
private readonly IArchiveHandler handler;
public long SourceFilePosition => handler.SourcePosition;
public long LogFileSize => handler.LogSize;
public PastebinSource(Uri uri, string filename, int filesize, IArchiveHandler handler)

View File

@@ -0,0 +1,116 @@
using System;
using System.IO;
namespace CompatBot.Utils
{
// Pass-through stream wrapper: forwards reads/writes to the base stream, counts
// the total bytes transferred, and keeps up to bufferSize bytes of the latest operation.
internal class BufferCopyStream : Stream, IDisposable
{
private readonly Stream baseStream;
private bool usedForReads;
private bool usedForWrites;
private long position;
private readonly int bufSize;
private readonly byte[] buf;
private int bufStart, bufLength;
public BufferCopyStream(Stream baseStream, int bufferSize = 4096)
{
this.baseStream = baseStream;
bufSize = bufferSize;
buf = new byte[bufSize];
}
public override void Flush()
{
baseStream.Flush();
}
public override int Read(byte[] buffer, int offset, int count)
{
usedForReads = true;
if (usedForWrites)
throw new InvalidOperationException("Stream was used for writes before");
var result = baseStream.Read(buffer, offset, count);
position += result;
CopyToBuf(buffer, offset, result);
return result;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
usedForWrites = true;
if (usedForReads)
throw new InvalidOperationException("Stream was used for reads before");
baseStream.Write(buffer, offset, count);
position += count;
CopyToBuf(buffer, offset, count);
}
protected override void Dispose(bool disposing)
{
baseStream?.Dispose();
base.Dispose(disposing);
}
void IDisposable.Dispose()
{
baseStream?.Dispose();
base.Dispose();
}
// remember the tail of this chunk in the circular buffer
private void CopyToBuf(byte[] buffer, int offset, int count)
{
if (count >= bufSize)
{
bufStart = 0;
bufLength = bufSize;
Buffer.BlockCopy(buffer, offset + count - bufSize, buf, bufStart, bufLength);
}
else
{
// copy as much data as we can to the end of the buffer
bufStart = (bufStart + bufLength) % bufSize;
bufLength = Math.Min(bufSize - bufStart, count);
Buffer.BlockCopy(buffer, offset, buf, bufStart, bufLength);
// if there's still more data, loop it around to the beginning
if (bufLength < count)
{
Buffer.BlockCopy(buffer, offset + bufLength, buf, 0, count - bufLength);
bufLength = count;
}
}
}
// return the buffered bytes in stream order, unwrapping the circular buffer
public byte[] GetBufferedBytes()
{
var result = new byte[bufLength];
var partLength = Math.Min(bufSize - bufStart, bufLength);
Buffer.BlockCopy(buf, bufStart, result, 0, partLength);
if (partLength < bufLength)
Buffer.BlockCopy(buf, 0, result, partLength, bufLength - partLength);
return result;
}
public override bool CanRead => baseStream.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => baseStream.CanWrite;
public override long Length => baseStream.Length;
public override long Position
{
get => position;
set => throw new InvalidOperationException();
}
}
}
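BufferCopyStream is a pass-through wrapper: each Read or Write is forwarded to the wrapped stream, Position counts the total bytes transferred, and up to bufferSize bytes of the most recent operation are mirrored into a small ring buffer that GetBufferedBytes() returns in stream order. A hedged usage sketch (the file name and sizes are invented; assumes the usual System, System.IO and System.IO.Compression usings):

// Wrap the raw source so we can see how many compressed bytes were consumed
// and inspect the tail of the source after decompression finishes.
using (var raw = File.OpenRead("log.gz"))
using (var stats = new BufferCopyStream(raw, 4096))
using (var gzip = new GZipStream(stats, CompressionMode.Decompress))
{
    var scratch = new byte[81920];
    while (gzip.Read(scratch, 0, scratch.Length) > 0) { } // drain the stream

    Console.WriteLine($"compressed bytes read: {stats.Position}");
    var tail = stats.GetBufferedBytes();                  // bytes from the last source read
    if (tail.Length > 3)                                  // same guard as GzipHandler
        Console.WriteLine($"ISIZE: {BitConverter.ToUInt32(tail, tail.Length - 4)}");
}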

View File

@@ -471,11 +471,13 @@ namespace CompatBot.Utils.ResultFormatters
else
msg = $"Log from {member.DisplayName.Sanitize()} | {member.Id}\n";
msg += " | " + (source?.SourceType ?? "Unknown source");
-if (state?.ReadBytes > 0 && source.LogFileSize > 0)
+if (state?.ReadBytes > 0 && source?.LogFileSize > 0)
msg += $" | Parsed {state.ReadBytes * 100.0 / source.LogFileSize:0.##}%";
else if (source?.SourceFilePosition > 0 && source.SourceFileSize > 0)
msg += $" | Read {source.SourceFilePosition * 100.0 / source.SourceFileSize:0.##}%";
else if (state?.ReadBytes > 0)
msg += $" | Parsed {state.ReadBytes} byte{(state.ReadBytes == 1 ? "" : "s")}";
-else if (source.LogFileSize > 0)
+else if (source?.LogFileSize > 0)
msg += $" | {source.LogFileSize} byte{(source.LogFileSize == 1 ? "" : "s")}";
#if DEBUG
msg += " | Test Bot Instance";
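The status line now degrades step by step: parsed percentage when the decompressed log size is known, read percentage of the compressed source when only SourcePosition is available (the new gzip case), and raw byte counts as the last resort. A condensed sketch of that fallback chain outside the formatter (parameter names mirror the properties above; the sample numbers are invented):

// e.g. readBytes = 0, logSize = -1, sourcePos = 1_048_576, sourceSize = 4_194_304  ->  " | Read 25%"
static string ProgressSuffix(long readBytes, long logSize, long sourcePos, long sourceSize)
{
    if (readBytes > 0 && logSize > 0)
        return $" | Parsed {readBytes * 100.0 / logSize:0.##}%";
    if (sourcePos > 0 && sourceSize > 0)
        return $" | Read {sourcePos * 100.0 / sourceSize:0.##}%";
    if (readBytes > 0)
        return $" | Parsed {readBytes} byte{(readBytes == 1 ? "" : "s")}";
    if (logSize > 0)
        return $" | {logSize} byte{(logSize == 1 ? "" : "s")}";
    return "";
}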

View File

@@ -5,16 +5,18 @@ namespace CompatBot.Utils
{
internal static class StreamExtensions
{
-public static async Task<int> ReadBytesAsync(this Stream stream, byte[] buffer)
+public static async Task<int> ReadBytesAsync(this Stream stream, byte[] buffer, int count = 0)
{
if (count < 1 || count > buffer.Length)
count = buffer.Length;
var result = 0;
int read;
do
{
-var remaining = buffer.Length - result;
+var remaining = count - result;
read = await stream.ReadAsync(buffer, result, remaining).ConfigureAwait(false);
result += read;
-} while (read > 0 && result < buffer.Length);
+} while (read > 0 && result < count);
return result;
}
}
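The optional count on ReadBytesAsync lets a caller fill only the first part of a rented buffer, which pairs with bufferPool becoming internal above: a source handler can rent one array, read just enough bytes for the header checks, and hand that slice to CanHandle(). A rough usage sketch inside an async method (stream, fileName, fileSize and handler are placeholder locals, and the 512-byte figure is illustrative):

var buffer = BaseSourceHandler.bufferPool.Rent(1024);
try
{
    // read at most 512 bytes even though the rented array may be larger
    var read = await stream.ReadBytesAsync(buffer, 512).ConfigureAwait(false);
    var matches = handler.CanHandle(fileName, fileSize, buffer.AsSpan(0, read));
}
finally
{
    BaseSourceHandler.bufferPool.Return(buffer);
}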