blob: 4681bded21dbb3a305a8ee02f59393ea79431625 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"io"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/verror"
)
type flw struct {
// These variables are all set during flow construction.
id uint64
dialed bool
conn *Conn
q *readq
bkey, dkey uint64
noEncrypt bool
writeCh chan struct{}
remote naming.Endpoint
// These variables can only be modified by SetDeadlineContext which cannot
// be called concurrently with other methods on the flow. Therefore they
// are not mutex protected.
ctx *context.T
cancel context.CancelFunc
// NOTE: The remaining variables are actually protected by conn.mu.
// opened indicates whether the flow has already been opened. If false
// we need to send an open flow on the next write. For accepted flows
// this will always be true.
opened bool
// writing is true if we're in the middle of a write to this flow.
writing bool
// released counts tokens already released by the remote end, that is, the number
// of tokens we are allowed to send.
released uint64
// borrowed indicates the number of tokens we have borrowed from the shared pool for
// sending on newly dialed flows.
borrowed uint64
// borrowCond is a condition variable that we can use to wait for shared
// counters to be released.
borrowCond *sync.Cond
// borrowing indicates whether this flow is using borrowed counters for a newly
// dialed flow. This will be set to false after we first receive a
// release from the remote end. This is always false for accepted flows.
borrowing bool
writerList
}
// Ensure that *flw implements flow.Flow.
var _ flow.Flow = &flw{}
func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, remote naming.Endpoint, dialed, preopen bool) *flw {
f := &flw{
id: id,
dialed: dialed,
conn: c,
q: newReadQ(c, id),
bkey: bkey,
dkey: dkey,
opened: preopen,
borrowing: dialed,
borrowCond: sync.NewCond(&c.mu),
// It's important that this channel has a non-zero buffer. Sometimes this
// flow will be notifying itself, so if there's no buffer a deadlock will
// occur.
writeCh: make(chan struct{}, 1),
remote: remote,
}
f.next, f.prev = f, f
f.ctx, f.cancel = context.WithCancel(ctx)
if !f.opened {
c.unopenedFlows.Add(1)
}
c.flows[id] = f
return f
}
// Implement the writer interface.
func (f *flw) notify() { f.writeCh <- struct{}{} }
func (f *flw) priority() int { return flowPriority }
// disableEncrytion should not be called concurrently with Write* methods.
func (f *flw) disableEncryption() {
f.noEncrypt = false
}
// Implement io.Reader.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) Read(p []byte) (n int, err error) {
if err = f.checkBlessings(); err != nil {
return
}
f.markUsed()
if n, err = f.q.read(f.ctx, p); err != nil {
f.close(f.ctx, err)
}
return
}
// ReadMsg is like read, but it reads bytes in chunks. Depending on the
// implementation the batch boundaries might or might not be significant.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) ReadMsg() (buf []byte, err error) {
if err = f.checkBlessings(); err != nil {
return
}
f.markUsed()
// TODO(mattr): Currently we only ever release counters when some flow
// reads. We may need to do it more or less often. Currently
// we'll send counters whenever a new flow is opened.
if buf, err = f.q.get(f.ctx); err != nil {
f.close(f.ctx, err)
}
return
}
// Implement io.Writer.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) Write(p []byte) (n int, err error) {
return f.WriteMsg(p)
}
// tokensLocked returns the number of tokens this flow can send right now.
// It is bounded by the channel mtu, the released counters, and possibly
// the number of shared counters for the conn if we are sending on a just
// dialed flow.
func (f *flw) tokensLocked() (int, func(int)) {
max := uint64(mtu)
if f.borrowing {
if f.conn.lshared < max {
max = f.conn.lshared
}
return int(max), func(used int) {
f.conn.lshared -= uint64(used)
f.borrowed += uint64(used)
f.ctx.VI(2).Infof("deducting %d borrowed tokens on flow %d(%p), total: %d", used, f.id, f, f.borrowed)
}
}
if f.released < max {
max = f.released
}
return int(max), func(used int) {
f.released -= uint64(used)
f.ctx.VI(2).Infof("flow %d(%p) deducting %d tokens, %d left", f.id, f, used, f.released)
}
}
// releaseLocked releases some counters from a remote reader to the local
// writer. This allows the writer to then write more data to the wire.
func (f *flw) releaseLocked(tokens uint64) {
f.borrowing = false
if f.borrowed > 0 {
n := tokens
if f.borrowed < tokens {
n = f.borrowed
}
f.ctx.VI(2).Infof("Returning %d tokens borrowed by %d(%p)", f.borrowed, f.id, f)
tokens -= n
f.borrowed -= n
f.conn.lshared += n
f.borrowCond.Broadcast()
}
f.released += tokens
f.ctx.VI(2).Infof("Tokens release to %d(%p): %d => %d", f.id, f, tokens, f.released)
if f.writing {
f.ctx.VI(2).Infof("Activating writing flow %d(%p) now that we have tokens.", f.id, f)
f.conn.activateWriterLocked(f)
f.conn.notifyNextWriterLocked(nil)
}
}
func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (sent int, err error) {
if err = f.checkBlessings(); err != nil {
return 0, err
}
f.ctx.VI(2).Infof("starting write on flow %d(%p)", f.id, f)
select {
// Catch cancellations early. If we caught a cancel when waiting
// our turn below its possible that we were notified simultaneously.
// Then the notify channel will be full and we would deadlock
// notifying ourselves.
case <-f.ctx.Done():
f.close(f.ctx, f.ctx.Err())
return 0, io.EOF
default:
}
size, sent, tosend := 0, 0, make([][]byte, len(parts))
f.conn.mu.Lock()
f.markUsedLocked()
f.writing = true
f.conn.activateWriterLocked(f)
for err == nil && len(parts) > 0 {
f.conn.notifyNextWriterLocked(f)
// Wait for our turn.
f.conn.mu.Unlock()
select {
case <-f.ctx.Done():
err = io.EOF
case <-f.writeCh:
}
// It's our turn, we lock to learn the current state of our buffer tokens.
f.conn.mu.Lock()
if err != nil {
break
}
opened := f.opened
tokens, deduct := f.tokensLocked()
if tokens == 0 {
// Oops, we really don't have data to send, probably because we've exhausted
// the remote buffer. deactivate ourselves but keep trying.
f.ctx.VI(2).Infof("Deactivating write on flow %d(%p) due to lack of tokens", f.id, f)
f.conn.deactivateWriterLocked(f)
continue
}
parts, tosend, size = popFront(parts, tosend[:0], tokens)
deduct(size)
f.conn.mu.Unlock()
// Actually write to the wire. This is also where encryption
// happens, so this part can be slow.
d := &message.Data{ID: f.id, Payload: tosend}
if alsoClose && len(parts) == 0 {
d.Flags |= message.CloseFlag
}
if f.noEncrypt {
d.Flags |= message.DisableEncryptionFlag
}
if opened {
err = f.conn.mp.writeMsg(f.ctx, d)
} else {
err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
ID: f.id,
InitialCounters: DefaultBytesBufferedPerFlow,
BlessingsKey: f.bkey,
DischargeKey: f.dkey,
Flags: d.Flags,
Payload: d.Payload,
})
}
sent += size
// The top of the loop expects to be locked, so lock here and update
// opened. Note that since we've definitely sent a message now opened is surely
// true.
f.conn.mu.Lock()
// We need to ensure that we only call Done() exactly once, so we need to
// recheck f.opened, to ensure that f.close didn't decrement the wait group
// while we were not holding the lock.
if !f.opened {
f.conn.unopenedFlows.Done()
}
f.opened = true
}
f.writing = false
f.ctx.VI(2).Infof("finishing write on %d(%p): %v", f.id, f, err)
f.conn.deactivateWriterLocked(f)
f.conn.notifyNextWriterLocked(f)
f.conn.mu.Unlock()
if alsoClose || err != nil {
f.close(f.ctx, err)
}
return sent, err
}
// WriteMsg is like Write, but allows writing more than one buffer at a time.
// The data in each buffer is written sequentially onto the flow. Returns the
// number of bytes written. WriteMsg must return a non-nil error if it writes
// less than the total number of bytes from all buffers.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
return f.writeMsg(false, parts...)
}
// WriteMsgAndClose performs WriteMsg and then closes the flow.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
return f.writeMsg(true, parts...)
}
func (f *flw) checkBlessings() error {
var err error
if !f.dialed && f.bkey != 0 {
_, _, err = f.conn.blessingsFlow.getRemote(f.ctx, f.bkey, f.dkey)
}
return err
}
// SetContext sets the context associated with the flow. Typically this is
// used to set state that is only available after the flow is connected, such
// as a more restricted flow timeout, or the language of the request.
// Calling SetContext may invalidate values previously returned from Closed.
//
// The flow.Manager associated with ctx must be the same flow.Manager that the
// flow was dialed or accepted from, otherwise an error is returned.
// TODO(mattr): enforce this restriction.
//
// TODO(mattr): update v23/flow documentation.
// SetContext may not be called concurrently with other methods.
func (f *flw) SetDeadlineContext(ctx *context.T, deadline time.Time) *context.T {
if f.cancel != nil {
f.cancel()
}
if !deadline.IsZero() {
f.ctx, f.cancel = context.WithDeadline(ctx, deadline)
} else {
f.ctx, f.cancel = context.WithCancel(ctx)
}
return f.ctx
}
// LocalEndpoint returns the local vanadium endpoint.
func (f *flw) LocalEndpoint() naming.Endpoint {
return f.conn.local
}
// RemoteEndpoint returns the remote vanadium endpoint.
func (f *flw) RemoteEndpoint() naming.Endpoint {
if f.remote != nil {
return f.remote
}
return f.conn.remote
}
// LocalBlessings returns the blessings presented by the local end of the flow
// during authentication.
func (f *flw) LocalBlessings() security.Blessings {
if f.dialed {
blessings, _ := f.conn.blessingsFlow.getLocal(f.ctx, f.bkey, 0)
return blessings
}
return f.conn.lBlessings
}
// RemoteBlessings returns the blessings presented by the remote end of the
// flow during authentication.
func (f *flw) RemoteBlessings() security.Blessings {
var blessings security.Blessings
var err error
if !f.dialed {
blessings, _, err = f.conn.blessingsFlow.getRemote(f.ctx, f.bkey, 0)
} else {
blessings, _, err = f.conn.blessingsFlow.getLatestRemote(f.ctx, f.conn.rBKey)
}
if err != nil {
f.conn.Close(f.ctx, err)
}
return blessings
}
// LocalDischarges returns the discharges presented by the local end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) LocalDischarges() map[string]security.Discharge {
if f.dialed {
_, discharges := f.conn.blessingsFlow.getLocal(f.ctx, f.bkey, f.dkey)
return discharges
}
return f.conn.blessingsFlow.getLatestLocal(f.ctx, f.conn.lBlessings)
}
// RemoteDischarges returns the discharges presented by the remote end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) RemoteDischarges() map[string]security.Discharge {
var discharges map[string]security.Discharge
var err error
if !f.dialed {
_, discharges, err = f.conn.blessingsFlow.getRemote(f.ctx, f.bkey, f.dkey)
} else {
_, discharges, err = f.conn.blessingsFlow.getLatestRemote(f.ctx, f.conn.rBKey)
}
if err != nil {
f.conn.Close(f.ctx, err)
}
return discharges
}
// Conn returns the connection the flow is multiplexed on.
func (f *flw) Conn() flow.ManagedConn {
return f.conn
}
// Closed returns a channel that remains open until the flow has been closed remotely
// or the context attached to the flow has been canceled.
//
// Note that after the returned channel is closed starting new writes will result
// in an error, but reads of previously queued data are still possible. No
// new data will be queued.
// TODO(mattr): update v23/flow docs.
func (f *flw) Closed() <-chan struct{} {
return f.ctx.Done()
}
func (f *flw) close(ctx *context.T, err error) {
closedRemotely := verror.ErrorID(err) == ErrFlowClosedRemotely.ID
f.conn.mu.Lock()
if closedRemotely {
// When the other side closes a flow, it implicitly releases all the
// counters used by that flow. That means we should release the shared
// counter to be used on other new flows.
f.conn.lshared += f.borrowed
f.borrowed = 0
}
f.borrowCond.Broadcast()
f.conn.mu.Unlock()
if f.q.close(ctx) {
f.ctx.VI(2).Infof("closing %d(%p): %v", f.id, f, err)
f.cancel()
// After cancel has been called no new writes will begin for this
// flow. There may be a write in progress, but it must finish
// before another writer gets to use the channel. Therefore we
// can simply use sendMessageLocked to send the close flow
// message.
f.conn.mu.Lock()
connClosing := f.conn.status == Closing
var serr error
if !f.opened {
// Closing a flow that was never opened.
f.conn.unopenedFlows.Done()
// We mark the flow as opened to prevent mulitple calls to
// f.conn.unopenedFlows.Done().
f.opened = true
} else if !closedRemotely && !connClosing {
// Note: If the conn is closing there is no point in trying to
// send the flow close message as it will fail. This is racy
// with the connection closing, but there are no ill-effects
// other than spamming the logs a little so it's OK.
serr = f.conn.sendMessageLocked(ctx, false, expressPriority, &message.Data{
ID: f.id,
Flags: message.CloseFlag,
})
}
if f.borrowed > 0 && f.conn.status < Closing {
f.conn.loopWG.Add(1)
go func() {
defer f.conn.loopWG.Done()
f.conn.mu.Lock()
for f.borrowed > 0 && f.conn.status < Closing {
f.borrowCond.Wait()
}
delete(f.conn.flows, f.id)
f.conn.mu.Unlock()
}()
}
f.conn.mu.Unlock()
if serr != nil {
ctx.Errorf("Could not send close flow message: %v", err)
}
}
}
// Close marks the flow as closed. After Close is called, new data cannot be
// written on the flow. Reads of already queued data are still possible.
func (f *flw) Close() error {
f.close(f.ctx, nil)
return nil
}
func (f *flw) markUsed() {
if f.id >= reservedFlows {
f.conn.markUsed()
}
}
func (f *flw) markUsedLocked() {
if f.id >= reservedFlows {
f.conn.markUsedLocked()
}
}
// popFront removes the first num bytes from in and appends them to out
// returning in, out, and the actual number of bytes appended.
func popFront(in, out [][]byte, num int) ([][]byte, [][]byte, int) {
i, sofar := 0, 0
for i < len(in) && sofar < num {
i, sofar = i+1, sofar+len(in[i])
}
out = append(out, in[:i]...)
if excess := sofar - num; excess > 0 {
i, sofar = i-1, num
keep := len(out[i]) - excess
in[i], out[i] = in[i][keep:], out[i][:keep]
}
return in[i:], out, sofar
}