Merge pull request #338 from 13xforever/vnext

Rework how log parsing timeout works to fix the deadlock
This commit is contained in:
Ilya 2019-06-13 23:09:16 +05:00 committed by GitHub
commit 29d091443a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 59 additions and 45 deletions

View File

@ -2,6 +2,7 @@
using System.IO;
using System.IO.Compression;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using CompatBot.Utils;
@ -18,7 +19,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
&& !fileName.Contains("tty.log", StringComparison.InvariantCultureIgnoreCase);
}
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer)
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken)
{
using (var statsStream = new BufferCopyStream(sourceStream) )
using (var gzipStream = new GZipStream(statsStream, CompressionMode.Decompress))
@ -30,12 +31,12 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
do
{
var memory = writer.GetMemory(Config.MinimumBufferSize);
read = await gzipStream.ReadAsync(memory, Config.Cts.Token);
read = await gzipStream.ReadAsync(memory, cancellationToken);
writer.Advance(read);
SourcePosition = statsStream.Position;
flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
SourcePosition = statsStream.Position;
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested));
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested));
var buf = statsStream.GetBufferedBytes();
if (buf.Length > 3)

View File

@ -1,6 +1,7 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
@ -8,7 +9,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
internal interface IArchiveHandler
{
bool CanHandle(string fileName, int fileSize, ReadOnlySpan<byte> header);
Task FillPipeAsync(Stream sourceStream, PipeWriter writer);
Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken);
long LogSize { get; }
long SourcePosition { get; }
}

View File

@ -2,6 +2,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
@ -20,7 +21,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
&& Encoding.UTF8.GetString(header.Slice(0, 8)).Contains("RPCS3");
}
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer)
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken)
{
try
{
@ -29,10 +30,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
do
{
var memory = writer.GetMemory(Config.MinimumBufferSize);
read = await sourceStream.ReadAsync(memory, Config.Cts.Token);
read = await sourceStream.ReadAsync(memory, cancellationToken);
writer.Advance(read);
flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested));
flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested));
}
catch (Exception e)
{

View File

@ -2,6 +2,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SharpCompress.Archives.Rar;
@ -24,13 +25,13 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
return firstEntry.Contains(".log", StringComparison.InvariantCultureIgnoreCase);
}
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer)
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken)
{
try
{
using (var fileStream = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 16384, FileOptions.Asynchronous | FileOptions.RandomAccess | FileOptions.DeleteOnClose))
{
await sourceStream.CopyToAsync(fileStream, 16384, Config.Cts.Token).ConfigureAwait(false);
await sourceStream.CopyToAsync(fileStream, 16384, cancellationToken).ConfigureAwait(false);
fileStream.Seek(0, SeekOrigin.Begin);
using (var rarArchive = RarArchive.Open(fileStream))
using (var rarReader = rarArchive.ExtractAllEntries())
@ -47,10 +48,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
do
{
var memory = writer.GetMemory(Config.MinimumBufferSize);
read = await rarStream.ReadAsync(memory, Config.Cts.Token);
read = await rarStream.ReadAsync(memory, cancellationToken);
writer.Advance(read);
flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested));
flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested));
}
writer.Complete();
return;

View File

@ -1,6 +1,7 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using SharpCompress.Archives.SevenZip;
@ -22,13 +23,13 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
return true;
}
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer)
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken)
{
try
{
using (var fileStream = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 16384, FileOptions.Asynchronous | FileOptions.RandomAccess | FileOptions.DeleteOnClose))
{
await sourceStream.CopyToAsync(fileStream, 16384, Config.Cts.Token).ConfigureAwait(false);
await sourceStream.CopyToAsync(fileStream, 16384, cancellationToken).ConfigureAwait(false);
fileStream.Seek(0, SeekOrigin.Begin);
using (var zipArchive = SevenZipArchive.Open(fileStream))
using (var zipReader = zipArchive.ExtractAllEntries())
@ -45,10 +46,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
do
{
var memory = writer.GetMemory(Config.MinimumBufferSize);
read = await entryStream.ReadAsync(memory, Config.Cts.Token);
read = await entryStream.ReadAsync(memory, cancellationToken);
writer.Advance(read);
flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested));
flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested));
}
writer.Complete();
return;

View File

@ -4,6 +4,7 @@ using System.IO.Compression;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
@ -25,13 +26,13 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
return firstEntry.Contains(".log", StringComparison.InvariantCultureIgnoreCase);
}
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer)
public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken)
{
try
{
using (var fileStream = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 16384, FileOptions.Asynchronous | FileOptions.RandomAccess | FileOptions.DeleteOnClose))
{
await sourceStream.CopyToAsync(fileStream, 16384, Config.Cts.Token).ConfigureAwait(false);
await sourceStream.CopyToAsync(fileStream, 16384, cancellationToken).ConfigureAwait(false);
fileStream.Seek(0, SeekOrigin.Begin);
using (var zipArchive = new ZipArchive(fileStream, ZipArchiveMode.Read))
{
@ -47,10 +48,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers
do
{
var memory = writer.GetMemory(Config.MinimumBufferSize);
read = await zipStream.ReadAsync(memory, Config.Cts.Token);
read = await zipStream.ReadAsync(memory, cancellationToken);
writer.Advance(read);
flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested));
flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
} while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested));
}
}
}

View File

@ -16,9 +16,8 @@ namespace CompatBot.EventHandlers.LogParsing
private static readonly PoorMansTaskScheduler<LogParseState> TaskScheduler = new PoorMansTaskScheduler<LogParseState>();
public static async Task<LogParseState> ReadPipeAsync(PipeReader reader)
public static async Task<LogParseState> ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken)
{
var timeout = new CancellationTokenSource(Config.LogParsingTimeout);
var currentSectionLines = new LinkedList<ReadOnlySequence<byte>>();
var state = new LogParseState();
bool skippedBom = false;
@ -26,7 +25,7 @@ namespace CompatBot.EventHandlers.LogParsing
ReadResult result;
do
{
result = await reader.ReadAsync(Config.Cts.Token).ConfigureAwait(false);
result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
var buffer = result.Buffer;
if (!skippedBom)
{
@ -62,14 +61,14 @@ namespace CompatBot.EventHandlers.LogParsing
}
} while (lineEnd != null);
if (result.IsCanceled || Config.Cts.IsCancellationRequested || timeout.IsCancellationRequested)
if (result.IsCanceled || cancellationToken.IsCancellationRequested)
state.Error = LogParseState.ErrorCode.SizeLimit;
else if (result.IsCompleted)
await FlushAllLinesAsync(result.Buffer, currentSectionLines, state).ConfigureAwait(false);
var sectionStart = currentSectionLines.Count == 0 ? buffer : currentSectionLines.First.Value;
totalReadBytes += result.Buffer.Slice(0, sectionStart.Start).Length;
reader.AdvanceTo(sectionStart.Start);
} while (!(result.IsCompleted || result.IsCanceled || Config.Cts.IsCancellationRequested || timeout.IsCancellationRequested));
} while (!(result.IsCompleted || result.IsCanceled || cancellationToken.IsCancellationRequested));
await TaskScheduler.WaitForClearTagAsync(state).ConfigureAwait(false);
state.ReadBytes = totalReadBytes;
reader.Complete();

View File

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using CompatBot.EventHandlers.LogParsing.ArchiveHandlers;
using CompatBot.Utils;
@ -61,11 +62,11 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
SourceFileSize = fileSize;
}
public async Task FillPipeAsync(PipeWriter writer)
public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
{
using (var client = HttpClientFactory.Create())
using (var stream = await client.GetStreamAsync(attachment.Url).ConfigureAwait(false))
await handler.FillPipeAsync(stream, writer).ConfigureAwait(false);
await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false);
}
}
}

View File

@ -8,6 +8,7 @@ using DSharpPlus.Entities;
using CompatBot.Utils;
using System.IO.Pipelines;
using System.Net.Http;
using System.Threading;
using CompatApiClient;
namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
@ -94,11 +95,11 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
SourceFileSize = fileSize;
}
public async Task FillPipeAsync(PipeWriter writer)
public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
{
using (var client = HttpClientFactory.Create())
using (var stream = await client.GetStreamAsync(uri).ConfigureAwait(false))
await handler.FillPipeAsync(stream, writer).ConfigureAwait(false);
await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false);
}
}
}

View File

@ -4,6 +4,7 @@ using System.IO;
using System.IO.Pipelines;
using System.Net.Http.Headers;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using CompatBot.EventHandlers.LogParsing.ArchiveHandlers;
using DSharpPlus.Entities;
@ -109,21 +110,21 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
this.handler = handler;
}
public async Task FillPipeAsync(PipeWriter writer)
public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
{
try
{
var pipe = new Pipe();
using (var pushStream = pipe.Writer.AsStream())
{
var progressTask = fileInfoRequest.DownloadAsync(pushStream, Config.Cts.Token);
var progressTask = fileInfoRequest.DownloadAsync(pushStream, cancellationToken);
using (var pullStream = pipe.Reader.AsStream())
{
var pipingTask = handler.FillPipeAsync(pullStream, writer);
var pipingTask = handler.FillPipeAsync(pullStream, writer, cancellationToken);
var result = await progressTask.ConfigureAwait(false);
if (result.Status != DownloadStatus.Completed)
Config.Log.Error(result.Exception, "Failed to download file from Google Drive: " + result.Status);
await pipe.Writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false);
await pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
pipe.Writer.Complete();
await pipingTask.ConfigureAwait(false);
}

View File

@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using CompatBot.EventHandlers.LogParsing.ArchiveHandlers;
using DSharpPlus.Entities;
@ -18,6 +19,6 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
long SourceFileSize { get; }
long SourceFilePosition { get; }
long LogFileSize { get; }
Task FillPipeAsync(PipeWriter writer);
Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken);
}
}

View File

@ -7,6 +7,7 @@ using DSharpPlus.Entities;
using CG.Web.MegaApiClient;
using CompatBot.Utils;
using System.IO.Pipelines;
using System.Threading;
namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
{
@ -83,10 +84,10 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
this.handler = handler;
}
public async Task FillPipeAsync(PipeWriter writer)
public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
{
using (var stream = await client.DownloadAsync(uri, doodad, Config.Cts.Token).ConfigureAwait(false))
await handler.FillPipeAsync(stream, writer).ConfigureAwait(false);
using (var stream = await client.DownloadAsync(uri, doodad, cancellationToken).ConfigureAwait(false))
await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false);
}
}
}

View File

@ -7,6 +7,7 @@ using DSharpPlus.Entities;
using CompatBot.Utils;
using System.IO.Pipelines;
using System.Net.Http;
using System.Threading;
namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
{
@ -78,11 +79,11 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers
public string FileName { get; }
public long SourceFileSize { get; }
public async Task FillPipeAsync(PipeWriter writer)
public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken)
{
using (var client = HttpClientFactory.Create())
using (var stream = await client.GetStreamAsync(uri).ConfigureAwait(false))
await handler.FillPipeAsync(stream, writer).ConfigureAwait(false);
await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false);
}
}
}

View File

@ -92,9 +92,12 @@ namespace CompatBot.EventHandlers
LogParseState result = null;
try
{
var timeout = new CancellationTokenSource(Config.LogParsingTimeout);
var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeout.Token, Config.Cts.Token);
var pipe = new Pipe();
var fillPipeTask = source.FillPipeAsync(pipe.Writer);
var readPipeTask = LogParser.ReadPipeAsync(pipe.Reader);
var fillPipeTask = source.FillPipeAsync(pipe.Writer, combinedTokenSource.Token);
var readPipeTask = LogParser.ReadPipeAsync(pipe.Reader, combinedTokenSource.Token);
do
{
await Task.WhenAny(readPipeTask, Task.Delay(5000)).ConfigureAwait(false);