veyron/runtimes/google/ipc: remove vc from the vcMap on the client if the vc
fails to establish a flow (presumably, because the vc has been closed).
This is a follow-up to go/vcl/1951 an ensures that the client will not attempt
to re-use a stale connection when the server has restarted.

Change-Id: I44cb856d2e99df063916263fcd1495834347c3c7
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e7ae218..c2e0aa3 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -67,20 +67,28 @@
 	return c, nil
 }
 
-func (c *client) getVCInfo(ep naming.Endpoint) (*vcInfo, error) {
+func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
 	c.vcMapMu.Lock()
 	defer c.vcMapMu.Unlock()
 	if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
-		return vcinfo, nil
+		if flow, err := vcinfo.vc.Connect(); err == nil {
+			return flow, nil
+		}
+		// If the vc fails to establish a new flow, we assume it's
+		// broken, remove it from the map, and proceed to establishing
+		// a new vc.
+		// TODO(caprita): Should we distinguish errors due to vc being
+		// closed from other errors?  If not, should we call vc.Close()
+		// before removing the vc from the map?
+		delete(c.vcMap, ep.String())
 	}
 	vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
 	if err != nil {
 		return nil, err
 	}
 	// TODO(toddw): Add connections for the type and cancel flows.
-	vcinfo := &vcInfo{vc: vc, remoteEP: ep}
-	c.vcMap[ep.String()] = vcinfo
-	return vcinfo, nil
+	c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
+	return vc.Connect()
 }
 
 // connectFlow parses an endpoint and a suffix out of the server and establishes
@@ -99,11 +107,7 @@
 	if err = version.CheckCompatibility(ep); err != nil {
 		return nil, "", err
 	}
-	vcinfo, err := c.getVCInfo(ep)
-	if err != nil {
-		return nil, "", err
-	}
-	flow, err := vcinfo.vc.Connect()
+	flow, err := c.createFlow(ep)
 	if err != nil {
 		return nil, "", err
 	}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 5384d56..4784673 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -4,15 +4,16 @@
 	"errors"
 	"fmt"
 	"io"
-	"log"
 	"net"
+	"os"
 	"reflect"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
-	_ "veyron/lib/testutil"
+	"veyron/lib/testutil"
+	"veyron/lib/testutil/blackbox"
 	imanager "veyron/runtimes/google/ipc/stream/manager"
 	"veyron/runtimes/google/ipc/stream/vc"
 	"veyron/runtimes/google/ipc/version"
@@ -270,7 +271,7 @@
 	return
 }
 
-func derive(blessor security.PrivateID, name string, caveats []security.ServiceCaveat) security.PrivateID {
+func derive(blessor security.PrivateID, name string, caveats ...security.ServiceCaveat) security.PrivateID {
 	id, err := isecurity.NewPrivateID("irrelevant")
 	if err != nil {
 		panic(err)
@@ -304,10 +305,10 @@
 		Caveat:  &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
 	}
 
-	clientV1ID := derive(clientID, "v1", nil)
-	clientV2ID := derive(clientID, "v2", nil)
-	serverV1ID := derive(serverID, "v1", []security.ServiceCaveat{cavOnlyV1})
-	serverExpiredID := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
+	clientV1ID := derive(clientID, "v1")
+	clientV2ID := derive(clientID, "v2")
+	serverV1ID := derive(serverID, "v1", cavOnlyV1)
+	serverExpiredID := derive(serverID, "expired", cavExpired)
 
 	tests := []struct {
 		clientID, serverID security.PrivateID
@@ -426,9 +427,9 @@
 		Caveat:  &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
 	}
 
-	blessedByServerOnlyEcho := derive(serverID, "onlyEcho", []security.ServiceCaveat{cavOnlyEcho})
-	blessedByServerExpired := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
-	blessedByClient := derive(clientID, "blessed", nil)
+	blessedByServerOnlyEcho := derive(serverID, "onlyEcho", cavOnlyEcho)
+	blessedByServerExpired := derive(serverID, "expired", cavExpired)
+	blessedByClient := derive(clientID, "blessed")
 
 	const (
 		expiredIDErr = "forbids credential from being used at this time"
@@ -743,14 +744,107 @@
 	}
 }
 
+// TestReconnect verifies that the client transparently re-establishes the
+// connection to the server if the server dies and comes back (on the same
+// endpoint).
+func TestReconnect(t *testing.T) {
+	b := createBundle(t, clientID, nil, nil) // We only need the client from the bundle.
+	defer b.cleanup(t)
+	idFile := testutil.SaveIdentityToFile(derive(clientID, "server"))
+	server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0", idFile)
+	server.Cmd.Start()
+	addr, err := server.ReadLineFromChild()
+	if err != nil {
+		t.Fatalf("Failed to read server address from process: %v", err)
+	}
+	ep, err := inaming.NewEndpoint(addr)
+	if err != nil {
+		t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
+	}
+	serverName := naming.JoinAddressName(ep.String(), "server/suffix")
+	makeCall := func() (string, error) {
+		call, err := b.client.StartCall(serverName, "Echo", []interface{}{"bratman"})
+		if err != nil {
+			return "", err
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+	// Kill the server, verify client can't talk to it anymore.
+	server.Cleanup()
+	if _, err := makeCall(); err == nil {
+		t.Fatal("Expected call to fail since server is dead")
+	}
+	// Resurrect the server with the same address, verify client
+	// re-establishes the connection.
+	server = blackbox.HelperCommand(t, "runServer", addr, idFile)
+	defer server.Cleanup()
+	server.Cmd.Start()
+	if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
+		t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
+	}
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+}
+
+func loadIdentityFromFile(file string) security.PrivateID {
+	f, err := os.Open(file)
+	if err != nil {
+		vlog.Fatalf("failed to open %v: %v", file, err)
+	}
+	id, err := security.LoadIdentity(f)
+	f.Close()
+	if err != nil {
+		vlog.Fatalf("Failed to load identity from %v: %v", file, err)
+	}
+	return id
+}
+
+func runServer(argv []string) {
+	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+	mt := newMountTable()
+	id := loadIdentityFromFile(argv[1])
+	isecurity.TrustIdentityProviders(id)
+	server, err := InternalNewServer(mgr, mt, veyron2.LocalID(id))
+	if err != nil {
+		vlog.Fatalf("InternalNewServer failed: %v", err)
+	}
+	disp := testServerDisp{new(testServer)}
+	if err := server.Register("server", disp); err != nil {
+		vlog.Fatalf("server.Register failed: %v", err)
+	}
+	ep, err := server.Listen("tcp", argv[0])
+	if err != nil {
+		vlog.Fatalf("server.Listen failed: %v", err)
+	}
+	fmt.Println(ep.Addr())
+	// Live forever (parent process should explicitly kill us).
+	<-make(chan struct{})
+}
+
+// Required by blackbox framework.
+func TestHelperProcess(t *testing.T) {
+	blackbox.HelperProcess(t)
+}
+
 func init() {
 	var err error
 	if clientID, err = isecurity.NewPrivateID("client"); err != nil {
-		log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+		vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
 	}
 	if serverID, err = isecurity.NewPrivateID("server"); err != nil {
-		log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+		vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
 	}
 	isecurity.TrustIdentityProviders(clientID)
 	isecurity.TrustIdentityProviders(serverID)
+
+	blackbox.CommandTable["runServer"] = runServer
 }
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index bcac1e3..d05fcc0 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -146,6 +146,10 @@
 			s.active.Add(1)
 			go func(flow stream.Flow) {
 				if err := newFlowServer(flow, s).serve(); err != nil {
+					// TODO(caprita): Logging errors here is
+					// too spammy. For example, "not
+					// authorized" errors shouldn't be
+					// logged as server errors.
 					vlog.Errorf("Flow serve on (%v, %v) failed: %v", protocol, address, err)
 				}
 				s.active.Done()