blob: 5e3a32ae5df458e334f024c6191f821016e3e756 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proxy
import (
"fmt"
"net"
"reflect"
"sync"
"time"
"v.io/x/lib/netstate"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/ref/runtime/internal/lib/bqueue"
"v.io/x/ref/runtime/internal/lib/bqueue/drrqueue"
"v.io/x/ref/runtime/internal/lib/iobuf"
"v.io/x/ref/runtime/internal/lib/publisher"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/crypto"
"v.io/x/ref/runtime/internal/rpc/stream/id"
"v.io/x/ref/runtime/internal/rpc/stream/message"
"v.io/x/ref/runtime/internal/rpc/stream/vc"
"v.io/x/ref/runtime/internal/rpc/stream/vif"
iversion "v.io/x/ref/runtime/internal/rpc/version"
"v.io/x/ref/lib/stats"
)
const pkgPath = "v.io/x/ref/runtime/proxy"
func reg(id, msg string) verror.IDAction {
return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
}
var (
// These errors are intended to be used as arguments to higher
// level errors and hence {1}{2} is omitted from their format
// strings to avoid repeating these n-times in the final error
// message visible to the user.
errNoRoutingTableEntry = reg(".errNoRoutingTableEntry", "routing table has no entry for the VC")
errProcessVanished = reg(".errProcessVanished", "remote process vanished")
errDuplicateSetupVC = reg(".errDuplicateSetupVC", "duplicate SetupVC request")
errVomEncodeResponse = reg(".errVomEncodeResponse", "failed to encode response from proxy{:3}")
errNoRequest = reg(".errNoRequest", "unable to read Request{:3}")
errServerClosedByProxy = reg(".errServerClosedByProxy", "server closed by proxy")
errRemoveServerVC = reg(".errRemoveServerVC", "failed to remove server VC {3}{:4}")
errNetConnClosing = reg(".errNetConnClosing", "net.Conn is closing")
errFailedToAcceptHealthCheck = reg(".errFailedToAcceptHealthCheck", "failed to accept health check flow")
errIncompatibleVersions = reg(".errIncompatibleVersions", "{:3}")
errAlreadyProxied = reg(".errAlreadyProxied", "server with routing id {3} is already being proxied")
errUnknownNetwork = reg(".errUnknownNetwork", "unknown network {3}")
errListenFailed = reg(".errListenFailed", "net.Listen({3}, {4}) failed{:5}")
errFailedToForwardRxBufs = reg(".errFailedToForwardRxBufs", "failed to forward receive buffers{:3}")
errFailedToFowardDataMsg = reg(".errFailedToFowardDataMsg", "failed to forward data message{:3}")
errFailedToFowardOpenFlow = reg(".errFailedToFowardOpenFlow", "failed to forward open flow{:3}")
errServerNotBeingProxied = reg(".errServerNotBeingProxied", "no server with routing id {3} is being proxied")
errServerVanished = reg(".errServerVanished", "server with routing id {3} vanished")
errAccessibleAddresses = reg(".errAccessibleAddresses", "failed to obtain a set of accessible addresses{:3}")
errNoAccessibleAddresses = reg(".errNoAccessibleAddresses", "no accessible addresses were available for {3}")
errEmptyListenSpec = reg(".errEmptyListenSpec", "no addresses supplied in the listen spec")
)
// Proxy routes virtual circuit (VC) traffic between multiple underlying
// network connections.
type Proxy struct {
ctx *context.T
ln net.Listener
rid naming.RoutingID
principal security.Principal
blessings security.Blessings
authorizer security.Authorizer
mu sync.RWMutex
servers *servermap
processes map[*process]struct{}
pubAddress string
statsName string
}
// process encapsulates the physical network connection and the routing table
// associated with the process at the other end of the network connection.
type process struct {
proxy *Proxy
ctx *context.T
conn net.Conn
pool *iobuf.Pool
reader *iobuf.Reader
ctrlCipher crypto.ControlCipher
queue *upcqueue.T
mu sync.RWMutex
routingTable map[id.VC]*destination
nextVCI id.VC
servers map[id.VC]*vc.VC // servers wishing to be proxied create a VC that terminates at the proxy
bq bqueue.T // Flow control for messages sent on behalf of servers.
}
// destination is an entry in the routingtable of a process.
type destination struct {
VCI id.VC
Process *process
}
// server encapsulates information stored about a server exporting itself via the proxy.
type server struct {
Process *process
VC *vc.VC
}
func (s *server) RoutingID() naming.RoutingID { return s.VC.RemoteEndpoint().RoutingID() }
func (s *server) Close(err error) {
if vc := s.Process.RemoveServerVC(s.VC.VCI()); vc != nil {
if err != nil {
vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errRemoveServerVC, nil, s.VC.VCI(), err)))
} else {
vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errServerClosedByProxy, nil)))
}
s.Process.SendCloseVC(s.VC.VCI(), err)
}
}
func (s *server) String() string {
return fmt.Sprintf("RoutingID %v on process %v (VCI:%v Blessings:%v)", s.RoutingID(), s.Process, s.VC.VCI(), s.VC.RemoteBlessings())
}
// servermap is a concurrent-access safe map from the RoutingID of a server exporting itself
// through the proxy to the underlying network connection that the server is found on.
type servermap struct {
ctx *context.T
mu sync.Mutex
m map[naming.RoutingID]*server
}
func (m *servermap) Add(server *server) error {
key := server.RoutingID()
m.mu.Lock()
defer m.mu.Unlock()
if m.m[key] != nil {
return verror.New(stream.ErrProxy, nil, verror.New(errAlreadyProxied, nil, key))
}
m.m[key] = server
proxyLog(m.ctx, "Started proxying server: %v", server)
return nil
}
func (m *servermap) Remove(server *server) {
key := server.RoutingID()
m.mu.Lock()
if m.m[key] != nil {
delete(m.m, key)
proxyLog(m.ctx, "Stopped proxying server: %v", server)
}
m.mu.Unlock()
}
func (m *servermap) Process(rid naming.RoutingID) *process {
m.mu.Lock()
defer m.mu.Unlock()
if s := m.m[rid]; s != nil {
return s.Process
}
return nil
}
func (m *servermap) List() []string {
m.mu.Lock()
defer m.mu.Unlock()
ret := make([]string, 0, len(m.m))
for _, s := range m.m {
ret = append(ret, s.String())
}
return ret
}
// New creates a new Proxy that listens for network connections on the provided
// ListenSpec and routes VC traffic between accepted connections.
//
// Servers wanting to "listen through the proxy" will only be allowed to do so
// if the blessings they present are accepted to the provided authorization
// policy (authorizer).
func New(ctx *context.T, spec rpc.ListenSpec, authorizer security.Authorizer, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
rid, err := naming.NewRoutingID()
if err != nil {
return nil, nil, err
}
proxy, err := internalNew(rid, ctx, spec, authorizer)
if err != nil {
return nil, nil, err
}
var pub publisher.Publisher
for _, name := range names {
if name == "" {
// Consistent with v23.rpc.Server.Serve(...)
// an empty name implies, "do not publish"
continue
}
if pub == nil {
pub = publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
pub.AddServer(proxy.endpoint().String())
}
pub.AddName(name, false, true)
}
shutdown = func() {
if pub != nil {
pub.Stop()
pub.WaitForStop()
}
proxy.shutdown()
}
return shutdown, proxy.endpoint(), nil
}
func internalNew(rid naming.RoutingID, ctx *context.T, spec rpc.ListenSpec, authorizer security.Authorizer) (*Proxy, error) {
if len(spec.Addrs) == 0 {
return nil, verror.New(stream.ErrProxy, nil, verror.New(errEmptyListenSpec, nil))
}
laddr := spec.Addrs[0]
network := laddr.Protocol
address := laddr.Address
_, _, listenFn, _ := rpc.RegisteredProtocol(network)
if listenFn == nil {
return nil, verror.New(stream.ErrProxy, nil, verror.New(errUnknownNetwork, nil, network))
}
ln, err := listenFn(ctx, network, address)
if err != nil {
return nil, verror.New(stream.ErrProxy, nil, verror.New(errListenFailed, nil, network, address, err))
}
pub, _, err := netstate.PossibleAddresses(ln.Addr().Network(), ln.Addr().String(), spec.AddressChooser)
if err != nil {
ln.Close()
return nil, verror.New(stream.ErrProxy, nil, verror.New(errAccessibleAddresses, nil, err))
}
if len(pub) == 0 {
ln.Close()
return nil, verror.New(stream.ErrProxy, nil, verror.New(errNoAccessibleAddresses, nil, ln.Addr().String()))
}
if authorizer == nil {
authorizer = security.DefaultAuthorizer()
}
proxy := &Proxy{
ctx: ctx,
ln: ln,
rid: rid,
authorizer: authorizer,
servers: &servermap{ctx: ctx, m: make(map[naming.RoutingID]*server)},
processes: make(map[*process]struct{}),
// TODO(cnicolaou): should use all of the available addresses
pubAddress: pub[0].String(),
principal: v23.GetPrincipal(ctx),
statsName: naming.Join("rpc", "proxy", "routing-id", rid.String(), "debug"),
}
if proxy.principal != nil {
proxy.blessings = proxy.principal.BlessingStore().Default()
}
stats.NewStringFunc(proxy.statsName, proxy.debugString)
go proxy.listenLoop()
return proxy, nil
}
func (p *Proxy) listenLoop() {
proxyLog(p.ctx, "Proxy listening on (%q, %q): %v", p.ln.Addr().Network(), p.ln.Addr(), p.endpoint())
for {
conn, err := p.ln.Accept()
if err != nil {
proxyLog(p.ctx, "Exiting listenLoop of proxy %q: %v", p.endpoint(), err)
return
}
go p.acceptProcess(conn)
}
}
func (p *Proxy) acceptProcess(conn net.Conn) {
pool := iobuf.NewPool(0)
reader := iobuf.NewReader(pool, conn)
var blessings security.Blessings
if p.principal != nil {
blessings = p.principal.BlessingStore().Default()
}
cipher, _, err := vif.AuthenticateAsServer(conn, reader, nil, nil, p.principal, blessings, nil)
if err != nil {
processLog(p.ctx, "Process %v failed to authenticate: %s", p, err)
return
}
process := &process{
ctx: p.ctx,
proxy: p,
conn: conn,
pool: pool,
reader: reader,
ctrlCipher: cipher,
queue: upcqueue.New(),
routingTable: make(map[id.VC]*destination),
servers: make(map[id.VC]*vc.VC),
bq: drrqueue.New(vc.MaxPayloadSizeBytes),
}
p.mu.Lock()
if p.processes == nil {
// The proxy has been shutdowned.
p.mu.Unlock()
return
}
p.processes[process] = struct{}{}
p.mu.Unlock()
go process.serverVCsLoop()
go process.writeLoop()
go process.readLoop()
processLog(p.ctx, "Started process %v", process)
}
func (p *Proxy) removeProcess(process *process) {
p.mu.Lock()
delete(p.processes, process)
p.mu.Unlock()
}
func (p *Proxy) runServer(server *server, c <-chan vc.HandshakeResult) {
hr := <-c
if hr.Error != nil {
server.Close(hr.Error)
return
}
// See comments in protocol.vdl for the protocol between servers and the proxy.
conn, err := hr.Listener.Accept()
if err != nil {
server.Close(verror.New(stream.ErrProxy, nil, verror.New(errFailedToAcceptHealthCheck, nil)))
return
}
server.Process.InitVCI(server.VC.VCI())
var request Request
var response Response
dec := vom.NewDecoder(conn)
if err := dec.Decode(&request); err != nil {
response.Error = verror.New(stream.ErrProxy, nil, verror.New(errNoRequest, nil, err))
} else if err := p.authorize(server.VC, request); err != nil {
response.Error = err
} else if err := p.servers.Add(server); err != nil {
response.Error = verror.Convert(verror.ErrUnknown, nil, err)
} else {
defer p.servers.Remove(server)
proxyEP := p.endpoint()
ep := &inaming.Endpoint{
Protocol: proxyEP.Protocol,
Address: proxyEP.Address,
RID: server.VC.RemoteEndpoint().RoutingID(),
}
response.Endpoint = ep.String()
}
enc := vom.NewEncoder(conn)
if err := enc.Encode(response); err != nil {
proxyLog(p.ctx, "Failed to encode response %#v for server %v", response, server)
server.Close(verror.New(stream.ErrProxy, nil, verror.New(errVomEncodeResponse, nil, err)))
return
}
// Reject all other flows
go func() {
for {
flow, err := hr.Listener.Accept()
if err != nil {
return
}
flow.Close()
}
}()
// Wait for this flow to be closed.
<-conn.Closed()
server.Close(nil)
}
func (p *Proxy) authorize(vc *vc.VC, request Request) error {
var dmap map[string]security.Discharge
if len(request.Discharges) > 0 {
dmap = make(map[string]security.Discharge)
for _, d := range request.Discharges {
dmap[d.ID()] = d
}
}
// Blessings must be bound to the same public key as the VC.
// (Repeating logic in the RPC server authorization code).
if got, want := request.Blessings.PublicKey(), vc.RemoteBlessings().PublicKey(); !request.Blessings.IsZero() && !reflect.DeepEqual(got, want) {
return verror.New(verror.ErrNoAccess, nil, fmt.Errorf("malformed request: Blessings sent in proxy.Request are bound to public key %v and not %v", got, want))
}
return p.authorizer.Authorize(p.ctx, security.NewCall(&security.CallParams{
LocalPrincipal: vc.LocalPrincipal(),
LocalBlessings: vc.LocalBlessings(),
RemoteBlessings: request.Blessings,
LocalEndpoint: vc.LocalEndpoint(),
RemoteEndpoint: vc.RemoteEndpoint(),
LocalDischarges: vc.LocalDischarges(),
RemoteDischarges: dmap,
}))
}
func (p *Proxy) routeCounters(process *process, counters message.Counters) {
// Since each VC can be routed to a different process, split up the
// Counters into one message per VC.
// Ideally, would split into one message per process (rather than per
// flow). This optimization is left an as excercise to the interested.
for cid, bytes := range counters {
srcVCI := cid.VCI()
if vc := process.ServerVC(srcVCI); vc != nil {
vc.ReleaseCounters(cid.Flow(), bytes)
continue
}
if d := process.Route(srcVCI); d != nil {
c := message.NewCounters()
c.Add(d.VCI, cid.Flow(), bytes)
if err := d.Process.queue.Put(&message.AddReceiveBuffers{Counters: c}); err != nil {
process.RemoveRoute(srcVCI)
process.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errFailedToForwardRxBufs, nil, err)))
}
}
}
}
func startRoutingVC(ctx *context.T, srcVCI, dstVCI id.VC, srcProcess, dstProcess *process) {
dstProcess.AddRoute(dstVCI, &destination{VCI: srcVCI, Process: srcProcess})
srcProcess.AddRoute(srcVCI, &destination{VCI: dstVCI, Process: dstProcess})
vcLog(ctx, "Routing (VCI %d @ [%s]) <-> (VCI %d @ [%s])", srcVCI, srcProcess, dstVCI, dstProcess)
}
// Endpoint returns the endpoint of the proxy service. By Dialing a VC to this
// endpoint, processes can have their services exported through the proxy.
func (p *Proxy) endpoint() *inaming.Endpoint {
ep := &inaming.Endpoint{
Protocol: p.ln.Addr().Network(),
Address: p.pubAddress,
RID: p.rid,
}
if prncpl := p.principal; prncpl != nil {
ep.Blessings = security.BlessingNames(prncpl, prncpl.BlessingStore().Default())
}
return ep
}
// Shutdown stops the proxy service, closing all network connections.
func (p *Proxy) shutdown() {
stats.Delete(p.statsName)
p.ln.Close()
p.mu.Lock()
processes := p.processes
p.processes = nil
p.mu.Unlock()
for process, _ := range processes {
process.Close()
}
}
func (p *process) serverVCsLoop() {
for {
w, bufs, err := p.bq.Get(nil)
if err != nil {
return
}
vci, fid := unpackIDs(w.ID())
if vc := p.ServerVC(vci); vc != nil {
queueDataMessages(p.ctx, bufs, vc, fid, p.queue)
if len(bufs) == 0 {
m := &message.Data{VCI: vci, Flow: fid}
m.SetClose()
p.queue.Put(m)
w.Shutdown(true)
}
continue
}
releaseBufs(0, bufs)
}
}
func releaseBufs(start int, bufs []*iobuf.Slice) {
for _, buf := range bufs[start:] {
buf.Release()
}
}
func queueDataMessages(ctx *context.T, bufs []*iobuf.Slice, vc *vc.VC, fid id.Flow, q *upcqueue.T) {
for ix, b := range bufs {
m := &message.Data{VCI: vc.VCI(), Flow: fid}
var err error
if m.Payload, err = vc.Encrypt(fid, b); err != nil {
msgLog(ctx, "vc.Encrypt failed. VC:%v Flow:%v Error:%v", vc, fid, err)
releaseBufs(ix+1, bufs)
return
}
if err = q.Put(m); err != nil {
msgLog(ctx, "Failed to enqueue data message %v: %v", m, err)
m.Release()
releaseBufs(ix+1, bufs)
return
}
}
}
func (p *process) writeLoop() {
defer processLog(p.ctx, "Exited writeLoop for %v", p)
defer p.Close()
for {
item, err := p.queue.Get(nil)
if err != nil {
if err != upcqueue.ErrQueueIsClosed {
processLog(p.ctx, "upcqueue.Get failed on %v: %v", p, err)
}
return
}
if err = message.WriteTo(p.conn, item.(message.T), p.ctrlCipher); err != nil {
processLog(p.ctx, "message.WriteTo on %v failed: %v", p, err)
return
}
}
}
func (p *process) readLoop() {
defer processLog(p.ctx, "Exited readLoop for %v", p)
defer p.Close()
for {
msg, err := message.ReadFrom(p.reader, p.ctrlCipher)
if err != nil {
processLog(p.ctx, "Read on %v failed: %v", p, err)
return
}
msgLog(p.ctx, "Received msg: %T = %v", msg, msg)
switch m := msg.(type) {
case *message.Data:
if vc := p.ServerVC(m.VCI); vc != nil {
if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
processLog(p.ctx, "Ignoring data message %v from process %v: %v", m, p, err)
}
if m.Close() {
vc.ShutdownFlow(m.Flow)
}
break
}
srcVCI := m.VCI
if d := p.Route(srcVCI); d != nil {
m.VCI = d.VCI
if err := d.Process.queue.Put(m); err != nil {
m.Release()
p.RemoveRoute(srcVCI)
p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errFailedToFowardDataMsg, nil, err)))
}
break
}
p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errNoRoutingTableEntry, nil)))
case *message.OpenFlow:
if vc := p.ServerVC(m.VCI); vc != nil {
if err := vc.AcceptFlow(m.Flow); err != nil {
processLog(p.ctx, "OpenFlow %+v on process %v failed: %v", m, p, err)
cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
cm.SetClose()
p.queue.Put(cm)
}
vc.ReleaseCounters(m.Flow, m.InitialCounters)
break
}
srcVCI := m.VCI
if d := p.Route(srcVCI); d != nil {
m.VCI = d.VCI
if err := d.Process.queue.Put(m); err != nil {
p.RemoveRoute(srcVCI)
p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errFailedToFowardOpenFlow, nil, err)))
}
break
}
p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errNoRoutingTableEntry, nil)))
case *message.CloseVC:
if vc := p.RemoveServerVC(m.VCI); vc != nil {
vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errRemoveServerVC, nil, m.VCI, m.Error)))
break
}
srcVCI := m.VCI
if d := p.Route(srcVCI); d != nil {
m.VCI = d.VCI
d.Process.queue.Put(m)
d.Process.RemoveRoute(d.VCI)
}
p.RemoveRoute(srcVCI)
case *message.AddReceiveBuffers:
p.proxy.routeCounters(p, m.Counters)
case *message.HealthCheckRequest:
if svc := p.ServerVC(m.VCI); svc != nil {
// If the request is for the proxy, simply respond to it.
p.queue.Put(&message.HealthCheckResponse{VCI: m.VCI})
} else if dst := p.Route(m.VCI); dst != nil {
m.VCI = dst.VCI
dst.Process.queue.Put(m)
}
case *message.HealthCheckResponse:
if svc := p.ServerVC(m.VCI); svc != nil {
// If the request is for the proxy, pass it to the VC.
svc.HandleHealthCheckResponse()
} else if dst := p.Route(m.VCI); dst != nil {
m.VCI = dst.VCI
dst.Process.queue.Put(m)
}
case *message.SetupVC:
// First let's ensure that we can speak a common protocol verison.
intersection, err := iversion.SupportedRange.Intersect(&m.Setup.Versions)
if err != nil {
p.SendCloseVC(m.VCI, verror.New(stream.ErrProxy, nil,
verror.New(errIncompatibleVersions, nil, err)))
break
}
dstrid := m.RemoteEndpoint.RoutingID()
if naming.Compare(dstrid, p.proxy.rid) || naming.Compare(dstrid, naming.NullRoutingID) {
// VC that terminates at the proxy.
// See protocol.vdl for details on the protocol between the server and the proxy.
vcObj := p.NewServerVC(p.ctx, m)
// route counters after creating the VC so counters to vc are not lost.
p.proxy.routeCounters(p, m.Counters)
if vcObj != nil {
server := &server{Process: p, VC: vcObj}
keyExchanger := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
p.queue.Put(&message.SetupVC{
VCI: m.VCI,
Setup: message.Setup{
// Note that servers send clients not their actual supported versions,
// but the intersected range of the server and client ranges. This
// is important because proxies may have adjusted the version ranges
// along the way, and we should negotiate a version that is compatible
// with all intermediate hops.
Versions: *intersection,
Options: []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}},
},
RemoteEndpoint: m.LocalEndpoint,
LocalEndpoint: p.proxy.endpoint(),
// TODO(mattr): Consider adding counters. See associated comment
// in vc.go:VC.HandshakeAcceptedVC for more details.
})
var theirPK *crypto.BoxKey
box := m.Setup.NaclBox()
if box != nil {
theirPK = &box.PublicKey
}
return theirPK, nil
}
go p.proxy.runServer(server, vcObj.HandshakeAcceptedVCWithAuthentication(intersection.Max, p.proxy.principal, p.proxy.blessings, keyExchanger))
}
break
}
srcVCI := m.VCI
d := p.Route(srcVCI)
if d == nil {
// SetupVC involves two messages: One sent by the initiator
// and one by the acceptor. The routing table gets setup on
// the first message, so if there is no route -
// setup a routing table entry.
dstprocess := p.proxy.servers.Process(dstrid)
if dstprocess == nil {
p.SendCloseVC(m.VCI, verror.New(stream.ErrProxy, nil, verror.New(errServerNotBeingProxied, nil, dstrid)))
p.proxy.routeCounters(p, m.Counters)
break
}
dstVCI := dstprocess.AllocVCI()
startRoutingVC(p.ctx, srcVCI, dstVCI, p, dstprocess)
if d = p.Route(srcVCI); d == nil {
p.SendCloseVC(srcVCI, verror.New(stream.ErrProxy, nil, verror.New(errServerVanished, nil, dstrid)))
p.proxy.routeCounters(p, m.Counters)
break
}
}
// Forward the SetupVC message.
// Typically, a SetupVC message is accompanied with
// Counters for the new VC. Keep that in the forwarded
// message and route the remaining counters separately.
counters := m.Counters
m.Counters = message.NewCounters()
dstVCI := d.VCI
for cid, bytes := range counters {
if cid.VCI() == srcVCI {
m.Counters.Add(dstVCI, cid.Flow(), bytes)
delete(counters, cid)
}
}
m.VCI = dstVCI
// Note that proxies rewrite the version range so that the final negotiated
// version will be compatible with all intermediate hops.
m.Setup.Versions = *intersection
d.Process.queue.Put(m)
p.proxy.routeCounters(p, counters)
default:
processLog(p.ctx, "Closing %v because of invalid message %T", p, m)
return
}
}
}
func (p *process) String() string {
r := p.conn.RemoteAddr()
return fmt.Sprintf("(%s, %s)", r.Network(), r)
}
func (p *process) Route(vci id.VC) *destination {
p.mu.RLock()
defer p.mu.RUnlock()
return p.routingTable[vci]
}
func (p *process) AddRoute(vci id.VC, d *destination) {
p.mu.Lock()
p.routingTable[vci] = d
p.mu.Unlock()
}
func (p *process) InitVCI(vci id.VC) {
p.mu.Lock()
if p.nextVCI <= vci {
p.nextVCI = vci + 1
}
p.mu.Unlock()
}
func (p *process) AllocVCI() id.VC {
p.mu.Lock()
ret := p.nextVCI
p.nextVCI += 2
p.mu.Unlock()
return ret
}
func (p *process) RemoveRoute(vci id.VC) {
p.mu.Lock()
delete(p.routingTable, vci)
p.mu.Unlock()
}
func (p *process) SendCloseVC(vci id.VC, err error) {
var estr string
if err != nil {
estr = err.Error()
}
p.queue.Put(&message.CloseVC{VCI: vci, Error: estr})
}
func (p *process) Close() {
p.mu.Lock()
if p.routingTable == nil {
p.mu.Unlock()
return
}
rt := p.routingTable
p.routingTable = nil
for _, vc := range p.servers {
vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errNetConnClosing, nil)))
}
p.mu.Unlock()
for _, d := range rt {
d.Process.SendCloseVC(d.VCI, verror.New(stream.ErrProxy, nil, verror.New(errProcessVanished, nil)))
}
p.bq.Close()
p.queue.Close()
p.conn.Close()
p.proxy.removeProcess(p)
}
func (p *process) ServerVC(vci id.VC) *vc.VC {
p.mu.Lock()
defer p.mu.Unlock()
return p.servers[vci]
}
func (p *process) NewServerVC(ctx *context.T, m *message.SetupVC) *vc.VC {
p.mu.Lock()
defer p.mu.Unlock()
if vc := p.servers[m.VCI]; vc != nil {
vc.Close(verror.New(stream.ErrProxy, nil, verror.New(errDuplicateSetupVC, nil)))
return nil
}
vc := vc.InternalNew(ctx, vc.Params{
VCI: m.VCI,
LocalEP: m.RemoteEndpoint,
RemoteEP: m.LocalEndpoint,
Pool: p.pool,
ReserveBytes: message.HeaderSizeBytes,
Helper: p,
})
p.servers[m.VCI] = vc
proxyLog(p.ctx, "Registered VC %v from server on process %v", vc, p)
return vc
}
func (p *process) RemoveServerVC(vci id.VC) *vc.VC {
p.mu.Lock()
defer p.mu.Unlock()
if vc := p.servers[vci]; vc != nil {
delete(p.servers, vci)
proxyLog(p.ctx, "Unregistered server VC %v from process %v", vc, p)
return vc
}
return nil
}
// Make process implement vc.Helper
func (p *process) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) {
msg := &message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)}
if err := p.queue.Put(msg); err != nil {
processLog(p.ctx, "Failed to send OpenFlow(%+v) on process %v: %v", msg, p, err)
}
}
func (p *process) SendHealthCheck(vci id.VC) {
p.queue.Put(&message.HealthCheckRequest{VCI: vci})
}
func (p *process) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
if bytes == 0 {
return
}
msg := &message.AddReceiveBuffers{Counters: message.NewCounters()}
msg.Counters.Add(vci, fid, uint32(bytes))
if err := p.queue.Put(msg); err != nil {
processLog(p.ctx, "Failed to send AddReceiveBuffers(%+v) on process %v: %v", msg, p, err)
}
}
func (p *process) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
return p.bq.NewWriter(packIDs(vci, fid), priority, vc.DefaultBytesBufferedPerFlow)
}
// Convenience functions to assist with the logging convention.
func proxyLog(ctx *context.T, format string, args ...interface{}) {
ctx.VI(1).Infof(format, args...)
}
func processLog(ctx *context.T, format string, args ...interface{}) {
ctx.VI(2).Infof(format, args...)
}
func vcLog(ctx *context.T, format string, args ...interface{}) {
ctx.VI(3).Infof(format, args...)
}
func msgLog(ctx *context.T, format string, args ...interface{}) {
ctx.VI(4).Infof(format, args...)
}
func packIDs(vci id.VC, fid id.Flow) bqueue.ID {
return bqueue.ID(message.MakeCounterID(vci, fid))
}
func unpackIDs(b bqueue.ID) (id.VC, id.Flow) {
cid := message.CounterID(b)
return cid.VCI(), cid.Flow()
}