veyron/runtimes/google/ipc/...: Proxy improvements.
(1) The proxy explicitly communicates the external endpoint for a proxied server.
The previous scheme, in which the server derived the endpoint from the
address it used to connect to the proxy, was flawed: the address used
to connect *to* the proxy may differ from the address clients use to
connect *through* it. (For example, a proxy might run on your local
network, with servers connecting to it at, say, 192.168.1.1, while
external clients connect through the proxy's public IP.)
(2) Reconnection to a proxy is now done by ipc.Server instead of
stream.Manager.Listener. This yields two improvements:
(a) As soon as the proxy connection dies, the server unmounts the
(unusable) proxied address.
    (b) If the proxy address is itself a veyron name, the name is
        resolved to an endpoint on every reconnect, which allows
        proxy servers to physically move around.
Change-Id: I7f23f3ff784b82e0429c0efe87c23eebd9127983
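
For reference, the essence of the new handshake is sketched below. This
is an illustrative condensation of the connect() changes in listener.go
and the proxy-side response in proxy.go (both in the diff that follows),
not verbatim code: the helper name proxiedEndpoint is hypothetical, VIF/VC
setup is elided by assuming an already-established liveness-check flow to
the proxy, and import paths follow this repository's conventions.

    import (
    	"fmt"

    	"veyron/runtimes/google/ipc/stream/proxy"
    	inaming "veyron/runtimes/google/naming"

    	"veyron2/ipc/stream"
    	"veyron2/naming"
    	"veyron2/vom"
    )

    // proxiedEndpoint performs the Request/Response exchange with the
    // proxy and returns the endpoint that clients should use to reach
    // this server *through* the proxy.
    func proxiedEndpoint(flow stream.Flow) (naming.Endpoint, error) {
    	var request proxy.Request
    	var response proxy.Response
    	if err := vom.NewEncoder(flow).Encode(request); err != nil {
    		return nil, fmt.Errorf("failed to encode request to proxy: %v", err)
    	}
    	if err := vom.NewDecoder(flow).Decode(&response); err != nil {
    		return nil, fmt.Errorf("failed to decode response from proxy: %v", err)
    	}
    	if response.Error != nil {
    		return nil, fmt.Errorf("proxy error: %v", response.Error)
    	}
    	// The proxy computes this endpoint (via version.ProxiedEndpoint)
    	// and ships its string form; parse it back into a naming.Endpoint.
    	return inaming.NewEndpoint(response.Endpoint)
    }

With the endpoint supplied by the proxy rather than derived locally,
ipc.Server can publish it, unpublish it as soon as the proxy connection
dies, and re-resolve the proxy's veyron name before each reconnection
attempt (see proxyListenLoop in server.go below).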
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 32f4e91..4729cef 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -15,6 +15,7 @@
"veyron/lib/testutil"
"veyron/lib/testutil/blackbox"
imanager "veyron/runtimes/google/ipc/stream/manager"
+ "veyron/runtimes/google/ipc/stream/proxy"
"veyron/runtimes/google/ipc/stream/vc"
"veyron/runtimes/google/ipc/version"
"veyron/runtimes/google/lib/publisher"
@@ -867,6 +868,112 @@
}
}
+type proxyHandle struct {
+ MT naming.MountTable
+ process *blackbox.Child
+ mount string
+}
+
+func (h *proxyHandle) Start(t *testing.T) error {
+ h.process = blackbox.HelperCommand(t, "runProxy")
+ h.process.Cmd.Start()
+ var err error
+ if h.mount, err = h.process.ReadLineFromChild(); err != nil {
+ return err
+ }
+ if err := h.MT.Mount(&fakeContext{}, "proxy", h.mount, time.Hour); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (h *proxyHandle) Stop() error {
+ if h.process == nil {
+ return nil
+ }
+ h.process.Cleanup()
+ h.process = nil
+ if len(h.mount) == 0 {
+ return nil
+ }
+ return h.MT.Unmount(&fakeContext{}, "proxy", h.mount)
+}
+
+func TestProxy(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ mt := newMountTable()
+ client, err := InternalNewClient(sm, mt, veyron2.LocalID(clientID))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+ server, err := InternalNewServer(InternalNewContext(), sm, mt, veyron2.LocalID(serverID))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+ if err := server.Register("server", testServerDisp{&testServer{}}); err != nil {
+ t.Fatal(err)
+ }
+
+ name := "mountpoint/server/suffix"
+ makeCall := func() (string, error) {
+ call, err := client.StartCall(&fakeContext{}, name, "Echo", []interface{}{"batman"})
+ if err != nil {
+ return "", err
+ }
+ var result string
+ if err = call.Finish(&result); err != nil {
+ return "", err
+ }
+ return result, nil
+ }
+ proxy := &proxyHandle{MT: mt}
+ if err := proxy.Start(t); err != nil {
+ t.Fatal(err)
+ }
+ defer proxy.Stop()
+ if _, err := server.Listen(inaming.Network, "proxy"); err != nil {
+ t.Fatal(err)
+ }
+ if err := server.Publish("mountpoint"); err != nil {
+ t.Fatal(err)
+ }
+ verifyMount(t, mt, name)
+ // Proxied endpoint should be published and RPC should succeed (through proxy)
+ const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
+ if result, err := makeCall(); result != expected || err != nil {
+ t.Fatalf("Got (%v, %v) want (%v, nil)", result, err, expected)
+ }
+
+ // Proxy dies, calls should fail and the name should be unmounted.
+ if err := proxy.Stop(); err != nil {
+ t.Fatal(err)
+ }
+ if result, err := makeCall(); err == nil {
+ t.Fatalf(`Got (%v, %v) want ("", <non-nil>) as proxy is down`, result, err)
+ }
+ for {
+ if _, err := mt.Resolve(InternalNewContext(), name); err != nil {
+ break
+ }
+ }
+ verifyMountMissing(t, mt, name)
+
+ // Proxy restarts, calls should eventually start succeeding.
+ if err := proxy.Start(t); err != nil {
+ t.Fatal(err)
+ }
+ for {
+ if result, err := makeCall(); err == nil {
+ if result != expected {
+ t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
+ }
+ break
+ }
+ }
+}
+
func loadIdentityFromFile(file string) security.PrivateID {
f, err := os.Open(file)
if err != nil {
@@ -902,6 +1009,19 @@
<-make(chan struct{})
}
+func runProxy([]string) {
+ rid, err := naming.NewRoutingID()
+ if err != nil {
+ vlog.Fatal(err)
+ }
+ proxy, err := proxy.New(rid, nil, "tcp4", "127.0.0.1:0")
+ if err != nil {
+ vlog.Fatal(err)
+ }
+ fmt.Println("/" + proxy.Endpoint().String())
+ <-make(chan struct{})
+}
+
// Required by blackbox framework.
func TestHelperProcess(t *testing.T) {
blackbox.HelperProcess(t)
@@ -919,4 +1039,5 @@
isecurity.TrustIdentityProviders(serverID)
blackbox.CommandTable["runServer"] = runServer
+ blackbox.CommandTable["runProxy"] = runProxy
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 3f2395b..5cf77d8 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -35,26 +35,30 @@
type server struct {
sync.Mutex
- ctx context.T // context used by the server to make internal RPCs.
- streamMgr stream.Manager // stream manager to listen for new flows.
- disptrie *disptrie // dispatch trie for method dispatching.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
- listeners []stream.Listener // listeners created by Listen.
- active sync.WaitGroup // active goroutines we've spawned.
- stopped bool // whether the server has been stopped.
+ ctx context.T // context used by the server to make internal RPCs.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ disptrie *disptrie // dispatch trie for method dispatching.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
+ listeners map[stream.Listener]bool // listeners created by Listen.
+ active sync.WaitGroup // active goroutines we've spawned.
+ stopped bool // whether the server has been stopped.
+ stoppedChan chan struct{} // closed when the server has been stopped.
mt naming.MountTable
publishOpt veyron2.ServerPublishOpt // which endpoints to publish
+ publishing bool // is some name being published?
servesMountTable bool
}
func InternalNewServer(ctx context.T, streamMgr stream.Manager, mt naming.MountTable, opts ...ipc.ServerOpt) (ipc.Server, error) {
s := &server{
- ctx: ctx,
- streamMgr: streamMgr,
- disptrie: newDisptrie(),
- publisher: publisher.New(ctx, mt, publishPeriod),
- mt: mt,
+ ctx: ctx,
+ streamMgr: streamMgr,
+ disptrie: newDisptrie(),
+ publisher: publisher.New(ctx, mt, publishPeriod),
+ listeners: make(map[stream.Listener]bool),
+ stoppedChan: make(chan struct{}),
+ mt: mt,
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -122,7 +126,9 @@
return nil, errServerStopped
}
s.Unlock()
+ var proxyName string
if protocol == inaming.Network {
+ proxyName = address
address = s.resolveToAddress(address)
}
ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
@@ -137,48 +143,110 @@
ln.Close()
return nil, errServerStopped
}
- s.listeners = append(s.listeners, ln)
+ publish := s.publishOpt == veyron2.PublishAll || !s.publishing
+ s.publishing = true
+ s.listeners[ln] = true
// We have a single goroutine per listener to accept new flows. Each flow is
// served from its own goroutine.
s.active.Add(1)
- go func(ln stream.Listener, ep naming.Endpoint) {
- defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
- for {
- flow, err := ln.Accept()
- if err != nil {
- vlog.VI(10).Infof("ipc: Accept on %v %v failed: %v", protocol, address, err)
- s.active.Done()
- return
- }
- s.active.Add(1)
- go func(flow stream.Flow) {
- if err := newFlowServer(flow, s).serve(); err != nil {
- // TODO(caprita): Logging errors here is
- // too spammy. For example, "not
- // authorized" errors shouldn't be
- // logged as server errors.
- vlog.Errorf("Flow serve on (%v, %v) failed: %v", protocol, address, err)
- }
- s.active.Done()
- }(flow)
- }
- }(ln, ep)
- var publishEP string
- if s.publishOpt == veyron2.PublishAll || len(s.listeners) == 1 {
- publishEP = naming.JoinAddressName(ep.String(), "")
+ if protocol == inaming.Network {
+ go func(ln stream.Listener, ep naming.Endpoint, proxy string, publish bool) {
+ s.proxyListenLoop(ln, ep, proxy, publish)
+ s.active.Done()
+ }(ln, ep, proxyName, publish)
+ } else {
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ln, ep)
}
s.Unlock()
- if len(publishEP) > 0 {
- if !s.servesMountTable {
- // Make sure that client MountTable code doesn't try and
- // ResolveStep past this final address.
- publishEP += "//"
- }
- s.publisher.AddServer(publishEP)
+ if publish {
+ s.publisher.AddServer(s.publishEP(ep))
}
return ep, nil
}
+func (s *server) publishEP(ep naming.Endpoint) string {
+ var name string
+ if !s.servesMountTable {
+ // Make sure that client MountTable code doesn't try and
+ // ResolveStep past this final address.
+ name = "//"
+ }
+ return naming.JoinAddressName(ep.String(), name)
+}
+
+func (s *server) proxyListenLoop(ln stream.Listener, ep naming.Endpoint, proxy string, publish bool) {
+ const (
+ min = 5 * time.Millisecond
+ max = 5 * time.Minute
+ )
+ for {
+ s.listenLoop(ln, ep)
+ // The listener is done, so:
+ // (1) Unpublish its name
+ if publish {
+ s.publisher.RemoveServer(s.publishEP(ep))
+ }
+ // (2) Reconnect to the proxy unless the server has been stopped
+ backoff := min
+ ln = nil
+ for ln == nil {
+ select {
+ case <-time.After(backoff):
+ proxy = s.resolveToAddress(proxy)
+ var err error
+ ln, ep, err = s.streamMgr.Listen(inaming.Network, proxy, s.listenerOpts...)
+ if err == nil {
+ vlog.VI(1).Infof("Reconnected to proxy at %v listener: (%v, %v)", proxy, ln, ep)
+ break
+ }
+ if backoff = backoff * 2; backoff > max {
+ backoff = max
+ }
+ vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
+ case <-s.stoppedChan:
+ return
+ }
+ }
+ // (3) Reconnected; publish the new address
+ if publish {
+ s.publisher.AddServer(s.publishEP(ep))
+ }
+ s.Lock()
+ s.listeners[ln] = true
+ s.Unlock()
+ }
+}
+
+func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
+ defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
+ defer func() {
+ s.Lock()
+ delete(s.listeners, ln)
+ s.Unlock()
+ }()
+ for {
+ flow, err := ln.Accept()
+ if err != nil {
+ vlog.VI(10).Infof("ipc: Accept on %v failed: %v", ln, err)
+ return
+ }
+ s.active.Add(1)
+ go func(flow stream.Flow) {
+ if err := newFlowServer(flow, s).serve(); err != nil {
+ // TODO(caprita): Logging errors here is
+ // too spammy. For example, "not
+ // authorized" errors shouldn't be
+ // logged as server errors.
+ vlog.Errorf("Flow serve on %v failed: %v", ln, err)
+ }
+ s.active.Done()
+ }(flow)
+ }
+}
+
func (s *server) Publish(name string) error {
s.publisher.AddName(name)
return nil
@@ -191,6 +259,7 @@
return nil
}
s.stopped = true
+ close(s.stoppedChan)
s.Unlock()
// Note, It's safe to Stop/WaitForStop on the publisher outside of the
@@ -212,7 +281,7 @@
// flows will continue until they terminate naturally.
nListeners := len(s.listeners)
errCh := make(chan error, nListeners)
- for _, ln := range s.listeners {
+ for ln := range s.listeners {
go func(ln stream.Listener) {
errCh <- ln.Close()
}(ln)
diff --git a/runtimes/google/ipc/stream/manager/listener.go b/runtimes/google/ipc/stream/manager/listener.go
index 33c81d7..2bafdbb 100644
--- a/runtimes/google/ipc/stream/manager/listener.go
+++ b/runtimes/google/ipc/stream/manager/listener.go
@@ -6,11 +6,11 @@
"net"
"strings"
"sync"
- "time"
"veyron/runtimes/google/ipc/stream/proxy"
"veyron/runtimes/google/ipc/stream/vif"
"veyron/runtimes/google/lib/upcqueue"
+ inaming "veyron/runtimes/google/naming"
"veyron2/ipc/stream"
"veyron2/naming"
@@ -45,10 +45,7 @@
q *upcqueue.T
proxyEP naming.Endpoint
manager *manager
-
- reconnect chan bool // true when the proxy connection dies, false when the listener is being closed.
- stopped chan struct{} // closed when reconnectLoop exits
- opts []stream.ListenerOpt
+ opts []stream.ListenerOpt
}
func newNetListener(m *manager, netLn net.Listener, opts []stream.ListenerOpt) listener {
@@ -120,7 +117,7 @@
}
func (ln *netListener) String() string {
- return fmt.Sprintf("%T: %v", ln, ln.netLn.Addr())
+ return fmt.Sprintf("%T: (%v, %v)", ln, ln.netLn.Addr().Network(), ln.netLn.Addr())
}
func (ln *netListener) DebugString() string {
@@ -136,31 +133,29 @@
return strings.Join(ret, "\n")
}
-func newProxyListener(m *manager, ep naming.Endpoint, opts []stream.ListenerOpt) (listener, error) {
+func newProxyListener(m *manager, ep naming.Endpoint, opts []stream.ListenerOpt) (listener, naming.Endpoint, error) {
ln := &proxyListener{
- q: upcqueue.New(),
- proxyEP: ep,
- manager: m,
- reconnect: make(chan bool),
- stopped: make(chan struct{}),
- opts: opts,
+ q: upcqueue.New(),
+ proxyEP: ep,
+ manager: m,
+ opts: opts,
}
- vf, err := ln.connect()
+ vf, ep, err := ln.connect()
if err != nil {
- return nil, err
+ return nil, nil, err
}
- go ln.reconnectLoop(vf)
- return ln, nil
+ go vifLoop(vf, ln.q)
+ return ln, ep, nil
}
-func (ln *proxyListener) connect() (*vif.VIF, error) {
+func (ln *proxyListener) connect() (*vif.VIF, naming.Endpoint, error) {
vlog.VI(1).Infof("Connecting to proxy at %v", ln.proxyEP)
vf, err := ln.manager.FindOrDialVIF(ln.proxyEP.Addr())
if err != nil {
- return nil, err
+ return nil, nil, err
}
if err := vf.StartAccepting(ln.opts...); err != nil {
- return nil, fmt.Errorf("already connected to proxy and accepting connections? VIF: %v, StartAccepting error: %v", vf, err)
+ return nil, nil, fmt.Errorf("already connected to proxy and accepting connections? VIF: %v, StartAccepting error: %v", vf, err)
}
// Proxy protocol: See veyron/runtimes/google/ipc/stream/proxy/protocol.vdl
// Requires dialing a VC to the proxy, need to extract options (like the identity)
@@ -177,96 +172,42 @@
if verror.ErrorID(err) == verror.Aborted {
ln.manager.vifs.Delete(vf)
}
- return nil, fmt.Errorf("VC establishment with proxy failed: %v", err)
+ return nil, nil, fmt.Errorf("VC establishment with proxy failed: %v", err)
}
flow, err := vc.Connect()
if err != nil {
vf.StopAccepting()
- return nil, fmt.Errorf("unable to create liveness check flow to proxy: %v", err)
+ return nil, nil, fmt.Errorf("unable to create liveness check flow to proxy: %v", err)
}
var request proxy.Request
var response proxy.Response
if err := vom.NewEncoder(flow).Encode(request); err != nil {
flow.Close()
vf.StopAccepting()
- return nil, fmt.Errorf("failed to encode request to proxy: %v", err)
+ return nil, nil, fmt.Errorf("failed to encode request to proxy: %v", err)
}
if err := vom.NewDecoder(flow).Decode(&response); err != nil {
flow.Close()
vf.StopAccepting()
- return nil, fmt.Errorf("failed to decode response from proxy: %v", err)
+ return nil, nil, fmt.Errorf("failed to decode response from proxy: %v", err)
}
if response.Error != nil {
flow.Close()
vf.StopAccepting()
- return nil, fmt.Errorf("proxy error: %v", response.Error)
+ return nil, nil, fmt.Errorf("proxy error: %v", response.Error)
}
-
- go func(vf *vif.VIF, flow stream.Flow) {
+ ep, err := inaming.NewEndpoint(response.Endpoint)
+ if err != nil {
+ flow.Close()
+ vf.StopAccepting()
+ return nil, nil, fmt.Errorf("proxy returned invalid endpoint(%v): %v", response.Endpoint, err)
+ }
+ go func(vf *vif.VIF, flow stream.Flow, q *upcqueue.T) {
<-flow.Closed()
vf.StopAccepting()
- }(vf, flow)
- return vf, nil
-}
-
-func (ln *proxyListener) reconnectLoop(vf *vif.VIF) {
- const (
- min = 5 * time.Millisecond
- max = 5 * time.Minute
- )
- defer close(ln.stopped)
- go ln.vifLoop(vf)
- for {
- if retry := <-ln.reconnect; !retry {
- ln.waitForVIFLoop(vf)
- return
- }
- vlog.VI(3).Infof("Connection to proxy at %v broke. Re-establishing", ln.proxyEP)
- backoff := min
- for {
- var err error
- if vf, err = ln.connect(); err == nil {
- go ln.vifLoop(vf)
- vlog.VI(3).Infof("Proxy reconnect (%v) succeeded", ln.proxyEP)
- break
- }
- vlog.VI(3).Infof("Proxy reconnect (%v) FAILED (%v). Retrying in %v", ln.proxyEP, err, backoff)
- select {
- case retry := <-ln.reconnect:
- // Invariant: ln.vifLoop is not running. Thus,
- // the only writer to ln.reconnect is ln.Close,
- // which always writes false.
- if retry {
- vlog.Errorf("retry=true in %v: rogue vifLoop running?", ln)
- }
- ln.waitForVIFLoop(vf)
- return
- case <-time.After(backoff):
- if backoff = backoff * 2; backoff > max {
- backoff = max
- }
- }
- }
- }
-}
-
-func (ln *proxyListener) vifLoop(vf *vif.VIF) {
- vifLoop(vf, ln.q)
- ln.reconnect <- true
-}
-
-func (ln *proxyListener) waitForVIFLoop(vf *vif.VIF) {
- if vf == nil {
- return
- }
- // ln.vifLoop is running, wait for it to exit. (when it exits, it will
- // send "true" on the reconnect channel)
- vf.StopAccepting()
- for retry := range ln.reconnect {
- if retry {
- return
- }
- }
+ q.Close()
+ }(vf, flow, ln.q)
+ return vf, ep, nil
}
func (ln *proxyListener) Accept() (stream.Flow, error) {
@@ -282,13 +223,15 @@
}
func (ln *proxyListener) Close() error {
- ln.reconnect <- false // tell reconnectLoop to stop
- <-ln.stopped // wait for reconnectLoop to exit
ln.q.Shutdown()
ln.manager.removeListener(ln)
return nil
}
+func (ln *proxyListener) String() string {
+ return ln.DebugString()
+}
+
func (ln *proxyListener) DebugString() string {
return fmt.Sprintf("stream.Listener: PROXY:%v RoutingID:%v", ln.proxyEP, ln.manager.rid)
}
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index 3ce280a..7e3a279 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -163,11 +163,7 @@
}
func (m *manager) remoteListen(proxy naming.Endpoint, listenerOpts []stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
- ep, err := version.ProxiedEndpoint(m.rid, proxy)
- if err != nil {
- return nil, nil, err
- }
- ln, err := newProxyListener(m, proxy, listenerOpts)
+ ln, ep, err := newProxyListener(m, proxy, listenerOpts)
if err != nil {
return nil, nil, err
}
diff --git a/runtimes/google/ipc/stream/proxy/protocol.vdl b/runtimes/google/ipc/stream/proxy/protocol.vdl
index 48a73ab..591e477 100644
--- a/runtimes/google/ipc/stream/proxy/protocol.vdl
+++ b/runtimes/google/ipc/stream/proxy/protocol.vdl
@@ -12,10 +12,6 @@
// traffic intended for the server's RoutingID to the network connection
// between the server and the proxy.
type Request struct {
- // TODO(ashankar): Things that will go in here include the mounttable and the name
- // the server wants the proxy to use when mounting the server on the mounttable,
- // possibly a blessing to give the proxy the credentials required to act as the
- // server as far as the mounttable is concerned.
}
// Response is sent by the proxy to the server after processing Request.
@@ -23,4 +19,7 @@
// Error is a description of why the proxy refused to proxy the server.
// A nil error indicates that the proxy will route traffic to the server.
Error error
+ // Endpoint is the string representation of an endpoint that can be
+ // used to communicate with the server through the proxy.
+ Endpoint string
}
\ No newline at end of file
diff --git a/runtimes/google/ipc/stream/proxy/protocol.vdl.go b/runtimes/google/ipc/stream/proxy/protocol.vdl.go
index 7bd2229..c5d943f 100644
--- a/runtimes/google/ipc/stream/proxy/protocol.vdl.go
+++ b/runtimes/google/ipc/stream/proxy/protocol.vdl.go
@@ -14,4 +14,7 @@
// Error is a description of why the proxy refused to proxy the server.
// A nil error indicates that the proxy will route traffic to the server.
Error error
+ // Endpoint is the string representation of an endpoint that can be
+ // used to communicate with the server through the proxy.
+ Endpoint string
}
diff --git a/runtimes/google/ipc/stream/proxy/proxy.go b/runtimes/google/ipc/stream/proxy/proxy.go
index e9ec40d..e1911aa 100644
--- a/runtimes/google/ipc/stream/proxy/proxy.go
+++ b/runtimes/google/ipc/stream/proxy/proxy.go
@@ -74,8 +74,8 @@
} else {
vc.Close("server closed by proxy")
}
+ s.Process.SendCloseVC(s.VC.VCI(), err)
}
- s.Process.SendCloseVC(s.VC.VCI(), err)
}
func (s *server) String() string {
return fmt.Sprintf("RoutingID %v on process %v (VCI:%v ID:%v)", s.RoutingID(), s.Process, s.VC.VCI(), s.VC.RemoteID())
@@ -372,8 +372,7 @@
server.Close(errors.New("failed to accept health check flow"))
return
}
- defer server.Process.RemoveServerVC(server.VC.VCI())
- defer server.VC.Close("stopped proxying server")
+ defer server.Close(nil)
server.Process.InitVCI(server.VC.VCI())
var request Request
@@ -384,6 +383,13 @@
response.Error = verror.Convert(err)
} else {
defer p.servers.Remove(server)
+ ep, err := version.ProxiedEndpoint(server.VC.RemoteAddr().RoutingID(), p.Endpoint())
+ if err != nil {
+ response.Error = verror.ConvertWithDefault(verror.Internal, err)
+ }
+ if ep != nil {
+ response.Endpoint = ep.String()
+ }
}
if err := vom.NewEncoder(conn).Encode(response); err != nil {
proxyLog().Infof("Failed to encode response %#v for server %v", response, server)
diff --git a/runtimes/google/ipc/stream/proxy/proxy_test.go b/runtimes/google/ipc/stream/proxy/proxy_test.go
index dacf405..96b0678 100644
--- a/runtimes/google/ipc/stream/proxy/proxy_test.go
+++ b/runtimes/google/ipc/stream/proxy/proxy_test.go
@@ -237,59 +237,6 @@
}
}
-func TestRestart(t *testing.T) {
- prxy, err := proxy.New(naming.FixedRoutingID(0xbbbbbbbb), nil, "tcp4", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
- server := manager.InternalNew(naming.FixedRoutingID(0x5555555555555555))
- defer server.Shutdown()
- ln, ep, err := server.Listen(prxy.Endpoint().Network(), prxy.Endpoint().String())
- if err != nil {
- t.Fatal(err)
- }
- defer ln.Close()
-
- client := manager.InternalNew(naming.FixedRoutingID(0xcccccccccccccccc))
- defer client.Shutdown()
-
- data1, data2 := "i am alive", "i was reborn"
-
- // Write data1 to server via the proxy
- rchan := make(chan string)
- go readFlow(t, ln, rchan)
- if err := writeFlow(client, ep, data1); err != nil {
- t.Fatal(err)
- }
- if read := <-rchan; read != data1 {
- t.Fatal("Got %q want %q", read, data1)
- }
-
- // Restart the proxy
- prxy.Shutdown()
- rchan = make(chan string)
- go readFlow(t, ln, rchan)
- if err := writeFlow(client, ep, data1); err == nil {
- t.Fatalf("writeFlow should fail once the proxy is dead")
- }
- if prxy, err = proxy.New(naming.FixedRoutingID(0xbbbbbbbb), nil, prxy.Endpoint().Addr().Network(), prxy.Endpoint().Addr().String()); err != nil {
- t.Fatal(err)
- }
- defer prxy.Shutdown()
-
- // Eventually the server will reconnect to the proxy and the
- // client should be able to write again.
- for i := 0; true; i++ {
- if err := writeFlow(client, ep, data2); err != nil {
- continue
- }
- if read := <-rchan; read != data2 {
- t.Fatal("Got %q want %q", read, data2)
- }
- return
- }
-}
-
func writeFlow(mgr stream.Manager, ep naming.Endpoint, data string) error {
vc, err := mgr.Dial(ep)
if err != nil {
diff --git a/runtimes/google/lib/publisher/publisher.go b/runtimes/google/lib/publisher/publisher.go
index d724da3..e86d232 100644
--- a/runtimes/google/lib/publisher/publisher.go
+++ b/runtimes/google/lib/publisher/publisher.go
@@ -17,6 +17,8 @@
type Publisher interface {
// AddServer adds a new server to be mounted.
AddServer(server string)
+ // RemoveServer removes a server from the list of mounts.
+ RemoveServer(server string)
// AddName adds a new name for all servers to be mounted as.
AddName(name string)
// Published returns the published names rooted at the mounttable.
@@ -44,11 +46,16 @@
donechan chan struct{} // closed when the publisher is done
}
-type serverCmd struct {
+type addServerCmd struct {
server string // server to add
done chan struct{} // closed when the cmd is done
}
+type removeServerCmd struct {
+ server string // server to remove
+ done chan struct{} // closed when the cmd is done
+}
+
type nameCmd struct {
name string // name to add
done chan struct{} // closed when the cmd is done
@@ -71,7 +78,14 @@
func (p *publisher) AddServer(server string) {
done := make(chan struct{})
defer func() { recover() }()
- p.cmdchan <- serverCmd{server, done}
+ p.cmdchan <- addServerCmd{server, done}
+ <-done
+}
+
+func (p *publisher) RemoveServer(server string) {
+ done := make(chan struct{})
+ defer func() { recover() }()
+ p.cmdchan <- removeServerCmd{server, done}
<-done
}
@@ -137,9 +151,12 @@
return
}
switch tcmd := cmd.(type) {
- case serverCmd:
+ case addServerCmd:
state.addServer(tcmd.server)
close(tcmd.done)
+ case removeServerCmd:
+ state.removeServer(tcmd.server)
+ close(tcmd.done)
case nameCmd:
state.addName(tcmd.name)
close(tcmd.done)
@@ -151,8 +168,8 @@
close(tcmd)
}
case <-state.timeout():
- // Remount everything once every period, to refresh the ttls.
- state.mountAll()
+ // Sync everything once every period, to refresh the ttls.
+ state.sync()
}
}
}
@@ -163,7 +180,7 @@
ctx context.T
mt naming.MountTable
period time.Duration
- deadline time.Time // deadline for the next mountAll call
+ deadline time.Time // deadline for the next sync call
names []string // names that have been added
servers map[string]bool // servers that have been added
mounts map[mountKey]*mountStatus // map each (name,server) to its status
@@ -224,10 +241,22 @@
}
}
+func (ps *pubState) removeServer(server string) {
+ if _, exists := ps.servers[server]; !exists {
+ return
+ }
+ delete(ps.servers, server)
+ for _, name := range ps.names {
+ if status, exists := ps.mounts[mountKey{name, server}]; exists {
+ ps.unmount(name, server, status)
+ }
+ }
+}
+
func (ps *pubState) mount(name, server string, status *mountStatus) {
// Always mount with ttl = period + slack, regardless of whether this is
- // triggered by a newly added server or name, or by mountAll. The next call
- // to mountAll will occur within the next period, and refresh all mounts.
+ // triggered by a newly added server or name, or by sync. The next call
+ // to sync will occur within the next period, and refresh all mounts.
ttl := ps.period + mountTTLSlack
status.lastMount = time.Now()
status.lastMountErr = ps.mt.Mount(ps.ctx, name, server, ttl)
@@ -238,10 +267,15 @@
}
}
-func (ps *pubState) mountAll() {
- ps.deadline = time.Now().Add(ps.period) // set deadline for the next mountAll
+func (ps *pubState) sync() {
+ ps.deadline = time.Now().Add(ps.period) // set deadline for the next sync
for key, status := range ps.mounts {
- ps.mount(key.name, key.server, status)
+ if status.lastUnmountErr != nil {
+ // Desired state is "unmounted", failed at previous attempt. Retry.
+ ps.unmount(key.name, key.server, status)
+ } else {
+ ps.mount(key.name, key.server, status)
+ }
}
}
@@ -252,6 +286,7 @@
vlog.Errorf("ipc pub: couldn't unmount(%v, %v): %v", name, server, status.lastUnmountErr)
} else {
vlog.VI(2).Infof("ipc pub: unmount(%v, %v)", name, server)
+ delete(ps.mounts, mountKey{name, server})
}
}