Revert "veyron/runtimes/google/ipc: refactor Listen loop code."
This reverts commit 6e2f5cf8a527e6fd306fb80c439ff906ca1c14d4.
Change-Id: I1fa57d00c16927c9256865a04fef83a077f0fc93
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 8451c3e..e828b51 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -247,11 +247,9 @@
return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
}
}
-
// Try all servers, and if none of them are authorized for the call then return the error of the last server
// that was tried.
var lastErr verror.E
- // TODO(cnicolaou): sort servers by sensible metric.
for _, server := range servers {
flow, suffix, err := c.connectFlow(server)
if err != nil {
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index f78191b..bdbdc1f 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -5,9 +5,7 @@
"fmt"
"io"
"net"
- "path/filepath"
"reflect"
- "runtime"
"strings"
"sync"
"testing"
@@ -191,23 +189,15 @@
return ep, server
}
-func loc(d int) string {
- _, file, line, _ := runtime.Caller(d + 1)
- return fmt.Sprintf("%s:%d", filepath.Base(file), line)
-}
-
-func verifyMount(t *testing.T, ns naming.Namespace, name string) []string {
- addrs, err := ns.Resolve(testContext(), name)
- if err != nil {
- t.Errorf("%s: %s not found in mounttable", loc(1), name)
- return nil
+func verifyMount(t *testing.T, ns naming.Namespace, name string) {
+ if _, err := ns.Resolve(testContext(), name); err != nil {
+ t.Errorf("%s not found in mounttable", name)
}
- return addrs
}
func verifyMountMissing(t *testing.T, ns naming.Namespace, name string) {
if servers, err := ns.Resolve(testContext(), name); err == nil {
- t.Errorf("%s: %s not supposed to be found in mounttable; got %d servers instead", loc(1), name, len(servers))
+ t.Errorf("%s not supposed to be found in mounttable; got %d servers instead", name, len(servers))
}
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index d933f4a..42172dd 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -57,11 +57,10 @@
type dhcpListener struct {
sync.Mutex
- publisher *config.Publisher // publisher used to fork the stream
- name string // name of the publisher stream
- ep *inaming.Endpoint // endpoint returned after listening
- pubAddrs []ipc.Address // addresses to publish
- pubPort string // port to use with the publish addresses
+ publisher *config.Publisher // publisher used to fork the stream
+ name string // name of the publisher stream
+ ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
+ port string
ch chan config.Setting // channel to receive settings over
}
@@ -126,6 +125,122 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
+/*
+// ipAddressChooser returns the preferred IP address, which is,
+// a public IPv4 address, then any non-loopback IPv4, then a public
+// IPv6 address and finally any non-loopback/link-local IPv6
+// It is replicated here to avoid a circular dependency and will, in any case,
+// go away when we transition away from Listen to the ListenX API.
+func ipAddressChooser(network string, addrs []ipc.Address) ([]ipc.Address, error) {
+ if !netstate.IsIPProtocol(network) {
+ return nil, fmt.Errorf("can't support network protocol %q", network)
+ }
+ accessible := netstate.AddrList(addrs)
+
+ // Try and find an address on a interface with a default route.
+ predicates := []netstate.AddressPredicate{netstate.IsPublicUnicastIPv4,
+ netstate.IsUnicastIPv4, netstate.IsPublicUnicastIPv6}
+ for _, predicate := range predicates {
+ if addrs := accessible.Filter(predicate); len(addrs) > 0 {
+ onDefaultRoutes := addrs.Filter(netstate.IsOnDefaultRoute)
+ if len(onDefaultRoutes) > 0 {
+ return onDefaultRoutes, nil
+ }
+ }
+ }
+
+ // We failed to find any addresses with default routes, try again
+ // but without the default route requirement.
+ for _, predicate := range predicates {
+ if addrs := accessible.Filter(predicate); len(addrs) > 0 {
+ return addrs, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to find any usable address for %q", network)
+}
+
+func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
+ defer vlog.LogCall()()
+ s.Lock()
+ // Shortcut if the server is stopped, to avoid needlessly creating a
+ // listener.
+ if s.stopped {
+ s.Unlock()
+ return nil, errServerStopped
+ }
+ s.Unlock()
+ var proxyName string
+ if protocol == inaming.Network {
+ proxyName = address
+ var err error
+ if address, err = s.resolveToAddress(address); err != nil {
+ return nil, err
+ }
+ }
+ // TODO(cnicolaou): pass options.ServesMountTable to streamMgr.Listen so that
+ // it can more cleanly set the IsMountTable bit in the endpoint.
+ ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
+ if err != nil {
+ vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
+ return nil, err
+ }
+ iep, ok := ep.(*inaming.Endpoint)
+ if !ok {
+ return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
+ }
+
+ if protocol != inaming.Network {
+ // We know the endpoint format, so we crack it open...
+ switch iep.Protocol {
+ case "tcp", "tcp4", "tcp6":
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, err
+ }
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
+ }
+ if ip.IsUnspecified() {
+ addrs, err := netstate.GetAccessibleIPs()
+ if err == nil {
+ if a, err := ipAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
+ iep.Address = net.JoinHostPort(a[0].Address().String(), port)
+ }
+ }
+ }
+ }
+ }
+
+ s.Lock()
+ if s.stopped {
+ s.Unlock()
+ // Ignore error return since we can't really do much about it.
+ ln.Close()
+ return nil, errServerStopped
+ }
+ s.listeners[ln] = nil
+ // We have a single goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
+ if protocol == inaming.Network {
+ go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
+ s.proxyListenLoop(ln, ep, proxy)
+ s.active.Done()
+ }(ln, iep, proxyName)
+ } else {
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ln, iep)
+ }
+ s.Unlock()
+ s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
+ return ep, nil
+}
+*/
+
// externalEndpoint examines the endpoint returned by the stream listen call
// and fills in the address to publish to the mount table. It also returns the
// IP host address that it selected for publishing to the mount table.
@@ -167,76 +282,6 @@
return iep, nil, nil
}
-func addrFromIP(ip net.IP) ipc.Address {
- return &netstate.AddrIfc{
- Addr: &net.IPAddr{IP: ip},
- }
-}
-
-// getIPRoamingAddrs finds an appropriate set of addresss to publish
-// externally and also determines if it's sensible to allow roaming.
-// It returns the host address of the first suitable address that
-// can be used and the port number that can be used with all addresses.
-// The host is required to allow the caller to construct an endpoint
-// that can be returned to the caller of Listen.
-func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
- host, port, err = net.SplitHostPort(iep.Address)
- if err != nil {
- return nil, "", "", false, err
- }
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
- }
- if ip.IsUnspecified() && chooser != nil {
- // Need to find a usable IP address since the call to listen
- // didn't specify one.
- if addrs, err := netstate.GetAccessibleIPs(); err == nil {
- if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
- phost := a[0].Address().String()
- iep.Address = net.JoinHostPort(phost, port)
- return a, phost, port, true, nil
- }
- }
- return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
- }
- // Listen used a fixed IP address, which we take to mean that
- // roaming is not desired.
- return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
-}
-
-// configureEPAndRoaming configures the endpoint and roaming. In particular,
-// it fills in the Address portion of the endpoint with the appropriately
-// selected network address and creates a dhcpListener struct if roaming
-// is enabled.
-func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (*dhcpListener, *inaming.Endpoint, error) {
- iep, ok := ep.(*inaming.Endpoint)
- if !ok {
- return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
- }
- if !strings.HasPrefix(spec.Protocol, "tcp") {
- return nil, iep, nil
- }
- pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
- if err != nil {
- return nil, iep, err
- }
- iep.Address = net.JoinHostPort(pubHost, pubPort)
- if !roaming {
- vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", spec.Address)
- }
- publisher := spec.StreamPublisher
- if roaming && publisher != nil {
- streamName := spec.StreamName
- ch := make(chan config.Setting)
- if _, err := publisher.ForkStream(streamName, ch); err != nil {
- return nil, iep, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
- }
- return &dhcpListener{ep: iep, pubAddrs: pubAddrs, pubPort: pubPort, ch: ch, name: streamName, publisher: publisher}, iep, nil
- }
- return nil, iep, nil
-}
-
func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
defer vlog.LogCall()()
s.Lock()
@@ -248,67 +293,100 @@
}
s.Unlock()
- var iep *inaming.Endpoint
- var dhcpl *dhcpListener
- var ln stream.Listener
-
- if len(listenSpec.Address) > 0 {
- // Listen if we have a local address to listen on. Some situations
- // just need a proxy (e.g. a browser extension).
- tmpln, lep, err := s.streamMgr.Listen(listenSpec.Protocol, listenSpec.Address, s.listenerOpts...)
- if err != nil {
- vlog.Errorf("ipc: Listen on %s failed: %s", listenSpec, err)
- return nil, err
- }
- ln = tmpln
- if tmpdhcpl, tmpiep, err := s.configureEPAndRoaming(listenSpec, lep); err != nil {
- ln.Close()
+ protocol := listenSpec.Protocol
+ address := listenSpec.Address
+ proxyAddress := ""
+ if len(listenSpec.Proxy) > 0 {
+ if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
return nil, err
} else {
- dhcpl = tmpdhcpl
- iep = tmpiep
+ proxyAddress = address
}
}
+ ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
+ if err != nil {
+ vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
+ return nil, err
+ }
+ ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
+ if err != nil {
+ ln.Close()
+ return nil, err
+ }
+
s.Lock()
- defer s.Unlock()
if s.stopped {
+ s.Unlock()
+ // Ignore error return since we can't really do much about it.
ln.Close()
return nil, errServerStopped
}
- if dhcpl != nil {
+ var ip net.IP
+ if ipaddr != nil {
+ ip = net.ParseIP(ipaddr.String())
+ } else {
+ vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
+ }
+ publisher := listenSpec.StreamPublisher
+ if ip != nil && !ip.IsLoopback() && publisher != nil {
+ streamName := listenSpec.StreamName
+ ch := make(chan config.Setting)
+ _, err := publisher.ForkStream(streamName, ch)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
+ }
+ _, port, _ := net.SplitHostPort(ep.Address)
+ dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
+
// We have a goroutine to listen for dhcp changes.
- go func() {
- s.active.Add(1)
- s.dhcpLoop(dhcpl)
+ s.active.Add(1)
+ // goroutine to listen for address changes.
+ go func(dl *dhcpListener) {
+ s.dhcpLoop(dl)
s.active.Done()
- }()
+ }(dhcpl)
s.listeners[ln] = dhcpl
- } else if ln != nil {
+ } else {
s.listeners[ln] = nil
}
- if iep != nil {
- // We have a goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- go func() {
- s.active.Add(1)
- s.listenLoop(ln, iep)
- s.active.Done()
- }()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
- }
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
- if len(listenSpec.Proxy) > 0 {
+ // goroutine to listen for connections
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ln, lep)
+
+ if len(proxyAddress) > 0 {
+ pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
+ if err != nil {
+ vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
+ return nil, err
+ }
+ ipep, ok := pep.(*inaming.Endpoint)
+ if !ok {
+ return nil, fmt.Errorf("failed translating internal endpoint data types")
+ }
// We have a goroutine for listening on proxy connections.
- go func() {
- s.active.Add(1)
- s.proxyListenLoop(listenSpec.Proxy)
+ s.active.Add(1)
+ go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
+ s.proxyListenLoop(ln, ep, proxy)
s.active.Done()
- }()
+ }(pln, ipep, listenSpec.Proxy)
+ s.listeners[pln] = nil
+ // TODO(cnicolaou,p): AddServer no longer needs to take the
+ // servesMountTable bool since it can be extracted from the endpoint.
+ s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
+ } else {
+ s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
}
- return iep, nil
+ s.Unlock()
+ return ep, nil
}
// TODO(cnicolaou): Take this out or make the ServesMountTable bit work in the endpoint.
@@ -318,68 +396,63 @@
return naming.JoinAddressName(ep.String(), name)
}
-func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
- resolved, err := s.resolveToAddress(proxy)
- if err != nil {
- return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
- }
- ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
- }
- iep, ok := ep.(*inaming.Endpoint)
- if !ok {
- return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
- }
- s.Lock()
- s.listeners[ln] = nil
- s.Unlock()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
- return iep, ln, nil
-}
-
-func (s *server) proxyListenLoop(proxy string) {
+func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
const (
min = 5 * time.Millisecond
max = 5 * time.Minute
)
-
- iep, ln, err := s.reconnectAndPublishProxy(proxy)
- if err != nil {
- vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
- }
- // the initial connection maybe have failed, but we enter the retry
- // loop anyway so that we will continue to try and connect to the
- // proxy.
for {
- if ln != nil && iep != nil {
- s.listenLoop(ln, iep)
- // The listener is done, so:
- // (1) Unpublish its name
- s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
- }
-
+ s.listenLoop(ln, iep)
+ // The listener is done, so:
+ // (1) Unpublish its name
+ s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
// (2) Reconnect to the proxy unless the server has been stopped
backoff := min
ln = nil
- for {
+ // TODO(ashankar,cnicolaou): this code is way too confusing and should
+ // be cleaned up.
+ for ln == nil {
select {
case <-time.After(backoff):
+ resolved, err := s.resolveToAddress(proxy)
+ if err != nil {
+ vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
+ if backoff = backoff * 2; backoff > max {
+ backoff = max
+ }
+ break
+ }
+ var ep naming.Endpoint
+ ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
+ if err == nil {
+ var ok bool
+ iep, ok = ep.(*inaming.Endpoint)
+ if !ok {
+ vlog.Errorf("failed translating internal endpoint data types")
+ ln = nil
+ continue
+ }
+ vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
+ break
+ }
if backoff = backoff * 2; backoff > max {
backoff = max
}
+ vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
case <-s.stoppedChan:
return
}
- var err error
- // (3) reconnect, publish new address
- if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
- vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
- } else {
- vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
- break
- }
}
+ // TODO(cnicolaou,ashankar): this won't work when we are both
+ // proxying and publishing locally, which is the common case.
+ // listenLoop, dhcpLoop and the original publish are all publishing
+ // addresses to the same name, but the client is not smart enough
+ // to choose sensibly between them.
+ // (3) reconnected, publish new address
+ s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
+ s.Lock()
+ s.listeners[ln] = nil
+ s.Unlock()
}
}
@@ -415,7 +488,7 @@
defer dhcpl.Unlock()
for _, a := range addrs {
if ip := netstate.AsIP(a); ip != nil {
- dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
+ dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
fn(s.publishEP(dhcpl.ep, s.servesMountTable))
}
}
@@ -424,14 +497,6 @@
func (s *server) dhcpLoop(dhcpl *dhcpListener) {
defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
vlog.VI(2).Infof("ipc: dhcp loop")
-
- ep := *dhcpl.ep
- // Publish all of the addresses
- for _, pubAddr := range dhcpl.pubAddrs {
- ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
- s.publisher.AddServer(s.publishEP(&ep, s.servesMountTable), s.servesMountTable)
- }
-
for setting := range dhcpl.ch {
if setting == nil {
return
@@ -445,6 +510,11 @@
s.Unlock()
return
}
+ // TODO(cnicolaou,ashankar): this won't work when we are both
+ // proxying and publishing locally, which is the common case.
+ // listenLoop, dhcpLoop and the original publish are all publishing
+ // addresses to the same name, but the client is not smart enough
+ // to choose sensibly between them.
publisher := s.publisher
s.Unlock()
switch setting.Name() {
@@ -455,6 +525,7 @@
vlog.Infof("Removed some addresses: %q", v)
s.applyChange(dhcpl, v, publisher.RemoveServer)
}
+
}
}
}
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index aa3504f..11959f0 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -4,21 +4,19 @@
"fmt"
"io"
"os"
- "reflect"
- "sort"
"strings"
"testing"
"time"
- "veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron/lib/expect"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
+
"veyron.io/veyron/veyron/lib/modules"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
- "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
)
@@ -115,22 +113,7 @@
return h.ns.Unmount(testContext(), "proxy", h.mount)
}
-func TestProxyOnly(t *testing.T) {
- listenSpec := ipc.ListenSpec{Proxy: "proxy"}
- testProxy(t, listenSpec)
-}
-
func TestProxy(t *testing.T) {
- listenSpec := ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0", Proxy: "proxy"}
- testProxy(t, listenSpec)
-}
-
-func addrOnly(name string) string {
- addr, _ := naming.SplitAddressName(name)
- return addr
-}
-
-func testProxy(t *testing.T, spec ipc.ListenSpec) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
@@ -144,10 +127,6 @@
}
defer server.Stop()
- // If no address is specified then we'll only 'listen' via
- // the proxy.
- hasLocalListener := len(spec.Address) != 0
-
name := "mountpoint/server/suffix"
makeCall := func() (string, error) {
ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
@@ -169,69 +148,15 @@
t.Fatal(err)
}
defer proxy.Stop()
- addrs := verifyMount(t, ns, "proxy")
- if len(addrs) != 1 {
- t.Fatalf("failed to lookup proxy")
- }
- proxyEP := addrOnly(addrs[0])
-
- ep, err := server.Listen(spec)
- if err != nil {
+ spec := listenSpec
+ spec.Proxy = "proxy"
+ if _, err := server.Listen(spec); err != nil {
t.Fatal(err)
}
if err := server.ServeDispatcher("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
t.Fatal(err)
}
-
- ch := make(chan struct{})
- // Proxy connections are started asynchronously, so we need to wait..
- waitfor := func(expect int) {
- for {
- addrs, _ := ns.Resolve(testContext(), name)
- if len(addrs) == expect {
- close(ch)
- return
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
-
- proxiedEP, err := inaming.NewEndpoint(proxyEP)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- proxiedEP.RID = naming.FixedRoutingID(0x555555555)
- expectedEndpoints := []string{proxiedEP.String()}
- if hasLocalListener {
- expectedEndpoints = append(expectedEndpoints, ep.String())
- }
-
- // Proxy connetions are created asynchronously, so we wait for the
- // expected number of endpoints to appear for the specified service name.
- go waitfor(len(expectedEndpoints))
- select {
- case <-time.After(time.Minute):
- t.Fatalf("timedout waiting for two entries in the mount table")
- case <-ch:
- }
-
- got := []string{}
- for _, s := range verifyMount(t, ns, name) {
- got = append(got, addrOnly(s))
- }
- sort.Strings(got)
- sort.Strings(expectedEndpoints)
- if !reflect.DeepEqual(got, expectedEndpoints) {
- t.Errorf("got %v, want %v", got, expectedEndpoints)
- }
-
- if hasLocalListener {
- // Listen will publish both the local and proxied endpoint with the
- // mount table, given that we're trying to test the proxy, we remove
- // the local endpoint from the mount table entry!
- ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(ep.String(), ""))
- }
-
+ verifyMount(t, ns, name)
// Proxied endpoint should be published and RPC should succeed (through proxy)
const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
if result, err := makeCall(); result != expected || err != nil {