blob: a7f14e31a232876547817245471a3a3349621d64 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"fmt"
"io"
"os/exec"
"strings"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/security"
"v.io/v23/services/device"
)
const (
clusterAgentServiceName = "cluster-agent"
clusterAgentServicePort = 8193
clusterAgentApplicationName = "cluster-agentd"
)
func makeClusterAgentObject(config *vkubeConfig, rootBlessings string) object {
version := "latest"
if p := strings.Split(config.ClusterAgent.Image, ":"); len(p) == 2 {
version = p[1]
}
ca := object{
"apiVersion": "extensions/v1beta1",
"kind": "Deployment",
"metadata": object{
"name": clusterAgentApplicationName,
"labels": object{
"application": clusterAgentApplicationName,
},
"namespace": config.ClusterAgent.Namespace,
},
"spec": object{
"replicas": 1,
"selector": object{
"matchLabels": object{
"application": clusterAgentApplicationName,
},
},
"minReadySeconds": 60,
"revisionHistoryLimit": 10,
"strategy": object{
"type": "Recreate",
},
"template": object{
"metadata": object{
"labels": object{
"application": clusterAgentApplicationName,
"version": version,
},
},
"spec": object{
"containers": []object{
object{
"name": "cluster-agentd",
"image": config.ClusterAgent.Image,
"imagePullPolicy": "Always",
"ports": []object{
object{
"containerPort": clusterAgentServicePort,
},
},
"livenessProbe": object{
"exec": object{
"command": []string{
"vrpc", "identify", fmt.Sprintf("/localhost:%d", clusterAgentServicePort),
},
},
"initialDelaySeconds": 5,
"timeoutSeconds": 1,
},
"readinessProbe": object{
"exec": object{
"command": []string{
"vrpc", "identify", fmt.Sprintf("/localhost:%d", clusterAgentServicePort),
},
},
"initialDelaySeconds": 5,
"timeoutSeconds": 1,
},
"resources": object{
"limits": object{
"cpu": config.ClusterAgent.CPU,
"memory": config.ClusterAgent.Memory,
},
},
"volumeMounts": []object{
object{
"name": "data",
"mountPath": "/data",
},
object{
"name": "logs",
"mountPath": "/logs",
},
},
"env": []object{
object{
"name": "ROOT_BLESSINGS",
"value": rootBlessings,
},
object{
"name": "CLAIMER",
"value": clusterAgentClaimer(config),
},
object{
"name": "ADMIN",
"value": config.ClusterAgent.Admin,
},
object{
"name": "DATADIR",
"value": "/data",
},
object{
"name": "LOGDIR",
"value": "/logs",
},
},
},
},
"volumes": []interface{}{
object{
"name": "logs",
"emptyDir": object{},
},
},
},
},
},
}
switch {
case config.ClusterAgent.PersistentDisk != "":
ca.append("spec.template.spec.volumes", object{
"name": "data",
"gcePersistentDisk": object{
"pdName": config.ClusterAgent.PersistentDisk,
"fsType": "ext4",
},
})
case config.ClusterAgent.PersistentVolumeClaim != "":
ca.append("spec.template.spec.volumes", object{
"name": "data",
"persistentVolumeClaim": object{
"claimName": config.ClusterAgent.PersistentVolumeClaim,
},
})
default:
ca.append("spec.template.spec.volumes", object{
"name": "data",
"emptyDir": object{},
})
}
return ca
}
// createClusterAgent creates a Deployment and a Service to run the cluster
// agent.
func createClusterAgent(ctx *context.T, config *vkubeConfig) error {
if err := createNamespaceIfNotExist(config.ClusterAgent.Namespace); err != nil {
return err
}
ca := makeClusterAgentObject(config, rootBlessings(ctx))
if out, err := kubectlCreate(ca, "--record"); err != nil {
return fmt.Errorf("failed to create deployment: %v\n%s\n", err, string(out))
}
svc := object{
"apiVersion": "v1",
"kind": "Service",
"metadata": object{
"name": clusterAgentServiceName,
"namespace": config.ClusterAgent.Namespace,
},
"spec": object{
"ports": []object{
object{
"port": clusterAgentServicePort,
"targetPort": clusterAgentServicePort,
},
},
"selector": object{
"application": clusterAgentApplicationName,
},
"type": "LoadBalancer",
},
}
if config.ClusterAgent.InternalOnly {
svc.set("spec.type", "ClusterIP")
} else if config.ClusterAgent.ExternalIP != "" {
svc.set("spec.loadBalancerIP", config.ClusterAgent.ExternalIP)
}
if out, err := kubectlCreate(svc); err != nil {
return fmt.Errorf("failed to create service: %v\n%s\n", err, string(out))
}
return nil
}
// stopClusterAgent stops the cluster agent Deployment and deletes its Service.
func stopClusterAgent(config *vkubeConfig) error {
if out, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "delete", "deployment", clusterAgentApplicationName); err != nil {
return fmt.Errorf("failed to stop %s: %v: %s", clusterAgentApplicationName, err, out)
}
if out, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "delete", "service", clusterAgentServiceName); err != nil {
return fmt.Errorf("failed to delete %s: %v: %s", clusterAgentServiceName, err, out)
}
return nil
}
// updateClusterAgent updates the Deployment of an existing cluster agent.
func updateClusterAgent(config *vkubeConfig, stdout, stderr io.Writer) error {
data, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "get", "deployment", clusterAgentApplicationName, "-o", "json")
if err != nil {
return fmt.Errorf("failed to get deployment %q: %v\n%s\n", clusterAgentApplicationName, err, string(data))
}
var deployment object
if err := deployment.importJSON(data); err != nil {
return fmt.Errorf("failed to parse kubectl output: %v", err)
}
// Find root blessings.
containers := deployment.getObjectArray("spec.template.spec.containers")
if len(containers) != 1 {
return fmt.Errorf("unexpected number of containers in cluster agent: %d", len(containers))
}
var root string
for _, e := range containers[0].getObjectArray("env") {
if e.getString("name") == "ROOT_BLESSINGS" {
root = e.getString("value")
break
}
}
json, err := makeClusterAgentObject(config, root).json()
if err != nil {
return err
}
cmd := exec.Command(flagKubectlBin, "replace", "--record", "-f", "-", "--namespace="+config.ClusterAgent.Namespace)
cmd.Stdin = bytes.NewBuffer(json)
cmd.Stdout = stdout
cmd.Stderr = stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to update deployment %q: %v\n", clusterAgentApplicationName, err)
}
return nil
}
// clusterAgentClaimer returns the blessing name of the claimer of the cluster
// agent.
func clusterAgentClaimer(config *vkubeConfig) string {
p := strings.Split(config.ClusterAgent.Blessing, security.ChainSeparator)
return strings.Join(p[:len(p)-1], security.ChainSeparator)
}
// findClusterAgent returns the external address of the cluster agent.
func findClusterAgent(config *vkubeConfig, includeBlessings bool) (string, error) {
out, err := kubectl("--namespace="+config.ClusterAgent.Namespace, "get", "service", clusterAgentServiceName, "-o", "json")
if err != nil {
return "", fmt.Errorf("failed to get info of %s: %v: %s", clusterAgentServiceName, err, out)
}
var svc object
if err := svc.importJSON(out); err != nil {
return "", fmt.Errorf("failed to parse kubectl output: %v", err)
}
ports := svc.getObjectArray("spec.ports")
if len(ports) == 0 {
return "", fmt.Errorf("service %q has no ports", clusterAgentServiceName)
}
port := ports[0].getInt("port", -1)
if port < 0 {
return "", fmt.Errorf("service %q has no valid port: %v", clusterAgentServiceName, port)
}
var ip string
if config.ClusterAgent.InternalOnly {
ip = svc.getString("spec.clusterIP")
} else {
ingress := svc.getObjectArray("status.loadBalancer.ingress")
if len(ingress) == 0 {
return "", fmt.Errorf("service %q has no loadbalancer ingress", clusterAgentServiceName)
}
ip = ingress[0].getString("ip")
}
if ip == "" {
return "", fmt.Errorf("service %q loadbalancer has no valid ip", clusterAgentServiceName)
}
if includeBlessings {
return fmt.Sprintf("/(%s)@%s:%d", config.ClusterAgent.Blessing, ip, port), nil
}
return fmt.Sprintf("/%s:%d", ip, port), nil
}
// claimClusterAgent claims the cluster agent with the given blessing extension.
func claimClusterAgent(ctx *context.T, config *vkubeConfig, extension string) error {
addr, err := findClusterAgent(config, false)
if err != nil {
return err
}
baseBlessings, _ := v23.GetPrincipal(ctx).BlessingStore().Default()
if err := device.ClaimableClient(addr).Claim(ctx, "", &granter{blessings: baseBlessings, extension: extension}); err != nil {
return err
}
return nil
}