Revise IPC benchmark and add a new benchmark on multiplexing.
  o Changed the IPC benchmark clients to measure throughput and to be
    shared with the benchmarks on multiplexing.
  o Added a stand-alone benchmark binary bm/main.go for clean histogram
    output. (With many benchmarks, the results are very hard to read
    when a histogram is printed in every benchmark trial.)
  o Added the latest benchmark results to RESULTS.txt

Note:
  o There will be one follow-up CL to add a new benchmark to bmclient/bmserver.
  o I'm not sure when the BM results in README.txt were collected, but the new
    BM results show that the current IPC performance is 2-4x slower than
    before, even on a better machine.

Change-Id: I66942c33a849af2b37990d5368ede85cdb9b76cc
diff --git a/runtimes/google/ipc/benchmarks/client.go b/runtimes/google/ipc/benchmarks/client.go
index b9fe30c..bad7225 100644
--- a/runtimes/google/ipc/benchmarks/client.go
+++ b/runtimes/google/ipc/benchmarks/client.go
@@ -3,96 +3,168 @@
 import (
 	"bytes"
 	"fmt"
-	"io"
+	"testing"
 	"time"
 
-	"veyron.io/veyron/veyron2"
+	"veyron.io/veyron/veyron/lib/testutil"
+
 	"veyron.io/veyron/veyron2/context"
 	"veyron.io/veyron/veyron2/vlog"
 )
 
-// CallEcho calls the Echo method 'iterations' times with the given payload
-// size, and optionally logs the result.
-func CallEcho(ctx context.T, address string, iterations, payloadSize int, log io.Writer) {
+// CallEcho calls 'Echo' method 'iterations' times with the given payload
+// size, and optionally updates the stats.
+func CallEcho(b *testing.B, ctx context.T, address string, iterations, payloadSize int, stats *testutil.BenchStats) {
+	stub := BenchmarkClient(address)
+	_, err := stub.Echo(ctx, make([]byte, 0)) // Create VC.
+	if err != nil {
+		vlog.Fatalf("Echo failed: %v", err)
+	}
+
 	payload := make([]byte, payloadSize)
-	for _, i := range payload {
+	for i := range payload {
 		payload[i] = byte(i & 0xff)
 	}
 
-	stub := BenchmarkClient(address)
+	if stats != nil {
+		stats.Clear()
+	}
+
+	b.SetBytes(int64(payloadSize) * 2) // 2 for round trip of each payload.
+	b.ResetTimer()                     // Exclude setup time from measurement.
+
 	for i := 0; i < iterations; i++ {
+		b.StartTimer()
 		start := time.Now()
-		result, err := stub.Echo(ctx, payload)
+
+		r, err := stub.Echo(ctx, payload)
+
 		elapsed := time.Since(start)
+		b.StopTimer()
+
 		if err != nil {
 			vlog.Fatalf("Echo failed: %v", err)
 		}
-		if !bytes.Equal(payload, result) {
-			vlog.Fatalf("Echo return different payload: got %v, expected %v", result, payload)
+		if !bytes.Equal(r, payload) {
+			vlog.Fatalf("Echo returned %v, but expected %v", r, payload)
 		}
-		if log != nil {
-			log.Write([]byte(fmt.Sprintf("CallEcho %d %d\n", i, elapsed)))
+
+		if stats != nil {
+			stats.Add(elapsed)
 		}
 	}
 }
 
-// CallEchoStream calls the EchoStream method 'rpcCount' times. Each iteration
-// sends 'messageCount' messages on the stream and receives the same number
-// back. Each message has the given payload size. Optionally logs the result.
-func CallEchoStream(runtime veyron2.Runtime, address string, rpcCount, messageCount, payloadSize int, log io.Writer) {
+// CallEchoStream calls 'EchoStream' method 'iterations' times. Each iteration
+// sends 'chunkCnt' chunks on the stream and receives the same number of chunks
+// back. Each chunk has the given payload size. Optionally updates the stats.
+func CallEchoStream(b *testing.B, ctx context.T, address string, iterations, chunkCnt, payloadSize int, stats *testutil.BenchStats) {
+	done, _ := StartEchoStream(b, ctx, address, iterations, chunkCnt, payloadSize, stats)
+	<-done
+}
+
+// StartEchoStream starts to call 'EchoStream' method 'iterations' times.
+// This does not block, and returns a channel that will receive the number
+// of iterations when it's done. It also returns a callback function to stop
+// the streaming. Each iteration requests 'chunkCnt' chunks on the stream and
+// receives that number of chunks back. Each chunk has the given payload size.
+// Optionally updates the stats. Zero 'iterations' means unlimited.
+func StartEchoStream(b *testing.B, ctx context.T, address string, iterations, chunkCnt, payloadSize int, stats *testutil.BenchStats) (<-chan int, func()) {
+	stub := BenchmarkClient(address)
+	_, err := stub.Echo(ctx, make([]byte, 0)) // Create VC.
+	if err != nil {
+		vlog.Fatalf("Echo failed: %v", err)
+	}
+
 	payload := make([]byte, payloadSize)
-	for _, i := range payload {
+	for i := range payload {
 		payload[i] = byte(i & 0xff)
 	}
 
-	stub := BenchmarkClient(address)
-	for i := 0; i < rpcCount; i++ {
-		start := time.Now()
-		ctx, _ := runtime.NewContext().WithTimeout(time.Hour)
-		stream, err := stub.EchoStream(ctx)
-		if err != nil {
-			vlog.Fatalf("EchoStream failed: %v", err)
+	if stats != nil {
+		stats.Clear()
+	}
+
+	done, stop := make(chan int, 1), make(chan struct{})
+	stopped := func() bool {
+		select {
+		case <-stop:
+			return true
+		default:
+			return false
 		}
-		done := make(chan error, 1)
-		go func() {
-			rStream := stream.RecvStream()
-			for rStream.Advance() {
-				chunk := rStream.Value()
-				if err == io.EOF {
-					done <- nil
-					return
-				}
-				if err != nil {
-					done <- err
-					return
-				}
-				if !bytes.Equal(payload, chunk) {
-					done <- fmt.Errorf("Recv got different payload: got %v, expected %v", chunk, payload)
-					return
-				}
+	}
+
+	if b.N > 0 {
+		// 2 for round trip of each payload.
+		b.SetBytes(int64((iterations*chunkCnt/b.N)*payloadSize) * 2)
+	}
+	b.ResetTimer() // Exclude setup time from measurement.
+
+	go func() {
+		defer close(done)
+
+		n := 0
+		for ; !stopped() && (iterations == 0 || n < iterations); n++ {
+			b.StartTimer()
+			start := time.Now()
+
+			stream, err := stub.EchoStream(ctx)
+			if err != nil {
+				vlog.Fatalf("EchoStream failed: %v", err)
 			}
 
-			done <- rStream.Err()
-		}()
-		sender := stream.SendStream()
-		for j := 0; j < messageCount; j++ {
-			if err = sender.Send(payload); err != nil {
-				vlog.Fatalf("Send failed: %v", err)
+			rDone := make(chan error, 1) // Buffered: the receiver reports exactly once and must not block.
+			go func() {
+				defer close(rDone)
+
+				rStream := stream.RecvStream()
+				i := 0 // Number of chunks received on this stream.
+				for ; rStream.Advance(); i++ {
+					r := rStream.Value()
+					if !bytes.Equal(r, payload) {
+						rDone <- fmt.Errorf("EchoStream returned %v, but expected %v", r, payload)
+						return
+					}
+				}
+				if i != chunkCnt {
+					rDone <- fmt.Errorf("EchoStream returned %d chunks, but expected %d", i, chunkCnt) // Report the chunk count 'i', not the iteration counter 'n'.
+					return
+				}
+				rDone <- rStream.Err()
+			}()
+
+			sStream := stream.SendStream()
+			for i := 0; i < chunkCnt; i++ {
+				if err = sStream.Send(payload); err != nil {
+					vlog.Fatalf("EchoStream Send failed: %v", err)
+				}
+			}
+			if err = sStream.Close(); err != nil { // Close the send side once every chunk has been sent.
+				vlog.Fatalf("EchoStream Close failed: %v", err)
+			}
+
+			if err = <-rDone; err != nil {
+				vlog.Fatalf("%v", err)
+			}
+
+			if err = stream.Finish(); err != nil {
+				vlog.Fatalf("Finish failed: %v", err)
+			}
+
+			elapsed := time.Since(start)
+			b.StopTimer()
+
+			if stats != nil {
+				stats.Add(elapsed)
 			}
 		}
-		if err = sender.Close(); err != nil {
-			vlog.Fatalf("Close() failed: %v", err)
-		}
-		if err = <-done; err != nil {
-			vlog.Fatalf("%v", err)
-		}
 
-		if err = stream.Finish(); err != nil {
-			vlog.Fatalf("Finish failed: %v", err)
-		}
-		elapsed := time.Since(start)
-		if log != nil {
-			log.Write([]byte(fmt.Sprintf("CallEchoStream %d %d\n", i, elapsed)))
-		}
+		done <- n
+	}()
+
+	return done, func() {
+		close(stop)
+		<-done
 	}
 }