store/leveldb: recursively close objects
Make sure that ia object 'a' is closed, then object 'b' created
from 'a' is also closed, where 'a'->'b' are:
store -> transaction -> stream
store -> snapshot -> stream
store -> stream
Also remove ResetForRetry function from the API of transactions,
because it becomes harder to track the half-closed state of a
transaction and there is not much benefit for having this function
now.
Change-Id: Icf9158bcf3cb1c57f75dd1d9d0cadd95e3350a20
diff --git a/services/syncbase/store/invalid_types.go b/services/syncbase/store/invalid_types.go
index 05bfc13..83e73e9 100644
--- a/services/syncbase/store/invalid_types.go
+++ b/services/syncbase/store/invalid_types.go
@@ -21,7 +21,7 @@
}
// Scan implements the store.StoreReader interface.
-func (s *InvalidSnapshot) Scan(start, end []byte) Stream {
+func (s *InvalidSnapshot) Scan(start, limit []byte) Stream {
return &InvalidStream{s.Error}
}
@@ -72,7 +72,7 @@
}
// Scan implements the store.StoreReader interface.
-func (tx *InvalidTransaction) Scan(start, end []byte) Stream {
+func (tx *InvalidTransaction) Scan(start, limit []byte) Stream {
return &InvalidStream{tx.Error}
}
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index ee49b1c..5e45bd3 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -24,9 +24,10 @@
// db is a wrapper around LevelDB that implements the store.Store interface.
type db struct {
- // mu protects cDb.
- mu sync.RWMutex
- cDb *C.leveldb_t
+ // mu protects the state of the db.
+ mu sync.RWMutex
+ node *resourceNode
+ cDb *C.leveldb_t
// Default read/write options.
readOptions *C.leveldb_readoptions_t
writeOptions *C.leveldb_writeoptions_t
@@ -57,6 +58,7 @@
readOptions := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
return &db{
+ node: newResourceNode(),
cDb: cDb,
readOptions: readOptions,
writeOptions: C.leveldb_writeoptions_create(),
@@ -70,6 +72,7 @@
if d.err != nil {
return d.err
}
+ d.node.close()
C.leveldb_close(d.cDb)
d.cDb = nil
C.leveldb_readoptions_destroy(d.readOptions)
@@ -103,7 +106,7 @@
if d.err != nil {
return &store.InvalidStream{d.err}
}
- return newStream(d, start, limit, d.readOptions)
+ return newStream(d, d.node, start, limit, d.readOptions)
}
// Put implements the store.StoreWriter interface.
@@ -132,7 +135,7 @@
d.txmu.Unlock()
return &store.InvalidTransaction{d.err}
}
- return newTransaction(d)
+ return newTransaction(d, d.node)
}
// NewSnapshot implements the store.Store interface.
@@ -142,7 +145,7 @@
if d.err != nil {
return &store.InvalidSnapshot{d.err}
}
- return newSnapshot(d)
+ return newSnapshot(d, d.node)
}
// getWithOpts returns the value for the given key.
diff --git a/services/syncbase/store/leveldb/db_test.go b/services/syncbase/store/leveldb/db_test.go
index 666cc40..65594f5 100644
--- a/services/syncbase/store/leveldb/db_test.go
+++ b/services/syncbase/store/leveldb/db_test.go
@@ -30,6 +30,10 @@
runTest(t, test.RunStoreStateTest)
}
+func TestClose(t *testing.T) {
+ runTest(t, test.RunCloseTest)
+}
+
func TestReadWriteBasic(t *testing.T) {
runTest(t, test.RunReadWriteBasicTest)
}
diff --git a/services/syncbase/store/leveldb/resource_node.go b/services/syncbase/store/leveldb/resource_node.go
new file mode 100644
index 0000000..90e598d
--- /dev/null
+++ b/services/syncbase/store/leveldb/resource_node.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb
+
+import (
+ "sync"
+)
+
+// resourceNode is a node in a dependency graph. This graph is used to ensure
+// that when a resource is freed, downstream resources are also freed. For
+// example, closing a store closes all downstream transactions, snapshots and
+// streams.
+type resourceNode struct {
+ mu sync.Mutex
+ parent *resourceNode
+ children map[*resourceNode]func()
+}
+
+func newResourceNode() *resourceNode {
+ return &resourceNode{
+ children: make(map[*resourceNode]func()),
+ }
+}
+
+// addChild adds a parent-child relation between this node and the provided
+// node. The provided function is called to close the child when this node is
+// closed.
+func (r *resourceNode) addChild(node *resourceNode, closefn func()) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.children == nil {
+ panic("already closed")
+ }
+ node.parent = r
+ r.children[node] = closefn
+}
+
+// removeChild removes the parent-child relation between this node and the
+// provided node, enabling Go's garbage collector to free the resources
+// associated with the node if there are no more references to it.
+func (r *resourceNode) removeChild(node *resourceNode) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.children == nil {
+ // Already closed.
+ return
+ }
+ delete(r.children, node)
+}
+
+// close closes this node and detaches it from its parent. All child nodes
+// are closed using close functions provided to addChild.
+func (r *resourceNode) close() {
+ r.mu.Lock()
+ if r.parent != nil {
+ // If there is a node V with parent P and we decide to explicitly close V,
+ // then we need to remove V from P's children list so that we don't close
+ // V again when P is closed.
+ r.parent.removeChild(r)
+ r.parent = nil
+ }
+ // Copy the children map to a local variable so that the removeChild step
+ // executed from children won't affect the map while we iterate through it.
+ children := r.children
+ r.children = nil
+ r.mu.Unlock()
+ for _, closefn := range children {
+ closefn()
+ }
+}
diff --git a/services/syncbase/store/leveldb/snapshot.go b/services/syncbase/store/leveldb/snapshot.go
index 1e920fd..0d78f03 100644
--- a/services/syncbase/store/leveldb/snapshot.go
+++ b/services/syncbase/store/leveldb/snapshot.go
@@ -18,6 +18,7 @@
type snapshot struct {
// mu protects the state of the snapshot.
mu sync.RWMutex
+ node *resourceNode
d *db
cSnapshot *C.leveldb_snapshot_t
cOpts *C.leveldb_readoptions_t
@@ -26,16 +27,21 @@
var _ store.Snapshot = (*snapshot)(nil)
-func newSnapshot(d *db) *snapshot {
+func newSnapshot(d *db, parent *resourceNode) *snapshot {
cSnapshot := C.leveldb_create_snapshot(d.cDb)
cOpts := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(cOpts, 1)
C.leveldb_readoptions_set_snapshot(cOpts, cSnapshot)
- return &snapshot{
+ s := &snapshot{
+ node: newResourceNode(),
d: d,
cSnapshot: cSnapshot,
cOpts: cOpts,
}
+ parent.addChild(s.node, func() {
+ s.Close()
+ })
+ return s
}
// Close implements the store.Snapshot interface.
@@ -45,6 +51,7 @@
if s.err != nil {
return s.err
}
+ s.node.close()
C.leveldb_readoptions_destroy(s.cOpts)
s.cOpts = nil
C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
@@ -70,5 +77,5 @@
if s.err != nil {
return &store.InvalidStream{s.err}
}
- return newStream(s.d, start, limit, s.cOpts)
+ return newStream(s.d, s.node, start, limit, s.cOpts)
}
diff --git a/services/syncbase/store/leveldb/stream.go b/services/syncbase/store/leveldb/stream.go
index bc146e8..b528626 100644
--- a/services/syncbase/store/leveldb/stream.go
+++ b/services/syncbase/store/leveldb/stream.go
@@ -20,6 +20,7 @@
type stream struct {
// mu protects the state of the stream.
mu sync.Mutex
+ node *resourceNode
cIter *C.syncbase_leveldb_iterator_t
limit []byte
@@ -37,16 +38,24 @@
var _ store.Stream = (*stream)(nil)
-func newStream(d *db, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
+func newStream(d *db, parent *resourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
cStr, size := cSlice(start)
cIter := C.syncbase_leveldb_create_iterator(d.cDb, cOpts, cStr, size)
- return &stream{
+ s := &stream{
+ node: newResourceNode(),
cIter: cIter,
limit: limit,
}
+ parent.addChild(s.node, func() {
+ s.Cancel()
+ })
+ return s
}
+// destroyLeveldbIter destroys the underlying C iterator.
+// Assumes mu is held.
func (s *stream) destroyLeveldbIter() {
+ s.node.close()
C.syncbase_leveldb_iter_destroy(s.cIter)
s.cIter = nil
}
diff --git a/services/syncbase/store/leveldb/transaction.go b/services/syncbase/store/leveldb/transaction.go
index 2137978..64ea397 100644
--- a/services/syncbase/store/leveldb/transaction.go
+++ b/services/syncbase/store/leveldb/transaction.go
@@ -16,6 +16,7 @@
// transaction is a wrapper around LevelDB WriteBatch that implements
// the store.Transaction interface.
type transaction struct {
+ node *resourceNode
mu sync.Mutex
d *db
snapshot store.Snapshot
@@ -26,18 +27,26 @@
var _ store.Transaction = (*transaction)(nil)
-func newTransaction(d *db) *transaction {
- return &transaction{
+func newTransaction(d *db, parent *resourceNode) *transaction {
+ tx := &transaction{
+ node: newResourceNode(),
d: d,
snapshot: d.NewSnapshot(),
batch: C.leveldb_writebatch_create(),
cOpts: d.writeOptions,
}
+ parent.addChild(tx.node, func() {
+ tx.Abort()
+ })
+ return tx
}
// close frees allocated C objects and releases acquired locks.
+// Assumes mu is held.
func (tx *transaction) close() {
tx.d.txmu.Unlock()
+ tx.node.close()
+ tx.snapshot.Close()
C.leveldb_writebatch_destroy(tx.batch)
tx.batch = nil
if tx.cOpts != tx.d.writeOptions {
@@ -46,19 +55,6 @@
tx.cOpts = nil
}
-// ResetForRetry implements the store.Transaction interface.
-func (tx *transaction) ResetForRetry() {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- if tx.batch == nil {
- panic(tx.err)
- }
- tx.snapshot.Close()
- tx.snapshot = tx.d.NewSnapshot()
- tx.err = nil
- C.leveldb_writebatch_clear(tx.batch)
-}
-
// Get implements the store.StoreReader interface.
func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
tx.mu.Lock()
diff --git a/services/syncbase/store/memstore/snapshot.go b/services/syncbase/store/memstore/snapshot.go
index fc7cf0a..c7c016f 100644
--- a/services/syncbase/store/memstore/snapshot.go
+++ b/services/syncbase/store/memstore/snapshot.go
@@ -62,11 +62,11 @@
}
// Scan implements the store.StoreReader interface.
-func (s *snapshot) Scan(start, end []byte) store.Stream {
+func (s *snapshot) Scan(start, limit []byte) store.Stream {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.error(); err != nil {
return &store.InvalidStream{err}
}
- return newStream(s, start, end)
+ return newStream(s, start, limit)
}
diff --git a/services/syncbase/store/memstore/store.go b/services/syncbase/store/memstore/store.go
index e844b20..4aa7a6e 100644
--- a/services/syncbase/store/memstore/store.go
+++ b/services/syncbase/store/memstore/store.go
@@ -46,10 +46,10 @@
}
// Scan implements the store.StoreReader interface.
-func (st *memstore) Scan(start, end []byte) store.Stream {
+func (st *memstore) Scan(start, limit []byte) store.Stream {
st.mu.Lock()
defer st.mu.Unlock()
- return newStream(newSnapshot(st), start, end)
+ return newStream(newSnapshot(st), start, limit)
}
// Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/memstore/store_test.go b/services/syncbase/store/memstore/store_test.go
index 4006ce7..4211225 100644
--- a/services/syncbase/store/memstore/store_test.go
+++ b/services/syncbase/store/memstore/store_test.go
@@ -24,12 +24,18 @@
runTest(t, test.RunSnapshotTest)
}
+// TODO(rogulenko): Enable this test once memstore.Close causes memstore to
+// disallow subsequent operations.
+/*
func TestStoreState(t *testing.T) {
- // TODO(rogulenko): Enable this test once memstore.Close causes memstore to
- // disallow subsequent operations.
- // runTest(t, test.RunStoreStateTest)
+ runTest(t, test.RunStoreStateTest)
}
+func TestClose(t *testing.T) {
+ runTest(t, test.RunCloseTest)
+}
+*/
+
func TestReadWriteBasic(t *testing.T) {
runTest(t, test.RunReadWriteBasicTest)
}
diff --git a/services/syncbase/store/memstore/transaction.go b/services/syncbase/store/memstore/transaction.go
index 56dcbdd..3298ff1 100644
--- a/services/syncbase/store/memstore/transaction.go
+++ b/services/syncbase/store/memstore/transaction.go
@@ -61,20 +61,6 @@
return nil
}
-// ResetForRetry implements the store.Transaction interface.
-func (tx *transaction) ResetForRetry() {
- tx.mu.Lock()
- defer tx.mu.Unlock()
- tx.puts = make(map[string][]byte)
- tx.deletes = make(map[string]struct{})
- tx.err = nil
- tx.st.mu.Lock()
- defer tx.st.mu.Unlock() // note, defer is last-in-first-out
- tx.st.lastSeq++
- tx.seq = tx.st.lastSeq
- tx.sn = newSnapshot(tx.st)
-}
-
// Get implements the store.StoreReader interface.
func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
tx.mu.Lock()
@@ -86,13 +72,13 @@
}
// Scan implements the store.StoreReader interface.
-func (tx *transaction) Scan(start, end []byte) store.Stream {
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
tx.mu.Lock()
defer tx.mu.Unlock()
if err := tx.error(); err != nil {
return &store.InvalidStream{err}
}
- return newStream(tx.sn, start, end)
+ return newStream(tx.sn, start, limit)
}
// Put implements the store.StoreWriter interface.
diff --git a/services/syncbase/store/model.go b/services/syncbase/store/model.go
index 8d85b2a..f772f40 100644
--- a/services/syncbase/store/model.go
+++ b/services/syncbase/store/model.go
@@ -67,10 +67,6 @@
// Abort aborts the transaction.
Abort() error
-
- // ResetForRetry resets the transaction. It's equivalent to aborting the
- // transaction and creating a new one, but more efficient.
- ResetForRetry()
}
// Snapshot is a handle to particular state in time of a Store.
diff --git a/services/syncbase/store/test/store.go b/services/syncbase/store/test/store.go
index 08b39b5..5ce2d9f 100644
--- a/services/syncbase/store/test/store.go
+++ b/services/syncbase/store/test/store.go
@@ -210,3 +210,45 @@
t.Fatalf("unexpected error: got %v, want %v", err, expectedErr)
}
}
+
+// RunCloseTest verifies that child objects are closed when the parent object is
+// closed.
+func RunCloseTest(t *testing.T, st store.Store) {
+ key1, value1 := []byte("key1"), []byte("value1")
+ st.Put(key1, value1)
+
+ var streams []store.Stream
+ var snapshots []store.Snapshot
+ var transactions []store.Transaction
+ // TODO(rogulenko): make multiple transactions.
+ tx := st.NewTransaction()
+ for i := 0; i < 10; i++ {
+ streams = append(streams, st.Scan([]byte("a"), []byte("z")))
+ snapshot := st.NewSnapshot()
+ for j := 0; j < 10; j++ {
+ streams = append(streams, snapshot.Scan([]byte("a"), []byte("z")))
+ streams = append(streams, tx.Scan([]byte("a"), []byte("z")))
+ }
+ snapshots = append(snapshots, snapshot)
+ transactions = append(transactions, tx)
+ }
+ st.Close()
+
+ for _, stream := range streams {
+ if got, want := stream.Err().Error(), "canceled stream"; !strings.Contains(got, want) {
+ t.Fatalf("unexpected error: got %v, want %v", got, want)
+ }
+ }
+ for _, snapshot := range snapshots {
+ _, err := snapshot.Get(key1, nil)
+ if got, want := err.Error(), "closed snapshot"; !strings.Contains(got, want) {
+ t.Fatalf("unexpected error: got %v, want %v", got, want)
+ }
+ }
+ for _, tx := range transactions {
+ _, err := tx.Get(key1, nil)
+ if got, want := err.Error(), "aborted transaction"; !strings.Contains(got, want) {
+ t.Fatalf("unexpected error: got %v, want %v", got, want)
+ }
+ }
+}