RPC: Set a minimum ChannelTimeout per protocol.

Some protocols (bluetooth, for example) are slow and can't tolerate a very
low channel timeout.
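
For example, with the minimums introduced below, a dialer asking for a 1s
channel timeout over bluetooth effectively gets 10s, while protocols with
no entry are unaffected because a missing map entry yields the zero
Duration:

    if min := minChannelTimeout[local.Protocol]; channelTimeout < min {
        channelTimeout = min // "bt": 1s -> 10s; others: unchanged
    }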

Change-Id: I6db98fb05041059ada0926d7da0f756c6bf4945f
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index b8723a6..157e394 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -37,6 +37,16 @@
 	numPriorities
 )
 
+// minChannelTimeout records the minimum channel timeout we allow on a
+// per-protocol basis.  This prevents callers from setting an overall limit
+// that doesn't make sense for very slow protocols.
+// TODO(mattr): We should consider allowing users to set this per-protocol, or
+// perhaps having the protocol implementation expose it via some kind of
+// ChannelOpts interface.
+var minChannelTimeout = map[string]time.Duration{
+	"bt": 10 * time.Second,
+}
+
 const (
 	defaultMtu                  = 1 << 16
 	defaultChannelTimeout       = 30 * time.Minute
@@ -160,6 +170,10 @@
 	if channelTimeout == 0 {
 		channelTimeout = defaultChannelTimeout
 	}
+	if min := minChannelTimeout[local.Protocol]; channelTimeout < min {
+		channelTimeout = min
+	}
+
 	// If the conn is being built on an encapsulated flow, we must update the
 	// cancellation of the flow, to ensure that the conn doesn't get killed
 	// when the context passed in is cancelled.
@@ -249,6 +263,9 @@
 	if channelTimeout == 0 {
 		channelTimeout = defaultChannelTimeout
 	}
+	if min := minChannelTimeout[local.Protocol]; channelTimeout < min {
+		channelTimeout = min
+	}
 	c := &Conn{
 		mp:                   newMessagePipe(conn),
 		handler:              handler,
@@ -409,6 +426,9 @@
 				timeout = f.channelTimeout
 			}
 		}
+		if min := minChannelTimeout[c.local.Protocol]; timeout < min {
+			timeout = min
+		}
 		c.hcstate.closeTimer.Reset(timeout)
 		c.hcstate.closeDeadline = time.Now().Add(timeout)
 		c.hcstate.requestTimer.Reset(timeout / 2)
@@ -420,6 +440,10 @@
 
 func (c *Conn) healthCheckNewFlowLocked(ctx *context.T, timeout time.Duration) {
 	if timeout != 0 {
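+		// Clamp the flow's timeout to the protocol minimum before using it to shorten the deadlines.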
+		if min := minChannelTimeout[c.local.Protocol]; timeout < min {
+			timeout = min
+		}
 		now := time.Now()
 		if rd := now.Add(timeout / 2); rd.Before(c.hcstate.requestDeadline) {
 			c.hcstate.requestDeadline = rd
@@ -432,6 +456,14 @@
 	}
 }
 
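+// healthCheckCloseDeadline returns the time at which the conn will be closed
+// if no channel activity is observed.  It is used by tests.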
+func (c *Conn) healthCheckCloseDeadline() time.Time {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.hcstate.closeDeadline
+}
+
 // EnterLameDuck enters lame duck mode, the returned channel will be closed when
 // the remote end has ack'd or the Conn is closed.
 func (c *Conn) EnterLameDuck(ctx *context.T) chan struct{} {
diff --git a/runtime/internal/flow/conn/conn_test.go b/runtime/internal/flow/conn/conn_test.go
index 263f826..01244d4 100644
--- a/runtime/internal/flow/conn/conn_test.go
+++ b/runtime/internal/flow/conn/conn_test.go
@@ -7,6 +7,7 @@
 import (
 	"bytes"
 	"crypto/rand"
+	"fmt"
 	"io"
 	"sync"
 	"testing"
@@ -98,3 +99,91 @@
 		t.Errorf("accepted conn's RTT should be non-zero")
 	}
 }
+
+func TestMinChannelTimeout(t *testing.T) {
+	defer goroutines.NoLeaks(t, leakWaitTime)()
+
+	orig := minChannelTimeout
+	defer func() {
+		minChannelTimeout = orig
+	}()
+	minChannelTimeout = map[string]time.Duration{
+		"local": time.Minute,
+	}
+
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+	dflows, aflows := make(chan flow.Flow, 1), make(chan flow.Flow, 1)
+	dc, ac, derr, aerr := setupConns(t, "local", "", ctx, ctx, dflows, aflows, nil, nil)
+	if derr != nil || aerr != nil {
+		t.Fatal(derr, aerr)
+	}
+	defer dc.Close(ctx, nil)
+	defer ac.Close(ctx, nil)
+
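+	// Before any flows are dialed, both conns use the default channel timeout.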
+	if err := deadlineInAbout(dc, defaultChannelTimeout); err != nil {
+		t.Error(err)
+	}
+	if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+		t.Error(err)
+	}
+
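+	// A flow dialed with no timeout (0) leaves both deadlines at the default.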
+	df, af := oneFlow(t, ctx, dc, aflows, 0)
+	if err := deadlineInAbout(dc, defaultChannelTimeout); err != nil {
+		t.Error(err)
+	}
+	if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+		t.Error(err)
+	}
+	df.Close()
+	af.Close()
+
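+	// A flow timeout below the conn default shortens the deadline, but only on the dialing side.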
+	df, af = oneFlow(t, ctx, dc, aflows, 10*time.Minute)
+	if err := deadlineInAbout(dc, 10*time.Minute); err != nil {
+		t.Error(err)
+	}
+	if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+		t.Error(err)
+	}
+	df.Close()
+	af.Close()
+
+	// Here the 1s flow timeout is below the 1 minute min, so the min comes into play.
+	df2, af2 := oneFlow(t, ctx, dc, aflows, time.Second)
+	if err := deadlineInAbout(dc, time.Minute); err != nil {
+		t.Error(err)
+	}
+	if err := deadlineInAbout(ac, defaultChannelTimeout); err != nil {
+		t.Error(err)
+	}
+	df2.Close()
+	af2.Close()
+
+	// Set up new conns with a default channel timeout below the min.
+	dc, ac, derr, aerr = setupConnsWithTimeout(t, "local", "", ctx, ctx, dflows, aflows, nil, nil, time.Second)
+	if derr != nil || aerr != nil {
+		t.Fatal(derr, aerr)
+	}
+	defer dc.Close(ctx, nil)
+	defer ac.Close(ctx, nil)
+	// They should both start with the min value.
+	if err := deadlineInAbout(dc, time.Minute); err != nil {
+		t.Error(err)
+	}
+	if err := deadlineInAbout(ac, time.Minute); err != nil {
+		t.Error(err)
+	}
+}
+
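+// deadlineInAbout returns an error unless c's close deadline is roughly d from now (within slop).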
+func deadlineInAbout(c *Conn, d time.Duration) error {
+	const slop = 5 * time.Second
+	delta := c.healthCheckCloseDeadline().Sub(time.Now())
+	if delta > d-slop && delta < d+slop {
+		return nil
+	}
+	return fmt.Errorf("got %v, want %v (+-%v)", delta, d, slop)
+}
diff --git a/runtime/internal/flow/conn/util_test.go b/runtime/internal/flow/conn/util_test.go
index 314c8dd..d026528 100644
--- a/runtime/internal/flow/conn/util_test.go
+++ b/runtime/internal/flow/conn/util_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"errors"
+	"reflect"
 	"testing"
 	"time"
 
@@ -34,16 +35,20 @@
 	dctx, actx *context.T,
 	dflows, aflows chan<- flow.Flow,
 	dAuth, aAuth []security.BlessingPattern) (dialed, accepted *Conn, derr, aerr error) {
+	return setupConnsWithTimeout(t, network, address, dctx, actx, dflows, aflows, dAuth, aAuth, 0)
+}
+
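+// setupConnsWithTimeout is like setupConns, but uses the given channelTimeout for both conns.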
+func setupConnsWithTimeout(t *testing.T,
+	network, address string,
+	dctx, actx *context.T,
+	dflows, aflows chan<- flow.Flow,
+	dAuth, aAuth []security.BlessingPattern,
+	channelTimeout time.Duration) (dialed, accepted *Conn, derr, aerr error) {
 	dmrw, amrw := flowtest.Pipe(t, actx, network, address)
 	versions := version.RPCVersionRange{Min: 3, Max: 5}
-	ridep, err := naming.ParseEndpoint("@6@@batman.com:1234@@000000000000000000000000dabbad00@m@@@")
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
-	ep, err := naming.ParseEndpoint("localhost:80")
-	if err != nil {
-		t.Fatalf("Unexpected error: %v", err)
-	}
+	ridep := naming.Endpoint{Protocol: network, Address: address, RoutingID: naming.FixedRoutingID(191341)}
+	ep := naming.Endpoint{Protocol: network, Address: address}
 	dch := make(chan *Conn)
 	ach := make(chan *Conn)
 	derrch := make(chan error)
@@ -56,7 +61,7 @@
 			dep = ridep
 		}
 		dBlessings, _ := v23.GetPrincipal(dctx).BlessingStore().Default()
-		d, _, _, err := NewDialed(dctx, dmrw, dep, ep, versions, peerAuthorizer{dBlessings, dAuth}, time.Minute, 0, handler)
+		d, _, _, err := NewDialed(dctx, dmrw, dep, ep, versions, peerAuthorizer{dBlessings, dAuth}, time.Minute, channelTimeout, handler)
 		dch <- d
 		derrch <- err
 	}()
@@ -65,7 +70,7 @@
 		if aflows != nil {
 			handler = fh(aflows)
 		}
-		a, err := NewAccepted(actx, aAuth, amrw, ridep, versions, time.Minute, 0, handler)
+		a, err := NewAccepted(actx, aAuth, amrw, ridep, versions, time.Minute, channelTimeout, handler)
 		ach <- a
 		aerrch <- err
 	}()
@@ -98,6 +103,23 @@
 	return dialed, aflows, d, a
 }
 
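+// oneFlow dials a flow on dc with the given channelTimeout and exchanges one message so both ends have seen it.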
+func oneFlow(t *testing.T, ctx *context.T, dc *Conn, aflows <-chan flow.Flow, channelTimeout time.Duration) (df, af flow.Flow) {
+	df, err := dc.Dial(ctx, dc.LocalBlessings(), nil, naming.Endpoint{}, channelTimeout, false)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	b := []byte{0}
+	if _, err := df.WriteMsg(b); err != nil {
+		t.Fatalf("Couldn't write.")
+	}
+	af = <-aflows
+	if got, err := af.ReadMsg(); err != nil || !reflect.DeepEqual(b, got) {
+		t.Fatalf("Couldn't read %v %v", got, err)
+	}
+	return df, af
+}
+
 type peerAuthorizer struct {
 	// blessings are the blessings presented to all peers.
 	blessings security.Blessings