blob: d28fcd37bcd610f31c790082cdff89cd4f29a756 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rt
import (
"fmt"
"os"
"os/signal"
"path/filepath"
"strings"
"sync/atomic"
"syscall"
"time"
"v.io/x/lib/metadata"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/flow"
"v.io/v23/i18n"
"v.io/v23/namespace"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
"v.io/v23/vtrace"
"v.io/x/ref/internal/logger"
"v.io/x/ref/lib/apilog"
idiscovery "v.io/x/ref/lib/discovery"
"v.io/x/ref/lib/flags"
"v.io/x/ref/lib/pubsub"
"v.io/x/ref/lib/stats"
_ "v.io/x/ref/lib/stats/sysstats"
"v.io/x/ref/runtime/internal/flow/manager"
"v.io/x/ref/runtime/internal/lib/dependency"
inamespace "v.io/x/ref/runtime/internal/naming/namespace"
irpc "v.io/x/ref/runtime/internal/rpc"
ivtrace "v.io/x/ref/runtime/internal/vtrace"
)
type contextKey int
const (
clientKey = contextKey(iota)
namespaceKey
principalKey
backgroundKey
reservedNameKey
listenKey
// initKey is used to store values that are only set at init time.
initKey
)
func init() {
metadata.Insert("v23.RPCEndpointVersion", fmt.Sprint(naming.DefaultEndpointVersion))
}
var (
errDiscoveryNotInitialized = verror.Register(pkgPath+".errDiscoveryNotInitialized", verror.NoRetry, "{1:}{2:} discovery not initialized")
)
var setPrincipalCounter int32 = -1
type initData struct {
appCycle v23.AppCycle
discoveryFactory idiscovery.Factory
namespaceFactory inamespace.Factory
protocols []string
settingsPublisher *pubsub.Publisher
connIdleExpiry time.Duration
}
type vtraceDependency struct{}
// Runtime implements the v23.Runtime interface.
// Please see the interface definition for documentation of the
// individiual methods.
type Runtime struct {
ctx *context.T
deps *dependency.Graph
}
func Init(
ctx *context.T,
appCycle v23.AppCycle,
discoveryFactory idiscovery.Factory,
namespaceFactory inamespace.Factory,
protocols []string,
listenSpec *rpc.ListenSpec,
settingsPublisher *pubsub.Publisher,
flags flags.RuntimeFlags,
reservedDispatcher rpc.Dispatcher,
connIdleExpiry time.Duration) (*Runtime, *context.T, v23.Shutdown, error) {
r := &Runtime{deps: dependency.NewGraph()}
ctx = context.WithValue(ctx, initKey, &initData{
appCycle: appCycle,
discoveryFactory: discoveryFactory,
namespaceFactory: namespaceFactory,
protocols: protocols,
settingsPublisher: settingsPublisher,
connIdleExpiry: connIdleExpiry,
})
if listenSpec != nil {
ctx = context.WithValue(ctx, listenKey, listenSpec.Copy())
}
if reservedDispatcher != nil {
ctx = context.WithValue(ctx, reservedNameKey, reservedDispatcher)
}
// Configure the context to use the global logger.
ctx = context.WithLogger(ctx, logger.Global())
// We want to print out metadata only into the log files, to avoid
// spamming stderr, see #1246.
//
// TODO(caprita): We should add it to the log file header information;
// since that requires changes to the llog and vlog packages, for now we
// condition printing of metadata on having specified an explicit
// log_dir for the program. It's a hack, but it gets us the metadata
// to device manager-run apps and avoids it for command-lines, which is
// a good enough approximation.
if logger.Manager(ctx).LogDir() != os.TempDir() {
ctx.Infof(metadata.ToXML())
}
// Setup the initial trace.
ctx, err := ivtrace.Init(ctx, flags.Vtrace)
if err != nil {
return nil, nil, nil, err
}
ctx, _ = vtrace.WithNewTrace(ctx)
r.addChild(ctx, vtraceDependency{}, func() {
vtrace.FormatTraces(os.Stderr, vtrace.GetStore(ctx).TraceRecords(), nil)
})
ctx = context.WithContextLogger(ctx, &ivtrace.VTraceLogger{})
// Setup i18n.
ctx = i18n.WithLangID(ctx, i18n.LangIDFromEnv())
if len(flags.I18nCatalogue) != 0 {
cat := i18n.Cat()
for _, filename := range strings.Split(flags.I18nCatalogue, ",") {
err := cat.MergeFromFile(filename)
if err != nil {
fmt.Fprintf(os.Stderr, "%s: i18n: error reading i18n catalogue file %q: %s\n", os.Args[0], filename, err)
}
}
}
// Setup the program name.
ctx = verror.WithComponentName(ctx, filepath.Base(os.Args[0]))
// Enable signal handling.
r.initSignalHandling(ctx)
// Set the initial namespace.
ctx, _, err = r.setNewNamespace(ctx, flags.NamespaceRoots...)
if err != nil {
return nil, nil, nil, err
}
// Create and set the principal
principal, shutdown, err := r.initPrincipal(ctx, flags.Credentials)
if err != nil {
return nil, nil, nil, err
}
ctx, err = r.setPrincipal(ctx, principal, shutdown)
if err != nil {
return nil, nil, nil, err
}
// Add the Client to the context.
ctx, _, err = r.WithNewClient(ctx)
if err != nil {
return nil, nil, nil, err
}
r.ctx = ctx
return r, r.WithBackgroundContext(ctx), r.shutdown, nil
}
func (r *Runtime) addChild(ctx *context.T, me interface{}, stop func(), dependsOn ...interface{}) error {
if err := r.deps.Depend(me, dependsOn...); err != nil {
stop()
return err
} else if done := ctx.Done(); done != nil {
go func() {
<-done
finish := r.deps.CloseAndWait(me)
stop()
finish()
}()
}
return nil
}
func (r *Runtime) Init(ctx *context.T) error {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
return r.initMgmt(ctx)
}
func (r *Runtime) shutdown() {
r.deps.CloseAndWaitForAll()
r.ctx.FlushLog()
}
func (r *Runtime) initSignalHandling(ctx *context.T) {
// TODO(caprita): Given that our device manager implementation is to
// kill all child apps when the device manager dies, we should
// enable SIGHUP on apps by default.
// Automatically handle SIGHUP to prevent applications started as
// daemons from being killed. The developer can choose to still listen
// on SIGHUP and take a different action if desired.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGHUP)
go func() {
for {
sig, ok := <-signals
if !ok {
break
}
ctx.Infof("Received signal %v", sig)
}
}()
r.addChild(ctx, signals, func() {
signal.Stop(signals)
close(signals)
})
}
func (r *Runtime) setPrincipal(ctx *context.T, principal security.Principal, shutdown func()) (*context.T, error) {
stop := shutdown
if principal != nil {
// Uniquely identify blessingstore and blessingroots with
// security/principal/<publicKey>/(blessingstore|blessingroots)/<counter>.
// Make sure to stop exporting the stats when the context dies.
var (
counter = atomic.AddInt32(&setPrincipalCounter, 1)
prefix = "security/principal/" + principal.PublicKey().String()
store = fmt.Sprintf("%s/blessingstore/%d", prefix, counter)
roots = fmt.Sprintf("%s/blessingroots/%d", prefix, counter)
)
stats.NewStringFunc(store, principal.BlessingStore().DebugString)
stats.NewStringFunc(roots, principal.Roots().DebugString)
stop = func() {
if shutdown != nil {
shutdown()
}
stats.Delete(store)
stats.Delete(roots)
}
}
ctx = context.WithValue(ctx, principalKey, principal)
return ctx, r.addChild(ctx, principal, stop)
}
func (r *Runtime) WithPrincipal(ctx *context.T, principal security.Principal) (*context.T, error) {
defer apilog.LogCallf(ctx, "principal=%v", principal)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
var err error
newctx := ctx
// TODO(mattr, suharshs): If there user gives us some principal that has dependencies
// we don't know about, we will not honour those dependencies during shutdown.
// For example if they create an agent principal with some client, we don't know
// about that, so servers based of this new principal will not prevent the client
// from terminating early.
if newctx, err = r.setPrincipal(ctx, principal, func() {}); err != nil {
return ctx, err
}
if newctx, _, err = r.setNewNamespace(newctx, r.GetNamespace(ctx).Roots()...); err != nil {
return ctx, err
}
if newctx, _, err = r.WithNewClient(newctx); err != nil {
return ctx, err
}
return newctx, nil
}
func (*Runtime) GetPrincipal(ctx *context.T) security.Principal {
// nologcall
p, _ := ctx.Value(principalKey).(security.Principal)
return p
}
func (r *Runtime) WithNewClient(ctx *context.T, opts ...rpc.ClientOpt) (*context.T, rpc.Client, error) {
defer apilog.LogCallf(ctx, "opts...=%v", opts)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
otherOpts := append([]rpc.ClientOpt{}, opts...)
p, _ := ctx.Value(principalKey).(security.Principal)
id, _ := ctx.Value(initKey).(*initData)
if id.protocols != nil {
otherOpts = append(otherOpts, irpc.PreferredProtocols(id.protocols))
}
if id.connIdleExpiry > 0 {
otherOpts = append(otherOpts, irpc.IdleConnectionExpiry(id.connIdleExpiry))
}
deps := []interface{}{vtraceDependency{}}
client := irpc.NewClient(ctx, otherOpts...)
newctx := context.WithValue(ctx, clientKey, client)
if p != nil {
deps = append(deps, p)
}
if err := r.addChild(ctx, client, client.Close, deps...); err != nil {
return ctx, nil, err
}
return newctx, client, nil
}
func (*Runtime) GetClient(ctx *context.T) rpc.Client {
// nologcall
cl, _ := ctx.Value(clientKey).(rpc.Client)
return cl
}
func (r *Runtime) setNewNamespace(ctx *context.T, roots ...string) (*context.T, namespace.T, error) {
id, _ := ctx.Value(initKey).(*initData)
var ns namespace.T
var err error
if ns, err = inamespace.New(roots...); err != nil {
return nil, nil, err
}
if id.namespaceFactory != nil {
if ns, err = id.namespaceFactory(ctx, ns, roots...); err != nil {
return nil, nil, err
}
}
if oldNS := r.GetNamespace(ctx); oldNS != nil {
ns.CacheCtl(oldNS.CacheCtl()...)
}
ctx = context.WithValue(ctx, namespaceKey, ns)
return ctx, ns, err
}
func (r *Runtime) WithNewNamespace(ctx *context.T, roots ...string) (*context.T, namespace.T, error) {
defer apilog.LogCallf(ctx, "roots...=%v", roots)(ctx, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
newctx, ns, err := r.setNewNamespace(ctx, roots...)
if err != nil {
return ctx, nil, err
}
// Replace the client since it depends on the namespace.
newctx, _, err = r.WithNewClient(newctx)
if err != nil {
return ctx, nil, err
}
return newctx, ns, err
}
func (*Runtime) GetNamespace(ctx *context.T) namespace.T {
// nologcall
ns, _ := ctx.Value(namespaceKey).(namespace.T)
return ns
}
func (*Runtime) GetAppCycle(ctx *context.T) v23.AppCycle {
// nologcall
id, _ := ctx.Value(initKey).(*initData)
return id.appCycle
}
func (*Runtime) GetListenSpec(ctx *context.T) rpc.ListenSpec {
// nologcall
ls, _ := ctx.Value(listenKey).(rpc.ListenSpec)
return ls
}
func (*Runtime) WithListenSpec(ctx *context.T, ls rpc.ListenSpec) *context.T {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
return context.WithValue(ctx, listenKey, ls.Copy())
}
func (*Runtime) WithBackgroundContext(ctx *context.T) *context.T {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
// Note we add an extra context with a nil value here.
// This prevents users from travelling back through the
// chain of background contexts.
ctx = context.WithValue(ctx, backgroundKey, nil)
return context.WithValue(ctx, backgroundKey, ctx)
}
func (*Runtime) GetBackgroundContext(ctx *context.T) *context.T {
// nologcall
bctx, _ := ctx.Value(backgroundKey).(*context.T)
if bctx == nil {
// There should always be a background context. If we don't find
// it, that means that the user passed us the background context
// in hopes of following the chain. Instead we just give them
// back what they sent in, which is correct.
return ctx
}
return bctx
}
func (*Runtime) NewDiscovery(ctx *context.T) (discovery.T, error) {
// nologcall
id, _ := ctx.Value(initKey).(*initData)
if id.discoveryFactory != nil {
return id.discoveryFactory.New(ctx)
}
return nil, verror.New(errDiscoveryNotInitialized, ctx)
}
func (*Runtime) WithReservedNameDispatcher(ctx *context.T, d rpc.Dispatcher) *context.T {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
return context.WithValue(ctx, reservedNameKey, d)
}
func (*Runtime) GetReservedNameDispatcher(ctx *context.T) rpc.Dispatcher {
// nologcall
if d, ok := ctx.Value(reservedNameKey).(rpc.Dispatcher); ok {
return d
}
return nil
}
func (r *Runtime) NewFlowManager(ctx *context.T, channelTimeout time.Duration) (flow.Manager, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
rid, err := naming.NewRoutingID()
if err != nil {
return nil, err
}
id, _ := ctx.Value(initKey).(*initData)
return manager.New(ctx, rid, id.settingsPublisher, channelTimeout, id.connIdleExpiry, nil), nil
}
func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, []rpc.ServerOpt, error) {
otherOpts := append([]rpc.ServerOpt{}, opts...)
if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
Dispatcher: reservedDispatcher,
})
}
id, _ := ctx.Value(initKey).(*initData)
if id.protocols != nil {
otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
}
if id.connIdleExpiry > 0 {
otherOpts = append(otherOpts, irpc.IdleConnectionExpiry(id.connIdleExpiry))
}
return id.settingsPublisher, otherOpts, nil
}
func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
spub, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
return ctx, nil, err
}
newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, opts...)
if err != nil {
return ctx, nil, err
}
if err = r.addChild(ctx, s, func() { <-s.Closed() }); err != nil {
return ctx, nil, err
}
return newctx, s, nil
}
func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
spub, opts, err := r.commonServerInit(ctx, opts...)
if err != nil {
return ctx, nil, err
}
newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, opts...)
if err != nil {
return ctx, nil, err
}
if err = r.addChild(ctx, s, func() { <-s.Closed() }); err != nil {
return ctx, nil, err
}
return newctx, s, nil
}