blob: 6ea024649248897a37ee4995a514a1e0b9e9ee30 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/security"
"v.io/x/ref/runtime/internal/flow/flowcontrol"
"v.io/x/ref/runtime/internal/lib/upcqueue"
)
type flw struct {
id flowID
ctx *context.T
conn *Conn
closed chan struct{}
worker *flowcontrol.Worker
opened bool
q *upcqueue.T
readBufs [][]byte
dialerBlessings security.Blessings
dialerDischarges map[string]security.Discharge
}
var _ flow.Flow = &flw{}
func (c *Conn) newFlowLocked(ctx *context.T, id flowID) *flw {
f := &flw{
id: id,
ctx: ctx,
conn: c,
closed: make(chan struct{}),
worker: c.fc.NewWorker(flowPriority),
q: upcqueue.New(),
}
c.flows[id] = f
return f
}
func (f *flw) dialed() bool {
return f.id%2 == 0
}
// Implement io.Reader.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) Read(p []byte) (n int, err error) {
for {
for len(f.readBufs) > 0 && len(f.readBufs[0]) == 0 {
f.readBufs = f.readBufs[1:]
}
if len(f.readBufs) > 0 {
break
}
var msg interface{}
msg, err = f.q.Get(f.ctx.Done())
f.readBufs = msg.([][]byte)
}
n = copy(p, f.readBufs[0])
f.readBufs[0] = f.readBufs[0][n:]
return
}
// ReadMsg is like read, but it reads bytes in chunks. Depending on the
// implementation the batch boundaries might or might not be significant.
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) ReadMsg() (buf []byte, err error) {
for {
for len(f.readBufs) > 0 {
buf, f.readBufs = f.readBufs[0], f.readBufs[1:]
if len(buf) > 0 {
return buf, nil
}
}
bufs, err := f.q.Get(f.ctx.Done())
if err != nil {
return nil, err
}
f.readBufs = bufs.([][]byte)
}
}
// Implement io.Writer.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) Write(p []byte) (n int, err error) {
return f.WriteMsg(p)
}
func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
sent := 0
var left []byte
f.ctx.VI(3).Infof("trying to write: %d.", f.id)
err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
f.ctx.VI(3).Infof("writing: %d.", f.id)
if !f.opened {
// TODO(mattr): we should be able to send multiple messages
// in a single writeMsg call.
err := f.conn.mp.writeMsg(f.ctx, &openFlow{
id: f.id,
initialCounters: defaultBufferSize,
})
if err != nil {
return 0, false, err
}
f.opened = true
}
size := 0
var bufs [][]byte
if len(left) > 0 {
size += len(left)
bufs = append(bufs, left)
}
for size <= tokens && len(parts) > 0 {
bufs = append(bufs, parts[0])
size += len(parts[0])
parts = parts[1:]
}
if size > tokens {
lidx := len(bufs) - 1
last := bufs[lidx]
take := len(last) - (size - tokens)
bufs[lidx] = last[:take]
left = last[take:]
}
d := &data{
id: f.id,
payload: bufs,
}
done := len(left) == 0 && len(parts) == 0
if alsoClose && done {
d.flags |= closeFlag
}
sent += size
return size, done, f.conn.mp.writeMsg(f.ctx, d)
})
return sent, err
}
// WriteMsg is like Write, but allows writing more than one buffer at a time.
// The data in each buffer is written sequentially onto the flow. Returns the
// number of bytes written. WriteMsg must return a non-nil error if it writes
// less than the total number of bytes from all buffers.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
return f.writeMsg(false, parts...)
}
// WriteMsgAndClose performs WriteMsg and then closes the flow.
// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
// with themselves or each other.
func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
return f.writeMsg(true, parts...)
}
// SetContext sets the context associated with the flow. Typically this is
// used to set state that is only available after the flow is connected, such
// as a more restricted flow timeout, or the language of the request.
//
// The flow.Manager associated with ctx must be the same flow.Manager that the
// flow was dialed or accepted from, otherwise an error is returned.
// TODO(mattr): enforce this restriction.
//
// TODO(mattr): update v23/flow documentation.
// SetContext may not be called concurrently with other methods.
func (f *flw) SetContext(ctx *context.T) error {
f.ctx = ctx
return nil
}
// LocalBlessings returns the blessings presented by the local end of the flow
// during authentication.
func (f *flw) LocalBlessings() security.Blessings {
if f.dialed() {
return f.dialerBlessings
}
return f.conn.AcceptorBlessings()
}
// RemoteBlessings returns the blessings presented by the remote end of the
// flow during authentication.
func (f *flw) RemoteBlessings() security.Blessings {
if f.dialed() {
return f.conn.AcceptorBlessings()
}
return f.dialerBlessings
}
// LocalDischarges returns the discharges presented by the local end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) LocalDischarges() map[string]security.Discharge {
if f.dialed() {
return f.dialerDischarges
}
return f.conn.AcceptorDischarges()
}
// RemoteDischarges returns the discharges presented by the remote end of the
// flow during authentication.
//
// Discharges are organized in a map keyed by the discharge-identifier.
func (f *flw) RemoteDischarges() map[string]security.Discharge {
if f.dialed() {
return f.conn.AcceptorDischarges()
}
return f.dialerDischarges
}
// Conn returns the connection the flow is multiplexed on.
func (f *flw) Conn() flow.Conn {
return f.conn
}
// Closed returns a channel that remains open until the flow has been closed or
// the ctx to the Dial or Accept call used to create the flow has been cancelled.
func (f *flw) Closed() <-chan struct{} {
return f.closed
}