RPC: Set a minimum ChannelTimeout per protocol in the rpc system.
Some protocols are slower and can't tolerate a very low channel timeout.
Change-Id: I6db98fb05041059ada0926d7da0f756c6bf4945f
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index b8723a6..157e394 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -37,6 +37,16 @@
numPriorities
)
+// minChannelTimeout keeps track of minimum values that we allow for channel
+// timeout on a per-protocol basis. This prevents users from setting an
+// overall limit that doesn't make sense for very slow protocols.
+// TODO(mattr): We should consider allowing users to set this per-protocol, or
+// perhaps having the protocol implementation expose it via some kind of
+// ChannelOpts interface.
+var minChannelTimeout = map[string]time.Duration{
+ "bt": 10 * time.Second,
+}
+
const (
defaultMtu = 1 << 16
defaultChannelTimeout = 30 * time.Minute
@@ -160,6 +170,10 @@
if channelTimeout == 0 {
channelTimeout = defaultChannelTimeout
}
+ if min := minChannelTimeout[local.Protocol]; channelTimeout < min {
+ channelTimeout = min
+ }
+
// If the conn is being built on an encapsulated flow, we must update the
// cancellation of the flow, to ensure that the conn doesn't get killed
// when the context passed in is cancelled.
@@ -249,6 +263,9 @@
if channelTimeout == 0 {
channelTimeout = defaultChannelTimeout
}
+ if min := minChannelTimeout[local.Protocol]; channelTimeout < min {
+ channelTimeout = min
+ }
c := &Conn{
mp: newMessagePipe(conn),
handler: handler,
@@ -409,6 +426,9 @@
timeout = f.channelTimeout
}
}
+ if min := minChannelTimeout[c.local.Protocol]; timeout < min {
+ timeout = min
+ }
c.hcstate.closeTimer.Reset(timeout)
c.hcstate.closeDeadline = time.Now().Add(timeout)
c.hcstate.requestTimer.Reset(timeout / 2)
@@ -420,6 +440,9 @@
func (c *Conn) healthCheckNewFlowLocked(ctx *context.T, timeout time.Duration) {
if timeout != 0 {
+ if min := minChannelTimeout[c.local.Protocol]; timeout < min {
+ timeout = min
+ }
now := time.Now()
if rd := now.Add(timeout / 2); rd.Before(c.hcstate.requestDeadline) {
c.hcstate.requestDeadline = rd
@@ -432,6 +455,12 @@
}
}
+func (c *Conn) healthCheckCloseDeadline() time.Time { // snapshot of hcstate.closeDeadline, read by tests via deadlineInAbout
+ defer c.mu.Unlock() // registered before Lock; fires at return, so mu is held only for the read
+ c.mu.Lock()
+ return c.hcstate.closeDeadline
+}
+
// EnterLameDuck enters lame duck mode, the returned channel will be closed when
// the remote end has ack'd or the Conn is closed.
func (c *Conn) EnterLameDuck(ctx *context.T) chan struct{} {
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 263f826..01244d4 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -7,6 +7,7 @@
import (
"bytes"
"crypto/rand"
+ "fmt"
"io"
"sync"
"testing"
@@ -98,3 +99,87 @@
t.Errorf("accepted conn's RTT should be non-zero")
}
}
+
+func TestMinChannelTimeout(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
+
+ orig := minChannelTimeout // save the real per-protocol floors; restored below
+ defer func() {
+ minChannelTimeout = orig
+ }()
+ minChannelTimeout = map[string]time.Duration{ // impose a one-minute floor on the test protocol
+ "local": time.Minute,
+ }
+
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+ dflows, aflows := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
+ dc, ac, derr, aerr := setupConns(t, "local", "", ctx, ctx, dflows, aflows, nil, nil)
+ if derr != nil || aerr != nil {
+ t.Fatal(derr, aerr)
+ }
+ defer dc.Close(ctx, nil)
+ defer ac.Close(ctx, nil)
+
+ if err := deadlineInAbout(dc, defaultChannelTimeout); err != nil { // no timeout given: default applies
+ t.Error(err)
+ }
+ if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+ t.Error(err)
+ }
+
+ df, af := oneFlow(t, ctx, dc, aflows, 0) // zero per-flow timeout leaves the channel timeout alone
+ if err := deadlineInAbout(dc, defaultChannelTimeout); err != nil {
+ t.Error(err)
+ }
+ if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+ t.Error(err)
+ }
+ df.Close()
+ af.Close()
+
+ df, af = oneFlow(t, ctx, dc, aflows, 10*time.Minute) // above the floor: honored on the dial side only
+ if err := deadlineInAbout(dc, 10*time.Minute); err != nil {
+ t.Error(err)
+ }
+ if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+ t.Error(err)
+ }
+ df.Close()
+ af.Close()
+
+ // A one-second request is below the one-minute floor, so the min takes effect.
+ df2, af2 := oneFlow(t, ctx, dc, aflows, time.Second)
+ if err := deadlineInAbout(dc, time.Minute); err != nil {
+ t.Error(err)
+ }
+ if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+ t.Error(err)
+ }
+ df2.Close()
+ af2.Close()
+
+ // Set up new conns with a default channel timeout below the min.
+ dc, ac, derr, aerr = setupConnsWithTimeout(t, "local", "", ctx, ctx, dflows, aflows, nil, nil, time.Second)
+ if derr != nil || aerr != nil {
+ t.Fatal(derr, aerr)
+ }
+ defer dc.Close(ctx, nil)
+ defer ac.Close(ctx, nil)
+ // Both sides should clamp the one-second request up to the one-minute floor.
+ if err := deadlineInAbout(dc, time.Minute); err != nil {
+ t.Error(err)
+ }
+ if err := deadlineInAbout(ac, time.Minute); err != nil {
+ t.Error(err)
+ }
+}
+
+func deadlineInAbout(c *Conn, d time.Duration) error { // errors unless c's close deadline is within slop of now+d
+ const slop = 5 * time.Second // allowance for scheduling delay while the test runs
+ delta := c.healthCheckCloseDeadline().Sub(time.Now())
+ if delta > d-slop && delta < d+slop {
+ return nil
+ }
+ return fmt.Errorf("got %v want %v (+-5s)", delta, d)
+}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 314c8dd..d026528 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -6,6 +6,7 @@
import (
"errors"
+ "reflect"
"testing"
"time"
@@ -34,16 +35,19 @@
dctx, actx *context.T,
dflows, aflows chan<- flow.Flow,
dAuth, aAuth []security.BlessingPattern) (dialed, accepted *Conn, derr, aerr error) {
+ return setupConnsWithTimeout(t, network, address, dctx, actx, dflows, aflows, dAuth, aAuth, 0)
+}
+
+func setupConnsWithTimeout(t *testing.T,
+ network, address string,
+ dctx, actx *context.T,
+ dflows, aflows chan<- flow.Flow,
+ dAuth, aAuth []security.BlessingPattern,
+ channelTimeout time.Duration) (dialed, accepted *Conn, derr, aerr error) {
dmrw, amrw := flowtest.Pipe(t, actx, network, address)
versions := version.RPCVersionRange{Min: 3, Max: 5}
- ridep, err := naming.ParseEndpoint("@6@@batman.com:1234@@000000000000000000000000dabbad00@m@@@")
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- ep, err := naming.ParseEndpoint("localhost:80")
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
+ ridep := naming.Endpoint{Protocol: network, Address: address, RoutingID: naming.FixedRoutingID(191341)}
+ ep := naming.Endpoint{Protocol: network, Address: address}
dch := make(chan *Conn)
ach := make(chan *Conn)
derrch := make(chan error)
@@ -56,7 +60,7 @@
dep = ridep
}
dBlessings, _ := v23.GetPrincipal(dctx).BlessingStore().Default()
- d, _, _, err := NewDialed(dctx, dmrw, dep, ep, versions, peerAuthorizer{dBlessings, dAuth}, time.Minute, 0, handler)
+ d, _, _, err := NewDialed(dctx, dmrw, dep, ep, versions, peerAuthorizer{dBlessings, dAuth}, time.Minute, channelTimeout, handler)
dch <- d
derrch <- err
}()
@@ -65,7 +69,7 @@
if aflows != nil {
handler = fh(aflows)
}
- a, err := NewAccepted(actx, aAuth, amrw, ridep, versions, time.Minute, 0, handler)
+ a, err := NewAccepted(actx, aAuth, amrw, ridep, versions, time.Minute, channelTimeout, handler)
ach <- a
aerrch <- err
}()
@@ -98,6 +102,22 @@
return dialed, aflows, d, a
}
+func oneFlow(t *testing.T, ctx *context.T, dc *Conn, aflows <-chan flow.Flow, channelTimeout time.Duration) (df, af flow.Flow) { // dial one flow with the given timeout and complete a round trip
+ df, err := dc.Dial(ctx, dc.LocalBlessings(), nil, naming.Endpoint{}, channelTimeout, false)
+ if err != nil {
+ t.Fatalf("Unexpected error: %v", err)
+ }
+ b := []byte{0}
+ if _, err := df.WriteMsg(b); err != nil {
+ t.Fatalf("Couldn't write.")
+ }
+ af = <-aflows // the accept side hands us its end of the flow via the aflows channel
+ if got, err := af.ReadMsg(); err != nil || !reflect.DeepEqual(b, got) {
+ t.Fatalf("Couldn't read %v %v", got, err)
+ }
+ return df, af
+}
+
type peerAuthorizer struct {
// blessings are the blessings presented to all peers.
blessings security.Blessings