| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package vc |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "sync" |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/rpc/version" |
| "v.io/v23/security" |
| "v.io/v23/vom" |
| "v.io/v23/vtrace" |
| "v.io/x/lib/vlog" |
| "v.io/x/ref/runtime/internal/lib/bqueue" |
| "v.io/x/ref/runtime/internal/lib/bqueue/drrqueue" |
| "v.io/x/ref/runtime/internal/lib/iobuf" |
| vsync "v.io/x/ref/runtime/internal/lib/sync" |
| "v.io/x/ref/runtime/internal/lib/upcqueue" |
| "v.io/x/ref/runtime/internal/rpc/stream" |
| "v.io/x/ref/runtime/internal/rpc/stream/crypto" |
| "v.io/x/ref/runtime/internal/rpc/stream/id" |
| "v.io/x/ref/runtime/internal/rpc/stream/message" |
| iversion "v.io/x/ref/runtime/internal/rpc/version" |
| ) |
| |
| var pool = iobuf.NewPool(0) |
| var nullCipher = crypto.NullControlCipher{} |
| var stopToken = iobuf.NewSlice(make([]byte, 1)) |
| |
| // This is kind of a hack. If you send this down a flow then all |
| // bytes sent after will be sent unencyrpted. |
| var disableEncryption = iobuf.NewSlice(make([]byte, 1)) |
| |
const (
	// Priorities of the buffered queues used for flow control of writes.
	// NOTE(review): presumably a numerically lower priority is served first by
	// the bqueue implementation; confirm against the bqueue package.

	// authPriority is the priority of the reserved authentication flow.
	authPriority bqueue.Priority = iota
	// expressPriority is the priority of the express queue, which carries
	// already-serialized control messages.
	expressPriority
	// systemXFlowPriority is the priority of reserved (system) flows such as
	// the type flow and the discharge flow.
	systemXFlowPriority
	// normalXFlowPriority is the priority of ordinary application flows.
	normalXFlowPriority
	// stopPriority is the priority of the stop queue, which terminates the
	// write loop.
	stopPriority
)
| |
// flowState is the per-flow bookkeeping stored in xvc.flowMap.
type flowState struct {
	// flow is the flow registered under this entry's flow id.
	flow *flow
	// encryptionDisabled records whether data messages written for this flow
	// are sent without the Encrypted flag. It is read and updated by writeData
	// while holding xvc.mu.
	encryptionDisabled bool
}
| |
// xvc is a virtual circuit multiplexing flows over a single stream.XConn.
// It owns one read loop and one write loop, both started by setup.
type xvc struct {
	// mu guards flowMap, nextConnectFID and the contents of the discharge
	// maps once the discharge loops are running.
	mu sync.Mutex
	// flowMap maps flow ids to their state. Close sets it to nil, which
	// newFlow treats as "VC closed".
	flowMap map[id.Flow]*flowState
	// lDischarges and rDischarges hold the local and remote discharges, keyed
	// by discharge id.
	lDischarges, rDischarges map[string]security.Discharge
	// nextConnectFID is the id handed to the next locally initiated flow;
	// Connect advances it by 2 each time.
	nextConnectFID id.Flow

	// incommingMu guards incomming.
	incommingMu sync.Mutex
	// incomming is the queue that accepted (remotely opened) flows are put
	// on; nil means nobody is listening.
	incomming *upcqueue.T

	// cipher starts out as nullCipher and is replaced by setup when the VC is
	// encrypted.
	cipher crypto.ControlCipher
	// version is the RPC version chosen by setup from the intersection of the
	// local and remote version ranges.
	version version.RPCVersion
	principal security.Principal
	lBlessings, rBlessings security.Blessings
	local, remote naming.Endpoint
	conn stream.XConn
	// outgoing is the buffered queue that the write loop drains.
	outgoing bqueue.T
	// expressQ carries serialized control messages; stopQ carries the CloseVC
	// message (or stopToken) that ends the write loop.
	expressQ, stopQ bqueue.Writer
	// sharedCounters is the semaphore fed by counters received for
	// SharedFlowID.
	sharedCounters *vsync.Semaphore
	// reader wraps conn; it is created by setup and used by the read loop.
	reader *iobuf.Reader
	// closed is closed when the write loop exits.
	closed chan struct{}
	dataCache *dataCache
	// dClient and dBuffer come from the listener options and are used to
	// prepare and refresh discharges.
	dClient DischargeClient
	dBuffer time.Duration
}
| |
| func commonInit( |
| ctx *context.T, |
| conn stream.XConn, |
| principal security.Principal, |
| versions iversion.Range) (*xvc, stream.XFlow, error) { |
| vc := &xvc{ |
| principal: principal, |
| cipher: nullCipher, |
| conn: conn, |
| outgoing: drrqueue.New(MaxPayloadSizeBytes), |
| flowMap: make(map[id.Flow]*flowState), |
| sharedCounters: vsync.NewSemaphore(), |
| closed: make(chan struct{}), |
| dataCache: newDataCache(), |
| } |
| // Both sides create reserved flows ahead of time. |
| authFlow, err := vc.newFlow(AuthFlowID, authPriority) |
| authFlow.Release(DefaultBytesBufferedPerFlow) |
| return vc, authFlow, err |
| } |
| |
| func XNewDialed( |
| ctx *context.T, |
| conn stream.XConn, |
| principal security.Principal, |
| local, remote naming.Endpoint, |
| versions iversion.Range, |
| opts ...stream.VCOpt) (stream.XVC, error) { |
| ctx, span := vtrace.WithNewSpan(ctx, "Dialing VC") |
| defer span.Finish() |
| |
| vc, authFlow, err := commonInit(ctx, conn, principal, versions) |
| if err != nil { |
| return nil, err |
| } |
| |
| vc.local = local |
| vc.nextConnectFID = NumReservedFlows |
| |
| errch := make(chan error) |
| go func() { |
| errch <- vc.setup(ctx, local, remote, versions, principal != nil) |
| }() |
| if err := <-errch; err != nil { |
| vc.Close(err) |
| return nil, err |
| } |
| if principal == nil { |
| // Exit early for unencrypted VCs. |
| return vc, nil |
| } |
| if err := vc.authenticateClient(ctx, authFlow, opts...); err != nil { |
| vc.Close(err) |
| return nil, err |
| } |
| return vc, nil |
| } |
| |
| func XNewAccepted( |
| ctx *context.T, |
| conn stream.XConn, |
| principal security.Principal, |
| local naming.Endpoint, |
| lBlessings security.Blessings, |
| versions iversion.Range, |
| incomming *upcqueue.T, |
| opts ...stream.ListenerOpt) (stream.XVC, error) { |
| ctx, span := vtrace.WithNewSpan(ctx, "Accepting VC") |
| defer span.Finish() |
| |
| vc, authFlow, err := commonInit(ctx, conn, principal, versions) |
| if err != nil { |
| return nil, err |
| } |
| vc.incommingMu.Lock() |
| // We unlock this only AFTER authentication is finished to ensure |
| // that no flows are announced until we're fully initialized. |
| // TODO(mattr): Find a better way. |
| defer vc.incommingMu.Unlock() |
| vc.incomming = incomming |
| |
| vc.local = local |
| vc.lBlessings = lBlessings |
| vc.nextConnectFID = NumReservedFlows + 1 |
| vc.dClient, vc.dBuffer = dischargeOptions(opts) |
| |
| errch := make(chan error) |
| go func() { |
| // TODO(mattr): Sending local twice, but... what should we send? |
| errch <- vc.setup(ctx, local, local, versions, principal != nil) |
| }() |
| if err := <-errch; err != nil { |
| vc.Close(err) |
| return nil, err |
| } |
| if principal == nil { |
| //Exit early for unencrypted VCs. |
| return vc, nil |
| } |
| if err := vc.authenticateServer(ctx, authFlow, opts...); err != nil { |
| vc.Close(err) |
| return nil, err |
| } |
| return vc, nil |
| } |
| |
| func (vc *xvc) setup( |
| ctx *context.T, |
| local, remote naming.Endpoint, |
| versions iversion.Range, |
| encrypted bool) error { |
| ctx, span := vtrace.WithNewSpan(ctx, "Setup") |
| defer span.Finish() |
| |
| _, keySpan := vtrace.WithNewSpan(ctx, "Generate Key") |
| pk, sk, err := crypto.GenerateBoxKey() |
| if err != nil { |
| return err |
| } |
| keySpan.Finish() |
| |
| setup := &message.SetupVC{ |
| LocalEndpoint: local, |
| RemoteEndpoint: remote, |
| Counters: message.NewCounters(), |
| Setup: message.Setup{ |
| Versions: versions, |
| Options: []message.SetupOption{&message.NaclBox{PublicKey: *pk}}, |
| }, |
| } |
| setup.Counters.Add(0, SharedFlowID, DefaultBytesBufferedPerFlow) |
| |
| // Send and receive a SetupVC message. |
| // Note we use the nullCipher here. The XConn is handling encryption. |
| errch := make(chan error, 1) |
| go func() { |
| _, span := vtrace.WithNewSpan(ctx, "Writing setup") |
| errch <- message.WriteTo(vc.conn, setup, nullCipher) |
| span.Finish() |
| }() |
| _, readSpan := vtrace.WithNewSpan(ctx, "Reading setup") |
| vc.reader = iobuf.NewReader(pool, vc.conn) |
| msg, err := message.ReadFrom(vc.reader, nullCipher) |
| if err != nil { |
| return err |
| } |
| remoteSetup, ok := msg.(*message.SetupVC) |
| if !ok { |
| return fmt.Errorf("wrong message.") |
| } |
| readSpan.Finish() |
| |
| if err := <-errch; err != nil { |
| return err |
| } |
| vc.remote = remoteSetup.LocalEndpoint |
| |
| // Release shared counters |
| vc.distributeCounters(remoteSetup.Counters) |
| |
| // Choose a version. |
| vrange, err := versions.Intersect(&remoteSetup.Setup.Versions) |
| if err != nil { |
| return err |
| } |
| vc.version = vrange.Max |
| |
| // Set up the cipher. |
| if encrypted { |
| _, naclSpan := vtrace.WithNewSpan(ctx, "Creating Cipher") |
| remoteKey := remoteSetup.Setup.NaclBox() |
| if remoteKey == nil { |
| return fmt.Errorf("missing key.") |
| } |
| // Disable the underlying encryption. We'll do our own from now on. |
| vc.conn.DisableEncryption() |
| |
| vc.cipher = crypto.NewControlCipherRPC11(pk, sk, &remoteKey.PublicKey) |
| naclSpan.Finish() |
| } |
| |
| vc.expressQ, err = vc.outgoing.NewWriter(ExpressFlowID, expressPriority, DefaultBytesBufferedPerFlow) |
| if err != nil { |
| return err |
| } |
| vc.expressQ.Release(-1) // Disable flow control |
| vc.stopQ, err = vc.outgoing.NewWriter(StopFlowID, stopPriority, DefaultBytesBufferedPerFlow) |
| if err != nil { |
| return err |
| } |
| vc.stopQ.Release(-1) // Disable flow control |
| |
| // Start read and write loops |
| go vc.readLoop() |
| go vc.writeLoop() |
| |
| return nil |
| } |
| |
| func (vc *xvc) connectSystemFlows() error { |
| conn, err := vc.connect(TypeFlowID, systemXFlowPriority) |
| if err != nil { |
| return err |
| } |
| vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(conn)) |
| vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(conn)) |
| |
| if len(vc.rBlessings.ThirdPartyCaveats()) > 0 { |
| conn, err = vc.connect(DischargeFlowID, systemXFlowPriority) |
| if err != nil { |
| return err |
| } |
| go vc.recvDischargesLoop(conn) |
| } |
| return nil |
| } |
| |
| func (vc *xvc) sendDischargesLoop(conn io.WriteCloser, tpCavs []security.Caveat) { |
| defer conn.Close() |
| if vc.dClient == nil { |
| return |
| } |
| enc := vom.NewEncoder(conn) |
| discharges := vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{}) |
| for expiry := minExpiryTime(discharges, tpCavs); !expiry.IsZero(); expiry = minExpiryTime(discharges, tpCavs) { |
| select { |
| case <-time.After(fetchDuration(expiry, vc.dBuffer)): |
| discharges = vc.dClient.PrepareDischarges(nil, tpCavs, security.DischargeImpetus{}) |
| if err := enc.Encode(discharges); err != nil { |
| vlog.Errorf("encoding discharges on VC %v failed: %v", vc, err) |
| return |
| } |
| if len(discharges) == 0 { |
| continue |
| } |
| vc.mu.Lock() |
| if vc.lDischarges == nil { |
| vc.lDischarges = make(map[string]security.Discharge) |
| } |
| for _, d := range discharges { |
| vc.lDischarges[d.ID()] = d |
| } |
| vc.mu.Unlock() |
| case <-vc.closed: |
| vlog.VI(3).Infof("closing sendDischargesLoop on VC %v", vc) |
| return |
| } |
| } |
| } |
| |
| func (vc *xvc) recvDischargesLoop(conn io.ReadCloser) { |
| defer conn.Close() |
| dec := vom.NewDecoder(conn) |
| for { |
| var discharges []security.Discharge |
| if err := dec.Decode(&discharges); err != nil { |
| vlog.VI(3).Infof("decoding discharges on %v failed: %v", vc, err) |
| return |
| } |
| if len(discharges) == 0 { |
| continue |
| } |
| vc.mu.Lock() |
| if vc.rDischarges == nil { |
| vc.rDischarges = make(map[string]security.Discharge) |
| } |
| for _, d := range discharges { |
| vc.rDischarges[d.ID()] = d |
| } |
| vc.mu.Unlock() |
| } |
| } |
| |
| func (vc *xvc) handleMessage(msg message.T) error { |
| switch m := msg.(type) { |
| |
| case *message.Data: |
| payload := m.Payload |
| defer func() { |
| if payload != nil { |
| payload.Release() |
| } |
| }() |
| vc.mu.Lock() |
| fs := vc.flowMap[m.Flow] |
| if m.Close() && fs != nil { |
| defer fs.flow.Shutdown() |
| delete(vc.flowMap, m.Flow) |
| } |
| vc.mu.Unlock() |
| if payload.Size() == 0 { |
| return nil |
| } |
| if fs == nil { |
| vlog.Infof("Ignoring data packet for unkown flow: %#v", msg) |
| return nil |
| } |
| if err := fs.flow.reader.Put(payload); err != nil { |
| vlog.Errorf("Could not queue data packet: %#v", msg) |
| return err |
| } |
| payload = nil |
| |
| case *message.AddReceiveBuffers: |
| vc.distributeCounters(m.Counters) |
| |
| case *message.OpenFlow: |
| if err := vc.accept(m.Flow, int(m.InitialCounters)); err != nil { |
| cm := &message.Data{Flow: m.Flow} |
| cm.SetClose() |
| if err := vc.sendOnExpressQ(cm); err != nil { |
| return err |
| } |
| } |
| // TODO(mattr): The first write uses shared counters, but the first write is |
| // always just 1 byte :( |
| counters := message.NewCounters() |
| counters.Add(0, m.Flow, uint32(DefaultBytesBufferedPerFlow)) |
| err := vc.sendOnExpressQ(&message.AddReceiveBuffers{ |
| Counters: counters, |
| }) |
| return err |
| |
| case *message.CloseVC: |
| vc.stopQ.Put(stopToken, nil) |
| if m.Error != "" { |
| vlog.Errorf("VC Closed from remote end due to error: %s", m.Error) |
| return fmt.Errorf(m.Error) |
| } |
| return io.EOF |
| |
| default: |
| vlog.Infof("Ignoring irrelevant or unrecognized message %T: ", m) |
| } |
| return nil |
| } |
| |
| func (vc *xvc) distributeCounters(counters message.Counters) { |
| for cid, bytes := range counters { |
| if cid.Flow() == SharedFlowID { |
| vc.sharedCounters.IncN(uint(bytes)) |
| continue |
| } |
| vc.mu.Lock() |
| f := vc.flowMap[cid.Flow()] |
| vc.mu.Unlock() |
| if f == nil { |
| vlog.Infof("ignoring counters for unknown flow: %d", cid.Flow()) |
| continue |
| } |
| f.flow.Release(int(bytes)) |
| } |
| } |
| |
| func (vc *xvc) readLoop() { |
| for { |
| msg, err := message.ReadFrom(vc.reader, vc.cipher) |
| if err != nil { |
| vlog.Errorf("Exiting VC read loop: %v", err) |
| return |
| } |
| if err := vc.handleMessage(msg); err != nil { |
| if err != io.EOF { |
| vlog.Errorf("Could not handle message: %v", err) |
| } |
| return |
| } |
| } |
| } |
| |
| func releaseBufs(bufs []*iobuf.Slice) { |
| for _, b := range bufs { |
| if b != disableEncryption && b != stopToken { |
| b.Release() |
| } |
| } |
| } |
| |
| func (vc *xvc) writeData(writer bqueue.Writer, bufs []*iobuf.Slice) error { |
| defer releaseBufs(bufs) |
| fid := id.Flow(writer.ID()) |
| var encryptionDisabled bool |
| vc.mu.Lock() |
| fs := vc.flowMap[fid] |
| if fs != nil { |
| encryptionDisabled = fs.encryptionDisabled |
| } |
| vc.mu.Unlock() |
| |
| // TODO(mattr): coalesce, but only after finding the encryption boundary. |
| last := len(bufs) - 1 |
| // TODO(mattr): This is broken if disableEncryption is last. |
| drained := writer.IsDrained() |
| |
| if drained { |
| defer func() { |
| vc.mu.Lock() |
| delete(vc.flowMap, fid) |
| vc.mu.Unlock() |
| }() |
| } |
| |
| for i, b := range bufs { |
| if b == disableEncryption { |
| encryptionDisabled = true |
| vc.mu.Lock() |
| if fs != nil { |
| fs.encryptionDisabled = true |
| } |
| vc.mu.Unlock() |
| continue |
| } |
| |
| d := &message.Data{Flow: fid, Payload: b} |
| if !encryptionDisabled { |
| d.SetEncrypted() |
| } |
| if drained && i == last { |
| d.SetClose() |
| } |
| if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil { |
| return err |
| } |
| } |
| if len(bufs) == 0 && drained { |
| d := &message.Data{Flow: fid} |
| d.SetClose() |
| if err := message.WriteTo(vc.conn, d, vc.cipher); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func (vc *xvc) writeSerializedMessage(msg *iobuf.Slice) error { |
| // TODO(mattr): All of this would be easier if the queues contained messages |
| // instead of byte arrays. |
| if err := message.EncryptMessage(msg.Contents, vc.cipher); err != nil { |
| return err |
| } |
| if _, err := vc.conn.Write(msg.Contents); err != nil { |
| return err |
| } |
| msg.Release() |
| return nil |
| } |
| |
| func (vc *xvc) writeLoop() { |
| defer vc.outgoing.Close() |
| defer close(vc.closed) |
| for { |
| writer, bufs, err := vc.outgoing.Get(nil) |
| if err != nil { |
| return |
| } |
| switch writer { |
| case vc.expressQ: |
| for _, b := range bufs { |
| if err = vc.writeSerializedMessage(b); err != nil { |
| vlog.Errorf("Could not write control message: %v", err) |
| return |
| } |
| } |
| case vc.stopQ: |
| if len(bufs) > 0 { |
| // If we get stopToken here, then the vc was closed by the remote end. |
| if bufs[0] != stopToken { |
| if err := vc.writeSerializedMessage(bufs[0]); err != nil { |
| vlog.Errorf("Could not write CloseVC: %v", err) |
| } |
| } |
| } |
| return |
| default: |
| if err = vc.writeData(writer, bufs); err != nil { |
| vlog.Errorf("Error writing: %v", err) |
| return |
| } |
| } |
| } |
| } |
| |
| func (vc *xvc) authenticateServer(ctx *context.T, authFlow stream.XFlow, opts ...stream.ListenerOpt) error { |
| ctx, span := vtrace.WithNewSpan(ctx, "Server Authentication") |
| defer span.Finish() |
| defer authFlow.Close() |
| |
| if vc.lBlessings.IsZero() { |
| return fmt.Errorf("no server blessings") |
| } |
| |
| // TODO(mattr): Check that authFlow has the right fid. |
| |
| // This is a wart. We always pass a null crypter (with channel binding) even |
| // though this is all encrypted by the flow write. This is just because we |
| // want to re-use the existing Authenticate methods. |
| crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding()) |
| |
| var serverDischarges []security.Discharge |
| if tpcavs := vc.lBlessings.ThirdPartyCaveats(); len(tpcavs) > 0 && vc.dClient != nil { |
| _, span := vtrace.WithNewSpan(ctx, "Prepare Discharages") |
| serverDischarges = vc.dClient.PrepareDischarges(ctx, tpcavs, security.DischargeImpetus{}) |
| span.Finish() |
| } |
| _, wspan := vtrace.WithNewSpan(ctx, "writeBlessings") |
| if err := writeBlessings(authFlow, authServerContextTag, crypter, vc.principal, vc.lBlessings, serverDischarges, vc.version); err != nil { |
| return err |
| } |
| wspan.Finish() |
| |
| // Note that since the client uses a self-signed blessing to authenticate |
| // during VC setup, it does not share any discharges. |
| rBlessings, _, err := readBlessings(ctx, authFlow, authClientContextTag, crypter, vc.version) |
| if err != nil { |
| return err |
| } |
| vc.rBlessings = rBlessings |
| vc.lDischarges = mkDischargeMap(serverDischarges) |
| return nil |
| } |
| |
| func (vc *xvc) authenticateClient(ctx *context.T, authFlow stream.XFlow, opts ...stream.VCOpt) error { |
| ctx, span := vtrace.WithNewSpan(ctx, "Client Authentication") |
| defer span.Finish() |
| defer authFlow.Close() |
| |
| // This is a wart. We always pass a null crypter (with channel binding) even |
| // though this is all encrypted by the flow write. This is just because we |
| // want to re-use the existing Authenticate methods. |
| crypter := crypto.NewNullCrypterWithChannelBinding(vc.cipher.ChannelBinding()) |
| |
| server, serverDischarges, err := readBlessings(ctx, authFlow, authServerContextTag, crypter, vc.version) |
| if err != nil { |
| return err |
| } |
| |
| // Authorize the server based on the provided authorizer. |
| auth := serverAuthorizer(opts) |
| if auth != nil { |
| _, authSpan := vtrace.WithNewSpan(ctx, "Authorize Server") |
| params := security.CallParams{ |
| LocalPrincipal: vc.principal, |
| LocalEndpoint: vc.local, |
| RemoteEndpoint: vc.remote, |
| RemoteBlessings: server, |
| RemoteDischarges: serverDischarges, |
| } |
| if err := auth.Authorize(params); err != nil { |
| return fmt.Errorf("Not trusted.") |
| } |
| authSpan.Finish() |
| } |
| |
| // The client shares its blessings at RPC time (as the blessings may vary |
| // across RPCs). During VC handshake, the client simply sends a self-signed |
| // blessing in order to reveal its public key to the server. |
| // TODO(suharshs): Actually we should just send the public key. |
| _, blessSpan := vtrace.WithNewSpan(ctx, "Bless self") |
| client, err := vc.principal.BlessSelf("vcauth") |
| if err != nil { |
| return fmt.Errorf("Could not create blessing") |
| } |
| blessSpan.Finish() |
| _, writeSpan := vtrace.WithNewSpan(ctx, "Write blessings") |
| if err := writeBlessings(authFlow, authClientContextTag, crypter, vc.principal, client, nil, vc.version); err != nil { |
| return err |
| } |
| writeSpan.Finish() |
| |
| vc.lBlessings = client |
| vc.rBlessings = server |
| vc.rDischarges = serverDischarges |
| |
| return vc.connectSystemFlows() |
| } |
| |
| func (vc *xvc) newFlow(fid id.Flow, priority bqueue.Priority) (*flow, error) { |
| qw, err := vc.outgoing.NewWriter(bqueue.ID(fid), priority, MaxPayloadSizeBytes) |
| if err != nil { |
| return nil, err |
| } |
| f := &flow{ |
| backingVC: vc, |
| reader: newReader(&xReadHandler{fid, vc}), |
| writer: newWriter(MaxPayloadSizeBytes, qw, iobuf.NewAllocator(pool, 0), vc.sharedCounters), |
| } |
| |
| vc.mu.Lock() |
| defer vc.mu.Unlock() |
| if vc.flowMap == nil { |
| qw.Close() |
| return nil, fmt.Errorf("Could not create flow on closed VC.") |
| } |
| // TODO(mattr): it's an error if fid already exists. |
| vc.flowMap[fid] = &flowState{f, true} |
| |
| return f, nil |
| } |
| |
// sendOnExpressQ serializes msg and queues it on the express queue, returning
// whatever error sendOn reports.
func (vc *xvc) sendOnExpressQ(msg message.T) error {
	return vc.sendOn(vc.expressQ, msg)
}
| |
| func (vc *xvc) sendOn(w bqueue.Writer, msg message.T) error { |
| var buf bytes.Buffer |
| if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(vc.cipher)); err != nil { |
| return err |
| } |
| err := w.Put(iobuf.NewSlice(buf.Bytes()), nil) |
| return err |
| } |
| |
| func (vc *xvc) connect(fid id.Flow, priority bqueue.Priority, opts ...stream.FlowOpt) (stream.XFlow, error) { |
| f, err := vc.newFlow(fid, priority) |
| if err != nil { |
| return nil, err |
| } |
| |
| vc.sendOnExpressQ(&message.OpenFlow{ |
| Flow: fid, |
| InitialCounters: uint32(DefaultBytesBufferedPerFlow), |
| }) |
| return f, nil |
| } |
| |
| func (vc *xvc) accept(fid id.Flow, initialCounters int) error { |
| priority := normalXFlowPriority |
| if fid < NumReservedFlows { |
| priority = systemXFlowPriority |
| } |
| f, err := vc.newFlow(fid, priority) |
| if err != nil { |
| return err |
| } |
| f.Release(initialCounters) |
| switch fid { |
| case TypeFlowID: |
| vc.dataCache.Insert(TypeEncoderKey{}, vom.NewTypeEncoder(f)) |
| vc.dataCache.Insert(TypeDecoderKey{}, vom.NewTypeDecoder(f)) |
| return nil |
| case DischargeFlowID: |
| tpCaveats := vc.lBlessings.ThirdPartyCaveats() |
| if len(tpCaveats) > 0 { |
| go vc.sendDischargesLoop(f, tpCaveats) |
| } |
| return nil |
| default: |
| vc.incommingMu.Lock() |
| q := vc.incomming |
| vc.incommingMu.Unlock() |
| |
| if q == nil { |
| err = fmt.Errorf("VC not listening") |
| } else { |
| err = q.Put(f) |
| } |
| if err != nil { |
| vc.mu.Lock() |
| f.Shutdown() |
| delete(vc.flowMap, fid) |
| vc.mu.Unlock() |
| } |
| return err |
| } |
| } |
| |
| func (vc *xvc) Connect(opts ...stream.FlowOpt) (stream.XFlow, error) { |
| vc.mu.Lock() |
| fid := vc.nextConnectFID |
| vc.nextConnectFID += 2 |
| vc.mu.Unlock() |
| return vc.connect(fid, normalXFlowPriority, opts...) |
| } |
| |
| func (vc *xvc) ListenTo(q *upcqueue.T) error { |
| vc.incommingMu.Lock() |
| defer vc.incommingMu.Unlock() |
| if vc.incomming != nil { |
| return fmt.Errorf("The given address already has a listener.") |
| } |
| vc.incomming = q |
| return nil |
| } |
| |
// Closed returns the channel that is closed when the VC's write loop exits.
func (vc *xvc) Closed() chan struct{} {
	return vc.closed
}
| |
| func (vc *xvc) Close(reason error) error { |
| vc.mu.Lock() |
| flows := vc.flowMap |
| vc.flowMap = nil |
| vc.mu.Unlock() |
| |
| vc.incommingMu.Lock() |
| q := vc.incomming |
| vc.incomming = nil |
| vc.incommingMu.Unlock() |
| |
| if q != nil { |
| q.Close() |
| } |
| vc.sharedCounters.Close() |
| |
| for _, fs := range flows { |
| fs.flow.Close() |
| } |
| |
| msg := "" |
| if reason != nil { |
| msg = reason.Error() |
| } |
| err := vc.sendOn(vc.stopQ, &message.CloseVC{ |
| Error: msg, |
| }) |
| return err |
| } |
| |
| // Implement the backingVC interface for flows. |
| // Note I don't bother with locking because these are all set by the time the |
| // constructor is done. |
| func (vc *xvc) LocalEndpoint() naming.Endpoint { return vc.local } |
| func (vc *xvc) RemoteEndpoint() naming.Endpoint { return vc.remote } |
| func (vc *xvc) LocalPrincipal() security.Principal { return vc.principal } |
| func (vc *xvc) LocalBlessings() security.Blessings { return vc.lBlessings } |
| func (vc *xvc) RemoteBlessings() security.Blessings { return vc.rBlessings } |
| func (vc *xvc) LocalDischarges() map[string]security.Discharge { return vc.lDischarges } |
| func (vc *xvc) RemoteDischarges() map[string]security.Discharge { return vc.rDischarges } |
| func (vc *xvc) VCDataCache() stream.VCDataCache { return vc.dataCache } |
| |
// xReadHandler reports bytes read from one flow back to the remote end as
// receive-buffer counters (see HandleRead).
type xReadHandler struct {
	// flow is the id of the flow whose reads are being reported.
	flow id.Flow
	// vc is the VC used to send the counters.
	vc *xvc
}
| |
| func (h *xReadHandler) HandleRead(bytes uint) { |
| counters := message.NewCounters() |
| counters.Add(0, h.flow, uint32(bytes)) |
| err := h.vc.sendOnExpressQ(&message.AddReceiveBuffers{ |
| Counters: counters, |
| }) |
| if err != nil { |
| vlog.Errorf("Unable to send counters: %v", err) |
| } |
| } |