blob: a3207dcc2e085337c8f836addae005530f187cd9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"strconv"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/flow/message"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref/runtime/internal/flow/flowcontrol"
)
type flw struct {
id uint64
dialed bool
ctx *context.T
cancel context.CancelFunc
conn *Conn
worker *flowcontrol.Worker
opened bool
q *readq
bkey, dkey uint64
}
// Ensure that *flw implements flow.Flow.
var _ flow.Flow = &flw{}
func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, dialed, preopen bool) *flw {
f := &flw{
id: id,
dialed: dialed,
conn: c,
worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority),
q: newReadQ(),
bkey: bkey,
dkey: dkey,
opened: preopen,
}
f.SetContext(ctx)
c.flows[id] = f
return f
}
// Implement io.Reader.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) Read(p []byte) (n int, err error) {
f.conn.markUsed()
var release bool
if n, release, err = f.q.read(f.ctx, p); release {
f.conn.release(f.ctx)
}
if err != nil {
f.close(f.ctx, err)
}
return
}
// ReadMsg is like read, but it reads bytes in chunks. Depending on the
// implementation the batch boundaries might or might not be significant.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) ReadMsg() (buf []byte, err error) {
f.conn.markUsed()
var release bool
// TODO(mattr): Currently we only ever release counters when some flow
// reads. We may need to do it more or less often. Currently
// we'll send counters whenever a new flow is opened.
if buf, release, err = f.q.get(f.ctx); release {
f.conn.release(f.ctx)
}
if err != nil {
f.close(f.ctx, err)
}
return
}
// Implement io.Writer.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) Write(p []byte) (n int, err error) {
return f.WriteMsg(p)
}
func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
f.conn.markUsed()
sent := 0
var left []byte
err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
if !f.opened {
// TODO(mattr): we should be able to send multiple messages
// in a single writeMsg call.
err := f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
ID: f.id,
InitialCounters: defaultBufferSize,
BlessingsKey: f.bkey,
DischargeKey: f.dkey,
})
if err != nil {
return 0, false, err
}
f.opened = true
}
size := 0
var bufs [][]byte
if len(left) > 0 {
size += len(left)
bufs = append(bufs, left)
left = nil
}
for size <= tokens && len(parts) > 0 {
bufs = append(bufs, parts[0])
size += len(parts[0])
parts = parts[1:]
}
if size > tokens {
lidx := len(bufs) - 1
last := bufs[lidx]
take := len(last) - (size - tokens)
bufs[lidx] = last[:take]
left = last[take:]
size = tokens
}
d := &message.Data{
ID: f.id,
Payload: bufs,
}
done := len(left) == 0 && len(parts) == 0
if alsoClose && done {
d.Flags |= message.CloseFlag
}
sent += size
return size, done, f.conn.mp.writeMsg(f.ctx, d)
})
if alsoClose || err != nil {
f.close(f.ctx, err)
}
return sent, err
}
// WriteMsg is like Write, but allows writing more than one buffer at a time.
// The data in each buffer is written sequentially onto the flow. Returns the
// number of bytes written. WriteMsg must return a non-nil error if it writes
// less than the total number of bytes from all buffers.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
return f.writeMsg(false, parts...)
}
// WriteMsgAndClose performs WriteMsg and then closes the flow.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
return f.writeMsg(true, parts...)
}
// SetContext sets the context associated with the flow. Typically this is
// used to set state that is only available after the flow is connected, such
// as a more restricted flow timeout, or the language of the request.
// Calling SetContext may invalidate values previously returned from Closed.
//
// The flow.Manager associated with ctx must be the same flow.Manager that the
// flow was dialed or accepted from, otherwise an error is returned.
// TODO(mattr): enforce this restriction.
//
// TODO(mattr): update v23/flow documentation.
// SetContext may not be called concurrently with other methods.
func (f *flw) SetContext(ctx *context.T) error {
if f.cancel != nil {
f.cancel()
}
f.ctx, f.cancel = context.WithCancel(ctx)
return nil
}
// LocalBlessings returns the blessings presented by the local end of the flow
// during authentication.
func (f *flw) LocalBlessings() security.Blessings {
if f.dialed {
blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
if err != nil {
f.conn.Close(f.ctx, err)
}
return blessings
}
return f.conn.lBlessings
}
// RemoteBlessings returns the blessings presented by the remote end of the
// flow during authentication.
func (f *flw) RemoteBlessings() security.Blessings {
if !f.dialed {
blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
if err != nil {
f.conn.Close(f.ctx, err)
}
return blessings
}
return f.conn.rBlessings
}
// LocalDischarges returns the discharges presented by the local end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) LocalDischarges() map[string]security.Discharge {
var discharges map[string]security.Discharge
var err error
if f.dialed {
_, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
} else {
discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.lBlessings)
}
if err != nil {
f.conn.Close(f.ctx, err)
}
return discharges
}
// RemoteDischarges returns the discharges presented by the remote end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) RemoteDischarges() map[string]security.Discharge {
var discharges map[string]security.Discharge
var err error
if !f.dialed {
_, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
} else {
discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.rBlessings)
}
if err != nil {
f.conn.Close(f.ctx, err)
}
return discharges
}
// Conn returns the connection the flow is multiplexed on.
func (f *flw) Conn() flow.ManagedConn {
return f.conn
}
// Closed returns a channel that remains open until the flow has been closed remotely
// or the context attached to the flow has been canceled.
//
// Note that after the returned channel is closed starting new writes will result
// in an error, but reads of previously queued data are still possible. No
// new data will be queued.
// TODO(mattr): update v23/flow docs.
func (f *flw) Closed() <-chan struct{} {
return f.ctx.Done()
}
func (f *flw) close(ctx *context.T, err error) {
f.q.close(ctx)
f.cancel()
if eid := verror.ErrorID(err); eid != ErrFlowClosedRemotely.ID &&
eid != ErrConnectionClosed.ID {
// We want to try to send this message even if ctx is already canceled.
ctx, cancel := context.WithRootCancel(ctx)
err := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{
ID: f.id,
Flags: message.CloseFlag,
})
})
if err != nil {
ctx.Errorf("Could not send close flow message: %v", err)
}
cancel()
}
}
// Close marks the flow as closed. After Close is called, new data cannot be
// written on the flow. Reads of already queued data are still possible.
func (f *flw) Close() error {
f.close(f.ctx, nil)
return nil
}