Merge "veyron2/storage/vstore: Refactor dependencies in vstore."
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e7ae218..c2e0aa3 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -67,20 +67,28 @@
return c, nil
}
-func (c *client) getVCInfo(ep naming.Endpoint) (*vcInfo, error) {
+func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
c.vcMapMu.Lock()
defer c.vcMapMu.Unlock()
if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
- return vcinfo, nil
+ if flow, err := vcinfo.vc.Connect(); err == nil {
+ return flow, nil
+ }
+ // If the vc fails to establish a new flow, we assume it's
+ // broken, remove it from the map, and proceed to establishing
+ // a new vc.
+ // TODO(caprita): Should we distinguish errors due to vc being
+ // closed from other errors? If not, should we call vc.Close()
+ // before removing the vc from the map?
+ delete(c.vcMap, ep.String())
}
vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
if err != nil {
return nil, err
}
// TODO(toddw): Add connections for the type and cancel flows.
- vcinfo := &vcInfo{vc: vc, remoteEP: ep}
- c.vcMap[ep.String()] = vcinfo
- return vcinfo, nil
+ c.vcMap[ep.String()] = &vcInfo{vc: vc, remoteEP: ep}
+ return vc.Connect()
}
// connectFlow parses an endpoint and a suffix out of the server and establishes
@@ -99,11 +107,7 @@
if err = version.CheckCompatibility(ep); err != nil {
return nil, "", err
}
- vcinfo, err := c.getVCInfo(ep)
- if err != nil {
- return nil, "", err
- }
- flow, err := vcinfo.vc.Connect()
+ flow, err := c.createFlow(ep)
if err != nil {
return nil, "", err
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 252d3e3..4784673 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -4,15 +4,16 @@
"errors"
"fmt"
"io"
- "log"
"net"
+ "os"
"reflect"
"strings"
"sync"
"testing"
"time"
- _ "veyron/lib/testutil"
+ "veyron/lib/testutil"
+ "veyron/lib/testutil/blackbox"
imanager "veyron/runtimes/google/ipc/stream/manager"
"veyron/runtimes/google/ipc/stream/vc"
"veyron/runtimes/google/ipc/version"
@@ -270,7 +271,7 @@
return
}
-func derive(blessor security.PrivateID, name string, caveats []security.ServiceCaveat) security.PrivateID {
+func derive(blessor security.PrivateID, name string, caveats ...security.ServiceCaveat) security.PrivateID {
id, err := isecurity.NewPrivateID("irrelevant")
if err != nil {
panic(err)
@@ -304,10 +305,10 @@
Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
}
- clientV1ID := derive(clientID, "v1", nil)
- clientV2ID := derive(clientID, "v2", nil)
- serverV1ID := derive(serverID, "v1", []security.ServiceCaveat{cavOnlyV1})
- serverExpiredID := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
+ clientV1ID := derive(clientID, "v1")
+ clientV2ID := derive(clientID, "v2")
+ serverV1ID := derive(serverID, "v1", cavOnlyV1)
+ serverExpiredID := derive(serverID, "expired", cavExpired)
tests := []struct {
clientID, serverID security.PrivateID
@@ -426,9 +427,9 @@
Caveat: &icaveat.Expiry{IssueTime: now, ExpiryTime: now},
}
- blessedByServerOnlyEcho := derive(serverID, "onlyEcho", []security.ServiceCaveat{cavOnlyEcho})
- blessedByServerExpired := derive(serverID, "expired", []security.ServiceCaveat{cavExpired})
- blessedByClient := derive(clientID, "blessed", nil)
+ blessedByServerOnlyEcho := derive(serverID, "onlyEcho", cavOnlyEcho)
+ blessedByServerExpired := derive(serverID, "expired", cavExpired)
+ blessedByClient := derive(clientID, "blessed")
const (
expiredIDErr = "forbids credential from being used at this time"
@@ -691,7 +692,7 @@
{[]ipc.ServerOpt{}, []string{"127.0.0.1", "127.0.0.1"}},
{[]ipc.ServerOpt{veyron2.PublishAll}, []string{"127.0.0.1", "127.0.0.1"}},
{[]ipc.ServerOpt{veyron2.PublishFirst}, []string{"127.0.0.1"}},
- {[]ipc.ServerOpt{veyron2.EndpointRewriteOpt("example.com")}, []string{"example.com", "example.com"}},
+ {[]ipc.ServerOpt{veyron2.EndpointRewriteOpt("example1.com"), veyron2.EndpointRewriteOpt("example2.com")}, []string{"example2.com", "example2.com"}},
{[]ipc.ServerOpt{veyron2.PublishFirst, veyron2.EndpointRewriteOpt("example.com")}, []string{"example.com"}},
}
for i, c := range cases {
@@ -743,14 +744,107 @@
}
}
+// TestReconnect verifies that the client transparently re-establishes the
+// connection to the server if the server dies and comes back (on the same
+// endpoint). The server runs in a child process started through the
+// blackbox "runServer" helper command so that it can be killed and restarted.
+func TestReconnect(t *testing.T) {
+	b := createBundle(t, clientID, nil, nil) // We only need the client from the bundle.
+	defer b.cleanup(t)
+	idFile := testutil.SaveIdentityToFile(derive(clientID, "server"))
+	server := blackbox.HelperCommand(t, "runServer", "127.0.0.1:0", idFile)
+	// The child blocks forever once started, so make sure it is cleaned up
+	// even when the test bails out before the explicit Cleanup below.
+	firstServer, firstServerKilled := server, false
+	defer func() {
+		if !firstServerKilled {
+			firstServer.Cleanup()
+		}
+	}()
+	if err := server.Cmd.Start(); err != nil {
+		t.Fatalf("Failed to start server process: %v", err)
+	}
+	addr, err := server.ReadLineFromChild()
+	if err != nil {
+		t.Fatalf("Failed to read server address from process: %v", err)
+	}
+	ep, err := inaming.NewEndpoint(addr)
+	if err != nil {
+		t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
+	}
+	serverName := naming.JoinAddressName(ep.String(), "server/suffix")
+	// makeCall performs one Echo RPC against the server and returns its reply.
+	makeCall := func() (string, error) {
+		call, err := b.client.StartCall(serverName, "Echo", []interface{}{"bratman"})
+		if err != nil {
+			return "", err
+		}
+		var result string
+		if err = call.Finish(&result); err != nil {
+			return "", err
+		}
+		return result, nil
+	}
+	expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+	// Kill the server, verify client can't talk to it anymore.
+	server.Cleanup()
+	firstServerKilled = true
+	if _, err := makeCall(); err == nil {
+		t.Fatal("Expected call to fail since server is dead")
+	}
+	// Resurrect the server with the same address, verify client
+	// re-establishes the connection.
+	server = blackbox.HelperCommand(t, "runServer", addr, idFile)
+	defer server.Cleanup()
+	if err := server.Cmd.Start(); err != nil {
+		t.Fatalf("Failed to restart server process: %v", err)
+	}
+	if addr2, err := server.ReadLineFromChild(); addr2 != addr || err != nil {
+		t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
+	}
+	if result, err := makeCall(); err != nil || result != expected {
+		t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
+	}
+}
+
+// loadIdentityFromFile opens file, decodes a private identity from it with
+// security.LoadIdentity and returns it. Failure to open the file or to load
+// the identity is reported through vlog.Fatalf.
+func loadIdentityFromFile(file string) security.PrivateID {
+	in, openErr := os.Open(file)
+	if openErr != nil {
+		vlog.Fatalf("failed to open %v: %v", file, openErr)
+	}
+	// The file is closed before the load error is inspected, so it is
+	// released on both the success and the failure path.
+	loaded, loadErr := security.LoadIdentity(in)
+	in.Close()
+	if loadErr != nil {
+		vlog.Fatalf("Failed to load identity from %v: %v", file, loadErr)
+	}
+	return loaded
+}
+
+// runServer is the body of the "runServer" blackbox helper command. It
+// expects argv[0] to be the TCP address to listen on and argv[1] to be the
+// path of a file holding the server's identity. It registers a test server
+// under the name "server", prints the address it is listening on to stdout
+// (one line, read by the parent process) and then blocks forever.
+func runServer(argv []string) {
+	// Fail with a clear message instead of an index-out-of-range panic when
+	// the helper is invoked with too few arguments.
+	if len(argv) < 2 {
+		vlog.Fatalf("runServer: want <address> <identity file>, got %d argument(s): %v", len(argv), argv)
+	}
+	mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+	mt := newMountTable()
+	id := loadIdentityFromFile(argv[1])
+	isecurity.TrustIdentityProviders(id)
+	server, err := InternalNewServer(mgr, mt, veyron2.LocalID(id))
+	if err != nil {
+		vlog.Fatalf("InternalNewServer failed: %v", err)
+	}
+	disp := testServerDisp{new(testServer)}
+	if err := server.Register("server", disp); err != nil {
+		vlog.Fatalf("server.Register failed: %v", err)
+	}
+	ep, err := server.Listen("tcp", argv[0])
+	if err != nil {
+		vlog.Fatalf("server.Listen failed: %v", err)
+	}
+	fmt.Println(ep.Addr())
+	// Live forever (parent process should explicitly kill us).
+	select {}
+}
+
+// TestHelperProcess is required by the blackbox framework; it only hands control to blackbox.HelperProcess.
+func TestHelperProcess(t *testing.T) {
+ blackbox.HelperProcess(t)
+}
+
func init() {
var err error
if clientID, err = isecurity.NewPrivateID("client"); err != nil {
- log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+ vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
}
if serverID, err = isecurity.NewPrivateID("server"); err != nil {
- log.Fatalf("failed isecurity.NewPrivateID: %s", err)
+ vlog.Fatalf("failed isecurity.NewPrivateID: %s", err)
}
isecurity.TrustIdentityProviders(clientID)
isecurity.TrustIdentityProviders(serverID)
+
+ blackbox.CommandTable["runServer"] = runServer
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index bcac1e3..d05fcc0 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -146,6 +146,10 @@
s.active.Add(1)
go func(flow stream.Flow) {
if err := newFlowServer(flow, s).serve(); err != nil {
+ // TODO(caprita): Logging errors here is
+ // too spammy. For example, "not
+ // authorized" errors shouldn't be
+ // logged as server errors.
vlog.Errorf("Flow serve on (%v, %v) failed: %v", protocol, address, err)
}
s.active.Done()
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index 3b1a084..3a19169 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -110,11 +110,12 @@
func (m *manager) Listen(protocol, address string, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
var rewriteEP string
- for i, o := range opts {
- if rewriteOpt, ok := o.(veyron2.EndpointRewriteOpt); ok {
+ for i := 0; i < len(opts); i++ {
+ if rewriteOpt, ok := opts[i].(veyron2.EndpointRewriteOpt); ok {
// Last one 'wins'.
rewriteEP = string(rewriteOpt)
opts = append(opts[:i], opts[i+1:]...)
+ i--
}
}
m.muListeners.Lock()
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index ee35018..28eb42f 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -51,6 +51,12 @@
children map[string]*node
}
+// dummyAuth is an authorizer that allows all RPCs.
+type dummyAuth struct{}
+
+// Authorize accepts every request: it ignores the security context and
+// always returns nil.
+func (dummyAuth) Authorize(security.Context) error {
+	return nil
+}
+
// NewMountTable creates a new server that uses the default authorization policy.
func NewMountTable() *mountTable {
return &mountTable{
@@ -71,7 +77,7 @@
ms.elems = strings.Split(name, "/")
ms.cleanedElems = strings.Split(strings.TrimLeft(path.Clean(name), "/"), "/")
}
- return ipc.ReflectInvoker(mounttable.NewServerMountTable(ms)), nil, nil
+ return ipc.ReflectInvoker(mounttable.NewServerMountTable(ms)), new(dummyAuth), nil
}
// findNode returns the node for the name path represented by elems. If none exists and create is false, return nil.
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 600f7ae..31c96e0 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -26,6 +26,7 @@
"log"
"net/http"
_ "net/http/pprof"
+ "os"
"path/filepath"
"runtime"
"strings"
@@ -49,6 +50,10 @@
pongTimeout = pingInterval + 10*time.Second // maximum wait for pong.
)
+type wsprConfig struct { // Payload of the first message written to a websocket client (see sendInitialMessage).
+ MounttableRoot []string // Entries of the comma-separated MOUNTTABLE_ROOT environment variable; empty when it is unset.
+}
+
type WSPR struct {
tlsCert *tls.Certificate
clientCache *ClientCache
@@ -505,13 +510,37 @@
wsp.ctx.logger.VI(0).Infof("websocket upgrade failed: %s", err)
return
}
+
wsp.setup()
wsp.ws = ws
wsp.ws.SetPongHandler(wsp.pongHandler)
+ wsp.sendInitialMessage()
go wsp.readLoop()
go wsp.pingLoop()
}
+// sendInitialMessage writes the wsprConfig to the websocket as a JSON text
+// message. It is called once, right after the connection has been upgraded
+// and before the read and ping loops start. Failures are logged and
+// otherwise ignored; the connection is left open.
+func (wsp *websocketPipe) sendInitialMessage() {
+	// strings.Split yields [""] for an empty string, so an unset or empty
+	// MOUNTTABLE_ROOT is normalized to an empty (non-nil) list of roots.
+	mounttableRoots := strings.Split(os.Getenv("MOUNTTABLE_ROOT"), ",")
+	if len(mounttableRoots) == 1 && mounttableRoots[0] == "" {
+		mounttableRoots = []string{}
+	}
+	msg := wsprConfig{
+		MounttableRoot: mounttableRoots,
+	}
+
+	wc, err := wsp.ws.NextWriter(websocket.TextMessage)
+	if err != nil {
+		wsp.ctx.logger.VI(0).Infof("failed to create websocket writer: %s", err)
+		return
+	}
+	if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil {
+		wsp.ctx.logger.VI(0).Infof("failed to convert wspr config to json: %s", err)
+		// Release the writer on the error path too, so it is not left
+		// open for the next writer to deal with. NOTE(review): assumes
+		// closing after a failed encode is acceptable to the client.
+		wc.Close()
+		return
+	}
+	// Close is what completes the message, so its error must not be dropped.
+	if err := wc.Close(); err != nil {
+		wsp.ctx.logger.VI(0).Infof("failed to send wspr config: %s", err)
+	}
+}
+
func (wsp *websocketPipe) pingLoop() {
for {
time.Sleep(pingInterval)