| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package manager |
| |
| import ( |
| "net" |
| |
| "v.io/x/lib/vlog" |
| "v.io/x/ref/runtime/internal/lib/upcqueue" |
| "v.io/x/ref/runtime/internal/rpc/stream" |
| "v.io/x/ref/runtime/internal/rpc/stream/vc" |
| iversion "v.io/x/ref/runtime/internal/rpc/version" |
| |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/security" |
| "v.io/v23/verror" |
| ) |
| |
| type adapter interface { |
| Accept() (stream.XConn, naming.Endpoint, error) |
| Close() error |
| } |
| |
| type xListener struct { |
| m *xmanager |
| q *upcqueue.T |
| principal security.Principal |
| lBlessings security.Blessings |
| ln adapter |
| } |
| |
| func (ln *xListener) acceptLoop(ctx *context.T) { |
| for { |
| conn, lep, err := ln.ln.Accept() |
| if err != nil { |
| vlog.Errorf("netAcceptLoop: %v", err) |
| return |
| } |
| // TODO(suharshs): Get the versions here correctly. |
| vc, err := vc.XNewAccepted(ctx, conn, ln.principal, lep, ln.lBlessings, *iversion.SupportedRange, ln.q) |
| if err != nil { |
| vlog.Errorf("netAcceptLoop: %v", err) |
| return |
| } |
| if err := ln.m.vcCache.Insert(vc); err != nil { |
| vlog.Errorf("m.vcCache.Insert: %v", err) |
| } |
| } |
| } |
| |
| func (ln *xListener) Accept() (stream.XFlow, error) { |
| item, err := ln.q.Get(nil) |
| switch { |
| case err == upcqueue.ErrQueueIsClosed: |
| return nil, verror.New(stream.ErrNetwork, nil, verror.New(errListenerAlreadyClosed, nil)) |
| case err != nil: |
| return nil, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err)) |
| default: |
| return item.(stream.XFlow), nil |
| } |
| } |
| |
| func (ln *xListener) Close() error { |
| // TODO(suharshs): implement this for real. |
| ln.q.Close() |
| return nil |
| } |
| |
| type netAdapter struct { |
| net.Listener |
| rid naming.RoutingID |
| } |
| |
| func (n *netAdapter) Accept() (stream.XConn, naming.Endpoint, error) { |
| if conn, err := n.Listener.Accept(); err != nil { |
| return nil, nil, err |
| } else { |
| return xNetConn{conn}, localEndpoint(conn, n.rid), nil |
| } |
| } |
| |
| func newXNetListener(ctx *context.T, m *xmanager, netLn net.Listener, principal security.Principal, blessings security.Blessings) stream.XListener { |
| ln := &xListener{ |
| m: m, |
| q: upcqueue.New(), |
| ln: &netAdapter{netLn, m.rid}, |
| principal: principal, |
| lBlessings: blessings, |
| } |
| go ln.acceptLoop(ctx) |
| return ln |
| } |
| |
| type proxyAdapter struct { |
| q *upcqueue.T |
| } |
| |
| func (p *proxyAdapter) Accept() (stream.XConn, naming.Endpoint, error) { |
| item, err := p.q.Get(nil) |
| f := item.(stream.XFlow) |
| return f, f.LocalEndpoint(), err |
| } |
| |
| func (p *proxyAdapter) Close() error { |
| p.q.Close() |
| return nil |
| } |
| |
| func newXProxyListener(proxyEP naming.Endpoint, m *xmanager, ctx *context.T, principal security.Principal) (stream.XListener, error) { |
| pa := &proxyAdapter{upcqueue.New()} |
| ln := &xListener{ |
| m: m, |
| q: upcqueue.New(), |
| ln: pa, |
| principal: principal, |
| lBlessings: principal.BlessingStore().Default(), |
| } |
| _, vc, err := m.internalDial(proxyEP, ln.principal) |
| if err != nil { |
| return nil, err |
| } |
| if err := vc.ListenTo(pa.q); err != nil { |
| return nil, err |
| } |
| go ln.acceptLoop(ctx) |
| return ln, nil |
| } |