blob: 239d633985011fbe94879d64c943840a4fc857b1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package xproxyd
import (
"io"
"sync"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/vom"
)
// TODO(suharshs): Make sure that we don't leak any goroutines.
const proxyByte = byte('p')
const serverByte = byte('s')
const clientByte = byte('c')
type proxy struct {
m flow.Manager
mu sync.Mutex
proxyEndpoints []naming.Endpoint
}
func New(ctx *context.T) (*proxy, error) {
p := &proxy{
m: v23.ExperimentalGetFlowManager(ctx),
}
for _, addr := range v23.GetListenSpec(ctx).Addrs {
if addr.Protocol == "v23" {
ep, err := v23.NewEndpoint(addr.Address)
if err != nil {
return nil, err
}
f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
return nil, err
}
// Send a byte telling the acceptor that we are a proxy.
if _, err := f.Write([]byte{proxyByte}); err != nil {
return nil, err
}
var lep string
if err := vom.NewDecoder(f).Decode(&lep); err != nil {
return nil, err
}
proxyEndpoint, err := v23.NewEndpoint(lep)
if err != nil {
return nil, err
}
p.mu.Lock()
p.proxyEndpoints = append(p.proxyEndpoints, proxyEndpoint)
p.mu.Unlock()
} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
return nil, err
}
}
go p.listenLoop(ctx)
return p, nil
}
func (p *proxy) ListeningEndpoints() []naming.Endpoint {
return p.m.ListeningEndpoints()
}
func (p *proxy) listenLoop(ctx *context.T) {
for {
f, err := p.m.Accept(ctx)
if err != nil {
ctx.Infof("p.m.Accept failed: %v", err)
break
}
msg := make([]byte, 1)
if _, err := f.Read(msg); err != nil {
ctx.Errorf("reading type byte failed: %v", err)
}
switch msg[0] {
case clientByte:
err = p.startRouting(ctx, f)
case proxyByte:
err = p.replyToProxy(ctx, f)
case serverByte:
err = p.replyToServer(ctx, f)
default:
continue
}
if err != nil {
ctx.Errorf("failed to handle incoming connection: %v", err)
}
}
}
func (p *proxy) startRouting(ctx *context.T, f flow.Flow) error {
fout, err := p.dialNextHop(ctx, f)
if err != nil {
return err
}
go p.forwardLoop(ctx, f, fout)
go p.forwardLoop(ctx, fout, f)
return nil
}
func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
for {
_, err := io.Copy(fin, fout)
if err == io.EOF {
return
} else if err != nil {
ctx.Errorf("f.Read failed: %v", err)
return
}
}
}
func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow) (flow.Flow, error) {
m, err := readSetupMessage(ctx, f)
if err != nil {
return nil, err
}
var rid naming.RoutingID
var ep naming.Endpoint
var shouldWriteClientByte bool
if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
if err := rid.FromString(routes[0]); err != nil {
return nil, err
}
// Make an endpoint with the correct routingID to dial out. All other fields
// do not matter.
// TODO(suharshs): Make sure that the routingID from the route belongs to a
// connection that is stored in the manager's cache. (i.e. a Server has connected
// with the routingID before)
if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
return nil, err
}
// Remove the read route from the setup message endpoint.
if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
return nil, err
}
shouldWriteClientByte = true
} else {
ep = m.PeerRemoteEndpoint
}
fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
if err != nil {
return nil, err
}
if shouldWriteClientByte {
// We only write the clientByte on flows made to proxys. If we are creating
// the last hop flow to the end server, we don't need to send the byte.
if _, err := fout.Write([]byte{clientByte}); err != nil {
return nil, err
}
}
// Write the setup message back onto the flow for the next hop to read.
return fout, writeSetupMessage(ctx, m, fout)
}
func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
rid := f.Conn().RemoteEndpoint().RoutingID()
epString, err := p.returnEndpoint(ctx, rid, "")
if err != nil {
return err
}
// TODO(suharshs): Make a low-level message for this information instead of
// VOM-Encoding the endpoint string.
return vom.NewEncoder(f).Encode(epString)
}
func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
// Add the routing id of the incoming proxy to the routes. The routing id of the
// returned endpoint doesn't matter because it will eventually be replaced
// by a server's rid by some later proxy.
// TODO(suharshs): Use a local route instead of this global routingID.
rid := f.Conn().RemoteEndpoint().RoutingID()
epString, err := p.returnEndpoint(ctx, naming.NullRoutingID, rid.String())
if err != nil {
return err
}
return vom.NewEncoder(f).Encode(epString)
}
func (p *proxy) returnEndpoint(ctx *context.T, rid naming.RoutingID, route string) (string, error) {
p.mu.Lock()
eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
p.mu.Unlock()
if len(eps) == 0 {
return "", NewErrNotListening(ctx)
}
// TODO(suharshs): handle listening on multiple endpoints.
ep := eps[len(eps)-1]
var err error
if rid != naming.NullRoutingID {
ep, err = setEndpointRoutingID(ep, rid)
if err != nil {
return "", err
}
}
if len(route) > 0 {
var cp []string
cp = append(cp, ep.Routes()...)
cp = append(cp, route)
ep, err = setEndpointRoutes(ep, cp)
if err != nil {
return "", err
}
}
return ep.String(), nil
}