syncbase/store: fix a race condition

Consider two transactions committing concurrently and doing tx.removeEvent()
at the same time (transaction.go:169). Now the event queue is empty and
both transcations will not be tracked.

Fix: move this line under the tx.mg.mu Lock.

Change-Id: I13b54fa05ea35c413d64226d596dd7e64fc9d5e4
diff --git a/services/syncbase/store/transactions/manager.go b/services/syncbase/store/transactions/manager.go
index 2e0641c..3d40c09 100644
--- a/services/syncbase/store/transactions/manager.go
+++ b/services/syncbase/store/transactions/manager.go
@@ -62,24 +62,21 @@
 // Close implements the store.Store interface.
 func (mg *manager) Close() error {
 	mg.mu.Lock()
-	defer mg.mu.Unlock()
 	if mg.txTable == nil {
+		mg.mu.Unlock()
 		return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
 	}
 	mg.BatchStore.Close()
-	for event := mg.events.Front(); event != nil; event = event.Next() {
+	events := mg.events
+	mg.events = nil
+	mg.txTable = nil
+	// tx.Abort() internally locks mg.mu.
+	mg.mu.Unlock()
+	for event := events.Front(); event != nil; event = event.Next() {
 		if tx, ok := event.Value.(*transaction); ok {
-			// tx.Abort() internally removes tx from the mg.events list under
-			// the mg.mu lock. To brake the cyclic dependency, we set tx.event
-			// to nil.
-			tx.mu.Lock()
-			tx.event = nil
-			tx.mu.Unlock()
 			tx.Abort()
 		}
 	}
-	mg.events = nil
-	mg.txTable = nil
 	return nil
 }
 
diff --git a/services/syncbase/store/transactions/transaction.go b/services/syncbase/store/transactions/transaction.go
index 6331a8f..1c81abd 100644
--- a/services/syncbase/store/transactions/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -21,7 +21,7 @@
 	mu       sync.Mutex
 	mg       *manager
 	seq      uint64
-	event    *list.Element // pointer to element of db.txEvents
+	event    *list.Element // pointer to element of mg.events
 	snapshot store.Snapshot
 	reads    readSet
 	writes   []WriteOp
@@ -40,26 +40,13 @@
 	return tx
 }
 
-// close removes this transaction from the mg.events queue and aborts
-// the underlying snapshot.
-// Assumes mu is held.
-func (tx *transaction) close() {
-	tx.removeEvent()
-	tx.snapshot.Abort()
-}
-
 // removeEvent removes this transaction from the mg.events queue.
-// Assumes mu is held.
+// Assumes mu and mg.mu are held.
 func (tx *transaction) removeEvent() {
-	// This can happen if the transaction was committed, since Commit()
-	// explicitly calls removeEvent().
-	if tx.event == nil {
-		return
+	if tx.event != nil {
+		tx.mg.events.Remove(tx.event)
+		tx.event = nil
 	}
-	tx.mg.mu.Lock()
-	tx.mg.events.Remove(tx.event)
-	tx.mg.mu.Unlock()
-	tx.event = nil
 }
 
 // Get implements the store.StoreReader interface.
@@ -163,13 +150,16 @@
 		return store.ConvertError(tx.err)
 	}
 	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
-	// Explicitly remove this transaction from the event queue. If this was the
-	// only active transaction, the event queue becomes empty and writeLocked will
-	// not add this transaction's write set to txTable.
-	tx.removeEvent()
-	defer tx.close()
+	tx.snapshot.Abort()
 	tx.mg.mu.Lock()
 	defer tx.mg.mu.Unlock()
+	if tx.mg.txTable == nil {
+		return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
+	}
+	// Explicitly remove this transaction from the event queue. If this was the
+	// only active transaction, the event queue becomes empty and trackBatch will
+	// not add this transaction's write set to txTable.
+	tx.removeEvent()
 	if !tx.validateReadSet() {
 		return store.NewErrConcurrentTransaction(nil)
 	}
@@ -188,6 +178,9 @@
 		return store.ConvertError(tx.err)
 	}
 	tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
-	tx.close()
+	tx.snapshot.Abort()
+	tx.mg.mu.Lock()
+	tx.removeEvent()
+	tx.mg.mu.Unlock()
 	return nil
 }