Revise IPC benchmark and add a new benchmark on multiplexing.
o Changed the IPC benchmark clients to measure throughput and to be
shared with the benchmarks on multiplexing.
o Added a stand-alone benchmark binary bm/main.go for clean histogram
output. (With many benchmarks, it is very hard to read the
benchmark results when a histogram is printed in every benchmark
trial.)
o Added the latest benchmark results to RESULTS.txt
Note:
o There will be one follow-up CL to add a new benchmark to bmclient/bmserver.
o I'm not sure when the BM results in README.txt were collected, but the new
BM results show that the current IPC performance is 2-4x slower than before,
even on a better machine.
Change-Id: I66942c33a849af2b37990d5368ede85cdb9b76cc
diff --git a/runtimes/google/ipc/benchmarks/client.go b/runtimes/google/ipc/benchmarks/client.go
index b9fe30c..bad7225 100644
--- a/runtimes/google/ipc/benchmarks/client.go
+++ b/runtimes/google/ipc/benchmarks/client.go
@@ -3,96 +3,168 @@
import (
"bytes"
"fmt"
- "io"
+ "testing"
"time"
- "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron/lib/testutil"
+
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/vlog"
)
-// CallEcho calls the Echo method 'iterations' times with the given payload
-// size, and optionally logs the result.
-func CallEcho(ctx context.T, address string, iterations, payloadSize int, log io.Writer) {
+// CallEcho calls the 'Echo' method 'iterations' times with a 'payloadSize'-byte
+// payload, timing only the calls on b; a non-nil stats is cleared, then updated.
+func CallEcho(b *testing.B, ctx context.T, address string, iterations, payloadSize int, stats *testutil.BenchStats) {
+ stub := BenchmarkClient(address)
+ _, err := stub.Echo(ctx, make([]byte, 0)) // Create VC.
+ if err != nil {
+ vlog.Fatalf("Echo failed: %v", err)
+ }
+
payload := make([]byte, payloadSize)
- for _, i := range payload {
+ for i := range payload {
payload[i] = byte(i & 0xff)
}
- stub := BenchmarkClient(address)
+ if stats != nil {
+ stats.Clear()
+ }
+
+ b.SetBytes(int64(payloadSize) * 2) // 2 for round trip of each payload.
+ b.ResetTimer() // Exclude setup time from measurement.
+
for i := 0; i < iterations; i++ {
+ b.StartTimer()
start := time.Now()
- result, err := stub.Echo(ctx, payload)
+
+ r, err := stub.Echo(ctx, payload)
+
elapsed := time.Since(start)
+ b.StopTimer() // Keep result verification and stats bookkeeping out of the timed region.
+
if err != nil {
vlog.Fatalf("Echo failed: %v", err)
}
- if !bytes.Equal(payload, result) {
- vlog.Fatalf("Echo return different payload: got %v, expected %v", result, payload)
+ if !bytes.Equal(r, payload) {
+ vlog.Fatalf("Echo returned %v, but expected %v", r, payload)
}
- if log != nil {
- log.Write([]byte(fmt.Sprintf("CallEcho %d %d\n", i, elapsed)))
+
+ if stats != nil {
+ stats.Add(elapsed)
}
}
}
-// CallEchoStream calls the EchoStream method 'rpcCount' times. Each iteration
-// sends 'messageCount' messages on the stream and receives the same number
-// back. Each message has the given payload size. Optionally logs the result.
-func CallEchoStream(runtime veyron2.Runtime, address string, rpcCount, messageCount, payloadSize int, log io.Writer) {
+// CallEchoStream calls the 'EchoStream' method 'iterations' times and blocks
+// until StartEchoStream signals completion. Each iteration sends 'chunkCnt'
+// chunks of 'payloadSize' bytes and expects them echoed back. Optionally updates the stats.
+func CallEchoStream(b *testing.B, ctx context.T, address string, iterations, chunkCnt, payloadSize int, stats *testutil.BenchStats) {
+ done, _ := StartEchoStream(b, ctx, address, iterations, chunkCnt, payloadSize, stats)
+ <-done
+}
+
+// StartEchoStream starts calling the 'EchoStream' method 'iterations' times
+// (zero means unlimited) without blocking. It returns a channel that receives
+// the number of completed iterations and is then closed, plus a callback that
+// stops the streaming and waits for it to finish. Each iteration sends
+// 'chunkCnt' chunks of 'payloadSize' bytes and expects the same chunks back.
+// A non-nil stats is cleared, then updated with each iteration's elapsed time.
+func StartEchoStream(b *testing.B, ctx context.T, address string, iterations, chunkCnt, payloadSize int, stats *testutil.BenchStats) (<-chan int, func()) {
+ stub := BenchmarkClient(address)
+ _, err := stub.Echo(ctx, make([]byte, 0)) // Create VC.
+ if err != nil {
+ vlog.Fatalf("Echo failed: %v", err)
+ }
+
payload := make([]byte, payloadSize)
- for _, i := range payload {
+ for i := range payload {
payload[i] = byte(i & 0xff)
}
- stub := BenchmarkClient(address)
- for i := 0; i < rpcCount; i++ {
- start := time.Now()
- ctx, _ := runtime.NewContext().WithTimeout(time.Hour)
- stream, err := stub.EchoStream(ctx)
- if err != nil {
- vlog.Fatalf("EchoStream failed: %v", err)
+ if stats != nil {
+ stats.Clear()
+ }
+
+ done, stop := make(chan int, 1), make(chan struct{}) // done is buffered so the final count never blocks the goroutine.
+ stopped := func() bool {
+ select {
+ case <-stop:
+ return true
+ default:
+ return false
}
- done := make(chan error, 1)
- go func() {
- rStream := stream.RecvStream()
- for rStream.Advance() {
- chunk := rStream.Value()
- if err == io.EOF {
- done <- nil
- return
- }
- if err != nil {
- done <- err
- return
- }
- if !bytes.Equal(payload, chunk) {
- done <- fmt.Errorf("Recv got different payload: got %v, expected %v", chunk, payload)
- return
- }
+ }
+
+ if b.N > 0 {
+ // 2 for round trip of each payload.
+ b.SetBytes(int64((iterations*chunkCnt/b.N)*payloadSize) * 2)
+ }
+ b.ResetTimer() // Exclude setup time from measurement.
+
+ go func() {
+ defer close(done)
+
+ n := 0
+ for ; !stopped() && (iterations == 0 || n < iterations); n++ {
+ b.StartTimer()
+ start := time.Now()
+
+ stream, err := stub.EchoStream(ctx)
+ if err != nil {
+ vlog.Fatalf("EchoStream failed: %v", err)
}
- done <- rStream.Err()
- }()
- sender := stream.SendStream()
- for j := 0; j < messageCount; j++ {
- if err = sender.Send(payload); err != nil {
- vlog.Fatalf("Send failed: %v", err)
+ rDone := make(chan error, 1)
+ go func() {
+ defer close(rDone)
+
+ rStream := stream.RecvStream()
+ i := 0
+ for ; rStream.Advance(); i++ {
+ r := rStream.Value()
+ if !bytes.Equal(r, payload) {
+ rDone <- fmt.Errorf("EchoStream returned %v, but expected %v", r, payload)
+ return
+ }
+ }
+ if i != chunkCnt {
+ rDone <- fmt.Errorf("EchoStream returned %d chunks, but expected %d", i, chunkCnt)
+ return
+ }
+ rDone <- rStream.Err()
+ }()
+
+ sStream := stream.SendStream()
+ for i := 0; i < chunkCnt; i++ {
+ if err = sStream.Send(payload); err != nil {
+ vlog.Fatalf("EchoStream Send failed: %v", err)
+ }
+ }
+ if err = sStream.Close(); err != nil {
+ vlog.Fatalf("EchoStream Close failed: %v", err)
+ }
+
+ if err = <-rDone; err != nil {
+ vlog.Fatalf("%v", err)
+ }
+
+ if err = stream.Finish(); err != nil {
+ vlog.Fatalf("Finish failed: %v", err)
+ }
+
+ elapsed := time.Since(start)
+ b.StopTimer()
+
+ if stats != nil {
+ stats.Add(elapsed)
}
}
- if err = sender.Close(); err != nil {
- vlog.Fatalf("Close() failed: %v", err)
- }
- if err = <-done; err != nil {
- vlog.Fatalf("%v", err)
- }
- if err = stream.Finish(); err != nil {
- vlog.Fatalf("Finish failed: %v", err)
- }
- elapsed := time.Since(start)
- if log != nil {
- log.Write([]byte(fmt.Sprintf("CallEchoStream %d %d\n", i, elapsed)))
- }
+ done <- n
+ }()
+
+ return done, func() {
+ close(stop)
+ <-done // Wait for the goroutine to exit; NOTE: this receive may consume the iteration count.
}
}