v.io/x/lib/simplemr: fix data race and time dependent test.

Change-Id: Icfbb811ebb63c8f4cbf51103554373dcc0f243f2
diff --git a/simplemr/mr.go b/simplemr/mr.go
index 586a241..15f19d2 100644
--- a/simplemr/mr.go
+++ b/simplemr/mr.go
@@ -83,6 +83,7 @@
 	cancelled    bool
 	cancelled_mu sync.RWMutex // guards cancelled
 	err          error
+	err_mu       sync.RWMutex // guards err
 	data         *store
 
 	// The number of conccurent mappers to use. A value of 0 instructs
@@ -98,6 +99,8 @@
 // safe to read its value once the output channel passed to Run has been
 // closed.
 func (mr *MR) Error() error {
+	mr.err_mu.RLock()
+	defer mr.err_mu.RUnlock()
 	return mr.err
 }
 
@@ -221,15 +224,21 @@
 		timeout = time.After(mr.Timeout)
 	}
 	defer close(mr.output)
-	if mr.err = mr.runMappers(mapper, timeout); mr.err != nil {
-		return mr.err
+	if err := mr.runMappers(mapper, timeout); err != nil {
+		mr.err_mu.Lock()
+		mr.err = err
+		mr.err_mu.Unlock()
+		return err
 	}
 	if mr.IsCancelled() {
 		return ErrMRCancelled
 	}
-	mr.err = mr.runReducers(reducer, timeout)
+	err := mr.runReducers(reducer, timeout)
+	mr.err_mu.Lock()
+	mr.err = err
+	mr.err_mu.Unlock()
 	if mr.IsCancelled() {
 		return ErrMRCancelled
 	}
-	return mr.err
+	return err
 }
diff --git a/simplemr/mr_test.go b/simplemr/mr_test.go
index baf5077..42722ec 100644
--- a/simplemr/mr_test.go
+++ b/simplemr/mr_test.go
@@ -103,13 +103,13 @@
 
 func TestTimeout(t *testing.T) {
 	in, out := newChans(1)
-	mrt := &simplemr.MR{Timeout: 10 * time.Millisecond}
+	mrt := &simplemr.MR{Timeout: 100 * time.Millisecond}
 	identity := &simplemr.Identity{}
 	mrt.Run(in, out, identity, identity)
 	if err := mrt.Error(); err == nil || !strings.Contains(err.Error(), "timed out mappers") {
 		t.Fatalf("missing or wrong error: %v", err)
 	}
-	mrt = &simplemr.MR{Timeout: 10 * time.Millisecond}
+	mrt = &simplemr.MR{Timeout: 100 * time.Millisecond}
 	in, out = newChans(1)
 	in <- &simplemr.Record{"key", []interface{}{"value"}}
 	close(in)