// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"time"

	cloudmonitoring "google.golang.org/api/monitoring/v3"

	"v.io/jiri/collect"
	"v.io/jiri/tool"
	"v.io/v23/context"
	"v.io/x/devtools/internal/test"
	"v.io/x/lib/gcm"
)

const localCheckScript = `#!/bin/bash

# Check cpu.
CPU_IDLE="$(top -bn2 | grep Cpu | tail -1 | sed -n 's/.*,\s*\(\S*\) id,.*/\1/p')"
CPU_USAGE="$(echo "scale=4; 100.0-${CPU_IDLE}" | bc)"
echo ${CPU_USAGE} > output_cpu_$(hostname)

# Check memory.
MEM="$(free -m)"
MEM_TOTAL="$(echo "${MEM}" | grep Mem: | awk '{print $2}')"
MEM_FREE="$(echo "${MEM}" | grep /cache: | awk '{print $4}')"
MEM_USAGE="$(echo "scale=4; (${MEM_TOTAL}-${MEM_FREE})/${MEM_TOTAL}*100.0" | bc)"
echo ${MEM_USAGE} > output_mem_$(hostname)

# Check disk.
DISK_PCT="$(df | grep /dev/sda1 | awk '{print $5}')"
DISK_USAGE="$(echo "scale=4; ${DISK_PCT::-1}" | bc)"
echo ${DISK_USAGE} > output_disk_$(hostname)

# Check open TCP connections.
sudo netstat -anp --tcp | egrep "ESTABLISHED|CLOSE_WAIT|FIN_WAIT1|FIN_WAIT2" | wc -l > output_tcpconn_$(hostname)

# Check nginx
if [[ "$(hostname)" == nginx* ]]; then
  # Output is in the form of:
  #
  # Active connections: 12
  # server accepts handled requests
  #  64028 64028 65047
  # Reading: 3 Writing: 4 Waiting: 5
  NGINX_STATS="$(curl -s local-stackdriver-agent.stackdriver.com/nginx_status)"

  # Calculate qps.
  CUR_TIME_SECONDS="$(date +%s)"
  CUR_TOTAL_REQS="$(echo "${NGINX_STATS}" | grep '^ ' | awk '{print $3}')"
  LAST_REQS_INFO_FILE="/tmp/v-last-requests-info"
  QPS=0
  if [ -f "${LAST_REQS_INFO_FILE=}" ]; then
    LAST_TIME_SECONDS="$(cat "${LAST_REQS_INFO_FILE}" | awk '{print $1}')"
    LAST_TOTAL_REQS="$(cat "${LAST_REQS_INFO_FILE}" | awk '{print $2}')"
    QPS="$(echo "scale=2; (${CUR_TOTAL_REQS}-${LAST_TOTAL_REQS})/(${CUR_TIME_SECONDS}-${LAST_TIME_SECONDS})" | bc)"
  fi
  echo "${CUR_TIME_SECONDS} ${CUR_TOTAL_REQS}" > "${LAST_REQS_INFO_FILE}"
  echo ${QPS} > output_nginx-qps_$(hostname)

  # Other stats.
  echo "${NGINX_STATS}" | sed -n 's/^Active connections:\s*\(\d*\)/\1/p' > output_nginx-activeconn_$(hostname)
  echo "${NGINX_STATS}" | grep Reading | awk '{print $2}' > output_nginx-readingconn_$(hostname)
  echo "${NGINX_STATS}" | grep Reading | awk '{print $4}' > output_nginx-writingconn_$(hostname)
  echo "${NGINX_STATS}" | grep Reading | awk '{print $6}' > output_nginx-waitingconn_$(hostname)
fi
`

var (
	pingResultRE     = regexp.MustCompile(`(\S*)\s*:.*min/avg/max = [^/]*/([^/]*)/[^/]*`)
	gceMetricNames   = []string{"cpu-usage", "memory-usage", "disk-usage", "ping", "tcpconn"}
	nginxMetricNames = []string{"healthCheckLatency", "qps", "active-connections", "reading-connections", "writing-connections", "waiting-connections"}
)

type gceInstanceData struct {
	name      string
	zone      string
	ip        string
	stat      *gceInstanceStat
	nginxStat *nginxStat
}

type gceInstanceStat struct {
	cpuUsage    float64
	memUsage    float64
	diskUsage   float64
	pingLatency float64
	tcpconn     float64
}

type nginxStat struct {
	healthCheckLatency float64
	qps                float64
	activeConnections  float64
	readingConnections float64
	writingConnections float64
	waitingConnections float64
}

// checkGCEInstances checks all GCE instances in a GCE project.
func checkGCEInstances(_ *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error {
	msg := "Getting instance list\n"
	instances, err := getInstances(ctx)
	if err != nil {
		test.Fail(ctx, msg)
		return err
	}
	test.Pass(ctx, msg)

	if err := invoker(ctx, "Check ping latencies\n", instances, checkPing); err != nil {
		return err
	}

	if err := invoker(ctx, "Check machine stats\n", instances, checkInstanceStats); err != nil {
		return err
	}

	if err := invoker(ctx, "Check nginx health\n", instances, checkNginxHealth); err != nil {
		return err
	}

	if err := invoker(ctx, "Send data to GCM\n", instances, sendToGCM); err != nil {
		return err
	}

	return nil
}

func invoker(ctx *tool.Context, msg string, instances []*gceInstanceData, fn func(*tool.Context, []*gceInstanceData) error) error {
	if err := fn(ctx, instances); err != nil {
		test.Fail(ctx, msg)
		return err
	}
	test.Pass(ctx, msg)
	return nil
}

// getInstances uses "gcloud compute instances list" to get a list of instances
// we care about.
func getInstances(ctx *tool.Context) ([]*gceInstanceData, error) {
	var out bytes.Buffer
	if err := ctx.NewSeq().Capture(&out, &out).
		Last("gcloud", "-q", "--project="+projectFlag, "compute", "instances", "list", "--format=json"); err != nil {
		return nil, err
	}
	var instances []struct {
		Name              string
		Zone              string
		Status            string
		NetworkInterfaces []struct {
			AccessConfigs []struct {
				NatIP string
			}
		}
	}
	if err := json.Unmarshal(out.Bytes(), &instances); err != nil {
		return nil, fmt.Errorf("Unmarshal() failed: %v", err)
	}
	filteredInstances := []*gceInstanceData{}
	for _, instance := range instances {
		if instance.Status != "RUNNING" {
			continue
		}
		name := instance.Name
		// We only collect data for the machines that are running vanadium services
		// and nginx servers.
		// New patterns should be added if new machine name patterns are introduced.
		if strings.HasPrefix(name, "vanadium") || strings.HasPrefix(name, "nginx") {
			filteredInstances = append(filteredInstances, &gceInstanceData{
				name: name,
				zone: instance.Zone,
				ip:   instance.NetworkInterfaces[0].AccessConfigs[0].NatIP,
				stat: &gceInstanceStat{
					cpuUsage:    -1,
					memUsage:    -1,
					diskUsage:   -1,
					pingLatency: -1,
					tcpconn:     -1,
				},
				nginxStat: &nginxStat{
					healthCheckLatency: -1,
					qps:                -1,
					activeConnections:  -1,
					readingConnections: -1,
					writingConnections: -1,
					waitingConnections: -1,
				},
			})
		}
	}
	return filteredInstances, nil
}

// checkPing checks the ping response from all instances using fping.
//
// By default, fping is installed on debian/ubuntu linux machines in GCE. For
// Macs, fping needs to be installed through homebrew.
func checkPing(ctx *tool.Context, instances []*gceInstanceData) error {
	// Check the fping program.
	if _, err := exec.LookPath("fping"); err != nil {
		return fmt.Errorf("fping not installed.")
	}

	// Run fping.
	args := []string{
		"-q",
		"-c3", // ping 3 times for each host.
	}
	ipToInstance := map[string]*gceInstanceData{}
	for _, instance := range instances {
		args = append(args, instance.ip)
		ipToInstance[instance.ip] = instance
	}
	var out bytes.Buffer
	if err := ctx.NewSeq().Capture(&out, &out).Last("fping", args...); err != nil {
		// When some hosts are not reachable, the command's exit code will be non-zero.
		fmt.Fprintf(ctx.Stdout(), "Output:\n%s\n", out.String())
	}

	// Parse output.
	for _, line := range strings.Split(out.String(), "\n") {
		matches := pingResultRE.FindStringSubmatch(line)
		if matches != nil {
			ip := matches[1]
			strLatency := matches[2]
			latency, err := strconv.ParseFloat(strLatency, 64)
			if err != nil {
				return fmt.Errorf("ParseFloat(%q) failed: %v", strLatency, err)
			}
			ipToInstance[ip].stat.pingLatency = latency
		}
	}
	return nil
}

// checkInstanceStats uses "vcloud run" command to run a script in remote
// instances and collects and processes results.
func checkInstanceStats(ctx *tool.Context, instances []*gceInstanceData) (e error) {
	// Create the check script in a tmp dir.
	s := ctx.NewSeq()
	tmpdir, err := s.TempDir("", "")
	if err != nil {
		return err
	}
	defer collect.Error(func() error { return ctx.NewSeq().RemoveAll(tmpdir).Done() }, &e)
	scriptPath := filepath.Join(tmpdir, "localtest.sh")
	if err := s.WriteFile(scriptPath, []byte(localCheckScript), os.FileMode(0755)).Done(); err != nil {
		return err
	}

	// Run "vcloud run" on all nodes.
	// This will run the local check script remotely and copy the "output" files back to
	// <tmpdir>/<node_name>/<tmpdir2>/output_<checktype>_<nodename>.
	nodes := []string{}
	instanceByNode := map[string]*gceInstanceData{}
	for _, instance := range instances {
		nodes = append(nodes, instance.name)
		instanceByNode[instance.name] = instance
	}
	vcloud := filepath.Join(binDirFlag, "vcloud")
	args := []string{
		"--project=" + projectFlag,
		"run",
		"--outdir=" + tmpdir,
		strings.Join(nodes, "|"),
		scriptPath,
	}
	if err := s.Capture(ioutil.Discard, ctx.Stderr()).
		Last(vcloud, args...); err != nil {
		return err
	}

	// Find and read output files.
	if err := filepath.Walk(tmpdir, func(path string, info os.FileInfo, err error) error {
		fileName := info.Name()
		if strings.HasPrefix(fileName, "output_") {
			// The filename is in the form of: output_<checktype>_<nodename>.
			parts := strings.SplitN(fileName, "_", 3)
			checkType := parts[1]
			node := parts[2]
			value, err := readFloatFromFile(ctx, path)
			if err != nil {
				return err
			}
			switch checkType {
			case "cpu":
				instanceByNode[node].stat.cpuUsage = value
			case "mem":
				instanceByNode[node].stat.memUsage = value
			case "disk":
				instanceByNode[node].stat.diskUsage = value
			case "tcpconn":
				instanceByNode[node].stat.tcpconn = value
			case "nginx-qps":
				instanceByNode[node].nginxStat.qps = value
			case "nginx-activeconn":
				instanceByNode[node].nginxStat.activeConnections = value
			case "nginx-readingconn":
				instanceByNode[node].nginxStat.readingConnections = value
			case "nginx-writingconn":
				instanceByNode[node].nginxStat.writingConnections = value
			case "nginx-waitingconn":
				instanceByNode[node].nginxStat.waitingConnections = value
			}
		}
		return nil
	}); err != nil {
		return fmt.Errorf("Walk() failed: %v", err)
	}
	return nil
}

// readFloatFromFile reads the given file's content as a number and converts it
// to float64.
func readFloatFromFile(ctx *tool.Context, path string) (float64, error) {
	bytes, err := ctx.NewSeq().ReadFile(path)
	if err != nil {
		return -1, err
	}
	strValue := strings.TrimSpace(string(bytes))
	value, err := strconv.ParseFloat(strValue, 64)
	if err != nil {
		if strValue == "" {
			value = 0
		} else {
			return -1, fmt.Errorf("ParseFloat(%s) failed:\nfile: %s\nerr: %v", strValue, path, err)
		}
	}
	return value, nil
}

func checkNginxHealth(ctx *tool.Context, instances []*gceInstanceData) error {
	timeout := time.Duration(5 * time.Second)
	client := http.Client{
		Timeout: timeout,
	}
	hasError := false
	for _, instance := range instances {
		if !strings.HasPrefix(instance.name, "nginx-worker") {
			continue
		}
		// Check the latency of worker's /health endpoint.
		lat := 5000.0 // default to 5s
		url := fmt.Sprintf("http://%s/health", instance.ip)
		start := time.Now()
		if resp, err := client.Get(url); err != nil {
			hasError = true
			fmt.Fprintf(ctx.Stderr(), "client.Get(%s) failed: %v\n", url, err)
		} else if resp.StatusCode != http.StatusOK {
			hasError = true
			resp.Body.Close()
			fmt.Fprintf(ctx.Stderr(), "got status code %v while checking %s, expected 200", resp.StatusCode, url)
		} else {
			resp.Body.Close()
			// Convert to ms.
			lat = float64(time.Now().Sub(start).Nanoseconds()) / 1000000.0
			if ctx.Verbose() {
				fmt.Fprintf(ctx.Stdout(), "/health latency for %s: %f ms\n", url, lat)
			}
		}
		instance.nginxStat.healthCheckLatency = lat
	}
	if hasError {
		return fmt.Errorf("some checks failed")
	}
	return nil
}

// sendToGCM sends instance stats data to GCM.
func sendToGCM(ctx *tool.Context, instances []*gceInstanceData) error {
	s, err := gcm.Authenticate(keyFileFlag)
	if err != nil {
		return err
	}
	timeStr := time.Now().UTC().Format(time.RFC3339)
	for _, instance := range instances {
		msg := fmt.Sprintf("Send gce instance data for %s (%s)\n", instance.name, instance.zone)
		for _, metricName := range gceMetricNames {
			value := -1.0
			switch metricName {
			case "cpu-usage":
				value = instance.stat.cpuUsage
			case "memory-usage":
				value = instance.stat.memUsage
			case "disk-usage":
				value = instance.stat.diskUsage
			case "ping":
				value = instance.stat.pingLatency
			case "tcpconn":
				value = instance.stat.tcpconn
			default:
				test.Fail(ctx, msg)
				return fmt.Errorf("Invalid metric name: %q", metricName)
			}
			// GCM treats 0 and missing value the same.
			if value == 0 {
				value = 0.0001
			}
			if err := sendInstanceDataToGCM(s, "gce-instance", metricName, timeStr, instance, value); err != nil {
				test.Fail(ctx, msg)
				return fmt.Errorf("failed to add %q to GCM: %v\n", metricName, err)
			}
		}

		msg = fmt.Sprintf("Send nginx data for %s (%s)\n", instance.name, instance.zone)
		for _, metricName := range nginxMetricNames {
			value := -1.0
			switch metricName {
			case "healthCheckLatency":
				value = instance.nginxStat.healthCheckLatency
			case "qps":
				value = instance.nginxStat.qps
			case "active-connections":
				value = instance.nginxStat.activeConnections
			case "reading-connections":
				value = instance.nginxStat.readingConnections
			case "writing-connections":
				value = instance.nginxStat.writingConnections
			case "waiting-connections":
				value = instance.nginxStat.waitingConnections
			default:
				test.Fail(ctx, msg)
				return fmt.Errorf("Invalid metric name: %q", metricName)
			}
			// GCM treats 0 and missing value the same.
			if value == 0 {
				value = 0.0001
			}
			if err := sendInstanceDataToGCM(s, "nginx", metricName, timeStr, instance, value); err != nil {
				test.Fail(ctx, msg)
				return fmt.Errorf("failed to add %q to GCM: %v\n", metricName, err)
			}
		}

		test.Pass(ctx, msg)
	}
	return nil
}

// sendInstanceDataToGCM sends a single instance's stat to GCM.
func sendInstanceDataToGCM(s *cloudmonitoring.Service, metricType, metricName, timeStr string, instance *gceInstanceData, value float64) error {
	md, err := gcm.GetMetric(metricType, projectFlag)
	if err != nil {
		return err
	}
	if err := sendDataToGCM(s, md, value, timeStr, "", "", instance.name, instance.zone, metricName); err != nil {
		return fmt.Errorf("failed to write timeseries: %v", err)
	}
	return nil
}
