Merge "veyron2: add a method for obtaining the current status of a server."
diff --git a/lib/modules/core/proxy.go b/lib/modules/core/proxy.go
index f137cf8..79d826b 100644
--- a/lib/modules/core/proxy.go
+++ b/lib/modules/core/proxy.go
@@ -40,7 +40,6 @@
defer proxy.Shutdown()
fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
- fmt.Fprintf(stdout, "PROXY_NAME=%s\n", proxy.Endpoint().Name())
if expected > 0 {
pub := publisher.New(ctx, veyron2.GetNamespace(ctx), time.Minute)
defer pub.WaitForStop()
@@ -54,8 +53,8 @@
}
// Wait for all the entries to be published.
for {
- got := len(pub.Published())
- if expected == got {
+ pubState := pub.Status()
+ if expected == len(pubState) {
break
}
fmt.Fprintf(stderr, "%s\n", pub.DebugString())
@@ -63,10 +62,8 @@
fmt.Fprintf(stderr, "Sleeping: %s\n", delay)
time.Sleep(delay)
}
- for _, p := range pub.Published() {
- fmt.Fprintf(stdout, "PUBLISHED_PROXY_NAME=%s\n", p)
- }
}
+ fmt.Fprintf(stdout, "PROXY_NAME=%s\n", proxy.Endpoint().Name())
modules.WaitForEOF(stdin)
fmt.Fprintf(stdout, "DONE\n")
return nil
diff --git a/runtimes/google/ipc/debug_test.go b/runtimes/google/ipc/debug_test.go
index 04ea5b3..7290cd5 100644
--- a/runtimes/google/ipc/debug_test.go
+++ b/runtimes/google/ipc/debug_test.go
@@ -41,11 +41,13 @@
t.Fatalf("InternalNewServer failed: %v", err)
}
defer server.Stop()
- server.Serve("", &testObject{}, nil)
eps, err := server.Listen(listenSpec)
if err != nil {
t.Fatalf("server.Listen failed: %v", err)
}
+ if err := server.Serve("", &testObject{}, nil); err != nil {
+ t.Fatalf("server.Serve failed: %v", err)
+ }
client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{pclient})
if err != nil {
t.Fatalf("InternalNewClient failed: %v", err)
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index e0c3164..45f3a61 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -10,6 +10,7 @@
"path/filepath"
"reflect"
"runtime"
+ "sort"
"strings"
"sync"
"testing"
@@ -201,6 +202,15 @@
return startServerWS(t, principal, sm, ns, name, disp, noWebsocket, opts...)
}
+func endpointsToStrings(eps []naming.Endpoint) []string {
+ r := make([]string, len(eps))
+ for i, e := range eps {
+ r[i] = e.String()
+ }
+ sort.Strings(r)
+ return r
+}
+
func startServerWS(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, name string, disp ipc.Dispatcher, shouldUseWebsocket websocketMode, opts ...ipc.ServerOpt) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
opts = append(opts, vc.LocalPrincipal{principal})
@@ -214,7 +224,7 @@
if shouldUseWebsocket {
spec = listenWSSpec
}
- ep, err := server.Listen(spec)
+ eps, err := server.Listen(spec)
if err != nil {
t.Errorf("server.Listen failed: %v", err)
}
@@ -225,7 +235,16 @@
if err := server.AddName(name); err != nil {
t.Errorf("server.AddName for discharger failed: %v", err)
}
- return ep[0], server
+
+ status := server.Status()
+ if got, want := endpointsToStrings(status.Endpoints), endpointsToStrings(eps); !reflect.DeepEqual(got, want) {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ names := status.Mounts.Names()
+ if len(names) != 1 || names[0] != name {
+ t.Fatalf("unexpected names: %v", names)
+ }
+ return eps[0], server
}
func loc(d int) string {
@@ -387,14 +406,10 @@
verifyMount(t, ns, n2)
verifyMount(t, ns, n3)
- if err := server.RemoveName(n1); err != nil {
- t.Errorf("server.RemoveName failed: %v", err)
- }
+ server.RemoveName(n1)
verifyMountMissing(t, ns, n1)
- if err := server.RemoveName("some randome name"); err == nil {
- t.Errorf("server.RemoveName should have failed")
- }
+ server.RemoveName("some random name")
if err := server.ServeDispatcher(n4, &testServerDisp{&testServer{}}); err == nil {
t.Errorf("server.ServeDispatcher should have failed")
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 20af110..abf18a3 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -36,36 +36,81 @@
// for communicating from server to client.
)
-var (
- // TODO(cnicolaou): this should be BadState in verror2.
- errServerStopped = old_verror.Abortedf("ipc: server is stopped")
-)
-
type server struct {
sync.Mutex
- ctx *context.T // context used by the server to make internal RPCs.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
- listeners map[stream.Listener]struct{} // listeners created by Listen.
- dhcpListeners map[*dhcpListener]struct{} // dhcpListeners created by Listen.
+ state serverState // track state of the server.
+ ctx *context.T // context used by the server to make internal RPCs.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts for Listen.
+
+ // listeners created by Listen for servers and proxies
+ listeners map[stream.Listener]struct{}
+ // dhcpListeners created by Listen.
+ dhcpListeners map[*dhcpListener]struct{}
+ // state of proxies keyed by the name of the proxy
+ proxies map[string]proxyState
+ // all endpoints generated and returned by this server
+ endpoints []naming.Endpoint
disp ipc.Dispatcher // dispatcher to serve RPCs
dispReserved ipc.Dispatcher // dispatcher for reserved methods
active sync.WaitGroup // active goroutines we've spawned.
- stopped bool // whether the server has been stopped.
stoppedChan chan struct{} // closed when the server has been stopped.
preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
ns naming.Namespace
servesMountTable bool
- // TODO(cnicolaou): remove this when the publisher tracks published names
- // and can return an appropriate error for RemoveName on a name that
- // wasn't 'Added' for this server.
- names map[string]struct{}
// TODO(cnicolaou): add roaming stats to ipcStats
stats *ipcStats // stats for this server.
}
+type serverState int
+
+const (
+ initialized serverState = iota
+ listening
+ serving
+ publishing
+ stopping
+ stopped
+)
+
+// Simple state machine for the server implementation.
+type next map[serverState]bool
+type transitions map[serverState]next
+
+var (
+ states = transitions{
+ initialized: next{listening: true, stopping: true},
+ listening: next{listening: true, serving: true, stopping: true},
+ serving: next{publishing: true, stopping: true},
+ publishing: next{publishing: true, stopping: true},
+ stopping: next{},
+ stopped: next{},
+ }
+
+ externalStates = map[serverState]ipc.ServerState{
+ initialized: ipc.ServerInit,
+ listening: ipc.ServerActive,
+ serving: ipc.ServerActive,
+ publishing: ipc.ServerActive,
+ stopping: ipc.ServerStopping,
+ stopped: ipc.ServerStopped,
+ }
+)
+
+func (s *server) allowed(next serverState, method string) error {
+ if states[s.state][next] {
+ s.state = next
+ return nil
+ }
+ return verror.Make(verror.BadState, s.ctx, fmt.Sprintf("%s called out of order or more than once", method))
+}
+
+func (s *server) isStopState() bool {
+ return s.state == stopping || s.state == stopped
+}
+
var _ ipc.Server = (*server)(nil)
type dhcpListener struct {
@@ -78,6 +123,11 @@
ch chan config.Setting // channel to receive settings over
}
+type proxyState struct {
+ endpoint naming.Endpoint
+ err verror.E
+}
+
// This option is used to sort and filter the endpoints when resolving the
// proxy name from a mounttable.
type PreferredServerResolveProtocols []string
@@ -92,6 +142,7 @@
streamMgr: streamMgr,
publisher: publisher.New(ctx, ns, publishPeriod),
listeners: make(map[stream.Listener]struct{}),
+ proxies: make(map[string]proxyState),
dhcpListeners: make(map[*dhcpListener]struct{}),
stoppedChan: make(chan struct{}),
ns: ns,
@@ -139,14 +190,21 @@
return s, nil
}
-func (s *server) Published() ([]string, error) {
+func (s *server) Status() ipc.ServerStatus {
+ status := ipc.ServerStatus{}
defer vlog.LogCall()()
s.Lock()
defer s.Unlock()
- if s.stopped {
- return nil, s.newBadState("ipc.Server.Stop already called")
+ status.State = externalStates[s.state]
+ status.ServesMountTable = s.servesMountTable
+ status.Mounts = s.publisher.Status()
+ status.Endpoints = make([]naming.Endpoint, len(s.endpoints))
+ copy(status.Endpoints, s.endpoints)
+ status.Proxies = make([]ipc.ProxyStatus, 0, len(s.proxies))
+ for k, v := range s.proxies {
+ status.Proxies = append(status.Proxies, ipc.ProxyStatus{k, v.endpoint, v.err})
}
- return s.publisher.Published(), nil
+ return status
}
// resolveToEndpoint resolves an object name or address to an endpoint.
@@ -306,6 +364,8 @@
}
*/
+// TODO(cnicolaou): get rid of this in a subsequent CL - it's not used
+// and it's not clear it's needed.
type listenError struct {
err verror.E
errors map[struct{ Protocol, Address string }]error
@@ -359,12 +419,9 @@
func (s *server) Listen(listenSpec ipc.ListenSpec) ([]naming.Endpoint, error) {
defer vlog.LogCall()()
s.Lock()
-
- // Shortcut if the server is stopped, to avoid needlessly creating a
- // listener.
- if s.stopped {
- s.Unlock()
- return nil, s.newBadState("ipc.Server.Stop already called")
+ defer s.Unlock()
+ if err := s.allowed(listening, "Listen"); err != nil {
+ return nil, err
}
useProxy := len(listenSpec.Proxy) > 0
@@ -378,7 +435,6 @@
s.active.Done()
}()
}
- s.Unlock()
var ieps []*inaming.Endpoint
@@ -424,18 +480,15 @@
}
// no proxy.
if len(listenSpec.Addrs) > 0 {
+ // TODO(cnicolaou): should be verror2
return nil, fmt.Errorf("no endpoints")
}
+ // TODO(cnicolaou): should be verror2
return nil, fmt.Errorf("no proxy and no addresses requested")
}
- // TODO(cnicolaou): return all of the eps and their errors....
- s.Lock()
- defer s.Unlock()
- if s.stopped {
- closeAll(linfo)
- return nil, errServerStopped
- }
+ // TODO(cnicolaou): return all of the eps and their errors, then again,
+ // it's not clear we need to....
if roaming && listenSpec.StreamPublisher != nil {
// TODO(cnicolaou): renable roaming in a followup CL.
@@ -471,6 +524,7 @@
for i, iep := range ieps {
s.publisher.AddServer(iep.String(), s.servesMountTable)
eps[i] = iep
+ s.endpoints = append(s.endpoints, iep)
}
return eps, nil
}
@@ -491,6 +545,7 @@
}
s.Lock()
s.listeners[ln] = struct{}{}
+ s.proxies[proxy] = proxyState{iep, nil}
s.Unlock()
s.publisher.AddServer(iep.String(), s.servesMountTable)
return iep, ln, nil
@@ -510,7 +565,7 @@
// loop anyway so that we will continue to try and connect to the
// proxy.
s.Lock()
- if s.stopped {
+ if s.isStopState() {
s.Unlock()
return
}
@@ -518,14 +573,17 @@
for {
if ln != nil && iep != nil {
- s.listenLoop(ln, iep)
+ err := s.listenLoop(ln, iep)
// The listener is done, so:
// (1) Unpublish its name
s.publisher.RemoveServer(iep.String())
+ s.Lock()
+ s.proxies[proxy] = proxyState{iep, verror.Make(verror.NoServers, nil, err)}
+ s.Unlock()
}
s.Lock()
- if s.stopped {
+ if s.isStopState() {
s.Unlock()
return
}
@@ -554,7 +612,7 @@
}
}
-func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
+func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
var calls sync.WaitGroup
defer func() {
@@ -567,7 +625,7 @@
flow, err := ln.Accept()
if err != nil {
vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ep, err)
- return
+ return err
}
calls.Add(1)
go func(flow stream.Flow) {
@@ -653,6 +711,9 @@
func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
defer vlog.LogCall()()
+ if obj == nil {
+ return s.newBadArg("nil object")
+ }
invoker, err := objectToInvoker(obj)
if err != nil {
return s.newBadArg(fmt.Sprintf("bad object: %v", err))
@@ -662,76 +723,56 @@
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
defer vlog.LogCall()()
- s.Lock()
- defer s.Unlock()
- vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
-
- if s.stopped {
- return s.newBadState("ipc.Server.Stop already called")
- }
if disp == nil {
return s.newBadArg("nil dispatcher")
}
- if s.disp != nil {
- return s.newBadState("ipc.Server.Serve/ServeDispatcher already called")
+ s.Lock()
+ defer s.Unlock()
+ if err := s.allowed(serving, "Serve or ServeDispatcher"); err != nil {
+ return err
}
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
s.disp = disp
- s.names = make(map[string]struct{})
if len(name) > 0 {
s.publisher.AddName(name)
- s.names[name] = struct{}{}
}
return nil
}
func (s *server) AddName(name string) error {
defer vlog.LogCall()()
- s.Lock()
- defer s.Unlock()
- vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
if len(name) == 0 {
return s.newBadArg("name is empty")
}
- if s.stopped {
- return s.newBadState("ipc.Server.Stop already called")
+ s.Lock()
+ defer s.Unlock()
+ if err := s.allowed(publishing, "AddName"); err != nil {
+ return err
}
- if s.disp == nil {
- return s.newBadState("adding a name before calling Serve or ServeDispatcher is not allowed")
- }
+ vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
s.publisher.AddName(name)
- // TODO(cnicolaou): remove this map when the publisher's RemoveName
- // method returns an error.
- s.names[name] = struct{}{}
return nil
}
-func (s *server) RemoveName(name string) error {
+func (s *server) RemoveName(name string) {
defer vlog.LogCall()()
s.Lock()
defer s.Unlock()
+ if err := s.allowed(publishing, "RemoveName"); err != nil {
+ return
+ }
vtrace.GetSpan(s.ctx).Annotate("Removed name: " + name)
- if s.stopped {
- return s.newBadState("ipc.Server.Stop already called")
- }
- if s.disp == nil {
- return s.newBadState("removing name before calling Serve or ServeDispatcher is not allowed")
- }
- if _, present := s.names[name]; !present {
- return s.newBadArg(fmt.Sprintf("%q has not been previously used for this server", name))
- }
s.publisher.RemoveName(name)
- delete(s.names, name)
- return nil
}
func (s *server) Stop() error {
defer vlog.LogCall()()
s.Lock()
- if s.stopped {
+ if s.isStopState() {
s.Unlock()
return nil
}
- s.stopped = true
+ s.state = stopping
close(s.stoppedChan)
s.Unlock()
@@ -764,6 +805,7 @@
errCh <- ln.Close()
}(ln)
}
+
for dhcpl, _ := range s.dhcpListeners {
dhcpl.Lock()
dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
@@ -790,6 +832,7 @@
if firstErr != nil {
return verror.Make(verror.Internal, s.ctx, firstErr)
}
+ s.state = stopped
return nil
}
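For reference, the state machine added above enforces the call order exercised by the tests below: Listen before Serve/ServeDispatcher, AddName/RemoveName only after Serve, and Stop as a terminal state. A rough caller-side sketch (not part of this change) of polling the new Status method while a server shuts down, assuming a server that is already listening and serving as in TestServerStatus:

// Illustrative only: "server" is assumed to be an ipc.Server obtained from
// InternalNewServer that has already completed Listen and Serve.
go server.Stop() // drives the state machine into the "stopping" state
for {
	st := server.Status()
	if st.State == ipc.ServerStopped {
		break
	}
	// While outstanding calls drain, st.State reports ipc.ServerStopping.
	time.Sleep(100 * time.Millisecond)
}
// The snapshot also exposes endpoints, mounts and proxy connections:
st := server.Status()
vlog.Infof("endpoints: %v, mounts: %v", st.Endpoints, st.Mounts.Names())
for _, p := range st.Proxies {
	vlog.Infof("proxy %q at %v (err: %v)", p.Proxy, p.Endpoint, p.Error)
}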
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index e59f933..14548b3 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -3,7 +3,9 @@
import (
"fmt"
"os"
+ "path/filepath"
"reflect"
+ "runtime"
"sort"
"strings"
"testing"
@@ -13,6 +15,7 @@
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
+ verror "v.io/core/veyron2/verror2"
"v.io/core/veyron/lib/expect"
"v.io/core/veyron/lib/modules"
@@ -181,11 +184,10 @@
t.Fatal(err)
}
defer proxy.Stop()
- addrs := verifyMount(t, ns, "proxy")
+ addrs := verifyMount(t, ns, spec.Proxy)
if len(addrs) != 1 {
t.Fatalf("failed to lookup proxy")
}
- proxyEP, _ := naming.SplitAddressName(addrs[0])
eps, err := server.Listen(spec)
if err != nil {
@@ -195,14 +197,13 @@
t.Fatal(err)
}
- ch := make(chan struct{})
// Proxy connections are started asynchronously, so we need to wait..
- waitfor := func(expect int) {
+ waitForMountTable := func(ch chan int, expect int) {
then := time.Now().Add(time.Minute)
for {
me, err := ns.Resolve(testContext(), name)
if err == nil && len(me.Servers) == expect {
- close(ch)
+ ch <- 1
return
}
if time.Now().After(then) {
@@ -211,7 +212,21 @@
time.Sleep(100 * time.Millisecond)
}
}
-
+ waitForServerStatus := func(ch chan int, proxy string) {
+ then := time.Now().Add(time.Minute)
+ for {
+ status := server.Status()
+ if len(status.Proxies) == 1 && status.Proxies[0].Proxy == proxy {
+ ch <- 2
+ return
+ }
+ if time.Now().After(then) {
+ t.Fatalf("timed out")
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ proxyEP, _ := naming.SplitAddressName(addrs[0])
proxiedEP, err := inaming.NewEndpoint(proxyEP)
if err != nil {
t.Fatalf("unexpected error for %q: %s", proxyEP, err)
@@ -224,11 +239,26 @@
// Proxy connections are created asynchronously, so we wait for the
// expected number of endpoints to appear for the specified service name.
- go waitfor(len(expectedNames))
+ ch := make(chan int, 2)
+ go waitForMountTable(ch, len(expectedNames))
+ go waitForServerStatus(ch, spec.Proxy)
select {
case <-time.After(time.Minute):
- t.Fatalf("timedout waiting for two entries in the mount table")
- case <-ch:
+ t.Fatalf("timedout waiting for two entries in the mount table and server status")
+ case i := <-ch:
+ select {
+ case <-time.After(time.Minute):
+ t.Fatalf("timedout waiting for two entries in the mount table or server status")
+ case j := <-ch:
+ if !((i == 1 && j == 2) || (i == 2 && j == 1)) {
+ t.Fatalf("unexpected return values from waiters")
+ }
+ }
+ }
+
+ status := server.Status()
+ if got, want := status.Proxies[0].Endpoint, proxiedEP; !reflect.DeepEqual(got, want) {
+ t.Fatalf("got %q, want %q", got, want)
}
got := []string{}
@@ -277,6 +307,11 @@
}
verifyMountMissing(t, ns, name)
+ status = server.Status()
+ if len(status.Proxies) != 1 || status.Proxies[0].Proxy != spec.Proxy || !verror.Is(status.Proxies[0].Error, verror.NoServers.ID) {
+ t.Fatalf("proxy status is incorrect: %v", status.Proxies)
+ }
+
// Proxy restarts, calls should eventually start succeeding.
if err := proxy.Start(t, args...); err != nil {
t.Fatal(err)
@@ -298,6 +333,157 @@
}
}
+type statusServer struct{ ch chan struct{} }
+
+func (s *statusServer) Hang(ctx ipc.ServerContext) {
+ <-s.ch
+}
+
+func TestServerStatus(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ ns := tnaming.NewSimpleNamespace()
+ principal := vc.LocalPrincipal{tsecurity.NewPrincipal("testServerStatus")}
+ server, err := InternalNewServer(testContext(), sm, ns, principal)
+ if err != nil {
+ t.Fatal(err)
+ }
+ status := server.Status()
+ if got, want := status.State, ipc.ServerInit; got != want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+ server.Listen(ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}})
+ status = server.Status()
+ if got, want := status.State, ipc.ServerActive; got != want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+ serverChan := make(chan struct{})
+ err = server.Serve("test", &statusServer{serverChan}, nil)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ status = server.Status()
+ if got, want := status.State, ipc.ServerActive; got != want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+
+ progress := make(chan error)
+
+ client, err := InternalNewClient(sm, ns, principal)
+ makeCall := func() {
+ call, err := client.StartCall(testContext(), "test", "Hang", nil)
+ progress <- err
+ progress <- call.Finish()
+ }
+ go makeCall()
+
+ // Wait for RPC to start
+ if err := <-progress; err != nil {
+ t.Fatalf(err.Error())
+ }
+
+ // Stop server asynchronously
+ go func() {
+ err = server.Stop()
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ }()
+
+ // Server should enter 'ServerStopping' state.
+ then := time.Now()
+ for {
+ status = server.Status()
+ if got, want := status.State, ipc.ServerStopping; got != want {
+ if time.Now().Sub(then) > time.Minute {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+ } else {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ // Server won't stop until the statusServer's hung method completes.
+ close(serverChan)
+ // Wait for RPC to finish
+ if err := <-progress; err != nil {
+ t.Fatalf(err.Error())
+ }
+ // Now that the RPC is done, the server should be able to stop.
+ status = server.Status()
+ if got, want := status.State, ipc.ServerStopped; got != want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+}
+
+func TestServerStates(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ ns := tnaming.NewSimpleNamespace()
+
+ loc := func() string {
+ _, file, line, _ := runtime.Caller(2)
+ return fmt.Sprintf("%s:%d", filepath.Base(file), line)
+ }
+
+ expectBadState := func(err error) {
+ if !verror.Is(err, verror.BadState.ID) {
+ t.Fatalf("%s: unexpected error: %v", loc(), err)
+ }
+ }
+
+ expectNoError := func(err error) {
+ if err != nil {
+ t.Fatalf("%s: unexpected error: %v", loc(), err)
+ }
+ }
+
+ server, err := InternalNewServer(testContext(), sm, ns)
+ expectNoError(err)
+
+ expectState := func(s ipc.ServerState) {
+ if got, want := server.Status().State, s; got != want {
+ t.Fatalf("%s: got %s, want %s", loc(), got, want)
+ }
+ }
+
+ expectState(ipc.ServerInit)
+
+ // Need to call Listen first.
+ err = server.Serve("", &testServer{}, nil)
+ expectBadState(err)
+ err = server.AddName("a")
+ expectBadState(err)
+
+ _, err = server.Listen(ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}})
+ expectNoError(err)
+
+ expectState(ipc.ServerActive)
+
+ err = server.Serve("", &testServer{}, nil)
+ expectNoError(err)
+
+ err = server.Serve("", &testServer{}, nil)
+ expectBadState(err)
+
+ expectState(ipc.ServerActive)
+
+ err = server.AddName("a")
+ expectNoError(err)
+
+ expectState(ipc.ServerActive)
+
+ server.RemoveName("a")
+
+ expectState(ipc.ServerActive)
+
+ err = server.Stop()
+ expectNoError(err)
+ err = server.Stop()
+ expectNoError(err)
+
+ err = server.AddName("a")
+ expectBadState(err)
+}
+
// Required by modules framework.
func TestHelperProcess(t *testing.T) {
modules.DispatchInTest()
diff --git a/runtimes/google/lib/publisher/publisher.go b/runtimes/google/lib/publisher/publisher.go
index 5ccf6e7..fd4310c 100644
--- a/runtimes/google/lib/publisher/publisher.go
+++ b/runtimes/google/lib/publisher/publisher.go
@@ -5,17 +5,16 @@
import (
"fmt"
+ "sort"
"strings"
"time"
"v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/vlog"
)
-// TODO(cnicolaou): have the done channel return an error so
-// that the publisher calls can return errors also.
-
// Publisher manages the publishing of servers in mounttable.
type Publisher interface {
// AddServer adds a new server to be mounted.
@@ -26,8 +25,8 @@
AddName(name string)
// RemoveName removes a name.
RemoveName(name string)
- // Published returns the published names rooted at the mounttable.
- Published() []string
+ // Status returns a snapshot of the publisher's current state.
+ Status() ipc.MountState
// DebugString returns a string representation of the publisher
// meant solely for debugging.
DebugString() string
@@ -75,7 +74,7 @@
type debugCmd chan string // debug string is sent when the cmd is done
-type publishedCmd chan []string // published names are sent when cmd is done
+type statusCmd chan ipc.MountState // status info is sent when cmd is done
type stopCmd struct{} // sent to the runloop when we want it to exit.
@@ -126,16 +125,12 @@
}
}
-// Published returns the published name(s) for this publisher, where each name
-// is rooted at the mount table(s) where the name has been mounted.
-// The names are returned grouped by published name, where all the names
-// corresponding the the mount table replicas are grouped together.
-func (p *publisher) Published() []string {
- published := make(publishedCmd)
- if p.sendCmd(published) {
- return <-published
+func (p *publisher) Status() ipc.MountState {
+ status := make(statusCmd)
+ if p.sendCmd(status) {
+ return <-status
}
- return []string{}
+ return ipc.MountState{}
}
func (p *publisher) DebugString() (dbg string) {
@@ -190,8 +185,8 @@
case removeNameCmd:
state.removeName(tcmd.name)
close(tcmd.done)
- case publishedCmd:
- tcmd <- state.published()
+ case statusCmd:
+ tcmd <- state.getStatus()
close(tcmd)
case debugCmd:
tcmd <- state.debugString()
@@ -204,29 +199,21 @@
}
}
+type mountKey struct {
+ name, server string
+}
+
// pubState maintains the state for our periodic mounts. It is not thread-safe;
// it's only used in the sequential publisher runLoop.
type pubState struct {
ctx *context.T
ns naming.Namespace
period time.Duration
- deadline time.Time // deadline for the next sync call
- names map[string]bool // names that have been added
- servers map[string]bool // servers that have been added, true
- mounts map[mountKey]*mountStatus // map each (name,server) to its status
- // if server is a mount table server
-}
-
-type mountKey struct {
- name string
- server string
-}
-
-type mountStatus struct {
- lastMount time.Time
- lastMountErr error
- lastUnmount time.Time
- lastUnmountErr error
+ deadline time.Time // deadline for the next sync call
+ names map[string]bool // names that have been added
+ servers map[string]bool // servers that have been added, true
+ // map each (name,server) to its status.
+ mounts map[mountKey]*ipc.MountStatus
}
func newPubState(ctx *context.T, ns naming.Namespace, period time.Duration) *pubState {
@@ -237,7 +224,7 @@
deadline: time.Now().Add(period),
names: make(map[string]bool),
servers: make(map[string]bool),
- mounts: make(map[mountKey]*mountStatus),
+ mounts: make(map[mountKey]*ipc.MountStatus),
}
}
@@ -253,7 +240,7 @@
}
ps.names[name] = true
for server, servesMT := range ps.servers {
- status := new(mountStatus)
+ status := new(ipc.MountStatus)
ps.mounts[mountKey{name, server}] = status
ps.mount(name, server, status, servesMT)
}
@@ -277,7 +264,7 @@
if !ps.servers[server] {
ps.servers[server] = servesMT
for name, _ := range ps.names {
- status := new(mountStatus)
+ status := new(ipc.MountStatus)
ps.mounts[mountKey{name, server}] = status
ps.mount(name, server, status, servesMT)
}
@@ -296,15 +283,16 @@
}
}
-func (ps *pubState) mount(name, server string, status *mountStatus, servesMT bool) {
+func (ps *pubState) mount(name, server string, status *ipc.MountStatus, servesMT bool) {
// Always mount with ttl = period + slack, regardless of whether this is
// triggered by a newly added server or name, or by sync. The next call
// to sync will occur within the next period, and refresh all mounts.
ttl := ps.period + mountTTLSlack
- status.lastMount = time.Now()
- status.lastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTableOpt(servesMT))
- if status.lastMountErr != nil {
- vlog.Errorf("ipc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.lastMountErr)
+ status.LastMount = time.Now()
+ status.LastMountErr = ps.ns.Mount(ps.ctx, name, server, ttl, naming.ServesMountTableOpt(servesMT))
+ status.TTL = ttl
+ if status.LastMountErr != nil {
+ vlog.Errorf("ipc pub: couldn't mount(%v, %v, %v): %v", name, server, ttl, status.LastMountErr)
} else {
vlog.VI(2).Infof("ipc pub: mount(%v, %v, %v)", name, server, ttl)
}
@@ -313,7 +301,7 @@
func (ps *pubState) sync() {
ps.deadline = time.Now().Add(ps.period) // set deadline for the next sync
for key, status := range ps.mounts {
- if status.lastUnmountErr != nil {
+ if status.LastUnmountErr != nil {
// Desired state is "unmounted", failed at previous attempt. Retry.
ps.unmount(key.name, key.server, status)
} else {
@@ -322,11 +310,11 @@
}
}
-func (ps *pubState) unmount(name, server string, status *mountStatus) {
- status.lastUnmount = time.Now()
- status.lastUnmountErr = ps.ns.Unmount(ps.ctx, name, server)
- if status.lastUnmountErr != nil {
- vlog.Errorf("ipc pub: couldn't unmount(%v, %v): %v", name, server, status.lastUnmountErr)
+func (ps *pubState) unmount(name, server string, status *ipc.MountStatus) {
+ status.LastUnmount = time.Now()
+ status.LastUnmountErr = ps.ns.Unmount(ps.ctx, name, server)
+ if status.LastUnmountErr != nil {
+ vlog.Errorf("ipc pub: couldn't unmount(%v, %v): %v", name, server, status.LastUnmountErr)
} else {
vlog.VI(2).Infof("ipc pub: unmount(%v, %v)", name, server)
delete(ps.mounts, mountKey{name, server})
@@ -339,32 +327,43 @@
}
}
-func (ps *pubState) published() []string {
+func copyToSlice(sl map[string]bool) []string {
var ret []string
- for name, _ := range ps.names {
- e, err := ps.ns.ResolveToMountTable(ps.ctx, name)
- if err != nil {
- vlog.Errorf("ipc pub: couldn't resolve %v to mount table: %v", name, err)
+ for s, _ := range sl {
+ if len(s) == 0 {
continue
}
- if len(e.Servers) == 0 {
- vlog.Errorf("ipc pub: no mount table found for %v", name)
- continue
- }
- for _, s := range e.Servers {
- ret = append(ret, naming.JoinAddressName(s.Server, e.Name))
- }
+ ret = append(ret, s)
}
return ret
}
+func (ps *pubState) getStatus() ipc.MountState {
+ st := make([]ipc.MountStatus, 0, len(ps.mounts))
+ names := copyToSlice(ps.names)
+ servers := copyToSlice(ps.servers)
+ sort.Strings(names)
+ sort.Strings(servers)
+ for _, name := range names {
+ for _, server := range servers {
+ if v := ps.mounts[mountKey{name, server}]; v != nil {
+ mst := *v
+ mst.Name = name
+ mst.Server = server
+ st = append(st, mst)
+ }
+ }
+ }
+ return st
+}
+
// TODO(toddw): sort the names/servers so that the output order is stable.
func (ps *pubState) debugString() string {
l := make([]string, 2+len(ps.mounts))
l = append(l, fmt.Sprintf("Publisher period:%v deadline:%v", ps.period, ps.deadline))
l = append(l, "==============================Mounts============================================")
for key, status := range ps.mounts {
- l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v) unmount(%v, %v)", key.name, key.server, status.lastMount, status.lastMountErr, status.lastUnmount, status.lastUnmountErr))
+ l = append(l, fmt.Sprintf("[%s,%s] mount(%v, %v, %s) unmount(%v, %v)", key.name, key.server, status.LastMount, status.LastMountErr, status.TTL, status.LastUnmount, status.LastUnmountErr))
}
return strings.Join(l, "\n")
}
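Publisher.Status now returns an ipc.MountState, a slice of ipc.MountStatus entries (one per (name, server) pair) with sorted Names() and Servers() helpers, replacing the flat strings from Published. A minimal sketch of inspecting it, assuming a context and namespace set up as in publisher_test.go; illustrative only:

// Illustrative only: ctx and ns are assumed to be a *context.T and a
// naming.Namespace, constructed as in publisher_test.go.
pub := publisher.New(ctx, ns, time.Second)
pub.AddName("foo")
pub.AddServer("foo-addr", false)
// As TestStatus below does, callers may need to poll: the snapshot can
// lag AddName/AddServer since mounting happens in the publisher's run loop.
for _, m := range pub.Status() {
	if m.LastMountErr != nil {
		vlog.Errorf("mount(%q, %q, %s) failed: %v", m.Name, m.Server, m.TTL, m.LastMountErr)
	}
}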
diff --git a/runtimes/google/lib/publisher/publisher_test.go b/runtimes/google/lib/publisher/publisher_test.go
index a947799..1fe0992 100644
--- a/runtimes/google/lib/publisher/publisher_test.go
+++ b/runtimes/google/lib/publisher/publisher_test.go
@@ -1,6 +1,7 @@
package publisher_test
import (
+ "fmt"
"reflect"
"sort"
"testing"
@@ -11,11 +12,16 @@
"v.io/core/veyron2/vtrace"
"v.io/core/veyron/lib/flags"
+ "v.io/core/veyron/lib/testutil"
"v.io/core/veyron/runtimes/google/lib/publisher"
tnaming "v.io/core/veyron/runtimes/google/testing/mocks/naming"
ivtrace "v.io/core/veyron/runtimes/google/vtrace"
)
+func init() {
+ testutil.Init()
+}
+
func testContext() *context.T {
var ctx *context.T
ctx, err := ivtrace.Init(ctx, flags.VtraceFlags{})
@@ -60,3 +66,61 @@
t.Errorf("expected an error")
}
}
+
+func TestStatus(t *testing.T) {
+ ns := tnaming.NewSimpleNamespace()
+ pub := publisher.New(testContext(), ns, time.Second)
+ pub.AddName("foo")
+ status := pub.Status()
+ if got, want := len(status), 0; got != want {
+ t.Errorf("got %d, want %d", got, want)
+ }
+ pub.AddServer("foo-addr", false)
+
+ // Wait for the publisher to asynchronously publish the
+ // requisite number of servers.
+ ch := make(chan error, 1)
+ waitFor := func(n int) {
+ then := time.Now()
+ for {
+ status = pub.Status()
+ if got, want := len(status), n; got != want {
+ if time.Now().Sub(then) > time.Minute {
+ ch <- fmt.Errorf("got %d, want %d", got, want)
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ } else {
+ ch <- nil
+ return
+ }
+ }
+ }
+
+ go waitFor(1)
+ if err := <-ch; err != nil {
+ t.Fatalf("%s", err)
+ }
+
+ pub.AddServer("bar-addr", false)
+ pub.AddName("baz")
+ status = pub.Status()
+ names := status.Names()
+ if got, want := names, []string{"baz", "foo"}; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ servers := status.Servers()
+ if got, want := servers, []string{"bar-addr", "foo-addr"}; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ pub.RemoveName("foo")
+ if _, err := ns.Resolve(testContext(), "foo"); err == nil {
+ t.Errorf("expected an error")
+ }
+ status = pub.Status()
+
+ go waitFor(2)
+ if err := <-ch; err != nil {
+ t.Fatalf("%s", err)
+ }
+}
diff --git a/services/identity/server/identityd.go b/services/identity/server/identityd.go
index c1b2c99..1aafdd1 100644
--- a/services/identity/server/identityd.go
+++ b/services/identity/server/identityd.go
@@ -170,13 +170,8 @@
if err := server.ServeDispatcher(objectAddr, dispatcher); err != nil {
return nil, nil, fmt.Errorf("failed to start Veyron services: %v", err)
}
- published, _ := server.Published()
- if len(published) == 0 {
- // No addresses successfully published, return what we expect to be published at.
- published = []string{rootedObjectAddr}
- }
vlog.Infof("Blessing and discharger services will be published at %v", rootedObjectAddr)
- return server, published, nil
+ return server, []string{rootedObjectAddr}, nil
}
// newDispatcher returns a dispatcher for both the blessing and the
diff --git a/tools/naming/simulator/proxy.scr b/tools/naming/simulator/proxy.scr
index 1093006..cb0d6bc 100644
--- a/tools/naming/simulator/proxy.scr
+++ b/tools/naming/simulator/proxy.scr
@@ -34,7 +34,7 @@
assert $PN 7
set PROXY_ADDR=$P2
set PROXY_RID=$P3
-eval $proxy
+
# TODO(cnicolaou): figure out why ls appears to run slowly when a proxy is
# running, maybe a problem with the mount table.