Asim Shankar | f76614c | 2015-04-20 09:53:21 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package main |
| 6 | |
| 7 | import ( |
| 8 | "fmt" |
| 9 | "os" |
| 10 | "os/signal" |
| 11 | "sync" |
| 12 | "time" |
| 13 | |
| 14 | "v.io/v23" |
| 15 | "v.io/v23/context" |
| 16 | |
| 17 | "v.io/x/lib/vlog" |
| 18 | "v.io/x/ref/lib/stats/histogram" |
| 19 | ) |
| 20 | |
| 21 | // params encapsulates "input" information to the loadtester. |
| 22 | type params struct { |
| 23 | NetworkDistance time.Duration // Distance (in time) of the server from this driver |
| 24 | Rate float64 // Desired rate of sending RPCs (RPC/sec) |
| 25 | Duration time.Duration // Duration over which loadtest traffic should be sent |
| 26 | Context *context.T |
| 27 | Reauthenticate bool // If true, each RPC should establish a network connection (and authenticate) |
| 28 | } |
| 29 | |
| 30 | // report is generated by a run of the loadtest. |
| 31 | type report struct { |
| 32 | Count int64 // Count of RPCs sent |
| 33 | Elapsed time.Duration // Time period over which Count RPCs were sent |
| 34 | AvgLatency time.Duration |
| 35 | HistMS *histogram.Histogram // Histogram of latencies in milliseconds |
| 36 | } |
| 37 | |
| 38 | func (r *report) Print(params params) error { |
| 39 | if r.HistMS != nil { |
| 40 | fmt.Println("RPC latency histogram (in ms):") |
| 41 | fmt.Println(r.HistMS.Value()) |
| 42 | } |
| 43 | actualRate := float64(r.Count*int64(time.Second)) / float64(r.Elapsed) |
| 44 | fmt.Printf("Network Distance: %v\n", params.NetworkDistance) |
| 45 | fmt.Printf("#RPCs sent: %v\n", r.Count) |
| 46 | fmt.Printf("RPCs/sec sent: %.2f (%.2f%% of the desired rate of %v)\n", actualRate, actualRate*100/params.Rate, params.Rate) |
| 47 | fmt.Printf("Avg. Latency: %v\n", r.AvgLatency) |
| 48 | // Mark the results are tainted if the deviation from the desired rate is too large |
| 49 | if 0.9*params.Rate > actualRate { |
| 50 | return fmt.Errorf("TAINTED RESULTS: drove less traffic than desired: either server or loadtester had a bottleneck") |
| 51 | } |
| 52 | return nil |
| 53 | } |
| 54 | |
| 55 | func run(f func(*context.T) (time.Duration, error), p params) error { |
| 56 | var ( |
| 57 | ticker = time.NewTicker(time.Duration(float64(time.Second) / p.Rate)) |
| 58 | latency = make(chan time.Duration, 1000) |
| 59 | started = time.Now() |
| 60 | stop = time.After(p.Duration) |
| 61 | interrupt = make(chan os.Signal) |
| 62 | ret report |
| 63 | ) |
| 64 | defer ticker.Stop() |
| 65 | warmup(p.Context, f) |
| 66 | signal.Notify(interrupt, os.Interrupt) |
| 67 | defer signal.Stop(interrupt) |
| 68 | |
| 69 | stopped := false |
| 70 | var sumMS int64 |
| 71 | var lastInterrupt time.Time |
| 72 | for !stopped { |
| 73 | select { |
| 74 | case <-ticker.C: |
| 75 | go call(p.Context, f, p.Reauthenticate, latency) |
| 76 | case d := <-latency: |
| 77 | if ret.HistMS != nil { |
| 78 | ret.HistMS.Add(int64(d / time.Millisecond)) |
| 79 | } |
| 80 | ret.Count++ |
| 81 | sumMS += int64(d / time.Millisecond) |
| 82 | // Use 10 samples to determine how the histogram should be setup |
| 83 | if ret.Count == 10 { |
| 84 | avgms := sumMS / ret.Count |
| 85 | opts := histogram.Options{ |
| 86 | NumBuckets: 32, |
| 87 | // Mostly interested in tail latencies, |
| 88 | // so have the histogram start close to |
| 89 | // the current average. |
| 90 | MinValue: int64(float64(avgms) * 0.95), |
| 91 | GrowthFactor: 0.20, |
| 92 | } |
| 93 | vlog.Infof("Creating histogram after %d samples (%vms avg latency): %+v", ret.Count, avgms, opts) |
| 94 | ret.HistMS = histogram.New(opts) |
| 95 | } |
| 96 | case sig := <-interrupt: |
| 97 | if time.Since(lastInterrupt) < time.Second { |
| 98 | vlog.Infof("Multiple %v signals received, aborting test", sig) |
| 99 | stopped = true |
| 100 | break |
| 101 | } |
| 102 | lastInterrupt = time.Now() |
| 103 | // Print a temporary report |
| 104 | fmt.Println("INTERMEDIATE REPORT:") |
| 105 | ret.Elapsed = time.Since(started) |
| 106 | ret.AvgLatency = time.Duration(float64(sumMS)/float64(ret.Count)) * time.Millisecond |
| 107 | if err := ret.Print(p); err != nil { |
| 108 | fmt.Println(err) |
| 109 | } |
| 110 | fmt.Println("--------------------------------------------------------------------------------") |
| 111 | case <-stop: |
| 112 | stopped = true |
| 113 | } |
| 114 | } |
| 115 | ret.Elapsed = time.Since(started) |
| 116 | ret.AvgLatency = time.Duration(float64(sumMS)/float64(ret.Count)) * time.Millisecond |
| 117 | return ret.Print(p) |
| 118 | } |
| 119 | |
| 120 | func warmup(ctx *context.T, f func(*context.T) (time.Duration, error)) { |
| 121 | const nWarmup = 10 |
| 122 | vlog.Infof("Sending %d requests as warmup", nWarmup) |
| 123 | var wg sync.WaitGroup |
| 124 | for i := 0; i < nWarmup; i++ { |
| 125 | wg.Add(1) |
| 126 | go func() { |
| 127 | f(ctx) |
| 128 | wg.Done() |
| 129 | }() |
| 130 | } |
| 131 | wg.Wait() |
| 132 | vlog.Infof("Done with warmup") |
| 133 | } |
| 134 | |
| 135 | func call(ctx *context.T, f func(*context.T) (time.Duration, error), reauth bool, d chan<- time.Duration) { |
| 136 | client := v23.GetClient(ctx) |
| 137 | if reauth { |
| 138 | // HACK ALERT: At the time the line below was written, it was |
| 139 | // known that the implementation would cause 'ctx' to be setup |
| 140 | // such that any subsequent RPC will establish a network |
| 141 | // connection from scratch (a new VIF, new VC etc.) If that |
| 142 | // implementation changes, then this line below will have to |
| 143 | // change! |
| 144 | var err error |
| 145 | if ctx, err = v23.WithPrincipal(ctx, v23.GetPrincipal(ctx)); err != nil { |
| 146 | vlog.Infof("%v", err) |
| 147 | return |
| 148 | } |
| 149 | client = v23.GetClient(ctx) |
| 150 | defer client.Close() |
| 151 | } |
| 152 | sample, err := f(ctx) |
| 153 | if err != nil { |
| 154 | vlog.Infof("%v", err) |
| 155 | return |
| 156 | } |
| 157 | d <- sample |
| 158 | } |