blob: 6af8e88ebbaeb8ad6555920db8746a9f7e4a0746 [file] [log] [blame]
package rt
import (
"fmt"
"time"
_ "v.io/core/veyron/lib/stats/sysstats"
"v.io/core/veyron2"
"v.io/core/veyron2/config"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/ipc/stream"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/options"
"v.io/core/veyron2/security"
"v.io/core/veyron2/vlog"
"v.io/core/veyron2/vtrace"
//iipc "v.io/core/veyron/runtimes/google/ipc"
iipc "v.io/core/veyron/runtimes/google/ipc"
imanager "v.io/core/veyron/runtimes/google/ipc/stream/manager"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
inaming "v.io/core/veyron/runtimes/google/naming"
"v.io/core/veyron/runtimes/google/naming/namespace"
ivtrace "v.io/core/veyron/runtimes/google/vtrace"
)
type contextKey int
const (
streamManagerKey = contextKey(iota)
clientKey
namespaceKey
loggerKey
principalKey
vtraceKey
reservedNameKey
profileKey
appCycleKey
listenSpecKey
protocolsKey
publisherKey
)
func init() {
veyron2.RegisterRuntime("google", &RuntimeX{})
}
// initRuntimeXContext provides compatibility between Runtime and RuntimeX.
// It is only used during the transition between runtime and
// RuntimeX. It populates a context with all the subparts that the
// new interface expects to be present. In the future this work will
// be replaced by RuntimeX.Init()
// TODO(mattr): Remove this after the runtime->runtimex transistion.
func (rt *vrt) initRuntimeXContext(ctx *context.T) *context.T {
ctx = context.WithValue(ctx, streamManagerKey, rt.sm[0])
ctx = context.WithValue(ctx, clientKey, rt.client)
ctx = context.WithValue(ctx, namespaceKey, rt.ns)
ctx = context.WithValue(ctx, loggerKey, vlog.Log)
ctx = context.WithValue(ctx, principalKey, rt.principal)
ctx = context.WithValue(ctx, vtraceKey, rt.traceStore)
ctx = context.WithValue(ctx, publisherKey, rt.publisher)
ctx = context.WithValue(ctx, profileKey, rt.profile)
ctx = context.WithValue(ctx, appCycleKey, rt.ac)
return ctx
}
// RuntimeX implements the veyron2.RuntimeX interface. It is stateless.
// Please see the interface definition for documentation of the
// individiual methods.
type RuntimeX struct{}
// TODO(mattr): This function isn't used yet. We'll implement it later
// in the transition.
func (*RuntimeX) Init(ctx *context.T) *context.T {
// TODO(mattr): Here we need to do a bunch of one time init, like parsing flags
// and reading the credentials, init logging and verror, start an appcycle manager.
// TODO(mattr): Here we need to arrange for a long of one time cleanup
// when cancel is called. Dump vtrace, shotdown signalhandling, shutdownlogging,
// shutdown the appcyclemanager.
return nil
}
func (*RuntimeX) NewEndpoint(ep string) (naming.Endpoint, error) {
return inaming.NewEndpoint(ep)
}
func (r *RuntimeX) NewServer(ctx *context.T, opts ...ipc.ServerOpt) (ipc.Server, error) {
// Create a new RoutingID (and StreamManager) for each server.
_, sm, err := r.SetNewStreamManager(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create ipc/stream/Manager: %v", err)
}
ns, _ := ctx.Value(namespaceKey).(naming.Namespace)
principal, _ := ctx.Value(principalKey).(security.Principal)
otherOpts := append([]ipc.ServerOpt{}, opts...)
otherOpts = append(otherOpts, vc.LocalPrincipal{principal})
if reserved, ok := ctx.Value(reservedNameKey).(*reservedNameDispatcher); ok {
otherOpts = append(otherOpts, options.ReservedNameDispatcher{reserved.dispatcher})
otherOpts = append(otherOpts, reserved.opts...)
}
// TODO(mattr): We used to get rt.preferredprotocols here, should we
// attach these to the context directly?
traceStore, _ := ctx.Value(vtraceKey).(*ivtrace.Store)
server, err := iipc.InternalNewServer(ctx, sm, ns, traceStore, otherOpts...)
if done := ctx.Done(); err == nil && done != nil {
// Arrange to clean up the server when the parent context is canceled.
// TODO(mattr): Should we actually do this? Or just have users clean
// their own servers up manually?
go func() {
<-done
server.Stop()
}()
}
return server, err
}
func (r *RuntimeX) setNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
rid, err := naming.NewRoutingID()
if err != nil {
return ctx, nil, err
}
sm := imanager.InternalNew(rid)
ctx = context.WithValue(ctx, streamManagerKey, sm)
// Arrange for the manager to shut itself down when the context is canceled.
if done := ctx.Done(); done != nil {
go func() {
<-done
sm.Shutdown()
}()
}
return ctx, sm, nil
}
func (r *RuntimeX) SetNewStreamManager(ctx *context.T, opts ...stream.ManagerOpt) (*context.T, stream.Manager, error) {
newctx, sm, err := r.setNewStreamManager(ctx, opts...)
if err != nil {
return ctx, nil, err
}
// Create a new client since it depends on the stream manager.
newctx, _, err = r.SetNewClient(newctx)
if err != nil {
return ctx, nil, err
}
return newctx, sm, nil
}
func (*RuntimeX) GetStreamManager(ctx *context.T) stream.Manager {
cl, _ := ctx.Value(streamManagerKey).(stream.Manager)
return cl
}
func (r *RuntimeX) SetPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
var err error
newctx := ctx
newctx = context.WithValue(newctx, principalKey, principal)
// TODO(mattr, suharshs): The stream manager holds a cache of vifs
// which were negotiated with the principal, so we replace it here when the
// principal changes. However we should negotiate the vif with a
// random principal and then we needn't replace this here.
if newctx, _, err = r.setNewStreamManager(newctx); err != nil {
return ctx, err
}
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
return ctx, err
}
if newctx, _, err = r.SetNewClient(newctx); err != nil {
return ctx, err
}
return newctx, nil
}
func (*RuntimeX) GetPrincipal(ctx *context.T) security.Principal {
p, _ := ctx.Value(principalKey).(security.Principal)
return p
}
func (*RuntimeX) SetNewClient(ctx *context.T, opts ...ipc.ClientOpt) (*context.T, ipc.Client, error) {
// TODO(mattr, suharshs): Currently there are a lot of things that can come in as opts.
// Some of them will be removed as opts and simply be pulled from the context instead
// these are:
// stream.Manager, Namespace, LocalPrincipal
sm, _ := ctx.Value(streamManagerKey).(stream.Manager)
ns, _ := ctx.Value(namespaceKey).(naming.Namespace)
p, _ := ctx.Value(principalKey).(security.Principal)
protocols, _ := ctx.Value(protocolsKey).([]string)
// TODO(mattr, suharshs): Some will need to ba accessible from the
// client so that we can replace the client transparantly:
// VCSecurityLevel, PreferredProtocols
// Currently we are ignoring these and the settings will be lost in some cases.
// We should try to retrieve them from the client currently attached to the context
// where possible.
otherOpts := append([]ipc.ClientOpt{}, opts...)
// Note we always add DialTimeout, so we don't have to worry about replicating the option.
otherOpts = append(otherOpts, vc.LocalPrincipal{p}, &imanager.DialTimeout{5 * time.Minute}, options.PreferredProtocols(protocols))
client, err := iipc.InternalNewClient(sm, ns, otherOpts...)
if err == nil {
ctx = context.WithValue(ctx, clientKey, client)
}
return ctx, client, err
}
func (*RuntimeX) GetClient(ctx *context.T) ipc.Client {
cl, _ := ctx.Value(clientKey).(ipc.Client)
return cl
}
func (*RuntimeX) SetNewSpan(ctx *context.T, name string) (*context.T, vtrace.Span) {
return ivtrace.WithNewSpan(ctx, name)
}
func (*RuntimeX) GetSpan(ctx *context.T) vtrace.Span {
return ivtrace.FromContext(ctx)
}
func (*RuntimeX) setNewNamespace(ctx *context.T, roots ...string) (*context.T, naming.Namespace, error) {
ns, err := namespace.New(roots...)
if err == nil {
ctx = context.WithValue(ctx, namespaceKey, ns)
}
return ctx, ns, err
}
func (r *RuntimeX) SetNewNamespace(ctx *context.T, roots ...string) (*context.T, naming.Namespace, error) {
newctx, ns, err := r.setNewNamespace(ctx, roots...)
if err != nil {
return ctx, nil, err
}
// Replace the client since it depends on the namespace.
newctx, _, err = r.SetNewClient(newctx)
if err != nil {
return ctx, nil, err
}
return newctx, ns, err
}
func (*RuntimeX) GetNamespace(ctx *context.T) naming.Namespace {
ns, _ := ctx.Value(namespaceKey).(naming.Namespace)
return ns
}
func (*RuntimeX) SetNewLogger(ctx *context.T, name string, opts ...vlog.LoggingOpts) (*context.T, vlog.Logger, error) {
logger, err := vlog.NewLogger(name, opts...)
if err == nil {
ctx = context.WithValue(ctx, loggerKey, logger)
}
return ctx, logger, err
}
func (*RuntimeX) GetLogger(ctx *context.T) vlog.Logger {
logger, _ := ctx.Value(loggerKey).(vlog.Logger)
return logger
}
func (*RuntimeX) GetVtraceStore(ctx *context.T) vtrace.Store {
traceStore, _ := ctx.Value(vtraceKey).(vtrace.Store)
return traceStore
}
type reservedNameDispatcher struct {
dispatcher ipc.Dispatcher
opts []ipc.ServerOpt
}
// TODO(mattr): Get this from the profile instead, then remove this
// method from the interface.
func (*RuntimeX) SetReservedNameDispatcher(ctx *context.T, server ipc.Dispatcher, opts ...ipc.ServerOpt) *context.T {
return context.WithValue(ctx, reservedNameKey, &reservedNameDispatcher{server, opts})
}
// SetProfile sets the profile used to create this runtime.
// TODO(suharshs, mattr): Determine if this is needed by functions after the new
// profile init function is in use. This will probably be easy to do because:
// Name is used in tests only.
// Platform is used for String representaions of a Profile.
// String is unused.
// Cleanup is used in rt.Cleanup and can probably be replaced by a cancelfunc returned
// by the new profile initialization function.
func (*RuntimeX) SetProfile(ctx *context.T, profile veyron2.Profile) *context.T {
return context.WithValue(ctx, profileKey, profile)
}
func (*RuntimeX) GetProfile(ctx *context.T) veyron2.Profile {
profile, _ := ctx.Value(profileKey).(veyron2.Profile)
return profile
}
func (*RuntimeX) SetAppCycle(ctx *context.T, appCycle veyron2.AppCycle) *context.T {
return context.WithValue(ctx, appCycleKey, appCycle)
}
func (*RuntimeX) GetAppCycle(ctx *context.T) veyron2.AppCycle {
appCycle, _ := ctx.Value(appCycleKey).(veyron2.AppCycle)
return appCycle
}
func (*RuntimeX) SetListenSpec(ctx *context.T, listenSpec ipc.ListenSpec) *context.T {
return context.WithValue(ctx, listenSpecKey, listenSpec)
}
func (*RuntimeX) GetListenSpec(ctx *context.T) ipc.ListenSpec {
listenSpec, _ := ctx.Value(listenSpecKey).(ipc.ListenSpec)
return listenSpec
}
func (*RuntimeX) SetPreferredProtocols(ctx *context.T, protocols []string) *context.T {
return context.WithValue(ctx, protocolsKey, protocols)
}
// GetPublisher returns a configuration Publisher that can be used to access
// configuration information.
func (*RuntimeX) GetPublisher(ctx *context.T) *config.Publisher {
publisher, _ := ctx.Value(publisherKey).(*config.Publisher)
return publisher
}