syncbase: garbage collect commited transactions.
This CL adds function gcTransactions to the transaction manager that
cleans up all commited transactions since the last uncommited
transaction.
The gcTransactions function runs in a loop every 100ms. We will want to
tune that parameter, and possibly optimize the gcTransaction function in
other ways, since it blocks other transaction operactions while it runs.
Change-Id: Ibb276001f501368671e276af70ded91c4243d645
diff --git a/services/syncbase/store/transactions/manager.go b/services/syncbase/store/transactions/manager.go
index 0b37927..f81ba3a 100644
--- a/services/syncbase/store/transactions/manager.go
+++ b/services/syncbase/store/transactions/manager.go
@@ -6,12 +6,18 @@
import (
"container/list"
+ "fmt"
"sync"
+ "time"
"v.io/v23/verror"
"v.io/x/ref/services/syncbase/store"
)
+// txGcInterval is the interval between transaction garbage collections.
+// TODO(nlacasse): Profile and optimize this value.
+const txGcInterval = 100 * time.Millisecond
+
// BatchStore is a CRUD-capable storage engine that supports atomic batch
// writes. BatchStore doesn't support transactions.
// This interface is a Go version of the C++ LevelDB interface. It serves as
@@ -32,10 +38,13 @@
// manager handles transaction-related operations of the store.
type manager struct {
BatchStore
+ // stopTxGc is used to stop garbage collecting transactions.
+ stopTxGc func()
// mu protects the variables below, and is also held during transaction
// commits. It must always be acquired before the store-level lock.
mu sync.Mutex
- // events is a queue of create/commit transaction events.
+ // events is a queue of create/commit transaction events. Events are
+ // pushed to the back of the queue, and removed from the front via GC.
events *list.List
seq uint64
// txTable is a set of keys written by recent transactions. This set
@@ -52,15 +61,27 @@
// Wrap wraps the BatchStore with transaction functionality.
func Wrap(bs BatchStore) store.Store {
- return &manager{
+ mg := &manager{
BatchStore: bs,
events: list.New(),
txTable: newTrie(),
}
+
+ // Start a goroutine that garbage collects transactions every txGcInterval.
+ t := time.NewTicker(txGcInterval)
+ mg.stopTxGc = t.Stop
+ go func() {
+ for range t.C {
+ mg.gcTransactions()
+ }
+ }()
+
+ return mg
}
// Close implements the store.Store interface.
func (mg *manager) Close() error {
+ mg.stopTxGc()
mg.mu.Lock()
if mg.txTable == nil {
mg.mu.Unlock()
@@ -136,7 +157,6 @@
if mg.events.Len() == 0 {
return
}
- // TODO(rogulenko): do GC.
mg.seq++
var keys [][]byte
for _, write := range batch {
@@ -150,8 +170,40 @@
mg.events.PushBack(tx)
}
+// gcTransactions cleans up all transactions that were commited before the
+// first open (non-commited) transaction. Transactions are cleaned up in the
+// order that they were commited. Note that this function holds mg.mu,
+// preventing other transaction operations from occurring while it runs.
+//
+// TODO(nlacasse): If a transaction never commits or aborts, then we will never
+// be able to GC any transaction that occurs after it. Consider aborting any
+// transaction that has been open longer than a maximum amount of time.
+func (mg *manager) gcTransactions() {
+ mg.mu.Lock()
+ defer mg.mu.Unlock()
+ if mg.events == nil {
+ return
+ }
+ ev := mg.events.Front()
+ for ev != nil {
+ switch tx := ev.Value.(type) {
+ case *transaction:
+ return
+ case *commitedTransaction:
+ for _, batch := range tx.batch {
+ mg.txTable.remove(batch, tx.seq)
+ }
+ next := ev.Next()
+ mg.events.Remove(ev)
+ ev = next
+ default:
+ panic(fmt.Sprintf("unknown event type: %T", tx))
+ }
+ }
+}
+
//////////////////////////////////////////////////////////////
-// Read and Write types used for storing transcation reads
+// Read and Write types used for storing transaction reads
// and uncommitted writes.
type WriteType int
diff --git a/services/syncbase/store/transactions/manager_test.go b/services/syncbase/store/transactions/manager_test.go
new file mode 100644
index 0000000..5eee857
--- /dev/null
+++ b/services/syncbase/store/transactions/manager_test.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package transactions
+
+import (
+ "container/list"
+ "testing"
+
+ "v.io/x/ref/services/syncbase/store"
+)
+
+// package syncbase/store/memstore imports syncbase/transactions, so we can't
+// use memstore as the underlying BatchStore. Instead, we use a mockBatchStore
+// with all methods stubbed out.
+type mockBatchStore struct{}
+
+func (mbs mockBatchStore) Get(key, valbuf []byte) ([]byte, error) {
+ return nil, nil
+}
+
+func (mbs mockBatchStore) Scan(start, limit []byte) store.Stream {
+ return nil
+}
+
+func (mbs mockBatchStore) WriteBatch(batch ...WriteOp) error {
+ return nil
+}
+
+func (mbs mockBatchStore) Close() error {
+ return nil
+}
+
+func (mbs mockBatchStore) NewSnapshot() store.Snapshot {
+ return mockSnapshot{}
+}
+
+type mockSnapshot struct {
+ *store.SnapshotSpecImpl
+}
+
+func (ms mockSnapshot) Get(key, valbuf []byte) ([]byte, error) {
+ return nil, nil
+}
+
+func (ms mockSnapshot) Scan(start, limit []byte) store.Stream {
+ return nil
+}
+
+func (ms mockSnapshot) Abort() error {
+ return nil
+}
+
+// TestGcTransactions tests that calling gcTransactions() garbage collects all
+// committed transactions.
+func TestGcTransactions(t *testing.T) {
+ mg := &manager{
+ BatchStore: mockBatchStore{},
+ events: list.New(),
+ txTable: newTrie(),
+ }
+
+ // Make a Put to the DB. Since there are no open transactions this should
+ // not add to the events queue.
+ if err := mg.Put([]byte("foo"), []byte("foo")); err != nil {
+ t.Fatal(err)
+ }
+ if want, got := 0, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Start a new transaction and leave it open. This will add a single event
+ // to the queue.
+ tx1 := mg.NewTransaction()
+ if want, got := 1, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Make two more Puts to the DB, which will add a two events to the queue.
+ if err := mg.Put([]byte("bar"), []byte("bar")); err != nil {
+ t.Fatal(err)
+ }
+ if err := mg.Put([]byte("baz"), []byte("baz")); err != nil {
+ t.Fatal(err)
+ }
+ if want, got := 3, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Garbage collection should not remove anything, since tx1 is still open.
+ mg.gcTransactions()
+ if want, got := 3, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Start a second transaction and leave it open. This will add a single event
+ // to the queue.
+ tx2 := mg.NewTransaction()
+ if want, got := 4, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Do a Put and Commit on tx1. tx1 will be removed from the queue, but a
+ // commit event added to the end, so the length is not changed.
+ if err := tx1.Put([]byte("qux"), []byte("qux")); err != nil {
+ t.Fatal(err)
+ }
+ if err := tx1.Commit(); err != nil {
+ t.Fatal(err)
+ }
+ if want, got := 4, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Garbage collection should remove the two recent Puts.
+ mg.gcTransactions()
+ if want, got := 2, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // Abort tx2, which will remove it from the queue.
+ if err := tx2.Abort(); err != nil {
+ t.Fatal(err)
+ }
+ if want, got := 1, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+
+ // A final garbage collection will remove the commit event for tx1.
+ mg.gcTransactions()
+ if want, got := 0, mg.events.Len(); want != got {
+ t.Errorf("wanted mg.events.Len() to be %v but got %v", want, got)
+ }
+}
diff --git a/services/syncbase/store/transactions/transaction.go b/services/syncbase/store/transactions/transaction.go
index 78abc36..d2f56aa 100644
--- a/services/syncbase/store/transactions/transaction.go
+++ b/services/syncbase/store/transactions/transaction.go
@@ -38,6 +38,8 @@
var _ store.Transaction = (*transaction)(nil)
+// newTransaction creates a new transaction and adds it to the mg.events queue.
+// Assumes mg.mu is held.
func newTransaction(mg *manager) *transaction {
tx := &transaction{
mg: mg,
@@ -45,7 +47,7 @@
seq: mg.seq,
writes: ptrie.New(true),
}
- tx.event = mg.events.PushFront(tx)
+ tx.event = mg.events.PushBack(tx)
return tx
}