// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"time"

	"v.io/v23"
	"v.io/v23/context"

	"v.io/x/lib/vlog"
	"v.io/x/ref/lib/stats/histogram"
)

// params encapsulates the "input" parameters for the loadtester.
type params struct {
	NetworkDistance time.Duration // Distance (in time) of the server from this driver
	Rate            float64       // Desired rate of sending RPCs (RPC/sec)
	Duration        time.Duration // Duration over which loadtest traffic should be sent
	Context         *context.T
	Reauthenticate  bool // If true, each RPC should establish a network connection (and authenticate)
}

// report is generated by a run of the loadtest.
type report struct {
	Count      int64         // Count of RPCs sent
	Elapsed    time.Duration // Time period over which Count RPCs were sent
	AvgLatency time.Duration
	HistMS     *histogram.Histogram // Histogram of latencies in milliseconds
}

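// Print writes a human-readable summary of r to stdout. It returns an
// error if the rate actually achieved fell more than 10% short of the
// desired rate in params, since that taints the results.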
func (r *report) Print(params params) error {
	if r.HistMS != nil {
		fmt.Println("RPC latency histogram (in ms):")
		fmt.Println(r.HistMS.Value())
	}
	actualRate := float64(r.Count*int64(time.Second)) / float64(r.Elapsed)
	fmt.Printf("Network Distance: %v\n", params.NetworkDistance)
	fmt.Printf("#RPCs sent: %v\n", r.Count)
	fmt.Printf("RPCs/sec sent: %.2f (%.2f%% of the desired rate of %v)\n", actualRate, actualRate*100/params.Rate, params.Rate)
	fmt.Printf("Avg. Latency: %v\n", r.AvgLatency)
	// Mark the results as tainted if the deviation from the desired rate is too large.
	if 0.9*params.Rate > actualRate {
		return fmt.Errorf("TAINTED RESULTS: drove less traffic than desired: either server or loadtester had a bottleneck")
	}
	return nil
}

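// run drives the loadtest: it invokes f at a rate of p.Rate RPCs per
// second for p.Duration, collects per-RPC latencies and prints a report
// of the results. A single interrupt (e.g., Ctrl-C) prints an
// intermediate report; a second interrupt within a second aborts the
// test early.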
func run(f func(*context.T) (time.Duration, error), p params) error {
	var (
		ticker    = time.NewTicker(time.Duration(float64(time.Second) / p.Rate))
		latency   = make(chan time.Duration, 1000)
		started   = time.Now()
		stop      = time.After(p.Duration)
		interrupt = make(chan os.Signal, 1) // buffered, as required by signal.Notify
		ret       report
	)
	defer ticker.Stop()
	warmup(p.Context, f)
	signal.Notify(interrupt, os.Interrupt)
	defer signal.Stop(interrupt)

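	// Main loop: the ticker paces outgoing RPCs, the latency channel
	// delivers results from completed calls, an interrupt prints an
	// intermediate report (a second interrupt within a second aborts
	// the test), and the stop timer ends the run.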
	stopped := false
	var sumMS int64
	var lastInterrupt time.Time
	for !stopped {
		select {
		case <-ticker.C:
			go call(p.Context, f, p.Reauthenticate, latency)
		case d := <-latency:
			if ret.HistMS != nil {
				ret.HistMS.Add(int64(d / time.Millisecond))
			}
			ret.Count++
			sumMS += int64(d / time.Millisecond)
			// Use the first 10 samples to determine how the histogram should be set up.
			if ret.Count == 10 {
				avgms := sumMS / ret.Count
				opts := histogram.Options{
					NumBuckets: 32,
					// Mostly interested in tail latencies,
					// so have the histogram start close to
					// the current average.
					MinValue:     int64(float64(avgms) * 0.95),
					GrowthFactor: 0.20,
				}
				vlog.Infof("Creating histogram after %d samples (%vms avg latency): %+v", ret.Count, avgms, opts)
				ret.HistMS = histogram.New(opts)
			}
		case sig := <-interrupt:
			if time.Since(lastInterrupt) < time.Second {
				vlog.Infof("Multiple %v signals received, aborting test", sig)
				stopped = true
				break
			}
			lastInterrupt = time.Now()
			// Print an intermediate report and keep going.
			fmt.Println("INTERMEDIATE REPORT:")
			ret.Elapsed = time.Since(started)
			ret.AvgLatency = time.Duration(float64(sumMS)/float64(ret.Count)) * time.Millisecond
			if err := ret.Print(p); err != nil {
				fmt.Println(err)
			}
			fmt.Println("--------------------------------------------------------------------------------")
		case <-stop:
			stopped = true
		}
	}
	ret.Elapsed = time.Since(started)
	ret.AvgLatency = time.Duration(float64(sumMS)/float64(ret.Count)) * time.Millisecond
	return ret.Print(p)
}

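// warmup issues nWarmup concurrent requests and waits for them to
// complete before the measured run begins, so that one-time setup costs
// do not skew the recorded latencies.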
func warmup(ctx *context.T, f func(*context.T) (time.Duration, error)) {
	const nWarmup = 10
	vlog.Infof("Sending %d requests as warmup", nWarmup)
	var wg sync.WaitGroup
	for i := 0; i < nWarmup; i++ {
		wg.Add(1)
		go func() {
			f(ctx)
			wg.Done()
		}()
	}
	wg.Wait()
	vlog.Infof("Done with warmup")
}

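// call invokes f once and sends the measured latency on d. If reauth is
// true, the context is adjusted so that the RPC establishes (and
// authenticates) a fresh network connection instead of reusing an
// existing one. Errors are logged and no latency is reported.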
func call(ctx *context.T, f func(*context.T) (time.Duration, error), reauth bool, d chan<- time.Duration) {
	client := v23.GetClient(ctx)
	if reauth {
		// HACK ALERT: At the time the line below was written, it was
		// known that the implementation would cause 'ctx' to be set up
		// such that any subsequent RPC will establish a network
		// connection from scratch (a new VIF, new VC, etc.). If that
		// implementation changes, then the line below will have to
		// change!
		var err error
		if ctx, err = v23.WithPrincipal(ctx, v23.GetPrincipal(ctx)); err != nil {
			vlog.Infof("%v", err)
			return
		}
		client = v23.GetClient(ctx)
		defer client.Close()
	}
	sample, err := f(ctx)
	if err != nil {
		vlog.Infof("%v", err)
		return
	}
	d <- sample
}