syncbase: garbage collect committed transactions.

This CL adds function gcTransactions to the transaction manager that
cleans up all committed transactions that precede the first uncommitted
transaction in the event queue.

The gcTransactions function runs in a loop every 100ms.  We will want to
tune that parameter, and possibly optimize the gcTransactions function in
other ways, since it blocks other transaction operations while it runs.

Change-Id: Ibb276001f501368671e276af70ded91c4243d645
diff --git a/services/syncbase/store/transactions/manager.go b/services/syncbase/store/transactions/manager.go
index 0b37927..f81ba3a 100644
--- a/services/syncbase/store/transactions/manager.go
+++ b/services/syncbase/store/transactions/manager.go
@@ -6,12 +6,18 @@
 
 import (
 	"container/list"
+	"fmt"
 	"sync"
+	"time"
 
 	"v.io/v23/verror"
 	"v.io/x/ref/services/syncbase/store"
 )
 
+// txGcInterval is the interval between transaction garbage collections.
+// TODO(nlacasse): Profile and optimize this value.
+const txGcInterval = 100 * time.Millisecond
+
 // BatchStore is a CRUD-capable storage engine that supports atomic batch
 // writes. BatchStore doesn't support transactions.
 // This interface is a Go version of the C++ LevelDB interface. It serves as
@@ -32,10 +38,13 @@
 // manager handles transaction-related operations of the store.
 type manager struct {
 	BatchStore
+	// stopTxGc is used to stop garbage collecting transactions.
+	stopTxGc func()
 	// mu protects the variables below, and is also held during transaction
 	// commits. It must always be acquired before the store-level lock.
 	mu sync.Mutex
-	// events is a queue of create/commit transaction events.
+	// events is a queue of create/commit transaction events.  Events are
+	// pushed to the back of the queue, and removed from the front via GC.
 	events *list.List
 	seq    uint64
 	// txTable is a set of keys written by recent transactions. This set
@@ -52,15 +61,27 @@
 
 // Wrap wraps the BatchStore with transaction functionality.
 func Wrap(bs BatchStore) store.Store {
-	return &manager{
+	mg := &manager{
 		BatchStore: bs,
 		events:     list.New(),
 		txTable:    newTrie(),
 	}
+
+	// Start a goroutine that garbage collects transactions every txGcInterval.
+	t := time.NewTicker(txGcInterval)
+	mg.stopTxGc = t.Stop
+	go func() {
+		for range t.C {
+			mg.gcTransactions()
+		}
+	}()
+
+	return mg
 }
 
 // Close implements the store.Store interface.
 func (mg *manager) Close() error {
+	mg.stopTxGc()
 	mg.mu.Lock()
 	if mg.txTable == nil {
 		mg.mu.Unlock()
@@ -136,7 +157,6 @@
 	if mg.events.Len() == 0 {
 		return
 	}
-	// TODO(rogulenko): do GC.
 	mg.seq++
 	var keys [][]byte
 	for _, write := range batch {
@@ -150,8 +170,40 @@
 	mg.events.PushBack(tx)
 }
 
+// gcTransactions cleans up all transactions that were committed before the
+// first open (non-committed) transaction.  Transactions are cleaned up in the
+// order that they were committed.  Note that this function holds mg.mu,
+// preventing other transaction operations from occurring while it runs.
+//
+// TODO(nlacasse): If a transaction never commits or aborts, then we will never
+// be able to GC any transaction that occurs after it.  Consider aborting any
+// transaction that has been open longer than a maximum amount of time.
+func (mg *manager) gcTransactions() {
+	mg.mu.Lock()
+	defer mg.mu.Unlock()
+	if mg.events == nil {
+		return
+	}
+	ev := mg.events.Front()
+	for ev != nil {
+		switch tx := ev.Value.(type) {
+		case *transaction:
+			return
+		case *commitedTransaction:
+			for _, batch := range tx.batch {
+				mg.txTable.remove(batch, tx.seq)
+			}
+			next := ev.Next()
+			mg.events.Remove(ev)
+			ev = next
+		default:
+			panic(fmt.Sprintf("unknown event type: %T", tx))
+		}
+	}
+}
+
 //////////////////////////////////////////////////////////////
-// Read and Write types used for storing transcation reads
+// Read and Write types used for storing transaction reads
 // and uncommitted writes.
 
 type WriteType int
diff --git a/services/syncbase/store/transactions/manager_test.go b/services/syncbase/store/transactions/manager_test.go
new file mode 100644
index 0000000..5eee857
--- /dev/null
+++ b/services/syncbase/store/transactions/manager_test.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package transactions
+
+import (
+	"container/list"
+	"testing"
+
+	"v.io/x/ref/services/syncbase/store"
+)
+
+// Package syncbase/store/memstore imports syncbase/store/transactions, so we
+// can't use memstore as the underlying BatchStore.  Instead, we use a
+// mockBatchStore with all methods stubbed out.
+type mockBatchStore struct{}
+
+func (mbs mockBatchStore) Get(key, valbuf []byte) ([]byte, error) {
+	return nil, nil
+}
+
+func (mbs mockBatchStore) Scan(start, limit []byte) store.Stream {
+	return nil
+}
+
+func (mbs mockBatchStore) WriteBatch(batch ...WriteOp) error {
+	return nil
+}
+
+func (mbs mockBatchStore) Close() error {
+	return nil
+}
+
+func (mbs mockBatchStore) NewSnapshot() store.Snapshot {
+	return mockSnapshot{}
+}
+
+type mockSnapshot struct {
+	*store.SnapshotSpecImpl
+}
+
+func (ms mockSnapshot) Get(key, valbuf []byte) ([]byte, error) {
+	return nil, nil
+}
+
+func (ms mockSnapshot) Scan(start, limit []byte) store.Stream {
+	return nil
+}
+
+func (ms mockSnapshot) Abort() error {
+	return nil
+}
+
+// TestGcTransactions tests that calling gcTransactions() garbage collects all
+// committed transactions.
+func TestGcTransactions(t *testing.T) {
+	mg := &manager{
+		BatchStore: mockBatchStore{},
+		events:     list.New(),
+		txTable:    newTrie(),
+	}
+
+	// Make a Put to the DB.  Since there are no open transactions this should
+	// not add to the events queue.
+	if err := mg.Put([]byte("foo"), []byte("foo")); err != nil {
+		t.Fatal(err)
+	}
+	if want, got := 0, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Start a new transaction and leave it open.  This will add a single event
+	// to the queue.
+	tx1 := mg.NewTransaction()
+	if want, got := 1, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Make two more Puts to the DB, which will add two events to the queue.
+	if err := mg.Put([]byte("bar"), []byte("bar")); err != nil {
+		t.Fatal(err)
+	}
+	if err := mg.Put([]byte("baz"), []byte("baz")); err != nil {
+		t.Fatal(err)
+	}
+	if want, got := 3, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Garbage collection should not remove anything, since tx1 is still open.
+	mg.gcTransactions()
+	if want, got := 3, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Start a second transaction and leave it open.  This will add a single event
+	// to the queue.
+	tx2 := mg.NewTransaction()
+	if want, got := 4, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Do a Put and Commit on tx1.  tx1 will be removed from the queue, but a
+	// commit event added to the end, so the length is not changed.
+	if err := tx1.Put([]byte("qux"), []byte("qux")); err != nil {
+		t.Fatal(err)
+	}
+	if err := tx1.Commit(); err != nil {
+		t.Fatal(err)
+	}
+	if want, got := 4, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Garbage collection should remove the two recent Puts.
+	mg.gcTransactions()
+	if want, got := 2, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// Abort tx2, which will remove it from the queue.
+	if err := tx2.Abort(); err != nil {
+		t.Fatal(err)
+	}
+	if want, got := 1, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+
+	// A final garbage collection will remove the commit event for tx1.
+	mg.gcTransactions()
+	if want, got := 0, mg.events.Len(); want != got {
+		t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+	}
+}
diff --git a/services/syncbase/store/transactions/transaction.go b/services/syncbase/store/transactions/transaction.go
index 78abc36..d2f56aa 100644
--- a/services/syncbase/store/transactions/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -38,6 +38,8 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
+// newTransaction creates a new transaction and adds it to the mg.events queue.
+// Assumes mg.mu is held.
 func newTransaction(mg *manager) *transaction {
 	tx := &transaction{
 		mg:       mg,
@@ -45,7 +47,7 @@
 		seq:      mg.seq,
 		writes:   ptrie.New(true),
 	}
-	tx.event = mg.events.PushFront(tx)
+	tx.event = mg.events.PushBack(tx)
 	return tx
 }