blob: f753e4e203398c176067bdc9363a5b5d6a5a0bde [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package namespace
import (
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/verror"
inaming "v.io/x/ref/profiles/internal/naming"
)
// startStatus is the outcome of one StartCall attempt made by tryStartCall.
// It is sent over a channel to parallelStartCall, which picks the first
// attempt that succeeded.
type startStatus struct {
// index is the position of the attempted server in the servers slice given
// to parallelStartCall; it is also used to look up the attempt's cancel func.
index int
// err is the error returned by StartCall, nil on success.
err error
// call is the client call returned by StartCall.
call rpc.ClientCall
}
// tryStartCall starts method on target via client and reports the outcome on
// c, tagged with index so the receiver can tell which attempt it belongs to.
//
// options.NoResolve{} is added to the caller's opts for every call. The
// options are copied into a fresh slice first: opts arrives via "opts..." and
// so shares its backing array with the caller, and with every other goroutine
// started from the same slice. Appending to it in place could write into
// spare capacity of that shared array from several goroutines at once.
func tryStartCall(ctx *context.T, client rpc.Client, target, method string, args []interface{}, c chan startStatus, index int, opts ...rpc.CallOpt) {
	callOpts := make([]rpc.CallOpt, 0, len(opts)+1)
	callOpts = append(callOpts, opts...)
	callOpts = append(callOpts, options.NoResolve{})

	call, err := client.StartCall(ctx, target, method, args, callOpts...)
	c <- startStatus{index: index, err: err, call: call}
}
// parallelStartCall issues StartCall against every entry of servers
// concurrently and returns the first call that starts successfully. If no
// attempt succeeds, the error of the last attempt to report is returned. An
// empty servers slice yields verror.ErrNoExist.
func (ns *namespace) parallelStartCall(ctx *context.T, client rpc.Client, servers []string, method string, args []interface{}, opts []rpc.CallOpt) (rpc.ClientCall, error) {
	n := len(servers)
	if n == 0 {
		return nil, verror.New(verror.ErrNoExist, ctx, "no servers to resolve query")
	}

	// One attempt per server, each with its own timeout context. The result
	// channel has room for every attempt, so no goroutine ever blocks on its
	// send even after we have stopped listening.
	results := make(chan startStatus, n)
	cancels := make([]context.CancelFunc, n)
	for i := range servers {
		attemptCtx, cancel := context.WithTimeout(ctx, callTimeout)
		cancels[i] = cancel
		go tryStartCall(attemptCtx, client, servers[i], method, args, results, i, opts...)
	}

	// Wait for the first success, or for every attempt to fail. The channel
	// is deliberately left open: attempts still in flight will write their
	// status to it later, and it is garbage collected once they are done.
	var outcome startStatus
	for received := 0; received < n; received++ {
		outcome = <-results
		if outcome.err != nil {
			continue
		}
		// Keep the winner alive by dropping its cancel func from the list.
		cancels[outcome.index] = nil
		break
	}

	// Cancel every other attempt so that their RPCs neither start nor make
	// further progress.
	for _, cancel := range cancels {
		if cancel == nil {
			continue
		}
		cancel()
	}
	return outcome.call, outcome.err
}
// status is the result of applying an operation to a single mount table. It
// is what the callback passed to dispatch returns and what collectStati
// aggregates.
type status struct {
// id is the key collectStati groups replies by. Presumably this is the
// routing ID (or name) that dispatch hands to the callback; confirm against
// the callers.
id string
// err is nil when the operation succeeded.
err error
}
// nameToRID maps name to the string form of the routing ID found in its
// address part. When the address does not parse as an endpoint, name itself
// is returned unchanged.
func nameToRID(name string) string {
	address, _ := naming.SplitAddressName(name)
	ep, err := inaming.NewEndpoint(address)
	if err != nil {
		return name
	}
	return ep.RID.String()
}
// collectStati reads exactly n status messages from c and groups them by id.
// It returns nil when every id has at least one successful reply; otherwise
// it returns the error recorded for one of the ids that never succeeded.
func collectStati(c chan status, n int) error {
	// A mount table may be reachable through several addresses that all carry
	// the same id, so one success per id is enough. For each id we keep the
	// first error seen, and replace it with nil as soon as any reply for that
	// id succeeds.
	outcomes := make(map[string]error)
	for remaining := n; remaining > 0; remaining-- {
		reply := <-c
		_, seen := outcomes[reply.id]
		if seen && reply.err != nil {
			// A later failure never overrides what is already recorded.
			continue
		}
		outcomes[reply.id] = reply.err
	}

	// Any id still holding an error had no successful reply at all.
	for _, err := range outcomes {
		if err != nil {
			return err
		}
	}
	return nil
}
// dispatch resolves mTName to its mount tables and runs f concurrently, once
// per mount table. Each invocation receives the mount table's name and the
// result of nameToRID for that name. The combined result comes from
// collectStati. A resolution failure is returned immediately, before f is
// invoked.
func (ns *namespace) dispatch(ctx *context.T, mTName string, f func(*context.T, string, string) status, opts []naming.NamespaceOpt) error {
	// Find every mount table implementing the name.
	entry, err := ns.ResolveToMountTable(ctx, mTName, opts...)
	if err != nil {
		return err
	}
	tables := entry.Names()

	// Fan out: one goroutine per mount table, each reporting on a channel
	// with enough room for every reply.
	replies := make(chan status, len(tables))
	worker := func(table string) {
		replies <- f(ctx, table, nameToRID(table))
	}
	for i := range tables {
		go worker(tables[i])
	}

	// Fan in: wait for every reply before touching the cache.
	err = collectStati(replies, len(tables))

	// Drop whatever was cached for these names, regardless of the outcome.
	ns.resolutionCache.forget(tables)
	return err
}