veyron2/ipc/stream: Unexpose veyron2/ipc/stream API.
veyron2/ipc/stream -> veyron2/runtimes/google/ipc/stream
stream.Register* methods moved to veyron2/ipc
Change-Id: Ie0b312b4bbfc7ee5a42496ac34da9c8a7b6f64cf
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
index 687ccb8..cfdfc73 100644
--- a/lib/modules/shell.go
+++ b/lib/modules/shell.go
@@ -98,7 +98,7 @@
}
var err error
ctx, sh.cancelCtx = context.WithCancel(ctx)
- if ctx, _, err = veyron2.SetNewStreamManager(ctx); err != nil {
+ if ctx, err = veyron2.SetNewStreamManager(ctx); err != nil {
return nil, err
}
sh.ctx = ctx
@@ -153,7 +153,7 @@
}()
ctx, cancel := context.WithCancel(sh.ctx)
defer cancel()
- if ctx, _, err = veyron2.SetNewStreamManager(ctx); err != nil {
+ if ctx, err = veyron2.SetNewStreamManager(ctx); err != nil {
return nil, err
}
syscall.ForkLock.RLock()
diff --git a/lib/unixfd/unixfd.go b/lib/unixfd/unixfd.go
index fa0fe85..8f73871 100644
--- a/lib/unixfd/unixfd.go
+++ b/lib/unixfd/unixfd.go
@@ -14,7 +14,7 @@
"time"
"unsafe"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
)
const Network string = "unixfd"
@@ -328,5 +328,5 @@
}
func init() {
- stream.RegisterProtocol(Network, unixFDConn, unixFDListen)
+ ipc.RegisterProtocol(Network, unixFDConn, unixFDListen)
}
diff --git a/lib/websocket/util_test.go b/lib/websocket/util_test.go
index 4d599b7..cb8df27 100644
--- a/lib/websocket/util_test.go
+++ b/lib/websocket/util_test.go
@@ -11,7 +11,7 @@
"testing"
"time"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron/lib/testutil"
)
@@ -23,7 +23,7 @@
crcTable = crc64.MakeTable(crc64.ISO)
}
-func newSender(t *testing.T, dialer stream.DialerFunc, protocol, address string) net.Conn {
+func newSender(t *testing.T, dialer ipc.DialerFunc, protocol, address string) net.Conn {
conn, err := dialer(protocol, address, time.Minute)
if err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -176,7 +176,7 @@
}
}
-func packetRunner(t *testing.T, ln net.Listener, dialer stream.DialerFunc, protocol, address string) {
+func packetRunner(t *testing.T, ln net.Listener, dialer ipc.DialerFunc, protocol, address string) {
nPackets := 100
go packetReceiver(t, ln, &backChannel{
crcChan: make(chan uint64, nPackets),
@@ -271,7 +271,7 @@
}
}
-func byteRunner(t *testing.T, ln net.Listener, dialer stream.DialerFunc, protocol, address string) {
+func byteRunner(t *testing.T, ln net.Listener, dialer ipc.DialerFunc, protocol, address string) {
nIterations := 10
go byteReceiver(t, ln, &backChannel{
byteChan: make(chan []byte, nIterations),
diff --git a/lib/websocket/ws_test.go b/lib/websocket/ws_test.go
index e7ccad3..9c7d8a9 100644
--- a/lib/websocket/ws_test.go
+++ b/lib/websocket/ws_test.go
@@ -5,12 +5,12 @@
"sync"
"testing"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron/lib/websocket"
)
-func packetTester(t *testing.T, dialer stream.DialerFunc, listener stream.ListenerFunc, txProtocol, rxProtocol string) {
+func packetTester(t *testing.T, dialer ipc.DialerFunc, listener ipc.ListenerFunc, txProtocol, rxProtocol string) {
ln, err := listener(rxProtocol, "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -24,7 +24,7 @@
packetRunner(t, ln, dialer, txProtocol, ln.Addr().String())
}
-func byteTester(t *testing.T, dialer stream.DialerFunc, listener stream.ListenerFunc, txProtocol, rxProtocol string) {
+func byteTester(t *testing.T, dialer ipc.DialerFunc, listener ipc.ListenerFunc, txProtocol, rxProtocol string) {
ln, err := listener(rxProtocol, "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -67,7 +67,7 @@
defer ln.Close()
var pwg sync.WaitGroup
- packetTest := func(dialer stream.DialerFunc, protocol string) {
+ packetTest := func(dialer ipc.DialerFunc, protocol string) {
packetRunner(t, ln, dialer, protocol, ln.Addr().String())
pwg.Done()
}
@@ -80,7 +80,7 @@
pwg.Wait()
var bwg sync.WaitGroup
- byteTest := func(dialer stream.DialerFunc, protocol string) {
+ byteTest := func(dialer ipc.DialerFunc, protocol string) {
byteRunner(t, ln, dialer, protocol, ln.Addr().String())
bwg.Done()
}
diff --git a/profiles/chrome/chromeinit.go b/profiles/chrome/chromeinit.go
index 1019de9..e0740c1 100644
--- a/profiles/chrome/chromeinit.go
+++ b/profiles/chrome/chromeinit.go
@@ -8,7 +8,6 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/flags"
@@ -23,7 +22,7 @@
func init() {
veyron2.RegisterProfileInit(Init)
- stream.RegisterUnknownProtocol("wsh", websocket.Dial, websocket.Listener)
+ ipc.RegisterUnknownProtocol("wsh", websocket.Dial, websocket.Listener)
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime)
}
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 6c47b7d..23d3480 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -12,7 +12,6 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/appcycle"
@@ -31,7 +30,7 @@
func init() {
veyron2.RegisterProfileInit(Init)
- stream.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
+ ipc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
}
diff --git a/profiles/genericinit.go b/profiles/genericinit.go
index 936cd2f..7bdcf9f 100644
--- a/profiles/genericinit.go
+++ b/profiles/genericinit.go
@@ -6,7 +6,6 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/appcycle"
@@ -23,7 +22,7 @@
func init() {
veyron2.RegisterProfileInit(Init)
- stream.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
+ ipc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
flags.SetDefaultProtocol("tcp")
flags.SetDefaultHostPort("127.0.0.1:0")
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index ae37ee9..8b9eb01 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -17,7 +17,6 @@
"v.io/core/veyron2/config"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/appcycle"
@@ -44,7 +43,7 @@
func init() {
veyron2.RegisterProfileInit(Init)
- stream.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
+ ipc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
}
diff --git a/profiles/static/staticinit.go b/profiles/static/staticinit.go
index 2532210..f477d21 100644
--- a/profiles/static/staticinit.go
+++ b/profiles/static/staticinit.go
@@ -6,7 +6,6 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/appcycle"
@@ -28,7 +27,7 @@
func init() {
veyron2.RegisterProfileInit(Init)
- stream.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
+ ipc.RegisterUnknownProtocol("wsh", websocket.HybridDial, websocket.HybridListener)
commonFlags = flags.CreateAndRegister(flag.CommandLine, flags.Runtime, flags.Listen)
}
diff --git a/runtimes/fake/ipc.go b/runtimes/fake/ipc.go
index 755953d..5470a5a 100644
--- a/runtimes/fake/ipc.go
+++ b/runtimes/fake/ipc.go
@@ -3,7 +3,6 @@
import (
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
)
// SetClient can be used to inject a mock client implementation into the context.
@@ -21,10 +20,7 @@
func (r *Runtime) NewServer(ctx *context.T, opts ...ipc.ServerOpt) (ipc.Server, error) {
panic("unimplemented")
}
-func (r *Runtime) SetNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
- panic("unimplemented")
-}
-func (r *Runtime) GetStreamManager(ctx *context.T) stream.Manager {
+func (r *Runtime) SetNewStreamManager(ctx *context.T) (*context.T, error) {
panic("unimplemented")
}
diff --git a/runtimes/google/ipc/blessings_cache.go b/runtimes/google/ipc/blessings_cache.go
index aadbbfc..1c19b40 100644
--- a/runtimes/google/ipc/blessings_cache.go
+++ b/runtimes/google/ipc/blessings_cache.go
@@ -6,8 +6,8 @@
"reflect"
"sync"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/security"
)
diff --git a/runtimes/google/ipc/cancel_test.go b/runtimes/google/ipc/cancel_test.go
index 2022ad7..a528795 100644
--- a/runtimes/google/ipc/cancel_test.go
+++ b/runtimes/google/ipc/cancel_test.go
@@ -6,9 +6,9 @@
"v.io/core/veyron/runtimes/google/ipc/stream/manager"
tnaming "v.io/core/veyron/runtimes/google/testing/mocks/naming"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vlog"
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 711fe33..a5bd302 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -10,10 +10,10 @@
"sync"
"time"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/context"
"v.io/core/veyron2/i18n"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 273dfee..5aecc6d 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -16,9 +16,9 @@
"testing"
"time"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
diff --git a/runtimes/google/ipc/protocols/tcp/init.go b/runtimes/google/ipc/protocols/tcp/init.go
index 8007578..4d9f756 100644
--- a/runtimes/google/ipc/protocols/tcp/init.go
+++ b/runtimes/google/ipc/protocols/tcp/init.go
@@ -3,11 +3,11 @@
import (
"net"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
)
func init() {
for _, p := range []string{"tcp", "tcp4", "tcp6"} {
- stream.RegisterProtocol(p, net.DialTimeout, net.Listen)
+ ipc.RegisterProtocol(p, net.DialTimeout, net.Listen)
}
}
diff --git a/runtimes/google/ipc/protocols/ws/init.go b/runtimes/google/ipc/protocols/ws/init.go
index 583da58..0ad45b5 100644
--- a/runtimes/google/ipc/protocols/ws/init.go
+++ b/runtimes/google/ipc/protocols/ws/init.go
@@ -1,7 +1,7 @@
package websocket
import (
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron/lib/websocket"
)
@@ -9,6 +9,6 @@
func init() {
// ws, ws4, ws6 represent websocket protocol instances.
for _, p := range []string{"ws", "ws4", "ws6"} {
- stream.RegisterProtocol(p, websocket.Dial, websocket.Listener)
+ ipc.RegisterProtocol(p, websocket.Dial, websocket.Listener)
}
}
diff --git a/runtimes/google/ipc/protocols/wsh/init.go b/runtimes/google/ipc/protocols/wsh/init.go
index 30893b1..7345adf 100644
--- a/runtimes/google/ipc/protocols/wsh/init.go
+++ b/runtimes/google/ipc/protocols/wsh/init.go
@@ -3,13 +3,13 @@
package wsh
import (
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron/lib/websocket"
)
func init() {
for _, p := range []string{"wsh", "wsh4", "wsh6"} {
- stream.RegisterProtocol(p, websocket.HybridDial, websocket.HybridListener)
+ ipc.RegisterProtocol(p, websocket.HybridDial, websocket.HybridListener)
}
}
diff --git a/runtimes/google/ipc/protocols/wsh_nacl/init.go b/runtimes/google/ipc/protocols/wsh_nacl/init.go
index e9ef9ff..cd10734 100644
--- a/runtimes/google/ipc/protocols/wsh_nacl/init.go
+++ b/runtimes/google/ipc/protocols/wsh_nacl/init.go
@@ -3,13 +3,13 @@
package wsh_nacl
import (
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron/lib/websocket"
)
func init() {
for _, p := range []string{"wsh", "wsh4", "wsh6"} {
- stream.RegisterProtocol(p, websocket.Dial, websocket.Listener)
+ ipc.RegisterProtocol(p, websocket.Dial, websocket.Listener)
}
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index f560916..68d5984 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -10,10 +10,10 @@
"sync"
"time"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/config"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
diff --git a/runtimes/google/ipc/stream/benchmark/throughput_flow.go b/runtimes/google/ipc/stream/benchmark/throughput_flow.go
index 3ce3a36..0aa00fe 100644
--- a/runtimes/google/ipc/stream/benchmark/throughput_flow.go
+++ b/runtimes/google/ipc/stream/benchmark/throughput_flow.go
@@ -6,7 +6,7 @@
"v.io/core/veyron/runtimes/google/ipc/stream/manager"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
)
diff --git a/runtimes/google/ipc/stream/crypto/tls.go b/runtimes/google/ipc/stream/crypto/tls.go
index fafa18c..cf536be 100644
--- a/runtimes/google/ipc/stream/crypto/tls.go
+++ b/runtimes/google/ipc/stream/crypto/tls.go
@@ -18,7 +18,7 @@
var errDeadlinesNotSupported = errors.New("deadlines not supported")
// TLSClientSessionCacheOpt specifies the ClientSessionCache used to resume TLS sessions.
-// It adapts tls.ClientSessionCache to the veyron2/ipc/stream.VCOpt interface.
+// It adapts tls.ClientSessionCache to the veyron/runtimes/google/ipc/stream.VCOpt interface.
type TLSClientSessionCache struct{ tls.ClientSessionCache }
func (TLSClientSessionCache) IPCStreamVCOpt() {}
diff --git a/runtimes/google/ipc/stream/doc.go b/runtimes/google/ipc/stream/doc.go
index 0d6b8c0..65f070b 100644
--- a/runtimes/google/ipc/stream/doc.go
+++ b/runtimes/google/ipc/stream/doc.go
@@ -1,23 +1,20 @@
-// Package stream implements interfaces in veyron2/ipc/stream.
+// Package stream implements authenticated byte streams to veyron endpoints.
//
// It is split into multiple sub-packages in an attempt to keep the code
-// healthier by limiting the dependencies between objects. Most users of this
-// package however should only need to use the Runtime type defined in it
-// (which provides a factory method to create stream.Manager objects).
+// healthier by limiting the dependencies between objects. Most users should not
+// need to use this package.
//
// Package contents and dependencies are as follows:
//
-// * manager provides a factory for veyron2/ipc/stream.Manager objects.
+// * manager provides a factory for Manager objects.
// It depends on the vif and proxy packages.
-// * vif implements a VIF type that wraps over a net.Conn
-// and enables the creation of veyron2/ipc/stream.VC objects
-// over the underlying network connection.
+// * vif implements a VIF type that wraps over a net.Conn and enables the
+// creation of VC objects over the underlying network connection.
// It depends on the id, message and vc packages.
-// * message implements serialization and deserialization for
-// messages exchanged over a VIF.
+// * message implements serialization and deserialization for messages
+// exchanged over a VIF.
// It depends on the id package.
-// * vc provides types implementing veyron2/ipc/stream.VC and
-// veyron2/ipc/stream.Flow
+// * vc provides types implementing VC and Flow.
// It depends on the id and crypto packages.
// * crypto provides types to secure communication over VCs.
// It does not depend on any other package.
diff --git a/runtimes/google/ipc/stream/manager/listener.go b/runtimes/google/ipc/stream/manager/listener.go
index 5920fe2..cd4ba40 100644
--- a/runtimes/google/ipc/stream/manager/listener.go
+++ b/runtimes/google/ipc/stream/manager/listener.go
@@ -13,7 +13,7 @@
"v.io/core/veyron/runtimes/google/lib/upcqueue"
inaming "v.io/core/veyron/runtimes/google/naming"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/naming"
verror "v.io/core/veyron2/verror2"
"v.io/core/veyron2/vlog"
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index bc57117..c6c0b25 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -1,4 +1,4 @@
-// Package manager provides an implementation of the Manager interface defined in veyron2/ipc/stream.
+// Package manager provides an implementation of the Manager interface defined in veyron/runtimes/google/ipc/stream.
package manager
import (
@@ -9,13 +9,14 @@
"sync"
"time"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/verror2"
"v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/stats"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron/runtimes/google/ipc/stream/crypto"
"v.io/core/veyron/runtimes/google/ipc/stream/vif"
"v.io/core/veyron/runtimes/google/ipc/version"
@@ -62,7 +63,7 @@
func (DialTimeout) IPCClientOpt() {}
func dial(network, address string, timeout time.Duration) (net.Conn, error) {
- if d, _ := stream.RegisteredProtocol(network); d != nil {
+ if d, _ := ipc.RegisteredProtocol(network); d != nil {
return d(network, address, timeout)
}
return nil, fmt.Errorf("unknown network %s", network)
@@ -147,7 +148,7 @@
}
func listen(protocol, address string) (net.Listener, error) {
- if _, l := stream.RegisteredProtocol(protocol); l != nil {
+ if _, l := ipc.RegisteredProtocol(protocol); l != nil {
return l(protocol, address)
}
return nil, fmt.Errorf("unknown network %s", protocol)
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index 861b3f4..17a03c2 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -12,7 +12,8 @@
"testing"
"time"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vlog"
@@ -600,7 +601,7 @@
listener := func(_, _ string) (net.Listener, error) {
return nil, fmt.Errorf("tn.Listen")
}
- stream.RegisterProtocol("tn", dialer, listener)
+ ipc.RegisterProtocol("tn", dialer, listener)
_, _, err := server.Listen("tnx", "127.0.0.1:0")
if err == nil || !strings.Contains(err.Error(), "unknown network tnx") {
@@ -617,7 +618,7 @@
return net.Listen("tcp", addr)
}
- if got, want := stream.RegisterProtocol("tn", dialer, listener), true; got != want {
+ if got, want := ipc.RegisterProtocol("tn", dialer, listener), true; got != want {
t.Errorf("got %t, want %t", got, want)
}
diff --git a/runtimes/google/ipc/stream/message/message.go b/runtimes/google/ipc/stream/message/message.go
index 88c8617..9746efb 100644
--- a/runtimes/google/ipc/stream/message/message.go
+++ b/runtimes/google/ipc/stream/message/message.go
@@ -1,6 +1,6 @@
// Package message provides data structures and serialization/deserialization
// methods for messages exchanged by the implementation of the
-// veyron2/ipc/stream interfaces in veyron/runtimes/google/ipc/stream.
+// veyron/runtimes/google/ipc/stream interfaces in veyron/runtimes/google/ipc/stream.
package message
// This file contains methods to read and write messages sent over the VIF.
diff --git a/runtimes/google/ipc/stream/model.go b/runtimes/google/ipc/stream/model.go
new file mode 100644
index 0000000..a7c4f08
--- /dev/null
+++ b/runtimes/google/ipc/stream/model.go
@@ -0,0 +1,133 @@
+package stream
+
+import (
+ "io"
+
+ "v.io/core/veyron2/naming"
+ "v.io/core/veyron2/security"
+)
+
+// Flow is the interface for a flow-controlled channel multiplexed on a Virtual
+// Circuit (VC) (and its underlying network connections).
+//
+// This allows for a single level of multiplexing and flow-control over
+// multiple concurrent streams (that may be used for RPCs) over multiple
+// VCs over a single underlying network connection.
+type Flow interface {
+ io.ReadWriteCloser
+
+ // LocalEndpoint returns the local veyron Endpoint
+ LocalEndpoint() naming.Endpoint
+ // RemoteEndpoint returns the remote veyron Endpoint
+ RemoteEndpoint() naming.Endpoint
+ // LocalPrincipal returns the Principal at the local end of the flow that has authenticated with the remote end.
+ LocalPrincipal() security.Principal
+ // LocalBlessings returns the blessings presented by the local end of the flow during authentication.
+ LocalBlessings() security.Blessings
+ // RemoteBlessings returns the blessings presented by the remote end of the flow during authentication.
+ RemoteBlessings() security.Blessings
+ // RemoteDischarges() returns the discharges presented by the remote end of the flow during authentication.
+ //
+ // The discharges are organized in a map keyed by the discharge-identifier.
+ RemoteDischarges() map[string]security.Discharge
+ // Cancel, like Close, closes the Flow but unlike Close discards any queued writes.
+ Cancel()
+ // Closed returns true if the flow has been closed or cancelled.
+ IsClosed() bool
+ // Closed returns a channel that remains open until the flow has been closed.
+ Closed() <-chan struct{}
+
+ // SetDeadline causes reads and writes to the flow to be
+ // cancelled when the given channel is closed.
+ SetDeadline(deadline <-chan struct{})
+
+ // VCDataCache returns the stream.VCDataCache object that allows information to be
+ // shared across the Flow's parent VC.
+ VCDataCache() VCDataCache
+}
+
+// VCDataCache is a thread-safe store that allows data to be shared across a VC,
+// with the intention of caching data that reappears over multiple flows.
+type VCDataCache interface {
+ // GetOrInsert returns the 'value' associated with 'key'. If an entry already exists in the
+ // cache with the 'key', the 'value' is returned, otherwise 'create' is called to create a new
+ // value N, the cache is updated, and N is returned. GetOrInsert may be called from
+ // multiple goroutines concurrently.
+ GetOrInsert(key interface{}, create func() interface{}) interface{}
+}
+
+// FlowOpt is the interface for all Flow options.
+type FlowOpt interface {
+ IPCStreamFlowOpt()
+}
+
+// Listener is the interface for accepting Flows created by a remote process.
+type Listener interface {
+ // Accept blocks until a new Flow has been initiated by a remote process.
+ // TODO(toddw): This should be:
+ // Accept() (Flow, Connector, error)
+ Accept() (Flow, error)
+
+ // Close prevents new Flows from being accepted on this Listener.
+ // Previously accepted Flows are not closed down.
+ Close() error
+}
+
+// ListenerOpt is the interface for all options that control the creation of a
+// Listener.
+type ListenerOpt interface {
+ IPCStreamListenerOpt()
+}
+
+// Connector is the interface for initiating Flows to a remote process over a
+// Virtual Circuit (VC).
+type Connector interface {
+ Connect(opts ...FlowOpt) (Flow, error)
+}
+
+// VC is the interface for creating authenticated and secure end-to-end
+// streams.
+//
+// VCs are multiplexed onto underlying network conections and can span
+// multiple hops. Authentication and encryption are end-to-end, even though
+// underlying network connections span a single hop.
+type VC interface {
+ Connector
+ Listen() (Listener, error)
+}
+
+// VCOpt is the interface for all VC options.
+type VCOpt interface {
+ IPCStreamVCOpt()
+}
+
+// Manager is the interface for managing the creation of VCs.
+type Manager interface {
+ // Listen creates a Listener that can be used to accept Flows initiated
+ // with the provided network address.
+ //
+ // For example:
+ // ln, ep, err := Listen("tcp", ":0")
+ // for {
+ // flow, err := ln.Accept()
+ // // process flow
+ // }
+ // can be used to accept Flows initiated by remote processes to the endpoint
+ // identified by the returned Endpoint.
+ Listen(protocol, address string, opts ...ListenerOpt) (Listener, naming.Endpoint, error)
+
+ // Dial creates a VC to the provided remote endpoint.
+ Dial(remote naming.Endpoint, opts ...VCOpt) (VC, error)
+
+ // ShutdownEndpoint closes all VCs (and Flows and Listeners over it)
+ // involving the provided remote endpoint.
+ ShutdownEndpoint(remote naming.Endpoint)
+
+ // Shutdown closes all VCs and Listeners (and Flows over them) and
+ // frees up internal data structures.
+ // The Manager is not usable after Shutdown has been called.
+ Shutdown()
+
+ // RoutingID returns the Routing ID associated with the VC.
+ RoutingID() naming.RoutingID
+}
diff --git a/runtimes/google/ipc/stream/proxy/proxy.go b/runtimes/google/ipc/stream/proxy/proxy.go
index 2b79f97..9f19a6c 100644
--- a/runtimes/google/ipc/stream/proxy/proxy.go
+++ b/runtimes/google/ipc/stream/proxy/proxy.go
@@ -6,7 +6,7 @@
"net"
"sync"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
"v.io/core/veyron2/verror"
@@ -140,7 +140,7 @@
// New creates a new Proxy that listens for network connections on the provided
// (network, address) pair and routes VC traffic between accepted connections.
func New(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
- _, listenFn := stream.RegisteredProtocol(network)
+ _, listenFn := ipc.RegisteredProtocol(network)
if listenFn == nil {
return nil, fmt.Errorf("unknown network %s", network)
}
diff --git a/runtimes/google/ipc/stream/proxy/proxy_test.go b/runtimes/google/ipc/stream/proxy/proxy_test.go
index 9d23ab4..5469e25 100644
--- a/runtimes/google/ipc/stream/proxy/proxy_test.go
+++ b/runtimes/google/ipc/stream/proxy/proxy_test.go
@@ -9,7 +9,7 @@
"strings"
"testing"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron/lib/testutil"
diff --git a/runtimes/google/ipc/stream/vc/doc.go b/runtimes/google/ipc/stream/vc/doc.go
index 07a13dc..b364ef5 100644
--- a/runtimes/google/ipc/stream/vc/doc.go
+++ b/runtimes/google/ipc/stream/vc/doc.go
@@ -1,2 +1,2 @@
-// Package vc provides implementations of the VC and Flow interfaces in veyron2/ipc/stream.
+// Package vc provides implementations of the VC and Flow interfaces in veyron/runtimes/google/ipc/stream.
package vc
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index e13fd21..f2cd456 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -1,7 +1,7 @@
package vc
import (
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
)
diff --git a/runtimes/google/ipc/stream/vc/listener.go b/runtimes/google/ipc/stream/vc/listener.go
index bd04847..193bfa4 100644
--- a/runtimes/google/ipc/stream/vc/listener.go
+++ b/runtimes/google/ipc/stream/vc/listener.go
@@ -3,8 +3,8 @@
import (
"errors"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron/runtimes/google/lib/upcqueue"
- "v.io/core/veyron2/ipc/stream"
)
var errListenerClosed = errors.New("Listener has been closed")
diff --git a/runtimes/google/ipc/stream/vc/listener_test.go b/runtimes/google/ipc/stream/vc/listener_test.go
index 2e21acc..1ebb9e7 100644
--- a/runtimes/google/ipc/stream/vc/listener_test.go
+++ b/runtimes/google/ipc/stream/vc/listener_test.go
@@ -3,7 +3,7 @@
import (
"testing"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
)
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index 6fa78ab..a3d5bee 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -17,8 +17,8 @@
"v.io/core/veyron/runtimes/google/lib/iobuf"
vsync "v.io/core/veyron/runtimes/google/lib/sync"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/context"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/ipc/version"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
diff --git a/runtimes/google/ipc/stream/vc/vc_test.go b/runtimes/google/ipc/stream/vc/vc_test.go
index a726e21..a8499bc 100644
--- a/runtimes/google/ipc/stream/vc/vc_test.go
+++ b/runtimes/google/ipc/stream/vc/vc_test.go
@@ -22,8 +22,8 @@
"v.io/core/veyron/runtimes/google/lib/bqueue/drrqueue"
"v.io/core/veyron/runtimes/google/lib/iobuf"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/context"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/ipc/version"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
diff --git a/runtimes/google/ipc/stream/vif/auth.go b/runtimes/google/ipc/stream/vif/auth.go
index 312c3a6..5ab80e9 100644
--- a/runtimes/google/ipc/stream/vif/auth.go
+++ b/runtimes/google/ipc/stream/vif/auth.go
@@ -8,13 +8,13 @@
"golang.org/x/crypto/nacl/box"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron/runtimes/google/ipc/stream/crypto"
"v.io/core/veyron/runtimes/google/ipc/stream/message"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
"v.io/core/veyron/runtimes/google/ipc/version"
"v.io/core/veyron/runtimes/google/lib/iobuf"
"v.io/core/veyron2/context"
- "v.io/core/veyron2/ipc/stream"
ipcversion "v.io/core/veyron2/ipc/version"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
diff --git a/runtimes/google/ipc/stream/vif/doc.go b/runtimes/google/ipc/stream/vif/doc.go
index ab22ba5..2754eb2 100644
--- a/runtimes/google/ipc/stream/vif/doc.go
+++ b/runtimes/google/ipc/stream/vif/doc.go
@@ -1,4 +1,4 @@
// Package vif implements a virtual network interface that wraps over a
// net.Conn and provides the ability to Dial and Listen for virtual circuits
-// (veyron2/ipc/stream.VC)
+// (veyron/runtimes/google/ipc/stream.VC)
package vif
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 1e5294e..2a7c22c 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -14,6 +14,7 @@
"sync"
"time"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron/runtimes/google/ipc/stream/crypto"
"v.io/core/veyron/runtimes/google/ipc/stream/id"
"v.io/core/veyron/runtimes/google/ipc/stream/message"
@@ -25,7 +26,6 @@
"v.io/core/veyron/runtimes/google/lib/pcqueue"
vsync "v.io/core/veyron/runtimes/google/lib/sync"
"v.io/core/veyron/runtimes/google/lib/upcqueue"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/verror"
"v.io/core/veyron2/vlog"
diff --git a/runtimes/google/ipc/stream/vif/vif_test.go b/runtimes/google/ipc/stream/vif/vif_test.go
index 232f985..ed21c70 100644
--- a/runtimes/google/ipc/stream/vif/vif_test.go
+++ b/runtimes/google/ipc/stream/vif/vif_test.go
@@ -23,7 +23,7 @@
"v.io/core/veyron/runtimes/google/ipc/stream/vif"
iversion "v.io/core/veyron/runtimes/google/ipc/version"
- "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron2/ipc/version"
"v.io/core/veyron2/naming"
)
diff --git a/runtimes/google/rt/runtime.go b/runtimes/google/rt/runtime.go
index 3614268..813345e 100644
--- a/runtimes/google/rt/runtime.go
+++ b/runtimes/google/rt/runtime.go
@@ -13,7 +13,6 @@
"v.io/core/veyron2/context"
"v.io/core/veyron2/i18n"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
"v.io/core/veyron2/verror2"
@@ -24,6 +23,7 @@
"v.io/core/veyron/lib/stats"
_ "v.io/core/veyron/lib/stats/sysstats"
iipc "v.io/core/veyron/runtimes/google/ipc"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
imanager "v.io/core/veyron/runtimes/google/ipc/stream/manager"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
"v.io/core/veyron/runtimes/google/lib/dependency"
@@ -121,7 +121,7 @@
}
// Set the initial stream manager.
- ctx, _, err = r.setNewStreamManager(ctx)
+ ctx, err = r.setNewStreamManager(ctx)
if err != nil {
return nil, nil, nil, err
}
@@ -241,7 +241,7 @@
return server, nil
}
-func newStreamManager(opts ...stream.ManagerOpt) (stream.Manager, error) {
+func newStreamManager() (stream.Manager, error) {
rid, err := naming.NewRoutingID()
if err != nil {
return nil, err
@@ -250,32 +250,30 @@
return sm, nil
}
-func (r *Runtime) setNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
- sm, err := newStreamManager(opts...)
+func (r *Runtime) setNewStreamManager(ctx *context.T) (*context.T, error) {
+ sm, err := newStreamManager()
+ if err != nil {
+ return nil, err
+ }
newctx := context.WithValue(ctx, streamManagerKey, sm)
if err = r.addChild(ctx, sm, sm.Shutdown); err != nil {
- return ctx, nil, err
+ return ctx, err
}
- return newctx, sm, err
+ return newctx, err
}
-func (r *Runtime) SetNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
- newctx, sm, err := r.setNewStreamManager(ctx, opts...)
+func (r *Runtime) SetNewStreamManager(ctx *context.T) (*context.T, error) {
+ newctx, err := r.setNewStreamManager(ctx)
if err != nil {
- return ctx, nil, err
+ return ctx, err
}
// Create a new client since it depends on the stream manager.
newctx, _, err = r.SetNewClient(newctx)
if err != nil {
- return ctx, nil, err
+ return ctx, err
}
- return newctx, sm, nil
-}
-
-func (*Runtime) GetStreamManager(ctx *context.T) stream.Manager {
- cl, _ := ctx.Value(streamManagerKey).(stream.Manager)
- return cl
+ return newctx, nil
}
func (*Runtime) setPrincipal(ctx *context.T, principal security.Principal) *context.T {
@@ -292,7 +290,7 @@
newctx = r.setPrincipal(ctx, principal)
- if newctx, _, err = r.setNewStreamManager(newctx); err != nil {
+ if newctx, err = r.setNewStreamManager(newctx); err != nil {
return ctx, err
}
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
diff --git a/runtimes/google/rt/runtime_test.go b/runtimes/google/rt/runtime_test.go
index cb6b000..ca78baf 100644
--- a/runtimes/google/rt/runtime_test.go
+++ b/runtimes/google/rt/runtime_test.go
@@ -38,27 +38,6 @@
}
}
-func TestStreamManager(t *testing.T) {
- r, ctx, shutdown := InitForTest(t)
- defer shutdown()
-
- orig := r.GetStreamManager(ctx)
-
- c2, sm, err := r.SetNewStreamManager(ctx)
- if err != nil || sm == nil {
- t.Fatalf("Could not create stream manager: %v", err)
- }
- if !c2.Initialized() {
- t.Fatal("Got uninitialized context.")
- }
- if sm == orig {
- t.Fatal("Should have replaced the stream manager but didn't")
- }
- if sm != r.GetStreamManager(c2) {
- t.Fatal("The new stream manager should be attached to the context, but it isn't")
- }
-}
-
func TestPrincipal(t *testing.T) {
r, ctx, shutdown := InitForTest(t)
defer shutdown()
diff --git a/runtimes/google/vtrace/vtrace_test.go b/runtimes/google/vtrace/vtrace_test.go
index ebf7266..c376deb 100644
--- a/runtimes/google/vtrace/vtrace_test.go
+++ b/runtimes/google/vtrace/vtrace_test.go
@@ -8,7 +8,6 @@
"v.io/core/veyron2"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
- "v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vlog"
@@ -17,6 +16,7 @@
"v.io/core/veyron/lib/testutil"
_ "v.io/core/veyron/profiles"
iipc "v.io/core/veyron/runtimes/google/ipc"
+ "v.io/core/veyron/runtimes/google/ipc/stream"
"v.io/core/veyron/runtimes/google/ipc/stream/manager"
tnaming "v.io/core/veyron/runtimes/google/testing/mocks/naming"
)
diff --git a/services/mgmt/device/impl/app_service.go b/services/mgmt/device/impl/app_service.go
index fc8bfc6..3bb85c8 100644
--- a/services/mgmt/device/impl/app_service.go
+++ b/services/mgmt/device/impl/app_service.go
@@ -481,7 +481,7 @@
func agentPrincipal(ctx *context.T, conn *os.File) (security.Principal, func(), error) {
agentctx, cancel := context.WithCancel(ctx)
var err error
- if agentctx, _, err = veyron2.SetNewStreamManager(agentctx); err != nil {
+ if agentctx, err = veyron2.SetNewStreamManager(agentctx); err != nil {
cancel()
conn.Close()
return nil, nil, err