ref: Introduce client.PinConnection.
PinConnection will pin the connection to the manager's cache, reconnecting
to the remote end as if the connection breaks.
MultiPart: 2/2
Change-Id: Ie91717dda96a0bf6d98ca3bbb50cba5fe904df90
diff --git a/runtime/internal/flow/conn/auth_test.go b/runtime/internal/flow/conn/auth_test.go
index 6bbe808..43df1cd 100644
--- a/runtime/internal/flow/conn/auth_test.go
+++ b/runtime/internal/flow/conn/auth_test.go
@@ -139,7 +139,7 @@
func (fc *fakeDischargeClient) StartCall(*context.T, string, string, []interface{}, ...rpc.CallOpt) (rpc.ClientCall, error) {
return nil, nil
}
-func (fc *fakeDischargeClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+func (fc *fakeDischargeClient) PinConnection(*context.T, string, ...rpc.CallOpt) (flow.PinnedConn, error) {
return nil, nil
}
func (fc *fakeDischargeClient) Close() {}
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index ec5e17f..0e3fb7b 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -180,15 +180,67 @@
return fc, nil
}
-func (c *client) Connection(ctx *context.T, name string, opts ...rpc.CallOpt) (flow.ManagedConn, error) {
+func (c *client) PinConnection(ctx *context.T, name string, opts ...rpc.CallOpt) (flow.PinnedConn, error) {
+ ctx, cancel := context.WithCancel(ctx)
connOpts := getConnectionOptions(ctx, opts)
r, err := c.connectToName(ctx, name, "", nil, connOpts, opts)
if err != nil {
return nil, err
}
- conn := r.flow.Conn()
- r.flow.Close()
- return conn, nil
+ pinned := &pinnedConn{
+ cancel: cancel,
+ done: ctx.Done(),
+ conn: r.flow.Conn(),
+ }
+ c.wg.Add(1)
+ go c.reconnectPinnedConn(ctx, pinned, name, connOpts, opts...)
+ return pinned, nil
+}
+
+type pinnedConn struct {
+ cancel context.CancelFunc
+ done <-chan struct{}
+
+ mu sync.Mutex
+ conn flow.ManagedConn
+}
+
+func (p *pinnedConn) Conn() flow.ManagedConn {
+ defer p.mu.Unlock()
+ p.mu.Lock()
+ return p.conn
+}
+
+func (p *pinnedConn) Unpin() {
+ p.cancel()
+}
+
+func (c *client) reconnectPinnedConn(ctx *context.T, p *pinnedConn, name string, connOpts *connectionOpts, opts ...rpc.CallOpt) {
+ defer c.wg.Done()
+ delay := reconnectDelay
+ for {
+ p.mu.Lock()
+ closed := p.conn.Closed()
+ p.mu.Unlock()
+ select {
+ case <-closed:
+ r, err := c.connectToName(ctx, name, "", nil, connOpts, opts)
+ if err != nil {
+ time.Sleep(delay)
+ delay = nextDelay(delay)
+ } else {
+ delay = reconnectDelay
+ p.mu.Lock()
+ p.conn = r.flow.Conn()
+ closed = p.conn.Closed()
+ p.mu.Unlock()
+ }
+ case <-p.done:
+ // Reaching here means that the ctx passed to PinConnection is cancelled,
+ // so the flow on the conn created in PinConnection is closed here.
+ return
+ }
+ }
}
type serverStatus struct {
diff --git a/runtime/internal/rpc/test/client_test.go b/runtime/internal/rpc/test/client_test.go
index 4a71fdf..45097f2 100644
--- a/runtime/internal/rpc/test/client_test.go
+++ b/runtime/internal/rpc/test/client_test.go
@@ -31,6 +31,7 @@
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/generic"
+ "v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/lib/tcputil"
inaming "v.io/x/ref/runtime/internal/naming"
irpc "v.io/x/ref/runtime/internal/rpc"
@@ -956,13 +957,13 @@
}
}
-func TestClientConnection(t *testing.T) {
+func TestPinConnection(t *testing.T) {
ctx, shutdown := test.V23InitWithMounttable()
defer shutdown()
ctx, cancel := context.WithCancel(ctx)
name := "mountpoint/server"
- _, server, err := v23.WithNewServer(ctx, name, &testServer{}, nil)
+ _, server, err := v23.WithNewServer(ctx, name, &testServer{}, nil, options.LameDuckTimeout(0))
if err != nil {
t.Fatal(err)
}
@@ -971,7 +972,7 @@
client := v23.GetClient(ctx)
- conn, err := client.Connection(ctx, name)
+ pinnedConn, err := client.PinConnection(ctx, name)
if err != nil {
t.Error(err)
}
@@ -981,9 +982,35 @@
if err != nil {
t.Error(err)
}
- if got, want := call.Security().LocalEndpoint().String(), conn.LocalEndpoint().String(); got != want {
+ if got, want := call.Security().LocalEndpoint().String(), pinnedConn.Conn().LocalEndpoint().String(); got != want {
t.Errorf("got %v, want %v", got, want)
}
+ call.Finish()
+
+ // Closing the original conn should automatically reconnect to the server.
+ origConn := pinnedConn.Conn()
+ origConn.(*conn.Conn).Close(ctx, nil)
+ <-origConn.Closed()
+ for {
+ if origConn != pinnedConn.Conn() {
+ break
+ }
+ time.Sleep(20 * time.Millisecond)
+ }
+
+ // Closing an unpinned conn should not reconnect.
+ pinnedConn.Unpin()
+ origConn = pinnedConn.Conn()
+ origConn.(*conn.Conn).Close(ctx, nil)
+ <-origConn.Closed()
+ time.Sleep(300 * time.Millisecond)
+ if origConn != pinnedConn.Conn() {
+ t.Errorf("Conn should not have reconnected.")
+ }
+ // The pinned conn should also become idle.
+ if !origConn.(*conn.Conn).IsIdle(ctx, time.Millisecond) {
+ t.Errorf("Conn should have been idle.")
+ }
}
func TestConnectionTimeout(t *testing.T) {
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
index c900102..535ec05 100644
--- a/services/wspr/internal/lib/simple_client.go
+++ b/services/wspr/internal/lib/simple_client.go
@@ -88,7 +88,7 @@
}
// Implement rpc.Client.
-func (*simpleMockClient) Connection(*context.T, string, ...rpc.CallOpt) (flow.ManagedConn, error) {
+func (*simpleMockClient) PinConnection(*context.T, string, ...rpc.CallOpt) (flow.PinnedConn, error) {
return nil, nil
}
func (*simpleMockClient) Close() {}