Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 5 | package transactions |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 6 | |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 7 | import ( |
Raja Daoud | a3e4230 | 2015-06-10 18:49:02 -0700 | [diff] [blame] | 8 | "bytes" |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 9 | "container/list" |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 10 | "sync" |
| 11 | |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 12 | "v.io/v23/verror" |
Adam Sadovsky | b5f8892 | 2015-07-26 17:41:37 -0700 | [diff] [blame] | 13 | "v.io/x/lib/vlog" |
Adam Sadovsky | f2efeb5 | 2015-08-31 14:17:49 -0700 | [diff] [blame] | 14 | "v.io/x/ref/services/syncbase/store" |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 15 | ) |
| 16 | |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 17 | // transaction is a wrapper on top of a BatchWriter and a store.Snapshot that |
| 18 | // implements the store.Transaction interface. |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 19 | type transaction struct { |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 20 | // mu protects the state of the transaction. |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 21 | mu sync.Mutex |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 22 | mg *manager |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 23 | seq uint64 |
Sergey Rogulenko | 7f77da0 | 2015-09-25 18:28:51 -0700 | [diff] [blame] | 24 | event *list.Element // pointer to element of mg.events |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 25 | snapshot store.Snapshot |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 26 | reads readSet |
| 27 | writes []WriteOp |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 28 | err error |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 29 | } |
| 30 | |
| 31 | var _ store.Transaction = (*transaction)(nil) |
| 32 | |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 33 | func newTransaction(mg *manager) *transaction { |
Sergey Rogulenko | def3b30 | 2015-05-20 17:33:24 -0700 | [diff] [blame] | 34 | tx := &transaction{ |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 35 | mg: mg, |
| 36 | snapshot: mg.BatchStore.NewSnapshot(), |
| 37 | seq: mg.seq, |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 38 | } |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 39 | tx.event = mg.events.PushFront(tx) |
Sergey Rogulenko | def3b30 | 2015-05-20 17:33:24 -0700 | [diff] [blame] | 40 | return tx |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 41 | } |
| 42 | |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 43 | // removeEvent removes this transaction from the mg.events queue. |
Sergey Rogulenko | 7f77da0 | 2015-09-25 18:28:51 -0700 | [diff] [blame] | 44 | // Assumes mu and mg.mu are held. |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 45 | func (tx *transaction) removeEvent() { |
Sergey Rogulenko | 7f77da0 | 2015-09-25 18:28:51 -0700 | [diff] [blame] | 46 | if tx.event != nil { |
| 47 | tx.mg.events.Remove(tx.event) |
| 48 | tx.event = nil |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 49 | } |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 50 | } |
| 51 | |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 52 | // Get implements the store.StoreReader interface. |
Sergey Rogulenko | 802fe1e | 2015-05-08 12:51:22 -0700 | [diff] [blame] | 53 | func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 54 | tx.mu.Lock() |
| 55 | defer tx.mu.Unlock() |
| 56 | if tx.err != nil { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 57 | return valbuf, store.ConvertError(tx.err) |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 58 | } |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 59 | tx.reads.Keys = append(tx.reads.Keys, key) |
Raja Daoud | a3e4230 | 2015-06-10 18:49:02 -0700 | [diff] [blame] | 60 | |
| 61 | // Reflect the state of the transaction: the "writes" (puts and |
| 62 | // deletes) override the values in the transaction snapshot. |
| 63 | // Find the last "writes" entry for this key, if one exists. |
| 64 | // Note: this step could be optimized by using maps (puts and |
| 65 | // deletes) instead of an array. |
| 66 | for i := len(tx.writes) - 1; i >= 0; i-- { |
| 67 | op := &tx.writes[i] |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 68 | if bytes.Equal(op.Key, key) { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 69 | if op.T == PutOp { |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 70 | return op.Value, nil |
Raja Daoud | a3e4230 | 2015-06-10 18:49:02 -0700 | [diff] [blame] | 71 | } |
| 72 | return valbuf, verror.New(store.ErrUnknownKey, nil, string(key)) |
| 73 | } |
| 74 | } |
| 75 | |
Sergey Rogulenko | 802fe1e | 2015-05-08 12:51:22 -0700 | [diff] [blame] | 76 | return tx.snapshot.Get(key, valbuf) |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 77 | } |
| 78 | |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 79 | // Scan implements the store.StoreReader interface. |
Adam Sadovsky | f437f33 | 2015-05-19 23:03:22 -0700 | [diff] [blame] | 80 | func (tx *transaction) Scan(start, limit []byte) store.Stream { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 81 | tx.mu.Lock() |
| 82 | defer tx.mu.Unlock() |
| 83 | if tx.err != nil { |
Jiri Simsa | d88e9ad | 2015-08-14 10:12:27 -0700 | [diff] [blame] | 84 | return &store.InvalidStream{Error: tx.err} |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 85 | } |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 86 | |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 87 | tx.reads.Ranges = append(tx.reads.Ranges, scanRange{ |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 88 | Start: start, |
| 89 | Limit: limit, |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 90 | }) |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 91 | |
| 92 | // Return a stream which merges the snaphot stream with the uncommitted changes. |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 93 | return mergeWritesWithStream(tx.snapshot, tx.writes, start, limit) |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 94 | } |
| 95 | |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 96 | // Put implements the store.StoreWriter interface. |
Sergey Rogulenko | 802fe1e | 2015-05-08 12:51:22 -0700 | [diff] [blame] | 97 | func (tx *transaction) Put(key, value []byte) error { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 98 | tx.mu.Lock() |
| 99 | defer tx.mu.Unlock() |
| 100 | if tx.err != nil { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 101 | return store.ConvertError(tx.err) |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 102 | } |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 103 | tx.writes = append(tx.writes, WriteOp{ |
| 104 | T: PutOp, |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 105 | Key: key, |
| 106 | Value: value, |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 107 | }) |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 108 | return nil |
| 109 | } |
| 110 | |
| 111 | // Delete implements the store.StoreWriter interface. |
Sergey Rogulenko | 802fe1e | 2015-05-08 12:51:22 -0700 | [diff] [blame] | 112 | func (tx *transaction) Delete(key []byte) error { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 113 | tx.mu.Lock() |
| 114 | defer tx.mu.Unlock() |
| 115 | if tx.err != nil { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 116 | return store.ConvertError(tx.err) |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 117 | } |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 118 | tx.writes = append(tx.writes, WriteOp{ |
| 119 | T: DeleteOp, |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 120 | Key: key, |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 121 | }) |
Sergey Rogulenko | b0081cf | 2015-05-05 22:39:37 -0700 | [diff] [blame] | 122 | return nil |
| 123 | } |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 124 | |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 125 | // validateReadSet returns true iff the read set of this transaction has not |
| 126 | // been invalidated by other transactions. |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 127 | // Assumes tx.mg.mu is held. |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 128 | func (tx *transaction) validateReadSet() bool { |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 129 | for _, key := range tx.reads.Keys { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 130 | if tx.mg.txTable.get(key) > tx.seq { |
Adam Sadovsky | b5f8892 | 2015-07-26 17:41:37 -0700 | [diff] [blame] | 131 | vlog.VI(3).Infof("key conflict: %q", key) |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 132 | return false |
| 133 | } |
| 134 | } |
John Kline | 18834bd | 2015-06-26 10:07:46 -0700 | [diff] [blame] | 135 | for _, r := range tx.reads.Ranges { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 136 | if tx.mg.txTable.rangeMax(r.Start, r.Limit) > tx.seq { |
Adam Sadovsky | b5f8892 | 2015-07-26 17:41:37 -0700 | [diff] [blame] | 137 | vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit) |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 138 | return false |
| 139 | } |
| 140 | |
| 141 | } |
| 142 | return true |
| 143 | } |
| 144 | |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 145 | // Commit implements the store.Transaction interface. |
| 146 | func (tx *transaction) Commit() error { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 147 | tx.mu.Lock() |
| 148 | defer tx.mu.Unlock() |
| 149 | if tx.err != nil { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 150 | return store.ConvertError(tx.err) |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 151 | } |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 152 | tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn) |
Sergey Rogulenko | 7f77da0 | 2015-09-25 18:28:51 -0700 | [diff] [blame] | 153 | tx.snapshot.Abort() |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 154 | tx.mg.mu.Lock() |
| 155 | defer tx.mg.mu.Unlock() |
Sergey Rogulenko | 7f77da0 | 2015-09-25 18:28:51 -0700 | [diff] [blame] | 156 | if tx.mg.txTable == nil { |
| 157 | return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore) |
| 158 | } |
| 159 | // Explicitly remove this transaction from the event queue. If this was the |
| 160 | // only active transaction, the event queue becomes empty and trackBatch will |
| 161 | // not add this transaction's write set to txTable. |
| 162 | tx.removeEvent() |
Sergey Rogulenko | 8a1ae3a | 2015-05-29 17:13:44 -0700 | [diff] [blame] | 163 | if !tx.validateReadSet() { |
| 164 | return store.NewErrConcurrentTransaction(nil) |
| 165 | } |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 166 | if err := tx.mg.BatchStore.WriteBatch(tx.writes...); err != nil { |
| 167 | return err |
| 168 | } |
| 169 | tx.mg.trackBatch(tx.writes...) |
| 170 | return nil |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 171 | } |
| 172 | |
| 173 | // Abort implements the store.Transaction interface. |
| 174 | func (tx *transaction) Abort() error { |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 175 | tx.mu.Lock() |
| 176 | defer tx.mu.Unlock() |
Sergey Rogulenko | a53e60f | 2015-05-22 11:05:01 -0700 | [diff] [blame] | 177 | if tx.err != nil { |
Sergey Rogulenko | 054b4db | 2015-08-20 11:06:52 -0700 | [diff] [blame] | 178 | return store.ConvertError(tx.err) |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame] | 179 | } |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 180 | tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn) |
Sergey Rogulenko | 7f77da0 | 2015-09-25 18:28:51 -0700 | [diff] [blame] | 181 | tx.snapshot.Abort() |
| 182 | tx.mg.mu.Lock() |
| 183 | tx.removeEvent() |
| 184 | tx.mg.mu.Unlock() |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 185 | return nil |
| 186 | } |