blob: 8cd4b6a0c67771fe7020d6fd6e0eb2b08cbf63b1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package conn
import (
"reflect"
"sync"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/rpc/version"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/x/ref/runtime/internal/flow/flowcontrol"
)
// flowID is a number assigned to identify a flow.
// Each flow on a given conn will have a unique number.
type flowID uint64
const mtu = 1 << 16
const defaultBufferSize = 1 << 20
const reservedFlows = 10
const (
expressPriority = iota
flowPriority
tearDownPriority
)
type MsgReadWriteCloser interface {
flow.MsgReadWriter
Close() error
}
// FlowHandlers process accepted flows.
type FlowHandler interface {
// HandleFlow processes an accepted flow.
HandleFlow(flow.Flow) error
}
// Conns are a multiplexing encrypted channels that can host Flows.
type Conn struct {
fc *flowcontrol.FlowController
mp *messagePipe
handler FlowHandler
versions version.RPCVersionRange
acceptorBlessings security.Blessings
dialerPublicKey security.PublicKey
local, remote naming.Endpoint
closed chan struct{}
mu sync.Mutex
nextFid flowID
flows map[flowID]*flw
}
// Ensure that *Conn implements flow.Conn
var _ flow.Conn = &Conn{}
// NewDialed dials a new Conn on the given conn.
func NewDialed(
ctx *context.T,
conn MsgReadWriteCloser,
local, remote naming.Endpoint,
versions version.RPCVersionRange,
handler FlowHandler,
fn flow.BlessingsForPeer) (*Conn, error) {
principal := v23.GetPrincipal(ctx)
c := &Conn{
fc: flowcontrol.New(defaultBufferSize, mtu),
mp: newMessagePipe(conn),
handler: handler,
versions: versions,
dialerPublicKey: principal.PublicKey(),
local: local,
remote: remote,
closed: make(chan struct{}),
nextFid: reservedFlows,
flows: map[flowID]*flw{},
}
go c.readLoop(ctx)
return c, nil
}
// NewAccepted accepts a new Conn on the given conn.
func NewAccepted(
ctx *context.T,
conn MsgReadWriteCloser,
local naming.Endpoint,
lBlessings security.Blessings,
versions version.RPCVersionRange,
handler FlowHandler) (*Conn, error) {
c := &Conn{
fc: flowcontrol.New(defaultBufferSize, mtu),
mp: newMessagePipe(conn),
handler: handler,
versions: versions,
acceptorBlessings: lBlessings,
local: local,
remote: local, // TODO(mattr): Get the real remote endpoint.
closed: make(chan struct{}),
nextFid: reservedFlows + 1,
flows: map[flowID]*flw{},
}
go c.readLoop(ctx)
return c, nil
}
// Dial dials a new flow on the Conn.
func (c *Conn) Dial(ctx *context.T) (flow.Flow, error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.flows == nil {
return nil, NewErrConnectionClosed(ctx)
}
id := c.nextFid
c.nextFid++
return c.newFlowLocked(ctx, id), nil
}
// LocalEndpoint returns the local vanadium Endpoint
func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
// RemoteEndpoint returns the remote vanadium Endpoint
func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
// DialerPublicKey returns the public key presented by the dialer during authentication.
func (c *Conn) DialerPublicKey() security.PublicKey { return c.dialerPublicKey }
// AcceptorBlessings returns the blessings presented by the acceptor during authentication.
func (c *Conn) AcceptorBlessings() security.Blessings { return c.acceptorBlessings }
// AcceptorDischarges returns the discharges presented by the acceptor during authentication.
// Discharges are organized in a map keyed by the discharge-identifier.
func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
// Closed returns a channel that will be closed after the Conn is shutdown.
// After this channel is closed it is guaranteed that all Dial calls will fail
// with an error and no more flows will be sent to the FlowHandler.
func (c *Conn) Closed() <-chan struct{} { return c.closed }
// Close shuts down a conn. This will cause the read loop
// to exit.
func (c *Conn) Close(ctx *context.T, err error) {
c.mu.Lock()
var flows map[flowID]*flw
flows, c.flows = c.flows, nil
c.mu.Unlock()
if flows == nil {
// We've already torn this conn down.
return
}
ferr := err
if verror.ErrorID(err) == ErrConnClosedRemotely.ID {
ferr = NewErrFlowClosedRemotely(ctx)
} else {
message := ""
if err != nil {
message = err.Error()
}
cerr := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
return 0, true, c.mp.writeMsg(ctx, &tearDown{Message: message})
})
if cerr != nil {
ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
}
}
for _, f := range flows {
f.close(ctx, ferr)
}
if cerr := c.mp.close(); cerr != nil {
ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
}
// TODO(mattr): ensure the readLoop is finished before closing this.
close(c.closed)
}
func (c *Conn) release(ctx *context.T) {
counts := map[flowID]uint64{}
c.mu.Lock()
for fid, f := range c.flows {
if release := f.q.release(); release > 0 {
counts[fid] = uint64(release)
}
}
c.mu.Unlock()
if len(counts) == 0 {
return
}
err := c.fc.Run(ctx, expressPriority, func(_ int) (int, bool, error) {
err := c.mp.writeMsg(ctx, &release{
counters: counts,
})
return 0, true, err
})
if err != nil {
c.Close(ctx, NewErrSend(ctx, "release", c.remote.String(), err))
}
}
func (c *Conn) readLoop(ctx *context.T) {
var terr error
defer c.Close(ctx, terr)
for {
x, err := c.mp.readMsg(ctx)
if err != nil {
c.Close(ctx, NewErrRecv(ctx, c.remote.String(), err))
return
}
switch msg := x.(type) {
case *tearDown:
terr = NewErrConnClosedRemotely(ctx, msg.Message)
return
case *openFlow:
c.mu.Lock()
f := c.newFlowLocked(ctx, msg.id)
c.mu.Unlock()
c.handler.HandleFlow(f)
case *release:
release := make([]flowcontrol.Release, 0, len(msg.counters))
c.mu.Lock()
for fid, val := range msg.counters {
if f := c.flows[fid]; f != nil {
release = append(release, flowcontrol.Release{
Worker: f.worker,
Tokens: int(val),
})
}
}
c.mu.Unlock()
if terr = c.fc.Release(ctx, release); terr != nil {
return
}
case *data:
c.mu.Lock()
f := c.flows[msg.id]
c.mu.Unlock()
if f == nil {
ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.id)
continue
}
if terr = f.q.put(ctx, msg.payload); terr != nil {
return
}
if msg.flags&closeFlag != 0 {
f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
}
case *unencryptedData:
c.mu.Lock()
f := c.flows[msg.id]
c.mu.Unlock()
if f == nil {
ctx.Infof("Ignoring data message for unknown flow: %d", msg.id)
continue
}
if terr = f.q.put(ctx, msg.payload); terr != nil {
return
}
if msg.flags&closeFlag != 0 {
f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
}
default:
terr = NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).Name())
return
}
}
}