manager, fuzzer, hub: move common rpc code into rpctype

If hub hangs, it causes all managers to hang as well, because they
call hub under the global mutex. So move the common rpc code into
rpctype and make it more careful about timeouts (tcp keepalives,
call timeouts). Also don't call hub under the mutex, since the call
can be slow.
Dmitry Vyukov 2017-01-30 15:15:37 +01:00
parent 8b2c1cb5bb
commit 1f0546f0da
5 changed files with 138 additions and 62 deletions
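
The core of the fix is a locking discipline rather than new rpc machinery: never hold the global mutex across a blocking rpc call, or a hung peer stalls every goroutine waiting on that mutex. A minimal standalone sketch of the pattern applied to hubSync below (the server type, slowCall, and syncWithHub names are illustrative, not from this commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

type server struct {
	mu    sync.Mutex
	state []string
}

// slowCall stands in for an rpc call that can block for a long time.
func slowCall(snapshot []string) string {
	time.Sleep(100 * time.Millisecond)
	return fmt.Sprintf("synced %v items", len(snapshot))
}

// syncWithHub copies the shared state under the mutex, then releases it
// around the slow call so other goroutines are not blocked.
func (s *server) syncWithHub() {
	s.mu.Lock()
	snapshot := append([]string{}, s.state...)
	s.mu.Unlock() // do not hold the mutex across the call

	res := slowCall(snapshot)

	s.mu.Lock()
	defer s.mu.Unlock()
	fmt.Println(res) // apply results back under the mutex
}

func main() {
	s := &server{state: []string{"a", "b"}}
	s.syncWithHub()
}

The snapshot keeps the call coherent without the lock; state can change while the lock is dropped, which is why the hubSync changes below re-acquire the mutex before touching mgr.hub.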

rpctype/rpc.go (new file, 88 lines)

@@ -0,0 +1,88 @@
// Copyright 2017 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package rpctype

import (
	"fmt"
	"net"
	"net/rpc"
	"time"

	. "github.com/google/syzkaller/log"
)

// RpcServer wraps net/rpc to serve TCP connections with keepalives enabled.
type RpcServer struct {
	ln net.Listener
	s  *rpc.Server
}

// NewRpcServer starts listening on addr and registers receiver's methods.
func NewRpcServer(addr string, receiver interface{}) (*RpcServer, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("failed to listen on %v: %v", addr, err)
	}
	s := rpc.NewServer()
	s.Register(receiver)
	serv := &RpcServer{
		ln: ln,
		s:  s,
	}
	return serv, nil
}

// Serve accepts connections in a loop and serves each on its own goroutine.
// It never returns.
func (serv *RpcServer) Serve() {
	for {
		conn, err := serv.ln.Accept()
		if err != nil {
			Logf(0, "failed to accept an rpc connection: %v", err)
			continue
		}
		conn.(*net.TCPConn).SetKeepAlive(true)
		conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
		go serv.s.ServeConn(conn)
	}
}

func (serv *RpcServer) Addr() net.Addr {
	return serv.ln.Addr()
}

// RpcClient is a persistent client that applies TCP keepalives and per-call deadlines.
type RpcClient struct {
	conn net.Conn
	c    *rpc.Client
}

func NewRpcClient(addr string) (*RpcClient, error) {
	conn, err := net.DialTimeout("tcp", addr, 15*time.Second)
	if err != nil {
		return nil, err
	}
	conn.(*net.TCPConn).SetKeepAlive(true)
	conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
	cli := &RpcClient{
		conn: conn,
		c:    rpc.NewClient(conn),
	}
	return cli, nil
}

// Call invokes method with a 30-second deadline on the underlying connection,
// so a hung peer fails the call instead of blocking forever.
func (cli *RpcClient) Call(method string, args, reply interface{}) error {
	cli.conn.SetDeadline(time.Now().Add(30 * time.Second))
	err := cli.c.Call(method, args, reply)
	cli.conn.SetDeadline(time.Time{})
	return err
}

func (cli *RpcClient) Close() {
	cli.c.Close()
}

// RpcCall makes a single call on a transient connection: dial, call, close.
func RpcCall(addr, method string, args, reply interface{}) error {
	c, err := NewRpcClient(addr)
	if err != nil {
		return err
	}
	defer c.Close()
	return c.Call(method, args, reply)
}
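
A usage sketch of the package above, assuming a hypothetical Echo receiver (net/rpc exposes exported methods of the form Method(args, reply) error). RpcCall is the transient one-shot path; NewRpcClient is the persistent path:

package main

import (
	"fmt"

	. "github.com/google/syzkaller/rpctype"
)

// Echo is a hypothetical rpc receiver, not part of the commit.
type Echo struct{}

type EchoArgs struct{ Msg string }
type EchoRes struct{ Msg string }

func (*Echo) Hello(a *EchoArgs, r *EchoRes) error {
	r.Msg = "hello, " + a.Msg
	return nil
}

func main() {
	serv, err := NewRpcServer("127.0.0.1:0", new(Echo))
	if err != nil {
		panic(err)
	}
	go serv.Serve()

	// One-shot call on a transient connection (dial, call, close).
	r := new(EchoRes)
	if err := RpcCall(serv.Addr().String(), "Echo.Hello", &EchoArgs{"syzkaller"}, r); err != nil {
		panic(err)
	}
	fmt.Println(r.Msg)

	// Persistent keepalive client for repeated, bounded-size calls.
	cli, err := NewRpcClient(serv.Addr().String())
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	if err := cli.Call("Echo.Hello", &EchoArgs{"again"}, r); err != nil {
		panic(err)
	}
	fmt.Println(r.Msg)
}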


@@ -24,6 +24,7 @@ type ConnectArgs struct {
 type ConnectRes struct {
 	Prios        [][]float32
 	Inputs       []RpcInput
+	MaxSignal    []uint32
 	Candidates   []RpcCandidate
 	EnabledCalls string


@@ -14,8 +14,6 @@ import (
 	"math/rand"
 	"net/http"
 	_ "net/http/pprof"
-	"net/rpc"
-	"net/rpc/jsonrpc"
 	"os"
 	"os/signal"
 	"runtime"
@@ -64,7 +62,7 @@ type Candidate struct {
 }

 var (
-	manager *rpc.Client
+	manager *RpcClient

 	signalMu     sync.RWMutex
 	corpusSignal map[uint32]struct{}
@@ -128,18 +126,16 @@ func main() {
 	corpusHashes = make(map[hash.Sig]struct{})

 	Logf(0, "dialing manager at %v", *flagManager)
-	conn, err := jsonrpc.Dial("tcp", *flagManager)
-	if err != nil {
-		panic(err)
-	}
-	manager = conn
 	a := &ConnectArgs{*flagName}
 	r := &ConnectRes{}
-	if err := manager.Call("Manager.Connect", a, r); err != nil {
+	if err := RpcCall(*flagManager, "Manager.Connect", a, r); err != nil {
 		panic(err)
 	}
 	calls := buildCallList(r.EnabledCalls)
 	ct := prog.BuildChoiceTable(r.Prios, calls)
 	for _, inp := range r.Inputs {
 		addInput(inp)
 	}
+	for _, s := range r.MaxSignal {
+		maxSignal[s] = struct{}{}
+	}
@@ -168,11 +164,21 @@ func main() {
 		for c := range calls {
 			a.Calls = append(a.Calls, c.Name)
 		}
-		if err := manager.Call("Manager.Check", a, nil); err != nil {
+		if err := RpcCall(*flagManager, "Manager.Check", a, nil); err != nil {
 			panic(err)
 		}
 	}
+
+	// Manager.Connect reply can be very large and that memory will be permanently cached in the connection.
+	// So we do the call on a transient connection, free all memory and reconnect.
+	// The rest of rpc requests have bounded size.
+	debug.FreeOSMemory()
+	if conn, err := NewRpcClient(*flagManager); err != nil {
+		panic(err)
+	} else {
+		manager = conn
+	}

 	kmemleakInit()

 	flags, timeout, err := ipc.DefaultFlags()
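
The same transient-connection idea as a reusable helper: the comment in the hunk above notes that rpc connection buffers are cached for the life of the connection, so the one unbounded reply (Manager.Connect) goes over a throwaway connection before the long-lived client is opened. A sketch under that assumption; the connect helper name is illustrative:

package sketch

import (
	"runtime/debug"

	. "github.com/google/syzkaller/rpctype"
)

// connect performs the one potentially huge call on a transient connection
// (RpcCall dials, calls and closes), releases the cached buffers to the OS,
// and only then opens the long-lived client used for bounded requests.
func connect(addr string, args, reply interface{}) (*RpcClient, error) {
	if err := RpcCall(addr, "Manager.Connect", args, reply); err != nil {
		return nil, err
	}
	debug.FreeOSMemory() // return the transient connection's buffers to the OS
	return NewRpcClient(addr)
}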


@@ -8,10 +8,7 @@ import (
 	"flag"
 	"fmt"
 	"io/ioutil"
-	"net"
-	"net/rpc"
 	"sync"
-	"time"

 	. "github.com/google/syzkaller/log"
 	. "github.com/google/syzkaller/rpctype"
@@ -59,23 +56,12 @@ func main() {
 	hub.initHttp(cfg.Http)

-	ln, err := net.Listen("tcp", cfg.Rpc)
+	s, err := NewRpcServer(cfg.Rpc, hub)
 	if err != nil {
-		Fatalf("failed to listen on %v: %v", cfg.Rpc, err)
+		Fatalf("failed to create rpc server: %v", err)
 	}
-	Logf(0, "serving rpc on tcp://%v", ln.Addr())
-	s := rpc.NewServer()
-	s.Register(hub)
-	for {
-		conn, err := ln.Accept()
-		if err != nil {
-			Logf(0, "failed to accept an rpc connection: %v", err)
-			continue
-		}
-		conn.(*net.TCPConn).SetKeepAlive(true)
-		conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
-		go s.ServeConn(conn)
-	}
+	Logf(0, "serving rpc on tcp://%v", s.Addr())
+	s.Serve()
 }

 func (hub *Hub) Connect(a *HubConnectArgs, r *int) error {
func (hub *Hub) Connect(a *HubConnectArgs, r *int) error {


@@ -11,8 +11,6 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"net"
-	"net/rpc"
-	"net/rpc/jsonrpc"
 	"os"
 	"os/signal"
 	"path/filepath"
@@ -73,7 +71,7 @@ type Manager struct {
 	prios [][]float32

 	fuzzers   map[string]*Fuzzer
-	hub       *rpc.Client
+	hub       *RpcClient
 	hubCorpus map[hash.Sig]bool
 }
@@ -200,26 +198,13 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) {
 	mgr.initHttp()

 	// Create RPC server for fuzzers.
-	ln, err := net.Listen("tcp", cfg.Rpc)
+	s, err := NewRpcServer(cfg.Rpc, mgr)
 	if err != nil {
-		Fatalf("failed to listen on %v: %v", cfg.Rpc, err)
+		Fatalf("failed to create rpc server: %v", err)
 	}
-	Logf(0, "serving rpc on tcp://%v", ln.Addr())
-	mgr.port = ln.Addr().(*net.TCPAddr).Port
-	s := rpc.NewServer()
-	s.Register(mgr)
-	go func() {
-		for {
-			conn, err := ln.Accept()
-			if err != nil {
-				Logf(0, "failed to accept an rpc connection: %v", err)
-				continue
-			}
-			conn.(*net.TCPConn).SetKeepAlive(true)
-			conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
-			go s.ServeCodec(jsonrpc.NewServerCodec(conn))
-		}
-	}()
+	Logf(0, "serving rpc on tcp://%v", s.Addr())
+	mgr.port = s.Addr().(*net.TCPAddr).Port
+	go s.Serve()

 	go func() {
 		for lastTime := time.Now(); ; {
@@ -645,8 +630,10 @@ func (mgr *Manager) Connect(a *ConnectArgs, r *ConnectRes) error {
 	}
 	mgr.fuzzers[a.Name] = f
 	mgr.minimizeCorpus()
-	f.inputs = nil
 	for _, inp := range mgr.corpus {
-		f.inputs = append(f.inputs, inp)
+		r.Inputs = append(r.Inputs, inp)
 	}
 	r.Prios = mgr.prios
 	r.EnabledCalls = mgr.enabledSyscalls
@@ -785,30 +772,35 @@ func (mgr *Manager) hubSync() {
 	mgr.minimizeCorpus()
 	if mgr.hub == nil {
-		conn, err := rpc.Dial("tcp", mgr.cfg.Hub_Addr)
-		if err != nil {
-			Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err)
-			return
-		}
-		mgr.hub = conn
 		a := &HubConnectArgs{
 			Name:  mgr.cfg.Name,
 			Key:   mgr.cfg.Hub_Key,
 			Fresh: mgr.fresh,
 			Calls: mgr.enabledCalls,
 		}
-		mgr.hubCorpus = make(map[hash.Sig]bool)
+		hubCorpus := make(map[hash.Sig]bool)
 		for _, inp := range mgr.corpus {
-			mgr.hubCorpus[hash.Hash(inp.Prog)] = true
+			hubCorpus[hash.Hash(inp.Prog)] = true
 			a.Corpus = append(a.Corpus, inp.Prog)
 		}
-		if err := mgr.hub.Call("Hub.Connect", a, nil); err != nil {
+		mgr.mu.Unlock()
+		// Hub.Connect request can be very large, so do it on a transient connection
+		// (rpc connection buffers never shrink).
+		// Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible.
+		if err := RpcCall(mgr.cfg.Hub_Addr, "Hub.Connect", a, nil); err != nil {
+			mgr.mu.Lock()
 			Logf(0, "Hub.Connect rpc failed: %v", err)
-			mgr.hub.Close()
-			mgr.hub = nil
-			mgr.hubCorpus = nil
 			return
 		}
+		conn, err := NewRpcClient(mgr.cfg.Hub_Addr)
+		if err != nil {
+			mgr.mu.Lock()
+			Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err)
+			return
+		}
+		mgr.mu.Lock()
+		mgr.hub = conn
+		mgr.hubCorpus = hubCorpus
 		mgr.fresh = false
 		Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.Hub_Addr, len(mgr.corpus))
 	}
@@ -834,13 +826,16 @@ func (mgr *Manager) hubSync() {
 		delete(mgr.hubCorpus, sig)
 		a.Del = append(a.Del, sig.String())
 	}
+	mgr.mu.Unlock()
 	r := new(HubSyncRes)
 	if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
+		mgr.mu.Lock()
 		Logf(0, "Hub.Sync rpc failed: %v", err)
 		mgr.hub.Close()
 		mgr.hub = nil
 		return
 	}
+	mgr.mu.Lock()
 	dropped := 0
 	for _, inp := range r.Inputs {
 		_, err := prog.Deserialize(inp)