| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package rpc |
| |
| import ( |
| "fmt" |
| "io" |
| "net" |
| "strings" |
| "sync" |
| "time" |
| |
| "v.io/x/lib/netstate" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/flow" |
| "v.io/v23/i18n" |
| "v.io/v23/namespace" |
| "v.io/v23/naming" |
| "v.io/v23/options" |
| "v.io/v23/rpc" |
| "v.io/v23/security" |
| "v.io/v23/security/access" |
| "v.io/v23/vdl" |
| "v.io/v23/verror" |
| "v.io/v23/vom" |
| "v.io/v23/vtrace" |
| |
| "v.io/x/ref/lib/apilog" |
| "v.io/x/ref/lib/pubsub" |
| "v.io/x/ref/lib/stats" |
| "v.io/x/ref/runtime/internal/lib/publisher" |
| inaming "v.io/x/ref/runtime/internal/naming" |
| ) |
| |
| // TODO(mattr): add/removeAddresses |
| // TODO(mattr): dhcpLoop |
| |
| type xserver struct { |
| sync.Mutex |
| // context used by the server to make internal RPCs, error messages etc. |
| ctx *context.T |
| cancel context.CancelFunc // function to cancel the above context. |
| flowMgr flow.Manager |
| publisher publisher.Publisher // publisher to publish mounttable mounts. |
| settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp |
| settingsName string // pubwsub stream name for dhcp |
| dhcpState *dhcpState // dhcpState, nil if not using dhcp |
| principal security.Principal |
| blessings security.Blessings |
| protoEndpoints []*inaming.Endpoint |
| chosenEndpoints []*inaming.Endpoint |
| typeCache *typeCache |
| |
| disp rpc.Dispatcher // dispatcher to serve RPCs |
| dispReserved rpc.Dispatcher // dispatcher for reserved methods |
| active sync.WaitGroup // active goroutines we've spawned. |
| stoppedChan chan struct{} // closed when the server has been stopped. |
| preferredProtocols []string // protocols to use when resolving proxy name to endpoint. |
| // We cache the IP networks on the device since it is not that cheap to read |
| // network interfaces through os syscall. |
| // TODO(jhahn): Add monitoring the network interface changes. |
| ipNets []*net.IPNet |
| ns namespace.T |
| servesMountTable bool |
| isLeaf bool |
| |
| // TODO(cnicolaou): add roaming stats to rpcStats |
| stats *rpcStats // stats for this server. |
| } |
| |
| func NewServer(ctx *context.T, name string, object interface{}, authorizer security.Authorizer, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) { |
| if object == nil { |
| return nil, verror.New(verror.ErrBadArg, ctx, "nil object") |
| } |
| invoker, err := objectToInvoker(object) |
| if err != nil { |
| return nil, verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("bad object: %v", err)) |
| } |
| d := &leafDispatcher{invoker, authorizer} |
| opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...) |
| return NewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...) |
| } |
| |
| func NewDispatchingServer(ctx *context.T, name string, dispatcher rpc.Dispatcher, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.Server, error) { |
| if dispatcher == nil { |
| return nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher") |
| } |
| ctx, cancel := context.WithRootCancel(ctx) |
| flowMgr := v23.ExperimentalGetFlowManager(ctx) |
| ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx) |
| statsPrefix := naming.Join("rpc", "server", "routing-id", flowMgr.RoutingID().String()) |
| s := &xserver{ |
| ctx: ctx, |
| cancel: cancel, |
| flowMgr: flowMgr, |
| principal: principal, |
| blessings: principal.BlessingStore().Default(), |
| publisher: publisher.New(ctx, ns, publishPeriod), |
| stoppedChan: make(chan struct{}), |
| ns: ns, |
| stats: newRPCStats(statsPrefix), |
| settingsPublisher: settingsPublisher, |
| settingsName: settingsName, |
| disp: dispatcher, |
| typeCache: newTypeCache(), |
| } |
| ipNets, err := ipNetworks() |
| if err != nil { |
| return nil, err |
| } |
| s.ipNets = ipNets |
| |
| for _, opt := range opts { |
| switch opt := opt.(type) { |
| case options.ServesMountTable: |
| s.servesMountTable = bool(opt) |
| case options.IsLeaf: |
| s.isLeaf = bool(opt) |
| case ReservedNameDispatcher: |
| s.dispReserved = opt.Dispatcher |
| case PreferredServerResolveProtocols: |
| s.preferredProtocols = []string(opt) |
| } |
| } |
| |
| blessingsStatsName := naming.Join(statsPrefix, "security", "blessings") |
| // TODO(caprita): revist printing the blessings with %s, and |
| // instead expose them as a list. |
| stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings)) |
| stats.NewStringFunc(blessingsStatsName, func() string { |
| return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default()) |
| }) |
| if err = s.listen(ctx, v23.GetListenSpec(ctx)); err != nil { |
| s.Stop() |
| return nil, err |
| } |
| if len(name) > 0 { |
| for _, ep := range s.chosenEndpoints { |
| s.publisher.AddServer(ep.String()) |
| } |
| s.publisher.AddName(name, s.servesMountTable, s.isLeaf) |
| vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name) |
| } |
| return s, nil |
| } |
| |
| func (s *xserver) Status() rpc.ServerStatus { |
| ret := rpc.ServerStatus{} |
| for _, e := range s.chosenEndpoints { |
| ret.Endpoints = append(ret.Endpoints, e) |
| } |
| return ret |
| } |
| |
| func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) { |
| defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| s.Lock() |
| defer s.Unlock() |
| if s.dhcpState != nil { |
| s.dhcpState.watchers[ch] = struct{}{} |
| } |
| } |
| |
| func (s *xserver) UnwatchNetwork(ch chan<- rpc.NetworkChange) { |
| defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| s.Lock() |
| defer s.Unlock() |
| if s.dhcpState != nil { |
| delete(s.dhcpState.watchers, ch) |
| } |
| } |
| |
| // resolveToEndpoint resolves an object name or address to an endpoint. |
| func (s *xserver) resolveToEndpoint(address string) (string, error) { |
| var resolved *naming.MountEntry |
| var err error |
| if s.ns != nil { |
| if resolved, err = s.ns.Resolve(s.ctx, address); err != nil { |
| return "", err |
| } |
| } else { |
| // Fake a namespace resolution |
| resolved = &naming.MountEntry{Servers: []naming.MountedServer{ |
| {Server: address}, |
| }} |
| } |
| // An empty set of protocols means all protocols... |
| if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil { |
| return "", err |
| } |
| for _, n := range resolved.Names() { |
| address, suffix := naming.SplitAddressName(n) |
| if suffix != "" { |
| continue |
| } |
| if ep, err := inaming.NewEndpoint(address); err == nil { |
| return ep.String(), nil |
| } |
| } |
| return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address) |
| } |
| |
| // createEndpoints creates appropriate inaming.Endpoint instances for |
| // all of the externally accessible network addresses that can be used |
| // to reach this server. |
| func (s *xserver) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) { |
| iep, ok := lep.(*inaming.Endpoint) |
| if !ok { |
| return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep)) |
| } |
| if !strings.HasPrefix(iep.Protocol, "tcp") && |
| !strings.HasPrefix(iep.Protocol, "ws") { |
| // If not tcp, ws, or wsh, just return the endpoint we were given. |
| return []*inaming.Endpoint{iep}, "", false, nil |
| } |
| host, port, err := net.SplitHostPort(iep.Address) |
| if err != nil { |
| return nil, "", false, err |
| } |
| addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser) |
| if err != nil { |
| return nil, port, false, err |
| } |
| |
| ieps := make([]*inaming.Endpoint, 0, len(addrs)) |
| for _, addr := range addrs { |
| n, err := inaming.NewEndpoint(lep.String()) |
| if err != nil { |
| return nil, port, false, err |
| } |
| n.IsMountTable = s.servesMountTable |
| n.Address = net.JoinHostPort(addr.String(), port) |
| ieps = append(ieps, n) |
| } |
| return ieps, port, unspecified, nil |
| } |
| |
| func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) error { |
| s.Lock() |
| defer s.Unlock() |
| var lastErr error |
| if len(listenSpec.Proxy) > 0 { |
| lastErr = s.flowMgr.Listen(ctx, inaming.Network, listenSpec.Proxy) |
| if lastErr != nil { |
| s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", inaming.Network, listenSpec.Proxy, lastErr) |
| } |
| } |
| for _, addr := range listenSpec.Addrs { |
| if len(addr.Address) > 0 { |
| lastErr = s.flowMgr.Listen(ctx, addr.Protocol, addr.Address) |
| if lastErr != nil { |
| s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr) |
| } |
| } |
| } |
| |
| leps := s.flowMgr.ListeningEndpoints() |
| if len(leps) == 0 { |
| return verror.New(verror.ErrBadArg, s.ctx, verror.New(errNoListeners, s.ctx, lastErr)) |
| } |
| |
| roaming := false |
| for _, ep := range leps { |
| eps, _, eproaming, eperr := s.createEndpoints(ep, listenSpec.AddressChooser) |
| s.chosenEndpoints = append(s.chosenEndpoints, eps...) |
| if eproaming && eperr == nil { |
| s.protoEndpoints = append(s.protoEndpoints, ep.(*inaming.Endpoint)) |
| roaming = true |
| } |
| } |
| |
| if roaming && s.dhcpState == nil && s.settingsPublisher != nil { |
| // TODO(mattr): Support roaming. |
| } |
| |
| s.active.Add(1) |
| go s.acceptLoop(ctx) |
| return nil |
| } |
| |
| func (s *xserver) acceptLoop(ctx *context.T) error { |
| var calls sync.WaitGroup |
| defer func() { |
| calls.Wait() |
| s.active.Done() |
| s.ctx.VI(1).Infof("rpc: Stopped accepting") |
| }() |
| for { |
| // TODO(mattr): We need to interrupt Accept at some point. |
| // Should we interrupt it by canceling the context? |
| fl, err := s.flowMgr.Accept(ctx) |
| if err != nil { |
| s.ctx.VI(10).Infof("rpc: Accept failed: %v", err) |
| return err |
| } |
| calls.Add(1) |
| go func(fl flow.Flow) { |
| defer calls.Done() |
| var ty [1]byte |
| if _, err := io.ReadFull(fl, ty[:]); err != nil { |
| s.ctx.VI(1).Infof("failed to read flow type: %v", err) |
| return |
| } |
| switch ty[0] { |
| case dataFlow: |
| fs, err := newXFlowServer(fl, s) |
| if err != nil { |
| s.ctx.VI(1).Infof("newFlowServer failed %v", err) |
| return |
| } |
| if err := fs.serve(); err != nil { |
| // TODO(caprita): Logging errors here is too spammy. For example, "not |
| // authorized" errors shouldn't be logged as server errors. |
| // TODO(cnicolaou): revisit this when verror2 transition is |
| // done. |
| if err != io.EOF { |
| s.ctx.VI(2).Infof("Flow.serve failed: %v", err) |
| } |
| } |
| case typeFlow: |
| write := s.typeCache.writer(fl.Conn()) |
| if write == nil { |
| s.ctx.VI(1).Infof("ignoring duplicate type flow.") |
| return |
| } |
| write(fl) |
| } |
| }(fl) |
| } |
| } |
| |
| func (s *xserver) AddName(name string) error { |
| defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| if len(name) == 0 { |
| return verror.New(verror.ErrBadArg, s.ctx, "name is empty") |
| } |
| s.Lock() |
| defer s.Unlock() |
| vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name) |
| s.publisher.AddName(name, s.servesMountTable, s.isLeaf) |
| return nil |
| } |
| |
| func (s *xserver) RemoveName(name string) { |
| defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| s.Lock() |
| defer s.Unlock() |
| vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name) |
| s.publisher.RemoveName(name) |
| } |
| |
| func (s *xserver) Stop() error { |
| defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| |
| serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status()) |
| s.ctx.VI(1).Infof("Stop: %s", serverDebug) |
| defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug) |
| |
| s.Lock() |
| if s.disp == nil { |
| s.Unlock() |
| return nil |
| } |
| s.disp = nil |
| close(s.stoppedChan) |
| s.Unlock() |
| |
| // Delete the stats object. |
| s.stats.stop() |
| |
| // Note, It's safe to Stop/WaitForStop on the publisher outside of the |
| // server lock, since publisher is safe for concurrent access. |
| // Stop the publisher, which triggers unmounting of published names. |
| s.publisher.Stop() |
| |
| // Wait for the publisher to be done unmounting before we can proceed to |
| // close the listeners (to minimize the number of mounted names pointing |
| // to endpoint that are no longer serving). |
| // |
| // TODO(caprita): See if make sense to fail fast on rejecting |
| // connections once listeners are closed, and parallelize the publisher |
| // and listener shutdown. |
| s.publisher.WaitForStop() |
| |
| s.Lock() |
| |
| // TODO(mattr): What should we do when we stop a server now? We need to |
| // interrupt Accept at some point, but it's weird to stop the flowmanager. |
| // Close all listeners. No new flows will be accepted, while in-flight |
| // flows will continue until they terminate naturally. |
| // nListeners := len(s.listeners) |
| // errCh := make(chan error, nListeners) |
| // for ln, _ := range s.listeners { |
| // go func(ln stream.Listener) { |
| // errCh <- ln.Close() |
| // }(ln) |
| // } |
| |
| if dhcp := s.dhcpState; dhcp != nil { |
| // TODO(cnicolaou,caprita): investigate not having to close and drain |
| // the channel here. It's a little awkward right now since we have to |
| // be careful to not close the channel in two places, i.e. here and |
| // and from the publisher's Shutdown method. |
| if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil { |
| drain: |
| for { |
| select { |
| case v := <-dhcp.ch: |
| if v == nil { |
| break drain |
| } |
| default: |
| close(dhcp.ch) |
| break drain |
| } |
| } |
| } |
| } |
| |
| s.Unlock() |
| |
| // At this point, we are guaranteed that no new requests are going to be |
| // accepted. |
| |
| // Wait for the publisher and active listener + flows to finish. |
| done := make(chan struct{}, 1) |
| go func() { s.active.Wait(); done <- struct{}{} }() |
| |
| select { |
| case <-done: |
| case <-time.After(5 * time.Second): |
| s.ctx.Errorf("%s: Timedout waiting for goroutines to stop", serverDebug) |
| // TODO(mattr): This doesn't make sense, shouldn't we not wait after timing out? |
| <-done |
| s.ctx.Infof("%s: Done waiting.", serverDebug) |
| } |
| |
| s.cancel() |
| return nil |
| } |
| |
| // flowServer implements the RPC server-side protocol for a single RPC, over a |
| // flow that's already connected to the client. |
| type xflowServer struct { |
| ctx *context.T // context associated with the RPC |
| server *xserver // rpc.Server that this flow server belongs to |
| disp rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow |
| dec *vom.Decoder // to decode requests and args from the client |
| enc *vom.Encoder // to encode responses and results to the client |
| flow flow.Flow // underlying flow |
| |
| // Fields filled in during the server invocation. |
| clientBlessings security.Blessings |
| ackBlessings bool |
| grantedBlessings security.Blessings |
| method, suffix string |
| tags []*vdl.Value |
| discharges map[string]security.Discharge |
| starttime time.Time |
| endStreamArgs bool // are the stream args at EOF? |
| } |
| |
| var ( |
| _ rpc.StreamServerCall = (*xflowServer)(nil) |
| _ security.Call = (*xflowServer)(nil) |
| ) |
| |
| func newXFlowServer(flow flow.Flow, server *xserver) (*xflowServer, error) { |
| server.Lock() |
| disp := server.disp |
| server.Unlock() |
| typeEnc, typeDec := server.typeCache.get(flow.Conn()) |
| fs := &xflowServer{ |
| ctx: server.ctx, |
| server: server, |
| disp: disp, |
| flow: flow, |
| enc: vom.NewEncoderWithTypeEncoder(flow, typeEnc), |
| dec: vom.NewDecoderWithTypeDecoder(flow, typeDec), |
| discharges: make(map[string]security.Discharge), |
| } |
| // TODO(toddw): Add logic to create separate type flows! |
| return fs, nil |
| } |
| |
| // authorizeVtrace works by simulating a call to __debug/vtrace.Trace. That |
| // rpc is essentially equivalent in power to the data we are attempting to |
| // attach here. |
| func (fs *xflowServer) authorizeVtrace(ctx *context.T) error { |
| // Set up a context as though we were calling __debug/vtrace. |
| params := &security.CallParams{} |
| params.Copy(fs) |
| params.Method = "Trace" |
| params.MethodTags = []*vdl.Value{vdl.ValueOf(access.Debug)} |
| params.Suffix = "__debug/vtrace" |
| |
| var auth security.Authorizer |
| if fs.server.dispReserved != nil { |
| _, auth, _ = fs.server.dispReserved.Lookup(ctx, params.Suffix) |
| } |
| return authorize(fs.ctx, security.NewCall(params), auth) |
| } |
| |
| func (fs *xflowServer) serve() error { |
| defer fs.flow.Close() |
| |
| results, err := fs.processRequest() |
| |
| vtrace.GetSpan(fs.ctx).Finish() |
| |
| var traceResponse vtrace.Response |
| // Check if the caller is permitted to view vtrace data. |
| if fs.authorizeVtrace(fs.ctx) == nil { |
| traceResponse = vtrace.GetResponse(fs.ctx) |
| } |
| |
| // Respond to the client with the response header and positional results. |
| response := rpc.Response{ |
| Error: err, |
| EndStreamResults: true, |
| NumPosResults: uint64(len(results)), |
| TraceResponse: traceResponse, |
| AckBlessings: fs.ackBlessings, |
| } |
| if err := fs.enc.Encode(response); err != nil { |
| if err == io.EOF { |
| return err |
| } |
| return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err) |
| } |
| if response.Error != nil { |
| return response.Error |
| } |
| for ix, res := range results { |
| if err := fs.enc.Encode(res); err != nil { |
| if err == io.EOF { |
| return err |
| } |
| return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err) |
| } |
| } |
| // TODO(ashankar): Should unread data from the flow be drained? |
| // |
| // Reason to do so: |
| // The common stream.Flow implementation (v.io/x/ref/runtime/internal/rpc/stream/vc/reader.go) |
| // uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these |
| // slices will not be returned to the pool leading to possibly increased memory usage. |
| // |
| // Reason to not do so: |
| // Draining here will conflict with any Reads on the flow in a separate goroutine |
| // (for example, see TestStreamReadTerminatedByServer in full_test.go). |
| // |
| // For now, go with the reason to not do so as having unread data in the stream |
| // should be a rare case. |
| return nil |
| } |
| |
| func (fs *xflowServer) readRPCRequest() (*rpc.Request, error) { |
| // TODO(toddw): How do we set the initial timeout? It might be shorter than |
| // the timeout we set later, which we learn after we've decoded the request. |
| /* |
| // Set a default timeout before reading from the flow. Without this timeout, |
| // a client that sends no request or a partial request will retain the flow |
| // indefinitely (and lock up server resources). |
| initTimer := newTimer(defaultCallTimeout) |
| defer initTimer.Stop() |
| fs.flow.SetDeadline(initTimer.C) |
| */ |
| |
| // Decode the initial request. |
| var req rpc.Request |
| if err := fs.dec.Decode(&req); err != nil { |
| return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err)) |
| } |
| return &req, nil |
| } |
| |
| func (fs *xflowServer) processRequest() ([]interface{}, error) { |
| fs.starttime = time.Now() |
| req, err := fs.readRPCRequest() |
| if err != nil { |
| // We don't know what the rpc call was supposed to be, but we'll create |
| // a placeholder span so we can capture annotations. |
| fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix)) |
| return nil, err |
| } |
| // We must call fs.drainDecoderArgs for any error that occurs |
| // after this point, and before we actually decode the arguments. |
| fs.method = req.Method |
| fs.suffix = strings.TrimLeft(req.Suffix, "/") |
| |
| if req.Language != "" { |
| fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language)) |
| } |
| |
| // TODO(mattr): Currently this allows users to trigger trace collection |
| // on the server even if they will not be allowed to collect the |
| // results later. This might be considered a DOS vector. |
| spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method) |
| fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest) |
| |
| var cancel context.CancelFunc |
| if !req.Deadline.IsZero() { |
| fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time) |
| } else { |
| fs.ctx, cancel = context.WithCancel(fs.ctx) |
| } |
| fs.flow.SetContext(fs.ctx) |
| // TODO(toddw): Explicitly cancel the context when the flow is done. |
| _ = cancel |
| |
| // Initialize security: blessings, discharges, etc. |
| if err := fs.initSecurity(req); err != nil { |
| fs.drainDecoderArgs(int(req.NumPosArgs)) |
| return nil, err |
| } |
| // Lookup the invoker. |
| invoker, auth, err := fs.lookup(fs.suffix, fs.method) |
| if err != nil { |
| fs.drainDecoderArgs(int(req.NumPosArgs)) |
| return nil, err |
| } |
| |
| // Note that we strip the reserved prefix when calling the invoker so |
| // that __Glob will call Glob. Note that we've already assigned a |
| // special invoker so that we never call the wrong method by mistake. |
| strippedMethod := naming.StripReserved(fs.method) |
| |
| // Prepare invoker and decode args. |
| numArgs := int(req.NumPosArgs) |
| argptrs, tags, err := invoker.Prepare(fs.ctx, strippedMethod, numArgs) |
| fs.tags = tags |
| if err != nil { |
| fs.drainDecoderArgs(numArgs) |
| return nil, err |
| } |
| if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want { |
| fs.drainDecoderArgs(numArgs) |
| return nil, newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want) |
| } |
| for ix, argptr := range argptrs { |
| if err := fs.dec.Decode(argptr); err != nil { |
| return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err) |
| } |
| } |
| |
| // Check application's authorization policy. |
| if err := authorize(fs.ctx, fs, auth); err != nil { |
| return nil, err |
| } |
| |
| // Invoke the method. |
| results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs) |
| fs.server.stats.record(fs.method, time.Since(fs.starttime)) |
| return results, err |
| } |
| |
| // drainDecoderArgs drains the next n arguments encoded onto the flows decoder. |
| // This is needed to ensure that the client is able to encode all of its args |
| // before the server closes its flow. This guarantees that the client will |
| // consistently get the server's error response. |
| // TODO(suharshs): Figure out a better way to solve this race condition without |
| // unnecessarily reading all arguments. |
| func (fs *xflowServer) drainDecoderArgs(n int) error { |
| for i := 0; i < n; i++ { |
| if err := fs.dec.Ignore(); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // lookup returns the invoker and authorizer responsible for serving the given |
| // name and method. The suffix is stripped of any leading slashes. If it begins |
| // with rpc.DebugKeyword, we use the internal debug dispatcher to look up the |
| // invoker. Otherwise, and we use the server's dispatcher. The suffix and method |
| // value may be modified to match the actual suffix and method to use. |
| func (fs *xflowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) { |
| if naming.IsReserved(method) { |
| return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil |
| } |
| disp := fs.disp |
| if naming.IsReserved(suffix) { |
| disp = fs.server.dispReserved |
| } else if fs.server.isLeaf && suffix != "" { |
| innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix) |
| return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr) |
| } |
| if disp != nil { |
| obj, auth, err := disp.Lookup(fs.ctx, suffix) |
| switch { |
| case err != nil: |
| return nil, nil, err |
| case obj != nil: |
| invoker, err := objectToInvoker(obj) |
| if err != nil { |
| return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err) |
| } |
| return invoker, auth, nil |
| } |
| } |
| return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix) |
| } |
| |
| func (fs *xflowServer) initSecurity(req *rpc.Request) error { |
| // TODO(toddw): Do something with this. |
| /* |
| // LocalPrincipal is nil which means we are operating under |
| // SecurityNone. |
| if fs.LocalPrincipal() == nil { |
| return nil |
| } |
| |
| // If additional credentials are provided, make them available in the context |
| // Detect unusable blessings now, rather then discovering they are unusable on |
| // first use. |
| // |
| // TODO(ashankar,ataly): Potential confused deputy attack: The client provides |
| // the server's identity as the blessing. Figure out what we want to do about |
| // this - should servers be able to assume that a blessing is something that |
| // does not have the authorizations that the server's own identity has? |
| if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) { |
| return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want)) |
| } |
| fs.grantedBlessings = req.GrantedBlessings |
| |
| var err error |
| if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil { |
| // When the server can't access the blessings cache, the client is not following |
| // protocol, so the server closes the VCs corresponding to the client endpoint. |
| // TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead |
| // of all VCs connected to the RemoteEndpoint. |
| fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint()) |
| return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err)) |
| } |
| // Verify that the blessings sent by the client in the request have the same public |
| // key as those sent by the client during VC establishment. |
| if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) { |
| return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want)) |
| } |
| fs.ackBlessings = true |
| |
| for _, d := range req.Discharges { |
| fs.discharges[d.ID()] = d |
| } |
| */ |
| return nil |
| } |
| |
| // Send implements the rpc.Stream method. |
| func (fs *xflowServer) Send(item interface{}) error { |
| defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| // The empty response header indicates what follows is a streaming result. |
| if err := fs.enc.Encode(rpc.Response{}); err != nil { |
| return err |
| } |
| return fs.enc.Encode(item) |
| } |
| |
| // Recv implements the rpc.Stream method. |
| func (fs *xflowServer) Recv(itemptr interface{}) error { |
| defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT |
| var req rpc.Request |
| if err := fs.dec.Decode(&req); err != nil { |
| return err |
| } |
| if req.EndStreamArgs { |
| fs.endStreamArgs = true |
| return io.EOF |
| } |
| return fs.dec.Decode(itemptr) |
| } |
| |
| // Implementations of rpc.ServerCall and security.Call methods. |
| |
| func (fs *xflowServer) Security() security.Call { |
| //nologcall |
| return fs |
| } |
| func (fs *xflowServer) LocalDischarges() map[string]security.Discharge { |
| //nologcall |
| return fs.flow.LocalDischarges() |
| } |
| func (fs *xflowServer) RemoteDischarges() map[string]security.Discharge { |
| //nologcall |
| return fs.flow.RemoteDischarges() |
| } |
| func (fs *xflowServer) Server() rpc.Server { |
| //nologcall |
| return nil // TODO(toddw): Change return to rpc.Server |
| } |
| func (fs *xflowServer) Timestamp() time.Time { |
| //nologcall |
| return fs.starttime |
| } |
| func (fs *xflowServer) Method() string { |
| //nologcall |
| return fs.method |
| } |
| func (fs *xflowServer) MethodTags() []*vdl.Value { |
| //nologcall |
| return fs.tags |
| } |
| func (fs *xflowServer) Suffix() string { |
| //nologcall |
| return fs.suffix |
| } |
| func (fs *xflowServer) LocalPrincipal() security.Principal { |
| //nologcall |
| return fs.server.principal |
| } |
| func (fs *xflowServer) LocalBlessings() security.Blessings { |
| //nologcall |
| return fs.flow.LocalBlessings() |
| } |
| func (fs *xflowServer) RemoteBlessings() security.Blessings { |
| //nologcall |
| return fs.flow.RemoteBlessings() |
| } |
| func (fs *xflowServer) GrantedBlessings() security.Blessings { |
| //nologcall |
| return fs.grantedBlessings |
| } |
| func (fs *xflowServer) LocalEndpoint() naming.Endpoint { |
| //nologcall |
| return fs.flow.Conn().LocalEndpoint() |
| } |
| func (fs *xflowServer) RemoteEndpoint() naming.Endpoint { |
| //nologcall |
| return fs.flow.Conn().RemoteEndpoint() |
| } |