Merge "veyron2/ipc: modify ListenSpec to accept a slice of addresses."
diff --git a/lib/flags/flags.go b/lib/flags/flags.go
index bbd71ce..a6d7888 100644
--- a/lib/flags/flags.go
+++ b/lib/flags/flags.go
@@ -123,11 +123,52 @@
return af.flag.files[name]
}
+// ListenAddrs is the set of listen addresses captured from the command line.
+// ListenAddrs mirrors ipc.ListenAddrs.
+type ListenAddrs []struct {
+ Protocol, Address string
+}
+
// ListenFlags contains the values of the Listen flag group.
type ListenFlags struct {
- ListenProtocol TCPProtocolFlag
- ListenAddress IPHostPortFlag
- ListenProxy string
+ Addrs ListenAddrs
+ ListenProxy string
+ protocol TCPProtocolFlag
+ addresses ipHostPortFlagVar
+}
+
+type ipHostPortFlagVar struct {
+ validator IPHostPortFlag
+ flags *ListenFlags
+}
+
+// Implements flag.Value.Get
+func (ip ipHostPortFlagVar) Get() interface{} {
+ return ip.String()
+}
+
+// Implements flag.Value.Set
+func (ip *ipHostPortFlagVar) Set(s string) error {
+ if err := ip.validator.Set(s); err != nil {
+ return err
+ }
+ a := struct {
+ Protocol, Address string
+ }{
+ ip.flags.protocol.String(),
+ ip.validator.String(),
+ }
+ ip.flags.Addrs = append(ip.flags.Addrs, a)
+ return nil
+}
+
+// Implements flag.Value.String. Safe on a zero-valued receiver (nil flags).
+func (ip ipHostPortFlagVar) String() string {
+ s := ""
+ for i := 0; ip.flags != nil && i < len(ip.flags.Addrs); i++ {
+ s += fmt.Sprintf("(%s %s)", ip.flags.Addrs[i].Protocol, ip.flags.Addrs[i].Address)
+ }
+ return s
}
// createAndRegisterRuntimeFlags creates and registers the RuntimeFlags
@@ -160,11 +201,14 @@
// createAndRegisterListenFlags creates and registers the ListenFlags
// group with the supplied flag.FlagSet.
func createAndRegisterListenFlags(fs *flag.FlagSet) *ListenFlags {
- f := &ListenFlags{}
- f.ListenProtocol = TCPProtocolFlag{"tcp"}
- f.ListenAddress = IPHostPortFlag{Port: "0"}
- fs.Var(&f.ListenProtocol, "veyron.tcp.protocol", "protocol to listen with")
- fs.Var(&f.ListenAddress, "veyron.tcp.address", "address to listen on")
+ f := &ListenFlags{
+ protocol: TCPProtocolFlag{"tcp"},
+ addresses: ipHostPortFlagVar{validator: IPHostPortFlag{Port: "0"}},
+ }
+ f.addresses.flags = f
+
+ fs.Var(&f.protocol, "veyron.tcp.protocol", "protocol to listen with")
+ fs.Var(&f.addresses, "veyron.tcp.address", "address to listen on")
fs.StringVar(&f.ListenProxy, "veyron.proxy", "", "object name of proxy service to use to export services across network boundaries")
return f
}
@@ -209,7 +253,16 @@
// method can be used for testing to see if any given group was configured.
func (f *Flags) ListenFlags() ListenFlags {
if p := f.groups[Listen]; p != nil {
- return *(p.(*ListenFlags))
+ lf := p.(*ListenFlags)
+ n := *lf
+ if len(lf.Addrs) == 0 {
+ n.Addrs = ListenAddrs{{n.protocol.String(),
+ n.addresses.validator.String()}}
+ return n
+ }
+ n.Addrs = make(ListenAddrs, len(lf.Addrs))
+ copy(n.Addrs, lf.Addrs)
+ return n
}
return ListenFlags{}
}
diff --git a/lib/flags/flags_test.go b/lib/flags/flags_test.go
index a3da088..eb712e9 100644
--- a/lib/flags/flags_test.go
+++ b/lib/flags/flags_test.go
@@ -77,7 +77,6 @@
}
fs = flag.NewFlagSet("test", flag.ContinueOnError)
- //fs.SetOutput(ioutil.Discard)
fl = flags.CreateAndRegister(fs, flags.ACL)
args = []string{"--veyron.acl=noname"}
err = fl.Parse(args)
@@ -99,7 +98,7 @@
if got, want := fl.RuntimeFlags().NamespaceRoots, roots; !reflect.DeepEqual(got, want) {
t.Errorf("got %v, want %v", got, want)
}
- if got, want := lf.ListenAddress.String(), addr; got != want {
+ if got, want := lf.Addrs[0].Address, addr; got != want {
t.Errorf("got %q, want %q", got, want)
}
}
@@ -180,3 +179,38 @@
t.Errorf("got %q, want %q", got, want)
}
}
+
+func TestListenFlags(t *testing.T) {
+ fl := flags.CreateAndRegister(flag.NewFlagSet("test", flag.ContinueOnError), flags.Listen)
+ if err := fl.Parse([]string{}); err != nil { // no flags: expect the single default address
+ t.Fatalf("unexpected error: %s", err)
+ }
+ lf := fl.ListenFlags()
+ if got, want := len(lf.Addrs), 1; got != want {
+ t.Fatalf("got %d, want %d", got, want) // fatal: lf.Addrs[0] is indexed next
+ }
+ def := struct{ Protocol, Address string }{"tcp", ":0"}
+ if got, want := lf.Addrs[0], def; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %v, want %v", got, want)
+ }
+
+ fl = flags.CreateAndRegister(flag.NewFlagSet("test", flag.ContinueOnError), flags.Listen)
+ if err := fl.Parse([]string{
+ "--veyron.tcp.address=172.0.0.1:10", "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.10:34", "--veyron.tcp.protocol=tcp6", "--veyron.tcp.address=172.0.0.100:100"}); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ lf = fl.ListenFlags()
+ if got, want := len(lf.Addrs), 3; got != want {
+ t.Fatalf("got %d, want %d", got, want) // fatal: lf.Addrs[0..2] are indexed below
+ }
+ for i, p := range []string{"tcp", "ws", "tcp6"} { // each address pairs with the protocol flag preceding it
+ if got, want := lf.Addrs[i].Protocol, p; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ }
+ for i, p := range []string{"172.0.0.1:10", "127.0.0.10:34", "172.0.0.100:100"} {
+ if got, want := lf.Addrs[i].Address, p; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ }
+}
diff --git a/lib/flags/listen.go b/lib/flags/listen.go
index 929dd56..1812fe4 100644
--- a/lib/flags/listen.go
+++ b/lib/flags/listen.go
@@ -7,8 +7,8 @@
)
// TCPProtocolFlag implements flag.Value to provide validation of the
-// command line values passed to it: tcp, tcp4 or tcp6 being the
-// only allowed values.
+// command line values passed to it: tcp, tcp4, tcp6, ws, ws4 and ws6
+// being the only allowed values.
type TCPProtocolFlag struct{ Protocol string }
// Implements flag.Value.Get
@@ -19,7 +19,7 @@
// Implements flag.Value.Set
func (t *TCPProtocolFlag) Set(s string) error {
switch s {
- case "tcp", "tcp4", "tcp6":
+ case "tcp", "tcp4", "tcp6", "ws", "ws4", "ws6":
t.Protocol = s
return nil
default:
diff --git a/lib/flags/main.go b/lib/flags/main.go
index cae781a..044c91e 100644
--- a/lib/flags/main.go
+++ b/lib/flags/main.go
@@ -22,8 +22,9 @@
fmt.Printf("Runtime: Credentials: %s\n", rtf.Credentials)
fmt.Printf("Runtime: Namespace Roots: %s\n", rtf.NamespaceRoots)
lf := fl.ListenFlags()
- fmt.Printf("Listen: Protocol %q\n", lf.ListenProtocol)
- fmt.Printf("Listen: Address %q\n", lf.ListenAddress)
+ for _, a := range lf.Addrs { // one line per configured listen address
+ fmt.Printf("Listen: Protocol %q, Address %q\n", a.Protocol, a.Address)
+ }
fmt.Printf("Listen: Proxy %q\n", lf.ListenProxy)
fmt.Printf("ACL: %v\n", fl.ACLFlags())
}
diff --git a/lib/modules/core/proxy.go b/lib/modules/core/proxy.go
index d1d2477..3a46b47 100644
--- a/lib/modules/core/proxy.go
+++ b/lib/modules/core/proxy.go
@@ -3,7 +3,6 @@
import (
"fmt"
"io"
- "strings"
"time"
"veyron.io/veyron/veyron2/naming"
@@ -19,65 +18,58 @@
}
func proxyServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ r, err := rt.New()
+ if err != nil {
+ panic(err)
+ }
fl, args, err := parseListenFlags(args)
if err != nil {
return fmt.Errorf("failed to parse args: %s", err)
}
- // TODO(sadovsky): Why does this require >=1 arg? Seems 0 should be fine.
- // Also note, we have no way to specify ">=0".
- if err := checkArgs(args, -1, ""); err != nil {
- return err
- }
expected := len(args)
rid, err := naming.NewRoutingID()
if err != nil {
return err
}
lf := fl.ListenFlags()
- // TODO(ashankar): Set the second argument to r.Principal() once the
- // old security model is no longer operational.
- proxy, err := proxy.New(rid, nil, lf.ListenProtocol.String(), lf.ListenAddress.String(), "")
+
+ proxy, err := proxy.New(rid, r.Principal(), lf.Addrs[0].Protocol, lf.Addrs[0].Address, "")
if err != nil {
return err
}
defer proxy.Shutdown()
+
pname := naming.JoinAddressName(proxy.Endpoint().String(), "")
fmt.Fprintf(stdout, "PROXY_ADDR=%s\n", proxy.Endpoint().String())
fmt.Fprintf(stdout, "PROXY_NAME=%s\n", pname)
-
- r, err := rt.New()
- if err != nil {
- panic(err)
- }
- defer r.Cleanup()
-
- pub := publisher.New(r.NewContext(), r.Namespace(), time.Minute)
- defer pub.WaitForStop()
- defer pub.Stop()
- pub.AddServer(pname, false)
- // If the protocol is tcp we need to also publish the websocket endpoint.
- // TODO(bjornick): Remove this hack before we launch.
- if strings.HasPrefix(proxy.Endpoint().Addr().Network(), "tcp") {
- wsEP := strings.Replace(pname, "@"+proxy.Endpoint().Addr().Network()+"@", "@ws@", 1)
- pub.AddServer(wsEP, false)
- }
- for _, name := range args {
- pub.AddName(name)
- }
- // Wait for all the entries to be published.
- for {
- got := len(pub.Published())
- if expected == got {
- break
+ if expected > 0 {
+ defer r.Cleanup()
+ pub := publisher.New(r.NewContext(), r.Namespace(), time.Minute)
+ defer pub.WaitForStop()
+ defer pub.Stop()
+ pub.AddServer(pname, false)
+ for _, name := range args {
+ if len(name) == 0 {
+ return fmt.Errorf("empty name specified on the command line")
+ }
+ pub.AddName(name)
}
- fmt.Fprintf(stderr, "%s\n", pub.DebugString())
- delay := time.Second
- fmt.Fprintf(stderr, "Sleeping: %s\n", delay)
- time.Sleep(delay)
- }
- for _, p := range pub.Published() {
- fmt.Fprintf(stdout, "PUBLISHED_PROXY_NAME=%s\n", p)
+ // Wait for all the entries to be published.
+ for {
+ got := len(pub.Published())
+ if expected == got {
+ break
+ }
+ fmt.Fprintf(stderr, "%s\n", pub.DebugString())
+ delay := time.Second
+ fmt.Fprintf(stderr, "Sleeping: %s\n", delay)
+ time.Sleep(delay)
+ }
+ for _, p := range pub.Published() {
+ fmt.Fprintf(stdout, "PUBLISHED_PROXY_NAME=%s\n", p)
+ }
}
modules.WaitForEOF(stdin)
+ fmt.Fprintf(stdout, "DONE\n")
return nil
}
diff --git a/lib/modules/core/util.go b/lib/modules/core/util.go
index a429651..4207bf4 100644
--- a/lib/modules/core/util.go
+++ b/lib/modules/core/util.go
@@ -28,9 +28,8 @@
func initListenSpec(fl *flags.Flags) ipc.ListenSpec {
lf := fl.ListenFlags()
return ipc.ListenSpec{
- Protocol: lf.ListenProtocol.String(),
- Address: lf.ListenAddress.String(),
- Proxy: lf.ListenProxy,
+ Addrs: ipc.ListenAddrs(lf.Addrs),
+ Proxy: lf.ListenProxy,
}
}
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
index 2d517d7..2e2271a 100644
--- a/lib/modules/exec.go
+++ b/lib/modules/exec.go
@@ -139,6 +139,7 @@
eh.stdin = stdin
eh.handle = handle
eh.cmd = cmd
+ vlog.VI(1).Infof("Start: %q stderr: %s", eh.name, stderr.Name())
vlog.VI(1).Infof("Start: %q args: %v", eh.name, cmd.Args)
vlog.VI(2).Infof("Start: %q env: %v", eh.name, cmd.Env)
if err := handle.Start(); err != nil {
diff --git a/lib/tcp/init.go b/lib/tcp/init.go
index a62422e..bbc525c 100644
--- a/lib/tcp/init.go
+++ b/lib/tcp/init.go
@@ -1,37 +1,13 @@
package tcp
import (
- "fmt"
"net"
- "time"
"veyron.io/veyron/veyron2/ipc/stream"
-
- "veyron.io/veyron/veyron/lib/websocket"
)
-func dialer(network, address string, timeout time.Duration) (net.Conn, error) {
- conn, err := net.DialTimeout(network, address, timeout)
- if err != nil {
- return nil, err
- }
- // For tcp connections we add an extra magic byte so we can differentiate between
- // raw tcp and websocket on the same port.
- switch n, err := conn.Write([]byte{websocket.BinaryMagicByte}); {
- case err != nil:
- return nil, err
- case n != 1:
- return nil, fmt.Errorf("Unable to write the magic byte")
- }
- return conn, nil
-}
-
-func listener(network, address string) (net.Listener, error) {
- return net.Listen(network, address)
-}
-
func init() {
for _, p := range []string{"tcp", "tcp4", "tcp6"} {
- stream.RegisterProtocol(p, dialer, listener)
+ stream.RegisterProtocol(p, net.DialTimeout, net.Listen)
}
}
diff --git a/lib/testutil/integration/util.go b/lib/testutil/integration/util.go
index 4e7e1d9..05ab2e2 100644
--- a/lib/testutil/integration/util.go
+++ b/lib/testutil/integration/util.go
@@ -102,12 +102,14 @@
go func() {
defer outPipe.Close()
scanner := bufio.NewScanner(outPipe)
- nmounts := 0
+ mounts := 0
for scanner.Scan() {
line := scanner.Text()
+ // TODO(cnicolaou): find a better way of synchronizing with
+ // the child process, this is way too fragile.
if strings.Index(line, "ipc pub: mount") != -1 {
- nmounts++
- if nmounts == 2 {
+ mounts++
+ if mounts == 1 {
close(ready)
}
}
diff --git a/lib/websocket/conn.go b/lib/websocket/conn.go
index e6963c4..3a291ea 100644
--- a/lib/websocket/conn.go
+++ b/lib/websocket/conn.go
@@ -4,11 +4,12 @@
import (
"fmt"
- "github.com/gorilla/websocket"
"io"
"net"
"sync"
"time"
+
+ "github.com/gorilla/websocket"
)
// WebsocketConn provides a net.Conn interface for a websocket connection.
diff --git a/lib/websocket/dialer.go b/lib/websocket/dialer.go
index dea1262..111acb2 100644
--- a/lib/websocket/dialer.go
+++ b/lib/websocket/dialer.go
@@ -3,10 +3,11 @@
package websocket
import (
- "github.com/gorilla/websocket"
"net"
"net/http"
"net/url"
+
+ "github.com/gorilla/websocket"
)
func Dial(address string) (net.Conn, error) {
@@ -23,6 +24,5 @@
if err != nil {
return nil, err
}
-
return WebsocketConn(ws), nil
}
diff --git a/lib/websocket/init.go b/lib/websocket/init.go
index ac60189..314c07a 100644
--- a/lib/websocket/init.go
+++ b/lib/websocket/init.go
@@ -7,8 +7,11 @@
"veyron.io/veyron/veyron2/ipc/stream"
)
+var mapWebSocketToTCP = map[string]string{"ws": "tcp", "ws4": "tcp4", "ws6": "tcp6"}
+
func wsListener(protocol, address string) (net.Listener, error) {
- ln, err := net.Listen(protocol, address)
+ tcp := mapWebSocketToTCP[protocol]
+ ln, err := net.Listen(tcp, address)
if err != nil {
return nil, err
}
@@ -21,7 +24,48 @@
}
func init() {
+ // ws, ws4, ws6 represent websocket protocol instances.
for _, p := range []string{"ws", "ws4", "ws6"} {
stream.RegisterProtocol(p, wsDialer, wsListener)
}
+
+ // TODO(cnicolaou): fully enable and test this 'hybrid mode'.
+ // wsh, wsh4, wsh6 represent a 'hybrid' protocol that can accept
+ // both websockets and tcp, using a 'magic' byte to discriminate
+ // between the two. These are needed when a single network port must
+ // be used to serve both websocket and tcp clients; we prefer to use
+ // tcp whenever we can to avoid the overhead of websockets. Clients
+ // decide whether to use hybrid tcp or websockets by electing to dial
+ // using the wsh protocol or the ws protocol respectively.
+ //for _, p := range []string{"wsh", "wsh4", "wsh6"} {
+ // stream.RegisterProtocol(p, tcpHybridDialer, wsHybridListener)
+ //}
+
+ // The implementation strategy is as follows:
+ // tcpHybridDialer will create and return a wrapped net.Conn which will
+ // write the 'magic' byte the first time that its Write method is called
+ // but will otherwise be indistinguishable from the underlying net.Conn.
+ // This first write will require an extra copy, but avoid potentially
+ // sending two packets.
+ // wsHybridListener is essentially the same as the current wsTCPListener,
+ // but with the magic byte handling implemented on a conditional basis.
}
+
+/*
+func dialer(network, address string, timeout time.Duration) (net.Conn, error) {
+ conn, err := net.DialTimeout(network, address, timeout)
+ if err != nil {
+ return nil, err
+ }
+ // For tcp connections we add an extra magic byte so we can differentiate between
+ // raw tcp and websocket on the same port.
+ switch n, err := conn.Write([]byte{websocket.BinaryMagicByte}); {
+ case err != nil:
+ return nil, err
+ case n != 1:
+ return nil, fmt.Errorf("Unable to write the magic byte")
+ }
+ return conn, nil
+ }
+}
+*/
diff --git a/lib/websocket/listener.go b/lib/websocket/listener.go
index 3c4922e..10d3f51 100644
--- a/lib/websocket/listener.go
+++ b/lib/websocket/listener.go
@@ -3,7 +3,6 @@
package websocket
import (
- "bufio"
"errors"
"fmt"
"net"
@@ -42,6 +41,7 @@
wsLoop sync.WaitGroup
}
+/*
// bufferedConn is used to allow us to Peek at the first byte to see if it
// is the magic byte used by veyron tcp requests. Other than that it behaves
// like a normal net.Conn.
@@ -63,6 +63,7 @@
func (c *bufferedConn) Read(p []byte) (int, error) {
return c.r.Read(p)
}
+*/
// queueListener is a listener that returns connections that are in q.
type queueListener struct {
@@ -144,35 +145,39 @@
return
}
vlog.VI(1).Infof("New net.Conn accepted from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
- bc := newBufferedConn(conn)
- magic, err := bc.Peek(1)
- if err != nil {
- vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as the magic byte failed to be read: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
- continue
- }
+ /*
+ bc := newBufferedConn(conn)
+ magic, err := bc.Peek(1)
+ if err != nil {
+ vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as the magic byte failed to be read: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
+ bc.Close()
+ continue
+ }
- vlog.VI(1).Infof("Got a connection from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
- // Check to see if it is a regular connection or a http connection.
- if magic[0] == BinaryMagicByte {
- if _, err := bc.r.ReadByte(); err != nil {
- vlog.VI(1).Infof("Shutting down conn from %s (local address: %s), could read past the magic byte: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
- continue
- }
- if err := ln.q.Put(&bc); err != nil {
- vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
- continue
- }
- continue
- }
+ vlog.VI(1).Infof("Got a connection from %s (local address: %s)", conn.RemoteAddr(), conn.LocalAddr())
+ // Check to see if it is a regular connection or a http connection.
+ if magic[0] == BinaryMagicByte {
+ if _, err := bc.r.ReadByte(); err != nil {
+ vlog.VI(1).Infof("Shutting down conn from %s (local address: %s), could read past the magic byte: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
+ bc.Close()
+ continue
+ }
+ if err := ln.q.Put(&bc); err != nil {
+ vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
+ bc.Close()
+ continue
+ }
+ continue
+ }
+ */
ln.wsLoop.Add(1)
- if err := ln.httpQ.Put(&bc); err != nil {
+ // if err := ln.httpQ.Put(&bc); err != nil {
+ if err := ln.httpQ.Put(conn); err != nil {
ln.wsLoop.Done()
vlog.VI(1).Infof("Shutting down conn from %s (local address: %s) as Put failed in vifLoop: %v", conn.RemoteAddr(), conn.LocalAddr(), err)
- bc.Close()
+ //bc.Close()
+ conn.Close()
continue
}
}
@@ -207,6 +212,17 @@
return nil
}
+type addr struct{ n, a string }
+
+func (a *addr) Network() string {
+ return a.n
+}
+
+func (a *addr) String() string {
+ return a.a
+}
+
func (ln *wsTCPListener) Addr() net.Addr {
- return ln.netLn.Addr()
+ a := &addr{"ws", ln.netLn.Addr().String()}
+ return a
}
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 49ead70..815b6dd 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -25,17 +25,15 @@
)
var (
- listenAddressFlag = flags.IPHostPortFlag{Port: "0"}
+ commonFlags *flags.Flags
- ListenSpec = &ipc.ListenSpec{
- Protocol: "tcp",
- Address: "127.0.0.1:0",
- }
+ // ListenSpec is an initialized instance of ipc.ListenSpec that can
+ // be used with ipc.Listen.
+ ListenSpec ipc.ListenSpec
)
func init() {
- flag.Var(&listenAddressFlag, "veyron.tcp.address", "address to listen on")
-
+ commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Listen)
rt.RegisterProfile(&profile{})
}
@@ -64,8 +62,15 @@
if !gce.RunningOnGCE() {
return nil, fmt.Errorf("GCE profile used on a non-GCE system")
}
+
+ lf := commonFlags.ListenFlags()
+ ListenSpec = ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs(lf.Addrs),
+ Proxy: lf.ListenProxy,
+ }
+
p.ac = appcycle.New()
- ListenSpec.Address = listenAddressFlag.String()
+
if ip, err := gce.ExternalIPAddress(); err != nil {
return p.ac, err
} else {
diff --git a/profiles/generic.go b/profiles/generic.go
index 421ff22..5b6f2d7 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -16,8 +16,7 @@
// LocalListenSpec is a ListenSpec for 127.0.0.1.
var LocalListenSpec = ipc.ListenSpec{
- Protocol: "tcp",
- Address: "127.0.0.1:0",
+ Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}},
AddressChooser: internal.IPAddressChooser,
}
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index 3d4d638..c772848 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -83,9 +83,8 @@
lf := commonFlags.ListenFlags()
ListenSpec = ipc.ListenSpec{
- Protocol: lf.ListenProtocol.Protocol,
- Address: lf.ListenAddress.String(),
- Proxy: lf.ListenProxy,
+ Addrs: ipc.ListenAddrs(lf.Addrs),
+ Proxy: lf.ListenProxy,
}
p.ac = appcycle.New()
@@ -158,6 +157,8 @@
log := rt.Logger()
+ // TODO(cnicolaou): add support for listening on multiple network addresses.
+
done:
for {
select {
@@ -182,7 +183,7 @@
ch <- ipc.NewRmAddrsSetting(removed)
}
// We will always send the best currently available address
- if chosen, err := listenSpec.AddressChooser(listenSpec.Protocol, cur); err == nil && chosen != nil {
+ if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur); err == nil && chosen != nil {
ch <- ipc.NewAddAddrsSetting(chosen)
}
prev = cur
diff --git a/profiles/static/static.go b/profiles/static/static.go
index f30137c..a6b03c3 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -67,9 +67,8 @@
lf := commonFlags.ListenFlags()
ListenSpec = ipc.ListenSpec{
- Protocol: lf.ListenProtocol.Protocol,
- Address: lf.ListenAddress.String(),
- Proxy: lf.ListenProxy,
+ Addrs: ipc.ListenAddrs(lf.Addrs),
+ Proxy: lf.ListenProxy,
}
p.ac = appcycle.New()
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index 0ece305..f7cf101 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -59,7 +59,7 @@
if err != nil {
return nil, err
}
- if _, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}); err != nil {
+ if _, err := s.Listen(listenSpec); err != nil {
return nil, err
}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index fae6ce0..4891254 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -354,7 +354,7 @@
span.Annotatef("address:%v", server)
defer span.Finish()
if status.flow, status.suffix, err = c.connectFlow(ctx, server, noDischarges); err != nil {
- vlog.VI(2).Infof("ipc: err: %s", err)
+ vlog.VI(2).Infof("ipc: connect to %s: %s", server, err)
status.err = err
status.flow = nil
}
@@ -861,7 +861,6 @@
// always return an error. But this isn't a "real" error; the client should
// read the rest of the results and succeed.
_ = fc.closeSend()
-
// Decode the response header, if it hasn't already been decoded by Recv.
if fc.response.Error == nil && !fc.response.EndStreamResults {
if err := fc.dec.Decode(&fc.response); err != nil {
@@ -874,10 +873,8 @@
return fc.close(berr)
}
}
-
// Incorporate any VTrace info that was returned.
ivtrace.MergeResponse(fc.ctx, &fc.response.TraceResponse)
-
if fc.response.Error != nil {
// TODO(cnicolaou): remove verror.NoAccess with verror version
// when ipc.Server is converted.
diff --git a/runtimes/google/ipc/client_test.go b/runtimes/google/ipc/client_test.go
index e6e6b2e..e8fb20f 100644
--- a/runtimes/google/ipc/client_test.go
+++ b/runtimes/google/ipc/client_test.go
@@ -10,7 +10,6 @@
"time"
"veyron.io/veyron/veyron2"
- "veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/rt"
verror "veyron.io/veyron/veyron2/verror2"
@@ -108,6 +107,11 @@
s := expect.NewSession(t, srv.Stdout(), time.Minute)
s.ExpectVar("NAME")
+ // Verify that there is 1 entry for echoServer in the mount table.
+ if got, want := numServers(t, "echoServer"), 1; got != want {
+ t.Fatalf("got: %d, want: %d", got, want)
+ }
+
runClient(t, sh)
// Create a fake set of 100 entries in the mount table
@@ -121,9 +125,9 @@
}
}
- // Verify that there are 102 entries for echoServer in the mount table.
- if got, want := numServers(t, "echoServer"), 102; got != want {
- t.Fatalf("got: %d, want: %d", got, want)
+ // Verify that there are 101 entries for echoServer in the mount table.
+ if got, want := numServers(t, "echoServer"), 101; got != want {
+ t.Fatalf("got: %d, want: %d", got, want)
+ }
// TODO(cnicolaou): ok, so it works, but I'm not sure how
@@ -152,46 +156,6 @@
}
}
-type sleeper struct {
- done <-chan struct{}
-}
-
-func (s *sleeper) Sleep(call ipc.ServerContext) error {
- select {
- case <-s.done:
- case <-time.After(time.Hour):
- }
- return nil
-}
-
-func (s *sleeper) Ping(call ipc.ServerContext) (string, error) {
- return "pong", nil
-}
-
-func (s *sleeper) Source(call ipc.ServerCall, start int) error {
- i := start
- backoff := 25 * time.Millisecond
- for {
- select {
- case <-s.done:
- return nil
- case <-time.After(backoff):
- call.Send(i)
- i++
- }
- backoff *= 2
- }
-}
-
-func (s *sleeper) Sink(call ipc.ServerCall) (int, error) {
- i := 0
- for {
- if err := call.Recv(&i); err != nil {
- return i, verror.Convert(verror.Internal, call, err)
- }
- }
-}
-
func childPing(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
name := args[1]
call, err := r.Client().StartCall(r.NewContext(), name, "Ping", nil)
@@ -210,7 +174,7 @@
return nil
}
-func initServer(t *testing.T, r veyron2.Runtime) (string, ipc.Server, func()) {
+func initServer(t *testing.T, r veyron2.Runtime) (string, func()) {
server, err := r.NewServer()
if err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -222,9 +186,9 @@
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
- server.Serve("", &sleeper{done}, nil)
+ server.Serve("", &simple{done}, nil)
name := naming.JoinAddressName(ep.String(), "")
- return name, server, deferFn
+ return name, deferFn
}
func testForVerror(t *testing.T, err error, verr ...verror.IDAction) {
@@ -250,7 +214,7 @@
}
func TestTimeoutResponse(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx, _ := r.NewContext().WithTimeout(100 * time.Millisecond)
call, err := r.Client().StartCall(ctx, name, "Sleep", nil)
@@ -264,7 +228,7 @@
}
func TestArgsAndResponses(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
call, err := r.Client().StartCall(r.NewContext(), name, "Sleep", []interface{}{"too many args"})
@@ -291,7 +255,7 @@
// The server and client use different runtimes and hence different
// principals and without any shared blessings the server will deny
// access to the client
- name, _, fn := initServer(t, r1)
+ name, fn := initServer(t, r1)
defer fn()
client := r2.Client()
@@ -304,7 +268,7 @@
}
func TestCancelledBeforeFinish(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx, cancel := r.NewContext().WithCancel()
@@ -320,7 +284,7 @@
}
func TestCancelledDuringFinish(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx, cancel := r.NewContext().WithCancel()
@@ -375,7 +339,7 @@
sh, fn := runMountTable(t, r)
defer fn()
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
srv, err := sh.Start("ping", nil, name)
@@ -389,7 +353,7 @@
}
func TestStreamTimeout(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
want := 10
@@ -422,7 +386,7 @@
}
func TestStreamAbort(t *testing.T) {
- name, _, fn := initServer(t, r)
+ name, fn := initServer(t, r)
defer fn()
ctx := r.NewContext()
@@ -446,9 +410,14 @@
if verr != nil {
t.Fatalf("unexpected error: %s", verr)
}
- if !verror.Is(err, "veyron.io/veyron/veyron2/verror.Internal") || err.Error() != `ipc.test:"".Sink: Internal error: EOF` {
+ if !verror.Is(err, verror.Unknown.ID) || err.Error() != `veyron.io/veyron/veyron2/verror.Unknown: EOF` {
t.Errorf("wrong error: %#v", err)
}
+ /* TODO(cnicolaou): use this when verror2/vom transition is done.
+ if err != nil && !verror.Is(err, verror.EOF.ID) {
+ t.Fatalf("unexpected error: %#v", err)
+ }
+ */
if got := result; got != want {
t.Errorf("got %d, want %d", got, want)
}
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index e7a680c..729b8c6 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -40,9 +40,12 @@
)
var (
- errMethod = verror.Make(verror.Aborted, nil)
- clock = new(fakeClock)
- listenSpec = ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}
+ errMethod = verror.Make(verror.Aborted, nil)
+ clock = new(fakeClock)
+ listenAddrs = ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}
+ listenWSAddrs = ipc.ListenAddrs{{"ws", "127.0.0.1:0"}, {"tcp", "127.0.0.1:0"}}
+ listenSpec = ipc.ListenSpec{Addrs: listenAddrs}
+ listenWSSpec = ipc.ListenSpec{Addrs: listenWSAddrs}
)
type fakeClock struct {
@@ -173,6 +176,10 @@
}
func startServer(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, name string, disp ipc.Dispatcher, opts ...ipc.ServerOpt) (naming.Endpoint, ipc.Server) {
+ return startServerWS(t, principal, sm, ns, name, disp, noWebsocket, opts...)
+}
+
+func startServerWS(t *testing.T, principal security.Principal, sm stream.Manager, ns naming.Namespace, name string, disp ipc.Dispatcher, shouldUseWebsocket websocketMode, opts ...ipc.ServerOpt) (naming.Endpoint, ipc.Server) {
vlog.VI(1).Info("InternalNewServer")
opts = append(opts, vc.LocalPrincipal{principal})
server, err := InternalNewServer(testContext(), sm, ns, nil, opts...)
@@ -180,7 +187,11 @@
t.Errorf("InternalNewServer failed: %v", err)
}
vlog.VI(1).Info("server.Listen")
- ep, err := server.Listen(listenSpec)
+ spec := listenSpec
+ if shouldUseWebsocket {
+ spec = listenWSSpec
+ }
+ ep, err := server.Listen(spec)
if err != nil {
t.Errorf("server.Listen failed: %v", err)
}
@@ -273,11 +284,15 @@
}
func createBundle(t *testing.T, client, server security.Principal, ts interface{}) (b bundle) {
+ return createBundleWS(t, client, server, ts, noWebsocket)
+}
+
+func createBundleWS(t *testing.T, client, server security.Principal, ts interface{}, shouldUseWebsocket websocketMode) (b bundle) {
b.sm = imanager.InternalNew(naming.FixedRoutingID(0x555555555))
b.ns = tnaming.NewSimpleNamespace()
b.name = "mountpoint/server"
if server != nil {
- b.ep, b.server = startServer(t, server, b.sm, b.ns, b.name, testServerDisp{ts})
+ b.ep, b.server = startServerWS(t, server, b.sm, b.ns, b.name, testServerDisp{ts}, shouldUseWebsocket)
}
if client != nil {
var err error
@@ -536,7 +551,7 @@
pserver = tsecurity.NewPrincipal("server")
pclient = tsecurity.NewPrincipal("client")
- b = createBundle(t, pclient, pserver, &testServer{})
+ b = createBundleWS(t, pclient, pserver, &testServer{}, shouldUseWebsocket)
)
defer b.cleanup(t)
// The server needs to recognize the client's root certificate.
@@ -1216,10 +1231,14 @@
t.Errorf("InternalNewServer failed: %v", err)
}
defer server.Stop()
- spec := listenSpec
- spec.Address = ":0"
- spec.AddressChooser = pa
+ spec := ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{{"tcp", ":0"}},
+ AddressChooser: pa,
+ }
ep, err := server.Listen(spec)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err) // fatal: ep is type-asserted on the next line
+ }
iep := ep.(*inaming.Endpoint)
host, _, err := net.SplitHostPort(iep.Address)
if err != nil {
@@ -1252,9 +1271,10 @@
t.Errorf("InternalNewServer failed: %v", err)
}
defer server.Stop()
- spec := listenSpec
- spec.Address = ":0"
- spec.AddressChooser = paerr
+ spec := ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{{"tcp", ":0"}},
+ AddressChooser: paerr,
+ }
ep, err := server.Listen(spec)
iep := ep.(*inaming.Endpoint)
host, _, err := net.SplitHostPort(iep.Address)
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 9a44ec4..98b161e 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -38,17 +38,19 @@
type server struct {
sync.Mutex
- ctx context.T // context used by the server to make internal RPCs.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
- listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
- disp ipc.Dispatcher // dispatcher to serve RPCs
- dispReserved ipc.Dispatcher // dispatcher for reserved methods
- active sync.WaitGroup // active goroutines we've spawned.
- stopped bool // whether the server has been stopped.
- stoppedChan chan struct{} // closed when the server has been stopped.
- preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
+ ctx context.T // context used by the server to make internal RPCs.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
+ listeners map[stream.Listener]struct{} // listeners created by Listen.
+ dhcpListeners map[*dhcpListener]struct{} // dhcpListeners created by Listen.
+
+ disp ipc.Dispatcher // dispatcher to serve RPCs
+ dispReserved ipc.Dispatcher // dispatcher for reserved methods
+ active sync.WaitGroup // active goroutines we've spawned.
+ stopped bool // whether the server has been stopped.
+ stoppedChan chan struct{} // closed when the server has been stopped.
+ preferredProtocols []string // protocols to use when resolving proxy name to endpoint.
ns naming.Namespace
servesMountTable bool
// TODO(cnicolaou): remove this when the publisher tracks published names
@@ -66,7 +68,7 @@
sync.Mutex
publisher *config.Publisher // publisher used to fork the stream
name string // name of the publisher stream
- ep *inaming.Endpoint // endpoint returned after listening
+ eps []*inaming.Endpoint // endpoint returned after listening
pubAddrs []ipc.Address // addresses to publish
pubPort string // port to use with the publish addresses
ch chan config.Setting // channel to receive settings over
@@ -82,14 +84,15 @@
ctx, _ = ivtrace.WithNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]*dhcpListener),
- stoppedChan: make(chan struct{}),
- ns: ns,
- stats: newIPCStats(statsPrefix),
- traceStore: store,
+ ctx: ctx,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listeners: make(map[stream.Listener]struct{}),
+ dhcpListeners: make(map[*dhcpListener]struct{}),
+ stoppedChan: make(chan struct{}),
+ ns: ns,
+ stats: newIPCStats(statsPrefix),
+ traceStore: store,
}
var (
principal security.Principal
@@ -177,6 +180,7 @@
}
}
+/*
// getIPRoamingAddrs finds an appropriate set of addresss to publish
// externally and also determines if it's sensible to allow roaming.
// It returns the host address of the first suitable address that
@@ -208,38 +212,93 @@
// roaming is not desired.
return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
}
+*/
-// configureEPAndRoaming configures the endpoint and roaming. In particular,
-// it fills in the Address portion of the endpoint with the appropriately
-// selected network address and creates a dhcpListener struct if roaming
-// is enabled.
-func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (*dhcpListener, *inaming.Endpoint, error) {
+// getPossibleAddrs returns an appropriate set of addresses that could be used
+// to contact the supplied protocol, host, port parameters using the supplied
+// chooser function. It returns an indication of whether the supplied address
+// was fully specified or not, returning false if the address was fully
+// specified, and true if it was not.
+func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
+ }
+ if ip.IsUnspecified() {
+ if chooser != nil {
+ // Need to find a usable IP address since the call to listen
+ // didn't specify one.
+ if addrs, err := netstate.GetAccessibleIPs(); err == nil {
+ if a, err := chooser(protocol, addrs); err == nil && len(a) > 0 {
+ return a, true, nil
+ }
+ }
+ }
+ // Either no chooser was supplied or it produced no usable addresses,
+ // so fall back to the unspecified address that was passed in.
+ return []ipc.Address{addrFromIP(ip)}, true, nil
+ }
+ return []ipc.Address{addrFromIP(ip)}, false, nil
+}
+
+// createEndpoints creates appropriate inaming.Endpoint instances for
+// all of the externally accessible network addresses that can be used
+// to reach this server.
+func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, bool, error) {
+ iep, ok := lep.(*inaming.Endpoint)
+ if !ok {
+ return nil, false, fmt.Errorf("internal type conversion error for %T", lep)
+ }
+ if !strings.HasPrefix(iep.Protocol, "tcp") &&
+ !strings.HasPrefix(iep.Protocol, "ws") {
+ // If not tcp or ws, just return the endpoint we were given.
+ return []*inaming.Endpoint{iep}, false, nil
+ }
+
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, false, err
+ }
+ addrs, unspecified, err := getPossibleAddrs(lep.Network(), host, port, chooser)
+ if err != nil {
+ return nil, false, err
+ }
+ ieps := make([]*inaming.Endpoint, 0, len(addrs))
+ for _, addr := range addrs {
+ n, err := inaming.NewEndpoint(lep.String())
+ if err != nil {
+ return nil, false, err
+ }
+ n.IsMountTable = s.servesMountTable
+ //n.Protocol = addr.Address().Network()
+ n.Address = net.JoinHostPort(addr.Address().String(), port)
+ ieps = append(ieps, n)
+ }
+ return ieps, unspecified, nil
+}
+
+/*
+// configureEPAndRoaming configures the endpoint by filling in its Address
+// portion with the appropriately selected network address, it also
+// returns an indication of whether this endpoint is appropriate for
+// roaming and the set of addresses that should be published.
+func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (bool, []ipc.Address, *inaming.Endpoint, error) {
iep, ok := ep.(*inaming.Endpoint)
if !ok {
- return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
+ return false, nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
- if !strings.HasPrefix(spec.Protocol, "tcp") {
- return nil, iep, nil
+ if !strings.HasPrefix(spec.Addrs[0].Protocol, "tcp") &&
+ !strings.HasPrefix(spec.Addrs[0].Protocol, "ws") {
+ return false, nil, iep, nil
}
pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
if err != nil {
- return nil, iep, err
+ return false, nil, iep, err
}
iep.Address = net.JoinHostPort(pubHost, pubPort)
- if !roaming {
- vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", spec.Address)
- }
- publisher := spec.StreamPublisher
- if roaming && publisher != nil {
- streamName := spec.StreamName
- ch := make(chan config.Setting)
- if _, err := publisher.ForkStream(streamName, ch); err != nil {
- return nil, iep, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
- }
- return &dhcpListener{ep: iep, pubAddrs: pubAddrs, pubPort: pubPort, ch: ch, name: streamName, publisher: publisher}, iep, nil
- }
- return nil, iep, nil
+ return roaming, pubAddrs, iep, nil
}
+*/
func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
defer vlog.LogCall()()
@@ -250,66 +309,11 @@
s.Unlock()
return nil, errServerStopped
}
- s.Unlock()
- var iep *inaming.Endpoint
- var dhcpl *dhcpListener
- var ln stream.Listener
+ useProxy := len(listenSpec.Proxy) > 0
- if len(listenSpec.Address) > 0 {
- // Listen if we have a local address to listen on. Some situations
- // just need a proxy (e.g. a browser extension).
- tmpln, lep, err := s.streamMgr.Listen(listenSpec.Protocol, listenSpec.Address, s.listenerOpts...)
- if err != nil {
- vlog.Errorf("ipc: Listen on %s failed: %s", listenSpec, err)
- return nil, err
- }
- ln = tmpln
- if tmpdhcpl, tmpiep, err := s.configureEPAndRoaming(listenSpec, lep); err != nil {
- ln.Close()
- return nil, err
- } else {
- dhcpl = tmpdhcpl
- iep = tmpiep
- }
- }
-
- s.Lock()
- defer s.Unlock()
- if s.stopped {
- ln.Close()
- return nil, errServerStopped
- }
-
- if dhcpl != nil {
- // We have a goroutine to listen for dhcp changes.
- s.active.Add(1)
- go func() {
- s.dhcpLoop(dhcpl)
- s.active.Done()
- }()
- s.listeners[ln] = dhcpl
- } else if ln != nil {
- s.listeners[ln] = nil
- }
-
- if iep != nil {
- // We have a goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- s.active.Add(1)
- go func() {
- s.listenLoop(ln, iep)
- s.active.Done()
- }()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
- if strings.HasPrefix(iep.Protocol, "tcp") {
- epCopy := *iep
- epCopy.Protocol = "ws"
- s.publisher.AddServer(s.publishEP(&epCopy, s.servesMountTable), s.servesMountTable)
- }
- }
-
- if len(listenSpec.Proxy) > 0 {
+ // Start the proxy as early as possible.
+ if useProxy {
// We have a goroutine for listening on proxy connections.
s.active.Add(1)
go func() {
@@ -317,14 +321,100 @@
s.active.Done()
}()
}
- return iep, nil
-}
+ s.Unlock()
-// TODO(cnicolaou): Take this out or make the ServesMountTable bit work in the endpoint.
-func (s *server) publishEP(ep *inaming.Endpoint, servesMountTable bool) string {
- var name string
- ep.IsMountTable = servesMountTable
- return naming.JoinAddressName(ep.String(), name)
+ var ieps []*inaming.Endpoint
+
+ type lnInfo struct {
+ ln stream.Listener
+ ep naming.Endpoint
+ }
+ linfo := []lnInfo{}
+ closeAll := func(lni []lnInfo) {
+ for _, li := range lni {
+ li.ln.Close()
+ }
+ }
+
+ roaming := false
+ for _, addr := range listenSpec.Addrs {
+ if len(addr.Address) > 0 {
+ // Listen if we have a local address to listen on. Some situations
+ // just need a proxy (e.g. a browser extension).
+ tmpln, lep, err := s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
+ if err != nil {
+ closeAll(linfo)
+ vlog.Errorf("ipc: Listen on %s failed: %s", addr, err)
+ return nil, err
+ }
+ linfo = append(linfo, lnInfo{tmpln, lep})
+ tmpieps, tmpRoaming, err := s.createEndpoints(lep, listenSpec.AddressChooser)
+ if err != nil {
+ closeAll(linfo)
+ return nil, err
+ }
+ ieps = append(ieps, tmpieps...)
+ if tmpRoaming {
+ roaming = true
+ }
+ }
+ }
+
+ // TODO(cnicolaou): write a test for all of these error cases.
+ if len(ieps) == 0 {
+ if useProxy {
+ return nil, nil
+ }
+ // no proxy.
+ if len(listenSpec.Addrs) > 0 {
+ return nil, fmt.Errorf("no endpoints")
+ }
+ return nil, fmt.Errorf("no proxy and no addresses requested")
+ }
+
+ // TODO(cnicolaou): return all of the eps and their errors....
+ s.Lock()
+ defer s.Unlock()
+ if s.stopped {
+ closeAll(linfo)
+ return nil, errServerStopped
+ }
+
+ if roaming && listenSpec.StreamPublisher != nil {
+ // TODO(cnicolaou): re-enable roaming in a followup CL.
+ /*
+ var dhcpl *dhcpListener
+ streamName := listenSpec.StreamName
+ ch := make(chan config.Setting)
+ if _, err := publisher.ForkStream(streamName, ch); err != nil {
+ return ieps[0], fmt.Errorf("failed to fork stream %q: %s", streamName, err)
+ }
+ dhcpl = &dhcpListener{eps: ieps, pubAddrs: pubAddrs, ch: ch, name: streamName, publisher: publisher}, iep, nil
+ // We have a goroutine to listen for dhcp changes.
+ s.active.Add(1)
+ go func() {
+ s.dhcpLoop(dhcpl)
+ s.active.Done()
+ }()
+ s.dhcpListeners[dhcpl] = struct{}{}
+ */
+ }
+
+ for _, li := range linfo {
+ s.listeners[li.ln] = struct{}{}
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(li.ln, li.ep)
+ }
+ for _, iep := range ieps {
+ s.publisher.AddServer(naming.JoinAddressName(iep.String(), ""), s.servesMountTable)
+ }
+
+ return ieps[0], nil
}
func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
@@ -342,16 +432,9 @@
return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
s.Lock()
- s.listeners[ln] = nil
+ s.listeners[ln] = struct{}{}
s.Unlock()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
-
- if strings.HasPrefix(iep.Protocol, "tcp") {
- epCopy := *iep
- epCopy.Protocol = "ws"
- s.publisher.AddServer(s.publishEP(&epCopy, s.servesMountTable), s.servesMountTable)
- }
-
+ s.publisher.AddServer(naming.JoinAddressName(iep.String(), ""), s.servesMountTable)
return iep, ln, nil
}
@@ -368,7 +451,6 @@
// the initial connection maybe have failed, but we enter the retry
// loop anyway so that we will continue to try and connect to the
// proxy.
-
s.Lock()
if s.stopped {
s.Unlock()
@@ -381,12 +463,7 @@
s.listenLoop(ln, iep)
// The listener is done, so:
// (1) Unpublish its name
- s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
- if strings.HasPrefix(iep.Protocol, "tcp") {
- iepCopy := *iep
- iepCopy.Protocol = "ws"
- s.publisher.RemoveServer(s.publishEP(&iepCopy, s.servesMountTable))
- }
+ s.publisher.RemoveServer(naming.JoinAddressName(iep.String(), ""))
}
s.Lock()
@@ -420,7 +497,7 @@
}
func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) {
- defer vlog.VI(1).Infof("ipc: Stopped listening on %v", ep)
+ defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
var calls sync.WaitGroup
defer func() {
calls.Wait()
@@ -453,13 +530,14 @@
}
}
+/*
func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
dhcpl.Lock()
defer dhcpl.Unlock()
for _, a := range addrs {
if ip := netstate.AsIP(a); ip != nil {
dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
- fn(s.publishEP(dhcpl.ep, s.servesMountTable))
+ fn(dhcpl.ep.String())
}
}
}
@@ -472,7 +550,7 @@
// Publish all of the addresses
for _, pubAddr := range dhcpl.pubAddrs {
ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
- s.publisher.AddServer(s.publishEP(&ep, s.servesMountTable), s.servesMountTable)
+ s.publisher.AddServer(naming.JoinAddressName(ep.String(), ""), s.servesMountTable)
}
for setting := range dhcpl.ch {
@@ -501,6 +579,7 @@
}
}
}
+*/
func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
if obj == nil {
@@ -607,16 +686,16 @@
nListeners := len(s.listeners)
errCh := make(chan error, nListeners)
- for ln, dhcpl := range s.listeners {
+ for ln, _ := range s.listeners {
go func(ln stream.Listener) {
errCh <- ln.Close()
}(ln)
- if dhcpl != nil {
- dhcpl.Lock()
- dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
- dhcpl.ch <- config.NewBool("EOF", "stop", true)
- dhcpl.Unlock()
- }
+ }
+ for dhcpl, _ := range s.dhcpListeners {
+ dhcpl.Lock()
+ dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
+ dhcpl.ch <- config.NewBool("EOF", "stop", true)
+ dhcpl.Unlock()
}
s.Unlock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 98cacab..9c8c126 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"os"
"reflect"
"sort"
@@ -15,9 +14,10 @@
"veyron.io/veyron/veyron/lib/expect"
"veyron.io/veyron/veyron/lib/modules"
+ "veyron.io/veyron/veyron/lib/modules/core"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
+ _ "veyron.io/veyron/veyron/lib/websocket"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
- "veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
@@ -27,24 +27,22 @@
// connection to the server if the server dies and comes back (on the same
// endpoint).
func TestReconnect(t *testing.T) {
- b := createBundle(t, tsecurity.NewPrincipal("client"), nil, nil) // We only need the client from the bundle.
+ principal := tsecurity.NewPrincipal("client")
+ b := createBundle(t, principal, nil, nil) // We only need the client from the bundle.
defer b.cleanup(t)
- sh, err := modules.NewShell(nil)
+ sh, err := modules.NewShell(principal)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer sh.Cleanup(os.Stderr, os.Stderr)
- server, err := sh.Start("runServer", nil, "127.0.0.1:0")
+ server, err := sh.Start(core.EchoServerCommand, nil, "--", "--veyron.tcp.address=127.0.0.1:0", "mymessage", "")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
session := expect.NewSession(t, server.Stdout(), time.Minute)
- addr := session.ReadLine()
- ep, err := inaming.NewEndpoint(addr)
- if err != nil {
- t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
- }
- serverName := naming.JoinAddressName(ep.String(), "suffix")
+ serverName := session.ExpectVar("NAME")
+ serverEP := session.ExpectVar("ADDR")
+ ep, _ := inaming.NewEndpoint(serverEP)
makeCall := func() (string, error) {
ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
call, err := b.client.StartCall(ctx, serverName, "Echo", []interface{}{"bratman"})
@@ -52,12 +50,13 @@
return "", fmt.Errorf("START: %s", err)
}
var result string
- if err = call.Finish(&result); err != nil {
+ var rerr error
+ if err = call.Finish(&result, &rerr); err != nil {
return "", err
}
return result, nil
}
- expected := `method:"Echo",suffix:"suffix",arg:"bratman"`
+ expected := "mymessage: bratman\n"
if result, err := makeCall(); err != nil || result != expected {
t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
}
@@ -72,82 +71,77 @@
}
// Resurrect the server with the same address, verify client
- // re-establishes the connection.
- server, err = sh.Start("runServer", nil, addr)
+ // re-establishes the connection. This is probably racy if another
+ // process grabs the port. This seems unlikely since the kernel cycles
+ // through the entire port space.
+ server, err = sh.Start(core.EchoServerCommand, nil, "--", "--veyron.tcp.address="+ep.Address, "mymessage again", "")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
session = expect.NewSession(t, server.Stdout(), time.Minute)
- defer server.Shutdown(nil, nil)
- session.Expect(addr)
+ defer server.Shutdown(os.Stderr, os.Stderr)
+ expected = "mymessage again: bratman\n"
if result, err := makeCall(); err != nil || result != expected {
t.Errorf("Got (%q, %v) want (%q, nil)", result, err, expected)
}
+
}
type proxyHandle struct {
- ns naming.Namespace
- process modules.Handle
- session *expect.Session
- mount string
- sh *modules.Shell
+ ns naming.Namespace
+ sh *modules.Shell
+ proxy modules.Handle
+ name string
}
-func (h *proxyHandle) Start(t *testing.T) error {
+func (h *proxyHandle) Start(t *testing.T, args ...string) error {
sh, err := modules.NewShell(nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
- server, err := sh.Start("runProxy", nil)
+ h.sh = sh
+ p, err := sh.Start(core.ProxyServerCommand, nil, args...)
if err != nil {
+ p.Shutdown(os.Stderr, os.Stderr)
t.Fatalf("unexpected error: %s", err)
}
- h.process = server
- h.session = expect.NewSession(t, server.Stdout(), time.Minute)
- h.mount = h.session.ReadLine()
- h.sh = sh
- if err := h.session.Error(); err != nil {
- return err
- }
- if err := h.ns.Mount(testContext(), "proxy", h.mount, time.Hour); err != nil {
- return err
- }
- return nil
+ h.proxy = p
+ s := expect.NewSession(t, p.Stdout(), time.Minute)
+ s.ReadLine()
+ h.name = s.ExpectVar("PROXY_NAME")
+ return h.ns.Mount(testContext(), "proxy", h.name, time.Hour)
}
func (h *proxyHandle) Stop() error {
- if h.process == nil {
+ defer h.sh.Cleanup(os.Stderr, os.Stderr)
+ if err := h.proxy.Shutdown(os.Stderr, os.Stderr); err != nil {
+ return err
+ }
+ if len(h.name) == 0 {
return nil
}
- h.process.Shutdown(os.Stderr, os.Stderr)
- h.process = nil
- if len(h.mount) == 0 {
- return nil
- }
- h.sh.Cleanup(nil, nil)
- return h.ns.Unmount(testContext(), "proxy", h.mount)
+ return h.ns.Unmount(testContext(), "proxy", h.name)
}
func TestProxyOnly(t *testing.T) {
listenSpec := ipc.ListenSpec{Proxy: "proxy"}
- testProxy(t, listenSpec)
+ testProxy(t, listenSpec, "--", "--veyron.tcp.address=127.0.0.1:0")
}
func TestProxy(t *testing.T) {
- listenSpec := ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0", Proxy: "proxy"}
- testProxy(t, listenSpec)
+ proxyListenSpec := listenSpec
+ proxyListenSpec.Proxy = "proxy"
+ testProxy(t, proxyListenSpec, "--", "--veyron.tcp.address=127.0.0.1:0")
}
-func addrOnly(name string) string {
- addr, _ := naming.SplitAddressName(name)
- return addr
+func TestWSProxy(t *testing.T) {
+ proxyListenSpec := listenSpec
+ proxyListenSpec.Proxy = "proxy"
+ // The proxy uses websockets only, but the server is using tcp.
+ testProxy(t, proxyListenSpec, "--", "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0")
}
-func addWSName(name string) []string {
- return []string{name, strings.Replace(name, "@tcp@", "@ws@", 1)}
-}
-
-func testProxy(t *testing.T, spec ipc.ListenSpec) {
+func testProxy(t *testing.T, spec ipc.ListenSpec, args ...string) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{tsecurity.NewPrincipal("client")})
@@ -163,7 +157,7 @@
// If no address is specified then we'll only 'listen' via
// the proxy.
- hasLocalListener := len(spec.Address) != 0
+ hasLocalListener := len(spec.Addrs) > 0 && len(spec.Addrs[0].Address) != 0
name := "mountpoint/server/suffix"
makeCall := func() (string, error) {
@@ -182,7 +176,7 @@
return result, nil
}
proxy := &proxyHandle{ns: ns}
- if err := proxy.Start(t); err != nil {
+ if err := proxy.Start(t, args...); err != nil {
t.Fatal(err)
}
defer proxy.Stop()
@@ -190,7 +184,7 @@
if len(addrs) != 1 {
t.Fatalf("failed to lookup proxy")
}
- proxyEP := addrOnly(addrs[0])
+ proxyEP, _ := naming.SplitAddressName(addrs[0])
ep, err := server.Listen(spec)
if err != nil {
@@ -203,24 +197,29 @@
ch := make(chan struct{})
// Proxy connections are started asynchronously, so we need to wait..
waitfor := func(expect int) {
+ then := time.Now().Add(time.Minute)
for {
addrs, _ := ns.Resolve(testContext(), name)
if len(addrs) == expect {
close(ch)
return
}
+ if time.Now().After(then) {
+ t.Fatalf("timed out")
+ }
time.Sleep(100 * time.Millisecond)
}
}
proxiedEP, err := inaming.NewEndpoint(proxyEP)
+
if err != nil {
- t.Fatalf("unexpected error: %s", err)
+ t.Fatalf("unexpected error for %q: %s", proxyEP, err)
}
proxiedEP.RID = naming.FixedRoutingID(0x555555555)
- expectedEndpoints := addWSName(proxiedEP.String())
+ expectedEndpoints := []string{proxiedEP.String()}
if hasLocalListener {
- expectedEndpoints = append(expectedEndpoints, addWSName(ep.String())...)
+ expectedEndpoints = append(expectedEndpoints, ep.String())
}
// Proxy connetions are created asynchronously, so we wait for the
@@ -234,7 +233,8 @@
got := []string{}
for _, s := range verifyMount(t, ns, name) {
- got = append(got, addrOnly(s))
+ addr, _ := naming.SplitAddressName(s)
+ got = append(got, addr)
}
sort.Strings(got)
sort.Strings(expectedEndpoints)
@@ -248,9 +248,14 @@
// the local endpoint from the mount table entry! We have to remove both
// the tcp and the websocket address.
sep := ep.String()
- wsep := strings.Replace(sep, "@tcp@", "@ws@", 1)
+ //wsep := strings.Replace(sep, "@tcp@", "@ws@", 1)
ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(sep, ""))
- ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(wsep, ""))
+ //ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(wsep, ""))
+ }
+
+ addrs = verifyMount(t, ns, name)
+ if len(addrs) != 1 {
+ t.Fatalf("failed to lookup proxy")
}
// Proxied endpoint should be published and RPC should succeed (through proxy)
@@ -265,6 +270,7 @@
if result, err := makeCall(); err == nil || (!strings.HasPrefix(err.Error(), "RESOLVE") && !strings.Contains(err.Error(), "EOF")) {
t.Fatalf(`Got (%v, %v) want ("", "RESOLVE: <err>" or "EOF") as proxy is down`, result, err)
}
+
for {
if _, err := ns.Resolve(testContext(), name); err != nil {
break
@@ -274,64 +280,27 @@
verifyMountMissing(t, ns, name)
// Proxy restarts, calls should eventually start succeeding.
- if err := proxy.Start(t); err != nil {
+ if err := proxy.Start(t, args...); err != nil {
t.Fatal(err)
}
+ retries := 0
for {
if result, err := makeCall(); err == nil {
if result != expected {
t.Errorf("Got (%v, %v) want (%v, nil)", result, err, expected)
}
break
+ } else {
+ retries++
+ if retries > 10 {
+ t.Fatalf("Failed after 10 attempts: err: %s", err)
+ }
}
}
}
-func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
- ns := tnaming.NewSimpleNamespace()
- server, err := InternalNewServer(testContext(), mgr, ns, nil, vc.LocalPrincipal{tsecurity.NewPrincipal("server")})
- if err != nil {
- return fmt.Errorf("InternalNewServer failed: %v", err)
- }
- disp := testServerDisp{new(testServer)}
- if err := server.ServeDispatcher("server", disp); err != nil {
- return fmt.Errorf("server.Register failed: %v", err)
- }
- spec := listenSpec
- spec.Address = args[1]
- ep, err := server.Listen(spec)
- if err != nil {
- return fmt.Errorf("server.Listen failed: %v", err)
- }
- fmt.Fprintf(stdout, "%s\n", ep.Addr())
- // parent process should explicitly shut us down by closing stdin.
- modules.WaitForEOF(stdin)
- return nil
-}
-
-func runProxy(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
- rid, err := naming.NewRoutingID()
- if err != nil {
- return err
- }
- proxy, err := proxy.New(rid, tsecurity.NewPrincipal("proxy"), "tcp", "127.0.0.1:0", "")
- if err != nil {
- return err
- }
- fmt.Fprintf(stdout, "/%s\n", proxy.Endpoint().String())
- // parent process should explicitly shut us down by closing stdin.
- modules.WaitForEOF(stdin)
- return nil
-}
-
// Required by modules framework.
func TestHelperProcess(t *testing.T) {
modules.DispatchInTest()
}
-
-func init() {
- modules.RegisterChild("runServer", "[address]", runServer)
- modules.RegisterChild("runProxy", "", runProxy)
-}
diff --git a/runtimes/google/ipc/simple_test.go b/runtimes/google/ipc/simple_test.go
new file mode 100644
index 0000000..f2853c5
--- /dev/null
+++ b/runtimes/google/ipc/simple_test.go
@@ -0,0 +1,130 @@
+package ipc_test
+
+import (
+ "io"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/ipc"
+ verror "veyron.io/veyron/veyron2/verror2"
+)
+
+type simple struct {
+ done <-chan struct{}
+}
+
+func (s *simple) Sleep(call ipc.ServerContext) error {
+ select {
+ case <-s.done:
+ case <-time.After(time.Hour):
+ }
+ return nil
+}
+
+func (s *simple) Ping(call ipc.ServerContext) (string, error) {
+ return "pong", nil
+}
+
+func (s *simple) Source(call ipc.ServerCall, start int) error {
+ i := start
+ backoff := 25 * time.Millisecond
+ for {
+ select {
+ case <-s.done:
+ return nil
+ case <-time.After(backoff):
+ call.Send(i)
+ i++
+ }
+ backoff *= 2
+ }
+}
+
+func (s *simple) Sink(call ipc.ServerCall) (int, error) {
+ i := 0
+ for {
+ if err := call.Recv(&i); err != nil {
+ return i, err
+ }
+ }
+}
+
+func (s *simple) Inc(call ipc.ServerCall, inc int) (int, error) {
+ i := 0
+ for {
+ if err := call.Recv(&i); err != nil {
+ if err == io.EOF {
+ // TODO(cnicolaou): this should return a verror, i.e.
+ // verror.Make(verror.EOF, call), but for now we
+ // return an io.EOF
+ return i, io.EOF
+ }
+ return i, err
+ }
+ call.Send(i + inc)
+ }
+}
+
+func TestSimpleRPC(t *testing.T) {
+ name, fn := initServer(t, r)
+ defer fn()
+
+ client := r.Client()
+ call, err := client.StartCall(r.NewContext(), name, "Ping", nil)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ response := ""
+ var verr error
+ err = call.Finish(&response, &verr)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if got, want := response, "pong"; got != want {
+ t.Fatalf("got %q, want %q", got, want)
+ }
+}
+
+// TestSimpleStreaming checks that Inc echoes each sent value plus inc and that Finish reports the server's EOF.
+func TestSimpleStreaming(t *testing.T) {
+	name, fn := initServer(t, r)
+	defer fn()
+
+	ctx := r.NewContext()
+	inc := 1
+	call, err := r.Client().StartCall(ctx, name, "Inc", []interface{}{inc})
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	want := 10
+	for i := 0; i <= want; i++ {
+		if err := call.Send(i); err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		got := -1
+		if err = call.Recv(&got); err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		if want := i + inc; got != want {
+			t.Fatalf("got %d, want %d", got, want) // the format verbs need their operands
+		}
+	}
+	call.CloseSend()
+	final := -1
+	verr := call.Finish(&final, &err) // err is filled with Inc's error result; verr is Finish's own return
+	if verr != nil {
+		t.Fatalf("unexpected error: %s", verr)
+	}
+	if !verror.Is(err, verror.Unknown.ID) || err.Error() != `veyron.io/veyron/veyron2/verror.Unknown: EOF` {
+		t.Errorf("wrong error: %#v", err)
+	}
+	/* TODO(cnicolaou): use this when verror2/vom transition is done.
+	if err != nil && !verror.Is(err, verror.EOF.ID) {
+		t.Fatalf("unexpected error: %#v", err)
+	}
+	*/
+	if got := final; got != want {
+		t.Fatalf("got %d, want %d", got, want) // the format verbs need their operands
+	}
+}
diff --git a/runtimes/google/ipc/sort_endpoints.go b/runtimes/google/ipc/sort_endpoints.go
index 48bcd88..312be8c 100644
--- a/runtimes/google/ipc/sort_endpoints.go
+++ b/runtimes/google/ipc/sort_endpoints.go
@@ -55,12 +55,13 @@
name := server
address, suffix := naming.SplitAddressName(name)
if len(address) == 0 {
- errs.add(fmt.Errorf("%q is not a rooted name", name))
- continue
+ // Maybe it's not a rooted endpoint, just a bare one.
+ address = name
+ suffix = ""
}
iep, err := inaming.NewEndpoint(address)
if err != nil {
- errs.add(fmt.Errorf("%q: %s", name, err))
+ errs.add(fmt.Errorf("failed to parse %q: %s", name, err))
continue
}
if err = version.CheckCompatibility(iep); err != nil {
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index a2ad073..6b782a0 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -15,7 +15,6 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/stats"
- "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vif"
"veyron.io/veyron/veyron/runtimes/google/ipc/version"
@@ -180,16 +179,6 @@
return nil, nil, errShutDown
}
- // If the protocol is tcp, we add the listener that supports both tcp and websocket
- // so that javascript can talk to this server.
- if strings.HasPrefix(protocol, "tcp") {
- wsln, err := websocket.NewListener(netln)
- if err != nil {
- netln.Close()
- return nil, nil, err
- }
- netln = wsln
- }
ln := newNetListener(m, netln, opts)
m.listeners[ln] = true
m.muListeners.Unlock()
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index aa66c0d..dc2b32d 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -42,19 +42,15 @@
modules.RegisterChild("runServer", "", runServer)
}
-func testSimpleFlow(t *testing.T, useWebsocket bool) {
+func testSimpleFlow(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
- if useWebsocket {
- ep.(*inaming.Endpoint).Protocol = "ws"
- }
-
data := "the dark knight rises"
var clientVC stream.VC
var clientF1 stream.Flow
@@ -131,11 +127,11 @@
}
func TestSimpleFlow(t *testing.T) {
- testSimpleFlow(t, false)
+ testSimpleFlow(t, "tcp")
}
func TestSimpleFlowWS(t *testing.T) {
- testSimpleFlow(t, true)
+ testSimpleFlow(t, "ws")
}
func TestConnectionTimeout(t *testing.T) {
@@ -159,7 +155,7 @@
}
}
-func testAuthenticatedByDefault(t *testing.T, useWebsocket bool) {
+func testAuthenticatedByDefault(t *testing.T, protocol string) {
var (
server = InternalNew(naming.FixedRoutingID(0x55555555))
client = InternalNew(naming.FixedRoutingID(0xcccccccc))
@@ -171,13 +167,10 @@
)
// VCSecurityLevel is intentionally not provided to Listen - to test
// default behavior.
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0", serverPrincipal)
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0", serverPrincipal)
if err != nil {
t.Fatal(err)
}
- if useWebsocket {
- ep.(*inaming.Endpoint).Protocol = "ws"
- }
errs := make(chan error)
@@ -227,11 +220,11 @@
}
func TestAuthenticatedByDefault(t *testing.T) {
- testAuthenticatedByDefault(t, false)
+ testAuthenticatedByDefault(t, "tcp")
}
func TestAuthenticatedByDefaultWS(t *testing.T) {
- testAuthenticatedByDefault(t, true)
+ testAuthenticatedByDefault(t, "ws")
}
func numListeners(m stream.Manager) int { return len(m.(*manager).listeners) }
@@ -277,35 +270,20 @@
}
func TestCloseListener(t *testing.T) {
- server := InternalNew(naming.FixedRoutingID(0x5e97e9))
-
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
- // Server will just listen for flows and close them.
- go acceptLoop(ln)
- client := InternalNew(naming.FixedRoutingID(0xc1e41))
- if _, err = client.Dial(ep); err != nil {
- t.Fatal(err)
- }
- ln.Close()
- client = InternalNew(naming.FixedRoutingID(0xc1e42))
- if _, err := client.Dial(ep); err == nil {
- t.Errorf("client.Dial(%q) should have failed", ep)
- }
+ testCloseListener(t, "tcp")
}
func TestCloseListenerWS(t *testing.T) {
+ testCloseListener(t, "ws")
+}
+
+func testCloseListener(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x5e97e9))
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
-
- ep.(*inaming.Endpoint).Protocol = "ws"
-
// Server will just listen for flows and close them.
go acceptLoop(ln)
client := InternalNew(naming.FixedRoutingID(0xc1e41))
@@ -340,41 +318,22 @@
}
func TestShutdownEndpoint(t *testing.T) {
- server := InternalNew(naming.FixedRoutingID(0x55555555))
- client := InternalNew(naming.FixedRoutingID(0xcccccccc))
-
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal(err)
- }
-
- // Server will just listen for flows and close them.
- go acceptLoop(ln)
-
- vc, err := client.Dial(ep)
- if err != nil {
- t.Fatal(err)
- }
- if f, err := vc.Connect(); f == nil || err != nil {
- t.Errorf("vc.Connect failed: (%v, %v)", f, err)
- }
- client.ShutdownEndpoint(ep)
- if f, err := vc.Connect(); f != nil || err == nil {
- t.Errorf("vc.Connect unexpectedly succeeded: (%v, %v)", f, err)
- }
+ testShutdownEndpoint(t, "tcp")
}
func TestShutdownEndpointWS(t *testing.T) {
+ testShutdownEndpoint(t, "ws")
+}
+
+func testShutdownEndpoint(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
- ep.(*inaming.Endpoint).Protocol = "ws"
-
// Server will just listen for flows and close them.
go acceptLoop(ln)
@@ -410,7 +369,7 @@
}
*/
-func testMultipleVCs(t *testing.T, useWebsocket bool) {
+func testMultipleVCs(t *testing.T, protocol string) {
server := InternalNew(naming.FixedRoutingID(0x55555555))
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
@@ -419,13 +378,10 @@
// Have the server read from each flow and write to rchan.
rchan := make(chan string)
- ln, ep, err := server.Listen("tcp", "127.0.0.1:0")
+ ln, ep, err := server.Listen(protocol, "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
- if useWebsocket {
- ep.(*inaming.Endpoint).Protocol = "ws"
- }
read := func(flow stream.Flow, c chan string) {
var buf bytes.Buffer
@@ -494,11 +450,11 @@
}
func TestMultipleVCs(t *testing.T) {
- testMultipleVCs(t, false)
+ testMultipleVCs(t, "tcp")
}
func TestMultipleVCsWS(t *testing.T) {
- testMultipleVCs(t, true)
+ testMultipleVCs(t, "ws")
}
func TestAddressResolution(t *testing.T) {
@@ -543,66 +499,31 @@
}
func TestServerRestartDuringClientLifetime(t *testing.T) {
- client := InternalNew(naming.FixedRoutingID(0xcccccccc))
- sh, err := modules.NewShell(nil)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- defer sh.Cleanup(nil, nil)
- h, err := sh.Start("runServer", nil, "127.0.0.1:0")
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- s := expect.NewSession(t, h.Stdout(), time.Minute)
- addr := s.ReadLine()
-
- ep, err := inaming.NewEndpoint(addr)
- if err != nil {
- t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
- }
- if _, err := client.Dial(ep); err != nil {
- t.Fatal(err)
- }
- h.Shutdown(nil, os.Stderr)
-
- // A new VC cannot be created since the server is dead
- if _, err := client.Dial(ep); err == nil {
- t.Fatal("Expected client.Dial to fail since server is dead")
- }
-
- h, err = sh.Start("runServer", nil, addr)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- s = expect.NewSession(t, h.Stdout(), time.Minute)
- // Restarting the server, listening on the same address as before
- if addr2 := s.ReadLine(); addr2 != addr || err != nil {
- t.Fatalf("Got (%q, %v) want (%q, nil)", addr2, err, addr)
- }
- if _, err := client.Dial(ep); err != nil {
- t.Fatal(err)
- }
+ testServerRestartDuringClientLifetime(t, "tcp")
}
func TestServerRestartDuringClientLifetimeWS(t *testing.T) {
+ testServerRestartDuringClientLifetime(t, "ws")
+}
+
+func testServerRestartDuringClientLifetime(t *testing.T, protocol string) {
client := InternalNew(naming.FixedRoutingID(0xcccccccc))
sh, err := modules.NewShell(nil)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer sh.Cleanup(nil, nil)
- h, err := sh.Start("runServer", nil, "127.0.0.1:0")
+ h, err := sh.Start("runServer", nil, protocol, "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
s := expect.NewSession(t, h.Stdout(), time.Minute)
addr := s.ReadLine()
- ep, err := inaming.NewEndpoint(addr)
+ ep, err := inaming.NewEndpoint(naming.FormatEndpoint(protocol, addr))
if err != nil {
t.Fatalf("inaming.NewEndpoint(%q): %v", addr, err)
}
- ep.Protocol = "ws"
if _, err := client.Dial(ep); err != nil {
t.Fatal(err)
}
@@ -613,7 +534,7 @@
t.Fatal("Expected client.Dial to fail since server is dead")
}
- h, err = sh.Start("runServer", nil, addr)
+ h, err = sh.Start("runServer", nil, protocol, addr)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -634,7 +555,7 @@
func runServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
server := InternalNew(naming.FixedRoutingID(0x55555555))
- _, ep, err := server.Listen("tcp", args[1])
+ _, ep, err := server.Listen(args[1], args[2])
if err != nil {
fmt.Fprintln(stderr, err)
return err
diff --git a/runtimes/google/ipc/stream/proxy/proxy.go b/runtimes/google/ipc/stream/proxy/proxy.go
index eddef79..095357b 100644
--- a/runtimes/google/ipc/stream/proxy/proxy.go
+++ b/runtimes/google/ipc/stream/proxy/proxy.go
@@ -6,13 +6,13 @@
"net"
"sync"
+ "veyron.io/veyron/veyron2/ipc/stream"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/verror"
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vom"
- "veyron.io/veyron/veyron/lib/websocket"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/id"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/message"
@@ -132,14 +132,14 @@
// New creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
func New(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
- ln, err := net.Listen(network, address)
+ _, listenFn := stream.RegisteredProtocol(network)
+ if listenFn == nil {
+ return nil, fmt.Errorf("unknown network %s", network)
+ }
+ ln, err := listenFn(network, address)
if err != nil {
return nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
}
- ln, err = websocket.NewListener(ln)
- if err != nil {
- return nil, err
- }
if len(pubAddress) == 0 {
pubAddress = ln.Addr().String()
}
@@ -460,7 +460,8 @@
// Endpoint returns the endpoint of the proxy service. By Dialing a VC to this
// endpoint, processes can have their services exported through the proxy.
func (p *Proxy) Endpoint() naming.Endpoint {
- return version.Endpoint(p.ln.Addr().Network(), p.pubAddress, p.rid)
+ ep := version.Endpoint(p.ln.Addr().Network(), p.pubAddress, p.rid)
+ return ep
}
// Shutdown stops the proxy service, closing all network connections.
diff --git a/runtimes/google/naming/endpoint.go b/runtimes/google/naming/endpoint.go
index 2552bc2..33c86cf 100644
--- a/runtimes/google/naming/endpoint.go
+++ b/runtimes/google/naming/endpoint.go
@@ -34,7 +34,7 @@
// NewEndpoint creates a new endpoint from a string as per naming.NewEndpoint
func NewEndpoint(input string) (*Endpoint, error) {
- var ep Endpoint
+ ep := new(Endpoint)
// We have to guess this is a mount table if we don't know.
ep.IsMountTable = true
@@ -45,7 +45,7 @@
parts := strings.Split(input, separator)
if len(parts) == 1 {
err := ep.parseHostPort(parts[0])
- return &ep, err
+ return ep, err
}
version, err := strconv.ParseUint(parts[0], 10, 16)
@@ -63,7 +63,7 @@
default:
err = errInvalidEndpointString
}
- return &ep, err
+ return ep, err
}
func (ep *Endpoint) parseHostPort(input string) error {
diff --git a/runtimes/google/naming/namespace/all_test.go b/runtimes/google/naming/namespace/all_test.go
index 8c1bd07..2dd0a94 100644
--- a/runtimes/google/naming/namespace/all_test.go
+++ b/runtimes/google/naming/namespace/all_test.go
@@ -3,7 +3,6 @@
import (
"runtime"
"runtime/debug"
- "strings"
"sync"
"testing"
"time"
@@ -19,7 +18,7 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/testutil"
- _ "veyron.io/veyron/veyron/profiles"
+ "veyron.io/veyron/veyron/profiles"
"veyron.io/veyron/veyron/runtimes/google/naming/namespace"
vsecurity "veyron.io/veyron/veyron/security"
service "veyron.io/veyron/veyron/services/mounttable/lib"
@@ -54,10 +53,6 @@
t.Fatal(string(debug.Stack()))
}
-func addWSName(name string) []string {
- return []string{name, strings.Replace(name, "@tcp@", "@ws@", 1)}
-}
-
// N squared but who cares, this is a little test.
// Ignores dups.
func contains(container, contained []string) bool {
@@ -203,7 +198,7 @@
}
// Add a mount table server.
// Start serving on a loopback address.
- ep, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"})
+ ep, err := s.Listen(profiles.LocalListenSpec)
if err != nil {
boom(t, "Failed to Listen: %s", err)
}
@@ -306,16 +301,16 @@
testResolveToMountTable(t, r, ns, m, rootMT)
// The server registered for each mount point is a mount table
- testResolve(t, r, ns, m, addWSName(mts[m].name)...)
+ testResolve(t, r, ns, m, mts[m].name)
// ResolveToMountTable will walk through to the sub MountTables
mtbar := naming.Join(m, "bar")
subMT := naming.Join(mts[m].name, "bar")
- testResolveToMountTable(t, r, ns, mtbar, addWSName(subMT)...)
+ testResolveToMountTable(t, r, ns, mtbar, subMT)
}
for _, j := range []string{j1MP, j2MP, j3MP} {
- testResolve(t, r, ns, j, addWSName(jokes[j].name)...)
+ testResolve(t, r, ns, j, jokes[j].name)
}
}
@@ -349,7 +344,7 @@
mt2mt := naming.Join(mts[mt2MP].name, "a")
// The mt2/a is served by the mt2 mount table
- testResolveToMountTable(t, r, ns, mt2a, addWSName(mt2mt)...)
+ testResolveToMountTable(t, r, ns, mt2a, mt2mt)
// The server for mt2a is mt3server from the second mount above.
testResolve(t, r, ns, mt2a, mt3Server)
@@ -366,11 +361,11 @@
names := []string{naming.JoinAddressName(mts[mt4MP].name, "a"),
naming.JoinAddressName(mts[mt5MP].name, "a")}
- names = append(names, addWSName(naming.JoinAddressName(mts[mt2MP].name, "a"))...)
+ names = append(names, naming.JoinAddressName(mts[mt2MP].name, "a"))
// We now have 3 mount tables prepared to serve mt2/a
testResolveToMountTable(t, r, ns, "mt2/a", names...)
names = []string{mts[mt4MP].name, mts[mt5MP].name}
- names = append(names, addWSName(mts[mt2MP].name)...)
+ names = append(names, mts[mt2MP].name)
testResolve(t, r, ns, "mt2", names...)
}
@@ -388,15 +383,15 @@
// Set up some nested mounts and verify resolution.
for _, m := range []string{"mt4/foo", "mt4/foo/bar"} {
- testResolve(t, r, ns, m, addWSName(mts[m].name)...)
+ testResolve(t, r, ns, m, mts[m].name)
}
testResolveToMountTable(t, r, ns, "mt4/foo",
- addWSName(naming.JoinAddressName(mts[mt4MP].name, "foo"))...)
+ naming.JoinAddressName(mts[mt4MP].name, "foo"))
testResolveToMountTable(t, r, ns, "mt4/foo/bar",
- addWSName(naming.JoinAddressName(mts["mt4/foo"].name, "bar"))...)
+ naming.JoinAddressName(mts["mt4/foo"].name, "bar"))
testResolveToMountTable(t, r, ns, "mt4/foo/baz",
- addWSName(naming.JoinAddressName(mts["mt4/foo"].name, "baz"))...)
+ naming.JoinAddressName(mts["mt4/foo"].name, "baz"))
}
// TestServers tests invoking RPCs on simple servers
@@ -411,16 +406,16 @@
// Let's run some non-mount table services
for _, j := range []string{j1MP, j2MP, j3MP} {
- testResolve(t, r, ns, j, addWSName(jokes[j].name)...)
+ testResolve(t, r, ns, j, jokes[j].name)
knockKnock(t, r, j)
globalName := naming.JoinAddressName(mts["mt4"].name, j)
disp := &dispatcher{}
gj := "g_" + j
jokes[gj] = runServer(t, r, disp, globalName)
- testResolve(t, r, ns, "mt4/"+j, addWSName(jokes[gj].name)...)
+ testResolve(t, r, ns, "mt4/"+j, jokes[gj].name)
knockKnock(t, r, "mt4/"+j)
- testResolveToMountTable(t, r, ns, "mt4/"+j, addWSName(globalName)...)
- testResolveToMountTable(t, r, ns, "mt4/"+j+"/garbage", addWSName(globalName+"/garbage")...)
+ testResolveToMountTable(t, r, ns, "mt4/"+j, globalName)
+ testResolveToMountTable(t, r, ns, "mt4/"+j+"/garbage", globalName+"/garbage")
}
}
@@ -568,7 +563,7 @@
}
// Since c1 was mounted with the Serve call, it will have both the tcp and ws endpoints.
- testResolve(t, r, ns, "c1", addWSName(c1.name)...)
+ testResolve(t, r, ns, "c1", c1.name)
testResolve(t, r, ns, "c1/c2", c1.name)
testResolve(t, r, ns, "c1/c3", c3.name)
testResolve(t, r, ns, "c1/c3/c4", c1.name)
@@ -675,9 +670,9 @@
}
// Now check a matching pattern.
- testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), addWSName(mts[mt2MP].name)...)
+ testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), mts[mt2MP].name)
testResolveToMountTableWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/server"), name)
// After successful lookup it should be cached, so the pattern doesn't matter.
- testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/foobar"), addWSName(mts[mt2MP].name)...)
+ testResolveWithPattern(t, r, ns, name, naming.RootBlessingPatternOpt("root/foobar"), mts[mt2MP].name)
}
diff --git a/runtimes/google/rt/mgmt.go b/runtimes/google/rt/mgmt.go
index a223f02..ae4a949 100644
--- a/runtimes/google/rt/mgmt.go
+++ b/runtimes/google/rt/mgmt.go
@@ -74,7 +74,7 @@
if address == "" {
return nil, fmt.Errorf("%v is not set", mgmt.AddressConfigKey)
}
- return &ipc.ListenSpec{Protocol: protocol, Address: address}, nil
+ return &ipc.ListenSpec{Addrs: ipc.ListenAddrs{{protocol, address}}}, nil
}
func (rt *vrt) callbackToParent(parentName, myName string) error {
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index 250f4c9..6e2e64b 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -13,7 +13,7 @@
"veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vtrace"
- _ "veyron.io/veyron/veyron/lib/tcp"
+ "veyron.io/veyron/veyron/profiles"
iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
@@ -103,7 +103,7 @@
if err != nil {
return nil, err
}
- if _, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}); err != nil {
+ if _, err := s.Listen(profiles.LocalListenSpec); err != nil {
return nil, err
}
diff --git a/security/agent/pingpong/main.go b/security/agent/pingpong/main.go
index d79e993..a23a5ca 100644
--- a/security/agent/pingpong/main.go
+++ b/security/agent/pingpong/main.go
@@ -4,14 +4,14 @@
"flag"
"fmt"
- "veyron.io/veyron/veyron/lib/signals"
- _ "veyron.io/veyron/veyron/profiles"
-
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vlog"
+
+ "veyron.io/veyron/veyron/lib/signals"
+ _ "veyron.io/veyron/veyron/profiles"
)
var runServer = flag.Bool("server", false, "Whether to run in server mode")
@@ -44,7 +44,8 @@
serverPong := PingPongServer(&pongd{})
- if endpoint, err := s.Listen(ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}); err == nil {
+ spec := ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
+ if endpoint, err := s.Listen(spec); err == nil {
fmt.Printf("Listening at: %v\n", endpoint)
} else {
log.Fatal("error listening to service: ", err)
diff --git a/security/agent/server/server.go b/security/agent/server/server.go
index 6035d22..c6ff7ea 100644
--- a/security/agent/server/server.go
+++ b/security/agent/server/server.go
@@ -204,7 +204,10 @@
vlog.Infof("Error creating server: %v", err)
continue
}
- spec := ipc.ListenSpec{Protocol: clientAddr.Network(), Address: clientAddr.String()}
+ a := []struct{ Protocol, Address string }{
+ {clientAddr.Network(), clientAddr.String()},
+ }
+ spec := ipc.ListenSpec{Addrs: ipc.ListenAddrs(a)}
if _, err = s.Listen(spec); err == nil {
err = s.Serve("", serverAgent, nil)
}
diff --git a/services/mgmt/device/impl/proxy_invoker_test.go b/services/mgmt/device/impl/proxy_invoker_test.go
index e1caf15..4ed43a8 100644
--- a/services/mgmt/device/impl/proxy_invoker_test.go
+++ b/services/mgmt/device/impl/proxy_invoker_test.go
@@ -11,6 +11,7 @@
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/services/mgmt/stats"
"veyron.io/veyron/veyron2/services/security/access"
+ "veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron/lib/testutil"
)
@@ -30,7 +31,7 @@
t.Fatalf("NewServer: %v", err)
}
defer server1.Stop()
- localSpec := ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0"}
+ localSpec := ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
ep1, err := server1.Listen(localSpec)
if err != nil {
t.Fatalf("Listen: %v", err)
@@ -62,7 +63,7 @@
name := naming.JoinAddressName(ep2.String(), "system/start-time-rfc1123")
c := stats.StatsClient(name)
if _, err := c.Value(runtime.NewContext()); err != nil {
- t.Errorf("%q.Value() error: %v", name, err)
+ t.Fatalf("%q.Value() error: %v", name, err)
}
// Call Glob()
@@ -90,6 +91,7 @@
}
func (d *proxyDispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
+ vlog.Infof("LOOKUP(%s): remote .... %s", suffix, d.remote)
invoker := &proxyInvoker{
remote: naming.Join(d.remote, suffix),
access: access.Debug,
diff --git a/services/mgmt/device/impl/util_test.go b/services/mgmt/device/impl/util_test.go
index 4b7f342..52642ff 100644
--- a/services/mgmt/device/impl/util_test.go
+++ b/services/mgmt/device/impl/util_test.go
@@ -26,7 +26,7 @@
"veyron.io/veyron/veyron/lib/modules"
"veyron.io/veyron/veyron/lib/modules/core"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
- "veyron.io/veyron/veyron/profiles/static"
+ _ "veyron.io/veyron/veyron/profiles/static"
"veyron.io/veyron/veyron/services/mgmt/device/impl"
"veyron.io/veyron/veyron2/services/mgmt/application"
)
@@ -162,11 +162,10 @@
if err != nil {
vlog.Fatalf("NewServer() failed: %v", err)
}
- spec := static.ListenSpec
- spec.Address = "127.0.0.1:0" // Isn't this the default?
+ spec := ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "127.0.0.1:0"}}}
endpoint, err := server.Listen(spec)
if err != nil {
- vlog.Fatalf("Listen(%s) failed: %v", static.ListenSpec, err)
+ vlog.Fatalf("Listen(%s) failed: %v", spec, err)
}
return server, endpoint.String()
}
diff --git a/services/mounttable/mounttabled/mounttable.go b/services/mounttable/mounttabled/mounttable.go
index 2cbf037..a9b0a1c 100644
--- a/services/mounttable/mounttabled/mounttable.go
+++ b/services/mounttable/mounttabled/mounttable.go
@@ -58,9 +58,9 @@
if len(*nhName) > 0 {
neighborhoodListenSpec := roaming.ListenSpec
// The ListenSpec code ensures that we have a valid address here.
- host, port, _ := net.SplitHostPort(roaming.ListenSpec.Address)
+ host, port, _ := net.SplitHostPort(roaming.ListenSpec.Addrs[0].Address)
if port != "" {
- neighborhoodListenSpec.Address = net.JoinHostPort(host, "0")
+ neighborhoodListenSpec.Addrs[0].Address = net.JoinHostPort(host, "0")
}
nhServer, err := r.NewServer(options.ServesMountTable(true))
if err != nil {
diff --git a/tools/naming/simulator/echo.scr b/tools/naming/simulator/echo.scr
index 454272c..09831c6 100644
--- a/tools/naming/simulator/echo.scr
+++ b/tools/naming/simulator/echo.scr
@@ -2,11 +2,12 @@
# and the difference between resolve and resolveMT.
set localaddr=--veyron.tcp.address=127.0.0.1:0
+set ws=--veyron.tcp.protocol=ws
-setRoots ""
+setRoots
# A 'stand-alone' server
-echoServer -- $localaddr "text" ""
+echoServer -- $localaddr $ws $localaddr "text" ""
set es=$_
eval $es
eval $es
@@ -28,7 +29,7 @@
set root=$MT_NAME
set NAMESPACE_ROOT=$root
-echoServer -- $localaddr "text2" "a/b"
+echoServer -- $localaddr $ws $localaddr "text2" "a/b"
set es=$_
eval $es
set es_name=$NAME
diff --git a/tools/naming/simulator/mt_complex.scr b/tools/naming/simulator/mt_complex.scr
index 4a1757f..daabc3b 100644
--- a/tools/naming/simulator/mt_complex.scr
+++ b/tools/naming/simulator/mt_complex.scr
@@ -3,6 +3,7 @@
# TODO - list the examples and any issues.
set localaddr="--veyron.tcp.address=127.0.0.1:0"
+set ws=--veyron.tcp.protocol=ws
cache off
@@ -61,12 +62,10 @@
resolve tl/a
set r=$_
eval $r
-assert $RN 2
+assert $RN 1
eval $r
set ep1=$R0
-eval $r
-set ep2=$R1
-assertOneOf $mt_a_name $ep1 $ep2
+assert $mt_a_name $ep1
wait $r
#
@@ -102,12 +101,10 @@
resolve tl
set r=$_
eval $r
-assert $RN 2
+assert $RN 1
eval $r
set ep1=$R0
-eval $r
-set ep2=$R1
-assertOneOf /$es_E1_addr $ep1 $ep2
+assert /$es_E1_addr $ep1
# let's have the echo server shut down
stop $es_E1
@@ -122,12 +119,10 @@
resolve tl/a
set r=$_
eval $r
-assert $RN 2
+assert $RN 1
eval $r
set ep1=$R0
-eval $r
-set ep2=$R1
-assertOneOf $mt_a_name $ep1 $ep2
+assert $mt_a_name $ep1
# run an echo server on tl/a - note that this currently doesn't seem to
# have any effect on the mount table - that is, I suspect the mount table
@@ -245,12 +240,10 @@
resolveMT $long_name/echo
set r=$_
eval $r
-assert $RN 2
+assert $RN 1
eval $r
set ep1=$R0
-eval $r
-set ep2=$R1
-assertOneOf /$mt_l_addr/echo $ep1 $ep2
+assert /$mt_l_addr/echo $ep1
# Now, use mount directly to create a 'symlink'
set symlink_target=some/deep/name/that/is/a/mount
@@ -277,12 +270,10 @@
resolveMT tl/b/symlink
set r=$_
eval $r
-assert $RN 2
+assert $RN 1
eval $r
set ep1=$R0
-eval $r
-set ep2=$R1
-assertOneOf /$mt_b_addr/symlink $ep1 $ep2
+assert /$mt_b_addr/symlink $ep1
stop $es_E3
stop $es_E2
diff --git a/tools/naming/simulator/mt_simple.scr b/tools/naming/simulator/mt_simple.scr
index 8c4d9c0..ab548a1 100644
--- a/tools/naming/simulator/mt_simple.scr
+++ b/tools/naming/simulator/mt_simple.scr
@@ -1,16 +1,17 @@
# Simple example showing multiple mount tables, servers and globing
set localaddr="--veyron.tcp.address=127.0.0.1:0"
+set ws=--veyron.tcp.protocol=ws
root -- $localaddr
eval $_
set root=$MT_NAME
set NAMESPACE_ROOT=$root
-mt -- $localaddr usa
+mt -- $localaddr $ws $localaddr usa
eval $_
set usa_mt=$MT_NAME
-mt -- $localaddr uk
+mt -- $localaddr $ws $localaddr uk
eval $_
set uk_mt=$MT_NAME
@@ -21,12 +22,12 @@
wait $l
set NAMESPACE_ROOT=$usa_mt
-mt -- $localaddr "palo alto"
+mt -- $localaddr $ws $localaddr "palo alto"
eval $_
set pa_mt=$MT_NAME
set NAMESPACE_ROOT=$uk_mt
-mt -- $localaddr "cambridge"
+mt -- $localaddr $ws $localaddr "cambridge"
eval $_
set cam_mt=$MT_NAME
diff --git a/tools/naming/simulator/proxy.scr b/tools/naming/simulator/proxy.scr
index ec514f3..0ef2d7b 100644
--- a/tools/naming/simulator/proxy.scr
+++ b/tools/naming/simulator/proxy.scr
@@ -2,15 +2,17 @@
cache off
set localaddr=--veyron.tcp.address=127.0.0.1:0
+set ws=--veyron.tcp.protocol=ws
-root -- $localaddr
+root -- $localaddr $ws $localaddr
eval $_
set root=$MT_NAME
set NAMESPACE_ROOT=$root
setRoots $NAMESPACE_ROOT
+print $NAMESPACE_ROOT
# run a non-proxied echo server
-echoServer -- $localaddr noproxy echo/noproxy
+echoServer -- $localaddr $ws $localaddr noproxy echo/noproxy
set esnp=$_
eval $esnp
set NP_ECHOS_NAME=$NAME
@@ -18,8 +20,13 @@
set NP_ECHOS_ADDR=$ADDR
+echoClient echo/noproxy "ohh"
+set ec=$_
+read $ec l
+assert $l "noproxy: ohh"
+
# run a proxy server
-proxyd -- $localaddr p1
+proxyd -- $localaddr $ws $localaddr p1
set proxy=$_
# PROXY_ADDR=<address of proxy>
eval $proxy
@@ -40,7 +47,7 @@
#assert $RN 3
#wait $l
-echoServer -- $localaddr --veyron.proxy=p1 withproxy echo/withproxy
+echoServer -- $localaddr $ws $localaddr --veyron.proxy=p1 withproxy echo/withproxy
set eswp=$_
eval $eswp
set ECHOS_NAME=$NAME
@@ -49,6 +56,11 @@
splitEP $ADDR
set ECHOS_RID=$P3
+echoClient echo/withproxy "ahh"
+set ec=$_
+read $ec l
+assert $l "withproxy: ahh"
+
#ls ...
#set l=$_
#eval $l
@@ -61,18 +73,22 @@
print "with proxy: " $ECHOS_ADDR
# The ipc.Server implementation publishes the proxy supplied address and
# the local address in the mount table
+
resolve echo/withproxy
set rs=$_
eval $rs
-assert $RN 4
+# This will be 4 once ipc.Listen can return all of the endpoints in use,
+# since the proxy can then return more than one address. For now we only see
+# 3 endpoints because the proxy server returns just one to the echo server.
+assert $RN 3
eval $rs
set ep1=$R0
eval $rs
set ep2=$R1
eval $rs
set ep3=$R2
-eval $rs
-set ep4=$R3
+#eval $rs
+#set ep4=$R3
splitEP $ep1
assert $PN 7
@@ -82,10 +98,11 @@
set ep2_addr=$P2
splitEP $ep3
set ep3_addr=$P2
-splitEP $ep4
-set ep4_addr=$P2
+#splitEP $ep4
+#set ep4_addr=$P2
-assertOneOf $PROXY_ADDR $ep1_addr $ep2_addr $ep3_addr $ep4_addr
+assertOneOf $PROXY_ADDR $ep1_addr $ep2_addr $ep3_addr
+# $ep4_addr
assert $rid $ECHOS_RID
ls -- -l echo/withproxy
diff --git a/tools/servicerunner/main.go b/tools/servicerunner/main.go
index 653d8d0..28a0ac8 100644
--- a/tools/servicerunner/main.go
+++ b/tools/servicerunner/main.go
@@ -79,7 +79,7 @@
}
vars[consts.VeyronCredentials] = v
- h, err := sh.Start("root", nil, "--", "--veyron.tcp.address=127.0.0.1:0")
+ h, err := sh.Start("root", nil, "--", "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0")
panicOnError(err)
updateVars(h, vars, "MT_NAME")
@@ -91,11 +91,11 @@
// NOTE(sadovsky): The proxyd binary requires --protocol and --address flags
// while the proxyd command instead uses ListenSpec flags.
- h, err = sh.Start("proxyd", nil, "--", "--veyron.tcp.address=127.0.0.1:0", "test/proxy")
+ h, err = sh.Start("proxyd", nil, "--", "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0", "test/proxy")
panicOnError(err)
updateVars(h, vars, "PROXY_ADDR")
- h, err = sh.Start("wsprd", nil, "--", "--veyron.tcp.address=127.0.0.1:0", "--veyron.proxy=test/proxy", "--identd=test/identd")
+ h, err = sh.Start("wsprd", nil, "--", "--veyron.tcp.protocol=ws", "--veyron.tcp.address=127.0.0.1:0", "--veyron.proxy=test/proxy", "--identd=test/identd")
panicOnError(err)
updateVars(h, vars, "WSPR_ADDR")