ref: Introduce client.PinConnection.

PinConnection will pin the connection to the manager's cache, reconnecting
to the remote end as if the connection breaks.

MultiPart: 2/2

Change-Id: Ie91717dda96a0bf6d98ca3bbb50cba5fe904df90
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 6bbe808..43df1cd 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -139,7 +139,7 @@
 func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
 	return nil, nil
 }
-func (fc *fakeDischargeClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+func (fc *fakeDischargeClient) PinConnection(*context.T, string, ...rpc.CallOpt) (flow.PinnedConn, error) {
 	return nil, nil
 }
 func (fc *fakeDischargeClient) Close()                  {}
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index ec5e17f..0e3fb7b 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -180,15 +180,67 @@
 	return fc, nil
 }
 
-func (c *client) Connection(ctx *context.T, name string, opts ...rpc.CallOpt) (flow.ManagedConn, error) {
+func (c *client) PinConnection(ctx *context.T, name string, opts ...rpc.CallOpt) (flow.PinnedConn, error) {
+	ctx, cancel := context.WithCancel(ctx)
 	connOpts := getConnectionOptions(ctx, opts)
 	r, err := c.connectToName(ctx, name, "", nil, connOpts, opts)
 	if err != nil {
 		return nil, err
 	}
-	conn := r.flow.Conn()
-	r.flow.Close()
-	return conn, nil
+	pinned := &pinnedConn{
+		cancel: cancel,
+		done:   ctx.Done(),
+		conn:   r.flow.Conn(),
+	}
+	c.wg.Add(1)
+	go c.reconnectPinnedConn(ctx, pinned, name, connOpts, opts...)
+	return pinned, nil
+}
+
+type pinnedConn struct {
+	cancel context.CancelFunc
+	done   <-chan struct{}
+
+	mu   sync.Mutex
+	conn flow.ManagedConn
+}
+
+func (p *pinnedConn) Conn() flow.ManagedConn {
+	defer p.mu.Unlock()
+	p.mu.Lock()
+	return p.conn
+}
+
+func (p *pinnedConn) Unpin() {
+	p.cancel()
+}
+
+func (c *client) reconnectPinnedConn(ctx *context.T, p *pinnedConn, name string, connOpts *connectionOpts, opts ...rpc.CallOpt) {
+	defer c.wg.Done()
+	delay := reconnectDelay
+	for {
+		p.mu.Lock()
+		closed := p.conn.Closed()
+		p.mu.Unlock()
+		select {
+		case <-closed:
+			r, err := c.connectToName(ctx, name, "", nil, connOpts, opts)
+			if err != nil {
+				time.Sleep(delay)
+				delay = nextDelay(delay)
+			} else {
+				delay = reconnectDelay
+				p.mu.Lock()
+				p.conn = r.flow.Conn()
+				closed = p.conn.Closed()
+				p.mu.Unlock()
+			}
+		case <-p.done:
+			// Reaching here means that the ctx passed to PinConnection is cancelled,
+			// so the flow on the conn created in PinConnection is closed here.
+			return
+		}
+	}
 }
 
 type serverStatus struct {
diff --git a/runtime/internal/rpc/test/client_test.go b/runtime/internal/rpc/test/client_test.go
index 4a71fdf..45097f2 100644
--- a/runtime/internal/rpc/test/client_test.go
+++ b/runtime/internal/rpc/test/client_test.go
@@ -31,6 +31,7 @@
 	"v.io/x/ref/internal/logger"
 	"v.io/x/ref/lib/signals"
 	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/lib/tcputil"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	irpc "v.io/x/ref/runtime/internal/rpc"
@@ -956,13 +957,13 @@
 	}
 }
 
-func TestClientConnection(t *testing.T) {
+func TestPinConnection(t *testing.T) {
 	ctx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
 	ctx, cancel := context.WithCancel(ctx)
 	name := "mountpoint/server"
-	_, server, err := v23.WithNewServer(ctx, name, &testServer{}, nil)
+	_, server, err := v23.WithNewServer(ctx, name, &testServer{}, nil, options.LameDuckTimeout(0))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -971,7 +972,7 @@
 
 	client := v23.GetClient(ctx)
 
-	conn, err := client.Connection(ctx, name)
+	pinnedConn, err := client.PinConnection(ctx, name)
 	if err != nil {
 		t.Error(err)
 	}
@@ -981,9 +982,35 @@
 	if err != nil {
 		t.Error(err)
 	}
-	if got, want := call.Security().LocalEndpoint().String(), conn.LocalEndpoint().String(); got != want {
+	if got, want := call.Security().LocalEndpoint().String(), pinnedConn.Conn().LocalEndpoint().String(); got != want {
 		t.Errorf("got %v, want %v", got, want)
 	}
+	call.Finish()
+
+	// Closing the original conn should automatically reconnect to the server.
+	origConn := pinnedConn.Conn()
+	origConn.(*conn.Conn).Close(ctx, nil)
+	<-origConn.Closed()
+	for {
+		if origConn != pinnedConn.Conn() {
+			break
+		}
+		time.Sleep(20 * time.Millisecond)
+	}
+
+	// Closing an unpinned conn should not reconnect.
+	pinnedConn.Unpin()
+	origConn = pinnedConn.Conn()
+	origConn.(*conn.Conn).Close(ctx, nil)
+	<-origConn.Closed()
+	time.Sleep(300 * time.Millisecond)
+	if origConn != pinnedConn.Conn() {
+		t.Errorf("Conn should not have reconnected.")
+	}
+	// The pinned conn should also become idle.
+	if !origConn.(*conn.Conn).IsIdle(ctx, time.Millisecond) {
+		t.Errorf("Conn should have been idle.")
+	}
 }
 
 func TestConnectionTimeout(t *testing.T) {
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
index c900102..535ec05 100644
--- a/services/wspr/internal/lib/simple_client.go
+++ b/services/wspr/internal/lib/simple_client.go
@@ -88,7 +88,7 @@
 }
 
 // Implement rpc.Client.
-func (*simpleMockClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+func (*simpleMockClient) PinConnection(*context.T, string, ...rpc.CallOpt) (flow.PinnedConn, error) {
 	return nil, nil
 }
 func (*simpleMockClient) Close()                  {}