Merge "runtime/internal/rpc: Add initial xserver, based on v23/flow."
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
new file mode 100644
index 0000000..3907975
--- /dev/null
+++ b/runtime/internal/rpc/xserver.go
@@ -0,0 +1,815 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"v.io/x/lib/netstate"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/flow"
+	"v.io/v23/i18n"
+	"v.io/v23/namespace"
+	"v.io/v23/naming"
+	"v.io/v23/options"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	"v.io/v23/vdl"
+	"v.io/v23/verror"
+	"v.io/v23/vom"
+	"v.io/v23/vtrace"
+
+	"v.io/x/ref/lib/apilog"
+	"v.io/x/ref/lib/pubsub"
+	"v.io/x/ref/lib/stats"
+	"v.io/x/ref/runtime/internal/lib/publisher"
+	inaming "v.io/x/ref/runtime/internal/naming"
+)
+
+// TODO(mattr): add/removeAddresses
+// TODO(mattr): dhcpLoop
+
+type xserver struct {
+	sync.Mutex
+	// context used by the server for internal RPCs, error messages, etc.
+	ctx               *context.T
+	cancel            context.CancelFunc // function to cancel the above context.
+	flowMgr           flow.Manager
+	publisher         publisher.Publisher // publisher to publish mounttable mounts.
+	settingsPublisher *pubsub.Publisher   // pubsub publisher for dhcp
+	settingsName      string              // pubsub stream name for dhcp
+	dhcpState         *dhcpState          // dhcpState, nil if not using dhcp
+	principal         security.Principal
+	blessings         security.Blessings
+	protoEndpoints    []*inaming.Endpoint
+	chosenEndpoints   []*inaming.Endpoint
+
+	// state of proxies keyed by the name of the proxy
+	proxies map[string]proxyState
+
+	disp               rpc.Dispatcher // dispatcher to serve RPCs
+	dispReserved       rpc.Dispatcher // dispatcher for reserved methods
+	active             sync.WaitGroup // active goroutines we've spawned.
+	stoppedChan        chan struct{}  // closed when the server has been stopped.
+	preferredProtocols []string       // protocols to use when resolving proxy name to endpoint.
+	// We cache the IP networks on the device since reading the network
+	// interfaces through OS syscalls is not that cheap.
+	// TODO(jhahn): Add monitoring of network interface changes.
+	ipNets           []*net.IPNet
+	ns               namespace.T
+	servesMountTable bool
+	isLeaf           bool
+
+	// TODO(cnicolaou): add roaming stats to rpcStats
+	stats *rpcStats // stats for this server.
+}
+
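+// InternalNewXServer creates a new flow-based RPC server using the
+// flow.Manager, namespace, and principal from ctx.  settingsPublisher and
+// settingsName identify the pubsub stream used for dhcp settings, and the
+// supplied ServerOpts configure mounttable/leaf serving, reserved-method
+// dispatch, and the preferred protocols for proxy resolution.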
+func InternalNewXServer(ctx *context.T, settingsPublisher *pubsub.Publisher, settingsName string, opts ...rpc.ServerOpt) (rpc.XServer, error) {
+	ctx, cancel := context.WithRootCancel(ctx)
+	flowMgr := v23.ExperimentalGetFlowManager(ctx)
+	ns, principal := v23.GetNamespace(ctx), v23.GetPrincipal(ctx)
+	statsPrefix := naming.Join("rpc", "server", "routing-id", flowMgr.RoutingID().String())
+	s := &xserver{
+		ctx:               ctx,
+		cancel:            cancel,
+		flowMgr:           flowMgr,
+		principal:         principal,
+		blessings:         principal.BlessingStore().Default(),
+		publisher:         publisher.New(ctx, ns, publishPeriod),
+		proxies:           make(map[string]proxyState),
+		stoppedChan:       make(chan struct{}),
+		ns:                ns,
+		stats:             newRPCStats(statsPrefix),
+		settingsPublisher: settingsPublisher,
+		settingsName:      settingsName,
+	}
+	ipNets, err := ipNetworks()
+	if err != nil {
+		return nil, err
+	}
+	s.ipNets = ipNets
+
+	for _, opt := range opts {
+		switch opt := opt.(type) {
+		case options.ServesMountTable:
+			s.servesMountTable = bool(opt)
+		case options.IsLeaf:
+			s.isLeaf = bool(opt)
+		case ReservedNameDispatcher:
+			s.dispReserved = opt.Dispatcher
+		case PreferredServerResolveProtocols:
+			s.preferredProtocols = []string(opt)
+		}
+	}
+
+	blessingsStatsName := naming.Join(statsPrefix, "security", "blessings")
+	// TODO(caprita): revisit printing the blessings with %s, and
+	// instead expose them as a list.
+	stats.NewString(blessingsStatsName).Set(fmt.Sprintf("%s", s.blessings))
+	stats.NewStringFunc(blessingsStatsName, func() string {
+		return fmt.Sprintf("%s (default)", s.principal.BlessingStore().Default())
+	})
+	return s, nil
+}
+
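+// Status returns the server's status.  It is currently an empty placeholder.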
+func (s *xserver) Status() rpc.ServerStatus {
+	return rpc.ServerStatus{}
+}
+
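+// WatchNetwork registers ch to receive network change notifications; it is a
+// no-op unless dhcp is in use.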
+func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
+	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	s.Lock()
+	defer s.Unlock()
+	if s.dhcpState != nil {
+		s.dhcpState.watchers[ch] = struct{}{}
+	}
+}
+
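+// UnwatchNetwork removes a channel previously registered with WatchNetwork.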
+func (s *xserver) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
+	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	s.Lock()
+	defer s.Unlock()
+	if s.dhcpState != nil {
+		delete(s.dhcpState.watchers, ch)
+	}
+}
+
+// resolveToEndpoint resolves an object name or address to an endpoint.
+func (s *xserver) resolveToEndpoint(address string) (string, error) {
+	var resolved *naming.MountEntry
+	var err error
+	if s.ns != nil {
+		if resolved, err = s.ns.Resolve(s.ctx, address); err != nil {
+			return "", err
+		}
+	} else {
+		// Fake a namespace resolution
+		resolved = &naming.MountEntry{Servers: []naming.MountedServer{
+			{Server: address},
+		}}
+	}
+	// An empty set of protocols means all protocols...
+	if resolved.Servers, err = filterAndOrderServers(resolved.Servers, s.preferredProtocols, s.ipNets); err != nil {
+		return "", err
+	}
+	for _, n := range resolved.Names() {
+		address, suffix := naming.SplitAddressName(n)
+		if suffix != "" {
+			continue
+		}
+		if ep, err := inaming.NewEndpoint(address); err == nil {
+			return ep.String(), nil
+		}
+	}
+	return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
+}
+
+// createEndpoints creates appropriate inaming.Endpoint instances for
+// all of the externally accessible network addresses that can be used
+// to reach this server.
+func (s *xserver) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
+	iep, ok := lep.(*inaming.Endpoint)
+	if !ok {
+		return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
+	}
+	if !strings.HasPrefix(iep.Protocol, "tcp") &&
+		!strings.HasPrefix(iep.Protocol, "ws") {
+		// If not tcp, ws, or wsh, just return the endpoint we were given.
+		return []*inaming.Endpoint{iep}, "", false, nil
+	}
+	host, port, err := net.SplitHostPort(iep.Address)
+	if err != nil {
+		return nil, "", false, err
+	}
+	addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
+	if err != nil {
+		return nil, port, false, err
+	}
+
+	ieps := make([]*inaming.Endpoint, 0, len(addrs))
+	for _, addr := range addrs {
+		n, err := inaming.NewEndpoint(lep.String())
+		if err != nil {
+			return nil, port, false, err
+		}
+		n.IsMountTable = s.servesMountTable
+		n.Address = net.JoinHostPort(addr.String(), port)
+		ieps = append(ieps, n)
+	}
+	return ieps, port, unspecified, nil
+}
+
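+// listen asks the flow.Manager to listen on each address in listenSpec,
+// derives the externally accessible endpoints for the resulting listening
+// endpoints, and starts the accept loop.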
+func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) error {
+	s.Lock()
+	defer s.Unlock()
+
+	var lastErr error
+	for _, addr := range listenSpec.Addrs {
+		if len(addr.Address) > 0 {
+			if lastErr = s.flowMgr.Listen(ctx, addr.Protocol, addr.Address); lastErr != nil {
+				s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
+			}
+		}
+	}
+
+	leps := s.flowMgr.ListeningEndpoints()
+	if len(leps) == 0 {
+		return verror.New(verror.ErrBadArg, s.ctx, verror.New(errNoListeners, s.ctx, lastErr))
+	}
+
+	roaming := false
+	for _, ep := range leps {
+		eps, _, eproaming, eperr := s.createEndpoints(ep, listenSpec.AddressChooser)
+		s.chosenEndpoints = append(s.chosenEndpoints, eps...)
+		if eproaming && eperr == nil {
+			s.protoEndpoints = append(s.protoEndpoints, ep.(*inaming.Endpoint))
+			roaming = true
+		}
+	}
+
+	if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
+		// TODO(mattr): Support roaming.
+	}
+
+	s.active.Add(1)
+	go s.acceptLoop(ctx)
+	return nil
+}
+
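+// acceptLoop accepts flows from the flow.Manager and serves each one in its
+// own goroutine until Accept returns an error.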
+func (s *xserver) acceptLoop(ctx *context.T) error {
+	var calls sync.WaitGroup
+	defer func() {
+		calls.Wait()
+		s.active.Done()
+		s.ctx.VI(1).Infof("rpc: Stopped accepting")
+	}()
+	for {
+		// TODO(mattr): We need to interrupt Accept at some point.
+		// Should we interrupt it by canceling the context?
+		fl, err := s.flowMgr.Accept(ctx)
+		if err != nil {
+			s.ctx.VI(10).Infof("rpc: Accept failed: %v", err)
+			return err
+		}
+		calls.Add(1)
+		go func(fl flow.Flow) {
+			defer calls.Done()
+			fs, err := newXFlowServer(fl, s)
+			if err != nil {
+				s.ctx.VI(1).Infof("newXFlowServer failed: %v", err)
+				return
+			}
+			if err := fs.serve(); err != nil {
+				// TODO(caprita): Logging errors here is too spammy. For example, "not
+				// authorized" errors shouldn't be logged as server errors.
+				// TODO(cnicolaou): revisit this when verror2 transition is
+				// done.
+				if err != io.EOF {
+					s.ctx.VI(2).Infof("Flow.serve failed: %v", err)
+				}
+			}
+		}(fl)
+	}
+}
+
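+// serve wraps obj in a leaf dispatcher and serves it under name.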
+func (s *xserver) serve(name string, obj interface{}, authorizer security.Authorizer) error {
+	if obj == nil {
+		return verror.New(verror.ErrBadArg, s.ctx, "nil object")
+	}
+	invoker, err := objectToInvoker(obj)
+	if err != nil {
+		return verror.New(verror.ErrBadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
+	}
+	// TODO(mattr): Does this really need to be locked?
+	s.Lock()
+	s.isLeaf = true
+	s.Unlock()
+	return s.serveDispatcher(name, &leafDispatcher{invoker, authorizer})
+}
+
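+// serveDispatcher installs disp as the server's dispatcher and, if name is
+// non-empty, publishes the server's chosen endpoints under name.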
+func (s *xserver) serveDispatcher(name string, disp rpc.Dispatcher) error {
+	if disp == nil {
+		return verror.New(verror.ErrBadArg, s.ctx, "nil dispatcher")
+	}
+	s.Lock()
+	defer s.Unlock()
+	vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
+	s.disp = disp
+	if len(name) > 0 {
+		for _, ep := range s.chosenEndpoints {
+			s.publisher.AddServer(ep.String())
+		}
+		s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+	}
+	return nil
+}
+
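+// AddName publishes the server under an additional name.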
+func (s *xserver) AddName(name string) error {
+	defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	if len(name) == 0 {
+		return verror.New(verror.ErrBadArg, s.ctx, "name is empty")
+	}
+	s.Lock()
+	defer s.Unlock()
+	vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
+	s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
+	return nil
+}
+
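+// RemoveName stops publishing the server under name.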
+func (s *xserver) RemoveName(name string) {
+	defer apilog.LogCallf(nil, "name=%.10s...", name)(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	s.Lock()
+	defer s.Unlock()
+	vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
+	s.publisher.RemoveName(name)
+}
+
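+// Stop unregisters the dispatcher, stops the publisher (unmounting all
+// published names), waits for outstanding work to drain (logging a warning
+// on timeout), and finally cancels the server's context.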
+func (s *xserver) Stop() error {
+	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+
+	serverDebug := fmt.Sprintf("Dispatcher: %T, Status:[%v]", s.disp, s.Status())
+	s.ctx.VI(1).Infof("Stop: %s", serverDebug)
+	defer s.ctx.VI(1).Infof("Stop done: %s", serverDebug)
+
+	s.Lock()
+	if s.disp == nil {
+		s.Unlock()
+		return nil
+	}
+	s.disp = nil
+	close(s.stoppedChan)
+	s.Unlock()
+
+	// Delete the stats object.
+	s.stats.stop()
+
+	// Note: it's safe to Stop/WaitForStop on the publisher outside of the
+	// server lock, since the publisher is safe for concurrent access.
+	// Stop the publisher, which triggers unmounting of published names.
+	s.publisher.Stop()
+
+	// Wait for the publisher to be done unmounting before we proceed to
+	// close the listeners (to minimize the number of mounted names pointing
+	// to endpoints that are no longer serving).
+	//
+	// TODO(caprita): See if it makes sense to fail fast on rejecting
+	// connections once listeners are closed, and parallelize the publisher
+	// and listener shutdown.
+	s.publisher.WaitForStop()
+
+	s.Lock()
+
+	// TODO(mattr): What should we do when we stop a server now?  We need to
+	// interrupt Accept at some point, but it's weird to stop the flowmanager.
+	// Close all listeners.  No new flows will be accepted, while in-flight
+	// flows will continue until they terminate naturally.
+	// nListeners := len(s.listeners)
+	// errCh := make(chan error, nListeners)
+	// for ln, _ := range s.listeners {
+	// 	go func(ln stream.Listener) {
+	// 		errCh <- ln.Close()
+	// 	}(ln)
+	// }
+
+	if dhcp := s.dhcpState; dhcp != nil {
+		// TODO(cnicolaou,caprita): investigate not having to close and drain
+		// the channel here. It's a little awkward right now since we have to
+		// be careful not to close the channel in two places, i.e. here and
+		// from the publisher's Shutdown method.
+		if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
+		drain:
+			for {
+				select {
+				case v := <-dhcp.ch:
+					if v == nil {
+						break drain
+					}
+				default:
+					close(dhcp.ch)
+					break drain
+				}
+			}
+		}
+	}
+
+	s.Unlock()
+
+	// At this point, we are guaranteed that no new requests are going to be
+	// accepted.
+
+	// Wait for the publisher and active listener + flows to finish.
+	done := make(chan struct{}, 1)
+	go func() { s.active.Wait(); done <- struct{}{} }()
+
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		s.ctx.Errorf("%s: Timed out waiting for goroutines to stop", serverDebug)
+		// TODO(mattr): This doesn't make sense, shouldn't we not wait after timing out?
+		<-done
+		s.ctx.Infof("%s: Done waiting.", serverDebug)
+	}
+
+	s.cancel()
+	return nil
+}
+
+// xflowServer implements the RPC server-side protocol for a single RPC, over a
+// flow that's already connected to the client.
+type xflowServer struct {
+	ctx    *context.T     // context associated with the RPC
+	server *xserver       // rpc.Server that this flow server belongs to
+	disp   rpc.Dispatcher // rpc.Dispatcher that will serve RPCs on this flow
+	dec    *vom.Decoder   // to decode requests and args from the client
+	enc    *vom.Encoder   // to encode responses and results to the client
+	flow   flow.Flow      // underlying flow
+
+	// Fields filled in during the server invocation.
+	clientBlessings  security.Blessings
+	ackBlessings     bool
+	grantedBlessings security.Blessings
+	method, suffix   string
+	tags             []*vdl.Value
+	discharges       map[string]security.Discharge
+	starttime        time.Time
+	endStreamArgs    bool // are the stream args at EOF?
+}
+
+var (
+	_ rpc.StreamServerCall = (*xflowServer)(nil)
+	_ security.Call        = (*xflowServer)(nil)
+)
+
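+// newXFlowServer wraps an accepted flow in an xflowServer, setting up the VOM
+// encoder and decoder used for the request/response exchange.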
+func newXFlowServer(flow flow.Flow, server *xserver) (*xflowServer, error) {
+	server.Lock()
+	disp := server.disp
+	server.Unlock()
+
+	fs := &xflowServer{
+		ctx:        server.ctx,
+		server:     server,
+		disp:       disp,
+		flow:       flow,
+		enc:        vom.NewEncoder(flow),
+		dec:        vom.NewDecoder(flow),
+		discharges: make(map[string]security.Discharge),
+	}
+	// TODO(toddw): Add logic to create separate type flows!
+	return fs, nil
+}
+
+// authorizeVtrace works by simulating a call to __debug/vtrace.Trace.  That
+// RPC is essentially equivalent in power to the vtrace data we are attempting
+// to attach here.
+func (fs *xflowServer) authorizeVtrace(ctx *context.T) error {
+	// Set up a context as though we were calling __debug/vtrace.
+	params := &security.CallParams{}
+	params.Copy(fs)
+	params.Method = "Trace"
+	params.MethodTags = []*vdl.Value{vdl.ValueOf(access.Debug)}
+	params.Suffix = "__debug/vtrace"
+
+	var auth security.Authorizer
+	if fs.server.dispReserved != nil {
+		_, auth, _ = fs.server.dispReserved.Lookup(ctx, params.Suffix)
+	}
+	return authorize(fs.ctx, security.NewCall(params), auth)
+}
+
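+// serve handles a single RPC on the flow: it processes the request, finishes
+// the vtrace span, and encodes the response header (including any vtrace data
+// the client is authorized to see) followed by the positional results.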
+func (fs *xflowServer) serve() error {
+	defer fs.flow.Close()
+
+	results, err := fs.processRequest()
+
+	vtrace.GetSpan(fs.ctx).Finish()
+
+	var traceResponse vtrace.Response
+	// Check if the caller is permitted to view vtrace data.
+	if fs.authorizeVtrace(fs.ctx) == nil {
+		traceResponse = vtrace.GetResponse(fs.ctx)
+	}
+
+	// Respond to the client with the response header and positional results.
+	response := rpc.Response{
+		Error:            err,
+		EndStreamResults: true,
+		NumPosResults:    uint64(len(results)),
+		TraceResponse:    traceResponse,
+		AckBlessings:     fs.ackBlessings,
+	}
+	if err := fs.enc.Encode(response); err != nil {
+		if err == io.EOF {
+			return err
+		}
+		return verror.New(errResponseEncoding, fs.ctx, fs.LocalEndpoint().String(), fs.RemoteEndpoint().String(), err)
+	}
+	if response.Error != nil {
+		return response.Error
+	}
+	for ix, res := range results {
+		if err := fs.enc.Encode(res); err != nil {
+			if err == io.EOF {
+				return err
+			}
+			return verror.New(errResultEncoding, fs.ctx, ix, fmt.Sprintf("%T=%v", res, res), err)
+		}
+	}
+	// TODO(ashankar): Should unread data from the flow be drained?
+	//
+	// Reason to do so:
+	// The common stream.Flow implementation (v.io/x/ref/runtime/internal/rpc/stream/vc/reader.go)
+	// uses iobuf.Slices backed by an iobuf.Pool. If the stream is not drained, these
+	// slices will not be returned to the pool leading to possibly increased memory usage.
+	//
+	// Reason to not do so:
+	// Draining here will conflict with any Reads on the flow in a separate goroutine
+	// (for example, see TestStreamReadTerminatedByServer in full_test.go).
+	//
+	// For now, go with the reason to not do so as having unread data in the stream
+	// should be a rare case.
+	return nil
+}
+
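+// readRPCRequest decodes the initial rpc.Request from the flow.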
+func (fs *xflowServer) readRPCRequest() (*rpc.Request, error) {
+	// TODO(toddw): How do we set the initial timeout?  It might be shorter than
+	// the timeout we set later, which we learn after we've decoded the request.
+	/*
+		// Set a default timeout before reading from the flow. Without this timeout,
+		// a client that sends no request or a partial request will retain the flow
+		// indefinitely (and lock up server resources).
+		initTimer := newTimer(defaultCallTimeout)
+		defer initTimer.Stop()
+		fs.flow.SetDeadline(initTimer.C)
+	*/
+
+	// Decode the initial request.
+	var req rpc.Request
+	if err := fs.dec.Decode(&req); err != nil {
+		return nil, verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadRequest(fs.ctx, err))
+	}
+	return &req, nil
+}
+
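+// processRequest reads the request, sets up vtrace and deadline/cancelation
+// on the context, initializes security state, looks up the invoker, decodes
+// the positional arguments, authorizes the call, and invokes the method.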
+func (fs *xflowServer) processRequest() ([]interface{}, error) {
+	fs.starttime = time.Now()
+	req, err := fs.readRPCRequest()
+	if err != nil {
+		// We don't know what the rpc call was supposed to be, but we'll create
+		// a placeholder span so we can capture annotations.
+		fs.ctx, _ = vtrace.WithNewSpan(fs.ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
+		return nil, err
+	}
+	// We must call fs.drainDecoderArgs for any error that occurs
+	// after this point, and before we actually decode the arguments.
+	fs.method = req.Method
+	fs.suffix = strings.TrimLeft(req.Suffix, "/")
+
+	if req.Language != "" {
+		fs.ctx = i18n.WithLangID(fs.ctx, i18n.LangID(req.Language))
+	}
+
+	// TODO(mattr): Currently this allows users to trigger trace collection
+	// on the server even if they will not be allowed to collect the
+	// results later.  This might be considered a DOS vector.
+	spanName := fmt.Sprintf("\"%s\".%s", fs.suffix, fs.method)
+	fs.ctx, _ = vtrace.WithContinuedTrace(fs.ctx, spanName, req.TraceRequest)
+
+	var cancel context.CancelFunc
+	if !req.Deadline.IsZero() {
+		fs.ctx, cancel = context.WithDeadline(fs.ctx, req.Deadline.Time)
+	} else {
+		fs.ctx, cancel = context.WithCancel(fs.ctx)
+	}
+	fs.flow.SetContext(fs.ctx)
+	// TODO(toddw): Explicitly cancel the context when the flow is done.
+	_ = cancel
+
+	// Initialize security: blessings, discharges, etc.
+	if err := fs.initSecurity(req); err != nil {
+		fs.drainDecoderArgs(int(req.NumPosArgs))
+		return nil, err
+	}
+	// Lookup the invoker.
+	invoker, auth, err := fs.lookup(fs.suffix, fs.method)
+	if err != nil {
+		fs.drainDecoderArgs(int(req.NumPosArgs))
+		return nil, err
+	}
+
+	// Note that we strip the reserved prefix when calling the invoker so
+	// that __Glob will call Glob.  A special invoker has already been
+	// assigned during lookup, so we never call the wrong method by mistake.
+	strippedMethod := naming.StripReserved(fs.method)
+
+	// Prepare invoker and decode args.
+	numArgs := int(req.NumPosArgs)
+	argptrs, tags, err := invoker.Prepare(fs.ctx, strippedMethod, numArgs)
+	fs.tags = tags
+	if err != nil {
+		fs.drainDecoderArgs(numArgs)
+		return nil, err
+	}
+	if called, want := req.NumPosArgs, uint64(len(argptrs)); called != want {
+		fs.drainDecoderArgs(numArgs)
+		return nil, newErrBadNumInputArgs(fs.ctx, fs.suffix, fs.method, called, want)
+	}
+	for ix, argptr := range argptrs {
+		if err := fs.dec.Decode(argptr); err != nil {
+			return nil, newErrBadInputArg(fs.ctx, fs.suffix, fs.method, uint64(ix), err)
+		}
+	}
+
+	// Check application's authorization policy.
+	if err := authorize(fs.ctx, fs, auth); err != nil {
+		return nil, err
+	}
+
+	// Invoke the method.
+	results, err := invoker.Invoke(fs.ctx, fs, strippedMethod, argptrs)
+	fs.server.stats.record(fs.method, time.Since(fs.starttime))
+	return results, err
+}
+
+// drainDecoderArgs drains the next n arguments encoded onto the flow's decoder.
+// This is needed to ensure that the client is able to encode all of its args
+// before the server closes its flow. This guarantees that the client will
+// consistently get the server's error response.
+// TODO(suharshs): Figure out a better way to solve this race condition without
+// unnecessarily reading all arguments.
+func (fs *xflowServer) drainDecoderArgs(n int) error {
+	for i := 0; i < n; i++ {
+		if err := fs.dec.Ignore(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// lookup returns the invoker and authorizer responsible for serving the given
+// name and method.  The suffix is stripped of any leading slashes.  Reserved
+// methods are handled by a special reserved invoker, and reserved suffixes
+// are looked up via the server's reserved dispatcher; everything else uses
+// the server's dispatcher.  The suffix and method values may be modified to
+// match the actual suffix and method to use.
+func (fs *xflowServer) lookup(suffix string, method string) (rpc.Invoker, security.Authorizer, error) {
+	if naming.IsReserved(method) {
+		return reservedInvoker(fs.disp, fs.server.dispReserved), security.AllowEveryone(), nil
+	}
+	disp := fs.disp
+	if naming.IsReserved(suffix) {
+		disp = fs.server.dispReserved
+	} else if fs.server.isLeaf && suffix != "" {
+		innerErr := verror.New(errUnexpectedSuffix, fs.ctx, suffix)
+		return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix, innerErr)
+	}
+	if disp != nil {
+		obj, auth, err := disp.Lookup(fs.ctx, suffix)
+		switch {
+		case err != nil:
+			return nil, nil, err
+		case obj != nil:
+			invoker, err := objectToInvoker(obj)
+			if err != nil {
+				return nil, nil, verror.New(verror.ErrInternal, fs.ctx, "invalid received object", err)
+			}
+			return invoker, auth, nil
+		}
+	}
+	return nil, nil, verror.New(verror.ErrUnknownSuffix, fs.ctx, suffix)
+}
+
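+// initSecurity is intended to set up the security state for the call
+// (blessings, discharges, etc.); it is currently a stub, with the legacy
+// stream-based logic preserved below for reference.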
+func (fs *xflowServer) initSecurity(req *rpc.Request) error {
+	// TODO(toddw): Do something with this.
+	/*
+		// LocalPrincipal is nil which means we are operating under
+		// SecurityNone.
+		if fs.LocalPrincipal() == nil {
+			return nil
+		}
+
+		// If additional credentials are provided, make them available in the context
+		// Detect unusable blessings now, rather than discovering they are unusable on
+		// first use.
+		//
+		// TODO(ashankar,ataly): Potential confused deputy attack: The client provides
+		// the server's identity as the blessing. Figure out what we want to do about
+		// this - should servers be able to assume that a blessing is something that
+		// does not have the authorizations that the server's own identity has?
+		if got, want := req.GrantedBlessings.PublicKey(), fs.LocalPrincipal().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+			return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessing granted not bound to this server(%v vs %v)", got, want))
+		}
+		fs.grantedBlessings = req.GrantedBlessings
+
+		var err error
+		if fs.clientBlessings, err = serverDecodeBlessings(fs.flow.VCDataCache(), req.Blessings, fs.server.stats); err != nil {
+			// When the server can't access the blessings cache, the client is not following
+			// protocol, so the server closes the VCs corresponding to the client endpoint.
+			// TODO(suharshs,toddw): Figure out a way to only shutdown the current VC, instead
+			// of all VCs connected to the RemoteEndpoint.
+			fs.server.streamMgr.ShutdownEndpoint(fs.RemoteEndpoint())
+			return verror.New(verror.ErrBadProtocol, fs.ctx, newErrBadBlessingsCache(fs.ctx, err))
+		}
+		// Verify that the blessings sent by the client in the request have the same public
+		// key as those sent by the client during VC establishment.
+		if got, want := fs.clientBlessings.PublicKey(), fs.flow.RemoteBlessings().PublicKey(); got != nil && !reflect.DeepEqual(got, want) {
+			return verror.New(verror.ErrNoAccess, fs.ctx, fmt.Sprintf("blessings sent with the request are bound to a different public key (%v) from the blessing used during VC establishment (%v)", got, want))
+		}
+		fs.ackBlessings = true
+
+		for _, d := range req.Discharges {
+			fs.discharges[d.ID()] = d
+		}
+	*/
+	return nil
+}
+
+// Send implements the rpc.Stream method.
+func (fs *xflowServer) Send(item interface{}) error {
+	defer apilog.LogCallf(nil, "item=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	// The empty response header indicates what follows is a streaming result.
+	if err := fs.enc.Encode(rpc.Response{}); err != nil {
+		return err
+	}
+	return fs.enc.Encode(item)
+}
+
+// Recv implements the rpc.Stream method.
+func (fs *xflowServer) Recv(itemptr interface{}) error {
+	defer apilog.LogCallf(nil, "itemptr=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
+	var req rpc.Request
+	if err := fs.dec.Decode(&req); err != nil {
+		return err
+	}
+	if req.EndStreamArgs {
+		fs.endStreamArgs = true
+		return io.EOF
+	}
+	return fs.dec.Decode(itemptr)
+}
+
+// Implementations of rpc.ServerCall and security.Call methods.
+
+func (fs *xflowServer) Security() security.Call {
+	//nologcall
+	return fs
+}
+func (fs *xflowServer) LocalDischarges() map[string]security.Discharge {
+	//nologcall
+	return fs.flow.LocalDischarges()
+}
+func (fs *xflowServer) RemoteDischarges() map[string]security.Discharge {
+	//nologcall
+	return fs.flow.RemoteDischarges()
+}
+func (fs *xflowServer) Server() rpc.Server {
+	//nologcall
+	return nil // TODO(toddw): Change return to rpc.XServer
+}
+func (fs *xflowServer) Timestamp() time.Time {
+	//nologcall
+	return fs.starttime
+}
+func (fs *xflowServer) Method() string {
+	//nologcall
+	return fs.method
+}
+func (fs *xflowServer) MethodTags() []*vdl.Value {
+	//nologcall
+	return fs.tags
+}
+func (fs *xflowServer) Suffix() string {
+	//nologcall
+	return fs.suffix
+}
+func (fs *xflowServer) LocalPrincipal() security.Principal {
+	//nologcall
+	return fs.server.principal
+}
+func (fs *xflowServer) LocalBlessings() security.Blessings {
+	//nologcall
+	return fs.flow.LocalBlessings()
+}
+func (fs *xflowServer) RemoteBlessings() security.Blessings {
+	//nologcall
+	return fs.flow.RemoteBlessings()
+}
+func (fs *xflowServer) GrantedBlessings() security.Blessings {
+	//nologcall
+	return fs.grantedBlessings
+}
+func (fs *xflowServer) LocalEndpoint() naming.Endpoint {
+	//nologcall
+	return fs.flow.Conn().LocalEndpoint()
+}
+func (fs *xflowServer) RemoteEndpoint() naming.Endpoint {
+	//nologcall
+	return fs.flow.Conn().RemoteEndpoint()
+}