Merge "store/ptrie: add tests/benchmark"
diff --git a/runtime/factories/roaming/.api b/runtime/factories/roaming/.api
index c15af37..7443e1d 100644
--- a/runtime/factories/roaming/.api
+++ b/runtime/factories/roaming/.api
@@ -1,4 +1,4 @@
pkg roaming, const SettingsStreamDesc ideal-string
pkg roaming, const SettingsStreamName ideal-string
pkg roaming, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
-pkg roaming, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
+pkg roaming, func NewProxy(*context.T, rpc.ListenSpec, security.Authorizer, ...string) (func(), naming.Endpoint, error)
diff --git a/runtime/factories/static/.api b/runtime/factories/static/.api
index 2d3ebb3..c6a2583 100644
--- a/runtime/factories/static/.api
+++ b/runtime/factories/static/.api
@@ -1,2 +1,2 @@
pkg static, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
-pkg static, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
+pkg static, func NewProxy(*context.T, rpc.ListenSpec, security.Authorizer, ...string) (func(), naming.Endpoint, error)
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index f6432bc..0254e9c 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -26,6 +26,8 @@
"v.io/x/ref/runtime/internal/rpc/version"
)
+const reconnectDelay = 50 * time.Millisecond
+
type manager struct {
rid naming.RoutingID
closed chan struct{}
@@ -34,18 +36,20 @@
mu *sync.Mutex
listenEndpoints []naming.Endpoint
+ proxyEndpoints map[string][]naming.Endpoint // keyed by proxy address
listeners []flow.Listener
wg sync.WaitGroup
}
func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
m := &manager{
- rid: rid,
- closed: make(chan struct{}),
- q: upcqueue.New(),
- cache: NewConnCache(),
- mu: &sync.Mutex{},
- listeners: []flow.Listener{},
+ rid: rid,
+ closed: make(chan struct{}),
+ q: upcqueue.New(),
+ cache: NewConnCache(),
+ mu: &sync.Mutex{},
+ proxyEndpoints: make(map[string][]naming.Endpoint),
+ listeners: []flow.Listener{},
}
go func() {
select {
@@ -72,28 +76,16 @@
// The flow.Manager associated with ctx must be the receiver of the method,
// otherwise an error is returned.
func (m *manager) Listen(ctx *context.T, protocol, address string) error {
- var (
- eps []naming.Endpoint
- err error
- )
if protocol == inaming.Network {
- eps, err = m.proxyListen(ctx, address)
- } else {
- eps, err = m.listen(ctx, protocol, address)
+ return m.proxyListen(ctx, address)
}
- if err != nil {
- return err
- }
- m.mu.Lock()
- m.listenEndpoints = append(m.listenEndpoints, eps...)
- m.mu.Unlock()
- return nil
+ return m.listen(ctx, protocol, address)
}
-func (m *manager) listen(ctx *context.T, protocol, address string) ([]naming.Endpoint, error) {
+func (m *manager) listen(ctx *context.T, protocol, address string) error {
ln, err := listen(ctx, protocol, address)
if err != nil {
- return nil, flow.NewErrNetwork(ctx, err)
+ return flow.NewErrNetwork(ctx, err)
}
local := &inaming.Endpoint{
Protocol: protocol,
@@ -102,33 +94,69 @@
}
m.mu.Lock()
if m.listeners == nil {
- return nil, flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
+ return flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
}
m.listeners = append(m.listeners, ln)
m.mu.Unlock()
m.wg.Add(1)
go m.lnAcceptLoop(ctx, ln, local)
- return []naming.Endpoint{local}, nil
+ m.mu.Lock()
+ m.listenEndpoints = append(m.listenEndpoints, local)
+ m.mu.Unlock()
+ return nil
}
-func (m *manager) proxyListen(ctx *context.T, address string) ([]naming.Endpoint, error) {
+func (m *manager) proxyListen(ctx *context.T, address string) error {
ep, err := inaming.NewEndpoint(address)
if err != nil {
- return nil, flow.NewErrBadArg(ctx, err)
+ return flow.NewErrBadArg(ctx, err)
}
- f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
- if err != nil {
- return nil, flow.NewErrNetwork(ctx, err)
- }
- w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
- if err != nil {
- return nil, flow.NewErrBadArg(ctx, err)
- }
- if _, err := f.WriteMsg(w); err != nil {
- return nil, flow.NewErrBadArg(ctx, err)
- }
+ m.wg.Add(1)
+ go m.connectToProxy(ctx, address, ep)
+ return nil
+}
- return m.readProxyResponse(ctx, f)
+func (m *manager) connectToProxy(ctx *context.T, address string, ep naming.Endpoint) {
+ defer m.wg.Done()
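+	// Retry with exponential backoff, starting at reconnectDelay; the first
+	// attempt is made immediately. The delay is reset once a successfully
+	// established proxy connection closes.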
+ for delay := reconnectDelay; ; delay *= 2 {
+ time.Sleep(delay - reconnectDelay)
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ f, err := m.internalDial(ctx, ep, proxyBlessingsForPeer{}.run, &proxyFlowHandler{ctx: ctx, m: m})
+ if err != nil {
+ ctx.Error(err)
+ continue
+ }
+ w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
+ if err != nil {
+ ctx.Error(err)
+ continue
+ }
+ if _, err = f.WriteMsg(w); err != nil {
+ ctx.Error(err)
+ continue
+ }
+ eps, err := m.readProxyResponse(ctx, f)
+ if err != nil {
+ ctx.Error(err)
+ continue
+ }
+ m.mu.Lock()
+ m.proxyEndpoints[address] = eps
+ m.mu.Unlock()
+ select {
+ case <-ctx.Done():
+ return
+ case <-f.Closed():
+ m.mu.Lock()
+ delete(m.proxyEndpoints, address)
+ m.mu.Unlock()
+ delay = reconnectDelay
+ }
+ }
}
func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
@@ -176,29 +204,37 @@
ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err)
return
}
+ cached := make(chan struct{})
c, err := conn.NewAccepted(
ctx,
flowConn,
local,
version.Supported,
- &flowHandler{q: m.q},
+ &flowHandler{q: m.q, cached: cached},
)
if err != nil {
+ close(cached)
flowConn.Close()
ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err)
continue
}
if err := m.cache.InsertWithRoutingID(c); err != nil {
- ctx.VI(2).Infof("failed to cache conn %v: %v", c, err)
+ close(cached)
+ ctx.Errorf("failed to cache conn %v: %v", c, err)
}
+ close(cached)
}
}
type flowHandler struct {
- q *upcqueue.T
+ q *upcqueue.T
+ cached chan struct{}
}
func (h *flowHandler) HandleFlow(f flow.Flow) error {
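+	// Block until the accepted conn has been inserted into the cache
+	// (lnAcceptLoop closes cached on every path) before delivering the flow.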
+ if h.cached != nil {
+ <-h.cached
+ }
return h.q.Put(f)
}
@@ -236,6 +272,9 @@
m.mu.Lock()
ret := make([]naming.Endpoint, len(m.listenEndpoints))
copy(ret, m.listenEndpoints)
+ for _, peps := range m.proxyEndpoints {
+ ret = append(ret, peps...)
+ }
m.mu.Unlock()
if len(ret) == 0 {
ret = append(ret, &inaming.Endpoint{RID: m.rid})
@@ -334,6 +373,7 @@
fh,
)
if err != nil {
+ flowConn.Close()
if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
return nil, err
}
@@ -350,16 +390,17 @@
// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
// return a flow on that corresponding conn.
- if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
+ if proxyConn := c; remote.RoutingID() != proxyConn.RemoteEndpoint().RoutingID() {
c, err = conn.NewDialed(
ctx,
f,
- c.LocalEndpoint(),
+ proxyConn.LocalEndpoint(),
remote,
version.Supported,
fh,
)
if err != nil {
+ proxyConn.Close(ctx, err)
if verror.ErrorID(err) == message.ErrWrongProtocol.ID {
return nil, err
}
@@ -370,6 +411,7 @@
}
f, err = c.Dial(ctx, fn)
if err != nil {
+ proxyConn.Close(ctx, err)
return nil, flow.NewErrDialFailed(ctx, err)
}
}
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index d0a9a41..9752a56 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -31,22 +31,23 @@
func TestDirectConnection(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- defer shutdown()
- rid := naming.FixedRoutingID(0x5555)
- m := New(ctx, rid)
-
- if err := m.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
+ am := New(ctx, naming.FixedRoutingID(0x5555))
+ if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
+ dm := New(ctx, naming.FixedRoutingID(0x1111))
- testFlows(t, ctx, m, m, flowtest.BlessingsForPeer)
+ testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+
+ shutdown()
+ <-am.Closed()
+ <-dm.Closed()
}
func TestDialCachedConn(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- defer shutdown()
am := New(ctx, naming.FixedRoutingID(0x5555))
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
@@ -63,54 +64,67 @@
if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
}
+ old := dm.(*manager).cache.ridCache[am.RoutingID()]
// After dialing another connection the cache should still hold one connection
// because the connections should be reused.
testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
if got, want := len(dm.(*manager).cache.addrCache), 1; got != want {
- t.Fatalf("got cache size %v, want %v", got, want)
+ t.Errorf("got cache size %v, want %v", got, want)
}
+ if c := dm.(*manager).cache.ridCache[am.RoutingID()]; c != old {
+ t.Errorf("got %v want %v", c, old)
+ }
+
+ shutdown()
+ <-am.Closed()
+ <-dm.Closed()
}
func TestBidirectionalListeningEndpoint(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- defer shutdown()
am := New(ctx, naming.FixedRoutingID(0x5555))
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- eps := am.ListeningEndpoints()
- if len(eps) == 0 {
- t.Fatalf("no endpoints listened on")
- }
+
dm := New(ctx, naming.FixedRoutingID(0x1111))
testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
// Now am should be able to make a flow to dm even though dm is not listening.
testFlows(t, ctx, am, dm, flowtest.BlessingsForPeer)
+
+ shutdown()
+ <-am.Closed()
+ <-dm.Closed()
}
func TestNullClientBlessings(t *testing.T) {
+ defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- defer shutdown()
am := New(ctx, naming.FixedRoutingID(0x5555))
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.NullRoutingID)
- _, af := testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
+ nulldm := New(ctx, naming.NullRoutingID)
+ _, af := testFlows(t, ctx, nulldm, am, flowtest.BlessingsForPeer)
// Ensure that the remote blessings of the underlying conn of the accepted flow are zero.
if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); !rBlessings.IsZero() {
t.Errorf("got %v, want zero-value blessings", rBlessings)
}
- dm = New(ctx, naming.FixedRoutingID(0x1111))
+ dm := New(ctx, naming.FixedRoutingID(0x1111))
_, af = testFlows(t, ctx, dm, am, flowtest.BlessingsForPeer)
// Ensure that the remote blessings of the underlying conn of the accepted flow are
// non-zero if we did specify a RoutingID.
if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); rBlessings.IsZero() {
t.Errorf("got %v, want non-zero blessings", rBlessings)
}
+
+ shutdown()
+ <-am.Closed()
+ <-dm.Closed()
+ <-nulldm.Closed()
}
func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, bFn flow.BlessingsForPeer) (df, af flow.Flow) {
diff --git a/runtime/internal/naming/namespace/namespace.go b/runtime/internal/naming/namespace/namespace.go
index 1a9f109..eaa20c4 100644
--- a/runtime/internal/naming/namespace/namespace.go
+++ b/runtime/internal/naming/namespace/namespace.go
@@ -159,9 +159,9 @@
case verror.ErrBadArg.ID:
// This should cover "rpc: wrong number of in-args".
return true
- case verror.ErrNoExist.ID, verror.ErrUnknownMethod.ID, verror.ErrUnknownSuffix.ID:
+ case verror.ErrNoExist.ID, verror.ErrUnknownMethod.ID, verror.ErrUnknownSuffix.ID, errNoServers.ID:
// This should cover "rpc: unknown method", "rpc: dispatcher not
- // found", and dispatcher Lookup not found errors.
+	// found", dispatcher Lookup not found, and "No servers found to resolve query" errors.
return true
case verror.ErrBadProtocol.ID:
// This covers "rpc: response decoding failed: EOF".
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 8b3b892..10ed8a0 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -55,9 +55,6 @@
chosenEndpoints []*inaming.Endpoint
typeCache *typeCache
- // state of proxies keyed by the name of the proxy
- proxies map[string]proxyState
-
disp rpc.Dispatcher // dispatcher to serve RPCs
dispReserved rpc.Dispatcher // dispatcher for reserved methods
active sync.WaitGroup // active goroutines we've spawned.
@@ -103,7 +100,6 @@
principal: principal,
blessings: principal.BlessingStore().Default(),
publisher: publisher.New(ctx, ns, publishPeriod),
- proxies: make(map[string]proxyState),
stoppedChan: make(chan struct{}),
ns: ns,
stats: newRPCStats(statsPrefix),
@@ -247,10 +243,18 @@
s.Lock()
defer s.Unlock()
var lastErr error
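+	// If a proxy is configured, let the flow manager establish and maintain
+	// the proxy listener.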
+ if len(listenSpec.Proxy) > 0 {
+ lastErr = s.flowMgr.Listen(ctx, inaming.Network, listenSpec.Proxy)
+ if lastErr != nil {
+ s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", inaming.Network, listenSpec.Proxy, lastErr)
+ }
+ }
for _, addr := range listenSpec.Addrs {
if len(addr.Address) > 0 {
lastErr = s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
- s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
+ if lastErr != nil {
+ s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
+ }
}
}
diff --git a/services/device/deviced/internal/starter/starter.go b/services/device/deviced/internal/starter/starter.go
index 3ad0a74..0849d6a 100644
--- a/services/device/deviced/internal/starter/starter.go
+++ b/services/device/deviced/internal/starter/starter.go
@@ -173,16 +173,30 @@
}
var epName string
if args.Device.ListenSpec.Proxy != "" {
- for {
- p := server.Status().Proxies
- if len(p) == 0 {
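+		// When running with the transitional xservers stack
+		// (V23_RPC_TRANSITION_STATE=xservers), the proxied address appears in
+		// Status().Endpoints rather than Status().Proxies; wait until it has a
+		// resolved network address.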
+ if os.Getenv("V23_RPC_TRANSITION_STATE") == "xservers" {
+ for {
+ eps := server.Status().Endpoints
+ if len(eps) > 0 && len(eps[0].Addr().Network()) > 0 {
+ epName = eps[0].Name()
+ ctx.Infof("Proxied address: %s", epName)
+ break
+ }
ctx.Infof("Waiting for proxy address to appear...")
time.Sleep(time.Second)
- continue
}
- epName = p[0].Endpoint.Name()
- ctx.Infof("Proxied address: %s", epName)
- break
+ } else {
+ // TODO(suharshs): Remove this else block once the transition is complete.
+ for {
+ p := server.Status().Proxies
+ if len(p) == 0 {
+ ctx.Infof("Waiting for proxy address to appear...")
+ time.Sleep(time.Second)
+ continue
+ }
+ epName = p[0].Endpoint.Name()
+ ctx.Infof("Proxied address: %s", epName)
+ break
+ }
}
} else {
if len(endpoints) == 0 {
diff --git a/services/device/dmrun/backend/backend.go b/services/device/dmrun/backend/backend.go
new file mode 100644
index 0000000..da8ca3e
--- /dev/null
+++ b/services/device/dmrun/backend/backend.go
@@ -0,0 +1,37 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package backend
+
+import "fmt"
+
+type CloudVM interface {
+ // Name of the VM instance that the object talks to
+ Name() string
+
+ // IP address (as a string) of the VM instance
+ IP() string
+
+ // Execute a command on the VM instance
+ RunCommand(...string) (output []byte, err error)
+
+ // Copy a file to the VM instance
+ CopyFile(infile, destination string) error
+
+ // Delete the VM instance
+ Delete() error
+
+ // Provide the command that the user can use to delete a VM instance for which Delete()
+ // was not called
+ DeleteCommandForUser() string
+}
+
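+// CreateCloudVM creates a new VM instance named instanceName, choosing the
+// backend implementation based on the concrete type of options.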
+func CreateCloudVM(instanceName string, options interface{}) (CloudVM, error) {
+ switch options.(type) {
+ default:
+		return nil, fmt.Errorf("unknown options type %T", options)
+ case VcloudVMOptions:
+ return newVcloudVM(instanceName, options.(VcloudVMOptions))
+ }
+}
diff --git a/services/device/dmrun/backend/backend_vcloud.go b/services/device/dmrun/backend/backend_vcloud.go
new file mode 100644
index 0000000..38a68b4
--- /dev/null
+++ b/services/device/dmrun/backend/backend_vcloud.go
@@ -0,0 +1,114 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package backend
+
+import (
+ "fmt"
+ "net"
+ "os/exec"
+ "strings"
+)
+
+type VcloudVM struct {
+ vcloud string // path to vcloud command
+ sshUser string // ssh into the VM as this user
+ projectArg, zoneArg string // common flags used with the vcloud command
+ name, ip string
+ isDeleted bool
+}
+
+type VcloudVMOptions struct {
+ VcloudBinary string // path to the "vcloud" command
+}
+
+func newVcloudVM(instanceName string, opt VcloudVMOptions) (vm *VcloudVM, err error) {
+ // TODO: Make sshUser, zone, and project configurable
+ g := &VcloudVM{
+ vcloud: opt.VcloudBinary,
+ sshUser: "veyron",
+ projectArg: "--project=google.com:veyron",
+ zoneArg: "--zone=us-central1-c",
+ isDeleted: false,
+ }
+
+ cmd := exec.Command(g.vcloud, "node", "create", g.projectArg, g.zoneArg, instanceName)
+ if output, err := cmd.CombinedOutput(); err != nil {
+ return nil, fmt.Errorf("setting up new GCE instance (%v) failed. Error: (%v) Output:\n%v", strings.Join(cmd.Args, " "), err, string(output))
+ }
+
+ cmd = exec.Command(g.vcloud, "list", g.projectArg, "--noheader", "--fields=EXTERNAL_IP", instanceName)
+ output, err := cmd.CombinedOutput()
+ if err != nil {
+ return nil, fmt.Errorf("listing instances (%v) failed. Error: (%v) Output:\n%v", strings.Join(cmd.Args, " "), err, string(output))
+ }
+ tmpIP := strings.TrimSpace(string(output))
+ if net.ParseIP(tmpIP) == nil {
+ return nil, fmt.Errorf("IP of new instance is not a valid IP address: %v", tmpIP)
+ }
+ g.ip = tmpIP
+ g.name = instanceName
+ return g, nil
+}
+
+func (g *VcloudVM) Delete() error {
+ if g.isDeleted {
+ return fmt.Errorf("trying to delete a deleted VcloudVM")
+ }
+
+ cmd := exec.Command(g.vcloud, "node", "delete", g.projectArg, g.zoneArg, g.name)
+ output, err := cmd.CombinedOutput()
+ if err != nil {
+ err = fmt.Errorf("failed deleting GCE instance (%s): %v\nOutput:%v\n", strings.Join(cmd.Args, " "), err, string(output))
+ } else {
+ g.isDeleted = true
+ g.name = ""
+ g.ip = ""
+ }
+ return err
+}
+
+func (g *VcloudVM) Name() string {
+ return g.name
+}
+
+func (g *VcloudVM) IP() string {
+ return g.ip
+}
+
+func (g *VcloudVM) RunCommand(args ...string) ([]byte, error) {
+ if g.isDeleted {
+ return nil, fmt.Errorf("RunCommand called on deleted VcloudVM")
+ }
+
+ cmd := exec.Command(g.vcloud, append([]string{"sh", g.projectArg, g.name}, args...)...)
+ output, err := cmd.CombinedOutput()
+ if err != nil {
+ err = fmt.Errorf("failed running [%s] on VM %s", strings.Join(args, " "), g.name)
+ }
+ return output, err
+}
+
+func (g *VcloudVM) CopyFile(infile, destination string) error {
+ if g.isDeleted {
+ return fmt.Errorf("CopyFile called on deleted VcloudVM")
+ }
+
+ cmd := exec.Command("gcloud", "compute", g.projectArg, "copy-files", infile, fmt.Sprintf("%s@%s:/%s", g.sshUser, g.Name(), destination), g.zoneArg)
+ output, err := cmd.CombinedOutput()
+ if err != nil {
+ err = fmt.Errorf("failed copying %s to %s:%s - %v\nOutput:\n%v", infile, g.name, destination, err, string(output))
+ }
+ return err
+}
+
+func (g *VcloudVM) DeleteCommandForUser() string {
+ if g.isDeleted {
+ return ""
+ }
+
+ // We can't return the vcloud binary that we ran for the steps above, as that one is deleted
+ // after use. For now, we assume the user will have a vcloud binary on his path to use.
+ return strings.Join([]string{"vcloud", "node", "delete", g.projectArg, g.zoneArg, g.name}, " ")
+}
diff --git a/services/device/dmrun/dmrun.go b/services/device/dmrun/dmrun.go
index d816542..d722b58 100644
--- a/services/device/dmrun/dmrun.go
+++ b/services/device/dmrun/dmrun.go
@@ -2,9 +2,9 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Command dmrun runs a binary on a remote GCE instance using device manager.
+// Command dmrun runs a binary on a remote VM instance using device manager.
//
-// dmrun creates the GCE instance, installs and starts device manager on it, and
+// dmrun creates the VM instance, installs and starts device manager on it, and
// then installs and starts an app from the specified binary.
//
// dmrun uses the credentials it is running with in order to claim the device
@@ -35,12 +35,14 @@
"time"
"v.io/x/ref"
+ "v.io/x/ref/services/device/dmrun/backend"
)
var (
workDir string
vcloud string
device string
+ vm backend.CloudVM
cleanupOnDeath func()
)
@@ -148,39 +150,28 @@
return zipFile
}
-// setupInstance creates a new GCE instance and returns its name and IP address.
-func setupInstance() (string, string) {
+// setupInstance creates a new VM instance and returns its name and IP address.
+func setupInstance(vmOptions interface{}) (backend.CloudVM, string, string) {
currUser, err := user.Current()
dieIfErr(err, "Couldn't obtain current user")
instanceName := fmt.Sprintf("%s-%s", currUser.Username, time.Now().UTC().Format("20060102-150405"))
- // TODO(caprita): Allow project and zone to be customized.
- cmd := exec.Command(vcloud, "node", "create", "--project=google.com:veyron", "--zone=us-central1-c", instanceName)
- output, err := cmd.CombinedOutput()
- dieIfErr(err, "Setting up new GCE instance (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
- cmd = exec.Command(vcloud, "list", "--project=google.com:veyron", "--noheader", "--fields=EXTERNAL_IP", instanceName)
- output, err = cmd.CombinedOutput()
- dieIfErr(err, "Listing instances (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
- instanceIP := strings.TrimSpace(string(output))
- if net.ParseIP(instanceIP) == nil {
- die("Not a valid IP address: %v", instanceIP)
- }
+ vm, err = backend.CreateCloudVM(instanceName, vmOptions)
+ dieIfErr(err, "VM Instance Creation Failed: %v", err)
+ instanceIP := vm.IP()
// Install unzip so we can unpack the archive.
// TODO(caprita): Use tar instead.
- cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instanceName, "sudo", "apt-get", "install", "unzip")
- output, err = cmd.CombinedOutput()
- dieIfErr(err, "Installing unzip (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
- fmt.Println("Created GCE instance", instanceName, "with IP", instanceIP)
- return instanceName, instanceIP
+ output, err := vm.RunCommand("sudo", "apt-get", "install", "unzip")
+ dieIfErr(err, "Installing unzip failed. Output:\n%v", string(output))
+ fmt.Println("Created VM instance", instanceName, "with IP", instanceIP)
+ return vm, instanceName, instanceIP
}
-// installArchive ships the archive to the GCE instance and unpacks it.
+// installArchive ships the archive to the VM instance and unpacks it.
func installArchive(archive, instance string) {
- cmd := exec.Command("gcloud", "compute", "--project=google.com:veyron", "copy-files", archive, fmt.Sprintf("veyron@%s:/tmp/", instance), "--zone=us-central1-c")
- output, err := cmd.CombinedOutput()
- dieIfErr(err, "Copying archive (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
- cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "unzip", path.Join("/tmp", filepath.Base(archive)), "-d", "/tmp/unpacked")
- output, err = cmd.CombinedOutput()
- dieIfErr(err, "Extracting archive (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+ err := vm.CopyFile(archive, "/tmp/")
+ dieIfErr(err, "Copying archive failed: %v", err)
+ output, err := vm.RunCommand("unzip", path.Join("/tmp", filepath.Base(archive)), "-d", "/tmp/unpacked")
+ dieIfErr(err, "Extracting archive failed. Output:\n%v", string(output))
}
// installDevice installs and starts device manager, and returns the public key
@@ -188,12 +179,10 @@
func installDevice(instance string) (string, string) {
fmt.Println("Installing device manager...")
defer fmt.Println("Done installing device manager...")
- cmd := exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "install", "/tmp/unpacked", "--single_user", "--", "--v23.tcp.address=:8151", "--deviced-port=8150", "--proxy-port=8160", "--use-pairing-token")
- output, err := cmd.CombinedOutput()
- dieIfErr(err, "Installing device manager (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
- cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "start")
- output, err = cmd.CombinedOutput()
- dieIfErr(err, "Starting device manager (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+ output, err := vm.RunCommand("V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "install", "/tmp/unpacked", "--single_user", "--", "--v23.tcp.address=:8151", "--deviced-port=8150", "--proxy-port=8160", "--use-pairing-token")
+ dieIfErr(err, "Installing device manager failed. Output:\n%v", string(output))
+ output, err = vm.RunCommand("V23_DEVICE_DIR=/tmp/dm", "/tmp/unpacked/devicex", "start")
+ dieIfErr(err, "Starting device manager failed. Output:\n%v", string(output))
// Grab the token and public key from the device manager log.
dieAfter := time.After(5 * time.Second)
firstIteration := true
@@ -207,9 +196,8 @@
} else {
firstIteration = false
}
- cmd = exec.Command(vcloud, "sh", "--project=google.com:veyron", instance, "cat", "/tmp/dm/dmroot/device-manager/logs/deviced.INFO")
- output, err = cmd.CombinedOutput()
- dieIfErr(err, "Reading device manager log (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+ output, err = vm.RunCommand("cat", "/tmp/dm/dmroot/device-manager/logs/deviced.INFO")
+ dieIfErr(err, "Reading device manager log failed. Output:\n%v", string(output))
pairingTokenRE := regexp.MustCompile("Device manager pairing token: (.*)")
matches := pairingTokenRE.FindSubmatch(output)
if matches == nil {
@@ -301,21 +289,21 @@
device = buildV23Binary(deviceBin)
dmBins := buildDMBinaries()
archive := createArchive(append(dmBins, getPath(devicexRepo, devicex)))
- gceInstanceName, gceInstanceIP := setupInstance()
+ vmOpts := backend.VcloudVMOptions{VcloudBinary: vcloud}
+ vm, vmInstanceName, vmInstanceIP := setupInstance(vmOpts)
cleanupOnDeath = func() {
- fmt.Fprintf(os.Stderr, "Deleting GCE instance ...\n")
- cmd := exec.Command(vcloud, "node", "delete", "--project=google.com:veyron", "--zone=us-central1-c", gceInstanceName)
- output, err := cmd.CombinedOutput()
+ fmt.Fprintf(os.Stderr, "Deleting VM instance ...\n")
+ err := vm.Delete()
fmt.Fprintf(os.Stderr, "Removing tmp files ...\n")
os.RemoveAll(workDir)
- dieIfErr(err, "Deleting GCE instance (%v) failed. Output:\n%v", strings.Join(cmd.Args, " "), string(output))
+ dieIfErr(err, "Deleting VM instance failed")
}
- installArchive(archive, gceInstanceName)
- publicKey, pairingToken := installDevice(gceInstanceName)
- deviceAddr := net.JoinHostPort(gceInstanceIP, "8150")
+ installArchive(archive, vmInstanceName)
+ publicKey, pairingToken := installDevice(vmInstanceName)
+ deviceAddr := net.JoinHostPort(vmInstanceIP, "8150")
deviceName := "/" + deviceAddr
- claimDevice(deviceName, gceInstanceIP, publicKey, pairingToken, gceInstanceName)
- installationName := installApp(deviceName, gceInstanceIP)
+ claimDevice(deviceName, vmInstanceIP, publicKey, pairingToken, vmInstanceName)
+ installationName := installApp(deviceName, vmInstanceIP)
instanceName := startApp(installationName, "app")
fmt.Println("Launched app.")
fmt.Println("-------------")
@@ -325,6 +313,6 @@
fmt.Printf("\t${V23_ROOT}/release/go/bin/debug glob %s/logs/*\n", instanceName)
fmt.Println("Dump e.g. the INFO log:")
fmt.Printf("\t${V23_ROOT}/release/go/bin/debug logs read %s/logs/app.INFO\n", instanceName)
- fmt.Println("Clean up by deleting the GCE instance:")
- fmt.Printf("\t${V23_ROOT}/release/go/bin/vcloud node delete --project=google.com:veyron --zone=us-central1-c %s\n", gceInstanceName)
+ fmt.Println("Clean up by deleting the VM instance:")
+ fmt.Printf("\t%s\n", vm.DeleteCommandForUser())
}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 65a7f32..849e8ec 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -58,6 +58,9 @@
CurVers string // current version number of the object.
Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
UpdTime time.Time // timestamp when the update is generated.
+ PermId string // id of the permissions object controlling this version.
+ PermVers string // current version of the permissions object.
+ Shell bool // true when the mutation data is hidden due to permissions.
Delete bool // indicates whether the update resulted in object being deleted from the store.
BatchId uint64 // unique id of the Batch this update belongs to.
BatchCount uint64 // number of objects in the Batch.
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index ca29abc..a00591c 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -56,6 +56,9 @@
CurVers string // current version number of the object.
Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
UpdTime time.Time // timestamp when the update is generated.
+ PermId string // id of the permissions object controlling this version.
+ PermVers string // current version of the permissions object.
+ Shell bool // true when the mutation data is hidden due to permissions.
Delete bool // indicates whether the update resulted in object being deleted from the store.
BatchId uint64 // unique id of the Batch this update belongs to.
BatchCount uint64 // number of objects in the Batch.
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 3f5181b..984a8df 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -17,15 +17,23 @@
// PutOp represents a store put operation. The new version is written instead
// of the value to avoid duplicating the user data in the store. The version
-// is used to access the user data of that specific mutation.
+// is used to access the user data of that specific mutation. The key and the
+// version of the permissions entry that was checked to allow this put operation
+// are also tracked to secure the access to this history.
type PutOp struct {
- Key []byte
- Version []byte
+ Key []byte
+ Version []byte
+ PermKey []byte
+ PermVersion []byte
}
-// DeleteOp represents a store delete operation.
+// DeleteOp represents a store delete operation. The key and the version of the
+// permissions entry that was checked to allow this delete operation are also
+// tracked to secure the access to this history.
type DeleteOp struct {
- Key []byte
+ Key []byte
+ PermKey []byte
+ PermVersion []byte
}
// SyncGroupOp represents a change in SyncGroup tracking, adding or removing
@@ -43,9 +51,13 @@
// allows sync to initialize its metadata at the correct versions of the objects
// when they become syncable. These log entries should be filtered by the
// client-facing Watch interface because the user data did not actually change.
+// The key and the version of the permissions entry that was checked when the
+// key was accessed are also tracked to secure the access to this history.
type SyncSnapshotOp struct {
- Key []byte
- Version []byte
+ Key []byte
+ Version []byte
+ PermKey []byte
+ PermVersion []byte
}
// Op represents a store operation.
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index 71c4b90..f4114fa 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -35,10 +35,14 @@
// PutOp represents a store put operation. The new version is written instead
// of the value to avoid duplicating the user data in the store. The version
-// is used to access the user data of that specific mutation.
+// is used to access the user data of that specific mutation. The key and the
+// version of the permissions entry that was checked to allow this put operation
+// are also tracked to secure the access to this history.
type PutOp struct {
- Key []byte
- Version []byte
+ Key []byte
+ Version []byte
+ PermKey []byte
+ PermVersion []byte
}
func (PutOp) __VDLReflect(struct {
@@ -46,9 +50,13 @@
}) {
}
-// DeleteOp represents a store delete operation.
+// DeleteOp represents a store delete operation. The key and the version of the
+// permissions entry that was checked to allow this delete operation are also
+// tracked to secure the access to this history.
type DeleteOp struct {
- Key []byte
+ Key []byte
+ PermKey []byte
+ PermVersion []byte
}
func (DeleteOp) __VDLReflect(struct {
@@ -76,9 +84,13 @@
// allows sync to initialize its metadata at the correct versions of the objects
// when they become syncable. These log entries should be filtered by the
// client-facing Watch interface because the user data did not actually change.
+// The key and the version of the permissions entry that was checked when the
+// key was accessed are also tracked to secure the access to this history.
type SyncSnapshotOp struct {
- Key []byte
- Version []byte
+ Key []byte
+ Version []byte
+ PermKey []byte
+ PermVersion []byte
}
func (SyncSnapshotOp) __VDLReflect(struct {
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 086296c..2caa50d 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -110,31 +110,9 @@
NoBatchId = uint64(0)
)
-// dagNode holds the information on a object mutation in the DAG.
-// Note: the batch ID and deleted flag are copies of information in the log
-// record. They are also stored in the DAG node to improve DAG traversal for
-// conflict resolution and pruning without having to fetch the full log record
-// every time.
-type dagNode struct {
- Level uint64 // node distance from root
- Parents []string // references to parent versions
- Logrec string // reference to log record
- BatchId uint64 // ID of a write batch
- Deleted bool // true if the change was a delete
-}
-
// batchSet holds information on a set of write batches.
type batchSet map[uint64]*batchInfo
-// batchInfo holds the information on a write batch:
-// - The map of syncable (versioned) objects: {oid: version}
-// - The total count of batch objects, including non-syncable ones.
-// TODO(rdaoud): add support to track the read and scan sets.
-type batchInfo struct {
- Objects map[string]string
- Count uint64
-}
-
// graftMap holds the state of DAG node grafting (attaching) per object.
type graftMap map[string]*graftInfo
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index a0c8731..820dec6 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -84,3 +84,27 @@
SyncPending bool
PendingGenVec interfaces.PrefixGenVector
}
+
+// dagNode holds the information on an object mutation in the DAG. The node
+// information is extracted from the log records exchanged between Syncbases.
+// They are also stored in the DAG node to improve DAG traversal for conflict
+// resolution and pruning without having to fetch the full log record.
+type dagNode struct {
+ Level uint64 // node distance from root
+ Parents []string // references to parent versions
+ Logrec string // reference to log record
+ BatchId uint64 // ID of a write batch
+ Shell bool // true when the data is hidden due to permissions
+ Deleted bool // true if the change was a delete
+ PermId string // ID of the permissions controlling this version
+ PermVers string // current version of the permissions object
+}
+
+// batchInfo holds the information on a write batch:
+// - The map of syncable (versioned) objects: {oid: version}
+// - The total count of batch objects, including non-syncable ones.
+// TODO(rdaoud): add support to track the read and scan sets.
+type batchInfo struct {
+ Objects map[string]string
+ Count uint64
+}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index 34f425f..e352de4 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -104,12 +104,48 @@
}) {
}
+// dagNode holds the information on an object mutation in the DAG. The node
+// information is extracted from the log records exchanged between Syncbases.
+// They are also stored in the DAG node to improve DAG traversal for conflict
+// resolution and pruning without having to fetch the full log record.
+type dagNode struct {
+ Level uint64 // node distance from root
+ Parents []string // references to parent versions
+ Logrec string // reference to log record
+ BatchId uint64 // ID of a write batch
+ Shell bool // true when the data is hidden due to permissions
+ Deleted bool // true if the change was a delete
+ PermId string // ID of the permissions controlling this version
+ PermVers string // current version of the permissions object
+}
+
+func (dagNode) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/vsync.dagNode"`
+}) {
+}
+
+// batchInfo holds the information on a write batch:
+// - The map of syncable (versioned) objects: {oid: version}
+// - The total count of batch objects, including non-syncable ones.
+// TODO(rdaoud): add support to track the read and scan sets.
+type batchInfo struct {
+ Objects map[string]string
+ Count uint64
+}
+
+func (batchInfo) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/vsync.batchInfo"`
+}) {
+}
+
func init() {
vdl.Register((*syncData)(nil))
vdl.Register((*localGenInfo)(nil))
vdl.Register((*dbSyncState)(nil))
vdl.Register((*localLogRec)(nil))
vdl.Register((*sgLocalState)(nil))
+ vdl.Register((*dagNode)(nil))
+ vdl.Register((*batchInfo)(nil))
}
const logPrefix = "log" // log state.
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index ebeb262..bb85a2b 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -6,7 +6,9 @@
import (
"bufio"
+ "fmt"
"strings"
+ "sync"
"testing"
"time"
@@ -22,10 +24,15 @@
"v.io/v23/security"
)
-const leakWaitTime = 100 * time.Millisecond
+const (
+ leakWaitTime = 100 * time.Millisecond
+ pollTime = 50 * time.Millisecond
+)
-func TestProxiedConnection(t *testing.T) {
+func TestSingleProxy(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
+ kp := newKillProtocol()
+ flow.RegisterProtocol("kill", kp)
pctx, shutdown := v23.Init()
defer shutdown()
actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
@@ -37,16 +44,23 @@
t.Fatal(err)
}
- pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+ pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
if err := am.Listen(actx, "v23", pep.String()); err != nil {
t.Fatal(err)
}
- testEndToEndConnections(t, dctx, actx, dm, am)
+
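+	// Proxy listening is now asynchronous, so wait until the manager has
+	// received a proxied endpoint before dialing through it.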
+ for am.ListeningEndpoints()[0].Addr().Network() == "" {
+ time.Sleep(pollTime)
+ }
+
+ testEndToEndConnections(t, dctx, actx, dm, am, kp)
}
func TestMultipleProxies(t *testing.T) {
defer goroutines.NoLeaks(t, leakWaitTime)()
+ kp := newKillProtocol()
+ flow.RegisterProtocol("kill", kp)
pctx, shutdown := v23.Init()
defer shutdown()
actx, am, err := v23.ExperimentalWithNewFlowManager(pctx)
@@ -58,59 +72,79 @@
t.Fatal(err)
}
- pep := startProxy(t, pctx, address{"tcp", "127.0.0.1:0"})
+ pep := startProxy(t, pctx, address{"kill", "127.0.0.1:0"})
- p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"tcp", "127.0.0.1:0"})
+ p2ep := startProxy(t, pctx, address{"v23", pep.String()}, address{"kill", "127.0.0.1:0"})
- p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"tcp", "127.0.0.1:0"})
+ p3ep := startProxy(t, pctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
if err := am.Listen(actx, "v23", p3ep.String()); err != nil {
t.Fatal(err)
}
- testEndToEndConnections(t, dctx, actx, dm, am)
+
+ // Wait for am.Listen to get 3 endpoints.
+ for len(am.ListeningEndpoints()) != 3 {
+ time.Sleep(pollTime)
+ }
+
+ testEndToEndConnections(t, dctx, actx, dm, am, kp)
}
-func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager) {
+func testEndToEndConnections(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, kp *killProtocol) {
aeps := am.ListeningEndpoints()
if len(aeps) == 0 {
t.Fatal("acceptor not listening on any endpoints")
}
for _, aep := range aeps {
- testEndToEndConnection(t, dctx, actx, dm, am, aep)
+ // Kill the connections, connections should still eventually succeed.
+ kp.KillConnections()
+ for {
+ if err := testEndToEndConnection(t, dctx, actx, dm, am, aep); err != nil {
+ t.Log(err)
+ time.Sleep(pollTime)
+ continue
+ }
+ break
+ }
}
}
-func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) {
+func testEndToEndConnection(t *testing.T, dctx, actx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
// The dialing flow.Manager dials a flow to the accepting flow.Manager.
want := "Do you read me?"
df, err := dm.Dial(dctx, aep, bfp)
if err != nil {
- t.Fatal(err)
+ return err
}
// We write before accepting to ensure that the openFlow message is sent.
- writeLine(df, want)
+ if err := writeLine(df, want); err != nil {
+ return err
+ }
af, err := am.Accept(actx)
if err != nil {
- t.Fatal(err)
+ return err
}
got, err := readLine(af)
if err != nil {
- t.Fatal(err)
+ return err
}
if got != want {
- t.Errorf("got %v, want %v", got, want)
+ return fmt.Errorf("got %v, want %v", got, want)
}
// Writes in the opposite direction should work as well.
want = "I read you loud and clear."
- writeLine(af, want)
+ if err := writeLine(af, want); err != nil {
+ return err
+ }
got, err = readLine(df)
if err != nil {
- t.Fatal(err)
+ return err
}
if got != want {
- t.Errorf("got %v, want %v", got, want)
+ return fmt.Errorf("got %v, want %v", got, want)
}
+ return nil
}
// TODO(suharshs): Add test for bidirectional RPC.
@@ -141,20 +175,69 @@
func startProxy(t *testing.T, ctx *context.T, addrs ...address) naming.Endpoint {
var ls rpc.ListenSpec
+ hasProxies := false
for _, addr := range addrs {
ls.Addrs = append(ls.Addrs, addr)
+ if addr.Protocol == "v23" {
+ hasProxies = true
+ }
}
ctx = v23.WithListenSpec(ctx, ls)
proxy, _, err := xproxyd.New(ctx)
if err != nil {
t.Fatal(err)
}
+ // Wait for the proxy to connect to its proxies.
+ if hasProxies {
+ for len(proxy.MultipleProxyEndpoints()) == 0 {
+ time.Sleep(pollTime)
+ }
+ }
peps := proxy.ListeningEndpoints()
for _, pep := range peps {
- if pep.Addr().Network() == "tcp" {
+ if pep.Addr().Network() == "tcp" || pep.Addr().Network() == "kill" {
return pep
}
}
t.Fatal("Proxy not listening on network address.")
return nil
}
+
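+// killProtocol wraps the registered "tcp" protocol and records every conn it
+// dials so that tests can forcibly close all connections via KillConnections.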
+type killProtocol struct {
+ protocol flow.Protocol
+ mu sync.Mutex
+ conns []flow.Conn
+}
+
+func newKillProtocol() *killProtocol {
+ p, _ := flow.RegisteredProtocol("tcp")
+ return &killProtocol{protocol: p}
+}
+
+func (p *killProtocol) KillConnections() {
+ p.mu.Lock()
+ for _, c := range p.conns {
+ c.Close()
+ }
+ p.conns = nil
+ p.mu.Unlock()
+}
+
+func (p *killProtocol) Dial(ctx *context.T, protocol, address string, timeout time.Duration) (flow.Conn, error) {
+ c, err := p.protocol.Dial(ctx, "tcp", address, timeout)
+ if err != nil {
+ return nil, err
+ }
+ p.mu.Lock()
+ p.conns = append(p.conns, c)
+ p.mu.Unlock()
+ return c, nil
+}
+
+func (p *killProtocol) Listen(ctx *context.T, protocol, address string) (flow.Listener, error) {
+ return p.protocol.Listen(ctx, "tcp", address)
+}
+
+func (p *killProtocol) Resolve(ctx *context.T, protocol, address string) (string, string, error) {
+ return p.protocol.Resolve(ctx, "tcp", address)
+}
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index d05a561..85d66f7 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,9 +5,9 @@
package xproxyd
import (
- "fmt"
"io"
"sync"
+ "time"
"v.io/v23"
"v.io/v23/context"
@@ -16,12 +16,12 @@
"v.io/v23/naming"
)
-// TODO(suharshs): Make sure that we don't leak any goroutines.
+const reconnectDelay = 50 * time.Millisecond
type proxy struct {
m flow.Manager
mu sync.Mutex
- proxyEndpoints []naming.Endpoint
+ proxyEndpoints map[string][]naming.Endpoint // keyed by proxy address
}
func New(ctx *context.T) (*proxy, *context.T, error) {
@@ -30,7 +30,8 @@
return nil, nil, err
}
p := &proxy{
- m: mgr,
+ m: mgr,
+ proxyEndpoints: make(map[string][]naming.Endpoint),
}
for _, addr := range v23.GetListenSpec(ctx).Addrs {
if addr.Protocol == "v23" {
@@ -38,25 +39,7 @@
if err != nil {
return nil, nil, err
}
- f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
- if err != nil {
- return nil, nil, err
- }
- // Send a byte telling the acceptor that we are a proxy.
- if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
- return nil, nil, err
- }
- msg, err := readMessage(ctx, f)
- if err != nil {
- return nil, nil, err
- }
- m, ok := msg.(*message.ProxyResponse)
- if !ok {
- return nil, nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
- }
- p.mu.Lock()
- p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
- p.mu.Unlock()
+ go p.connectToProxy(ctx, addr.Address, ep)
} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
return nil, nil, err
}
@@ -69,6 +52,16 @@
return p.m.ListeningEndpoints()
}
+func (p *proxy) MultipleProxyEndpoints() []naming.Endpoint {
+ var eps []naming.Endpoint
+ p.mu.Lock()
+ for _, v := range p.proxyEndpoints {
+ eps = append(eps, v...)
+ }
+ p.mu.Unlock()
+ return eps
+}
+
func (p *proxy) listenLoop(ctx *context.T) {
for {
f, err := p.m.Accept(ctx)
@@ -99,6 +92,7 @@
func (p *proxy) startRouting(ctx *context.T, f flow.Flow, m *message.Setup) error {
fout, err := p.dialNextHop(ctx, f, m)
if err != nil {
+ f.Close()
return err
}
go p.forwardLoop(ctx, f, fout)
@@ -108,10 +102,9 @@
func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
for {
- _, err := io.Copy(fin, fout)
- if err == io.EOF {
- return
- } else if err != nil {
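+		// Close both flows on any error so that the loop forwarding in the
+		// opposite direction shuts down as well.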
+ if _, err := io.Copy(fin, fout); err != nil {
+ fin.Close()
+ fout.Close()
ctx.Errorf("f.Read failed: %v", err)
return
}
@@ -124,7 +117,10 @@
ep naming.Endpoint
err error
)
- if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
+ if ep, err = removeNetworkAddress(m.PeerRemoteEndpoint); err != nil {
+ return nil, err
+ }
+ if routes := ep.Routes(); len(routes) > 0 {
if err := rid.FromString(routes[0]); err != nil {
return nil, err
}
@@ -133,15 +129,13 @@
// TODO(suharshs): Make sure that the routingID from the route belongs to a
// connection that is stored in the manager's cache. (i.e. a Server has connected
// with the routingID before)
- if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
+ if ep, err = setEndpointRoutingID(ep, rid); err != nil {
return nil, err
}
// Remove the read route from the setup message endpoint.
if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
return nil, err
}
- } else {
- ep = m.PeerRemoteEndpoint
}
fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
@@ -175,7 +169,10 @@
func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
p.mu.Lock()
- eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
+ eps := p.m.ListeningEndpoints()
+ for _, peps := range p.proxyEndpoints {
+ eps = append(eps, peps...)
+ }
p.mu.Unlock()
if len(eps) == 0 {
return nil, NewErrNotListening(ctx)
@@ -201,3 +198,41 @@
}
return eps, nil
}
+
+func (p *proxy) connectToProxy(ctx *context.T, address string, ep naming.Endpoint) {
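+	// Retry with exponential backoff, starting at reconnectDelay; the first
+	// attempt is made immediately. The delay is reset whenever an established
+	// proxy connection closes.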
+ for delay := reconnectDelay; ; delay *= 2 {
+ time.Sleep(delay - reconnectDelay)
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
+ if err != nil {
+ ctx.Error(err)
+ continue
+ }
+		// Send a message telling the acceptor that we are a proxy.
+ if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
+ ctx.Error(err)
+ continue
+ }
+ eps, err := readProxyResponse(ctx, f)
+ if err != nil {
+ ctx.Error(err)
+ continue
+ }
+ p.mu.Lock()
+ p.proxyEndpoints[address] = eps
+ p.mu.Unlock()
+ select {
+ case <-ctx.Done():
+ return
+ case <-f.Closed():
+ p.mu.Lock()
+ delete(p.proxyEndpoints, address)
+ p.mu.Unlock()
+ delay = reconnectDelay
+ }
+ }
+}
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
index 4ccad31..e14e23e 100644
--- a/services/xproxyd/util.go
+++ b/services/xproxyd/util.go
@@ -5,6 +5,8 @@
package xproxyd
import (
+ "fmt"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
@@ -13,6 +15,16 @@
"v.io/v23/security"
)
+func removeNetworkAddress(ep naming.Endpoint) (naming.Endpoint, error) {
+ _, _, routes, rid, bnames, mountable := getEndpointParts(ep)
+ opts := routes
+ opts = append(opts, bnames...)
+ opts = append(opts, rid)
+ opts = append(opts, mountable)
+ epString := naming.FormatEndpoint("", "", opts...)
+ return v23.NewEndpoint(epString)
+}
+
// setEndpointRoutingID returns a copy of ep with RoutingId changed to rid.
func setEndpointRoutingID(ep naming.Endpoint, rid naming.RoutingID) (naming.Endpoint, error) {
network, address, routes, _, bnames, mountable := getEndpointParts(ep)
@@ -87,3 +99,15 @@
}
return message.Read(ctx, b)
}
+
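+// readProxyResponse reads a ProxyResponse message from f and returns the
+// proxied endpoints it contains.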
+func readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
+ msg, err := readMessage(ctx, f)
+ if err != nil {
+ return nil, err
+ }
+ res, ok := msg.(*message.ProxyResponse)
+ if !ok {
+		return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%T", msg))
+ }
+ return res.Endpoints, nil
+}