v23: break dependency on lib/pubsub
MultiPart: 2/2
Change-Id: Ib989867e978a27a7a892b9f922b76cb9a3a442a0
diff --git a/profiles/chrome/chromeinit.go b/profiles/chrome/chromeinit.go
index 6445080..ccbfc83 100644
--- a/profiles/chrome/chromeinit.go
+++ b/profiles/chrome/chromeinit.go
@@ -37,7 +37,7 @@
protocols := []string{"wsh", "ws"}
listenSpec := rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "ws", Address: ""}}}
- runtime, ctx, shutdown, err := grt.Init(ctx, nil, protocols, &listenSpec, commonFlags.RuntimeFlags(), nil)
+ runtime, ctx, shutdown, err := grt.Init(ctx, nil, protocols, &listenSpec, nil, "", commonFlags.RuntimeFlags(), nil)
if err != nil {
return nil, nil, shutdown, err
}
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 73a675f..d71409d 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -63,7 +63,7 @@
}
}
- runtime, ctx, shutdown, err := grt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), nil)
+ runtime, ctx, shutdown, err := grt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), nil)
if err != nil {
return nil, nil, shutdown, err
}
diff --git a/profiles/genericinit.go b/profiles/genericinit.go
index b04189e..484f621 100644
--- a/profiles/genericinit.go
+++ b/profiles/genericinit.go
@@ -49,6 +49,8 @@
ac,
nil,
&listenSpec,
+ nil,
+ "",
commonFlags.RuntimeFlags(),
nil)
if err != nil {
diff --git a/profiles/internal/rpc/full_test.go b/profiles/internal/rpc/full_test.go
index ed52512..ad89931 100644
--- a/profiles/internal/rpc/full_test.go
+++ b/profiles/internal/rpc/full_test.go
@@ -19,6 +19,10 @@
"testing"
"time"
+ "v.io/x/lib/netstate"
+ "v.io/x/lib/pubsub"
+ "v.io/x/lib/vlog"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/namespace"
@@ -31,8 +35,7 @@
"v.io/v23/vdl"
"v.io/v23/verror"
"v.io/v23/vtrace"
- "v.io/x/lib/netstate"
- "v.io/x/lib/vlog"
+
"v.io/x/ref/lib/stats"
"v.io/x/ref/profiles/internal/lib/publisher"
"v.io/x/ref/profiles/internal/lib/websocket"
@@ -75,12 +78,16 @@
c.Unlock()
}
-func testInternalNewServer(ctx *context.T, streamMgr stream.Manager, ns namespace.T, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
+func testInternalNewServerWithPubsub(ctx *context.T, streamMgr stream.Manager, ns namespace.T, settingsPublisher *pubsub.Publisher, settingsStreamName string, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
client, err := InternalNewClient(streamMgr, ns)
if err != nil {
return nil, err
}
- return InternalNewServer(ctx, streamMgr, ns, client, principal, opts...)
+ return InternalNewServer(ctx, streamMgr, ns, settingsPublisher, settingsStreamName, client, principal, opts...)
+}
+
+func testInternalNewServer(ctx *context.T, streamMgr stream.Manager, ns namespace.T, principal security.Principal, opts ...rpc.ServerOpt) (rpc.Server, error) {
+ return testInternalNewServerWithPubsub(ctx, streamMgr, ns, nil, "", principal, opts...)
}
type userType string
diff --git a/profiles/internal/rpc/pubsub.go b/profiles/internal/rpc/pubsub.go
new file mode 100644
index 0000000..7bfe46c
--- /dev/null
+++ b/profiles/internal/rpc/pubsub.go
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package rpc
+
+import (
+ "net"
+
+ "v.io/x/lib/pubsub"
+)
+
+// NewAddAddrsSetting creates the Setting to be sent to Listen to inform
+// it of new addresses that have become available since the last change.
+func NewAddAddrsSetting(a []net.Addr) pubsub.Setting {
+ return pubsub.NewAny(NewAddrsSetting, NewAddrsSettingDesc, a)
+}
+
+// NewRmAddrsSetting creates the Setting to be sent to Listen to inform
+// it of addresses that are no longer available.
+func NewRmAddrsSetting(a []net.Addr) pubsub.Setting {
+ return pubsub.NewAny(RmAddrsSetting, RmAddrsSettingDesc, a)
+}
+
+const (
+ NewAddrsSetting = "NewAddrs"
+ NewAddrsSettingDesc = "New Addresses discovered since last change"
+ RmAddrsSetting = "RmAddrs"
+ RmAddrsSettingDesc = "Addresses that have been removed since last change"
+)
diff --git a/profiles/internal/rpc/resolve_test.go b/profiles/internal/rpc/resolve_test.go
index 20bf957..e0f6878 100644
--- a/profiles/internal/rpc/resolve_test.go
+++ b/profiles/internal/rpc/resolve_test.go
@@ -50,6 +50,8 @@
ac,
nil,
&listenSpec,
+ nil,
+ "",
commonFlags.RuntimeFlags(),
nil)
if err != nil {
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index 3516796..102f82a 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -83,15 +83,17 @@
type server struct {
sync.Mutex
// context used by the server to make internal RPCs, error messages etc.
- ctx *context.T
- cancel context.CancelFunc // function to cancel the above context.
- state serverState // track state of the server.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts for Listen.
- dhcpState *dhcpState // dhcpState, nil if not using dhcp
- principal security.Principal
- blessings security.Blessings
+ ctx *context.T
+ cancel context.CancelFunc // function to cancel the above context.
+ state serverState // track state of the server.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts for Listen.
+ settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
+ settingsName string // pubwsub stream name for dhcp
+ dhcpState *dhcpState // dhcpState, nil if not using dhcp
+ principal security.Principal
+ blessings security.Blessings
// maps that contain state on listeners.
listenState map[*listenState]struct{}
@@ -173,6 +175,8 @@
ctx *context.T,
streamMgr stream.Manager,
ns namespace.T,
+ settingsPublisher *pubsub.Publisher,
+ settingsName string,
client rpc.Client,
principal security.Principal,
opts ...rpc.ServerOpt) (rpc.Server, error) {
@@ -180,18 +184,20 @@
ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- cancel: cancel,
- streamMgr: streamMgr,
- principal: principal,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listenState: make(map[*listenState]struct{}),
- listeners: make(map[stream.Listener]struct{}),
- proxies: make(map[string]proxyState),
- stoppedChan: make(chan struct{}),
- ipNets: ipNetworks(),
- ns: ns,
- stats: newRPCStats(statsPrefix),
+ ctx: ctx,
+ cancel: cancel,
+ streamMgr: streamMgr,
+ principal: principal,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listenState: make(map[*listenState]struct{}),
+ listeners: make(map[stream.Listener]struct{}),
+ proxies: make(map[string]proxyState),
+ stoppedChan: make(chan struct{}),
+ ipNets: ipNetworks(),
+ ns: ns,
+ stats: newRPCStats(statsPrefix),
+ settingsPublisher: settingsPublisher,
+ settingsName: settingsName,
}
var (
dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
@@ -412,11 +418,11 @@
return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
}
- if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
+ if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
// Create a dhcp listener if we haven't already done so.
dhcp := &dhcpState{
- name: listenSpec.StreamName,
- publisher: listenSpec.StreamPublisher,
+ name: s.settingsName,
+ publisher: s.settingsPublisher,
watchers: make(map[chan<- rpc.NetworkChange]struct{}),
}
s.dhcpState = dhcp
@@ -624,20 +630,17 @@
s.Unlock()
return
}
- var err error
- var changed []naming.Endpoint
- switch setting.Name() {
- case rpc.NewAddrsSetting:
- changed = s.addAddresses(v)
- case rpc.RmAddrsSetting:
- changed, err = s.removeAddresses(v)
- }
change := rpc.NetworkChange{
- Time: time.Now(),
- State: externalStates[s.state],
- Setting: setting,
- Changed: changed,
- Error: err,
+ Time: time.Now(),
+ State: externalStates[s.state],
+ }
+ switch setting.Name() {
+ case NewAddrsSetting:
+ change.Changed = s.addAddresses(v)
+ change.AddedAddrs = v
+ case RmAddrsSetting:
+ change.Changed, change.Error = s.removeAddresses(v)
+ change.RemovedAddrs = v
}
vlog.VI(2).Infof("rpc: dhcp: change %v", change)
for ch, _ := range s.dhcpState.watchers {
@@ -703,14 +706,11 @@
// to ensure that those addresses are externally reachable.
func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
var added []naming.Endpoint
- vlog.Infof("HERE WITH %v -> %v", addrs, netstate.ConvertToAddresses(addrs))
for _, address := range netstate.ConvertToAddresses(addrs) {
if !netstate.IsAccessibleIP(address) {
- vlog.Infof("RETURN A %v", added)
return added
}
host := getHost(address)
- vlog.Infof("LISTEN ST: %v", s.listenState)
for ls, _ := range s.listenState {
if ls != nil && ls.roaming {
niep := ls.protoIEP
@@ -724,7 +724,6 @@
}
}
}
- vlog.Infof("RETURN B %v", added)
return added
}
diff --git a/profiles/internal/rpc/server_test.go b/profiles/internal/rpc/server_test.go
index c23b3bd..900db9a 100644
--- a/profiles/internal/rpc/server_test.go
+++ b/profiles/internal/rpc/server_test.go
@@ -424,12 +424,6 @@
ns := tnaming.NewSimpleNamespace()
ctx, shutdown := initForTest()
defer shutdown()
- server, err := testInternalNewServer(ctx, sm, ns, testutil.NewPrincipal("test"))
- defer server.Stop()
-
- if err != nil {
- t.Fatal(err)
- }
publisher := pubsub.NewPublisher()
roaming := make(chan pubsub.Setting)
@@ -439,6 +433,12 @@
}
defer func() { publisher.Shutdown(); <-stop }()
+ server, err := testInternalNewServerWithPubsub(ctx, sm, ns, publisher, "TestRoaming", testutil.NewPrincipal("test"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+
ipv4And6 := func(network string, addrs []net.Addr) ([]net.Addr, error) {
accessible := netstate.ConvertToAddresses(addrs)
ipv4 := accessible.Filter(netstate.IsUnicastIPv4)
@@ -451,9 +451,7 @@
{"tcp", ":0"},
{"tcp", ":0"},
},
- StreamName: "TestRoaming",
- StreamPublisher: publisher,
- AddressChooser: ipv4And6,
+ AddressChooser: ipv4And6,
}
eps, err := server.Listen(spec)
@@ -488,7 +486,7 @@
server.WatchNetwork(watcher)
defer close(watcher)
- roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1, n2})
+ roaming <- NewAddAddrsSetting([]net.Addr{n1, n2})
waitForChange := func() *rpc.NetworkChange {
vlog.Infof("Waiting on %p", watcher)
@@ -527,7 +525,7 @@
t.Fatalf("got %d, want %d", got, want)
}
- roaming <- rpc.NewRmAddrsSetting([]net.Addr{n1})
+ roaming <- NewRmAddrsSetting([]net.Addr{n1})
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
@@ -548,7 +546,7 @@
}
// Remove all addresses to mimic losing all connectivity.
- roaming <- rpc.NewRmAddrsSetting(getIPAddrs(nepsR))
+ roaming <- NewRmAddrsSetting(getIPAddrs(nepsR))
// We expect changes for all of the current endpoints
change = waitForChange()
@@ -561,7 +559,7 @@
t.Fatalf("got %d, want %d: %v", got, want, status.Mounts)
}
- roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1})
+ roaming <- NewAddAddrsSetting([]net.Addr{n1})
// We expect 2 changes, one for each usable listen spec addr.
change = waitForChange()
if got, want := len(change.Changed), 2; got != want {
@@ -576,12 +574,6 @@
ns := tnaming.NewSimpleNamespace()
ctx, shutdown := initForTest()
defer shutdown()
- server, err := testInternalNewServer(ctx, sm, ns, testutil.NewPrincipal("test"))
- defer server.Stop()
-
- if err != nil {
- t.Fatal(err)
- }
publisher := pubsub.NewPublisher()
roaming := make(chan pubsub.Setting)
@@ -591,12 +583,16 @@
}
defer func() { publisher.Shutdown(); <-stop }()
+ server, err := testInternalNewServerWithPubsub(ctx, sm, ns, publisher, "TestWatcherDeadlock", testutil.NewPrincipal("test"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+
spec := rpc.ListenSpec{
Addrs: rpc.ListenAddrs{
{"tcp", ":0"},
},
- StreamName: "TestWatcherDeadlock",
- StreamPublisher: publisher,
}
eps, err := server.Listen(spec)
if err != nil {
@@ -615,12 +611,12 @@
defer close(watcher)
// Remove all addresses to mimic losing all connectivity.
- roaming <- rpc.NewRmAddrsSetting(getIPAddrs(eps))
+ roaming <- NewRmAddrsSetting(getIPAddrs(eps))
// Add in two new addresses
n1 := netstate.NewNetAddr("ip", "1.1.1.1")
n2 := netstate.NewNetAddr("ip", "2.2.2.2")
- roaming <- rpc.NewAddAddrsSetting([]net.Addr{n1, n2})
+ roaming <- NewAddAddrsSetting([]net.Addr{n1, n2})
neps := make([]naming.Endpoint, 0, len(eps))
for _, p := range getUniqPorts(eps) {
diff --git a/profiles/internal/rpc/test/proxy_test.go b/profiles/internal/rpc/test/proxy_test.go
index bd326f5..e6bd616 100644
--- a/profiles/internal/rpc/test/proxy_test.go
+++ b/profiles/internal/rpc/test/proxy_test.go
@@ -191,7 +191,7 @@
}
defer client.Close()
serverCtx, _ := v23.WithPrincipal(ctx, pserver)
- server, err := irpc.InternalNewServer(serverCtx, smserver, ns, nil, pserver)
+ server, err := irpc.InternalNewServer(serverCtx, smserver, ns, nil, "", nil, pserver)
if err != nil {
t.Fatal(err)
}
diff --git a/profiles/internal/rt/runtime.go b/profiles/internal/rt/runtime.go
index f0ec69f..bc9c3af 100644
--- a/profiles/internal/rt/runtime.go
+++ b/profiles/internal/rt/runtime.go
@@ -13,6 +13,8 @@
"syscall"
"time"
+ "v.io/x/lib/pubsub"
+
"v.io/v23"
"v.io/v23/context"
"v.io/v23/i18n"
@@ -52,9 +54,11 @@
)
type initData struct {
- appCycle v23.AppCycle
- listenSpec *rpc.ListenSpec
- protocols []string
+ appCycle v23.AppCycle
+ listenSpec *rpc.ListenSpec
+ protocols []string
+ settingsPublisher *pubsub.Publisher
+ settingsName string
}
type vtraceDependency struct{}
@@ -71,14 +75,18 @@
appCycle v23.AppCycle,
protocols []string,
listenSpec *rpc.ListenSpec,
+ settingsPublisher *pubsub.Publisher,
+ settingsName string,
flags flags.RuntimeFlags,
reservedDispatcher rpc.Dispatcher) (*Runtime, *context.T, v23.Shutdown, error) {
r := &Runtime{deps: dependency.NewGraph()}
ctx = context.WithValue(ctx, initKey, &initData{
- protocols: protocols,
- listenSpec: listenSpec,
- appCycle: appCycle,
+ protocols: protocols,
+ listenSpec: listenSpec,
+ appCycle: appCycle,
+ settingsPublisher: settingsPublisher,
+ settingsName: settingsName,
})
if reservedDispatcher != nil {
@@ -236,7 +244,7 @@
Blessings: principal.BlessingStore().Default(),
})
}
- server, err := irpc.InternalNewServer(ctx, sm, ns, r.GetClient(ctx), principal, otherOpts...)
+ server, err := irpc.InternalNewServer(ctx, sm, ns, id.settingsPublisher, id.settingsName, r.GetClient(ctx), principal, otherOpts...)
if err != nil {
return nil, err
}
diff --git a/profiles/internal/rt/runtime_test.go b/profiles/internal/rt/runtime_test.go
index cf6706d..f4cded9 100644
--- a/profiles/internal/rt/runtime_test.go
+++ b/profiles/internal/rt/runtime_test.go
@@ -20,7 +20,7 @@
// InitForTest creates a context for use in a test.
func InitForTest(t *testing.T) (*rt.Runtime, *context.T, v23.Shutdown) {
ctx, cancel := context.RootContext()
- r, ctx, shutdown, err := rt.Init(ctx, nil, nil, nil, flags.RuntimeFlags{}, nil)
+ r, ctx, shutdown, err := rt.Init(ctx, nil, nil, nil, nil, "", flags.RuntimeFlags{}, nil)
if err != nil {
t.Fatal(err)
}
diff --git a/profiles/roaming/.api b/profiles/roaming/.api
new file mode 100644
index 0000000..c15af37
--- /dev/null
+++ b/profiles/roaming/.api
@@ -0,0 +1,4 @@
+pkg roaming, const SettingsStreamDesc ideal-string
+pkg roaming, const SettingsStreamName ideal-string
+pkg roaming, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
+pkg roaming, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index 8ec4482..c6b37f6 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -31,6 +31,7 @@
"v.io/x/ref/profiles/internal"
"v.io/x/ref/profiles/internal/lib/appcycle"
"v.io/x/ref/profiles/internal/lib/websocket"
+ irpc "v.io/x/ref/profiles/internal/rpc"
_ "v.io/x/ref/profiles/internal/rpc/protocols/tcp"
_ "v.io/x/ref/profiles/internal/rpc/protocols/ws"
_ "v.io/x/ref/profiles/internal/rpc/protocols/wsh"
@@ -40,6 +41,7 @@
const (
SettingsStreamName = "roaming"
+ SettingsStreamDesc = "pubsub stream used by the roaming profile"
)
var commonFlags *flags.Flags
@@ -74,7 +76,7 @@
// flag to configure both the protocol and address.
return []net.Addr{netstate.NewNetAddr("wsh", addr.String())}, nil
}
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
@@ -91,7 +93,8 @@
// Create stream in Init function to avoid a race between any
// goroutines started here and consumers started after Init returns.
ch := make(chan pubsub.Setting)
- stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
+ // TODO(cnicolaou): use stop to shutdown this stream when the profile shutdowns.
+ stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
if err != nil {
ac.Shutdown()
return nil, nil, nil, err
@@ -113,11 +116,9 @@
cleanupCh := make(chan struct{})
watcherCh := make(chan struct{})
- listenSpec.StreamPublisher = publisher
- listenSpec.StreamName = SettingsStreamName
listenSpec.AddressChooser = internal.IPAddressChooser
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
@@ -170,12 +171,12 @@
}
if len(removed) > 0 {
vlog.VI(2).Infof("Sending removed: %s", removed)
- ch <- rpc.NewRmAddrsSetting(removed.AsNetAddrs())
+ ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
}
// We will always send the best currently available address
if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
vlog.VI(2).Infof("Sending added and chosen: %s", chosen)
- ch <- rpc.NewAddAddrsSetting(chosen)
+ ch <- irpc.NewAddAddrsSetting(chosen)
} else {
vlog.VI(2).Infof("Ignoring added %s", added)
}
diff --git a/profiles/static/.api b/profiles/static/.api
new file mode 100644
index 0000000..2d3ebb3
--- /dev/null
+++ b/profiles/static/.api
@@ -0,0 +1,2 @@
+pkg static, func Init(*context.T) (v23.Runtime, *context.T, v23.Shutdown, error)
+pkg static, func NewProxy(*context.T, rpc.ListenSpec, ...string) (func(), naming.Endpoint, error)
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 339e32a..dc93eae 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -57,7 +57,7 @@
listenSpec.AddressChooser = func(string, []net.Addr) ([]net.Addr, error) {
return []net.Addr{addr}, nil
}
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, nil, err
}
@@ -70,7 +70,7 @@
}
listenSpec.AddressChooser = internal.IPAddressChooser
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}