blob: a52585b8c0ff4c48d4b2e728aa24718ac57a0778 [file] [log] [blame]
package vc
// Logging guidelines:
// Verbosity level 1 is for per-VC messages.
// Verbosity level 2 is for per-Flow messages.
import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"v.io/core/veyron/runtimes/google/ipc/stream/crypto"
"v.io/core/veyron/runtimes/google/ipc/stream/id"
"v.io/core/veyron/runtimes/google/lib/bqueue"
"v.io/core/veyron/runtimes/google/lib/iobuf"
vsync "v.io/core/veyron/runtimes/google/lib/sync"
ivtrace "v.io/core/veyron/runtimes/google/vtrace"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/ipc/version"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vlog"
"v.io/core/veyron2/vtrace"
)
// Sentinel errors returned by VC methods in this file.
var (
// errAlreadyListening is returned by Listen when the VC already has a listener.
errAlreadyListening = errors.New("Listen has already been called")
// errDuplicateFlow is returned by AcceptFlow when the flow id is already
// present in the VC's flow map.
errDuplicateFlow = errors.New("duplicate OpenFlow message")
// errUnrecognizedFlow is returned by DispatchPayload when no flow is
// registered under the given flow id.
errUnrecognizedFlow = errors.New("unrecognized flow")
)
// VC implements the stream.VC interface and exports additional methods to
// manage Flows.
//
// stream.Flow objects created by this stream.VC implementation use a buffer
// queue (veyron/runtimes/google/lib/bqueue) to provide flow control on Write
// operations.
type VC struct {
// vci is the identifier of this VC, as returned by VCI.
vci id.VC
// localEP and remoteEP are the endpoints returned by LocalAddr and RemoteAddr.
localEP, remoteEP naming.Endpoint
// The principal, blessings and discharges below are recorded once a
// handshake (HandshakeDialedVC or HandshakeAcceptedVC) succeeds.
localPrincipal security.Principal
localBlessings, remoteBlessings security.Blessings
remoteDischarges map[string]security.Discharge
// pool and reserveBytes are used to build the allocator handed to each
// flow's writer (see newWriter).
pool *iobuf.Pool
reserveBytes uint
// sharedCounters is handed to every writer created by newWriter. It is
// incremented by ReleaseCounters for SharedFlowID and closed by Close.
sharedCounters *vsync.Semaphore
mu sync.Mutex
flowMap map[id.Flow]*flow // nil iff the VC is closed.
acceptHandshakeDone chan struct{} // non-nil when HandshakeAcceptVC begins the handshake, closed when handshake completes.
handshakeFID id.Flow // flow used for a TLS handshake to setup encryption.
authFID id.Flow // flow used by the authentication protocol.
nextConnectFID id.Flow // next flow id handed out by allocFID, which advances it by 2.
listener *listener // non-nil iff Listen has been called and the VC has not been closed.
crypter crypto.Crypter // starts as the null crypter (see InternalNew); replaced once a handshake succeeds.
closeReason string // reason why the VC was closed
helper Helper
version version.IPCVersion // passed to the authentication protocol during the handshake.
dataCache *dataCache // dataCache contains information that can be shared between Flows from this VC.
}
// NoDischarges specifies that the RPC call should not fetch discharges.
//
// When passed to HandshakeDialedVC it causes any DischargeClient option to be
// ignored.
type NoDischarges struct{}
// The empty methods below are markers; presumably they make NoDischarges
// satisfy the corresponding option interfaces (IPC call, stream VC and
// namespace resolve options) -- confirm against those interface definitions.
func (NoDischarges) IPCCallOpt() {}
func (NoDischarges) IPCStreamVCOpt() {}
func (NoDischarges) NSResolveOpt() {}
// Compile-time assertion that *VC implements the stream.VC interface.
var _ stream.VC = (*VC)(nil)
// Helper is the interface for functionality required by the stream.VC
// implementation in this package.
type Helper interface {
// NotifyOfNewFlow notifies the remote end of a VC that the caller intends to
// establish a new flow to it and that the caller is ready to receive bytes
// data from the remote end.
NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint)
// AddReceiveBuffers notifies the remote end of a VC that it is ready to
// receive bytes more data on the flow identified by fid over the VC
// identified by vci.
//
// Unlike NotifyOfNewFlow, this call does not let the remote end know of the
// intent to establish a new flow.
AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint)
// NewWriter creates a buffer queue for Write operations on the
// stream.Flow implementation.
NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error)
}
// Params encapsulates the set of parameters needed to create a new VC.
//
// It is consumed by InternalNew, which copies every field except Dialed into
// the VC; Dialed only selects the parity of the flow ids that Connect hands
// out.
type Params struct {
VCI id.VC // Identifier of the VC
Dialed bool // True if the VC was initiated by the local process.
LocalEP naming.Endpoint // Endpoint of the local end of the VC.
RemoteEP naming.Endpoint // Endpoint of the remote end of the VC.
Pool *iobuf.Pool // Byte pool used for read and write buffer allocations.
ReserveBytes uint // Number of bytes to reserve in iobuf.Slices.
Helper Helper // Used to notify the remote end about flows and receive buffers, and to create write queues.
Version version.IPCVersion // Handed to the authentication protocol during the handshake.
}
// LocalPrincipal wraps a security.Principal so that it can be provided
// as an option to various methods in order to provide authentication information
// when establishing virtual circuits.
//
// HandshakeDialedVC and HandshakeAcceptedVC both look for this option and fall
// back to AnonymousPrincipal when it is absent.
type LocalPrincipal struct{ security.Principal }
// The empty methods below are markers; presumably they make LocalPrincipal
// satisfy the stream listener, stream VC, IPC client and IPC server option
// interfaces -- confirm against those interface definitions.
func (LocalPrincipal) IPCStreamListenerOpt() {}
func (LocalPrincipal) IPCStreamVCOpt() {}
func (LocalPrincipal) IPCClientOpt() {}
func (LocalPrincipal) IPCServerOpt() {}
// DischargeClient is an interface for obtaining discharges for a set of third-party
// caveats.
//
// TODO(ataly, ashankar): What should be the impetus for obtaining the discharges?
type DischargeClient interface {
// PrepareDischarges returns discharges for the third-party caveats in
// forcaveats. NOTE(review): how impetus influences the result, and what
// is returned for caveats that cannot be discharged, is not visible
// here -- see the implementations.
PrepareDischarges(ctx *context.T, forcaveats []security.ThirdPartyCaveat, impetus security.DischargeImpetus) []security.Discharge
// Invalidate marks the provided discharges as invalid, and therefore unfit
// for being returned by a subsequent PrepareDischarges call.
Invalidate(discharges ...security.Discharge)
// Marker methods; presumably they let a DischargeClient be passed as an
// IPC server/client option and as a stream listener/VC option.
IPCServerOpt()
IPCClientOpt()
IPCStreamListenerOpt()
IPCStreamVCOpt()
}
// DialContext establishes the context under which a VC Dial was initiated.
//
// When supplied to HandshakeDialedVC, the wrapped context is used to start a
// "vc.HandshakeDialedVC" trace span and is passed on to client authentication.
type DialContext struct{ *context.T }
// IPCStreamVCOpt is an empty marker method; presumably it makes DialContext
// usable as a stream VC option.
func (DialContext) IPCStreamVCOpt() {}
// InternalNew builds a VC (an implementation of stream.VC) from the supplied
// parameters. The returned VC starts out with an empty flow map, a fresh
// shared-counter semaphore, a fresh data cache and a null crypter.
//
// As the name suggests, this function is meant to be used only by packages
// that live inside veyron/runtimes/google. Code outside the
// veyron/runtimes/google/* packages should never call it.
func InternalNew(p Params) *VC {
	// Flow IDs 0 thru NumReservedFlows are reserved for possible future
	// use. Beyond those, flows created by Connect have an even id if the
	// VC was initiated by the local process, and an odd id if the VC was
	// initiated by the remote process.
	var parity int
	if !p.Dialed {
		parity = 1
	}
	firstFID := id.Flow(NumReservedFlows + parity)

	created := &VC{
		vci:            p.VCI,
		localEP:        p.LocalEP,
		remoteEP:       p.RemoteEP,
		pool:           p.Pool,
		reserveBytes:   p.ReserveBytes,
		sharedCounters: vsync.NewSemaphore(),
		flowMap:        make(map[id.Flow]*flow),
		nextConnectFID: firstFID,
		crypter:        crypto.NewNullCrypter(),
		helper:         p.Helper,
		version:        p.Version,
		dataCache:      newDataCache(),
	}
	return created
}
// Connect implements the stream.Connector.Connect method. It allocates the
// next locally-initiated flow id and opens a flow on it.
func (vc *VC) Connect(opts ...stream.FlowOpt) (stream.Flow, error) {
	fid := vc.allocFID()
	return vc.connectFID(fid, opts...)
}
// connectFID creates a flow with the given id, registers it in the VC's flow
// map and tells the remote end that data can be received on it.
//
// It fails if a writer for the flow cannot be created or if the VC has already
// been closed; in the latter case the half-built flow is shut down before the
// error is returned. opts is currently unused.
func (vc *VC) connectFID(fid id.Flow, opts ...stream.FlowOpt) (stream.Flow, error) {
	w, err := vc.newWriter(fid)
	if err != nil {
		return nil, fmt.Errorf("failed to create writer for Flow: %v", err)
	}
	fl := &flow{
		authN:          vc,
		reader:         newReader(readHandlerImpl{vc, fid}),
		writer:         w,
		localEndpoint:  vc.localEP,
		remoteEndpoint: vc.remoteEP,
		dataCache:      vc.dataCache,
	}

	// Register the flow, or capture the close reason, under the lock.
	vc.mu.Lock()
	closed := vc.flowMap == nil
	reason := vc.closeReason
	if !closed {
		vc.flowMap[fid] = fl
	}
	vc.mu.Unlock()

	if closed {
		fl.Shutdown()
		return nil, fmt.Errorf("Connect on closed VC(%q)", reason)
	}
	// New flow created, inform remote end that data can be received on it.
	vc.helper.NotifyOfNewFlow(vc.vci, fid, DefaultBytesBufferedPerFlow)
	return fl, nil
}
// Listen implements the stream.VC.Listen method.
//
// It installs a new listener on the VC and returns it. It returns
// errAlreadyListening if a listener is already installed, and an error
// carrying the close reason if the VC has been closed. Refusing to listen on
// a closed VC keeps the invariant that vc.listener is non-nil only while the
// VC is open; otherwise a later AcceptFlow would find a listener alongside a
// nil flow map.
func (vc *VC) Listen() (stream.Listener, error) {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	if vc.flowMap == nil {
		// flowMap is nil iff the VC is closed.
		return nil, fmt.Errorf("Listen on closed VC(%q)", vc.closeReason)
	}
	if vc.listener != nil {
		return nil, errAlreadyListening
	}
	vc.listener = newListener()
	return vc.listener, nil
}
// RemoteAddr returns the remote endpoint for this VC, i.e. the RemoteEP the
// VC was created with.
func (vc *VC) RemoteAddr() naming.Endpoint {
return vc.remoteEP
}
// LocalAddr returns the local endpoint for this VC, i.e. the LocalEP the VC
// was created with.
func (vc *VC) LocalAddr() naming.Endpoint {
return vc.localEP
}
// DispatchPayload makes payload.Contents available to Read operations on the
// Flow identified by fid.
//
// Assumes ownership of payload, i.e., payload should not be used by the caller
// after this method returns (irrespective of the return value).
//
// Empty payloads are released and silently dropped. An error is returned if
// the VC is closed, if decryption fails, if no flow is registered under fid
// (errUnrecognizedFlow), or if the flow's reader rejects the payload.
func (vc *VC) DispatchPayload(fid id.Flow, payload *iobuf.Slice) error {
if payload.Size() == 0 {
payload.Release()
return nil
}
vc.mu.Lock()
if vc.flowMap == nil {
vc.mu.Unlock()
payload.Release()
return fmt.Errorf("ignoring message for Flow %d on closed VC %d", fid, vc.VCI())
}
// TLS decryption is stateful, so even if the message will be discarded
// because of other checks further down in this method, go through with
// the decryption.
// Payloads on the handshake and authentication flows are passed through
// without decryption by this method.
if fid != vc.handshakeFID && fid != vc.authFID {
// May temporarily release vc.mu while an accept-side handshake is in
// progress, so VC state can change across this call.
vc.waitForHandshakeLocked()
var err error
if payload, err = vc.crypter.Decrypt(payload); err != nil {
vc.mu.Unlock()
return fmt.Errorf("failed to decrypt payload: %v", err)
}
}
// Decryption may leave nothing to deliver.
if payload.Size() == 0 {
vc.mu.Unlock()
payload.Release()
return nil
}
// Indexing a nil flowMap (VC closed while waiting above) yields nil, which
// is reported as an unrecognized flow.
f := vc.flowMap[fid]
if f == nil {
vc.mu.Unlock()
payload.Release()
return errUnrecognizedFlow
}
// Hand the payload to the reader without holding vc.mu.
vc.mu.Unlock()
if err := f.reader.Put(payload); err != nil {
payload.Release()
return err
}
return nil
}
// AcceptFlow enqueues a new Flow for acceptance by the listener on the VC.
//
// It returns an error if the VC is not accepting flows initiated by the
// remote end (no listener, or the VC is closed), errDuplicateFlow if a flow
// with the same id already exists, or an error if the writer cannot be
// created or the listener refuses the flow.
//
// The duplicate and closed checks run before any resources are created, so
// a rejected request neither allocates a writer nor hands the listener a flow
// that is missing from the flow map.
func (vc *VC) AcceptFlow(fid id.Flow) error {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	if vc.listener == nil {
		return fmt.Errorf("no active listener on VC %d", vc.vci)
	}
	if vc.flowMap == nil {
		// flowMap is nil iff the VC is closed; assigning into it below
		// would panic.
		return fmt.Errorf("ignoring new flow(%d) on closed VC %d", fid, vc.vci)
	}
	if _, exists := vc.flowMap[fid]; exists {
		return errDuplicateFlow
	}
	writer, err := vc.newWriter(fid)
	if err != nil {
		return fmt.Errorf("failed to create writer for new flow(%d): %v", fid, err)
	}
	f := &flow{
		authN:          vc,
		reader:         newReader(readHandlerImpl{vc, fid}),
		writer:         writer,
		localEndpoint:  vc.localEP,
		remoteEndpoint: vc.remoteEP,
		dataCache:      vc.dataCache,
	}
	if err = vc.listener.Enqueue(f); err != nil {
		f.Shutdown()
		return fmt.Errorf("failed to enqueue flow at listener: %v", err)
	}
	vc.flowMap[fid] = f
	// New flow accepted, notify remote end that it can send over data.
	// Do it in a goroutine in case the implementation of AddReceiveBuffers
	// ends up attempting to lock vc.mu
	go vc.helper.AddReceiveBuffers(vc.vci, fid, DefaultBytesBufferedPerFlow)
	vlog.VI(2).Infof("Added flow %d@%d to listener", fid, vc.vci)
	return nil
}
// ShutdownFlow removes the Flow identified by fid from the VC and shuts it
// down, discarding any pending writes. It is a no-op when no such flow is
// registered.
func (vc *VC) ShutdownFlow(fid id.Flow) {
	vc.mu.Lock()
	target := vc.flowMap[fid]
	delete(vc.flowMap, fid)
	vc.mu.Unlock()

	if target == nil {
		return
	}
	// Shut the flow down outside the lock.
	target.Shutdown()
}
// ReleaseCounters informs the Flow (identified by fid) that the remote end is
// ready to receive up to 'bytes' more bytes of data.
//
// Counters for SharedFlowID are credited to the VC-wide shared semaphore
// instead of an individual flow. Requests for unknown flows are logged at
// verbosity level 2 and otherwise ignored.
func (vc *VC) ReleaseCounters(fid id.Flow, bytes uint32) {
	if fid == SharedFlowID {
		vc.sharedCounters.IncN(uint(bytes))
		return
	}

	// Indexing a nil flowMap (closed VC) simply yields a nil flow.
	vc.mu.Lock()
	target := vc.flowMap[fid]
	vc.mu.Unlock()

	if target != nil {
		target.Release(int(bytes))
		return
	}
	vlog.VI(2).Infof("Ignoring ReleaseCounters(%d, %d) on VCI %d as the flow does not exist", fid, bytes, vc.vci)
}
// Close closes the VC and all flows on it, allowing any pending writes in the
// flow to drain.
//
// Under the lock it detaches the flow map (marking the VC closed), closes and
// clears the listener if there is one, and records reason. It then closes the
// shared counters and each detached flow. It always returns nil.
func (vc *VC) Close(reason string) error {
	vlog.VI(1).Infof("Closing VC %v. Reason:%q", vc, reason)

	vc.mu.Lock()
	openFlows := vc.flowMap
	vc.flowMap = nil
	if ln := vc.listener; ln != nil {
		ln.Close()
		vc.listener = nil
	}
	vc.closeReason = reason
	vc.mu.Unlock()

	vc.sharedCounters.Close()
	for fid, fl := range openFlows {
		vlog.VI(2).Infof("Closing flow %d on VC %v as VC is being closed(%q)", fid, vc, reason)
		fl.Close()
	}
	return nil
}
// err returns an error built from vc.closeReason when a close reason has been
// recorded, and the supplied err otherwise.
func (vc *VC) err(err error) error {
	vc.mu.Lock()
	reason := vc.closeReason
	vc.mu.Unlock()

	if reason == "" {
		return err
	}
	return errors.New(reason)
}
// HandshakeDialedVC completes initialization of the VC (setting up encryption,
// authentication etc.) under the assumption that the VC was initiated by the
// local process (i.e., the local process "Dial"ed to create the VC).
//
// Recognized options are DialContext, DischargeClient, LocalPrincipal,
// options.VCSecurityLevel, crypto.TLSClientSessionCache and NoDischarges;
// any other option is ignored. With options.VCSecurityNone the method returns
// immediately without setting up encryption or authentication. Errors
// encountered after the security level has been validated are passed through
// vc.err, so a recorded close reason takes precedence.
func (vc *VC) HandshakeDialedVC(opts ...stream.VCOpt) error {
var (
principal security.Principal
tlsSessionCache crypto.TLSClientSessionCache
securityLevel options.VCSecurityLevel
dischargeClient DischargeClient
ctx *context.T
noDischarges bool
)
for _, o := range opts {
switch v := o.(type) {
case DialContext:
ctx = v.T
case DischargeClient:
dischargeClient = v
case LocalPrincipal:
principal = v.Principal
case options.VCSecurityLevel:
securityLevel = v
case crypto.TLSClientSessionCache:
tlsSessionCache = v
case NoDischarges:
noDischarges = true
}
}
// Trace the handshake only when the caller supplied a context.
if ctx != nil {
var span vtrace.Span
ctx, span = ivtrace.WithNewSpan(ctx, "vc.HandshakeDialedVC")
defer span.Finish()
}
// If noDischarge is provided, disable the dischargeClient.
if noDischarges {
dischargeClient = nil
}
switch securityLevel {
case options.VCSecurityConfidential:
if principal == nil {
principal = AnonymousPrincipal
}
case options.VCSecurityNone:
return nil
default:
return fmt.Errorf("unrecognized VC security level: %v", securityLevel)
}
// Establish TLS
handshakeFID := vc.allocFID()
handshakeConn, err := vc.connectFID(handshakeFID)
if err != nil {
return vc.err(fmt.Errorf("failed to create a Flow for setting up TLS: %v", err))
}
crypter, err := crypto.NewTLSClient(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), tlsSessionCache, vc.pool)
if err != nil {
return vc.err(fmt.Errorf("failed to setup TLS: %v", err))
}
// Authenticate (exchange identities)
// Unfortunately, handshakeConn cannot be used for the authentication protocol.
// This is because the Crypter implementation uses crypto/tls.Conn,
// which can consume data beyond the handshake message boundaries (call
// to readFromUntil in
// https://code.google.com/p/go/source/browse/src/pkg/crypto/tls/conn.go?spec=svn654b2703fcc466a29692068ab56efedd09fb3d05&r=654b2703fcc466a29692068ab56efedd09fb3d05#539).
// This is not a problem when tls.Conn is used as intended (to wrap over a stream), but
// becomes a problem when shoehorning a block encrypter (Crypter interface) over this
// stream API.
authFID := vc.allocFID()
authConn, err := vc.connectFID(authFID)
if err != nil {
return vc.err(fmt.Errorf("failed to create a Flow for authentication: %v", err))
}
rBlessings, lBlessings, rDischarges, err := AuthenticateAsClient(ctx, authConn, principal, dischargeClient, crypter, vc.version)
if err != nil {
return vc.err(fmt.Errorf("authentication failed: %v", err))
}
// Publish the handshake results atomically with respect to other users of
// vc.mu. From here on, payloads on flows other than the handshake and
// authentication flows go through the new crypter.
vc.mu.Lock()
vc.handshakeFID = handshakeFID
vc.authFID = authFID
vc.crypter = crypter
vc.localPrincipal = principal
vc.localBlessings = lBlessings
vc.remoteBlessings = rBlessings
vc.remoteDischarges = rDischarges
vc.mu.Unlock()
vlog.VI(1).Infof("Client VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
return nil
}
// HandshakeResult is sent by HandshakeAcceptedVC over the channel returned by it.
// Exactly one of the fields is meaningful: Listener is nil whenever Error is
// non-nil.
type HandshakeResult struct {
Listener stream.Listener // Listener for accepting new Flows on the VC.
Error error // Error, if any, during the handshake.
}
// HandshakeAcceptedVC completes initialization of the VC (setting up
// encryption, authentication etc.) under the assumption that the VC was
// initiated by a remote process (and the local process wishes to "accept" it).
//
// Since the handshaking process might involve several round trips, a bulk of the work
// is done asynchronously and the result of the handshake is written to the
// channel returned by this method. The channel is buffered, so the result is
// delivered even if nobody is receiving yet.
//
// Recognized options are DischargeClient, LocalPrincipal,
// options.VCSecurityLevel and options.ServerBlessings; any other option is
// ignored. With options.VCSecurityNone the listener is returned immediately
// without setting up encryption or authentication.
//
// If the handshake fails after it has started, the in-progress marker
// (vc.acceptHandshakeDone) is closed and cleared so that goroutines blocked in
// waitForHandshakeLocked are released instead of waiting forever.
func (vc *VC) HandshakeAcceptedVC(opts ...stream.ListenerOpt) <-chan HandshakeResult {
	result := make(chan HandshakeResult, 1)
	finish := func(ln stream.Listener, err error) chan HandshakeResult {
		result <- HandshakeResult{ln, err}
		return result
	}
	var (
		principal       security.Principal
		securityLevel   options.VCSecurityLevel
		dischargeClient DischargeClient
		lBlessings      security.Blessings
	)
	for _, o := range opts {
		switch v := o.(type) {
		case DischargeClient:
			dischargeClient = v
		case LocalPrincipal:
			principal = v.Principal
		case options.VCSecurityLevel:
			securityLevel = v
		case options.ServerBlessings:
			lBlessings = v.Blessings
		}
	}
	// If the listener was setup asynchronously, there is a race between
	// the listener being setup and the caller of this method trying to
	// dispatch messages, thus it is setup synchronously.
	ln, err := vc.Listen()
	if err != nil {
		return finish(nil, err)
	}
	vc.helper.AddReceiveBuffers(vc.VCI(), SharedFlowID, DefaultBytesBufferedPerFlow)
	switch securityLevel {
	case options.VCSecurityConfidential:
		if principal == nil {
			principal = AnonymousPrincipal
		}
		if lBlessings == nil {
			lBlessings = principal.BlessingStore().Default()
		}
	case options.VCSecurityNone:
		return finish(ln, nil)
	default:
		ln.Close()
		return finish(nil, fmt.Errorf("unrecognized VC security level: %v", securityLevel))
	}
	go func() {
		sendErr := func(err error) {
			ln.Close()
			// Release anything waiting for the handshake to complete;
			// it never will. The channel is nil if the failure
			// happened before the handshake was marked in progress.
			vc.mu.Lock()
			if vc.acceptHandshakeDone != nil {
				close(vc.acceptHandshakeDone)
				vc.acceptHandshakeDone = nil
			}
			vc.mu.Unlock()
			result <- HandshakeResult{nil, vc.err(err)}
		}
		// TODO(ashankar): There should be a timeout on this Accept
		// call. Otherwise, a malicious (or incompetent) client can
		// consume server resources by sending many OpenVC messages but
		// not following up with the handshake protocol. Same holds for
		// the identity exchange protocol.
		handshakeConn, err := ln.Accept()
		if err != nil {
			sendErr(fmt.Errorf("TLS handshake Flow not accepted: %v", err))
			return
		}
		// Mark the handshake as in progress and remember which flow
		// carries it, so that its payloads bypass decryption.
		vc.mu.Lock()
		vc.acceptHandshakeDone = make(chan struct{})
		vc.handshakeFID = vc.findFlowLocked(handshakeConn)
		vc.mu.Unlock()
		// Establish TLS
		crypter, err := crypto.NewTLSServer(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), vc.pool)
		if err != nil {
			sendErr(fmt.Errorf("failed to setup TLS: %v", err))
			return
		}
		// Authenticate (exchange identities)
		authConn, err := ln.Accept()
		if err != nil {
			sendErr(fmt.Errorf("Authentication Flow not accepted: %v", err))
			return
		}
		vc.mu.Lock()
		vc.authFID = vc.findFlowLocked(authConn)
		vc.mu.Unlock()
		rBlessings, rDischarges, err := AuthenticateAsServer(authConn, principal, lBlessings, dischargeClient, crypter, vc.version)
		if err != nil {
			sendErr(fmt.Errorf("authentication failed %v", err))
			return
		}
		// Publish the handshake results and release waiters in one
		// critical section.
		vc.mu.Lock()
		vc.crypter = crypter
		vc.localPrincipal = principal
		vc.localBlessings = lBlessings
		vc.remoteBlessings = rBlessings
		vc.remoteDischarges = rDischarges
		close(vc.acceptHandshakeDone)
		vc.acceptHandshakeDone = nil
		vc.mu.Unlock()
		vlog.VI(1).Infof("Server VC %v authenticated. RemoteBlessings:%v, LocalBlessings:%v", vc, rBlessings, lBlessings)
		result <- HandshakeResult{ln, nil}
	}()
	return result
}
// Encrypt applies the VC's encryption scheme to the provided data payload and
// returns the result. Always takes ownership of plaintext.
//
// A nil plaintext yields (nil, nil). Payloads on the handshake and
// authentication flows are returned unchanged; everything else goes through
// the VC's current crypter while vc.mu is held.
func (vc *VC) Encrypt(fid id.Flow, plaintext *iobuf.Slice) (cipherslice *iobuf.Slice, err error) {
	if plaintext == nil {
		return nil, nil
	}
	vc.mu.Lock()
	defer vc.mu.Unlock()
	switch fid {
	case vc.handshakeFID, vc.authFID:
		return plaintext, nil
	}
	return vc.crypter.Encrypt(plaintext)
}
// allocFID hands out the next flow id for a locally initiated flow and
// advances the counter by two, so the ids it returns keep the same parity.
func (vc *VC) allocFID() id.Flow {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	fid := vc.nextConnectFID
	vc.nextConnectFID = fid + 2
	return fid
}
// newWriter builds the writer for the flow identified by fid. The writer is
// backed by a buffer queue obtained from the helper, an allocator drawing on
// the VC's pool, and the VC-wide shared counters. It fails only if the helper
// cannot create the buffer queue.
func (vc *VC) newWriter(fid id.Flow) (*writer, error) {
	queue, err := vc.helper.NewWriter(vc.vci, fid)
	if err != nil {
		return nil, err
	}
	allocator := iobuf.NewAllocator(vc.pool, vc.reserveBytes)
	w := newWriter(MaxPayloadSizeBytes, queue, allocator, vc.sharedCounters)
	return w, nil
}
// findFlowLocked looks up the flow id under which the provided flow is
// registered in the VC's flow map.
// REQUIRES: vc.mu is held
// Returns 0 if the flow is not registered.
func (vc *VC) findFlowLocked(flow interface{}) id.Flow {
	// A linear scan is acceptable: this is called rarely and early on,
	// when there are at most a couple of flows over the VC.
	for candidate, registered := range vc.flowMap {
		if registered == flow {
			return candidate
		}
	}
	return 0
}
// VCI returns the identifier of this VC, i.e. the VCI it was created with.
func (vc *VC) VCI() id.VC { return vc.vci }
// LocalPrincipal returns the principal that authenticated with the remote end
// of the VC. It blocks while an accept-side handshake is in progress.
func (vc *VC) LocalPrincipal() security.Principal {
	vc.mu.Lock()
	vc.waitForHandshakeLocked()
	principal := vc.localPrincipal
	vc.mu.Unlock()
	return principal
}
// LocalBlessings returns the blessings (bound to LocalPrincipal) presented to
// the remote end of the VC during authentication. It blocks while an
// accept-side handshake is in progress.
func (vc *VC) LocalBlessings() security.Blessings {
	vc.mu.Lock()
	vc.waitForHandshakeLocked()
	blessings := vc.localBlessings
	vc.mu.Unlock()
	return blessings
}
// RemoteBlessings returns the blessings presented by the remote end of the VC
// during authentication. It blocks while an accept-side handshake is in
// progress.
func (vc *VC) RemoteBlessings() security.Blessings {
	vc.mu.Lock()
	vc.waitForHandshakeLocked()
	blessings := vc.remoteBlessings
	vc.mu.Unlock()
	return blessings
}
// RemoteDischarges returns the discharges presented by the remote end of the
// VC during authentication. It blocks while an accept-side handshake is in
// progress. The returned map is the VC's own map, not a copy.
func (vc *VC) RemoteDischarges() map[string]security.Discharge {
	vc.mu.Lock()
	vc.waitForHandshakeLocked()
	discharges := vc.remoteDischarges
	vc.mu.Unlock()
	return discharges
}
// waitForHandshakeLocked blocks until an in-progress handshake (encryption
// setup and authentication) completes. It returns immediately when no
// handshake is in progress.
//
// vc.mu is released while waiting and re-acquired before returning, so state
// guarded by it may have changed by the time this method returns.
// REQUIRES: vc.mu is held.
func (vc *VC) waitForHandshakeLocked() {
	done := vc.acceptHandshakeDone
	if done == nil {
		return
	}
	vc.mu.Unlock()
	<-done
	vc.mu.Lock()
}
// String returns a short description of the VC: its identifier and its local
// and remote endpoints.
func (vc *VC) String() string {
	const layout = "VCI:%d (%v<->%v)"
	return fmt.Sprintf(layout, vc.vci, vc.localEP, vc.remoteEP)
}
// DebugString returns a string representation of the state of a VC.
//
// The first line summarizes the VC; the remaining lines (encryption,
// principal/blessings and one line per flow) are sorted lexicographically.
// The format of the returned string is meant to be human-friendly and the
// specific format should not be relied upon for automated processing.
func (vc *VC) DebugString() string {
	vc.mu.Lock()
	numFlows := len(vc.flowMap)
	lines := make([]string, 0, numFlows+1)
	summary := fmt.Sprintf("VCI:%d -- Endpoints:(Local:%q Remote:%q) #Flows:%d NextConnectFID:%d",
		vc.vci,
		vc.localEP,
		vc.remoteEP,
		numFlows,
		vc.nextConnectFID)
	lines = append(lines, summary)
	switch {
	case vc.crypter == nil:
		lines = append(lines, "Handshake not completed yet")
	default:
		lines = append(lines, "Encryption: "+vc.crypter.String())
		if p := vc.localPrincipal; p != nil {
			lines = append(lines, fmt.Sprintf("LocalPrincipal:%v LocalBlessings:%v RemoteBlessings:%v", p.PublicKey(), vc.localBlessings, vc.remoteBlessings))
		}
	}
	for fid, fl := range vc.flowMap {
		lines = append(lines, fmt.Sprintf(" Flow:%3d BytesRead:%7d BytesWritten:%7d", fid, fl.BytesRead(), fl.BytesWritten()))
	}
	vc.mu.Unlock()

	// Keep the summary first; order everything after it.
	sort.Strings(lines[1:])
	return strings.Join(lines, "\n")
}
// readHandlerImpl is an adapter for the readHandler interface required by
// the reader type. One is created per flow and handed to that flow's reader.
type readHandlerImpl struct {
vc *VC // VC that owns the flow.
fid id.Flow // Identifier of the flow whose reads are being reported.
}
// HandleRead tells the remote end, via the VC's helper, that it may send
// 'bytes' more bytes on this handler's flow.
func (r readHandlerImpl) HandleRead(bytes uint) {
	owner := r.vc
	owner.helper.AddReceiveBuffers(owner.vci, r.fid, bytes)
}