veyron2: Add a RuntimeX interface and its implementation.
Change-Id: I633c66f6466334fad68aa66c2bbc5cb6f714765e
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 6e4439f..5c2d452 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -53,7 +53,7 @@
forceCollect := sr > 0.0 && (sr >= 1.0 || rand.Float64() < sr)
ctx, _ = ivtrace.WithNewRootSpan(ctx, rt.traceStore, forceCollect)
- return ctx
+ return rt.initRuntimeXContext(ctx)
}
func (rt *vrt) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) {
diff --git a/runtimes/google/rt/runtimex.go b/runtimes/google/rt/runtimex.go
new file mode 100644
index 0000000..fa0d4d3
--- /dev/null
+++ b/runtimes/google/rt/runtimex.go
@@ -0,0 +1,274 @@
+package rt
+
+import (
+ "fmt"
+ "time"
+
+ _ "v.io/core/veyron/lib/stats/sysstats"
+ "v.io/core/veyron2"
+ "v.io/core/veyron2/context"
+ "v.io/core/veyron2/ipc"
+ "v.io/core/veyron2/ipc/stream"
+ "v.io/core/veyron2/naming"
+ "v.io/core/veyron2/options"
+ "v.io/core/veyron2/security"
+ "v.io/core/veyron2/vlog"
+ "v.io/core/veyron2/vtrace"
+
+ //iipc "v.io/core/veyron/runtimes/google/ipc"
+ iipc "v.io/core/veyron/runtimes/google/ipc"
+ imanager "v.io/core/veyron/runtimes/google/ipc/stream/manager"
+ "v.io/core/veyron/runtimes/google/ipc/stream/vc"
+ inaming "v.io/core/veyron/runtimes/google/naming"
+ "v.io/core/veyron/runtimes/google/naming/namespace"
+ ivtrace "v.io/core/veyron/runtimes/google/vtrace"
+)
+
+type contextKey int
+
+const (
+ streamManagerKey = contextKey(iota)
+ clientKey
+ namespaceKey
+ loggerKey
+ principalKey
+ vtraceKey
+ reservedNameKey
+)
+
+func init() {
+ veyron2.RegisterRuntime("google", &RuntimeX{})
+}
+
+// initRuntimeXContext provides compatibility between Runtime and RuntimeX.
+// It is only used during the transition between runtime and
+// RuntimeX. It populates a context with all the subparts that the
+// new interface expects to be present. In the future this work will
+// be replaced by RuntimeX.Init()
+// TODO(mattr): Remove this after the runtime->runtimex transistion.
+func (rt *vrt) initRuntimeXContext(ctx *context.T) *context.T {
+ ctx = context.WithValue(ctx, streamManagerKey, rt.sm[0])
+ ctx = context.WithValue(ctx, clientKey, rt.client)
+ ctx = context.WithValue(ctx, namespaceKey, rt.ns)
+ ctx = context.WithValue(ctx, loggerKey, vlog.Log)
+ ctx = context.WithValue(ctx, principalKey, rt.principal)
+ ctx = context.WithValue(ctx, vtraceKey, rt.traceStore)
+ return ctx
+}
+
+// RuntimeX implements the veyron2.RuntimeX interface. It is stateless.
+// Please see the interface definition for documentation of the
+// individiual methods.
+type RuntimeX struct{}
+
+// TODO(mattr): This function isn't used yet. We'll implement it later
+// in the transition.
+func (*RuntimeX) Init(ctx *context.T) (*context.T, context.CancelFunc) {
+ // TODO(mattr): Here we need to do a bunch of one time init, like parsing flags
+ // and reading the credentials, init logging and verror, start an appcycle manager.
+ // TODO(mattr): Here we need to arrange for a long of one time cleanup
+ // when cancel is called. Dump vtrace, shotdown signalhandling, shutdownlogging,
+ // shutdown the appcyclemanager.
+ return nil, nil
+}
+
+func (*RuntimeX) NewEndpoint(ep string) (naming.Endpoint, error) {
+ return inaming.NewEndpoint(ep)
+}
+
+func (r *RuntimeX) NewServer(ctx *context.T, opts ...ipc.ServerOpt) (ipc.Server, error) {
+ // Create a new RoutingID (and StreamManager) for each server.
+ _, sm, err := r.SetNewStreamManager(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create ipc/stream/Manager: %v", err)
+ }
+
+ ns, _ := ctx.Value(namespaceKey).(naming.Namespace)
+ principal, _ := ctx.Value(principalKey).(security.Principal)
+
+ otherOpts := append([]ipc.ServerOpt{}, opts...)
+ otherOpts = append(otherOpts, vc.LocalPrincipal{principal})
+ if reserved, ok := ctx.Value(reservedNameKey).(*reservedNameDispatcher); ok {
+ otherOpts = append(otherOpts, options.ReservedNameDispatcher{reserved.dispatcher})
+ otherOpts = append(otherOpts, reserved.opts...)
+ }
+ // TODO(mattr): We used to get rt.preferredprotocols here, should we
+ // attach these to the context directly?
+
+ traceStore, _ := ctx.Value(vtraceKey).(*ivtrace.Store)
+ server, err := iipc.InternalNewServer(ctx, sm, ns, traceStore, otherOpts...)
+ if done := ctx.Done(); err == nil && done != nil {
+ // Arrange to clean up the server when the parent context is canceled.
+ // TODO(mattr): Should we actually do this? Or just have users clean
+ // their own servers up manually?
+ go func() {
+ <-done
+ server.Stop()
+ }()
+ }
+ return server, err
+}
+
+func (r *RuntimeX) setNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
+ rid, err := naming.NewRoutingID()
+ if err != nil {
+ return ctx, nil, err
+ }
+ sm := imanager.InternalNew(rid)
+ ctx = context.WithValue(ctx, streamManagerKey, sm)
+
+ // Arrange for the manager to shut itself down when the context is canceled.
+ if done := ctx.Done(); done != nil {
+ go func() {
+ <-done
+ sm.Shutdown()
+ }()
+ }
+
+ return ctx, sm, nil
+}
+
+func (r *RuntimeX) SetNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
+ newctx, sm, err := r.setNewStreamManager(ctx, opts...)
+ if err != nil {
+ return ctx, nil, err
+ }
+
+ // Create a new client since it depends on the stream manager.
+ newctx, _, err = r.SetNewClient(newctx)
+ if err != nil {
+ return ctx, nil, err
+ }
+
+ return newctx, sm, nil
+}
+
+func (*RuntimeX) StreamManager(ctx *context.T) stream.Manager {
+ cl, _ := ctx.Value(streamManagerKey).(stream.Manager)
+ return cl
+}
+
+func (r *RuntimeX) SetPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
+ var err error
+ newctx := ctx
+
+ newctx = context.WithValue(newctx, principalKey, principal)
+
+ // TODO(mattr, suharshs): The stream manager holds a cache of vifs
+ // which were negotiated with the principal, so we replace it here when the
+ // principal changes. However we should negotiate the vif with a
+ // random principal and then we needn't replace this here.
+ if newctx, _, err = r.setNewStreamManager(newctx); err != nil {
+ return ctx, err
+ }
+ if newctx, _, err = r.setNewNamespace(newctx, r.Namespace(ctx).Roots()...); err != nil {
+ return ctx, err
+ }
+ if newctx, _, err = r.SetNewClient(newctx); err != nil {
+ return ctx, err
+ }
+
+ return newctx, nil
+}
+
+func (*RuntimeX) Principal(ctx *context.T) security.Principal {
+ p, _ := ctx.Value(principalKey).(security.Principal)
+ return p
+}
+
+func (*RuntimeX) SetNewClient(ctx *context.T, opts ...ipc.ClientOpt) (*context.T, ipc.Client, error) {
+ // TODO(mattr, suharshs): Currently there are a lot of things that can come in as opts.
+ // Some of them will be removed as opts and simply be pulled from the context instead
+ // these are:
+ // stream.Manager, Namespace, LocalPrincipal
+ sm, _ := ctx.Value(streamManagerKey).(stream.Manager)
+ ns, _ := ctx.Value(namespaceKey).(naming.Namespace)
+ p, _ := ctx.Value(principalKey).(security.Principal)
+
+ // TODO(mattr, suharshs): Some will need to ba accessible from the
+ // client so that we can replace the client transparantly:
+ // VCSecurityLevel, PreferredProtocols
+ // Currently we are ignoring these and the settings will be lost in some cases.
+ // We should try to retrieve them from the client currently attached to the context
+ // where possible.
+ otherOpts := append([]ipc.ClientOpt{}, opts...)
+
+ // Note we always add DialTimeout, so we don't have to worry about replicating the option.
+ otherOpts = append(otherOpts, vc.LocalPrincipal{p}, &imanager.DialTimeout{5 * time.Minute})
+
+ client, err := iipc.InternalNewClient(sm, ns, otherOpts...)
+ if err == nil {
+ ctx = context.WithValue(ctx, clientKey, client)
+ }
+ return ctx, client, err
+}
+
+func (*RuntimeX) Client(ctx *context.T) ipc.Client {
+ cl, _ := ctx.Value(clientKey).(ipc.Client)
+ return cl
+}
+
+func (*RuntimeX) SetNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) {
+ return ivtrace.WithNewSpan(ctx, name)
+}
+
+func (*RuntimeX) Span(ctx *context.T) vtrace.Span {
+ return ivtrace.FromContext(ctx)
+}
+
+func (*RuntimeX) setNewNamespace(ctx *context.T, roots ...string) (*context.T, naming.Namespace, error) {
+ ns, err := namespace.New(roots...)
+ if err == nil {
+ ctx = context.WithValue(ctx, namespaceKey, ns)
+ }
+ return ctx, ns, err
+}
+
+func (r *RuntimeX) SetNewNamespace(ctx *context.T, roots ...string) (*context.T, naming.Namespace, error) {
+ newctx, ns, err := r.setNewNamespace(ctx, roots...)
+ if err != nil {
+ return ctx, nil, err
+ }
+
+ // Replace the client since it depends on the namespace.
+ newctx, _, err = r.SetNewClient(newctx)
+ if err != nil {
+ return ctx, nil, err
+ }
+
+ return newctx, ns, err
+}
+
+func (*RuntimeX) Namespace(ctx *context.T) naming.Namespace {
+ ns, _ := ctx.Value(namespaceKey).(naming.Namespace)
+ return ns
+}
+
+func (*RuntimeX) SetNewLogger(ctx *context.T, name string, opts ...vlog.LoggingOpts) (*context.T, vlog.Logger, error) {
+ logger, err := vlog.NewLogger(name, opts...)
+ if err == nil {
+ ctx = context.WithValue(ctx, loggerKey, logger)
+ }
+ return ctx, logger, err
+}
+
+func (*RuntimeX) Logger(ctx *context.T) vlog.Logger {
+ logger, _ := ctx.Value(loggerKey).(vlog.Logger)
+ return logger
+}
+
+func (*RuntimeX) VtraceStore(ctx *context.T) vtrace.Store {
+ traceStore, _ := ctx.Value(vtraceKey).(vtrace.Store)
+ return traceStore
+}
+
+type reservedNameDispatcher struct {
+ dispatcher ipc.Dispatcher
+ opts []ipc.ServerOpt
+}
+
+// TODO(mattr): Get this from the profile instead, then remove this
+// method from the interface.
+func (*RuntimeX) SetReservedNameDispatcher(ctx *context.T, server ipc.Dispatcher, opts ...ipc.ServerOpt) *context.T {
+ return context.WithValue(ctx, reservedNameKey, &reservedNameDispatcher{server, opts})
+}
diff --git a/runtimes/google/rt/runtimex_test.go b/runtimes/google/rt/runtimex_test.go
new file mode 100644
index 0000000..da6e13c
--- /dev/null
+++ b/runtimes/google/rt/runtimex_test.go
@@ -0,0 +1,130 @@
+package rt_test
+
+import (
+ "testing"
+
+ "v.io/core/veyron2/context"
+
+ "v.io/core/veyron/runtimes/google/rt"
+ "v.io/core/veyron/security"
+)
+
+// InitForTest creates a context for use in a test.
+// TODO(mattr): We should call runtimeX.Init once that is implemented.
+func InitForTest(t *testing.T) (*context.T, context.CancelFunc) {
+ rt, err := rt.New(profileOpt)
+ if err != nil {
+ t.Fatalf("Could not create runtime: %v", err)
+ }
+ ctx, cancel := context.WithCancel(rt.NewContext())
+ go func() {
+ <-ctx.Done()
+ rt.Cleanup()
+ }()
+ return ctx, cancel
+}
+
+func TestNewServer(t *testing.T) {
+ ctx, cancel := InitForTest(t)
+ defer cancel()
+
+ r := &rt.RuntimeX{}
+ if s, err := r.NewServer(ctx); err != nil || s == nil {
+ t.Fatalf("Could not create server: %v", err)
+ }
+}
+
+func TestStreamManager(t *testing.T) {
+ ctx, cancel := InitForTest(t)
+ defer cancel()
+
+ r := &rt.RuntimeX{}
+ orig := r.StreamManager(ctx)
+
+ c2, sm, err := r.SetNewStreamManager(ctx)
+ if err != nil || sm == nil {
+ t.Fatalf("Could not create stream manager: %v", err)
+ }
+ if !c2.Initialized() {
+ t.Fatal("Got uninitialized context.")
+ }
+ if sm == orig {
+ t.Fatal("Should have replaced the stream manager but didn't")
+ }
+ if sm != r.StreamManager(c2) {
+ t.Fatal("The new stream manager should be attached to the context, but it isn't")
+ }
+}
+
+func TestPrincipal(t *testing.T) {
+ ctx, cancel := InitForTest(t)
+ defer cancel()
+
+ r := &rt.RuntimeX{}
+
+ p2, err := security.NewPrincipal()
+ if err != nil {
+ t.Fatalf("Could not create new principal %v", err)
+ }
+ c2, err := r.SetPrincipal(ctx, p2)
+ if err != nil {
+ t.Fatalf("Could not attach principal: %v", err)
+ }
+ if !c2.Initialized() {
+ t.Fatal("Got uninitialized context.")
+ }
+ if p2 != r.Principal(c2) {
+ t.Fatal("The new principal should be attached to the context, but it isn't")
+ }
+}
+
+func TestClient(t *testing.T) {
+ ctx, cancel := InitForTest(t)
+ defer cancel()
+
+ r := &rt.RuntimeX{}
+ orig := r.Client(ctx)
+
+ c2, client, err := r.SetNewClient(ctx)
+ if err != nil || client == nil {
+ t.Fatalf("Could not create client: %v", err)
+ }
+ if !c2.Initialized() {
+ t.Fatal("Got uninitialized context.")
+ }
+ if client == orig {
+ t.Fatal("Should have replaced the client but didn't")
+ }
+ if client != r.Client(c2) {
+ t.Fatal("The new client should be attached to the context, but it isn't")
+ }
+}
+
+func TestNamespace(t *testing.T) {
+ ctx, cancel := InitForTest(t)
+ defer cancel()
+
+ r := &rt.RuntimeX{}
+ orig := r.Namespace(ctx)
+
+ newroots := []string{"/newroot1", "/newroot2"}
+ c2, ns, err := r.SetNewNamespace(ctx, newroots...)
+ if err != nil || ns == nil {
+ t.Fatalf("Could not create namespace: %v", err)
+ }
+ if !c2.Initialized() {
+ t.Fatal("Got uninitialized context.")
+ }
+ if ns == orig {
+ t.Fatal("Should have replaced the namespace but didn't")
+ }
+ if ns != r.Namespace(c2) {
+ t.Fatal("The new namespace should be attached to the context, but it isn't")
+ }
+ newrootmap := map[string]bool{"/newroot1": true, "/newroot2": true}
+ for _, root := range ns.Roots() {
+ if !newrootmap[root] {
+ t.Errorf("root %s found in ns, but we expected: %v", root, newroots)
+ }
+ }
+}