TBR: PROTOTYPE - discovery proto #1 and #2
Implement prototype #1 and #2
https://docs.google.com/a/google.com/document/d/1qFvWn66WgwCyKhxJ4HE4_7iCzRQ4dLtQ0HlcRxIY7uo/edit?usp=sharing
MultiPart: 2/2
Change-Id: I01829cf232e9b16c3e1f5f253eb9e66fa2a31ebe
diff --git a/cmd/namespace/impl.go b/cmd/namespace/impl.go
index ca17f5c..e75979d 100644
--- a/cmd/namespace/impl.go
+++ b/cmd/namespace/impl.go
@@ -36,6 +36,7 @@
var (
flagLongGlob bool
+ flagInsecureGlob bool
flagInsecureResolve bool
flagInsecureResolveToMT bool
flagDeleteSubtree bool
@@ -43,6 +44,7 @@
func init() {
cmdGlob.Flags.BoolVar(&flagLongGlob, "l", false, "Long listing format.")
+ cmdGlob.Flags.BoolVar(&flagInsecureGlob, "insecure", false, "Insecure mode: May return results from untrusted servers and invoke Glob on untrusted mounttables")
cmdResolve.Flags.BoolVar(&flagInsecureResolve, "insecure", false, "Insecure mode: May return results from untrusted servers and invoke Resolve on untrusted mounttables")
cmdResolveToMT.Flags.BoolVar(&flagInsecureResolveToMT, "insecure", false, "Insecure mode: May return results from untrusted servers and invoke Resolve on untrusted mounttables")
cmdDelete.Flags.BoolVar(&flagDeleteSubtree, "r", false, "Delete all children of the name in addition to the name itself.")
@@ -53,27 +55,41 @@
Name: "glob",
Short: "Returns all matching entries from the namespace",
Long: "Returns all matching entries from the namespace.",
- ArgsName: "<pattern>",
+ ArgsName: "<pattern> [<predicate>]",
ArgsLong: `
<pattern> is a glob pattern that is matched against all the names below the
specified mount name.
+
+<predicate> is an optional predicate that is matched against all the names
+below the specified mount name.
`,
}
func runGlob(ctx *context.T, env *cmdline.Env, args []string) error {
- if expected, got := 1, len(args); expected != got {
+ if expected, got := 1, len(args); expected > got {
+ return env.UsageErrorf("glob: incorrect number of arguments, expected %d, got %d", expected, got)
+ }
+ if expected, got := 2, len(args); expected < got {
return env.UsageErrorf("glob: incorrect number of arguments, expected %d, got %d", expected, got)
}
pattern := args[0]
+ predicate := ""
+ if len(args) > 1 {
+ predicate = args[1]
+ }
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ns := v23.GetNamespace(ctx)
- c, err := ns.Glob(ctx, pattern)
+ var opts []naming.NamespaceOpt
+ if flagInsecureGlob {
+ opts = append(opts, options.SkipServerEndpointAuthorization{})
+ }
+ c, err := ns.GlobWithPredicate(ctx, pattern, predicate, opts...)
if err != nil {
- ctx.Infof("ns.Glob(%q) failed: %v", pattern, err)
+ ctx.Infof("ns.Glob(%q, %q) failed: %v", pattern, predicate, err)
return err
}
if flagLongGlob {
diff --git a/lib/xrpc/xserver.go b/lib/xrpc/xserver.go
index 30bb7c1..cb7810b 100644
--- a/lib/xrpc/xserver.go
+++ b/lib/xrpc/xserver.go
@@ -9,6 +9,7 @@
import (
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
)
@@ -84,8 +85,8 @@
// AddName adds the specified name to the mount table for this server.
// AddName may be called multiple times.
-func (s *Server) AddName(name string) error {
- return s.s.AddName(name)
+func (s *Server) AddName(name string, properties naming.Properties) error {
+ return s.s.AddName(name, properties)
}
// RemoveName removes the specified name from the mount table.
diff --git a/runtime/internal/lib/publisher/publisher.go b/runtime/internal/lib/publisher/publisher.go
index 270fec2..fb453ee 100644
--- a/runtime/internal/lib/publisher/publisher.go
+++ b/runtime/internal/lib/publisher/publisher.go
@@ -28,7 +28,7 @@
// RemoveServer removes a server from the list of mounts.
RemoveServer(server string)
// AddName adds a new name for all servers to be mounted as.
- AddName(name string, ServesMountTable bool, IsLeaf bool)
+ AddName(name string, servesMountTable bool, isLeaf bool, properties naming.Properties)
// RemoveName removes a name.
RemoveName(name string)
// Status returns a snapshot of the publisher's current state.
@@ -68,9 +68,10 @@
}
type addNameCmd struct {
- name string // name to add
- mt bool // true if server serves a mount table
- leaf bool // true if server is a leaf
+ name string // name to add
+ mt bool // true if server serves a mount table
+ leaf bool // true if server is a leaf
+ properties naming.Properties // properties to add
}
type removeNameCmd struct {
@@ -114,8 +115,8 @@
p.sendCmd(removeServerCmd{server})
}
-func (p *publisher) AddName(name string, mt bool, leaf bool) {
- p.sendCmd(addNameCmd{name, mt, leaf})
+func (p *publisher) AddName(name string, mt bool, leaf bool, properties naming.Properties) {
+ p.sendCmd(addNameCmd{name, mt, leaf, properties})
}
func (p *publisher) RemoveName(name string) {
@@ -175,7 +176,7 @@
case removeServerCmd:
state.removeServer(tcmd.server)
case addNameCmd:
- state.addName(tcmd.name, tcmd.mt, tcmd.leaf)
+ state.addName(tcmd.name, tcmd.mt, tcmd.leaf, tcmd.properties)
case removeNameCmd:
state.removeName(tcmd.name)
case statusCmd:
@@ -210,8 +211,9 @@
}
type nameAttr struct {
- servesMT bool
- isLeaf bool
+ servesMT naming.ServesMountTable
+ isLeaf naming.IsLeaf
+ properties naming.Properties
}
func newPubState(ctx *context.T, ns namespace.T, period time.Duration) *pubState {
@@ -230,13 +232,13 @@
return time.After(ps.deadline.Sub(time.Now()))
}
-func (ps *pubState) addName(name string, mt bool, leaf bool) {
+func (ps *pubState) addName(name string, mt bool, leaf bool, properties naming.Properties) {
// Each non-dup name that is added causes new mounts to be created for all
// existing servers.
if _, exists := ps.names[name]; exists {
return
}
- attr := nameAttr{mt, leaf}
+ attr := nameAttr{naming.ServesMountTable(mt), naming.IsLeaf(leaf), properties}
ps.names[name] = attr
for server, _ := range ps.servers {
status := new(rpc.MountStatus)
@@ -289,7 +291,7 @@
ttl := ps.period + mountTTLSlack
last := status
status.LastMount = time.Now()
- status.LastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTable(attr.servesMT), naming.IsLeaf(attr.isLeaf))
+ status.LastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, attr.servesMT, attr.isLeaf, attr.properties)
status.TTL = ttl
// If the mount status changed, log it.
if status.LastMountErr != nil {
diff --git a/runtime/internal/naming/namespace/glob.go b/runtime/internal/naming/namespace/glob.go
index e5ca02e..823675d 100644
--- a/runtime/internal/naming/namespace/glob.go
+++ b/runtime/internal/naming/namespace/glob.go
@@ -40,11 +40,12 @@
// task is a sub-glob that has to be performed against a mount table. Tasks are
// done in parrallel to speed up the glob.
type task struct {
- pattern *glob.Glob // pattern to match
- er *naming.GlobError // error for that particular point in the name space
- me *naming.MountEntry // server to match at
- error error // any error performing this task
- depth int // number of mount tables traversed recursively
+ pattern *glob.Glob // pattern to match
+ predicate string // predicate to match
+ er *naming.GlobError // error for that particular point in the name space
+ me *naming.MountEntry // server to match at
+ error error // any error performing this task
+ depth int // number of mount tables traversed recursively
}
// globAtServer performs a Glob on the servers at a mount point. It cycles through the set of
@@ -79,7 +80,7 @@
return
}
- call, err := ns.parallelStartCall(ctx, client, servers, rpc.GlobMethod, []interface{}{pstr}, opts)
+ call, err := ns.parallelStartCall(ctx, client, servers, rpc.GlobWithPredicateMethod, []interface{}{pstr, t.predicate}, opts)
if err != nil {
t.error = err
return
@@ -111,6 +112,7 @@
Servers: v.Value.Servers,
ServesMountTable: v.Value.ServesMountTable,
IsLeaf: v.Value.IsLeaf,
+ NotMatched: v.Value.NotMatched,
},
depth: t.depth + 1,
}
@@ -146,7 +148,7 @@
}
// globLoop fires off a go routine for each server and reads backs replies.
-func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan naming.GlobReply, tr *tracks, opts []rpc.CallOpt) {
+func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, predicate string, reply chan naming.GlobReply, tr *tracks, opts []rpc.CallOpt) {
defer close(reply)
// Provide enough buffers to avoid too much switching between the readers and the writers.
@@ -202,7 +204,7 @@
// If we've satisfied the request and this isn't the root,
// reply to the caller.
- if suffix.Len() == 0 && t.depth != 0 {
+ if suffix.Len() == 0 && t.depth != 0 && !t.me.NotMatched {
x := *t.me
x.Name = naming.Join(prefix, x.Name)
reply <- &naming.GlobReplyEntry{x}
@@ -226,12 +228,17 @@
// Perform a glob at the next server.
inFlight++
t.pattern = suffix
+ t.predicate = predicate
go ns.globAtServer(ctx, t, replies, tr, opts)
}
}
// Glob implements naming.MountTable.Glob.
func (ns *namespace) Glob(ctx *context.T, pattern string, opts ...naming.NamespaceOpt) (<-chan naming.GlobReply, error) {
+ return ns.GlobWithPredicate(ctx, pattern, "", opts...)
+}
+
+func (ns *namespace) GlobWithPredicate(ctx *context.T, pattern, predicate string, opts ...naming.NamespaceOpt) (<-chan naming.GlobReply, error) {
defer apilog.LogCallf(ctx, "pattern=%.10s...,opts...=%v", pattern, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
// Root the pattern. If we have no servers to query, give up.
e, patternWasRooted := ns.rootMountEntry(pattern)
@@ -256,6 +263,6 @@
}
e.Name = ""
reply := make(chan naming.GlobReply, 100)
- go ns.globLoop(ctx, e, prefix, g, reply, tr, getCallOpts(opts))
+ go ns.globLoop(ctx, e, prefix, g, predicate, reply, tr, getCallOpts(opts))
return reply, nil
}
diff --git a/runtime/internal/naming/namespace/mount.go b/runtime/internal/naming/namespace/mount.go
index 884cec1..d816ff5 100644
--- a/runtime/internal/naming/namespace/mount.go
+++ b/runtime/internal/naming/namespace/mount.go
@@ -17,10 +17,10 @@
)
// mountIntoMountTable mounts a single server into a single mount table.
-func mountIntoMountTable(ctx *context.T, client rpc.Client, name, server string, ttl time.Duration, flags naming.MountFlag, id string, opts ...rpc.CallOpt) (s status) {
+func mountIntoMountTable(ctx *context.T, client rpc.Client, name, server string, ttl time.Duration, flags naming.MountFlag, properties naming.Properties, id string, opts ...rpc.CallOpt) (s status) {
s.id = id
ctx = withTimeout(ctx)
- s.err = client.Call(ctx, name, "Mount", []interface{}{server, uint32(ttl.Seconds()), flags}, nil, append(opts, options.NoResolve{})...)
+ s.err = client.Call(ctx, name, "Mount", []interface{}{server, uint32(ttl.Seconds()), flags, properties}, nil, append(opts, options.NoResolve{})...)
return
}
@@ -28,6 +28,7 @@
func (ns *namespace) Mount(ctx *context.T, name, server string, ttl time.Duration, opts ...naming.NamespaceOpt) error {
defer apilog.LogCallf(ctx, "name=%.10s...,server=%.10s...,ttl=%v,opts...=%v", name, server, ttl, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
var flags naming.MountFlag
+ var properties naming.Properties
for _, o := range opts {
// NB: used a switch since we'll be adding more options.
switch v := o.(type) {
@@ -43,13 +44,17 @@
if v {
flags |= naming.MountFlag(naming.Leaf)
}
+ case naming.Properties:
+ if v != nil {
+ properties = v
+ }
}
}
client := v23.GetClient(ctx)
// Mount the server in all the returned mount tables.
f := func(ctx *context.T, mt, id string) status {
- return mountIntoMountTable(ctx, client, mt, server, ttl, flags, id, getCallOpts(opts)...)
+ return mountIntoMountTable(ctx, client, mt, server, ttl, flags, properties, id, getCallOpts(opts)...)
}
err := ns.dispatch(ctx, name, f, opts)
ctx.VI(1).Infof("Mount(%s, %q) -> %v", name, server, err)
diff --git a/runtime/internal/rpc/reserved.go b/runtime/internal/rpc/reserved.go
index 3e8a0ca..fbe786b 100644
--- a/runtime/internal/rpc/reserved.go
+++ b/runtime/internal/rpc/reserved.go
@@ -55,6 +55,12 @@
OutStream: rpc.ArgDesc{Doc: "Streams matching entries back to the client."},
},
{
+ Name: "GlobWithPredicate",
+ Doc: "GlobWithPredicate returns all entries matching the pattern and the predicate.",
+ InArgs: []rpc.ArgDesc{{Name: "pattern", Doc: ""}, {Name: "predicate", Doc: ""}},
+ OutStream: rpc.ArgDesc{Doc: "Streams matching entries back to the client."},
+ },
+ {
Name: "MethodSignature",
Doc: "MethodSignature returns the signature for the given method.",
InArgs: []rpc.ArgDesc{{
@@ -145,9 +151,13 @@
}
func (r *reservedMethods) Glob(ctx *context.T, call rpc.StreamServerCall, pattern string) error {
+ return r.GlobWithPredicate(ctx, call, pattern, "")
+}
+
+func (r *reservedMethods) GlobWithPredicate(ctx *context.T, call rpc.StreamServerCall, pattern, predicate string) error {
// Copy the original call to shield ourselves from changes the flowServer makes.
glob := globInternal{r.dispNormal, r.dispReserved, call.Suffix()}
- return glob.Glob(ctx, call, pattern)
+ return glob.GlobWithPredicate(ctx, call, pattern, predicate)
}
// globInternal handles ALL the Glob requests received by a server and
@@ -189,7 +199,11 @@
const maxRecursiveGlobDepth = 10
func (i *globInternal) Glob(ctx *context.T, call rpc.StreamServerCall, pattern string) error {
- ctx.VI(3).Infof("rpc Glob: Incoming request: %q.Glob(%q)", i.receiver, pattern)
+ return i.GlobWithPredicate(ctx, call, pattern, "")
+}
+
+func (i *globInternal) GlobWithPredicate(ctx *context.T, call rpc.StreamServerCall, pattern, predicate string) error {
+ ctx.VI(3).Infof("rpc Glob: Incoming request: %q.GlobWithPrediate(%q, %q)", i.receiver, pattern, predicate)
g, err := glob.Parse(pattern)
if err != nil {
return err
@@ -278,7 +292,7 @@
}
if gs.AllGlobber != nil {
ctx.VI(3).Infof("rpc Glob: %q implements AllGlobber", suffix)
- ch, err := gs.AllGlobber.Glob__(ctx, subcall, state.glob.String())
+ ch, err := gs.AllGlobber.GlobWithPredicate__(ctx, subcall, state.glob.String(), predicate)
if err != nil {
ctx.VI(3).Infof("rpc Glob: %q.Glob(%q) failed: %v", suffix, state.glob, err)
subcall.Send(naming.GlobReplyError{naming.GlobError{Name: state.name, Error: verror.Convert(verror.ErrInternal, ctx, err)}})
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index c34a728..95609d8 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -120,6 +120,7 @@
ns namespace.T
servesMountTable bool
isLeaf bool
+ properties naming.Properties
// TODO(cnicolaou): add roaming stats to rpcStats
stats *rpcStats // stats for this server.
@@ -777,14 +778,24 @@
if err != nil {
return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
}
- s.setLeaf(true)
+ // TODO(jhahn): Figure out a better way to call Signature locally.
+ sigs, err := invoker.Signature(s.ctx, nil)
+ if err != nil {
+ return err
+ }
+ interfaces := make([]string, len(sigs))
+ for i, sig := range sigs {
+ interfaces[i] = sig.PkgPath
+ }
+ s.setLeaf(true, naming.Properties{naming.PropInterface: interfaces})
return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
}
-func (s *server) setLeaf(value bool) {
+func (s *server) setLeaf(value bool, properties naming.Properties) {
s.Lock()
defer s.Unlock()
s.isLeaf = value
+ s.properties = properties
for ls, _ := range s.listenState {
for i := range ls.ieps {
ls.ieps[i].IsLeaf = s.isLeaf
@@ -810,12 +821,12 @@
s.publisher.AddServer(iep.String())
}
}
- s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+ s.publisher.AddName(name, s.servesMountTable, s.isLeaf, s.properties)
}
return nil
}
-func (s *server) AddName(name string) error {
+func (s *server) AddName(name string, properties naming.Properties) error {
defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if len(name) == 0 {
return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
@@ -826,7 +837,7 @@
return err
}
vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
- s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+ s.publisher.AddName(name, s.servesMountTable, s.isLeaf, properties)
return nil
}
diff --git a/runtime/internal/rpc/stream/proxy/proxy.go b/runtime/internal/rpc/stream/proxy/proxy.go
index 5d30d3c..d3eec83 100644
--- a/runtime/internal/rpc/stream/proxy/proxy.go
+++ b/runtime/internal/rpc/stream/proxy/proxy.go
@@ -210,7 +210,8 @@
pub = publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
pub.AddServer(proxy.endpoint().String())
}
- pub.AddName(name, false, true)
+ // TODO(jhahn): Any properties for proxy?
+ pub.AddName(name, false, true, nil)
}
shutdown = func() {
diff --git a/services/mounttable/mounttablelib/mounttable.go b/services/mounttable/mounttablelib/mounttable.go
index 666a355..ccd16f4 100644
--- a/services/mounttable/mounttablelib/mounttable.go
+++ b/services/mounttable/mounttablelib/mounttable.go
@@ -29,14 +29,15 @@
const defaultMaxNodesPerUser = 1000
var (
- errMalformedAddress = verror.Register(pkgPath+".errMalformedAddress", verror.NoRetry, "{1:}{2:} malformed address {3} for mounted server {4}{:_}")
- errMTDoesntMatch = verror.Register(pkgPath+".errMTDoesntMatch", verror.NoRetry, "{1:}{2:} MT doesn't match{:_}")
- errLeafDoesntMatch = verror.Register(pkgPath+".errLeafDoesntMatch", verror.NoRetry, "{1:}{2:} Leaf doesn't match{:_}")
- errCantDeleteRoot = verror.Register(pkgPath+".errCantDeleteRoot", verror.NoRetry, "{1:}{2:} cannot delete root node{:_}")
- errNotEmpty = verror.Register(pkgPath+".errNotEmpty", verror.NoRetry, "{1:}{2:} cannot delete {3}: has children{:_}")
- errNamingLoop = verror.Register(pkgPath+".errNamingLoop", verror.NoRetry, "{1:}{2:} Loop in namespace{:_}")
- errTooManyNodes = verror.Register(pkgPath+".errTooManyNodes", verror.NoRetry, "{1:}{2:} User has exceeded his node limit {:_}")
- errNoSharedRoot = verror.Register(pkgPath+".errNoSharedRoot", verror.NoRetry, "{1:}{2:} Server and User share no blessing root {:_}")
+ errMalformedAddress = verror.Register(pkgPath+".errMalformedAddress", verror.NoRetry, "{1:}{2:} malformed address {3} for mounted server {4}{:_}")
+ errMTDoesntMatch = verror.Register(pkgPath+".errMTDoesntMatch", verror.NoRetry, "{1:}{2:} MT doesn't match{:_}")
+ errLeafDoesntMatch = verror.Register(pkgPath+".errLeafDoesntMatch", verror.NoRetry, "{1:}{2:} Leaf doesn't match{:_}")
+ errPropertiesDontMatch = verror.Register(pkgPath+".errPropertiesDontMatch", verror.NoRetry, "{1:}{2:} Properties doesn't match{:_}")
+ errCantDeleteRoot = verror.Register(pkgPath+".errCantDeleteRoot", verror.NoRetry, "{1:}{2:} cannot delete root node{:_}")
+ errNotEmpty = verror.Register(pkgPath+".errNotEmpty", verror.NoRetry, "{1:}{2:} cannot delete {3}: has children{:_}")
+ errNamingLoop = verror.Register(pkgPath+".errNamingLoop", verror.NoRetry, "{1:}{2:} Loop in namespace{:_}")
+ errTooManyNodes = verror.Register(pkgPath+".errTooManyNodes", verror.NoRetry, "{1:}{2:} User has exceeded his node limit {:_}")
+ errNoSharedRoot = verror.Register(pkgPath+".errNoSharedRoot", verror.NoRetry, "{1:}{2:} Server and User share no blessing root {:_}")
)
var (
@@ -83,9 +84,10 @@
// here. The servers are considered equivalent, i.e., RPCs to a name below this
// point can be sent to any of these servers.
type mount struct {
- servers *serverList
- mt bool
- leaf bool
+ servers *serverList
+ mt bool
+ leaf bool
+ properties naming.Properties
}
// node is a single point in the tree representing the mount table.
@@ -473,7 +475,7 @@
}
// Mount a server onto the name in the receiver.
-func (ms *mountContext) Mount(ctx *context.T, call rpc.ServerCall, server string, ttlsecs uint32, flags naming.MountFlag) error {
+func (ms *mountContext) Mount(ctx *context.T, call rpc.ServerCall, server string, ttlsecs uint32, flags naming.MountFlag, properties naming.Properties) error {
mt := ms.mt
if ttlsecs == 0 {
ttlsecs = 10 * 365 * 24 * 60 * 60 // a really long time
@@ -511,6 +513,9 @@
if wantLeaf != n.mount.leaf {
return verror.New(errLeafDoesntMatch, ctx)
}
+ if !reflect.DeepEqual(properties, n.mount.properties) {
+ return verror.New(errPropertiesDontMatch, ctx)
+ }
}
// Remove any existing children.
for child := range n.children {
@@ -522,7 +527,7 @@
n.mount = nil
}
if n.mount == nil {
- n.mount = &mount{servers: newServerList(), mt: wantMT, leaf: wantLeaf}
+ n.mount = &mount{servers: newServerList(), mt: wantMT, leaf: wantLeaf, properties: properties}
}
n.mount.servers.add(server, time.Duration(ttlsecs)*time.Second)
mt.serverCounter.Incr(numServers(n) - nServersBefore)
@@ -645,7 +650,7 @@
}
// globStep is called with n and n.parent locked. Returns with both unlocked.
-func (mt *mountTable) globStep(ctx *context.T, call security.Call, n *node, name string, pattern *glob.Glob, ch chan<- naming.GlobReply) {
+func (mt *mountTable) globStep(ctx *context.T, call security.Call, n *node, name string, pattern *glob.Glob, predicate *glob.Predicate, ch chan<- naming.GlobReply) {
if shouldAbort(ctx) {
n.parent.Unlock()
n.Unlock()
@@ -666,8 +671,17 @@
}
// Don't need the parent lock anymore.
n.parent.Unlock()
+ // Evaluate the predicate if any
+ notMatched := predicate != nil && !predicate.Match(m.properties)
+ // Don't return the not-matched entry if it is a leaf.
+ // TODO(jhahn): Handle the dynamic property case.
+ if notMatched && n.mount.leaf {
+ n.Unlock()
+ return
+ }
me := naming.MountEntry{
- Name: name,
+ Name: name,
+ NotMatched: notMatched,
}
// Only fill in the mount info if we can resolve this name.
if err := n.satisfies(mt, ctx, call, resolveTags); err == nil {
@@ -721,7 +735,7 @@
c.Unlock()
continue
}
- mt.globStep(ctx, call, c, naming.Join(name, k), suffix, ch)
+ mt.globStep(ctx, call, c, naming.Join(name, k), suffix, predicate, ch)
n.Lock()
}
}
@@ -734,7 +748,7 @@
out:
// Remove if no longer useful.
- if n.removeUseless(mt) || pattern.Len() != 0 {
+ if n.removeUseless(mt) || pattern.Len() != 0 || predicate != nil {
n.parent.Unlock()
n.Unlock()
return
@@ -764,6 +778,10 @@
// adds a/b while a Glob is in progress, the Glob may return a set of nodes that includes both
// c/d and a/b.
func (ms *mountContext) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+ return ms.GlobWithPredicate__(ctx, call, pattern, "")
+}
+
+func (ms *mountContext) GlobWithPredicate__(ctx *context.T, call rpc.ServerCall, pattern, predicate string) (<-chan naming.GlobReply, error) {
ctx.VI(2).Infof("mt.Glob %v", ms.elems)
scall := call.Security()
@@ -771,6 +789,10 @@
if err != nil {
return nil, err
}
+ p, err := glob.ParsePredicate(predicate)
+ if err != nil {
+ return nil, err
+ }
mt := ms.mt
ch := make(chan naming.GlobReply)
@@ -788,7 +810,7 @@
ms.linkToLeaf(ctx, scall, ch)
return
}
- mt.globStep(ctx, scall, n, "", g, ch)
+ mt.globStep(ctx, scall, n, "", g, p, ch)
}()
return ch, nil
}
diff --git a/services/mounttable/mounttablelib/neighborhood.go b/services/mounttable/mounttablelib/neighborhood.go
index 44a2cdb..85d9dcd 100644
--- a/services/mounttable/mounttablelib/neighborhood.go
+++ b/services/mounttable/mounttablelib/neighborhood.go
@@ -253,7 +253,7 @@
}
// Mount not implemented.
-func (ns *neighborhoodService) Mount(ctx *context.T, _ rpc.ServerCall, _ string, _ uint32, _ naming.MountFlag) error {
+func (ns *neighborhoodService) Mount(ctx *context.T, _ rpc.ServerCall, _ string, _ uint32, _ naming.MountFlag, _ naming.Properties) error {
return verror.New(errDoesntImplementMount, ctx)
}
diff --git a/test/discovery/a/a.vdl b/test/discovery/a/a.vdl
new file mode 100644
index 0000000..05d1a67
--- /dev/null
+++ b/test/discovery/a/a.vdl
@@ -0,0 +1,13 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package a
+
+import (
+ "v.io/v23/security/access"
+)
+
+type AService interface {
+ AMethod() error { access.Read }
+}
diff --git a/test/discovery/a/a.vdl.go b/test/discovery/a/a.vdl.go
new file mode 100644
index 0000000..143ea91
--- /dev/null
+++ b/test/discovery/a/a.vdl.go
@@ -0,0 +1,113 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: a.vdl
+
+package a
+
+import (
+ // VDL system imports
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/security/access"
+)
+
+// AServiceClientMethods is the client interface
+// containing AService methods.
+type AServiceClientMethods interface {
+ AMethod(*context.T, ...rpc.CallOpt) error
+}
+
+// AServiceClientStub adds universal methods to AServiceClientMethods.
+type AServiceClientStub interface {
+ AServiceClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// AServiceClient returns a client stub for AService.
+func AServiceClient(name string) AServiceClientStub {
+ return implAServiceClientStub{name}
+}
+
+type implAServiceClientStub struct {
+ name string
+}
+
+func (c implAServiceClientStub) AMethod(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "AMethod", nil, nil, opts...)
+ return
+}
+
+// AServiceServerMethods is the interface a server writer
+// implements for AService.
+type AServiceServerMethods interface {
+ AMethod(*context.T, rpc.ServerCall) error
+}
+
+// AServiceServerStubMethods is the server interface containing
+// AService methods, as expected by rpc.Server.
+// There is no difference between this interface and AServiceServerMethods
+// since there are no streaming methods.
+type AServiceServerStubMethods AServiceServerMethods
+
+// AServiceServerStub adds universal methods to AServiceServerStubMethods.
+type AServiceServerStub interface {
+ AServiceServerStubMethods
+ // Describe the AService interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// AServiceServer returns a server stub for AService.
+// It converts an implementation of AServiceServerMethods into
+// an object that may be used by rpc.Server.
+func AServiceServer(impl AServiceServerMethods) AServiceServerStub {
+ stub := implAServiceServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implAServiceServerStub struct {
+ impl AServiceServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implAServiceServerStub) AMethod(ctx *context.T, call rpc.ServerCall) error {
+ return s.impl.AMethod(ctx, call)
+}
+
+func (s implAServiceServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implAServiceServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{AServiceDesc}
+}
+
+// AServiceDesc describes the AService interface.
+var AServiceDesc rpc.InterfaceDesc = descAService
+
+// descAService hides the desc to keep godoc clean.
+var descAService = rpc.InterfaceDesc{
+ Name: "AService",
+ PkgPath: "v.io/x/ref/test/discovery/a",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "AMethod",
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ },
+}
diff --git a/test/discovery/b/b.vdl b/test/discovery/b/b.vdl
new file mode 100644
index 0000000..af45268
--- /dev/null
+++ b/test/discovery/b/b.vdl
@@ -0,0 +1,16 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package b
+
+import (
+ "v.io/v23/security/access"
+ "v.io/x/ref/test/discovery/a"
+)
+
+type AnotherService interface {
+ a.AService
+
+ AnotherMethod() error { access.Read }
+}
diff --git a/test/discovery/b/b.vdl.go b/test/discovery/b/b.vdl.go
new file mode 100644
index 0000000..99d2b2c
--- /dev/null
+++ b/test/discovery/b/b.vdl.go
@@ -0,0 +1,123 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: b.vdl
+
+package b
+
+import (
+ // VDL system imports
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/security/access"
+ "v.io/x/ref/test/discovery/a"
+)
+
+// AnotherServiceClientMethods is the client interface
+// containing AnotherService methods.
+type AnotherServiceClientMethods interface {
+ a.AServiceClientMethods
+ AnotherMethod(*context.T, ...rpc.CallOpt) error
+}
+
+// AnotherServiceClientStub adds universal methods to AnotherServiceClientMethods.
+type AnotherServiceClientStub interface {
+ AnotherServiceClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// AnotherServiceClient returns a client stub for AnotherService.
+func AnotherServiceClient(name string) AnotherServiceClientStub {
+ return implAnotherServiceClientStub{name, a.AServiceClient(name)}
+}
+
+type implAnotherServiceClientStub struct {
+ name string
+
+ a.AServiceClientStub
+}
+
+func (c implAnotherServiceClientStub) AnotherMethod(ctx *context.T, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "AnotherMethod", nil, nil, opts...)
+ return
+}
+
+// AnotherServiceServerMethods is the interface a server writer
+// implements for AnotherService.
+type AnotherServiceServerMethods interface {
+ a.AServiceServerMethods
+ AnotherMethod(*context.T, rpc.ServerCall) error
+}
+
+// AnotherServiceServerStubMethods is the server interface containing
+// AnotherService methods, as expected by rpc.Server.
+// There is no difference between this interface and AnotherServiceServerMethods
+// since there are no streaming methods.
+type AnotherServiceServerStubMethods AnotherServiceServerMethods
+
+// AnotherServiceServerStub adds universal methods to AnotherServiceServerStubMethods.
+type AnotherServiceServerStub interface {
+ AnotherServiceServerStubMethods
+ // Describe the AnotherService interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// AnotherServiceServer returns a server stub for AnotherService.
+// It converts an implementation of AnotherServiceServerMethods into
+// an object that may be used by rpc.Server.
+func AnotherServiceServer(impl AnotherServiceServerMethods) AnotherServiceServerStub {
+ stub := implAnotherServiceServerStub{
+ impl: impl,
+ AServiceServerStub: a.AServiceServer(impl),
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implAnotherServiceServerStub struct {
+ impl AnotherServiceServerMethods
+ a.AServiceServerStub
+ gs *rpc.GlobState
+}
+
+func (s implAnotherServiceServerStub) AnotherMethod(ctx *context.T, call rpc.ServerCall) error {
+ return s.impl.AnotherMethod(ctx, call)
+}
+
+func (s implAnotherServiceServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implAnotherServiceServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{AnotherServiceDesc, a.AServiceDesc}
+}
+
+// AnotherServiceDesc describes the AnotherService interface.
+var AnotherServiceDesc rpc.InterfaceDesc = descAnotherService
+
+// descAnotherService hides the desc to keep godoc clean.
+var descAnotherService = rpc.InterfaceDesc{
+ Name: "AnotherService",
+ PkgPath: "v.io/x/ref/test/discovery/b",
+ Embeds: []rpc.EmbedDesc{
+ {"AService", "v.io/x/ref/test/discovery/a", ``},
+ },
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "AnotherMethod",
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ },
+}
diff --git a/test/discovery/testserver1.go b/test/discovery/testserver1.go
new file mode 100644
index 0000000..2c80c1e
--- /dev/null
+++ b/test/discovery/testserver1.go
@@ -0,0 +1,80 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "fmt"
+
+ "v.io/v23"
+ "v.io/x/lib/cmdline"
+ "v.io/x/lib/vlog"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+
+ "v.io/x/ref/lib/signals"
+ "v.io/x/ref/lib/v23cmd"
+ "v.io/x/ref/lib/xrpc"
+ _ "v.io/x/ref/runtime/factories/roaming"
+ "v.io/x/ref/services/mounttable/mounttablelib"
+ "v.io/x/ref/test/discovery/b"
+)
+
+var (
+ mtPort int
+ nhName string
+)
+
+func init() {
+ cmdRun.Flags.IntVar(&mtPort, "mtport", 9000, "port number of mounttable to run")
+ cmdRun.Flags.StringVar(&nhName, "nhname", "", "neighborhood mt name")
+}
+
+var cmdRun = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runServer),
+ Name: "run",
+ Short: "Run a server for testing discovery",
+}
+
+func runServer(ctx *context.T, env *cmdline.Env, args []string) error {
+ mtName, stopMT, err := startMounttable(ctx)
+ if err != nil {
+ return err
+ }
+ defer stopMT()
+ ctx, _, err = v23.WithNewNamespace(ctx, mtName)
+ if err != nil {
+ return err
+ }
+ server, err := xrpc.NewServer(ctx, "test/x", b.AnotherServiceServer(&impl{}), security.AllowEveryone())
+ if err != nil {
+ ctx.Fatalf("NewServer failed: %v", err)
+ }
+ ctx.Infof("Listening on %s", server.Status().Endpoints[0].Name())
+ <-signals.ShutdownOnSignals(ctx)
+ return nil
+}
+
+func startMounttable(ctx *context.T) (string, func(), error) {
+ listenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"wsh", fmt.Sprintf(":%d", mtPort)}}}
+ mtName, stopMT, err := mounttablelib.StartServers(ctx, listenSpec, "", nhName, "", "", "mounttable")
+ if err != nil {
+ vlog.Errorf("mounttablelib.StartServers failed: %v", err)
+ return "", nil, err
+ }
+ vlog.Infof("Started local mounttable at: %v", mtName)
+ return mtName, stopMT, nil
+}
+
+type impl struct{}
+
+func (i *impl) AMethod(_ *context.T, _ rpc.ServerCall) error { return nil }
+func (i *impl) AnotherMethod(_ *context.T, _ rpc.ServerCall) error { return nil }
+
+func main() {
+ cmdline.HideGlobalFlagsExcept()
+ cmdline.Main(cmdRun)
+}
diff --git a/test/discovery/testserver2.go b/test/discovery/testserver2.go
new file mode 100644
index 0000000..b78847f
--- /dev/null
+++ b/test/discovery/testserver2.go
@@ -0,0 +1,142 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "fmt"
+ "strings"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/glob"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+
+ "v.io/x/lib/cmdline"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/lib/signals"
+ "v.io/x/ref/lib/v23cmd"
+ "v.io/x/ref/lib/xrpc"
+ _ "v.io/x/ref/runtime/factories/roaming"
+ "v.io/x/ref/services/mounttable/mounttablelib"
+ "v.io/x/ref/test/discovery/a"
+ "v.io/x/ref/test/discovery/b"
+)
+
+var (
+ mtPort int
+ nhName string
+)
+
+func init() {
+ cmdRun.Flags.IntVar(&mtPort, "mtport", 9000, "port number of mounttable to run")
+ cmdRun.Flags.StringVar(&nhName, "nhname", "", "neighborhood mt name")
+}
+
+var cmdRun = &cmdline.Command{
+ Runner: v23cmd.RunnerFunc(runServer),
+ Name: "run",
+ Short: "Run a server for testing discovery",
+}
+
+func runServer(ctx *context.T, env *cmdline.Env, args []string) error {
+ mtName, stopMT, err := startMounttable(ctx)
+ if err != nil {
+ return err
+ }
+ defer stopMT()
+ ctx, _, err = v23.WithNewNamespace(ctx, mtName)
+ if err != nil {
+ return err
+ }
+ server, err := xrpc.NewDispatchingServer(ctx, "test2", disp2{})
+ if err != nil {
+ ctx.Fatalf("NewServer failed: %v", err)
+ }
+ ctx.Infof("Listening on %s", server.Status().Endpoints[0].Name())
+ <-signals.ShutdownOnSignals(ctx)
+ return nil
+}
+
+func startMounttable(ctx *context.T) (string, func(), error) {
+ listenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"wsh", fmt.Sprintf(":%d", mtPort)}}}
+ mtName, stopMT, err := mounttablelib.StartServers(ctx, listenSpec, "", nhName, "", "", "mounttable")
+ if err != nil {
+ vlog.Errorf("mounttablelib.StartServers failed: %v", err)
+ return "", nil, err
+ }
+ vlog.Infof("Started local mounttable at: %v", mtName)
+ return mtName, stopMT, nil
+}
+
+var services2 map[string]rpc.Invoker
+
+func init() {
+ services2 = make(map[string]rpc.Invoker)
+ services2["a"] = rpc.ReflectInvokerOrDie(a.AServiceServer(&impl{}))
+ services2["b"] = rpc.ReflectInvokerOrDie(b.AnotherServiceServer(&impl{}))
+ services2["c"] = rpc.ReflectInvokerOrDie(b.AnotherServiceServer(&impl{}))
+ services2["d/e"] = rpc.ReflectInvokerOrDie(a.AServiceServer(&impl{}))
+}
+
+type disp2 struct{}
+
+func (_ disp2) Lookup(_ *context.T, suffix string) (interface{}, security.Authorizer, error) {
+ suffix = strings.TrimLeft(suffix, "/")
+ if suffix == "" {
+ return rpc.ReflectInvokerOrDie(&root{}), security.AllowEveryone(), nil
+ }
+ invoker, ok := services2[suffix]
+ if ok {
+ return invoker, security.AllowEveryone(), nil
+ }
+ return nil, nil, nil
+}
+
+type root struct{}
+
+func (r *root) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+ return r.GlobWithPredicate__(ctx, call, pattern, "")
+}
+
+func (r *root) GlobWithPredicate__(ctx *context.T, call rpc.ServerCall, pattern, predicate string) (<-chan naming.GlobReply, error) {
+ p, err := glob.ParsePredicate(predicate)
+ if err != nil {
+ return nil, err
+ }
+
+ ch := make(chan naming.GlobReply)
+ go func() {
+ defer close(ch)
+
+ for suffix, invoker := range services2 {
+ sigs, err := invoker.Signature(ctx, nil)
+ if err != nil {
+ ch <- naming.GlobReplyError{naming.GlobError{Error: err}}
+ continue
+ }
+
+ interfaces := make([]string, len(sigs))
+ for i, sig := range sigs {
+ interfaces[i] = sig.PkgPath
+ }
+ if p == nil || p.Match(naming.Properties{naming.PropInterface: interfaces}) {
+ ch <- naming.GlobReplyEntry{naming.MountEntry{Name: suffix, IsLeaf: true}}
+ }
+ }
+ }()
+ return ch, nil
+}
+
+type impl struct{}
+
+func (i *impl) AMethod(_ *context.T, _ rpc.ServerCall) error { return nil }
+func (i *impl) AnotherMethod(_ *context.T, _ rpc.ServerCall) error { return nil }
+
+func main() {
+ cmdline.HideGlobalFlagsExcept()
+ cmdline.Main(cmdRun)
+}