services/mgmt/device: Share startup code with unittest.
This commit establishes a function to start deviced's services
(mounttable server, device service on another server mounted on the
mounttable etc.) and uses the function in both the deviced binary and
the implementation unittests.
Change-Id: I319a1c4d1b41eaab36a98c7189dc739585f5a48d
diff --git a/services/mgmt/device/deviced/server.go b/services/mgmt/device/deviced/server.go
index ab3e85b..ac7e719 100644
--- a/services/mgmt/device/deviced/server.go
+++ b/services/mgmt/device/deviced/server.go
@@ -2,6 +2,7 @@
import (
"flag"
+ "fmt"
"net"
"os"
"path/filepath"
@@ -14,13 +15,11 @@
"v.io/core/veyron/lib/signals"
_ "v.io/core/veyron/profiles/roaming"
"v.io/core/veyron/services/mgmt/device/config"
- "v.io/core/veyron/services/mgmt/device/impl"
- mounttable "v.io/core/veyron/services/mounttable/lib"
+ "v.io/core/veyron/services/mgmt/device/starter"
"v.io/core/veyron2"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/mgmt"
- "v.io/core/veyron2/naming"
"v.io/core/veyron2/vlog"
)
@@ -52,99 +51,60 @@
vlog.Errorf("Failed to load config passed from parent: %v", err)
return err
}
- ls := veyron2.GetListenSpec(ctx)
- if testMode {
- ls.Addrs = ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}
- ls.Proxy = ""
- }
- var publishName string
- if !testMode {
- publishName = *publishAs
- }
mtAclDir := filepath.Join(configState.Root, "mounttable")
if err := os.MkdirAll(mtAclDir, 0700); err != nil {
vlog.Errorf("os.MkdirAll(%q) failed: %v", mtAclDir, err)
return err
}
- mtName, stop, err := mounttable.StartServers(ctx, ls, publishName, *nhName, filepath.Join(mtAclDir, "acls"))
+
+ // TODO(ashankar,caprita): Use channels/locks to synchronize the
+ // setting and getting of exitErr.
+ var exitErr error
+ ns := starter.NamespaceArgs{
+ ACLFile: filepath.Join(mtAclDir, "acls"),
+ Neighborhood: *nhName,
+ }
+ if testMode {
+ ns.ListenSpec = ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
+ } else {
+ ns.ListenSpec = veyron2.GetListenSpec(ctx)
+ ns.Name = *publishAs
+ }
+ dev := starter.DeviceArgs{
+ ConfigState: configState,
+ TestMode: testMode,
+ RestartCallback: func() { exitErr = cmdline.ErrExitCode(*restartExitCode) },
+ }
+ if dev.ListenSpec, err = newDeviceListenSpec(ns.ListenSpec, *dmPort); err != nil {
+ return err
+ }
+
+ stop, err := starter.Start(ctx, starter.Args{Namespace: ns, Device: dev, MountGlobalNamespaceInLocalNamespace: true})
if err != nil {
- vlog.Errorf("mounttable.StartServers failed: %v", err)
return err
}
defer stop()
- vlog.VI(0).Infof("Local mounttable published as: %v", publishName)
-
- server, err := veyron2.NewServer(ctx)
- if err != nil {
- vlog.Errorf("NewServer() failed: %v", err)
- return err
- }
- var dispatcher ipc.Dispatcher
- defer func() {
- server.Stop()
- if dispatcher != nil {
- impl.Shutdown(dispatcher)
- }
- }()
-
- // Bring up the device manager with the same address as the mounttable.
- dmListenSpec := ls
- for i, a := range ls.Addrs {
- host, _, err := net.SplitHostPort(a.Address)
- if err != nil {
- vlog.Errorf("net.SplitHostPort(%v) failed: %v", err)
- return err
- }
- dmListenSpec.Addrs[i].Address = net.JoinHostPort(host, strconv.Itoa(*dmPort))
- }
- endpoints, err := server.Listen(dmListenSpec)
- if err != nil {
- vlog.Errorf("Listen(%s) failed: %v", ls, err)
- return err
- }
- name := endpoints[0].Name()
- vlog.VI(0).Infof("Device manager object name: %v", name)
- configState.Name = name
- // TODO(caprita): We need a way to set config fields outside of the
- // update mechanism (since that should ideally be an opaque
- // implementation detail).
-
- var exitErr error
- dispatcher, err = impl.NewDispatcher(ctx, configState, mtName, testMode, func() { exitErr = cmdline.ErrExitCode(*restartExitCode) })
- if err != nil {
- vlog.Errorf("Failed to create dispatcher: %v", err)
- return err
- }
- // Shutdown via dispatcher above.
-
- dmPublishName := naming.Join(mtName, "devmgr")
- if err := server.ServeDispatcher(dmPublishName, dispatcher); err != nil {
- vlog.Errorf("Serve(%v) failed: %v", dmPublishName, err)
- return err
- }
- vlog.VI(0).Infof("Device manager published as: %v", dmPublishName)
-
- // Mount the global namespace in the local namespace as "global".
- ns := veyron2.GetNamespace(ctx)
- for _, root := range ns.Roots() {
- go func(r string) {
- for {
- err := ns.Mount(ctx, naming.Join(mtName, "global"), r, 0 /* forever */, naming.ServesMountTableOpt(true))
- if err == nil {
- break
- }
- vlog.Infof("Failed to Mount global namespace: %v", err)
- time.Sleep(time.Second)
- }
- }(root)
- }
-
- impl.InvokeCallback(ctx, name)
// Wait until shutdown. Ignore duplicate signals (sent by agent and
// received as part of process group).
signals.SameSignalTimeWindow = 500 * time.Millisecond
<-signals.ShutdownOnSignals(ctx)
-
return exitErr
}
+
+// newDeviceListenSpec returns a copy of ls, with the ports changed to port.
+func newDeviceListenSpec(ls ipc.ListenSpec, port int) (ipc.ListenSpec, error) {
+ orig := ls.Addrs
+ ls.Addrs = nil
+ for _, a := range orig {
+ host, _, err := net.SplitHostPort(a.Address)
+ if err != nil {
+ err = fmt.Errorf("net.SplitHostPort(%v) failed: %v", a.Address, err)
+ vlog.Errorf(err.Error())
+ return ls, err
+ }
+ a.Address = net.JoinHostPort(host, strconv.Itoa(port))
+ ls.Addrs = append(ls.Addrs, a)
+ }
+ return ls, nil
+}
diff --git a/services/mgmt/device/impl/impl_test.go b/services/mgmt/device/impl/impl_test.go
index df82bc6..bd43d09 100644
--- a/services/mgmt/device/impl/impl_test.go
+++ b/services/mgmt/device/impl/impl_test.go
@@ -52,10 +52,10 @@
binaryimpl "v.io/core/veyron/services/mgmt/binary/impl"
"v.io/core/veyron/services/mgmt/device/config"
"v.io/core/veyron/services/mgmt/device/impl"
+ "v.io/core/veyron/services/mgmt/device/starter"
libbinary "v.io/core/veyron/services/mgmt/lib/binary"
mgmttest "v.io/core/veyron/services/mgmt/lib/testutil"
suidhelper "v.io/core/veyron/services/mgmt/suidhelper/impl"
- mounttable "v.io/core/veyron/services/mounttable/lib"
)
const (
@@ -138,7 +138,6 @@
// specify device manager config settings.
func deviceManager(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
ctx, shutdown := testutil.InitForTest()
-
if len(args) == 0 {
vlog.Fatalf("deviceManager expected at least an argument")
}
@@ -149,24 +148,12 @@
defer shutdown()
veyron2.GetNamespace(ctx).CacheCtl(naming.DisableCache(true))
- server, endpoint := mgmttest.NewServer(ctx)
- var dispatcher ipc.Dispatcher
- defer func() {
- server.Stop()
- if dispatcher != nil {
- impl.Shutdown(dispatcher)
- }
- }()
- name := naming.JoinAddressName(endpoint, "")
- vlog.VI(1).Infof("Device manager name: %v", name)
-
// Satisfy the contract described in doc.go by passing the config state
// through to the device manager dispatcher constructor.
configState, err := config.Load()
if err != nil {
vlog.Fatalf("Failed to decode config state: %v", err)
}
- configState.Name = name
// This exemplifies how to override or set specific config fields, if,
// for example, the device manager is invoked 'by hand' instead of via a
@@ -178,40 +165,37 @@
configState.Root, configState.Helper, configState.Origin, configState.CurrentLink = args[0], args[1], args[2], args[3]
}
- mtListenSpec := ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
- mtName, stop, err := mounttable.StartServers(ctx, mtListenSpec, "", "", "" /* ACL File */)
+ stop, err := starter.Start(ctx, starter.Args{
+ Namespace: starter.NamespaceArgs{
+ ListenSpec: ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}},
+ },
+ Device: starter.DeviceArgs{
+ Name: publishName,
+ ListenSpec: ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}},
+ ConfigState: configState,
+ TestMode: strings.HasSuffix(fmt.Sprint(veyron2.GetPrincipal(ctx).BlessingStore().Default()), "/testdm"),
+ RestartCallback: func() { fmt.Println("restart handler") },
+ },
+ // TODO(rthellend): Wire up the local mounttable like the real device
+ // manager, i.e. mount the device manager and the apps on it, and mount
+ // the local mounttable in the global namespace.
+ // MountGlobalNamespaceInLocalNamespace: true,
+ })
if err != nil {
- vlog.Errorf("mounttable.StartServers failed: %v", err)
+ vlog.Errorf("starter.Start failed: %v", err)
return err
}
defer stop()
- // TODO(rthellend): Wire up the local mounttable like the real device
- // manager, i.e. mount the device manager and the apps on it, and mount
- // the local mounttable in the global namespace.
-
- blessings := fmt.Sprint(veyron2.GetPrincipal(ctx).BlessingStore().Default())
- testMode := strings.HasSuffix(blessings, "/testdm")
- dispatcher, err = impl.NewDispatcher(ctx, configState, mtName, testMode, func() { fmt.Println("restart handler") })
- if err != nil {
- vlog.Fatalf("Failed to create device manager dispatcher: %v", err)
- }
- // dispatcher is shutdown by deferral above.
-
- if err := server.ServeDispatcher(publishName, dispatcher); err != nil {
- vlog.Fatalf("Serve(%v) failed: %v", publishName, err)
- }
- impl.InvokeCallback(ctx, name)
-
fmt.Fprintf(stdout, "ready:%d\n", os.Getpid())
<-signals.ShutdownOnSignals(ctx)
-
if val, present := env["PAUSE_BEFORE_STOP"]; present && val == "1" {
modules.WaitForEOF(stdin)
}
- if impl.DispatcherLeaking(dispatcher) {
- vlog.Fatalf("device manager leaking resources")
- }
+ // TODO(ashankar): Figure out a way to incorporate this check in the test.
+ // if impl.DispatcherLeaking(dispatcher) {
+ // vlog.Fatalf("device manager leaking resources")
+ // }
return nil
}
diff --git a/services/mgmt/device/starter/starter.go b/services/mgmt/device/starter/starter.go
new file mode 100644
index 0000000..62c84ad
--- /dev/null
+++ b/services/mgmt/device/starter/starter.go
@@ -0,0 +1,135 @@
+// Package starter provides a single function that starts up servers for a
+// mounttable and a device manager that is mounted on it.
+package starter
+
+import (
+ "time"
+
+ "v.io/core/veyron/services/mgmt/device/config"
+ "v.io/core/veyron/services/mgmt/device/impl"
+ mounttable "v.io/core/veyron/services/mounttable/lib"
+
+ "v.io/core/veyron2"
+ "v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
+ "v.io/core/veyron2/naming"
+ "v.io/core/veyron2/vlog"
+)
+
+type NamespaceArgs struct {
+ Name string // Name to publish the mounttable service under.
+ ListenSpec ipc.ListenSpec // ListenSpec for the server.
+ ACLFile string // Path to the ACL file used by the mounttable.
+ // Name in the local neighborhood on which to make the mounttable
+ // visible. If empty, the mounttable will not be visible in the local
+ // neighborhood.
+ Neighborhood string
+}
+
+type DeviceArgs struct {
+ Name string // Name to publish the device service under.
+ ListenSpec ipc.ListenSpec // ListenSpec for the device server.
+ ConfigState *config.State // Configuration for the device.
+ TestMode bool // Whether the device is running in test mode or not.
+ RestartCallback func() // Callback invoked when the device service is restarted.
+}
+
+type Args struct {
+ Namespace NamespaceArgs
+ Device DeviceArgs
+
+ // If true, the global namespace will be made available on the
+ // mounttable server under "global/".
+ MountGlobalNamespaceInLocalNamespace bool
+}
+
+// Start creates servers for the mounttable and device services and links them together.
+//
+// Returns the callback to be invoked to shutdown the services on success, or
+// an error on failure.
+func Start(ctx *context.T, args Args) (func(), error) {
+ mtName, stopMT, err := mounttable.StartServers(ctx, args.Namespace.ListenSpec, args.Namespace.Name, args.Namespace.Neighborhood, args.Namespace.ACLFile)
+ if err != nil {
+ vlog.Errorf("mounttable.StartServers(%#v) failed: %v", args.Namespace, err)
+ return nil, err
+ }
+ vlog.Infof("Local mounttable (%v) published as %q", mtName, args.Namespace.Name)
+
+ if args.Device.Name == "" {
+ args.Device.Name = naming.Join(mtName, "devmgr")
+ }
+ stopDevice, err := startDeviceServer(ctx, args.Device, mtName)
+ if err != nil {
+ stopMT()
+ vlog.Errorf("Failed to start device service: %v", err)
+ return nil, err
+ }
+ if args.MountGlobalNamespaceInLocalNamespace {
+ mountGlobalNamespaceInLocalNamespace(ctx, mtName)
+ }
+ impl.InvokeCallback(ctx, args.Device.ConfigState.Name)
+
+ return func() {
+ stopDevice()
+ stopMT()
+ }, nil
+}
+
+// startDeviceServer creates an ipc.Server and sets it up to server the Device service.
+//
+// ls: ListenSpec for the server
+// configState: configuration for the Device service dispatcher
+// mt: Object address of the mounttable
+// dm: Name to publish the device service under
+// testMode: whether the service is to be run in test mode
+// restarted: callback invoked when the device manager is restarted.
+//
+// Returns:
+// (1) Function to be called to force the service to shutdown
+// (2) Any errors in starting the service (in which case, (1) will be nil)
+func startDeviceServer(ctx *context.T, args DeviceArgs, mt string) (shutdown func(), err error) {
+ server, err := veyron2.NewServer(ctx)
+ if err != nil {
+ return nil, err
+ }
+ shutdown = func() { server.Stop() }
+ endpoints, err := server.Listen(args.ListenSpec)
+ if err != nil {
+ shutdown()
+ return nil, err
+ }
+ args.ConfigState.Name = endpoints[0].Name()
+ vlog.Infof("Device manager object name: %v", args.ConfigState.Name)
+
+ dispatcher, err := impl.NewDispatcher(ctx, args.ConfigState, mt, args.TestMode, args.RestartCallback)
+ if err != nil {
+ shutdown()
+ return nil, err
+ }
+
+ shutdown = func() {
+ server.Stop()
+ impl.Shutdown(dispatcher)
+ }
+ if err := server.ServeDispatcher(args.Name, dispatcher); err != nil {
+ shutdown()
+ return nil, err
+ }
+ return shutdown, nil
+}
+
+func mountGlobalNamespaceInLocalNamespace(ctx *context.T, localMT string) {
+ ns := veyron2.GetNamespace(ctx)
+ for _, root := range ns.Roots() {
+ go func(r string) {
+ for {
+ err := ns.Mount(ctx, naming.Join(localMT, "global"), r, 0 /* forever */, naming.ServesMountTableOpt(true))
+ if err == nil {
+ break
+ }
+ vlog.Infof("Failed to Mount global namespace: %v", err)
+ time.Sleep(time.Second)
+ }
+ }(root)
+ }
+}