services/xproxyd: Create non-vom proxy protocol.
(1) v23/flow/message now has Proxy request and response messages.
(2) Add support for proxy to pass multiple endpoints back to servers.
MultiPart: 1/2
Change-Id: I2ee28d690adcfad263ecc48c5f8df5448d3cd6e0
diff --git a/runtime/internal/flow/manager/errors.vdl b/runtime/internal/flow/manager/errors.vdl
index 4ea50bc..dbb38ae 100644
--- a/runtime/internal/flow/manager/errors.vdl
+++ b/runtime/internal/flow/manager/errors.vdl
@@ -15,4 +15,5 @@
AcceptFailed(err error) {"en": "accept failed{:err}"}
CacheClosed() {"en":"cache is closed"}
ConnKilledToFreeResources() {"en": "Connection killed to free resources."}
+ InvalidProxyResponse(typ string) {"en": "Invalid proxy response{:typ}"}
)
diff --git a/runtime/internal/flow/manager/errors.vdl.go b/runtime/internal/flow/manager/errors.vdl.go
index be7aa07..438e6fa 100644
--- a/runtime/internal/flow/manager/errors.vdl.go
+++ b/runtime/internal/flow/manager/errors.vdl.go
@@ -20,6 +20,7 @@
ErrAcceptFailed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.AcceptFailed", verror.NoRetry, "{1:}{2:} accept failed{:3}")
ErrCacheClosed = verror.Register("v.io/x/ref/runtime/internal/flow/manager.CacheClosed", verror.NoRetry, "{1:}{2:} cache is closed")
ErrConnKilledToFreeResources = verror.Register("v.io/x/ref/runtime/internal/flow/manager.ConnKilledToFreeResources", verror.NoRetry, "{1:}{2:} Connection killed to free resources.")
+ ErrInvalidProxyResponse = verror.Register("v.io/x/ref/runtime/internal/flow/manager.InvalidProxyResponse", verror.NoRetry, "{1:}{2:} Invalid proxy response{:3}")
)
func init() {
@@ -28,6 +29,7 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAcceptFailed.ID), "{1:}{2:} accept failed{:3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCacheClosed.ID), "{1:}{2:} cache is closed")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnKilledToFreeResources.ID), "{1:}{2:} Connection killed to free resources.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidProxyResponse.ID), "{1:}{2:} Invalid proxy response{:3}")
}
// NewErrUnknownProtocol returns an error with the ErrUnknownProtocol ID.
@@ -54,3 +56,8 @@
func NewErrConnKilledToFreeResources(ctx *context.T) error {
return verror.New(ErrConnKilledToFreeResources, ctx)
}
+
+// NewErrInvalidProxyResponse returns an error with the ErrInvalidProxyResponse ID.
+func NewErrInvalidProxyResponse(ctx *context.T, typ string) error {
+ return verror.New(ErrInvalidProxyResponse, ctx, typ)
+}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index c2eb5a9..6ec43fb 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -5,6 +5,7 @@
package manager
import (
+ "fmt"
"net"
"strings"
"sync"
@@ -18,7 +19,6 @@
"v.io/v23/naming"
"v.io/v23/security"
"v.io/v23/verror"
- "v.io/v23/vom"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/lib/upcqueue"
@@ -26,11 +26,6 @@
"v.io/x/ref/runtime/internal/rpc/version"
)
-const (
- clientByte = 'c'
- serverByte = 's'
-)
-
type manager struct {
rid naming.RoutingID
closed <-chan struct{}
@@ -59,24 +54,24 @@
// otherwise an error is returned.
func (m *manager) Listen(ctx *context.T, protocol, address string) error {
var (
- ep naming.Endpoint
+ eps []naming.Endpoint
err error
)
if protocol == inaming.Network {
- ep, err = m.proxyListen(ctx, address)
+ eps, err = m.proxyListen(ctx, address)
} else {
- ep, err = m.listen(ctx, protocol, address)
+ eps, err = m.listen(ctx, protocol, address)
}
if err != nil {
return err
}
m.mu.Lock()
- m.listenEndpoints = append(m.listenEndpoints, ep)
+ m.listenEndpoints = append(m.listenEndpoints, eps...)
m.mu.Unlock()
return nil
}
-func (m *manager) listen(ctx *context.T, protocol, address string) (naming.Endpoint, error) {
+func (m *manager) listen(ctx *context.T, protocol, address string) ([]naming.Endpoint, error) {
ln, err := listen(ctx, protocol, address)
if err != nil {
return nil, flow.NewErrNetwork(ctx, err)
@@ -87,10 +82,10 @@
RID: m.rid,
}
go m.lnAcceptLoop(ctx, ln, local)
- return local, nil
+ return []naming.Endpoint{local}, nil
}
-func (m *manager) proxyListen(ctx *context.T, address string) (naming.Endpoint, error) {
+func (m *manager) proxyListen(ctx *context.T, address string) ([]naming.Endpoint, error) {
ep, err := inaming.NewEndpoint(address)
if err != nil {
return nil, flow.NewErrBadArg(ctx, err)
@@ -99,15 +94,31 @@
if err != nil {
return nil, flow.NewErrNetwork(ctx, err)
}
- // Write to ensure we send an openFlow message.
- if _, err := f.Write([]byte{serverByte}); err != nil {
+ w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil)
+ if err != nil {
+ return nil, flow.NewErrBadArg(ctx, err)
+ }
+ if _, err := f.WriteMsg(w); err != nil {
+ return nil, flow.NewErrBadArg(ctx, err)
+ }
+
+ return m.readProxyResponse(ctx, f)
+}
+
+func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
+ b, err := f.ReadMsg()
+ if err != nil {
return nil, flow.NewErrNetwork(ctx, err)
}
- var lep string
- if err := vom.NewDecoder(f).Decode(&lep); err != nil {
- return nil, flow.NewErrNetwork(ctx, err)
+ msg, err := message.Read(ctx, b)
+ if err != nil {
+ return nil, flow.NewErrBadArg(ctx, err)
}
- return inaming.NewEndpoint(lep)
+ res, ok := msg.(*message.ProxyResponse)
+ if !ok {
+ return nil, flow.NewErrBadArg(ctx, NewErrInvalidProxyResponse(ctx, fmt.Sprintf("%t", res)))
+ }
+ return res.Endpoints, nil
}
type proxyBlessingsForPeer struct{}
@@ -322,10 +333,6 @@
// If we are dialing out to a Proxy, we need to dial a conn on this flow, and
// return a flow on that corresponding conn.
if remote.RoutingID() != c.RemoteEndpoint().RoutingID() {
- // Write to tell the proxy that this should be routed.
- if _, err := f.Write([]byte{clientByte}); err != nil {
- return nil, flow.NewErrNetwork(ctx, err)
- }
c, err = conn.NewDialed(
ctx,
f,
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index 239d633..fc97ceb 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -5,22 +5,19 @@
package xproxyd
import (
+ "fmt"
"io"
"sync"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
+ "v.io/v23/flow/message"
"v.io/v23/naming"
- "v.io/v23/vom"
)
// TODO(suharshs): Make sure that we don't leak any goroutines.
-const proxyByte = byte('p')
-const serverByte = byte('s')
-const clientByte = byte('c')
-
type proxy struct {
m flow.Manager
mu sync.Mutex
@@ -42,19 +39,19 @@
return nil, err
}
// Send a byte telling the acceptor that we are a proxy.
- if _, err := f.Write([]byte{proxyByte}); err != nil {
+ if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
return nil, err
}
- var lep string
- if err := vom.NewDecoder(f).Decode(&lep); err != nil {
- return nil, err
- }
- proxyEndpoint, err := v23.NewEndpoint(lep)
+ msg, err := readMessage(ctx, f)
if err != nil {
return nil, err
}
+ m, ok := msg.(*message.ProxyResponse)
+ if !ok {
+ return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
+ }
p.mu.Lock()
- p.proxyEndpoints = append(p.proxyEndpoints, proxyEndpoint)
+ p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
p.mu.Unlock()
} else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
return nil, err
@@ -75,16 +72,16 @@
ctx.Infof("p.m.Accept failed: %v", err)
break
}
- msg := make([]byte, 1)
- if _, err := f.Read(msg); err != nil {
+ msg, err := readMessage(ctx, f)
+ if err != nil {
ctx.Errorf("reading type byte failed: %v", err)
}
- switch msg[0] {
- case clientByte:
- err = p.startRouting(ctx, f)
- case proxyByte:
+ switch m := msg.(type) {
+ case *message.Setup:
+ err = p.startRouting(ctx, f, m)
+ case *message.MultiProxyRequest:
err = p.replyToProxy(ctx, f)
- case serverByte:
+ case *message.ProxyServerRequest:
err = p.replyToServer(ctx, f)
default:
continue
@@ -95,8 +92,8 @@
}
}
-func (p *proxy) startRouting(ctx *context.T, f flow.Flow) error {
- fout, err := p.dialNextHop(ctx, f)
+func (p *proxy) startRouting(ctx *context.T, f flow.Flow, m *message.Setup) error {
+ fout, err := p.dialNextHop(ctx, f, m)
if err != nil {
return err
}
@@ -117,14 +114,12 @@
}
}
-func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow) (flow.Flow, error) {
- m, err := readSetupMessage(ctx, f)
- if err != nil {
- return nil, err
- }
- var rid naming.RoutingID
- var ep naming.Endpoint
- var shouldWriteClientByte bool
+func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow, m *message.Setup) (flow.Flow, error) {
+ var (
+ rid naming.RoutingID
+ ep naming.Endpoint
+ err error
+ )
if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
if err := rid.FromString(routes[0]); err != nil {
return nil, err
@@ -141,7 +136,6 @@
if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
return nil, err
}
- shouldWriteClientByte = true
} else {
ep = m.PeerRemoteEndpoint
}
@@ -149,27 +143,17 @@
if err != nil {
return nil, err
}
- if shouldWriteClientByte {
- // We only write the clientByte on flows made to proxys. If we are creating
- // the last hop flow to the end server, we don't need to send the byte.
- if _, err := fout.Write([]byte{clientByte}); err != nil {
- return nil, err
- }
- }
-
// Write the setup message back onto the flow for the next hop to read.
- return fout, writeSetupMessage(ctx, m, fout)
+ return fout, writeMessage(ctx, m, fout)
}
func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
rid := f.Conn().RemoteEndpoint().RoutingID()
- epString, err := p.returnEndpoint(ctx, rid, "")
+ eps, err := p.returnEndpoints(ctx, rid, "")
if err != nil {
return err
}
- // TODO(suharshs): Make a low-level message for this information instead of
- // VOM-Encoding the endpoint string.
- return vom.NewEncoder(f).Encode(epString)
+ return writeMessage(ctx, &message.ProxyResponse{Endpoints: eps}, f)
}
func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
@@ -178,37 +162,38 @@
// by a server's rid by some later proxy.
// TODO(suharshs): Use a local route instead of this global routingID.
rid := f.Conn().RemoteEndpoint().RoutingID()
- epString, err := p.returnEndpoint(ctx, naming.NullRoutingID, rid.String())
+ eps, err := p.returnEndpoints(ctx, naming.NullRoutingID, rid.String())
if err != nil {
return err
}
- return vom.NewEncoder(f).Encode(epString)
+ return writeMessage(ctx, &message.ProxyResponse{Endpoints: eps}, f)
}
-func (p *proxy) returnEndpoint(ctx *context.T, rid naming.RoutingID, route string) (string, error) {
+func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
p.mu.Lock()
eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
p.mu.Unlock()
if len(eps) == 0 {
- return "", NewErrNotListening(ctx)
+ return nil, NewErrNotListening(ctx)
}
- // TODO(suharshs): handle listening on multiple endpoints.
- ep := eps[len(eps)-1]
- var err error
- if rid != naming.NullRoutingID {
- ep, err = setEndpointRoutingID(ep, rid)
- if err != nil {
- return "", err
+ for idx, ep := range eps {
+ var err error
+ if rid != naming.NullRoutingID {
+ ep, err = setEndpointRoutingID(ep, rid)
+ if err != nil {
+ return nil, err
+ }
}
- }
- if len(route) > 0 {
- var cp []string
- cp = append(cp, ep.Routes()...)
- cp = append(cp, route)
- ep, err = setEndpointRoutes(ep, cp)
- if err != nil {
- return "", err
+ if len(route) > 0 {
+ var cp []string
+ cp = append(cp, ep.Routes()...)
+ cp = append(cp, route)
+ ep, err = setEndpointRoutes(ep, cp)
+ if err != nil {
+ return nil, err
+ }
}
+ eps[idx] = ep
}
- return ep.String(), nil
+ return eps, nil
}
diff --git a/services/xproxyd/util.go b/services/xproxyd/util.go
index b1fdd58..4ccad31 100644
--- a/services/xproxyd/util.go
+++ b/services/xproxyd/util.go
@@ -5,8 +5,6 @@
package xproxyd
import (
- "fmt"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
@@ -73,26 +71,19 @@
return v23.GetPrincipal(ctx).BlessingStore().Default(), nil, nil
}
-func readSetupMessage(ctx *context.T, f flow.Flow) (*message.Setup, error) {
- b, err := f.ReadMsg()
- if err != nil {
- return nil, err
- }
- m, err := message.Read(ctx, b)
- if err != nil {
- return nil, err
- }
- if m, isSetup := m.(*message.Setup); isSetup {
- return m, nil
- }
- return nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
-}
-
-func writeSetupMessage(ctx *context.T, m message.Message, f flow.Flow) error {
- w, err := message.Append(ctx, m, []byte{})
+func writeMessage(ctx *context.T, m message.Message, f flow.Flow) error {
+ w, err := message.Append(ctx, m, nil)
if err != nil {
return err
}
_, err = f.WriteMsg(w)
return err
}
+
+func readMessage(ctx *context.T, f flow.Flow) (message.Message, error) {
+ b, err := f.ReadMsg()
+ if err != nil {
+ return nil, err
+ }
+ return message.Read(ctx, b)
+}