veyron2/ipc: 3 of n. 'Invoker and Signature' rationalisation.

- implement the changed ipc.Serve signature, which now takes
  (name, object, authorizer)
- transition all code to use it and ipc.ServeDispatcher
- stop requiring callers to invoke Serve multiple times in
  order to publish under multiple names in the mount table;
  provide explicit AddName/RemoveName methods instead.

Change-Id: Ic5edd709d28e2670369986a5b0fce4cd22e3cafd
diff --git a/runtimes/google/ipc/benchmarks/server.go b/runtimes/google/ipc/benchmarks/server.go
index aca9f60..b01f59f 100644
--- a/runtimes/google/ipc/benchmarks/server.go
+++ b/runtimes/google/ipc/benchmarks/server.go
@@ -41,7 +41,7 @@
 	if err != nil {
 		vlog.Fatalf("Listen failed: %v", err)
 	}
-	if err := server.Serve("", ipc.LeafDispatcher(NewServerBenchmark(&impl{}), sflag.NewAuthorizerOrDie())); err != nil {
+	if err := server.Serve("", &impl{}, sflag.NewAuthorizerOrDie()); err != nil {
 		vlog.Fatalf("Serve failed: %v", err)
 	}
 	return naming.JoinAddressName(ep.String(), ""), func() {
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index cc30a2f..0ad572c 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -73,7 +73,7 @@
 		stop:      s.Stop,
 	}
 
-	if err := s.Serve(name, ipc.LeafDispatcher(c, fakeAuthorizer(0))); err != nil {
+	if err := s.Serve(name, c, fakeAuthorizer(0)); err != nil {
 		return nil, err
 	}
 
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index e05c185..eaa8911 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -41,7 +41,7 @@
 		t.Fatalf("InternalNewServer failed: %v", err)
 	}
 	defer server.Stop()
-	server.Serve("", ipc.LeafDispatcher(&testObject{}, nil))
+	server.Serve("", &testObject{}, nil)
 	ep, err := server.Listen(listenSpec)
 	if err != nil {
 		t.Fatalf("server.Listen failed: %v", err)
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index cd83055..19252f1 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -180,10 +180,10 @@
 	}
 	vlog.VI(1).Info("server.Serve")
 	disp := testServerDisp{ts}
-	if err := server.Serve("mountpoint/server", disp); err != nil {
+	if err := server.ServeDispatcher("mountpoint/server", disp); err != nil {
 		t.Errorf("server.Publish failed: %v", err)
 	}
-	if err := server.Serve("mountpoint/discharger", disp); err != nil {
+	if err := server.AddName("mountpoint/discharger"); err != nil {
 		t.Errorf("server.Publish for discharger failed: %v", err)
 	}
 	return ep, server
@@ -208,7 +208,7 @@
 	verifyMount(t, ns, n1)
 
 	// publish a second name
-	if err := server.Serve(n2, nil); err != nil {
+	if err := server.AddName(n2); err != nil {
 		t.Errorf("server.Serve failed: %v", err)
 	}
 	verifyMount(t, ns, n2)
@@ -221,7 +221,7 @@
 	verifyMountMissing(t, ns, n2)
 
 	// Check that we can no longer serve after Stop.
-	err := server.Serve("name doesn't matter", nil)
+	err := server.AddName("name doesn't matter")
 	if err == nil || err.Error() != "ipc: server is stopped" {
 		t.Errorf("either no error, or a wrong error was returned: %v", err)
 	}
@@ -267,7 +267,7 @@
 	return err == nil || strings.Index(err.Error(), pattern) >= 0
 }
 
-func TestMultipleCallsToServe(t *testing.T) {
+func TestMultipleCallsToServeAndName(t *testing.T) {
 	sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
 	ns := tnaming.NewSimpleNamespace()
 	server, err := InternalNewServer(testContext(), sm, ns, vc.LocalPrincipal{sectest.NewPrincipal()})
@@ -280,8 +280,8 @@
 	}
 
 	disp := &testServerDisp{&testServer{}}
-	if err := server.Serve("mountpoint/server", disp); err != nil {
-		t.Errorf("server.Publish failed: %v", err)
+	if err := server.ServeDispatcher("mountpoint/server", disp); err != nil {
+		t.Errorf("server.ServeDispatcher failed: %v", err)
 	}
 
 	n1 := "mountpoint/server"
@@ -291,17 +291,35 @@
 
 	verifyMount(t, ns, n1)
 
-	if err := server.Serve(n2, disp); err != nil {
-		t.Errorf("server.Serve failed: %v", err)
+	if server.ServeDispatcher(n2, disp) == nil {
+		t.Errorf("server.ServeDispatcher should have failed")
 	}
-	if err := server.Serve(n3, nil); err != nil {
-		t.Errorf("server.Serve failed: %v", err)
+
+	if err := server.Serve(n2, &testServer{}, nil); err == nil {
+		t.Errorf("server.Serve should have failed")
+	}
+
+	if err := server.AddName(n3); err != nil {
+		t.Errorf("server.AddName failed: %v", err)
+	}
+
+	if err := server.AddName(n3); err != nil {
+		t.Errorf("server.AddName failed: %v", err)
 	}
 	verifyMount(t, ns, n2)
 	verifyMount(t, ns, n3)
 
-	if err := server.Serve(n4, &testServerDisp{&testServer{}}); err == nil {
-		t.Errorf("server.Serve should have failed")
+	if err := server.RemoveName(n1); err != nil {
+		t.Errorf("server.RemoveName failed: %v", err)
+	}
+	verifyMountMissing(t, ns, n1)
+
+	if err := server.RemoveName("some randome name"); err == nil {
+		t.Errorf("server.RemoveName should have failed")
+	}
+
+	if err := server.ServeDispatcher(n4, &testServerDisp{&testServer{}}); err == nil {
+		t.Errorf("server.ServeDispatcher should have failed")
 	}
 	verifyMountMissing(t, ns, n4)
 
@@ -602,7 +620,7 @@
 	}
 
 	var tester dischargeImpetusTester
-	if err := server.Serve("mountpoint", &tester); err != nil {
+	if err := server.ServeDispatcher("mountpoint", &tester); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1048,7 +1066,7 @@
 		t.Fatalf("server.Listen failed: %v", err)
 	}
 	disp := &testServerDisp{&testServer{}}
-	if err := server.Serve("mp/server", disp); err != nil {
+	if err := server.ServeDispatcher("mp/server", disp); err != nil {
 		t.Fatalf("server.Serve failed: %v", err)
 	}
 	client, err := InternalNewClient(sm, ns, options.VCSecurityNone)
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 44ae7ab..943a71b 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -47,7 +47,11 @@
 	stoppedChan      chan struct{}                     // closed when the server has been stopped.
 	ns               naming.Namespace
 	servesMountTable bool
-	reservedOpt      options.ReservedNameDispatcher
+	// TODO(cnicolaou): remove this when the publisher tracks published names
+	// and can return an appropriate error for RemoveName on a name that
+	// wasn't 'Added' for this server.
+	names       map[string]struct{}
+	reservedOpt options.ReservedNameDispatcher
 	// TODO(cnicolaou): add roaming stats to ipcStats
 	stats *ipcStats // stats for this server.
 }
@@ -533,51 +537,66 @@
 	}
 }
 
-func (s *server) Serve(name string, obj interface{}) error {
+func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
 	if obj == nil {
-		// nil is an allowed value for obj.
-		return s.ServeDispatcher(name, ipc.Dispatcher(nil))
+		// The ReflectInvoker inside the LeafDispatcher will panic
+		// if called for a nil value.
+		return fmt.Errorf("A nil object is not allowed")
 	}
-	// TRANSITION: this will be disallowed when the transition is complete.
-	if disp, ok := obj.(ipc.Dispatcher); ok {
-		return s.ServeDispatcher(name, disp)
-	}
-	// TRANSITION: We may fail the dispatcher type test, but still be a
-	// dispatcher becase our Lookup method returns ipc.Invoker and not a
-	// raw object. This code here will detect that case and panic as an aid
-	// to catching these cases early.
-	typ := reflect.TypeOf(obj)
-	if lookup, found := typ.MethodByName("Lookup"); found {
-		if lookup.Type.NumIn() == 3 && lookup.Type.NumOut() == 3 {
-			inv := lookup.Type.Out(0)
-			if inv.Name() == "Invoker" {
-				panic(fmt.Sprintf("%q has a Lookup that returns an Invoker", lookup.Name))
-			}
-		}
-	}
-	// TRANSITION: this will go away in the transition.
-	panic("should never get here")
+	return s.ServeDispatcher(name, ipc.LeafDispatcher(obj, authorizer))
 }
 
 func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
-	defer vlog.LogCall()()
 	s.Lock()
 	defer s.Unlock()
 	if s.stopped {
 		return errServerStopped
 	}
-	if s.disp != nil && disp != nil && s.disp != disp {
-		return fmt.Errorf("attempt to change dispatcher")
+	if disp == nil {
+		return fmt.Errorf("A nil dispacther is not allowed")
 	}
-	if disp != nil {
-		s.disp = disp
+	if s.disp != nil {
+		return fmt.Errorf("Serve or ServeDispatcher has already been called")
 	}
+	s.disp = disp
+	s.names = make(map[string]struct{})
 	if len(name) > 0 {
 		s.publisher.AddName(name)
+		s.names[name] = struct{}{}
 	}
 	return nil
 }
 
+func (s *server) AddName(name string) error {
+	s.Lock()
+	defer s.Unlock()
+	if s.stopped {
+		return errServerStopped
+	}
+	if len(name) == 0 {
+		return fmt.Errorf("empty name")
+	}
+	s.publisher.AddName(name)
+	// TODO(cnicolaou): remove this map when the publisher's RemoveName
+	// method returns an error.
+	s.names[name] = struct{}{}
+	return nil
+}
+
+func (s *server) RemoveName(name string) error {
+	s.Lock()
+	defer s.Unlock()
+	if s.stopped {
+		return errServerStopped
+	}
+	if _, present := s.names[name]; !present {
+		return fmt.Errorf("%q has not been previously used for this server", name)
+	}
+	s.publisher.RemoveName(name)
+	delete(s.names, name)
+	return nil
+}
+
 func (s *server) Stop() error {
 	defer vlog.LogCall()()
 	s.Lock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 8a8020f..11959f0 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -153,7 +153,7 @@
 	if _, err := server.Listen(spec); err != nil {
 		t.Fatal(err)
 	}
-	if err := server.Serve("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
+	if err := server.ServeDispatcher("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
 		t.Fatal(err)
 	}
 	verifyMount(t, ns, name)
@@ -200,7 +200,7 @@
 		return fmt.Errorf("InternalNewServer failed: %v", err)
 	}
 	disp := testServerDisp{new(testServer)}
-	if err := server.Serve("server", disp); err != nil {
+	if err := server.ServeDispatcher("server", disp); err != nil {
 		return fmt.Errorf("server.Register failed: %v", err)
 	}
 	spec := listenSpec
diff --git a/runtimes/google/lib/publisher/publisher.go b/runtimes/google/lib/publisher/publisher.go
index 63fad95..96e39f4 100644
--- a/runtimes/google/lib/publisher/publisher.go
+++ b/runtimes/google/lib/publisher/publisher.go
@@ -13,6 +13,9 @@
 	"veyron.io/veyron/veyron2/vlog"
 )
 
+// TODO(cnicolaou): have the done channel carry an error so
+// that the publisher methods can return errors as well.
+
 // Publisher manages the publishing of servers in mounttable.
 type Publisher interface {
 	// AddServer adds a new server to be mounted.
@@ -21,6 +24,8 @@
 	RemoveServer(server string)
 	// AddName adds a new name for all servers to be mounted as.
 	AddName(name string)
+	// RemoveName removes a name, unmounting all servers mounted under it.
+	RemoveName(name string)
 	// Published returns the published names rooted at the mounttable.
 	Published() []string
 	// DebugString returns a string representation of the publisher
@@ -58,11 +63,16 @@
 	done   chan struct{} // closed when the cmd is done
 }
 
-type nameCmd struct {
+type addNameCmd struct {
 	name string        // name to add
 	done chan struct{} // closed when the cmd is done
 }
 
+type removeNameCmd struct {
+	name string        // name to remove
+	done chan struct{} // closed when the cmd is done
+}
+
 type debugCmd chan string // debug string is sent when the cmd is done
 
 type publishedCmd chan []string // published names are sent when cmd is done
@@ -104,7 +114,14 @@
 
 func (p *publisher) AddName(name string) {
 	done := make(chan struct{})
-	if p.sendCmd(nameCmd{name, done}) {
+	if p.sendCmd(addNameCmd{name, done}) {
+		<-done
+	}
+}
+
+func (p *publisher) RemoveName(name string) {
+	done := make(chan struct{})
+	if p.sendCmd(removeNameCmd{name, done}) {
 		<-done
 	}
 }
@@ -167,9 +184,12 @@
 			case removeServerCmd:
 				state.removeServer(tcmd.server)
 				close(tcmd.done)
-			case nameCmd:
+			case addNameCmd:
 				state.addName(tcmd.name)
 				close(tcmd.done)
+			case removeNameCmd:
+				state.removeName(tcmd.name)
+				close(tcmd.done)
 			case publishedCmd:
 				tcmd <- state.published()
 				close(tcmd)
@@ -190,11 +210,12 @@
 	ctx      context.T
 	ns       naming.Namespace
 	period   time.Duration
-	deadline time.Time       // deadline for the next sync call
-	names    []string        // names that have been added
-	servers  map[string]bool // servers that have been added, true
+	deadline time.Time                 // deadline for the next sync call
+	names    map[string]bool           // names that have been added
+	servers  map[string]bool           // servers that have been added, true
+	servesMT map[string]bool           // true if server is a mount table server
+	mounts   map[mountKey]*mountStatus // map each (name,server) to its status
 	//   if server is a mount table server
-	mounts map[mountKey]*mountStatus // map each (name,server) to its status
 }
 
 type mountKey struct {
@@ -215,6 +236,7 @@
 		ns:       ns,
 		period:   period,
 		deadline: time.Now().Add(period),
+		names:    make(map[string]bool),
 		servers:  make(map[string]bool),
 		mounts:   make(map[mountKey]*mountStatus),
 	}
@@ -227,12 +249,10 @@
 func (ps *pubState) addName(name string) {
 	// Each non-dup name that is added causes new mounts to be created for all
 	// existing servers.
-	for _, n := range ps.names {
-		if n == name {
-			return
-		}
+	if ps.names[name] {
+		return
 	}
-	ps.names = append(ps.names, name)
+	ps.names[name] = true
 	for server, servesMT := range ps.servers {
 		status := new(mountStatus)
 		ps.mounts[mountKey{name, server}] = status
@@ -240,12 +260,24 @@
 	}
 }
 
+func (ps *pubState) removeName(name string) {
+	if !ps.names[name] {
+		return
+	}
+	for server, _ := range ps.servers {
+		if status, exists := ps.mounts[mountKey{name, server}]; exists {
+			ps.unmount(name, server, status)
+		}
+	}
+	delete(ps.names, name)
+}
+
 func (ps *pubState) addServer(server string, servesMT bool) {
 	// Each non-dup server that is added causes new mounts to be created for all
 	// existing names.
-	if _, exists := ps.servers[server]; !exists {
+	if !ps.servers[server] {
 		ps.servers[server] = servesMT
-		for _, name := range ps.names {
+		for name, _ := range ps.names {
 			status := new(mountStatus)
 			ps.mounts[mountKey{name, server}] = status
 			ps.mount(name, server, status, servesMT)
@@ -258,7 +290,7 @@
 		return
 	}
 	delete(ps.servers, server)
-	for _, name := range ps.names {
+	for name, _ := range ps.names {
 		if status, exists := ps.mounts[mountKey{name, server}]; exists {
 			ps.unmount(name, server, status)
 		}
@@ -310,7 +342,7 @@
 
 func (ps *pubState) published() []string {
 	var ret []string
-	for _, name := range ps.names {
+	for name, _ := range ps.names {
 		e, err := ps.ns.ResolveToMountTableX(ps.ctx, name)
 		if err != nil {
 			vlog.Errorf("ipc pub: couldn't resolve %v to mount table: %v", name, err)
@@ -327,6 +359,7 @@
 	return ret
 }
 
+// TODO(toddw): sort the names/servers so that the output order is stable.
 func (ps *pubState) debugString() string {
 	l := make([]string, 2+len(ps.mounts))
 	l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline))
diff --git a/runtimes/google/lib/publisher/publisher_test.go b/runtimes/google/lib/publisher/publisher_test.go
new file mode 100644
index 0000000..720f16f
--- /dev/null
+++ b/runtimes/google/lib/publisher/publisher_test.go
@@ -0,0 +1,58 @@
+package publisher_test
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"veyron.io/veyron/veyron2/context"
+	"veyron.io/veyron/veyron2/naming"
+
+	iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
+	"veyron.io/veyron/veyron/runtimes/google/lib/publisher"
+	tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
+	"veyron.io/veyron/veyron/runtimes/google/testing/mocks/runtime"
+	"veyron.io/veyron/veyron/runtimes/google/vtrace"
+)
+
+func testContext() context.T {
+	ctx := iipc.InternalNewContext(&runtime.PanicRuntime{})
+	ctx, _ = vtrace.WithNewSpan(ctx, "")
+	ctx, _ = ctx.WithDeadline(time.Now().Add(20 * time.Second))
+	return ctx
+}
+
+func resolve(t *testing.T, ns naming.Namespace, name string) []string {
+	servers, err := ns.Resolve(testContext(), name)
+	if err != nil {
+		t.Fatalf("failed to resolve %q", name)
+	}
+	return servers
+}
+
+func TestAddAndRemove(t *testing.T) {
+	ns := tnaming.NewSimpleNamespace()
+	pub := publisher.New(testContext(), ns, time.Second)
+	pub.AddName("foo")
+	pub.AddServer("foo-addr", false)
+	if got, want := resolve(t, ns, "foo"), []string{"foo-addr"}; !reflect.DeepEqual(got, want) {
+		t.Errorf("got %q, want %q", got, want)
+	}
+	pub.AddServer("bar-addr", false)
+	got, want := resolve(t, ns, "foo"), []string{"bar-addr", "foo-addr"}
+	sort.Strings(got)
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got %q, want %q", got, want)
+	}
+	pub.AddName("baz")
+	got = resolve(t, ns, "baz")
+	sort.Strings(got)
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got %q, want %q", got, want)
+	}
+	pub.RemoveName("foo")
+	if _, err := ns.Resolve(testContext(), "foo"); err == nil {
+		t.Errorf("expected an error")
+	}
+}
diff --git a/runtimes/google/naming/namespace/all_test.go b/runtimes/google/naming/namespace/all_test.go
index b803577..cf66324 100644
--- a/runtimes/google/naming/namespace/all_test.go
+++ b/runtimes/google/naming/namespace/all_test.go
@@ -193,7 +193,7 @@
 	if err != nil {
 		boom(t, "Failed to Listen: %s", err)
 	}
-	if err := s.Serve(mountPoint, disp); err != nil {
+	if err := s.ServeDispatcher(mountPoint, disp); err != nil {
 		boom(t, "Failed to serve mount table at %s: %s", mountPoint, err)
 	}
 	name := naming.JoinAddressName(ep.String(), "")
diff --git a/runtimes/google/rt/ipc_test.go b/runtimes/google/rt/ipc_test.go
index 4c6cc6c..a29098b 100644
--- a/runtimes/google/rt/ipc_test.go
+++ b/runtimes/google/rt/ipc_test.go
@@ -131,7 +131,7 @@
 	} else {
 		serverObjectName = naming.JoinAddressName(endpoint.String(), "")
 	}
-	if err := server.Serve("", ipc.LeafDispatcher(testService{}, vsecurity.NewACLAuthorizer(vsecurity.OpenACL()))); err != nil {
+	if err := server.Serve("", testService{}, vsecurity.NewACLAuthorizer(vsecurity.OpenACL())); err != nil {
 		t.Fatal(err)
 	}
 	// Let it rip!
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index 103d804..8fcee9a 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -56,7 +56,7 @@
 	if ep, err = m.server.Listen(listenSpec); err != nil {
 		return err
 	}
-	if err := m.server.Serve("", ipc.LeafDispatcher(appcycle.NewServerAppCycle(m), nil)); err != nil {
+	if err := m.server.Serve("", appcycle.NewServerAppCycle(m), nil); err != nil {
 		return err
 	}
 	return m.callbackToParent(parentName, naming.JoinAddressName(ep.String(), ""))
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index 4eea73d..88ba312 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -273,7 +273,7 @@
 	if ep, err = server.Listen(profiles.LocalListenSpec); err != nil {
 		t.Fatalf("Got error: %v", err)
 	}
-	if err := server.Serve("", ipc.LeafDispatcher(node.NewServerConfig(&configServer{ch}), vflag.NewAuthorizerOrDie())); err != nil {
+	if err := server.Serve("", node.NewServerConfig(&configServer{ch}), vflag.NewAuthorizerOrDie()); err != nil {
 		t.Fatalf("Got error: %v", err)
 	}
 	return server, naming.JoinAddressName(ep.String(), ""), ch
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index 66c8bd7..c25d6f9 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -114,7 +114,7 @@
 		forceCollect: forceCollect,
 	}
 
-	if err := s.Serve(name, ipc.LeafDispatcher(c, fakeAuthorizer(0))); err != nil {
+	if err := s.Serve(name, c, fakeAuthorizer(0)); err != nil {
 		return nil, err
 	}