store/memstore: recursively close objects

Make sure that ia object 'a' is closed, then object 'b' created
from 'a' is also closed, where 'a'->'b' are:
store -> transaction -> stream
store -> snapshot -> stream
store -> stream

Change-Id: Iebc3776f057bb988efb4f7ea6bdbfd0055abc8ca
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index a09c8f1..e5e32cb 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -22,7 +22,7 @@
 type db struct {
 	// mu protects the state of the db.
 	mu   sync.RWMutex
-	node *resourceNode
+	node *store.ResourceNode
 	cDb  *C.leveldb_t
 	// Default read/write options.
 	readOptions  *C.leveldb_readoptions_t
@@ -54,7 +54,7 @@
 	readOptions := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
 	return &db{
-		node:         newResourceNode(),
+		node:         store.NewResourceNode(),
 		cDb:          cDb,
 		readOptions:  readOptions,
 		writeOptions: C.leveldb_writeoptions_create(),
@@ -68,7 +68,7 @@
 	if d.err != nil {
 		return store.WrapError(d.err)
 	}
-	d.node.close()
+	d.node.Close()
 	C.leveldb_close(d.cDb)
 	d.cDb = nil
 	C.leveldb_readoptions_destroy(d.readOptions)
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 62dc77f..9a4ad6e 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -18,7 +18,7 @@
 type snapshot struct {
 	// mu protects the state of the snapshot.
 	mu        sync.RWMutex
-	node      *resourceNode
+	node      *store.ResourceNode
 	d         *db
 	cSnapshot *C.leveldb_snapshot_t
 	cOpts     *C.leveldb_readoptions_t
@@ -27,18 +27,18 @@
 
 var _ store.Snapshot = (*snapshot)(nil)
 
-func newSnapshot(d *db, parent *resourceNode) *snapshot {
+func newSnapshot(d *db, parent *store.ResourceNode) *snapshot {
 	cSnapshot := C.leveldb_create_snapshot(d.cDb)
 	cOpts := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
 	C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
 	s := &snapshot{
-		node:      newResourceNode(),
+		node:      store.NewResourceNode(),
 		d:         d,
 		cSnapshot: cSnapshot,
 		cOpts:     cOpts,
 	}
-	parent.addChild(s.node, func() {
+	parent.AddChild(s.node, func() {
 		s.Close()
 	})
 	return s
@@ -51,7 +51,7 @@
 	if s.err != nil {
 		return store.WrapError(s.err)
 	}
-	s.node.close()
+	s.node.Close()
 	C.leveldb_readoptions_destroy(s.cOpts)
 	s.cOpts = nil
 	C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index ec167c3..abf230e 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -20,7 +20,7 @@
 type stream struct {
 	// mu protects the state of the stream.
 	mu    sync.Mutex
-	node  *resourceNode
+	node  *store.ResourceNode
 	cIter *C.syncbase_leveldb_iterator_t
 	limit []byte
 
@@ -38,15 +38,15 @@
 
 var _ store.Stream = (*stream)(nil)
 
-func newStream(d *db, parent *resourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(d *db, parent *store.ResourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
 	cStr, size := cSlice(start)
 	cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
 	s := &stream{
-		node:  newResourceNode(),
+		node:  store.NewResourceNode(),
 		cIter: cIter,
 		limit: limit,
 	}
-	parent.addChild(s.node, func() {
+	parent.AddChild(s.node, func() {
 		s.Cancel()
 	})
 	return s
@@ -55,7 +55,7 @@
 // destroyLeveldbIter destroys the underlying C iterator.
 // Assumes mu is held.
 func (s *stream) destroyLeveldbIter() {
-	s.node.close()
+	s.node.Close()
 	C.syncbase_leveldb_iter_destroy(s.cIter)
 	s.cIter = nil
 }
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 609e55f..6dfa250 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -18,7 +18,7 @@
 type transaction struct {
 	// mu protects the state of the transaction.
 	mu       sync.Mutex
-	node     *resourceNode
+	node     *store.ResourceNode
 	d        *db
 	snapshot store.Snapshot
 	batch    *C.leveldb_writebatch_t
@@ -28,15 +28,17 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
-func newTransaction(d *db, parent *resourceNode) *transaction {
+func newTransaction(d *db, parent *store.ResourceNode) *transaction {
+	node := store.NewResourceNode()
+	snapshot := newSnapshot(d, node)
 	tx := &transaction{
-		node:     newResourceNode(),
+		node:     node,
 		d:        d,
-		snapshot: d.NewSnapshot(),
+		snapshot: snapshot,
 		batch:    C.leveldb_writebatch_create(),
 		cOpts:    d.writeOptions,
 	}
-	parent.addChild(tx.node, func() {
+	parent.AddChild(tx.node, func() {
 		tx.Abort()
 	})
 	return tx
@@ -46,8 +48,7 @@
 // Assumes mu is held.
 func (tx *transaction) close() {
 	tx.d.txmu.Unlock()
-	tx.node.close()
-	tx.snapshot.Close()
+	tx.node.Close()
 	C.leveldb_writebatch_destroy(tx.batch)
 	tx.batch = nil
 	if tx.cOpts != tx.d.writeOptions {
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index a080032..cd39f77 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -13,6 +13,7 @@
 
 type snapshot struct {
 	mu   sync.Mutex
+	node *store.ResourceNode
 	data map[string][]byte
 	err  error
 }
@@ -20,12 +21,19 @@
 var _ store.Snapshot = (*snapshot)(nil)
 
 // Assumes st lock is held.
-func newSnapshot(st *memstore) *snapshot {
+func newSnapshot(st *memstore, parent *store.ResourceNode) *snapshot {
 	dataCopy := map[string][]byte{}
 	for k, v := range st.data {
 		dataCopy[k] = v
 	}
-	return &snapshot{data: dataCopy}
+	s := &snapshot{
+		node: store.NewResourceNode(),
+		data: dataCopy,
+	}
+	parent.AddChild(s.node, func() {
+		s.Close()
+	})
+	return s
 }
 
 // Close implements the store.Snapshot interface.
@@ -35,6 +43,7 @@
 	if s.err != nil {
 		return store.WrapError(s.err)
 	}
+	s.node.Close()
 	s.err = verror.New(verror.ErrCanceled, nil, "closed snapshot")
 	return nil
 }
@@ -60,5 +69,5 @@
 	if s.err != nil {
 		return &store.InvalidStream{s.err}
 	}
-	return newStream(s, start, limit)
+	return newStream(s, s.node, start, limit)
 }
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index a6b5aee..a3e98b1 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -15,6 +15,7 @@
 
 type memstore struct {
 	mu   sync.Mutex
+	node *store.ResourceNode
 	data map[string][]byte
 	err  error
 	// Most recent sequence number handed out.
@@ -27,7 +28,10 @@
 
 // New creates a new memstore.
 func New() store.Store {
-	return &memstore{data: map[string][]byte{}}
+	return &memstore{
+		data: map[string][]byte{},
+		node: store.NewResourceNode(),
+	}
 }
 
 // Close implements the store.Store interface.
@@ -37,6 +41,7 @@
 	if st.err != nil {
 		return store.WrapError(st.err)
 	}
+	st.node.Close()
 	st.err = verror.New(verror.ErrCanceled, nil, "closed store")
 	return nil
 }
@@ -62,7 +67,7 @@
 	if st.err != nil {
 		return &store.InvalidStream{st.err}
 	}
-	return newStream(newSnapshot(st), start, limit)
+	return newSnapshot(st, st.node).Scan(start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -87,7 +92,7 @@
 		return &store.InvalidTransaction{st.err}
 	}
 	st.lastSeq++
-	return newTransaction(st, st.lastSeq)
+	return newTransaction(st, st.node, st.lastSeq)
 }
 
 // NewSnapshot implements the store.Store interface.
@@ -97,5 +102,5 @@
 	if st.err != nil {
 		return &store.InvalidSnapshot{st.err}
 	}
-	return newSnapshot(st)
+	return newSnapshot(st, st.node)
 }
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 357cf20..1b11ac3 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -28,13 +28,9 @@
 	runTest(t, test.RunStoreStateTest)
 }
 
-// TODO(rogulenko): Enable this test once memstore.Close closes all downstream
-// resources.
-/*
 func TestClose(t *testing.T) {
 	runTest(t, test.RunCloseTest)
 }
-*/
 
 func TestReadWriteBasic(t *testing.T) {
 	runTest(t, test.RunReadWriteBasicTest)
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index 09ccfe3..5c5a70b 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -14,6 +14,7 @@
 
 type stream struct {
 	mu        sync.Mutex
+	node      *store.ResourceNode
 	sn        *snapshot
 	keys      []string
 	currIndex int
@@ -23,7 +24,7 @@
 
 var _ store.Stream = (*stream)(nil)
 
-func newStream(sn *snapshot, start, limit []byte) *stream {
+func newStream(sn *snapshot, parent *store.ResourceNode, start, limit []byte) *stream {
 	keys := []string{}
 	for k := range sn.data {
 		if k >= string(start) && (len(limit) == 0 || k < string(limit)) {
@@ -31,11 +32,16 @@
 		}
 	}
 	sort.Strings(keys)
-	return &stream{
+	s := &stream{
+		node:      store.NewResourceNode(),
 		sn:        sn,
 		keys:      keys,
 		currIndex: -1,
 	}
+	parent.AddChild(s.node, func() {
+		s.Cancel()
+	})
+	return s
 }
 
 // Advance implements the store.Stream interface.
@@ -92,5 +98,6 @@
 	if s.err != nil {
 		return
 	}
+	s.node.Close()
 	s.err = verror.New(verror.ErrCanceled, nil, "canceled stream")
 }
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index ba14496..81b51d0 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -17,9 +17,10 @@
 )
 
 type transaction struct {
-	mu sync.Mutex
-	st *memstore
-	sn *snapshot
+	mu   sync.Mutex
+	node *store.ResourceNode
+	st   *memstore
+	sn   *snapshot
 	// The following fields are used to determine whether method calls should
 	// error out.
 	err         error
@@ -32,15 +33,22 @@
 
 var _ store.Transaction = (*transaction)(nil)
 
-func newTransaction(st *memstore, seq uint64) *transaction {
-	return &transaction{
+func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
+	node := store.NewResourceNode()
+	sn := newSnapshot(st, node)
+	tx := &transaction{
+		node:        node,
 		st:          st,
-		sn:          newSnapshot(st),
+		sn:          sn,
 		seq:         seq,
 		createdTime: time.Now(),
 		puts:        map[string][]byte{},
 		deletes:     map[string]struct{}{},
 	}
+	parent.AddChild(tx.node, func() {
+		tx.Abort()
+	})
+	return tx
 }
 
 func (tx *transaction) expired() bool {
@@ -74,7 +82,7 @@
 	if err := tx.error(); err != nil {
 		return &store.InvalidStream{err}
 	}
-	return newStream(tx.sn, start, limit)
+	return tx.sn.Scan(start, limit)
 }
 
 // Put implements the store.StoreWriter interface.
@@ -108,7 +116,7 @@
 	if err := tx.error(); err != nil {
 		return err
 	}
-	tx.sn.Close()
+	tx.node.Close()
 	tx.st.mu.Lock()
 	defer tx.st.mu.Unlock() // note, defer is last-in-first-out
 	if tx.seq <= tx.st.lastCommitSeq {
@@ -135,7 +143,7 @@
 	if err := tx.error(); err != nil {
 		return err
 	}
-	tx.sn.Close()
+	tx.node.Close()
 	tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
 	return nil
 }
diff --git a/services/syncbase/store/leveldb/resource_node.go b/services/syncbase/store/resource_node.go
similarity index 64%
rename from services/syncbase/store/leveldb/resource_node.go
rename to services/syncbase/store/resource_node.go
index 90e598d..c7228b2 100644
--- a/services/syncbase/store/leveldb/resource_node.go
+++ b/services/syncbase/store/resource_node.go
@@ -2,32 +2,33 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package leveldb
+package store
 
 import (
 	"sync"
 )
 
-// resourceNode is a node in a dependency graph. This graph is used to ensure
+// ResourceNode is a node in a dependency graph. This graph is used to ensure
 // that when a resource is freed, downstream resources are also freed. For
 // example, closing a store closes all downstream transactions, snapshots and
 // streams.
-type resourceNode struct {
+type ResourceNode struct {
 	mu       sync.Mutex
-	parent   *resourceNode
-	children map[*resourceNode]func()
+	parent   *ResourceNode
+	children map[*ResourceNode]func()
 }
 
-func newResourceNode() *resourceNode {
-	return &resourceNode{
-		children: make(map[*resourceNode]func()),
+// NewResourceNode creates a new isolated node in the dependency graph.
+func NewResourceNode() *ResourceNode {
+	return &ResourceNode{
+		children: make(map[*ResourceNode]func()),
 	}
 }
 
-// addChild adds a parent-child relation between this node and the provided
+// AddChild adds a parent-child relation between this node and the provided
 // node. The provided function is called to close the child when this node is
 // closed.
-func (r *resourceNode) addChild(node *resourceNode, closefn func()) {
+func (r *ResourceNode) AddChild(node *ResourceNode, closefn func()) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	if r.children == nil {
@@ -39,8 +40,8 @@
 
 // removeChild removes the parent-child relation between this node and the
 // provided node, enabling Go's garbage collector to free the resources
-// associated with the node if there are no more references to it.
-func (r *resourceNode) removeChild(node *resourceNode) {
+// associated with the child node if there are no more references to it.
+func (r *ResourceNode) removeChild(node *ResourceNode) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	if r.children == nil {
@@ -50,9 +51,9 @@
 	delete(r.children, node)
 }
 
-// close closes this node and detaches it from its parent. All child nodes
-// are closed using close functions provided to addChild.
-func (r *resourceNode) close() {
+// Close closes this node and detaches it from its parent. All of this node's
+// children are closed using close functions provided to AddChild.
+func (r *ResourceNode) Close() {
 	r.mu.Lock()
 	if r.parent != nil {
 		// If there is a node V with parent P and we decide to explicitly close V,