blob: a8998911ae60d87ca8f0a818a8277a0a92ca2da4 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package manager
import (
"net"
"v.io/x/lib/vlog"
"v.io/x/ref/runtime/internal/lib/upcqueue"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
iversion "v.io/x/ref/runtime/internal/rpc/version"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/verror"
)
// adapter abstracts the source of incoming connections consumed by
// xListener.acceptLoop. The implementations in this file are netAdapter
// (backed by a net.Listener) and proxyAdapter (backed by a queue of flows).
type adapter interface {
// Accept returns the next incoming connection together with the endpoint
// that acceptLoop passes on as the local endpoint of the accepted VC. A
// non-nil error causes acceptLoop to stop.
Accept() (stream.XConn, naming.Endpoint, error)
// Close releases the underlying connection source.
Close() error
}
// xListener accepts connections from an adapter, turns each of them into an
// accepted VC, and hands out the resulting flows through Accept.
type xListener struct {
// m is the manager whose vcCache receives every VC accepted by acceptLoop.
m *xmanager
// q is handed to every accepted VC and is the queue Accept reads flows from.
q *upcqueue.T
// principal and lBlessings are passed to vc.XNewAccepted for each
// incoming connection.
principal security.Principal
lBlessings security.Blessings
// ln is the source of incoming connections.
ln adapter
}
// acceptLoop repeatedly takes a connection from the adapter, sets up an
// accepted VC on it, and records that VC in the manager's cache. It is meant
// to run in its own goroutine (see newXNetListener and newXProxyListener).
//
// The loop ends as soon as either the adapter's Accept or vc.XNewAccepted
// reports an error. A failure to insert into the VC cache is only logged.
//
// NOTE(review): a single failed VC setup terminates the whole loop, so no
// further connections are accepted afterwards - confirm this is intended.
func (ln *xListener) acceptLoop(ctx *context.T) {
	for {
		conn, localEP, acceptErr := ln.ln.Accept()
		if acceptErr != nil {
			vlog.Errorf("netAcceptLoop: %v", acceptErr)
			return
		}

		// TODO(suharshs): Get the versions here correctly.
		accepted, setupErr := vc.XNewAccepted(ctx, conn, ln.principal, localEP, ln.lBlessings, *iversion.SupportedRange, ln.q)
		if setupErr != nil {
			vlog.Errorf("netAcceptLoop: %v", setupErr)
			return
		}

		// A cache insertion failure is not fatal; keep accepting.
		if insertErr := ln.m.vcCache.Insert(accepted); insertErr != nil {
			vlog.Errorf("m.vcCache.Insert: %v", insertErr)
		}
	}
}
// Accept blocks until a flow is available on the listener's queue and
// returns it.
//
// Every failure is reported as a stream.ErrNetwork error: one wrapping
// errListenerAlreadyClosed when the queue has been closed, and one wrapping
// errAcceptFailed (carrying the underlying error) otherwise.
func (ln *xListener) Accept() (stream.XFlow, error) {
	item, err := ln.q.Get(nil)
	if err == upcqueue.ErrQueueIsClosed {
		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil))
	}
	if err != nil {
		return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
	}
	return item.(stream.XFlow), nil
}
// Close closes the listener's flow queue, after which Accept reports that the
// listener is already closed. It always returns nil.
//
// TODO(suharshs): implement this for real. Only the queue is closed here;
// the underlying adapter is not touched.
func (ln *xListener) Close() error {
	ln.q.Close()
	return nil
}
// netAdapter adapts a net.Listener to the adapter interface. Close is
// provided by the embedded net.Listener.
type netAdapter struct {
net.Listener
// rid is the routing ID used when building the local endpoint returned
// by Accept.
rid naming.RoutingID
}
// Accept waits for the next connection on the embedded net.Listener and
// returns it wrapped as an xNetConn, along with the local endpoint built from
// that connection and the adapter's routing ID. An error from the underlying
// listener is returned unchanged, with nil connection and endpoint.
func (n *netAdapter) Accept() (stream.XConn, naming.Endpoint, error) {
	conn, err := n.Listener.Accept()
	if err != nil {
		return nil, nil, err
	}
	return xNetConn{conn}, localEndpoint(conn, n.rid), nil
}
// newXNetListener builds an xListener that accepts connections from netLn,
// using the manager's routing ID for local endpoints, and starts its accept
// loop in a new goroutine before returning it.
func newXNetListener(ctx *context.T, m *xmanager, netLn net.Listener, principal security.Principal, blessings security.Blessings) stream.XListener {
	listener := &xListener{
		m:          m,
		q:          upcqueue.New(),
		principal:  principal,
		lBlessings: blessings,
		ln:         &netAdapter{Listener: netLn, rid: m.rid},
	}
	go listener.acceptLoop(ctx)
	return listener
}
// proxyAdapter implements adapter on top of a queue of flows. In
// newXProxyListener the queue is registered with the VC dialed to the proxy
// via ListenTo.
type proxyAdapter struct {
// q holds the items that Accept returns as connections.
q *upcqueue.T
}
// Accept blocks until a flow is available on the adapter's queue and returns
// it as the connection, together with the flow's local endpoint.
//
// If the queue reports an error (for example because it has been closed by
// Close), that error is returned unchanged with a nil connection and
// endpoint. The error must be checked before the type assertion: on failure
// no item is returned, and asserting on a nil interface value would panic.
func (p *proxyAdapter) Accept() (stream.XConn, naming.Endpoint, error) {
	item, err := p.q.Get(nil)
	if err != nil {
		return nil, nil, err
	}
	f := item.(stream.XFlow)
	return f, f.LocalEndpoint(), nil
}
// Close closes the adapter's flow queue. It always returns nil.
func (p *proxyAdapter) Close() error {
	p.q.Close()
	return nil
}
// newXProxyListener builds an xListener whose connections are flows arriving
// over a VC dialed to the proxy at proxyEP.
//
// The listener uses the principal's default blessings. The proxy is dialed
// through the manager, and the resulting VC is asked to deliver its incoming
// flows to the proxy adapter's queue. If dialing or ListenTo fails, that
// error is returned and no accept loop is started; otherwise the accept loop
// is started in a new goroutine.
func newXProxyListener(proxyEP naming.Endpoint, m *xmanager, ctx *context.T, principal security.Principal) (stream.XListener, error) {
	proxy := &proxyAdapter{q: upcqueue.New()}
	listener := &xListener{
		m:          m,
		q:          upcqueue.New(),
		ln:         proxy,
		principal:  principal,
		lBlessings: principal.BlessingStore().Default(),
	}

	_, proxyVC, err := m.internalDial(proxyEP, listener.principal)
	if err != nil {
		return nil, err
	}
	// Route flows arriving on the proxy VC into the adapter's queue.
	if err = proxyVC.ListenTo(proxy.q); err != nil {
		return nil, err
	}

	go listener.acceptLoop(ctx)
	return listener, nil
}