mirror of
https://github.com/reactos/syzkaller.git
synced 2024-12-12 13:56:12 +00:00
d9c79f8842
GCE serial relay seems to be buggy: we see lots of "serialport: VM disconnected" and "packet_write_wait: Connection to 1.2.3.4 port 9600: Broken pipe" errors, which do not have any explanation. Ignore all serial relay errors.
106 lines
2.2 KiB
Go
106 lines
2.2 KiB
Go
// Copyright 2016 syzkaller project authors. All rights reserved.
|
|
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
|
|
|
|
package vmimpl
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
// OutputMerger combines the output of several readers into one channel.
// Readers are registered with Add or AddDecoder; each one is drained by its
// own goroutine.
type OutputMerger struct {
	// Output carries the merged data. It is closed by Wait.
	Output chan []byte

	// Err carries a MergerError once a reader's Read returns an error.
	// Senders never block on it, so errors are dropped while it is full.
	Err chan error

	// teeMu serializes writes to tee made by the reader goroutines.
	teeMu sync.Mutex

	// tee, when non-nil, receives a copy of the line-oriented output.
	tee io.Writer

	// wg counts the reader goroutines that are still running.
	wg sync.WaitGroup
}
|
|
|
|
// MergerError is the error value sent on OutputMerger.Err when reading from
// one of the merged readers fails.
//
// The field order is significant: the value is constructed with a positional
// composite literal.
type MergerError struct {
	// Name is the name the reader was registered under.
	Name string

	// R is the reader that failed.
	R io.ReadCloser

	// Err is the error returned by R.Read.
	Err error
}
|
|
|
|
func (err MergerError) Error() string {
|
|
return fmt.Sprintf("failed to read from %v: %v", err.Name, err.Err)
|
|
}
|
|
|
|
func NewOutputMerger(tee io.Writer) *OutputMerger {
|
|
return &OutputMerger{
|
|
Output: make(chan []byte, 1000),
|
|
Err: make(chan error, 1),
|
|
tee: tee,
|
|
}
|
|
}
|
|
|
|
func (merger *OutputMerger) Wait() {
|
|
merger.wg.Wait()
|
|
close(merger.Output)
|
|
}
|
|
|
|
func (merger *OutputMerger) Add(name string, r io.ReadCloser) {
|
|
merger.AddDecoder(name, r, nil)
|
|
}
|
|
|
|
func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser,
|
|
decoder func(data []byte) (start, size int, decoded []byte)) {
|
|
merger.wg.Add(1)
|
|
go func() {
|
|
var pending []byte
|
|
var proto []byte
|
|
var buf [4 << 10]byte
|
|
for {
|
|
n, err := r.Read(buf[:])
|
|
if n != 0 {
|
|
if decoder != nil {
|
|
proto = append(proto, buf[:n]...)
|
|
start, size, decoded := decoder(proto)
|
|
proto = proto[start+size:]
|
|
if len(decoded) != 0 {
|
|
merger.Output <- decoded // note: this can block
|
|
}
|
|
}
|
|
pending = append(pending, buf[:n]...)
|
|
if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 {
|
|
out := pending[:pos+1]
|
|
if merger.tee != nil {
|
|
merger.teeMu.Lock()
|
|
merger.tee.Write(out)
|
|
merger.teeMu.Unlock()
|
|
}
|
|
select {
|
|
case merger.Output <- append([]byte{}, out...):
|
|
r := copy(pending[:], pending[pos+1:])
|
|
pending = pending[:r]
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
if len(pending) != 0 {
|
|
pending = append(pending, '\n')
|
|
if merger.tee != nil {
|
|
merger.teeMu.Lock()
|
|
merger.tee.Write(pending)
|
|
merger.teeMu.Unlock()
|
|
}
|
|
select {
|
|
case merger.Output <- pending:
|
|
default:
|
|
}
|
|
}
|
|
r.Close()
|
|
select {
|
|
case merger.Err <- MergerError{name, r, err}:
|
|
default:
|
|
}
|
|
merger.wg.Done()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|