| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package conn |
| |
| import ( |
| "fmt" |
| "math" |
| "reflect" |
| "sync" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/flow" |
| "v.io/v23/flow/message" |
| "v.io/v23/naming" |
| "v.io/v23/rpc/version" |
| "v.io/v23/security" |
| "v.io/v23/verror" |
| slib "v.io/x/ref/lib/security" |
| ) |
| |
| const ( |
| invalidFlowID = iota |
| blessingsFlowID |
| reservedFlows = 10 |
| ) |
| |
| const ( |
| expressPriority = iota |
| flowPriority |
| tearDownPriority |
| |
| // Must be last. |
| numPriorities |
| ) |
| |
| // minChannelTimeout keeps track of the minimum channel timeout we allow on a |
| // per-protocol basis. This prevents callers from setting an overall limit |
| // that doesn't make sense for very slow protocols. |
| // TODO(mattr): We should consider allowing users to set this per-protocol, or |
| // perhaps having the protocol implementation expose it via some kind of |
| // ChannelOpts interface. |
| var minChannelTimeout = map[string]time.Duration{ |
| "bt": 10 * time.Second, |
| } |
| |
| const ( |
| defaultMtu = 1 << 16 |
| defaultChannelTimeout = 30 * time.Minute |
| // DefaultBytesBufferedPerFlow is the default number of bytes buffered per |
| // flow for flow control. |
| DefaultBytesBufferedPerFlow = 1 << 20 |
| proxyOverhead = 32 |
| ) |
| |
| // A FlowHandler processes accepted flows. |
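| // |
| // As an illustrative sketch (echoHandler is hypothetical, not part of this |
| // package), a handler that echoes every accepted flow back to the dialer |
| // might look like: |
| // |
| //     type echoHandler struct{} |
| // |
| //     func (echoHandler) HandleFlow(f flow.Flow) error { |
| //         go func() { |
| //             defer f.Close() |
| //             io.Copy(f, f) // flow.Flow is an io.ReadWriter |
| //         }() |
| //         return nil |
| //     } |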
| type FlowHandler interface { |
| // HandleFlow processes an accepted flow. |
| HandleFlow(flow.Flow) error |
| } |
| |
| type Status int |
| |
| // Note that this is a progression of states that can only |
| // go in one direction. We use inequality operators to see |
| // how far along in the progression we are, so the order of |
| // these is important. |
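| // |
| // For example, code that must not run once teardown has begun guards itself |
| // with a comparison such as: |
| // |
| //     if c.status >= Closing { |
| //         return NewErrConnectionClosed(ctx) |
| //     } |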
| const ( |
| Active Status = iota |
| EnteringLameDuck |
| LameDuckAcknowledged |
| Closing |
| Closed |
| ) |
| |
| type healthCheckState struct { |
| requestSent time.Time |
| requestTimer *time.Timer |
| requestDeadline time.Time |
| lastRTT time.Duration |
| |
| closeTimer *time.Timer |
| closeDeadline time.Time |
| } |
| |
| // A Conn acts as a multiplexing encrypted channel that can host Flows. |
| type Conn struct { |
| // All the variables here are set before the constructor returns |
| // and never changed after that. |
| mp *messagePipe |
| version version.RPCVersion |
| local, remote naming.Endpoint |
| closed chan struct{} |
| lameDucked chan struct{} |
| blessingsFlow *blessingsFlow |
| loopWG sync.WaitGroup |
| unopenedFlows sync.WaitGroup |
| cancel context.CancelFunc |
| handler FlowHandler |
| mtu uint64 |
| |
| mu sync.Mutex // All the variables below here are protected by mu. |
| |
| localBlessings, remoteBlessings security.Blessings |
| localDischarges, remoteDischarges map[string]security.Discharge |
| localValid <-chan struct{} |
| remoteValid chan struct{} |
| rPublicKey security.PublicKey |
| status Status |
| remoteLameDuck bool |
| nextFid uint64 |
| lastUsedTime time.Time |
| flows map[uint64]*flw |
| hcstate *healthCheckState |
| acceptChannelTimeout time.Duration |
| |
| // TODO(mattr): Integrate these maps back into the flows themselves as |
| // has been done with the sending counts. |
| // toRelease is a map from flowID to a number of tokens which are pending |
| // to be released. We only send release messages when some flow has |
| // used up at least half its buffer, and then we send the counters for |
| // every flow. This reduces the number of release messages that are sent. |
| toRelease map[uint64]uint64 |
| // borrowing is a map from flowID to a boolean indicating whether the remote |
| // dialer of the flow is using shared counters for its sends because we've not |
| // yet sent a release for this flow. |
| borrowing map[uint64]bool |
| |
| // In our protocol new flows are opened by the dialer by immediately |
| // starting to write data for that flow (in an OpenFlow message). |
| // Since the other side doesn't yet know of the existence of this new |
| // flow, it couldn't have allocated us any counters via a Release message. |
| // In order to deal with this, the conn maintains a pool of shared tokens |
| // which are used by dialers of new flows. |
| // lshared is the number of shared tokens available for new flows dialed |
| // locally. |
| lshared uint64 |
| // outstandingBorrowed is a map from flowID to a number of borrowed tokens. |
| // This map is populated when a flow closes locally before it receives a remote close |
| // or a release message. In this case we need to remember that we have already |
| // used these counters and return them to the shared pool when we get |
| // a close or release. |
| outstandingBorrowed map[uint64]uint64 |
| |
| // activeWriters keeps track of all the flows that are currently |
| // trying to write, indexed by priority. activeWriters[0] is a list |
| // (note that writers form a linked list for this purpose) |
| // of all the highest priority flows. activeWriters[len-1] is a list |
| // of all the lowest priority writing flows. |
| activeWriters []writer |
| writing writer |
| } |
| |
| // Ensure that *Conn implements flow.ManagedConn. |
| var _ flow.ManagedConn = &Conn{} |
| |
| // NewDialed dials a new Conn on the given conn. |
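| // |
| // As an illustrative sketch (normally the flow manager makes this call; the |
| // variable names below are hypothetical): |
| // |
| //     c, names, rejected, err := NewDialed(ctx, msgConn, localEP, remoteEP, |
| //         versions, peerAuth, time.Minute, 0, handler) // channelTimeout 0 => default |
| //     if err != nil { |
| //         return err |
| //     } |
| //     defer c.Close(ctx, nil) |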
| func NewDialed( |
| ctx *context.T, |
| conn flow.MsgReadWriteCloser, |
| local, remote naming.Endpoint, |
| versions version.RPCVersionRange, |
| auth flow.PeerAuthorizer, |
| handshakeTimeout time.Duration, |
| channelTimeout time.Duration, |
| handler FlowHandler) (c *Conn, names []string, rejected []security.RejectedBlessing, err error) { |
| dctx := ctx |
| ctx, cancel := context.WithRootCancel(ctx) |
| if channelTimeout == 0 { |
| channelTimeout = defaultChannelTimeout |
| } |
| if min := minChannelTimeout[local.Protocol]; channelTimeout < min { |
| channelTimeout = min |
| } |
| |
| // If the conn is being built on an encapsulated flow, we must update the |
| // cancellation of the flow, to ensure that the conn doesn't get killed |
| // when the context passed in is cancelled. |
| if f, ok := conn.(*flw); ok { |
| ctx = f.SetDeadlineContext(ctx, time.Time{}) |
| } |
| c = &Conn{ |
| mp: newMessagePipe(conn), |
| handler: handler, |
| local: local, |
| remote: remote, |
| closed: make(chan struct{}), |
| lameDucked: make(chan struct{}), |
| nextFid: reservedFlows, |
| flows: map[uint64]*flw{}, |
| lastUsedTime: time.Now(), |
| toRelease: map[uint64]uint64{}, |
| borrowing: map[uint64]bool{}, |
| cancel: cancel, |
| outstandingBorrowed: make(map[uint64]uint64), |
| activeWriters: make([]writer, numPriorities), |
| acceptChannelTimeout: channelTimeout, |
| } |
| done := make(chan struct{}) |
| var rtt time.Duration |
| c.loopWG.Add(1) |
| go func() { |
| defer c.loopWG.Done() |
| defer close(done) |
| // We only send our real blessings if we are a server in addition to being a client. |
| // Otherwise, we only send our public key through a nameless blessings object. |
| // TODO(suharshs): Should we reveal server blessings if we are connecting to a proxy here? |
| if handler != nil { |
| c.localBlessings, c.localValid = v23.GetPrincipal(ctx).BlessingStore().Default() |
| } else { |
| c.localBlessings, _ = security.NamelessBlessing(v23.GetPrincipal(ctx).PublicKey()) |
| } |
| names, rejected, rtt, err = c.dialHandshake(ctx, versions, auth) |
| }() |
| timer := time.NewTimer(handshakeTimeout) |
| var ferr error |
| select { |
| case <-done: |
| ferr = err |
| case <-timer.C: |
| ferr = verror.NewErrTimeout(ctx) |
| case <-dctx.Done(): |
| ferr = verror.NewErrCanceled(ctx) |
| } |
| timer.Stop() |
| if ferr != nil { |
| c.Close(ctx, ferr) |
| return nil, nil, nil, ferr |
| } |
| c.initializeHealthChecks(ctx, rtt) |
| // We send discharges asynchronously to prevent making a second RPC while |
| // trying to build up the connection for another. If the two RPCs happen to |
| // go to the same server a deadlock will result. |
| // This commonly happens when we make a Resolve call. During the Resolve we |
| // will try to fetch discharges to send to the mounttable, leading to a |
| // Resolve of the discharge server name. The two resolve calls may be to |
| // the same mounttable. |
| if handler != nil { |
| c.loopWG.Add(1) |
| go c.blessingsLoop(ctx, time.Now(), nil) |
| } |
| c.loopWG.Add(1) |
| go c.readLoop(ctx) |
| |
| c.mu.Lock() |
| c.lastUsedTime = time.Now() |
| c.mu.Unlock() |
| return c, names, rejected, nil |
| } |
| |
| // NewAccepted accepts a new Conn on the given conn. |
| func NewAccepted( |
| ctx *context.T, |
| lAuthorizedPeers []security.BlessingPattern, |
| conn flow.MsgReadWriteCloser, |
| local naming.Endpoint, |
| versions version.RPCVersionRange, |
| handshakeTimeout time.Duration, |
| channelTimeout time.Duration, |
| handler FlowHandler) (*Conn, error) { |
| ctx, cancel := context.WithCancel(ctx) |
| if channelTimeout == 0 { |
| channelTimeout = defaultChannelTimeout |
| } |
| if min := minChannelTimeout[local.Protocol]; channelTimeout < min { |
| channelTimeout = min |
| } |
| c := &Conn{ |
| mp: newMessagePipe(conn), |
| handler: handler, |
| local: local, |
| closed: make(chan struct{}), |
| lameDucked: make(chan struct{}), |
| nextFid: reservedFlows + 1, |
| flows: map[uint64]*flw{}, |
| lastUsedTime: time.Now(), |
| toRelease: map[uint64]uint64{}, |
| borrowing: map[uint64]bool{}, |
| cancel: cancel, |
| outstandingBorrowed: make(map[uint64]uint64), |
| activeWriters: make([]writer, numPriorities), |
| acceptChannelTimeout: channelTimeout, |
| } |
| done := make(chan struct{}, 1) |
| var rtt time.Duration |
| var err error |
| var refreshTime time.Time |
| c.loopWG.Add(1) |
| go func() { |
| defer c.loopWG.Done() |
| defer close(done) |
| principal := v23.GetPrincipal(ctx) |
| c.localBlessings, c.localValid = principal.BlessingStore().Default() |
| if c.localBlessings.IsZero() { |
| c.localBlessings, _ = security.NamelessBlessing(principal.PublicKey()) |
| } |
| c.localDischarges, refreshTime = slib.PrepareDischarges( |
| ctx, c.localBlessings, nil, "", nil) |
| rtt, err = c.acceptHandshake(ctx, versions, lAuthorizedPeers) |
| }() |
| timer := time.NewTimer(handshakeTimeout) |
| var ferr error |
| select { |
| case <-done: |
| ferr = err |
| case <-timer.C: |
| ferr = verror.NewErrTimeout(ctx) |
| case <-ctx.Done(): |
| ferr = verror.NewErrCanceled(ctx) |
| } |
| timer.Stop() |
| if ferr != nil { |
| c.Close(ctx, ferr) |
| return nil, ferr |
| } |
| c.initializeHealthChecks(ctx, rtt) |
| c.loopWG.Add(2) |
| go c.blessingsLoop(ctx, refreshTime, lAuthorizedPeers) |
| go c.readLoop(ctx) |
| c.mu.Lock() |
| c.lastUsedTime = time.Now() |
| c.mu.Unlock() |
| return c, nil |
| } |
| |
| func (c *Conn) blessingsLoop( |
| ctx *context.T, |
| refreshTime time.Time, |
| authorizedPeers []security.BlessingPattern) { |
| defer c.loopWG.Done() |
| for { |
| if refreshTime.IsZero() { |
| select { |
| case <-c.localValid: |
| case <-ctx.Done(): |
| return |
| } |
| } else { |
| timer := time.NewTimer(time.Until(refreshTime)) |
| select { |
| case <-timer.C: |
| case <-c.localValid: |
| case <-ctx.Done(): |
| timer.Stop() |
| return |
| } |
| timer.Stop() |
| } |
| var dis map[string]security.Discharge |
| blessings, valid := v23.GetPrincipal(ctx).BlessingStore().Default() |
| dis, refreshTime = slib.PrepareDischarges(ctx, blessings, nil, "", nil) |
| bkey, dkey, err := c.blessingsFlow.send(ctx, blessings, dis, authorizedPeers) |
| if err != nil { |
| c.internalClose(ctx, false, err) |
| return |
| } |
| c.mu.Lock() |
| c.localBlessings = blessings |
| c.localDischarges = dis |
| c.localValid = valid |
| err = c.sendMessageLocked(ctx, true, expressPriority, &message.Auth{ |
| BlessingsKey: bkey, |
| DischargeKey: dkey, |
| }) |
| c.mu.Unlock() |
| if err != nil { |
| c.internalClose(ctx, false, err) |
| return |
| } |
| } |
| } |
| |
| // MTU returns the maximum transmission unit for the connection in bytes. |
| func (c *Conn) MTU() uint64 { |
| return c.mtu |
| } |
| |
| // RTT returns the round trip time of a message to the remote end. |
| // Note that the initial estimate of the RTT from the accepted side of a |
| // connection may be long because we don't fully factor out certificate |
| // verification time. The RTT is updated on receipt of every |
| // healthCheckResponse, so this overestimate doesn't persist for long when |
| // the channel timeout is low. |
| func (c *Conn) RTT() time.Duration { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| rtt := c.hcstate.lastRTT |
| if !c.hcstate.requestSent.IsZero() { |
| if waitRTT := time.Since(c.hcstate.requestSent); waitRTT > rtt { |
| rtt = waitRTT |
| } |
| } |
| return rtt |
| } |
| |
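| // initializeHealthChecks arms the channel health-check timers. For example, |
| // with the default channel timeout of 30 minutes (and no flow requesting a |
| // shorter one), a HealthCheckRequest is sent 15 minutes after the last |
| // response and the conn is torn down with a channel-timeout error 30 minutes |
| // after the last response. |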
| func (c *Conn) initializeHealthChecks(ctx *context.T, firstRTT time.Duration) { |
| now := time.Now() |
| h := &healthCheckState{ |
| requestDeadline: now.Add(c.acceptChannelTimeout / 2), |
| |
| closeTimer: time.AfterFunc(c.acceptChannelTimeout, func() { |
| c.internalClose(ctx, false, NewErrChannelTimeout(ctx)) |
| }), |
| closeDeadline: now.Add(c.acceptChannelTimeout), |
| lastRTT: firstRTT, |
| } |
| requestTimer := time.AfterFunc(c.acceptChannelTimeout/2, func() { |
| c.mu.Lock() |
| c.sendMessageLocked(ctx, true, expressPriority, &message.HealthCheckRequest{}) |
| h.requestSent = time.Now() |
| c.mu.Unlock() |
| }) |
| h.requestTimer = requestTimer |
| c.mu.Lock() |
| c.hcstate = h |
| c.mu.Unlock() |
| } |
| |
| func (c *Conn) handleHealthCheckResponse(ctx *context.T) { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| if c.status < Closing { |
| timeout := c.acceptChannelTimeout |
| for _, f := range c.flows { |
| if f.channelTimeout > 0 && f.channelTimeout < timeout { |
| timeout = f.channelTimeout |
| } |
| } |
| if min := minChannelTimeout[c.local.Protocol]; timeout < min { |
| timeout = min |
| } |
| c.hcstate.closeTimer.Reset(timeout) |
| c.hcstate.closeDeadline = time.Now().Add(timeout) |
| c.hcstate.requestTimer.Reset(timeout / 2) |
| c.hcstate.requestDeadline = time.Now().Add(timeout / 2) |
| c.hcstate.lastRTT = time.Since(c.hcstate.requestSent) |
| c.hcstate.requestSent = time.Time{} |
| } |
| } |
| |
| func (c *Conn) healthCheckNewFlowLocked(ctx *context.T, timeout time.Duration) { |
| if timeout != 0 { |
| if min := minChannelTimeout[c.local.Protocol]; timeout < min { |
| timeout = min |
| } |
| now := time.Now() |
| if rd := now.Add(timeout / 2); rd.Before(c.hcstate.requestDeadline) { |
| c.hcstate.requestDeadline = rd |
| c.hcstate.requestTimer.Reset(timeout / 2) |
| } |
| if cd := now.Add(timeout); cd.Before(c.hcstate.closeDeadline) { |
| c.hcstate.closeDeadline = cd |
| c.hcstate.closeTimer.Reset(timeout) |
| } |
| } |
| } |
| |
| func (c *Conn) healthCheckCloseDeadline() time.Time { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| return c.hcstate.closeDeadline |
| } |
| |
| // EnterLameDuck enters lame duck mode. The returned channel will be closed |
| // once the remote end has acknowledged the request or the Conn is closed. |
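| // |
| // A typical caller drains the conn before closing it, e.g. (sketch): |
| // |
| //     select { |
| //     case <-c.EnterLameDuck(ctx): |
| //     case <-ctx.Done(): |
| //     } |
| //     c.Close(ctx, nil) |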
| func (c *Conn) EnterLameDuck(ctx *context.T) chan struct{} { |
| var err error |
| c.mu.Lock() |
| if c.status < EnteringLameDuck { |
| c.status = EnteringLameDuck |
| err = c.sendMessageLocked(ctx, false, expressPriority, &message.EnterLameDuck{}) |
| } |
| c.mu.Unlock() |
| if err != nil { |
| c.Close(ctx, NewErrSend(ctx, "release", c.remote.String(), err)) |
| } |
| return c.lameDucked |
| } |
| |
| // Dial dials a new flow on the Conn. |
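| // |
| // An illustrative sketch (blessings and payload are hypothetical values): |
| // |
| //     f, err := c.Dial(ctx, blessings, nil, c.RemoteEndpoint(), 0, false) |
| //     if err != nil { |
| //         return err |
| //     } |
| //     defer f.Close() |
| //     _, err = f.Write(payload) |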
| func (c *Conn) Dial(ctx *context.T, blessings security.Blessings, discharges map[string]security.Discharge, |
| remote naming.Endpoint, channelTimeout time.Duration, sideChannel bool) (flow.Flow, error) { |
| if c.remote.RoutingID == naming.NullRoutingID { |
| return nil, NewErrDialingNonServer(ctx, c.remote.String()) |
| } |
| if blessings.IsZero() { |
| // It's safe to ignore this error since c.localBlessings must be valid, so |
| // encoding the public key can never fail. |
| blessings, _ = security.NamelessBlessing(v23.GetPrincipal(ctx).PublicKey()) |
| } |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| |
| // In the case of bidirectional RPC, the dialer of the connection may have |
| // sent blessings but not yet discharges. In that case we wait for the |
| // discharges to arrive before allowing a bidirectional flow dial. |
| if valid := c.remoteValid; valid != nil && len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 { |
| c.mu.Unlock() |
| <-valid |
| c.mu.Lock() |
| } |
| |
| if c.remoteLameDuck || c.status >= Closing { |
| return nil, NewErrConnectionClosed(ctx) |
| } |
| id := c.nextFid |
| c.nextFid += 2 |
| remote = c.remote |
| remote = remote.WithBlessingNames(c.remote.BlessingNames()) |
| flw := c.newFlowLocked( |
| ctx, |
| id, |
| blessings, |
| c.remoteBlessings, |
| discharges, |
| c.remoteDischarges, |
| remote, |
| true, |
| false, |
| channelTimeout, |
| sideChannel) |
| return flw, nil |
| } |
| |
| // LocalEndpoint returns the local vanadium Endpoint |
| func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local } |
| |
| // RemoteEndpoint returns the remote vanadium Endpoint |
| func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote } |
| |
| // LocalBlessings returns the local blessings. |
| func (c *Conn) LocalBlessings() security.Blessings { |
| c.mu.Lock() |
| localBlessings := c.localBlessings |
| c.mu.Unlock() |
| return localBlessings |
| } |
| |
| // RemoteBlessings returns the remote blessings. |
| func (c *Conn) RemoteBlessings() security.Blessings { |
| c.mu.Lock() |
| remoteBlessings := c.remoteBlessings |
| c.mu.Unlock() |
| return remoteBlessings |
| } |
| |
| // LocalDischarges fetches the most recently sent discharges for the local |
| // end's blessings. |
| func (c *Conn) LocalDischarges() map[string]security.Discharge { |
| c.mu.Lock() |
| localDischarges := c.localDischarges |
| c.mu.Unlock() |
| return localDischarges |
| } |
| |
| // RemoteDischarges fetches the most recently received discharges for the |
| // remote end's blessings. |
| func (c *Conn) RemoteDischarges() map[string]security.Discharge { |
| c.mu.Lock() |
| // In the case of bidirectional RPC, the dialer of the connection may have |
| // sent blessings but not yet discharges. In that case we wait for the |
| // discharges to arrive instead of returning the initial nil discharges. |
| if valid := c.remoteValid; valid != nil && len(c.remoteDischarges) == 0 && len(c.remoteBlessings.ThirdPartyCaveats()) > 0 { |
| c.mu.Unlock() |
| <-valid |
| c.mu.Lock() |
| } |
| remoteDischarges := c.remoteDischarges |
| c.mu.Unlock() |
| return remoteDischarges |
| } |
| |
| // CommonVersion returns the RPCVersion negotiated between the local and remote endpoints. |
| func (c *Conn) CommonVersion() version.RPCVersion { return c.version } |
| |
| // LastUsed returns the last time at which bytes were read from or written to the Conn. |
| func (c *Conn) LastUsed() time.Time { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| return c.lastUsedTime |
| } |
| |
| // RemoteLameDuck returns true if the other end of the connection has announced |
| // that it is in lame duck mode, indicating that new flows should not be dialed |
| // on this conn. |
| func (c *Conn) RemoteLameDuck() bool { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| return c.remoteLameDuck |
| } |
| |
| // Closed returns a channel that will be closed after the Conn is shutdown. |
| // After this channel is closed it is guaranteed that all Dial calls will fail |
| // with an error and no more flows will be sent to the FlowHandler. |
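| // |
| // Callers typically watch Closed alongside their own context, e.g. (sketch): |
| // |
| //     select { |
| //     case <-c.Closed(): |
| //         // The conn is gone; any further flows require a redial. |
| //     case <-ctx.Done(): |
| //     } |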
| func (c *Conn) Closed() <-chan struct{} { return c.closed } |
| |
| func (c *Conn) Status() Status { |
| c.mu.Lock() |
| status := c.status |
| c.mu.Unlock() |
| return status |
| } |
| |
| // Close shuts down a conn. |
| func (c *Conn) Close(ctx *context.T, err error) { |
| c.internalClose(ctx, false, err) |
| <-c.closed |
| } |
| |
| // CloseIfIdle closes the connection if the conn has been idle for idleExpiry, |
| // returning true if it closed it. |
| func (c *Conn) CloseIfIdle(ctx *context.T, idleExpiry time.Duration) bool { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| if c.isIdleLocked(ctx, idleExpiry) { |
| c.internalCloseLocked(ctx, false, NewErrIdleConnKilled(ctx)) |
| return true |
| } |
| return false |
| } |
| |
| func (c *Conn) IsIdle(ctx *context.T, idleExpiry time.Duration) bool { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| return c.isIdleLocked(ctx, idleExpiry) |
| } |
| |
| // isIdleLocked returns true if the connection has been idle for idleExpiry. |
| func (c *Conn) isIdleLocked(ctx *context.T, idleExpiry time.Duration) bool { |
| if c.hasActiveFlowsLocked() { |
| return false |
| } |
| return c.lastUsedTime.Add(idleExpiry).Before(time.Now()) |
| } |
| |
| func (c *Conn) HasActiveFlows() bool { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| return c.hasActiveFlowsLocked() |
| } |
| |
| func (c *Conn) hasActiveFlowsLocked() bool { |
| for _, f := range c.flows { |
| if !f.sideChannel { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func (c *Conn) internalClose(ctx *context.T, closedRemotely bool, err error) { |
| c.mu.Lock() |
| c.internalCloseLocked(ctx, closedRemotely, err) |
| c.mu.Unlock() |
| } |
| |
| func (c *Conn) internalCloseLocked(ctx *context.T, closedRemotely bool, err error) { |
| debug := ctx.VI(2) |
| debug.Infof("Closing connection: %v", err) |
| |
| flows := make([]*flw, 0, len(c.flows)) |
| for _, f := range c.flows { |
| flows = append(flows, f) |
| } |
| if c.status >= Closing { |
| // This conn is already being torn down. |
| return |
| } |
| if c.status < LameDuckAcknowledged { |
| close(c.lameDucked) |
| } |
| c.status = Closing |
| if c.remoteValid != nil { |
| close(c.remoteValid) |
| c.remoteValid = nil |
| } |
| |
| go func(c *Conn) { |
| if c.hcstate != nil { |
| c.hcstate.requestTimer.Stop() |
| c.hcstate.closeTimer.Stop() |
| } |
| if !closedRemotely { |
| msg := "" |
| if err != nil { |
| msg = err.Error() |
| } |
| c.mu.Lock() |
| cerr := c.sendMessageLocked(ctx, false, tearDownPriority, &message.TearDown{ |
| Message: msg, |
| }) |
| c.mu.Unlock() |
| if cerr != nil { |
| ctx.VI(2).Infof("Error sending tearDown on connection to %s: %v", c.remote, cerr) |
| } |
| } |
| if err == nil { |
| err = NewErrConnectionClosed(ctx) |
| } |
| for _, f := range flows { |
| f.close(ctx, false, err) |
| } |
| if c.blessingsFlow != nil { |
| c.blessingsFlow.close(ctx, err) |
| } |
| if cerr := c.mp.rw.Close(); cerr != nil { |
| debug.Infof("Error closing underlying connection for %s: %v", c.remote, cerr) |
| } |
| if c.cancel != nil { |
| c.cancel() |
| } |
| c.loopWG.Wait() |
| c.mu.Lock() |
| c.status = Closed |
| close(c.closed) |
| c.mu.Unlock() |
| }(c) |
| } |
| |
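| // release queues count flow-control tokens to be returned to the remote end |
| // for flow fid. As a concrete illustration of the batching rule described on |
| // the toRelease field: with DefaultBytesBufferedPerFlow = 1<<20, a Release |
| // message carrying the pending counters for every flow is sent as soon as |
| // more than 1<<19 (524288) bytes are pending for the flow (or for the shared |
| // pool, if the remote end is still borrowing), after which the toRelease and |
| // borrowing maps are reset. |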
| func (c *Conn) release(ctx *context.T, fid, count uint64) { |
| var toRelease map[uint64]uint64 |
| var release bool |
| c.mu.Lock() |
| c.toRelease[fid] += count |
| if c.borrowing[fid] { |
| c.toRelease[invalidFlowID] += count |
| release = c.toRelease[invalidFlowID] > DefaultBytesBufferedPerFlow/2 |
| } else { |
| release = c.toRelease[fid] > DefaultBytesBufferedPerFlow/2 |
| } |
| if release { |
| toRelease = c.toRelease |
| c.toRelease = make(map[uint64]uint64, len(c.toRelease)) |
| c.borrowing = make(map[uint64]bool, len(c.borrowing)) |
| } |
| var err error |
| if toRelease != nil { |
| delete(toRelease, invalidFlowID) |
| err = c.sendMessageLocked(ctx, false, expressPriority, &message.Release{ |
| Counters: toRelease, |
| }) |
| } |
| c.mu.Unlock() |
| if err != nil { |
| c.Close(ctx, NewErrSend(ctx, "release", c.remote.String(), err)) |
| } |
| } |
| |
| func (c *Conn) releaseOutstandingBorrowedLocked(fid, val uint64) { |
| borrowed := c.outstandingBorrowed[fid] |
| released := val |
| if borrowed == 0 { |
| return |
| } else if borrowed < released { |
| released = borrowed |
| } |
| c.lshared += released |
| if released == borrowed { |
| delete(c.outstandingBorrowed, fid) |
| } else { |
| c.outstandingBorrowed[fid] = borrowed - released |
| } |
| } |
| |
| func (c *Conn) handleMessage(ctx *context.T, m message.Message) error { |
| switch msg := m.(type) { |
| case *message.TearDown: |
| var err error |
| if msg.Message != "" { |
| err = NewErrRemoteError(ctx, msg.Message) |
| } |
| c.internalClose(ctx, true, err) |
| return nil |
| |
| case *message.EnterLameDuck: |
| c.mu.Lock() |
| c.remoteLameDuck = true |
| c.mu.Unlock() |
| go func() { |
| // We only want to send the lame duck acknowledgment after all outstanding |
| // OpenFlows are sent. |
| c.unopenedFlows.Wait() |
| c.mu.Lock() |
| err := c.sendMessageLocked(ctx, true, expressPriority, &message.AckLameDuck{}) |
| c.mu.Unlock() |
| if err != nil { |
| c.Close(ctx, NewErrSend(ctx, "release", c.remote.String(), err)) |
| } |
| }() |
| |
| case *message.AckLameDuck: |
| c.mu.Lock() |
| if c.status < LameDuckAcknowledged { |
| c.status = LameDuckAcknowledged |
| close(c.lameDucked) |
| } |
| c.mu.Unlock() |
| |
| case *message.HealthCheckRequest: |
| c.mu.Lock() |
| c.sendMessageLocked(ctx, true, expressPriority, &message.HealthCheckResponse{}) |
| c.mu.Unlock() |
| |
| case *message.HealthCheckResponse: |
| c.handleHealthCheckResponse(ctx) |
| |
| case *message.OpenFlow: |
| remoteBlessings, remoteDischarges, err := c.blessingsFlow.getRemote( |
| ctx, msg.BlessingsKey, msg.DischargeKey) |
| if err != nil { |
| return err |
| } |
| c.mu.Lock() |
| if c.nextFid%2 == msg.ID%2 { |
| c.mu.Unlock() |
| return NewErrInvalidPeerFlow(ctx) |
| } |
| if c.handler == nil { |
| c.mu.Unlock() |
| return NewErrUnexpectedMsg(ctx, "openFlow") |
| } else if c.status == Closing { |
| c.mu.Unlock() |
| return nil // Conn is already being closed. |
| } |
| sideChannel := msg.Flags&message.SideChannelFlag != 0 |
| f := c.newFlowLocked( |
| ctx, |
| msg.ID, |
| c.localBlessings, |
| remoteBlessings, |
| c.localDischarges, |
| remoteDischarges, |
| c.remote, |
| false, |
| true, |
| c.acceptChannelTimeout, |
| sideChannel) |
| f.releaseLocked(msg.InitialCounters) |
| c.toRelease[msg.ID] = DefaultBytesBufferedPerFlow |
| c.borrowing[msg.ID] = true |
| c.mu.Unlock() |
| |
| c.handler.HandleFlow(f) |
| if err := f.q.put(ctx, msg.Payload); err != nil { |
| return err |
| } |
| if msg.Flags&message.CloseFlag != 0 { |
| f.close(ctx, true, nil) |
| } |
| |
| case *message.Release: |
| c.mu.Lock() |
| for fid, val := range msg.Counters { |
| if f := c.flows[fid]; f != nil { |
| f.releaseLocked(val) |
| } else { |
| c.releaseOutstandingBorrowedLocked(fid, val) |
| } |
| } |
| c.mu.Unlock() |
| |
| case *message.Data: |
| c.mu.Lock() |
| if c.status == Closing { |
| c.mu.Unlock() |
| return nil // Conn is already being shut down. |
| } |
| f := c.flows[msg.ID] |
| if f == nil { |
| // If the flow is closing then we assume the remote side releases |
| // all borrowed counters for that flow. |
| c.releaseOutstandingBorrowedLocked(msg.ID, math.MaxUint64) |
| c.mu.Unlock() |
| return nil |
| } |
| c.mu.Unlock() |
| if err := f.q.put(ctx, msg.Payload); err != nil { |
| return err |
| } |
| if msg.Flags&message.CloseFlag != 0 { |
| f.close(ctx, true, nil) |
| } |
| |
| case *message.Auth: |
| blessings, discharges, err := c.blessingsFlow.getRemote( |
| ctx, msg.BlessingsKey, msg.DischargeKey) |
| if err != nil { |
| return err |
| } |
| c.mu.Lock() |
| c.remoteBlessings = blessings |
| c.remoteDischarges = discharges |
| if c.remoteValid != nil { |
| close(c.remoteValid) |
| c.remoteValid = make(chan struct{}) |
| } |
| c.mu.Unlock() |
| default: |
| return NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String()) |
| } |
| return nil |
| } |
| |
| func (c *Conn) readLoop(ctx *context.T) { |
| defer c.loopWG.Done() |
| var err error |
| for { |
| msg, rerr := c.mp.readMsg(ctx) |
| if rerr != nil { |
| err = NewErrRecv(ctx, c.remote.String(), rerr) |
| break |
| } |
| if err = c.handleMessage(ctx, msg); err != nil { |
| break |
| } |
| } |
| c.internalClose(ctx, false, err) |
| } |
| |
| func (c *Conn) markUsed() { |
| c.mu.Lock() |
| c.markUsedLocked() |
| c.mu.Unlock() |
| } |
| |
| func (c *Conn) markUsedLocked() { |
| c.lastUsedTime = time.Now() |
| } |
| |
| func (c *Conn) IsEncapsulated() bool { |
| _, ok := c.mp.rw.(*flw) |
| return ok |
| } |
| |
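| // writer is an element of the conn's per-priority write queues. The typical |
| // scheduling sequence, as used by sendMessageLocked below, is (a sketch; c.mu |
| // is held except while waiting for the turn, and "turn" stands for however |
| // the writer surfaces its notify call, e.g. a channel closed by notify): |
| // |
| //     c.activateWriterLocked(w) |
| //     c.notifyNextWriterLocked(w) |
| //     c.mu.Unlock() |
| //     <-turn                      // wait for w.notify() |
| //     // ... write to the underlying message pipe ... |
| //     c.mu.Lock() |
| //     c.deactivateWriterLocked(w) |
| //     c.notifyNextWriterLocked(w) |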
| type writer interface { |
| notify() |
| priority() int |
| neighbors() (prev, next writer) |
| setNeighbors(prev, next writer) |
| } |
| |
| // activateWriterLocked adds a given writer to the list of active writers. |
| // The writer will be given a turn when the channel becomes available. |
| // Only writers with actual work to do should be placed in activeWriters, |
| // because we will switch to that writer's goroutine to let it work, which is |
| // wasteful if it turns out there is no work to do. |
| // After calling this you should typically call notifyNextWriterLocked. |
| func (c *Conn) activateWriterLocked(w writer) { |
| priority := w.priority() |
| _, wn := w.neighbors() |
| head := c.activeWriters[priority] |
| if head == w || wn != w { |
| // We're already active. |
| return |
| } |
| if head == nil { // We're the head of the list. |
| c.activeWriters[priority] = w |
| } else { // Insert us just before head, i.e. at the tail of the circular list. |
| hp, _ := head.neighbors() |
| w.setNeighbors(hp, head) |
| hp.setNeighbors(nil, w) |
| head.setNeighbors(w, nil) |
| } |
| } |
| |
| // deactivateWriterLocked removes a writer from the active writer list. After |
| // this function is called it is certain that the writer will not be given any |
| // new turns. If the writer is already in the middle of a turn, that turn is |
| // not terminated; writers must end their turn explicitly by calling |
| // notifyNextWriterLocked. |
| func (c *Conn) deactivateWriterLocked(w writer) { |
| priority := w.priority() |
| p, n := w.neighbors() |
| if head := c.activeWriters[priority]; head == w { |
| if w == n { // We're the only one in the list. |
| c.activeWriters[priority] = nil |
| } else { |
| c.activeWriters[priority] = n |
| } |
| } |
| n.setNeighbors(p, nil) |
| p.setNeighbors(nil, n) |
| w.setNeighbors(w, w) |
| } |
| |
| // notifyNextWriterLocked notifies the highest priority activeWriter to take |
| // a turn writing. If w is the active writer, give up w's claim and choose |
| // the next writer. If there is already an active writer != w, this function |
| // does nothing. |
| func (c *Conn) notifyNextWriterLocked(w writer) { |
| if c.writing == w { |
| c.writing = nil |
| } |
| if c.writing == nil { |
| for p, head := range c.activeWriters { |
| if head != nil { |
| _, c.activeWriters[p] = head.neighbors() |
| c.writing = head |
| head.notify() |
| return |
| } |
| } |
| } |
| } |
| |
| type writerList struct { |
| // next and prev are protected by c.mu |
| next, prev writer |
| } |
| |
| func (s *writerList) neighbors() (prev, next writer) { return s.prev, s.next } |
| func (s *writerList) setNeighbors(prev, next writer) { |
| if prev != nil { |
| s.prev = prev |
| } |
| if next != nil { |
| s.next = next |
| } |
| } |
| |
| // singleMessageWriter is used to send a single message with a given priority. |
| type singleMessageWriter struct { |
| writeCh chan struct{} |
| p int |
| writerList |
| } |
| |
| func (s *singleMessageWriter) notify() { close(s.writeCh) } |
| func (s *singleMessageWriter) priority() int { return s.p } |
| |
| // sendMessageLocked sends a single message on the conn with the given priority. |
| // If cancelWithContext is true, then this write attempt will fail when the |
| // context is canceled. Otherwise context cancellation has no effect and this |
| // call blocks until the message is sent. |
| // NOTE: The mutex is not held for the entirety of this call; it is released |
| // while waiting for the turn to write, so this call will interrupt your |
| // critical section. It should be called only at the end of a mutex-protected |
| // region. |
| func (c *Conn) sendMessageLocked( |
| ctx *context.T, |
| cancelWithContext bool, |
| priority int, |
| m message.Message) (err error) { |
| s := &singleMessageWriter{writeCh: make(chan struct{}), p: priority} |
| s.next, s.prev = s, s |
| c.activateWriterLocked(s) |
| c.notifyNextWriterLocked(s) |
| c.mu.Unlock() |
| // wait for my turn. |
| if cancelWithContext { |
| select { |
| case <-ctx.Done(): |
| err = ctx.Err() |
| case <-s.writeCh: |
| } |
| } else { |
| <-s.writeCh |
| } |
| // send the actual message. |
| if err == nil { |
| err = c.mp.writeMsg(ctx, m) |
| } |
| c.mu.Lock() |
| c.deactivateWriterLocked(s) |
| c.notifyNextWriterLocked(s) |
| return err |
| } |
| |
| func (c *Conn) DebugString() string { |
| defer c.mu.Unlock() |
| c.mu.Lock() |
| return fmt.Sprintf(` |
| Remote: |
| Endpoint: %v |
| Blessings: %v (claimed) |
| PublicKey: %v |
| Local: |
| Endpoint: %v |
| Blessings: %v |
| Version: %v |
| MTU: %d |
| LastUsed: %v |
| #Flows: %d |
| `, |
| c.remote, |
| c.remoteBlessings, |
| c.rPublicKey, |
| c.local, |
| c.localBlessings, |
| c.version, |
| c.mtu, |
| c.lastUsedTime, |
| len(c.flows)) |
| } |