veyron/runtimes/google/ipc: add stress test
- A simple IPC stress test
Ran this test for 1 hour with 4 servers and 6 clients successfully in GCE
with 10 workers, max_chunk=100, max_payload=10K
- Will setup to build binaries in HEAD and run periodically (1 per day?)
Change-Id: Idb866428904498b74276984086118c8b938e50c2
diff --git a/runtimes/google/ipc/benchmark/client.go b/runtimes/google/ipc/benchmark/client.go
index 3a6fbef..2acb683 100644
--- a/runtimes/google/ipc/benchmark/client.go
+++ b/runtimes/google/ipc/benchmark/client.go
@@ -120,7 +120,7 @@
}
}
if err = sStream.Close(); err != nil {
- vlog.Fatalf("EchoStream Send failed: %v", err)
+ vlog.Fatalf("EchoStream Close failed: %v", err)
}
if err = <-rDone; err != nil {
diff --git a/runtimes/google/ipc/stress/internal/client.go b/runtimes/google/ipc/stress/internal/client.go
new file mode 100644
index 0000000..4ab7cae
--- /dev/null
+++ b/runtimes/google/ipc/stress/internal/client.go
@@ -0,0 +1,102 @@
+package internal
+
+import (
+ "bytes"
+ crand "crypto/rand"
+ "fmt"
+ "math/rand"
+
+ "v.io/v23/context"
+ "v.io/v23/vlog"
+
+ "v.io/core/veyron/runtimes/google/ipc/stress"
+)
+
+func newArg(maxPayloadSize int) (stress.Arg, error) {
+ var arg stress.Arg
+ arg.ABool = rand.Intn(2) == 0
+ arg.AInt64 = rand.Int63()
+ arg.AListOfBytes = make([]byte, rand.Intn(maxPayloadSize)+1)
+ _, err := crand.Read(arg.AListOfBytes)
+ return arg, err
+}
+
+// CallSum calls 'Sum' method with a randomly generated payload.
+func CallSum(ctx *context.T, server string, maxPayloadSize int) {
+ stub := stress.StressClient(server)
+ arg, err := newArg(maxPayloadSize)
+ if err != nil {
+ vlog.Fatalf("new arg failed: %v", err)
+ }
+
+ got, err := stub.Sum(ctx, arg)
+ if err != nil {
+ vlog.Fatalf("Sum failed: %v", err)
+ }
+
+ wanted, _ := doSum(arg)
+ if !bytes.Equal(got, wanted) {
+ vlog.Fatalf("Sum returned %v, but expected %v", got, wanted)
+ }
+}
+
+// CallSumStream calls 'SumStream' method. Each iteration sends up to
+// 'maxChunkCnt' chunks on the stream and receives the same number of
+// sums back.
+func CallSumStream(ctx *context.T, server string, maxChunkCnt, maxPayloadSize int) {
+ stub := stress.StressClient(server)
+ stream, err := stub.SumStream(ctx)
+ if err != nil {
+ vlog.Fatalf("Stream failed: %v", err)
+ }
+
+ chunkCnt := rand.Intn(maxChunkCnt) + 1
+ args := make([]stress.Arg, chunkCnt)
+ done := make(chan error, 1)
+ go func() {
+ defer close(done)
+
+ recvS := stream.RecvStream()
+ i := 0
+ for ; recvS.Advance(); i++ {
+ got := recvS.Value()
+ wanted, _ := doSum(args[i])
+ if !bytes.Equal(got, wanted) {
+ done <- fmt.Errorf("RecvStream returned %v, but expected %v", got, wanted)
+ return
+ }
+ }
+ switch err := recvS.Err(); {
+ case err != nil:
+ done <- err
+ case i != chunkCnt:
+ done <- fmt.Errorf("RecvStream returned %d chunks, but expected %d", i, chunkCnt)
+ default:
+ done <- nil
+ }
+ }()
+
+ sendS := stream.SendStream()
+ for i := 0; i < chunkCnt; i++ {
+ arg, err := newArg(maxPayloadSize)
+ if err != nil {
+ vlog.Fatalf("new arg failed: %v", err)
+ }
+ args[i] = arg
+
+ if err = sendS.Send(arg); err != nil {
+ vlog.Fatalf("SendStream failed to send: %v", err)
+ }
+ }
+ if err = sendS.Close(); err != nil {
+ vlog.Fatalf("SendStream failed to close: %v", err)
+ }
+
+ if err = <-done; err != nil {
+ vlog.Fatalf("%v", err)
+ }
+
+ if err = stream.Finish(); err != nil {
+ vlog.Fatalf("Stream failed to finish: %v", err)
+ }
+}
diff --git a/runtimes/google/ipc/stress/internal/server.go b/runtimes/google/ipc/stress/internal/server.go
new file mode 100644
index 0000000..7a73114
--- /dev/null
+++ b/runtimes/google/ipc/stress/internal/server.go
@@ -0,0 +1,92 @@
+package internal
+
+import (
+ "sync"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/ipc"
+ "v.io/v23/naming"
+ "v.io/v23/security"
+ "v.io/v23/vlog"
+
+ "v.io/core/veyron/runtimes/google/ipc/stress"
+)
+
+type impl struct {
+ statsMu sync.Mutex
+ sumCount uint64 // GUARDED_BY(statsMu)
+ sumStreamCount uint64 // GUARDED_BY(statsMu)
+
+ stop chan struct{}
+}
+
+func (s *impl) Sum(ctx ipc.ServerContext, arg stress.Arg) ([]byte, error) {
+ defer s.incSumCount()
+ return doSum(arg)
+}
+
+func (s *impl) SumStream(ctx stress.StressSumStreamContext) error {
+ defer s.incSumStreamCount()
+ rStream := ctx.RecvStream()
+ sStream := ctx.SendStream()
+ for rStream.Advance() {
+ sum, err := doSum(rStream.Value())
+ if err != nil {
+ return err
+ }
+ sStream.Send(sum)
+ }
+ if err := rStream.Err(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *impl) GetStats(ctx ipc.ServerContext) (stress.Stats, error) {
+ s.statsMu.Lock()
+ defer s.statsMu.Unlock()
+ return stress.Stats{s.sumCount, s.sumStreamCount}, nil
+}
+
+func (s *impl) Stop(ctx ipc.ServerContext) error {
+ s.stop <- struct{}{}
+ return nil
+}
+
+func (s *impl) incSumCount() {
+ s.statsMu.Lock()
+ defer s.statsMu.Unlock()
+ s.sumCount++
+}
+
+func (s *impl) incSumStreamCount() {
+ s.statsMu.Lock()
+ defer s.statsMu.Unlock()
+ s.sumStreamCount++
+}
+
+type allowEveryoneAuthorizer struct{}
+
+func (allowEveryoneAuthorizer) Authorize(security.Context) error { return nil }
+
+// StartServer starts a server that implements the Stress service, and returns
+// the server and its veyron address. It also returns a channel carrying stop
+// requests. After reading from the stop channel, the application should exit.
+func StartServer(ctx *context.T, listenSpec ipc.ListenSpec) (ipc.Server, naming.Endpoint, <-chan struct{}) {
+ server, err := v23.NewServer(ctx)
+ if err != nil {
+ vlog.Fatalf("NewServer failed: %v", err)
+ }
+ eps, err := server.Listen(listenSpec)
+ if err != nil {
+ vlog.Fatalf("Listen failed: %v", err)
+ }
+
+ s := impl{stop: make(chan struct{})}
+ if err := server.Serve("", stress.StressServer(&s), allowEveryoneAuthorizer{}); err != nil {
+ vlog.Fatalf("Serve failed: %v", err)
+ }
+
+ return server, eps[0], s.stop
+}
diff --git a/runtimes/google/ipc/stress/internal/util.go b/runtimes/google/ipc/stress/internal/util.go
new file mode 100644
index 0000000..8cc6da9
--- /dev/null
+++ b/runtimes/google/ipc/stress/internal/util.go
@@ -0,0 +1,22 @@
+package internal
+
+import (
+ "crypto/md5"
+ "encoding/binary"
+
+ "v.io/core/veyron/runtimes/google/ipc/stress"
+)
+
+// doSum returns the MD5 checksum of the arg.
+func doSum(arg stress.Arg) ([]byte, error) {
+ h := md5.New()
+ if arg.ABool {
+ if err := binary.Write(h, binary.LittleEndian, arg.AInt64); err != nil {
+ return nil, err
+ }
+ }
+ if _, err := h.Write(arg.AListOfBytes); err != nil {
+ return nil, err
+ }
+ return h.Sum(nil), nil
+}
diff --git a/runtimes/google/ipc/stress/stress.vdl b/runtimes/google/ipc/stress/stress.vdl
new file mode 100644
index 0000000..556caad
--- /dev/null
+++ b/runtimes/google/ipc/stress/stress.vdl
@@ -0,0 +1,30 @@
+package stress
+
+import (
+ "v.io/v23/services/security/access"
+)
+
+type Arg struct {
+ ABool bool
+ AInt64 int64
+ AListOfBytes []byte
+}
+
+type Stats struct {
+ SumCount uint64
+ SumStreamCount uint64
+}
+
+type Stress interface {
+ // Do returns the checksum of the payload that it receives.
+ Sum(arg Arg) ([]byte | error) {access.Read}
+
+ // DoStream returns the checksum of the payload that it receives via the stream.
+ SumStream() stream<Arg,[]byte> error {access.Read}
+
+ // GetStats returns the stats on the calls that the server received.
+ GetStats() (Stats | error) {access.Read}
+
+ // Stop stops the server.
+ Stop() error {access.Admin}
+}
diff --git a/runtimes/google/ipc/stress/stress.vdl.go b/runtimes/google/ipc/stress/stress.vdl.go
new file mode 100644
index 0000000..ace7d6a
--- /dev/null
+++ b/runtimes/google/ipc/stress/stress.vdl.go
@@ -0,0 +1,428 @@
+// This file was auto-generated by the veyron vdl tool.
+// Source: stress.vdl
+
+package stress
+
+import (
+ // VDL system imports
+ "io"
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/ipc"
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/services/security/access"
+)
+
+type Arg struct {
+ ABool bool
+ AInt64 int64
+ AListOfBytes []byte
+}
+
+func (Arg) __VDLReflect(struct {
+ Name string "v.io/core/veyron/runtimes/google/ipc/stress.Arg"
+}) {
+}
+
+type Stats struct {
+ SumCount uint64
+ SumStreamCount uint64
+}
+
+func (Stats) __VDLReflect(struct {
+ Name string "v.io/core/veyron/runtimes/google/ipc/stress.Stats"
+}) {
+}
+
+func init() {
+ vdl.Register((*Arg)(nil))
+ vdl.Register((*Stats)(nil))
+}
+
+// StressClientMethods is the client interface
+// containing Stress methods.
+type StressClientMethods interface {
+ // Do returns the checksum of the payload that it receives.
+ Sum(ctx *context.T, arg Arg, opts ...ipc.CallOpt) ([]byte, error)
+ // DoStream returns the checksum of the payload that it receives via the stream.
+ SumStream(*context.T, ...ipc.CallOpt) (StressSumStreamCall, error)
+ // GetStats returns the stats on the calls that the server received.
+ GetStats(*context.T, ...ipc.CallOpt) (Stats, error)
+ // Stop stops the server.
+ Stop(*context.T, ...ipc.CallOpt) error
+}
+
+// StressClientStub adds universal methods to StressClientMethods.
+type StressClientStub interface {
+ StressClientMethods
+ ipc.UniversalServiceMethods
+}
+
+// StressClient returns a client stub for Stress.
+func StressClient(name string, opts ...ipc.BindOpt) StressClientStub {
+ var client ipc.Client
+ for _, opt := range opts {
+ if clientOpt, ok := opt.(ipc.Client); ok {
+ client = clientOpt
+ }
+ }
+ return implStressClientStub{name, client}
+}
+
+type implStressClientStub struct {
+ name string
+ client ipc.Client
+}
+
+func (c implStressClientStub) c(ctx *context.T) ipc.Client {
+ if c.client != nil {
+ return c.client
+ }
+ return v23.GetClient(ctx)
+}
+
+func (c implStressClientStub) Sum(ctx *context.T, i0 Arg, opts ...ipc.CallOpt) (o0 []byte, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Sum", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implStressClientStub) SumStream(ctx *context.T, opts ...ipc.CallOpt) (ocall StressSumStreamCall, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "SumStream", nil, opts...); err != nil {
+ return
+ }
+ ocall = &implStressSumStreamCall{Call: call}
+ return
+}
+
+func (c implStressClientStub) GetStats(ctx *context.T, opts ...ipc.CallOpt) (o0 Stats, err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "GetStats", nil, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implStressClientStub) Stop(ctx *context.T, opts ...ipc.CallOpt) (err error) {
+ var call ipc.Call
+ if call, err = c.c(ctx).StartCall(ctx, c.name, "Stop", nil, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+// StressSumStreamClientStream is the client stream for Stress.SumStream.
+type StressSumStreamClientStream interface {
+ // RecvStream returns the receiver side of the Stress.SumStream client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() []byte
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+ // SendStream returns the send side of the Stress.SumStream client stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors
+ // encountered while sending, or if Send is called after Close or
+ // the stream has been canceled. Blocks if there is no buffer
+ // space; will unblock when buffer space is available or after
+ // the stream has been canceled.
+ Send(item Arg) error
+ // Close indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items.
+ // This is an optional call - e.g. a client might call Close if it
+ // needs to continue receiving items from the server after it's
+ // done sending. Returns errors encountered while closing, or if
+ // Close is called after the stream has been canceled. Like Send,
+ // blocks if there is no buffer space available.
+ Close() error
+ }
+}
+
+// StressSumStreamCall represents the call returned from Stress.SumStream.
+type StressSumStreamCall interface {
+ StressSumStreamClientStream
+ // Finish performs the equivalent of SendStream().Close, then blocks until
+ // the server is done, and returns the positional return values for the call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+type implStressSumStreamCall struct {
+ ipc.Call
+ valRecv []byte
+ errRecv error
+}
+
+func (c *implStressSumStreamCall) RecvStream() interface {
+ Advance() bool
+ Value() []byte
+ Err() error
+} {
+ return implStressSumStreamCallRecv{c}
+}
+
+type implStressSumStreamCallRecv struct {
+ c *implStressSumStreamCall
+}
+
+func (c implStressSumStreamCallRecv) Advance() bool {
+ c.c.errRecv = c.c.Recv(&c.c.valRecv)
+ return c.c.errRecv == nil
+}
+func (c implStressSumStreamCallRecv) Value() []byte {
+ return c.c.valRecv
+}
+func (c implStressSumStreamCallRecv) Err() error {
+ if c.c.errRecv == io.EOF {
+ return nil
+ }
+ return c.c.errRecv
+}
+func (c *implStressSumStreamCall) SendStream() interface {
+ Send(item Arg) error
+ Close() error
+} {
+ return implStressSumStreamCallSend{c}
+}
+
+type implStressSumStreamCallSend struct {
+ c *implStressSumStreamCall
+}
+
+func (c implStressSumStreamCallSend) Send(item Arg) error {
+ return c.c.Send(item)
+}
+func (c implStressSumStreamCallSend) Close() error {
+ return c.c.CloseSend()
+}
+func (c *implStressSumStreamCall) Finish() (err error) {
+ err = c.Call.Finish()
+ return
+}
+
+// StressServerMethods is the interface a server writer
+// implements for Stress.
+type StressServerMethods interface {
+ // Do returns the checksum of the payload that it receives.
+ Sum(ctx ipc.ServerContext, arg Arg) ([]byte, error)
+ // DoStream returns the checksum of the payload that it receives via the stream.
+ SumStream(StressSumStreamContext) error
+ // GetStats returns the stats on the calls that the server received.
+ GetStats(ipc.ServerContext) (Stats, error)
+ // Stop stops the server.
+ Stop(ipc.ServerContext) error
+}
+
+// StressServerStubMethods is the server interface containing
+// Stress methods, as expected by ipc.Server.
+// The only difference between this interface and StressServerMethods
+// is the streaming methods.
+type StressServerStubMethods interface {
+ // Do returns the checksum of the payload that it receives.
+ Sum(ctx ipc.ServerContext, arg Arg) ([]byte, error)
+ // DoStream returns the checksum of the payload that it receives via the stream.
+ SumStream(*StressSumStreamContextStub) error
+ // GetStats returns the stats on the calls that the server received.
+ GetStats(ipc.ServerContext) (Stats, error)
+ // Stop stops the server.
+ Stop(ipc.ServerContext) error
+}
+
+// StressServerStub adds universal methods to StressServerStubMethods.
+type StressServerStub interface {
+ StressServerStubMethods
+ // Describe the Stress interfaces.
+ Describe__() []ipc.InterfaceDesc
+}
+
+// StressServer returns a server stub for Stress.
+// It converts an implementation of StressServerMethods into
+// an object that may be used by ipc.Server.
+func StressServer(impl StressServerMethods) StressServerStub {
+ stub := implStressServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := ipc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := ipc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implStressServerStub struct {
+ impl StressServerMethods
+ gs *ipc.GlobState
+}
+
+func (s implStressServerStub) Sum(ctx ipc.ServerContext, i0 Arg) ([]byte, error) {
+ return s.impl.Sum(ctx, i0)
+}
+
+func (s implStressServerStub) SumStream(ctx *StressSumStreamContextStub) error {
+ return s.impl.SumStream(ctx)
+}
+
+func (s implStressServerStub) GetStats(ctx ipc.ServerContext) (Stats, error) {
+ return s.impl.GetStats(ctx)
+}
+
+func (s implStressServerStub) Stop(ctx ipc.ServerContext) error {
+ return s.impl.Stop(ctx)
+}
+
+func (s implStressServerStub) Globber() *ipc.GlobState {
+ return s.gs
+}
+
+func (s implStressServerStub) Describe__() []ipc.InterfaceDesc {
+ return []ipc.InterfaceDesc{StressDesc}
+}
+
+// StressDesc describes the Stress interface.
+var StressDesc ipc.InterfaceDesc = descStress
+
+// descStress hides the desc to keep godoc clean.
+var descStress = ipc.InterfaceDesc{
+ Name: "Stress",
+ PkgPath: "v.io/core/veyron/runtimes/google/ipc/stress",
+ Methods: []ipc.MethodDesc{
+ {
+ Name: "Sum",
+ Doc: "// Do returns the checksum of the payload that it receives.",
+ InArgs: []ipc.ArgDesc{
+ {"arg", ``}, // Arg
+ },
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // []byte
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "SumStream",
+ Doc: "// DoStream returns the checksum of the payload that it receives via the stream.",
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "GetStats",
+ Doc: "// GetStats returns the stats on the calls that the server received.",
+ OutArgs: []ipc.ArgDesc{
+ {"", ``}, // Stats
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "Stop",
+ Doc: "// Stop stops the server.",
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Admin"))},
+ },
+ },
+}
+
+// StressSumStreamServerStream is the server stream for Stress.SumStream.
+type StressSumStreamServerStream interface {
+ // RecvStream returns the receiver side of the Stress.SumStream server stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() Arg
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+ // SendStream returns the send side of the Stress.SumStream server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item []byte) error
+ }
+}
+
+// StressSumStreamContext represents the context passed to Stress.SumStream.
+type StressSumStreamContext interface {
+ ipc.ServerContext
+ StressSumStreamServerStream
+}
+
+// StressSumStreamContextStub is a wrapper that converts ipc.ServerCall into
+// a typesafe stub that implements StressSumStreamContext.
+type StressSumStreamContextStub struct {
+ ipc.ServerCall
+ valRecv Arg
+ errRecv error
+}
+
+// Init initializes StressSumStreamContextStub from ipc.ServerCall.
+func (s *StressSumStreamContextStub) Init(call ipc.ServerCall) {
+ s.ServerCall = call
+}
+
+// RecvStream returns the receiver side of the Stress.SumStream server stream.
+func (s *StressSumStreamContextStub) RecvStream() interface {
+ Advance() bool
+ Value() Arg
+ Err() error
+} {
+ return implStressSumStreamContextRecv{s}
+}
+
+type implStressSumStreamContextRecv struct {
+ s *StressSumStreamContextStub
+}
+
+func (s implStressSumStreamContextRecv) Advance() bool {
+ s.s.valRecv = Arg{}
+ s.s.errRecv = s.s.Recv(&s.s.valRecv)
+ return s.s.errRecv == nil
+}
+func (s implStressSumStreamContextRecv) Value() Arg {
+ return s.s.valRecv
+}
+func (s implStressSumStreamContextRecv) Err() error {
+ if s.s.errRecv == io.EOF {
+ return nil
+ }
+ return s.s.errRecv
+}
+
+// SendStream returns the send side of the Stress.SumStream server stream.
+func (s *StressSumStreamContextStub) SendStream() interface {
+ Send(item []byte) error
+} {
+ return implStressSumStreamContextSend{s}
+}
+
+type implStressSumStreamContextSend struct {
+ s *StressSumStreamContextStub
+}
+
+func (s implStressSumStreamContextSend) Send(item []byte) error {
+ return s.s.Send(item)
+}
diff --git a/runtimes/google/ipc/stress/stress/main.go b/runtimes/google/ipc/stress/stress/main.go
new file mode 100644
index 0000000..af93d8c
--- /dev/null
+++ b/runtimes/google/ipc/stress/stress/main.go
@@ -0,0 +1,122 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "math/rand"
+ "runtime"
+ "strings"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/vlog"
+
+ _ "v.io/core/veyron/profiles/static"
+ "v.io/core/veyron/runtimes/google/ipc/stress"
+ "v.io/core/veyron/runtimes/google/ipc/stress/internal"
+)
+
+var (
+ servers = flag.String("servers", "", "comma-seperated list of of the servers to connect to")
+ workers = flag.Int("workers", 1, "number of test workers to run; If zero, no test will be performed.")
+ duration = flag.Duration("duration", 1*time.Minute, "duration of the stress test to run")
+
+ maxChunkCnt = flag.Int("max_chunk_count", 100, "maximum number of chunks to send per streaming RPC")
+ maxPayloadSize = flag.Int("max_payload_size", 10000, "maximum size of payload in bytes")
+
+ serverStats = flag.Bool("server_stats", false, "If true, print out the server stats")
+ serverStop = flag.Bool("server_stop", false, "If true, shutdown the servers")
+)
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+}
+
+func runTest(ctx *context.T, servers []string) {
+ fmt.Printf("starting stress test against %d servers...\n", len(servers))
+ fmt.Printf("workers: %d, maxChunkCnt: %d, maxPayloadSize: %d\n", *workers, *maxChunkCnt, *maxPayloadSize)
+
+ now := time.Now()
+ done := make(chan stress.Stats, *workers)
+ for i := 0; i < *workers; i++ {
+ go func() {
+ var sumCount, sumStreamCount uint64
+ timeout := time.After(*duration)
+ done:
+ for {
+ server := servers[rand.Intn(len(servers))]
+ if rand.Intn(2) == 0 {
+ internal.CallSum(ctx, server, *maxPayloadSize)
+ sumCount++
+ } else {
+ internal.CallSumStream(ctx, server, *maxChunkCnt, *maxPayloadSize)
+ sumStreamCount++
+ }
+
+ select {
+ case <-timeout:
+ break done
+ default:
+ }
+ }
+ done <- stress.Stats{sumCount, sumStreamCount}
+ }()
+ }
+
+ var stats stress.Stats
+ for i := 0; i < *workers; i++ {
+ s := <-done
+ stats.SumCount += s.SumCount
+ stats.SumStreamCount += s.SumStreamCount
+ }
+ elapsed := time.Since(now)
+
+ fmt.Printf("done after %v\n", elapsed)
+ fmt.Printf("client stats: %+v, ", stats)
+ fmt.Printf("qps: %.2f\n", float64(stats.SumCount+stats.SumStreamCount)/elapsed.Seconds())
+}
+
+func printServerStats(ctx *context.T, servers []string) {
+ for _, server := range servers {
+ stats, err := stress.StressClient(server).GetStats(ctx)
+ if err != nil {
+ vlog.Fatal("GetStats failed: %v\n", err)
+ }
+ fmt.Printf("server stats: %q:%+v\n", server, stats)
+ }
+}
+
+func stopServers(ctx *context.T, servers []string) {
+ for _, server := range servers {
+ if err := stress.StressClient(server).Stop(ctx); err != nil {
+ vlog.Fatal("Stop failed: %v\n", err)
+ }
+ }
+}
+
+func main() {
+ runtime.GOMAXPROCS(runtime.NumCPU())
+
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ var addrs []string
+ for _, a := range strings.Split(*servers, ",") {
+ addrs = append(addrs, strings.TrimSpace(a))
+ }
+ if len(addrs) == 0 {
+ vlog.Fatal("no server specified")
+ }
+
+ if *workers > 0 && *duration > 0 {
+ runTest(ctx, addrs)
+ }
+
+ if *serverStats {
+ printServerStats(ctx, addrs)
+ }
+ if *serverStop {
+ stopServers(ctx, addrs)
+ }
+}
diff --git a/runtimes/google/ipc/stress/stressd/main.go b/runtimes/google/ipc/stress/stressd/main.go
new file mode 100644
index 0000000..845a762
--- /dev/null
+++ b/runtimes/google/ipc/stress/stressd/main.go
@@ -0,0 +1,44 @@
+// A simple command-line tool to run the benchmark server.
+package main
+
+import (
+ "flag"
+ "runtime"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/vlog"
+
+ "v.io/core/veyron/lib/signals"
+ _ "v.io/core/veyron/profiles/static"
+ "v.io/core/veyron/runtimes/google/ipc/stress/internal"
+)
+
+var (
+ duration = flag.Duration("duration", 0, "duration of the stress test to run; if zero, there is no limit.")
+)
+
+func main() {
+ runtime.GOMAXPROCS(runtime.NumCPU())
+
+ ctx, shutdown := v23.Init()
+ defer shutdown()
+
+ server, ep, stop := internal.StartServer(ctx, v23.GetListenSpec(ctx))
+ vlog.Infof("listening on %s", ep.Name())
+
+ var timeout <-chan time.Time
+ if *duration > 0 {
+ timeout = time.After(*duration)
+ }
+ select {
+ case <-timeout:
+ case <-stop:
+ case <-signals.ShutdownOnSignals(ctx):
+ }
+
+ if err := server.Stop(); err != nil {
+ vlog.Fatalf("Stop() failed: %v", err)
+ }
+ vlog.Info("stopped.")
+}