rpc/stress: add a load test
* This changes the current stress test to also support a load test
with non-streaming Echo call.
* the client will use min(maxcpus, num_of_servers) core and the server
will use the maximum available cores.
* Here are the test results with a few variations:
2-server/2-client: Test Stats: {MSecPerRPC:2.785 QPS:719.57 QPSPerCore:359.785}
1-server/1-client: Test Stats: {MSecPerRPC:2.29 QPS:437.32 QPSPerCore:437.32}
2-server/1-client: Test Stats: {MSecPerRPC:2.75 QPS:727.8 QPSPerCore:363.9}
* The initial setting will be 1-client and 1-server.
* Also reduce # of machines for stress tests a bit (due to quota)
MultiPart: 1/2
Change-Id: I264bcb9afff72ab2e88104a13bd92bcac52b152c
diff --git a/profiles/internal/rpc/stress/internal/client.go b/profiles/internal/rpc/stress/internal/client.go
index 27c3e05..c70715f 100644
--- a/profiles/internal/rpc/stress/internal/client.go
+++ b/profiles/internal/rpc/stress/internal/client.go
@@ -6,29 +6,48 @@
import (
"bytes"
- crand "crypto/rand"
"fmt"
"math/rand"
+ "time"
"v.io/v23/context"
- "v.io/x/lib/vlog"
+ "v.io/x/lib/vlog"
"v.io/x/ref/profiles/internal/rpc/stress"
)
-func newArg(maxPayloadSize int) (stress.Arg, error) {
- var arg stress.Arg
- arg.ABool = rand.Intn(2) == 0
- arg.AInt64 = rand.Int63()
- arg.AListOfBytes = make([]byte, rand.Intn(maxPayloadSize)+1)
- _, err := crand.Read(arg.AListOfBytes)
- return arg, err
+// CallEcho calls 'Echo' method with the given payload size for the given time
+// duration and returns the number of iterations.
+func CallEcho(ctx *context.T, server string, payloadSize int, duration time.Duration) uint64 {
+ stub := stress.StressClient(server)
+ payload := make([]byte, payloadSize)
+ for i := range payload {
+ payload[i] = byte(i & 0xff)
+ }
+
+ var iterations uint64
+ start := time.Now()
+ for {
+ got, err := stub.Echo(ctx, payload)
+ if err != nil {
+ vlog.Fatalf("Echo failed: %v", err)
+ }
+ if !bytes.Equal(got, payload) {
+ vlog.Fatalf("Echo returned %v, but expected %v", got, payload)
+ }
+ iterations++
+
+ if time.Since(start) >= duration {
+ break
+ }
+ }
+ return iterations
}
// CallSum calls 'Sum' method with a randomly generated payload.
-func CallSum(ctx *context.T, server string, maxPayloadSize int) {
+func CallSum(ctx *context.T, server string, maxPayloadSize int, stats *stress.SumStats) {
stub := stress.StressClient(server)
- arg, err := newArg(maxPayloadSize)
+ arg, err := newSumArg(maxPayloadSize)
if err != nil {
vlog.Fatalf("new arg failed: %v", err)
}
@@ -38,16 +57,19 @@
vlog.Fatalf("Sum failed: %v", err)
}
- wanted, _ := doSum(arg)
+ wanted, _ := doSum(&arg)
if !bytes.Equal(got, wanted) {
vlog.Fatalf("Sum returned %v, but expected %v", got, wanted)
}
+ stats.SumCount++
+ stats.BytesSent += uint64(lenSumArg(&arg))
+ stats.BytesRecv += uint64(len(got))
}
// CallSumStream calls 'SumStream' method. Each iteration sends up to
// 'maxChunkCnt' chunks on the stream and receives the same number of
// sums back.
-func CallSumStream(ctx *context.T, server string, maxChunkCnt, maxPayloadSize int) {
+func CallSumStream(ctx *context.T, server string, maxChunkCnt, maxPayloadSize int, stats *stress.SumStats) {
stub := stress.StressClient(server)
stream, err := stub.SumStream(ctx)
if err != nil {
@@ -55,7 +77,7 @@
}
chunkCnt := rand.Intn(maxChunkCnt) + 1
- args := make([]stress.Arg, chunkCnt)
+ args := make([]stress.SumArg, chunkCnt)
done := make(chan error, 1)
go func() {
defer close(done)
@@ -64,11 +86,12 @@
i := 0
for ; recvS.Advance(); i++ {
got := recvS.Value()
- wanted, _ := doSum(args[i])
+ wanted, _ := doSum(&args[i])
if !bytes.Equal(got, wanted) {
done <- fmt.Errorf("RecvStream returned %v, but expected %v", got, wanted)
return
}
+ stats.BytesRecv += uint64(len(got))
}
switch err := recvS.Err(); {
case err != nil:
@@ -82,7 +105,7 @@
sendS := stream.SendStream()
for i := 0; i < chunkCnt; i++ {
- arg, err := newArg(maxPayloadSize)
+ arg, err := newSumArg(maxPayloadSize)
if err != nil {
vlog.Fatalf("new arg failed: %v", err)
}
@@ -91,16 +114,16 @@
if err = sendS.Send(arg); err != nil {
vlog.Fatalf("SendStream failed to send: %v", err)
}
+ stats.BytesSent += uint64(lenSumArg(&arg))
}
if err = sendS.Close(); err != nil {
vlog.Fatalf("SendStream failed to close: %v", err)
}
-
if err = <-done; err != nil {
vlog.Fatalf("%v", err)
}
-
if err = stream.Finish(); err != nil {
vlog.Fatalf("Stream failed to finish: %v", err)
}
+ stats.SumStreamCount++
}
diff --git a/profiles/internal/rpc/stress/internal/server.go b/profiles/internal/rpc/stress/internal/server.go
index 58cf3b5..66b49d7 100644
--- a/profiles/internal/rpc/stress/internal/server.go
+++ b/profiles/internal/rpc/stress/internal/server.go
@@ -18,39 +18,62 @@
)
type impl struct {
- statsMu sync.Mutex
- sumCount uint64 // GUARDED_BY(statsMu)
- sumStreamCount uint64 // GUARDED_BY(statsMu)
+ statsMu sync.Mutex
+ stats stress.SumStats // GUARDED_BY(statsMu)
stop chan struct{}
}
-func (s *impl) Sum(_ *context.T, _ rpc.ServerCall, arg stress.Arg) ([]byte, error) {
- defer s.incSumCount()
- return doSum(arg)
+func (s *impl) Echo(_ *context.T, _ rpc.ServerCall, payload []byte) ([]byte, error) {
+ return payload, nil
+}
+
+func (s *impl) Sum(_ *context.T, _ rpc.ServerCall, arg stress.SumArg) ([]byte, error) {
+ sum, err := doSum(&arg)
+ if err != nil {
+ return nil, err
+ }
+ s.addSumStats(false, uint64(lenSumArg(&arg)), uint64(len(sum)))
+ return sum, nil
}
func (s *impl) SumStream(_ *context.T, call stress.StressSumStreamServerCall) error {
- defer s.incSumStreamCount()
rStream := call.RecvStream()
sStream := call.SendStream()
+ var bytesRecv, bytesSent uint64
for rStream.Advance() {
- sum, err := doSum(rStream.Value())
+ arg := rStream.Value()
+ sum, err := doSum(&arg)
if err != nil {
return err
}
sStream.Send(sum)
+ bytesRecv += uint64(lenSumArg(&arg))
+ bytesSent += uint64(len(sum))
}
if err := rStream.Err(); err != nil {
return err
}
+ s.addSumStats(true, bytesRecv, bytesSent)
return nil
}
-func (s *impl) GetStats(*context.T, rpc.ServerCall) (stress.Stats, error) {
+func (s *impl) addSumStats(stream bool, bytesRecv, bytesSent uint64) {
+ s.statsMu.Lock()
+ if stream {
+ s.stats.SumStreamCount++
+ } else {
+ s.stats.SumCount++
+ }
+ s.stats.BytesRecv += bytesRecv
+ s.stats.BytesSent += bytesSent
+ s.statsMu.Unlock()
+}
+
+func (s *impl) GetSumStats(*context.T, rpc.ServerCall) (stress.SumStats, error) {
s.statsMu.Lock()
defer s.statsMu.Unlock()
- return stress.Stats{s.sumCount, s.sumStreamCount}, nil
+ return s.stats, nil
}
func (s *impl) Stop(*context.T, rpc.ServerCall) error {
@@ -58,18 +81,6 @@
return nil
}
-func (s *impl) incSumCount() {
- s.statsMu.Lock()
- defer s.statsMu.Unlock()
- s.sumCount++
-}
-
-func (s *impl) incSumStreamCount() {
- s.statsMu.Lock()
- defer s.statsMu.Unlock()
- s.sumStreamCount++
-}
-
type allowEveryoneAuthorizer struct{}
func (allowEveryoneAuthorizer) Authorize(*context.T, security.Call) error { return nil }
@@ -86,11 +97,13 @@
if err != nil {
vlog.Fatalf("Listen failed: %v", err)
}
+ if len(eps) == 0 {
+ vlog.Fatal("No local address to listen on")
+ }
s := impl{stop: make(chan struct{})}
if err := server.Serve("", stress.StressServer(&s), allowEveryoneAuthorizer{}); err != nil {
vlog.Fatalf("Serve failed: %v", err)
}
-
return server, eps[0], s.stop
}
diff --git a/profiles/internal/rpc/stress/internal/util.go b/profiles/internal/rpc/stress/internal/util.go
index 6744ba1..63f8359 100644
--- a/profiles/internal/rpc/stress/internal/util.go
+++ b/profiles/internal/rpc/stress/internal/util.go
@@ -6,13 +6,31 @@
import (
"crypto/md5"
+ crand "crypto/rand"
"encoding/binary"
+ "math/rand"
"v.io/x/ref/profiles/internal/rpc/stress"
)
-// doSum returns the MD5 checksum of the arg.
-func doSum(arg stress.Arg) ([]byte, error) {
+// newSumArg returns a randomly generated SumArg.
+func newSumArg(maxPayloadSize int) (stress.SumArg, error) {
+ var arg stress.SumArg
+ arg.ABool = rand.Intn(2) == 0
+ arg.AInt64 = rand.Int63()
+ arg.AListOfBytes = make([]byte, rand.Intn(maxPayloadSize)+1)
+ _, err := crand.Read(arg.AListOfBytes)
+ return arg, err
+}
+
+// lenSumArg returns the length of the SumArg in bytes.
+func lenSumArg(arg *stress.SumArg) int {
+ // bool + uint64 + []byte
+ return 1 + 4 + len(arg.AListOfBytes)
+}
+
+// doSum returns the MD5 checksum of the SumArg.
+func doSum(arg *stress.SumArg) ([]byte, error) {
h := md5.New()
if arg.ABool {
if err := binary.Write(h, binary.LittleEndian, arg.AInt64); err != nil {
diff --git a/profiles/internal/rpc/stress/stress.vdl b/profiles/internal/rpc/stress/stress.vdl
index dd89068..1ae6c51 100644
--- a/profiles/internal/rpc/stress/stress.vdl
+++ b/profiles/internal/rpc/stress/stress.vdl
@@ -8,26 +8,31 @@
"v.io/v23/security/access"
)
-type Arg struct {
+type SumArg struct {
ABool bool
AInt64 int64
AListOfBytes []byte
}
-type Stats struct {
+type SumStats struct {
SumCount uint64
SumStreamCount uint64
+ BytesRecv uint64
+ BytesSent uint64
}
type Stress interface {
+ // Echo returns the payload that it receives.
+ Echo(Payload []byte) ([]byte | error) {access.Read}
+
// Do returns the checksum of the payload that it receives.
- Sum(arg Arg) ([]byte | error) {access.Read}
+ Sum(arg SumArg) ([]byte | error) {access.Read}
// DoStream returns the checksum of the payload that it receives via the stream.
- SumStream() stream<Arg,[]byte> error {access.Read}
+ SumStream() stream<SumArg,[]byte> error {access.Read}
- // GetStats returns the stats on the calls that the server received.
- GetStats() (Stats | error) {access.Read}
+ // GetSumStats returns the stats on the Sum calls that the server received.
+ GetSumStats() (SumStats | error) {access.Read}
// Stop stops the server.
Stop() error {access.Admin}
diff --git a/profiles/internal/rpc/stress/stress.vdl.go b/profiles/internal/rpc/stress/stress.vdl.go
index 505c788..3ab11a1 100644
--- a/profiles/internal/rpc/stress/stress.vdl.go
+++ b/profiles/internal/rpc/stress/stress.vdl.go
@@ -19,41 +19,45 @@
"v.io/v23/security/access"
)
-type Arg struct {
+type SumArg struct {
ABool bool
AInt64 int64
AListOfBytes []byte
}
-func (Arg) __VDLReflect(struct {
- Name string "v.io/x/ref/profiles/internal/rpc/stress.Arg"
+func (SumArg) __VDLReflect(struct {
+ Name string "v.io/x/ref/profiles/internal/rpc/stress.SumArg"
}) {
}
-type Stats struct {
+type SumStats struct {
SumCount uint64
SumStreamCount uint64
+ BytesRecv uint64
+ BytesSent uint64
}
-func (Stats) __VDLReflect(struct {
- Name string "v.io/x/ref/profiles/internal/rpc/stress.Stats"
+func (SumStats) __VDLReflect(struct {
+ Name string "v.io/x/ref/profiles/internal/rpc/stress.SumStats"
}) {
}
func init() {
- vdl.Register((*Arg)(nil))
- vdl.Register((*Stats)(nil))
+ vdl.Register((*SumArg)(nil))
+ vdl.Register((*SumStats)(nil))
}
// StressClientMethods is the client interface
// containing Stress methods.
type StressClientMethods interface {
+ // Echo returns the payload that it receives.
+ Echo(ctx *context.T, Payload []byte, opts ...rpc.CallOpt) ([]byte, error)
// Do returns the checksum of the payload that it receives.
- Sum(ctx *context.T, arg Arg, opts ...rpc.CallOpt) ([]byte, error)
+ Sum(ctx *context.T, arg SumArg, opts ...rpc.CallOpt) ([]byte, error)
// DoStream returns the checksum of the payload that it receives via the stream.
SumStream(*context.T, ...rpc.CallOpt) (StressSumStreamClientCall, error)
- // GetStats returns the stats on the calls that the server received.
- GetStats(*context.T, ...rpc.CallOpt) (Stats, error)
+ // GetSumStats returns the stats on the Sum calls that the server received.
+ GetSumStats(*context.T, ...rpc.CallOpt) (SumStats, error)
// Stop stops the server.
Stop(*context.T, ...rpc.CallOpt) error
}
@@ -73,7 +77,12 @@
name string
}
-func (c implStressClientStub) Sum(ctx *context.T, i0 Arg, opts ...rpc.CallOpt) (o0 []byte, err error) {
+func (c implStressClientStub) Echo(ctx *context.T, i0 []byte, opts ...rpc.CallOpt) (o0 []byte, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "Echo", []interface{}{i0}, []interface{}{&o0}, opts...)
+ return
+}
+
+func (c implStressClientStub) Sum(ctx *context.T, i0 SumArg, opts ...rpc.CallOpt) (o0 []byte, err error) {
err = v23.GetClient(ctx).Call(ctx, c.name, "Sum", []interface{}{i0}, []interface{}{&o0}, opts...)
return
}
@@ -87,8 +96,8 @@
return
}
-func (c implStressClientStub) GetStats(ctx *context.T, opts ...rpc.CallOpt) (o0 Stats, err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "GetStats", nil, []interface{}{&o0}, opts...)
+func (c implStressClientStub) GetSumStats(ctx *context.T, opts ...rpc.CallOpt) (o0 SumStats, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetSumStats", nil, []interface{}{&o0}, opts...)
return
}
@@ -118,7 +127,7 @@
// the stream has been canceled. Blocks if there is no buffer
// space; will unblock when buffer space is available or after
// the stream has been canceled.
- Send(item Arg) error
+ Send(item SumArg) error
// Close indicates to the server that no more items will be sent;
// server Recv calls will receive io.EOF after all sent items.
// This is an optional call - e.g. a client might call Close if it
@@ -178,7 +187,7 @@
return c.c.errRecv
}
func (c *implStressSumStreamClientCall) SendStream() interface {
- Send(item Arg) error
+ Send(item SumArg) error
Close() error
} {
return implStressSumStreamClientCallSend{c}
@@ -188,7 +197,7 @@
c *implStressSumStreamClientCall
}
-func (c implStressSumStreamClientCallSend) Send(item Arg) error {
+func (c implStressSumStreamClientCallSend) Send(item SumArg) error {
return c.c.Send(item)
}
func (c implStressSumStreamClientCallSend) Close() error {
@@ -202,12 +211,14 @@
// StressServerMethods is the interface a server writer
// implements for Stress.
type StressServerMethods interface {
+ // Echo returns the payload that it receives.
+ Echo(ctx *context.T, call rpc.ServerCall, Payload []byte) ([]byte, error)
// Do returns the checksum of the payload that it receives.
- Sum(ctx *context.T, call rpc.ServerCall, arg Arg) ([]byte, error)
+ Sum(ctx *context.T, call rpc.ServerCall, arg SumArg) ([]byte, error)
// DoStream returns the checksum of the payload that it receives via the stream.
SumStream(*context.T, StressSumStreamServerCall) error
- // GetStats returns the stats on the calls that the server received.
- GetStats(*context.T, rpc.ServerCall) (Stats, error)
+ // GetSumStats returns the stats on the Sum calls that the server received.
+ GetSumStats(*context.T, rpc.ServerCall) (SumStats, error)
// Stop stops the server.
Stop(*context.T, rpc.ServerCall) error
}
@@ -217,12 +228,14 @@
// The only difference between this interface and StressServerMethods
// is the streaming methods.
type StressServerStubMethods interface {
+ // Echo returns the payload that it receives.
+ Echo(ctx *context.T, call rpc.ServerCall, Payload []byte) ([]byte, error)
// Do returns the checksum of the payload that it receives.
- Sum(ctx *context.T, call rpc.ServerCall, arg Arg) ([]byte, error)
+ Sum(ctx *context.T, call rpc.ServerCall, arg SumArg) ([]byte, error)
// DoStream returns the checksum of the payload that it receives via the stream.
SumStream(*context.T, *StressSumStreamServerCallStub) error
- // GetStats returns the stats on the calls that the server received.
- GetStats(*context.T, rpc.ServerCall) (Stats, error)
+ // GetSumStats returns the stats on the Sum calls that the server received.
+ GetSumStats(*context.T, rpc.ServerCall) (SumStats, error)
// Stop stops the server.
Stop(*context.T, rpc.ServerCall) error
}
@@ -256,7 +269,11 @@
gs *rpc.GlobState
}
-func (s implStressServerStub) Sum(ctx *context.T, call rpc.ServerCall, i0 Arg) ([]byte, error) {
+func (s implStressServerStub) Echo(ctx *context.T, call rpc.ServerCall, i0 []byte) ([]byte, error) {
+ return s.impl.Echo(ctx, call, i0)
+}
+
+func (s implStressServerStub) Sum(ctx *context.T, call rpc.ServerCall, i0 SumArg) ([]byte, error) {
return s.impl.Sum(ctx, call, i0)
}
@@ -264,8 +281,8 @@
return s.impl.SumStream(ctx, call)
}
-func (s implStressServerStub) GetStats(ctx *context.T, call rpc.ServerCall) (Stats, error) {
- return s.impl.GetStats(ctx, call)
+func (s implStressServerStub) GetSumStats(ctx *context.T, call rpc.ServerCall) (SumStats, error) {
+ return s.impl.GetSumStats(ctx, call)
}
func (s implStressServerStub) Stop(ctx *context.T, call rpc.ServerCall) error {
@@ -289,10 +306,21 @@
PkgPath: "v.io/x/ref/profiles/internal/rpc/stress",
Methods: []rpc.MethodDesc{
{
+ Name: "Echo",
+ Doc: "// Echo returns the payload that it receives.",
+ InArgs: []rpc.ArgDesc{
+ {"Payload", ``}, // []byte
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []byte
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
Name: "Sum",
Doc: "// Do returns the checksum of the payload that it receives.",
InArgs: []rpc.ArgDesc{
- {"arg", ``}, // Arg
+ {"arg", ``}, // SumArg
},
OutArgs: []rpc.ArgDesc{
{"", ``}, // []byte
@@ -305,10 +333,10 @@
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
{
- Name: "GetStats",
- Doc: "// GetStats returns the stats on the calls that the server received.",
+ Name: "GetSumStats",
+ Doc: "// GetSumStats returns the stats on the Sum calls that the server received.",
OutArgs: []rpc.ArgDesc{
- {"", ``}, // Stats
+ {"", ``}, // SumStats
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
@@ -330,7 +358,7 @@
Advance() bool
// Value returns the item that was staged by Advance. May panic if Advance
// returned false or was not called. Never blocks.
- Value() Arg
+ Value() SumArg
// Err returns any error encountered by Advance. Never blocks.
Err() error
}
@@ -353,7 +381,7 @@
// a typesafe stub that implements StressSumStreamServerCall.
type StressSumStreamServerCallStub struct {
rpc.StreamServerCall
- valRecv Arg
+ valRecv SumArg
errRecv error
}
@@ -365,7 +393,7 @@
// RecvStream returns the receiver side of the Stress.SumStream server stream.
func (s *StressSumStreamServerCallStub) RecvStream() interface {
Advance() bool
- Value() Arg
+ Value() SumArg
Err() error
} {
return implStressSumStreamServerCallRecv{s}
@@ -376,11 +404,11 @@
}
func (s implStressSumStreamServerCallRecv) Advance() bool {
- s.s.valRecv = Arg{}
+ s.s.valRecv = SumArg{}
s.s.errRecv = s.s.Recv(&s.s.valRecv)
return s.s.errRecv == nil
}
-func (s implStressSumStreamServerCallRecv) Value() Arg {
+func (s implStressSumStreamServerCallRecv) Value() SumArg {
return s.s.valRecv
}
func (s implStressSumStreamServerCallRecv) Err() error {
diff --git a/profiles/internal/rpc/stress/stress/load.go b/profiles/internal/rpc/stress/stress/load.go
new file mode 100644
index 0000000..9c9ceeb
--- /dev/null
+++ b/profiles/internal/rpc/stress/stress/load.go
@@ -0,0 +1,115 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "runtime"
+ "time"
+
+ "v.io/v23"
+
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/profiles/internal/rpc/stress/internal"
+)
+
+var (
+ cpus int
+ payloadSize int
+)
+
+func init() {
+ cmdLoadTest.Flags.DurationVar(&duration, "duration", 1*time.Minute, "duration of the test to run")
+ cmdLoadTest.Flags.IntVar(&cpus, "cpu", 0, "number of cpu cores to use; if zero, use the number of servers to test")
+ cmdLoadTest.Flags.IntVar(&payloadSize, "payload-size", 1000, "size of payload in bytes")
+ cmdLoadTest.Flags.StringVar(&outFormat, "format", "text", "Stats output format; either text or json")
+}
+
+type loadStats struct {
+ Iterations uint64
+ MsecPerRpc float64
+ Qps float64
+ QpsPerCore float64
+}
+
+var cmdLoadTest = &cmdline.Command{
+ Run: runLoadTest,
+ Name: "load",
+ Short: "Run load test",
+ Long: "Run load test",
+ ArgsName: "<server> ...",
+ ArgsLong: "<server> ... A list of servers to connect to.",
+}
+
+func runLoadTest(cmd *cmdline.Command, args []string) error {
+ if len(args) == 0 {
+ return cmd.UsageErrorf("no server specified")
+ }
+ if outFormat != "text" && outFormat != "json" {
+ return cmd.UsageErrorf("invalid output format: %s\n", outFormat)
+ }
+
+ cores := cpus
+ if cores == 0 {
+ cores = len(args)
+ }
+ runtime.GOMAXPROCS(cores)
+
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ fmt.Fprintf(cmd.Stdout(), "starting load test against %d server(s) using %d core(s)...\n", len(args), cores)
+ fmt.Fprintf(cmd.Stdout(), "payloadSize: %d, duration: %v\n", payloadSize, duration)
+
+ start := time.Now()
+ done := make(chan loadStats)
+ for _, server := range args {
+ go func(server string) {
+ var stats loadStats
+
+ start := time.Now()
+ stats.Iterations = internal.CallEcho(ctx, server, payloadSize, duration)
+ elapsed := time.Since(start)
+
+ stats.Qps = float64(stats.Iterations) / elapsed.Seconds()
+ stats.MsecPerRpc = 1000 / stats.Qps
+ done <- stats
+ }(server)
+ }
+ var merged loadStats
+ for i := 0; i < len(args); i++ {
+ stats := <-done
+ merged.Iterations += stats.Iterations
+ merged.MsecPerRpc += stats.MsecPerRpc
+ merged.Qps += stats.Qps
+ }
+ merged.MsecPerRpc /= float64(len(args))
+ merged.QpsPerCore = merged.Qps / float64(cores)
+ elapsed := time.Since(start)
+ fmt.Printf("done after %v\n", elapsed)
+ return outLoadStats(cmd.Stdout(), outFormat, "load stats:", &merged)
+}
+
+func outLoadStats(w io.Writer, format, title string, stats *loadStats) error {
+ switch format {
+ case "text":
+ fmt.Fprintf(w, "%s\n", title)
+ fmt.Fprintf(w, "\tnumber of RPCs:\t\t%d\n", stats.Iterations)
+ fmt.Fprintf(w, "\tlatency (msec/rpc):\t%.2f\n", stats.MsecPerRpc)
+ fmt.Fprintf(w, "\tqps:\t\t\t%.2f\n", stats.Qps)
+ fmt.Fprintf(w, "\tqps/core:\t\t%.2f\n", stats.QpsPerCore)
+ case "json":
+ b, err := json.Marshal(stats)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintf(w, "%s%s\n", title, b)
+ default:
+ return fmt.Errorf("invalid output format: %s\n", format)
+ }
+ return nil
+}
diff --git a/profiles/internal/rpc/stress/stress/main.go b/profiles/internal/rpc/stress/stress/main.go
index c292e69..561cf60 100644
--- a/profiles/internal/rpc/stress/stress/main.go
+++ b/profiles/internal/rpc/stress/stress/main.go
@@ -5,122 +5,51 @@
package main
import (
- "flag"
- "fmt"
- "math/rand"
- "runtime"
- "strings"
- "time"
+ "os"
"v.io/v23"
- "v.io/v23/context"
- "v.io/x/lib/vlog"
+ "v.io/x/lib/cmdline"
"v.io/x/ref/profiles/internal/rpc/stress"
- "v.io/x/ref/profiles/internal/rpc/stress/internal"
_ "v.io/x/ref/profiles/static"
)
-var (
- servers = flag.String("servers", "", "comma-seperated list of of the servers to connect to")
- workers = flag.Int("workers", 1, "number of test workers to run; If zero, no test will be performed.")
- duration = flag.Duration("duration", 1*time.Minute, "duration of the stress test to run")
-
- maxChunkCnt = flag.Int("max_chunk_count", 100, "maximum number of chunks to send per streaming RPC")
- maxPayloadSize = flag.Int("max_payload_size", 10000, "maximum size of payload in bytes")
-
- serverStats = flag.Bool("server_stats", false, "If true, print out the server stats")
- serverStop = flag.Bool("server_stop", false, "If true, shutdown the servers")
-)
-
-func init() {
- rand.Seed(time.Now().UnixNano())
+var cmdStopServers = &cmdline.Command{
+ Run: runStopServers,
+ Name: "stop",
+ Short: "Stop servers",
+ Long: "Stop servers",
+ ArgsName: "<server> ...",
+ ArgsLong: "<server> ... A list of servers to stop.",
}
-func runTest(ctx *context.T, servers []string) {
- fmt.Printf("starting stress test against %d servers...\n", len(servers))
- fmt.Printf("workers: %d, maxChunkCnt: %d, maxPayloadSize: %d\n", *workers, *maxChunkCnt, *maxPayloadSize)
-
- now := time.Now()
- done := make(chan stress.Stats, *workers)
- for i := 0; i < *workers; i++ {
- go func() {
- var sumCount, sumStreamCount uint64
- timeout := time.After(*duration)
- done:
- for {
- server := servers[rand.Intn(len(servers))]
- if rand.Intn(2) == 0 {
- internal.CallSum(ctx, server, *maxPayloadSize)
- sumCount++
- } else {
- internal.CallSumStream(ctx, server, *maxChunkCnt, *maxPayloadSize)
- sumStreamCount++
- }
-
- select {
- case <-timeout:
- break done
- default:
- }
- }
- done <- stress.Stats{sumCount, sumStreamCount}
- }()
+func runStopServers(cmd *cmdline.Command, args []string) error {
+ if len(args) == 0 {
+ return cmd.UsageErrorf("no server specified")
}
- var stats stress.Stats
- for i := 0; i < *workers; i++ {
- s := <-done
- stats.SumCount += s.SumCount
- stats.SumStreamCount += s.SumStreamCount
- }
- elapsed := time.Since(now)
-
- fmt.Printf("done after %v\n", elapsed)
- fmt.Printf("client stats: %+v, ", stats)
- fmt.Printf("qps: %.2f\n", float64(stats.SumCount+stats.SumStreamCount)/elapsed.Seconds())
-}
-
-func printServerStats(ctx *context.T, servers []string) {
- for _, server := range servers {
- stats, err := stress.StressClient(server).GetStats(ctx)
- if err != nil {
- vlog.Fatal("GetStats failed: %v\n", err)
- }
- fmt.Printf("server stats: %q:%+v\n", server, stats)
- }
-}
-
-func stopServers(ctx *context.T, servers []string) {
- for _, server := range servers {
- if err := stress.StressClient(server).Stop(ctx); err != nil {
- vlog.Fatal("Stop failed: %v\n", err)
- }
- }
-}
-
-func main() {
- runtime.GOMAXPROCS(runtime.NumCPU())
-
ctx, shutdown := v23.Init()
defer shutdown()
- var addrs []string
- for _, a := range strings.Split(*servers, ",") {
- addrs = append(addrs, strings.TrimSpace(a))
+ for _, server := range args {
+ if err := stress.StressClient(server).Stop(ctx); err != nil {
+ return err
+ }
}
- if len(addrs) == 0 {
- vlog.Fatal("no server specified")
- }
+ return nil
+}
- if *workers > 0 && *duration > 0 {
- runTest(ctx, addrs)
+func main() {
+ cmdRoot := &cmdline.Command{
+ Name: "stress",
+ Short: "Tool to stress/load test RPC",
+ Long: "Tool to stress/load test RPC by issuing randomly generated requests",
+ Children: []*cmdline.Command{
+ cmdStressTest,
+ cmdStressStats,
+ cmdLoadTest,
+ cmdStopServers,
+ },
}
-
- if *serverStats {
- printServerStats(ctx, addrs)
- }
- if *serverStop {
- stopServers(ctx, addrs)
- }
+ os.Exit(cmdRoot.Main())
}
diff --git a/profiles/internal/rpc/stress/stress/stress.go b/profiles/internal/rpc/stress/stress/stress.go
new file mode 100644
index 0000000..3d363c0
--- /dev/null
+++ b/profiles/internal/rpc/stress/stress/stress.go
@@ -0,0 +1,157 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "math/rand"
+ "runtime"
+ "time"
+
+ "v.io/v23"
+
+ "v.io/x/lib/cmdline"
+ "v.io/x/ref/profiles/internal/rpc/stress"
+ "v.io/x/ref/profiles/internal/rpc/stress/internal"
+)
+
+var (
+ duration time.Duration
+
+ workers int
+ maxChunkCnt int
+ maxPayloadSize int
+
+ outFormat string
+)
+
+func init() {
+ cmdStressTest.Flags.DurationVar(&duration, "duration", 1*time.Minute, "duration of the test to run")
+ cmdStressTest.Flags.IntVar(&workers, "workers", 1, "number of test workers to run")
+ cmdStressTest.Flags.IntVar(&maxChunkCnt, "max-chunk-count", 1000, "maximum number of chunks to send per streaming RPC")
+ cmdStressTest.Flags.IntVar(&maxPayloadSize, "max-payload-size", 10000, "maximum size of payload in bytes")
+ cmdStressTest.Flags.StringVar(&outFormat, "format", "text", "Stats output format; either text or json")
+
+ cmdStressStats.Flags.StringVar(&outFormat, "format", "text", "Stats output format; either text or json")
+}
+
+var cmdStressTest = &cmdline.Command{
+ Run: runStressTest,
+ Name: "stress",
+ Short: "Run stress test",
+ Long: "Run stress test",
+ ArgsName: "<server> ...",
+ ArgsLong: "<server> ... A list of servers to connect to.",
+}
+
+func runStressTest(cmd *cmdline.Command, args []string) error {
+ if len(args) == 0 {
+ return cmd.UsageErrorf("no server specified")
+ }
+ if outFormat != "text" && outFormat != "json" {
+ return cmd.UsageErrorf("invalid output format: %s\n", outFormat)
+ }
+
+ cores := runtime.NumCPU()
+ runtime.GOMAXPROCS(cores)
+
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ rand.Seed(time.Now().UnixNano())
+ fmt.Fprintf(cmd.Stdout(), "starting stress test against %d server(s) using %d core(s)...\n", len(args), cores)
+ fmt.Fprintf(cmd.Stdout(), "workers: %d, maxChunkCnt: %d, maxPayloadSize: %d, duration: %v\n", workers, maxChunkCnt, maxPayloadSize, duration)
+
+ start := time.Now()
+ done := make(chan stress.SumStats)
+ for i := 0; i < workers; i++ {
+ go func() {
+ var stats stress.SumStats
+ timeout := time.After(duration)
+ done:
+ for {
+ server := args[rand.Intn(len(args))]
+ if rand.Intn(2) == 0 {
+ internal.CallSum(ctx, server, maxPayloadSize, &stats)
+ } else {
+ internal.CallSumStream(ctx, server, maxChunkCnt, maxPayloadSize, &stats)
+ }
+
+ select {
+ case <-timeout:
+ break done
+ default:
+ }
+ }
+ done <- stats
+ }()
+ }
+ var merged stress.SumStats
+ for i := 0; i < workers; i++ {
+ stats := <-done
+ merged.SumCount += stats.SumCount
+ merged.SumStreamCount += stats.SumStreamCount
+ merged.BytesRecv += stats.BytesRecv
+ merged.BytesSent += stats.BytesSent
+ }
+ elapsed := time.Since(start)
+ fmt.Printf("done after %v\n", elapsed)
+ return outSumStats(cmd.Stdout(), outFormat, "client stats:", &merged)
+}
+
+var cmdStressStats = &cmdline.Command{
+ Run: runStressStats,
+ Name: "stats",
+ Short: "Print out stress stats of servers",
+ Long: "Print out stress stats of servers",
+ ArgsName: "<server> ...",
+ ArgsLong: "<server> ... A list of servers to connect to.",
+}
+
+func runStressStats(cmd *cmdline.Command, args []string) error {
+ if len(args) == 0 {
+ return cmd.UsageErrorf("no server specified")
+ }
+ if outFormat != "text" && outFormat != "json" {
+ return cmd.UsageErrorf("invalid output format: %s\n", outFormat)
+ }
+
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ for _, server := range args {
+ stats, err := stress.StressClient(server).GetSumStats(ctx)
+ if err != nil {
+ return err
+ }
+ title := fmt.Sprintf("server stats(%s):", server)
+ if err := outSumStats(cmd.Stdout(), outFormat, title, &stats); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func outSumStats(w io.Writer, format, title string, stats *stress.SumStats) error {
+ switch format {
+ case "text":
+ fmt.Fprintf(w, "%s\n", title)
+ fmt.Fprintf(w, "\tnumber of non-streaming RPCs:\t%d\n", stats.SumCount)
+ fmt.Fprintf(w, "\tnumber of streaming RPCs:\t%d\n", stats.SumStreamCount)
+ fmt.Fprintf(w, "\tnumber of bytes received:\t%d\n", stats.BytesRecv)
+ fmt.Fprintf(w, "\tnumber of bytes sent:\t\t%d\n", stats.BytesSent)
+ case "json":
+ b, err := json.Marshal(stats)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintf(w, "%s%s\n", title, b)
+ default:
+ return fmt.Errorf("invalid output format: %s\n", format)
+ }
+ return nil
+}