blob: f5823582b0488f25bff3421a3928cb901a06cbb6 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package namespace
import (
"io"
"strings"
"sync"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/verror"
"v.io/x/ref/lib/apilog"
)
type tracks struct {
m sync.Mutex
places map[string]struct{}
}
func (tr *tracks) beenThereDoneThat(servers []naming.MountedServer, pstr string) bool {
tr.m.Lock()
defer tr.m.Unlock()
found := false
for _, s := range servers {
x := naming.JoinAddressName(s.Server, "") + "!" + pstr
if _, ok := tr.places[x]; ok {
found = true
}
tr.places[x] = struct{}{}
}
return found
}
// task is a sub-glob that has to be performed against a mount table. Tasks are
// done in parrallel to speed up the glob.
type task struct {
pattern *glob.Glob // pattern to match
er *naming.GlobError // error for that particular point in the name space
me *naming.MountEntry // server to match at
error error // any error performing this task
depth int // number of mount tables traversed recursively
}
// globAtServer performs a Glob on the servers at a mount point. It cycles through the set of
// servers until it finds one that replies.
func (ns *namespace) globAtServer(ctx *context.T, t *task, replies chan *task, tr *tracks, opts []rpc.CallOpt) {
defer func() {
if t.error == nil {
replies <- nil
} else {
replies <- t
}
}()
client := v23.GetClient(ctx)
pstr := t.pattern.String()
ctx.VI(2).Infof("globAtServer(%v, %v)", *t.me, pstr)
// If there are no servers to call, this isn't a mount point. No sense
// trying to call servers that aren't there.
if len(t.me.Servers) == 0 {
t.error = nil
return
}
// If we've been there before with the same request, give up.
if tr.beenThereDoneThat(t.me.Servers, pstr) {
t.error = nil
return
}
// t.me.Name has already been matched at this point to so don't pass it to the Call. Kind of sleazy to do this
// but it avoids making yet another copy of the MountEntry.
on := t.me.Name
t.me.Name = ""
timeoutCtx, cancel := withTimeout(ctx)
defer cancel()
call, err := client.StartCall(timeoutCtx, "", rpc.GlobMethod, []interface{}{pstr}, append(opts, options.Preresolved{t.me})...)
t.me.Name = on
if err != nil {
t.error = err
return
}
// At this point we're commited to the server that answered the call
// first. Cycle through all replies from that server.
for {
// If the mount table returns an error, we're done. Send the task to the channel
// including the error. This terminates the task.
var gr naming.GlobReply
err := call.Recv(&gr)
if err == io.EOF {
break
}
if err != nil {
t.error = err
return
}
var x *task
switch v := gr.(type) {
case naming.GlobReplyEntry:
// Convert to the ever so slightly different name.MountTable version of a MountEntry
// and add it to the list.
x = &task{
me: &naming.MountEntry{
Name: naming.Join(t.me.Name, v.Value.Name),
Servers: v.Value.Servers,
ServesMountTable: v.Value.ServesMountTable,
IsLeaf: v.Value.IsLeaf,
},
depth: t.depth + 1,
}
case naming.GlobReplyError:
// Pass on the error.
x = &task{
er: &v.Value,
depth: t.depth + 1,
}
}
// x.depth is the number of servers we've walked through since we've gone
// recursive (i.e. with pattern length of 0). Limit the depth of globs.
// TODO(p): return an error?
if t.pattern.Len() == 0 {
if x.depth > ns.maxRecursiveGlobDepth {
continue
}
}
replies <- x
}
t.error = call.Finish()
return
}
// depth returns the directory depth of a given name. It is used to pick off the unsatisfied part of the pattern.
func depth(name string) int {
name = strings.Trim(naming.Clean(name), "/")
if name == "" {
return 0
}
return strings.Count(name, "/") + 1
}
// globLoop fires off a go routine for each server and reads backs replies.
func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan naming.GlobReply, tr *tracks, opts []rpc.CallOpt) {
defer close(reply)
// Provide enough buffers to avoid too much switching between the readers and the writers.
// This size is just a guess.
replies := make(chan *task, 100)
defer close(replies)
// Push the first task into the channel to start the ball rolling. This task has the
// root of the search and the full pattern. It will be the first task fired off in the for
// loop that follows.
replies <- &task{me: e, pattern: pattern}
replies <- nil
inFlight := 1
// Perform a parallel search of the name graph. Each task will send what it learns
// on the replies channel. If the reply is a mount point and the pattern is not completely
// fulfilled, a new task will be fired off to handle it.
for inFlight != 0 {
t := <-replies
// A nil reply represents a successfully terminated task.
// If no tasks are running, return.
if t == nil {
inFlight--
continue
}
// We want to output this entry if there was a real error other than
// "not a mount table".
//
// An error reply is also a terminated task.
// If no tasks are running, return.
if t.error != nil {
if !notAnMT(t.error) {
reply <- &naming.GlobReplyError{Value: naming.GlobError{Name: naming.Join(prefix, t.me.Name), Error: t.error}}
}
inFlight--
continue
}
// If this is just an error from the mount table, pass it on.
if t.er != nil {
x := *t.er
x.Name = naming.Join(prefix, x.Name)
reply <- &naming.GlobReplyError{Value: x}
continue
}
// Get the pattern elements below the current path.
suffix := pattern
for i := depth(t.me.Name) - 1; i >= 0; i-- {
suffix = suffix.Tail()
}
// If we've satisfied the request and this isn't the root,
// reply to the caller.
if suffix.Len() == 0 && t.depth != 0 {
x := *t.me
x.Name = naming.Join(prefix, x.Name)
reply <- &naming.GlobReplyEntry{Value: x}
}
// If the pattern is finished (so we're only querying about the root on the
// remote server) and the server is not another MT, then we needn't send the
// query on since we know the server will not supply a new address for the
// current name.
if suffix.Empty() {
if !t.me.ServesMountTable {
continue
}
}
// If this is restricted recursive and not a mount table, don't descend into it.
if suffix.Restricted() && suffix.Len() == 0 && !t.me.ServesMountTable {
continue
}
// Perform a glob at the next server.
inFlight++
t.pattern = suffix
go ns.globAtServer(ctx, t, replies, tr, opts)
}
}
// Glob implements naming.MountTable.Glob.
func (ns *namespace) Glob(ctx *context.T, pattern string, opts ...naming.NamespaceOpt) (<-chan naming.GlobReply, error) {
defer apilog.LogCallf(ctx, "pattern=%.10s...,opts...=%v", pattern, opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
// Root the pattern. If we have no servers to query, give up.
e, patternWasRooted := ns.rootMountEntry(pattern)
if len(e.Servers) == 0 {
return nil, verror.New(naming.ErrNoMountTable, ctx)
}
// If the name doesn't parse, give up.
g, err := glob.Parse(e.Name)
if err != nil {
return nil, err
}
tr := &tracks{places: make(map[string]struct{})}
// If pattern was already rooted, make sure we tack that root
// onto all returned names. Otherwise, just return the relative
// name.
var prefix string
if patternWasRooted {
prefix = e.Servers[0].Server
}
e.Name = ""
reply := make(chan naming.GlobReply, 100)
go ns.globLoop(ctx, e, prefix, g, reply, tr, getCallOpts(opts))
return reply, nil
}