Merge "veyron2/ipc: reworked ListenX API to support roaming, non-roaming and proxy."
diff --git a/profiles/roaming/gce_linux.go b/profiles/roaming/gce_linux.go
index 93f5510..ebab8ec 100644
--- a/profiles/roaming/gce_linux.go
+++ b/profiles/roaming/gce_linux.go
@@ -37,8 +37,6 @@
_ = stop
// TODO(cnicolaou): stop should be used by the soon to be added
// Cleanup method.
-
- publishInitialSettings(ch, listenProtocolFlag.Protocol, listenSpecFlag.String(), []net.Addr{pub})
return pub
}
return nil
diff --git a/profiles/roaming/init.go b/profiles/roaming/init.go
index 9196dd6..ddad180 100644
--- a/profiles/roaming/init.go
+++ b/profiles/roaming/init.go
@@ -20,24 +20,34 @@
"veyron2/ipc"
"veyron2/rt"
+ "veyron/lib/flags"
"veyron/lib/netconfig"
"veyron/lib/netstate"
"veyron/profiles"
)
const (
- SettingsStreamName = "dhcp"
+ SettingsStreamName = "roaming"
)
var (
- listenProtocolFlag = config.TCPProtocolFlag{"tcp"}
- listenSpecFlag = config.IPHostPortFlag{Port: "0"}
+ listenProtocolFlag = flags.TCPProtocolFlag{"tcp"}
+ listenAddressFlag = flags.IPHostPortFlag{Port: "0"}
+ listenProxyFlag string
+ ListenSpec *ipc.ListenSpec
)
func init() {
flag.Var(&listenProtocolFlag, "veyron.tcp.protocol", "protocol to listen with")
- flag.Var(&listenSpecFlag, "veyron.tcp.address", "address to listen on")
+ flag.Var(&listenAddressFlag, "veyron.tcp.address", "address to listen on")
+ flag.StringVar(&listenProxyFlag, "veyron.proxy", "", "proxy to use") // new flag; its default is the empty string
+
rt.RegisterProfile(New())
+ ListenSpec = &ipc.ListenSpec{ // NOTE(review): evaluated inside init(), which Go runs before main
+ Protocol: listenProtocolFlag.Protocol, // presumably still the flag default at this point - confirm flags are not parsed earlier
+ Address: listenAddressFlag.String(), // presumably still the flag default at this point - confirm
+ Proxy: listenProxyFlag, // presumably still "" at this point - confirm
+ }
}
type profile struct {
@@ -122,8 +132,7 @@
}
protocol := listenProtocolFlag.Protocol
- log.VI(2).Infof("Initial Network Settings: %s %s available: %s", protocol, listenSpecFlag, state)
- publishInitialSettings(ch, protocol, listenSpecFlag.String(), state)
+ log.VI(2).Infof("Initial Network Settings: %s %s available: %s", protocol, listenAddressFlag, state)
go monitorNetworkSettings(rt, stop, ch, state, protocol, p.addrChooser)
}
@@ -131,18 +140,6 @@
return p.addrChooser
}
-func publishInitialSettings(ch chan<- config.Setting, protocol, listenSpec string, addrs []net.Addr) {
- // TODO(cnicolaou): consider applying the address chooser here and not in
- // the server, or better yet not sending the InitialAddrsSetting.
- for _, setting := range []config.Setting{
- ipc.NewProtocolSetting(protocol),
- ipc.NewListenSpecSetting(listenSpecFlag),
- ipc.NewInitialAddrsSetting(addrs),
- } {
- ch <- setting
- }
-}
-
// monitorNetworkSettings will monitor network configuration changes and
// publish subsequent Settings to reflect any changes detected.
func monitorNetworkSettings(rt veyron2.Runtime, stop <-chan struct{},
diff --git a/profiles/roaming/roaming_server.go b/profiles/roaming/roaming_server.go
index bc29b17..20f89cd 100644
--- a/profiles/roaming/roaming_server.go
+++ b/profiles/roaming/roaming_server.go
@@ -22,7 +22,8 @@
log.Fatalf("unexpected error: %q", err)
}
- ep, err := server.RoamingListen(r.Publisher(), roaming.SettingsStreamName)
+ fmt.Printf("listen spec: %v\n", roaming.ListenSpec)
+ ep, err := server.ListenX(roaming.ListenSpec)
if err != nil {
log.Fatalf("unexpected error: %q", err)
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index cb2cece..a7aa184 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -250,7 +250,7 @@
return iep, nil, nil
}
-func (s *server) RoamingListen() (naming.Endpoint, error) {
+func (s *server) ListenX(listenSpec *ipc.ListenSpec) (naming.Endpoint, error) {
s.Lock()
// Shortcut if the server is stopped, to avoid needlessly creating a
// listener.
@@ -260,47 +260,28 @@
}
s.Unlock()
+ protocol := listenSpec.Protocol
+ address := listenSpec.Address
+ if len(listenSpec.Proxy) > 0 {
+ // TODO(cnicolaou): implement support for proxy...
+ }
+
+ h, _, _ := net.SplitHostPort(address)
+ ip := net.ParseIP(h)
+ if ip != nil && ip.IsLoopback() {
+ // The requested listen address is a loopback address.
+ // TODO(cnicolaou): use Listen for now, but should refactor more completely.
+ return s.Listen(protocol, address)
+ }
+
publisher := s.roamingOpt.Publisher
streamName := s.roamingOpt.StreamName
ch := make(chan config.Setting)
- configStream, err := publisher.ForkStream(streamName, ch)
+ _, err := publisher.ForkStream(streamName, ch)
if err != nil {
return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
}
- setting := configStream.Latest[ipc.ProtocolSetting]
- if setting == nil {
- return nil, fmt.Errorf("protocol setting has not be sent")
- }
- protocol, ok := setting.Value().(string)
- if !ok {
- return nil, fmt.Errorf("protocol setting is of the wrong type %T", setting.Value())
- }
-
- setting = configStream.Latest[ipc.ListenSpecSetting]
- if setting == nil {
- return nil, fmt.Errorf("listen spec setting has not be sent")
- }
- listenSpec, ok := setting.Value().(config.IPHostPortFlag)
- if !ok {
- return nil, fmt.Errorf("listen spec setting is of the wrong type %T", setting.Value())
- }
-
- address := listenSpec.String()
-
- isNotLoopback := func(a net.Addr) bool {
- if ip := netstate.AsIP(a); ip != nil {
- return !ip.IsLoopback()
- }
- return true
- }
-
- al := netstate.FromIPAddr(listenSpec.IP)
- if len(al) > 0 && al.First(isNotLoopback) == nil {
- // All our addresses are loopback addresses
- // TODO(cnicolaou): use Listen for now, but should refactor more completely.
- return s.Listen(protocol, listenSpec.String())
- }
ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
if err != nil {
@@ -335,7 +316,7 @@
go func(ln stream.Listener, ep naming.Endpoint) {
s.listenLoop(ln, ep)
s.active.Done()
- }(ln, ep)
+ }(ln, lep)
// goroutine to listen for address changes.
go func(dl *dhcpListener) {