flow: Eliminate manger.NewWithBlessings. We don't really need it.
Change-Id: I9d166870a76174180b36d1ca4f85c160909db4f5
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index fa7a885..02b7f40 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -74,9 +74,20 @@
roaming bool
}
-func NewWithBlessings(ctx *context.T, serverBlessings security.Blessings, rid naming.RoutingID,
- serverAuthorizedPeers []security.BlessingPattern, dhcpPublisher *pubsub.Publisher,
- channelTimeout time.Duration, idleExpiry time.Duration) flow.Manager {
+// New creates a new flow manager.
+func New(
+ ctx *context.T,
+ rid naming.RoutingID,
+ dhcpPublisher *pubsub.Publisher,
+ channelTimeout time.Duration,
+ idleExpiry time.Duration,
+ authorizedPeers []security.BlessingPattern) flow.Manager {
+ var serverBlessings security.Blessings
+ if rid != naming.NullRoutingID {
+ // TODO(mattr,ashankar): Have the server listen on this channel of
+ // changing default blessings and update itself.
+ serverBlessings, _ = v23.GetPrincipal(ctx).BlessingStore().Default()
+ }
m := &manager{
rid: rid,
closed: make(chan struct{}),
@@ -87,7 +98,7 @@
}
if rid != naming.NullRoutingID {
m.serverBlessings = serverBlessings
- m.serverAuthorizedPeers = serverAuthorizedPeers
+ m.serverAuthorizedPeers = authorizedPeers
m.serverNames = security.BlessingNames(v23.GetPrincipal(ctx), serverBlessings)
m.ls = &listenState{
q: upcqueue.New(),
@@ -125,15 +136,6 @@
return m
}
-func New(ctx *context.T, rid naming.RoutingID, dhcpPublisher *pubsub.Publisher,
- channelTimeout time.Duration, idleExpiry time.Duration) flow.Manager {
- var serverBlessings security.Blessings
- if rid != naming.NullRoutingID {
- serverBlessings, _ = v23.GetPrincipal(ctx).BlessingStore().Default()
- }
- return NewWithBlessings(ctx, serverBlessings, rid, nil, dhcpPublisher, channelTimeout, idleExpiry)
-}
-
func (m *manager) stopListening() {
if m.ls == nil {
return
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 20d754b..e0366a8 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -26,11 +26,11 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
- am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0)
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0, nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0)
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
@@ -43,12 +43,12 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
- am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0)
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0, nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0)
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
// At first the cache should be empty.
if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
t.Fatalf("got cache size %v, want %v", got, want)
@@ -78,12 +78,12 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
- am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0)
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0, nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0)
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
// Now am should be able to make a flow to dm even though dm is not listening.
testFlows(t, ctx, am, dm, flowtest.AllowAllPeersAuthorizer{})
@@ -97,18 +97,18 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
- am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0)
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0, nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- nulldm := New(ctx, naming.NullRoutingID, nil, 0, 0)
+ nulldm := New(ctx, naming.NullRoutingID, nil, 0, 0, nil)
_, af := testFlows(t, ctx, nulldm, am, flowtest.AllowAllPeersAuthorizer{})
// Ensure that the remote blessings of the underlying conn of the accepted blessings
// only has the public key of the client and no certificates.
if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); len(rBlessings.String()) > 0 || rBlessings.PublicKey() == nil {
t.Errorf("got %v, want no-cert blessings", rBlessings)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0)
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
_, af = testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
// Ensure that the remote blessings of the underlying conn of the accepted flow are
// non-zero if we did specify a RoutingID.
@@ -126,11 +126,11 @@
defer goroutines.NoLeaks(t, leakWaitTime)()
ctx, shutdown := test.V23Init()
- am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0)
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0, nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
t.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0)
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
lameEP := am.Status().Endpoints[0]
@@ -196,11 +196,11 @@
defer goroutines.NoLeaks(b, leakWaitTime)()
ctx, shutdown := test.V23Init()
- am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0)
+ am := New(ctx, naming.FixedRoutingID(0x5555), nil, 0, 0, nil)
if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
b.Fatal(err)
}
- dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0)
+ dm := New(ctx, naming.FixedRoutingID(0x1111), nil, 0, 0, nil)
// At first the cache should be empty.
if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
b.Fatalf("got cache size %v, want %v", got, want)
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index cc6a759..96cfb6b 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -111,7 +111,7 @@
}
}
if c.flowMgr == nil {
- c.flowMgr = manager.New(ctx, naming.NullRoutingID, nil, 0, connIdleExpiry)
+ c.flowMgr = manager.New(ctx, naming.NullRoutingID, nil, 0, connIdleExpiry, nil)
}
go func() {
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 2b56e97..dff5290 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -112,8 +112,6 @@
}
ctx, cancel := context.WithCancel(ctx)
statsPrefix := naming.Join("rpc", "server", "routing-id", rid.String())
- // TODO(mattr,ashankar): Have the server listen on this channel of
- // changing default blessings and update itself.
blessings, _ := v23.GetPrincipal(ctx).BlessingStore().Default()
s := &server{
ctx: ctx,
@@ -175,7 +173,7 @@
s.ctx, s.cancel = context.WithRootCancel(ctx)
}
- s.flowMgr = manager.NewWithBlessings(s.ctx, s.blessings, rid, authorizedPeers, settingsPublisher, channelTimeout, connIdleExpiry)
+ s.flowMgr = manager.New(s.ctx, rid, settingsPublisher, channelTimeout, connIdleExpiry, authorizedPeers)
s.ctx, _, err = v23.WithNewClient(s.ctx,
clientFlowManagerOpt{s.flowMgr},
PreferredProtocols(s.preferredProtocols))
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 2d63ae0..79bbac8 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -433,7 +433,7 @@
return nil, err
}
id, _ := ctx.Value(initKey).(*initData)
- return manager.New(ctx, rid, id.settingsPublisher, channelTimeout, id.connIdleExpiry), nil
+ return manager.New(ctx, rid, id.settingsPublisher, channelTimeout, id.connIdleExpiry, nil), nil
}
func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, []rpc.ServerOpt, error) {