blob: 14e3a931a4b250691f914a793970f83180abe7ae [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vc
import (
"bytes"
"fmt"
"io"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/vom"
"v.io/v23/vtrace"
"v.io/x/lib/vlog"
"v.io/x/ref/runtime/internal/lib/bqueue"
"v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
"v.io/x/ref/runtime/internal/lib/iobuf"
vsync "v.io/x/ref/runtime/internal/lib/sync"
"v.io/x/ref/runtime/internal/lib/upcqueue"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/crypto"
"v.io/x/ref/runtime/internal/rpc/stream/id"
"v.io/x/ref/runtime/internal/rpc/stream/message"
iversion "v.io/x/ref/runtime/internal/rpc/version"
)
// pool is the iobuf pool backing the VC reader created in setup and the
// per-flow allocators created in newFlow.
var pool = iobuf.NewPool(0)
// nullCipher is the control cipher every xvc starts with; it is also used
// for the SetupVC exchange.
var nullCipher = crypto.NullControlCipher{}
// stopToken is a sentinel recognised by pointer identity. handleMessage puts
// it on the stop queue when the remote end sends CloseVC, which tells
// writeLoop to exit without writing anything. It must never be released.
var stopToken = iobuf.NewSlice(make([]byte, 1))
// This is kind of a hack. If you send this down a flow then all
// bytes sent after will be sent unencrypted. Like stopToken it is a sentinel
// compared by pointer identity and is never released.
var disableEncryption = iobuf.NewSlice(make([]byte, 1))
const (
// Priorities of the buffered queues used for flow control of writes.
// The values are consecutive bqueue.Priority numbers starting at zero, in
// the order listed below.
// NOTE(review): presumably a lower number is served first by the queue;
// confirm against the bqueue implementation.
// authPriority is used for the reserved authentication flow.
authPriority bqueue.Priority = iota
// expressPriority is used for the express queue that carries control messages.
expressPriority
// systemXFlowPriority is used for reserved system flows (type and discharge flows).
systemXFlowPriority
// normalXFlowPriority is used for ordinary flows.
normalXFlowPriority
// stopPriority is used for the stop queue that terminates the write loop.
stopPriority
)
// flowState is the per-flow bookkeeping stored in xvc.flowMap.
type flowState struct {
// flow is the flow object that receives incoming payloads and counters.
flow *flow
// encryptionDisabled records whether Data messages written for this flow are
// sent without the "encrypted" flag. newFlow initialises it to true and
// writeData sets it when it sees the disableEncryption marker.
encryptionDisabled bool
}
// xvc is a virtual circuit layered on a stream.XConn. It multiplexes flows
// over the connection using a read loop and a write loop that are started by
// setup.
type xvc struct {
// mu guards flowMap and nextConnectFID, and is held by the discharge loops
// while they update lDischarges / rDischarges.
mu sync.Mutex
// flowMap maps flow ids to their state; Close sets it to nil, which newFlow
// treats as "VC closed".
flowMap map[id.Flow]*flowState
// lDischarges / rDischarges hold the local and remote discharges, keyed by
// discharge ID.
lDischarges, rDischarges map[string]security.Discharge
// nextConnectFID is the id handed to the next locally initiated flow; Connect
// advances it by two.
nextConnectFID id.Flow
// incommingMu guards incomming.
incommingMu sync.Mutex
// incomming is the queue on which accepted flows are delivered; nil means
// nobody is listening.
incomming *upcqueue.T
// cipher is the control cipher used by the read and write loops. It starts as
// nullCipher and is replaced in setup when the VC is encrypted.
cipher crypto.ControlCipher
// version is the RPC version chosen in setup (the maximum of the intersected ranges).
version version.RPCVersion
principal security.Principal
lBlessings, rBlessings security.Blessings
local, remote naming.Endpoint
// conn is the underlying connection all messages are written to and read from.
conn stream.XConn
// outgoing is the buffered queue drained by writeLoop.
outgoing bqueue.T
// expressQ carries pre-serialized control messages; stopQ carries the CloseVC
// message or stopToken that terminates writeLoop. Both are created in setup.
expressQ, stopQ bqueue.Writer
// sharedCounters is the semaphore of shared write credit handed to every flow writer.
sharedCounters *vsync.Semaphore
// reader reads framed messages from conn; it is created in setup.
reader *iobuf.Reader
// closed is closed when writeLoop exits.
closed chan struct{}
dataCache *dataCache
// dClient and dBuffer come from the listener options and drive discharge
// preparation and refresh timing on the accepting side.
dClient DischargeClient
dBuffer time.Duration
}
// commonInit builds the state shared by dialed and accepted VCs and creates
// the reserved authentication flow.
//
// On success it returns the new xvc and its auth flow, which has already been
// granted DefaultBytesBufferedPerFlow bytes of write credit. On failure it
// returns (nil, nil, err).
func commonInit(
	ctx *context.T,
	conn stream.XConn,
	principal security.Principal,
	versions iversion.Range) (*xvc, stream.XFlow, error) {
	vc := &xvc{
		principal:      principal,
		cipher:         nullCipher,
		conn:           conn,
		outgoing:       drrqueue.New(MaxPayloadSizeBytes),
		flowMap:        make(map[id.Flow]*flowState),
		sharedCounters: vsync.NewSemaphore(),
		closed:         make(chan struct{}),
		dataCache:      newDataCache(),
	}
	// Both sides create reserved flows ahead of time.
	authFlow, err := vc.newFlow(AuthFlowID, authPriority)
	if err != nil {
		// newFlow returns a nil flow on error, so it must not be touched.
		// Returning untyped nils also avoids handing callers a non-nil
		// interface that wraps a nil *flow.
		return nil, nil, err
	}
	authFlow.Release(DefaultBytesBufferedPerFlow)
	return vc, authFlow, nil
}
// XNewDialed creates the dialing (client) end of a VC over conn.
//
// It runs the SetupVC exchange and, when principal is non-nil, authenticates
// the server on the reserved auth flow. A nil principal yields an
// unencrypted, unauthenticated VC. Locally initiated flows are numbered from
// NumReservedFlows in steps of two. On any failure after initialisation the
// VC is closed with the error and (nil, err) is returned.
func XNewDialed(
	ctx *context.T,
	conn stream.XConn,
	principal security.Principal,
	local, remote naming.Endpoint,
	versions iversion.Range,
	opts ...stream.VCOpt) (stream.XVC, error) {
	ctx, span := vtrace.WithNewSpan(ctx, "Dialing VC")
	defer span.Finish()

	vc, authFlow, err := commonInit(ctx, conn, principal, versions)
	if err != nil {
		return nil, err
	}
	vc.local = local
	vc.nextConnectFID = NumReservedFlows

	encrypted := principal != nil
	setupDone := make(chan error)
	go func() {
		setupDone <- vc.setup(ctx, local, remote, versions, encrypted)
	}()
	if setupErr := <-setupDone; setupErr != nil {
		vc.Close(setupErr)
		return nil, setupErr
	}

	if !encrypted {
		// Exit early for unencrypted VCs.
		return vc, nil
	}
	if authErr := vc.authenticateClient(ctx, authFlow, opts...); authErr != nil {
		vc.Close(authErr)
		return nil, authErr
	}
	return vc, nil
}
// XNewAccepted creates the accepting (server) end of a VC over conn.
//
// It runs the SetupVC exchange and, when principal is non-nil, authenticates
// to the client using lBlessings. Flows opened by the remote end are
// delivered on incomming. Locally initiated flows are numbered from
// NumReservedFlows+1 in steps of two. On any failure after initialisation the
// VC is closed with the error and (nil, err) is returned.
func XNewAccepted(
	ctx *context.T,
	conn stream.XConn,
	principal security.Principal,
	local naming.Endpoint,
	lBlessings security.Blessings,
	versions iversion.Range,
	incomming *upcqueue.T,
	opts ...stream.ListenerOpt) (stream.XVC, error) {
	ctx, span := vtrace.WithNewSpan(ctx, "Accepting VC")
	defer span.Finish()
	vc, authFlow, err := commonInit(ctx, conn, principal, versions)
	if err != nil {
		return nil, err
	}

	// The handshake runs in its own function so that incommingMu is released
	// before Close is called on failure. Close acquires incommingMu itself,
	// and sync.Mutex is not reentrant, so calling Close with the lock still
	// held would block forever.
	handshake := func() error {
		vc.incommingMu.Lock()
		// We unlock this only AFTER authentication is finished to ensure
		// that no flows are announced until we're fully initialized.
		// TODO(mattr): Find a better way.
		defer vc.incommingMu.Unlock()
		vc.incomming = incomming
		vc.local = local
		vc.lBlessings = lBlessings
		vc.nextConnectFID = NumReservedFlows + 1
		vc.dClient, vc.dBuffer = dischargeOptions(opts)
		errch := make(chan error)
		go func() {
			// TODO(mattr): Sending local twice, but... what should we send?
			errch <- vc.setup(ctx, local, local, versions, principal != nil)
		}()
		if err := <-errch; err != nil {
			return err
		}
		if principal == nil {
			// Exit early for unencrypted VCs.
			return nil
		}
		return vc.authenticateServer(ctx, authFlow, opts...)
	}

	if err := handshake(); err != nil {
		vc.Close(err)
		return nil, err
	}
	return vc, nil
}
// setup performs the SetupVC handshake and starts the read and write loops.
//
// It generates a box key pair, sends a SetupVC message carrying the local
// version range, the public key and the shared-flow counters, and reads the
// remote SetupVC concurrently. From the remote message it records the remote
// endpoint, distributes the granted counters and picks the highest common
// version. When encrypted is true it disables the connection's own
// encryption and installs a control cipher built from the exchanged keys.
// Finally it creates the express and stop queues (both with flow control
// disabled) and launches readLoop and writeLoop.
//
// Every trace span opened here is finished on error paths as well.
func (vc *xvc) setup(
	ctx *context.T,
	local, remote naming.Endpoint,
	versions iversion.Range,
	encrypted bool) error {
	ctx, span := vtrace.WithNewSpan(ctx, "Setup")
	defer span.Finish()

	_, keySpan := vtrace.WithNewSpan(ctx, "Generate Key")
	pk, sk, err := crypto.GenerateBoxKey()
	// Finish before the error check so the span is closed on failure too.
	keySpan.Finish()
	if err != nil {
		return err
	}

	setup := &message.SetupVC{
		LocalEndpoint:  local,
		RemoteEndpoint: remote,
		Counters:       message.NewCounters(),
		Setup: message.Setup{
			Versions: versions,
			Options:  []message.SetupOption{&message.NaclBox{PublicKey: *pk}},
		},
	}
	setup.Counters.Add(0, SharedFlowID, DefaultBytesBufferedPerFlow)

	// Send and receive a SetupVC message.
	// Note we use the nullCipher here. The XConn is handling encryption.
	// errch is buffered so the writer goroutine can always deliver its result
	// and exit, even if this function returns early on a read error.
	errch := make(chan error, 1)
	go func() {
		_, span := vtrace.WithNewSpan(ctx, "Writing setup")
		errch <- message.WriteTo(vc.conn, setup, nullCipher)
		span.Finish()
	}()

	_, readSpan := vtrace.WithNewSpan(ctx, "Reading setup")
	vc.reader = iobuf.NewReader(pool, vc.conn)
	msg, err := message.ReadFrom(vc.reader, nullCipher)
	if err != nil {
		readSpan.Finish()
		return err
	}
	remoteSetup, ok := msg.(*message.SetupVC)
	readSpan.Finish()
	if !ok {
		return fmt.Errorf("wrong message.")
	}
	if err := <-errch; err != nil {
		return err
	}
	vc.remote = remoteSetup.LocalEndpoint

	// Release shared counters
	vc.distributeCounters(remoteSetup.Counters)

	// Choose a version.
	vrange, err := versions.Intersect(&remoteSetup.Setup.Versions)
	if err != nil {
		return err
	}
	vc.version = vrange.Max

	// Set up the cipher.
	if encrypted {
		_, naclSpan := vtrace.WithNewSpan(ctx, "Creating Cipher")
		remoteKey := remoteSetup.Setup.NaclBox()
		if remoteKey == nil {
			naclSpan.Finish()
			return fmt.Errorf("missing key.")
		}
		// Disable the underlying encryption. We'll do our own from now on.
		vc.conn.DisableEncryption()
		vc.cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteKey.PublicKey)
		naclSpan.Finish()
	}

	vc.expressQ, err = vc.outgoing.NewWriter(ExpressFlowID, expressPriority, DefaultBytesBufferedPerFlow)
	if err != nil {
		return err
	}
	vc.expressQ.Release(-1) // Disable flow control

	vc.stopQ, err = vc.outgoing.NewWriter(StopFlowID, stopPriority, DefaultBytesBufferedPerFlow)
	if err != nil {
		return err
	}
	vc.stopQ.Release(-1) // Disable flow control

	// Start read and write loops
	go vc.readLoop()
	go vc.writeLoop()
	return nil
}
// connectSystemFlows opens the reserved system flows from the dialing side.
//
// The type flow is always opened and its VOM type encoder/decoder are stored
// in the VC data cache. The discharge flow is opened only when the remote
// blessings carry third-party caveats; a goroutine then reads discharge
// updates from it.
func (vc *xvc) connectSystemFlows() error {
	typeFlow, err := vc.connect(TypeFlowID, systemXFlowPriority)
	if err != nil {
		return err
	}
	vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(typeFlow))
	vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(typeFlow))

	if len(vc.rBlessings.ThirdPartyCaveats()) == 0 {
		return nil
	}
	dischargeFlow, err := vc.connect(DischargeFlowID, systemXFlowPriority)
	if err != nil {
		return err
	}
	go vc.recvDischargesLoop(dischargeFlow)
	return nil
}
// sendDischargesLoop periodically refreshes the discharges for tpCavs and
// sends them to the remote end on conn. conn is closed when the loop exits.
//
// The loop does nothing when no discharge client is configured. The initial
// set of discharges is only used to compute the first expiry; it is not
// written to conn here. The loop ends when minExpiryTime reports a zero
// time, when encoding fails, or when the VC's closed channel is closed.
func (vc *xvc) sendDischargesLoop(conn io.WriteCloser, tpCavs []security.Caveat) {
defer conn.Close()
if vc.dClient == nil {
return
}
enc := vom.NewEncoder(conn)
// NOTE(review): a nil context is passed to PrepareDischarges here and below;
// confirm the discharge client tolerates that.
discharges := vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
for expiry := minExpiryTime(discharges, tpCavs); !expiry.IsZero(); expiry = minExpiryTime(discharges, tpCavs) {
select {
// Wake up after the delay computed by fetchDuration from the expiry and the
// configured buffer, then fetch fresh discharges.
case <-time.After(fetchDuration(expiry, vc.dBuffer)):
discharges = vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{})
// The refreshed set is sent even when it is empty; the empty check below
// only skips the local bookkeeping.
if err := enc.Encode(discharges); err != nil {
vlog.Errorf("encoding discharges on VC %v failed: %v", vc, err)
return
}
if len(discharges) == 0 {
continue
}
// Record the refreshed discharges locally, keyed by discharge ID.
vc.mu.Lock()
if vc.lDischarges == nil {
vc.lDischarges = make(map[string]security.Discharge)
}
for _, d := range discharges {
vc.lDischarges[d.ID()] = d
}
vc.mu.Unlock()
case <-vc.closed:
vlog.VI(3).Infof("closing sendDischargesLoop on VC %v", vc)
return
}
}
}
// recvDischargesLoop reads discharge batches sent by the remote end on conn
// and merges them into vc.rDischarges, keyed by discharge ID. It returns, and
// closes conn, when decoding fails.
func (vc *xvc) recvDischargesLoop(conn io.ReadCloser) {
	defer conn.Close()
	decoder := vom.NewDecoder(conn)
	for {
		var batch []security.Discharge
		err := decoder.Decode(&batch)
		if err != nil {
			vlog.VI(3).Infof("decoding discharges on %v failed: %v", vc, err)
			return
		}
		if len(batch) > 0 {
			vc.mu.Lock()
			if vc.rDischarges == nil {
				vc.rDischarges = make(map[string]security.Discharge)
			}
			for i := range batch {
				vc.rDischarges[batch[i].ID()] = batch[i]
			}
			vc.mu.Unlock()
		}
	}
}
// handleMessage dispatches one message read from the connection.
//
//   - Data: queues the payload on the target flow's reader. A close flag
//     removes the flow from the flow map and shuts it down once handling is
//     finished. Empty payloads and payloads for unknown flows are dropped.
//   - AddReceiveBuffers: distributes the granted counters.
//   - OpenFlow: accepts the flow and grants the remote end
//     DefaultBytesBufferedPerFlow bytes of credit for it; if the flow cannot
//     be accepted, a close message is sent instead.
//   - CloseVC: wakes the write loop with stopToken and returns io.EOF, or an
//     error carrying the remote's message when one was supplied.
//
// A non-nil return value makes readLoop exit.
func (vc *xvc) handleMessage(msg message.T) error {
	switch m := msg.(type) {
	case *message.Data:
		payload := m.Payload
		// Release the payload unless ownership is handed to the flow reader
		// below, in which case payload is set to nil.
		defer func() {
			if payload != nil {
				payload.Release()
			}
		}()
		vc.mu.Lock()
		fs := vc.flowMap[m.Flow]
		if m.Close() && fs != nil {
			defer fs.flow.Shutdown()
			delete(vc.flowMap, m.Flow)
		}
		vc.mu.Unlock()
		// A message may carry no payload at all (for example a bare close),
		// so guard against nil before asking for the size.
		if payload == nil || payload.Size() == 0 {
			return nil
		}
		if fs == nil {
			vlog.Infof("Ignoring data packet for unkown flow: %#v", msg)
			return nil
		}
		if err := fs.flow.reader.Put(payload); err != nil {
			vlog.Errorf("Could not queue data packet: %#v", msg)
			return err
		}
		payload = nil
	case *message.AddReceiveBuffers:
		vc.distributeCounters(m.Counters)
	case *message.OpenFlow:
		if err := vc.accept(m.Flow, int(m.InitialCounters)); err != nil {
			// The flow was rejected: tell the remote end to close it and do
			// not grant receive buffers for a flow that does not exist here.
			cm := &message.Data{Flow: m.Flow}
			cm.SetClose()
			return vc.sendOnExpressQ(cm)
		}
		// TODO(mattr): The first write uses shared counters, but the first write is
		// always just 1 byte :(
		counters := message.NewCounters()
		counters.Add(0, m.Flow, uint32(DefaultBytesBufferedPerFlow))
		return vc.sendOnExpressQ(&message.AddReceiveBuffers{
			Counters: counters,
		})
	case *message.CloseVC:
		vc.stopQ.Put(stopToken, nil)
		if m.Error != "" {
			vlog.Errorf("VC Closed from remote end due to error: %s", m.Error)
			// m.Error comes from the remote end; never use it as a format
			// string, or any '%' in it would be misinterpreted.
			return fmt.Errorf("%s", m.Error)
		}
		return io.EOF
	default:
		vlog.Infof("Ignoring irrelevant or unrecognized message %T: ", m)
	}
	return nil
}
// distributeCounters applies write credit granted by the remote end.
// Credit for SharedFlowID is added to the shared semaphore; credit for any
// other flow is released to that flow, and is logged and ignored when the
// flow is not known.
func (vc *xvc) distributeCounters(counters message.Counters) {
	for cid, n := range counters {
		fid := cid.Flow()
		if fid == SharedFlowID {
			vc.sharedCounters.IncN(uint(n))
			continue
		}

		vc.mu.Lock()
		fs := vc.flowMap[fid]
		vc.mu.Unlock()

		if fs == nil {
			vlog.Infof("ignoring counters for unknown flow: %d", fid)
			continue
		}
		fs.flow.Release(int(n))
	}
}
// readLoop reads messages from the connection and hands them to
// handleMessage until a read fails or a handler returns an error. io.EOF
// from the handler ends the loop silently; any other error is logged.
func (vc *xvc) readLoop() {
	for {
		msg, readErr := message.ReadFrom(vc.reader, vc.cipher)
		if readErr != nil {
			vlog.Errorf("Exiting VC read loop: %v", readErr)
			return
		}
		switch handleErr := vc.handleMessage(msg); handleErr {
		case nil:
			continue
		case io.EOF:
			return
		default:
			vlog.Errorf("Could not handle message: %v", handleErr)
			return
		}
	}
}
// releaseBufs releases every slice in bufs except the two package-level
// sentinels, which are shared and must stay alive.
func releaseBufs(bufs []*iobuf.Slice) {
	for i := range bufs {
		switch bufs[i] {
		case disableEncryption, stopToken:
			// Sentinel markers are never released.
		default:
			bufs[i].Release()
		}
	}
}
// writeData writes the buffers dequeued for one flow as Data messages.
//
// Each buffer becomes one message. The disableEncryption marker is not sent:
// it switches the flow (and every later buffer) to unencrypted mode. When
// the writer is drained the flow is removed from the flow map once this
// function returns, and the close flag is set on the last buffer that
// actually carries payload. If there is no such buffer (bufs is empty or
// holds only markers) a bare close message is sent instead, so the remote
// end always learns that the flow is closed.
//
// All buffers are released before returning, whether or not a write failed.
func (vc *xvc) writeData(writer bqueue.Writer, bufs []*iobuf.Slice) error {
	defer releaseBufs(bufs)
	fid := id.Flow(writer.ID())
	var encryptionDisabled bool
	vc.mu.Lock()
	fs := vc.flowMap[fid]
	if fs != nil {
		encryptionDisabled = fs.encryptionDisabled
	}
	vc.mu.Unlock()
	// TODO(mattr): coalesce, but only after finding the encryption boundary.

	// last is the index of the final buffer that produces a message; trailing
	// disableEncryption markers are skipped because they are never sent and
	// therefore cannot carry the close flag. It is -1 when no buffer
	// produces a message.
	last := len(bufs) - 1
	for last >= 0 && bufs[last] == disableEncryption {
		last--
	}

	drained := writer.IsDrained()
	if drained {
		defer func() {
			vc.mu.Lock()
			delete(vc.flowMap, fid)
			vc.mu.Unlock()
		}()
	}
	for i, b := range bufs {
		if b == disableEncryption {
			encryptionDisabled = true
			vc.mu.Lock()
			if fs != nil {
				fs.encryptionDisabled = true
			}
			vc.mu.Unlock()
			continue
		}
		d := &message.Data{Flow: fid, Payload: b}
		if !encryptionDisabled {
			d.SetEncrypted()
		}
		if drained && i == last {
			d.SetClose()
		}
		if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
			return err
		}
	}
	if drained && last < 0 {
		// Nothing above carried the close flag; send it on its own.
		d := &message.Data{Flow: fid}
		d.SetClose()
		if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil {
			return err
		}
	}
	return nil
}
// writeSerializedMessage encrypts an already serialized control message in
// place with the VC's cipher and writes it to the connection. msg is
// released before returning, on success and on failure alike.
func (vc *xvc) writeSerializedMessage(msg *iobuf.Slice) error {
	// TODO(mattr): All of this would be easier if the queues contained messages
	// instead of byte arrays.
	defer msg.Release()
	if err := message.EncryptMessage(msg.Contents, vc.cipher); err != nil {
		return err
	}
	_, err := vc.conn.Write(msg.Contents)
	return err
}
// writeLoop drains the outgoing queue and writes its contents to the
// connection until the queue reports an error, a write fails, or something
// arrives on the stop queue. On exit it closes vc.closed and then the
// outgoing queue (deferred calls run in reverse order).
func (vc *xvc) writeLoop() {
defer vc.outgoing.Close()
defer close(vc.closed)
for {
writer, bufs, err := vc.outgoing.Get(nil)
if err != nil {
return
}
switch writer {
// Control messages were serialized by sendOn; write each one as-is.
case vc.expressQ:
for _, b := range bufs {
if err = vc.writeSerializedMessage(b); err != nil {
vlog.Errorf("Could not write control message: %v", err)
return
}
}
// Anything on the stop queue terminates the loop. Only the first buffer is
// examined: a serialized CloseVC is written out, stopToken is not.
case vc.stopQ:
if len(bufs) > 0 {
// If we get stopToken here, then the vc was closed by the remote end.
if bufs[0] != stopToken {
if err := vc.writeSerializedMessage(bufs[0]); err != nil {
vlog.Errorf("Could not write CloseVC: %v", err)
}
}
}
return
// Every other writer belongs to a flow and carries payload data.
default:
if err = vc.writeData(writer, bufs); err != nil {
vlog.Errorf("Error writing: %v", err)
return
}
}
}
}
// authenticateServer runs the server half of the authentication handshake on
// authFlow, which is closed before returning.
//
// It fails when the server has no blessings. Otherwise it prepares
// discharges for the server blessings' third-party caveats (when a discharge
// client is configured), writes the server blessings and discharges, and
// then reads the client's blessings. On success the remote blessings and the
// local discharges are recorded on the VC.
func (vc *xvc) authenticateServer(ctx *context.T, authFlow stream.XFlow, opts ...stream.ListenerOpt) error {
	ctx, span := vtrace.WithNewSpan(ctx, "Server Authentication")
	defer span.Finish()
	defer authFlow.Close()
	if vc.lBlessings.IsZero() {
		return fmt.Errorf("no server blessings")
	}
	// TODO(mattr): Check that authFlow has the right fid.
	// This is a wart. We always pass a null crypter (with channel binding) even
	// though this is all encrypted by the flow write. This is just because we
	// want to re-use the existing Authenticate methods.
	crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())

	var serverDischarges []security.Discharge
	if tpcavs := vc.lBlessings.ThirdPartyCaveats(); len(tpcavs) > 0 && vc.dClient != nil {
		_, dSpan := vtrace.WithNewSpan(ctx, "Prepare Discharages")
		serverDischarges = vc.dClient.PrepareDischarges(ctx, tpcavs, security.DischargeImpetus{})
		dSpan.Finish()
	}

	_, wspan := vtrace.WithNewSpan(ctx, "writeBlessings")
	err := writeBlessings(authFlow, authServerContextTag, crypter, vc.principal, vc.lBlessings, serverDischarges, vc.version)
	// Finish before the error check so the span is closed on failure too.
	wspan.Finish()
	if err != nil {
		return err
	}

	// Note that since the client uses a self-signed blessing to authenticate
	// during VC setup, it does not share any discharges.
	rBlessings, _, err := readBlessings(ctx, authFlow, authClientContextTag, crypter, vc.version)
	if err != nil {
		return err
	}
	vc.rBlessings = rBlessings
	vc.lDischarges = mkDischargeMap(serverDischarges)
	return nil
}
// authenticateClient runs the client half of the authentication handshake on
// authFlow, which is closed before returning.
//
// It reads the server's blessings and discharges, checks them with the
// authorizer found in opts (if any), sends a self-signed blessing so the
// server learns the client's public key, records the blessings and
// discharges on the VC, and finally opens the system flows.
//
// Errors from the authorizer and from BlessSelf keep their cause in the
// returned message.
func (vc *xvc) authenticateClient(ctx *context.T, authFlow stream.XFlow, opts ...stream.VCOpt) error {
	ctx, span := vtrace.WithNewSpan(ctx, "Client Authentication")
	defer span.Finish()
	defer authFlow.Close()
	// This is a wart. We always pass a null crypter (with channel binding) even
	// though this is all encrypted by the flow write. This is just because we
	// want to re-use the existing Authenticate methods.
	crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding())
	server, serverDischarges, err := readBlessings(ctx, authFlow, authServerContextTag, crypter, vc.version)
	if err != nil {
		return err
	}

	// Authorize the server based on the provided authorizer.
	auth := serverAuthorizer(opts)
	if auth != nil {
		_, authSpan := vtrace.WithNewSpan(ctx, "Authorize Server")
		params := security.CallParams{
			LocalPrincipal:   vc.principal,
			LocalEndpoint:    vc.local,
			RemoteEndpoint:   vc.remote,
			RemoteBlessings:  server,
			RemoteDischarges: serverDischarges,
		}
		authErr := auth.Authorize(params)
		// Finish before the error check so the span is closed on failure too.
		authSpan.Finish()
		if authErr != nil {
			return fmt.Errorf("Not trusted: %v", authErr)
		}
	}

	// The client shares its blessings at RPC time (as the blessings may vary
	// across RPCs). During VC handshake, the client simply sends a self-signed
	// blessing in order to reveal its public key to the server.
	// TODO(suharshs): Actually we should just send the public key.
	_, blessSpan := vtrace.WithNewSpan(ctx, "Bless self")
	client, err := vc.principal.BlessSelf("vcauth")
	blessSpan.Finish()
	if err != nil {
		return fmt.Errorf("Could not create blessing: %v", err)
	}

	_, writeSpan := vtrace.WithNewSpan(ctx, "Write blessings")
	err = writeBlessings(authFlow, authClientContextTag, crypter, vc.principal, client, nil, vc.version)
	writeSpan.Finish()
	if err != nil {
		return err
	}

	vc.lBlessings = client
	vc.rBlessings = server
	vc.rDischarges = serverDischarges
	return vc.connectSystemFlows()
}
// newFlow creates a flow with the given id and priority and registers it in
// the flow map. The flow's state starts with encryptionDisabled set to true.
// It fails if the outgoing queue cannot create a writer for fid, or if the
// VC has been closed (its flow map is nil); in the latter case the freshly
// created queue writer is closed again.
func (vc *xvc) newFlow(fid id.Flow, priority bqueue.Priority) (*flow, error) {
	queueWriter, err := vc.outgoing.NewWriter(bqueue.ID(fid), priority, MaxPayloadSizeBytes)
	if err != nil {
		return nil, err
	}
	created := &flow{
		backingVC: vc,
		reader:    newReader(&xReadHandler{flow: fid, vc: vc}),
		writer:    newWriter(MaxPayloadSizeBytes, queueWriter, iobuf.NewAllocator(pool, 0), vc.sharedCounters),
	}

	vc.mu.Lock()
	defer vc.mu.Unlock()
	if vc.flowMap == nil {
		queueWriter.Close()
		return nil, fmt.Errorf("Could not create flow on closed VC.")
	}
	// TODO(mattr): it's an error if fid already exists.
	vc.flowMap[fid] = &flowState{flow: created, encryptionDisabled: true}
	return created, nil
}
// sendOnExpressQ serializes msg and queues it on the express queue, from
// which writeLoop writes it to the connection.
func (vc *xvc) sendOnExpressQ(msg message.T) error {
return vc.sendOn(vc.expressQ, msg)
}
// sendOn serializes msg into a buffer using a disabled wrapper around the
// VC's cipher and queues the bytes on w. Encryption is applied later, when
// the write loop sends the serialized message.
func (vc *xvc) sendOn(w bqueue.Writer, msg message.T) error {
	serialized := new(bytes.Buffer)
	cipher := crypto.NewDisabledControlCipher(vc.cipher)
	if err := message.WriteTo(serialized, msg, cipher); err != nil {
		return err
	}
	return w.Put(iobuf.NewSlice(serialized.Bytes()), nil)
}
// connect creates a local flow with the given id and priority and announces
// it to the remote end with an OpenFlow message that grants
// DefaultBytesBufferedPerFlow bytes of initial credit.
//
// If the announcement cannot be queued, the flow is shut down, removed from
// the flow map and the error is returned, so callers never receive a flow
// the remote end was not told about. opts is currently unused.
func (vc *xvc) connect(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.XFlow, error) {
	f, err := vc.newFlow(fid, priority)
	if err != nil {
		return nil, err
	}
	open := &message.OpenFlow{
		Flow:            fid,
		InitialCounters: uint32(DefaultBytesBufferedPerFlow),
	}
	if err := vc.sendOnExpressQ(open); err != nil {
		// Same cleanup as accept performs when it cannot deliver a flow.
		vc.mu.Lock()
		f.Shutdown()
		delete(vc.flowMap, fid)
		vc.mu.Unlock()
		return nil, err
	}
	return f, nil
}
// accept creates the local end of a flow opened by the remote side and
// grants it initialCounters bytes of write credit.
//
// Reserved flow ids get system priority. The type flow installs the VOM type
// encoder/decoder in the data cache; the discharge flow starts the discharge
// sender when the local blessings have third-party caveats. Any other flow
// is delivered to the listener queue; if there is no listener or delivery
// fails, the flow is shut down, removed from the flow map and the error is
// returned.
func (vc *xvc) accept(fid id.Flow, initialCounters int) error {
	var priority bqueue.Priority
	if fid < NumReservedFlows {
		priority = systemXFlowPriority
	} else {
		priority = normalXFlowPriority
	}
	f, err := vc.newFlow(fid, priority)
	if err != nil {
		return err
	}
	f.Release(initialCounters)

	if fid == TypeFlowID {
		vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(f))
		vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(f))
		return nil
	}
	if fid == DischargeFlowID {
		if tpCaveats := vc.lBlessings.ThirdPartyCaveats(); len(tpCaveats) > 0 {
			go vc.sendDischargesLoop(f, tpCaveats)
		}
		return nil
	}

	vc.incommingMu.Lock()
	queue := vc.incomming
	vc.incommingMu.Unlock()

	var putErr error
	if queue != nil {
		putErr = queue.Put(f)
	} else {
		putErr = fmt.Errorf("VC not listening")
	}
	if putErr == nil {
		return nil
	}
	vc.mu.Lock()
	f.Shutdown()
	delete(vc.flowMap, fid)
	vc.mu.Unlock()
	return putErr
}
// Connect opens a new normal-priority flow to the remote end. Flow ids are
// taken from nextConnectFID, which advances by two on every call.
func (vc *xvc) Connect(opts ...stream.FlowOpt) (stream.XFlow, error) {
	// Reserve the id under the lock, but release it before connect, which
	// takes vc.mu itself through newFlow.
	fid := func() id.Flow {
		vc.mu.Lock()
		defer vc.mu.Unlock()
		next := vc.nextConnectFID
		vc.nextConnectFID = next + 2
		return next
	}()
	return vc.connect(fid, normalXFlowPriority, opts...)
}
// ListenTo installs q as the queue on which flows opened by the remote end
// are delivered. It fails if a queue is already installed.
func (vc *xvc) ListenTo(q *upcqueue.T) error {
	vc.incommingMu.Lock()
	defer vc.incommingMu.Unlock()
	if vc.incomming == nil {
		vc.incomming = q
		return nil
	}
	return fmt.Errorf("The given address already has a listener.")
}
// Closed returns the channel that is closed when the VC's write loop exits.
func (vc *xvc) Closed() chan struct{} {
return vc.closed
}
// Close shuts the VC down.
//
// It detaches the flow map (so no new flows can be created), detaches and
// closes the listener queue, closes the shared counters, closes every
// registered flow and finally queues a CloseVC message, carrying reason's
// text when reason is non-nil, on the stop queue so the write loop sends it
// and exits.
//
// Close may be called on a VC whose setup failed before the stop queue was
// created; in that case there is nothing to send and nil is returned.
func (vc *xvc) Close(reason error) error {
	vc.mu.Lock()
	flows := vc.flowMap
	vc.flowMap = nil
	vc.mu.Unlock()

	vc.incommingMu.Lock()
	q := vc.incomming
	vc.incomming = nil
	vc.incommingMu.Unlock()
	if q != nil {
		q.Close()
	}

	vc.sharedCounters.Close()
	for _, fs := range flows {
		fs.flow.Close()
	}

	if vc.stopQ == nil {
		// setup assigns stopQ immediately before starting the read and write
		// loops, so without it there is no write loop to notify. Calling Put
		// on the nil writer would panic.
		return nil
	}
	msg := ""
	if reason != nil {
		msg = reason.Error()
	}
	return vc.sendOn(vc.stopQ, &message.CloseVC{
		Error: msg,
	})
}
// Implement the backingVC interface for flows.
//
// The endpoint, principal and blessings accessors do not lock because those
// fields are all set by the time the constructor is done. The discharge maps
// are different: the discharge loops keep updating them under vc.mu for the
// lifetime of the VC, so their accessors take the lock and return a copy.

// LocalEndpoint returns the local endpoint of the VC.
func (vc *xvc) LocalEndpoint() naming.Endpoint { return vc.local }

// RemoteEndpoint returns the endpoint reported by the remote end during setup.
func (vc *xvc) RemoteEndpoint() naming.Endpoint { return vc.remote }

// LocalPrincipal returns the principal the VC was created with.
func (vc *xvc) LocalPrincipal() security.Principal { return vc.principal }

// LocalBlessings returns the blessings presented by the local end.
func (vc *xvc) LocalBlessings() security.Blessings { return vc.lBlessings }

// RemoteBlessings returns the blessings presented by the remote end.
func (vc *xvc) RemoteBlessings() security.Blessings { return vc.rBlessings }

// LocalDischarges returns a snapshot of the local discharges, keyed by
// discharge ID. The result is nil when none have been recorded.
func (vc *xvc) LocalDischarges() map[string]security.Discharge {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	return xvcCopyDischarges(vc.lDischarges)
}

// RemoteDischarges returns a snapshot of the remote discharges, keyed by
// discharge ID. The result is nil when none have been recorded.
func (vc *xvc) RemoteDischarges() map[string]security.Discharge {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	return xvcCopyDischarges(vc.rDischarges)
}

// VCDataCache returns the VC's data cache.
func (vc *xvc) VCDataCache() stream.VCDataCache { return vc.dataCache }

// xvcCopyDischarges returns a shallow copy of src, preserving nil, so callers
// can read the result while the discharge loops keep updating the original.
func xvcCopyDischarges(src map[string]security.Discharge) map[string]security.Discharge {
	if src == nil {
		return nil
	}
	dst := make(map[string]security.Discharge, len(src))
	for k, d := range src {
		dst[k] = d
	}
	return dst
}
// xReadHandler is the read handler given to a flow's reader in newFlow; it
// reports consumed bytes for one flow back to the remote end.
type xReadHandler struct {
// flow is the id of the flow whose reads are being reported.
flow id.Flow
// vc is the VC whose express queue carries the resulting counter updates.
vc *xvc
}
// HandleRead sends the remote end an AddReceiveBuffers message granting
// `bytes` more bytes of credit for this handler's flow. A failure to queue
// the message is logged and otherwise ignored.
func (h *xReadHandler) HandleRead(bytes uint) {
	credit := message.NewCounters()
	credit.Add(0, h.flow, uint32(bytes))
	update := &message.AddReceiveBuffers{Counters: credit}
	if err := h.vc.sendOnExpressQ(update); err != nil {
		vlog.Errorf("Unable to send counters: %v", err)
	}
}