veyron/runtimes/google/ipc: remove vc from the vcMap on the client if the vc
fails to establish a flow (presumably, because the vc has been closed).
This is a follow-up to go/vcl/1951 an ensures that the client will not attempt
to re-use a stale connection when the server has restarted.
Change-Id: I44cb856d2e99df063916263fcd1495834347c3c7
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e7ae218..c2e0aa3 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -67,20 +67,28 @@
return c, nil
}
-func (c *client) getVCInfo(ep naming.Endpoint) (*vcInfo, error) {
+func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
c.vcMapMu.Lock()
defer c.vcMapMu.Unlock()
if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
- return vcinfo, nil
+ if flow, err := vcinfo.vc.Connect(); err == nil {
+ return flow, nil
+ }
+ // If the vc fails to establish a new flow, we assume it's
+ // broken, remove it from the map, and proceed to establishing
+ // a new vc.
+ // TODO(caprita): Should we distinguish errors due to vc being
+ // closed from other errors? If not, should we call vc.Close()
+ // before removing the vc from the map?
+ delete(c.vcMap, ep.String())
}
vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
if err != nil {
return nil, err
}
// TODO(toddw): Add connections for the type and cancel flows.
- vcinfo := &vcInfo{vc: vc, remoteEP: ep}
- c.vcMap[ep.String()] = vcinfo
- return vcinfo, nil
+ c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
+ return vc.Connect()
}
// connectFlow parses an endpoint and a suffix out of the server and establishes
@@ -99,11 +107,7 @@
if err = version.CheckCompatibility(ep); err != nil {
return nil, "", err
}
- vcinfo, err := c.getVCInfo(ep)
- if err != nil {
- return nil, "", err
- }
- flow, err := vcinfo.vc.Connect()
+ flow, err := c.createFlow(ep)
if err != nil {
return nil, "", err
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 5384d56..4784673 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -4,15 +4,16 @@
"errors"
"fmt"
"io"
- "log"
"net"
+ "os"
"reflect"
"strings"
"sync"
"testing"
"time"
- _ "veyron/lib/testutil"
+ "veyron/lib/testutil"
+ "veyron/lib/testutil/blackbox"
imanager "veyron/runtimes/google/ipc/stream/manager"
"veyron/runtimes/google/ipc/stream/vc"
"veyron/runtimes/google/ipc/version"
@@ -270,7 +271,7 @@
return
}
-func derive(blessor security.PrivateID, name string, caveats []security.ServiceCaveat) security.PrivateID {
+func derive(blessor security.PrivateID, name string, caveats ...security.ServiceCaveat) security.PrivateID {
id, err := isecurity.NewPrivateID("irrelevant")
if err != nil {
panic(err)
@@ -304,10 +305,10 @@
Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
}
- clientV1ID := derive(clientID, "v1", nil)
- clientV2ID := derive(clientID, "v2", nil)
- serverV1ID := derive(serverID, "v1", []security.ServiceCaveat{cavOnlyV1})
- serverExpiredID := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
+ clientV1ID := derive(clientID, "v1")
+ clientV2ID := derive(clientID, "v2")
+ serverV1ID := derive(serverID, "v1", cavOnlyV1)
+ serverExpiredID := derive(serverID, "expired", cavExpired)
tests := []struct {
clientID, serverID security.PrivateID
@@ -426,9 +427,9 @@
Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
}
- blessedByServerOnlyEcho := derive(serverID, "onlyEcho", []security.ServiceCaveat{cavOnlyEcho})
- blessedByServerExpired := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
- blessedByClient := derive(clientID, "blessed", nil)
+ blessedByServerOnlyEcho := derive(serverID, "onlyEcho", cavOnlyEcho)
+ blessedByServerExpired := derive(serverID, "expired", cavExpired)
+ blessedByClient := derive(clientID, "blessed")
const (
expiredIDErr = "forbids credential from being used at this time"
@@ -743,14 +744,107 @@
}
}
+// TestReconnect verifies that the client transparently re-establishes the
+// connection to the server if the server dies and comes back (on the same
+// endpoint).
+func TestReconnect(t *testing.T) {
+ b := createBundle(t, clientID, nil, nil) // We only need the client from the bundle.
+ defer b.cleanup(t)
+ idFile := testutil.SaveIdentityToFile(derive(clientID, "server"))
+ server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0", idFile)
+ server.Cmd.Start()
+ addr, err := server.ReadLineFromChild()
+ if err != nil {
+ t.Fatalf("Failed to read server address from process: %v", err)
+ }
+ ep, err := inaming.NewEndpoint(addr)
+ if err != nil {
+ t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
+ }
+ serverName := naming.JoinAddressName(ep.String(), "server/suffix")
+ makeCall := func() (string, error) {
+ call, err := b.client.StartCall(serverName, "Echo", []interface{}{"bratman"})
+ if err != nil {
+ return "", err
+ }
+ var result string
+ if err = call.Finish(&result); err != nil {
+ return "", err
+ }
+ return result, nil
+ }
+ expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+ if result, err := makeCall(); err != nil || result != expected {
+ t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+ }
+ // Kill the server, verify client can't talk to it anymore.
+ server.Cleanup()
+ if _, err := makeCall(); err == nil {
+ t.Fatal("Expected call to fail since server is dead")
+ }
+ // Resurrect the server with the same address, verify client
+ // re-establishes the connection.
+ server = blackbox.HelperCommand(t, "runServer", addr, idFile)
+ defer server.Cleanup()
+ server.Cmd.Start()
+ if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
+ t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
+ }
+ if result, err := makeCall(); err != nil || result != expected {
+ t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+ }
+}
+
+func loadIdentityFromFile(file string) security.PrivateID {
+ f, err := os.Open(file)
+ if err != nil {
+ vlog.Fatalf("failed to open %v: %v", file, err)
+ }
+ id, err := security.LoadIdentity(f)
+ f.Close()
+ if err != nil {
+ vlog.Fatalf("Failed to load identity from %v: %v", file, err)
+ }
+ return id
+}
+
+func runServer(argv []string) {
+ mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+ mt := newMountTable()
+ id := loadIdentityFromFile(argv[1])
+ isecurity.TrustIdentityProviders(id)
+ server, err := InternalNewServer(mgr, mt, veyron2.LocalID(id))
+ if err != nil {
+ vlog.Fatalf("InternalNewServer failed: %v", err)
+ }
+ disp := testServerDisp{new(testServer)}
+ if err := server.Register("server", disp); err != nil {
+ vlog.Fatalf("server.Register failed: %v", err)
+ }
+ ep, err := server.Listen("tcp", argv[0])
+ if err != nil {
+ vlog.Fatalf("server.Listen failed: %v", err)
+ }
+ fmt.Println(ep.Addr())
+ // Live forever (parent process should explicitly kill us).
+ <-make(chan struct{})
+}
+
+// Required by blackbox framework.
+func TestHelperProcess(t *testing.T) {
+ blackbox.HelperProcess(t)
+}
+
func init() {
var err error
if clientID, err = isecurity.NewPrivateID("client"); err != nil {
- log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+ vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
}
if serverID, err = isecurity.NewPrivateID("server"); err != nil {
- log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+ vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
}
isecurity.TrustIdentityProviders(clientID)
isecurity.TrustIdentityProviders(serverID)
+
+ blackbox.CommandTable["runServer"] = runServer
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index bcac1e3..d05fcc0 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -146,6 +146,10 @@
s.active.Add(1)
go func(flow stream.Flow) {
if err := newFlowServer(flow, s).serve(); err != nil {
+ // TODO(caprita): Logging errors here is
+ // too spammy. For example, "not
+ // authorized" errors shouldn't be
+ // logged as server errors.
vlog.Errorf("Flow serve on (%v, %v) failed: %v", protocol, address, err)
}
s.active.Done()