blob: 453c6695fca8060b438730a289999e524c911afe [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package control starts a number of syncbase instances and configures their
// connectivity based on a model.
//
// Usage:
//
// // Create a new universe model.
// u := model.GenerateUniverse(universeOpts)
//
// // Run the universe model on a new controller.
// c := NewController(controllerOpts)
// err := c.Run(u)
//
// // Then update universe however you want, and run it on the controller.
// u = ...
// err := c.Run(u)
//
// // Pause all clients.
// err := c.Pause()
//
// // Run checker.
// err := c.RunChecker(Checker)
//
// // Resume all clients.
// err := c.Resume()
//
// // Tear it all down.
// err := c.TearDown()
package control
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/security"
"v.io/x/lib/gosh"
vsecurity "v.io/x/ref/lib/security"
"v.io/x/ref/runtime/protocols/vine"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/services/syncbase/longevity_tests/checker"
"v.io/x/ref/services/syncbase/longevity_tests/client"
"v.io/x/ref/services/syncbase/longevity_tests/model"
"v.io/x/ref/services/syncbase/longevity_tests/mounttabled"
"v.io/x/ref/test/testutil"
)
var (
mounttabledMain = gosh.RegisterFunc("mounttabledMain", mounttabled.Main)
)
// Controller handles starting and stopping syncbase instances according to
// model universes.
type Controller struct {
// Context used to send RPCs to vine servers.
ctx *context.T
// Context used by Checkers to verify syncbase data.
checkerCtx *context.T
// identityProvider used to bless mounttable and all syncbase instances.
identityProvider *testutil.IDProvider
// Map of syncbase instance names to instances.
instances map[string]*instance
instancesMu sync.Mutex // Locks instances.
// Gosh command for mounttabled process. Will be nil if mounttabled is not
// running.
mtCmd *gosh.Cmd
// Namespace root of all instances.
namespaceRoot string
// Directory containing all instance working directories.
rootDir string
// Gosh shell used to spawn all processes.
sh *gosh.Shell
// Name of the controller's vine server. Even though the controller only
// acts as a client, it needs to run a vine server so that other servers
// can be configured to allow RPCs from the controller.
vineName string
// Universe model that the controller is currently simulating.
universe *model.Universe
// vineShutdown shutdown the vine server and vine plugin.
vineShutdown func()
}
// Opts is used to configure the Controller.
type Opts struct {
// Directory that will contain all instance output.
RootDir string
// Send all child process output to stdout/stderr. Usefull for tests.
DebugOutput bool
// Tests should pass their testing.TB instance, non-tests should pass nil.
TB gosh.TB
}
// NewController returns a new Controller configured by opts.
func NewController(ctx *context.T, opts Opts) (*Controller, error) {
sh := gosh.NewShell(opts.TB)
if opts.DebugOutput {
sh.PropagateChildOutput = true
}
c := &Controller{
instances: make(map[string]*instance),
rootDir: opts.RootDir,
sh: sh,
vineName: "vine-controller",
}
if err := c.init(ctx); err != nil {
return nil, err
}
return c, nil
}
// init initializes the controller.
func (c *Controller) init(ctx *context.T) error {
// Initialize rootDir.
if c.rootDir == "" {
return fmt.Errorf("controller rootDir must not be empty")
}
if err := os.MkdirAll(c.rootDir, 0755); err != nil {
return err
}
// Capture output of "vmstat".
if err := c.startVmstat(); err != nil {
return err
}
// Initialize identity provider.
c.identityProvider = testutil.NewIDProvider("root")
// Start mounttabled.
if err := c.startMounttabled(); err != nil {
return err
}
// Configure context for controller.
ctx, err := c.setContextBlessings(ctx, "controller")
if err != nil {
return err
}
// Set proper namespace root.
ctx, _, err = v23.WithNewNamespace(ctx, c.namespaceRoot)
if err != nil {
return err
}
// Modify ctx to use vine protocol.
ctx, c.vineShutdown, err = vine.Init(ctx, c.vineName, security.AllowEveryone(), c.vineName, 0)
if err != nil {
return err
}
c.ctx = ctx
// Configure context for checker.
if checkerCtx, err := c.setContextBlessings(ctx, "checker"); err != nil {
return err
} else {
c.checkerCtx = checkerCtx
}
return nil
}
// TearDown stops all instances and the root mounttable.
func (c *Controller) TearDown() error {
var retErr error
c.instancesMu.Lock()
// Stop all instance clients.
for _, inst := range c.instances {
err := inst.stopClients()
if err != nil && retErr == nil {
retErr = err
}
}
// Give syncbases a few seconds to finish processing any operations (like
// syncing).
time.Sleep(2 * time.Second)
// Stop all instance syncbases.
for _, inst := range c.instances {
err := inst.stopSyncbase()
if err != nil && retErr == nil {
retErr = err
}
}
c.instancesMu.Unlock()
c.vineShutdown()
c.sh.Cleanup()
return retErr
}
// Run runs a mounttable and a number of syncbased instances with connectivity
// defined by the given model.
// Run can be called multiple times, and will update the running syncbases to
// reflect the model. Any running instances not contained in the model will be
// stopped. Any non-running instances contained in the model will be started.
func (c *Controller) Run(u *model.Universe) error {
// allDevices keeps track of all devices in the model.
allDevices := model.DeviceSet{}
// Update all devices owned by each user. If a device is not currently
// running, it will be started.
for _, user := range u.Users {
allDevices = append(allDevices, user.Devices...)
if err := c.updateInstancesForUser(user); err != nil {
return err
}
}
// Return an error if there are running devices that are not in the model.
// Universes are not allowed to shrink.
// TODO(nlacasse): Re-evaluate this rule once we have more examples of
// longevity tests.
c.instancesMu.Lock()
for _, inst := range c.instances {
inModel := false
for _, d := range allDevices {
if inst.name == d.Name {
inModel = true
break
}
}
if !inModel {
c.instancesMu.Unlock()
return fmt.Errorf("device %s is running but not in given model %v", inst.name, u)
}
}
c.instancesMu.Unlock()
if err := c.updateTopology(u.Topology); err != nil {
return err
}
c.universe = u
return nil
}
// RunChecker runs the given Checker.
func (c *Controller) RunChecker(checker checker.Checker) error {
return checker.Run(c.checkerCtx, *c.universe)
}
// PauseUniverse stops all clients but leaves syncbases running.
func (c *Controller) PauseUniverse() error {
var retErr error
c.instancesMu.Lock()
defer c.instancesMu.Unlock()
for _, inst := range c.instances {
err := inst.stopClients()
if err != nil && retErr == nil {
retErr = err
}
}
return retErr
}
// ResumeUniverse restarts all clients.
func (c *Controller) ResumeUniverse() error {
var retErr error
c.instancesMu.Lock()
defer c.instancesMu.Unlock()
for _, inst := range c.instances {
err := inst.startClients()
if err != nil && retErr == nil {
retErr = err
}
}
return retErr
}
func (c *Controller) updateInstancesForUser(user *model.User) error {
c.instancesMu.Lock()
defer c.instancesMu.Unlock()
for _, d := range user.Devices {
// If instance is already running, update it to match model.
if inst, ok := c.instances[d.Name]; ok {
inst.update(d)
continue
}
// Otherwise start a new instance for the model.
inst, err := c.newInstance(user, d)
if err != nil {
return err
}
if err := inst.start(c.ctx); err != nil {
return err
}
c.instances[inst.name] = inst
}
return nil
}
func (c *Controller) updateTopology(top model.Topology) error {
c.instancesMu.Lock()
defer c.instancesMu.Unlock()
// Start with empty topology map.
behaviorMap := map[vine.PeerKey]vine.PeerBehavior{}
// Set the reachable pairs based on the given topology.
for clientDevice, serviceDevices := range top {
clientInst, ok := c.instances[clientDevice.Name]
if !ok {
return fmt.Errorf("no instance associated with device %v", clientDevice)
}
for _, serviceDevice := range serviceDevices {
serviceInst, ok := c.instances[serviceDevice.Name]
if !ok {
return fmt.Errorf("no instance associated with device %v", clientDevice)
}
peerKey := vine.PeerKey{
Dialer: clientInst.vineName,
Acceptor: serviceInst.vineName,
}
behaviorMap[peerKey] = vine.PeerBehavior{Reachable: true}
}
}
// Allow the controller to access every device.
for _, inst := range c.instances {
pk1 := vine.PeerKey{
Dialer: c.vineName,
Acceptor: inst.vineName,
}
behaviorMap[pk1] = vine.PeerBehavior{Reachable: true}
pk2 := vine.PeerKey{
Dialer: inst.vineName,
Acceptor: c.vineName,
}
behaviorMap[pk2] = vine.PeerBehavior{Reachable: true}
}
// Set the behavior on each instance.
for _, inst := range c.instances {
vineClient := vine.VineClient(inst.vineName)
if err := vineClient.SetBehaviors(c.ctx, behaviorMap); err != nil {
return err
}
}
// Set the controller's behavior.
vineClient := vine.VineClient(c.vineName)
if err := vineClient.SetBehaviors(c.ctx, behaviorMap); err != nil {
return err
}
return nil
}
// newInstance creates a new instance corresponding to the given device model.
func (c *Controller) newInstance(user *model.User, d *model.Device) (*instance, error) {
wd := filepath.Join(c.rootDir, "instances", d.Name)
if err := os.MkdirAll(wd, 0755); err != nil {
return nil, err
}
// Create persistent principal for syncbase instance on device.
credsDir := filepath.Join(wd, "credentials")
instancePrincipal, err := vsecurity.CreatePersistentPrincipal(credsDir, nil)
if err != nil {
return nil, err
}
// Bless instance's principal with name root:u:<user>:<device>.
blessingName := strings.Join([]string{"u", user.Name, d.Name}, security.ChainSeparator)
if err := c.identityProvider.Bless(instancePrincipal, blessingName); err != nil {
return nil, err
}
clients := make([]client.Client, len(d.Clients))
for i, clientType := range d.Clients {
f := LookupClient(clientType)
if f == nil {
return nil, fmt.Errorf("no client for type %q", clientType)
}
clients[i] = f()
}
vineName := "vine-" + d.Name
inst := &instance{
clients: clients,
credsDir: credsDir,
databases: d.Databases,
name: d.Name,
namespaceRoot: c.namespaceRoot,
principal: instancePrincipal,
vineName: vineName,
sh: c.sh,
wd: wd,
}
return inst, nil
}
// startMounttabled starts mounttabled and sets c.namespaceRoot with
// mounttabled's endpoint.
func (c *Controller) startMounttabled() error {
mtDir := filepath.Join(c.rootDir, "mounttabled")
mtCreds := filepath.Join(mtDir, "credentials")
if err := os.MkdirAll(mtCreds, 0755); err != nil {
return err
}
// Create blessings for mounttable.
p, err := vsecurity.CreatePersistentPrincipal(mtCreds, nil)
if err != nil {
return err
}
if err = c.identityProvider.Bless(p, "mounttable"); err != nil {
return err
}
// Start mounttable and set namespace root.
opts := mounttablelib.Opts{}
c.mtCmd = c.sh.FuncCmd(mounttabledMain, opts)
c.mtCmd.Args = append(c.mtCmd.Args,
"--log_dir="+mtDir,
"--v23.credentials="+mtCreds,
"--vpath=mounttable=3,namespace=3",
)
c.mtCmd.Start()
vars := c.mtCmd.AwaitVars("NAME")
name := vars["NAME"]
if name == "" {
return fmt.Errorf("empty NAME variable returned by mounttable")
}
c.namespaceRoot = name
return nil
}
// setContextBlessings returns a new context based off the given one, with a new
// principal that is blessed by the controller's identity provider.
func (c *Controller) setContextBlessings(ctx *context.T, blessingName string) (*context.T, error) {
// Create a principal blessed by the identity provider.
p := testutil.NewPrincipal()
if err := c.identityProvider.Bless(p, blessingName); err != nil {
return nil, err
}
newCtx, err := v23.WithPrincipal(ctx, p)
if err != nil {
return nil, err
}
return newCtx, nil
}
// startVmstat writes a timestamp and vmstat output to a log file every second.
func (c *Controller) startVmstat() error {
logFilePath := filepath.Join(c.rootDir, "vmstat.log")
// Open logfile in append-mode.
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
return err
}
cmd := c.sh.Cmd("bash", "-c", "vmstat -n -w 1 | awk '{ print strftime(\"%c, %Z\"), $0; fflush(); }'")
outPipe := cmd.StdoutPipe()
go func() {
io.Copy(logFile, outPipe)
logFile.Close()
}()
cmd.Start()
return nil
}