Merge ""veyron2/security": No direct vom encoding of blessings"
diff --git a/lib/appcycle/appcycle.go b/lib/appcycle/appcycle.go
new file mode 100644
index 0000000..3dd0004
--- /dev/null
+++ b/lib/appcycle/appcycle.go
@@ -0,0 +1,141 @@
+package appcycle
+
+import (
+ "fmt"
+ "os"
+ "sync"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/ipc"
+ stub "veyron.io/veyron/veyron2/services/mgmt/appcycle"
+)
+
+type AppCycle struct {
+ sync.RWMutex
+ waiters []chan<- string
+ taskTrackers []chan<- veyron2.Task
+ task veyron2.Task
+ shutDown bool
+ rt veyron2.Runtime
+ disp *invoker
+}
+
+type invoker struct {
+ ac *AppCycle
+}
+
+func New(rt veyron2.Runtime) *AppCycle {
+ ac := &AppCycle{rt: rt}
+ ac.disp = &invoker{ac}
+ return ac
+}
+
+func (m *AppCycle) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+ if m.shutDown {
+ return
+ }
+ m.shutDown = true
+ for _, t := range m.taskTrackers {
+ close(t)
+ }
+ m.taskTrackers = nil
+}
+
+func (m *AppCycle) stop(msg string) {
+ m.RLock()
+ defer m.RUnlock()
+ if len(m.waiters) == 0 {
+ os.Exit(veyron2.UnhandledStopExitCode)
+ }
+ for _, w := range m.waiters {
+ select {
+ case w <- msg:
+ default:
+ }
+ }
+}
+
+func (m *AppCycle) Stop() {
+ m.stop(veyron2.LocalStop)
+}
+
+func (*AppCycle) ForceStop() {
+ os.Exit(veyron2.ForceStopExitCode)
+}
+
+func (m *AppCycle) WaitForStop(ch chan<- string) {
+ m.Lock()
+ defer m.Unlock()
+ m.waiters = append(m.waiters, ch)
+}
+
+func (m *AppCycle) TrackTask(ch chan<- veyron2.Task) {
+ m.Lock()
+ defer m.Unlock()
+ if m.shutDown {
+ close(ch)
+ return
+ }
+ m.taskTrackers = append(m.taskTrackers, ch)
+}
+
+func (m *AppCycle) advanceTask(progress, goal int32) {
+ m.Lock()
+ defer m.Unlock()
+ m.task.Goal += goal
+ m.task.Progress += progress
+ for _, t := range m.taskTrackers {
+ select {
+ case t <- m.task:
+ default:
+ // TODO(caprita): Make it such that the latest task
+ // update is always added to the channel even if the
+ // channel is full. One way is to pull an element from
+ // t and then retry the push.
+ }
+ }
+}
+
+func (m *AppCycle) AdvanceGoal(delta int32) {
+ if delta <= 0 {
+ return
+ }
+ m.advanceTask(0, delta)
+}
+
+func (m *AppCycle) AdvanceProgress(delta int32) {
+ if delta <= 0 {
+ return
+ }
+ m.advanceTask(delta, 0)
+}
+
+func (m *AppCycle) Remote() interface{} {
+ return stub.AppCycleServer(m.disp)
+}
+
+func (d *invoker) Stop(ctx stub.AppCycleStopContext) error {
+ // The channel should be large enough that we are unlikely to miss
+ // updates while we're waiting for the stream to unblock.
+ ch := make(chan veyron2.Task, 10)
+ d.ac.TrackTask(ch)
+ // TODO(caprita): Include identity of Stop issuer in message.
+ d.ac.stop(veyron2.RemoteStop)
+ for {
+ task, ok := <-ch
+ if !ok {
+ // Channel closed, meaning process shutdown is imminent.
+ break
+ }
+ actask := stub.Task{Progress: task.Progress, Goal: task.Goal}
+ ctx.SendStream().Send(actask)
+ }
+ return nil
+}
+
+func (d *invoker) ForceStop(ipc.ServerContext) error {
+ d.ac.ForceStop()
+ return fmt.Errorf("ForceStop should not reply as the process should be dead")
+}
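
The AppCycle object above is what profiles now hand back to the runtime and what applications reach via Runtime.AppCycle(). For orientation, a minimal usage sketch (it mirrors the calls exercised in runtimes/google/rt/mgmt_test.go further down; the blank profile import is an assumption, since some profile must be linked in for rt.Init to succeed):

    package main

    import (
        "fmt"

        "veyron.io/veyron/veyron2/rt"

        _ "veyron.io/veyron/veyron/profiles"
    )

    func main() {
        r := rt.Init()
        defer r.Cleanup()
        m := r.AppCycle()

        // Register interest in Stop requests instead of exiting immediately.
        stop := make(chan string, 1)
        m.WaitForStop(stop)

        // Advertise how much shutdown work there is, then trigger a local
        // stop; a node manager could equally issue Stop via the service
        // returned by Remote().
        m.AdvanceGoal(2)
        m.Stop()

        fmt.Printf("stopping: %s\n", <-stop) // receives veyron2.LocalStop
        m.AdvanceProgress(1)
        // ... flush state, stop servers ...
        m.AdvanceProgress(1)
    }
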
diff --git a/lib/modules/core/mounttable.go b/lib/modules/core/mounttable.go
index 7d5d7f8..306498f 100644
--- a/lib/modules/core/mounttable.go
+++ b/lib/modules/core/mounttable.go
@@ -108,7 +108,7 @@
return nil
}
-type resolver func(ctx context.T, name string) (names []string, err error)
+type resolver func(ctx context.T, name string, opts ...naming.ResolveOpt) (names []string, err error)
func resolve(fn resolver, stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
if err := checkArgs(args[1:], 1, "<name>"); err != nil {
diff --git a/lib/signals/signals.go b/lib/signals/signals.go
index 36bf90b..e24e1c3 100644
--- a/lib/signals/signals.go
+++ b/lib/signals/signals.go
@@ -47,7 +47,7 @@
sawStop = true
if r := rt.R(); r != nil {
stopWaiter := make(chan string, 1)
- r.WaitForStop(stopWaiter)
+ r.AppCycle().WaitForStop(stopWaiter)
go func() {
for {
ch <- stopSignal(<-stopWaiter)
diff --git a/lib/signals/signals_test.go b/lib/signals/signals_test.go
index 4aff7fe..e101277 100644
--- a/lib/signals/signals_test.go
+++ b/lib/signals/signals_test.go
@@ -49,7 +49,7 @@
close(ch)
return
case "stop":
- rt.R().Stop()
+ rt.R().AppCycle().Stop()
}
}
}
diff --git a/lib/unixfd/unixfd.go b/lib/unixfd/unixfd.go
index 132a6c9..2cdcc36 100644
--- a/lib/unixfd/unixfd.go
+++ b/lib/unixfd/unixfd.go
@@ -202,16 +202,37 @@
}
defer local.maybeClose()
rfile := remote.releaseFile()
- defer rfile.Close()
rights := syscall.UnixRights(int(rfile.Fd()))
n, oobn, err := conn.WriteMsgUnix(data, rights, nil)
if err != nil {
+ rfile.Close()
return nil, err
} else if n != len(data) || oobn != len(rights) {
+ rfile.Close()
return nil, fmt.Errorf("expected to send %d, %d bytes, sent %d, %d", len(data), len(rights), n, oobn)
}
- return local.releaseAddr(), nil
+ // Wait for the other side to acknowledge.
+ // This is to work around a race on OS X where it appears we can close
+ // the file descriptor before it gets transferred over the socket.
+ f := local.releaseFile()
+ fd, err := syscall.Dup(int(f.Fd()))
+ if err != nil {
+ f.Close()
+ rfile.Close()
+ return nil, err
+ }
+ newConn, err := net.FileConn(f)
+ f.Close()
+ if err != nil {
+ rfile.Close()
+ return nil, err
+ }
+ newConn.Read(make([]byte, 1))
+ newConn.Close()
+ rfile.Close()
+
+ return Addr(uintptr(fd)), nil
}
const cmsgDataLength = int(unsafe.Sizeof(int(1)))
@@ -232,22 +253,40 @@
return nil, n, err
}
fd := -1
+ // Loop through any file descriptors we are sent, and close
+ // all extras.
for _, scm := range scms {
fds, err := syscall.ParseUnixRights(&scm)
if err != nil {
return nil, n, err
}
for _, f := range fds {
- if fd != -1 {
- syscall.Close(fd)
+ if fd == -1 {
+ fd = f
+ } else if f != -1 {
+ syscall.Close(f)
}
- fd = f
}
}
if fd == -1 {
return nil, n, nil
}
- return Addr(uintptr(fd)), n, nil
+ result := Addr(uintptr(fd))
+ fd, err = syscall.Dup(fd)
+ if err != nil {
+ CloseUnixAddr(result)
+ return nil, n, err
+ }
+ file := os.NewFile(uintptr(fd), "newconn")
+ newconn, err := net.FileConn(file)
+ file.Close()
+ if err != nil {
+ CloseUnixAddr(result)
+ return nil, n, err
+ }
+ newconn.Write(make([]byte, 1))
+ newconn.Close()
+ return result, n, nil
}
func CloseUnixAddr(addr net.Addr) error {
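
The acknowledgement added above is the interesting part: the sender keeps its duplicate of the descriptor open until the receiver confirms that it has parsed the SCM_RIGHTS message, which is what the OS X race requires. A stripped-down sketch of that handshake using only the standard library (sendFD/recvFD are hypothetical helpers, not part of this package):

    package sketch

    import (
        "fmt"
        "net"
        "syscall"
    )

    // sendFD passes fd over conn along with payload, then blocks until the
    // receiver acknowledges with one byte, after which fd may be closed.
    func sendFD(conn *net.UnixConn, fd int, payload []byte) error {
        rights := syscall.UnixRights(fd)
        if _, _, err := conn.WriteMsgUnix(payload, rights, nil); err != nil {
            return err
        }
        ack := make([]byte, 1)
        _, err := conn.Read(ack) // wait for the receiver's acknowledgement
        return err
    }

    // recvFD reads one descriptor from conn and acknowledges receipt.
    func recvFD(conn *net.UnixConn, buf []byte) (int, error) {
        oob := make([]byte, syscall.CmsgSpace(4)) // room for one int32 fd
        _, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
        if err != nil {
            return -1, err
        }
        scms, err := syscall.ParseSocketControlMessage(oob[:oobn])
        if err != nil || len(scms) == 0 {
            return -1, fmt.Errorf("no control message: %v", err)
        }
        fds, err := syscall.ParseUnixRights(&scms[0])
        if err != nil || len(fds) == 0 {
            return -1, fmt.Errorf("no fd in control message: %v", err)
        }
        _, err = conn.Write([]byte{0}) // tell the sender it may close its copy
        return fds[0], err
    }
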
diff --git a/lib/unixfd/unixfd_test.go b/lib/unixfd/unixfd_test.go
index 6dca48f..152efa8 100644
--- a/lib/unixfd/unixfd_test.go
+++ b/lib/unixfd/unixfd_test.go
@@ -136,15 +136,25 @@
if err != nil {
t.Fatalf("FileConn: %v", err)
}
+ var readErr error
+ var n int
+ var saddr net.Addr
+ done := make(chan struct{})
+ buf := make([]byte, 10)
+ go func() {
+ saddr, n, readErr = ReadConnection(server, buf)
+ close(done)
+ }()
caddr, err := SendConnection(uclient.(*net.UnixConn), []byte("hello"), true)
if err != nil {
t.Fatalf("SendConnection: %v", err)
}
-
- buf := make([]byte, 10)
- saddr, n, err := ReadConnection(server, buf)
- if err != nil {
- t.Fatalf("ReadConnection: %v", err)
+ <-done
+ if readErr != nil {
+ t.Fatalf("ReadConnection: %v", readErr)
+ }
+ if saddr == nil {
+ t.Fatalf("ReadConnection returned nil, %d", n)
}
data := buf[0:n]
if !bytes.Equal([]byte("hello"), data) {
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
index 22b5beb..cfead4e 100644
--- a/profiles/chrome/chrome.go
+++ b/profiles/chrome/chrome.go
@@ -39,11 +39,13 @@
return p
}
-func (g *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) error {
- rt.Logger().VI(1).Infof("%s", g)
- return nil
+func (c *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
+ rt.Logger().VI(1).Infof("%s", c)
+ return nil, nil
}
-func (g *chrome) String() string {
- return "chrome profile on " + g.Platform().String()
+func (*chrome) Cleanup() {}
+
+func (c *chrome) String() string {
+ return "chrome profile on " + c.Platform().String()
}
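
Every profile now follows the same shape: Init returns the profile's AppCycle implementation (possibly nil, as in the chrome profile) together with an error, and the new Cleanup method undoes whatever Init set up. A minimal sketch of that shape, assuming the remaining veyron2.Profile methods (Name, Platform, String, ...) are implemented as in the surrounding profiles and that the veyron2, config and lib/appcycle packages are imported as in the diffs below:

    type minimal struct {
        ac *appcycle.AppCycle
    }

    func (p *minimal) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
        p.ac = appcycle.New(rt)
        return p.ac, nil
    }

    func (p *minimal) Cleanup() {
        if p.ac != nil {
            p.ac.Shutdown()
        }
    }
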
diff --git a/profiles/doc.go b/profiles/doc.go
index bee7e06..635df9d 100644
--- a/profiles/doc.go
+++ b/profiles/doc.go
@@ -17,17 +17,12 @@
// registration.
//
// This top level directory contains a 'generic' Profile and utility routines
-// used by other Profiles. It does not follow the convention of registering
-// itself via its Init function, since the expected use is that it will
-// used automatically as a default by the Runtime. Instead it provides a New
-// function. This avoids the need for every main package to import
-// "veyron.io/veyron/veyron/profiles", instead, only more specific Profiles must be so imported.
+// used by other Profiles. It should be imported whenever possible and
+// particularly by tests.
//
-// The 'net' Profile adds operating system support for varied network
+// The 'roaming' Profile adds operating system support for varied network
// configurations and in particular dhcp. It should be used by any application
-// that may 'roam' or any may be behind a 1-1 NAT.
+// that may 'roam' or may be behind a 1-1 NAT. The 'static' profile
+// does not provide dhcp support, but is otherwise like the roaming profile.
//
-// The 'net/bluetooth' Profile adds operating system support for bluetooth
-// networking.
-// TODO(cnicolaou,ashankar): add this
package profiles
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index b5afa26..b0143a7 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -14,6 +14,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netstate"
"veyron.io/veyron/veyron/profiles"
@@ -35,7 +36,9 @@
rt.RegisterProfile(&profile{})
}
-type profile struct{}
+type profile struct {
+ ac *appcycle.AppCycle
+}
func (p *profile) Name() string {
return "GCE"
@@ -54,17 +57,22 @@
return "net " + p.Platform().String()
}
-func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) error {
+func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
if !gce.RunningOnGCE() {
- return fmt.Errorf("GCE profile used on a non-GCE system")
+ return nil, fmt.Errorf("GCE profile used on a non-GCE system")
}
+ p.ac = appcycle.New(rt)
ListenSpec.Address = listenAddressFlag.String()
if ip, err := gce.ExternalIPAddress(); err != nil {
- return err
+ return p.ac, err
} else {
ListenSpec.AddressChooser = func(network string, addrs []ipc.Address) ([]ipc.Address, error) {
return []ipc.Address{&netstate.AddrIfc{&net.IPAddr{IP: ip}, "gce-nat", nil}}, nil
}
}
- return nil
+ return p.ac, nil
+}
+
+func (p *profile) Cleanup() {
+ p.ac.Shutdown()
}
diff --git a/profiles/generic.go b/profiles/generic.go
index 9aadc25..96ddcb7 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -6,6 +6,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/profiles/internal"
_ "veyron.io/veyron/veyron/runtimes/google/rt"
)
@@ -17,7 +18,7 @@
AddressChooser: internal.IPAddressChooser,
}
-type generic struct{}
+type generic struct{ ac *appcycle.AppCycle }
var _ veyron2.Profile = (*generic)(nil)
@@ -44,9 +45,14 @@
return p
}
-func (g *generic) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+func (g *generic) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
rt.Logger().VI(1).Infof("%s", g)
- return nil
+ g.ac = appcycle.New(rt)
+ return g.ac, nil
+}
+
+func (g *generic) Cleanup() {
+ g.ac.Shutdown()
}
func (g *generic) String() string {
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index d4c0262..837a5b8 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -18,6 +18,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netconfig"
"veyron.io/veyron/veyron/lib/netstate"
@@ -45,7 +46,9 @@
}
type profile struct {
- gce string
+ gce string
+ ac *appcycle.AppCycle
+ cleanupCh, watcherCh chan struct{}
}
func New() veyron2.Profile {
@@ -69,7 +72,7 @@
return p.Name() + " " + p.Platform().String()
}
-func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) error {
+func (p *profile) Init(rt veyron2.Runtime, publisher *config.Publisher) (veyron2.AppCycle, error) {
log := rt.Logger()
rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
@@ -81,6 +84,8 @@
Proxy: lf.ListenProxy,
}
+ p.ac = appcycle.New(rt)
+
// Our address is private, so we test for running on GCE and for its
// 1:1 NAT configuration.
if !internal.HasPublicIP(log) {
@@ -89,7 +94,7 @@
return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
}
p.gce = "+gce"
- return nil
+ return p.ac, nil
}
}
@@ -99,40 +104,57 @@
stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
if err != nil {
log.Errorf("failed to create publisher: %s", err)
- return err
+ p.ac.Shutdown()
+ return nil, err
}
- ListenSpec.StreamPublisher = publisher
- ListenSpec.StreamName = SettingsStreamName
- ListenSpec.AddressChooser = internal.IPAddressChooser
- go monitorNetworkSettings(rt, stop, ch, ListenSpec)
- return nil
-}
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettings(rt veyron2.Runtime, stop <-chan struct{},
- ch chan<- config.Setting, listenSpec ipc.ListenSpec) {
- defer close(ch)
-
- log := rt.Logger()
prev, err := netstate.GetAccessibleIPs()
if err != nil {
- // TODO(cnicolaou): add support for shutting down profiles
- //<-stop
log.VI(2).Infof("failed to determine network state")
- return
+ p.ac.Shutdown()
+ return nil, err
}
// Start the dhcp watcher.
watcher, err := netconfig.NewNetConfigWatcher()
if err != nil {
log.VI(2).Infof("Failed to get new config watcher: %s", err)
- // TODO(cnicolaou): add support for shutting down profiles
- //<-stop
- return
+ p.ac.Shutdown()
+ return nil, err
}
+ p.cleanupCh = make(chan struct{})
+ p.watcherCh = make(chan struct{})
+
+ ListenSpec.StreamPublisher = publisher
+ ListenSpec.StreamName = SettingsStreamName
+ ListenSpec.AddressChooser = internal.IPAddressChooser
+
+ go monitorNetworkSettings(rt, watcher, prev, stop, p.cleanupCh, p.watcherCh, ch, ListenSpec)
+ return p.ac, nil
+}
+
+func (p *profile) Cleanup() {
+ if p.cleanupCh != nil {
+ close(p.cleanupCh)
+ }
+ if p.ac != nil {
+ p.ac.Shutdown()
+ }
+ if p.watcherCh != nil {
+ <-p.watcherCh
+ }
+}
+
+// monitorNetworkSettings will monitor network configuration changes and
+// publish subsequent Settings to reflect any changes detected.
+func monitorNetworkSettings(rt veyron2.Runtime, watcher netconfig.NetConfigWatcher, prev netstate.AddrList, pubStop, cleanup <-chan struct{},
+ watcherLoop chan<- struct{}, ch chan<- config.Setting, listenSpec ipc.ListenSpec) {
+ defer close(ch)
+
+ log := rt.Logger()
+
+done:
for {
select {
case <-watcher.Channel():
@@ -160,9 +182,12 @@
ch <- ipc.NewAddAddrsSetting(chosen)
}
prev = cur
- // TODO(cnicolaou): add support for shutting down profiles.
- //case <-stop:
- // return
+ case <-cleanup:
+ break done
+ case <-pubStop:
+ goto done
}
}
+ watcher.Stop()
+ close(watcherLoop)
}
diff --git a/profiles/static/static.go b/profiles/static/static.go
index 377874a..2720f1d 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -11,6 +11,7 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/flags"
"veyron.io/veyron/veyron/lib/netstate"
"veyron.io/veyron/veyron/profiles"
@@ -34,6 +35,7 @@
type static struct {
gce string
+ ac *appcycle.AppCycle
}
// New returns a new instance of a very static Profile. It can be used
@@ -55,7 +57,7 @@
return p
}
-func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+func (p *static) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
log := rt.Logger()
rt.ConfigureReservedName(debug.NewDispatcher(log.LogDir(), sflag.NewAuthorizerOrDie(), rt.VtraceStore()))
@@ -67,6 +69,8 @@
Proxy: lf.ListenProxy,
}
+ p.ac = appcycle.New(rt)
+
// Our address is private, so we test for running on GCE and for its
// 1:1 NAT configuration. GCEPublicAddress returns a non-nil addr
// if we are indeed running on GCE.
@@ -76,11 +80,17 @@
return []ipc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
}
p.gce = "+gce"
- return nil
+ return p.ac, nil
}
}
ListenSpec.AddressChooser = internal.IPAddressChooser
- return nil
+ return p.ac, nil
+}
+
+func (p *static) Cleanup() {
+ if p.ac != nil {
+ p.ac.Shutdown()
+ }
}
func (p *static) String() string {
diff --git a/runtimes/google/appcycle/appcycle.go b/runtimes/google/appcycle/appcycle.go
deleted file mode 100644
index 5c3e4de..0000000
--- a/runtimes/google/appcycle/appcycle.go
+++ /dev/null
@@ -1,81 +0,0 @@
-// Package appcycle is a stripped-down server stub implementation for the
-// AppCycle service. We can't use the generated stub under
-// veyron2/services/mgmt/appcycle because that would introduce a recursive
-// dependency on veyron/runtimes/google (via veyron2/rt).
-//
-// TODO(caprita): It would be nice to still use the stub if possible. Look
-// into the feasibility of a generated stub that does not depend on veyron2/rt.
-package appcycle
-
-import (
- "veyron.io/veyron/veyron2"
- "veyron.io/veyron/veyron2/ipc"
-)
-
-// AppCycleServerMethods is the interface a server writer
-// implements for AppCycle.
-//
-// AppCycle interfaces with the process running a veyron runtime.
-type AppCycleServerMethods interface {
- // Stop initiates shutdown of the server. It streams back periodic
- // updates to give the client an idea of how the shutdown is
- // progressing.
- Stop(AppCycleStopContext) error
- // ForceStop tells the server to shut down right away. It can be issued
- // while a Stop is outstanding if for example the client does not want
- // to wait any longer.
- ForceStop(ipc.ServerContext) error
-}
-
-// AppCycleServer returns a server stub for AppCycle.
-// It converts an implementation of AppCycleServerMethods into
-// an object that may be used by ipc.Server.
-func AppCycleServer(impl AppCycleServerMethods) AppCycleServerStub {
- return AppCycleServerStub{impl}
-}
-
-type AppCycleServerStub struct {
- impl AppCycleServerMethods
-}
-
-func (s AppCycleServerStub) Stop(ctx *AppCycleStopContextStub) error {
- return s.impl.Stop(ctx)
-}
-
-func (s AppCycleServerStub) ForceStop(call ipc.ServerCall) error {
- return s.impl.ForceStop(call)
-}
-
-// AppCycleStopContext represents the context passed to AppCycle.Stop.
-type AppCycleStopContext interface {
- ipc.ServerContext
- // SendStream returns the send side of the server stream.
- SendStream() interface {
- // Send places the item onto the output stream. Returns errors encountered
- // while sending. Blocks if there is no buffer space; will unblock when
- // buffer space is available.
- Send(item veyron2.Task) error
- }
-}
-
-type AppCycleStopContextStub struct {
- ipc.ServerCall
-}
-
-func (s *AppCycleStopContextStub) Init(call ipc.ServerCall) {
- s.ServerCall = call
-}
-
-func (s *AppCycleStopContextStub) SendStream() interface {
- Send(item veyron2.Task) error
-} {
- return implAppCycleStopContextSend{s}
-}
-
-type implAppCycleStopContextSend struct {
- s *AppCycleStopContextStub
-}
-
-func (s implAppCycleStopContextSend) Send(item veyron2.Task) error {
- return s.s.Send(item)
-}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index aa8c55d..2b15b7c 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -272,44 +272,32 @@
index int
suffix string
flow stream.Flow
- processed bool
- err verror.E
+ errConn verror.E
+ errAccess verror.E
}
-func (c *client) tryServer(index int, server string, ch chan<- *serverStatus, done <-chan struct{}) {
- select {
- case <-done:
- return
- default:
- }
+// TODO(cnicolaou): implement real, configurable load balancing.
+func (c *client) tryServer(index int, server string, ch chan<- *serverStatus) {
status := &serverStatus{index: index}
- flow, suffix, err := c.connectFlow(server)
- if err != nil {
+ var err error
+ if status.flow, status.suffix, err = c.connectFlow(server); err != nil {
vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
- status.err = verror.NoExistf("ipc: %q: %s", server, err)
- ch <- status
- return
+ status.errConn = verror.NoExistf("ipc: %q: %s", server, err)
+ status.flow = nil
}
- status.suffix = suffix
- status.flow = flow
- select {
- case <-done:
- flow.Close()
- default:
- ch <- status
- }
+ ch <- status
}
// tryCall makes a single attempt at a call, against possibly multiple servers.
func (c *client) tryCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) {
ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<client>\"%s\".%s", name, method))
- _, serverPattern, name := splitObjectName(name)
+ mtPattern, serverPattern, name := splitObjectName(name)
// Resolve name unless told not to.
var servers []string
if getNoResolveOpt(opts) {
servers = []string{name}
} else {
- if resolved, err := c.ns.Resolve(ctx, name); err != nil {
+ if resolved, err := c.ns.Resolve(ctx, name, naming.RootBlessingPatternOpt(mtPattern)); err != nil {
return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
} else {
// An empty set of protocols means all protocols...
@@ -327,69 +315,53 @@
return nil, errNoServers
}
- // Try to connect to all servers in parallel.
+ // Try to connect to all servers in parallel. Provide sufficient buffering
+ // for all of the connections to finish instantaneously. This is important
+ // because we want to process the responses in priority order; that order is
+ // indicated by the order of entries in servers. So, if two responses come in
+ // at the same 'instant', we prefer the first in the slice.
responses := make([]*serverStatus, attempts)
-
- // Provide sufficient buffering for all of the connections to finish
- // instantaneously. This is important because we want to process
- // the responses in priority order; that order is indicated by the
- // order of entries in servers. So, if two respones come in at the
- // same 'instant', we prefer the first in the slice.
ch := make(chan *serverStatus, attempts)
-
- // Read as many responses as we can before we would block.
- gatherResponses := func() {
- for {
- select {
- default:
- return
- case s := <-ch:
- responses[s.index] = s
- }
- }
+ for i, server := range servers {
+ go c.tryServer(i, server, ch)
}
delay := time.Duration(ipc.NoTimeout)
- if dl, set := ctx.Deadline(); set {
+ if dl, ok := ctx.Deadline(); ok {
delay = dl.Sub(time.Now())
}
timeoutChan := time.After(delay)
- // We'll close this channel when an RPC has been started and we've
- // irrevocably selected a server.
- done := make(chan struct{})
- // Try all of the servers in parallel.
- for i, server := range servers {
- go c.tryServer(i, server, ch, done)
- }
-
- select {
- case <-timeoutChan:
- // All calls failed if we get here.
- close(done)
- c.ns.FlushCacheEntry(name)
- return nil, verror.NoExistf("ipc: couldn't connect to server %v", name)
- case s := <-ch:
- responses[s.index] = s
- gatherResponses()
- }
-
- accessErrs := []error{}
- connErrs := []error{}
for {
-
- for _, r := range responses {
- if r == nil || r.err != nil {
- if r != nil && r.err != nil && !r.processed {
- connErrs = append(connErrs, r.err)
- r.processed = true
+ // Block for at least one new response from the server, or the timeout.
+ select {
+ case r := <-ch:
+ responses[r.index] = r
+ // Read as many more responses as we can without blocking.
+ LoopNonBlocking:
+ for {
+ select {
+ default:
+ break LoopNonBlocking
+ case r := <-ch:
+ responses[r.index] = r
}
+ }
+ case <-timeoutChan:
+ vlog.VI(2).Infof("ipc: timeout on connection to server %v ", name)
+ return c.failedTryCall(name, method, servers, responses, ch)
+ }
+
+ // Process new responses, in priority order.
+ numResponses := 0
+ for _, r := range responses {
+ if r != nil {
+ numResponses++
+ }
+ if r == nil || r.flow == nil {
continue
}
-
- flow := r.flow
- suffix := r.suffix
- flow.SetDeadline(ctx.Done())
+ r.flow.SetDeadline(ctx.Done())
var (
serverB []string
@@ -398,33 +370,28 @@
// LocalPrincipal is nil means that the client wanted to avoid
// authentication, and thus wanted to skip authorization as well.
- if flow.LocalPrincipal() != nil {
+ if r.flow.LocalPrincipal() != nil {
// Validate caveats on the server's identity for the context associated with this call.
var err error
- if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil {
- vlog.VI(2).Infof("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
- if !r.processed {
- accessErrs = append(accessErrs, err)
- r.err = verror.NoAccessf("ipc: unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
- r.processed = true
- }
- flow.Close()
+ if serverB, grantedB, err = c.authorizeServer(r.flow, name, method, serverPattern, opts); err != nil {
+ vlog.VI(2).Infof("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, r.flow.RemoteBlessings(), err)
+ r.errAccess = verror.NoAccessf("ipc: unwilling to invoke %q.%q on server %v: %v", name, method, r.flow.RemoteBlessings(), err)
+ r.flow.Close()
+ r.flow = nil
continue
}
}
- // This is the 'point of no return', so we tell the tryServer
- // goroutines to not bother sending us any more flows.
- // Once the RPC is started (fc.start below) we can't be sure
- // if it makes it to the server or not so, this code will
- // never call fc.start more than once to ensure that we
- // provide 'at-most-once' rpc semantics at this level. Retrying
- // the network connections (i.e. creating flows) is fine since
- // we can cleanup that state if we abort a call (i.e. close the
- // flow).
- close(done)
-
- fc := newFlowClient(ctx, serverB, flow, c.dc)
+ // This is the 'point of no return'; once the RPC is started (fc.start
+ // below) we can't be sure if it makes it to the server or not, so this
+ // code will never call fc.start more than once to ensure that we provide
+ // 'at-most-once' rpc semantics at this level. Retrying the network
+ // connections (i.e. creating flows) is fine since we can cleanup that
+ // state if we abort a call (i.e. close the flow).
+ //
+ // We must ensure that all flows other than r.flow are closed.
+ go cleanupTryCall(r, responses, ch)
+ fc := newFlowClient(ctx, serverB, r.flow, c.dc)
if doneChan := ctx.Done(); doneChan != nil {
go func() {
@@ -440,39 +407,61 @@
if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
timeout = deadline.Sub(time.Now())
}
- if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
+ if verr := fc.start(r.suffix, method, args, timeout, grantedB); verr != nil {
return nil, verr
}
return fc, nil
}
-
- // Quit if we've seen an error from all parallel connection attempts
- handled := 0
- for _, r := range responses {
- if r != nil && r.err != nil {
- handled++
- }
- }
- if handled == len(responses) {
- break
- }
-
- select {
- case <-timeoutChan:
- // All remaining calls failed if we get here.
- vlog.VI(2).Infof("ipc: couldn't connect to server %v", name)
- goto quit
- case s := <-ch:
- responses[s.index] = s
- gatherResponses()
+ if numResponses == len(responses) {
+ return c.failedTryCall(name, method, servers, responses, ch)
}
}
-quit:
- close(done)
+}
+
+// cleanupTryCall ensures we've waited for every response from the tryServer
+// goroutines, and have closed the flow from each one except skip. This is a
+// blocking function; it should be called in its own goroutine.
+func cleanupTryCall(skip *serverStatus, responses []*serverStatus, ch chan *serverStatus) {
+ numPending := 0
+ for _, r := range responses {
+ switch {
+ case r == nil:
+ // The response hasn't arrived yet.
+ numPending++
+ case r == skip || r.flow == nil:
+ // Either we should skip this flow, or we've closed the flow for this
+ // response already; nothing more to do.
+ default:
+ // We received the response, but haven't closed the flow yet.
+ r.flow.Close()
+ }
+ }
+ // Now we just need to wait for the pending responses and close their flows.
+ for i := 0; i < numPending; i++ {
+ if r := <-ch; r.flow != nil {
+ r.flow.Close()
+ }
+ }
+}
+
+// failedTryCall performs asynchronous cleanup for tryCall, and returns an
+// appropriate error from the responses we've already received. All parallel
+// calls in tryCall failed or we timed out if we get here.
+func (c *client) failedTryCall(name, method string, servers []string, responses []*serverStatus, ch chan *serverStatus) (ipc.Call, verror.E) {
+ go cleanupTryCall(nil, responses, ch)
c.ns.FlushCacheEntry(name)
// TODO(cnicolaou): introduce a third error code here for mixed
// conn/access errors.
- return nil, verror.NoExistf("ipc: client failed to invoke %q.%q: on %v", name, method, servers, append(connErrs, accessErrs...))
+ var errs []verror.E
+ for _, r := range responses {
+ switch {
+ case r != nil && r.errConn != nil:
+ errs = append(errs, r.errConn)
+ case r != nil && r.errAccess != nil:
+ errs = append(errs, r.errAccess)
+ }
+ }
+ return nil, verror.NoExistf("ipc: client failed to invoke %q.%q: on %v: %v", name, method, servers, errs)
}
// authorizeServer validates that the server (remote end of flow) has the credentials to serve
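
The reworked tryCall reduces to a reusable pattern: start one goroutine per server, buffer the results channel so no goroutine can ever block, then loop: wait for at least one new result, drain whatever else has already arrived, and scan the results slice in order so that earlier (preferred) servers win ties. A stdlib-only sketch of that selection loop (dialFirstPreferred is a hypothetical stand-in; the real code also authorizes each flow and hands the losing flows to cleanupTryCall):

    package sketch

    import (
        "errors"
        "net"
        "time"
    )

    func dialFirstPreferred(addrs []string, timeout time.Duration) (net.Conn, error) {
        type result struct {
            index int
            conn  net.Conn
            err   error
        }
        results := make([]*result, len(addrs))
        // Buffered so every goroutine can deliver its result without blocking,
        // even after the caller has returned.
        ch := make(chan *result, len(addrs))
        for i, a := range addrs {
            go func(i int, a string) {
                c, err := net.DialTimeout("tcp", a, timeout)
                ch <- &result{index: i, conn: c, err: err}
            }(i, a)
        }
        deadline := time.After(timeout)
        for {
            // Block for at least one new result, or give up at the deadline.
            select {
            case r := <-ch:
                results[r.index] = r
                // Drain anything else that has already arrived so the
                // preference scan below sees as complete a picture as possible.
            drain:
                for {
                    select {
                    case r := <-ch:
                        results[r.index] = r
                    default:
                        break drain
                    }
                }
            case <-deadline:
                return nil, errors.New("timed out connecting to all servers")
            }
            done := 0
            for _, r := range results { // slice order encodes preference
                if r == nil {
                    continue
                }
                done++
                if r.err == nil && r.conn != nil {
                    // A real caller must still close the other successful
                    // connections (cleanupTryCall's job in the code above).
                    return r.conn, nil
                }
            }
            if done == len(addrs) {
                return nil, errors.New("all connection attempts failed")
            }
        }
    }
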
diff --git a/runtimes/google/ipc/client_test.go b/runtimes/google/ipc/client_test.go
new file mode 100644
index 0000000..d5be886
--- /dev/null
+++ b/runtimes/google/ipc/client_test.go
@@ -0,0 +1,125 @@
+package ipc_test
+
+import (
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/vlog"
+
+ "veyron.io/veyron/veyron/lib/expect"
+ "veyron.io/veyron/veyron/lib/flags/consts"
+ "veyron.io/veyron/veyron/lib/modules"
+ "veyron.io/veyron/veyron/lib/modules/core"
+)
+
+func init() {
+ rt.Init()
+}
+
+func testArgs(args ...string) []string {
+ var targs = []string{"--", "--veyron.tcp.address=127.0.0.1:0"}
+ return append(targs, args...)
+}
+
+func runMountTable(t *testing.T) (*modules.Shell, func()) {
+ sh := modules.NewShell(".*")
+ core.Install(sh)
+ root, err := sh.Start(core.RootMTCommand, nil, testArgs()...)
+ if err != nil {
+ t.Fatalf("unexpected error for root mt: %s", err)
+ }
+ sh.Forget(root)
+
+ rootSession := expect.NewSession(t, root.Stdout(), time.Minute)
+ rootName := rootSession.ExpectVar("MT_NAME")
+ if t.Failed() {
+ t.Fatalf("%s", rootSession.Error())
+ }
+ sh.SetVar(consts.NamespaceRootPrefix, rootName)
+ sh.Start(core.SetNamespaceRootsCommand, nil, rootName)
+
+ deferFn := func() {
+ if testing.Verbose() {
+ vlog.Infof("------ root shutdown ------")
+ root.Shutdown(os.Stderr, os.Stderr)
+ } else {
+ root.Shutdown(nil, nil)
+ }
+ }
+ return sh, deferFn
+}
+
+func runClient(t *testing.T, sh *modules.Shell) error {
+ clt, err := sh.Start(core.EchoClientCommand, nil, "echoServer", "a message")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ s := expect.NewSession(t, clt.Stdout(), 30*time.Second)
+ s.Expect("echoServer: a message")
+ if s.Failed() {
+ return s.Error()
+ }
+ return nil
+}
+
+func numServers(t *testing.T, sh *modules.Shell, name string) string {
+ r, err := sh.Start(core.ResolveCommand, nil, name)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ s := expect.NewSession(t, r.Stdout(), time.Minute)
+ rn := s.ExpectVar("RN")
+ return rn
+}
+
+// TODO(cnicolaou): figure out how to test and see what the internals
+// of tryCall are doing - e.g. using stats counters.
+func TestMultipleEndpoints(t *testing.T) {
+ sh, fn := runMountTable(t)
+ defer fn()
+ srv, _ := sh.Start(core.EchoServerCommand, nil, testArgs("echoServer", "echoServer")...)
+ s := expect.NewSession(t, srv.Stdout(), time.Minute)
+ s.ExpectVar("NAME")
+
+ runClient(t, sh)
+
+ // Create a fake set of 100 entries in the mount table
+ for i := 0; i < 100; i++ {
+ // 203.0.113.0 is TEST-NET-3 from RFC5737
+ ep := naming.FormatEndpoint("tcp", fmt.Sprintf("203.0.113.%d:443", i))
+ n := naming.JoinAddressName(ep, "")
+ h, err := sh.Start(core.MountCommand, nil, "echoServer", n, "1h")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if err := h.Shutdown(nil, os.Stderr); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ }
+
+ // Verify that there are 101 entries for echoServer in the mount table.
+ if got, want := numServers(t, sh, "echoServer"), "101"; got != want {
+ t.Fatalf("got: %q, want: %q", got, want)
+ }
+
+ // TODO(cnicolaou): ok, so it works, but I'm not sure how
+ // long it should take or if the parallel connection code
+ // really works. Use counters to inspect it for example.
+ if err := runClient(t, sh); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ srv.CloseStdin()
+ srv.Shutdown(nil, nil)
+
+ // TODO(cnicolaou,p): figure out why the real entry isn't removed
+ // from the mount table.
+ // Verify that there are 100 entries for echoServer in the mount table.
+ if got, want := numServers(t, sh, "echoServer"), "101"; got != want {
+ t.Fatalf("got: %q, want: %q", got, want)
+ }
+}
diff --git a/runtimes/google/naming/namespace/all_test.go b/runtimes/google/naming/namespace/all_test.go
index 4192a68..163bcc9 100644
--- a/runtimes/google/naming/namespace/all_test.go
+++ b/runtimes/google/naming/namespace/all_test.go
@@ -8,6 +8,7 @@
"time"
"veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/options"
@@ -20,6 +21,7 @@
"veyron.io/veyron/veyron/lib/glob"
"veyron.io/veyron/veyron/lib/testutil"
_ "veyron.io/veyron/veyron/profiles"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
"veyron.io/veyron/veyron/runtimes/google/naming/namespace"
service "veyron.io/veyron/veyron/services/mounttable/lib"
)
@@ -138,20 +140,28 @@
}
}
-func testResolveToMountTable(t *testing.T, r veyron2.Runtime, ns naming.Namespace, name string, want ...string) {
- servers, err := ns.ResolveToMountTable(r.NewContext(), name)
+func doResolveTest(t *testing.T, fname string, f func(context.T, string, ...naming.ResolveOpt) ([]string, error), ctx context.T, name string, want []string, opts ...naming.ResolveOpt) {
+ servers, err := f(ctx, name, opts...)
if err != nil {
- boom(t, "Failed to ResolveToMountTable %q: %s", name, err)
+ boom(t, "Failed to %s %s: %s", fname, name, err)
}
- compare(t, "ResolveToMountTable", name, servers, want)
+ compare(t, fname, name, servers, want)
+}
+
+func testResolveToMountTable(t *testing.T, r veyron2.Runtime, ns naming.Namespace, name string, want ...string) {
+ doResolveTest(t, "ResolveToMountTable", ns.ResolveToMountTable, r.NewContext(), name, want)
+}
+
+func testResolveToMountTableWithPattern(t *testing.T, r veyron2.Runtime, ns naming.Namespace, name string, pattern naming.ResolveOpt, want ...string) {
+ doResolveTest(t, "ResolveToMountTable", ns.ResolveToMountTable, r.NewContext(), name, want, pattern)
}
func testResolve(t *testing.T, r veyron2.Runtime, ns naming.Namespace, name string, want ...string) {
- servers, err := ns.Resolve(r.NewContext(), name)
- if err != nil {
- boom(t, "Failed to Resolve %q: %s", name, err)
- }
- compare(t, "Resolve", name, servers, want)
+ doResolveTest(t, "Resolve", ns.Resolve, r.NewContext(), name, want)
+}
+
+func testResolveWithPattern(t *testing.T, r veyron2.Runtime, ns naming.Namespace, name string, pattern naming.ResolveOpt, want ...string) {
+ doResolveTest(t, "Resolve", ns.Resolve, r.NewContext(), name, want, pattern)
}
func testUnresolve(t *testing.T, r veyron2.Runtime, ns naming.Namespace, name string, want ...string) {
@@ -610,3 +620,56 @@
t.Errorf("namespace.New should have failed with an unrooted name")
}
}
+
+func bless(blesser, delegate security.Principal, extension string) {
+ b, err := blesser.Bless(delegate.PublicKey(), blesser.BlessingStore().Default(), extension, security.UnconstrainedUse())
+ if err != nil {
+ panic(err)
+ }
+ delegate.BlessingStore().SetDefault(b)
+}
+
+func TestRootBlessing(t *testing.T) {
+ // We need the default runtime for the server-side mounttable code,
+ // which references rt.R() to create new endpoints.
+ cr := rt.Init()
+ r, _ := rt.New() // We use a different runtime for the client side.
+
+ proot := sectest.NewPrincipal("root")
+ bless(proot, r.Principal(), "server")
+ bless(proot, cr.Principal(), "client")
+
+ cr.Principal().AddToRoots(proot.BlessingStore().Default())
+ r.Principal().AddToRoots(proot.BlessingStore().Default())
+
+ root, mts, _, stopper := createNamespace(t, r)
+ defer stopper()
+ ns := r.Namespace()
+
+ name := naming.Join(root.name, mt2MP)
+ // First check with a non-matching blessing pattern.
+ _, err := ns.Resolve(r.NewContext(), name, naming.RootBlessingPatternOpt("root/foobar"))
+ if !verror.Is(err, verror.NoAccess.ID) {
+ t.Errorf("Resolve expected NoAccess error, got %v", err)
+ }
+ _, err = ns.ResolveToMountTable(r.NewContext(), name, naming.RootBlessingPatternOpt("root/foobar"))
+ if !verror.Is(err, verror.NoAccess.ID) {
+ t.Errorf("ResolveToMountTable expected NoAccess error, got %v", err)
+ }
+
+ // Now check a matching pattern.
+ testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), mts[mt2MP].name)
+ testResolveToMountTableWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), name)
+
+ // After successful lookup it should be cached, so the pattern doesn't matter.
+ testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/foobar"), mts[mt2MP].name)
+
+ // Test calling a method.
+ jokeName := naming.Join(root.name, mt4MP, j1MP)
+ runServer(t, r, &dispatcher{}, naming.Join(mts["mt4"].name, j1MP))
+ _, err = r.Client().StartCall(r.NewContext(), "[root/foobar]"+jokeName, "KnockKnock", nil)
+ if err == nil {
+ t.Errorf("StartCall expected NoAccess error, got %v", err)
+ }
+ knockKnock(t, r, "[root/server]"+jokeName)
+}
diff --git a/runtimes/google/naming/namespace/resolve.go b/runtimes/google/naming/namespace/resolve.go
index eb69a3c..8ac7773 100644
--- a/runtimes/google/naming/namespace/resolve.go
+++ b/runtimes/google/naming/namespace/resolve.go
@@ -2,6 +2,7 @@
import (
"errors"
+ "fmt"
"runtime"
"veyron.io/veyron/veyron2/context"
@@ -12,11 +13,17 @@
"veyron.io/veyron/veyron2/vlog"
)
-func (ns *namespace) resolveAgainstMountTable(ctx context.T, client ipc.Client, e *naming.MountEntry) (*naming.MountEntry, error) {
+func (ns *namespace) resolveAgainstMountTable(ctx context.T, client ipc.Client, e *naming.MountEntry, pattern string) (*naming.MountEntry, error) {
// Try each server till one answers.
finalErr := errors.New("no servers to resolve query")
for _, s := range e.Servers {
+ var patternAndName string
name := naming.JoinAddressName(s.Server, e.Name)
+ if pattern != "" {
+ patternAndName = naming.JoinAddressName(s.Server, fmt.Sprintf("[%s]%s", pattern, e.Name))
+ } else {
+ patternAndName = name
+ }
// First check the cache.
if ne, err := ns.resolutionCache.lookup(name); err == nil {
vlog.VI(2).Infof("resolveAMT %s from cache -> %v", name, convertServersToStrings(ne.Servers, ne.Name))
@@ -24,7 +31,7 @@
}
// Not in cache, call the real server.
callCtx, _ := ctx.WithTimeout(callTimeout)
- call, err := client.StartCall(callCtx, name, "ResolveStepX", nil, options.NoResolve(true))
+ call, err := client.StartCall(callCtx, patternAndName, "ResolveStepX", nil, options.NoResolve(true))
if err != nil {
finalErr = err
vlog.VI(2).Infof("ResolveStep.StartCall %s failed: %s", name, err)
@@ -61,7 +68,7 @@
}
// ResolveX implements veyron2/naming.Namespace.
-func (ns *namespace) ResolveX(ctx context.T, name string) (*naming.MountEntry, error) {
+func (ns *namespace) ResolveX(ctx context.T, name string, opts ...naming.ResolveOpt) (*naming.MountEntry, error) {
defer vlog.LogCall()()
e, _ := ns.rootMountEntry(name)
if vlog.V(2) {
@@ -72,6 +79,7 @@
if len(e.Servers) == 0 {
return nil, verror.Make(naming.ErrNoSuchName, ctx, name)
}
+ pattern := getRootPattern(opts)
// Iterate walking through mount table servers.
for remaining := ns.maxResolveDepth; remaining > 0; remaining-- {
vlog.VI(2).Infof("ResolveX(%s) loop %v", name, *e)
@@ -81,7 +89,7 @@
}
var err error
curr := e
- if e, err = ns.resolveAgainstMountTable(ctx, ns.rt.Client(), curr); err != nil {
+ if e, err = ns.resolveAgainstMountTable(ctx, ns.rt.Client(), curr, pattern); err != nil {
// If the name could not be found in the mount table, return an error.
if verror.Is(err, naming.ErrNoSuchNameRoot.ID) {
err = verror.Make(naming.ErrNoSuchName, ctx, name)
@@ -90,20 +98,26 @@
vlog.VI(1).Infof("ResolveX(%s) -> (NoSuchName: %v)", name, curr)
return nil, err
}
+ if verror.Is(err, verror.NoAccess.ID) {
+ vlog.VI(1).Infof("ResolveX(%s) -> (NoAccess: %v)", name, curr)
+ return nil, err
+
+ }
// Any other failure (server not found, no ResolveStep
// method, etc.) are a sign that iterative resolution can
// stop.
vlog.VI(1).Infof("ResolveX(%s) -> %v", name, curr)
return curr, nil
}
+ pattern = ""
}
return nil, verror.Make(naming.ErrResolutionDepthExceeded, ctx)
}
// Resolve implements veyron2/naming.Namespace.
-func (ns *namespace) Resolve(ctx context.T, name string) ([]string, error) {
+func (ns *namespace) Resolve(ctx context.T, name string, opts ...naming.ResolveOpt) ([]string, error) {
defer vlog.LogCall()()
- e, err := ns.ResolveX(ctx, name)
+ e, err := ns.ResolveX(ctx, name, opts...)
if err != nil {
return nil, err
}
@@ -111,7 +125,7 @@
}
// ResolveToMountTableX implements veyron2/naming.Namespace.
-func (ns *namespace) ResolveToMountTableX(ctx context.T, name string) (*naming.MountEntry, error) {
+func (ns *namespace) ResolveToMountTableX(ctx context.T, name string, opts ...naming.ResolveOpt) (*naming.MountEntry, error) {
defer vlog.LogCall()()
e, _ := ns.rootMountEntry(name)
if vlog.V(2) {
@@ -122,6 +136,7 @@
if len(e.Servers) == 0 {
return nil, verror.Make(naming.ErrNoMountTable, ctx)
}
+ pattern := getRootPattern(opts)
last := e
for remaining := ns.maxResolveDepth; remaining > 0; remaining-- {
vlog.VI(2).Infof("ResolveToMountTableX(%s) loop %v", name, e)
@@ -132,7 +147,7 @@
vlog.VI(1).Infof("ResolveToMountTableX(%s) -> %v", name, last)
return last, nil
}
- if e, err = ns.resolveAgainstMountTable(ctx, ns.rt.Client(), e); err != nil {
+ if e, err = ns.resolveAgainstMountTable(ctx, ns.rt.Client(), e, pattern); err != nil {
if verror.Is(err, naming.ErrNoSuchNameRoot.ID) {
vlog.VI(1).Infof("ResolveToMountTableX(%s) -> %v (NoSuchRoot: %v)", name, last, curr)
return last, nil
@@ -157,14 +172,15 @@
return nil, err
}
last = curr
+ pattern = ""
}
return nil, verror.Make(naming.ErrResolutionDepthExceeded, ctx)
}
// ResolveToMountTable implements veyron2/naming.Namespace.
-func (ns *namespace) ResolveToMountTable(ctx context.T, name string) ([]string, error) {
+func (ns *namespace) ResolveToMountTable(ctx context.T, name string, opts ...naming.ResolveOpt) ([]string, error) {
defer vlog.LogCall()()
- e, err := ns.ResolveToMountTableX(ctx, name)
+ e, err := ns.ResolveToMountTableX(ctx, name, opts...)
if err != nil {
return nil, err
}
@@ -254,3 +270,12 @@
}
return flushed
}
+
+func getRootPattern(opts []naming.ResolveOpt) string {
+ for _, opt := range opts {
+ if pattern, ok := opt.(naming.RootBlessingPatternOpt); ok {
+ return string(pattern)
+ }
+ }
+ return ""
+}
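
On the caller's side the new option is threaded through Namespace.Resolve and ResolveToMountTable, and only the first resolution step is constrained (the loop above clears pattern after each hop). A hedged usage sketch mirroring the new TestRootBlessing; the name and the pattern value are illustrative:

    package main

    import (
        "fmt"

        "veyron.io/veyron/veyron2/naming"
        "veyron.io/veyron/veyron2/rt"

        _ "veyron.io/veyron/veyron/profiles"
    )

    func main() {
        r := rt.Init()
        defer r.Cleanup()
        ns := r.Namespace()
        // Accept a root mount table only if its blessings match root/server;
        // a non-matching mount table makes resolution fail with NoAccess.
        servers, err := ns.Resolve(r.NewContext(), "exampleName",
            naming.RootBlessingPatternOpt("root/server"))
        if err != nil {
            fmt.Println("resolve failed:", err) // e.g. a NoAccess error
            return
        }
        fmt.Println("resolved to:", servers)
    }
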
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 1d61739..44e52e4 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -2,8 +2,6 @@
import (
"fmt"
- "os"
- "sync"
"time"
"veyron.io/veyron/veyron2"
@@ -13,17 +11,51 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron/lib/exec"
- "veyron.io/veyron/veyron/runtimes/google/appcycle"
)
-type mgmtImpl struct {
- sync.RWMutex
- waiters []chan<- string
- taskTrackers []chan<- veyron2.Task
- task veyron2.Task
- shutDown bool
- rt *vrt
- server ipc.Server // Serves AppCycle service.
+// TODO(cnicolaou,caprita): move this all out of the runtime when we
+// refactor the profiles/runtime interface.
+func (rt *vrt) initMgmt(appCycle veyron2.AppCycle, handle *exec.ChildHandle) (ipc.Server, error) {
+ // Do not initialize the mgmt runtime if the process has not
+ // been started through the veyron exec library by a node
+ // manager.
+ if handle == nil {
+ return nil, nil
+ }
+ parentName, err := handle.Config.Get(mgmt.ParentNameConfigKey)
+ if err != nil {
+ return nil, nil
+ }
+ listenSpec, err := getListenSpec(handle)
+ if err != nil {
+ return nil, err
+ }
+ var serverOpts []ipc.ServerOpt
+ parentPeerPattern, err := handle.Config.Get(mgmt.ParentBlessingConfigKey)
+ if err == nil && parentPeerPattern != "" {
+ // Grab the blessing from our blessing store that the parent
+ // told us to use so they can talk to us.
+ serverBlessing := rt.Principal().BlessingStore().ForPeer(parentPeerPattern)
+ serverOpts = append(serverOpts, options.ServerBlessings{serverBlessing})
+ }
+ server, err := rt.NewServer(serverOpts...)
+ if err != nil {
+ return nil, err
+ }
+ ep, err := server.Listen(*listenSpec)
+ if err != nil {
+ return nil, err
+ }
+ if err := server.Serve("", appCycle.Remote(), nil); err != nil {
+ server.Stop()
+ return nil, err
+ }
+ err = rt.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
+ if err != nil {
+ server.Stop()
+ return nil, err
+ }
+ return server, nil
}
func getListenSpec(handle *exec.ChildHandle) (*ipc.ListenSpec, error) {
@@ -45,47 +77,9 @@
return &ipc.ListenSpec{Protocol: protocol, Address: address}, nil
}
-func (m *mgmtImpl) initMgmt(rt *vrt, handle *exec.ChildHandle) error {
- // Do not initialize the mgmt runtime if the process has not
- // been started through the veyron exec library by a node
- // manager.
- if handle == nil {
- return nil
- }
- parentName, err := handle.Config.Get(mgmt.ParentNameConfigKey)
- if err != nil {
- return nil
- }
- listenSpec, err := getListenSpec(handle)
- if err != nil {
- return err
- }
- var serverOpts []ipc.ServerOpt
- parentPeerPattern, err := handle.Config.Get(mgmt.ParentBlessingConfigKey)
- if err == nil && parentPeerPattern != "" {
- // Grab the blessing from our blessing store that the parent
- // told us to use so they can talk to us.
- serverBlessing := rt.Principal().BlessingStore().ForPeer(parentPeerPattern)
- serverOpts = append(serverOpts, options.ServerBlessings{serverBlessing})
- }
- m.rt = rt
- m.server, err = rt.NewServer(serverOpts...)
- if err != nil {
- return err
- }
- ep, err := m.server.Listen(*listenSpec)
- if err != nil {
- return err
- }
- if err := m.server.Serve("", appcycle.AppCycleServer(m), nil); err != nil {
- return err
- }
- return m.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
-}
-
-func (m *mgmtImpl) callbackToParent(parentName, myName string) error {
- ctx, _ := m.rt.NewContext().WithTimeout(10 * time.Second)
- call, err := m.rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
+func (rt *vrt) callbackToParent(parentName, myName string) error {
+ ctx, _ := rt.NewContext().WithTimeout(10 * time.Second)
+ call, err := rt.Client().StartCall(ctx, parentName, "Set", []interface{}{mgmt.AppCycleManagerConfigKey, myName})
if err != nil {
return err
}
@@ -94,113 +88,3 @@
}
return err
}
-
-func (m *mgmtImpl) shutdown() {
- m.Lock()
- if m.shutDown {
- m.Unlock()
- return
- }
- m.shutDown = true
- for _, t := range m.taskTrackers {
- close(t)
- }
- m.taskTrackers = nil
- server := m.server
- m.Unlock()
- if server != nil {
- server.Stop()
- }
-}
-
-func (rt *vrt) stop(msg string) {
- rt.mgmt.RLock()
- defer rt.mgmt.RUnlock()
- if len(rt.mgmt.waiters) == 0 {
- os.Exit(veyron2.UnhandledStopExitCode)
- }
- for _, w := range rt.mgmt.waiters {
- select {
- case w <- msg:
- default:
- }
- }
-}
-
-func (rt *vrt) Stop() {
- rt.stop(veyron2.LocalStop)
-}
-
-func (*vrt) ForceStop() {
- os.Exit(veyron2.ForceStopExitCode)
-}
-
-func (rt *vrt) WaitForStop(ch chan<- string) {
- rt.mgmt.Lock()
- defer rt.mgmt.Unlock()
- rt.mgmt.waiters = append(rt.mgmt.waiters, ch)
-}
-
-func (rt *vrt) TrackTask(ch chan<- veyron2.Task) {
- rt.mgmt.Lock()
- defer rt.mgmt.Unlock()
- if rt.mgmt.shutDown {
- close(ch)
- return
- }
- rt.mgmt.taskTrackers = append(rt.mgmt.taskTrackers, ch)
-}
-
-func (rt *vrt) advanceTask(progress, goal int) {
- rt.mgmt.Lock()
- defer rt.mgmt.Unlock()
- rt.mgmt.task.Goal += goal
- rt.mgmt.task.Progress += progress
- for _, t := range rt.mgmt.taskTrackers {
- select {
- case t <- rt.mgmt.task:
- default:
- // TODO(caprita): Make it such that the latest task
- // update is always added to the channel even if channel
- // is full. One way is to pull an element from t and
- // then re-try the push.
- }
- }
-}
-
-func (rt *vrt) AdvanceGoal(delta int) {
- if delta <= 0 {
- return
- }
- rt.advanceTask(0, delta)
-}
-
-func (rt *vrt) AdvanceProgress(delta int) {
- if delta <= 0 {
- return
- }
- rt.advanceTask(delta, 0)
-}
-
-func (m *mgmtImpl) Stop(ctx appcycle.AppCycleStopContext) error {
- // The size of the channel should be reasonably sized to expect not to
- // miss updates while we're waiting for the stream to unblock.
- ch := make(chan veyron2.Task, 10)
- m.rt.TrackTask(ch)
- // TODO(caprita): Include identity of Stop issuer in message.
- m.rt.stop(veyron2.RemoteStop)
- for {
- task, ok := <-ch
- if !ok {
- // Channel closed, meaning process shutdown is imminent.
- break
- }
- ctx.SendStream().Send(task)
- }
- return nil
-}
-
-func (m *mgmtImpl) ForceStop(ipc.ServerContext) error {
- m.rt.ForceStop()
- return fmt.Errorf("ForceStop should not reply as the process should be dead")
-}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index fc39919..cd3db2a 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -45,7 +45,8 @@
// TestBasic verifies that the basic plumbing works: LocalStop calls result in
// stop messages being sent on the channel passed to WaitForStop.
func TestBasic(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
ch := make(chan string, 1)
m.WaitForStop(ch)
for i := 0; i < 10; i++ {
@@ -64,7 +65,8 @@
// TestMultipleWaiters verifies that the plumbing works with more than one
// registered wait channel.
func TestMultipleWaiters(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
ch1 := make(chan string, 1)
m.WaitForStop(ch1)
ch2 := make(chan string, 1)
@@ -84,7 +86,8 @@
// channel is not being drained: once the channel's buffer fills up, future
// Stops become no-ops.
func TestMultipleStops(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
ch := make(chan string, 1)
m.WaitForStop(ch)
for i := 0; i < 10; i++ {
@@ -101,7 +104,8 @@
}
func noWaiters(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
fmt.Fprintf(stdout, "ready\n")
modules.WaitForEOF(stdin)
m.Stop()
@@ -126,7 +130,8 @@
}
func forceStop(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
fmt.Fprintf(stdout, "ready\n")
modules.WaitForEOF(stdin)
m.WaitForStop(make(chan string, 1))
@@ -153,7 +158,7 @@
}
}
-func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int) {
+func checkProgress(t *testing.T, ch <-chan veyron2.Task, progress, goal int32) {
if want, got := (veyron2.Task{progress, goal}), <-ch; !reflect.DeepEqual(want, got) {
t.Errorf("Unexpected progress: want %+v, got %+v", want, got)
}
@@ -170,7 +175,8 @@
// TestProgress verifies that the ticker update/track logic works for a single
// tracker.
func TestProgress(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
m.AdvanceGoal(50)
ch := make(chan veyron2.Task, 1)
m.TrackTask(ch)
@@ -190,7 +196,7 @@
checkNoProgress(t, ch)
m.AdvanceGoal(0)
checkNoProgress(t, ch)
- m.Cleanup()
+ r.Cleanup()
if _, ok := <-ch; ok {
t.Errorf("Expected channel to be closed")
}
@@ -200,7 +206,8 @@
// works for more than one tracker. It also ensures that the runtime doesn't
// block when the tracker channels are full.
func TestProgressMultipleTrackers(t *testing.T) {
- m, _ := rt.New(profileOpt)
+ r, _ := rt.New(profileOpt)
+ m := r.AppCycle()
// ch1 is 1-buffered, ch2 is 2-buffered.
ch1, ch2 := make(chan veyron2.Task, 1), make(chan veyron2.Task, 2)
m.TrackTask(ch1)
@@ -223,7 +230,7 @@
m.AdvanceGoal(4)
checkProgress(t, ch1, 11, 4)
checkProgress(t, ch2, 11, 4)
- m.Cleanup()
+ r.Cleanup()
if _, ok := <-ch1; ok {
t.Errorf("Expected channel to be closed")
}
@@ -237,15 +244,16 @@
if err != nil {
return err
}
+ m := r.AppCycle()
defer r.Cleanup()
ch := make(chan string, 1)
- r.WaitForStop(ch)
+ m.WaitForStop(ch)
fmt.Fprintf(stdout, "Got %s\n", <-ch)
- r.AdvanceGoal(10)
+ m.AdvanceGoal(10)
fmt.Fprintf(stdout, "Doing some work\n")
- r.AdvanceProgress(2)
+ m.AdvanceProgress(2)
fmt.Fprintf(stdout, "Doing some more work\n")
- r.AdvanceProgress(5)
+ m.AdvanceProgress(5)
return nil
}
@@ -268,7 +276,6 @@
t.Fatalf("Got error: %v", err)
}
ch := make(chan string)
-
var ep naming.Endpoint
if ep, err = server.Listen(profiles.LocalListenSpec); err != nil {
t.Fatalf("Got error: %v", err)
@@ -277,7 +284,6 @@
t.Fatalf("Got error: %v", err)
}
return server, naming.JoinAddressName(ep.String(), ""), ch
-
}
func setupRemoteAppCycleMgr(t *testing.T) (veyron2.Runtime, modules.Handle, appcycle.AppCycleClientMethods, func()) {
@@ -298,8 +304,12 @@
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
-
- appCycleName := <-ch
+ appCycleName := ""
+ select {
+ case appCycleName = <-ch:
+ case <-time.After(time.Minute):
+ t.Errorf("timeout")
+ }
appCycle := appcycle.AppCycleClient(appCycleName)
return r, h, appCycle, func() {
configServer.Stop()
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 8ff2225..ab635b2 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -30,8 +30,7 @@
var errCleaningUp = fmt.Errorf("operation rejected: runtime is being cleaned up")
type vrt struct {
- mu sync.Mutex
-
+ mu sync.Mutex
profile veyron2.Profile
publisher *config.Publisher
sm []stream.Manager // GUARDED_BY(mu)
@@ -39,7 +38,8 @@
signals chan os.Signal
principal security.Principal
client ipc.Client
- mgmt *mgmtImpl
+ ac veyron2.AppCycle
+ acServer ipc.Server
flags flags.RuntimeFlags
preferredProtocols options.PreferredProtocols
reservedDisp ipc.Dispatcher
@@ -69,7 +69,6 @@
})
flags := runtimeFlags.RuntimeFlags()
rt := &vrt{
- mgmt: new(mgmtImpl),
lang: i18n.LangIDFromEnv(),
program: filepath.Base(os.Args[0]),
flags: flags,
@@ -136,14 +135,14 @@
}
rt.publisher = config.NewPublisher()
- if err := rt.profile.Init(rt, rt.publisher); err != nil {
+ if rt.ac, err = rt.profile.Init(rt, rt.publisher); err != nil {
return nil, err
}
-
- if err := rt.mgmt.initMgmt(rt, handle); err != nil {
+ server, err := rt.initMgmt(rt.ac, handle)
+ if err != nil {
return nil, err
}
-
+ rt.acServer = server
vlog.VI(2).Infof("rt.Init done")
return rt, nil
}
@@ -156,6 +155,10 @@
return rt.profile
}
+func (rt *vrt) AppCycle() veyron2.AppCycle {
+ return rt.ac
+}
+
func (rt *vrt) ConfigureReservedName(server ipc.Dispatcher, opts ...ipc.ServerOpt) {
rt.mu.Lock()
defer rt.mu.Unlock()
@@ -188,7 +191,11 @@
// TODO(caprita): Consider shutting down mgmt later in the runtime's
// shutdown sequence, to capture some of the runtime internal shutdown
// tasks in the task tracker.
- rt.mgmt.shutdown()
+ rt.profile.Cleanup()
+ if rt.acServer != nil {
+ rt.acServer.Stop()
+ }
+
// It's ok to access rt.sm out of lock below, since a Mutex acts as a
// barrier in Go and hence we're guaranteed that cleaningUp is true at
// this point. The only code that mutates rt.sm is NewStreamManager in
diff --git a/runtimes/google/rt/signal_test.go b/runtimes/google/rt/signal_test.go
index f478d6d..de2d98c 100644
--- a/runtimes/google/rt/signal_test.go
+++ b/runtimes/google/rt/signal_test.go
@@ -14,6 +14,7 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron/lib/appcycle"
"veyron.io/veyron/veyron/lib/expect"
"veyron.io/veyron/veyron/lib/modules"
)
@@ -42,10 +43,12 @@
return "myprofile on " + mp.Platform().String()
}
-func (mp *myprofile) Init(veyron2.Runtime, *config.Publisher) error {
- return nil
+func (mp *myprofile) Init(rt veyron2.Runtime, _ *config.Publisher) (veyron2.AppCycle, error) {
+ return appcycle.New(rt), nil
}
+func (mp *myprofile) Cleanup() {}
+
func simpleEchoProgram(stdin io.Reader, stdout io.Writer) {
fmt.Fprintf(stdout, "ready\n")
scanner := bufio.NewScanner(stdin)
diff --git a/runtimes/google/testing/mocks/naming/namespace.go b/runtimes/google/testing/mocks/naming/namespace.go
index bff47f6..f6b0ba0 100644
--- a/runtimes/google/testing/mocks/naming/namespace.go
+++ b/runtimes/google/testing/mocks/naming/namespace.go
@@ -57,7 +57,7 @@
return nil
}
-func (ns *namespace) Resolve(ctx context.T, name string) ([]string, error) {
+func (ns *namespace) Resolve(ctx context.T, name string, opts ...naming.ResolveOpt) ([]string, error) {
defer vlog.LogCall()()
if address, _ := naming.SplitAddressName(name); len(address) > 0 {
return []string{name}, nil
@@ -77,7 +77,7 @@
return nil, verror.NoExistf("Resolve name %q not found in %v", name, ns.mounts)
}
-func (ns *namespace) ResolveX(ctx context.T, name string) (*naming.MountEntry, error) {
+func (ns *namespace) ResolveX(ctx context.T, name string, opts ...naming.ResolveOpt) (*naming.MountEntry, error) {
defer vlog.LogCall()()
e := new(naming.MountEntry)
if address, _ := naming.SplitAddressName(name); len(address) > 0 {
@@ -98,14 +98,14 @@
return nil, verror.NoExistf("Resolve name %q not found in %v", name, ns.mounts)
}
-func (ns *namespace) ResolveToMountTableX(ctx context.T, name string) (*naming.MountEntry, error) {
+func (ns *namespace) ResolveToMountTableX(ctx context.T, name string, opts ...naming.ResolveOpt) (*naming.MountEntry, error) {
defer vlog.LogCall()()
// TODO(mattr): Implement this method for tests that might need it.
panic("ResolveToMountTable not implemented")
return nil, nil
}
-func (ns *namespace) ResolveToMountTable(ctx context.T, name string) ([]string, error) {
+func (ns *namespace) ResolveToMountTable(ctx context.T, name string, opts ...naming.ResolveOpt) ([]string, error) {
defer vlog.LogCall()()
// TODO(mattr): Implement this method for tests that might need it.
panic("ResolveToMountTable not implemented")
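The mock namespace's Resolve* methods pick up the variadic opts parameter so they keep satisfying the updated naming interface, while existing call sites that pass no options continue to compile unchanged. A self-contained illustration of that property (the Resolver interface and names here are hypothetical stand-ins, not the veyron API):

package main

import "fmt"

// ResolveOpt stands in for an option type; the real naming.ResolveOpt differs.
type ResolveOpt interface{}

// resolver mirrors the shape of the updated signature: adding a trailing
// variadic parameter does not break callers that pass no options.
type resolver interface {
	Resolve(name string, opts ...ResolveOpt) ([]string, error)
}

type fixed struct{ addrs []string }

func (f fixed) Resolve(name string, opts ...ResolveOpt) ([]string, error) {
	return f.addrs, nil
}

func main() {
	var r resolver = fixed{addrs: []string{"/127.0.0.1:8101"}}
	names, _ := r.Resolve("some/name") // old call site: no opts
	fmt.Println(names)
}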
diff --git a/runtimes/google/testing/mocks/runtime/panic_runtime.go b/runtimes/google/testing/mocks/runtime/panic_runtime.go
index 1c28e1d..d70852b 100644
--- a/runtimes/google/testing/mocks/runtime/panic_runtime.go
+++ b/runtimes/google/testing/mocks/runtime/panic_runtime.go
@@ -22,6 +22,7 @@
const badRuntime = "The runtime implementation should not call methods on runtime instances."
func (*PanicRuntime) Profile() veyron2.Profile { panic(badRuntime) }
+func (*PanicRuntime) AppCycle() veyron2.AppCycle { panic(badRuntime) }
func (*PanicRuntime) Publisher() *config.Publisher { panic(badRuntime) }
func (*PanicRuntime) Principal() security.Principal { panic(badRuntime) }
func (*PanicRuntime) NewClient(opts ...ipc.ClientOpt) (ipc.Client, error) { panic(badRuntime) }
@@ -39,12 +40,6 @@
func (*PanicRuntime) NewLogger(name string, opts ...vlog.LoggingOpts) (vlog.Logger, error) {
panic(badRuntime)
}
-func (*PanicRuntime) Stop() { panic(badRuntime) }
-func (*PanicRuntime) ForceStop() { panic(badRuntime) }
-func (*PanicRuntime) WaitForStop(chan<- string) { panic(badRuntime) }
-func (*PanicRuntime) AdvanceGoal(delta int) { panic(badRuntime) }
-func (*PanicRuntime) AdvanceProgress(delta int) { panic(badRuntime) }
-func (*PanicRuntime) TrackTask(chan<- veyron2.Task) { panic(badRuntime) }
func (*PanicRuntime) ConfigureReservedName(ipc.Dispatcher, ...ipc.ServerOpt) {
panic(badRuntime)
}
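AppCycle() joins the other PanicRuntime stubs while the per-runtime stop and task methods are dropped; the mock's only job is to panic on any accidental use. A self-contained sketch of that guard-mock idea (the Clock interface and names are hypothetical, not veyron types):

package main

// Clock is a stand-in interface; the real code guards veyron2.Runtime instead.
type Clock interface {
	Now() int64
}

// PanicClock satisfies Clock but fails loudly if the code under test ever
// calls it, which is the same role PanicRuntime plays for runtime methods.
type PanicClock struct{}

const badClock = "the code under test should not consult the clock"

func (PanicClock) Now() int64 { panic(badClock) }

func main() {
	var c Clock = PanicClock{}
	_ = c // pass c into code under test; any call to Now panics immediately
}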
diff --git a/security/agent/server/server.go b/security/agent/server/server.go
index 7d3f1ad..b753987 100644
--- a/security/agent/server/server.go
+++ b/security/agent/server/server.go
@@ -182,7 +182,7 @@
if err == io.EOF {
return
}
- if err == nil {
+ if clientAddr != nil {
// VCSecurityNone is safe since we're using anonymous unix sockets.
// Only our child process can possibly communicate on the socket.
//
diff --git a/services/mgmt/node/impl/node_invoker.go b/services/mgmt/node/impl/node_invoker.go
index 51a35cc..eeed7a4 100644
--- a/services/mgmt/node/impl/node_invoker.go
+++ b/services/mgmt/node/impl/node_invoker.go
@@ -159,7 +159,7 @@
if err := updateLink(i.config.Previous, i.config.CurrentLink); err != nil {
return err
}
- rt.R().Stop()
+ rt.R().AppCycle().Stop()
return nil
}
@@ -376,7 +376,7 @@
return err
}
- rt.R().Stop()
+ rt.R().AppCycle().Stop()
deferrer = nil
return nil
}
diff --git a/tools/mgmt/vsh b/tools/mgmt/vbash
similarity index 94%
rename from tools/mgmt/vsh
rename to tools/mgmt/vbash
index b0d0431..9a04701 100755
--- a/tools/mgmt/vsh
+++ b/tools/mgmt/vbash
@@ -11,18 +11,18 @@
# 2. Starts a shell under the agent, optionally fetching a remote blessing if
# the principal is missing.
#
-# Uses ~/.vsh to store its files, including binaries and the principal.
+# Uses ~/.vbash to store its files, including binaries and the principal.
#
# Usage:
#
# # Gets binaries from local repository
-# ./vsh
+# ./vbash
#
# # Gets binaries from local filesystem
-# ./vsh /path/to/binaries
+# ./vbash /path/to/binaries
#
# # Gets binaries from HTTP server
-# ./vsh http://host/path
+# ./vbash http://host/path
#
# Limitations:
# This only works on Linux and was only tested on goobuntu.
@@ -117,14 +117,14 @@
}
main() {
- if [[ ! -z "${VSH_INDICATOR}" ]]; then
- echo "Disallowing running VSH within VSH."
+ if [[ ! -z "${VBASH_INDICATOR}" ]]; then
+ echo "Disallowing running VBASH within VBASH."
echo "https://memegen.googleplex.com/5551020600983552"
exit 1
fi
- export VSH_INDICATOR="1"
+ export VBASH_INDICATOR="1"
- local -r INSTALL_DIR="${HOME}/.vsh"
+ local -r INSTALL_DIR="${HOME}/.vbash"
if [[ ! -e "${INSTALL_DIR}" ]]; then
mkdir -m 700 "${INSTALL_DIR}"
fi
diff --git a/tools/servicerunner/main.go b/tools/servicerunner/main.go
index c32a454..8620d14 100644
--- a/tools/servicerunner/main.go
+++ b/tools/servicerunner/main.go
@@ -57,44 +57,48 @@
func main() {
rt.Init()
- // TODO(sadovsky): It would be better if Dispatch() itself performed the env
+ // NOTE(sadovsky): It would be better if Dispatch() itself performed the env
// check.
if os.Getenv(modules.ShellEntryPoint) != "" {
panicOnError(modules.Dispatch())
return
}
+ vars := map[string]string{}
+
sh := modules.NewShell()
defer sh.Cleanup(os.Stderr, os.Stderr)
- // TODO(sadovsky): Shell only does this for tests. It would be better if it
+ // NOTE(sadovsky): Shell only does this for tests. It would be better if it
// either always did it or never did it.
if os.Getenv(consts.VeyronCredentials) == "" {
panicOnError(sh.CreateAndUseNewCredentials())
+ v, ok := sh.GetVar(consts.VeyronCredentials)
+ if !ok {
+ panic("Missing " + consts.VeyronCredentials)
+ }
+ vars[consts.VeyronCredentials] = v
}
- // TODO(sadovsky): The following line will not be needed if the modules
+ // NOTE(sadovsky): The following line will not be needed if the modules
// library is restructured per my proposal.
core.Install(sh)
- vars := map[string]string{}
-
h, err := sh.Start("root", nil, "--", "--veyron.tcp.address=127.0.0.1:0")
panicOnError(err)
updateVars(h, vars, "MT_NAME")
// Set consts.NamespaceRootPrefix env var, consumed downstream by proxyd
// among others.
- // NOTE(sadovsky): If this is not set, proxyd takes several seconds to
- // start; if it is set, proxyd starts instantly. Fun!
+ // NOTE(sadovsky): If this var is not set, proxyd takes several seconds to
+ // start; if it is set, proxyd starts instantly. Confusing.
sh.SetVar(consts.NamespaceRootPrefix, vars["MT_NAME"])
// NOTE(sadovsky): The proxyd binary requires --protocol and --address flags
// while the proxyd command instead uses ListenSpec flags.
- h, err = sh.Start("proxyd", nil, "--", "--veyron.tcp.address=127.0.0.1:0", "p")
+ h, err = sh.Start("proxyd", nil, "--", "--veyron.tcp.address=127.0.0.1:0", "test/proxy")
panicOnError(err)
updateVars(h, vars, "PROXY_ADDR")
- // TODO(sadovsky): Which identd should we be using?
- h, err = sh.Start("wsprd", nil, "--", "--veyron.proxy="+vars["PROXY_ADDR"], "--identd=/proxy.envyor.com:8101/identity/veyron-test/google")
+ h, err = sh.Start("wsprd", nil, "--", "--veyron.tcp.address=127.0.0.1:0", "--veyron.proxy=test/proxy", "--identd=test/identd")
panicOnError(err)
updateVars(h, vars, "WSPR_ADDR")