Merge "veyron2/storage/vstore: Refactor dependencies in vstore."
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e7ae218..c2e0aa3 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -67,20 +67,28 @@
 	return c, nil
 }
 
-func (c *client) getVCInfo(ep naming.Endpoint) (*vcInfo, error) {
+func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
 	c.vcMapMu.Lock()
 	defer c.vcMapMu.Unlock()
 	if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
-		return vcinfo, nil
+		if flow, err := vcinfo.vc.Connect(); err == nil {
+			return flow, nil
+		}
+		// If the vc fails to establish a new flow, we assume it's
+		// broken, remove it from the map, and proceed to establishing
+		// a new vc.
+		// TODO(caprita): Should we distinguish errors due to vc being
+		// closed from other errors?  If not, should we call vc.Close()
+		// before removing the vc from the map?
+		delete(c.vcMap, ep.String())
 	}
 	vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
 	if err != nil {
 		return nil, err
 	}
 	// TODO(toddw): Add connections for the type and cancel flows.
-	vcinfo := &vcInfo{vc: vc, remoteEP: ep}
-	c.vcMap[ep.String()] = vcinfo
-	return vcinfo, nil
+	c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
+	return vc.Connect()
 }
 
 // connectFlow parses an endpoint and a suffix out of the server and establishes
@@ -99,11 +107,7 @@
 	if err = version.CheckCompatibility(ep); err != nil {
 		return nil, "", err
 	}
-	vcinfo, err := c.getVCInfo(ep)
-	if err != nil {
-		return nil, "", err
-	}
-	flow, err := vcinfo.vc.Connect()
+	flow, err := c.createFlow(ep)
 	if err != nil {
 		return nil, "", err
 	}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 252d3e3..4784673 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -4,15 +4,16 @@
 	"errors"
 	"fmt"
 	"io"
-	"log"
 	"net"
+	"os"
 	"reflect"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
-	_ "veyron/lib/testutil"
+	"veyron/lib/testutil"
+	"veyron/lib/testutil/blackbox"
 	imanager "veyron/runtimes/google/ipc/stream/manager"
 	"veyron/runtimes/google/ipc/stream/vc"
 	"veyron/runtimes/google/ipc/version"
@@ -270,7 +271,7 @@
 	return
 }
 
-func derive(blessor security.PrivateID, name string, caveats []security.ServiceCaveat) security.PrivateID {
+func derive(blessor security.PrivateID, name string, caveats ...security.ServiceCaveat) security.PrivateID {
 	id, err := isecurity.NewPrivateID("irrelevant")
 	if err != nil {
 		panic(err)
@@ -304,10 +305,10 @@
 		Caveat:  &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
 	}
 
-	clientV1ID := derive(clientID, "v1", nil)
-	clientV2ID := derive(clientID, "v2", nil)
-	serverV1ID := derive(serverID, "v1", []security.ServiceCaveat{cavOnlyV1})
-	serverExpiredID := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
+	clientV1ID := derive(clientID, "v1")
+	clientV2ID := derive(clientID, "v2")
+	serverV1ID := derive(serverID, "v1", cavOnlyV1)
+	serverExpiredID := derive(serverID, "expired", cavExpired)
 
 	tests := []struct {
 		clientID, serverID security.PrivateID
@@ -426,9 +427,9 @@
 		Caveat:  &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
 	}
 
-	blessedByServerOnlyEcho := derive(serverID, "onlyEcho", []security.ServiceCaveat{cavOnlyEcho})
-	blessedByServerExpired := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
-	blessedByClient := derive(clientID, "blessed", nil)
+	blessedByServerOnlyEcho := derive(serverID, "onlyEcho", cavOnlyEcho)
+	blessedByServerExpired := derive(serverID, "expired", cavExpired)
+	blessedByClient := derive(clientID, "blessed")
 
 	const (
 		expiredIDErr = "forbids credential from being used at this time"
@@ -691,7 +692,7 @@
 		{[]ipc.ServerOpt{}, []string{"127.0.0.1", "127.0.0.1"}},
 		{[]ipc.ServerOpt{veyron2.PublishAll}, []string{"127.0.0.1", "127.0.0.1"}},
 		{[]ipc.ServerOpt{veyron2.PublishFirst}, []string{"127.0.0.1"}},
-		{[]ipc.ServerOpt{veyron2.EndpointRewriteOpt("example.com")}, []string{"example.com", "example.com"}},
+		{[]ipc.ServerOpt{veyron2.EndpointRewriteOpt("example1.com"), veyron2.EndpointRewriteOpt("example2.com")}, []string{"example2.com", "example2.com"}},
 		{[]ipc.ServerOpt{veyron2.PublishFirst, veyron2.EndpointRewriteOpt("example.com")}, []string{"example.com"}},
 	}
 	for i, c := range cases {
@@ -743,14 +744,107 @@
 	}
 }
 
+// TestReconnect verifies that the client transparently re-establishes the
+// connection to the server if the server dies and comes back (on the same
+// endpoint).
+func TestReconnect(t *testing.T) {
+	b := createBundle(t, clientID, nil, nil) // We only need the client from the bundle.
+	defer b.cleanup(t)
+	idFile := testutil.SaveIdentityToFile(derive(clientID, "server"))
+	server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0", idFile)
+	server.Cmd.Start()
+	addr, err := server.ReadLineFromChild()
+	if err != nil {
+		t.Fatalf("Failed to read server address from process: %v", err)
+	}
+	ep, err := inaming.NewEndpoint(addr)
+	if err != nil {
+		t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
+	}
+	serverName := naming.JoinAddressName(ep.String(), "server/suffix")
+	makeCall := func() (string, error) {
+		call, err := b.client.StartCall(serverName, "Echo", []interface{}{"bratman"})
+		if err != nil {
+			return "", err
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+	// Kill the server, verify client can't talk to it anymore.
+	server.Cleanup()
+	if _, err := makeCall(); err == nil {
+		t.Fatal("Expected call to fail since server is dead")
+	}
+	// Resurrect the server with the same address, verify client
+	// re-establishes the connection.
+	server = blackbox.HelperCommand(t, "runServer", addr, idFile)
+	defer server.Cleanup()
+	server.Cmd.Start()
+	if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
+		t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
+	}
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+}
+
+func loadIdentityFromFile(file string) security.PrivateID {
+	f, err := os.Open(file)
+	if err != nil {
+		vlog.Fatalf("failed to open %v: %v", file, err)
+	}
+	id, err := security.LoadIdentity(f)
+	f.Close()
+	if err != nil {
+		vlog.Fatalf("Failed to load identity from %v: %v", file, err)
+	}
+	return id
+}
+
+func runServer(argv []string) {
+	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+	mt := newMountTable()
+	id := loadIdentityFromFile(argv[1])
+	isecurity.TrustIdentityProviders(id)
+	server, err := InternalNewServer(mgr, mt, veyron2.LocalID(id))
+	if err != nil {
+		vlog.Fatalf("InternalNewServer failed: %v", err)
+	}
+	disp := testServerDisp{new(testServer)}
+	if err := server.Register("server", disp); err != nil {
+		vlog.Fatalf("server.Register failed: %v", err)
+	}
+	ep, err := server.Listen("tcp", argv[0])
+	if err != nil {
+		vlog.Fatalf("server.Listen failed: %v", err)
+	}
+	fmt.Println(ep.Addr())
+	// Live forever (parent process should explicitly kill us).
+	<-make(chan struct{})
+}
+
+// Required by blackbox framework.
+func TestHelperProcess(t *testing.T) {
+	blackbox.HelperProcess(t)
+}
+
 func init() {
 	var err error
 	if clientID, err = isecurity.NewPrivateID("client"); err != nil {
-		log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+		vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
 	}
 	if serverID, err = isecurity.NewPrivateID("server"); err != nil {
-		log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+		vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
 	}
 	isecurity.TrustIdentityProviders(clientID)
 	isecurity.TrustIdentityProviders(serverID)
+
+	blackbox.CommandTable["runServer"] = runServer
 }
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index bcac1e3..d05fcc0 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -146,6 +146,10 @@
 			s.active.Add(1)
 			go func(flow stream.Flow) {
 				if err := newFlowServer(flow, s).serve(); err != nil {
+					// TODO(caprita): Logging errors here is
+					// too spammy. For example, "not
+					// authorized" errors shouldn't be
+					// logged as server errors.
 					vlog.Errorf("Flow serve on (%v, %v) failed: %v", protocol, address, err)
 				}
 				s.active.Done()
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index 3b1a084..3a19169 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -110,11 +110,12 @@
 
 func (m *manager) Listen(protocol, address string, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
 	var rewriteEP string
-	for i, o := range opts {
-		if rewriteOpt, ok := o.(veyron2.EndpointRewriteOpt); ok {
+	for i := 0; i < len(opts); i++ {
+		if rewriteOpt, ok := opts[i].(veyron2.EndpointRewriteOpt); ok {
 			// Last one 'wins'.
 			rewriteEP = string(rewriteOpt)
 			opts = append(opts[:i], opts[i+1:]...)
+			i--
 		}
 	}
 	m.muListeners.Lock()
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index ee35018..28eb42f 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -51,6 +51,12 @@
 	children map[string]*node
 }
 
+// dummyAuth allows all RPCs.
+type dummyAuth struct { }
+func (dummyAuth) Authorize(security.Context) error {
+	return nil
+}
+
 // NewMountTable creates a new server that uses the default authorization policy.
 func NewMountTable() *mountTable {
 	return &mountTable{
@@ -71,7 +77,7 @@
 		ms.elems = strings.Split(name, "/")
 		ms.cleanedElems = strings.Split(strings.TrimLeft(path.Clean(name), "/"), "/")
 	}
-	return ipc.ReflectInvoker(mounttable.NewServerMountTable(ms)), nil, nil
+	return ipc.ReflectInvoker(mounttable.NewServerMountTable(ms)), new(dummyAuth), nil
 }
 
 // findNode returns the node for the name path represented by elems.  If none exists and create is false, return nil.
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 600f7ae..31c96e0 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -26,6 +26,7 @@
 	"log"
 	"net/http"
 	_ "net/http/pprof"
+	"os"
 	"path/filepath"
 	"runtime"
 	"strings"
@@ -49,6 +50,10 @@
 	pongTimeout  = pingInterval + 10*time.Second // maximum wait for pong.
 )
 
+type wsprConfig struct {
+	MounttableRoot []string
+}
+
 type WSPR struct {
 	tlsCert       *tls.Certificate
 	clientCache   *ClientCache
@@ -505,13 +510,37 @@
 		wsp.ctx.logger.VI(0).Infof("websocket upgrade failed: %s", err)
 		return
 	}
+
 	wsp.setup()
 	wsp.ws = ws
 	wsp.ws.SetPongHandler(wsp.pongHandler)
+	wsp.sendInitialMessage()
 	go wsp.readLoop()
 	go wsp.pingLoop()
 }
 
+// Upon first connect, we send a message with the wsprConfig.
+func (wsp *websocketPipe) sendInitialMessage() {
+	mounttableRoots := strings.Split(os.Getenv("MOUNTTABLE_ROOT"), ",")
+	if len(mounttableRoots) == 1 && mounttableRoots[0] == "" {
+		mounttableRoots = []string{}
+	}
+	msg := wsprConfig{
+		MounttableRoot: mounttableRoots,
+	}
+
+	wc, err := wsp.ws.NextWriter(websocket.TextMessage)
+	if err != nil {
+		wsp.ctx.logger.VI(0).Infof("failed to create websocket writer: %s", err)
+		return
+	}
+	if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil {
+		wsp.ctx.logger.VI(0).Infof("failed to convert wspr config to json: %s", err)
+		return
+	}
+	wc.Close()
+}
+
 func (wsp *websocketPipe) pingLoop() {
 	for {
 		time.Sleep(pingInterval)