store/memstore: recursively close objects
Make sure that ia object 'a' is closed, then object 'b' created
from 'a' is also closed, where 'a'->'b' are:
store -> transaction -> stream
store -> snapshot -> stream
store -> stream
Change-Id: Iebc3776f057bb988efb4f7ea6bdbfd0055abc8ca
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index a09c8f1..e5e32cb 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -22,7 +22,7 @@
type db struct {
// mu protects the state of the db.
mu sync.RWMutex
- node *resourceNode
+ node *store.ResourceNode
cDb *C.leveldb_t
// Default read/write options.
readOptions *C.leveldb_readoptions_t
@@ -54,7 +54,7 @@
readOptions := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
return &db{
- node: newResourceNode(),
+ node: store.NewResourceNode(),
cDb: cDb,
readOptions: readOptions,
writeOptions: C.leveldb_writeoptions_create(),
@@ -68,7 +68,7 @@
if d.err != nil {
return store.WrapError(d.err)
}
- d.node.close()
+ d.node.Close()
C.leveldb_close(d.cDb)
d.cDb = nil
C.leveldb_readoptions_destroy(d.readOptions)
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 62dc77f..9a4ad6e 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -18,7 +18,7 @@
type snapshot struct {
// mu protects the state of the snapshot.
mu sync.RWMutex
- node *resourceNode
+ node *store.ResourceNode
d *db
cSnapshot *C.leveldb_snapshot_t
cOpts *C.leveldb_readoptions_t
@@ -27,18 +27,18 @@
var _ store.Snapshot = (*snapshot)(nil)
-func newSnapshot(d *db, parent *resourceNode) *snapshot {
+func newSnapshot(d *db, parent *store.ResourceNode) *snapshot {
cSnapshot := C.leveldb_create_snapshot(d.cDb)
cOpts := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
s := &snapshot{
- node: newResourceNode(),
+ node: store.NewResourceNode(),
d: d,
cSnapshot: cSnapshot,
cOpts: cOpts,
}
- parent.addChild(s.node, func() {
+ parent.AddChild(s.node, func() {
s.Close()
})
return s
@@ -51,7 +51,7 @@
if s.err != nil {
return store.WrapError(s.err)
}
- s.node.close()
+ s.node.Close()
C.leveldb_readoptions_destroy(s.cOpts)
s.cOpts = nil
C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index ec167c3..abf230e 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -20,7 +20,7 @@
type stream struct {
// mu protects the state of the stream.
mu sync.Mutex
- node *resourceNode
+ node *store.ResourceNode
cIter *C.syncbase_leveldb_iterator_t
limit []byte
@@ -38,15 +38,15 @@
var _ store.Stream = (*stream)(nil)
-func newStream(d *db, parent *resourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(d *db, parent *store.ResourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
s := &stream{
- node: newResourceNode(),
+ node: store.NewResourceNode(),
cIter: cIter,
limit: limit,
}
- parent.addChild(s.node, func() {
+ parent.AddChild(s.node, func() {
s.Cancel()
})
return s
@@ -55,7 +55,7 @@
// destroyLeveldbIter destroys the underlying C iterator.
// Assumes mu is held.
func (s *stream) destroyLeveldbIter() {
- s.node.close()
+ s.node.Close()
C.syncbase_leveldb_iter_destroy(s.cIter)
s.cIter = nil
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 609e55f..6dfa250 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -18,7 +18,7 @@
type transaction struct {
// mu protects the state of the transaction.
mu sync.Mutex
- node *resourceNode
+ node *store.ResourceNode
d *db
snapshot store.Snapshot
batch *C.leveldb_writebatch_t
@@ -28,15 +28,17 @@
var _ store.Transaction = (*transaction)(nil)
-func newTransaction(d *db, parent *resourceNode) *transaction {
+func newTransaction(d *db, parent *store.ResourceNode) *transaction {
+ node := store.NewResourceNode()
+ snapshot := newSnapshot(d, node)
tx := &transaction{
- node: newResourceNode(),
+ node: node,
d: d,
- snapshot: d.NewSnapshot(),
+ snapshot: snapshot,
batch: C.leveldb_writebatch_create(),
cOpts: d.writeOptions,
}
- parent.addChild(tx.node, func() {
+ parent.AddChild(tx.node, func() {
tx.Abort()
})
return tx
@@ -46,8 +48,7 @@
// Assumes mu is held.
func (tx *transaction) close() {
tx.d.txmu.Unlock()
- tx.node.close()
- tx.snapshot.Close()
+ tx.node.Close()
C.leveldb_writebatch_destroy(tx.batch)
tx.batch = nil
if tx.cOpts != tx.d.writeOptions {
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index a080032..cd39f77 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -13,6 +13,7 @@
type snapshot struct {
mu sync.Mutex
+ node *store.ResourceNode
data map[string][]byte
err error
}
@@ -20,12 +21,19 @@
var _ store.Snapshot = (*snapshot)(nil)
// Assumes st lock is held.
-func newSnapshot(st *memstore) *snapshot {
+func newSnapshot(st *memstore, parent *store.ResourceNode) *snapshot {
dataCopy := map[string][]byte{}
for k, v := range st.data {
dataCopy[k] = v
}
- return &snapshot{data: dataCopy}
+ s := &snapshot{
+ node: store.NewResourceNode(),
+ data: dataCopy,
+ }
+ parent.AddChild(s.node, func() {
+ s.Close()
+ })
+ return s
}
// Close implements the store.Snapshot interface.
@@ -35,6 +43,7 @@
if s.err != nil {
return store.WrapError(s.err)
}
+ s.node.Close()
s.err = verror.New(verror.ErrCanceled, nil, "closed snapshot")
return nil
}
@@ -60,5 +69,5 @@
if s.err != nil {
return &store.InvalidStream{s.err}
}
- return newStream(s, start, limit)
+ return newStream(s, s.node, start, limit)
}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index a6b5aee..a3e98b1 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -15,6 +15,7 @@
type memstore struct {
mu sync.Mutex
+ node *store.ResourceNode
data map[string][]byte
err error
// Most recent sequence number handed out.
@@ -27,7 +28,10 @@
// New creates a new memstore.
func New() store.Store {
- return &memstore{data: map[string][]byte{}}
+ return &memstore{
+ data: map[string][]byte{},
+ node: store.NewResourceNode(),
+ }
}
// Close implements the store.Store interface.
@@ -37,6 +41,7 @@
if st.err != nil {
return store.WrapError(st.err)
}
+ st.node.Close()
st.err = verror.New(verror.ErrCanceled, nil, "closed store")
return nil
}
@@ -62,7 +67,7 @@
if st.err != nil {
return &store.InvalidStream{st.err}
}
- return newStream(newSnapshot(st), start, limit)
+ return newSnapshot(st, st.node).Scan(start, limit)
}
// Put implements the store.StoreWriter interface.
@@ -87,7 +92,7 @@
return &store.InvalidTransaction{st.err}
}
st.lastSeq++
- return newTransaction(st, st.lastSeq)
+ return newTransaction(st, st.node, st.lastSeq)
}
// NewSnapshot implements the store.Store interface.
@@ -97,5 +102,5 @@
if st.err != nil {
return &store.InvalidSnapshot{st.err}
}
- return newSnapshot(st)
+ return newSnapshot(st, st.node)
}
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 357cf20..1b11ac3 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -28,13 +28,9 @@
runTest(t, test.RunStoreStateTest)
}
-// TODO(rogulenko): Enable this test once memstore.Close closes all downstream
-// resources.
-/*
func TestClose(t *testing.T) {
runTest(t, test.RunCloseTest)
}
-*/
func TestReadWriteBasic(t *testing.T) {
runTest(t, test.RunReadWriteBasicTest)
diff --git a/services/syncbase/store/memstore/stream.go b/services/syncbase/store/memstore/stream.go
index 09ccfe3..5c5a70b 100644
--- a/services/syncbase/store/memstore/stream.go
+++ b/services/syncbase/store/memstore/stream.go
@@ -14,6 +14,7 @@
type stream struct {
mu sync.Mutex
+ node *store.ResourceNode
sn *snapshot
keys []string
currIndex int
@@ -23,7 +24,7 @@
var _ store.Stream = (*stream)(nil)
-func newStream(sn *snapshot, start, limit []byte) *stream {
+func newStream(sn *snapshot, parent *store.ResourceNode, start, limit []byte) *stream {
keys := []string{}
for k := range sn.data {
if k >= string(start) && (len(limit) == 0 || k < string(limit)) {
@@ -31,11 +32,16 @@
}
}
sort.Strings(keys)
- return &stream{
+ s := &stream{
+ node: store.NewResourceNode(),
sn: sn,
keys: keys,
currIndex: -1,
}
+ parent.AddChild(s.node, func() {
+ s.Cancel()
+ })
+ return s
}
// Advance implements the store.Stream interface.
@@ -92,5 +98,6 @@
if s.err != nil {
return
}
+ s.node.Close()
s.err = verror.New(verror.ErrCanceled, nil, "canceled stream")
}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index ba14496..81b51d0 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -17,9 +17,10 @@
)
type transaction struct {
- mu sync.Mutex
- st *memstore
- sn *snapshot
+ mu sync.Mutex
+ node *store.ResourceNode
+ st *memstore
+ sn *snapshot
// The following fields are used to determine whether method calls should
// error out.
err error
@@ -32,15 +33,22 @@
var _ store.Transaction = (*transaction)(nil)
-func newTransaction(st *memstore, seq uint64) *transaction {
- return &transaction{
+func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
+ node := store.NewResourceNode()
+ sn := newSnapshot(st, node)
+ tx := &transaction{
+ node: node,
st: st,
- sn: newSnapshot(st),
+ sn: sn,
seq: seq,
createdTime: time.Now(),
puts: map[string][]byte{},
deletes: map[string]struct{}{},
}
+ parent.AddChild(tx.node, func() {
+ tx.Abort()
+ })
+ return tx
}
func (tx *transaction) expired() bool {
@@ -74,7 +82,7 @@
if err := tx.error(); err != nil {
return &store.InvalidStream{err}
}
- return newStream(tx.sn, start, limit)
+ return tx.sn.Scan(start, limit)
}
// Put implements the store.StoreWriter interface.
@@ -108,7 +116,7 @@
if err := tx.error(); err != nil {
return err
}
- tx.sn.Close()
+ tx.node.Close()
tx.st.mu.Lock()
defer tx.st.mu.Unlock() // note, defer is last-in-first-out
if tx.seq <= tx.st.lastCommitSeq {
@@ -135,7 +143,7 @@
if err := tx.error(); err != nil {
return err
}
- tx.sn.Close()
+ tx.node.Close()
tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
return nil
}
diff --git a/services/syncbase/store/leveldb/resource_node.go b/services/syncbase/store/resource_node.go
similarity index 64%
rename from services/syncbase/store/leveldb/resource_node.go
rename to services/syncbase/store/resource_node.go
index 90e598d..c7228b2 100644
--- a/services/syncbase/store/leveldb/resource_node.go
+++ b/services/syncbase/store/resource_node.go
@@ -2,32 +2,33 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package leveldb
+package store
import (
"sync"
)
-// resourceNode is a node in a dependency graph. This graph is used to ensure
+// ResourceNode is a node in a dependency graph. This graph is used to ensure
// that when a resource is freed, downstream resources are also freed. For
// example, closing a store closes all downstream transactions, snapshots and
// streams.
-type resourceNode struct {
+type ResourceNode struct {
mu sync.Mutex
- parent *resourceNode
- children map[*resourceNode]func()
+ parent *ResourceNode
+ children map[*ResourceNode]func()
}
-func newResourceNode() *resourceNode {
- return &resourceNode{
- children: make(map[*resourceNode]func()),
+// NewResourceNode creates a new isolated node in the dependency graph.
+func NewResourceNode() *ResourceNode {
+ return &ResourceNode{
+ children: make(map[*ResourceNode]func()),
}
}
-// addChild adds a parent-child relation between this node and the provided
+// AddChild adds a parent-child relation between this node and the provided
// node. The provided function is called to close the child when this node is
// closed.
-func (r *resourceNode) addChild(node *resourceNode, closefn func()) {
+func (r *ResourceNode) AddChild(node *ResourceNode, closefn func()) {
r.mu.Lock()
defer r.mu.Unlock()
if r.children == nil {
@@ -39,8 +40,8 @@
// removeChild removes the parent-child relation between this node and the
// provided node, enabling Go's garbage collector to free the resources
-// associated with the node if there are no more references to it.
-func (r *resourceNode) removeChild(node *resourceNode) {
+// associated with the child node if there are no more references to it.
+func (r *ResourceNode) removeChild(node *ResourceNode) {
r.mu.Lock()
defer r.mu.Unlock()
if r.children == nil {
@@ -50,9 +51,9 @@
delete(r.children, node)
}
-// close closes this node and detaches it from its parent. All child nodes
-// are closed using close functions provided to addChild.
-func (r *resourceNode) close() {
+// Close closes this node and detaches it from its parent. All of this node's
+// children are closed using close functions provided to AddChild.
+func (r *ResourceNode) Close() {
r.mu.Lock()
if r.parent != nil {
// If there is a node V with parent P and we decide to explicitly close V,