syncbase/store: fix a race condition

Consider two transactions committing concurrently and doing tx.removeEvent()
at the same time (transaction.go:169). Now the event queue is empty and
both transcations will not be tracked.

Fix: move this line under the tx.mg.mu Lock.

Change-Id: I13b54fa05ea35c413d64226d596dd7e64fc9d5e4
diff --git a/services/syncbase/store/transactions/transaction.go b/services/syncbase/store/transactions/transaction.go
index 6331a8f..1c81abd 100644
--- a/services/syncbase/store/transactions/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -21,7 +21,7 @@
 	mu       sync.Mutex
 	mg       *manager
 	seq      uint64
-	event    *list.Element // pointer to element of db.txEvents
+	event    *list.Element // pointer to element of mg.events
 	snapshot store.Snapshot
 	reads    readSet
 	writes   []WriteOp
@@ -40,26 +40,13 @@
 	return tx
 }
 
-// close removes this transaction from the mg.events queue and aborts
-// the underlying snapshot.
-// Assumes mu is held.
-func (tx *transaction) close() {
-	tx.removeEvent()
-	tx.snapshot.Abort()
-}
-
 // removeEvent removes this transaction from the mg.events queue.
-// Assumes mu is held.
+// Assumes mu and mg.mu are held.
 func (tx *transaction) removeEvent() {
-	// This can happen if the transaction was committed, since Commit()
-	// explicitly calls removeEvent().
-	if tx.event == nil {
-		return
+	if tx.event != nil {
+		tx.mg.events.Remove(tx.event)
+		tx.event = nil
 	}
-	tx.mg.mu.Lock()
-	tx.mg.events.Remove(tx.event)
-	tx.mg.mu.Unlock()
-	tx.event = nil
 }
 
 // Get implements the store.StoreReader interface.
@@ -163,13 +150,16 @@
 		return store.ConvertError(tx.err)
 	}
 	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
-	// Explicitly remove this transaction from the event queue. If this was the
-	// only active transaction, the event queue becomes empty and writeLocked will
-	// not add this transaction's write set to txTable.
-	tx.removeEvent()
-	defer tx.close()
+	tx.snapshot.Abort()
 	tx.mg.mu.Lock()
 	defer tx.mg.mu.Unlock()
+	if tx.mg.txTable == nil {
+		return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+	}
+	// Explicitly remove this transaction from the event queue. If this was the
+	// only active transaction, the event queue becomes empty and trackBatch will
+	// not add this transaction's write set to txTable.
+	tx.removeEvent()
 	if !tx.validateReadSet() {
 		return store.NewErrConcurrentTransaction(nil)
 	}
@@ -188,6 +178,9 @@
 		return store.ConvertError(tx.err)
 	}
 	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
-	tx.close()
+	tx.snapshot.Abort()
+	tx.mg.mu.Lock()
+	tx.removeEvent()
+	tx.mg.mu.Unlock()
 	return nil
 }