v.io/x/devtools/vmon: use lock to protect concurrent accesses to map
The map "aggs" was being modified concurrently by the statsWorker()
threads. This caused crash:
https://veyron.corp.google.com/jenkins/job/monitoring-cloud-syncbase/109367/console
This change introduces a mutex to fix it.
Change-Id: I1530c68131af162499ea97802efa32417a647266
diff --git a/vmon/cloudsyncbase.go b/vmon/cloudsyncbase.go
index 4beeb60..3b57034 100644
--- a/vmon/cloudsyncbase.go
+++ b/vmon/cloudsyncbase.go
@@ -7,6 +7,7 @@
import (
"fmt"
"runtime"
+ "sync"
"time"
cloudmonitoring "google.golang.org/api/monitoring/v3"
@@ -95,10 +96,11 @@
numTasks := numInstances * len(taskTypes)
tasks := make(chan cloudSyncbaseStatsTask, numTasks)
taskResults := make(chan cloudSyncbaseStatsResult, numTasks)
+ var aggsMu sync.Mutex // protects aggs
aggs := map[string]*aggregator{}
// Start workers and distribute work to them.
for i := 0; i < runtime.NumCPU(); i++ {
- go statsWorker(v23ctx, ctx, s, now, aggs, md, tasks, taskResults)
+ go statsWorker(v23ctx, ctx, s, now, &aggsMu, aggs, md, tasks, taskResults)
}
for _, sb := range sbInstances {
for _, t := range taskTypes {
@@ -136,7 +138,7 @@
// and updates the corresponding aggregator.
func statsWorker(
v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service,
- now string, aggs map[string]*aggregator, md *cloudmonitoring.MetricDescriptor,
+ now string, aggsMu *sync.Mutex, aggs map[string]*aggregator, md *cloudmonitoring.MetricDescriptor,
tasks <-chan cloudSyncbaseStatsTask, results chan<- cloudSyncbaseStatsResult) {
for t := range tasks {
result := cloudSyncbaseStatsResult{}
@@ -209,7 +211,7 @@
}
}
for _, metric := range metrics {
- getAggregator(aggs, metric.metricLabel).add(metric.value)
+ getAggregator(aggsMu, aggs, metric.metricLabel).add(metric.value)
if err := sendDataToGCM(s, md, metric.value, now, "", "", metric.metricLabel, t.mountedName); err != nil {
fmt.Fprintf(ctx.Stderr(), "%v\n", err)
} else {
@@ -220,7 +222,9 @@
}
}
-func getAggregator(aggs map[string]*aggregator, metricLabel string) *aggregator {
+func getAggregator(aggsMu *sync.Mutex, aggs map[string]*aggregator, metricLabel string) *aggregator {
+ aggsMu.Lock()
+ defer aggsMu.Unlock()
_, ok := aggs[metricLabel]
if !ok {
aggs[metricLabel] = newAggregator()