blob: 10ff1ad21fe1467e1fdb258388f61dfe8cd3da56 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"strings"
"text/template"
"time"
"v.io/v23/context"
"v.io/v23/conventions"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/security/access"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/ref/services/allocator"
)
const pkgPath = "v.io/x/ref/services/allocator/allocatord"
var (
errLimitExceeded = verror.Register(pkgPath+".errLimitExceeded", verror.NoRetry, "{1:}{2:} limit ({3}) exceeded")
errGlobalLimitExceeded = verror.Register(pkgPath+".errGlobalLimitExceeded", verror.NoRetry, "{1:}{2:} global limit ({3}) exceeded")
)
type allocatorImpl struct {
baseBlessings security.Blessings
baseBlessingNames []string
}
// Create creates a new instance of the service.
// It returns a handle for the new instance.
func (i *allocatorImpl) Create(ctx *context.T, call rpc.ServerCall) (string, error) {
b, _ := security.RemoteBlessingNames(ctx, call.Security())
ctx.Infof("Create() called by %v", b)
email := emailFromBlessingNames(b)
if email == "" {
return "", verror.New(verror.ErrNoAccess, ctx, "unable to determine caller's email address")
}
return create(ctx, email, i.baseBlessings, i.baseBlessingNames)
}
// Destroy destroys the instance with the given handle.
func (i *allocatorImpl) Destroy(ctx *context.T, call rpc.ServerCall, kName string) error {
b, _ := security.RemoteBlessingNames(ctx, call.Security())
ctx.Infof("Destroy(%q) called by %v", kName, b)
email := emailFromBlessingNames(b)
if email == "" {
return verror.New(verror.ErrNoAccess, ctx, "unable to determine caller's email address")
}
return destroy(ctx, email, kName)
}
// List returns a list of all the instances owned by the caller.
func (i *allocatorImpl) List(ctx *context.T, call rpc.ServerCall) ([]allocator.Instance, error) {
b, _ := security.RemoteBlessingNames(ctx, call.Security())
ctx.Infof("List() called by %v", b)
email := emailFromBlessingNames(b)
if email == "" {
return nil, verror.New(verror.ErrNoAccess, ctx, "unable to determine caller's email address")
}
return serverInstances(ctx, email)
}
func create(ctx *context.T, email string, baseBlessings security.Blessings, baseBlessingNames []string) (string, error) {
// Enforce a limit on the number of instances. These tests are a little
// bit racy. It's possible that multiple calls to create() will run
// concurrently and that we'll end up with too many instances.
if n, err := serverInstances(ctx, email); err != nil {
return "", err
} else if len(n) >= maxInstancesPerUserFlag {
return "", verror.New(errLimitExceeded, ctx, maxInstancesPerUserFlag)
}
if n, err := serverInstances(ctx, ""); err != nil {
return "", err
} else if len(n) >= maxInstancesFlag {
return "", verror.New(errGlobalLimitExceeded, ctx, maxInstancesFlag)
}
kName, err := newKubeName()
if err != nil {
return "", err
}
mName := mountNameFromKubeName(ctx, kName)
cfg, cleanup, err := createDeploymentConfig(ctx, email, kName, mName, baseBlessingNames)
defer cleanup()
if err != nil {
return "", err
}
vomBlessings, err := vom.Encode(baseBlessings)
if err != nil {
return "", err
}
if err := createPersistentDisk(ctx, kName); err != nil {
return "", err
}
if _, err := vkube(
"start", "-f", cfg,
"--base-blessings", base64.URLEncoding.EncodeToString(vomBlessings),
"--wait",
kName,
); err != nil {
ctx.Errorf("Error: %v", err)
deletePersistentDisk(ctx, kName)
return "", verror.New(verror.ErrInternal, ctx, err)
}
return kName, nil
}
func scale(ctx *context.T, email, kName string, replicas int) error {
args := []string{"kubectl", "scale", "--timeout=1m", fmt.Sprintf("--replicas=%d", replicas), "deployment", kName}
if _, err := vkube(args...); err != nil {
ctx.Errorf("Error: %v", err)
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
}
func suspendImpl(ctx *context.T, email, kName string) error {
return scale(ctx, email, kName, 0)
}
func suspend(ctx *context.T, email, kName string) error {
if err := isOwnerOfInstance(ctx, email, kName); err != nil {
return err
}
return suspendImpl(ctx, email, kName)
}
func resumeImpl(ctx *context.T, email, kName string) error {
return scale(ctx, email, kName, 1)
}
func resume(ctx *context.T, email, kName string) error {
if err := isOwnerOfInstance(ctx, email, kName); err != nil {
return err
}
return resumeImpl(ctx, email, kName)
}
func resetDisk(ctx *context.T, email, kName string) error {
instance, err := getInstance(ctx, email, kName)
if err != nil {
return err
}
if instance.Replicas > 0 {
if err := suspend(ctx, email, kName); err != nil {
return err
}
}
if err := deletePersistentDisk(ctx, kName); err != nil {
return err
}
if err := createPersistentDisk(ctx, kName); err != nil {
return err
}
if instance.Replicas > 0 {
return resume(ctx, email, kName)
}
return nil
}
func destroy(ctx *context.T, email, kName string) error {
if err := isOwnerOfInstance(ctx, email, kName); err != nil {
return err
}
cfg, cleanup, err := createDeploymentConfig(ctx, email, kName, "", nil)
defer cleanup()
if err != nil {
return err
}
if _, err := vkube("stop", "-f", cfg); err != nil {
ctx.Errorf("Error: %v", err)
return verror.New(verror.ErrInternal, ctx, err)
}
return deletePersistentDisk(ctx, kName)
}
func createDeploymentConfig(ctx *context.T, email, deploymentName, mountName string, baseBlessingNames []string) (string, func(), error) {
cleanup := func() {}
acl, err := accessList(ctx, email)
if err != nil {
return "", cleanup, err
}
blessingNames := make([]string, len(baseBlessingNames))
for i, b := range baseBlessingNames {
blessingNames[i] = b + security.ChainSeparator + deploymentName
}
creatorInfo, err := creatorInfo(ctx, email, mountName, blessingNames)
if err != nil {
return "", cleanup, err
}
t, err := template.ParseFiles(deploymentTemplateFlag)
if err != nil {
return "", cleanup, err
}
data := struct {
AccessList string
CreatorInfo string
MountName string
Name string
OwnerHash string
Version string
}{
AccessList: acl,
CreatorInfo: creatorInfo,
MountName: mountName,
Name: deploymentName,
OwnerHash: emailHash(email),
Version: serverVersionFlag,
}
f, err := ioutil.TempFile("", "allocator-deployment-")
if err != nil {
return "", cleanup, err
}
defer f.Close()
cleanup = func() { os.Remove(f.Name()) }
if err := t.Execute(f, data); err != nil {
return "", cleanup, err
}
return f.Name(), cleanup, nil
}
// accessList returns a double encoded JSON access list that can be used in a
// Deployment template that contains something like:
// "--v23.permissions.literal={\"Admin\": {{.AccessList}} }"
// The access list include the creator.
func accessList(ctx *context.T, email string) (string, error) {
var acl access.AccessList
if globalAdminsFlag != "" {
for _, admin := range strings.Split(globalAdminsFlag, ",") {
acl.In = append(acl.In, security.BlessingPattern(admin))
}
}
for _, blessing := range conventions.ParseBlessingNames(blessingNamesFromEmail(email)...) {
acl.In = append(acl.In, blessing.UserPattern())
}
j, err := json.Marshal(acl)
if err != nil {
ctx.Errorf("json.Marshal(%#v) failed: %v", acl, err)
return "", err
}
// JSON encode again, because the access list is in a JSON template.
str := string(j)
j, err = json.Marshal(str)
if err != nil {
ctx.Errorf("json.Marshal(%#v) failed: %v", str, err)
return "", err
}
// Remove the quotes.
return string(j[1 : len(j)-1]), nil
}
type creatorInfoData struct {
Email string `json:"email"`
BlessingNames []string `json:"blessingNames"`
MountName string `json:"mountName"`
}
// creatorInfo returns a double encoded JSON access list that can be used as
// annotation in a Deployment template, e.g.
// "annotations": {
// "v.io/allocator-creator-info": {{.CreatorInfo}}
// }
func creatorInfo(ctx *context.T, email, mountName string, blessingNames []string) (string, error) {
j, err := json.Marshal(creatorInfoData{email, blessingNames, mountName})
if err != nil {
ctx.Errorf("json.Marshal() failed: %v", err)
return "", err
}
// JSON encode again, because the annotation is in a JSON template.
str := string(j)
j, err = json.Marshal(str)
if err != nil {
ctx.Errorf("json.Marshal(%#v) failed: %v", str, err)
return "", err
}
return string(j), nil
}
func decodeCreatorInfo(s string) (data creatorInfoData, err error) {
err = json.Unmarshal([]byte(s), &data)
return
}
func emailHash(email string) string {
h := sha1.Sum([]byte(email))
return hex.EncodeToString(h[:])
}
func serverInstances(ctx *context.T, email string) ([]allocator.Instance, error) {
args := []string{"kubectl", "get", "deployments", "-o", "json"}
if email != "" {
args = append(args, "-l", "ownerHash="+emailHash(email))
}
var out []byte
var err error
for retry := 0; retry < 10; retry++ {
if out, err = vkube(args...); err == nil {
break
}
time.Sleep(time.Second)
}
if err != nil {
return nil, err
}
var list struct {
Items []struct {
Metadata struct {
Name string `json:"name"`
CreationTime time.Time `json:"creationTimestamp"`
Annotations map[string]string `json:"annotations"`
} `json:"metadata"`
Spec struct {
Replicas int32 `json:"replicas"`
Template struct {
Metadata struct {
Labels struct {
Version string `json:"version"`
} `json:"labels"`
} `json:"metadata"`
} `json:"template"`
} `json:"spec"`
} `json:"items"`
}
if err := json.Unmarshal(out, &list); err != nil {
return nil, err
}
instances := []allocator.Instance{}
for _, l := range list.Items {
if !strings.HasPrefix(l.Metadata.Name, serverNameFlag+"-") {
continue
}
cInfo, err := decodeCreatorInfo(l.Metadata.Annotations["v.io/allocator-creator-info"])
if err != nil {
ctx.Errorf("decodeCreatorInfo failed: %v", err)
continue
}
instances = append(instances, allocator.Instance{
Handle: l.Metadata.Name,
MountName: cInfo.MountName,
BlessingNames: cInfo.BlessingNames,
CreationTime: l.Metadata.CreationTime,
Replicas: l.Spec.Replicas,
Version: l.Spec.Template.Metadata.Labels.Version,
})
}
return instances, nil
}
func getInstance(ctx *context.T, email, kName string) (allocator.Instance, error) {
instances, err := serverInstances(ctx, email)
if err != nil {
return allocator.Instance{}, err
}
for _, i := range instances {
if i.Handle == kName {
return i, nil
}
}
return allocator.Instance{}, verror.New(verror.ErrNoExistOrNoAccess, nil)
}
func isOwnerOfInstance(ctx *context.T, email, kName string) error {
_, err := getInstance(ctx, email, kName)
return err
}
func createPersistentDisk(ctx *context.T, name string) error {
if out, err := gcloud("compute", "disks", "create", name, "--size", diskSizeFlag); err != nil {
ctx.Errorf("disk creation failed: %v: %s", err, string(out))
return err
}
return nil
}
func deletePersistentDisk(ctx *context.T, name string) error {
var (
start = time.Now()
out []byte
err error
)
for time.Since(start) < 5*time.Minute {
if out, err = gcloud("compute", "disks", "delete", name); err == nil {
return nil
}
time.Sleep(time.Second)
}
ctx.Errorf("disk deletion failed: %v: %s", err, string(out))
return err
}
func gcloud(args ...string) ([]byte, error) {
data, err := ioutil.ReadFile(vkubeCfgFlag)
if err != nil {
return nil, err
}
var config struct {
Project string `json:"project"`
Zone string `json:"zone"`
}
if err := json.Unmarshal(data, &config); err != nil {
return nil, err
}
args = append(args, "--project", config.Project, "--zone", config.Zone)
return exec.Command(gcloudBinFlag, args...).CombinedOutput()
}
func vkube(args ...string) (out []byte, err error) {
args = append(
[]string{
"--config=" + vkubeCfgFlag,
"--kubectl=" + kubectlBinFlag,
"--no-headers",
},
args...,
)
out, err = exec.Command(vkubeBinFlag, args...).CombinedOutput()
if err != nil {
err = fmt.Errorf("vkube(%v) failed: %v\n%s\n", args, err, string(out))
}
return
}