| package benchmark |
| |
| import ( |
| "os" |
| "sync" |
| "testing" |
| "time" |
| |
| "golang.org/x/net/context" |
| "google.golang.org/grpc" |
| testpb "google.golang.org/grpc/benchmark/grpc_testing" |
| "google.golang.org/grpc/benchmark/stats" |
| "google.golang.org/grpc/grpclog" |
| ) |
| |
| func runUnary(b *testing.B, maxConcurrentCalls int) { |
| s := stats.AddStats(b, 38) |
| b.StopTimer() |
| target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}) |
| defer stopper() |
| conn := NewClientConn(target, grpc.WithInsecure()) |
| tc := testpb.NewBenchmarkServiceClient(conn) |
| |
| // Warm up connection. |
| for i := 0; i < 10; i++ { |
| unaryCaller(tc) |
| } |
| ch := make(chan int, maxConcurrentCalls*4) |
| var ( |
| mu sync.Mutex |
| wg sync.WaitGroup |
| ) |
| wg.Add(maxConcurrentCalls) |
| |
| // Distribute the b.N calls over maxConcurrentCalls workers. |
| for i := 0; i < maxConcurrentCalls; i++ { |
| go func() { |
| for range ch { |
| start := time.Now() |
| unaryCaller(tc) |
| elapse := time.Since(start) |
| mu.Lock() |
| s.Add(elapse) |
| mu.Unlock() |
| } |
| wg.Done() |
| }() |
| } |
| b.StartTimer() |
| for i := 0; i < b.N; i++ { |
| ch <- i |
| } |
| b.StopTimer() |
| close(ch) |
| wg.Wait() |
| conn.Close() |
| } |
| |
| func runStream(b *testing.B, maxConcurrentCalls int) { |
| s := stats.AddStats(b, 38) |
| b.StopTimer() |
| target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}) |
| defer stopper() |
| conn := NewClientConn(target, grpc.WithInsecure()) |
| tc := testpb.NewBenchmarkServiceClient(conn) |
| |
| // Warm up connection. |
| stream, err := tc.StreamingCall(context.Background()) |
| if err != nil { |
| b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) |
| } |
| for i := 0; i < 10; i++ { |
| streamCaller(stream) |
| } |
| |
| ch := make(chan int, maxConcurrentCalls*4) |
| var ( |
| mu sync.Mutex |
| wg sync.WaitGroup |
| ) |
| wg.Add(maxConcurrentCalls) |
| |
| // Distribute the b.N calls over maxConcurrentCalls workers. |
| for i := 0; i < maxConcurrentCalls; i++ { |
| go func() { |
| stream, err := tc.StreamingCall(context.Background()) |
| if err != nil { |
| b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) |
| } |
| for range ch { |
| start := time.Now() |
| streamCaller(stream) |
| elapse := time.Since(start) |
| mu.Lock() |
| s.Add(elapse) |
| mu.Unlock() |
| } |
| wg.Done() |
| }() |
| } |
| b.StartTimer() |
| for i := 0; i < b.N; i++ { |
| ch <- i |
| } |
| b.StopTimer() |
| close(ch) |
| wg.Wait() |
| conn.Close() |
| } |
| func unaryCaller(client testpb.BenchmarkServiceClient) { |
| if err := DoUnaryCall(client, 1, 1); err != nil { |
| grpclog.Fatalf("DoUnaryCall failed: %v", err) |
| } |
| } |
| |
| func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) { |
| if err := DoStreamingRoundTrip(stream, 1, 1); err != nil { |
| grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) |
| } |
| } |
| |
| func BenchmarkClientStreamc1(b *testing.B) { |
| grpc.EnableTracing = true |
| runStream(b, 1) |
| } |
| |
| func BenchmarkClientStreamc8(b *testing.B) { |
| grpc.EnableTracing = true |
| runStream(b, 8) |
| } |
| |
| func BenchmarkClientStreamc64(b *testing.B) { |
| grpc.EnableTracing = true |
| runStream(b, 64) |
| } |
| |
| func BenchmarkClientStreamc512(b *testing.B) { |
| grpc.EnableTracing = true |
| runStream(b, 512) |
| } |
| func BenchmarkClientUnaryc1(b *testing.B) { |
| grpc.EnableTracing = true |
| runUnary(b, 1) |
| } |
| |
| func BenchmarkClientUnaryc8(b *testing.B) { |
| grpc.EnableTracing = true |
| runUnary(b, 8) |
| } |
| |
| func BenchmarkClientUnaryc64(b *testing.B) { |
| grpc.EnableTracing = true |
| runUnary(b, 64) |
| } |
| |
| func BenchmarkClientUnaryc512(b *testing.B) { |
| grpc.EnableTracing = true |
| runUnary(b, 512) |
| } |
| |
| func BenchmarkClientStreamNoTracec1(b *testing.B) { |
| grpc.EnableTracing = false |
| runStream(b, 1) |
| } |
| |
| func BenchmarkClientStreamNoTracec8(b *testing.B) { |
| grpc.EnableTracing = false |
| runStream(b, 8) |
| } |
| |
| func BenchmarkClientStreamNoTracec64(b *testing.B) { |
| grpc.EnableTracing = false |
| runStream(b, 64) |
| } |
| |
| func BenchmarkClientStreamNoTracec512(b *testing.B) { |
| grpc.EnableTracing = false |
| runStream(b, 512) |
| } |
| func BenchmarkClientUnaryNoTracec1(b *testing.B) { |
| grpc.EnableTracing = false |
| runUnary(b, 1) |
| } |
| |
| func BenchmarkClientUnaryNoTracec8(b *testing.B) { |
| grpc.EnableTracing = false |
| runUnary(b, 8) |
| } |
| |
| func BenchmarkClientUnaryNoTracec64(b *testing.B) { |
| grpc.EnableTracing = false |
| runUnary(b, 64) |
| } |
| |
| func BenchmarkClientUnaryNoTracec512(b *testing.B) { |
| grpc.EnableTracing = false |
| runUnary(b, 512) |
| } |
| |
| func TestMain(m *testing.M) { |
| os.Exit(stats.RunTestMain(m)) |
| } |