veyron2/ipc: modify ListenSpec to accept a slice of addresses.
MultiPart: 1/4
Change-Id: Ie34f7ac5c8b531e3b03e6dee775b7f1f3f9b3aae
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index 0ece305..f7cf101 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -59,7 +59,7 @@
if err != nil {
return nil, err
}
- if _, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}); err != nil {
+ if _, err := s.Listen(listenSpec); err != nil {
return nil, err
}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index fae6ce0..4891254 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -354,7 +354,7 @@
span.Annotatef("address:%v", server)
defer span.Finish()
if status.flow, status.suffix, err = c.connectFlow(ctx, server, noDischarges); err != nil {
- vlog.VI(2).Infof("ipc: err: %s", err)
+ vlog.VI(2).Infof("ipc: connect to %s: %s", server, err)
status.err = err
status.flow = nil
}
@@ -861,7 +861,6 @@
// always return an error. But this isn't a "real" error; the client should
// read the rest of the results and succeed.
_ = fc.closeSend()
-
// Decode the response header, if it hasn't already been decoded by Recv.
if fc.response.Error == nil && !fc.response.EndStreamResults {
if err := fc.dec.Decode(&fc.response); err != nil {
@@ -874,10 +873,8 @@
return fc.close(berr)
}
}
-
// Incorporate any VTrace info that was returned.
ivtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
-
if fc.response.Error != nil {
// TODO(cnicolaou): remove verror.NoAccess with verror version
// when ipc.Server is converted.
diff --git a/runtimes/google/ipc/client_test.go b/runtimes/google/ipc/client_test.go
index e6e6b2e..e8fb20f 100644
--- a/runtimes/google/ipc/client_test.go
+++ b/runtimes/google/ipc/client_test.go
@@ -10,7 +10,6 @@
"time"
"veyron.io/veyron/veyron2"
- "veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/rt"
verror "veyron.io/veyron/veyron2/verror2"
@@ -108,6 +107,11 @@
s := expect.NewSession(t, srv.Stdout(), time.Minute)
s.ExpectVar("NAME")
+ // Verify that there are 1 entries for echoServer in the mount table.
+ if got, want := numServers(t, "echoServer"), 1; got != want {
+ t.Fatalf("got: %q, want: %q", got, want)
+ }
+
runClient(t, sh)
// Create a fake set of 100 entries in the mount table
@@ -121,9 +125,9 @@
}
}
- // Verify that there are 102 entries for echoServer in the mount table.
- if got, want := numServers(t, "echoServer"), 102; got != want {
- t.Fatalf("got: %d, want: %d", got, want)
+ // Verify that there are 101 entries for echoServer in the mount table.
+ if got, want := numServers(t, "echoServer"), 101; got != want {
+ t.Fatalf("got: %q, want: %q", got, want)
}
// TODO(cnicolaou): ok, so it works, but I'm not sure how
@@ -152,46 +156,6 @@
}
}
-type sleeper struct {
- done <-chan struct{}
-}
-
-func (s *sleeper) Sleep(call ipc.ServerContext) error {
- select {
- case <-s.done:
- case <-time.After(time.Hour):
- }
- return nil
-}
-
-func (s *sleeper) Ping(call ipc.ServerContext) (string, error) {
- return "pong", nil
-}
-
-func (s *sleeper) Source(call ipc.ServerCall, start int) error {
- i := start
- backoff := 25 * time.Millisecond
- for {
- select {
- case <-s.done:
- return nil
- case <-time.After(backoff):
- call.Send(i)
- i++
- }
- backoff *= 2
- }
-}
-
-func (s *sleeper) Sink(call ipc.ServerCall) (int, error) {
- i := 0
- for {
- if err := call.Recv(&i); err != nil {
- return i, verror.Convert(verror.Internal, call, err)
- }
- }
-}
-
func childPing(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
name := args[1]
call, err := r.Client().StartCall(r.NewContext(), name, "Ping", nil)
@@ -210,7 +174,7 @@
return nil
}
-func initServer(t *testing.T, r veyron2.Runtime) (string, ipc.Server, func()) {
+func initServer(t *testing.T, r veyron2.Runtime) (string, func()) {
server, err := r.NewServer()
if err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -222,9 +186,9 @@
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
- server.Serve("", &sleeper{done}, nil)
+ server.Serve("", &simple{done}, nil)
name := naming.JoinAddressName(ep.String(), "")
- return name, server, deferFn
+ return name, deferFn
}
func testForVerror(t *testing.T, err error, verr ...verror.IDAction) {
@@ -250,7 +214,7 @@
}
func TestTimeoutResponse(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx, _ := r.NewContext().WithTimeout(100 * time.Millisecond)
call, err := r.Client().StartCall(ctx, name, "Sleep", nil)
@@ -264,7 +228,7 @@
}
func TestArgsAndResponses(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
call, err := r.Client().StartCall(r.NewContext(), name, "Sleep", []interface{}{"too many args"})
@@ -291,7 +255,7 @@
// The server and client use different runtimes and hence different
// principals and without any shared blessings the server will deny
// access to the client
- name, _, fn := initServer(t, r1)
+ name, fn := initServer(t, r1)
defer fn()
client := r2.Client()
@@ -304,7 +268,7 @@
}
func TestCancelledBeforeFinish(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx, cancel := r.NewContext().WithCancel()
@@ -320,7 +284,7 @@
}
func TestCancelledDuringFinish(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx, cancel := r.NewContext().WithCancel()
@@ -375,7 +339,7 @@
sh, fn := runMountTable(t, r)
defer fn()
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
srv, err := sh.Start("ping", nil, name)
@@ -389,7 +353,7 @@
}
func TestStreamTimeout(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
want := 10
@@ -422,7 +386,7 @@
}
func TestStreamAbort(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx := r.NewContext()
@@ -446,9 +410,14 @@
if verr != nil {
t.Fatalf("unexpected error: %s", verr)
}
- if !verror.Is(err, "veyron.io/veyron/veyron2/verror.Internal") || err.Error() != `ipc.test:"".Sink: Internal error: EOF` {
+ if !verror.Is(err, verror.Unknown.ID) || err.Error() != `veyron.io/veyron/veyron2/verror.Unknown: EOF` {
t.Errorf("wrong error: %#v", err)
}
+ /* TODO(cnicolaou): use this when verror2/vom transition is done.
+ if err != nil && !verror.Is(err, verror.EOF.ID) {
+ t.Fatalf("unexpected error: %#v", err)
+ }
+ */
if got := result; got != want {
t.Errorf("got %d, want %d", got, want)
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index e7a680c..729b8c6 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -40,9 +40,12 @@
)
var (
- errMethod = verror.Make(verror.Aborted, nil)
- clock = new(fakeClock)
- listenSpec = ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}
+ errMethod = verror.Make(verror.Aborted, nil)
+ clock = new(fakeClock)
+ listenAddrs = ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}
+ listenWSAddrs = ipc.ListenAddrs{{"ws", "127.0.0.1:0"}, {"tcp", "127.0.0.1:0"}}
+ listenSpec = ipc.ListenSpec{Addrs: listenAddrs}
+ listenWSSpec = ipc.ListenSpec{Addrs: listenWSAddrs}
)
type fakeClock struct {
@@ -173,6 +176,10 @@
}
func startServer(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, name string, disp ipc.Dispatcher, opts ...ipc.ServerOpt) (naming.Endpoint, ipc.Server) {
+ return startServerWS(t, principal, sm, ns, name, disp, noWebsocket, opts...)
+}
+
+func startServerWS(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, name string, disp ipc.Dispatcher, shouldUseWebsocket websocketMode, opts ...ipc.ServerOpt) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
opts = append(opts, vc.LocalPrincipal{principal})
server, err := InternalNewServer(testContext(), sm, ns, nil, opts...)
@@ -180,7 +187,11 @@
t.Errorf("InternalNewServer failed: %v", err)
}
vlog.VI(1).Info("server.Listen")
- ep, err := server.Listen(listenSpec)
+ spec := listenSpec
+ if shouldUseWebsocket {
+ spec = listenWSSpec
+ }
+ ep, err := server.Listen(spec)
if err != nil {
t.Errorf("server.Listen failed: %v", err)
}
@@ -273,11 +284,15 @@
}
func createBundle(t *testing.T, client, server security.Principal, ts interface{}) (b bundle) {
+ return createBundleWS(t, client, server, ts, noWebsocket)
+}
+
+func createBundleWS(t *testing.T, client, server security.Principal, ts interface{}, shouldUseWebsocket websocketMode) (b bundle) {
b.sm = imanager.InternalNew(naming.FixedRoutingID(0x555555555))
b.ns = tnaming.NewSimpleNamespace()
b.name = "mountpoint/server"
if server != nil {
- b.ep, b.server = startServer(t, server, b.sm, b.ns, b.name, testServerDisp{ts})
+ b.ep, b.server = startServerWS(t, server, b.sm, b.ns, b.name, testServerDisp{ts}, shouldUseWebsocket)
}
if client != nil {
var err error
@@ -536,7 +551,7 @@
pserver = tsecurity.NewPrincipal("server")
pclient = tsecurity.NewPrincipal("client")
- b = createBundle(t, pclient, pserver, &testServer{})
+ b = createBundleWS(t, pclient, pserver, &testServer{}, shouldUseWebsocket)
)
defer b.cleanup(t)
// The server needs to recognize the client's root certificate.
@@ -1216,10 +1231,14 @@
t.Errorf("InternalNewServer failed: %v", err)
}
defer server.Stop()
- spec := listenSpec
- spec.Address = ":0"
- spec.AddressChooser = pa
+ spec := ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{{"tcp", ":0"}},
+ AddressChooser: pa,
+ }
ep, err := server.Listen(spec)
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
iep := ep.(*inaming.Endpoint)
host, _, err := net.SplitHostPort(iep.Address)
if err != nil {
@@ -1252,9 +1271,10 @@
t.Errorf("InternalNewServer failed: %v", err)
}
defer server.Stop()
- spec := listenSpec
- spec.Address = ":0"
- spec.AddressChooser = paerr
+ spec := ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{{"tcp", ":0"}},
+ AddressChooser: paerr,
+ }
ep, err := server.Listen(spec)
iep := ep.(*inaming.Endpoint)
host, _, err := net.SplitHostPort(iep.Address)
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 9a44ec4..98b161e 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -38,17 +38,19 @@
type server struct {
sync.Mutex
- ctx context.T // context used by the server to make internal RPCs.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
- listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
- disp ipc.Dispatcher // dispatcher to serve RPCs
- dispReserved ipc.Dispatcher // dispatcher for reserved methods
- active sync.WaitGroup // active goroutines we've spawned.
- stopped bool // whether the server has been stopped.
- stoppedChan chan struct{} // closed when the server has been stopped.
- preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
+ ctx context.T // context used by the server to make internal RPCs.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
+ listeners map[stream.Listener]struct{} // listeners created by Listen.
+ dhcpListeners map[*dhcpListener]struct{} // dhcpListeners created by Listen.
+
+ disp ipc.Dispatcher // dispatcher to serve RPCs
+ dispReserved ipc.Dispatcher // dispatcher for reserved methods
+ active sync.WaitGroup // active goroutines we've spawned.
+ stopped bool // whether the server has been stopped.
+ stoppedChan chan struct{} // closed when the server has been stopped.
+ preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
ns naming.Namespace
servesMountTable bool
// TODO(cnicolaou): remove this when the publisher tracks published names
@@ -66,7 +68,7 @@
sync.Mutex
publisher *config.Publisher // publisher used to fork the stream
name string // name of the publisher stream
- ep *inaming.Endpoint // endpoint returned after listening
+ eps []*inaming.Endpoint // endpoint returned after listening
pubAddrs []ipc.Address // addresses to publish
pubPort string // port to use with the publish addresses
ch chan config.Setting // channel to receive settings over
@@ -82,14 +84,15 @@
ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]*dhcpListener),
- stoppedChan: make(chan struct{}),
- ns: ns,
- stats: newIPCStats(statsPrefix),
- traceStore: store,
+ ctx: ctx,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listeners: make(map[stream.Listener]struct{}),
+ dhcpListeners: make(map[*dhcpListener]struct{}),
+ stoppedChan: make(chan struct{}),
+ ns: ns,
+ stats: newIPCStats(statsPrefix),
+ traceStore: store,
}
var (
principal security.Principal
@@ -177,6 +180,7 @@
}
}
+/*
// getIPRoamingAddrs finds an appropriate set of addresss to publish
// externally and also determines if it's sensible to allow roaming.
// It returns the host address of the first suitable address that
@@ -208,38 +212,93 @@
// roaming is not desired.
return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
}
+*/
-// configureEPAndRoaming configures the endpoint and roaming. In particular,
-// it fills in the Address portion of the endpoint with the appropriately
-// selected network address and creates a dhcpListener struct if roaming
-// is enabled.
-func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (*dhcpListener, *inaming.Endpoint, error) {
+// getPossbileAddrs returns an appropriate set of addresses that could be used
+// to contact the supplied protocol, host, port parameters using the supplied
+// chooser function. It returns an indication of whether the supplied address
+// was fully specified or not, returning false if the address was fully
+// specified, and true if it was not.
+func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
+ }
+ if ip.IsUnspecified() {
+ if chooser != nil {
+ // Need to find a usable IP address since the call to listen
+ // didn't specify one.
+ if addrs, err := netstate.GetAccessibleIPs(); err == nil {
+ if a, err := chooser(protocol, addrs); err == nil && len(a) > 0 {
+ return a, true, nil
+ }
+ }
+ }
+ // We don't have a chooser, so we just return the address the
+ // underlying system has chosen.
+ return []ipc.Address{addrFromIP(ip)}, true, nil
+ }
+ return []ipc.Address{addrFromIP(ip)}, false, nil
+}
+
+// createEndpoints creates appropriate inaming.Endpoint instances for
+// all of the externally accessible networrk addresses that can be used
+// to reach this server.
+func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, bool, error) {
+ iep, ok := lep.(*inaming.Endpoint)
+ if !ok {
+ return nil, false, fmt.Errorf("internal type conversion error for %T", lep)
+ }
+ if !strings.HasPrefix(iep.Protocol, "tcp") &&
+ !strings.HasPrefix(iep.Protocol, "ws") {
+ // If not tcp or ws, just return the endpoint we were given.
+ return []*inaming.Endpoint{iep}, false, nil
+ }
+
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, false, err
+ }
+ addrs, unspecified, err := getPossibleAddrs(lep.Network(), host, port, chooser)
+ if err != nil {
+ return nil, false, err
+ }
+ ieps := make([]*inaming.Endpoint, 0, len(addrs))
+ for _, addr := range addrs {
+ n, err := inaming.NewEndpoint(lep.String())
+ if err != nil {
+ return nil, false, err
+ }
+ n.IsMountTable = s.servesMountTable
+ //n.Protocol = addr.Address().Network()
+ n.Address = net.JoinHostPort(addr.Address().String(), port)
+ ieps = append(ieps, n)
+ }
+ return ieps, unspecified, nil
+}
+
+/*
+// configureEPAndRoaming configures the endpoint by filling in its Address
+// portion with the appropriately selected network address, it also
+// returns an indication of whether this endpoint is appropriate for
+// roaming and the set of addresses that should be published.
+func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (bool, []ipc.Address, *inaming.Endpoint, error) {
iep, ok := ep.(*inaming.Endpoint)
if !ok {
- return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
+ return false, nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
- if !strings.HasPrefix(spec.Protocol, "tcp") {
- return nil, iep, nil
+ if !strings.HasPrefix(spec.Addrs[0].Protocol, "tcp") &&
+ !strings.HasPrefix(spec.Addrs[0].Protocol, "ws") {
+ return false, nil, iep, nil
}
pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
if err != nil {
- return nil, iep, err
+ return false, nil, iep, err
}
iep.Address = net.JoinHostPort(pubHost, pubPort)
- if !roaming {
- vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", spec.Address)
- }
- publisher := spec.StreamPublisher
- if roaming && publisher != nil {
- streamName := spec.StreamName
- ch := make(chan config.Setting)
- if _, err := publisher.ForkStream(streamName, ch); err != nil {
- return nil, iep, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
- }
- return &dhcpListener{ep: iep, pubAddrs: pubAddrs, pubPort: pubPort, ch: ch, name: streamName, publisher: publisher}, iep, nil
- }
- return nil, iep, nil
+ return roaming, pubAddrs, iep, nil
}
+*/
func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
defer vlog.LogCall()()
@@ -250,66 +309,11 @@
s.Unlock()
return nil, errServerStopped
}
- s.Unlock()
- var iep *inaming.Endpoint
- var dhcpl *dhcpListener
- var ln stream.Listener
+ useProxy := len(listenSpec.Proxy) > 0
- if len(listenSpec.Address) > 0 {
- // Listen if we have a local address to listen on. Some situations
- // just need a proxy (e.g. a browser extension).
- tmpln, lep, err := s.streamMgr.Listen(listenSpec.Protocol, listenSpec.Address, s.listenerOpts...)
- if err != nil {
- vlog.Errorf("ipc: Listen on %s failed: %s", listenSpec, err)
- return nil, err
- }
- ln = tmpln
- if tmpdhcpl, tmpiep, err := s.configureEPAndRoaming(listenSpec, lep); err != nil {
- ln.Close()
- return nil, err
- } else {
- dhcpl = tmpdhcpl
- iep = tmpiep
- }
- }
-
- s.Lock()
- defer s.Unlock()
- if s.stopped {
- ln.Close()
- return nil, errServerStopped
- }
-
- if dhcpl != nil {
- // We have a goroutine to listen for dhcp changes.
- s.active.Add(1)
- go func() {
- s.dhcpLoop(dhcpl)
- s.active.Done()
- }()
- s.listeners[ln] = dhcpl
- } else if ln != nil {
- s.listeners[ln] = nil
- }
-
- if iep != nil {
- // We have a goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- s.active.Add(1)
- go func() {
- s.listenLoop(ln, iep)
- s.active.Done()
- }()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
- if strings.HasPrefix(iep.Protocol, "tcp") {
- epCopy := *iep
- epCopy.Protocol = "ws"
- s.publisher.AddServer(s.publishEP(&epCopy, s.servesMountTable), s.servesMountTable)
- }
- }
-
- if len(listenSpec.Proxy) > 0 {
+ // Start the proxy as early as possible.
+ if useProxy {
// We have a goroutine for listening on proxy connections.
s.active.Add(1)
go func() {
@@ -317,14 +321,100 @@
s.active.Done()
}()
}
- return iep, nil
-}
+ s.Unlock()
-// TODO(cnicolaou): Take this out or make the ServesMountTable bit work in the endpoint.
-func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
- var name string
- ep.IsMountTable = servesMountTable
- return naming.JoinAddressName(ep.String(), name)
+ var ieps []*inaming.Endpoint
+
+ type lnInfo struct {
+ ln stream.Listener
+ ep naming.Endpoint
+ }
+ linfo := []lnInfo{}
+ closeAll := func(lni []lnInfo) {
+ for _, li := range lni {
+ li.ln.Close()
+ }
+ }
+
+ roaming := false
+ for _, addr := range listenSpec.Addrs {
+ if len(addr.Address) > 0 {
+ // Listen if we have a local address to listen on. Some situations
+ // just need a proxy (e.g. a browser extension).
+ tmpln, lep, err := s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
+ if err != nil {
+ closeAll(linfo)
+ vlog.Errorf("ipc: Listen on %s failed: %s", addr, err)
+ return nil, err
+ }
+ linfo = append(linfo, lnInfo{tmpln, lep})
+ tmpieps, tmpRoaming, err := s.createEndpoints(lep, listenSpec.AddressChooser)
+ if err != nil {
+ closeAll(linfo)
+ return nil, err
+ }
+ ieps = append(ieps, tmpieps...)
+ if tmpRoaming {
+ roaming = true
+ }
+ }
+ }
+
+ // TODO(cnicolaou): write a test for all of these error cases.
+ if len(ieps) == 0 {
+ if useProxy {
+ return nil, nil
+ }
+ // no proxy.
+ if len(listenSpec.Addrs) > 0 {
+ return nil, fmt.Errorf("no endpoints")
+ }
+ return nil, fmt.Errorf("no proxy and no addresses requested")
+ }
+
+ // TODO(cnicolaou): return all of the eps and their errors....
+ s.Lock()
+ defer s.Unlock()
+ if s.stopped {
+ closeAll(linfo)
+ return nil, errServerStopped
+ }
+
+ if roaming && listenSpec.StreamPublisher != nil {
+ // TODO(cnicolaou): renable roaming in a followup CL.
+ /*
+ var dhcpl *dhcpListener
+ streamName := listenSpec.StreamName
+ ch := make(chan config.Setting)
+ if _, err := publisher.ForkStream(streamName, ch); err != nil {
+ return ieps[0], fmt.Errorf("failed to fork stream %q: %s", streamName, err)
+ }
+ dhcpl = &dhcpListener{eps: ieps, pubAddrs: pubAddrs, ch: ch, name: streamName, publisher: publisher}, iep, nil
+ // We have a goroutine to listen for dhcp changes.
+ s.active.Add(1)
+ go func() {
+ s.dhcpLoop(dhcpl)
+ s.active.Done()
+ }()
+ s.dhcpListeners[dhcpl] = struct{}{}
+ */
+ }
+
+ for _, li := range linfo {
+ s.listeners[li.ln] = struct{}{}
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(li.ln, li.ep)
+ }
+ for _, iep := range ieps {
+ s.publisher.AddServer(naming.JoinAddressName(iep.String(), ""), s.servesMountTable)
+ }
+
+ return ieps[0], nil
}
func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
@@ -342,16 +432,9 @@
return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
s.Lock()
- s.listeners[ln] = nil
+ s.listeners[ln] = struct{}{}
s.Unlock()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
-
- if strings.HasPrefix(iep.Protocol, "tcp") {
- epCopy := *iep
- epCopy.Protocol = "ws"
- s.publisher.AddServer(s.publishEP(&epCopy, s.servesMountTable), s.servesMountTable)
- }
-
+ s.publisher.AddServer(naming.JoinAddressName(iep.String(), ""), s.servesMountTable)
return iep, ln, nil
}
@@ -368,7 +451,6 @@
// the initial connection maybe have failed, but we enter the retry
// loop anyway so that we will continue to try and connect to the
// proxy.
-
s.Lock()
if s.stopped {
s.Unlock()
@@ -381,12 +463,7 @@
s.listenLoop(ln, iep)
// The listener is done, so:
// (1) Unpublish its name
- s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
- if strings.HasPrefix(iep.Protocol, "tcp") {
- iepCopy := *iep
- iepCopy.Protocol = "ws"
- s.publisher.RemoveServer(s.publishEP(&iepCopy, s.servesMountTable))
- }
+ s.publisher.RemoveServer(naming.JoinAddressName(iep.String(), ""))
}
s.Lock()
@@ -420,7 +497,7 @@
}
func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
- defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
+ defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
var calls sync.WaitGroup
defer func() {
calls.Wait()
@@ -453,13 +530,14 @@
}
}
+/*
func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
dhcpl.Lock()
defer dhcpl.Unlock()
for _, a := range addrs {
if ip := netstate.AsIP(a); ip != nil {
dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
- fn(s.publishEP(dhcpl.ep, s.servesMountTable))
+ fn(dhcpl.ep.String())
}
}
}
@@ -472,7 +550,7 @@
// Publish all of the addresses
for _, pubAddr := range dhcpl.pubAddrs {
ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
- s.publisher.AddServer(s.publishEP(&ep, s.servesMountTable), s.servesMountTable)
+ s.publisher.AddServer(naming.JoinAddressName(ep.String(), ""), s.servesMountTable)
}
for setting := range dhcpl.ch {
@@ -501,6 +579,7 @@
}
}
}
+*/
func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
if obj == nil {
@@ -607,16 +686,16 @@
nListeners := len(s.listeners)
errCh := make(chan error, nListeners)
- for ln, dhcpl := range s.listeners {
+ for ln, _ := range s.listeners {
go func(ln stream.Listener) {
errCh <- ln.Close()
}(ln)
- if dhcpl != nil {
- dhcpl.Lock()
- dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
- dhcpl.ch <- config.NewBool("EOF", "stop", true)
- dhcpl.Unlock()
- }
+ }
+ for dhcpl, _ := range s.dhcpListeners {
+ dhcpl.Lock()
+ dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
+ dhcpl.ch <- config.NewBool("EOF", "stop", true)
+ dhcpl.Unlock()
}
s.Unlock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 98cacab..9c8c126 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"os"
"reflect"
"sort"
@@ -15,9 +14,10 @@
"veyron.io/veyron/veyron/lib/expect"
"veyron.io/veyron/veyron/lib/modules"
+ "veyron.io/veyron/veyron/lib/modules/core"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
+ _ "veyron.io/veyron/veyron/lib/websocket"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
- "veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
@@ -27,24 +27,22 @@
// connection to the server if the server dies and comes back (on the same
// endpoint).
func TestReconnect(t *testing.T) {
- b := createBundle(t, tsecurity.NewPrincipal("client"), nil, nil) // We only need the client from the bundle.
+ principal := tsecurity.NewPrincipal("client")
+ b := createBundle(t, principal, nil, nil) // We only need the client from the bundle.
defer b.cleanup(t)
- sh, err := modules.NewShell(nil)
+ sh, err := modules.NewShell(principal)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer sh.Cleanup(os.Stderr, os.Stderr)
- server, err := sh.Start("runServer", nil, "127.0.0.1:0")
+ server, err := sh.Start(core.EchoServerCommand, nil, "--", "--veyron.tcp.address=127.0.0.1:0", "mymessage", "")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
session := expect.NewSession(t, server.Stdout(), time.Minute)
- addr := session.ReadLine()
- ep, err := inaming.NewEndpoint(addr)
- if err != nil {
- t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
- }
- serverName := naming.JoinAddressName(ep.String(), "suffix")
+ serverName := session.ExpectVar("NAME")
+ serverEP := session.ExpectVar("ADDR")
+ ep, _ := inaming.NewEndpoint(serverEP)
makeCall := func() (string, error) {
ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
call, err := b.client.StartCall(ctx, serverName, "Echo", []interface{}{"bratman"})
@@ -52,12 +50,13 @@
return "", fmt.Errorf("START: %s", err)
}
var result string
- if err = call.Finish(&result); err != nil {
+ var rerr error
+ if err = call.Finish(&result, &rerr); err != nil {
return "", err
}
return result, nil
}
- expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+ expected := "mymessage: bratman\n"
if result, err := makeCall(); err != nil || result != expected {
t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
}
@@ -72,82 +71,77 @@
}
// Resurrect the server with the same address, verify client
- // re-establishes the connection.
- server, err = sh.Start("runServer", nil, addr)
+ // re-establishes the connection. This is probably racy if another
+ // process grabs the port. This seems unlikely since the kernel cycles
+ // through the entire port space.
+ server, err = sh.Start(core.EchoServerCommand, nil, "--", "--veyron.tcp.address="+ep.Address, "mymessage again", "")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
session = expect.NewSession(t, server.Stdout(), time.Minute)
- defer server.Shutdown(nil, nil)
- session.Expect(addr)
+ defer server.Shutdown(os.Stderr, os.Stderr)
+ expected = "mymessage again: bratman\n"
if result, err := makeCall(); err != nil || result != expected {
t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
}
+
}
type proxyHandle struct {
- ns naming.Namespace
- process modules.Handle
- session *expect.Session
- mount string
- sh *modules.Shell
+ ns naming.Namespace
+ sh *modules.Shell
+ proxy modules.Handle
+ name string
}
-func (h *proxyHandle) Start(t *testing.T) error {
+func (h *proxyHandle) Start(t *testing.T, args ...string) error {
sh, err := modules.NewShell(nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
- server, err := sh.Start("runProxy", nil)
+ h.sh = sh
+ p, err := sh.Start(core.ProxyServerCommand, nil, args...)
if err != nil {
+ p.Shutdown(os.Stderr, os.Stderr)
t.Fatalf("unexpected error: %s", err)
}
- h.process = server
- h.session = expect.NewSession(t, server.Stdout(), time.Minute)
- h.mount = h.session.ReadLine()
- h.sh = sh
- if err := h.session.Error(); err != nil {
- return err
- }
- if err := h.ns.Mount(testContext(), "proxy", h.mount, time.Hour); err != nil {
- return err
- }
- return nil
+ h.proxy = p
+ s := expect.NewSession(t, p.Stdout(), time.Minute)
+ s.ReadLine()
+ h.name = s.ExpectVar("PROXY_NAME")
+ return h.ns.Mount(testContext(), "proxy", h.name, time.Hour)
}
func (h *proxyHandle) Stop() error {
- if h.process == nil {
+ defer h.sh.Cleanup(os.Stderr, os.Stderr)
+ if err := h.proxy.Shutdown(os.Stderr, os.Stderr); err != nil {
+ return err
+ }
+ if len(h.name) == 0 {
return nil
}
- h.process.Shutdown(os.Stderr, os.Stderr)
- h.process = nil
- if len(h.mount) == 0 {
- return nil
- }
- h.sh.Cleanup(nil, nil)
- return h.ns.Unmount(testContext(), "proxy", h.mount)
+ return h.ns.Unmount(testContext(), "proxy", h.name)
}
func TestProxyOnly(t *testing.T) {
listenSpec := ipc.ListenSpec{Proxy: "proxy"}
- testProxy(t, listenSpec)
+ testProxy(t, listenSpec, "--", "--veyron.tcp.address=127.0.0.1:0")
}
func TestProxy(t *testing.T) {
- listenSpec := ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0", Proxy: "proxy"}
- testProxy(t, listenSpec)
+ proxyListenSpec := listenSpec
+ proxyListenSpec.Proxy = "proxy"
+ testProxy(t, proxyListenSpec, "--", "--veyron.tcp.address=127.0.0.1:0")
}
-func addrOnly(name string) string {
- addr, _ := naming.SplitAddressName(name)
- return addr
+func TestWSProxy(t *testing.T) {
+ proxyListenSpec := listenSpec
+ proxyListenSpec.Proxy = "proxy"
+ // The proxy uses websockets only, but the server is using tcp.
+ testProxy(t, proxyListenSpec, "--", "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0")
}
-func addWSName(name string) []string {
- return []string{name, strings.Replace(name, "@tcp@", "@ws@", 1)}
-}
-
-func testProxy(t *testing.T, spec ipc.ListenSpec) {
+func testProxy(t *testing.T, spec ipc.ListenSpec, args ...string) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{tsecurity.NewPrincipal("client")})
@@ -163,7 +157,7 @@
// If no address is specified then we'll only 'listen' via
// the proxy.
- hasLocalListener := len(spec.Address) != 0
+ hasLocalListener := len(spec.Addrs) > 0 && len(spec.Addrs[0].Address) != 0
name := "mountpoint/server/suffix"
makeCall := func() (string, error) {
@@ -182,7 +176,7 @@
return result, nil
}
proxy := &proxyHandle{ns: ns}
- if err := proxy.Start(t); err != nil {
+ if err := proxy.Start(t, args...); err != nil {
t.Fatal(err)
}
defer proxy.Stop()
@@ -190,7 +184,7 @@
if len(addrs) != 1 {
t.Fatalf("failed to lookup proxy")
}
- proxyEP := addrOnly(addrs[0])
+ proxyEP, _ := naming.SplitAddressName(addrs[0])
ep, err := server.Listen(spec)
if err != nil {
@@ -203,24 +197,29 @@
ch := make(chan struct{})
// Proxy connections are started asynchronously, so we need to wait..
waitfor := func(expect int) {
+ then := time.Now().Add(time.Minute)
for {
addrs, _ := ns.Resolve(testContext(), name)
if len(addrs) == expect {
close(ch)
return
}
+ if time.Now().After(then) {
+ t.Fatalf("timed out")
+ }
time.Sleep(100 * time.Millisecond)
}
}
proxiedEP, err := inaming.NewEndpoint(proxyEP)
+
if err != nil {
- t.Fatalf("unexpected error: %s", err)
+ t.Fatalf("unexpected error for %q: %s", proxyEP, err)
}
proxiedEP.RID = naming.FixedRoutingID(0x555555555)
- expectedEndpoints := addWSName(proxiedEP.String())
+ expectedEndpoints := []string{proxiedEP.String()}
if hasLocalListener {
- expectedEndpoints = append(expectedEndpoints, addWSName(ep.String())...)
+ expectedEndpoints = append(expectedEndpoints, ep.String())
}
// Proxy connetions are created asynchronously, so we wait for the
@@ -234,7 +233,8 @@
got := []string{}
for _, s := range verifyMount(t, ns, name) {
- got = append(got, addrOnly(s))
+ addr, _ := naming.SplitAddressName(s)
+ got = append(got, addr)
}
sort.Strings(got)
sort.Strings(expectedEndpoints)
@@ -248,9 +248,14 @@
// the local endpoint from the mount table entry! We have to remove both
// the tcp and the websocket address.
sep := ep.String()
- wsep := strings.Replace(sep, "@tcp@", "@ws@", 1)
+ //wsep := strings.Replace(sep, "@tcp@", "@ws@", 1)
ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(sep, ""))
- ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(wsep, ""))
+ //ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(wsep, ""))
+ }
+
+ addrs = verifyMount(t, ns, name)
+ if len(addrs) != 1 {
+ t.Fatalf("failed to lookup proxy")
}
// Proxied endpoint should be published and RPC should succeed (through proxy)
@@ -265,6 +270,7 @@
if result, err := makeCall(); err == nil || (!strings.HasPrefix(err.Error(), "RESOLVE") && !strings.Contains(err.Error(), "EOF")) {
t.Fatalf(`Got (%v, %v) want ("", "RESOLVE: <err>" or "EOF") as proxy is down`, result, err)
}
+
for {
if _, err := ns.Resolve(testContext(), name); err != nil {
break
@@ -274,64 +280,27 @@
verifyMountMissing(t, ns, name)
// Proxy restarts, calls should eventually start succeeding.
- if err := proxy.Start(t); err != nil {
+ if err := proxy.Start(t, args...); err != nil {
t.Fatal(err)
}
+ retries := 0
for {
if result, err := makeCall(); err == nil {
if result != expected {
t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
}
break
+ } else {
+ retries++
+ if retries > 10 {
+ t.Fatalf("Failed after 10 attempts: err: %s", err)
+ }
}
}
}
-func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
- ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), mgr, ns, nil, vc.LocalPrincipal{tsecurity.NewPrincipal("server")})
- if err != nil {
- return fmt.Errorf("InternalNewServer failed: %v", err)
- }
- disp := testServerDisp{new(testServer)}
- if err := server.ServeDispatcher("server", disp); err != nil {
- return fmt.Errorf("server.Register failed: %v", err)
- }
- spec := listenSpec
- spec.Address = args[1]
- ep, err := server.Listen(spec)
- if err != nil {
- return fmt.Errorf("server.Listen failed: %v", err)
- }
- fmt.Fprintf(stdout, "%s\n", ep.Addr())
- // parent process should explicitly shut us down by closing stdin.
- modules.WaitForEOF(stdin)
- return nil
-}
-
-func runProxy(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return err
- }
- proxy, err := proxy.New(rid, tsecurity.NewPrincipal("proxy"), "tcp", "127.0.0.1:0", "")
- if err != nil {
- return err
- }
- fmt.Fprintf(stdout, "/%s\n", proxy.Endpoint().String())
- // parent process should explicitly shut us down by closing stdin.
- modules.WaitForEOF(stdin)
- return nil
-}
-
// Required by modules framework.
func TestHelperProcess(t *testing.T) {
modules.DispatchInTest()
}
-
-func init() {
- modules.RegisterChild("runServer", "[address]", runServer)
- modules.RegisterChild("runProxy", "", runProxy)
-}
diff --git a/runtimes/google/ipc/simple_test.go b/runtimes/google/ipc/simple_test.go
new file mode 100644
index 0000000..f2853c5
--- /dev/null
+++ b/runtimes/google/ipc/simple_test.go
@@ -0,0 +1,130 @@
+package ipc_test
+
+import (
+ "io"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/ipc"
+ verror "veyron.io/veyron/veyron2/verror2"
+)
+
+type simple struct {
+ done <-chan struct{}
+}
+
+func (s *simple) Sleep(call ipc.ServerContext) error {
+ select {
+ case <-s.done:
+ case <-time.After(time.Hour):
+ }
+ return nil
+}
+
+func (s *simple) Ping(call ipc.ServerContext) (string, error) {
+ return "pong", nil
+}
+
+func (s *simple) Source(call ipc.ServerCall, start int) error {
+ i := start
+ backoff := 25 * time.Millisecond
+ for {
+ select {
+ case <-s.done:
+ return nil
+ case <-time.After(backoff):
+ call.Send(i)
+ i++
+ }
+ backoff *= 2
+ }
+}
+
+func (s *simple) Sink(call ipc.ServerCall) (int, error) {
+ i := 0
+ for {
+ if err := call.Recv(&i); err != nil {
+ return i, err
+ }
+ }
+}
+
+func (s *simple) Inc(call ipc.ServerCall, inc int) (int, error) {
+ i := 0
+ for {
+ if err := call.Recv(&i); err != nil {
+ if err == io.EOF {
+ // TODO(cnicolaou): this should return a verror, i.e.
+ // verror.Make(verror.EOF, call), but for now we
+ // return an io.EOF
+ return i, io.EOF
+ }
+ return i, err
+ }
+ call.Send(i + inc)
+ }
+}
+
+func TestSimpleRPC(t *testing.T) {
+ name, fn := initServer(t, r)
+ defer fn()
+
+ client := r.Client()
+ call, err := client.StartCall(r.NewContext(), name, "Ping", nil)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ response := ""
+ var verr error
+ err = call.Finish(&response, &verr)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if got, want := response, "pong"; got != want {
+ t.Fatalf("got %q, want %q", got, want)
+ }
+}
+
+func TestSimpleStreaming(t *testing.T) {
+ name, fn := initServer(t, r)
+ defer fn()
+
+ ctx := r.NewContext()
+ inc := 1
+ call, err := r.Client().StartCall(ctx, name, "Inc", []interface{}{inc})
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ want := 10
+ for i := 0; i <= want; i++ {
+ if err := call.Send(i); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ got := -1
+ if err = call.Recv(&got); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if want := i + inc; got != want {
+ t.Fatalf("got %d, want %d")
+ }
+ }
+ call.CloseSend()
+ final := -1
+ verr := call.Finish(&final, &err)
+ if verr != nil {
+ t.Fatalf("unexpected error: %s", verr)
+
+ }
+ if !verror.Is(err, verror.Unknown.ID) || err.Error() != `veyron.io/veyron/veyron2/verror.Unknown: EOF` {
+ t.Errorf("wrong error: %#v", err)
+ }
+ /* TODO(cnicolaou): use this when verror2/vom transition is done.
+ if err != nil && !verror.Is(err, verror.EOF.ID) {
+ t.Fatalf("unexpected error: %#v", err)
+ }
+ */
+ if got := final; got != want {
+ t.Fatalf("got %d, want %d")
+ }
+}
diff --git a/runtimes/google/ipc/sort_endpoints.go b/runtimes/google/ipc/sort_endpoints.go
index 48bcd88..312be8c 100644
--- a/runtimes/google/ipc/sort_endpoints.go
+++ b/runtimes/google/ipc/sort_endpoints.go
@@ -55,12 +55,13 @@
name := server
address, suffix := naming.SplitAddressName(name)
if len(address) == 0 {
- errs.add(fmt.Errorf("%q is not a rooted name", name))
- continue
+ // Maybe it's not a rooted endpoint, just a bare one.
+ address = name
+ suffix = ""
}
iep, err := inaming.NewEndpoint(address)
if err != nil {
- errs.add(fmt.Errorf("%q: %s", name, err))
+ errs.add(fmt.Errorf("failed to parse %q: %s", name, err))
continue
}
if err = version.CheckCompatibility(iep); err != nil {
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index a2ad073..6b782a0 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -15,7 +15,6 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/stats"
- "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vif"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
@@ -180,16 +179,6 @@
return nil, nil, errShutDown
}
- // If the protocol is tcp, we add the listener that supports both tcp and websocket
- // so that javascript can talk to this server.
- if strings.HasPrefix(protocol, "tcp") {
- wsln, err := websocket.NewListener(netln)
- if err != nil {
- netln.Close()
- return nil, nil, err
- }
- netln = wsln
- }
ln := newNetListener(m, netln, opts)
m.listeners[ln] = true
m.muListeners.Unlock()
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index aa66c0d..dc2b32d 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -42,19 +42,15 @@
modules.RegisterChild("runServer", "", runServer)
}
-func testSimpleFlow(t *testing.T, useWebsocket bool) {
+func testSimpleFlow(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
- if useWebsocket {
- ep.(*inaming.Endpoint).Protocol = "ws"
- }
-
data := "the dark knight rises"
var clientVC stream.VC
var clientF1 stream.Flow
@@ -131,11 +127,11 @@
}
func TestSimpleFlow(t *testing.T) {
- testSimpleFlow(t, false)
+ testSimpleFlow(t, "tcp")
}
func TestSimpleFlowWS(t *testing.T) {
- testSimpleFlow(t, true)
+ testSimpleFlow(t, "ws")
}
func TestConnectionTimeout(t *testing.T) {
@@ -159,7 +155,7 @@
}
}
-func testAuthenticatedByDefault(t *testing.T, useWebsocket bool) {
+func testAuthenticatedByDefault(t *testing.T, protocol string) {
var (
server = InternalNew(naming.FixedRoutingID(0x55555555))
client = InternalNew(naming.FixedRoutingID(0xcccccccc))
@@ -171,13 +167,10 @@
)
// VCSecurityLevel is intentionally not provided to Listen - to test
// default behavior.
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0", serverPrincipal)
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0", serverPrincipal)
if err != nil {
t.Fatal(err)
}
- if useWebsocket {
- ep.(*inaming.Endpoint).Protocol = "ws"
- }
errs := make(chan error)
@@ -227,11 +220,11 @@
}
func TestAuthenticatedByDefault(t *testing.T) {
- testAuthenticatedByDefault(t, false)
+ testAuthenticatedByDefault(t, "tcp")
}
func TestAuthenticatedByDefaultWS(t *testing.T) {
- testAuthenticatedByDefault(t, true)
+ testAuthenticatedByDefault(t, "ws")
}
func numListeners(m stream.Manager) int { return len(m.(*manager).listeners) }
@@ -277,35 +270,20 @@
}
func TestCloseListener(t *testing.T) {
- server := InternalNew(naming.FixedRoutingID(0x5e97e9))
-
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
- // Server will just listen for flows and close them.
- go acceptLoop(ln)
- client := InternalNew(naming.FixedRoutingID(0xc1e41))
- if _, err = client.Dial(ep); err != nil {
- t.Fatal(err)
- }
- ln.Close()
- client = InternalNew(naming.FixedRoutingID(0xc1e42))
- if _, err := client.Dial(ep); err == nil {
- t.Errorf("client.Dial(%q) should have failed", ep)
- }
+ testCloseListener(t, "tcp")
}
func TestCloseListenerWS(t *testing.T) {
+ testCloseListener(t, "ws")
+}
+
+func testCloseListener(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x5e97e9))
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
-
- ep.(*inaming.Endpoint).Protocol = "ws"
-
// Server will just listen for flows and close them.
go acceptLoop(ln)
client := InternalNew(naming.FixedRoutingID(0xc1e41))
@@ -340,41 +318,22 @@
}
func TestShutdownEndpoint(t *testing.T) {
- server := InternalNew(naming.FixedRoutingID(0x55555555))
- client := InternalNew(naming.FixedRoutingID(0xcccccccc))
-
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
-
- // Server will just listen for flows and close them.
- go acceptLoop(ln)
-
- vc, err := client.Dial(ep)
- if err != nil {
- t.Fatal(err)
- }
- if f, err := vc.Connect(); f == nil || err != nil {
- t.Errorf("vc.Connect failed: (%v, %v)", f, err)
- }
- client.ShutdownEndpoint(ep)
- if f, err := vc.Connect(); f != nil || err == nil {
- t.Errorf("vc.Connect unexpectedly succeeded: (%v, %v)", f, err)
- }
+ testShutdownEndpoint(t, "tcp")
}
func TestShutdownEndpointWS(t *testing.T) {
+ testShutdownEndpoint(t, "ws")
+}
+
+func testShutdownEndpoint(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
- ep.(*inaming.Endpoint).Protocol = "ws"
-
// Server will just listen for flows and close them.
go acceptLoop(ln)
@@ -410,7 +369,7 @@
}
*/
-func testMultipleVCs(t *testing.T, useWebsocket bool) {
+func testMultipleVCs(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
@@ -419,13 +378,10 @@
// Have the server read from each flow and write to rchan.
rchan := make(chan string)
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
- if useWebsocket {
- ep.(*inaming.Endpoint).Protocol = "ws"
- }
read := func(flow stream.Flow, c chan string) {
var buf bytes.Buffer
@@ -494,11 +450,11 @@
}
func TestMultipleVCs(t *testing.T) {
- testMultipleVCs(t, false)
+ testMultipleVCs(t, "tcp")
}
func TestMultipleVCsWS(t *testing.T) {
- testMultipleVCs(t, true)
+ testMultipleVCs(t, "ws")
}
func TestAddressResolution(t *testing.T) {
@@ -543,66 +499,31 @@
}
func TestServerRestartDuringClientLifetime(t *testing.T) {
- client := InternalNew(naming.FixedRoutingID(0xcccccccc))
- sh, err := modules.NewShell(nil)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- defer sh.Cleanup(nil, nil)
- h, err := sh.Start("runServer", nil, "127.0.0.1:0")
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- s := expect.NewSession(t, h.Stdout(), time.Minute)
- addr := s.ReadLine()
-
- ep, err := inaming.NewEndpoint(addr)
- if err != nil {
- t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
- }
- if _, err := client.Dial(ep); err != nil {
- t.Fatal(err)
- }
- h.Shutdown(nil, os.Stderr)
-
- // A new VC cannot be created since the server is dead
- if _, err := client.Dial(ep); err == nil {
- t.Fatal("Expected client.Dial to fail since server is dead")
- }
-
- h, err = sh.Start("runServer", nil, addr)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- s = expect.NewSession(t, h.Stdout(), time.Minute)
- // Restarting the server, listening on the same address as before
- if addr2 := s.ReadLine(); addr2 != addr || err != nil {
- t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
- }
- if _, err := client.Dial(ep); err != nil {
- t.Fatal(err)
- }
+ testServerRestartDuringClientLifetime(t, "tcp")
}
func TestServerRestartDuringClientLifetimeWS(t *testing.T) {
+ testServerRestartDuringClientLifetime(t, "ws")
+}
+
+func testServerRestartDuringClientLifetime(t *testing.T, protocol string) {
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
sh, err := modules.NewShell(nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer sh.Cleanup(nil, nil)
- h, err := sh.Start("runServer", nil, "127.0.0.1:0")
+ h, err := sh.Start("runServer", nil, protocol, "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
s := expect.NewSession(t, h.Stdout(), time.Minute)
addr := s.ReadLine()
- ep, err := inaming.NewEndpoint(addr)
+ ep, err := inaming.NewEndpoint(naming.FormatEndpoint(protocol, addr))
if err != nil {
t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
}
- ep.Protocol = "ws"
if _, err := client.Dial(ep); err != nil {
t.Fatal(err)
}
@@ -613,7 +534,7 @@
t.Fatal("Expected client.Dial to fail since server is dead")
}
- h, err = sh.Start("runServer", nil, addr)
+ h, err = sh.Start("runServer", nil, protocol, addr)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -634,7 +555,7 @@
func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
server := InternalNew(naming.FixedRoutingID(0x55555555))
- _, ep, err := server.Listen("tcp", args[1])
+ _, ep, err := server.Listen(args[1], args[2])
if err != nil {
fmt.Fprintln(stderr, err)
return err
diff --git a/runtimes/google/ipc/stream/proxy/proxy.go b/runtimes/google/ipc/stream/proxy/proxy.go
index eddef79..095357b 100644
--- a/runtimes/google/ipc/stream/proxy/proxy.go
+++ b/runtimes/google/ipc/stream/proxy/proxy.go
@@ -6,13 +6,13 @@
"net"
"sync"
+ "veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vom"
- "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/id"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/message"
@@ -132,14 +132,14 @@
// New creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
func New(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
- ln, err := net.Listen(network, address)
+ _, listenFn := stream.RegisteredProtocol(network)
+ if listenFn == nil {
+ return nil, fmt.Errorf("unknown network %s", network)
+ }
+ ln, err := listenFn(network, address)
if err != nil {
return nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
}
- ln, err = websocket.NewListener(ln)
- if err != nil {
- return nil, err
- }
if len(pubAddress) == 0 {
pubAddress = ln.Addr().String()
}
@@ -460,7 +460,8 @@
// Endpoint returns the endpoint of the proxy service. By Dialing a VC to this
// endpoint, processes can have their services exported through the proxy.
func (p *Proxy) Endpoint() naming.Endpoint {
- return version.Endpoint(p.ln.Addr().Network(), p.pubAddress, p.rid)
+ ep := version.Endpoint(p.ln.Addr().Network(), p.pubAddress, p.rid)
+ return ep
}
// Shutdown stops the proxy service, closing all network connections.
diff --git a/runtimes/google/naming/endpoint.go b/runtimes/google/naming/endpoint.go
index 2552bc2..33c86cf 100644
--- a/runtimes/google/naming/endpoint.go
+++ b/runtimes/google/naming/endpoint.go
@@ -34,7 +34,7 @@
// NewEndpoint creates a new endpoint from a string as per naming.NewEndpoint
func NewEndpoint(input string) (*Endpoint, error) {
- var ep Endpoint
+ ep := new(Endpoint)
// We have to guess this is a mount table if we don't know.
ep.IsMountTable = true
@@ -45,7 +45,7 @@
parts := strings.Split(input, separator)
if len(parts) == 1 {
err := ep.parseHostPort(parts[0])
- return &ep, err
+ return ep, err
}
version, err := strconv.ParseUint(parts[0], 10, 16)
@@ -63,7 +63,7 @@
default:
err = errInvalidEndpointString
}
- return &ep, err
+ return ep, err
}
func (ep *Endpoint) parseHostPort(input string) error {
diff --git a/runtimes/google/naming/namespace/all_test.go b/runtimes/google/naming/namespace/all_test.go
index 8c1bd07..2dd0a94 100644
--- a/runtimes/google/naming/namespace/all_test.go
+++ b/runtimes/google/naming/namespace/all_test.go
@@ -3,7 +3,6 @@
import (
"runtime"
"runtime/debug"
- "strings"
"sync"
"testing"
"time"
@@ -19,7 +18,7 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/testutil"
- _ "veyron.io/veyron/veyron/profiles"
+ "veyron.io/veyron/veyron/profiles"
"veyron.io/veyron/veyron/runtimes/google/naming/namespace"
vsecurity "veyron.io/veyron/veyron/security"
service "veyron.io/veyron/veyron/services/mounttable/lib"
@@ -54,10 +53,6 @@
t.Fatal(string(debug.Stack()))
}
-func addWSName(name string) []string {
- return []string{name, strings.Replace(name, "@tcp@", "@ws@", 1)}
-}
-
// N squared but who cares, this is a little test.
// Ignores dups.
func contains(container, contained []string) bool {
@@ -203,7 +198,7 @@
}
// Add a mount table server.
// Start serving on a loopback address.
- ep, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"})
+ ep, err := s.Listen(profiles.LocalListenSpec)
if err != nil {
boom(t, "Failed to Listen: %s", err)
}
@@ -306,16 +301,16 @@
testResolveToMountTable(t, r, ns, m, rootMT)
// The server registered for each mount point is a mount table
- testResolve(t, r, ns, m, addWSName(mts[m].name)...)
+ testResolve(t, r, ns, m, mts[m].name)
// ResolveToMountTable will walk through to the sub MountTables
mtbar := naming.Join(m, "bar")
subMT := naming.Join(mts[m].name, "bar")
- testResolveToMountTable(t, r, ns, mtbar, addWSName(subMT)...)
+ testResolveToMountTable(t, r, ns, mtbar, subMT)
}
for _, j := range []string{j1MP, j2MP, j3MP} {
- testResolve(t, r, ns, j, addWSName(jokes[j].name)...)
+ testResolve(t, r, ns, j, jokes[j].name)
}
}
@@ -349,7 +344,7 @@
mt2mt := naming.Join(mts[mt2MP].name, "a")
// The mt2/a is served by the mt2 mount table
- testResolveToMountTable(t, r, ns, mt2a, addWSName(mt2mt)...)
+ testResolveToMountTable(t, r, ns, mt2a, mt2mt)
// The server for mt2a is mt3server from the second mount above.
testResolve(t, r, ns, mt2a, mt3Server)
@@ -366,11 +361,11 @@
names := []string{naming.JoinAddressName(mts[mt4MP].name, "a"),
naming.JoinAddressName(mts[mt5MP].name, "a")}
- names = append(names, addWSName(naming.JoinAddressName(mts[mt2MP].name, "a"))...)
+ names = append(names, naming.JoinAddressName(mts[mt2MP].name, "a"))
// We now have 3 mount tables prepared to serve mt2/a
testResolveToMountTable(t, r, ns, "mt2/a", names...)
names = []string{mts[mt4MP].name, mts[mt5MP].name}
- names = append(names, addWSName(mts[mt2MP].name)...)
+ names = append(names, mts[mt2MP].name)
testResolve(t, r, ns, "mt2", names...)
}
@@ -388,15 +383,15 @@
// Set up some nested mounts and verify resolution.
for _, m := range []string{"mt4/foo", "mt4/foo/bar"} {
- testResolve(t, r, ns, m, addWSName(mts[m].name)...)
+ testResolve(t, r, ns, m, mts[m].name)
}
testResolveToMountTable(t, r, ns, "mt4/foo",
- addWSName(naming.JoinAddressName(mts[mt4MP].name, "foo"))...)
+ naming.JoinAddressName(mts[mt4MP].name, "foo"))
testResolveToMountTable(t, r, ns, "mt4/foo/bar",
- addWSName(naming.JoinAddressName(mts["mt4/foo"].name, "bar"))...)
+ naming.JoinAddressName(mts["mt4/foo"].name, "bar"))
testResolveToMountTable(t, r, ns, "mt4/foo/baz",
- addWSName(naming.JoinAddressName(mts["mt4/foo"].name, "baz"))...)
+ naming.JoinAddressName(mts["mt4/foo"].name, "baz"))
}
// TestServers tests invoking RPCs on simple servers
@@ -411,16 +406,16 @@
// Let's run some non-mount table services
for _, j := range []string{j1MP, j2MP, j3MP} {
- testResolve(t, r, ns, j, addWSName(jokes[j].name)...)
+ testResolve(t, r, ns, j, jokes[j].name)
knockKnock(t, r, j)
globalName := naming.JoinAddressName(mts["mt4"].name, j)
disp := &dispatcher{}
gj := "g_" + j
jokes[gj] = runServer(t, r, disp, globalName)
- testResolve(t, r, ns, "mt4/"+j, addWSName(jokes[gj].name)...)
+ testResolve(t, r, ns, "mt4/"+j, jokes[gj].name)
knockKnock(t, r, "mt4/"+j)
- testResolveToMountTable(t, r, ns, "mt4/"+j, addWSName(globalName)...)
- testResolveToMountTable(t, r, ns, "mt4/"+j+"/garbage", addWSName(globalName+"/garbage")...)
+ testResolveToMountTable(t, r, ns, "mt4/"+j, globalName)
+ testResolveToMountTable(t, r, ns, "mt4/"+j+"/garbage", globalName+"/garbage")
}
}
@@ -568,7 +563,7 @@
}
// Since c1 was mounted with the Serve call, it will have both the tcp and ws endpoints.
- testResolve(t, r, ns, "c1", addWSName(c1.name)...)
+ testResolve(t, r, ns, "c1", c1.name)
testResolve(t, r, ns, "c1/c2", c1.name)
testResolve(t, r, ns, "c1/c3", c3.name)
testResolve(t, r, ns, "c1/c3/c4", c1.name)
@@ -675,9 +670,9 @@
}
// Now check a matching pattern.
- testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), addWSName(mts[mt2MP].name)...)
+ testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), mts[mt2MP].name)
testResolveToMountTableWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), name)
// After successful lookup it should be cached, so the pattern doesn't matter.
- testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/foobar"), addWSName(mts[mt2MP].name)...)
+ testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/foobar"), mts[mt2MP].name)
}
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index a223f02..ae4a949 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -74,7 +74,7 @@
if address == "" {
return nil, fmt.Errorf("%v is not set", mgmt.AddressConfigKey)
}
- return &ipc.ListenSpec{Protocol: protocol, Address: address}, nil
+ return &ipc.ListenSpec{Addrs: ipc.ListenAddrs{{protocol, address}}}, nil
}
func (rt *vrt) callbackToParent(parentName, myName string) error {
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index 250f4c9..6e2e64b 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -13,7 +13,7 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vtrace"
- _ "veyron.io/veyron/veyron/lib/tcp"
+ "veyron.io/veyron/veyron/profiles"
iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
@@ -103,7 +103,7 @@
if err != nil {
return nil, err
}
- if _, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}); err != nil {
+ if _, err := s.Listen(profiles.LocalListenSpec); err != nil {
return nil, err
}