syz-hub: send new inputs in smaller parts

Currently the hub sends all inputs on the first manager connect.
This can be 100K+ inputs, which can take a long time
and consume tons of memory. Send inputs in parts of 1K instead.

Also increase RPC timeouts, as the hub still has a global mutex.
This commit is contained in:
Dmitry Vyukov 2017-05-23 15:20:09 +02:00
parent 38b947b94f
commit 7e458d6416
6 changed files with 85 additions and 42 deletions

View File

@ -54,7 +54,7 @@ type RpcClient struct {
}
func NewRpcClient(addr string) (*RpcClient, error) {
conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
conn, err := net.DialTimeout("tcp", addr, 60*time.Second)
if err != nil {
return nil, err
}
@ -68,7 +68,7 @@ func NewRpcClient(addr string) (*RpcClient, error) {
}
func (cli *RpcClient) Call(method string, args, reply interface{}) error {
cli.conn.SetDeadline(time.Now().Add(60 * time.Second))
cli.conn.SetDeadline(time.Now().Add(5 * 60 * time.Second))
err := cli.c.Call(method, args, reply)
cli.conn.SetDeadline(time.Time{})
return err

View File

@ -71,4 +71,5 @@ type HubSyncArgs struct {
// HubSyncRes is the reply of the Hub.Sync RPC.
type HubSyncRes struct {
// Inputs holds the new inputs the hub hands to the manager in this reply.
Inputs [][]byte
// More is the number of inputs still pending on the hub that were not
// included in this reply; a non-zero value makes the manager sync again.
More int
}

View File

@ -88,13 +88,14 @@ func (hub *Hub) Sync(a *HubSyncArgs, r *HubSyncRes) error {
hub.mu.Lock()
defer hub.mu.Unlock()
inputs, err := hub.st.Sync(a.Name, a.Add, a.Del)
inputs, more, err := hub.st.Sync(a.Name, a.Add, a.Del)
if err != nil {
Logf(0, "sync error: %v", err)
return err
}
r.Inputs = inputs
Logf(0, "sync from %v: add=%v del=%v new=%v", a.Name, len(a.Add), len(a.Del), len(inputs))
r.More = more
Logf(0, "sync from %v: add=%v del=%v new=%v pending=%v", a.Name, len(a.Add), len(a.Del), len(inputs), more)
return nil
}

View File

@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"time"
@ -136,10 +137,10 @@ func (st *State) Connect(name string, fresh bool, calls []string, corpus [][]byt
return nil
}
func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, error) {
func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, int, error) {
mgr := st.Managers[name]
if mgr == nil || mgr.Connected.IsZero() {
return nil, fmt.Errorf("unconnected manager %v", name)
return nil, 0, fmt.Errorf("unconnected manager %v", name)
}
if len(del) != 0 {
for _, sig := range del {
@ -151,20 +152,20 @@ func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, error)
st.purgeCorpus()
}
st.addInputs(mgr, add)
inputs, err := st.pendingInputs(mgr)
inputs, more, err := st.pendingInputs(mgr)
mgr.Added += len(add)
mgr.Deleted += len(del)
mgr.New += len(inputs)
return inputs, err
return inputs, more, err
}
func (st *State) pendingInputs(mgr *Manager) ([][]byte, error) {
func (st *State) pendingInputs(mgr *Manager) ([][]byte, int, error) {
if mgr.seq == st.seq {
return nil, nil
return nil, 0, nil
}
var inputs [][]byte
var records []db.Record
for key, rec := range st.Corpus.Records {
if mgr.seq > rec.Seq {
if mgr.seq >= rec.Seq {
continue
}
if _, ok := mgr.Corpus.Records[key]; ok {
@ -172,16 +173,35 @@ func (st *State) pendingInputs(mgr *Manager) ([][]byte, error) {
}
calls, err := prog.CallSet(rec.Val)
if err != nil {
return nil, fmt.Errorf("failed to extract call set: %v\nprogram: %s", err, rec.Val)
return nil, 0, fmt.Errorf("failed to extract call set: %v\nprogram: %s", err, rec.Val)
}
if !managerSupportsAllCalls(mgr.Calls, calls) {
continue
}
inputs = append(inputs, rec.Val)
records = append(records, rec)
}
mgr.seq = st.seq
maxSeq := st.seq
more := 0
// Send at most that many records (rounded up to next seq number).
const maxRecords = 1000
if len(records) > maxRecords {
sort.Sort(recordSeqSorter(records))
pos := maxRecords
maxSeq = records[pos].Seq
for pos+1 < len(records) && records[pos+1].Seq == maxSeq {
pos++
}
pos++
more = len(records) - pos
records = records[:pos]
}
inputs := make([][]byte, len(records))
for i, rec := range records {
inputs[i] = rec.Val
}
mgr.seq = maxSeq
writeFile(filepath.Join(mgr.dir, "seq"), []byte(fmt.Sprint(mgr.seq)))
return inputs, nil
return inputs, more, nil
}
func (st *State) addInputs(mgr *Manager, inputs [][]byte) {
@ -244,3 +264,17 @@ func managerSupportsAllCalls(mgr, prog map[string]struct{}) bool {
}
return true
}
// recordSeqSorter provides the Len/Less/Swap methods needed to sort a slice
// of db.Record by ascending Seq.
type recordSeqSorter []db.Record

// Len reports how many records are in the slice.
func (s recordSeqSorter) Len() int { return len(s) }

// Less reports whether the record at index i has a smaller Seq than the one
// at index j.
func (s recordSeqSorter) Less(i, j int) bool {
	left, right := s[i].Seq, s[j].Seq
	return left < right
}

// Swap exchanges the records at indices i and j.
func (s recordSeqSorter) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}

View File

@ -20,7 +20,7 @@ func TestState(t *testing.T) {
if err != nil {
t.Fatalf("failed to make state: %v", err)
}
_, err = st.Sync("foo", nil, nil)
_, _, err = st.Sync("foo", nil, nil)
if err == nil {
t.Fatalf("synced with unconnected manager")
}

View File

@ -901,31 +901,38 @@ func (mgr *Manager) hubSync() {
delete(mgr.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
mgr.mu.Unlock()
r := new(HubSyncRes)
if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
mgr.mu.Lock()
Logf(0, "Hub.Sync rpc failed: %v", err)
mgr.hub.Close()
mgr.hub = nil
return
}
mgr.mu.Lock()
dropped := 0
for _, inp := range r.Inputs {
_, err := prog.Deserialize(inp)
if err != nil {
dropped++
continue
for {
mgr.mu.Unlock()
r := new(HubSyncRes)
if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
mgr.mu.Lock()
Logf(0, "Hub.Sync rpc failed: %v", err)
mgr.hub.Close()
mgr.hub = nil
return
}
mgr.candidates = append(mgr.candidates, RpcCandidate{
Prog: inp,
Minimized: false, // don't trust programs from hub
})
mgr.mu.Lock()
dropped := 0
for _, inp := range r.Inputs {
_, err := prog.Deserialize(inp)
if err != nil {
dropped++
continue
}
mgr.candidates = append(mgr.candidates, RpcCandidate{
Prog: inp,
Minimized: false, // don't trust programs from hub
})
}
mgr.stats["hub add"] += uint64(len(a.Add))
mgr.stats["hub del"] += uint64(len(a.Del))
mgr.stats["hub drop"] += uint64(dropped)
mgr.stats["hub new"] += uint64(len(r.Inputs) - dropped)
Logf(0, "hub sync: add %v, del %v, drop %v, new %v, more %v", len(a.Add), len(a.Del), dropped, len(r.Inputs)-dropped, r.More)
if len(r.Inputs)+r.More == 0 {
break
}
a.Add = nil
a.Del = nil
}
mgr.stats["hub add"] += uint64(len(a.Add))
mgr.stats["hub del"] += uint64(len(a.Del))
mgr.stats["hub drop"] += uint64(dropped)
mgr.stats["hub new"] += uint64(len(r.Inputs) - dropped)
Logf(0, "hub sync: add %v, del %v, drop %v, new %v", len(a.Add), len(a.Del), dropped, len(r.Inputs)-dropped)
}