Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package memstore |
| 6 | |
| 7 | import ( |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 8 | "sync" |
| 9 | "time" |
| 10 | |
| 11 | "v.io/syncbase/x/ref/services/syncbase/store" |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 12 | "v.io/v23/verror" |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 13 | ) |
| 14 | |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 15 | const ( |
| 16 | txnTimeout = time.Duration(5) * time.Second |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 17 | ) |
| 18 | |
| 19 | type transaction struct { |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 20 | mu sync.Mutex |
| 21 | node *store.ResourceNode |
| 22 | st *memstore |
| 23 | sn *snapshot |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 24 | // The following fields are used to determine whether method calls should |
| 25 | // error out. |
| 26 | err error |
| 27 | seq uint64 |
| 28 | createdTime time.Time |
| 29 | // The following fields track writes performed against this transaction. |
| 30 | puts map[string][]byte |
| 31 | deletes map[string]struct{} |
| 32 | } |
| 33 | |
| 34 | var _ store.Transaction = (*transaction)(nil) |
| 35 | |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 36 | func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction { |
| 37 | node := store.NewResourceNode() |
| 38 | sn := newSnapshot(st, node) |
| 39 | tx := &transaction{ |
| 40 | node: node, |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 41 | st: st, |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 42 | sn: sn, |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 43 | seq: seq, |
| 44 | createdTime: time.Now(), |
| 45 | puts: map[string][]byte{}, |
| 46 | deletes: map[string]struct{}{}, |
| 47 | } |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 48 | parent.AddChild(tx.node, func() { |
| 49 | tx.Abort() |
| 50 | }) |
| 51 | return tx |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 52 | } |
| 53 | |
| 54 | func (tx *transaction) expired() bool { |
| 55 | return time.Now().After(tx.createdTime.Add(txnTimeout)) |
| 56 | } |
| 57 | |
| 58 | func (tx *transaction) error() error { |
| 59 | if tx.err != nil { |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 60 | return store.WrapError(tx.err) |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 61 | } |
| 62 | if tx.expired() { |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 63 | return verror.New(verror.ErrBadState, nil, "expired transaction") |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 64 | } |
| 65 | return nil |
| 66 | } |
| 67 | |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 68 | // Get implements the store.StoreReader interface. |
| 69 | func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) { |
| 70 | tx.mu.Lock() |
| 71 | defer tx.mu.Unlock() |
| 72 | if err := tx.error(); err != nil { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 73 | return valbuf, err |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 74 | } |
| 75 | return tx.sn.Get(key, valbuf) |
| 76 | } |
| 77 | |
| 78 | // Scan implements the store.StoreReader interface. |
Sergey Rogulenko | def3b30 | 2015-05-20 17:33:24 -0700 | [diff] [blame] | 79 | func (tx *transaction) Scan(start, limit []byte) store.Stream { |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 80 | tx.mu.Lock() |
| 81 | defer tx.mu.Unlock() |
| 82 | if err := tx.error(); err != nil { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 83 | return &store.InvalidStream{err} |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 84 | } |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 85 | return tx.sn.Scan(start, limit) |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 86 | } |
| 87 | |
| 88 | // Put implements the store.StoreWriter interface. |
| 89 | func (tx *transaction) Put(key, value []byte) error { |
| 90 | tx.st.mu.Lock() |
| 91 | defer tx.st.mu.Unlock() |
| 92 | if err := tx.error(); err != nil { |
| 93 | return err |
| 94 | } |
| 95 | delete(tx.deletes, string(key)) |
| 96 | tx.puts[string(key)] = value |
| 97 | return nil |
| 98 | } |
| 99 | |
| 100 | // Delete implements the store.StoreWriter interface. |
| 101 | func (tx *transaction) Delete(key []byte) error { |
| 102 | tx.st.mu.Lock() |
| 103 | defer tx.st.mu.Unlock() |
| 104 | if err := tx.error(); err != nil { |
| 105 | return err |
| 106 | } |
| 107 | delete(tx.puts, string(key)) |
| 108 | tx.deletes[string(key)] = struct{}{} |
| 109 | return nil |
| 110 | } |
| 111 | |
| 112 | // Commit implements the store.Transaction interface. |
| 113 | func (tx *transaction) Commit() error { |
| 114 | tx.mu.Lock() |
| 115 | defer tx.mu.Unlock() |
| 116 | if err := tx.error(); err != nil { |
| 117 | return err |
| 118 | } |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 119 | tx.node.Close() |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 120 | tx.st.mu.Lock() |
| 121 | defer tx.st.mu.Unlock() // note, defer is last-in-first-out |
| 122 | if tx.seq <= tx.st.lastCommitSeq { |
| 123 | // Once Commit() has failed with store.ErrConcurrentTransaction, subsequent |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 124 | // ops on the transaction will fail with the following error. |
| 125 | tx.err = verror.New(verror.ErrBadState, nil, "already attempted to commit transaction") |
| 126 | return store.NewErrConcurrentTransaction(nil) |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 127 | } |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 128 | tx.err = verror.New(verror.ErrBadState, nil, "committed transaction") |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 129 | for k, v := range tx.puts { |
| 130 | tx.st.data[k] = v |
| 131 | } |
| 132 | for k := range tx.deletes { |
| 133 | delete(tx.st.data, k) |
| 134 | } |
| 135 | tx.st.lastCommitSeq = tx.st.lastSeq |
| 136 | return nil |
| 137 | } |
| 138 | |
| 139 | // Abort implements the store.Transaction interface. |
| 140 | func (tx *transaction) Abort() error { |
| 141 | tx.mu.Lock() |
| 142 | defer tx.mu.Unlock() |
| 143 | if err := tx.error(); err != nil { |
| 144 | return err |
| 145 | } |
Sergey Rogulenko | 95baa66 | 2015-05-22 15:07:06 -0700 | [diff] [blame] | 146 | tx.node.Close() |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 147 | tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction") |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 148 | return nil |
| 149 | } |