vm: merger console/fuzzer output line-by-line

Fixes #57
This commit is contained in:
Dmitry Vyukov 2016-08-28 19:21:57 +02:00
parent 1f9bd1e845
commit 76f68d6039
7 changed files with 242 additions and 128 deletions

View File

@ -301,13 +301,15 @@ func (mgr *Manager) runInstance(vmCfg *vm.Config, first bool) bool {
waitForOutput := func(dur time.Duration) {
timer := time.NewTimer(dur).C
loop:
for {
select {
case out := <-outputC:
case out, ok := <-outputC:
if !ok {
return
}
output = append(output, out...)
case <-timer:
break loop
return
}
}
}
@ -340,7 +342,7 @@ func (mgr *Manager) runInstance(vmCfg *vm.Config, first bool) bool {
logf(0, "%v: running long enough, restarting", vmCfg.Name)
return true
default:
logf(0, "%v: lost connection: %v", vmCfg.Name, err)
waitForOutput(10 * time.Second)
saveCrasher("lost connection", output)
return true
}
@ -362,6 +364,7 @@ func (mgr *Manager) runInstance(vmCfg *vm.Config, first bool) bool {
end = len(output)
}
saveCrasher(desc, output[start:end])
return true
}
if len(output) > 2*beforeContext {
copy(output, output[len(output)-beforeContext:])

View File

@ -5,12 +5,12 @@ package adb
import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"syscall"
"time"
"github.com/google/syzkaller/vm"
@ -153,23 +153,21 @@ func (inst *instance) Copy(hostSrc string) (string, error) {
}
func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte, <-chan error, error) {
rpipe, wpipe, err := os.Pipe()
catRpipe, catWpipe, err := vm.LongPipe()
if err != nil {
return nil, nil, fmt.Errorf("failed to create pipe: %v", err)
}
for sz := 128 << 10; sz <= 2<<20; sz *= 2 {
syscall.Syscall(syscall.SYS_FCNTL, wpipe.Fd(), syscall.F_SETPIPE_SZ, uintptr(sz))
return nil, nil, err
}
cat := exec.Command("cat", inst.cfg.ConsoleDev)
cat.Stdout = wpipe
cat.Stderr = wpipe
cat.Stdout = catWpipe
cat.Stderr = catWpipe
if err := cat.Start(); err != nil {
rpipe.Close()
wpipe.Close()
catRpipe.Close()
catWpipe.Close()
return nil, nil, fmt.Errorf("failed to start cat %v: %v", inst.cfg.ConsoleDev, err)
}
catWpipe.Close()
catDone := make(chan error, 1)
go func() {
err := cat.Wait()
@ -179,18 +177,26 @@ func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte,
catDone <- fmt.Errorf("cat exited: %v", err)
}()
adbRpipe, adbWpipe, err := vm.LongPipe()
if err != nil {
cat.Process.Kill()
catRpipe.Close()
return nil, nil, err
}
if inst.cfg.Debug {
log.Printf("starting: adb shell %v", command)
}
adb := exec.Command(inst.cfg.Bin, "shell", "cd /data; "+command)
adb.Stdout = wpipe
adb.Stderr = wpipe
adb.Stdout = adbWpipe
adb.Stderr = adbWpipe
if err := adb.Start(); err != nil {
cat.Process.Kill()
rpipe.Close()
wpipe.Close()
catRpipe.Close()
adbRpipe.Close()
adbWpipe.Close()
return nil, nil, fmt.Errorf("failed to start adb: %v", err)
}
adbWpipe.Close()
adbDone := make(chan error, 1)
go func() {
err := adb.Wait()
@ -200,42 +206,22 @@ func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte,
adbDone <- fmt.Errorf("adb exited: %v", err)
}()
wpipe.Close()
outc := make(chan []byte, 10)
var tee io.Writer
if inst.cfg.Debug {
tee = os.Stdout
}
merger := vm.NewOutputMerger(tee)
merger.Add(catRpipe)
merger.Add(adbRpipe)
errc := make(chan error, 1)
signal := func(err error) {
time.Sleep(5 * time.Second) // wait for any pending output
select {
case errc <- err:
default:
}
}
go func() {
var buf [64 << 10]byte
var output []byte
for {
n, err := rpipe.Read(buf[:])
if n != 0 {
if inst.cfg.Debug {
os.Stdout.Write(buf[:n])
os.Stdout.Write([]byte{'\n'})
}
output = append(output, buf[:n]...)
select {
case outc <- output:
output = nil
default:
}
time.Sleep(time.Millisecond)
}
if err != nil {
rpipe.Close()
return
}
}
}()
go func() {
select {
case <-time.After(timeout):
@ -256,6 +242,7 @@ func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte,
signal(err)
cat.Process.Kill()
}
merger.Wait()
}()
return outc, errc, nil
return merger.Output, errc, nil
}

View File

@ -215,7 +215,6 @@ func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte,
}
signal := func(err error) {
time.Sleep(3 * time.Second) // wait for any pending output
inst.mu.Lock()
if inst.outputC == outputC {
inst.outputB = nil

63
vm/merger.go Normal file
View File

@ -0,0 +1,63 @@
// Copyright 2016 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package vm
import (
"bytes"
"io"
"sync"
)
type OutputMerger struct {
Output chan []byte
tee io.Writer
wg sync.WaitGroup
}
func NewOutputMerger(tee io.Writer) *OutputMerger {
return &OutputMerger{
Output: make(chan []byte, 1000),
tee: tee,
}
}
func (merger *OutputMerger) Wait() {
merger.wg.Wait()
close(merger.Output)
}
func (merger *OutputMerger) Add(r io.ReadCloser) {
merger.wg.Add(1)
go func() {
var pending []byte
var buf [4 << 10]byte
for {
n, err := r.Read(buf[:])
if n != 0 {
pending = append(pending, buf[:n]...)
if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 {
out := pending[:pos+1]
if merger.tee != nil {
merger.tee.Write(out)
}
merger.Output <- append([]byte{}, out...)
r := copy(pending[:], pending[pos+1:])
pending = pending[:r]
}
}
if err != nil {
if len(pending) != 0 {
pending = append(pending, '\n')
if merger.tee != nil {
merger.tee.Write(pending)
}
merger.Output <- pending
}
r.Close()
merger.wg.Done()
return
}
}
}()
}

73
vm/merger_test.go Normal file
View File

@ -0,0 +1,73 @@
// Copyright 2016 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package vm
import (
"bytes"
"testing"
"time"
)
func TestMerger(t *testing.T) {
tee := new(bytes.Buffer)
merger := NewOutputMerger(tee)
rp1, wp1, err := LongPipe()
if err != nil {
t.Fatal(err)
}
defer wp1.Close()
merger.Add(rp1)
rp2, wp2, err := LongPipe()
if err != nil {
t.Fatal(err)
}
defer wp2.Close()
merger.Add(rp2)
wp1.Write([]byte("111"))
select {
case <-merger.Output:
t.Fatalf("merger produced incomplete line")
case <-time.After(10 * time.Millisecond):
}
wp2.Write([]byte("222"))
select {
case <-merger.Output:
t.Fatalf("merger produced incomplete line")
case <-time.After(10 * time.Millisecond):
}
wp1.Write([]byte("333\n444"))
got := string(<-merger.Output)
if want := "111333\n"; got != want {
t.Fatalf("bad line: '%s', want '%s'", got, want)
}
wp2.Write([]byte("555\n666\n777"))
got = string(<-merger.Output)
if want := "222555\n666\n"; got != want {
t.Fatalf("bad line: '%s', want '%s'", got, want)
}
wp1.Close()
got = string(<-merger.Output)
if want := "444\n"; got != want {
t.Fatalf("bad line: '%s', want '%s'", got, want)
}
wp2.Close()
got = string(<-merger.Output)
if want := "777\n"; got != want {
t.Fatalf("bad line: '%s', want '%s'", got, want)
}
merger.Wait()
want := "111333\n222555\n666\n444\n777\n"
if got := string(tee.Bytes()); got != want {
t.Fatalf("bad tee: '%s', want '%s'", got, want)
}
}

View File

@ -5,6 +5,7 @@ package qemu
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
@ -13,8 +14,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/google/syzkaller/vm"
@ -31,15 +30,11 @@ func init() {
type instance struct {
cfg *vm.Config
port int
rpipe *os.File
wpipe *os.File
rpipe io.ReadCloser
wpipe io.WriteCloser
qemu *exec.Cmd
readerC chan error
waiterC chan error
mu sync.Mutex
outputB []byte
outputC chan []byte
merger *vm.OutputMerger
}
func ctor(cfg *vm.Config) (vm.Instance, error) {
@ -81,12 +76,9 @@ func ctorImpl(cfg *vm.Config) (vm.Instance, error) {
}
var err error
inst.rpipe, inst.wpipe, err = os.Pipe()
inst.rpipe, inst.wpipe, err = vm.LongPipe()
if err != nil {
return nil, fmt.Errorf("failed to create pipe: %v", err)
}
for sz := 128 << 10; sz <= 2<<20; sz *= 2 {
syscall.Syscall(syscall.SYS_FCNTL, inst.wpipe.Fd(), syscall.F_SETPIPE_SZ, uintptr(sz))
return nil, err
}
if err := inst.Boot(); err != nil {
@ -127,7 +119,9 @@ func (inst *instance) Close() {
inst.qemu.Process.Kill()
err := <-inst.waiterC
inst.waiterC <- err // repost it for waiting goroutines
<-inst.readerC
}
if inst.merger != nil {
inst.merger.Wait()
}
if inst.rpipe != nil {
inst.rpipe.Close()
@ -197,46 +191,38 @@ func (inst *instance) Boot() error {
if err := qemu.Start(); err != nil {
return fmt.Errorf("failed to start %v %+v: %v", inst.cfg.Bin, args, err)
}
inst.wpipe.Close()
inst.wpipe = nil
inst.qemu = qemu
// Qemu has started.
// Start output reading goroutine.
inst.readerC = make(chan error)
go func(rpipe *os.File) {
var buf [64 << 10]byte
// Start output merger.
var tee io.Writer
if inst.cfg.Debug {
tee = os.Stdout
}
inst.merger = vm.NewOutputMerger(tee)
inst.merger.Add(inst.rpipe)
inst.rpipe = nil
var bootOutput []byte
bootOutputStop := make(chan bool)
go func() {
for {
n, err := rpipe.Read(buf[:])
if n != 0 {
if inst.cfg.Debug {
os.Stdout.Write(buf[:n])
os.Stdout.Write([]byte{'\n'})
}
inst.mu.Lock()
inst.outputB = append(inst.outputB, buf[:n]...)
if inst.outputC != nil {
select {
case inst.outputC <- inst.outputB:
inst.outputB = nil
default:
}
}
inst.mu.Unlock()
time.Sleep(time.Millisecond)
}
if err != nil {
rpipe.Close()
inst.readerC <- err
select {
case out := <-inst.merger.Output:
bootOutput = append(bootOutput, out...)
case <-bootOutputStop:
close(bootOutputStop)
return
}
}
}(inst.rpipe)
inst.rpipe = nil
}()
// Wait for the qemu asynchronously.
inst.waiterC = make(chan error, 1)
go func() {
err := qemu.Wait()
inst.wpipe.Close()
inst.waiterC <- err
}()
@ -259,23 +245,18 @@ func (inst *instance) Boot() error {
case err := <-inst.waiterC:
inst.waiterC <- err // repost it for Close
time.Sleep(time.Second) // wait for any pending output
inst.mu.Lock()
output := inst.outputB
inst.mu.Unlock()
return fmt.Errorf("qemu stopped:\n%v\n", string(output))
bootOutputStop <- true
<-bootOutputStop
return fmt.Errorf("qemu stopped:\n%v\n", string(bootOutput))
default:
}
if time.Since(start) > 10*time.Minute {
inst.mu.Lock()
output := inst.outputB
inst.mu.Unlock()
return fmt.Errorf("ssh server did not start:\n%v\n", string(output))
bootOutputStop <- true
<-bootOutputStop
return fmt.Errorf("ssh server did not start:\n%v\n", string(bootOutput))
}
}
// Drop boot output. It is not interesting if the VM has successfully booted.
inst.mu.Lock()
inst.outputB = nil
inst.mu.Unlock()
bootOutputStop <- true
return nil
}
@ -311,35 +292,29 @@ func (inst *instance) Copy(hostSrc string) (string, error) {
}
func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte, <-chan error, error) {
outputC := make(chan []byte, 10)
errorC := make(chan error, 1)
inst.mu.Lock()
inst.outputB = nil
inst.outputC = outputC
inst.mu.Unlock()
rpipe, wpipe, err := vm.LongPipe()
if err != nil {
return nil, nil, err
}
inst.merger.Add(rpipe)
args := append(inst.sshArgs("-p"), "root@localhost", command)
cmd := exec.Command("ssh", args...)
cmd.Stdout = wpipe
cmd.Stderr = wpipe
if err := cmd.Start(); err != nil {
wpipe.Close()
return nil, nil, err
}
wpipe.Close()
errc := make(chan error, 1)
signal := func(err error) {
time.Sleep(3 * time.Second) // wait for any pending output
inst.mu.Lock()
if inst.outputC == outputC {
inst.outputB = nil
inst.outputC = nil
}
inst.mu.Unlock()
select {
case errorC <- err:
case errc <- err:
default:
}
}
args := append(inst.sshArgs("-p"), "root@localhost", command)
cmd := exec.Command("ssh", args...)
cmd.Stdout = inst.wpipe
cmd.Stderr = inst.wpipe
if err := cmd.Start(); err != nil {
inst.mu.Lock()
inst.outputC = nil
inst.mu.Unlock()
return nil, nil, err
}
done := make(chan bool)
go func() {
select {
@ -354,7 +329,7 @@ func (inst *instance) Run(timeout time.Duration, command string) (<-chan []byte,
close(done)
signal(err)
}()
return outputC, errorC, nil
return inst.merger.Output, errc, nil
}
func (inst *instance) sshArgs(portArg string) []string {

View File

@ -7,6 +7,9 @@ import (
"bytes"
"errors"
"fmt"
"io"
"os"
"syscall"
"time"
)
@ -62,6 +65,17 @@ func Create(typ string, cfg *Config) (Instance, error) {
return ctor(cfg)
}
func LongPipe() (io.ReadCloser, io.WriteCloser, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, nil, fmt.Errorf("failed to create pipe: %v", err)
}
for sz := 128 << 10; sz <= 2<<20; sz *= 2 {
syscall.Syscall(syscall.SYS_FCNTL, w.Fd(), syscall.F_SETPIPE_SZ, uintptr(sz))
}
return r, w, err
}
// FindCrash searches kernel console output for oops messages.
// Desc contains a more-or-less representative description of the first oops,
// start and end denote region of output with oops message(s).