veyron2/ipc: 3 of n. 'Invoker and Signature' rationalisation.
- implement the changed ipc.Serve signature
- transition all code to use it and ipc.ServeDispatcher
- get rid of the ugliness around calling Serve multiple
times to publish under multiple names in the mount table;
Serve/ServeDispatcher may now be called only once per server, and
additional names are managed with the new AddName/RemoveName methods.
Change-Id: Ic5edd709d28e2670369986a5b0fce4cd22e3cafd
diff --git a/runtimes/google/ipc/benchmarks/server.go b/runtimes/google/ipc/benchmarks/server.go
index aca9f60..b01f59f 100644
--- a/runtimes/google/ipc/benchmarks/server.go
+++ b/runtimes/google/ipc/benchmarks/server.go
@@ -41,7 +41,7 @@
if err != nil {
vlog.Fatalf("Listen failed: %v", err)
}
- if err := server.Serve("", ipc.LeafDispatcher(NewServerBenchmark(&impl{}), sflag.NewAuthorizerOrDie())); err != nil {
+ if err := server.Serve("", &impl{}, sflag.NewAuthorizerOrDie()); err != nil {
vlog.Fatalf("Serve failed: %v", err)
}
return naming.JoinAddressName(ep.String(), ""), func() {
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index cc30a2f..0ad572c 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -73,7 +73,7 @@
stop: s.Stop,
}
- if err := s.Serve(name, ipc.LeafDispatcher(c, fakeAuthorizer(0))); err != nil {
+ if err := s.Serve(name, c, fakeAuthorizer(0)); err != nil {
return nil, err
}
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index e05c185..eaa8911 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -41,7 +41,7 @@
t.Fatalf("InternalNewServer failed: %v", err)
}
defer server.Stop()
- server.Serve("", ipc.LeafDispatcher(&testObject{}, nil))
+ server.Serve("", &testObject{}, nil)
ep, err := server.Listen(listenSpec)
if err != nil {
t.Fatalf("server.Listen failed: %v", err)
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index cd83055..19252f1 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -180,10 +180,10 @@
}
vlog.VI(1).Info("server.Serve")
disp := testServerDisp{ts}
- if err := server.Serve("mountpoint/server", disp); err != nil {
+ if err := server.ServeDispatcher("mountpoint/server", disp); err != nil {
t.Errorf("server.Publish failed: %v", err)
}
- if err := server.Serve("mountpoint/discharger", disp); err != nil {
+ if err := server.AddName("mountpoint/discharger"); err != nil {
t.Errorf("server.Publish for discharger failed: %v", err)
}
return ep, server
@@ -208,7 +208,7 @@
verifyMount(t, ns, n1)
// publish a second name
- if err := server.Serve(n2, nil); err != nil {
+ if err := server.AddName(n2); err != nil {
t.Errorf("server.Serve failed: %v", err)
}
verifyMount(t, ns, n2)
@@ -221,7 +221,7 @@
verifyMountMissing(t, ns, n2)
// Check that we can no longer serve after Stop.
- err := server.Serve("name doesn't matter", nil)
+ err := server.AddName("name doesn't matter")
if err == nil || err.Error() != "ipc: server is stopped" {
t.Errorf("either no error, or a wrong error was returned: %v", err)
}
@@ -267,7 +267,7 @@
return err == nil || strings.Index(err.Error(), pattern) >= 0
}
-func TestMultipleCallsToServe(t *testing.T) {
+func TestMultipleCallsToServeAndName(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal()})
@@ -280,8 +280,8 @@
}
disp := &testServerDisp{&testServer{}}
- if err := server.Serve("mountpoint/server", disp); err != nil {
- t.Errorf("server.Publish failed: %v", err)
+ if err := server.ServeDispatcher("mountpoint/server", disp); err != nil {
+ t.Errorf("server.ServeDispatcher failed: %v", err)
}
n1 := "mountpoint/server"
@@ -291,17 +291,35 @@
verifyMount(t, ns, n1)
- if err := server.Serve(n2, disp); err != nil {
- t.Errorf("server.Serve failed: %v", err)
+ if server.ServeDispatcher(n2, disp) == nil {
+ t.Errorf("server.ServeDispatcher should have failed")
}
- if err := server.Serve(n3, nil); err != nil {
- t.Errorf("server.Serve failed: %v", err)
+
+ if err := server.Serve(n2, &testServer{}, nil); err == nil {
+ t.Errorf("server.Serve should have failed")
+ }
+
+ if err := server.AddName(n3); err != nil {
+ t.Errorf("server.AddName failed: %v", err)
+ }
+
+ if err := server.AddName(n3); err != nil {
+ t.Errorf("server.AddName failed: %v", err)
}
verifyMount(t, ns, n2)
verifyMount(t, ns, n3)
- if err := server.Serve(n4, &testServerDisp{&testServer{}}); err == nil {
- t.Errorf("server.Serve should have failed")
+ if err := server.RemoveName(n1); err != nil {
+ t.Errorf("server.RemoveName failed: %v", err)
+ }
+ verifyMountMissing(t, ns, n1)
+
+ if err := server.RemoveName("some randome name"); err == nil {
+ t.Errorf("server.RemoveName should have failed")
+ }
+
+ if err := server.ServeDispatcher(n4, &testServerDisp{&testServer{}}); err == nil {
+ t.Errorf("server.ServeDispatcher should have failed")
}
verifyMountMissing(t, ns, n4)
@@ -602,7 +620,7 @@
}
var tester dischargeImpetusTester
- if err := server.Serve("mountpoint", &tester); err != nil {
+ if err := server.ServeDispatcher("mountpoint", &tester); err != nil {
t.Fatal(err)
}
@@ -1048,7 +1066,7 @@
t.Fatalf("server.Listen failed: %v", err)
}
disp := &testServerDisp{&testServer{}}
- if err := server.Serve("mp/server", disp); err != nil {
+ if err := server.ServeDispatcher("mp/server", disp); err != nil {
t.Fatalf("server.Serve failed: %v", err)
}
client, err := InternalNewClient(sm, ns, options.VCSecurityNone)
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 44ae7ab..943a71b 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -47,7 +47,11 @@
stoppedChan chan struct{} // closed when the server has been stopped.
ns naming.Namespace
servesMountTable bool
- reservedOpt options.ReservedNameDispatcher
+ // TODO(cnicolaou): remove this when the publisher tracks published names
+ // and can return an appropriate error for RemoveName on a name that
+ // wasn't 'Added' for this server.
+ names map[string]struct{}
+ reservedOpt options.ReservedNameDispatcher
// TODO(cnicolaou): add roaming stats to ipcStats
stats *ipcStats // stats for this server.
}
@@ -533,51 +537,66 @@
}
}
-func (s *server) Serve(name string, obj interface{}) error {
+func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
if obj == nil {
- // nil is an allowed value for obj.
- return s.ServeDispatcher(name, ipc.Dispatcher(nil))
+ // The ReflectInvoker inside the LeafDispatcher will panic
+ // if called for a nil value.
+ return fmt.Errorf("A nil object is not allowed")
}
- // TRANSITION: this will be disallowed when the transition is complete.
- if disp, ok := obj.(ipc.Dispatcher); ok {
- return s.ServeDispatcher(name, disp)
- }
- // TRANSITION: We may fail the dispatcher type test, but still be a
- // dispatcher becase our Lookup method returns ipc.Invoker and not a
- // raw object. This code here will detect that case and panic as an aid
- // to catching these cases early.
- typ := reflect.TypeOf(obj)
- if lookup, found := typ.MethodByName("Lookup"); found {
- if lookup.Type.NumIn() == 3 && lookup.Type.NumOut() == 3 {
- inv := lookup.Type.Out(0)
- if inv.Name() == "Invoker" {
- panic(fmt.Sprintf("%q has a Lookup that returns an Invoker", lookup.Name))
- }
- }
- }
- // TRANSITION: this will go away in the transition.
- panic("should never get here")
+ return s.ServeDispatcher(name, ipc.LeafDispatcher(obj, authorizer))
}
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
- defer vlog.LogCall()()
s.Lock()
defer s.Unlock()
if s.stopped {
return errServerStopped
}
- if s.disp != nil && disp != nil && s.disp != disp {
- return fmt.Errorf("attempt to change dispatcher")
+ if disp == nil {
+ return fmt.Errorf("A nil dispacther is not allowed")
}
- if disp != nil {
- s.disp = disp
+ if s.disp != nil {
+ return fmt.Errorf("Serve or ServeDispatcher has already been called")
}
+ s.disp = disp
+ s.names = make(map[string]struct{})
if len(name) > 0 {
s.publisher.AddName(name)
+ s.names[name] = struct{}{}
}
return nil
}
+func (s *server) AddName(name string) error {
+ s.Lock()
+ defer s.Unlock()
+ if s.stopped {
+ return errServerStopped
+ }
+ if len(name) == 0 {
+ return fmt.Errorf("empty name")
+ }
+ s.publisher.AddName(name)
+ // TODO(cnicolaou): remove this map when the publisher's RemoveName
+ // method returns an error.
+ s.names[name] = struct{}{}
+ return nil
+}
+
+func (s *server) RemoveName(name string) error {
+ s.Lock()
+ defer s.Unlock()
+ if s.stopped {
+ return errServerStopped
+ }
+ if _, present := s.names[name]; !present {
+ return fmt.Errorf("%q has not been previously used for this server", name)
+ }
+ s.publisher.RemoveName(name)
+ delete(s.names, name)
+ return nil
+}
+
func (s *server) Stop() error {
defer vlog.LogCall()()
s.Lock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 8a8020f..11959f0 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -153,7 +153,7 @@
if _, err := server.Listen(spec); err != nil {
t.Fatal(err)
}
- if err := server.Serve("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
+ if err := server.ServeDispatcher("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
t.Fatal(err)
}
verifyMount(t, ns, name)
@@ -200,7 +200,7 @@
return fmt.Errorf("InternalNewServer failed: %v", err)
}
disp := testServerDisp{new(testServer)}
- if err := server.Serve("server", disp); err != nil {
+ if err := server.ServeDispatcher("server", disp); err != nil {
return fmt.Errorf("server.Register failed: %v", err)
}
spec := listenSpec
diff --git a/runtimes/google/lib/publisher/publisher.go b/runtimes/google/lib/publisher/publisher.go
index 63fad95..96e39f4 100644
--- a/runtimes/google/lib/publisher/publisher.go
+++ b/runtimes/google/lib/publisher/publisher.go
@@ -13,6 +13,9 @@
"veyron.io/veyron/veyron2/vlog"
)
+// TODO(cnicolaou): have the done channel return an error so
+// that the publisher calls can return errors also.
+
// Publisher manages the publishing of servers in mounttable.
type Publisher interface {
// AddServer adds a new server to be mounted.
@@ -21,6 +24,8 @@
RemoveServer(server string)
// AddName adds a new name for all servers to be mounted as.
AddName(name string)
+ // RemoveName removes a name, unmounting all servers mounted under it.
+ RemoveName(name string)
// Published returns the published names rooted at the mounttable.
Published() []string
// DebugString returns a string representation of the publisher
@@ -58,11 +63,16 @@
done chan struct{} // closed when the cmd is done
}
-type nameCmd struct {
+type addNameCmd struct {
name string // name to add
done chan struct{} // closed when the cmd is done
}
+type removeNameCmd struct {
+ name string // name to remove
+ done chan struct{} // closed when the cmd is done
+}
+
type debugCmd chan string // debug string is sent when the cmd is done
type publishedCmd chan []string // published names are sent when cmd is done
@@ -104,7 +114,14 @@
func (p *publisher) AddName(name string) {
done := make(chan struct{})
- if p.sendCmd(nameCmd{name, done}) {
+ if p.sendCmd(addNameCmd{name, done}) {
+ <-done
+ }
+}
+
+func (p *publisher) RemoveName(name string) {
+ done := make(chan struct{})
+ if p.sendCmd(removeNameCmd{name, done}) {
<-done
}
}
@@ -167,9 +184,12 @@
case removeServerCmd:
state.removeServer(tcmd.server)
close(tcmd.done)
- case nameCmd:
+ case addNameCmd:
state.addName(tcmd.name)
close(tcmd.done)
+ case removeNameCmd:
+ state.removeName(tcmd.name)
+ close(tcmd.done)
case publishedCmd:
tcmd <- state.published()
close(tcmd)
@@ -190,11 +210,12 @@
ctx context.T
ns naming.Namespace
period time.Duration
- deadline time.Time // deadline for the next sync call
- names []string // names that have been added
- servers map[string]bool // servers that have been added, true
+ deadline time.Time // deadline for the next sync call
+ names map[string]bool // names that have been added
+ servers map[string]bool // servers that have been added, true
+ servesMT map[string]bool // true if server is a mount table server
+ mounts map[mountKey]*mountStatus // map each (name,server) to its status
// if server is a mount table server
- mounts map[mountKey]*mountStatus // map each (name,server) to its status
}
type mountKey struct {
@@ -215,6 +236,7 @@
ns: ns,
period: period,
deadline: time.Now().Add(period),
+ names: make(map[string]bool),
servers: make(map[string]bool),
mounts: make(map[mountKey]*mountStatus),
}
@@ -227,12 +249,10 @@
func (ps *pubState) addName(name string) {
// Each non-dup name that is added causes new mounts to be created for all
// existing servers.
- for _, n := range ps.names {
- if n == name {
- return
- }
+ if ps.names[name] {
+ return
}
- ps.names = append(ps.names, name)
+ ps.names[name] = true
for server, servesMT := range ps.servers {
status := new(mountStatus)
ps.mounts[mountKey{name, server}] = status
@@ -240,12 +260,24 @@
}
}
+func (ps *pubState) removeName(name string) {
+ if !ps.names[name] {
+ return
+ }
+ for server, _ := range ps.servers {
+ if status, exists := ps.mounts[mountKey{name, server}]; exists {
+ ps.unmount(name, server, status)
+ }
+ }
+ delete(ps.names, name)
+}
+
func (ps *pubState) addServer(server string, servesMT bool) {
// Each non-dup server that is added causes new mounts to be created for all
// existing names.
- if _, exists := ps.servers[server]; !exists {
+ if !ps.servers[server] {
ps.servers[server] = servesMT
- for _, name := range ps.names {
+ for name, _ := range ps.names {
status := new(mountStatus)
ps.mounts[mountKey{name, server}] = status
ps.mount(name, server, status, servesMT)
@@ -258,7 +290,7 @@
return
}
delete(ps.servers, server)
- for _, name := range ps.names {
+ for name, _ := range ps.names {
if status, exists := ps.mounts[mountKey{name, server}]; exists {
ps.unmount(name, server, status)
}
@@ -310,7 +342,7 @@
func (ps *pubState) published() []string {
var ret []string
- for _, name := range ps.names {
+ for name, _ := range ps.names {
e, err := ps.ns.ResolveToMountTableX(ps.ctx, name)
if err != nil {
vlog.Errorf("ipc pub: couldn't resolve %v to mount table: %v", name, err)
@@ -327,6 +359,7 @@
return ret
}
+// TODO(toddw): sort the names/servers so that the output order is stable.
func (ps *pubState) debugString() string {
l := make([]string, 2+len(ps.mounts))
l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline))
diff --git a/runtimes/google/lib/publisher/publisher_test.go b/runtimes/google/lib/publisher/publisher_test.go
new file mode 100644
index 0000000..720f16f
--- /dev/null
+++ b/runtimes/google/lib/publisher/publisher_test.go
@@ -0,0 +1,58 @@
+package publisher_test
+
+import (
+ "reflect"
+ "sort"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/context"
+ "veyron.io/veyron/veyron2/naming"
+
+ iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
+ "veyron.io/veyron/veyron/runtimes/google/lib/publisher"
+ tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
+ "veyron.io/veyron/veyron/runtimes/google/testing/mocks/runtime"
+ "veyron.io/veyron/veyron/runtimes/google/vtrace"
+)
+
+func testContext() context.T {
+ ctx := iipc.InternalNewContext(&runtime.PanicRuntime{})
+ ctx, _ = vtrace.WithNewSpan(ctx, "")
+ ctx, _ = ctx.WithDeadline(time.Now().Add(20 * time.Second))
+ return ctx
+}
+
+func resolve(t *testing.T, ns naming.Namespace, name string) []string {
+ servers, err := ns.Resolve(testContext(), name)
+ if err != nil {
+ t.Fatalf("failed to resolve %q", name)
+ }
+ return servers
+}
+
+func TestAddAndRemove(t *testing.T) {
+ ns := tnaming.NewSimpleNamespace()
+ pub := publisher.New(testContext(), ns, time.Second)
+ pub.AddName("foo")
+ pub.AddServer("foo-addr", false)
+ if got, want := resolve(t, ns, "foo"), []string{"foo-addr"}; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ pub.AddServer("bar-addr", false)
+ got, want := resolve(t, ns, "foo"), []string{"bar-addr", "foo-addr"}
+ sort.Strings(got)
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ pub.AddName("baz")
+ got = resolve(t, ns, "baz")
+ sort.Strings(got)
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ pub.RemoveName("foo")
+ if _, err := ns.Resolve(testContext(), "foo"); err == nil {
+ t.Errorf("expected an error")
+ }
+}
diff --git a/runtimes/google/naming/namespace/all_test.go b/runtimes/google/naming/namespace/all_test.go
index b803577..cf66324 100644
--- a/runtimes/google/naming/namespace/all_test.go
+++ b/runtimes/google/naming/namespace/all_test.go
@@ -193,7 +193,7 @@
if err != nil {
boom(t, "Failed to Listen: %s", err)
}
- if err := s.Serve(mountPoint, disp); err != nil {
+ if err := s.ServeDispatcher(mountPoint, disp); err != nil {
boom(t, "Failed to serve mount table at %s: %s", mountPoint, err)
}
name := naming.JoinAddressName(ep.String(), "")
diff --git a/runtimes/google/rt/ipc_test.go b/runtimes/google/rt/ipc_test.go
index 4c6cc6c..a29098b 100644
--- a/runtimes/google/rt/ipc_test.go
+++ b/runtimes/google/rt/ipc_test.go
@@ -131,7 +131,7 @@
} else {
serverObjectName = naming.JoinAddressName(endpoint.String(), "")
}
- if err := server.Serve("", ipc.LeafDispatcher(testService{}, vsecurity.NewACLAuthorizer(vsecurity.OpenACL()))); err != nil {
+ if err := server.Serve("", testService{}, vsecurity.NewACLAuthorizer(vsecurity.OpenACL())); err != nil {
t.Fatal(err)
}
// Let it rip!
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 103d804..8fcee9a 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -56,7 +56,7 @@
if ep, err = m.server.Listen(listenSpec); err != nil {
return err
}
- if err := m.server.Serve("", ipc.LeafDispatcher(appcycle.NewServerAppCycle(m), nil)); err != nil {
+ if err := m.server.Serve("", appcycle.NewServerAppCycle(m), nil); err != nil {
return err
}
return m.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index 4eea73d..88ba312 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -273,7 +273,7 @@
if ep, err = server.Listen(profiles.LocalListenSpec); err != nil {
t.Fatalf("Got error: %v", err)
}
- if err := server.Serve("", ipc.LeafDispatcher(node.NewServerConfig(&configServer{ch}), vflag.NewAuthorizerOrDie())); err != nil {
+ if err := server.Serve("", node.NewServerConfig(&configServer{ch}), vflag.NewAuthorizerOrDie()); err != nil {
t.Fatalf("Got error: %v", err)
}
return server, naming.JoinAddressName(ep.String(), ""), ch
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index 66c8bd7..c25d6f9 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -114,7 +114,7 @@
forceCollect: forceCollect,
}
- if err := s.Serve(name, ipc.LeafDispatcher(c, fakeAuthorizer(0))); err != nil {
+ if err := s.Serve(name, c, fakeAuthorizer(0)); err != nil {
return nil, err
}