Merge branch 'master' into discovery
Change-Id: Ic9c22b988ef0b87d19fa9842ee57826a4ee5457b
diff --git a/lib/discovery/util/advertise.go b/lib/discovery/util/advertise.go
index 5835552..93d6a40 100644
--- a/lib/discovery/util/advertise.go
+++ b/lib/discovery/util/advertise.go
@@ -27,13 +27,9 @@
service.InstanceUuid = idiscovery.NewInstanceUUID()
}
- watcher := make(chan rpc.NetworkChange, 3)
- server.WatchNetwork(watcher)
-
- stop, err := advertise(ctx, service, getEndpoints(server), suffix, visibility)
+ eps, valid := getEndpoints(server)
+ stop, err := advertise(ctx, service, eps, suffix, visibility)
if err != nil {
- server.UnwatchNetwork(watcher)
- close(watcher)
return nil, err
}
@@ -41,17 +37,16 @@
go func() {
for {
select {
- case <-watcher:
+ case <-valid:
if stop != nil {
stop() // Stop the previous advertisement.
}
- stop, err = advertise(ctx, service, getEndpoints(server), suffix, visibility)
+ eps, valid = getEndpoints(server)
+ stop, err = advertise(ctx, service, eps, suffix, visibility)
if err != nil {
ctx.Error(err)
}
case <-ctx.Done():
- server.UnwatchNetwork(watcher)
- close(watcher)
close(done)
return
}
@@ -81,11 +76,11 @@
}
// TODO(suharshs): Use server.Status().Endpoints only when migrating to a new server.
-func getEndpoints(server rpc.Server) []naming.Endpoint {
+func getEndpoints(server rpc.Server) ([]naming.Endpoint, <-chan struct{}) {
status := server.Status()
eps := status.Endpoints
for _, p := range status.Proxies {
eps = append(eps, p.Endpoint)
}
- return eps
+ return eps, status.Valid
}
diff --git a/lib/discovery/util/advertise_test.go b/lib/discovery/util/advertise_test.go
index b3ff757..96b55de 100644
--- a/lib/discovery/util/advertise_test.go
+++ b/lib/discovery/util/advertise_test.go
@@ -7,6 +7,7 @@
import (
"fmt"
"reflect"
+ "sync"
"testing"
"time"
@@ -25,40 +26,37 @@
)
type mockServer struct {
- eps []naming.Endpoint
- watcher chan<- rpc.NetworkChange
- watcherClosedCh chan struct{}
+ mu sync.Mutex
+ eps []naming.Endpoint
+ valid chan struct{}
}
-func (s *mockServer) AddName(string) error { return nil }
-func (s *mockServer) RemoveName(string) {}
-func (s *mockServer) Stop() error { return nil }
-func (s *mockServer) Closed() <-chan struct{} { return nil }
-func (s *mockServer) Status() rpc.ServerStatus { return rpc.ServerStatus{Endpoints: s.eps} }
-
-func (s *mockServer) WatchNetwork(ch chan<- rpc.NetworkChange) {
- s.watcher = ch
- s.watcherClosedCh = make(chan struct{})
-}
-
-func (s *mockServer) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
- s.watcher = nil
- close(s.watcherClosedCh)
-}
-
-func (s *mockServer) updateNetwork(eps []naming.Endpoint) {
- s.eps = eps
- if s.watcher != nil {
- s.watcher <- rpc.NetworkChange{Changed: eps}
+func (s *mockServer) AddName(string) error { return nil }
+func (s *mockServer) RemoveName(string) {}
+func (s *mockServer) Stop() error { return nil }
+func (s *mockServer) Closed() <-chan struct{} { return nil }
+func (s *mockServer) Status() rpc.ServerStatus {
+ defer s.mu.Unlock()
+ s.mu.Lock()
+ return rpc.ServerStatus{
+ Endpoints: s.eps,
+ Valid: s.valid,
}
}
-func (s *mockServer) watcherClosed() <-chan struct{} {
- return s.watcherClosedCh
+func (s *mockServer) updateNetwork(eps []naming.Endpoint) {
+ defer s.mu.Unlock()
+ s.mu.Lock()
+ s.eps = eps
+ close(s.valid)
+ s.valid = make(chan struct{})
}
func newMockServer(eps []naming.Endpoint) *mockServer {
- return &mockServer{eps: eps}
+ return &mockServer{
+ eps: eps,
+ valid: make(chan struct{}),
+ }
}
func newEndpoints(addrs ...string) []naming.Endpoint {
@@ -84,7 +82,6 @@
eps := newEndpoints("addr1:123")
mock := newMockServer(eps)
- ctx, cancel := context.WithCancel(ctx)
util.AdvertiseServer(ctx, mock, suffix, service, nil)
if err := scanAndMatch(ctx, service, eps, suffix); err != nil {
t.Error(err)
@@ -101,16 +98,6 @@
t.Error(err)
}
}
-
- // Make sure that the network watcher is unregistered when the context
- // is canceled.
- cancel()
-
- select {
- case <-mock.watcherClosed():
- case <-time.After(3 * time.Second):
- t.Error("watcher not closed")
- }
}
func TestNetworkChangeInstanceUuid(t *testing.T) {
diff --git a/runtime/factories/android/android.go b/runtime/factories/android/android.go
index 8704541..759c752 100644
--- a/runtime/factories/android/android.go
+++ b/runtime/factories/android/android.go
@@ -16,9 +16,6 @@
"flag"
"os"
- "v.io/x/lib/netconfig"
- "v.io/x/lib/netstate"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
@@ -37,9 +34,9 @@
_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
"v.io/x/ref/runtime/internal/lib/appcycle"
+ "v.io/x/ref/runtime/internal/lib/roaming"
"v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
- irpc "v.io/x/ref/runtime/internal/rpc"
"v.io/x/ref/runtime/internal/rt"
"v.io/x/ref/services/debug/debuglib"
@@ -50,11 +47,6 @@
"v.io/x/ref/lib/discovery/plugins/mdns"
)
-const (
- SettingsStreamName = "roaming"
- SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
var commonFlags *flags.Flags
var bleCreator func(string) (idiscovery.Plugin, error)
@@ -93,107 +85,26 @@
publisher := pubsub.NewPublisher()
- // Create stream in Init function to avoid a race between any
- // goroutines started here and consumers started after Init returns.
- ch := make(chan pubsub.Setting)
- // TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
- stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+ // TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
ishutdown()
return nil, nil, nil, err
}
- prev, err := netstate.GetAccessibleIPs()
+ stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
if err != nil {
- ishutdown()
return nil, nil, nil, err
}
- // Start the dhcp watcher.
- watcher, err := netconfig.NewNetConfigWatcher()
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
-
- cleanupCh := make(chan struct{})
- watcherCh := make(chan struct{})
-
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
-
- go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
runtimeFactoryShutdown := func() {
- close(cleanupCh)
ishutdown()
shutdown()
- <-watcherCh
+ stopRoaming()
}
return runtime, ctx, runtimeFactoryShutdown, nil
}
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
- runtime *rt.Runtime,
- ctx *context.T,
- watcher netconfig.NetConfigWatcher,
- prev netstate.AddrList,
- pubStop, cleanup <-chan struct{},
- watcherLoop chan<- struct{},
- ch chan<- pubsub.Setting) {
- defer close(ch)
-
- listenSpec := runtime.GetListenSpec(ctx)
-
- // TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
- for {
- select {
- case <-watcher.Channel():
- netstate.InvalidateCache()
- cur, err := netstate.GetAccessibleIPs()
- if err != nil {
- ctx.Errorf("failed to read network state: %s", err)
- continue
- }
- removed := netstate.FindRemoved(prev, cur)
- added := netstate.FindAdded(prev, cur)
- ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
- ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
- ctx.VI(2).Infof("Added : %d: %s", len(added), added)
- ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
- if len(removed) == 0 && len(added) == 0 {
- ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
- continue
- }
- if len(removed) > 0 {
- ctx.VI(2).Infof("Sending removed: %s", removed)
- ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
- }
- // We will always send the best currently available address
- if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
- ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
- ch <- irpc.NewNewAddrsSetting(chosen)
- } else {
- ctx.VI(2).Infof("Ignoring added %s", added)
- }
- prev = cur
- case <-cleanup:
- break done
- case <-pubStop:
- goto done
- }
- }
- watcher.Stop()
- close(watcherLoop)
-}
-
-
func createDiscovery() (discovery.T, error) {
plugins := make([]idiscovery.Plugin, 2)
host, _ := os.Hostname()
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index a015e18..7a2703d 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -16,9 +16,6 @@
import (
"flag"
- "v.io/x/lib/netconfig"
- "v.io/x/lib/netstate"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
@@ -34,9 +31,9 @@
_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
"v.io/x/ref/runtime/internal/lib/appcycle"
+ "v.io/x/ref/runtime/internal/lib/roaming"
"v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
- irpc "v.io/x/ref/runtime/internal/rpc"
"v.io/x/ref/runtime/internal/rt"
"v.io/x/ref/services/debug/debuglib"
@@ -46,11 +43,6 @@
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
)
-const (
- SettingsStreamName = "roaming"
- SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
var commonFlags *flags.Flags
func init() {
@@ -87,102 +79,22 @@
publisher := pubsub.NewPublisher()
- // Create stream in Init function to avoid a race between any
- // goroutines started here and consumers started after Init returns.
- ch := make(chan pubsub.Setting)
- // TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
- stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+ // TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
ishutdown()
return nil, nil, nil, err
}
- prev, err := netstate.GetAccessibleIPs()
+ stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
if err != nil {
- ishutdown()
return nil, nil, nil, err
}
- // Start the dhcp watcher.
- watcher, err := netconfig.NewNetConfigWatcher()
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
-
- cleanupCh := make(chan struct{})
- watcherCh := make(chan struct{})
-
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
-
- go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
runtimeFactoryShutdown := func() {
- close(cleanupCh)
ishutdown()
shutdown()
- <-watcherCh
+ stopRoaming()
}
return runtime, ctx, runtimeFactoryShutdown, nil
}
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
- runtime *rt.Runtime,
- ctx *context.T,
- watcher netconfig.NetConfigWatcher,
- prev netstate.AddrList,
- pubStop, cleanup <-chan struct{},
- watcherLoop chan<- struct{},
- ch chan<- pubsub.Setting) {
- defer close(ch)
-
- listenSpec := runtime.GetListenSpec(ctx)
-
- // TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
- for {
- select {
- case <-watcher.Channel():
- netstate.InvalidateCache()
- cur, err := netstate.GetAccessibleIPs()
- if err != nil {
- ctx.Errorf("failed to read network state: %s", err)
- continue
- }
- removed := netstate.FindRemoved(prev, cur)
- added := netstate.FindAdded(prev, cur)
- ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
- ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
- ctx.VI(2).Infof("Added : %d: %s", len(added), added)
- ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
- if len(removed) == 0 && len(added) == 0 {
- ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
- continue
- }
- if len(removed) > 0 {
- ctx.VI(2).Infof("Sending removed: %s", removed)
- ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
- }
- // We will always send the best currently available address
- if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
- ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
- ch <- irpc.NewNewAddrsSetting(chosen)
- } else {
- ctx.VI(2).Infof("Ignoring added %s", added)
- }
- prev = cur
- case <-cleanup:
- break done
- case <-pubStop:
- goto done
- }
- }
- watcher.Stop()
- close(watcherLoop)
-}
diff --git a/runtime/factories/roaming/roaming_server.go b/runtime/factories/roaming/roaming_server.go
deleted file mode 100644
index b8e5170..0000000
--- a/runtime/factories/roaming/roaming_server.go
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build ignore
-
-package main
-
-import (
- "fmt"
-
- "v.io/v23"
- "v.io/v23/rpc"
- _ "v.io/x/ref/runtime/factories/roaming"
-)
-
-func main() {
- ctx, shutdown := v23.Init()
- defer shutdown()
-
- ctx, server, err := v23.WithNewServer(ctx, "roamer", &dummy{}, nil)
- if err != nil {
- ctx.Fatalf("unexpected error: %q", err)
- }
- watcher := make(chan rpc.NetworkChange, 1)
- server.WatchNetwork(watcher)
-
- for {
- status := server.Status()
- fmt.Printf("Endpoints: %d created:\n", len(status.Endpoints))
- for _, ep := range status.Endpoints {
- fmt.Printf(" %s\n", ep)
- }
- fmt.Printf("Mounts: %d mounts:\n", len(status.Mounts))
- for _, ms := range status.Mounts {
- fmt.Printf(" %s\n", ms)
- }
- change := <-watcher
- fmt.Printf("Network change: %s", change.DebugString())
- }
-}
-
-type dummy struct{}
-
-func (d *dummy) Echo(call rpc.ServerCall, arg string) (string, error) {
- return arg, nil
-}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 01ebcc8..aa41575 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -19,9 +19,12 @@
"v.io/v23/naming"
"v.io/v23/security"
+ "v.io/x/lib/netstate"
+ "v.io/x/ref/lib/pubsub"
iflow "v.io/x/ref/runtime/internal/flow"
"v.io/x/ref/runtime/internal/flow/conn"
"v.io/x/ref/runtime/internal/flow/protocols/bidi"
+ "v.io/x/ref/runtime/internal/lib/roaming"
"v.io/x/ref/runtime/internal/lib/upcqueue"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/version"
@@ -43,7 +46,27 @@
serverNames []string
}
-func NewWithBlessings(ctx *context.T, serverBlessings security.Blessings, rid naming.RoutingID) flow.Manager {
+type listenState struct {
+ q *upcqueue.T
+ listenLoops sync.WaitGroup
+ dhcpPublisher *pubsub.Publisher
+
+ mu sync.Mutex
+ stopProxy chan struct{}
+ listeners []flow.Listener
+ endpoints []*endpointState
+ proxyEndpoints []naming.Endpoint
+ notifyWatchers chan struct{}
+ roaming bool
+}
+
+type endpointState struct {
+ leps []*inaming.Endpoint // the list of currently active endpoints.
+ tmplEndpoint *inaming.Endpoint // endpoint used as a template for creating new endpoints from the network interfaces provided from roaming.
+ roaming bool
+}
+
+func NewWithBlessings(ctx *context.T, serverBlessings security.Blessings, rid naming.RoutingID, dhcpPublisher *pubsub.Publisher) flow.Manager {
m := &manager{
rid: rid,
closed: make(chan struct{}),
@@ -54,9 +77,11 @@
m.serverBlessings = serverBlessings
m.serverNames = security.BlessingNames(v23.GetPrincipal(ctx), serverBlessings)
m.ls = &listenState{
- q: upcqueue.New(),
- listeners: []flow.Listener{},
- stopProxy: make(chan struct{}),
+ q: upcqueue.New(),
+ listeners: []flow.Listener{},
+ stopProxy: make(chan struct{}),
+ notifyWatchers: make(chan struct{}),
+ dhcpPublisher: dhcpPublisher,
}
}
go func() {
@@ -78,22 +103,12 @@
return m
}
-func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+func New(ctx *context.T, rid naming.RoutingID, dhcpPublisher *pubsub.Publisher) flow.Manager {
var serverBlessings security.Blessings
if rid != naming.NullRoutingID {
serverBlessings = v23.GetPrincipal(ctx).BlessingStore().Default()
}
- return NewWithBlessings(ctx, serverBlessings, rid)
-}
-
-type listenState struct {
- q *upcqueue.T
- listenLoops sync.WaitGroup
-
- mu sync.Mutex
- stopProxy chan struct{}
- listeners []flow.Listener
- endpoints []naming.Endpoint
+ return NewWithBlessings(ctx, serverBlessings, rid, dhcpPublisher)
}
func (m *manager) stopListening() {
@@ -108,6 +123,10 @@
close(m.ls.stopProxy)
m.ls.stopProxy = nil
}
+ if m.ls.notifyWatchers != nil {
+ close(m.ls.notifyWatchers)
+ m.ls.notifyWatchers = nil
+ }
m.ls.mu.Unlock()
for _, ln := range listeners {
ln.Close()
@@ -143,6 +162,7 @@
RID: m.rid,
Blessings: m.serverNames,
}
+ defer m.ls.mu.Unlock()
m.ls.mu.Lock()
if m.ls.listeners == nil {
m.ls.mu.Unlock()
@@ -150,29 +170,143 @@
return flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
}
m.ls.listeners = append(m.ls.listeners, ln)
- m.ls.endpoints = append(m.ls.endpoints, local)
- m.ls.mu.Unlock()
+ leps, roam, err := m.createEndpoints(ctx, local)
+ if err != nil {
+ return iflow.MaybeWrapError(flow.ErrBadArg, ctx, err)
+ }
+ m.ls.endpoints = append(m.ls.endpoints, &endpointState{
+ leps: leps,
+ tmplEndpoint: local,
+ roaming: roam,
+ })
+ if !m.ls.roaming && m.ls.dhcpPublisher != nil && roam {
+ m.ls.roaming = true
+ m.ls.listenLoops.Add(1)
+ go func() {
+ roaming.ReadRoamingStream(ctx, m.ls.dhcpPublisher, m.rmAddrs, m.addAddrs)
+ m.ls.listenLoops.Done()
+ }()
+ }
m.ls.listenLoops.Add(1)
go m.lnAcceptLoop(ctx, ln, local)
return nil
}
+func (m *manager) createEndpoints(ctx *context.T, lep naming.Endpoint) ([]*inaming.Endpoint, bool, error) {
+ iep := lep.(*inaming.Endpoint)
+ if !strings.HasPrefix(iep.Protocol, "tcp") &&
+ !strings.HasPrefix(iep.Protocol, "ws") {
+ // If not tcp, ws, or wsh, just return the endpoint we were given.
+ return []*inaming.Endpoint{iep}, false, nil
+ }
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, false, err
+ }
+ chooser := v23.GetListenSpec(ctx).AddressChooser
+ addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
+ if err != nil {
+ return nil, false, err
+ }
+ ieps := make([]*inaming.Endpoint, 0, len(addrs))
+ for _, addr := range addrs {
+ n, err := inaming.NewEndpoint(lep.String())
+ if err != nil {
+ return nil, false, err
+ }
+ n.Address = net.JoinHostPort(addr.String(), port)
+ ieps = append(ieps, n)
+ }
+ return ieps, unspecified, nil
+}
+
+func (m *manager) addAddrs(addrs []net.Addr) {
+ defer m.ls.mu.Unlock()
+ m.ls.mu.Lock()
+ changed := false
+ for _, addr := range netstate.ConvertToAddresses(addrs) {
+ if !netstate.IsAccessibleIP(addr) {
+ continue
+ }
+ host, _ := getHostPort(addr)
+ for _, epState := range m.ls.endpoints {
+ if !epState.roaming {
+ continue
+ }
+ tmplEndpoint := epState.tmplEndpoint
+ _, port := getHostPort(tmplEndpoint.Addr())
+ if i := findEndpoint(epState, host); i < 0 {
+ nep := *tmplEndpoint
+ nep.Address = net.JoinHostPort(host, port)
+ epState.leps = append(epState.leps, &nep)
+ changed = true
+ }
+ }
+ }
+ if changed && m.ls.notifyWatchers != nil {
+ close(m.ls.notifyWatchers)
+ m.ls.notifyWatchers = make(chan struct{})
+ }
+}
+
+func findEndpoint(epState *endpointState, host string) int {
+ for i, ep := range epState.leps {
+ epHost, _ := getHostPort(ep.Addr())
+ if epHost == host {
+ return i
+ }
+ }
+ return -1
+}
+
+func (m *manager) rmAddrs(addrs []net.Addr) {
+ defer m.ls.mu.Unlock()
+ m.ls.mu.Lock()
+ changed := false
+ for _, addr := range netstate.ConvertToAddresses(addrs) {
+ host, _ := getHostPort(addr)
+ for _, epState := range m.ls.endpoints {
+ if !epState.roaming {
+ continue
+ }
+ if i := findEndpoint(epState, host); i >= 0 {
+ n := len(epState.leps) - 1
+ epState.leps[i], epState.leps[n] = epState.leps[n], nil
+ epState.leps = epState.leps[:n]
+ changed = true
+ }
+ }
+ }
+ if changed && m.ls.notifyWatchers != nil {
+ close(m.ls.notifyWatchers)
+ m.ls.notifyWatchers = make(chan struct{})
+ }
+}
+
+func getHostPort(address net.Addr) (string, string) {
+ host, port, err := net.SplitHostPort(address.String())
+ if err == nil {
+ return host, port
+ }
+ return address.String(), ""
+}
+
// ProxyListen causes the Manager to accept flows from the specified endpoint.
// The endpoint must correspond to a vanadium proxy.
//
// update gets passed the complete set of endpoints for the proxy every time it
// is called.
-func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) error {
+func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint) error {
if m.ls == nil {
return NewErrListeningWithNullRid(ctx)
}
m.ls.listenLoops.Add(1)
- go m.connectToProxy(ctx, ep, update)
+ go m.connectToProxy(ctx, ep)
return nil
}
-func (m *manager) connectToProxy(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) {
+func (m *manager) connectToProxy(ctx *context.T, ep naming.Endpoint) {
defer m.ls.listenLoops.Done()
var eps []naming.Endpoint
for delay := reconnectDelay; ; delay *= 2 {
@@ -207,19 +341,52 @@
for i := range eps {
eps[i].(*inaming.Endpoint).Blessings = m.serverNames
}
- update(eps)
+ m.updateProxyEndpoints(eps)
select {
case <-ctx.Done():
return
case <-m.ls.stopProxy:
return
case <-f.Closed():
- update(nil)
+ m.updateProxyEndpoints(nil)
delay = reconnectDelay
}
}
}
+func (m *manager) updateProxyEndpoints(eps []naming.Endpoint) {
+ defer m.ls.mu.Unlock()
+ m.ls.mu.Lock()
+ if endpointsEqual(m.ls.proxyEndpoints, eps) {
+ return
+ }
+ m.ls.proxyEndpoints = eps
+ // The proxy endpoints have changed so we need to notify any watchers to
+ // requery ListeningEndpoints.
+ if m.ls.notifyWatchers != nil {
+ close(m.ls.notifyWatchers)
+ m.ls.notifyWatchers = make(chan struct{})
+ }
+}
+
+func endpointsEqual(a, b []naming.Endpoint) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ m := make(map[string]struct{})
+ for _, ep := range a {
+ m[ep.String()] = struct{}{}
+ }
+ for _, ep := range b {
+ key := ep.String()
+ if _, ok := m[key]; !ok {
+ return false
+ }
+ delete(m, key)
+ }
+ return len(m) == 0
+}
+
func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
b, err := f.ReadMsg()
if err != nil {
@@ -361,18 +528,24 @@
// If the Manager is not listening on any endpoints, an endpoint with the
// Manager's RoutingID will be returned for use in bidirectional RPC.
// Returned endpoints all have the Manager's unique RoutingID.
-func (m *manager) ListeningEndpoints() (out []naming.Endpoint) {
+func (m *manager) ListeningEndpoints() (out []naming.Endpoint, changed <-chan struct{}) {
if m.ls == nil {
- return nil
+ return nil, nil
}
m.ls.mu.Lock()
- out = make([]naming.Endpoint, len(m.ls.endpoints))
- copy(out, m.ls.endpoints)
+ out = make([]naming.Endpoint, len(m.ls.proxyEndpoints))
+ copy(out, m.ls.proxyEndpoints)
+ for _, epState := range m.ls.endpoints {
+ for _, ep := range epState.leps {
+ out = append(out, ep)
+ }
+ }
+ changed = m.ls.notifyWatchers
m.ls.mu.Unlock()
if len(out) == 0 {
out = append(out, &inaming.Endpoint{Protocol: bidi.Name, RID: m.rid})
}
- return out
+ return out, changed
}
// Accept blocks until a new Flow has been initiated by a remote process.
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 6a949f8..60f9a2a 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -31,11 +31,11 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- am := New(ctx, naming.FixedRoutingID(0x5555))
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111))
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
@@ -48,12 +48,12 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- am := New(ctx, naming.FixedRoutingID(0x5555))
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111))
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
// At first the cache should be empty.
if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
@@ -83,12 +83,12 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- am := New(ctx, naming.FixedRoutingID(0x5555))
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111))
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
// Now am should be able to make a flow to dm even though dm is not listening.
testFlows(t, ctx, am, dm, flowtest.AllowAllPeersAuthorizer{})
@@ -102,18 +102,18 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- am := New(ctx, naming.FixedRoutingID(0x5555))
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- nulldm := New(ctx, naming.NullRoutingID)
+ nulldm := New(ctx, naming.NullRoutingID, nil)
_, af := testFlows(t, ctx, nulldm, am, flowtest.AllowAllPeersAuthorizer{})
// Ensure that the remote blessings of the underlying conn of the accepted blessings
// only has the public key of the client and no certificates.
if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); len(rBlessings.String()) > 0 || rBlessings.PublicKey() == nil {
t.Errorf("got %v, want no-cert blessings", rBlessings)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111))
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
_, af = testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
// Ensure that the remote blessings of the underlying conn of the accepted flow are
// non-zero if we did specify a RoutingID.
@@ -131,16 +131,18 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := v23.Init()
- am := New(ctx, naming.FixedRoutingID(0x5555))
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111))
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
+ leps, _ := am.ListeningEndpoints()
+ lameEP := leps[0]
am.StopListening(ctx)
- if f, err := dm.Dial(ctx, am.ListeningEndpoints()[0], flowtest.AllowAllPeersAuthorizer{}); err == nil {
+ if f, err := dm.Dial(ctx, lameEP, flowtest.AllowAllPeersAuthorizer{}); err == nil {
t.Errorf("dialing a lame duck should fail, but didn't %#v.", f)
}
@@ -150,7 +152,8 @@
}
func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, auth flow.PeerAuthorizer) (df, af flow.Flow) {
- ep := am.ListeningEndpoints()[0]
+ eps, _ := am.ListeningEndpoints()
+ ep := eps[0]
var err error
df, err = dm.Dial(ctx, ep, auth)
if err != nil {
diff --git a/runtime/internal/lib/roaming/roaming.go b/runtime/internal/lib/roaming/roaming.go
new file mode 100644
index 0000000..c455332
--- /dev/null
+++ b/runtime/internal/lib/roaming/roaming.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package roaming
+
+import (
+ "net"
+
+ "v.io/x/lib/netconfig"
+ "v.io/x/lib/netstate"
+ "v.io/x/ref/lib/pubsub"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+)
+
+const (
+ RoamingSetting = "roaming"
+ RoamingSettingDesc = "pubsub stream used by the roaming RuntimeFactory"
+ UpdateAddrsSetting = "NewAddrs"
+ UpdateAddrsSettingDesc = "Addresses that have been available since last change"
+ RmAddrsSetting = "RmAddrs"
+ RmAddrsSettingDesc = "Addresses that have been removed since last change"
+)
+
+// CreateRoamingStream creates a stream that monitors network configuration
+// changes and publishes subsequent Settings to reflect any changes detected.
+// A synchronous shutdown function is returned.
+func CreateRoamingStream(ctx *context.T, publisher *pubsub.Publisher, listenSpec rpc.ListenSpec) (func(), error) {
+ ch := make(chan pubsub.Setting)
+ stop, err := publisher.CreateStream(RoamingSetting, RoamingSettingDesc, ch)
+ if err != nil {
+ return nil, err
+ }
+ prev, err := netstate.GetAccessibleIPs()
+ if err != nil {
+ return nil, err
+ }
+ watcher, err := netconfig.NewNetConfigWatcher()
+ if err != nil {
+ return nil, err
+ }
+ done := make(chan struct{})
+ go func() {
+ defer close(ch)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-done:
+ return
+ case <-stop:
+ return
+ case <-watcher.Channel():
+ netstate.InvalidateCache()
+ cur, err := netstate.GetAccessibleIPs()
+ if err != nil {
+ ctx.Errorf("failed to read network state: %v", err)
+ continue
+ }
+ removed := netstate.FindRemoved(prev, cur)
+ added := netstate.FindAdded(prev, cur)
+ ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
+ ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
+ ctx.VI(2).Infof("Added : %d: %s", len(added), added)
+ ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
+ if len(removed) > 0 {
+ ctx.VI(2).Infof("Sending removed: %s", removed)
+ ch <- NewRmAddrsSetting(removed.AsNetAddrs())
+ }
+ if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && len(chosen) > 0 {
+ ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
+ ch <- NewUpdateAddrsSetting(chosen)
+ }
+ prev = cur
+ }
+ }
+ }()
+ shutdown := func() {
+ close(done)
+ watcher.Stop()
+ }
+ return shutdown, nil
+}
+
+// ReadRoamingStream forks the RoamingSetting stream on pub and dispatches the
+// []net.Addr settings it reads until ctx is done, when the fork is closed.
+// 'remove' is called with the addresses carried by each RmAddrsSetting.
+// 'add' is called with the addresses carried by each UpdateAddrsSetting.
+func ReadRoamingStream(ctx *context.T, pub *pubsub.Publisher, remove, add func([]net.Addr)) {
+	ch := make(chan pubsub.Setting, 10)
+	_, err := pub.ForkStream(RoamingSetting, ch)
+	if err != nil {
+		ctx.Errorf("error forking stream: %v", err) // the verb was missing, so err was not formatted
+		return
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			if err := pub.CloseFork(RoamingSetting, ch); err == nil { // only drain once the fork is closed
+				drain(ch)
+			}
+			return
+		case setting := <-ch:
+			if setting == nil { // nothing to dispatch
+				continue
+			}
+			addrs, ok := setting.Value().([]net.Addr)
+			if !ok {
+				ctx.Errorf("unhandled setting type %T", setting.Value()) // log the type actually received
+				continue
+			}
+			switch setting.Name() { // other setting names are ignored
+			case UpdateAddrsSetting:
+				add(addrs)
+			case RmAddrsSetting:
+				remove(addrs)
+			}
+		}
+	}
+}
+
+func drain(ch chan pubsub.Setting) {
+ for {
+ select {
+ case <-ch:
+ default:
+ close(ch)
+ return
+ }
+ }
+}
+
+// NewUpdateAddrsSetting creates the Setting to be sent to Listen to inform
+// it of all addresses that have become available since the last change.
+func NewUpdateAddrsSetting(a []net.Addr) pubsub.Setting {
+ return pubsub.NewAny(UpdateAddrsSetting, UpdateAddrsSettingDesc, a)
+}
+
+// NewRmAddrsSetting creates the Setting to be sent to Listen to inform
+// it of addresses that are no longer available.
+func NewRmAddrsSetting(a []net.Addr) pubsub.Setting {
+ return pubsub.NewAny(RmAddrsSetting, RmAddrsSettingDesc, a)
+}
diff --git a/runtime/internal/rpc/benchmark/simple/main.go b/runtime/internal/rpc/benchmark/simple/main.go
index fb30f0a..1b450ac 100644
--- a/runtime/internal/rpc/benchmark/simple/main.go
+++ b/runtime/internal/rpc/benchmark/simple/main.go
@@ -56,7 +56,7 @@
b.ResetTimer()
for i := 0; i < b.N; i++ {
if ref.RPCTransitionState() >= ref.XServers {
- m := fmanager.New(nctx, naming.FixedRoutingID(0xc))
+ m := fmanager.New(nctx, naming.FixedRoutingID(0xc), nil)
b.StartTimer()
_, err := m.Dial(nctx, serverEP, flowtest.AllowAllPeersAuthorizer{})
if err != nil {
diff --git a/runtime/internal/rpc/pubsub.go b/runtime/internal/rpc/pubsub.go
deleted file mode 100644
index 03f1e67..0000000
--- a/runtime/internal/rpc/pubsub.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package rpc
-
-import (
- "net"
-
- "v.io/x/ref/lib/pubsub"
-)
-
-// NewNewAddrsSetting creates the Setting to be sent to Listen to inform
-// it of all addresses that habe become available since the last change.
-func NewNewAddrsSetting(a []net.Addr) pubsub.Setting {
- return pubsub.NewAny(NewAddrsSetting, NewAddrsSettingDesc, a)
-}
-
-// NewRmAddrsSetting creates the Setting to be sent to Listen to inform
-// it of addresses that are no longer available.
-func NewRmAddrsSetting(a []net.Addr) pubsub.Setting {
- return pubsub.NewAny(RmAddrsSetting, RmAddrsSettingDesc, a)
-}
-
-const (
- NewAddrsSetting = "NewAddrs"
- NewAddrsSettingDesc = "Addresses that have been available since last change"
- RmAddrsSetting = "RmAddrs"
- RmAddrsSettingDesc = "Addresses that have been removed since last change"
-)
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index d192ec9..e96ac7a 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -8,15 +8,20 @@
"net"
"reflect"
"sort"
+ "strings"
"testing"
"time"
"v.io/v23"
+ "v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/x/lib/netstate"
"v.io/x/lib/set"
+ "v.io/x/ref"
"v.io/x/ref/lib/pubsub"
+ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/runtime/internal/lib/roaming"
inaming "v.io/x/ref/runtime/internal/naming"
_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
@@ -26,7 +31,114 @@
"v.io/x/ref/test/testutil"
)
-// TODO(mattr): Transition this to using public API.
+// TestRoamingNew checks that a server created with WithNewServer reacts to
+// roaming settings published on the RoamingSetting stream: each published
+// change must close the Valid channel of the previous Status and be reflected
+// in the endpoints of the next Status.
+func TestRoamingNew(t *testing.T) {
+	if ref.RPCTransitionState() < ref.XServers {
+		t.Skip("This test only runs under the new rpc system.")
+	}
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+		return NewXClient(ctx, v23.GetNamespace(ctx), opts...)
+	})
+
+	publisher := pubsub.NewPublisher()
+	ch := make(chan pubsub.Setting)
+	stop, err := publisher.CreateStream(roaming.RoamingSetting, roaming.RoamingSettingDesc, ch)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() { publisher.Shutdown(); <-stop }()
+
+	ipv4And6 := netstate.AddressChooserFunc(func(network string, addrs []net.Addr) ([]net.Addr, error) {
+		accessible := netstate.ConvertToAddresses(addrs)
+		ipv4 := accessible.Filter(netstate.IsUnicastIPv4)
+		ipv6 := accessible.Filter(netstate.IsUnicastIPv6)
+		return append(ipv4.AsNetAddrs(), ipv6.AsNetAddrs()...), nil
+	})
+	spec := rpc.ListenSpec{
+		Addrs: rpc.ListenAddrs{
+			{"tcp", "*:0"}, // an invalid address.
+			{"tcp", ":0"},
+			{"tcp", ":0"},
+		},
+		AddressChooser: ipv4And6,
+	}
+	sctx := v23.WithListenSpec(ctx, spec)
+	sctx, _ = v23.WithPrincipal(sctx, testutil.NewPrincipal("test"))
+	sctx, server, err := WithNewServer(sctx, "foo", &testServer{}, nil, publisher)
+	if err != nil {
+		t.Fatal(err)
+	}
+	status := server.Status()
+	prevEps := status.Endpoints
+
+	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
+
+	change := status.Valid
+
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
+	// We should be notified of a network change.
+	<-change
+	status = server.Status()
+	eps := status.Endpoints
+	change = status.Valid
+	// We expect 4 new endpoints, 2 for each valid listen call.
+	if got, want := len(eps), len(prevEps)+4; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	// We expect the added networks to be in the new endpoints.
+	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 2; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1", got, want)
+	}
+	if got, want := len(filterEndpointsByHost(eps, "2.2.2.2")), 2; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 2.2.2.2", got, want)
+	}
+	prevEps = eps
+
+	// Now remove a network.
+	ch <- roaming.NewRmAddrsSetting([]net.Addr{n1})
+	<-change
+	status = server.Status()
+	eps = status.Endpoints
+	change = status.Valid
+	// We expect 2 endpoints to be missing.
+	if got, want := len(eps), len(prevEps)-2; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	// We expect the removed network to not be in the new endpoints.
+	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 0; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1", got, want)
+	}
+	prevEps = eps
+
+	// Now remove everything, essentially "disconnected from the network"
+	ch <- roaming.NewRmAddrsSetting(getIPAddrs(prevEps))
+	<-change
+	status = server.Status()
+	eps = status.Endpoints
+	change = status.Valid
+	// We expect there to be only the bidi endpoint. The network is only
+	// inspected once the length is known to be 1, so eps[0] cannot be out of
+	// range.
+	if got, want := len(eps), 1; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	} else if got, want := eps[0].Addr().Network(), "bidi"; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+
+	// Now if we reconnect to a network it should show up.
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1})
+	<-change
+	status = server.Status()
+	eps = status.Endpoints
+	// We expect 2 endpoints to be added
+	if got, want := len(eps), 2; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	// We expect the re-added network to be in the new endpoints.
+	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 2; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1", got, want)
+	}
+}
+
func TestRoaming(t *testing.T) {
ctx, shutdown := initForTest()
defer shutdown()
@@ -35,19 +147,19 @@
ns := tnaming.NewSimpleNamespace()
publisher := pubsub.NewPublisher()
- roaming := make(chan pubsub.Setting)
- stop, err := publisher.CreateStream("TestRoaming", "TestRoaming", roaming)
+ ch := make(chan pubsub.Setting)
+ stop, err := publisher.CreateStream("TestRoaming", "TestRoaming", ch)
if err != nil {
t.Fatal(err)
}
defer func() { publisher.Shutdown(); <-stop }()
nctx, _ := v23.WithPrincipal(ctx, testutil.NewPrincipal("test"))
- server, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestRoaming")
+ s, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestRoaming")
if err != nil {
t.Fatal(err)
}
- defer server.Stop()
+ defer s.Stop()
ipv4And6 := netstate.AddressChooserFunc(func(network string, addrs []net.Addr) ([]net.Addr, error) {
accessible := netstate.ConvertToAddresses(addrs)
@@ -64,7 +176,7 @@
AddressChooser: ipv4And6,
}
- eps, err := server.Listen(spec)
+ eps, err := s.Listen(spec)
if err != nil {
t.Fatal(err)
}
@@ -72,15 +184,16 @@
t.Fatal("no endpoints listened on.")
}
- if err = server.Serve("foo", &testServer{}, nil); err != nil {
+ if err = s.Serve("foo", &testServer{}, nil); err != nil {
t.Fatal(err)
}
setLeafEndpoints(eps)
- if err = server.AddName("bar"); err != nil {
+ if err = s.AddName("bar"); err != nil {
t.Fatal(err)
}
- status := server.Status()
+ status := s.Status()
+ netChange := status.Valid
if got, want := status.Endpoints, eps; !cmpEndpoints(got, want) {
t.Fatalf("got %v, want %v", got, want)
}
@@ -89,11 +202,11 @@
t.Fatalf("got %d, want %d", got, want)
}
- watcher := make(chan rpc.NetworkChange, 10)
- server.WatchNetwork(watcher)
+ watcher := make(chan NetworkChange, 10)
+ s.(*server).WatchNetwork(watcher)
defer close(watcher)
- waitForChange := func() *rpc.NetworkChange {
+ waitForChange := func() *NetworkChange {
ctx.Infof("Waiting on %p", watcher)
select {
case c := <-watcher:
@@ -108,7 +221,8 @@
n2 := netstate.NewNetAddr("ip", "2.2.2.2")
n3 := netstate.NewNetAddr("ip", "3.3.3.3")
- roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+ ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
+ <-netChange
// We expect 2 added addrs and 4 changes, one for each IP per usable listen spec addr.
change := waitForChange()
@@ -127,7 +241,8 @@
nepsA = append(nepsA, []naming.Endpoint{nep1, nep2}...)
}
- status = server.Status()
+ status = s.Status()
+ netChange = status.Valid
if got, want := status.Endpoints, nepsA; !cmpEndpoints(got, want) {
t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
}
@@ -140,9 +255,10 @@
}
// Mimic that n2 has been changed to n3. The network monitor will publish
- // two AddrsSettings - RmAddrsSetting(n2) and NewNewAddrsSetting(n1, n3).
+ // two AddrsSettings - RmAddrsSetting(n2) and NewUpdateAddrsSetting(n1, n3).
- roaming <- NewRmAddrsSetting([]net.Addr{n2})
+ ch <- roaming.NewRmAddrsSetting([]net.Addr{n2})
+ <-netChange
// We expect 1 removed addr and 2 changes, one for each usable listen spec addr.
change = waitForChange()
@@ -160,12 +276,14 @@
nepsR = append(nepsR, nep2)
}
- status = server.Status()
+ status = s.Status()
+ netChange = status.Valid
if got, want := status.Endpoints, nepsR; !cmpEndpoints(got, want) {
t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
}
- roaming <- NewNewAddrsSetting([]net.Addr{n1, n3})
+ ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n3})
+ <-netChange
// We expect 1 added addr and 2 changes, one for the new IP per usable listen spec addr.
change = waitForChange()
@@ -184,7 +302,8 @@
nepsC = append(nepsC, nep1, nep2)
}
- status = server.Status()
+ status = s.Status()
+ netChange = status.Valid
if got, want := status.Endpoints, nepsC; !cmpEndpoints(got, want) {
t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
}
@@ -197,7 +316,8 @@
}
// Remove all addresses to mimic losing all connectivity.
- roaming <- NewRmAddrsSetting(getIPAddrs(nepsC))
+ ch <- roaming.NewRmAddrsSetting(getIPAddrs(nepsC))
+ <-netChange
// We expect changes for all of the current endpoints
change = waitForChange()
@@ -205,12 +325,14 @@
t.Fatalf("got %d, want %d", got, want)
}
- status = server.Status()
+ status = s.Status()
+ netChange = status.Valid
if got, want := len(status.Mounts), 0; got != want {
t.Fatalf("got %d, want %d: %v", got, want, status.Mounts)
}
- roaming <- NewNewAddrsSetting([]net.Addr{n1})
+ ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1})
+ <-netChange
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
if got, want := len(change.Changed), 2; got != want {
@@ -226,30 +348,30 @@
ns := tnaming.NewSimpleNamespace()
publisher := pubsub.NewPublisher()
- roaming := make(chan pubsub.Setting)
- stop, err := publisher.CreateStream("TestWatcherDeadlock", "TestWatcherDeadlock", roaming)
+ ch := make(chan pubsub.Setting)
+ stop, err := publisher.CreateStream("TestWatcherDeadlock", "TestWatcherDeadlock", ch)
if err != nil {
t.Fatal(err)
}
defer func() { publisher.Shutdown(); <-stop }()
nctx, _ := v23.WithPrincipal(ctx, testutil.NewPrincipal("test"))
- server, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestWatcherDeadlock")
+ s, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestWatcherDeadlock")
if err != nil {
t.Fatal(err)
}
- defer server.Stop()
+ defer s.Stop()
spec := rpc.ListenSpec{
Addrs: rpc.ListenAddrs{
{"tcp", ":0"},
},
}
- eps, err := server.Listen(spec)
+ eps, err := s.Listen(spec)
if err != nil {
t.Fatal(err)
}
- if err = server.Serve("foo", &testServer{}, nil); err != nil {
+ if err = s.Serve("foo", &testServer{}, nil); err != nil {
t.Fatal(err)
}
setLeafEndpoints(eps)
@@ -257,17 +379,17 @@
// Set a watcher that we never read from - the intent is to make sure
// that the listener still listens to changes even though there is no
// goroutine to read from the watcher channel.
- watcher := make(chan rpc.NetworkChange, 0)
- server.WatchNetwork(watcher)
+ watcher := make(chan NetworkChange, 0)
+ s.(*server).WatchNetwork(watcher)
defer close(watcher)
// Remove all addresses to mimic losing all connectivity.
- roaming <- NewRmAddrsSetting(getIPAddrs(eps))
+ ch <- roaming.NewRmAddrsSetting(getIPAddrs(eps))
// Add in two new addresses
n1 := netstate.NewNetAddr("ip", "1.1.1.1")
n2 := netstate.NewNetAddr("ip", "2.2.2.2")
- roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+ ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
neps := make([]naming.Endpoint, 0, len(eps))
for _, p := range getUniqPorts(eps) {
@@ -277,7 +399,7 @@
}
then := time.Now()
for {
- status := server.Status()
+ status := s.Status()
if got, want := status.Endpoints, neps; cmpEndpoints(got, want) {
break
}
@@ -310,6 +432,16 @@
return addrs
}
+// filterEndpointsByHost returns, in their original order, the endpoints of eps
+// whose address string contains host. The result is nil when nothing matches.
+func filterEndpointsByHost(eps []naming.Endpoint, host string) []naming.Endpoint {
+	var matches []naming.Endpoint
+	for i := range eps {
+		if addr := eps[i].Addr().String(); !strings.Contains(addr, host) {
+			continue
+		}
+		matches = append(matches, eps[i])
+	}
+	return matches
+}
+
func cmpEndpoints(got, want []naming.Endpoint) bool {
if len(got) != len(want) {
return false
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 80fa2bf..7cb149d 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -34,6 +34,7 @@
"v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
"v.io/x/ref/runtime/internal/lib/publisher"
+ "v.io/x/ref/runtime/internal/lib/roaming"
inaming "v.io/x/ref/runtime/internal/naming"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/manager"
@@ -151,7 +152,17 @@
stream *pubsub.Stream
ch chan pubsub.Setting // channel to receive dhcp settings over
err error // error status.
- watchers map[chan<- rpc.NetworkChange]struct{}
+ watchers map[chan<- NetworkChange]struct{}
+ change chan struct{}
+}
+
+// NetworkChange describes a single change in the network configuration seen
+// by the server. Values of this type are delivered on the channels registered
+// with WatchNetwork.
+type NetworkChange struct {
+ Time time.Time // Time of last change.
+ State rpc.ServerState // The current state of the server.
+ AddedAddrs []net.Addr // Addresses added since the last change.
+ RemovedAddrs []net.Addr // Addresses removed since the last change.
+ Changed []naming.Endpoint // The set of endpoints added/removed as a result of this change.
+ Error error // Any error that was encountered.
}
type server struct {
@@ -340,6 +351,9 @@
status.ServesMountTable = s.servesMountTable
status.Mounts = s.publisher.Status()
status.Endpoints = []naming.Endpoint{}
+ if s.dhcpState != nil {
+ status.Valid = s.dhcpState.change
+ }
for ls, _ := range s.listenState {
if ls.eperr != nil {
status.Errors = append(status.Errors, ls.eperr)
@@ -362,7 +376,7 @@
return status
}
-func (s *server) WatchNetwork(ch chan<- rpc.NetworkChange) {
+func (s *server) WatchNetwork(ch chan<- NetworkChange) {
defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
s.Lock()
defer s.Unlock()
@@ -371,7 +385,7 @@
}
}
-func (s *server) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
+func (s *server) UnwatchNetwork(ch chan<- NetworkChange) {
defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
s.Lock()
defer s.Unlock()
@@ -519,7 +533,8 @@
dhcp := &dhcpState{
name: s.settingsName,
publisher: s.settingsPublisher,
- watchers: make(map[chan<- rpc.NetworkChange]struct{}),
+ watchers: make(map[chan<- NetworkChange]struct{}),
+ change: make(chan struct{}),
}
s.dhcpState = dhcp
dhcp.ch = make(chan pubsub.Setting, 10)
@@ -727,14 +742,14 @@
s.Unlock()
return
}
- change := rpc.NetworkChange{
+ change := NetworkChange{
Time: time.Now(),
State: externalStates[s.state],
}
switch setting.Name() {
- case NewAddrsSetting:
+ case roaming.UpdateAddrsSetting:
change.AddedAddrs, change.Changed = s.updateAddresses(v)
- case RmAddrsSetting:
+ case roaming.RmAddrsSetting:
change.RemovedAddrs, change.Changed = s.removeAddresses(v)
}
s.ctx.VI(2).Infof("rpc: dhcp: change %v", change)
@@ -744,6 +759,10 @@
default:
}
}
+ if s.dhcpState.change != nil {
+ close(s.dhcpState.change)
+ s.dhcpState.change = make(chan struct{})
+ }
s.Unlock()
default:
s.ctx.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
@@ -757,7 +776,6 @@
return host
}
return address.String()
-
}
// findEndpoint returns the index of the first endpoint in ieps with the given network address.
@@ -994,6 +1012,10 @@
if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
drain(dhcp.ch)
}
+ if dhcp.change != nil {
+ close(dhcp.change)
+ dhcp.change = nil
+ }
}
s.Unlock()
diff --git a/runtime/internal/rpc/test/full_test.go b/runtime/internal/rpc/test/full_test.go
index 5af791a..1bee64e 100644
--- a/runtime/internal/rpc/test/full_test.go
+++ b/runtime/internal/rpc/test/full_test.go
@@ -13,6 +13,7 @@
"io"
"net"
"reflect"
+ "strings"
"testing"
"time"
@@ -289,13 +290,19 @@
t.Fatal(err)
}
status := server.Status()
- if got, want := len(status.Endpoints), 0; got != want {
- t.Errorf("got %d, want %d", got, want)
+ if ref.RPCTransitionState() >= ref.XServers {
+ if got, want := len(status.Endpoints), 1; got != want {
+ t.Errorf("got %d, want %d", got, want)
+ }
+ } else {
+ if got, want := len(status.Endpoints), 0; got != want {
+ t.Errorf("got %d, want %d", got, want)
+ }
}
if got, want := len(status.Errors), 1; got != want {
t.Errorf("got %d, want %d", got, want)
}
- if got, want := status.Errors[0].Error(), "oops"; got != want {
+ if got, want := status.Errors[0].Error(), "oops"; !strings.Contains(got, want) {
t.Errorf("got %d, want %d", got, want)
}
}
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 5568371..0697ae0 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -30,7 +30,7 @@
var i uint64 = 1
ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
i++
- return manager.New(ctx, naming.FixedRoutingID(i))
+ return manager.New(ctx, naming.FixedRoutingID(i), nil)
})
ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
return NewXClient(ctx, v23.GetNamespace(ctx), o...)
@@ -39,7 +39,7 @@
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
})
- _, _, err := WithNewServer(ctx, "server", &testService{}, nil, nil, "")
+ _, _, err := WithNewServer(ctx, "server", &testService{}, nil, nil)
if err != nil {
t.Fatal(verror.DebugString(err))
}
@@ -64,7 +64,7 @@
var i uint64 = 1
ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
i++
- return manager.New(ctx, naming.FixedRoutingID(i))
+ return manager.New(ctx, naming.FixedRoutingID(i), nil)
})
ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
return NewXClient(ctx, v23.GetNamespace(ctx), o...)
@@ -73,7 +73,7 @@
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
})
- _, _, err := WithNewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
+ _, _, err := WithNewDispatchingServer(ctx, "server", &testDispatcher{}, nil)
if err != nil {
t.Fatal(verror.DebugString(err))
}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 0c78353..875e6fe 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -80,7 +80,7 @@
}
}
if c.flowMgr == nil {
- c.flowMgr = manager.New(ctx, naming.NullRoutingID)
+ c.flowMgr = manager.New(ctx, naming.NullRoutingID, nil)
}
go func() {
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 5b8a798..42ef6c4 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -7,7 +7,6 @@
import (
"fmt"
"io"
- "net"
"reflect"
"strings"
"sync"
@@ -26,7 +25,6 @@
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/v23/vtrace"
- "v.io/x/lib/netstate"
"v.io/x/ref/lib/apilog"
"v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
@@ -36,7 +34,6 @@
)
// TODO(mattr): add/removeAddresses
-// TODO(mattr): dhcpLoop
type xserver struct {
sync.Mutex
@@ -49,17 +46,13 @@
flowMgr flow.Manager
publisher publisher.Publisher // publisher to publish mounttable mounts.
settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
- settingsName string // pubwsub stream name for dhcp
- dhcpState *dhcpState // dhcpState, nil if not using dhcp
+ valid chan struct{}
blessings security.Blessings
typeCache *typeCache
- addressChooser rpc.AddressChooser
state rpc.ServerState // the current state of the server.
- chosenEndpoints map[string]*inaming.Endpoint // endpoints chosen by the addressChooser for publishing.
- protoEndpoints map[string]*inaming.Endpoint // endpoints that act as "template" endpoints.
- proxyEndpoints map[string]map[string]*inaming.Endpoint // keyed by ep.String()
- lnErrors map[string]error // erros from listening, keyed by ep.String()
+ endpoints map[string]*inaming.Endpoint // endpoints that the server is listening on.
+ lnErrors []error // errors from listening
disp rpc.Dispatcher // dispatcher to serve RPCs
dispReserved rpc.Dispatcher // dispatcher for reserved methods
@@ -68,13 +61,12 @@
servesMountTable bool
isLeaf bool
- // TODO(cnicolaou): add roaming stats to rpcStats
stats *rpcStats // stats for this server.
}
func WithNewServer(ctx *context.T,
name string, object interface{}, authorizer security.Authorizer,
- settingsPublisher *pubsub.Publisher, settingsName string,
+ settingsPublisher *pubsub.Publisher,
opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
if object == nil {
return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil object")
@@ -85,12 +77,12 @@
}
d := &leafDispatcher{invoker, authorizer}
opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...)
- return WithNewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
+ return WithNewDispatchingServer(ctx, name, d, settingsPublisher, opts...)
}
func WithNewDispatchingServer(ctx *context.T,
name string, dispatcher rpc.Dispatcher,
- settingsPublisher *pubsub.Publisher, settingsName string,
+ settingsPublisher *pubsub.Publisher,
opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
if dispatcher == nil {
return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
@@ -110,11 +102,11 @@
blessings: v23.GetPrincipal(rootCtx).BlessingStore().Default(),
stats: newRPCStats(statsPrefix),
settingsPublisher: settingsPublisher,
- settingsName: settingsName,
+ valid: make(chan struct{}),
disp: dispatcher,
typeCache: newTypeCache(),
- proxyEndpoints: make(map[string]map[string]*inaming.Endpoint),
state: rpc.ServerActive,
+ endpoints: make(map[string]*inaming.Endpoint),
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -136,7 +128,7 @@
}
}
- s.flowMgr = manager.NewWithBlessings(rootCtx, s.blessings, rid)
+ s.flowMgr = manager.NewWithBlessings(rootCtx, s.blessings, rid, settingsPublisher)
rootCtx, _, err = v23.WithNewClient(rootCtx,
clientFlowManagerOpt{s.flowMgr},
PreferredProtocols(s.preferredProtocols))
@@ -157,7 +149,7 @@
// TODO(mattr): We only call AddServer here, but if someone calls AddName
// later there will be no servers?
s.Lock()
- for k, _ := range s.chosenEndpoints {
+ for k, _ := range s.endpoints {
s.publisher.AddServer(k)
}
s.Unlock()
@@ -187,30 +179,6 @@
close(done)
}()
- s.Lock()
- // TODO(mattr): I don't understand what this is.
- if dhcp := s.dhcpState; dhcp != nil {
- // TODO(cnicolaou,caprita): investigate not having to close and drain
- // the channel here. It's a little awkward right now since we have to
- // be careful to not close the channel in two places, i.e. here and
- // and from the publisher's Shutdown method.
- if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
- drain:
- for {
- select {
- case v := <-dhcp.ch:
- if v == nil {
- break drain
- }
- default:
- close(dhcp.ch)
- break drain
- }
- }
- }
- }
- s.Unlock()
-
select {
case <-done:
case <-time.After(5 * time.Second): // TODO(mattr): This should be configurable.
@@ -222,6 +190,8 @@
// operations to a close.
s.cancel()
<-s.flowMgr.Closed()
+ close(s.valid)
+ s.valid = nil
// Now we really will wait forever. If this doesn't exit, there's something
// wrong with the users code.
<-done
@@ -237,35 +207,16 @@
status.ServesMountTable = s.servesMountTable
status.Mounts = s.publisher.Status()
s.Lock()
+ status.Valid = s.valid
status.State = s.state
- for _, e := range s.chosenEndpoints {
+ for _, e := range s.endpoints {
status.Endpoints = append(status.Endpoints, e)
}
- for _, err := range s.lnErrors {
- status.Errors = append(status.Errors, err)
- }
+ status.Errors = s.lnErrors
s.Unlock()
return status
}
-func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
- defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- s.Lock()
- defer s.Unlock()
- if s.dhcpState != nil {
- s.dhcpState.watchers[ch] = struct{}{}
- }
-}
-
-func (s *xserver) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
- defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- s.Lock()
- defer s.Unlock()
- if s.dhcpState != nil {
- delete(s.dhcpState.watchers, ch)
- }
-}
-
// resolveToEndpoint resolves an object name or address to an endpoint.
func (s *xserver) resolveToEndpoint(ctx *context.T, address string) (naming.Endpoint, error) {
var resolved *naming.MountEntry
@@ -297,75 +248,89 @@
return nil, verror.New(errFailedToResolveToEndpoint, s.ctx, address)
}
-// createEndpoints creates appropriate inaming.Endpoint instances for
-// all of the externally accessible network addresses that can be used
-// to reach this server.
-func (s *xserver) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
- iep, ok := lep.(*inaming.Endpoint)
- if !ok {
- return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
- }
- if !strings.HasPrefix(iep.Protocol, "tcp") &&
- !strings.HasPrefix(iep.Protocol, "ws") {
- // If not tcp, ws, or wsh, just return the endpoint we were given.
- return []*inaming.Endpoint{iep}, "", false, nil
- }
- host, port, err := net.SplitHostPort(iep.Address)
- if err != nil {
- return nil, "", false, err
- }
- addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
- if err != nil {
- return nil, port, false, err
- }
-
- ieps := make([]*inaming.Endpoint, 0, len(addrs))
- for _, addr := range addrs {
- n, err := inaming.NewEndpoint(lep.String())
- if err != nil {
- return nil, port, false, err
- }
- n.IsMountTable = s.servesMountTable
- n.IsLeaf = s.isLeaf
- n.Address = net.JoinHostPort(addr.String(), port)
- ieps = append(ieps, n)
- }
- return ieps, port, unspecified, nil
+// createEndpoint returns a copy of the manager-supplied endpoint lep with the
+// server's publishing information (IsMountTable and IsLeaf) filled in. lep must
+// be an *inaming.Endpoint; any other type makes the type assertion panic.
+func (s *xserver) createEndpoint(lep naming.Endpoint) *inaming.Endpoint {
+	sep := new(inaming.Endpoint)
+	*sep = *(lep.(*inaming.Endpoint))
+	sep.IsMountTable = s.servesMountTable
+	sep.IsLeaf = s.isLeaf
+	return sep
+}
-func (s *xserver) update(pep naming.Endpoint) func([]naming.Endpoint) {
- return func(leps []naming.Endpoint) {
- chosenEps := make(map[string]*inaming.Endpoint)
- pkey := pep.String()
- for _, ep := range leps {
- eps, _, _, _ := s.createEndpoints(ep, s.addressChooser)
- for _, cep := range eps {
- chosenEps[cep.String()] = cep
+// listen starts listening on the proxy named in listenSpec (if any) and on
+// every non-empty address in listenSpec.Addrs. Failures are logged and
+// appended to s.lnErrors rather than returned. It then populates the server's
+// endpoints once synchronously and starts the updateEndpointsLoop and
+// acceptLoop goroutines. The server lock is held for the duration of the call.
+func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) {
+ s.Lock()
+ defer s.Unlock()
+ if len(listenSpec.Proxy) > 0 {
+ ep, err := s.resolveToEndpoint(ctx, listenSpec.Proxy)
+ if err != nil {
+ s.ctx.VI(2).Infof("resolveToEndpoint(%q) failed: %v", listenSpec.Proxy, err)
+ } else {
+ err = s.flowMgr.ProxyListen(ctx, ep)
+ if err != nil {
+ s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", inaming.Network, ep, err)
}
- // TODO(suharshs): do protoEndpoints need to be handled here?
}
-
- // Endpoints to add and remove.
- s.Lock()
- oldEps := s.proxyEndpoints[pkey]
- s.proxyEndpoints[pkey] = chosenEps
- rmEps := setDiff(oldEps, chosenEps)
- addEps := setDiff(chosenEps, oldEps)
- for k := range rmEps {
- delete(s.chosenEndpoints, k)
- }
- for k, ep := range addEps {
- s.chosenEndpoints[k] = ep
- }
- s.Unlock()
-
- for k := range rmEps {
- s.publisher.RemoveServer(k)
- }
- for k := range addEps {
- s.publisher.AddServer(k)
+ if err != nil {
+ s.lnErrors = append(s.lnErrors, err)
}
}
+ for _, addr := range listenSpec.Addrs {
+ if len(addr.Address) > 0 {
+ err := s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
+ if err != nil {
+ s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, err)
+ s.lnErrors = append(s.lnErrors, err)
+ }
+ }
+ }
+
+ // We call updateEndpointsLocked in serial once to populate our endpoints for
+ // server status with at least one endpoint.
+ leps, _ := s.flowMgr.ListeningEndpoints()
+ s.updateEndpointsLocked(leps)
+ // Account for the two goroutines started below.
+ s.active.Add(2)
+ go s.updateEndpointsLoop()
+ go s.acceptLoop(ctx)
+}
+
+// updateEndpointsLoop keeps the server's endpoints in sync with the flow
+// manager's listening endpoints. After each refresh it closes the current
+// s.valid channel (when non-nil) and installs a fresh one, then waits for a
+// receive on the flow manager's change channel before fetching the endpoints
+// again. The loop ends once ListeningEndpoints returns a nil change channel.
+func (s *xserver) updateEndpointsLoop() {
+	defer s.active.Done()
+	leps, changed := s.flowMgr.ListeningEndpoints()
+	for {
+		if changed == nil {
+			return
+		}
+		s.Lock()
+		s.updateEndpointsLocked(leps)
+		if s.valid != nil {
+			close(s.valid)
+			s.valid = make(chan struct{})
+		}
+		s.Unlock()
+		<-changed
+		leps, changed = s.flowMgr.ListeningEndpoints()
+	}
+}
+
+// updateEndpointsLocked replaces the contents of s.endpoints with the server
+// endpoints derived from leps and tells the publisher which servers were
+// removed and which were added. It must be called with the server lock held;
+// the lock is released while the publisher is updated and re-acquired before
+// returning.
+func (s *xserver) updateEndpointsLocked(leps []naming.Endpoint) {
+	current := make(map[string]*inaming.Endpoint)
+	for _, lep := range leps {
+		sep := s.createEndpoint(lep)
+		current[sep.String()] = sep
+	}
+	// Endpoints to add and remove.
+	removed := setDiff(s.endpoints, current)
+	added := setDiff(current, s.endpoints)
+	for key := range removed {
+		delete(s.endpoints, key)
+	}
+	for key, sep := range added {
+		s.endpoints[key] = sep
+	}
+
+	s.Unlock()
+	for key := range removed {
+		s.publisher.RemoveServer(key)
+	}
+	for key := range added {
+		s.publisher.AddServer(key)
+	}
+	s.Lock()
+}
// setDiff returns the endpoints in a that are not in b.
@@ -379,62 +344,6 @@
return ret
}
-func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) {
- s.Lock()
- defer s.Unlock()
- var lastErr error
- var ep naming.Endpoint
- if len(listenSpec.Proxy) > 0 {
- ep, lastErr = s.resolveToEndpoint(ctx, listenSpec.Proxy)
- if lastErr != nil {
- s.ctx.VI(2).Infof("resolveToEndpoint(%q) failed: %v", listenSpec.Proxy, lastErr)
- } else {
- lastErr = s.flowMgr.ProxyListen(ctx, ep, s.update(ep))
- if lastErr != nil {
- s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", inaming.Network, ep, lastErr)
- }
- }
- }
- for _, addr := range listenSpec.Addrs {
- if len(addr.Address) > 0 {
- lastErr = s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
- if lastErr != nil {
- s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
- }
- }
- }
-
- leps := s.flowMgr.ListeningEndpoints()
- s.addressChooser = listenSpec.AddressChooser
- roaming := false
- chosenEps := make(map[string]*inaming.Endpoint)
- protoEps := make(map[string]*inaming.Endpoint)
- lnErrs := make(map[string]error)
- for _, ep := range leps {
- eps, _, eproaming, eperr := s.createEndpoints(ep, listenSpec.AddressChooser)
- for _, cep := range eps {
- chosenEps[cep.String()] = cep
- }
- if eproaming && eperr == nil {
- protoEps[ep.String()] = ep.(*inaming.Endpoint)
- roaming = true
- }
- if eperr != nil {
- lnErrs[ep.String()] = eperr
- }
- }
- s.chosenEndpoints = chosenEps
- s.protoEndpoints = protoEps
- s.lnErrors = lnErrs
-
- if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
- // TODO(mattr): Support roaming.
- }
-
- s.active.Add(1)
- go s.acceptLoop(ctx)
-}
-
func (s *xserver) acceptLoop(ctx *context.T) error {
var calls sync.WaitGroup
defer func() {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 9dd8a22..78c4bdb 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -512,10 +512,11 @@
if err != nil {
return nil, err
}
- return manager.New(ctx, rid), nil
+ id, _ := ctx.Value(initKey).(*initData)
+ return manager.New(ctx, rid, id.settingsPublisher), nil
}
-func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, string, []rpc.ServerOpt, error) {
+func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, []rpc.ServerOpt, error) {
otherOpts := append([]rpc.ServerOpt{}, opts...)
if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
@@ -526,17 +527,17 @@
if id.protocols != nil {
otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
}
- return id.settingsPublisher, id.settingsName, otherOpts, nil
+ return id.settingsPublisher, otherOpts, nil
}
func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if ref.RPCTransitionState() >= ref.XServers {
- spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ spub, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
return ctx, nil, err
}
- newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, sname, opts...)
+ newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, opts...)
if err != nil {
return ctx, nil, err
}
@@ -563,11 +564,11 @@
func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
if ref.RPCTransitionState() >= ref.XServers {
- spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+ spub, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
return ctx, nil, err
}
- newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, sname, opts...)
+ newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, opts...)
if err != nil {
return ctx, nil, err
}
diff --git a/runtime/internal/rt/security.go b/runtime/internal/rt/security.go
index 35d9427..c637ee1 100644
--- a/runtime/internal/rt/security.go
+++ b/runtime/internal/rt/security.go
@@ -80,7 +80,13 @@
} else {
name = "anonymous"
}
- if host, _ := os.Hostname(); len(host) > 0 {
+ host, _ := os.Hostname()
+ if host == "(none)" {
+ // "(none)" is a common default hostname, but it contains parentheses,
+ // which are not valid characters in a blessing name.
+ host = "anonymous"
+ }
+ if len(host) > 0 {
name = name + "@" + host
}
return fmt.Sprintf("%s-%d", name, os.Getpid())
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index b1eaa78..a6314d2 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -58,12 +58,12 @@
}
// Wait for the server to finish listening through the proxy.
eps := s.Status().Endpoints
- for ; len(eps) < 2 || eps[1].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
+ for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
time.Sleep(pollTime)
}
var got string
- if err := v23.GetClient(ctx).Call(ctx, eps[1].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+ if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
t.Fatal(err)
}
if want := "response:hello"; got != want {
@@ -147,13 +147,13 @@
}
// Wait for the server to finish listening through the proxy.
eps := s.Status().Endpoints
- for ; len(eps) < 2 || eps[1].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
+ for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
time.Sleep(pollTime)
}
// The call should succeed which means that the client did not try to authorize
// the proxy's blessings.
var got string
- if err := v23.GetClient(ctx).Call(ctx, eps[1].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+ if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
t.Fatal(err)
}
if want := "response:hello"; got != want {
@@ -179,19 +179,19 @@
pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
- done := make(chan struct{})
- update := func(eps []naming.Endpoint) {
- if len(eps) > 0 {
+ if err := am.ProxyListen(ctx, pep); err != nil {
+ t.Fatal(err)
+ }
+ for {
+ eps, changed := am.ListeningEndpoints()
+ if eps[0].Addr().Network() != bidiProtocol {
if err := testEndToEndConnection(t, ctx, dm, am, eps[0]); err != nil {
t.Error(err)
}
- close(done)
+ return
}
+ <-changed
}
- if err := am.ProxyListen(ctx, pep, update); err != nil {
- t.Fatal(err)
- }
- <-done
}
func TestMultipleProxies(t *testing.T) {
@@ -215,27 +215,26 @@
p3ep := startProxy(t, ctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
- done := make(chan struct{})
- update := func(eps []naming.Endpoint) {
+ if err := am.ProxyListen(ctx, p3ep); err != nil {
+ t.Fatal(err)
+ }
+
+ for {
+ eps, changed := am.ListeningEndpoints()
// TODO(suharshs): Fix this test once we have the proxy send update messages to the
// server when it reconnects to a proxy. This test only really tests the first connection
// currently because the connections are cached. So we need to kill connections and
// wait for them to reestablish but we need proxies to update communicate their new endpoints
// to each other and to the server. For now we at least check a random endpoint so the
// test will at least fail over many runs if something is wrong.
- if len(eps) > 0 {
+ if eps[0].Addr().Network() != bidiProtocol {
if err := testEndToEndConnection(t, ctx, dm, am, eps[rand.Int()%3]); err != nil {
t.Error(err)
}
- close(done)
+ return
}
+ <-changed
}
-
- if err := am.ProxyListen(ctx, p3ep, update); err != nil {
- t.Fatal(err)
- }
-
- <-done
}
func testEndToEndConnection(t *testing.T, ctx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index d14b2f2..ebf4703 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -49,7 +49,9 @@
}
func (p *proxy) ListeningEndpoints() []naming.Endpoint {
- return p.m.ListeningEndpoints()
+ // TODO(suharshs): Return the changed channel here as well; the second result of p.m.ListeningEndpoints() is discarded for now.
+ eps, _ := p.m.ListeningEndpoints()
+ return eps
}
func (p *proxy) MultipleProxyEndpoints() []naming.Endpoint {
@@ -165,7 +167,7 @@
func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
p.mu.Lock()
- eps := p.m.ListeningEndpoints()
+ eps, _ := p.m.ListeningEndpoints()
for _, peps := range p.proxyEndpoints {
eps = append(eps, peps...)
}