// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vc

import (
	"bytes"
	"fmt"
	"io"
	"sync"
	"time"

	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/v23/rpc/version"
	"v.io/v23/security"
	"v.io/v23/vom"
	"v.io/v23/vtrace"
	"v.io/x/lib/vlog"
	"v.io/x/ref/runtime/internal/lib/bqueue"
	"v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
	"v.io/x/ref/runtime/internal/lib/iobuf"
	vsync "v.io/x/ref/runtime/internal/lib/sync"
	"v.io/x/ref/runtime/internal/lib/upcqueue"
	"v.io/x/ref/runtime/internal/rpc/stream"
	"v.io/x/ref/runtime/internal/rpc/stream/crypto"
	"v.io/x/ref/runtime/internal/rpc/stream/id"
	"v.io/x/ref/runtime/internal/rpc/stream/message"
	iversion "v.io/x/ref/runtime/internal/rpc/version"
)

var pool = iobuf.NewPool(0)
var nullCipher = crypto.NullControlCipher{}
var stopToken = iobuf.NewSlice(make([]byte, 1))

// This is kind of a hack.  If you send this down a flow then all
// bytes sent after will be sent unencyrpted.
var disableEncryption = iobuf.NewSlice(make([]byte, 1))

const (
	// Priorities of the buffered queues used for flow control of writes.
	authPriority bqueue.Priority = iota
	expressPriority
	systemXFlowPriority
	normalXFlowPriority
	stopPriority
)

type flowState struct {
	flow               *flow
	encryptionDisabled bool
}

type xvc struct {
	mu                       sync.Mutex
	flowMap                  map[id.Flow]*flowState
	lDischarges, rDischarges map[string]security.Discharge
	nextConnectFID           id.Flow

	incommingMu sync.Mutex
	incomming   *upcqueue.T

	cipher                 crypto.ControlCipher
	version                version.RPCVersion
	principal              security.Principal
	lBlessings, rBlessings security.Blessings
	local, remote          naming.Endpoint
	conn                   stream.XConn
	outgoing               bqueue.T
	expressQ, stopQ        bqueue.Writer
	sharedCounters         *vsync.Semaphore
	reader                 *iobuf.Reader
	closed                 chan struct{}
	dataCache              *dataCache
	dClient                DischargeClient
	dBuffer                time.Duration
}

func commonInit(
	ctx *context.T,
	conn stream.XConn,
	principal security.Principal,
	versions iversion.Range) (*xvc, stream.XFlow, error) {
	vc := &xvc{
		principal:      principal,
		cipher:         nullCipher,
		conn:           conn,
		outgoing:       drrqueue.New(MaxPayloadSizeBytes),
		flowMap:        make(map[id.Flow]*flowState),
		sharedCounters: vsync.NewSemaphore(),
		closed:         make(chan struct{}),
		dataCache:      newDataCache(),
	}
	// Both sides create reserved flows ahead of time.
	authFlow, err := vc.newFlow(AuthFlowID, authPriority)
	authFlow.Release(DefaultBytesBufferedPerFlow)
	return vc, authFlow, err
}

func XNewDialed(
	ctx *context.T,
	conn stream.XConn,
	principal security.Principal,
	local, remote naming.Endpoint,
	versions iversion.Range,
	opts ...stream.VCOpt) (stream.XVC, error) {
	ctx, span := vtrace.WithNewSpan(ctx, "Dialing VC")
	defer span.Finish()

	vc, authFlow, err := commonInit(ctx, conn, principal, versions)
	if err != nil {
		return nil, err
	}

	vc.local = local
	vc.nextConnectFID = NumReservedFlows

	errch := make(chan error)
	go func() {
		errch <- vc.setup(ctx, local, remote, versions, principal != nil)
	}()
	if err := <-errch; err != nil {
		vc.Close(err)
		return nil, err
	}
	if principal == nil {
		// Exit early for unencrypted VCs.
		return vc, nil
	}
	if err := vc.authenticateClient(ctx, authFlow, opts...); err != nil {
		vc.Close(err)
		return nil, err
	}
	return vc, nil
}

func XNewAccepted(
	ctx *context.T,
	conn stream.XConn,
	principal security.Principal,
	local naming.Endpoint,
	lBlessings security.Blessings,
	versions iversion.Range,
	incomming *upcqueue.T,
	opts ...stream.ListenerOpt) (stream.XVC, error) {
	ctx, span := vtrace.WithNewSpan(ctx, "Accepting VC")
	defer span.Finish()

	vc, authFlow, err := commonInit(ctx, conn, principal, versions)
	if err != nil {
		return nil, err
	}
	vc.incommingMu.Lock()
	// We unlock this only AFTER authentication is finished to ensure
	// that no flows are announced until we're fully initialized.
	// TODO(mattr): Find a better way.
	defer vc.incommingMu.Unlock()
	vc.incomming = incomming

	vc.local = local
	vc.lBlessings = lBlessings
	vc.nextConnectFID = NumReservedFlows + 1
	vc.dClient, vc.dBuffer = dischargeOptions(opts)

	errch := make(chan error)
	go func() {
		// TODO(mattr): Sending local twice, but... what should we send?
		errch <- vc.setup(ctx, local, local, versions, principal != nil)
	}()
	if err := <-errch; err != nil {
		vc.Close(err)
		return nil, err
	}
	if principal == nil {
		//Exit early for unencrypted VCs.
		return vc, nil
	}
	if err := vc.authenticateServer(ctx, authFlow, opts...); err != nil {
		vc.Close(err)
		return nil, err
	}
	return vc, nil
}

func (vc *xvc) setup(
	ctx *context.T,
	local, remote naming.Endpoint,
	versions iversion.Range,
	encrypted bool) error {
	ctx, span := vtrace.WithNewSpan(ctx, "Setup")
	defer span.Finish()

	_, keySpan := vtrace.WithNewSpan(ctx, "Generate Key")
	pk, sk, err := crypto.GenerateBoxKey()
	if err != nil {
		return err
	}
	keySpan.Finish()

	setup := &message.SetupVC{
		LocalEndpoint:  local,
		RemoteEndpoint: remote,
		Counters:       message.NewCounters(),
		Setup: message.Setup{
			Versions: versions,
			Options:  []message.SetupOption{&message.NaclBox{PublicKey: *pk}},
		},
	}
	setup.Counters.Add(0, SharedFlowID, DefaultBytesBufferedPerFlow)

	// Send and receive a SetupVC message.
	// Note we use the nullCipher here.  The XConn is handling encryption.
	errch := make(chan error, 1)
	go func() {
		_, span := vtrace.WithNewSpan(ctx, "Writing setup")
		errch <- message.WriteTo(vc.conn, setup, nullCipher)
		span.Finish()
	}()
	_, readSpan := vtrace.WithNewSpan(ctx, "Reading setup")
	vc.reader = iobuf.NewReader(pool, vc.conn)
	msg, err := message.ReadFrom(vc.reader, nullCipher)
	if err != nil {
		return err
	}
	remoteSetup, ok := msg.(*message.SetupVC)
	if !ok {
		return fmt.Errorf("wrong message.")
	}
	readSpan.Finish()

	if err := <-errch; err != nil {
		return err
	}
	vc.remote = remoteSetup.LocalEndpoint

	// Release shared counters
	vc.distributeCounters(remoteSetup.Counters)

	// Choose a version.
	vrange, err := versions.Intersect(&remoteSetup.Setup.Versions)
	if err != nil {
		return err
	}
	vc.version = vrange.Max

	// Set up the cipher.
	if encrypted {
		_, naclSpan := vtrace.WithNewSpan(ctx, "Creating Cipher")
		remoteKey := remoteSetup.Setup.NaclBox()
		if remoteKey == nil {
			return fmt.Errorf("missing key.")
		}
		// Disable the underlying encryption.  We'll do our own from now on.
		vc.conn.DisableEncryption()

		vc.cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteKey.PublicKey)
		naclSpan.Finish()
	}

	vc.expressQ, err = vc.outgoing.NewWriter(ExpressFlowID, expressPriority, DefaultBytesBufferedPerFlow)
	if err != nil {
		return err
	}
	vc.expressQ.Release(-1) // Disable flow control
	vc.stopQ, err = vc.outgoing.NewWriter(StopFlowID, stopPriority, DefaultBytesBufferedPerFlow)
	if err != nil {
		return err
	}
	vc.stopQ.Release(-1) // Disable flow control

	// Start read and write loops
	go vc.readLoop()
	go vc.writeLoop()

	return nil
}

func (vc *xvc) connectSystemFlows() error {
	conn, err := vc.connect(TypeFlowID, systemXFlowPriority)
	if err != nil {
		return err
	}
	vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(conn))
	vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(conn))

	if len(vc.rBlessings.ThirdPartyCaveats()) > 0 {
		conn, err = vc.connect(DischargeFlowID, systemXFlowPriority)
		if err != nil {
			return err
		}
		go vc.recvDischargesLoop(conn)
	}
	return nil
}

func (vc *xvc) sendDischargesLoop(conn io.WriteCloser, tpCavs []security.Caveat) {
	defer conn.Close()
	if vc.dClient == nil {
		return
	}
	enc := vom.NewEncoder(conn)
	discharges := vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
	for expiry := minExpiryTime(discharges, tpCavs); !expiry.IsZero(); expiry = minExpiryTime(discharges, tpCavs) {
		select {
		case <-time.After(fetchDuration(expiry, vc.dBuffer)):
			discharges = vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
			if err := enc.Encode(discharges); err != nil {
				vlog.Errorf("encoding discharges on VC %v failed: %v", vc, err)
				return
			}
			if len(discharges) == 0 {
				continue
			}
			vc.mu.Lock()
			if vc.lDischarges == nil {
				vc.lDischarges = make(map[string]security.Discharge)
			}
			for _, d := range discharges {
				vc.lDischarges[d.ID()] = d
			}
			vc.mu.Unlock()
		case <-vc.closed:
			vlog.VI(3).Infof("closing sendDischargesLoop on VC %v", vc)
			return
		}
	}
}

func (vc *xvc) recvDischargesLoop(conn io.ReadCloser) {
	defer conn.Close()
	dec := vom.NewDecoder(conn)
	for {
		var discharges []security.Discharge
		if err := dec.Decode(&discharges); err != nil {
			vlog.VI(3).Infof("decoding discharges on %v failed: %v", vc, err)
			return
		}
		if len(discharges) == 0 {
			continue
		}
		vc.mu.Lock()
		if vc.rDischarges == nil {
			vc.rDischarges = make(map[string]security.Discharge)
		}
		for _, d := range discharges {
			vc.rDischarges[d.ID()] = d
		}
		vc.mu.Unlock()
	}
}

func (vc *xvc) handleMessage(msg message.T) error {
	switch m := msg.(type) {

	case *message.Data:
		payload := m.Payload
		defer func() {
			if payload != nil {
				payload.Release()
			}
		}()
		vc.mu.Lock()
		fs := vc.flowMap[m.Flow]
		if m.Close() && fs != nil {
			defer fs.flow.Shutdown()
			delete(vc.flowMap, m.Flow)
		}
		vc.mu.Unlock()
		if payload.Size() == 0 {
			return nil
		}
		if fs == nil {
			vlog.Infof("Ignoring data packet for unkown flow: %#v", msg)
			return nil
		}
		if err := fs.flow.reader.Put(payload); err != nil {
			vlog.Errorf("Could not queue data packet: %#v", msg)
			return err
		}
		payload = nil

	case *message.AddReceiveBuffers:
		vc.distributeCounters(m.Counters)

	case *message.OpenFlow:
		if err := vc.accept(m.Flow, int(m.InitialCounters)); err != nil {
			cm := &message.Data{Flow: m.Flow}
			cm.SetClose()
			if err := vc.sendOnExpressQ(cm); err != nil {
				return err
			}
		}
		// TODO(mattr): The first write uses shared counters, but the first write is
		// always just 1 byte :(
		counters := message.NewCounters()
		counters.Add(0, m.Flow, uint32(DefaultBytesBufferedPerFlow))
		err := vc.sendOnExpressQ(&message.AddReceiveBuffers{
			Counters: counters,
		})
		return err

	case *message.CloseVC:
		vc.stopQ.Put(stopToken, nil)
		if m.Error != "" {
			vlog.Errorf("VC Closed from remote end due to error: %s", m.Error)
			return fmt.Errorf(m.Error)
		}
		return io.EOF

	default:
		vlog.Infof("Ignoring irrelevant or unrecognized message %T: ", m)
	}
	return nil
}

func (vc *xvc) distributeCounters(counters message.Counters) {
	for cid, bytes := range counters {
		if cid.Flow() == SharedFlowID {
			vc.sharedCounters.IncN(uint(bytes))
			continue
		}
		vc.mu.Lock()
		f := vc.flowMap[cid.Flow()]
		vc.mu.Unlock()
		if f == nil {
			vlog.Infof("ignoring counters for unknown flow: %d", cid.Flow())
			continue
		}
		f.flow.Release(int(bytes))
	}
}

func (vc *xvc) readLoop() {
	for {
		msg, err := message.ReadFrom(vc.reader, vc.cipher)
		if err != nil {
			vlog.Errorf("Exiting VC read loop: %v", err)
			return
		}
		if err := vc.handleMessage(msg); err != nil {
			if err != io.EOF {
				vlog.Errorf("Could not handle message: %v", err)
			}
			return
		}
	}
}

func releaseBufs(bufs []*iobuf.Slice) {
	for _, b := range bufs {
		if b != disableEncryption && b != stopToken {
			b.Release()
		}
	}
}

func (vc *xvc) writeData(writer bqueue.Writer, bufs []*iobuf.Slice) error {
	defer releaseBufs(bufs)
	fid := id.Flow(writer.ID())
	var encryptionDisabled bool
	vc.mu.Lock()
	fs := vc.flowMap[fid]
	if fs != nil {
		encryptionDisabled = fs.encryptionDisabled
	}
	vc.mu.Unlock()

	// TODO(mattr): coalesce, but only after finding the encryption boundary.
	last := len(bufs) - 1
	// TODO(mattr): This is broken if disableEncryption is last.
	drained := writer.IsDrained()

	if drained {
		defer func() {
			vc.mu.Lock()
			delete(vc.flowMap, fid)
			vc.mu.Unlock()
		}()
	}

	for i, b := range bufs {
		if b == disableEncryption {
			encryptionDisabled = true
			vc.mu.Lock()
			if fs != nil {
				fs.encryptionDisabled = true
			}
			vc.mu.Unlock()
			continue
		}

		d := &message.Data{Flow: fid, Payload: b}
		if !encryptionDisabled {
			d.SetEncrypted()
		}
		if drained && i == last {
			d.SetClose()
		}
		if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
			return err
		}
	}
	if len(bufs) == 0 && drained {
		d := &message.Data{Flow: fid}
		d.SetClose()
		if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
			return err
		}
	}
	return nil
}

func (vc *xvc) writeSerializedMessage(msg *iobuf.Slice) error {
	// TODO(mattr): All of this would be easier if the queues contained messages
	// instead of byte arrays.
	if err := message.EncryptMessage(msg.Contents, vc.cipher); err != nil {
		return err
	}
	if _, err := vc.conn.Write(msg.Contents); err != nil {
		return err
	}
	msg.Release()
	return nil
}

func (vc *xvc) writeLoop() {
	defer vc.outgoing.Close()
	defer close(vc.closed)
	for {
		writer, bufs, err := vc.outgoing.Get(nil)
		if err != nil {
			return
		}
		switch writer {
		case vc.expressQ:
			for _, b := range bufs {
				if err = vc.writeSerializedMessage(b); err != nil {
					vlog.Errorf("Could not write control message: %v", err)
					return
				}
			}
		case vc.stopQ:
			if len(bufs) > 0 {
				// If we get stopToken here, then the vc was closed by the remote end.
				if bufs[0] != stopToken {
					if err := vc.writeSerializedMessage(bufs[0]); err != nil {
						vlog.Errorf("Could not write CloseVC: %v", err)
					}
				}
			}
			return
		default:
			if err = vc.writeData(writer, bufs); err != nil {
				vlog.Errorf("Error writing: %v", err)
				return
			}
		}
	}
}

func (vc *xvc) authenticateServer(ctx *context.T, authFlow stream.XFlow, opts ...stream.ListenerOpt) error {
	ctx, span := vtrace.WithNewSpan(ctx, "Server Authentication")
	defer span.Finish()
	defer authFlow.Close()

	if vc.lBlessings.IsZero() {
		return fmt.Errorf("no server blessings")
	}

	// TODO(mattr): Check that authFlow has the right fid.

	// This is a wart.  We always pass a null crypter (with channel binding) even
	// though this is all encrypted by the flow write.  This is just because we
	// want to re-use the existing Authenticate methods.
	crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())

	var serverDischarges []security.Discharge
	if tpcavs := vc.lBlessings.ThirdPartyCaveats(); len(tpcavs) > 0 && vc.dClient != nil {
		_, span := vtrace.WithNewSpan(ctx, "Prepare Discharages")
		serverDischarges = vc.dClient.PrepareDischarges(ctx, tpcavs, security.DischargeImpetus{})
		span.Finish()
	}
	_, wspan := vtrace.WithNewSpan(ctx, "writeBlessings")
	if err := writeBlessings(authFlow, authServerContextTag, crypter, vc.principal, vc.lBlessings, serverDischarges, vc.version); err != nil {
		return err
	}
	wspan.Finish()

	// Note that since the client uses a self-signed blessing to authenticate
	// during VC setup, it does not share any discharges.
	rBlessings, _, err := readBlessings(ctx, authFlow, authClientContextTag, crypter, vc.version)
	if err != nil {
		return err
	}
	vc.rBlessings = rBlessings
	vc.lDischarges = mkDischargeMap(serverDischarges)
	return nil
}

func (vc *xvc) authenticateClient(ctx *context.T, authFlow stream.XFlow, opts ...stream.VCOpt) error {
	ctx, span := vtrace.WithNewSpan(ctx, "Client Authentication")
	defer span.Finish()
	defer authFlow.Close()

	// This is a wart.  We always pass a null crypter (with channel binding) even
	// though this is all encrypted by the flow write.  This is just because we
	// want to re-use the existing Authenticate methods.
	crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())

	server, serverDischarges, err := readBlessings(ctx, authFlow, authServerContextTag, crypter, vc.version)
	if err != nil {
		return err
	}

	// Authorize the server based on the provided authorizer.
	auth := serverAuthorizer(opts)
	if auth != nil {
		_, authSpan := vtrace.WithNewSpan(ctx, "Authorize Server")
		params := security.CallParams{
			LocalPrincipal:   vc.principal,
			LocalEndpoint:    vc.local,
			RemoteEndpoint:   vc.remote,
			RemoteBlessings:  server,
			RemoteDischarges: serverDischarges,
		}
		if err := auth.Authorize(params); err != nil {
			return fmt.Errorf("Not trusted.")
		}
		authSpan.Finish()
	}

	// The client shares its blessings at RPC time (as the blessings may vary
	// across RPCs). During VC handshake, the client simply sends a self-signed
	// blessing in order to reveal its public key to the server.
	// TODO(suharshs): Actually we should just send the public key.
	_, blessSpan := vtrace.WithNewSpan(ctx, "Bless self")
	client, err := vc.principal.BlessSelf("vcauth")
	if err != nil {
		return fmt.Errorf("Could not create blessing")
	}
	blessSpan.Finish()
	_, writeSpan := vtrace.WithNewSpan(ctx, "Write blessings")
	if err := writeBlessings(authFlow, authClientContextTag, crypter, vc.principal, client, nil, vc.version); err != nil {
		return err
	}
	writeSpan.Finish()

	vc.lBlessings = client
	vc.rBlessings = server
	vc.rDischarges = serverDischarges

	return vc.connectSystemFlows()
}

func (vc *xvc) newFlow(fid id.Flow, priority bqueue.Priority) (*flow, error) {
	qw, err := vc.outgoing.NewWriter(bqueue.ID(fid), priority, MaxPayloadSizeBytes)
	if err != nil {
		return nil, err
	}
	f := &flow{
		backingVC: vc,
		reader:    newReader(&xReadHandler{fid, vc}),
		writer:    newWriter(MaxPayloadSizeBytes, qw, iobuf.NewAllocator(pool, 0), vc.sharedCounters),
	}

	vc.mu.Lock()
	defer vc.mu.Unlock()
	if vc.flowMap == nil {
		qw.Close()
		return nil, fmt.Errorf("Could not create flow on closed VC.")
	}
	// TODO(mattr): it's an error if fid already exists.
	vc.flowMap[fid] = &flowState{f, true}

	return f, nil
}

func (vc *xvc) sendOnExpressQ(msg message.T) error {
	return vc.sendOn(vc.expressQ, msg)
}

func (vc *xvc) sendOn(w bqueue.Writer, msg message.T) error {
	var buf bytes.Buffer
	if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(vc.cipher)); err != nil {
		return err
	}
	err := w.Put(iobuf.NewSlice(buf.Bytes()), nil)
	return err
}

func (vc *xvc) connect(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.XFlow, error) {
	f, err := vc.newFlow(fid, priority)
	if err != nil {
		return nil, err
	}

	vc.sendOnExpressQ(&message.OpenFlow{
		Flow:            fid,
		InitialCounters: uint32(DefaultBytesBufferedPerFlow),
	})
	return f, nil
}

func (vc *xvc) accept(fid id.Flow, initialCounters int) error {
	priority := normalXFlowPriority
	if fid < NumReservedFlows {
		priority = systemXFlowPriority
	}
	f, err := vc.newFlow(fid, priority)
	if err != nil {
		return err
	}
	f.Release(initialCounters)
	switch fid {
	case TypeFlowID:
		vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(f))
		vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(f))
		return nil
	case DischargeFlowID:
		tpCaveats := vc.lBlessings.ThirdPartyCaveats()
		if len(tpCaveats) > 0 {
			go vc.sendDischargesLoop(f, tpCaveats)
		}
		return nil
	default:
		vc.incommingMu.Lock()
		q := vc.incomming
		vc.incommingMu.Unlock()

		if q == nil {
			err = fmt.Errorf("VC not listening")
		} else {
			err = q.Put(f)
		}
		if err != nil {
			vc.mu.Lock()
			f.Shutdown()
			delete(vc.flowMap, fid)
			vc.mu.Unlock()
		}
		return err
	}
}

func (vc *xvc) Connect(opts ...stream.FlowOpt) (stream.XFlow, error) {
	vc.mu.Lock()
	fid := vc.nextConnectFID
	vc.nextConnectFID += 2
	vc.mu.Unlock()
	return vc.connect(fid, normalXFlowPriority, opts...)
}

func (vc *xvc) ListenTo(q *upcqueue.T) error {
	vc.incommingMu.Lock()
	defer vc.incommingMu.Unlock()
	if vc.incomming != nil {
		return fmt.Errorf("The given address already has a listener.")
	}
	vc.incomming = q
	return nil
}

func (vc *xvc) Closed() chan struct{} {
	return vc.closed
}

func (vc *xvc) Close(reason error) error {
	vc.mu.Lock()
	flows := vc.flowMap
	vc.flowMap = nil
	vc.mu.Unlock()

	vc.incommingMu.Lock()
	q := vc.incomming
	vc.incomming = nil
	vc.incommingMu.Unlock()

	if q != nil {
		q.Close()
	}
	vc.sharedCounters.Close()

	for _, fs := range flows {
		fs.flow.Close()
	}

	msg := ""
	if reason != nil {
		msg = reason.Error()
	}
	err := vc.sendOn(vc.stopQ, &message.CloseVC{
		Error: msg,
	})
	return err
}

// Implement the backingVC interface for flows.
// Note I don't bother with locking because these are all set by the time the
// constructor is done.
func (vc *xvc) LocalEndpoint() naming.Endpoint                  { return vc.local }
func (vc *xvc) RemoteEndpoint() naming.Endpoint                 { return vc.remote }
func (vc *xvc) LocalPrincipal() security.Principal              { return vc.principal }
func (vc *xvc) LocalBlessings() security.Blessings              { return vc.lBlessings }
func (vc *xvc) RemoteBlessings() security.Blessings             { return vc.rBlessings }
func (vc *xvc) LocalDischarges() map[string]security.Discharge  { return vc.lDischarges }
func (vc *xvc) RemoteDischarges() map[string]security.Discharge { return vc.rDischarges }
func (vc *xvc) VCDataCache() stream.VCDataCache                 { return vc.dataCache }

type xReadHandler struct {
	flow id.Flow
	vc   *xvc
}

func (h *xReadHandler) HandleRead(bytes uint) {
	counters := message.NewCounters()
	counters.Add(0, h.flow, uint32(bytes))
	err := h.vc.sendOnExpressQ(&message.AddReceiveBuffers{
		Counters: counters,
	})
	if err != nil {
		vlog.Errorf("Unable to send counters: %v", err)
	}
}
