store/leveldb: recursively close objects

Make sure that ia object 'a' is closed, then object 'b' created
from 'a' is also closed, where 'a'->'b' are:
store -> transaction -> stream
store -> snapshot -> stream
store -> stream

Also remove ResetForRetry function from the API of transactions,
because it becomes harder to track the half-closed state of a
transaction and there is not much benefit for having this function
now.

Change-Id: Icf9158bcf3cb1c57f75dd1d9d0cadd95e3350a20
diff --git a/services/syncbase/store/invalid_types.go b/services/syncbase/store/invalid_types.go
index 05bfc13..83e73e9 100644
--- a/services/syncbase/store/invalid_types.go
+++ b/services/syncbase/store/invalid_types.go
@@ -21,7 +21,7 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (s *InvalidSnapshot) Scan(start, end []byte) Stream {
+func (s *InvalidSnapshot) Scan(start, limit []byte) Stream {
 	return &InvalidStream{s.Error}
 }
 
@@ -72,7 +72,7 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (tx *InvalidTransaction) Scan(start, end []byte) Stream {
+func (tx *InvalidTransaction) Scan(start, limit []byte) Stream {
 	return &InvalidStream{tx.Error}
 }
 
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index ee49b1c..5e45bd3 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -24,9 +24,10 @@
 
 // db is a wrapper around LevelDB that implements the store.Store interface.
 type db struct {
-	// mu protects cDb.
-	mu  sync.RWMutex
-	cDb *C.leveldb_t
+	// mu protects the state of the db.
+	mu   sync.RWMutex
+	node *resourceNode
+	cDb  *C.leveldb_t
 	// Default read/write options.
 	readOptions  *C.leveldb_readoptions_t
 	writeOptions *C.leveldb_writeoptions_t
@@ -57,6 +58,7 @@
 	readOptions := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
 	return &db{
+		node:         newResourceNode(),
 		cDb:          cDb,
 		readOptions:  readOptions,
 		writeOptions: C.leveldb_writeoptions_create(),
@@ -70,6 +72,7 @@
 	if d.err != nil {
 		return d.err
 	}
+	d.node.close()
 	C.leveldb_close(d.cDb)
 	d.cDb = nil
 	C.leveldb_readoptions_destroy(d.readOptions)
@@ -103,7 +106,7 @@
 	if d.err != nil {
 		return &store.InvalidStream{d.err}
 	}
-	return newStream(d, start, limit, d.readOptions)
+	return newStream(d, d.node, start, limit, d.readOptions)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -132,7 +135,7 @@
 		d.txmu.Unlock()
 		return &store.InvalidTransaction{d.err}
 	}
-	return newTransaction(d)
+	return newTransaction(d, d.node)
 }
 
 // NewSnapshot implements the store.Store interface.
@@ -142,7 +145,7 @@
 	if d.err != nil {
 		return &store.InvalidSnapshot{d.err}
 	}
-	return newSnapshot(d)
+	return newSnapshot(d, d.node)
 }
 
 // getWithOpts returns the value for the given key.
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index 666cc40..65594f5 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -30,6 +30,10 @@
 	runTest(t, test.RunStoreStateTest)
 }
 
+func TestClose(t *testing.T) {
+	runTest(t, test.RunCloseTest)
+}
+
 func TestReadWriteBasic(t *testing.T) {
 	runTest(t, test.RunReadWriteBasicTest)
 }
diff --git a/services/syncbase/store/leveldb/resource_node.go b/services/syncbase/store/leveldb/resource_node.go
new file mode 100644
index 0000000..90e598d
--- /dev/null
+++ b/services/syncbase/store/leveldb/resource_node.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+import (
+	"sync"
+)
+
+// resourceNode is a node in a dependency graph. This graph is used to ensure
+// that when a resource is freed, downstream resources are also freed. For
+// example, closing a store closes all downstream transactions, snapshots and
+// streams.
+type resourceNode struct {
+	mu       sync.Mutex
+	parent   *resourceNode
+	children map[*resourceNode]func()
+}
+
+func newResourceNode() *resourceNode {
+	return &resourceNode{
+		children: make(map[*resourceNode]func()),
+	}
+}
+
+// addChild adds a parent-child relation between this node and the provided
+// node. The provided function is called to close the child when this node is
+// closed.
+func (r *resourceNode) addChild(node *resourceNode, closefn func()) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.children == nil {
+		panic("already closed")
+	}
+	node.parent = r
+	r.children[node] = closefn
+}
+
+// removeChild removes the parent-child relation between this node and the
+// provided node, enabling Go's garbage collector to free the resources
+// associated with the node if there are no more references to it.
+func (r *resourceNode) removeChild(node *resourceNode) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.children == nil {
+		// Already closed.
+		return
+	}
+	delete(r.children, node)
+}
+
+// close closes this node and detaches it from its parent. All child nodes
+// are closed using close functions provided to addChild.
+func (r *resourceNode) close() {
+	r.mu.Lock()
+	if r.parent != nil {
+		// If there is a node V with parent P and we decide to explicitly close V,
+		// then we need to remove V from P's children list so that we don't close
+		// V again when P is closed.
+		r.parent.removeChild(r)
+		r.parent = nil
+	}
+	// Copy the children map to a local variable so that the removeChild step
+	// executed from children won't affect the map while we iterate through it.
+	children := r.children
+	r.children = nil
+	r.mu.Unlock()
+	for _, closefn := range children {
+		closefn()
+	}
+}
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 1e920fd..0d78f03 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -18,6 +18,7 @@
 type snapshot struct {
 	// mu protects the state of the snapshot.
 	mu        sync.RWMutex
+	node      *resourceNode
 	d         *db
 	cSnapshot *C.leveldb_snapshot_t
 	cOpts     *C.leveldb_readoptions_t
@@ -26,16 +27,21 @@
 
 var _ store.Snapshot = (*snapshot)(nil)
 
-func newSnapshot(d *db) *snapshot {
+func newSnapshot(d *db, parent *resourceNode) *snapshot {
 	cSnapshot := C.leveldb_create_snapshot(d.cDb)
 	cOpts := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
 	C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
-	return &snapshot{
+	s := &snapshot{
+		node:      newResourceNode(),
 		d:         d,
 		cSnapshot: cSnapshot,
 		cOpts:     cOpts,
 	}
+	parent.addChild(s.node, func() {
+		s.Close()
+	})
+	return s
 }
 
 // Close implements the store.Snapshot interface.
@@ -45,6 +51,7 @@
 	if s.err != nil {
 		return s.err
 	}
+	s.node.close()
 	C.leveldb_readoptions_destroy(s.cOpts)
 	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
@@ -70,5 +77,5 @@
 	if s.err != nil {
 		return &store.InvalidStream{s.err}
 	}
-	return newStream(s.d, start, limit, s.cOpts)
+	return newStream(s.d, s.node, start, limit, s.cOpts)
 }
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index bc146e8..b528626 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -20,6 +20,7 @@
 type stream struct {
 	// mu protects the state of the stream.
 	mu    sync.Mutex
+	node  *resourceNode
 	cIter *C.syncbase_leveldb_iterator_t
 	limit []byte
 
@@ -37,16 +38,24 @@
 
 var _ store.Stream = (*stream)(nil)
 
-func newStream(d *db, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(d *db, parent *resourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
 	cStr, size := cSlice(start)
 	cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
-	return &stream{
+	s := &stream{
+		node:  newResourceNode(),
 		cIter: cIter,
 		limit: limit,
 	}
+	parent.addChild(s.node, func() {
+		s.Cancel()
+	})
+	return s
 }
 
+// destroyLeveldbIter destroys the underlying C iterator.
+// Assumes mu is held.
 func (s *stream) destroyLeveldbIter() {
+	s.node.close()
 	C.syncbase_leveldb_iter_destroy(s.cIter)
 	s.cIter = nil
 }
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 2137978..64ea397 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -16,6 +16,7 @@
 // transaction is a wrapper around LevelDB WriteBatch that implements
 // the store.Transaction interface.
 type transaction struct {
+	node     *resourceNode
 	mu       sync.Mutex
 	d        *db
 	snapshot store.Snapshot
@@ -26,18 +27,26 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
-func newTransaction(d *db) *transaction {
-	return &transaction{
+func newTransaction(d *db, parent *resourceNode) *transaction {
+	tx := &transaction{
+		node:     newResourceNode(),
 		d:        d,
 		snapshot: d.NewSnapshot(),
 		batch:    C.leveldb_writebatch_create(),
 		cOpts:    d.writeOptions,
 	}
+	parent.addChild(tx.node, func() {
+		tx.Abort()
+	})
+	return tx
 }
 
 // close frees allocated C objects and releases acquired locks.
+// Assumes mu is held.
 func (tx *transaction) close() {
 	tx.d.txmu.Unlock()
+	tx.node.close()
+	tx.snapshot.Close()
 	C.leveldb_writebatch_destroy(tx.batch)
 	tx.batch = nil
 	if tx.cOpts != tx.d.writeOptions {
@@ -46,19 +55,6 @@
 	tx.cOpts = nil
 }
 
-// ResetForRetry implements the store.Transaction interface.
-func (tx *transaction) ResetForRetry() {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	if tx.batch == nil {
-		panic(tx.err)
-	}
-	tx.snapshot.Close()
-	tx.snapshot = tx.d.NewSnapshot()
-	tx.err = nil
-	C.leveldb_writebatch_clear(tx.batch)
-}
-
 // Get implements the store.StoreReader interface.
 func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
 	tx.mu.Lock()
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index fc7cf0a..c7c016f 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -62,11 +62,11 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) store.Stream {
+func (s *snapshot) Scan(start, limit []byte) store.Stream {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if err := s.error(); err != nil {
 		return &store.InvalidStream{err}
 	}
-	return newStream(s, start, end)
+	return newStream(s, start, limit)
 }
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index e844b20..4aa7a6e 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -46,10 +46,10 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (st *memstore) Scan(start, end []byte) store.Stream {
+func (st *memstore) Scan(start, limit []byte) store.Stream {
 	st.mu.Lock()
 	defer st.mu.Unlock()
-	return newStream(newSnapshot(st), start, end)
+	return newStream(newSnapshot(st), start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 4006ce7..4211225 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -24,12 +24,18 @@
 	runTest(t, test.RunSnapshotTest)
 }
 
+// TODO(rogulenko): Enable this test once memstore.Close causes memstore to
+// disallow subsequent operations.
+/*
 func TestStoreState(t *testing.T) {
-	// TODO(rogulenko): Enable this test once memstore.Close causes memstore to
-	// disallow subsequent operations.
-	// runTest(t, test.RunStoreStateTest)
+	runTest(t, test.RunStoreStateTest)
 }
 
+func TestClose(t *testing.T) {
+	runTest(t, test.RunCloseTest)
+}
+*/
+
 func TestReadWriteBasic(t *testing.T) {
 	runTest(t, test.RunReadWriteBasicTest)
 }
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 56dcbdd..3298ff1 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -61,20 +61,6 @@
 	return nil
 }
 
-// ResetForRetry implements the store.Transaction interface.
-func (tx *transaction) ResetForRetry() {
-	tx.mu.Lock()
-	defer tx.mu.Unlock()
-	tx.puts = make(map[string][]byte)
-	tx.deletes = make(map[string]struct{})
-	tx.err = nil
-	tx.st.mu.Lock()
-	defer tx.st.mu.Unlock() // note, defer is last-in-first-out
-	tx.st.lastSeq++
-	tx.seq = tx.st.lastSeq
-	tx.sn = newSnapshot(tx.st)
-}
-
 // Get implements the store.StoreReader interface.
 func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
 	tx.mu.Lock()
@@ -86,13 +72,13 @@
 }
 
 // Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) store.Stream {
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
 	tx.mu.Lock()
 	defer tx.mu.Unlock()
 	if err := tx.error(); err != nil {
 		return &store.InvalidStream{err}
 	}
-	return newStream(tx.sn, start, end)
+	return newStream(tx.sn, start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 8d85b2a..f772f40 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -67,10 +67,6 @@
 
 	// Abort aborts the transaction.
 	Abort() error
-
-	// ResetForRetry resets the transaction. It's equivalent to aborting the
-	// transaction and creating a new one, but more efficient.
-	ResetForRetry()
 }
 
 // Snapshot is a handle to particular state in time of a Store.
diff --git a/services/syncbase/store/test/store.go b/services/syncbase/store/test/store.go
index 08b39b5..5ce2d9f 100644
--- a/services/syncbase/store/test/store.go
+++ b/services/syncbase/store/test/store.go
@@ -210,3 +210,45 @@
 		t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
 	}
 }
+
+// RunCloseTest verifies that child objects are closed when the parent object is
+// closed.
+func RunCloseTest(t *testing.T, st store.Store) {
+	key1, value1 := []byte("key1"), []byte("value1")
+	st.Put(key1, value1)
+
+	var streams []store.Stream
+	var snapshots []store.Snapshot
+	var transactions []store.Transaction
+	// TODO(rogulenko): make multiple transactions.
+	tx := st.NewTransaction()
+	for i := 0; i < 10; i++ {
+		streams = append(streams, st.Scan([]byte("a"), []byte("z")))
+		snapshot := st.NewSnapshot()
+		for j := 0; j < 10; j++ {
+			streams = append(streams, snapshot.Scan([]byte("a"), []byte("z")))
+			streams = append(streams, tx.Scan([]byte("a"), []byte("z")))
+		}
+		snapshots = append(snapshots, snapshot)
+		transactions = append(transactions, tx)
+	}
+	st.Close()
+
+	for _, stream := range streams {
+		if got, want := stream.Err().Error(), "canceled stream"; !strings.Contains(got, want) {
+			t.Fatalf("unexpected error: got %v, want %v", got, want)
+		}
+	}
+	for _, snapshot := range snapshots {
+		_, err := snapshot.Get(key1, nil)
+		if got, want := err.Error(), "closed snapshot"; !strings.Contains(got, want) {
+			t.Fatalf("unexpected error: got %v, want %v", got, want)
+		}
+	}
+	for _, tx := range transactions {
+		_, err := tx.Get(key1, nil)
+		if got, want := err.Error(), "aborted transaction"; !strings.Contains(got, want) {
+			t.Fatalf("unexpected error: got %v, want %v", got, want)
+		}
+	}
+}