rpc/stress: add a load test

  * This extends the existing stress test tool to also support a load test
    that uses the non-streaming Echo call.
  * The client will use min(maxcpus, num_of_servers) cores and the server
    will use all available cores.

  * Here are the test results with a few variations:
    2-server/2-client: Test Stats: {MSecPerRPC:2.785 QPS:719.57 QPSPerCore:359.785}
    1-server/1-client: Test Stats: {MSecPerRPC:2.29 QPS:437.32 QPSPerCore:437.32}
    2-server/1-client: Test Stats: {MSecPerRPC:2.75 QPS:727.8 QPSPerCore:363.9}

  * The initial setting will be 1-client and 1-server.
  * Also reduce # of machines for stress tests a bit (due to quota)

MultiPart: 1/2
Change-Id: I264bcb9afff72ab2e88104a13bd92bcac52b152c
diff --git a/profiles/internal/rpc/stress/internal/client.go b/profiles/internal/rpc/stress/internal/client.go
index 27c3e05..c70715f 100644
--- a/profiles/internal/rpc/stress/internal/client.go
+++ b/profiles/internal/rpc/stress/internal/client.go
@@ -6,29 +6,48 @@
 
 import (
 	"bytes"
-	crand "crypto/rand"
 	"fmt"
 	"math/rand"
+	"time"
 
 	"v.io/v23/context"
-	"v.io/x/lib/vlog"
 
+	"v.io/x/lib/vlog"
 	"v.io/x/ref/profiles/internal/rpc/stress"
 )
 
-func newArg(maxPayloadSize int) (stress.Arg, error) {
-	var arg stress.Arg
-	arg.ABool = rand.Intn(2) == 0
-	arg.AInt64 = rand.Int63()
-	arg.AListOfBytes = make([]byte, rand.Intn(maxPayloadSize)+1)
-	_, err := crand.Read(arg.AListOfBytes)
-	return arg, err
+// CallEcho calls 'Echo' method with the given payload size for the given time
+// duration and returns the number of iterations.
+func CallEcho(ctx *context.T, server string, payloadSize int, duration time.Duration) uint64 {
+	stub := stress.StressClient(server)
+	payload := make([]byte, payloadSize)
+	for i := range payload {
+		payload[i] = byte(i & 0xff)
+	}
+
+	var iterations uint64
+	start := time.Now()
+	for {
+		got, err := stub.Echo(ctx, payload)
+		if err != nil {
+			vlog.Fatalf("Echo failed: %v", err)
+		}
+		if !bytes.Equal(got, payload) {
+			vlog.Fatalf("Echo returned %v, but expected %v", got, payload)
+		}
+		iterations++
+
+		if time.Since(start) >= duration {
+			break
+		}
+	}
+	return iterations
 }
 
 // CallSum calls 'Sum' method with a randomly generated payload.
-func CallSum(ctx *context.T, server string, maxPayloadSize int) {
+func CallSum(ctx *context.T, server string, maxPayloadSize int, stats *stress.SumStats) {
 	stub := stress.StressClient(server)
-	arg, err := newArg(maxPayloadSize)
+	arg, err := newSumArg(maxPayloadSize)
 	if err != nil {
 		vlog.Fatalf("new arg failed: %v", err)
 	}
@@ -38,16 +57,19 @@
 		vlog.Fatalf("Sum failed: %v", err)
 	}
 
-	wanted, _ := doSum(arg)
+	wanted, _ := doSum(&arg)
 	if !bytes.Equal(got, wanted) {
 		vlog.Fatalf("Sum returned %v, but expected %v", got, wanted)
 	}
+	stats.SumCount++
+	stats.BytesSent += uint64(lenSumArg(&arg))
+	stats.BytesRecv += uint64(len(got))
 }
 
 // CallSumStream calls 'SumStream' method. Each iteration sends up to
 // 'maxChunkCnt' chunks on the stream and receives the same number of
 // sums back.
-func CallSumStream(ctx *context.T, server string, maxChunkCnt, maxPayloadSize int) {
+func CallSumStream(ctx *context.T, server string, maxChunkCnt, maxPayloadSize int, stats *stress.SumStats) {
 	stub := stress.StressClient(server)
 	stream, err := stub.SumStream(ctx)
 	if err != nil {
@@ -55,7 +77,7 @@
 	}
 
 	chunkCnt := rand.Intn(maxChunkCnt) + 1
-	args := make([]stress.Arg, chunkCnt)
+	args := make([]stress.SumArg, chunkCnt)
 	done := make(chan error, 1)
 	go func() {
 		defer close(done)
@@ -64,11 +86,12 @@
 		i := 0
 		for ; recvS.Advance(); i++ {
 			got := recvS.Value()
-			wanted, _ := doSum(args[i])
+			wanted, _ := doSum(&args[i])
 			if !bytes.Equal(got, wanted) {
 				done <- fmt.Errorf("RecvStream returned %v, but expected %v", got, wanted)
 				return
 			}
+			stats.BytesRecv += uint64(len(got))
 		}
 		switch err := recvS.Err(); {
 		case err != nil:
@@ -82,7 +105,7 @@
 
 	sendS := stream.SendStream()
 	for i := 0; i < chunkCnt; i++ {
-		arg, err := newArg(maxPayloadSize)
+		arg, err := newSumArg(maxPayloadSize)
 		if err != nil {
 			vlog.Fatalf("new arg failed: %v", err)
 		}
@@ -91,16 +114,16 @@
 		if err = sendS.Send(arg); err != nil {
 			vlog.Fatalf("SendStream failed to send: %v", err)
 		}
+		stats.BytesSent += uint64(lenSumArg(&arg))
 	}
 	if err = sendS.Close(); err != nil {
 		vlog.Fatalf("SendStream failed to close: %v", err)
 	}
-
 	if err = <-done; err != nil {
 		vlog.Fatalf("%v", err)
 	}
-
 	if err = stream.Finish(); err != nil {
 		vlog.Fatalf("Stream failed to finish: %v", err)
 	}
+	stats.SumStreamCount++
 }
diff --git a/profiles/internal/rpc/stress/internal/server.go b/profiles/internal/rpc/stress/internal/server.go
index 58cf3b5..66b49d7 100644
--- a/profiles/internal/rpc/stress/internal/server.go
+++ b/profiles/internal/rpc/stress/internal/server.go
@@ -18,39 +18,62 @@
 )
 
 type impl struct {
-	statsMu        sync.Mutex
-	sumCount       uint64 // GUARDED_BY(statsMu)
-	sumStreamCount uint64 // GUARDED_BY(statsMu)
+	statsMu sync.Mutex
+	stats   stress.SumStats // GUARDED_BY(statsMu)
 
 	stop chan struct{}
 }
 
-func (s *impl) Sum(_ *context.T, _ rpc.ServerCall, arg stress.Arg) ([]byte, error) {
-	defer s.incSumCount()
-	return doSum(arg)
+func (s *impl) Echo(_ *context.T, _ rpc.ServerCall, payload []byte) ([]byte, error) {
+	return payload, nil
+}
+
+func (s *impl) Sum(_ *context.T, _ rpc.ServerCall, arg stress.SumArg) ([]byte, error) {
+	sum, err := doSum(&arg)
+	if err != nil {
+		return nil, err
+	}
+	s.addSumStats(false, uint64(lenSumArg(&arg)), uint64(len(sum)))
+	return sum, nil
 }
 
 func (s *impl) SumStream(_ *context.T, call stress.StressSumStreamServerCall) error {
-	defer s.incSumStreamCount()
 	rStream := call.RecvStream()
 	sStream := call.SendStream()
+	var bytesRecv, bytesSent uint64
 	for rStream.Advance() {
-		sum, err := doSum(rStream.Value())
+		arg := rStream.Value()
+		sum, err := doSum(&arg)
 		if err != nil {
 			return err
 		}
 		sStream.Send(sum)
+		bytesRecv += uint64(lenSumArg(&arg))
+		bytesSent += uint64(len(sum))
 	}
 	if err := rStream.Err(); err != nil {
 		return err
 	}
+	s.addSumStats(true, bytesRecv, bytesSent)
 	return nil
 }
 
-func (s *impl) GetStats(*context.T, rpc.ServerCall) (stress.Stats, error) {
+func (s *impl) addSumStats(stream bool, bytesRecv, bytesSent uint64) {
+	s.statsMu.Lock()
+	if stream {
+		s.stats.SumStreamCount++
+	} else {
+		s.stats.SumCount++
+	}
+	s.stats.BytesRecv += bytesRecv
+	s.stats.BytesSent += bytesSent
+	s.statsMu.Unlock()
+}
+
+func (s *impl) GetSumStats(*context.T, rpc.ServerCall) (stress.SumStats, error) {
 	s.statsMu.Lock()
 	defer s.statsMu.Unlock()
-	return stress.Stats{s.sumCount, s.sumStreamCount}, nil
+	return s.stats, nil
 }
 
 func (s *impl) Stop(*context.T, rpc.ServerCall) error {
@@ -58,18 +81,6 @@
 	return nil
 }
 
-func (s *impl) incSumCount() {
-	s.statsMu.Lock()
-	defer s.statsMu.Unlock()
-	s.sumCount++
-}
-
-func (s *impl) incSumStreamCount() {
-	s.statsMu.Lock()
-	defer s.statsMu.Unlock()
-	s.sumStreamCount++
-}
-
 type allowEveryoneAuthorizer struct{}
 
 func (allowEveryoneAuthorizer) Authorize(*context.T, security.Call) error { return nil }
@@ -86,11 +97,13 @@
 	if err != nil {
 		vlog.Fatalf("Listen failed: %v", err)
 	}
+	if len(eps) == 0 {
+		vlog.Fatal("No local address to listen on")
+	}
 
 	s := impl{stop: make(chan struct{})}
 	if err := server.Serve("", stress.StressServer(&s), allowEveryoneAuthorizer{}); err != nil {
 		vlog.Fatalf("Serve failed: %v", err)
 	}
-
 	return server, eps[0], s.stop
 }
diff --git a/profiles/internal/rpc/stress/internal/util.go b/profiles/internal/rpc/stress/internal/util.go
index 6744ba1..63f8359 100644
--- a/profiles/internal/rpc/stress/internal/util.go
+++ b/profiles/internal/rpc/stress/internal/util.go
@@ -6,13 +6,31 @@
 
 import (
 	"crypto/md5"
+	crand "crypto/rand"
 	"encoding/binary"
+	"math/rand"
 
 	"v.io/x/ref/profiles/internal/rpc/stress"
 )
 
-// doSum returns the MD5 checksum of the arg.
-func doSum(arg stress.Arg) ([]byte, error) {
+// newSumArg returns a randomly generated SumArg.
+func newSumArg(maxPayloadSize int) (stress.SumArg, error) {
+	var arg stress.SumArg
+	arg.ABool = rand.Intn(2) == 0
+	arg.AInt64 = rand.Int63()
+	arg.AListOfBytes = make([]byte, rand.Intn(maxPayloadSize)+1)
+	_, err := crand.Read(arg.AListOfBytes)
+	return arg, err
+}
+
+// lenSumArg returns the length of the SumArg's field data in bytes.
+func lenSumArg(arg *stress.SumArg) int {
+	// ABool: 1 byte + AInt64: 8 bytes + AListOfBytes: len bytes
+	return 1 + 8 + len(arg.AListOfBytes)
+}
+
+// doSum returns the MD5 checksum of the SumArg.
+func doSum(arg *stress.SumArg) ([]byte, error) {
 	h := md5.New()
 	if arg.ABool {
 		if err := binary.Write(h, binary.LittleEndian, arg.AInt64); err != nil {
diff --git a/profiles/internal/rpc/stress/stress.vdl b/profiles/internal/rpc/stress/stress.vdl
index dd89068..1ae6c51 100644
--- a/profiles/internal/rpc/stress/stress.vdl
+++ b/profiles/internal/rpc/stress/stress.vdl
@@ -8,26 +8,31 @@
 	"v.io/v23/security/access"
 )
 
-type Arg struct {
+type SumArg struct {
 	ABool        bool
 	AInt64       int64
 	AListOfBytes []byte
 }
 
-type Stats struct {
+type SumStats struct {
 	SumCount       uint64
 	SumStreamCount uint64
+	BytesRecv      uint64
+	BytesSent      uint64
 }
 
 type Stress interface {
+	// Echo returns the payload that it receives.
+	Echo(Payload []byte) ([]byte | error) {access.Read}
+
 	// Do returns the checksum of the payload that it receives.
-	Sum(arg Arg) ([]byte | error) {access.Read}
+	Sum(arg SumArg) ([]byte | error) {access.Read}
 
 	// DoStream returns the checksum of the payload that it receives via the stream.
-	SumStream() stream<Arg,[]byte> error {access.Read}
+	SumStream() stream<SumArg,[]byte> error {access.Read}
 
-	// GetStats returns the stats on the calls that the server received.
-	GetStats() (Stats | error) {access.Read}
+	// GetSumStats returns the stats on the Sum calls that the server received.
+	GetSumStats() (SumStats | error) {access.Read}
 
 	// Stop stops the server.
 	Stop() error {access.Admin}
diff --git a/profiles/internal/rpc/stress/stress.vdl.go b/profiles/internal/rpc/stress/stress.vdl.go
index 505c788..3ab11a1 100644
--- a/profiles/internal/rpc/stress/stress.vdl.go
+++ b/profiles/internal/rpc/stress/stress.vdl.go
@@ -19,41 +19,45 @@
 	"v.io/v23/security/access"
 )
 
-type Arg struct {
+type SumArg struct {
 	ABool        bool
 	AInt64       int64
 	AListOfBytes []byte
 }
 
-func (Arg) __VDLReflect(struct {
-	Name string "v.io/x/ref/profiles/internal/rpc/stress.Arg"
+func (SumArg) __VDLReflect(struct {
+	Name string "v.io/x/ref/profiles/internal/rpc/stress.SumArg"
 }) {
 }
 
-type Stats struct {
+type SumStats struct {
 	SumCount       uint64
 	SumStreamCount uint64
+	BytesRecv      uint64
+	BytesSent      uint64
 }
 
-func (Stats) __VDLReflect(struct {
-	Name string "v.io/x/ref/profiles/internal/rpc/stress.Stats"
+func (SumStats) __VDLReflect(struct {
+	Name string "v.io/x/ref/profiles/internal/rpc/stress.SumStats"
 }) {
 }
 
 func init() {
-	vdl.Register((*Arg)(nil))
-	vdl.Register((*Stats)(nil))
+	vdl.Register((*SumArg)(nil))
+	vdl.Register((*SumStats)(nil))
 }
 
 // StressClientMethods is the client interface
 // containing Stress methods.
 type StressClientMethods interface {
+	// Echo returns the payload that it receives.
+	Echo(ctx *context.T, Payload []byte, opts ...rpc.CallOpt) ([]byte, error)
 	// Do returns the checksum of the payload that it receives.
-	Sum(ctx *context.T, arg Arg, opts ...rpc.CallOpt) ([]byte, error)
+	Sum(ctx *context.T, arg SumArg, opts ...rpc.CallOpt) ([]byte, error)
 	// DoStream returns the checksum of the payload that it receives via the stream.
 	SumStream(*context.T, ...rpc.CallOpt) (StressSumStreamClientCall, error)
-	// GetStats returns the stats on the calls that the server received.
-	GetStats(*context.T, ...rpc.CallOpt) (Stats, error)
+	// GetSumStats returns the stats on the Sum calls that the server received.
+	GetSumStats(*context.T, ...rpc.CallOpt) (SumStats, error)
 	// Stop stops the server.
 	Stop(*context.T, ...rpc.CallOpt) error
 }
@@ -73,7 +77,12 @@
 	name string
 }
 
-func (c implStressClientStub) Sum(ctx *context.T, i0 Arg, opts ...rpc.CallOpt) (o0 []byte, err error) {
+func (c implStressClientStub) Echo(ctx *context.T, i0 []byte, opts ...rpc.CallOpt) (o0 []byte, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "Echo", []interface{}{i0}, []interface{}{&o0}, opts...)
+	return
+}
+
+func (c implStressClientStub) Sum(ctx *context.T, i0 SumArg, opts ...rpc.CallOpt) (o0 []byte, err error) {
 	err = v23.GetClient(ctx).Call(ctx, c.name, "Sum", []interface{}{i0}, []interface{}{&o0}, opts...)
 	return
 }
@@ -87,8 +96,8 @@
 	return
 }
 
-func (c implStressClientStub) GetStats(ctx *context.T, opts ...rpc.CallOpt) (o0 Stats, err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "GetStats", nil, []interface{}{&o0}, opts...)
+func (c implStressClientStub) GetSumStats(ctx *context.T, opts ...rpc.CallOpt) (o0 SumStats, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetSumStats", nil, []interface{}{&o0}, opts...)
 	return
 }
 
@@ -118,7 +127,7 @@
 		// the stream has been canceled.  Blocks if there is no buffer
 		// space; will unblock when buffer space is available or after
 		// the stream has been canceled.
-		Send(item Arg) error
+		Send(item SumArg) error
 		// Close indicates to the server that no more items will be sent;
 		// server Recv calls will receive io.EOF after all sent items.
 		// This is an optional call - e.g. a client might call Close if it
@@ -178,7 +187,7 @@
 	return c.c.errRecv
 }
 func (c *implStressSumStreamClientCall) SendStream() interface {
-	Send(item Arg) error
+	Send(item SumArg) error
 	Close() error
 } {
 	return implStressSumStreamClientCallSend{c}
@@ -188,7 +197,7 @@
 	c *implStressSumStreamClientCall
 }
 
-func (c implStressSumStreamClientCallSend) Send(item Arg) error {
+func (c implStressSumStreamClientCallSend) Send(item SumArg) error {
 	return c.c.Send(item)
 }
 func (c implStressSumStreamClientCallSend) Close() error {
@@ -202,12 +211,14 @@
 // StressServerMethods is the interface a server writer
 // implements for Stress.
 type StressServerMethods interface {
+	// Echo returns the payload that it receives.
+	Echo(ctx *context.T, call rpc.ServerCall, Payload []byte) ([]byte, error)
 	// Do returns the checksum of the payload that it receives.
-	Sum(ctx *context.T, call rpc.ServerCall, arg Arg) ([]byte, error)
+	Sum(ctx *context.T, call rpc.ServerCall, arg SumArg) ([]byte, error)
 	// DoStream returns the checksum of the payload that it receives via the stream.
 	SumStream(*context.T, StressSumStreamServerCall) error
-	// GetStats returns the stats on the calls that the server received.
-	GetStats(*context.T, rpc.ServerCall) (Stats, error)
+	// GetSumStats returns the stats on the Sum calls that the server received.
+	GetSumStats(*context.T, rpc.ServerCall) (SumStats, error)
 	// Stop stops the server.
 	Stop(*context.T, rpc.ServerCall) error
 }
@@ -217,12 +228,14 @@
 // The only difference between this interface and StressServerMethods
 // is the streaming methods.
 type StressServerStubMethods interface {
+	// Echo returns the payload that it receives.
+	Echo(ctx *context.T, call rpc.ServerCall, Payload []byte) ([]byte, error)
 	// Do returns the checksum of the payload that it receives.
-	Sum(ctx *context.T, call rpc.ServerCall, arg Arg) ([]byte, error)
+	Sum(ctx *context.T, call rpc.ServerCall, arg SumArg) ([]byte, error)
 	// DoStream returns the checksum of the payload that it receives via the stream.
 	SumStream(*context.T, *StressSumStreamServerCallStub) error
-	// GetStats returns the stats on the calls that the server received.
-	GetStats(*context.T, rpc.ServerCall) (Stats, error)
+	// GetSumStats returns the stats on the Sum calls that the server received.
+	GetSumStats(*context.T, rpc.ServerCall) (SumStats, error)
 	// Stop stops the server.
 	Stop(*context.T, rpc.ServerCall) error
 }
@@ -256,7 +269,11 @@
 	gs   *rpc.GlobState
 }
 
-func (s implStressServerStub) Sum(ctx *context.T, call rpc.ServerCall, i0 Arg) ([]byte, error) {
+func (s implStressServerStub) Echo(ctx *context.T, call rpc.ServerCall, i0 []byte) ([]byte, error) {
+	return s.impl.Echo(ctx, call, i0)
+}
+
+func (s implStressServerStub) Sum(ctx *context.T, call rpc.ServerCall, i0 SumArg) ([]byte, error) {
 	return s.impl.Sum(ctx, call, i0)
 }
 
@@ -264,8 +281,8 @@
 	return s.impl.SumStream(ctx, call)
 }
 
-func (s implStressServerStub) GetStats(ctx *context.T, call rpc.ServerCall) (Stats, error) {
-	return s.impl.GetStats(ctx, call)
+func (s implStressServerStub) GetSumStats(ctx *context.T, call rpc.ServerCall) (SumStats, error) {
+	return s.impl.GetSumStats(ctx, call)
 }
 
 func (s implStressServerStub) Stop(ctx *context.T, call rpc.ServerCall) error {
@@ -289,10 +306,21 @@
 	PkgPath: "v.io/x/ref/profiles/internal/rpc/stress",
 	Methods: []rpc.MethodDesc{
 		{
+			Name: "Echo",
+			Doc:  "// Echo returns the payload that it receives.",
+			InArgs: []rpc.ArgDesc{
+				{"Payload", ``}, // []byte
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // []byte
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+		},
+		{
 			Name: "Sum",
 			Doc:  "// Do returns the checksum of the payload that it receives.",
 			InArgs: []rpc.ArgDesc{
-				{"arg", ``}, // Arg
+				{"arg", ``}, // SumArg
 			},
 			OutArgs: []rpc.ArgDesc{
 				{"", ``}, // []byte
@@ -305,10 +333,10 @@
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
-			Name: "GetStats",
-			Doc:  "// GetStats returns the stats on the calls that the server received.",
+			Name: "GetSumStats",
+			Doc:  "// GetSumStats returns the stats on the Sum calls that the server received.",
 			OutArgs: []rpc.ArgDesc{
-				{"", ``}, // Stats
+				{"", ``}, // SumStats
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
@@ -330,7 +358,7 @@
 		Advance() bool
 		// Value returns the item that was staged by Advance.  May panic if Advance
 		// returned false or was not called.  Never blocks.
-		Value() Arg
+		Value() SumArg
 		// Err returns any error encountered by Advance.  Never blocks.
 		Err() error
 	}
@@ -353,7 +381,7 @@
 // a typesafe stub that implements StressSumStreamServerCall.
 type StressSumStreamServerCallStub struct {
 	rpc.StreamServerCall
-	valRecv Arg
+	valRecv SumArg
 	errRecv error
 }
 
@@ -365,7 +393,7 @@
 // RecvStream returns the receiver side of the Stress.SumStream server stream.
 func (s *StressSumStreamServerCallStub) RecvStream() interface {
 	Advance() bool
-	Value() Arg
+	Value() SumArg
 	Err() error
 } {
 	return implStressSumStreamServerCallRecv{s}
@@ -376,11 +404,11 @@
 }
 
 func (s implStressSumStreamServerCallRecv) Advance() bool {
-	s.s.valRecv = Arg{}
+	s.s.valRecv = SumArg{}
 	s.s.errRecv = s.s.Recv(&s.s.valRecv)
 	return s.s.errRecv == nil
 }
-func (s implStressSumStreamServerCallRecv) Value() Arg {
+func (s implStressSumStreamServerCallRecv) Value() SumArg {
 	return s.s.valRecv
 }
 func (s implStressSumStreamServerCallRecv) Err() error {
diff --git a/profiles/internal/rpc/stress/stress/load.go b/profiles/internal/rpc/stress/stress/load.go
new file mode 100644
index 0000000..9c9ceeb
--- /dev/null
+++ b/profiles/internal/rpc/stress/stress/load.go
@@ -0,0 +1,115 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"runtime"
+	"time"
+
+	"v.io/v23"
+
+	"v.io/x/lib/cmdline"
+	"v.io/x/ref/profiles/internal/rpc/stress/internal"
+)
+
+var (
+	cpus        int
+	payloadSize int
+)
+
+func init() {
+	cmdLoadTest.Flags.DurationVar(&duration, "duration", 1*time.Minute, "duration of the test to run")
+	cmdLoadTest.Flags.IntVar(&cpus, "cpu", 0, "number of cpu cores to use; if zero, use the number of servers to test")
+	cmdLoadTest.Flags.IntVar(&payloadSize, "payload-size", 1000, "size of payload in bytes")
+	cmdLoadTest.Flags.StringVar(&outFormat, "format", "text", "Stats output format; either text or json")
+}
+
+type loadStats struct {
+	Iterations uint64
+	MsecPerRpc float64
+	Qps        float64
+	QpsPerCore float64
+}
+
+var cmdLoadTest = &cmdline.Command{
+	Run:      runLoadTest,
+	Name:     "load",
+	Short:    "Run load test",
+	Long:     "Run load test",
+	ArgsName: "<server> ...",
+	ArgsLong: "<server> ... A list of servers to connect to.",
+}
+
+func runLoadTest(cmd *cmdline.Command, args []string) error { // runs one Echo load loop per server and prints merged stats
+	if len(args) == 0 {
+		return cmd.UsageErrorf("no server specified")
+	}
+	if outFormat != "text" && outFormat != "json" {
+		return cmd.UsageErrorf("invalid output format: %s\n", outFormat)
+	}
+
+	cores := cpus
+	if cores <= 0 { // unset (or nonsensical negative) -cpu: one core per server
+		cores = len(args)
+	}
+	runtime.GOMAXPROCS(cores)
+
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	fmt.Fprintf(cmd.Stdout(), "starting load test against %d server(s) using %d core(s)...\n", len(args), cores)
+	fmt.Fprintf(cmd.Stdout(), "payloadSize: %d, duration: %v\n", payloadSize, duration)
+
+	start := time.Now()
+	done := make(chan loadStats)
+	for _, server := range args {
+		go func(server string) {
+			var stats loadStats
+
+			start := time.Now() // per-server clock; intentionally shadows the outer start
+			stats.Iterations = internal.CallEcho(ctx, server, payloadSize, duration)
+			elapsed := time.Since(start)
+
+			stats.Qps = float64(stats.Iterations) / elapsed.Seconds()
+			stats.MsecPerRpc = 1000 / stats.Qps
+			done <- stats
+		}(server)
+	}
+	var merged loadStats
+	for i := 0; i < len(args); i++ {
+		stats := <-done
+		merged.Iterations += stats.Iterations
+		merged.MsecPerRpc += stats.MsecPerRpc
+		merged.Qps += stats.Qps
+	}
+	merged.MsecPerRpc /= float64(len(args)) // mean latency across servers
+	merged.QpsPerCore = merged.Qps / float64(cores)
+	elapsed := time.Since(start)
+	fmt.Fprintf(cmd.Stdout(), "done after %v\n", elapsed) // same writer as the other status lines
+	return outLoadStats(cmd.Stdout(), outFormat, "load stats:", &merged)
+}
+
+func outLoadStats(w io.Writer, format, title string, stats *loadStats) error {
+	switch format {
+	case "text":
+		fmt.Fprintf(w, "%s\n", title)
+		fmt.Fprintf(w, "\tnumber of RPCs:\t\t%d\n", stats.Iterations)
+		fmt.Fprintf(w, "\tlatency (msec/rpc):\t%.2f\n", stats.MsecPerRpc)
+		fmt.Fprintf(w, "\tqps:\t\t\t%.2f\n", stats.Qps)
+		fmt.Fprintf(w, "\tqps/core:\t\t%.2f\n", stats.QpsPerCore)
+	case "json":
+		b, err := json.Marshal(stats)
+		if err != nil {
+			return err
+		}
+		fmt.Fprintf(w, "%s%s\n", title, b)
+	default:
+		return fmt.Errorf("invalid output format: %s\n", format)
+	}
+	return nil
+}
diff --git a/profiles/internal/rpc/stress/stress/main.go b/profiles/internal/rpc/stress/stress/main.go
index c292e69..561cf60 100644
--- a/profiles/internal/rpc/stress/stress/main.go
+++ b/profiles/internal/rpc/stress/stress/main.go
@@ -5,122 +5,51 @@
 package main
 
 import (
-	"flag"
-	"fmt"
-	"math/rand"
-	"runtime"
-	"strings"
-	"time"
+	"os"
 
 	"v.io/v23"
-	"v.io/v23/context"
-	"v.io/x/lib/vlog"
 
+	"v.io/x/lib/cmdline"
 	"v.io/x/ref/profiles/internal/rpc/stress"
-	"v.io/x/ref/profiles/internal/rpc/stress/internal"
 	_ "v.io/x/ref/profiles/static"
 )
 
-var (
-	servers  = flag.String("servers", "", "comma-seperated list of of the servers to connect to")
-	workers  = flag.Int("workers", 1, "number of test workers to run; If zero, no test will be performed.")
-	duration = flag.Duration("duration", 1*time.Minute, "duration of the stress test to run")
-
-	maxChunkCnt    = flag.Int("max_chunk_count", 100, "maximum number of chunks to send per streaming RPC")
-	maxPayloadSize = flag.Int("max_payload_size", 10000, "maximum size of payload in bytes")
-
-	serverStats = flag.Bool("server_stats", false, "If true, print out the server stats")
-	serverStop  = flag.Bool("server_stop", false, "If true, shutdown the servers")
-)
-
-func init() {
-	rand.Seed(time.Now().UnixNano())
+var cmdStopServers = &cmdline.Command{
+	Run:      runStopServers,
+	Name:     "stop",
+	Short:    "Stop servers",
+	Long:     "Stop servers",
+	ArgsName: "<server> ...",
+	ArgsLong: "<server> ... A list of servers to stop.",
 }
 
-func runTest(ctx *context.T, servers []string) {
-	fmt.Printf("starting stress test against %d servers...\n", len(servers))
-	fmt.Printf("workers: %d, maxChunkCnt: %d, maxPayloadSize: %d\n", *workers, *maxChunkCnt, *maxPayloadSize)
-
-	now := time.Now()
-	done := make(chan stress.Stats, *workers)
-	for i := 0; i < *workers; i++ {
-		go func() {
-			var sumCount, sumStreamCount uint64
-			timeout := time.After(*duration)
-		done:
-			for {
-				server := servers[rand.Intn(len(servers))]
-				if rand.Intn(2) == 0 {
-					internal.CallSum(ctx, server, *maxPayloadSize)
-					sumCount++
-				} else {
-					internal.CallSumStream(ctx, server, *maxChunkCnt, *maxPayloadSize)
-					sumStreamCount++
-				}
-
-				select {
-				case <-timeout:
-					break done
-				default:
-				}
-			}
-			done <- stress.Stats{sumCount, sumStreamCount}
-		}()
+func runStopServers(cmd *cmdline.Command, args []string) error {
+	if len(args) == 0 {
+		return cmd.UsageErrorf("no server specified")
 	}
 
-	var stats stress.Stats
-	for i := 0; i < *workers; i++ {
-		s := <-done
-		stats.SumCount += s.SumCount
-		stats.SumStreamCount += s.SumStreamCount
-	}
-	elapsed := time.Since(now)
-
-	fmt.Printf("done after %v\n", elapsed)
-	fmt.Printf("client stats: %+v, ", stats)
-	fmt.Printf("qps: %.2f\n", float64(stats.SumCount+stats.SumStreamCount)/elapsed.Seconds())
-}
-
-func printServerStats(ctx *context.T, servers []string) {
-	for _, server := range servers {
-		stats, err := stress.StressClient(server).GetStats(ctx)
-		if err != nil {
-			vlog.Fatal("GetStats failed: %v\n", err)
-		}
-		fmt.Printf("server stats: %q:%+v\n", server, stats)
-	}
-}
-
-func stopServers(ctx *context.T, servers []string) {
-	for _, server := range servers {
-		if err := stress.StressClient(server).Stop(ctx); err != nil {
-			vlog.Fatal("Stop failed: %v\n", err)
-		}
-	}
-}
-
-func main() {
-	runtime.GOMAXPROCS(runtime.NumCPU())
-
 	ctx, shutdown := v23.Init()
 	defer shutdown()
 
-	var addrs []string
-	for _, a := range strings.Split(*servers, ",") {
-		addrs = append(addrs, strings.TrimSpace(a))
+	for _, server := range args {
+		if err := stress.StressClient(server).Stop(ctx); err != nil {
+			return err
+		}
 	}
-	if len(addrs) == 0 {
-		vlog.Fatal("no server specified")
-	}
+	return nil
+}
 
-	if *workers > 0 && *duration > 0 {
-		runTest(ctx, addrs)
+func main() {
+	cmdRoot := &cmdline.Command{
+		Name:  "stress",
+		Short: "Tool to stress/load test RPC",
+		Long:  "Tool to stress/load test RPC by issuing randomly generated requests",
+		Children: []*cmdline.Command{
+			cmdStressTest,
+			cmdStressStats,
+			cmdLoadTest,
+			cmdStopServers,
+		},
 	}
-
-	if *serverStats {
-		printServerStats(ctx, addrs)
-	}
-	if *serverStop {
-		stopServers(ctx, addrs)
-	}
+	os.Exit(cmdRoot.Main())
 }
diff --git a/profiles/internal/rpc/stress/stress/stress.go b/profiles/internal/rpc/stress/stress/stress.go
new file mode 100644
index 0000000..3d363c0
--- /dev/null
+++ b/profiles/internal/rpc/stress/stress/stress.go
@@ -0,0 +1,157 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"math/rand"
+	"runtime"
+	"time"
+
+	"v.io/v23"
+
+	"v.io/x/lib/cmdline"
+	"v.io/x/ref/profiles/internal/rpc/stress"
+	"v.io/x/ref/profiles/internal/rpc/stress/internal"
+)
+
+var (
+	duration time.Duration
+
+	workers        int
+	maxChunkCnt    int
+	maxPayloadSize int
+
+	outFormat string
+)
+
+func init() {
+	cmdStressTest.Flags.DurationVar(&duration, "duration", 1*time.Minute, "duration of the test to run")
+	cmdStressTest.Flags.IntVar(&workers, "workers", 1, "number of test workers to run")
+	cmdStressTest.Flags.IntVar(&maxChunkCnt, "max-chunk-count", 1000, "maximum number of chunks to send per streaming RPC")
+	cmdStressTest.Flags.IntVar(&maxPayloadSize, "max-payload-size", 10000, "maximum size of payload in bytes")
+	cmdStressTest.Flags.StringVar(&outFormat, "format", "text", "Stats output format; either text or json")
+
+	cmdStressStats.Flags.StringVar(&outFormat, "format", "text", "Stats output format; either text or json")
+}
+
+var cmdStressTest = &cmdline.Command{
+	Run:      runStressTest,
+	Name:     "stress",
+	Short:    "Run stress test",
+	Long:     "Run stress test",
+	ArgsName: "<server> ...",
+	ArgsLong: "<server> ... A list of servers to connect to.",
+}
+
+func runStressTest(cmd *cmdline.Command, args []string) error { // runs the Sum/SumStream workers and prints merged client stats
+	if len(args) == 0 {
+		return cmd.UsageErrorf("no server specified")
+	}
+	if outFormat != "text" && outFormat != "json" {
+		return cmd.UsageErrorf("invalid output format: %s\n", outFormat)
+	}
+
+	cores := runtime.NumCPU()
+	runtime.GOMAXPROCS(cores)
+
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	rand.Seed(time.Now().UnixNano())
+	fmt.Fprintf(cmd.Stdout(), "starting stress test against %d server(s) using %d core(s)...\n", len(args), cores)
+	fmt.Fprintf(cmd.Stdout(), "workers: %d, maxChunkCnt: %d, maxPayloadSize: %d, duration: %v\n", workers, maxChunkCnt, maxPayloadSize, duration)
+
+	start := time.Now()
+	done := make(chan stress.SumStats)
+	for i := 0; i < workers; i++ {
+		go func() {
+			var stats stress.SumStats // per-worker stats; merged by the receiver below
+			timeout := time.After(duration)
+		done:
+			for {
+				server := args[rand.Intn(len(args))]
+				if rand.Intn(2) == 0 {
+					internal.CallSum(ctx, server, maxPayloadSize, &stats)
+				} else {
+					internal.CallSumStream(ctx, server, maxChunkCnt, maxPayloadSize, &stats)
+				}
+
+				select {
+				case <-timeout:
+					break done
+				default: // duration not yet elapsed; issue another call
+				}
+			}
+			done <- stats
+		}()
+	}
+	var merged stress.SumStats
+	for i := 0; i < workers; i++ {
+		stats := <-done
+		merged.SumCount += stats.SumCount
+		merged.SumStreamCount += stats.SumStreamCount
+		merged.BytesRecv += stats.BytesRecv
+		merged.BytesSent += stats.BytesSent
+	}
+	elapsed := time.Since(start)
+	fmt.Fprintf(cmd.Stdout(), "done after %v\n", elapsed) // same writer as the other status lines
+	return outSumStats(cmd.Stdout(), outFormat, "client stats:", &merged)
+}
+
+var cmdStressStats = &cmdline.Command{
+	Run:      runStressStats,
+	Name:     "stats",
+	Short:    "Print out stress stats of servers",
+	Long:     "Print out stress stats of servers",
+	ArgsName: "<server> ...",
+	ArgsLong: "<server> ... A list of servers to connect to.",
+}
+
+func runStressStats(cmd *cmdline.Command, args []string) error {
+	if len(args) == 0 {
+		return cmd.UsageErrorf("no server specified")
+	}
+	if outFormat != "text" && outFormat != "json" {
+		return cmd.UsageErrorf("invalid output format: %s\n", outFormat)
+	}
+
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	for _, server := range args {
+		stats, err := stress.StressClient(server).GetSumStats(ctx)
+		if err != nil {
+			return err
+		}
+		title := fmt.Sprintf("server stats(%s):", server)
+		if err := outSumStats(cmd.Stdout(), outFormat, title, &stats); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func outSumStats(w io.Writer, format, title string, stats *stress.SumStats) error {
+	switch format {
+	case "text":
+		fmt.Fprintf(w, "%s\n", title)
+		fmt.Fprintf(w, "\tnumber of non-streaming RPCs:\t%d\n", stats.SumCount)
+		fmt.Fprintf(w, "\tnumber of streaming RPCs:\t%d\n", stats.SumStreamCount)
+		fmt.Fprintf(w, "\tnumber of bytes received:\t%d\n", stats.BytesRecv)
+		fmt.Fprintf(w, "\tnumber of bytes sent:\t\t%d\n", stats.BytesSent)
+	case "json":
+		b, err := json.Marshal(stats)
+		if err != nil {
+			return err
+		}
+		fmt.Fprintf(w, "%s%s\n", title, b)
+	default:
+		return fmt.Errorf("invalid output format: %s\n", format)
+	}
+	return nil
+}