Merge "services/mgmt/device: Share startup code with unittest."
diff --git a/services/mgmt/device/deviced/server.go b/services/mgmt/device/deviced/server.go
index ab3e85b..ac7e719 100644
--- a/services/mgmt/device/deviced/server.go
+++ b/services/mgmt/device/deviced/server.go
@@ -2,6 +2,7 @@
import (
"flag"
+ "fmt"
"net"
"os"
"path/filepath"
@@ -14,13 +15,11 @@
"v.io/core/veyron/lib/signals"
_ "v.io/core/veyron/profiles/roaming"
"v.io/core/veyron/services/mgmt/device/config"
- "v.io/core/veyron/services/mgmt/device/impl"
- mounttable "v.io/core/veyron/services/mounttable/lib"
+ "v.io/core/veyron/services/mgmt/device/starter"
"v.io/core/veyron2"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/mgmt"
- "v.io/core/veyron2/naming"
"v.io/core/veyron2/vlog"
)
@@ -52,99 +51,60 @@
vlog.Errorf("Failed to load config passed from parent: %v", err)
return err
}
- ls := veyron2.GetListenSpec(ctx)
- if testMode {
- ls.Addrs = ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}
- ls.Proxy = ""
- }
- var publishName string
- if !testMode {
- publishName = *publishAs
- }
mtAclDir := filepath.Join(configState.Root, "mounttable")
if err := os.MkdirAll(mtAclDir, 0700); err != nil {
vlog.Errorf("os.MkdirAll(%q) failed: %v", mtAclDir, err)
return err
}
- mtName, stop, err := mounttable.StartServers(ctx, ls, publishName, *nhName, filepath.Join(mtAclDir, "acls"))
+
+ // TODO(ashankar,caprita): Use channels/locks to synchronize the
+ // setting and getting of exitErr.
+ var exitErr error
+ ns := starter.NamespaceArgs{
+ ACLFile: filepath.Join(mtAclDir, "acls"),
+ Neighborhood: *nhName,
+ }
+ if testMode {
+ ns.ListenSpec = ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
+ } else {
+ ns.ListenSpec = veyron2.GetListenSpec(ctx)
+ ns.Name = *publishAs
+ }
+ dev := starter.DeviceArgs{
+ ConfigState: configState,
+ TestMode: testMode,
+ RestartCallback: func() { exitErr = cmdline.ErrExitCode(*restartExitCode) },
+ }
+ if dev.ListenSpec, err = newDeviceListenSpec(ns.ListenSpec, *dmPort); err != nil {
+ return err
+ }
+
+ stop, err := starter.Start(ctx, starter.Args{Namespace: ns, Device: dev, MountGlobalNamespaceInLocalNamespace: true})
if err != nil {
- vlog.Errorf("mounttable.StartServers failed: %v", err)
return err
}
defer stop()
- vlog.VI(0).Infof("Local mounttable published as: %v", publishName)
-
- server, err := veyron2.NewServer(ctx)
- if err != nil {
- vlog.Errorf("NewServer() failed: %v", err)
- return err
- }
- var dispatcher ipc.Dispatcher
- defer func() {
- server.Stop()
- if dispatcher != nil {
- impl.Shutdown(dispatcher)
- }
- }()
-
- // Bring up the device manager with the same address as the mounttable.
- dmListenSpec := ls
- for i, a := range ls.Addrs {
- host, _, err := net.SplitHostPort(a.Address)
- if err != nil {
- vlog.Errorf("net.SplitHostPort(%v) failed: %v", err)
- return err
- }
- dmListenSpec.Addrs[i].Address = net.JoinHostPort(host, strconv.Itoa(*dmPort))
- }
- endpoints, err := server.Listen(dmListenSpec)
- if err != nil {
- vlog.Errorf("Listen(%s) failed: %v", ls, err)
- return err
- }
- name := endpoints[0].Name()
- vlog.VI(0).Infof("Device manager object name: %v", name)
- configState.Name = name
- // TODO(caprita): We need a way to set config fields outside of the
- // update mechanism (since that should ideally be an opaque
- // implementation detail).
-
- var exitErr error
- dispatcher, err = impl.NewDispatcher(ctx, configState, mtName, testMode, func() { exitErr = cmdline.ErrExitCode(*restartExitCode) })
- if err != nil {
- vlog.Errorf("Failed to create dispatcher: %v", err)
- return err
- }
- // Shutdown via dispatcher above.
-
- dmPublishName := naming.Join(mtName, "devmgr")
- if err := server.ServeDispatcher(dmPublishName, dispatcher); err != nil {
- vlog.Errorf("Serve(%v) failed: %v", dmPublishName, err)
- return err
- }
- vlog.VI(0).Infof("Device manager published as: %v", dmPublishName)
-
- // Mount the global namespace in the local namespace as "global".
- ns := veyron2.GetNamespace(ctx)
- for _, root := range ns.Roots() {
- go func(r string) {
- for {
- err := ns.Mount(ctx, naming.Join(mtName, "global"), r, 0 /* forever */, naming.ServesMountTableOpt(true))
- if err == nil {
- break
- }
- vlog.Infof("Failed to Mount global namespace: %v", err)
- time.Sleep(time.Second)
- }
- }(root)
- }
-
- impl.InvokeCallback(ctx, name)
// Wait until shutdown. Ignore duplicate signals (sent by agent and
// received as part of process group).
signals.SameSignalTimeWindow = 500 * time.Millisecond
<-signals.ShutdownOnSignals(ctx)
-
return exitErr
}
+
+// newDeviceListenSpec returns a copy of ls, with the ports changed to port.
+func newDeviceListenSpec(ls ipc.ListenSpec, port int) (ipc.ListenSpec, error) {
+ orig := ls.Addrs
+ ls.Addrs = nil
+ for _, a := range orig {
+ host, _, err := net.SplitHostPort(a.Address)
+ if err != nil {
+ err = fmt.Errorf("net.SplitHostPort(%v) failed: %v", a.Address, err)
+ vlog.Errorf(err.Error())
+ return ls, err
+ }
+ a.Address = net.JoinHostPort(host, strconv.Itoa(port))
+ ls.Addrs = append(ls.Addrs, a)
+ }
+ return ls, nil
+}
diff --git a/services/mgmt/device/impl/impl_test.go b/services/mgmt/device/impl/impl_test.go
index df82bc6..bd43d09 100644
--- a/services/mgmt/device/impl/impl_test.go
+++ b/services/mgmt/device/impl/impl_test.go
@@ -52,10 +52,10 @@
binaryimpl "v.io/core/veyron/services/mgmt/binary/impl"
"v.io/core/veyron/services/mgmt/device/config"
"v.io/core/veyron/services/mgmt/device/impl"
+ "v.io/core/veyron/services/mgmt/device/starter"
libbinary "v.io/core/veyron/services/mgmt/lib/binary"
mgmttest "v.io/core/veyron/services/mgmt/lib/testutil"
suidhelper "v.io/core/veyron/services/mgmt/suidhelper/impl"
- mounttable "v.io/core/veyron/services/mounttable/lib"
)
const (
@@ -138,7 +138,6 @@
// specify device manager config settings.
func deviceManager(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
ctx, shutdown := testutil.InitForTest()
-
if len(args) == 0 {
vlog.Fatalf("deviceManager expected at least an argument")
}
@@ -149,24 +148,12 @@
defer shutdown()
veyron2.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
- server, endpoint := mgmttest.NewServer(ctx)
- var dispatcher ipc.Dispatcher
- defer func() {
- server.Stop()
- if dispatcher != nil {
- impl.Shutdown(dispatcher)
- }
- }()
- name := naming.JoinAddressName(endpoint, "")
- vlog.VI(1).Infof("Device manager name: %v", name)
-
// Satisfy the contract described in doc.go by passing the config state
// through to the device manager dispatcher constructor.
configState, err := config.Load()
if err != nil {
vlog.Fatalf("Failed to decode config state: %v", err)
}
- configState.Name = name
// This exemplifies how to override or set specific config fields, if,
// for example, the device manager is invoked 'by hand' instead of via a
@@ -178,40 +165,37 @@
configState.Root, configState.Helper, configState.Origin, configState.CurrentLink = args[0], args[1], args[2], args[3]
}
- mtListenSpec := ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
- mtName, stop, err := mounttable.StartServers(ctx, mtListenSpec, "", "", "" /* ACL File */)
+ stop, err := starter.Start(ctx, starter.Args{
+ Namespace: starter.NamespaceArgs{
+ ListenSpec: ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}},
+ },
+ Device: starter.DeviceArgs{
+ Name: publishName,
+ ListenSpec: ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}},
+ ConfigState: configState,
+ TestMode: strings.HasSuffix(fmt.Sprint(veyron2.GetPrincipal(ctx).BlessingStore().Default()), "/testdm"),
+ RestartCallback: func() { fmt.Println("restart handler") },
+ },
+ // TODO(rthellend): Wire up the local mounttable like the real device
+ // manager, i.e. mount the device manager and the apps on it, and mount
+ // the local mounttable in the global namespace.
+ // MountGlobalNamespaceInLocalNamespace: true,
+ })
if err != nil {
- vlog.Errorf("mounttable.StartServers failed: %v", err)
+ vlog.Errorf("starter.Start failed: %v", err)
return err
}
defer stop()
- // TODO(rthellend): Wire up the local mounttable like the real device
- // manager, i.e. mount the device manager and the apps on it, and mount
- // the local mounttable in the global namespace.
-
- blessings := fmt.Sprint(veyron2.GetPrincipal(ctx).BlessingStore().Default())
- testMode := strings.HasSuffix(blessings, "/testdm")
- dispatcher, err = impl.NewDispatcher(ctx, configState, mtName, testMode, func() { fmt.Println("restart handler") })
- if err != nil {
- vlog.Fatalf("Failed to create device manager dispatcher: %v", err)
- }
- // dispatcher is shutdown by deferral above.
-
- if err := server.ServeDispatcher(publishName, dispatcher); err != nil {
- vlog.Fatalf("Serve(%v) failed: %v", publishName, err)
- }
- impl.InvokeCallback(ctx, name)
-
fmt.Fprintf(stdout, "ready:%d\n", os.Getpid())
<-signals.ShutdownOnSignals(ctx)
-
if val, present := env["PAUSE_BEFORE_STOP"]; present && val == "1" {
modules.WaitForEOF(stdin)
}
- if impl.DispatcherLeaking(dispatcher) {
- vlog.Fatalf("device manager leaking resources")
- }
+ // TODO(ashankar): Figure out a way to incorporate this check in the test.
+ // if impl.DispatcherLeaking(dispatcher) {
+ // vlog.Fatalf("device manager leaking resources")
+ // }
return nil
}
diff --git a/services/mgmt/device/starter/starter.go b/services/mgmt/device/starter/starter.go
new file mode 100644
index 0000000..62c84ad
--- /dev/null
+++ b/services/mgmt/device/starter/starter.go
@@ -0,0 +1,135 @@
+// Package starter provides a single function that starts up servers for a
+// mounttable and a device manager that is mounted on it.
+package starter
+
+import (
+ "time"
+
+ "v.io/core/veyron/services/mgmt/device/config"
+ "v.io/core/veyron/services/mgmt/device/impl"
+ mounttable "v.io/core/veyron/services/mounttable/lib"
+
+ "v.io/core/veyron2"
+ "v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
+ "v.io/core/veyron2/naming"
+ "v.io/core/veyron2/vlog"
+)
+
+type NamespaceArgs struct {
+ Name string // Name to publish the mounttable service under.
+ ListenSpec ipc.ListenSpec // ListenSpec for the server.
+ ACLFile string // Path to the ACL file used by the mounttable.
+ // Name in the local neighborhood on which to make the mounttable
+ // visible. If empty, the mounttable will not be visible in the local
+ // neighborhood.
+ Neighborhood string
+}
+
+type DeviceArgs struct {
+ Name string // Name to publish the device service under.
+ ListenSpec ipc.ListenSpec // ListenSpec for the device server.
+ ConfigState *config.State // Configuration for the device.
+ TestMode bool // Whether the device is running in test mode or not.
+ RestartCallback func() // Callback invoked when the device service is restarted.
+}
+
+type Args struct {
+ Namespace NamespaceArgs
+ Device DeviceArgs
+
+ // If true, the global namespace will be made available on the
+ // mounttable server under "global/".
+ MountGlobalNamespaceInLocalNamespace bool
+}
+
+// Start creates servers for the mounttable and device services and links them together.
+//
+// Returns the callback to be invoked to shutdown the services on success, or
+// an error on failure.
+func Start(ctx *context.T, args Args) (func(), error) {
+ mtName, stopMT, err := mounttable.StartServers(ctx, args.Namespace.ListenSpec, args.Namespace.Name, args.Namespace.Neighborhood, args.Namespace.ACLFile)
+ if err != nil {
+ vlog.Errorf("mounttable.StartServers(%#v) failed: %v", args.Namespace, err)
+ return nil, err
+ }
+ vlog.Infof("Local mounttable (%v) published as %q", mtName, args.Namespace.Name)
+
+ if args.Device.Name == "" {
+ args.Device.Name = naming.Join(mtName, "devmgr")
+ }
+ stopDevice, err := startDeviceServer(ctx, args.Device, mtName)
+ if err != nil {
+ stopMT()
+ vlog.Errorf("Failed to start device service: %v", err)
+ return nil, err
+ }
+ if args.MountGlobalNamespaceInLocalNamespace {
+ mountGlobalNamespaceInLocalNamespace(ctx, mtName)
+ }
+ impl.InvokeCallback(ctx, args.Device.ConfigState.Name)
+
+ return func() {
+ stopDevice()
+ stopMT()
+ }, nil
+}
+
+// startDeviceServer creates an ipc.Server and sets it up to server the Device service.
+//
+// ls: ListenSpec for the server
+// configState: configuration for the Device service dispatcher
+// mt: Object address of the mounttable
+// dm: Name to publish the device service under
+// testMode: whether the service is to be run in test mode
+// restarted: callback invoked when the device manager is restarted.
+//
+// Returns:
+// (1) Function to be called to force the service to shutdown
+// (2) Any errors in starting the service (in which case, (1) will be nil)
+func startDeviceServer(ctx *context.T, args DeviceArgs, mt string) (shutdown func(), err error) {
+ server, err := veyron2.NewServer(ctx)
+ if err != nil {
+ return nil, err
+ }
+ shutdown = func() { server.Stop() }
+ endpoints, err := server.Listen(args.ListenSpec)
+ if err != nil {
+ shutdown()
+ return nil, err
+ }
+ args.ConfigState.Name = endpoints[0].Name()
+ vlog.Infof("Device manager object name: %v", args.ConfigState.Name)
+
+ dispatcher, err := impl.NewDispatcher(ctx, args.ConfigState, mt, args.TestMode, args.RestartCallback)
+ if err != nil {
+ shutdown()
+ return nil, err
+ }
+
+ shutdown = func() {
+ server.Stop()
+ impl.Shutdown(dispatcher)
+ }
+ if err := server.ServeDispatcher(args.Name, dispatcher); err != nil {
+ shutdown()
+ return nil, err
+ }
+ return shutdown, nil
+}
+
+func mountGlobalNamespaceInLocalNamespace(ctx *context.T, localMT string) {
+ ns := veyron2.GetNamespace(ctx)
+ for _, root := range ns.Roots() {
+ go func(r string) {
+ for {
+ err := ns.Mount(ctx, naming.Join(localMT, "global"), r, 0 /* forever */, naming.ServesMountTableOpt(true))
+ if err == nil {
+ break
+ }
+ vlog.Infof("Failed to Mount global namespace: %v", err)
+ time.Sleep(time.Second)
+ }
+ }(root)
+ }
+}