v.io/x/lib/simplemr: fix data race and time dependent test.
Change-Id: Icfbb811ebb63c8f4cbf51103554373dcc0f243f2
diff --git a/simplemr/mr.go b/simplemr/mr.go
index 586a241..15f19d2 100644
--- a/simplemr/mr.go
+++ b/simplemr/mr.go
@@ -83,6 +83,7 @@
cancelled bool
cancelled_mu sync.RWMutex // guards cancelled
err error
+ err_mu sync.RWMutex // guards err
data *store
// The number of conccurent mappers to use. A value of 0 instructs
@@ -98,6 +99,8 @@
// safe to read its value once the output channel passed to Run has been
// closed.
func (mr *MR) Error() error {
+ mr.err_mu.RLock()
+ defer mr.err_mu.RUnlock()
return mr.err
}
@@ -221,15 +224,21 @@
timeout = time.After(mr.Timeout)
}
defer close(mr.output)
- if mr.err = mr.runMappers(mapper, timeout); mr.err != nil {
- return mr.err
+ if err := mr.runMappers(mapper, timeout); err != nil {
+ mr.err_mu.Lock()
+ mr.err = err
+ mr.err_mu.Unlock()
+ return err
}
if mr.IsCancelled() {
return ErrMRCancelled
}
- mr.err = mr.runReducers(reducer, timeout)
+ err := mr.runReducers(reducer, timeout)
+ mr.err_mu.Lock()
+ mr.err = err
+ mr.err_mu.Unlock()
if mr.IsCancelled() {
return ErrMRCancelled
}
- return mr.err
+ return err
}
diff --git a/simplemr/mr_test.go b/simplemr/mr_test.go
index baf5077..42722ec 100644
--- a/simplemr/mr_test.go
+++ b/simplemr/mr_test.go
@@ -103,13 +103,13 @@
func TestTimeout(t *testing.T) {
in, out := newChans(1)
- mrt := &simplemr.MR{Timeout: 10 * time.Millisecond}
+ mrt := &simplemr.MR{Timeout: 100 * time.Millisecond}
identity := &simplemr.Identity{}
mrt.Run(in, out, identity, identity)
if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out mappers") {
t.Fatalf("missing or wrong error: %v", err)
}
- mrt = &simplemr.MR{Timeout: 10 * time.Millisecond}
+ mrt = &simplemr.MR{Timeout: 100 * time.Millisecond}
in, out = newChans(1)
in <- &simplemr.Record{"key", []interface{}{"value"}}
close(in)