This is the first change to get rid of //.
- StartCall option to avoid resolution
- an option parameter to Mount calls to indicate that the mounted
server is another mount table.
- an option parameter to Mount calls to indicate this server is
replacing what's there rather than adding to it.
// still works. This is just the beginning of getting rid of it.
Change-Id: I29175587420249504a37b5e025bf5e9c8565e9e3
diff --git a/lib/modules/core/core.go b/lib/modules/core/core.go
index 98a6656..ccb14dd 100644
--- a/lib/modules/core/core.go
+++ b/lib/modules/core/core.go
@@ -87,7 +87,7 @@
prints the current time`)
shell.AddFunction(NamespaceCacheCommand, namespaceCache, `on|off
turns the namespace cache on or off`)
- shell.AddFunction(MountCommand, mountServer, `<mountpoint> <server> <ttl>
+ shell.AddFunction(MountCommand, mountServer, `<mountpoint> <server> <ttl> [M][R]
invokes namespace.Mount(<mountpoint>, <server>, <ttl>)`)
shell.AddSubprocess(EchoClientCommand, `<name> <message>...
invokes name.Echo(message)`)
diff --git a/lib/modules/core/misc.go b/lib/modules/core/misc.go
index 47f6ba9..e70aa02 100644
--- a/lib/modules/core/misc.go
+++ b/lib/modules/core/misc.go
@@ -42,19 +42,30 @@
}
func mountServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- if len(args) != 4 {
+ if len(args) < 4 {
return fmt.Errorf("wrong # args")
}
+ var opts []naming.MountOpt
+ for _, arg := range args[4:] {
+ for _, c := range arg {
+ switch c {
+ case 'R':
+ opts = append(opts, naming.ReplaceMountOpt(true))
+ case 'M':
+ opts = append(opts, naming.ServesMountTableOpt(true))
+ }
+ }
+ }
mp, server, ttlstr := args[1], args[2], args[3]
ttl, err := time.ParseDuration(ttlstr)
if err != nil {
return fmt.Errorf("failed to parse time from %q", ttlstr)
}
ns := rt.R().Namespace()
- if err := ns.Mount(rt.R().NewContext(), mp, server, ttl); err != nil {
+ if err := ns.Mount(rt.R().NewContext(), mp, server, ttl, opts...); err != nil {
return err
}
- fmt.Fprintf(stdout, "Mount(%s, %s, %s)\n", mp, server, ttl)
+ fmt.Fprintf(stdout, "Mount(%s, %s, %s, %v)\n", mp, server, ttl, opts)
return nil
}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index f4e177b..6f8539f 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -159,28 +159,29 @@
return false
}
+func getRetryTimeoutOpt(opts []ipc.CallOpt) (time.Duration, bool) {
+ for _, o := range opts {
+ if r, ok := o.(veyron2.RetryTimeoutOpt); ok {
+ return time.Duration(r), true
+ }
+ }
+ return 0, false
+}
+
func (c *client) StartCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, error) {
defer vlog.LogCall()()
- var retry = true
+ // Context specified deadline.
deadline, hasDeadline := ctx.Deadline()
if !hasDeadline {
- // If no deadline is set, use the default
+ // Default deadline.
deadline = time.Now().Add(defaultCallTimeout)
}
- for _, o := range opts {
- r, ok := o.(veyron2.RetryTimeoutOpt)
- if !ok {
- continue
- }
- if r == 0 {
- retry = false
- } else {
- deadline = time.Now().Add(time.Duration(r))
- }
- break
+ if r, ok := getRetryTimeoutOpt(opts); ok {
+ // Caller specified deadline.
+ deadline = time.Now().Add(time.Duration(r))
}
var lastErr verror.E
- for retries := 0; deadline.After(time.Now()); retries++ {
+ for retries := 0; ; retries++ {
if retries != 0 {
if !backoff(retries, deadline) {
break
@@ -191,23 +192,37 @@
return call, nil
}
lastErr = err
- if !retry || !retriable(err) {
+ if deadline.After(time.Now()) || !retriable(err) {
break
}
}
return nil, lastErr
}
+func getNoResolveOpt(opts []ipc.CallOpt) bool {
+ for _, o := range opts {
+ if r, ok := o.(veyron2.NoResolveOpt); ok {
+ return bool(r)
+ }
+ }
+ return false
+}
+
// startCall ensures StartCall always returns verror.E.
func (c *client) startCall(ctx context.T, name, method string, args []interface{}, opts ...ipc.CallOpt) (ipc.Call, verror.E) {
if ctx == nil {
return nil, verror.BadArgf("ipc: %s.%s called with nil context", name, method)
}
ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("Client Call: %s.%s", name, method))
-
- servers, err := c.ns.Resolve(ctx, name)
- if err != nil {
- return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
+ // Resolve name unless told not to.
+ var servers []string
+ if getNoResolveOpt(opts) {
+ servers = []string{name}
+ } else {
+ var err error
+ if servers, err = c.ns.Resolve(ctx, name); err != nil {
+ return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
+ }
}
// Try all servers, and if none of them are authorized for the call then return the error of the last server
// that was tried.
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index 6bcfc69..887bdec 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -5,6 +5,7 @@
"sort"
"testing"
+ "veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/services/mounttable/types"
@@ -37,7 +38,7 @@
ctx := testContext()
// Call the Foo method on ""
{
- addr := naming.JoinAddressName(ep.String(), "//")
+ addr := naming.JoinAddressName(ep.String(), "")
call, err := client.StartCall(ctx, addr, "Foo", nil)
if err != nil {
t.Fatalf("client.StartCall failed: %v", err)
@@ -52,8 +53,8 @@
}
// Call Glob on __debug
{
- addr := naming.JoinAddressName(ep.String(), "//__debug")
- call, err := client.StartCall(ctx, addr, "Glob", []interface{}{"*"})
+ addr := naming.JoinAddressName(ep.String(), "__debug")
+ call, err := client.StartCall(ctx, addr, "Glob", []interface{}{"*"}, veyron2.NoResolveOpt(true))
if err != nil {
t.Fatalf("client.StartCall failed: %v", err)
}
@@ -82,8 +83,8 @@
{
foo := stats.NewString("testing/foo")
foo.Set("The quick brown fox jumps over the lazy dog")
- addr := naming.JoinAddressName(ep.String(), "//__debug/stats/testing/foo")
- call, err := client.StartCall(ctx, addr, "Value", nil)
+ addr := naming.JoinAddressName(ep.String(), "__debug/stats/testing/foo")
+ call, err := client.StartCall(ctx, addr, "Value", nil, veyron2.NoResolveOpt(true))
if err != nil {
t.Fatalf("client.StartCall failed: %v", err)
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 76d940e..373e625 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -956,8 +956,8 @@
defer publisher.WaitForStop()
defer publisher.Stop()
publisher.AddName("incompatible")
- publisher.AddServer("/@2@tcp@localhost:10000@@1000000@2000000@@")
- publisher.AddServer("/@2@tcp@localhost:10001@@2000000@3000000@@")
+ publisher.AddServer("/@2@tcp@localhost:10000@@1000000@2000000@@", false)
+ publisher.AddServer("/@2@tcp@localhost:10001@@2000000@3000000@@", false)
_, err := b.client.StartCall(testContext(), "incompatible/suffix", "Echo", []interface{}{"foo"})
if !strings.Contains(err.Error(), version.NoCompatibleVersionErr.Error()) {
@@ -965,7 +965,7 @@
}
// Now add a server with a compatible endpoint and try again.
- publisher.AddServer("/" + b.ep.String())
+ publisher.AddServer("/"+b.ep.String(), false)
publisher.AddName("incompatible")
call, err := b.client.StartCall(testContext(), "incompatible/suffix", "Echo", []interface{}{"foo"})
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 57d90dc..aa1eeb5 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -204,7 +204,7 @@
}(ln, ep)
}
s.Unlock()
- s.publisher.AddServer(s.publishEP(ep))
+ s.publisher.AddServer(s.publishEP(ep), s.servesMountTable)
return ep, nil
}
@@ -340,9 +340,9 @@
s.proxyListenLoop(ln, ep, proxy)
s.active.Done()
}(pln, pep, listenSpec.Proxy)
- s.publisher.AddServer(s.publishEP(pep))
+ s.publisher.AddServer(s.publishEP(pep), s.servesMountTable)
} else {
- s.publisher.AddServer(s.publishEP(ep))
+ s.publisher.AddServer(s.publishEP(ep), s.servesMountTable)
}
s.Unlock()
return ep, nil
@@ -393,7 +393,7 @@
}
}
// (3) reconnected, publish new address
- s.publisher.AddServer(s.publishEP(ep))
+ s.publisher.AddServer(s.publishEP(ep), s.servesMountTable)
s.Lock()
s.listeners[ln] = nil
s.Unlock()
@@ -459,7 +459,7 @@
switch setting.Name() {
case ipc.NewAddrsSetting:
vlog.Infof("Added some addresses: %q", v)
- s.applyChange(dhcpl, v, publisher.AddServer)
+ s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
case ipc.RmAddrsSetting:
vlog.Infof("Removed some addresses: %q", v)
s.applyChange(dhcpl, v, publisher.RemoveServer)
diff --git a/runtimes/google/lib/publisher/publisher.go b/runtimes/google/lib/publisher/publisher.go
index 25db23f..83468e2 100644
--- a/runtimes/google/lib/publisher/publisher.go
+++ b/runtimes/google/lib/publisher/publisher.go
@@ -16,7 +16,7 @@
// Publisher manages the publishing of servers in mounttable.
type Publisher interface {
// AddServer adds a new server to be mounted.
- AddServer(server string)
+ AddServer(server string, ServesMountTable bool)
// RemoveServer removes a server from the list of mounts.
RemoveServer(server string)
// AddName adds a new name for all servers to be mounted as.
@@ -49,6 +49,7 @@
type addServerCmd struct {
server string // server to add
+ mt bool // true if server serves a mount table
done chan struct{} // closed when the cmd is done
}
@@ -87,9 +88,9 @@
}
}
-func (p *publisher) AddServer(server string) {
+func (p *publisher) AddServer(server string, mt bool) {
done := make(chan struct{})
- if p.sendCmd(addServerCmd{server, done}) {
+ if p.sendCmd(addServerCmd{server, mt, done}) {
<-done
}
}
@@ -161,7 +162,7 @@
vlog.VI(2).Info("ipc pub: exit runLoop")
return
case addServerCmd:
- state.addServer(tcmd.server)
+ state.addServer(tcmd.server, tcmd.mt)
close(tcmd.done)
case removeServerCmd:
state.removeServer(tcmd.server)
@@ -192,6 +193,7 @@
deadline time.Time // deadline for the next sync call
names []string // names that have been added
servers map[string]bool // servers that have been added
+ servesMT map[string]bool // true if server is a mount table server
mounts map[mountKey]*mountStatus // map each (name,server) to its status
}
@@ -214,6 +216,7 @@
period: period,
deadline: time.Now().Add(period),
servers: make(map[string]bool),
+ servesMT: make(map[string]bool),
mounts: make(map[mountKey]*mountStatus),
}
}
@@ -234,19 +237,20 @@
for server, _ := range ps.servers {
status := new(mountStatus)
ps.mounts[mountKey{name, server}] = status
- ps.mount(name, server, status)
+ ps.mount(name, server, status, ps.servesMT[server])
}
}
-func (ps *pubState) addServer(server string) {
+func (ps *pubState) addServer(server string, servesMT bool) {
// Each non-dup server that is added causes new mounts to be created for all
// existing names.
if !ps.servers[server] {
ps.servers[server] = true
+ ps.servers[server] = servesMT
for _, name := range ps.names {
status := new(mountStatus)
ps.mounts[mountKey{name, server}] = status
- ps.mount(name, server, status)
+ ps.mount(name, server, status, servesMT)
}
}
}
@@ -263,13 +267,13 @@
}
}
-func (ps *pubState) mount(name, server string, status *mountStatus) {
+func (ps *pubState) mount(name, server string, status *mountStatus, servesMT bool) {
// Always mount with ttl = period + slack, regardless of whether this is
// triggered by a newly added server or name, or by sync. The next call
// to sync will occur within the next period, and refresh all mounts.
ttl := ps.period + mountTTLSlack
status.lastMount = time.Now()
- status.lastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl)
+ status.lastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTableOpt(servesMT))
if status.lastMountErr != nil {
vlog.Errorf("ipc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.lastMountErr)
} else {
@@ -284,7 +288,7 @@
// Desired state is "unmounted", failed at previous attempt. Retry.
ps.unmount(key.name, key.server, status)
} else {
- ps.mount(key.name, key.server, status)
+ ps.mount(key.name, key.server, status, ps.servesMT[key.server])
}
}
}
diff --git a/runtimes/google/naming/namespace/mount.go b/runtimes/google/naming/namespace/mount.go
index 49cc811..35d5c01 100644
--- a/runtimes/google/naming/namespace/mount.go
+++ b/runtimes/google/naming/namespace/mount.go
@@ -5,13 +5,15 @@
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/services/mounttable/types"
"veyron.io/veyron/veyron2/vlog"
)
// mountIntoMountTable mounts a single server into a single mount table.
-func mountIntoMountTable(ctx context.T, client ipc.Client, name, server string, ttl time.Duration) error {
+func mountIntoMountTable(ctx context.T, client ipc.Client, name, server string, ttl time.Duration, flags types.MountFlag) error {
ctx, _ = ctx.WithTimeout(callTimeout)
- call, err := client.StartCall(ctx, name, "Mount", []interface{}{server, uint32(ttl.Seconds())})
+ call, err := client.StartCall(ctx, name, "Mount", []interface{}{server, uint32(ttl.Seconds()), flags})
if err != nil {
return err
}
@@ -34,8 +36,24 @@
return err
}
-func (ns *namespace) Mount(ctx context.T, name, server string, ttl time.Duration) error {
+func (ns *namespace) Mount(ctx context.T, name, server string, ttl time.Duration, opts ...naming.MountOpt) error {
defer vlog.LogCall()()
+
+ var flags types.MountFlag
+ for _, o := range opts {
+ // NB: used a switch since we'll be adding more options.
+ switch v := o.(type) {
+ case naming.ReplaceMountOpt:
+ if v {
+ flags |= types.MountFlag(types.Replace)
+ }
+ case naming.ServesMountTableOpt:
+ if v {
+ flags |= types.MountFlag(types.MT)
+ }
+ }
+ }
+
// Resolve to all the mount tables implementing name.
mtServers, err := ns.ResolveToMountTable(ctx, name)
if err != nil {
@@ -45,7 +63,7 @@
c := make(chan error, len(mtServers))
for _, mt := range mtServers {
go func() {
- c <- mountIntoMountTable(ctx, ns.rt.Client(), mt, server, ttl)
+ c <- mountIntoMountTable(ctx, ns.rt.Client(), mt, server, ttl, flags)
}()
}
// Return error if any mounts failed, since otherwise we'll get
diff --git a/runtimes/google/testing/mocks/naming/namespace.go b/runtimes/google/testing/mocks/naming/namespace.go
index 68b0bfe..58c914c 100644
--- a/runtimes/google/testing/mocks/naming/namespace.go
+++ b/runtimes/google/testing/mocks/naming/namespace.go
@@ -25,7 +25,7 @@
mounts map[string][]string
}
-func (ns *namespace) Mount(ctx context.T, name, server string, _ time.Duration) error {
+func (ns *namespace) Mount(ctx context.T, name, server string, _ time.Duration, _ ...naming.MountOpt) error {
defer vlog.LogCall()()
ns.Lock()
defer ns.Unlock()
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index 85936ec..2a1376d 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -47,6 +47,7 @@
// point can be sent to any of these servers.
type mount struct {
servers *serverList
+ mt bool
}
// node is a single point in the tree representing the mount table.
@@ -225,8 +226,16 @@
return n.mount.servers.copyToSlice(), slashSlashJoin(elems), nil
}
+func hasMTFlag(flags types.MountFlag) bool {
+ return (flags & types.MT) == types.MT
+}
+
+func hasReplaceFlag(flags types.MountFlag) bool {
+ return (flags & types.Replace) == types.Replace
+}
+
// Mount a server onto the name in the receiver.
-func (ms *mountContext) Mount(context ipc.ServerContext, server string, ttlsecs uint32) error {
+func (ms *mountContext) Mount(context ipc.ServerContext, server string, ttlsecs uint32, flags types.MountFlag) error {
mt := ms.mt
if ttlsecs == 0 {
ttlsecs = 10 * 365 * 24 * 60 * 60 // a really long time
@@ -246,13 +255,17 @@
if n == nil {
return naming.ErrNoSuchName
}
+ if hasReplaceFlag(flags) {
+ n.mount = nil
+ }
if n.mount == nil {
- n.mount = &mount{
- servers: NewServerList(),
+ n.mount = &mount{servers: NewServerList(), mt: hasMTFlag(flags)}
+ } else {
+ if hasMTFlag(flags) != n.mount.mt {
+ return fmt.Errorf("MT doesn't match")
}
}
- m := n.mount
- m.servers.add(server, time.Duration(ttlsecs)*time.Second)
+ n.mount.servers.add(server, time.Duration(ttlsecs)*time.Second)
return nil
}
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index d830415..c773312 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -53,7 +53,7 @@
return c
}
-func (stupidNS) Mount(context.T, string, string, time.Duration) error {
+func (stupidNS) Mount(context.T, string, string, time.Duration, ...naming.MountOpt) error {
return errors.New("unimplemented")
}
@@ -124,7 +124,7 @@
if err != nil {
boom(t, "Failed to BindMountTable: %s", err)
}
- if err := mtpt.Mount(rt.R().NewContext(), service, uint32(ttlSecs), veyron2.RetryTimeoutOpt(0)); err != nil {
+ if err := mtpt.Mount(rt.R().NewContext(), service, uint32(ttlSecs), 0, veyron2.RetryTimeoutOpt(0)); err != nil {
if shouldSucceed {
boom(t, "Failed to Mount %s onto %s: %s", service, name, err)
}
diff --git a/services/mounttable/lib/neighborhood.go b/services/mounttable/lib/neighborhood.go
index 6b6df4d..0381352 100644
--- a/services/mounttable/lib/neighborhood.go
+++ b/services/mounttable/lib/neighborhood.go
@@ -221,7 +221,7 @@
}
// Mount not implemented.
-func (*neighborhoodService) Mount(_ ipc.ServerContext, server string, ttlsecs uint32) error {
+func (*neighborhoodService) Mount(_ ipc.ServerContext, server string, ttlsecs uint32, opts types.MountFlag) error {
return errors.New("this server does not implement Mount")
}
@@ -247,7 +247,7 @@
if ok, _, _ := g.MatchInitialSegment(k); !ok {
continue
}
- if err := sender.Send(types.MountEntry{Name: k, Servers: n}); err != nil {
+ if err := sender.Send(types.MountEntry{Name: k, Servers: n, MT: true}); err != nil {
return err
}
}
@@ -257,7 +257,7 @@
if neighbor == nil {
return naming.ErrNoSuchName
}
- return sender.Send(types.MountEntry{Name: "", Servers: neighbor})
+ return sender.Send(types.MountEntry{Name: "", Servers: neighbor, MT: true})
default:
return naming.ErrNoSuchName
}
diff --git a/services/proxy/proxyd/main.go b/services/proxy/proxyd/main.go
index ce9859b..903db8b 100644
--- a/services/proxy/proxyd/main.go
+++ b/services/proxy/proxyd/main.go
@@ -46,7 +46,7 @@
publisher := publisher.New(r.NewContext(), r.Namespace(), time.Minute)
defer publisher.WaitForStop()
defer publisher.Stop()
- publisher.AddServer(naming.JoinAddressName(proxy.Endpoint().String(), "//"))
+ publisher.AddServer(naming.JoinAddressName(proxy.Endpoint().String(), "//"), false)
publisher.AddName(*name)
}
diff --git a/tools/mounttable/impl/impl.go b/tools/mounttable/impl/impl.go
index 0bec3fe..fb39d1a 100644
--- a/tools/mounttable/impl/impl.go
+++ b/tools/mounttable/impl/impl.go
@@ -99,7 +99,7 @@
if err != nil {
return fmt.Errorf("TTL parse error: %v", err)
}
- err = c.Mount(ctx, args[1], uint32(ttl.Seconds()))
+ err = c.Mount(ctx, args[1], uint32(ttl.Seconds()), 0)
if err != nil {
return err
}
diff --git a/tools/mounttable/impl/impl_test.go b/tools/mounttable/impl/impl_test.go
index 77a5a12..b4f96c7 100644
--- a/tools/mounttable/impl/impl_test.go
+++ b/tools/mounttable/impl/impl_test.go
@@ -25,12 +25,12 @@
func (s *server) Glob(_ ipc.ServerContext, pattern string, stream mounttable.GlobbableServiceGlobStream) error {
vlog.VI(2).Infof("Glob() was called. suffix=%v pattern=%q", s.suffix, pattern)
sender := stream.SendStream()
- sender.Send(types.MountEntry{"name1", []types.MountedServer{{"server1", 123}}})
- sender.Send(types.MountEntry{"name2", []types.MountedServer{{"server2", 456}, {"server3", 789}}})
+ sender.Send(types.MountEntry{"name1", []types.MountedServer{{"server1", 123}}, false})
+ sender.Send(types.MountEntry{"name2", []types.MountedServer{{"server2", 456}, {"server3", 789}}, false})
return nil
}
-func (s *server) Mount(_ ipc.ServerContext, server string, ttl uint32) error {
+func (s *server) Mount(_ ipc.ServerContext, server string, ttl uint32, flags types.MountFlag) error {
vlog.VI(2).Infof("Mount() was called. suffix=%v server=%q ttl=%d", s.suffix, server, ttl)
return nil
}