From d7c5dd7c89a48dc8375ea16a58ed53bab4284479 Mon Sep 17 00:00:00 2001 From: 13xforever Date: Thu, 13 Jun 2019 23:08:07 +0500 Subject: [PATCH] rework how log parsing timeout works to fix the deadlock --- .../LogParsing/ArchiveHandlers/GzipHandler.cs | 9 +++++---- .../LogParsing/ArchiveHandlers/IArchiveHandler.cs | 3 ++- .../LogParsing/ArchiveHandlers/PlainText.cs | 9 +++++---- .../LogParsing/ArchiveHandlers/RarHandler.cs | 11 ++++++----- .../LogParsing/ArchiveHandlers/SevenZipHandler.cs | 11 ++++++----- .../LogParsing/ArchiveHandlers/ZipHandler.cs | 11 ++++++----- .../EventHandlers/LogParsing/LogParser.PipeReader.cs | 9 ++++----- .../SourceHandlers/DiscordAttachmentHandler.cs | 5 +++-- .../LogParsing/SourceHandlers/DropboxHandler.cs | 5 +++-- .../LogParsing/SourceHandlers/GoogleDriveHandler.cs | 9 +++++---- .../LogParsing/SourceHandlers/ISourceHandler.cs | 3 ++- .../LogParsing/SourceHandlers/MegaHandler.cs | 7 ++++--- .../LogParsing/SourceHandlers/PastebinHandler.cs | 5 +++-- CompatBot/EventHandlers/LogParsingHandler.cs | 7 +++++-- 14 files changed, 59 insertions(+), 45 deletions(-) diff --git a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/GzipHandler.cs b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/GzipHandler.cs index d3720eeb..ad453101 100644 --- a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/GzipHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/GzipHandler.cs @@ -2,6 +2,7 @@ using System.IO; using System.IO.Compression; using System.IO.Pipelines; +using System.Threading; using System.Threading.Tasks; using CompatBot.Utils; @@ -18,7 +19,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers && !fileName.Contains("tty.log", StringComparison.InvariantCultureIgnoreCase); } - public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer) + public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken) { using (var statsStream = new BufferCopyStream(sourceStream) ) using (var gzipStream = new GZipStream(statsStream, CompressionMode.Decompress)) @@ -30,12 +31,12 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers do { var memory = writer.GetMemory(Config.MinimumBufferSize); - read = await gzipStream.ReadAsync(memory, Config.Cts.Token); + read = await gzipStream.ReadAsync(memory, cancellationToken); writer.Advance(read); SourcePosition = statsStream.Position; - flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false); + flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); SourcePosition = statsStream.Position; - } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested)); + } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested)); var buf = statsStream.GetBufferedBytes(); if (buf.Length > 3) diff --git a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/IArchiveHandler.cs b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/IArchiveHandler.cs index b1cd71a9..d0a30e8d 100644 --- a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/IArchiveHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/IArchiveHandler.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.IO.Pipelines; +using System.Threading; using System.Threading.Tasks; namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers @@ -8,7 +9,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers internal interface IArchiveHandler { bool CanHandle(string fileName, int fileSize, ReadOnlySpan header); - Task FillPipeAsync(Stream sourceStream, PipeWriter writer); + Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken); long LogSize { get; } long SourcePosition { get; } } diff --git a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/PlainText.cs b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/PlainText.cs index c4169e37..9d9cd0cf 100644 --- a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/PlainText.cs +++ b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/PlainText.cs @@ -2,6 +2,7 @@ using System.IO; using System.IO.Pipelines; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers @@ -20,7 +21,7 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers && Encoding.UTF8.GetString(header.Slice(0, 8)).Contains("RPCS3"); } - public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer) + public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken) { try { @@ -29,10 +30,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers do { var memory = writer.GetMemory(Config.MinimumBufferSize); - read = await sourceStream.ReadAsync(memory, Config.Cts.Token); + read = await sourceStream.ReadAsync(memory, cancellationToken); writer.Advance(read); - flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false); - } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested)); + flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested)); } catch (Exception e) { diff --git a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/RarHandler.cs b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/RarHandler.cs index ca80c3f7..ac399a30 100644 --- a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/RarHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/RarHandler.cs @@ -2,6 +2,7 @@ using System.IO; using System.IO.Pipelines; using System.Text; +using System.Threading; using System.Threading.Tasks; using SharpCompress.Archives.Rar; @@ -24,13 +25,13 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers return firstEntry.Contains(".log", StringComparison.InvariantCultureIgnoreCase); } - public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer) + public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken) { try { using (var fileStream = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 16384, FileOptions.Asynchronous | FileOptions.RandomAccess | FileOptions.DeleteOnClose)) { - await sourceStream.CopyToAsync(fileStream, 16384, Config.Cts.Token).ConfigureAwait(false); + await sourceStream.CopyToAsync(fileStream, 16384, cancellationToken).ConfigureAwait(false); fileStream.Seek(0, SeekOrigin.Begin); using (var rarArchive = RarArchive.Open(fileStream)) using (var rarReader = rarArchive.ExtractAllEntries()) @@ -47,10 +48,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers do { var memory = writer.GetMemory(Config.MinimumBufferSize); - read = await rarStream.ReadAsync(memory, Config.Cts.Token); + read = await rarStream.ReadAsync(memory, cancellationToken); writer.Advance(read); - flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false); - } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested)); + flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested)); } writer.Complete(); return; diff --git a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/SevenZipHandler.cs b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/SevenZipHandler.cs index 084c5038..0376145a 100644 --- a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/SevenZipHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/SevenZipHandler.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.IO.Pipelines; +using System.Threading; using System.Threading.Tasks; using SharpCompress.Archives.SevenZip; @@ -22,13 +23,13 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers return true; } - public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer) + public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken) { try { using (var fileStream = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 16384, FileOptions.Asynchronous | FileOptions.RandomAccess | FileOptions.DeleteOnClose)) { - await sourceStream.CopyToAsync(fileStream, 16384, Config.Cts.Token).ConfigureAwait(false); + await sourceStream.CopyToAsync(fileStream, 16384, cancellationToken).ConfigureAwait(false); fileStream.Seek(0, SeekOrigin.Begin); using (var zipArchive = SevenZipArchive.Open(fileStream)) using (var zipReader = zipArchive.ExtractAllEntries()) @@ -45,10 +46,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers do { var memory = writer.GetMemory(Config.MinimumBufferSize); - read = await entryStream.ReadAsync(memory, Config.Cts.Token); + read = await entryStream.ReadAsync(memory, cancellationToken); writer.Advance(read); - flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false); - } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested)); + flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested)); } writer.Complete(); return; diff --git a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/ZipHandler.cs b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/ZipHandler.cs index 44cc31ab..4ccac218 100644 --- a/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/ZipHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/ArchiveHandlers/ZipHandler.cs @@ -4,6 +4,7 @@ using System.IO.Compression; using System.IO.Pipelines; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers @@ -25,13 +26,13 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers return firstEntry.Contains(".log", StringComparison.InvariantCultureIgnoreCase); } - public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer) + public async Task FillPipeAsync(Stream sourceStream, PipeWriter writer, CancellationToken cancellationToken) { try { using (var fileStream = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite, FileShare.Read, 16384, FileOptions.Asynchronous | FileOptions.RandomAccess | FileOptions.DeleteOnClose)) { - await sourceStream.CopyToAsync(fileStream, 16384, Config.Cts.Token).ConfigureAwait(false); + await sourceStream.CopyToAsync(fileStream, 16384, cancellationToken).ConfigureAwait(false); fileStream.Seek(0, SeekOrigin.Begin); using (var zipArchive = new ZipArchive(fileStream, ZipArchiveMode.Read)) { @@ -47,10 +48,10 @@ namespace CompatBot.EventHandlers.LogParsing.ArchiveHandlers do { var memory = writer.GetMemory(Config.MinimumBufferSize); - read = await zipStream.ReadAsync(memory, Config.Cts.Token); + read = await zipStream.ReadAsync(memory, cancellationToken); writer.Advance(read); - flushed = await writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false); - } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || Config.Cts.IsCancellationRequested)); + flushed = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + } while (read > 0 && !(flushed.IsCompleted || flushed.IsCanceled || cancellationToken.IsCancellationRequested)); } } } diff --git a/CompatBot/EventHandlers/LogParsing/LogParser.PipeReader.cs b/CompatBot/EventHandlers/LogParsing/LogParser.PipeReader.cs index fcb253b9..da0cd4f9 100644 --- a/CompatBot/EventHandlers/LogParsing/LogParser.PipeReader.cs +++ b/CompatBot/EventHandlers/LogParsing/LogParser.PipeReader.cs @@ -16,9 +16,8 @@ namespace CompatBot.EventHandlers.LogParsing private static readonly PoorMansTaskScheduler TaskScheduler = new PoorMansTaskScheduler(); - public static async Task ReadPipeAsync(PipeReader reader) + public static async Task ReadPipeAsync(PipeReader reader, CancellationToken cancellationToken) { - var timeout = new CancellationTokenSource(Config.LogParsingTimeout); var currentSectionLines = new LinkedList>(); var state = new LogParseState(); bool skippedBom = false; @@ -26,7 +25,7 @@ namespace CompatBot.EventHandlers.LogParsing ReadResult result; do { - result = await reader.ReadAsync(Config.Cts.Token).ConfigureAwait(false); + result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); var buffer = result.Buffer; if (!skippedBom) { @@ -62,14 +61,14 @@ namespace CompatBot.EventHandlers.LogParsing } } while (lineEnd != null); - if (result.IsCanceled || Config.Cts.IsCancellationRequested || timeout.IsCancellationRequested) + if (result.IsCanceled || cancellationToken.IsCancellationRequested) state.Error = LogParseState.ErrorCode.SizeLimit; else if (result.IsCompleted) await FlushAllLinesAsync(result.Buffer, currentSectionLines, state).ConfigureAwait(false); var sectionStart = currentSectionLines.Count == 0 ? buffer : currentSectionLines.First.Value; totalReadBytes += result.Buffer.Slice(0, sectionStart.Start).Length; reader.AdvanceTo(sectionStart.Start); - } while (!(result.IsCompleted || result.IsCanceled || Config.Cts.IsCancellationRequested || timeout.IsCancellationRequested)); + } while (!(result.IsCompleted || result.IsCanceled || cancellationToken.IsCancellationRequested)); await TaskScheduler.WaitForClearTagAsync(state).ConfigureAwait(false); state.ReadBytes = totalReadBytes; reader.Complete(); diff --git a/CompatBot/EventHandlers/LogParsing/SourceHandlers/DiscordAttachmentHandler.cs b/CompatBot/EventHandlers/LogParsing/SourceHandlers/DiscordAttachmentHandler.cs index 6bb6bfc2..8623d0fa 100644 --- a/CompatBot/EventHandlers/LogParsing/SourceHandlers/DiscordAttachmentHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/SourceHandlers/DiscordAttachmentHandler.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO.Pipelines; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; using CompatBot.EventHandlers.LogParsing.ArchiveHandlers; using CompatBot.Utils; @@ -61,11 +62,11 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers SourceFileSize = fileSize; } - public async Task FillPipeAsync(PipeWriter writer) + public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken) { using (var client = HttpClientFactory.Create()) using (var stream = await client.GetStreamAsync(attachment.Url).ConfigureAwait(false)) - await handler.FillPipeAsync(stream, writer).ConfigureAwait(false); + await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false); } } } diff --git a/CompatBot/EventHandlers/LogParsing/SourceHandlers/DropboxHandler.cs b/CompatBot/EventHandlers/LogParsing/SourceHandlers/DropboxHandler.cs index 71527832..af3b0ee1 100644 --- a/CompatBot/EventHandlers/LogParsing/SourceHandlers/DropboxHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/SourceHandlers/DropboxHandler.cs @@ -8,6 +8,7 @@ using DSharpPlus.Entities; using CompatBot.Utils; using System.IO.Pipelines; using System.Net.Http; +using System.Threading; using CompatApiClient; namespace CompatBot.EventHandlers.LogParsing.SourceHandlers @@ -94,11 +95,11 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers SourceFileSize = fileSize; } - public async Task FillPipeAsync(PipeWriter writer) + public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken) { using (var client = HttpClientFactory.Create()) using (var stream = await client.GetStreamAsync(uri).ConfigureAwait(false)) - await handler.FillPipeAsync(stream, writer).ConfigureAwait(false); + await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false); } } } diff --git a/CompatBot/EventHandlers/LogParsing/SourceHandlers/GoogleDriveHandler.cs b/CompatBot/EventHandlers/LogParsing/SourceHandlers/GoogleDriveHandler.cs index bdf3371a..713246f2 100644 --- a/CompatBot/EventHandlers/LogParsing/SourceHandlers/GoogleDriveHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/SourceHandlers/GoogleDriveHandler.cs @@ -4,6 +4,7 @@ using System.IO; using System.IO.Pipelines; using System.Net.Http.Headers; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; using CompatBot.EventHandlers.LogParsing.ArchiveHandlers; using DSharpPlus.Entities; @@ -109,21 +110,21 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers this.handler = handler; } - public async Task FillPipeAsync(PipeWriter writer) + public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken) { try { var pipe = new Pipe(); using (var pushStream = pipe.Writer.AsStream()) { - var progressTask = fileInfoRequest.DownloadAsync(pushStream, Config.Cts.Token); + var progressTask = fileInfoRequest.DownloadAsync(pushStream, cancellationToken); using (var pullStream = pipe.Reader.AsStream()) { - var pipingTask = handler.FillPipeAsync(pullStream, writer); + var pipingTask = handler.FillPipeAsync(pullStream, writer, cancellationToken); var result = await progressTask.ConfigureAwait(false); if (result.Status != DownloadStatus.Completed) Config.Log.Error(result.Exception, "Failed to download file from Google Drive: " + result.Status); - await pipe.Writer.FlushAsync(Config.Cts.Token).ConfigureAwait(false); + await pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false); pipe.Writer.Complete(); await pipingTask.ConfigureAwait(false); } diff --git a/CompatBot/EventHandlers/LogParsing/SourceHandlers/ISourceHandler.cs b/CompatBot/EventHandlers/LogParsing/SourceHandlers/ISourceHandler.cs index 6366a279..93a46c86 100644 --- a/CompatBot/EventHandlers/LogParsing/SourceHandlers/ISourceHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/SourceHandlers/ISourceHandler.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.IO.Pipelines; +using System.Threading; using System.Threading.Tasks; using CompatBot.EventHandlers.LogParsing.ArchiveHandlers; using DSharpPlus.Entities; @@ -18,6 +19,6 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers long SourceFileSize { get; } long SourceFilePosition { get; } long LogFileSize { get; } - Task FillPipeAsync(PipeWriter writer); + Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken); } } diff --git a/CompatBot/EventHandlers/LogParsing/SourceHandlers/MegaHandler.cs b/CompatBot/EventHandlers/LogParsing/SourceHandlers/MegaHandler.cs index 119bc19a..2b19c5f4 100644 --- a/CompatBot/EventHandlers/LogParsing/SourceHandlers/MegaHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/SourceHandlers/MegaHandler.cs @@ -7,6 +7,7 @@ using DSharpPlus.Entities; using CG.Web.MegaApiClient; using CompatBot.Utils; using System.IO.Pipelines; +using System.Threading; namespace CompatBot.EventHandlers.LogParsing.SourceHandlers { @@ -83,10 +84,10 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers this.handler = handler; } - public async Task FillPipeAsync(PipeWriter writer) + public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken) { - using (var stream = await client.DownloadAsync(uri, doodad, Config.Cts.Token).ConfigureAwait(false)) - await handler.FillPipeAsync(stream, writer).ConfigureAwait(false); + using (var stream = await client.DownloadAsync(uri, doodad, cancellationToken).ConfigureAwait(false)) + await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false); } } } diff --git a/CompatBot/EventHandlers/LogParsing/SourceHandlers/PastebinHandler.cs b/CompatBot/EventHandlers/LogParsing/SourceHandlers/PastebinHandler.cs index 50d1880f..39655b3e 100644 --- a/CompatBot/EventHandlers/LogParsing/SourceHandlers/PastebinHandler.cs +++ b/CompatBot/EventHandlers/LogParsing/SourceHandlers/PastebinHandler.cs @@ -7,6 +7,7 @@ using DSharpPlus.Entities; using CompatBot.Utils; using System.IO.Pipelines; using System.Net.Http; +using System.Threading; namespace CompatBot.EventHandlers.LogParsing.SourceHandlers { @@ -78,11 +79,11 @@ namespace CompatBot.EventHandlers.LogParsing.SourceHandlers public string FileName { get; } public long SourceFileSize { get; } - public async Task FillPipeAsync(PipeWriter writer) + public async Task FillPipeAsync(PipeWriter writer, CancellationToken cancellationToken) { using (var client = HttpClientFactory.Create()) using (var stream = await client.GetStreamAsync(uri).ConfigureAwait(false)) - await handler.FillPipeAsync(stream, writer).ConfigureAwait(false); + await handler.FillPipeAsync(stream, writer, cancellationToken).ConfigureAwait(false); } } } diff --git a/CompatBot/EventHandlers/LogParsingHandler.cs b/CompatBot/EventHandlers/LogParsingHandler.cs index 03d0c1bf..52e8cb0a 100644 --- a/CompatBot/EventHandlers/LogParsingHandler.cs +++ b/CompatBot/EventHandlers/LogParsingHandler.cs @@ -92,9 +92,12 @@ namespace CompatBot.EventHandlers LogParseState result = null; try { + var timeout = new CancellationTokenSource(Config.LogParsingTimeout); + var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeout.Token, Config.Cts.Token); + var pipe = new Pipe(); - var fillPipeTask = source.FillPipeAsync(pipe.Writer); - var readPipeTask = LogParser.ReadPipeAsync(pipe.Reader); + var fillPipeTask = source.FillPipeAsync(pipe.Writer, combinedTokenSource.Token); + var readPipeTask = LogParser.ReadPipeAsync(pipe.Reader, combinedTokenSource.Token); do { await Task.WhenAny(readPipeTask, Task.Delay(5000)).ConfigureAwait(false);