x.ref: for CLs 9907 and 9908 that cleanup address selection in v23/rpc.
MultiPart: 3/4
Change-Id: I72ffca8ad774850e0a28d139da5cf7cb9ab546c0
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 15c96d3..73a675f 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -58,8 +58,8 @@
if ip, err := gce.ExternalIPAddress(); err != nil {
return nil, nil, nil, err
} else {
- listenSpec.AddressChooser = func(network string, addrs []rpc.Address) ([]rpc.Address, error) {
- return []rpc.Address{&netstate.AddrIfc{&net.IPAddr{IP: ip}, "gce-nat", nil}}, nil
+ listenSpec.AddressChooser = func(network string, addrs []net.Addr) ([]net.Addr, error) {
+ return []net.Addr{netstate.NewNetAddr("wsh", ip.String())}, nil
}
}
diff --git a/profiles/internal/lib/publisher/publisher.go b/profiles/internal/lib/publisher/publisher.go
index acdd006..6c87030 100644
--- a/profiles/internal/lib/publisher/publisher.go
+++ b/profiles/internal/lib/publisher/publisher.go
@@ -335,7 +335,7 @@
if status.LastUnmountErr != nil {
vlog.Errorf("rpc pub: couldn't unmount(%v, %v): %v", name, server, status.LastUnmountErr)
} else {
- vlog.Infof("rpc pub: unmount(%v, %v)", name, server)
+ vlog.VI(1).Infof("rpc pub: unmount(%v, %v)", name, server)
delete(ps.mounts, mountKey{name, server})
}
}
diff --git a/profiles/internal/rpc/full_test.go b/profiles/internal/rpc/full_test.go
index 2e47921..e978fc0 100644
--- a/profiles/internal/rpc/full_test.go
+++ b/profiles/internal/rpc/full_test.go
@@ -1520,10 +1520,8 @@
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
- pa := func(string, []rpc.Address) ([]rpc.Address, error) {
- a := &net.IPAddr{}
- a.IP = net.ParseIP("1.1.1.1")
- return []rpc.Address{&netstate.AddrIfc{Addr: a}}, nil
+ pa := func(string, []net.Addr) ([]net.Addr, error) {
+ return []net.Addr{netstate.NewNetAddr("tcp", "1.1.1.1")}, nil
}
server, err := testInternalNewServer(ctx, sm, ns, testutil.NewPrincipal("server"))
if err != nil {
@@ -1565,7 +1563,7 @@
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
- paerr := func(_ string, a []rpc.Address) ([]rpc.Address, error) {
+ paerr := func(_ string, a []net.Addr) ([]net.Addr, error) {
return nil, fmt.Errorf("oops")
}
server, err := testInternalNewServer(ctx, sm, ns, testutil.NewPrincipal("server"))
@@ -1578,17 +1576,16 @@
AddressChooser: paerr,
}
eps, err := server.Listen(spec)
- iep := eps[0].(*inaming.Endpoint)
- host, _, err := net.SplitHostPort(iep.Address)
- if err != nil {
- t.Errorf("unexpected error: %s", err)
+
+ if got, want := len(eps), 0; got != want {
+ t.Errorf("got %q, want %q", got, want)
}
- ip := net.ParseIP(host)
- if ip == nil {
- t.Fatalf("failed to parse IP address: %q", host)
+ status := server.Status()
+ if got, want := len(status.Errors), 1; got != want {
+ t.Errorf("got %q, want %q", got, want)
}
- if !ip.IsUnspecified() {
- t.Errorf("IP: %q is not unspecified", ip)
+ if got, want := status.Errors[0].Error(), "oops"; got != want {
+ t.Errorf("got %q, want %q", got, want)
}
}
diff --git a/profiles/internal/rpc/protocols/tcp/init.go b/profiles/internal/rpc/protocols/tcp/init.go
index a6067b9..08f95e6 100644
--- a/profiles/internal/rpc/protocols/tcp/init.go
+++ b/profiles/internal/rpc/protocols/tcp/init.go
@@ -22,6 +22,7 @@
}
func tcpDial(network, address string, timeout time.Duration) (net.Conn, error) {
+ vlog.Infof("tcp.Dial %v", address)
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
return nil, err
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index 91b3a1a..d41aa64 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -14,6 +14,9 @@
"sync"
"time"
+ "v.io/x/lib/netstate"
+ "v.io/x/lib/vlog"
+
"v.io/v23/config"
"v.io/v23/context"
"v.io/v23/namespace"
@@ -26,8 +29,7 @@
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/v23/vtrace"
- "v.io/x/lib/netstate"
- "v.io/x/lib/vlog"
+
"v.io/x/ref/lib/stats"
"v.io/x/ref/profiles/internal/lib/publisher"
inaming "v.io/x/ref/profiles/internal/naming"
@@ -316,46 +318,10 @@
return "", verror.New(errFailedToResolveToEndpoint, s.ctx, address)
}
-// getPossbileAddrs returns an appropriate set of addresses that could be used
-// to contact the supplied protocol, host, port parameters using the supplied
-// chooser function. It returns an indication of whether the supplied address
-// was fully specified or not, returning false if the address was fully
-// specified, and true if it was not.
-func getPossibleAddrs(protocol, host, port string, chooser rpc.AddressChooser) ([]rpc.Address, bool, error) {
-
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, false, verror.New(errFailedToParseIP, nil, host)
- }
-
- addrFromIP := func(ip net.IP) rpc.Address {
- return &netstate.AddrIfc{
- Addr: &net.IPAddr{IP: ip},
- }
- }
-
- if ip.IsUnspecified() {
- if chooser != nil {
- // Need to find a usable IP address since the call to listen
- // didn't specify one.
- if addrs, err := netstate.GetAccessibleIPs(); err == nil {
- a, err := chooser(protocol, addrs)
- if err == nil && len(a) > 0 {
- return a, true, nil
- }
- }
- }
- // We don't have a chooser, so we just return the address the
- // underlying system has chosen.
- return []rpc.Address{addrFromIP(ip)}, true, nil
- }
- return []rpc.Address{addrFromIP(ip)}, false, nil
-}
-
// createEndpoints creates appropriate inaming.Endpoint instances for
// all of the externally accessible network addresses that can be used
// to reach this server.
-func (s *server) createEndpoints(lep naming.Endpoint, chooser rpc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
+func (s *server) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
iep, ok := lep.(*inaming.Endpoint)
if !ok {
return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
@@ -365,15 +331,15 @@
// If not tcp, ws, or wsh, just return the endpoint we were given.
return []*inaming.Endpoint{iep}, "", false, nil
}
-
host, port, err := net.SplitHostPort(iep.Address)
if err != nil {
return nil, "", false, err
}
- addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
+ addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
if err != nil {
return nil, port, false, err
}
+
ieps := make([]*inaming.Endpoint, 0, len(addrs))
for _, addr := range addrs {
n, err := inaming.NewEndpoint(lep.String())
@@ -381,7 +347,7 @@
return nil, port, false, err
}
n.IsMountTable = s.servesMountTable
- n.Address = net.JoinHostPort(addr.Address().String(), port)
+ n.Address = net.JoinHostPort(addr.String(), port)
ieps = append(ieps, n)
}
return ieps, port, unspecified, nil
@@ -652,7 +618,7 @@
return
}
switch v := setting.Value().(type) {
- case []rpc.Address:
+ case []net.Addr:
s.Lock()
if s.isStopState() {
s.Unlock()
@@ -687,20 +653,20 @@
}
}
-func getHost(address rpc.Address) string {
- host, _, err := net.SplitHostPort(address.Address().String())
+func getHost(address net.Addr) string {
+ host, _, err := net.SplitHostPort(address.String())
if err == nil {
return host
}
- return address.Address().String()
+ return address.String()
}
// Remove all endpoints that have the same host address as the supplied
// address parameter.
-func (s *server) removeAddresses(addresses []rpc.Address) ([]naming.Endpoint, error) {
+func (s *server) removeAddresses(addrs []net.Addr) ([]naming.Endpoint, error) {
var removed []naming.Endpoint
- for _, address := range addresses {
+ for _, address := range addrs {
host := getHost(address)
for ls, _ := range s.listenState {
if ls != nil && ls.roaming && len(ls.ieps) > 0 {
@@ -735,13 +701,16 @@
// externally accessible.
// This places the onus on the dhcp/roaming code that sends us addresses
// to ensure that those addresses are externally reachable.
-func (s *server) addAddresses(addresses []rpc.Address) []naming.Endpoint {
+func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
var added []naming.Endpoint
- for _, address := range addresses {
+ vlog.Infof("HERE WITH %v -> %v", addrs, netstate.ConvertToAddresses(addrs))
+ for _, address := range netstate.ConvertToAddresses(addrs) {
if !netstate.IsAccessibleIP(address) {
+ vlog.Infof("RETURN A %v", added)
return added
}
host := getHost(address)
+ vlog.Infof("LISTEN ST: %v", s.listenState)
for ls, _ := range s.listenState {
if ls != nil && ls.roaming {
niep := ls.protoIEP
@@ -755,6 +724,7 @@
}
}
}
+ vlog.Infof("RETURN B %v", added)
return added
}
diff --git a/profiles/internal/rpc/server_test.go b/profiles/internal/rpc/server_test.go
index 4e4dbe5..4e30884 100644
--- a/profiles/internal/rpc/server_test.go
+++ b/profiles/internal/rpc/server_test.go
@@ -371,7 +371,7 @@
return &niep
}
-func getIPAddrs(eps []naming.Endpoint) []rpc.Address {
+func getIPAddrs(eps []naming.Endpoint) []net.Addr {
hosts := map[string]struct{}{}
for _, ep := range eps {
iep := (ep).(*inaming.Endpoint)
@@ -380,10 +380,9 @@
hosts[h] = struct{}{}
}
}
- addrs := []rpc.Address{}
+ addrs := []net.Addr{}
for h, _ := range hosts {
- a := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP(h)}}
- addrs = append(addrs, a)
+ addrs = append(addrs, netstate.NewNetAddr("ip", h))
}
return addrs
}
@@ -439,11 +438,11 @@
}
defer func() { publisher.Shutdown(); <-stop }()
- ipv4And6 := func(network string, addrs []rpc.Address) ([]rpc.Address, error) {
- accessible := netstate.AddrList(addrs)
+ ipv4And6 := func(network string, addrs []net.Addr) ([]net.Addr, error) {
+ accessible := netstate.ConvertToAddresses(addrs)
ipv4 := accessible.Filter(netstate.IsUnicastIPv4)
ipv6 := accessible.Filter(netstate.IsUnicastIPv6)
- return append(ipv4, ipv6...), nil
+ return append(ipv4.AsNetAddrs(), ipv6.AsNetAddrs()...), nil
}
spec := rpc.ListenSpec{
Addrs: rpc.ListenAddrs{
@@ -481,14 +480,14 @@
t.Fatalf("got %d, want %d", got, want)
}
- n1 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("1.1.1.1")}}
- n2 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("2.2.2.2")}}
+ n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+ n2 := netstate.NewNetAddr("ip", "2.2.2.2")
watcher := make(chan rpc.NetworkChange, 10)
server.WatchNetwork(watcher)
defer close(watcher)
- roaming <- rpc.NewAddAddrsSetting([]rpc.Address{n1, n2})
+ roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1, n2})
waitForChange := func() *rpc.NetworkChange {
vlog.Infof("Waiting on %p", watcher)
@@ -527,7 +526,7 @@
t.Fatalf("got %d, want %d", got, want)
}
- roaming <- rpc.NewRmAddrsSetting([]rpc.Address{n1})
+ roaming <- rpc.NewRmAddrsSetting([]net.Addr{n1})
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
@@ -561,7 +560,7 @@
t.Fatalf("got %d, want %d: %v", got, want, status.Mounts)
}
- roaming <- rpc.NewAddAddrsSetting([]rpc.Address{n1})
+ roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1})
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
if got, want := len(change.Changed), 2; got != want {
@@ -618,9 +617,9 @@
roaming <- rpc.NewRmAddrsSetting(getIPAddrs(eps))
// Add in two new addresses
- n1 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("1.1.1.1")}}
- n2 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("2.2.2.2")}}
- roaming <- rpc.NewAddAddrsSetting([]rpc.Address{n1, n2})
+ n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+ n2 := netstate.NewNetAddr("ip", "2.2.2.2")
+ roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1, n2})
neps := make([]naming.Endpoint, 0, len(eps))
for _, p := range getUniqPorts(eps) {
diff --git a/profiles/internal/rpc/sort_endpoints.go b/profiles/internal/rpc/sort_endpoints.go
index 8ea0bd9..57409e7 100644
--- a/profiles/internal/rpc/sort_endpoints.go
+++ b/profiles/internal/rpc/sort_endpoints.go
@@ -200,16 +200,16 @@
// ipNetworks returns the IP networks on this machine.
func ipNetworks() []*net.IPNet {
- ifcs, err := netstate.GetAll()
+ ifcs, err := netstate.GetAllAddresses()
if err != nil {
- vlog.VI(5).Infof("netstate.GetAll failed: %v", err)
+ vlog.VI(5).Infof("netstate.GetAllAddresses failed: %v", err)
return nil
}
ret := make([]*net.IPNet, 0, len(ifcs))
for _, a := range ifcs {
- _, ipnet, err := net.ParseCIDR(a.Address().String())
+ _, ipnet, err := net.ParseCIDR(a.String())
if err != nil {
- vlog.VI(5).Infof("net.ParseCIDR(%q) failed: %v", a.Address(), err)
+ vlog.VI(5).Infof("net.ParseCIDR(%q) failed: %v", a, err)
continue
}
ret = append(ret, ipnet)
diff --git a/profiles/internal/rpc/stream/proxy/proxy.go b/profiles/internal/rpc/stream/proxy/proxy.go
index 09d3586..f35bafb 100644
--- a/profiles/internal/rpc/stream/proxy/proxy.go
+++ b/profiles/internal/rpc/stream/proxy/proxy.go
@@ -10,6 +10,8 @@
"sync"
"time"
+ "v.io/x/lib/netstate"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
@@ -67,6 +69,9 @@
errFailedToFowardOpenFlow = reg(".errFailedToFowardOpenFlow", "failed to forward open flow{:3}")
errServerNotBeingProxied = reg(".errServerNotBeingProxied", "no server with routing id {3} is being proxied")
errServerVanished = reg(".errServerVanished", "server with routing id {3} vanished")
+ errAccessibleAddresses = reg(".errAccessibleAddresses", "failed to obtain a set of accessible addresses{:3}")
+ errNoAccessibleAddresses = reg(".errNoAccessibleAddresses", "no accessible addresses were available for {3}")
+ errEmptyListenSpec = reg(".errEmptyListenSpec", "no addresses supplied in the listen spec")
)
// Proxy routes virtual circuit (VC) traffic between multiple underlying
@@ -182,13 +187,13 @@
// TODO(mattr): This should take a ListenSpec instead of network, address, and
// pubAddress. However using a ListenSpec requires a great deal of supporting
// code that should be refactored out of v.io/x/ref/profiles/internal/rpc/server.go.
-func New(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+func New(ctx *context.T, spec rpc.ListenSpec, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
rid, err := naming.NewRoutingID()
if err != nil {
return nil, nil, err
}
- proxy, err := internalNew(rid, v23.GetPrincipal(ctx), network, address, pubAddress)
+ proxy, err := internalNew(rid, v23.GetPrincipal(ctx), spec)
if err != nil {
return nil, nil, err
}
@@ -217,7 +222,13 @@
return shutdown, proxy.endpoint(), nil
}
-func internalNew(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
+func internalNew(rid naming.RoutingID, principal security.Principal, spec rpc.ListenSpec) (*Proxy, error) {
+ if len(spec.Addrs) == 0 {
+ return nil, verror.New(stream.ErrProxy, nil, verror.New(errEmptyListenSpec, nil))
+ }
+ laddr := spec.Addrs[0]
+ network := laddr.Protocol
+ address := laddr.Address
_, listenFn, _ := rpc.RegisteredProtocol(network)
if listenFn == nil {
return nil, verror.New(stream.ErrProxy, nil, verror.New(errUnknownNetwork, nil, network))
@@ -226,15 +237,23 @@
if err != nil {
return nil, verror.New(stream.ErrProxy, nil, verror.New(errListenFailed, nil, network, address, err))
}
- if len(pubAddress) == 0 {
- pubAddress = ln.Addr().String()
+ pub, _, err := netstate.PossibleAddresses(ln.Addr().Network(), ln.Addr().String(), spec.AddressChooser)
+ if err != nil {
+ ln.Close()
+ return nil, verror.New(stream.ErrProxy, nil, verror.New(errAccessibleAddresses, nil, err))
}
+ if len(pub) == 0 {
+ ln.Close()
+ return nil, verror.New(stream.ErrProxy, nil, verror.New(errNoAccessibleAddresses, nil, ln.Addr().String()))
+ }
+
proxy := &Proxy{
- ln: ln,
- rid: rid,
- servers: &servermap{m: make(map[naming.RoutingID]*server)},
- processes: make(map[*process]struct{}),
- pubAddress: pubAddress,
+ ln: ln,
+ rid: rid,
+ servers: &servermap{m: make(map[naming.RoutingID]*server)},
+ processes: make(map[*process]struct{}),
+ // TODO(cnicolaou): should use all of the available addresses
+ pubAddress: pub[0].String(),
principal: principal,
statsName: naming.Join("rpc", "proxy", "routing-id", rid.String(), "debug"),
}
diff --git a/profiles/internal/rpc/stream/proxy/proxy_test.go b/profiles/internal/rpc/stream/proxy/proxy_test.go
index d94dc12..584806f 100644
--- a/profiles/internal/rpc/stream/proxy/proxy_test.go
+++ b/profiles/internal/rpc/stream/proxy/proxy_test.go
@@ -13,7 +13,11 @@
"testing"
"time"
+ "v.io/x/lib/vlog"
+
+ "v.io/v23"
"v.io/v23/naming"
+ "v.io/v23/verror"
_ "v.io/x/ref/profiles"
inaming "v.io/x/ref/profiles/internal/naming"
@@ -22,14 +26,19 @@
"v.io/x/ref/profiles/internal/rpc/stream/proxy"
"v.io/x/ref/profiles/internal/rpc/stream/vc"
"v.io/x/ref/profiles/internal/rpc/stream/vif"
+ "v.io/x/ref/test"
"v.io/x/ref/test/testutil"
)
//go:generate v23 test generate
func TestProxy(t *testing.T) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
pproxy := testutil.NewPrincipal("proxy")
- _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+
+ _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
@@ -37,6 +46,8 @@
principal := testutil.NewPrincipal("test")
blessings := principal.BlessingStore().Default()
+ vlog.Infof("PROXYEP: %s", proxyEp)
+
// Create the stream.Manager for the server.
server1 := manager.InternalNew(naming.FixedRoutingID(0x1111111111111111))
defer server1.Shutdown()
@@ -44,6 +55,7 @@
// through the proxy.
ln1, ep1, err := server1.Listen(proxyEp.Network(), proxyEp.String(), principal, blessings)
if err != nil {
+ t.Logf(verror.DebugString(err))
t.Fatal(err)
}
defer ln1.Close()
@@ -91,8 +103,11 @@
}
func TestDuplicateRoutingID(t *testing.T) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
pproxy := testutil.NewPrincipal("proxy")
- _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
@@ -122,8 +137,11 @@
}
func TestProxyAuthentication(t *testing.T) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
pproxy := testutil.NewPrincipal("proxy")
- _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
@@ -150,13 +168,16 @@
}
func TestServerBlessings(t *testing.T) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
var (
pproxy = testutil.NewPrincipal("proxy")
pserver = testutil.NewPrincipal("server")
pclient = testutil.NewPrincipal("client")
)
- _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
@@ -200,8 +221,11 @@
}
func TestHostPort(t *testing.T) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
pproxy := testutil.NewPrincipal("proxy")
- _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
@@ -220,8 +244,11 @@
}
func TestClientBecomesServer(t *testing.T) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
pproxy := testutil.NewPrincipal("proxy")
- _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ _, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
@@ -275,6 +302,9 @@
}
func testProxyIdleTimeout(t *testing.T, testServer bool) {
+ ctx, shutdown := test.InitForTest()
+ defer shutdown()
+
const (
idleTime = 10 * time.Millisecond
// We use a long wait time here since it takes some time to handle VC close
@@ -299,7 +329,7 @@
// Pause the idle timers.
triggerTimers := vif.SetFakeTimers()
- Proxy, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, "tcp", "127.0.0.1:0", "")
+ Proxy, shutdown, proxyEp, err := proxy.InternalNew(naming.FixedRoutingID(0xbbbbbbbbbbbbbbbb), pproxy, v23.GetListenSpec(ctx))
if err != nil {
t.Fatal(err)
}
diff --git a/profiles/internal/rpc/stream/proxy/testutil_test.go b/profiles/internal/rpc/stream/proxy/testutil_test.go
index 1e469b2..8a1568c 100644
--- a/profiles/internal/rpc/stream/proxy/testutil_test.go
+++ b/profiles/internal/rpc/stream/proxy/testutil_test.go
@@ -6,13 +6,17 @@
import (
"v.io/v23/naming"
+ "v.io/v23/rpc"
"v.io/v23/security"
)
// These are the internal functions only for use in the proxy_test package.
-func InternalNew(rid naming.RoutingID, p security.Principal, net, addr, pubAddr string) (*Proxy, func(), naming.Endpoint, error) {
- proxy, err := internalNew(rid, p, net, addr, pubAddr)
+func InternalNew(rid naming.RoutingID, p security.Principal, spec rpc.ListenSpec) (*Proxy, func(), naming.Endpoint, error) {
+ proxy, err := internalNew(rid, p, spec)
+ if err != nil {
+ return nil, nil, nil, err
+ }
return proxy, proxy.shutdown, proxy.endpoint(), err
}
diff --git a/profiles/internal/rpc/test/proxy_test.go b/profiles/internal/rpc/test/proxy_test.go
index ab8c7e9..bd326f5 100644
--- a/profiles/internal/rpc/test/proxy_test.go
+++ b/profiles/internal/rpc/test/proxy_test.go
@@ -14,6 +14,8 @@
"testing"
"time"
+ "v.io/x/lib/vlog"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/namespace"
@@ -52,15 +54,14 @@
defer shutdown()
expected := len(args)
- listenSpec := v23.GetListenSpec(ctx)
- protocol := listenSpec.Addrs[0].Protocol
- addr := listenSpec.Addrs[0].Address
- proxyShutdown, proxyEp, err := proxy.New(ctx, protocol, addr, "")
+
+ listenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
+ proxyShutdown, proxyEp, err := proxy.New(ctx, listenSpec)
if err != nil {
+ fmt.Fprintf(stderr, "%s\n", verror.DebugString(err))
return err
}
defer proxyShutdown()
-
fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
if expected > 0 {
pub := publisher.New(ctx, v23.GetNamespace(ctx), time.Minute)
@@ -128,6 +129,7 @@
p.ReadLine()
h.name = p.ExpectVar("PROXY_NAME")
if len(h.name) == 0 {
+ h.proxy.Shutdown(os.Stderr, os.Stderr)
t.Fatalf("failed to get PROXY_NAME from proxyd")
}
return h.ns.Mount(ctx, "proxy", h.name, time.Hour)
@@ -150,14 +152,18 @@
}
func TestProxy(t *testing.T) {
- proxyListenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
- proxyListenSpec.Proxy = "proxy"
+ proxyListenSpec := rpc.ListenSpec{
+ Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}},
+ Proxy: "proxy",
+ }
testProxy(t, proxyListenSpec)
}
func TestWSProxy(t *testing.T) {
- proxyListenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
- proxyListenSpec.Proxy = "proxy"
+ proxyListenSpec := rpc.ListenSpec{
+ Addrs: rpc.ListenAddrs{{"tcp", "127.0.0.1:0"}},
+ Proxy: "proxy",
+ }
// The proxy uses websockets only, but the server is using tcp.
testProxy(t, proxyListenSpec, "--v23.tcp.protocol=ws")
}
@@ -165,6 +171,7 @@
func testProxy(t *testing.T, spec rpc.ListenSpec, args ...string) {
ctx, shutdown := testContext()
defer shutdown()
+
var (
pserver = testutil.NewPrincipal("server")
pclient = testutil.NewPrincipal("client")
@@ -237,12 +244,18 @@
then := time.Now().Add(time.Minute)
for {
me, err := ns.Resolve(ctx, name)
+ if err != nil {
+ continue
+ }
+ for i, s := range me.Servers {
+ vlog.Infof("%d: %s", i, s)
+ }
if err == nil && len(me.Servers) == expect {
ch <- 1
return
}
if time.Now().After(then) {
- t.Fatalf("timed out")
+ t.Fatalf("timed out waiting for %d servers, found %d", expect, len(me.Servers))
}
time.Sleep(100 * time.Millisecond)
}
diff --git a/profiles/internal/util.go b/profiles/internal/util.go
index 600ff0a..a530087 100644
--- a/profiles/internal/util.go
+++ b/profiles/internal/util.go
@@ -6,10 +6,10 @@
import (
"fmt"
+ "net"
"os"
"strings"
- "v.io/v23/rpc"
"v.io/v23/verror"
"v.io/x/lib/vlog"
@@ -46,11 +46,11 @@
// IPAddressChooser returns the preferred IP address, which is,
// a public IPv4 address, then any non-loopback IPv4, then a public
// IPv6 address and finally any non-loopback/link-local IPv6
-func IPAddressChooser(network string, addrs []rpc.Address) ([]rpc.Address, error) {
+func IPAddressChooser(network string, addrs []net.Addr) ([]net.Addr, error) {
if !netstate.IsIPProtocol(network) {
return nil, fmt.Errorf("can't support network protocol %q", network)
}
- accessible := netstate.AddrList(addrs)
+ accessible := netstate.ConvertToAddresses(addrs)
// Try and find an address on a interface with a default route.
// We give preference to IPv4 over IPv6 for compatibility for now.
@@ -65,7 +65,7 @@
if addrs := accessible.Filter(predicate); len(addrs) > 0 {
onDefaultRoutes := addrs.Filter(netstate.IsOnDefaultRoute)
if len(onDefaultRoutes) > 0 {
- return onDefaultRoutes, nil
+ return onDefaultRoutes.AsNetAddrs(), nil
}
}
}
@@ -74,11 +74,10 @@
// but without the default route requirement.
for _, predicate := range predicates {
if addrs := accessible.Filter(predicate); len(addrs) > 0 {
- return addrs, nil
+ return addrs.AsNetAddrs(), nil
}
}
-
- return nil, fmt.Errorf("failed to find any usable address for %q", network)
+ return []net.Addr{}, nil
}
// HasPublicIP returns true if the host has at least one public IP address.
diff --git a/profiles/proxy.go b/profiles/proxy.go
index 3392743..0a12b4e 100644
--- a/profiles/proxy.go
+++ b/profiles/proxy.go
@@ -7,12 +7,13 @@
import (
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/v23/rpc"
"v.io/x/ref/profiles/internal/rpc/stream/proxy"
)
// NewProxy creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
-func NewProxy(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
- return proxy.New(ctx, network, address, pubAddress, names...)
+func NewProxy(ctx *context.T, spec rpc.ListenSpec, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, spec, names...)
}
diff --git a/profiles/roaming/net_watcher.go b/profiles/roaming/net_watcher.go
index 579fafb..f4880d4 100644
--- a/profiles/roaming/net_watcher.go
+++ b/profiles/roaming/net_watcher.go
@@ -8,12 +8,14 @@
import (
"fmt"
+ "os"
"strings"
+ "v.io/x/lib/netstate"
+
"v.io/v23"
"v.io/v23/config"
- "v.io/x/lib/netstate"
"v.io/x/ref/profiles/roaming"
)
@@ -25,8 +27,26 @@
fmt.Println("Profile: ", profileName)
accessible, err := netstate.GetAccessibleIPs()
- routes := netstate.GetRoutes()
- fmt.Printf("Routes:\n%s\n", strings.Replace(routes.String(), ")", ")\n", -1))
+ interfaces, err := netstate.GetAllInterfaces()
+
+ fmt.Printf("Addresses\n")
+ for _, addr := range accessible {
+ fmt.Printf("%s\n", addr.DebugString())
+ }
+
+ fmt.Printf("\nInterfaces\n")
+ for _, ifc := range interfaces {
+ fmt.Printf("%s\n", ifc)
+ }
+
+ fmt.Printf("\nRoutes\n")
+ for _, ifc := range interfaces {
+ if ipifc, ok := ifc.(netstate.IPNetworkInterface); ok {
+ if routes := ipifc.IPRoutes(); len(routes) > 0 {
+ fmt.Printf("%s: %s\n", ifc.Name(), routes)
+ }
+ }
+ }
listenSpec := v23.GetListenSpec(ctx)
chooser := listenSpec.AddressChooser
@@ -36,17 +56,17 @@
}
}
- if chosen, err := listenSpec.AddressChooser("tcp", accessible); err != nil {
+ if chosen, err := listenSpec.AddressChooser("tcp", accessible.AsNetAddrs()); err != nil {
fmt.Printf("Failed to chosen address %s\n", err)
} else {
- al := netstate.AddrList(chosen)
+ al := netstate.ConvertToAddresses(chosen)
fmt.Printf("Chosen:\n%s\n", strings.Replace(al.String(), ") ", ")\n", -1))
}
ch := make(chan config.Setting, 10)
- settings, err := v23.GetPublisher(ctx).ForkStream(roaming.SettingsStreamName, ch)
+ settings, err := listenSpec.StreamPublisher.ForkStream(roaming.SettingsStreamName, ch)
if err != nil {
- r.Logger().Infof("failed to fork stream: %s", err)
+ fmt.Fprintf(os.Stderr, "failed to fork stream: %s\n", err)
}
for _, setting := range settings.Latest {
fmt.Println("Setting: ", setting)
diff --git a/profiles/roaming/proxy.go b/profiles/roaming/proxy.go
index ed9a6b6..845ad11 100644
--- a/profiles/roaming/proxy.go
+++ b/profiles/roaming/proxy.go
@@ -7,12 +7,13 @@
import (
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/v23/rpc"
"v.io/x/ref/profiles/internal/rpc/stream/proxy"
)
// NewProxy creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
-func NewProxy(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
- return proxy.New(ctx, network, address, pubAddress, names...)
+func NewProxy(ctx *context.T, spec rpc.ListenSpec, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, spec, names...)
}
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index f09f1c6..4996320 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -15,14 +15,17 @@
import (
"flag"
+ "net"
+
+ "v.io/x/lib/netconfig"
+ "v.io/x/lib/netstate"
+ "v.io/x/lib/vlog"
"v.io/v23"
"v.io/v23/config"
"v.io/v23/context"
"v.io/v23/rpc"
- "v.io/x/lib/netconfig"
- "v.io/x/lib/netstate"
- "v.io/x/lib/vlog"
+
"v.io/x/ref/lib/flags"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/profiles/internal"
@@ -65,8 +68,11 @@
// 1:1 NAT configuration.
if !internal.HasPublicIP(vlog.Log) {
if addr := internal.GCEPublicAddress(vlog.Log); addr != nil {
- listenSpec.AddressChooser = func(string, []rpc.Address) ([]rpc.Address, error) {
- return []rpc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
+ listenSpec.AddressChooser = func(string, []net.Addr) ([]net.Addr, error) {
+ // TODO(cnicolaou): the protocol at least should
+ // be configurable, or maybe there's a profile specific
+ // flag to configure both the protocol and address.
+ return []net.Addr{netstate.NewNetAddr("wsh", addr.String())}, nil
}
runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
@@ -146,6 +152,7 @@
for {
select {
case <-watcher.Channel():
+ netstate.InvalidateCache()
cur, err := netstate.GetAccessibleIPs()
if err != nil {
vlog.Errorf("failed to read network state: %s", err)
@@ -163,10 +170,10 @@
}
if len(removed) > 0 {
vlog.VI(2).Infof("Sending removed: %s", removed)
- ch <- rpc.NewRmAddrsSetting(removed)
+ ch <- rpc.NewRmAddrsSetting(removed.AsNetAddrs())
}
// We will always send the best currently available address
- if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur); err == nil && chosen != nil {
+ if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
vlog.VI(2).Infof("Sending added and chosen: %s", chosen)
ch <- rpc.NewAddAddrsSetting(chosen)
} else {
diff --git a/profiles/static/proxy.go b/profiles/static/proxy.go
index e2617e0..a398460 100644
--- a/profiles/static/proxy.go
+++ b/profiles/static/proxy.go
@@ -7,12 +7,13 @@
import (
"v.io/v23/context"
"v.io/v23/naming"
+ "v.io/v23/rpc"
"v.io/x/ref/profiles/internal/rpc/stream/proxy"
)
// NewProxy creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
-func NewProxy(ctx *context.T, network, address, pubAddress string, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
- return proxy.New(ctx, network, address, pubAddress, names...)
+func NewProxy(ctx *context.T, spec rpc.ListenSpec, names ...string) (shutdown func(), endpoint naming.Endpoint, err error) {
+ return proxy.New(ctx, spec, names...)
}
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 1908308..339e32a 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -7,12 +7,14 @@
import (
"flag"
+ "net"
+
+ "v.io/x/lib/vlog"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
- "v.io/x/lib/netstate"
- "v.io/x/lib/vlog"
+
"v.io/x/ref/lib/flags"
"v.io/x/ref/lib/security/securityflag"
"v.io/x/ref/profiles/internal"
@@ -48,11 +50,12 @@
ac := appcycle.New()
// Our address is private, so we test for running on GCE and for its 1:1 NAT
- // configuration. GCEPublicAddress returns a non-nil addr if we are running on GCE.
+ // configuration. GCEPublicAddress returns a non-nil addr if we are
+ // running on GCE.
if !internal.HasPublicIP(vlog.Log) {
if addr := internal.GCEPublicAddress(vlog.Log); addr != nil {
- listenSpec.AddressChooser = func(string, []rpc.Address) ([]rpc.Address, error) {
- return []rpc.Address{&netstate.AddrIfc{addr, "nat", nil}}, nil
+ listenSpec.AddressChooser = func(string, []net.Addr) ([]net.Addr, error) {
+ return []net.Addr{addr}, nil
}
runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {