| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package manager |
| |
| import ( |
| "net" |
| "time" |
| |
| inaming "v.io/x/ref/runtime/internal/naming" |
| "v.io/x/ref/runtime/internal/rpc/stream" |
| "v.io/x/ref/runtime/internal/rpc/stream/vc" |
| iversion "v.io/x/ref/runtime/internal/rpc/version" |
| |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/rpc" |
| "v.io/v23/security" |
| "v.io/v23/verror" |
| ) |
| |
| type xmanager struct { |
| ctx *context.T // TODO(mattr): This should be pushed into the managers interfaces. |
| rid naming.RoutingID |
| vcCache *vc.XVCCache |
| } |
| |
| func XInternalNew(ctx *context.T, rid naming.RoutingID) stream.XManager { |
| return &xmanager{ |
| ctx: ctx, |
| rid: rid, |
| vcCache: vc.NewXVCCache(), |
| } |
| } |
| |
| func xdial(network, address string, timeout time.Duration) (net.Conn, error) { |
| d, _, _, _ := rpc.RegisteredProtocol(network) |
| if d != nil { |
| conn, err := d(network, address, timeout) |
| if err != nil { |
| return nil, verror.New(stream.ErrDialFailed, nil, err) |
| } |
| return conn, nil |
| } |
| return nil, verror.New(stream.ErrDialFailed, nil, verror.New(errUnknownNetwork, nil, network)) |
| } |
| |
| func (m *xmanager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, naming.Endpoint, error) { |
| bNames, err := extractBlessingNames(principal, blessings) |
| if err != nil { |
| return nil, nil, err |
| } |
| ln, ep, err := m.internalListen(protocol, address, principal, blessings, opts...) |
| if err != nil { |
| return nil, nil, err |
| } |
| ep.Blessings = bNames |
| return ln, ep, nil |
| } |
| |
| func (m *xmanager) internalListen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.XListener, *inaming.Endpoint, error) { |
| if protocol == inaming.Network { |
| // Act as if listening on the address of a remote proxy. |
| ep, err := inaming.NewEndpoint(address) |
| if err != nil { |
| return nil, nil, verror.New(stream.ErrBadArg, nil, verror.New(errEndpointParseError, nil, address, err)) |
| } |
| return m.remoteListen(ep, principal) |
| } |
| |
| netln, err := listen(protocol, address) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| ln := newXNetListener(m.ctx, m, netln, principal, blessings) |
| ep := &inaming.Endpoint{ |
| Protocol: protocol, |
| Address: netln.Addr().String(), |
| RID: m.rid, |
| } |
| return ln, ep, nil |
| } |
| |
| func (m *xmanager) remoteListen(proxy naming.Endpoint, principal security.Principal) (stream.XListener, *inaming.Endpoint, error) { |
| ln, err := newXProxyListener(proxy, m, m.ctx, principal) |
| if err != nil { |
| return nil, nil, err |
| } |
| ep := &inaming.Endpoint{ |
| Protocol: proxy.Addr().Network(), |
| Address: proxy.Addr().String(), |
| RID: m.rid, |
| } |
| return ln, ep, nil |
| } |
| |
| func (m *xmanager) Dial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, error) { |
| f, _, err := m.internalDial(remote, principal, opts...) |
| return f, err |
| } |
| |
| func (m *xmanager) internalDial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.XFlow, stream.XVC, error) { |
| var timeout time.Duration |
| for _, o := range opts { |
| switch v := o.(type) { |
| case DialTimeout: |
| timeout = time.Duration(v) |
| } |
| } |
| var v stream.XVC |
| var err error |
| if v, err = m.vcCache.ReservedFind(remote); err != nil { |
| return nil, nil, err |
| } |
| defer m.vcCache.Unreserve(remote) |
| if v == nil { |
| conn, err := xdial(remote.Addr().Network(), remote.Addr().String(), timeout) |
| if err != nil { |
| return nil, nil, err |
| } |
| local := localEndpoint(conn, m.rid) |
| if v, err = vc.XNewDialed(m.ctx, xNetConn{conn}, principal, local, remote, *iversion.SupportedRange); err != nil { |
| return nil, nil, err |
| } |
| if err := m.vcCache.Insert(v); err != nil { |
| return nil, nil, err |
| } |
| } |
| |
| if remote.RoutingID() != v.(backingVC).RemoteEndpoint().RoutingID() { |
| flow, err := v.Connect() |
| if err != nil { |
| return nil, nil, err |
| } |
| if v, err = vc.XNewDialed(m.ctx, flow, principal, flow.LocalEndpoint(), remote, *iversion.SupportedRange); err != nil { |
| return nil, nil, err |
| } |
| } |
| f, err := v.Connect() |
| return f, v, err |
| } |
| |
| func (m *xmanager) Shutdown() { |
| // TODO(suharshs): Implement this. |
| } |
| |
| func (m *xmanager) RoutingID() naming.RoutingID { |
| return m.rid |
| } |
| |
| type backingVC interface { |
| LocalEndpoint() naming.Endpoint |
| RemoteEndpoint() naming.Endpoint |
| LocalPrincipal() security.Principal |
| LocalBlessings() security.Blessings |
| RemoteBlessings() security.Blessings |
| LocalDischarges() map[string]security.Discharge |
| RemoteDischarges() map[string]security.Discharge |
| VCDataCache() stream.VCDataCache |
| } |
| |
| func localEndpoint(conn net.Conn, rid naming.RoutingID) naming.Endpoint { |
| localAddr := conn.LocalAddr() |
| ep := &inaming.Endpoint{ |
| Protocol: localAddr.Network(), |
| Address: localAddr.String(), |
| RID: rid, |
| } |
| return ep |
| } |
| |
// xNetConn embeds a net.Conn and adds a DisableEncryption method to it.
// internalDial wraps freshly dialed connections in it before passing them to
// vc.XNewDialed.
type xNetConn struct{ net.Conn }

// DisableEncryption does nothing.
func (xNetConn) DisableEncryption() {}