blob: 34b797e7cc07dddcc41bd047889d9bafea8c5712 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"net"
"time"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
iversion "v.io/x/ref/runtime/internal/rpc/version"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
)
// xmanager is the stream.XManager implementation returned by XInternalNew.
type xmanager struct {
// ctx is handed to the listeners and VCs this manager creates.
ctx *context.T // TODO(mattr): This should be pushed into the managers interfaces.
// rid is the routing ID reported by RoutingID and stamped on the endpoints
// this manager builds.
rid naming.RoutingID
// vcCache holds VCs that internalDial looks up before dialing and inserts
// into after a successful dial.
vcCache *vc.XVCCache
}
// XInternalNew builds a stream.XManager that uses ctx and identifies itself
// with rid. The manager starts out with a fresh, empty VC cache.
func XInternalNew(ctx *context.T, rid naming.RoutingID) stream.XManager {
	mgr := new(xmanager)
	mgr.ctx = ctx
	mgr.rid = rid
	mgr.vcCache = vc.NewXVCCache()
	return mgr
}
// xdial connects to address using the dialer registered for network.
//
// Any failure is reported as stream.ErrDialFailed: it wraps the dialer's own
// error when the dial fails, or an errUnknownNetwork error when no dialer is
// registered for network.
func xdial(network, address string, timeout time.Duration) (net.Conn, error) {
	dialer, _, _, _ := rpc.RegisteredProtocol(network)
	if dialer == nil {
		return nil, verror.New(stream.ErrDialFailed, nil, verror.New(errUnknownNetwork, nil, network))
	}
	c, dialErr := dialer(network, address, timeout)
	if dialErr != nil {
		return nil, verror.New(stream.ErrDialFailed, nil, dialErr)
	}
	return c, nil
}
// Listen creates a listener for protocol/address and returns it together with
// the endpoint it can be reached at. The blessing names extracted from
// principal and blessings are recorded on the returned endpoint.
//
// The blessing names are extracted before any listener is created, so an
// extraction failure leaves nothing listening.
func (m *xmanager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, naming.Endpoint, error) {
	names, err := extractBlessingNames(principal, blessings)
	if err != nil {
		return nil, nil, err
	}
	listener, endpoint, err := m.internalListen(protocol, address, principal, blessings, opts...)
	if err != nil {
		return nil, nil, err
	}
	endpoint.Blessings = names
	return listener, endpoint, nil
}
// internalListen does the work of Listen and returns the concrete endpoint
// type so the caller can fill in further fields.
//
// When protocol is inaming.Network, address is parsed as the endpoint of a
// remote proxy and the listener is obtained through remoteListen; a parse
// failure is reported as stream.ErrBadArg. For any other protocol a network
// listener is opened on address and the returned endpoint carries the address
// actually bound plus this manager's routing ID.
func (m *xmanager) internalListen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, *inaming.Endpoint, error) {
	if protocol != inaming.Network {
		netln, err := listen(protocol, address)
		if err != nil {
			return nil, nil, err
		}
		xln := newXNetListener(m.ctx, m, netln, principal, blessings)
		bound := &inaming.Endpoint{
			Protocol: protocol,
			Address:  netln.Addr().String(),
			RID:      m.rid,
		}
		return xln, bound, nil
	}

	// Act as if listening on the address of a remote proxy.
	proxyEP, err := inaming.NewEndpoint(address)
	if err != nil {
		parseErr := verror.New(errEndpointParseError, nil, address, err)
		return nil, nil, verror.New(stream.ErrBadArg, nil, parseErr)
	}
	return m.remoteListen(proxyEP, principal)
}
// remoteListen creates a listener that goes through the given proxy endpoint.
// The returned endpoint reuses the proxy's network and address but carries
// this manager's routing ID.
func (m *xmanager) remoteListen(proxy naming.Endpoint, principal security.Principal) (stream.XListener, *inaming.Endpoint, error) {
	proxyLn, err := newXProxyListener(proxy, m, m.ctx, principal)
	if err != nil {
		return nil, nil, err
	}
	viaProxy := &inaming.Endpoint{
		Protocol: proxy.Addr().Network(),
		Address:  proxy.Addr().String(),
		RID:      m.rid,
	}
	return proxyLn, viaProxy, nil
}
// Dial opens a flow to remote on behalf of principal. It delegates to
// internalDial and drops the VC that internalDial also returns.
func (m *xmanager) Dial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, error) {
	flow, _, err := m.internalDial(remote, principal, opts...)
	return flow, err
}
// internalDial returns a new flow to remote together with the VC carrying it.
//
// The only option inspected here is DialTimeout, which is passed to the
// network dial; with no such option the timeout handed to xdial is zero.
//
// A VC for remote is looked up in m.vcCache first and a new connection is
// dialed only when none is cached. If the routing ID at the far end of that
// VC is not the one requested, a second VC to remote is established over a
// flow on the first one (NOTE(review): presumably the first VC terminates at
// a proxy — confirm); that second VC is not added to the cache.
//
// A connection dialed here is closed again if no VC could be set up on it or
// if the VC could not be cached, so a failed dial does not leak the
// connection. If the final Connect fails, the VC is still returned alongside
// the error.
func (m *xmanager) internalDial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, stream.XVC, error) {
	var timeout time.Duration
	for _, o := range opts {
		if t, ok := o.(DialTimeout); ok {
			timeout = time.Duration(t)
		}
	}

	var v stream.XVC
	var err error
	if v, err = m.vcCache.ReservedFind(remote); err != nil {
		return nil, nil, err
	}
	// Paired with the successful ReservedFind above; runs on every return
	// path from here on.
	defer m.vcCache.Unreserve(remote)

	if v == nil {
		conn, err := xdial(remote.Addr().Network(), remote.Addr().String(), timeout)
		if err != nil {
			return nil, nil, err
		}
		local := localEndpoint(conn, m.rid)
		if v, err = vc.XNewDialed(m.ctx, xNetConn{conn}, principal, local, remote, *iversion.SupportedRange); err != nil {
			// Best-effort cleanup; the setup error is the one worth reporting.
			conn.Close()
			return nil, nil, err
		}
		if err := m.vcCache.Insert(v); err != nil {
			// The VC is being abandoned, so release the connection under it.
			conn.Close()
			return nil, nil, err
		}
	}

	// Unchecked assertion: this panics if the VC does not implement backingVC.
	if remote.RoutingID() != v.(backingVC).RemoteEndpoint().RoutingID() {
		flow, err := v.Connect()
		if err != nil {
			return nil, nil, err
		}
		if v, err = vc.XNewDialed(m.ctx, flow, principal, flow.LocalEndpoint(), remote, *iversion.SupportedRange); err != nil {
			return nil, nil, err
		}
	}

	f, err := v.Connect()
	return f, v, err
}
// Shutdown is currently a no-op: it does not close any listener or VC created
// through this manager.
func (m *xmanager) Shutdown() {
// TODO(suharshs): Implement this.
}
// RoutingID returns the routing ID this manager was created with.
func (m *xmanager) RoutingID() naming.RoutingID {
return m.rid
}
// backingVC is the set of accessors that internalDial expects the concrete VC
// behind a stream.XVC to provide; internalDial type-asserts to it in order to
// read the VC's remote endpoint. The remaining methods are not used by the
// code visible in this file.
type backingVC interface {
LocalEndpoint() naming.Endpoint
RemoteEndpoint() naming.Endpoint
LocalPrincipal() security.Principal
LocalBlessings() security.Blessings
RemoteBlessings() security.Blessings
LocalDischarges() map[string]security.Discharge
RemoteDischarges() map[string]security.Discharge
VCDataCache() stream.VCDataCache
}
// localEndpoint describes the local side of conn as an endpoint: the
// connection's local network and address combined with the routing ID rid.
func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint {
	addr := conn.LocalAddr()
	return &inaming.Endpoint{
		Protocol: addr.Network(),
		Address:  addr.String(),
		RID:      rid,
	}
}
// xNetConn wraps a net.Conn and adds a DisableEncryption method. internalDial
// uses it to pass a freshly dialed connection to vc.XNewDialed.
// NOTE(review): presumably the connection type XNewDialed accepts requires
// DisableEncryption — confirm against the vc package.
type xNetConn struct {
net.Conn
}
// DisableEncryption does nothing for a plain network connection.
func (xNetConn) DisableEncryption() {}