blob: c3cafa05b44f3be2ade97f2aaeeeb65f6970176d [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package test
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"regexp"
"strconv"
"strings"
"time"
"v.io/jiri"
"v.io/jiri/collect"
"v.io/x/devtools/internal/test"
"v.io/x/devtools/internal/xunit"
)
const (
	// Stress test configuration: node-name suffix, cluster shape, per-client
	// worker count, request size limits, and total run time.
	testStressNodeName = "stress"
	testStressNumServerNodes = 3
	testStressNumClientNodes = 6
	testStressNumWorkersPerClient = 8
	testStressMaxChunkCnt = 100
	testStressMaxPayloadSize = 10000
	testStressDuration = 1 * time.Hour
	// Load test configuration: a single server / single client pair; -cpu is
	// matched to the number of server nodes.
	testLoadNodeName = "load"
	testLoadNumServerNodes = 1
	testLoadNumClientNodes = 1
	testLoadCPUs = testLoadNumServerNodes
	testLoadPayloadSize = 1000
	testLoadDuration = 15 * time.Minute
	// File (under $WORKSPACE) where load-test results are written for vmon.
	loadStatsOutputFile = "load_stats.json"
	// Server settings: listen port, a hard cap on server lifetime (so stray
	// servers eventually exit), and how long to wait for servers to come up.
	serverPort = 10000
	serverMaxUpTime = 2 * time.Hour
	waitTimeForServerUp = 1 * time.Minute
	// GCE settings passed to the vcloud tool.
	gceProject = "vanadium-internal"
	gceZone = "us-central1-f"
	gceServerMachineType = "n1-highcpu-8"
	gceClientMachineType = "n1-highcpu-4"
	gceNodePrefix = "tmpnode-rpc"
	// Packages built by "jiri go install" before the test runs.
	vcloudPkg = "v.io/x/devtools/vcloud"
	serverPkg = "v.io/x/ref/runtime/internal/rpc/stress/stressd"
	clientPkg = "v.io/x/ref/runtime/internal/rpc/stress/stress"
)
var (
	// binPath is the directory, relative to jirix.Root, that holds the
	// binaries produced by "jiri go install".
	binPath = filepath.Join("release", "go", "bin")
)
// vanadiumGoRPCStress runs an RPC stress test with multiple GCE instances:
// testStressNumServerNodes server nodes and testStressNumClientNodes client
// nodes, with runStressTest providing the client workload.
func vanadiumGoRPCStress(jirix *jiri.X, testName string, _ ...Opt) (*test.Result, error) {
	return runRPCTest(jirix, testName, testStressNodeName, testStressNumServerNodes, testStressNumClientNodes, runStressTest)
}
// vanadiumGoRPCLoad runs an RPC load test with multiple GCE instances:
// a single server node and a single client node, with runLoadTest providing
// the client workload.
func vanadiumGoRPCLoad(jirix *jiri.X, testName string, _ ...Opt) (*test.Result, error) {
	return runRPCTest(jirix, testName, testLoadNodeName, testLoadNumServerNodes, testLoadNumClientNodes, runLoadTest)
}
// runRPCTest drives the complete lifecycle of a multi-node GCE RPC test:
// install the binaries, recreate the GCE nodes, start the stressd servers,
// run the client workload via testFunc, then stop the servers and delete the
// nodes. testFunc receives jirix and testName and produces the test result.
func runRPCTest(jirix *jiri.X, testName, nodeName string, numServerNodes, numClientNodes int, testFunc func(*jiri.X, string) (*test.Result, error)) (_ *test.Result, e error) {
	cleanup, err := initTest(jirix, testName, []string{"v23:base"})
	if err != nil {
		return nil, newInternalError(err, "Init")
	}
	// Fold any cleanup failure into the named error return.
	defer collect.Error(func() error { return cleanup() }, &e)
	s := jirix.NewSeq()
	// Install binaries.
	if err := s.Last("jiri", "go", "install", vcloudPkg, serverPkg, clientPkg); err != nil {
		return nil, newInternalError(err, "Install Binaries")
	}
	// Cleanup old nodes if any. Errors are deliberately ignored: the nodes
	// may simply not exist from an earlier run.
	fmt.Fprint(jirix.Stdout(), "Deleting old nodes...\n")
	if err := deleteNodes(jirix, nodeName, numServerNodes, numClientNodes); err != nil {
		fmt.Fprintf(jirix.Stdout(), "IGNORED: %v\n", err)
	}
	// Create nodes.
	fmt.Fprint(jirix.Stdout(), "Creating nodes...\n")
	if err := createNodes(jirix, nodeName, numServerNodes, numClientNodes); err != nil {
		return nil, newInternalError(err, "Create Nodes")
	}
	// Start servers.
	fmt.Fprint(jirix.Stdout(), "Starting servers...\n")
	serverDone, err := startServers(jirix, nodeName, numServerNodes)
	if err != nil {
		return nil, newInternalError(err, "Start Servers")
	}
	// Run the test.
	fmt.Fprint(jirix.Stdout(), "Running test...\n")
	result, err := testFunc(jirix, testName)
	if err != nil {
		return nil, newInternalError(err, "Run Test")
	}
	// Stop servers, then wait for the background server command started by
	// startServers to report its exit status via serverDone.
	fmt.Fprint(jirix.Stdout(), "Stopping servers...\n")
	if err := stopServers(jirix, nodeName, numServerNodes); err != nil {
		return nil, newInternalError(err, "Stop Servers")
	}
	if err := <-serverDone; err != nil {
		return nil, newInternalError(err, "Stop Servers")
	}
	// Delete nodes.
	fmt.Fprint(jirix.Stdout(), "Deleting nodes...\n")
	if err := deleteNodes(jirix, nodeName, numServerNodes, numClientNodes); err != nil {
		return nil, newInternalError(err, "Delete Nodes")
	}
	return result, nil
}
// serverNodeName returns the GCE instance name of the n-th server node for
// the given test, e.g. "tmpnode-rpc-stress-server-00".
func serverNodeName(nodeName string, n int) string {
	parts := []string{gceNodePrefix, nodeName, "server", fmt.Sprintf("%02d", n)}
	return strings.Join(parts, "-")
}
// clientNodeName returns the GCE instance name of the n-th client node for
// the given test, e.g. "tmpnode-rpc-stress-client-00".
func clientNodeName(nodeName string, n int) string {
	parts := []string{gceNodePrefix, nodeName, "client", fmt.Sprintf("%02d", n)}
	return strings.Join(parts, "-")
}
// createNodes creates the GCE nodes for the test via "vcloud node create":
// numServerNodes server machines followed by numClientNodes client machines,
// using the server/client machine types respectively.
func createNodes(jirix *jiri.X, nodeName string, numServerNodes, numClientNodes int) error {
	s := jirix.NewSeq()
	cmd := filepath.Join(jirix.Root, binPath, "vcloud")
	baseArgs := []string{
		"node", "create",
		"-project", gceProject,
		"-zone", gceZone,
	}
	// Copy baseArgs before extending it for each command line. The original
	// code appended to the same slice twice, which only avoids clobbering
	// while the literal's len == cap; explicit copies remove that aliasing
	// hazard if the base argument list ever grows.
	serverArgs := append([]string(nil), baseArgs...)
	serverArgs = append(serverArgs, "-machine-type", gceServerMachineType)
	for n := 0; n < numServerNodes; n++ {
		serverArgs = append(serverArgs, serverNodeName(nodeName, n))
	}
	if err := s.Last(cmd, serverArgs...); err != nil {
		return err
	}
	clientArgs := append([]string(nil), baseArgs...)
	clientArgs = append(clientArgs, "-machine-type", gceClientMachineType)
	for n := 0; n < numClientNodes; n++ {
		clientArgs = append(clientArgs, clientNodeName(nodeName, n))
	}
	return s.Last(cmd, clientArgs...)
}
// deleteNodes deletes all of the test's GCE nodes — both server and client
// instances — with a single "vcloud node delete" invocation.
func deleteNodes(jirix *jiri.X, nodeName string, numServerNodes, numClientNodes int) error {
	names := make([]string, 0, numServerNodes+numClientNodes)
	for i := 0; i < numServerNodes; i++ {
		names = append(names, serverNodeName(nodeName, i))
	}
	for i := 0; i < numClientNodes; i++ {
		names = append(names, clientNodeName(nodeName, i))
	}
	args := append([]string{
		"node", "delete",
		"-project", gceProject,
		"-zone", gceZone,
	}, names...)
	vcloud := filepath.Join(jirix.Root, binPath, "vcloud")
	return jirix.NewSeq().Last(vcloud, args...)
}
// startServers launches a stressd server on every server node via a single
// "vcloud run" executed in a background goroutine. It returns a channel that
// will carry the eventual exit status of that vcloud invocation; runRPCTest
// receives from it after stopServers. If the invocation finishes with an
// error during the warm-up window, the error is returned immediately.
func startServers(jirix *jiri.X, nodeName string, numServerNodes int) (<-chan error, error) {
	var servers []string
	for n := 0; n < numServerNodes; n++ {
		servers = append(servers, serverNodeName(nodeName, n))
	}
	cmd := filepath.Join(jirix.Root, binPath, "vcloud")
	args := []string{
		"run",
		"-failfast",
		"-project", gceProject,
		strings.Join(servers, ","),
		filepath.Join(jirix.Root, binPath, "stressd"),
		"++",
		"./stressd",
		"-v23.tcp.address", fmt.Sprintf(":%d", serverPort),
		"-duration", serverMaxUpTime.String(),
	}
	done := make(chan error)
	go func() {
		done <- jirix.NewSeq().Last(cmd, args...)
	}()
	// Wait for up to waitTimeForServerUp while the servers are brought up.
	// The command completing inside this window means it exited early.
	timeout := time.After(waitTimeForServerUp)
	select {
	case err := <-done:
		if err != nil {
			return nil, err
		}
		// The command already finished cleanly; close the channel so the
		// later receive in runRPCTest yields nil immediately.
		close(done)
	case <-timeout:
	}
	return done, nil
}
// stopServers shuts down every stressd server by running the stress client's
// "stop" command, from the first client node, against each server endpoint.
func stopServers(jirix *jiri.X, nodeName string, numServerNodes int) error {
	vcloud := filepath.Join(jirix.Root, binPath, "vcloud")
	args := []string{
		"run",
		"-failfast",
		"-project", gceProject,
		clientNodeName(nodeName, 0),
		filepath.Join(jirix.Root, binPath, "stress"),
		"++",
		"./stress", "stop",
	}
	for i := 0; i < numServerNodes; i++ {
		args = append(args, fmt.Sprintf("/%s:%d", serverNodeName(nodeName, i), serverPort))
	}
	return jirix.NewSeq().Last(vcloud, args...)
}
// runStressTest runs the stress workload from every client node against all
// stress servers, scrapes the JSON stats printed by both sides out of the
// combined command output, and verifies that the client-side and server-side
// aggregate stats agree.
func runStressTest(jirix *jiri.X, testName string) (*test.Result, error) {
	var servers, clients []string
	for n := 0; n < testStressNumServerNodes; n++ {
		servers = append(servers, fmt.Sprintf("/%s:%d", serverNodeName(testStressNodeName, n), serverPort))
	}
	for n := 0; n < testStressNumClientNodes; n++ {
		clients = append(clients, clientNodeName(testStressNodeName, n))
	}
	s := jirix.NewSeq()
	// Tee all command output into `out` so the stats can be parsed from it
	// afterwards while still streaming to the test's stdout/stderr.
	var out bytes.Buffer
	stdout := io.MultiWriter(jirix.Stdout(), &out)
	stderr := io.MultiWriter(jirix.Stderr(), &out)
	cmd := filepath.Join(jirix.Root, binPath, "vcloud")
	args := []string{
		"run",
		"-failfast",
		"-project", gceProject,
		strings.Join(clients, ","),
		filepath.Join(jirix.Root, binPath, "stress"),
		"++",
		"./stress", "stress",
		"-workers", strconv.Itoa(testStressNumWorkersPerClient),
		"-max-chunk-count", strconv.Itoa(testStressMaxChunkCnt),
		"-max-payload-size", strconv.Itoa(testStressMaxPayloadSize),
		"-duration", testStressDuration.String(),
		"-format", "json",
	}
	args = append(args, servers...)
	if err := s.Capture(stdout, stderr).Last(cmd, args...); err != nil {
		return nil, err
	}
	// Get the stats from the servers and stop them. Only one client node is
	// needed for this query.
	args = []string{
		"run",
		"-failfast",
		"-project", gceProject,
		clients[0],
		filepath.Join(jirix.Root, binPath, "stress"),
		"++",
		"./stress", "stats",
		"-format", "json",
	}
	args = append(args, servers...)
	if err := s.Capture(stdout, stderr).Last(cmd, args...); err != nil {
		return nil, err
	}
	// Read the stats.
	cStats, sStats, err := readStressStats(out.String())
	if err != nil {
		if err := xunit.CreateFailureReport(jirix, testName, "StressTest", "ReadStats", "Failure", err.Error()); err != nil {
			return nil, err
		}
		return &test.Result{Status: test.Failed}, nil
	}
	fmt.Fprint(jirix.Stdout(), "\nRESULT:\n")
	writeStressStats(jirix.Stdout(), "Client Stats:", cStats)
	writeStressStats(jirix.Stdout(), "Server Stats:", sStats)
	fmt.Fprint(jirix.Stdout(), "\n")
	// Verify the stats. Bytes the clients sent should be bytes the servers
	// received and vice versa, so swap the server's byte counters before the
	// field-for-field comparison.
	sStats.BytesRecv, sStats.BytesSent = sStats.BytesSent, sStats.BytesRecv
	if !reflect.DeepEqual(cStats, sStats) {
		output := fmt.Sprintf("%+v != %+v", cStats, sStats)
		if err := xunit.CreateFailureReport(jirix, testName, "StressTest", "VerifyStats", "Mismatched", output); err != nil {
			return nil, err
		}
		return &test.Result{Status: test.Failed}, nil
	}
	return &test.Result{Status: test.Passed}, nil
}
// stressStats mirrors the JSON stats records emitted by the stress
// client/server with "-format json". Fields are exported so encoding/json
// can unmarshal into them; readStressStatsHelper sums records field-wise.
type stressStats struct {
	SumCount uint64       // number of non-streaming RPCs
	SumStreamCount uint64 // number of streaming RPCs
	BytesRecv uint64      // total bytes received
	BytesSent uint64      // total bytes sent
}
// readStressStats parses the combined stress-test output and returns the
// aggregated client stats and server stats, in that order. It expects exactly
// one "client stats" record per client node and one "server stats" record per
// server node.
func readStressStats(out string) (*stressStats, *stressStats, error) {
	specs := []struct {
		re       *regexp.Regexp
		numStats int
	}{
		{regexp.MustCompile(`client stats:({.*})`), testStressNumClientNodes},
		{regexp.MustCompile(`server stats\(.*\):({.*})`), testStressNumServerNodes},
	}
	var parsed [2]*stressStats
	for i, spec := range specs {
		stats, err := readStressStatsHelper(spec.re, out, spec.numStats)
		if err != nil {
			return nil, nil, err
		}
		parsed[i] = stats
	}
	return parsed[0], parsed[1], nil
}
// readStressStatsHelper extracts every JSON stats object captured by re's
// first group in out, requires exactly numStats of them, and returns the
// field-wise sum of the parsed records.
func readStressStatsHelper(re *regexp.Regexp, out string, numStats int) (*stressStats, error) {
	matches := re.FindAllSubmatch([]byte(out), -1)
	if len(matches) != numStats {
		// Fixed format verb: the original "%qd" was a typo for "%d" (it
		// rendered numStats with %q followed by a literal 'd'); use plain
		// %d as readLoadStats does for the same message.
		return nil, fmt.Errorf("invalid number of stats: %d != %d", len(matches), numStats)
	}
	var merged stressStats
	for _, match := range matches {
		if len(match) != 2 {
			return nil, fmt.Errorf("invalid stats: %q", match)
		}
		var stats stressStats
		if err := json.Unmarshal(match[1], &stats); err != nil {
			return nil, fmt.Errorf("invalid stats: %q", match)
		}
		if stats.SumCount == 0 || stats.SumStreamCount == 0 {
			// Although clients choose servers and RPC methods randomly, we report
			// this as a failure since it is very unlikely.
			return nil, fmt.Errorf("zero count: %q", match)
		}
		merged.SumCount += stats.SumCount
		merged.SumStreamCount += stats.SumStreamCount
		merged.BytesRecv += stats.BytesRecv
		merged.BytesSent += stats.BytesSent
	}
	return &merged, nil
}
// writeStressStats prints a titled, tab-indented summary of stats to w.
func writeStressStats(w io.Writer, title string, stats *stressStats) {
	rows := []struct {
		format string
		value  uint64
	}{
		{"\tNumber of non-streaming RPCs:\t%d\n", stats.SumCount},
		{"\tNumber of streaming RPCs:\t%d\n", stats.SumStreamCount},
		{"\tNumber of bytes received:\t%d\n", stats.BytesRecv},
		{"\tNumber of bytes sent:\t\t%d\n", stats.BytesSent},
	}
	fmt.Fprintf(w, "%s\n", title)
	for _, row := range rows {
		fmt.Fprintf(w, row.format, row.value)
	}
}
// runLoadTest runs the load workload from the client node against the load
// server, parses the JSON "load stats" records from the combined command
// output, prints a human-readable summary, and writes the stats as JSON into
// $WORKSPACE for consumption by vmon.
func runLoadTest(jirix *jiri.X, testName string) (*test.Result, error) {
	var servers, clients []string
	for n := 0; n < testLoadNumServerNodes; n++ {
		servers = append(servers, fmt.Sprintf("/%s:%d", serverNodeName(testLoadNodeName, n), serverPort))
	}
	for n := 0; n < testLoadNumClientNodes; n++ {
		clients = append(clients, clientNodeName(testLoadNodeName, n))
	}
	s := jirix.NewSeq()
	// Tee all command output into `out` so the stats can be parsed from it
	// afterwards while still streaming to the test's stdout/stderr.
	var out bytes.Buffer
	stdout := io.MultiWriter(jirix.Stdout(), &out)
	stderr := io.MultiWriter(jirix.Stderr(), &out)
	cmd := filepath.Join(jirix.Root, binPath, "vcloud")
	args := []string{
		"run",
		"-failfast",
		"-project", gceProject,
		strings.Join(clients, ","),
		filepath.Join(jirix.Root, binPath, "stress"),
		"++",
		"./stress", "load",
		"-cpu", strconv.Itoa(testLoadCPUs),
		"-payload-size", strconv.Itoa(testLoadPayloadSize),
		"-duration", testLoadDuration.String(),
		"-format", "json",
	}
	args = append(args, servers...)
	if err := s.Capture(stdout, stderr).Last(cmd, args...); err != nil {
		return nil, err
	}
	// Read the stats.
	stats, err := readLoadStats(out.String(), testLoadNumClientNodes)
	if err != nil {
		if err := xunit.CreateFailureReport(jirix, testName, "LoadTest", "ReadStats", "Failure", err.Error()); err != nil {
			return nil, err
		}
		return &test.Result{Status: test.Failed}, nil
	}
	fmt.Fprint(jirix.Stdout(), "\nRESULT:\n")
	fmt.Fprint(jirix.Stdout(), "Load Stats\n")
	fmt.Fprintf(jirix.Stdout(), "\tNumber of RPCs:\t\t%.2f\n", stats.Iterations)
	fmt.Fprintf(jirix.Stdout(), "\tLatency (msec/rpc):\t%.2f\n", stats.MsecPerRpc)
	fmt.Fprintf(jirix.Stdout(), "\tQPS:\t\t\t%.2f\n", stats.Qps)
	fmt.Fprintf(jirix.Stdout(), "\tQPS/core:\t\t%.2f\n", stats.QpsPerCore)
	fmt.Fprint(jirix.Stdout(), "\n")
	// Write the test stats in json format for vmon, into the directory named
	// by $WORKSPACE (set by the CI environment).
	filename := filepath.Join(os.Getenv("WORKSPACE"), loadStatsOutputFile)
	if err := writeLoadStatsJSON(filename, stats); err != nil {
		if err := xunit.CreateFailureReport(jirix, testName, "LoadTest", "WriteLoadStats", "Failure", err.Error()); err != nil {
			return nil, err
		}
		return &test.Result{Status: test.Failed}, nil
	}
	fmt.Fprintf(jirix.Stdout(), "Wrote load stats to %q\n", filename)
	return &test.Result{Status: test.Passed}, nil
}
// loadStats mirrors the JSON "load stats" records emitted by the load client
// with "-format json". Fields are exported for encoding/json.
type loadStats struct {
	Iterations float64
	MsecPerRpc float64
	Qps        float64
	QpsPerCore float64
}

// readLoadStats parses all "load stats" JSON records from out, requires
// exactly numStats of them, and returns their field-wise averages. A record
// with zero iterations is reported as an error.
func readLoadStats(out string, numStats int) (*loadStats, error) {
	statsRE := regexp.MustCompile(`load stats:({.*})`)
	records := statsRE.FindAllSubmatch([]byte(out), -1)
	if len(records) != numStats {
		return nil, fmt.Errorf("invalid number of stats: %d != %d", len(records), numStats)
	}
	var total loadStats
	for _, record := range records {
		if len(record) != 2 {
			return nil, fmt.Errorf("invalid stats: %q", record)
		}
		var one loadStats
		if err := json.Unmarshal(record[1], &one); err != nil {
			return nil, fmt.Errorf("invalid stats: %q", record)
		}
		if one.Iterations == 0 {
			return nil, fmt.Errorf("zero count: %q", record)
		}
		total.Iterations += one.Iterations
		total.MsecPerRpc += one.MsecPerRpc
		total.Qps += one.Qps
		total.QpsPerCore += one.QpsPerCore
	}
	// Convert the sums into per-record averages.
	count := float64(numStats)
	total.Iterations /= count
	total.MsecPerRpc /= count
	total.Qps /= count
	total.QpsPerCore /= count
	return &total, nil
}
// writeLoadStatsJSON serializes stats as JSON and writes it to filename with
// mode 0644 (owner-writable, world-readable).
func writeLoadStatsJSON(filename string, stats *loadStats) error {
	data, err := json.Marshal(stats)
	if err != nil {
		return err
	}
	return ioutil.WriteFile(filename, data, 0644)
}