syz-manager: refactor work with hub

Move the hub interaction code into a separate file and fully separate
its state from the rest of the manager state.
First step towards splitting the manager into manageable parts.
This also required reworking stats, as they are used throughout the code.

Update #538
Update #605
Dmitry Vyukov 2018-08-02 16:44:21 +02:00
parent fbedd425b5
commit 976e4de048
4 changed files with 346 additions and 204 deletions
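In outline, the split introduced below amounts to the following (a simplified, compilable sketch with trimmed field lists; the full definitions are in syz-manager/hub.go and syz-manager/stats.go further down):

package main

// HubManagerView is the narrow interface HubConnector needs from the manager;
// the hub code no longer reaches into Manager internals or grabs its mutex.
type HubManagerView interface {
	getMinimizedCorpus() (corpus, repros [][]byte)
	addNewCandidates(progs [][]byte)
}

// Stat is a lock-free counter; Stats replaces the old mgr.stats map that
// could only be updated while holding mgr.mu.
type Stat uint64

type Stats struct {
	crashes   Stat
	execTotal Stat
	// ...the remaining counters are listed in syz-manager/stats.go below
}

// HubConnector owns the hub-related state that previously lived directly in
// Manager (hub corpus hashes, pending repros, the needMoreRepros channel).
type HubConnector struct {
	mgr   HubManagerView
	stats *Stats
	// ...config, target, enabled calls and repro queue (see syz-manager/hub.go below)
}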


@@ -122,8 +122,16 @@ func (mgr *Manager) collectStats() []UIStat {
secs = uint64(time.Since(mgr.firstConnect))/1e9 + 1
}
intStats := convertStats(mgr.stats.all(), secs)
intStats = append(intStats, convertStats(mgr.fuzzerStats, secs)...)
sort.Sort(UIStatArray(intStats))
stats = append(stats, intStats...)
return stats
}
func convertStats(stats map[string]uint64, secs uint64) []UIStat {
var intStats []UIStat
for k, v := range mgr.stats {
for k, v := range stats {
val := fmt.Sprintf("%v", v)
if x := v / secs; x >= 10 {
val += fmt.Sprintf(" (%v/sec)", x)
@@ -135,9 +143,7 @@ func (mgr *Manager) collectStats() []UIStat {
}
intStats = append(intStats, UIStat{Name: k, Value: val})
}
sort.Sort(UIStatArray(intStats))
stats = append(stats, intStats...)
return stats
return intStats
}
func (mgr *Manager) collectSyscallInfo() map[string]*CallCov {
@@ -413,11 +419,11 @@ func readCrash(workdir, dir string, repros map[string]bool, full bool) *UICrashT
return nil
}
defer descFile.Close()
desc, err := ioutil.ReadAll(descFile)
if err != nil || len(desc) == 0 {
descBytes, err := ioutil.ReadAll(descFile)
if err != nil || len(descBytes) == 0 {
return nil
}
desc = trimNewLines(desc)
desc := string(trimNewLines(descBytes))
stat, err := descFile.Stat()
if err != nil {
return nil
@@ -471,20 +477,9 @@ func readCrash(workdir, dir string, repros map[string]bool, full bool) *UICrashT
sort.Sort(UICrashArray(crashes))
}
triaged := ""
if hasRepro {
if hasCRepro {
triaged = "has C repro"
} else {
triaged = "has repro"
}
} else if repros[string(desc)] {
triaged = "reproducing"
} else if reproAttempts >= maxReproAttempts {
triaged = "non-reproducible"
}
triaged := reproStatus(hasRepro, hasCRepro, repros[desc], reproAttempts >= maxReproAttempts)
return &UICrashType{
Description: string(desc),
Description: desc,
LastTime: modTime.Format(dateFormat),
ID: dir,
Count: len(crashes),
@@ -493,6 +488,21 @@ func readCrash(workdir, dir string, repros map[string]bool, full bool) *UICrashT
}
}
func reproStatus(hasRepro, hasCRepro, reproducing, nonReproducible bool) string {
status := ""
if hasRepro {
status = "has repro"
if hasCRepro {
status = "has C repro"
}
} else if reproducing {
status = "reproducing"
} else if nonReproducible {
status = "non-reproducible"
}
return status
}
func trimNewLines(data []byte) []byte {
for len(data) > 0 && data[len(data)-1] == '\n' {
data = data[:len(data)-1]

syz-manager/hub.go (new file, 192 lines added)

@@ -0,0 +1,192 @@
// Copyright 2018 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"time"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/report"
"github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/prog"
)
func (mgr *Manager) hubSyncLoop() {
hc := &HubConnector{
mgr: mgr,
cfg: mgr.cfg,
target: mgr.target,
stats: mgr.stats,
enabledCalls: mgr.checkResult.EnabledCalls,
fresh: mgr.fresh,
hubReproQueue: mgr.hubReproQueue,
}
if mgr.cfg.Reproduce && mgr.dash != nil {
hc.needMoreRepros = mgr.needMoreRepros
}
hc.loop()
}
type HubConnector struct {
mgr HubManagerView
cfg *mgrconfig.Config
target *prog.Target
stats *Stats
enabledCalls []int
fresh bool
hubCorpus map[hash.Sig]bool
newRepros [][]byte
hubReproQueue chan *Crash
needMoreRepros chan chan bool
}
// HubManagerView restricts interface between HubConnector and Manager.
type HubManagerView interface {
getMinimizedCorpus() (corpus, repros [][]byte)
addNewCandidates(progs [][]byte)
}
func (hc *HubConnector) loop() {
var hub *rpctype.RPCClient
for {
time.Sleep(time.Minute)
corpus, repros := hc.mgr.getMinimizedCorpus()
hc.newRepros = append(hc.newRepros, repros...)
if hub == nil {
var err error
if hub, err = hc.connect(corpus); err != nil {
log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err)
continue
}
log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus))
}
if err := hc.sync(hub, corpus); err != nil {
log.Logf(0, "hub sync failed: %v", err)
hub.Close()
hub = nil
}
}
}
func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) {
a := &rpctype.HubConnectArgs{
Client: hc.cfg.HubClient,
Key: hc.cfg.HubKey,
Manager: hc.cfg.Name,
Fresh: hc.fresh,
}
for _, id := range hc.enabledCalls {
a.Calls = append(a.Calls, hc.target.Syscalls[id].Name)
}
hubCorpus := make(map[hash.Sig]bool)
for _, inp := range corpus {
hubCorpus[hash.Hash(inp)] = true
a.Corpus = append(a.Corpus, inp)
}
// Hub.Connect request can be very large, so do it on a transient connection
// (rpc connection buffers never shrink).
if err := rpctype.RPCCall(hc.cfg.HubAddr, "Hub.Connect", a, nil); err != nil {
return nil, err
}
hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr)
if err != nil {
return nil, err
}
hc.hubCorpus = hubCorpus
hc.fresh = false
return hub, nil
}
func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error {
a := &rpctype.HubSyncArgs{
Client: hc.cfg.HubClient,
Key: hc.cfg.HubKey,
Manager: hc.cfg.Name,
}
sigs := make(map[hash.Sig]bool)
for _, inp := range corpus {
sig := hash.Hash(inp)
sigs[sig] = true
if hc.hubCorpus[sig] {
continue
}
hc.hubCorpus[sig] = true
a.Add = append(a.Add, inp)
}
for sig := range hc.hubCorpus {
if sigs[sig] {
continue
}
delete(hc.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
if hc.needMoreRepros != nil {
needReproReply := make(chan bool)
hc.needMoreRepros <- needReproReply
a.NeedRepros = <-needReproReply
}
a.Repros = hc.newRepros
for {
r := new(rpctype.HubSyncRes)
if err := hub.Call("Hub.Sync", a, r); err != nil {
return err
}
progDropped := hc.processProgs(r.Progs)
reproDropped := hc.processRepros(r.Repros)
hc.stats.hubSendProgAdd.add(len(a.Add))
hc.stats.hubSendProgDel.add(len(a.Del))
hc.stats.hubSendRepro.add(len(a.Repros))
hc.stats.hubRecvProg.add(len(r.Progs) - progDropped)
hc.stats.hubRecvProgDrop.add(progDropped)
hc.stats.hubRecvRepro.add(len(r.Repros) - reproDropped)
hc.stats.hubRecvReproDrop.add(reproDropped)
log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+
" recv: progs %v, repros %v; more %v",
len(a.Add), len(a.Del), len(a.Repros),
len(r.Progs)-progDropped, len(r.Repros)-reproDropped, r.More)
a.Add = nil
a.Del = nil
a.Repros = nil
a.NeedRepros = false
hc.newRepros = nil
if len(r.Progs)+r.More == 0 {
return nil
}
}
}
func (hc *HubConnector) processProgs(progs [][]byte) int {
dropped := 0
candidates := make([][]byte, 0, len(progs))
for _, inp := range progs {
if _, err := hc.target.Deserialize(inp); err != nil {
dropped++
continue
}
candidates = append(candidates, inp)
}
hc.mgr.addNewCandidates(candidates)
return dropped
}
func (hc *HubConnector) processRepros(repros [][]byte) int {
dropped := 0
for _, repro := range repros {
if _, err := hc.target.Deserialize(repro); err != nil {
dropped++
continue
}
hc.hubReproQueue <- &Crash{
vmIndex: -1,
hub: true,
Report: &report.Report{
Title: "external repro",
Output: repro,
},
}
}
return dropped
}
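
For illustration only (nothing below is part of the commit): the HubManagerView indirection means the connector can be driven by any two-method implementation, for instance a hypothetical stub:

// stubView is a made-up stand-in for Manager, shown only to illustrate the decoupling.
type stubView struct {
	corpus     [][]byte
	candidates [][]byte
}

func (s *stubView) getMinimizedCorpus() (corpus, repros [][]byte) {
	return s.corpus, nil
}

func (s *stubView) addNewCandidates(progs [][]byte) {
	s.candidates = append(s.candidates, progs...)
}

// Wiring &HubConnector{mgr: &stubView{}, stats: new(Stats), ...} exercises the
// corpus and candidate handling without a real Manager or its mutex.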


@@ -53,7 +53,8 @@ type Manager struct {
startTime time.Time
firstConnect time.Time
fuzzingTime time.Duration
stats map[string]uint64
stats *Stats
fuzzerStats map[string]uint64
crashTypes map[string]bool
vmStop chan bool
checkResult *rpctype.CheckArgs
@@ -77,8 +78,6 @@ type Manager struct {
newRepros [][]byte
fuzzers map[string]*Fuzzer
hub *rpctype.RPCClient
hubCorpus map[hash.Sig]bool
needMoreRepros chan chan bool
hubReproQueue chan *Crash
reproRequest chan chan map[string]bool
@@ -171,7 +170,8 @@ func RunManager(cfg *mgrconfig.Config, target *prog.Target, syscalls map[int]boo
reporter: reporter,
crashdir: crashdir,
startTime: time.Now(),
stats: make(map[string]uint64),
stats: new(Stats),
fuzzerStats: make(map[string]uint64),
crashTypes: make(map[string]bool),
enabledSyscalls: enabledSyscalls,
corpus: make(map[string]rpctype.RPCInput),
@@ -220,8 +220,8 @@ func RunManager(cfg *mgrconfig.Config, target *prog.Target, syscalls map[int]boo
continue
}
mgr.fuzzingTime += diff * time.Duration(atomic.LoadUint32(&mgr.numFuzzing))
executed := mgr.stats["exec total"]
crashes := mgr.stats["crashes"]
executed := mgr.stats.execTotal.get()
crashes := mgr.stats.crashes.get()
signal := mgr.corpusSignal.Len()
mgr.mu.Unlock()
numReproducing := atomic.LoadUint32(&mgr.numReproducing)
@@ -252,10 +252,13 @@ func RunManager(cfg *mgrconfig.Config, target *prog.Target, syscalls map[int]boo
vals["fuzzing"] = uint64(mgr.fuzzingTime) / 1e9
vals["signal"] = uint64(mgr.corpusSignal.Len())
vals["coverage"] = uint64(len(mgr.corpusCover))
for k, v := range mgr.stats {
for k, v := range mgr.fuzzerStats {
vals[k] = v
}
mgr.mu.Unlock()
for k, v := range mgr.stats.all() {
vals[k] = v
}
data, err := json.MarshalIndent(vals, "", " ")
if err != nil {
@@ -272,15 +275,6 @@ func RunManager(cfg *mgrconfig.Config, target *prog.Target, syscalls map[int]boo
go mgr.dashboardReporter()
}
if mgr.cfg.HubClient != "" {
go func() {
for {
time.Sleep(time.Minute)
mgr.hubSync()
}
}()
}
osutil.HandleInterrupts(vm.Shutdown)
if mgr.vmPool == nil {
log.Logf(0, "no VMs started (type=none)")
@@ -307,6 +301,8 @@ type ReproResult struct {
hub bool // repro came from hub
}
// Manager needs to be refactored (#605).
// nolint: gocyclo
func (mgr *Manager) vmLoop() {
log.Logf(0, "booting test machines...")
log.Logf(0, "wait for the connection from test machine...")
@@ -600,9 +596,7 @@ func (mgr *Manager) emailCrash(crash *Crash) {
func (mgr *Manager) saveCrash(crash *Crash) bool {
if crash.Suppressed {
log.Logf(0, "vm-%v: suppressed crash %v", crash.vmIndex, crash.Title)
mgr.mu.Lock()
mgr.stats["suppressed"]++
mgr.mu.Unlock()
mgr.stats.crashSuppressed.inc()
return false
}
corrupted := ""
@@ -614,11 +608,11 @@ func (mgr *Manager) saveCrash(crash *Crash) bool {
log.Logf(0, "failed to symbolize report: %v", err)
}
mgr.stats.crashes.inc()
mgr.mu.Lock()
mgr.stats["crashes"]++
if !mgr.crashTypes[crash.Title] {
mgr.crashTypes[crash.Title] = true
mgr.stats["crash types"]++
mgr.stats.crashTypes.inc()
}
mgr.mu.Unlock()
@@ -828,6 +822,36 @@ func saveReproStats(filename string, stats *repro.Stats) {
osutil.WriteFile(filename, []byte(text))
}
func (mgr *Manager) getMinimizedCorpus() (corpus, repros [][]byte) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.minimizeCorpus()
corpus = make([][]byte, 0, len(mgr.corpus))
for _, inp := range mgr.corpus {
corpus = append(corpus, inp.Prog)
}
repros = mgr.newRepros
mgr.newRepros = nil
return
}
func (mgr *Manager) addNewCandidates(progs [][]byte) {
candidates := make([]rpctype.RPCCandidate, len(progs))
for i, inp := range progs {
candidates[i] = rpctype.RPCCandidate{
Prog: inp,
Minimized: false, // don't trust programs from hub
Smashed: false,
}
}
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.candidates = append(mgr.candidates, candidates...)
if mgr.phase == phaseTriagedCorpus {
mgr.phase = phaseQueriedHub
}
}
func (mgr *Manager) minimizeCorpus() {
if mgr.phase < phaseLoadedCorpus {
return
@@ -863,10 +887,10 @@ func (mgr *Manager) minimizeCorpus() {
func (mgr *Manager) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) error {
log.Logf(1, "fuzzer %v connected", a.Name)
mgr.stats.vmRestarts.inc()
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.stats["vm restarts"]++
f := &Fuzzer{
name: a.Name,
}
@@ -938,7 +962,7 @@ func (mgr *Manager) NewInput(a *rpctype.NewInputArgs, r *int) error {
if mgr.corpusSignal.Diff(inputSignal).Empty() {
return nil
}
mgr.stats["manager new inputs"]++
mgr.stats.newInputs.inc()
mgr.corpusSignal.Merge(inputSignal)
mgr.corpusCover.Merge(a.Cover)
sig := hash.String(a.RPCInput.Prog)
@@ -974,7 +998,12 @@ func (mgr *Manager) Poll(a *rpctype.PollArgs, r *rpctype.PollRes) error {
defer mgr.mu.Unlock()
for k, v := range a.Stats {
mgr.stats[k] += v
switch k {
case "exec total":
mgr.stats.execTotal.add(int(v))
default:
mgr.fuzzerStats[k] += v
}
}
f := mgr.fuzzers[a.Name]
@@ -1003,16 +1032,6 @@ func (mgr *Manager) Poll(a *rpctype.PollArgs, r *rpctype.PollRes) error {
mgr.candidates[last] = rpctype.RPCCandidate{}
mgr.candidates = mgr.candidates[:last]
}
if len(mgr.candidates) == 0 {
mgr.candidates = nil
if mgr.phase == phaseLoadedCorpus {
if mgr.cfg.HubClient != "" {
mgr.phase = phaseTriagedCorpus
} else {
mgr.phase = phaseTriagedHub
}
}
}
}
if len(r.Candidates) == 0 {
for i := 0; i < maxInputs && len(f.inputs) > 0; i++ {
@@ -1025,159 +1044,24 @@ func (mgr *Manager) Poll(a *rpctype.PollArgs, r *rpctype.PollRes) error {
f.inputs = nil
}
}
if len(mgr.candidates) == 0 {
mgr.candidates = nil
if mgr.phase == phaseLoadedCorpus {
if mgr.cfg.HubClient != "" {
mgr.phase = phaseTriagedCorpus
go mgr.hubSyncLoop()
} else {
mgr.phase = phaseTriagedHub
}
} else if mgr.phase == phaseQueriedHub {
mgr.phase = phaseTriagedHub
}
}
log.Logf(4, "poll from %v: candidates=%v inputs=%v maxsignal=%v",
a.Name, len(r.Candidates), len(r.NewInputs), len(r.MaxSignal.Elems))
return nil
}
func (mgr *Manager) hubSync() {
mgr.mu.Lock()
defer mgr.mu.Unlock()
switch mgr.phase {
case phaseInit, phaseLoadedCorpus:
return
case phaseTriagedCorpus:
mgr.phase = phaseQueriedHub
case phaseQueriedHub:
if len(mgr.candidates) == 0 {
mgr.phase = phaseTriagedHub
}
case phaseTriagedHub:
default:
panic("unknown phase")
}
mgr.minimizeCorpus()
if mgr.hub == nil {
a := &rpctype.HubConnectArgs{
Client: mgr.cfg.HubClient,
Key: mgr.cfg.HubKey,
Manager: mgr.cfg.Name,
Fresh: mgr.fresh,
}
for _, id := range mgr.checkResult.EnabledCalls {
a.Calls = append(a.Calls, mgr.target.Syscalls[id].Name)
}
hubCorpus := make(map[hash.Sig]bool)
for _, inp := range mgr.corpus {
hubCorpus[hash.Hash(inp.Prog)] = true
a.Corpus = append(a.Corpus, inp.Prog)
}
mgr.mu.Unlock()
// Hub.Connect request can be very large, so do it on a transient connection
// (rpc connection buffers never shrink).
// Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible.
if err := rpctype.RPCCall(mgr.cfg.HubAddr, "Hub.Connect", a, nil); err != nil {
mgr.mu.Lock()
log.Logf(0, "Hub.Connect rpc failed: %v", err)
return
}
conn, err := rpctype.NewRPCClient(mgr.cfg.HubAddr)
if err != nil {
mgr.mu.Lock()
log.Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.HubAddr, err)
return
}
mgr.mu.Lock()
mgr.hub = conn
mgr.hubCorpus = hubCorpus
mgr.fresh = false
log.Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.HubAddr, len(mgr.corpus))
}
a := &rpctype.HubSyncArgs{
Client: mgr.cfg.HubClient,
Key: mgr.cfg.HubKey,
Manager: mgr.cfg.Name,
}
corpus := make(map[hash.Sig]bool)
for _, inp := range mgr.corpus {
sig := hash.Hash(inp.Prog)
corpus[sig] = true
if mgr.hubCorpus[sig] {
continue
}
mgr.hubCorpus[sig] = true
a.Add = append(a.Add, inp.Prog)
}
for sig := range mgr.hubCorpus {
if corpus[sig] {
continue
}
delete(mgr.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
for {
a.Repros = mgr.newRepros
mgr.mu.Unlock()
if mgr.cfg.Reproduce && mgr.dash != nil {
needReproReply := make(chan bool)
mgr.needMoreRepros <- needReproReply
a.NeedRepros = <-needReproReply
}
r := new(rpctype.HubSyncRes)
if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
mgr.mu.Lock()
log.Logf(0, "Hub.Sync rpc failed: %v", err)
mgr.hub.Close()
mgr.hub = nil
return
}
reproDropped := 0
for _, repro := range r.Repros {
_, err := mgr.target.Deserialize(repro)
if err != nil {
reproDropped++
continue
}
mgr.hubReproQueue <- &Crash{
vmIndex: -1,
hub: true,
Report: &report.Report{
Title: "external repro",
Output: repro,
},
}
}
mgr.mu.Lock()
mgr.newRepros = nil
dropped := 0
for _, inp := range r.Progs {
_, err := mgr.target.Deserialize(inp)
if err != nil {
dropped++
continue
}
mgr.candidates = append(mgr.candidates, rpctype.RPCCandidate{
Prog: inp,
Minimized: false, // don't trust programs from hub
Smashed: false,
})
}
mgr.stats["hub add"] += uint64(len(a.Add))
mgr.stats["hub del"] += uint64(len(a.Del))
mgr.stats["hub drop"] += uint64(dropped)
mgr.stats["hub new"] += uint64(len(r.Progs) - dropped)
mgr.stats["hub sent repros"] += uint64(len(a.Repros))
mgr.stats["hub recv repros"] += uint64(len(r.Repros) - reproDropped)
log.Logf(0, "hub sync: send: add %v, del %v, repros %v; recv: progs: drop %v, new %v,"+
" repros: drop: %v, new %v; more %v",
len(a.Add), len(a.Del), len(a.Repros), dropped, len(r.Progs)-dropped,
reproDropped, len(r.Repros)-reproDropped, r.More)
if len(r.Progs)+r.More == 0 {
break
}
a.Add = nil
a.Del = nil
}
}
func (mgr *Manager) collectUsedFiles() {
if mgr.vmPool == nil {
return
@@ -1235,8 +1119,8 @@ func (mgr *Manager) dashboardReporter() {
mgr.mu.Unlock()
continue
}
crashes := mgr.stats["crashes"]
execs := mgr.stats["exec total"]
crashes := mgr.stats.crashes.get()
execs := mgr.stats.execTotal.get()
req := &dashapi.ManagerStatsReq{
Name: mgr.cfg.Name,
Addr: webAddr,

syz-manager/stats.go (new file, 56 lines added)

@@ -0,0 +1,56 @@
// Copyright 2018 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"sync/atomic"
)
type Stat uint64
type Stats struct {
crashes Stat
crashTypes Stat
crashSuppressed Stat
vmRestarts Stat
newInputs Stat
execTotal Stat
hubSendProgAdd Stat
hubSendProgDel Stat
hubSendRepro Stat
hubRecvProg Stat
hubRecvProgDrop Stat
hubRecvRepro Stat
hubRecvReproDrop Stat
}
func (stats *Stats) all() map[string]uint64 {
return map[string]uint64{
"crashes": stats.crashes.get(),
"crash types": stats.crashTypes.get(),
"suppressed": stats.crashSuppressed.get(),
"vm restarts": stats.vmRestarts.get(),
"manager new inputs": stats.newInputs.get(),
"exec total": stats.execTotal.get(),
"hub: send prog add": stats.hubSendProgAdd.get(),
"hub: send prog del": stats.hubSendProgDel.get(),
"hub: send repro": stats.hubSendRepro.get(),
"hub: recv prog": stats.hubRecvProg.get(),
"hub: recv prog drop": stats.hubRecvProgDrop.get(),
"hub: recv repro": stats.hubRecvRepro.get(),
"hub: recv repro drop": stats.hubRecvReproDrop.get(),
}
}
func (s *Stat) get() uint64 {
return atomic.LoadUint64((*uint64)(s))
}
func (s *Stat) inc() {
s.add(1)
}
func (s *Stat) add(v int) {
atomic.AddUint64((*uint64)(s), uint64(v))
}