ref/profiles/proxy: Move the proxy implementation back into profiles internal.
Instead, for now, we'll expose proxy creation on profiles that need it.
Also remove the modules/core proxy command.
Change-Id: Ia2c0760d438aa0cb71126b30a6b7b2ccee134393
diff --git a/cmd/servicerunner/main.go b/cmd/servicerunner/main.go
index 4da36de..815d74e 100644
--- a/cmd/servicerunner/main.go
+++ b/cmd/servicerunner/main.go
@@ -15,7 +15,8 @@
"v.io/x/ref/lib/flags/consts"
"v.io/x/ref/lib/modules"
"v.io/x/ref/lib/modules/core"
- _ "v.io/x/ref/profiles"
+ "v.io/x/ref/lib/signals"
+ "v.io/x/ref/profiles"
)
func panicOnError(err error) {
@@ -75,17 +76,13 @@
panicOnError(err)
panicOnError(updateVars(h, vars, "MT_NAME"))
- // Set consts.NamespaceRootPrefix env var, consumed downstream by proxyd
- // among others.
- // NOTE(sadovsky): If this var is not set, proxyd takes several seconds to
- // start; if it is set, proxyd starts instantly. Confusing.
+ // Set consts.NamespaceRootPrefix env var, consumed downstream.
sh.SetVar(consts.NamespaceRootPrefix, vars["MT_NAME"])
+ v23.GetNamespace(ctx).SetRoots(vars["MT_NAME"])
- // NOTE(sadovsky): The proxyd binary requires --protocol and --address flags
- // while the proxyd command instead uses ListenSpec flags.
- h, err = sh.Start(core.ProxyServerCommand, nil, "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0", "test/proxy")
- panicOnError(err)
- panicOnError(updateVars(h, vars, "PROXY_NAME"))
+ proxyShutdown, proxyEndpoint, err := profiles.NewProxy(ctx, "ws", "127.0.0.1:0", "", "test/proxy")
+ defer proxyShutdown()
+ vars["PROXY_NAME"] = proxyEndpoint.Name()
h, err = sh.Start(core.WSPRCommand, nil, "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0", "--veyron.proxy=test/proxy", "--identd=test/identd")
panicOnError(err)
@@ -99,6 +96,5 @@
panicOnError(err)
fmt.Println(string(bytes))
- // Wait to be killed.
- select {}
+ <-signals.ShutdownOnSignals(ctx)
}
diff --git a/lib/modules/core/proxy.go b/lib/modules/core/proxy.go
deleted file mode 100644
index 5b2428a..0000000
--- a/lib/modules/core/proxy.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package core
-
-import (
- "fmt"
- "io"
- "os"
- "time"
-
- "v.io/v23"
- "v.io/v23/naming"
-
- "v.io/x/ref/lib/modules"
- "v.io/x/ref/lib/publisher"
- "v.io/x/ref/profiles/proxy"
-)
-
-func init() {
- modules.RegisterChild(ProxyServerCommand, `<name>`, proxyServer)
-}
-
-func proxyServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- ctx, shutdown := v23.Init()
- defer shutdown()
-
- expected := len(args)
- rid, err := naming.NewRoutingID()
- if err != nil {
- return err
- }
-
- listenSpec := v23.GetListenSpec(ctx)
- protocol := listenSpec.Addrs[0].Protocol
- addr := listenSpec.Addrs[0].Address
-
- proxy, err := proxy.New(rid, v23.GetPrincipal(ctx), protocol, addr, "")
- if err != nil {
- return err
- }
- defer proxy.Shutdown()
-
- fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
- if expected > 0 {
- pub := publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
- defer pub.WaitForStop()
- defer pub.Stop()
- pub.AddServer(proxy.Endpoint().String(), false)
- for _, name := range args {
- if len(name) == 0 {
- return fmt.Errorf("empty name specified on the command line")
- }
- pub.AddName(name)
- }
- // Wait for all the entries to be published.
- for {
- pubState := pub.Status()
- if expected == len(pubState) {
- break
- }
- fmt.Fprintf(stderr, "%s\n", pub.DebugString())
- delay := time.Second
- fmt.Fprintf(stderr, "Sleeping: %s\n", delay)
- time.Sleep(delay)
- }
- }
- fmt.Fprintf(stdout, "PROXY_NAME=%s\n", proxy.Endpoint().Name())
- modules.WaitForEOF(stdin)
- fmt.Fprintf(stdout, "DONE\n")
- return nil
-}
diff --git a/profiles/internal/ipc/proxy_test.go b/profiles/internal/ipc/proxy_test.go
index e939cb9..26e243d 100644
--- a/profiles/internal/ipc/proxy_test.go
+++ b/profiles/internal/ipc/proxy_test.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "io"
"os"
"reflect"
"sort"
@@ -9,6 +10,7 @@
"testing"
"time"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/ipc"
"v.io/v23/naming"
@@ -21,11 +23,12 @@
"v.io/x/ref/lib/expect"
"v.io/x/ref/lib/flags"
"v.io/x/ref/lib/modules"
- "v.io/x/ref/lib/modules/core"
+ "v.io/x/ref/lib/publisher"
tsecurity "v.io/x/ref/lib/testutil/security"
_ "v.io/x/ref/profiles"
iipc "v.io/x/ref/profiles/internal/ipc"
imanager "v.io/x/ref/profiles/internal/ipc/stream/manager"
+ "v.io/x/ref/profiles/internal/ipc/stream/proxy"
"v.io/x/ref/profiles/internal/ipc/stream/vc"
inaming "v.io/x/ref/profiles/internal/naming"
tnaming "v.io/x/ref/profiles/internal/testing/mocks/naming"
@@ -37,6 +40,50 @@
return ctx
}
+func proxyServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ expected := len(args)
+ listenSpec := v23.GetListenSpec(ctx)
+ protocol := listenSpec.Addrs[0].Protocol
+ addr := listenSpec.Addrs[0].Address
+ proxyShutdown, proxyEp, err := proxy.New(ctx, protocol, addr, "")
+ if err != nil {
+ return err
+ }
+ defer proxyShutdown()
+
+ fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
+ if expected > 0 {
+ pub := publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
+ defer pub.WaitForStop()
+ defer pub.Stop()
+ pub.AddServer(proxyEp.String(), false)
+ for _, name := range args {
+ if len(name) == 0 {
+ return fmt.Errorf("empty name specified on the command line")
+ }
+ pub.AddName(name)
+ }
+ // Wait for all the entries to be published.
+ for {
+ pubState := pub.Status()
+ if expected == len(pubState) {
+ break
+ }
+ fmt.Fprintf(stderr, "%s\n", pub.DebugString())
+ delay := time.Second
+ fmt.Fprintf(stderr, "Sleeping: %s\n", delay)
+ time.Sleep(delay)
+ }
+ }
+ fmt.Fprintf(stdout, "PROXY_NAME=%s\n", proxyEp.Name())
+ modules.WaitForEOF(stdin)
+ fmt.Fprintf(stdout, "DONE\n")
+ return nil
+}
+
func testContextWithoutDeadline() *context.T {
ctx, _ := context.RootContext()
ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
@@ -78,7 +125,7 @@
t.Fatalf("unexpected error: %s", err)
}
h.sh = sh
- p, err := sh.Start(core.ProxyServerCommand, nil, args...)
+ p, err := sh.Start("proxyServer", nil, args...)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
diff --git a/profiles/internal/ipc/stream/manager/listener.go b/profiles/internal/ipc/stream/manager/listener.go
index bf1e929..f1a3f60 100644
--- a/profiles/internal/ipc/stream/manager/listener.go
+++ b/profiles/internal/ipc/stream/manager/listener.go
@@ -8,9 +8,9 @@
"sync"
"v.io/x/ref/lib/upcqueue"
+ "v.io/x/ref/profiles/internal/ipc/stream/proxy"
"v.io/x/ref/profiles/internal/ipc/stream/vif"
inaming "v.io/x/ref/profiles/internal/naming"
- "v.io/x/ref/profiles/proxy"
"v.io/v23/naming"
"v.io/v23/verror"
diff --git a/profiles/proxy/debug.go b/profiles/internal/ipc/stream/proxy/debug.go
similarity index 91%
rename from profiles/proxy/debug.go
rename to profiles/internal/ipc/stream/proxy/debug.go
index deeb53d..ff68e55 100644
--- a/profiles/proxy/debug.go
+++ b/profiles/internal/ipc/stream/proxy/debug.go
@@ -7,12 +7,12 @@
// DebugString dumps out the routing table at the proxy in text format.
// The format is meant for debugging purposes and may change without notice.
-func (p *Proxy) DebugString() string {
+func (p *Proxy) debugString() string {
var buf bytes.Buffer
servers := p.servers.List()
p.mu.RLock()
defer p.mu.RUnlock()
- fmt.Fprintf(&buf, "Proxy with endpoint: %q. #Processes:%d #Servers:%d\n", p.Endpoint(), len(p.processes), len(servers))
+ fmt.Fprintf(&buf, "Proxy with endpoint: %q. #Processes:%d #Servers:%d\n", p.endpoint(), len(p.processes), len(servers))
fmt.Fprintf(&buf, "=========\n")
fmt.Fprintf(&buf, "PROCESSES\n")
fmt.Fprintf(&buf, "=========\n")
diff --git a/profiles/proxy/doc.go b/profiles/internal/ipc/stream/proxy/doc.go
similarity index 100%
rename from profiles/proxy/doc.go
rename to profiles/internal/ipc/stream/proxy/doc.go
diff --git a/profiles/internal/ipc/stream/proxy/helper_test.go b/profiles/internal/ipc/stream/proxy/helper_test.go
new file mode 100644
index 0000000..175070d
--- /dev/null
+++ b/profiles/internal/ipc/stream/proxy/helper_test.go
@@ -0,0 +1,11 @@
+package proxy
+
+import (
+ "v.io/v23/naming"
+ "v.io/v23/security"
+)
+
+// This exprts the internalNew function only for use in the proxy_test package.
+func InternalNew(rid naming.RoutingID, p security.Principal, net, addr, pubAddr string) (func(), naming.Endpoint, error) {
+ return internalNew(rid, p, net, addr, pubAddr)
+}
diff --git a/profiles/proxy/protocol.vdl b/profiles/internal/ipc/stream/proxy/protocol.vdl
similarity index 100%
rename from profiles/proxy/protocol.vdl
rename to profiles/internal/ipc/stream/proxy/protocol.vdl
diff --git a/profiles/proxy/protocol.vdl.go b/profiles/internal/ipc/stream/proxy/protocol.vdl.go
similarity index 86%
rename from profiles/proxy/protocol.vdl.go
rename to profiles/internal/ipc/stream/proxy/protocol.vdl.go
index 3b9cebc..073897e 100644
--- a/profiles/proxy/protocol.vdl.go
+++ b/profiles/internal/ipc/stream/proxy/protocol.vdl.go
@@ -15,7 +15,7 @@
}
func (Request) __VDLReflect(struct {
- Name string "v.io/x/ref/profiles/proxy.Request"
+ Name string "v.io/x/ref/profiles/internal/ipc/stream/proxy.Request"
}) {
}
@@ -30,7 +30,7 @@
}
func (Response) __VDLReflect(struct {
- Name string "v.io/x/ref/profiles/proxy.Response"
+ Name string "v.io/x/ref/profiles/internal/ipc/stream/proxy.Response"
}) {
}
diff --git a/profiles/proxy/proxy.go b/profiles/internal/ipc/stream/proxy/proxy.go
similarity index 92%
rename from profiles/proxy/proxy.go
rename to profiles/internal/ipc/stream/proxy/proxy.go
index 79078a6..aed07ca 100644
--- a/profiles/proxy/proxy.go
+++ b/profiles/internal/ipc/stream/proxy/proxy.go
@@ -5,7 +5,10 @@
"fmt"
"net"
"sync"
+ "time"
+ "v.io/v23"
+ "v.io/v23/context"
"v.io/v23/ipc"
"v.io/v23/naming"
"v.io/v23/security"
@@ -13,6 +16,7 @@
"v.io/v23/vom"
"v.io/x/lib/vlog"
+ "v.io/x/ref/lib/publisher"
"v.io/x/ref/lib/upcqueue"
"v.io/x/ref/profiles/internal/ipc/stream/crypto"
"v.io/x/ref/profiles/internal/ipc/stream/id"
@@ -147,14 +151,48 @@
// New creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
-func New(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
+// TODO(mattr): This should take a ListenSpec instead of network, address, and
+// pubAddress. However using a ListenSpec requires a great deal of supporting
+// code that should be refactored out of v.io/x/ref/profiles/internal/ipc/server.go.
+func New(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ rid, err := naming.NewRoutingID()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ proxyShutdown, endpoint, err := internalNew(
+ rid, v23.GetPrincipal(ctx), network, address, pubAddress)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ var pub publisher.Publisher
+ if len(names) > 0 {
+ pub = publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
+ pub.AddServer(endpoint.String(), false)
+ for _, name := range names {
+ pub.AddName(name)
+ }
+ }
+
+ shutdown = func() {
+ if pub != nil {
+ pub.Stop()
+ pub.WaitForStop()
+ }
+ proxyShutdown()
+ }
+ return shutdown, endpoint, nil
+}
+
+func internalNew(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (shutdown func(), endpoint naming.Endpoint, err error) {
_, listenFn, _ := ipc.RegisteredProtocol(network)
if listenFn == nil {
- return nil, fmt.Errorf("unknown network %s", network)
+ return nil, nil, fmt.Errorf("unknown network %s", network)
}
ln, err := listenFn(network, address)
if err != nil {
- return nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
+ return nil, nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
}
if len(pubAddress) == 0 {
pubAddress = ln.Addr().String()
@@ -168,18 +206,18 @@
principal: principal,
statsName: naming.Join("ipc", "proxy", "routing-id", rid.String(), "debug"),
}
- stats.NewStringFunc(proxy.statsName, proxy.DebugString)
+ stats.NewStringFunc(proxy.statsName, proxy.debugString)
go proxy.listenLoop()
- return proxy, nil
+ return proxy.shutdown, proxy.endpoint(), nil
}
func (p *Proxy) listenLoop() {
- proxyLog().Infof("Proxy listening on (%q, %q): %v", p.ln.Addr().Network(), p.ln.Addr(), p.Endpoint())
+ proxyLog().Infof("Proxy listening on (%q, %q): %v", p.ln.Addr().Network(), p.ln.Addr(), p.endpoint())
for {
conn, err := p.ln.Accept()
if err != nil {
- proxyLog().Infof("Exiting listenLoop of proxy %q: %v", p.Endpoint(), err)
+ proxyLog().Infof("Exiting listenLoop of proxy %q: %v", p.endpoint(), err)
return
}
go p.acceptProcess(conn)
@@ -242,7 +280,7 @@
response.Error = verror.Convert(verror.ErrUnknown, nil, err)
} else {
defer p.servers.Remove(server)
- ep, err := version.ProxiedEndpoint(server.VC.RemoteAddr().RoutingID(), p.Endpoint())
+ ep, err := version.ProxiedEndpoint(server.VC.RemoteAddr().RoutingID(), p.endpoint())
if err != nil {
response.Error = verror.Convert(verror.ErrInternal, nil, err)
}
@@ -305,13 +343,13 @@
// Endpoint returns the endpoint of the proxy service. By Dialing a VC to this
// endpoint, processes can have their services exported through the proxy.
-func (p *Proxy) Endpoint() naming.Endpoint {
+func (p *Proxy) endpoint() naming.Endpoint {
ep := version.Endpoint(p.ln.Addr().Network(), p.pubAddress, p.rid)
return ep
}
// Shutdown stops the proxy service, closing all network connections.
-func (p *Proxy) Shutdown() {
+func (p *Proxy) shutdown() {
stats.Delete(p.statsName)
p.ln.Close()
p.mu.Lock()
diff --git a/profiles/proxy/proxy_test.go b/profiles/internal/ipc/stream/proxy/proxy_test.go
similarity index 81%
rename from profiles/proxy/proxy_test.go
rename to profiles/internal/ipc/stream/proxy/proxy_test.go
index 454fad3..960e34e 100644
--- a/profiles/proxy/proxy_test.go
+++ b/profiles/internal/ipc/stream/proxy/proxy_test.go
@@ -14,25 +14,25 @@
tsecurity "v.io/x/ref/lib/testutil/security"
_ "v.io/x/ref/profiles"
"v.io/x/ref/profiles/internal/ipc/stream/manager"
+ "v.io/x/ref/profiles/internal/ipc/stream/proxy"
"v.io/x/ref/profiles/internal/ipc/stream/vc"
- "v.io/x/ref/profiles/proxy"
)
//go:generate v23 test generate
func TestProxy(t *testing.T) {
- proxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
+ shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatal(err)
}
- defer proxy.Shutdown()
+ defer shutdown()
// Create the stream.Manager for the server.
server1 := manager.InternalNew(naming.FixedRoutingID(0x1111111111111111))
defer server1.Shutdown()
// Setup a stream.Listener that will accept VCs and Flows routed
// through the proxy.
- ln1, ep1, err := server1.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
+ ln1, ep1, err := server1.Listen(proxyEp.Network(), proxyEp.String())
if err != nil {
t.Fatal(err)
}
@@ -43,7 +43,7 @@
defer server2.Shutdown()
// Setup a stream.Listener that will accept VCs and Flows routed
// through the proxy.
- ln2, ep2, err := server2.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
+ ln2, ep2, err := server2.Listen(proxyEp.Network(), proxyEp.String())
if err != nil {
t.Fatal(err)
}
@@ -81,11 +81,11 @@
}
func TestDuplicateRoutingID(t *testing.T) {
- proxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
+ shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatal(err)
}
- defer proxy.Shutdown()
+ defer shutdown()
// Create the stream.Manager for server1 and server2, both with the same routing ID
serverRID := naming.FixedRoutingID(0x5555555555555555)
@@ -95,13 +95,13 @@
defer server2.Shutdown()
// First server to claim serverRID should win.
- ln1, ep1, err := server1.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
+ ln1, ep1, err := server1.Listen(proxyEp.Network(), proxyEp.String())
if err != nil {
t.Fatal(err)
}
defer ln1.Close()
- ln2, ep2, err := server2.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
+ ln2, ep2, err := server2.Listen(proxyEp.Network(), proxyEp.String())
if pattern := "routing id 00000000000000005555555555555555 is already being proxied"; err == nil || !strings.Contains(err.Error(), pattern) {
t.Errorf("Got (%v, %v, %v) want error \"...%v\" (ep1:%v)", ln2, ep2, err, pattern, ep1)
}
@@ -109,16 +109,16 @@
func TestProxyAuthentication(t *testing.T) {
pproxy := tsecurity.NewPrincipal("proxy")
- proxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatal(err)
}
- defer proxy.Shutdown()
+ defer shutdown()
other := manager.InternalNew(naming.FixedRoutingID(0xcccccccccccccccc))
defer other.Shutdown()
- vc, err := other.Dial(proxy.Endpoint())
+ vc, err := other.Dial(proxyEp)
if err != nil {
t.Fatal(err)
}
@@ -133,15 +133,16 @@
}
func TestServerBlessings(t *testing.T) {
- proxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
+ shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatal(err)
}
+ defer shutdown()
server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555))
defer server.Shutdown()
pserver := tsecurity.NewPrincipal("server")
- ln, ep, err := server.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String(), vc.LocalPrincipal{pserver})
+ ln, ep, err := server.Listen(proxyEp.Network(), proxyEp.String(), vc.LocalPrincipal{pserver})
if err != nil {
t.Fatal(err)
}
@@ -170,14 +171,14 @@
}
func TestHostPort(t *testing.T) {
- proxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
+ shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatal(err)
}
- defer proxy.Shutdown()
+ defer shutdown()
server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555))
defer server.Shutdown()
- addr := proxy.Endpoint().Addr().String()
+ addr := proxyEp.Addr().String()
port := addr[strings.LastIndex(addr, ":"):]
ln, _, err := server.Listen("veyron", "127.0.0.1"+port)
if err != nil {
@@ -187,19 +188,19 @@
}
func TestClientBecomesServer(t *testing.T) {
- proxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
+ shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), nil, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatal(err)
}
server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555))
client1 := manager.InternalNew(naming.FixedRoutingID(0x1111111111111111))
client2 := manager.InternalNew(naming.FixedRoutingID(0x2222222222222222))
- defer proxy.Shutdown()
+ defer shutdown()
defer server.Shutdown()
defer client1.Shutdown()
defer client2.Shutdown()
- lnS, epS, err := server.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
+ lnS, epS, err := server.Listen(proxyEp.Network(), proxyEp.String())
if err != nil {
t.Fatal(err)
}
@@ -220,7 +221,7 @@
}
// Now client1 becomes a server
- lnC, epC, err := client1.Listen(proxy.Endpoint().Network(), proxy.Endpoint().String())
+ lnC, epC, err := client1.Listen(proxyEp.Network(), proxyEp.String())
if err != nil {
t.Fatal(err)
}
diff --git a/profiles/proxy/v23_internal_test.go b/profiles/internal/ipc/stream/proxy/v23_internal_test.go
similarity index 100%
rename from profiles/proxy/v23_internal_test.go
rename to profiles/internal/ipc/stream/proxy/v23_internal_test.go
diff --git a/profiles/internal/ipc/v23_test.go b/profiles/internal/ipc/v23_test.go
index 614873d..e416039 100644
--- a/profiles/internal/ipc/v23_test.go
+++ b/profiles/internal/ipc/v23_test.go
@@ -15,6 +15,7 @@
func init() {
modules.RegisterChild("childPing", ``, childPing)
+ modules.RegisterChild("proxyServer", ``, proxyServer)
}
func TestMain(m *testing.M) {
diff --git a/profiles/proxy.go b/profiles/proxy.go
new file mode 100644
index 0000000..42e5ef6
--- /dev/null
+++ b/profiles/proxy.go
@@ -0,0 +1,14 @@
+package profiles
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/naming"
+
+ "v.io/x/ref/profiles/internal/ipc/stream/proxy"
+)
+
+// NewProxy creates a new Proxy that listens for network connections on the provided
+// (network, address) pair and routes VC traffic between accepted connections.
+func NewProxy(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, network, address, pubAddress, names...)
+}
diff --git a/profiles/roaming/proxy.go b/profiles/roaming/proxy.go
new file mode 100644
index 0000000..cd87645
--- /dev/null
+++ b/profiles/roaming/proxy.go
@@ -0,0 +1,14 @@
+package roaming
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/naming"
+
+ "v.io/x/ref/profiles/internal/ipc/stream/proxy"
+)
+
+// NewProxy creates a new Proxy that listens for network connections on the provided
+// (network, address) pair and routes VC traffic between accepted connections.
+func NewProxy(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, network, address, pubAddress, names...)
+}
diff --git a/profiles/static/proxy.go b/profiles/static/proxy.go
new file mode 100644
index 0000000..de58032
--- /dev/null
+++ b/profiles/static/proxy.go
@@ -0,0 +1,14 @@
+package static
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/naming"
+
+ "v.io/x/ref/profiles/internal/ipc/stream/proxy"
+)
+
+// NewProxy creates a new Proxy that listens for network connections on the provided
+// (network, address) pair and routes VC traffic between accepted connections.
+func NewProxy(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, network, address, pubAddress, names...)
+}
diff --git a/services/mgmt/device/impl/util_test.go b/services/mgmt/device/impl/util_test.go
index be0bb48..6e8fe15 100644
--- a/services/mgmt/device/impl/util_test.go
+++ b/services/mgmt/device/impl/util_test.go
@@ -24,7 +24,7 @@
"v.io/x/ref/lib/modules"
"v.io/x/ref/lib/testutil"
- _ "v.io/x/ref/profiles/static"
+ _ "v.io/x/ref/profiles/roaming"
"v.io/x/ref/services/mgmt/device/impl"
)
diff --git a/services/mgmt/device/starter/starter.go b/services/mgmt/device/starter/starter.go
index 15d32cf..4ad22f8 100644
--- a/services/mgmt/device/starter/starter.go
+++ b/services/mgmt/device/starter/starter.go
@@ -12,7 +12,7 @@
"time"
"v.io/x/ref/lib/netstate"
- "v.io/x/ref/profiles/proxy"
+ "v.io/x/ref/profiles/roaming"
"v.io/x/ref/services/mgmt/device/config"
"v.io/x/ref/services/mgmt/device/impl"
mounttable "v.io/x/ref/services/mounttable/lib"
@@ -239,10 +239,6 @@
return nil, fmt.Errorf("invalid port: %v", port)
}
port := strconv.Itoa(p.Port)
- rid, err := naming.NewRoutingID()
- if err != nil {
- return nil, fmt.Errorf("Failed to get new routing id: %v", err)
- }
protocol, addr := "tcp", net.JoinHostPort("", port)
// Attempt to get a publicly accessible address for the proxy to publish
// under.
@@ -256,14 +252,14 @@
}
publishAddr = net.JoinHostPort(addrs[0].Address().String(), port)
}
- proxy, err := proxy.New(rid, v23.GetPrincipal(ctx), protocol, addr, publishAddr)
+ shutdown, ep, err := roaming.NewProxy(ctx, protocol, addr, publishAddr)
if err != nil {
return nil, fmt.Errorf("Failed to create proxy: %v", err)
}
- vlog.Infof("Local proxy (%v)", proxy.Endpoint().Name())
+ vlog.Infof("Local proxy (%v)", ep.Name())
return func() {
vlog.Infof("Stopping proxy...")
- proxy.Shutdown()
+ shutdown()
vlog.Infof("Stopped proxy.")
}, nil
}
diff --git a/services/proxy/proxyd/main.go b/services/proxy/proxyd/main.go
index 9808787..9dae633 100644
--- a/services/proxy/proxyd/main.go
+++ b/services/proxy/proxyd/main.go
@@ -11,14 +11,11 @@
"v.io/v23"
"v.io/v23/ipc"
- "v.io/v23/naming"
"v.io/v23/security"
"v.io/x/lib/vlog"
- "v.io/x/ref/lib/publisher"
"v.io/x/ref/lib/signals"
- "v.io/x/ref/profiles/proxy"
- _ "v.io/x/ref/profiles/static"
+ "v.io/x/ref/profiles/static"
)
var (
@@ -31,10 +28,6 @@
ctx, shutdown := v23.Init()
defer shutdown()
- rid, err := naming.NewRoutingID()
- if err != nil {
- vlog.Fatal(err)
- }
listenSpec := v23.GetListenSpec(ctx)
if len(listenSpec.Addrs) != 1 {
vlog.Fatalf("proxyd can only listen on one address: %v", listenSpec.Addrs)
@@ -42,21 +35,16 @@
if listenSpec.Proxy != "" {
vlog.Fatalf("proxyd cannot listen through another proxy")
}
- proxy, err := proxy.New(rid, v23.GetPrincipal(ctx), listenSpec.Addrs[0].Protocol, listenSpec.Addrs[0].Address, *pubAddress)
+ proxyShutdown, proxyEndpoint, err := static.NewProxy(ctx, listenSpec.Addrs[0].Protocol, listenSpec.Addrs[0].Address, *pubAddress, *name)
if err != nil {
vlog.Fatal(err)
}
- defer proxy.Shutdown()
+ defer proxyShutdown()
if len(*name) > 0 {
- publisher := publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
- defer publisher.WaitForStop()
- defer publisher.Stop()
- publisher.AddServer(proxy.Endpoint().String(), false)
- publisher.AddName(*name)
// Print out a directly accessible name for the proxy table so
// that integration tests can reliably read it from stdout.
- fmt.Printf("NAME=%s\n", proxy.Endpoint().Name())
+ fmt.Printf("NAME=%s\n", proxyEndpoint.Name())
}
if len(*healthzAddr) != 0 {
@@ -70,7 +58,7 @@
vlog.Fatalf("NewServer failed: %v", err)
}
defer server.Stop()
- ls := ipc.ListenSpec{Proxy: proxy.Endpoint().Name()}
+ ls := ipc.ListenSpec{Proxy: proxyEndpoint.Name()}
if _, err := server.Listen(ls); err != nil {
vlog.Fatalf("Listen(%v) failed: %v", ls, err)
}
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index d742ac9..ed4c115 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -21,8 +21,7 @@
"v.io/v23/vtrace"
"v.io/x/ref/lib/testutil"
tsecurity "v.io/x/ref/lib/testutil/security"
- _ "v.io/x/ref/profiles"
- "v.io/x/ref/profiles/proxy"
+ "v.io/x/ref/profiles"
vsecurity "v.io/x/ref/security"
mounttable "v.io/x/ref/services/mounttable/lib"
"v.io/x/ref/services/wsprd/ipc/server"
@@ -136,14 +135,6 @@
return startAnyServer(ctx, false, testutil.LeafDispatcher(simpleAdder{}, nil))
}
-func startProxy(ctx *context.T) (*proxy.Proxy, error) {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return nil, err
- }
- return proxy.New(rid, nil, "tcp", "127.0.0.1:0", "")
-}
-
func startMountTableServer(ctx *context.T) (ipc.Server, naming.Endpoint, error) {
mt, err := mounttable.NewMountTableDispatcher("")
if err != nil {
@@ -313,7 +304,7 @@
controller *Controller
writer *testwriter.Writer
mounttableServer ipc.Server
- proxyServer *proxy.Proxy
+ proxyShutdown func()
}
func makeRequest(rpc VeyronRPCRequest, args ...interface{}) (string, error) {
@@ -339,18 +330,16 @@
return nil, fmt.Errorf("unable to start mounttable: %v", err)
}
- proxyServer, err := startProxy(ctx)
+ proxyShutdown, proxyEndpoint, err := profiles.NewProxy(ctx, "tcp", "127.0.0.1:0", "")
if err != nil {
return nil, fmt.Errorf("unable to start proxy: %v", err)
}
- proxyEndpoint := proxyServer.Endpoint().String()
-
writerCreator := func(int32) lib.ClientWriter {
return writer
}
spec := v23.GetListenSpec(ctx)
- spec.Proxy = "/" + proxyEndpoint
+ spec.Proxy = proxyEndpoint.Name()
controller, err := NewController(ctx, writerCreator, &spec, nil, testPrincipal)
if err != nil {
return nil, err
@@ -373,7 +362,7 @@
testWriter, _ := writer.(*testwriter.Writer)
return &runningTest{
- controller, testWriter, mounttableServer, proxyServer,
+ controller, testWriter, mounttableServer, proxyShutdown,
}, nil
}
@@ -425,7 +414,7 @@
mock.controller = controller
})
defer rt.mounttableServer.Stop()
- defer rt.proxyServer.Shutdown()
+ defer rt.proxyShutdown()
defer rt.controller.Cleanup()
if err != nil {
diff --git a/services/wsprd/browspr/browspr_test.go b/services/wsprd/browspr/browspr_test.go
index f7fabad..eada60d 100644
--- a/services/wsprd/browspr/browspr_test.go
+++ b/services/wsprd/browspr/browspr_test.go
@@ -17,21 +17,12 @@
"v.io/v23/vom"
"v.io/x/ref/lib/testutil"
- _ "v.io/x/ref/profiles"
- "v.io/x/ref/profiles/proxy"
+ "v.io/x/ref/profiles"
mounttable "v.io/x/ref/services/mounttable/lib"
"v.io/x/ref/services/wsprd/app"
"v.io/x/ref/services/wsprd/lib"
)
-func startProxy() (*proxy.Proxy, error) {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return nil, err
- }
- return proxy.New(rid, nil, "tcp", "127.0.0.1:0", "")
-}
-
func startMounttable(ctx *context.T) (ipc.Server, naming.Endpoint, error) {
mt, err := mounttable.NewMountTableDispatcher("")
if err != nil {
@@ -84,11 +75,11 @@
ctx, shutdown := testutil.InitForTest()
defer shutdown()
- proxy, err := startProxy()
+ proxyShutdown, proxyEndpoint, err := profiles.NewProxy(ctx, "tcp", "127.0.0.1:0", "")
if err != nil {
t.Fatalf("Failed to start proxy: %v", err)
}
- defer proxy.Shutdown()
+ defer proxyShutdown()
mtServer, mtEndpoint, err := startMounttable(ctx)
if err != nil {
@@ -140,7 +131,7 @@
}
spec := v23.GetListenSpec(ctx)
- spec.Proxy = proxy.Endpoint().String()
+ spec.Proxy = proxyEndpoint.String()
receivedResponse := make(chan bool, 1)
var receivedInstanceId int32