Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package watchable |
| 6 | |
| 7 | import ( |
| 8 | "fmt" |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 9 | "math" |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 10 | "sync" |
Raja Daoud | 5285136 | 2015-09-14 15:50:40 -0700 | [diff] [blame] | 11 | "time" |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 12 | |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 13 | "v.io/v23/context" |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 14 | "v.io/v23/verror" |
Adam Sadovsky | f2efeb5 | 2015-08-31 14:17:49 -0700 | [diff] [blame] | 15 | "v.io/x/ref/services/syncbase/server/util" |
| 16 | "v.io/x/ref/services/syncbase/store" |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 17 | ) |
| 18 | |
| 19 | type transaction struct { |
| 20 | itx store.Transaction |
| 21 | st *wstore |
| 22 | mu sync.Mutex // protects the fields below |
| 23 | err error |
| 24 | ops []Op |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 25 | // fromSync is true when a transaction is created by sync. This causes |
| 26 | // the log entries written at commit time to have their "FromSync" field |
| 27 | // set to true. That in turn causes the sync watcher to filter out such |
| 28 | // updates since sync already knows about them (echo suppression). |
| 29 | fromSync bool |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 30 | } |
| 31 | |
| 32 | var _ store.Transaction = (*transaction)(nil) |
| 33 | |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 34 | func cp(src []byte) []byte { |
| 35 | dst := make([]byte, len(src)) |
| 36 | for i := 0; i < len(src); i++ { |
| 37 | dst[i] = src[i] |
| 38 | } |
| 39 | return dst |
| 40 | } |
| 41 | |
| 42 | func cpStrings(src []string) []string { |
| 43 | dst := make([]string, len(src)) |
| 44 | for i := 0; i < len(src); i++ { |
| 45 | dst[i] = src[i] |
| 46 | } |
| 47 | return dst |
| 48 | } |
| 49 | |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 50 | func newTransaction(st *wstore) *transaction { |
| 51 | return &transaction{ |
| 52 | itx: st.ist.NewTransaction(), |
| 53 | st: st, |
| 54 | } |
| 55 | } |
| 56 | |
| 57 | // Get implements the store.StoreReader interface. |
| 58 | func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) { |
| 59 | tx.mu.Lock() |
| 60 | defer tx.mu.Unlock() |
| 61 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame] | 62 | return valbuf, convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 63 | } |
| 64 | var err error |
| 65 | if !tx.st.managesKey(key) { |
| 66 | valbuf, err = tx.itx.Get(key, valbuf) |
| 67 | } else { |
| 68 | valbuf, err = getVersioned(tx.itx, key, valbuf) |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 69 | tx.ops = append(tx.ops, &OpGet{GetOp{Key: cp(key)}}) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 70 | } |
| 71 | return valbuf, err |
| 72 | } |
| 73 | |
| 74 | // Scan implements the store.StoreReader interface. |
| 75 | func (tx *transaction) Scan(start, limit []byte) store.Stream { |
| 76 | tx.mu.Lock() |
| 77 | defer tx.mu.Unlock() |
| 78 | if tx.err != nil { |
Jiri Simsa | d88e9ad | 2015-08-14 10:12:27 -0700 | [diff] [blame] | 79 | return &store.InvalidStream{Error: tx.err} |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 80 | } |
| 81 | var it store.Stream |
| 82 | if !tx.st.managesRange(start, limit) { |
| 83 | it = tx.itx.Scan(start, limit) |
| 84 | } else { |
| 85 | it = newStreamVersioned(tx.itx, start, limit) |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 86 | tx.ops = append(tx.ops, &OpScan{ScanOp{Start: cp(start), Limit: cp(limit)}}) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 87 | } |
| 88 | return it |
| 89 | } |
| 90 | |
| 91 | // Put implements the store.StoreWriter interface. |
| 92 | func (tx *transaction) Put(key, value []byte) error { |
| 93 | tx.mu.Lock() |
| 94 | defer tx.mu.Unlock() |
| 95 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame] | 96 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 97 | } |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 98 | if !tx.st.managesKey(key) { |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 99 | return tx.itx.Put(key, value) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 100 | } |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 101 | version, err := putVersioned(tx.itx, key, value) |
| 102 | if err != nil { |
| 103 | return err |
| 104 | } |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 105 | tx.ops = append(tx.ops, &OpPut{PutOp{Key: cp(key), Version: version}}) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 106 | return nil |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 107 | } |
| 108 | |
| 109 | // Delete implements the store.StoreWriter interface. |
| 110 | func (tx *transaction) Delete(key []byte) error { |
| 111 | tx.mu.Lock() |
| 112 | defer tx.mu.Unlock() |
| 113 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame] | 114 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 115 | } |
| 116 | var err error |
| 117 | if !tx.st.managesKey(key) { |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 118 | return tx.itx.Delete(key) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 119 | } |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 120 | err = deleteVersioned(tx.itx, key) |
| 121 | if err != nil { |
| 122 | return err |
| 123 | } |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 124 | tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: cp(key)}}) |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 125 | return nil |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 126 | } |
| 127 | |
| 128 | // Commit implements the store.Transaction interface. |
| 129 | func (tx *transaction) Commit() error { |
| 130 | tx.mu.Lock() |
| 131 | defer tx.mu.Unlock() |
| 132 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame] | 133 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 134 | } |
| 135 | tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn) |
| 136 | tx.st.mu.Lock() |
| 137 | defer tx.st.mu.Unlock() |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 138 | // Check if there is enough space left in the sequence number. |
| 139 | if (math.MaxUint64 - tx.st.seq) < uint64(len(tx.ops)) { |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 140 | return verror.New(verror.ErrInternal, nil, "seq maxed out") |
| 141 | } |
| 142 | // Write LogEntry records. |
Jatin Lodhia | 456b81f | 2015-10-05 17:21:28 -0700 | [diff] [blame^] | 143 | timestamp := tx.st.clock.Now().UnixNano() |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 144 | seq := tx.st.seq |
| 145 | for i, op := range tx.ops { |
Sergey Rogulenko | 40402b5 | 2015-08-10 15:09:48 -0700 | [diff] [blame] | 146 | key := logEntryKey(seq) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 147 | value := &LogEntry{ |
Jatin Lodhia | 45a1fa8 | 2015-06-18 11:51:04 -0700 | [diff] [blame] | 148 | Op: op, |
| 149 | CommitTimestamp: timestamp, |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 150 | FromSync: tx.fromSync, |
| 151 | Continued: i < len(tx.ops)-1, |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 152 | } |
Adam Sadovsky | a31a7cd | 2015-07-08 10:44:07 -0700 | [diff] [blame] | 153 | if err := util.Put(nil, tx.itx, key, value); err != nil { |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 154 | return err |
| 155 | } |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 156 | seq++ |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 157 | } |
| 158 | if err := tx.itx.Commit(); err != nil { |
| 159 | return err |
| 160 | } |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 161 | tx.st.seq = seq |
Sergey Rogulenko | 8bf641c | 2015-08-14 17:00:09 -0700 | [diff] [blame] | 162 | tx.st.watcher.broadcastUpdates() |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 163 | return nil |
| 164 | } |
| 165 | |
| 166 | // Abort implements the store.Transaction interface. |
| 167 | func (tx *transaction) Abort() error { |
| 168 | tx.mu.Lock() |
| 169 | defer tx.mu.Unlock() |
| 170 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame] | 171 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 172 | } |
| 173 | tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn) |
| 174 | return tx.itx.Abort() |
| 175 | } |
Jatin Lodhia | 45a1fa8 | 2015-06-18 11:51:04 -0700 | [diff] [blame] | 176 | |
Raja Daoud | 5285136 | 2015-09-14 15:50:40 -0700 | [diff] [blame] | 177 | // GetStoreTime returns the current time from the given transaction store. |
| 178 | func GetStoreTime(ctx *context.T, tx store.Transaction) time.Time { |
| 179 | wtx := tx.(*transaction) |
Jatin Lodhia | 456b81f | 2015-10-05 17:21:28 -0700 | [diff] [blame^] | 180 | return wtx.st.clock.Now() |
Raja Daoud | 5285136 | 2015-09-14 15:50:40 -0700 | [diff] [blame] | 181 | } |
| 182 | |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 183 | // AddSyncGroupOp injects a SyncGroup operation notification in the log entries |
| 184 | // that the transaction writes when it is committed. It allows the SyncGroup |
| 185 | // operations (create, join, leave, destroy) to notify the sync watcher of the |
| 186 | // change at its proper position in the timeline (the transaction commit). |
| 187 | // Note: this is an internal function used by sync, not part of the interface. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 188 | func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error { |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 189 | wtx := tx.(*transaction) |
| 190 | wtx.mu.Lock() |
| 191 | defer wtx.mu.Unlock() |
| 192 | if wtx.err != nil { |
| 193 | return convertError(wtx.err) |
| 194 | } |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 195 | // Make a defensive copy of prefixes slice. |
| 196 | wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: cpStrings(prefixes), Remove: remove}}) |
Raja Daoud | cb50b5d | 2015-06-26 18:37:24 -0700 | [diff] [blame] | 197 | return nil |
| 198 | } |
| 199 | |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 200 | // AddSyncSnapshotOp injects a sync snapshot operation notification in the log |
| 201 | // entries that the transaction writes when it is committed. It allows the |
| 202 | // SyncGroup create or join operations to notify the sync watcher of the |
| 203 | // current keys and their versions to use when initializing the sync metadata |
| 204 | // at the point in the timeline when these keys become syncable (at commit). |
| 205 | // Note: this is an internal function used by sync, not part of the interface. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 206 | func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error { |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 207 | wtx := tx.(*transaction) |
| 208 | wtx.mu.Lock() |
| 209 | defer wtx.mu.Unlock() |
| 210 | if wtx.err != nil { |
| 211 | return convertError(wtx.err) |
| 212 | } |
| 213 | if !wtx.st.managesKey(key) { |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 214 | return verror.New(verror.ErrInternal, ctx, fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key))) |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 215 | } |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 216 | wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: cp(key), Version: cp(version)}}) |
Raja Daoud | d454307 | 2015-06-30 11:15:55 -0700 | [diff] [blame] | 217 | return nil |
| 218 | } |
| 219 | |
| 220 | // SetTransactionFromSync marks this transaction as created by sync as opposed |
| 221 | // to one created by an application. The net effect is that, at commit time, |
| 222 | // the log entries written are marked as made by sync. This allows the sync |
| 223 | // Watcher to ignore them (echo suppression) because it made these updates. |
| 224 | // Note: this is an internal function used by sync, not part of the interface. |
| 225 | // TODO(rdaoud): support a generic echo-suppression mechanism for apps as well |
| 226 | // maybe by having a creator ID in the transaction and log entries. |
| 227 | // TODO(rdaoud): fold this flag (or creator ID) into Tx options when available. |
| 228 | func SetTransactionFromSync(tx store.Transaction) { |
| 229 | wtx := tx.(*transaction) |
| 230 | wtx.mu.Lock() |
| 231 | defer wtx.mu.Unlock() |
| 232 | wtx.fromSync = true |
| 233 | } |
| 234 | |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 235 | // GetVersion returns the current version of a managed key. This method is used |
| 236 | // by the Sync module when the initiator is attempting to add new versions of |
| 237 | // objects. Reading the version key is used for optimistic concurrency |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 238 | // control. At minimum, an object implementing the Transaction interface is |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 239 | // required since this is a Get operation. |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 240 | func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) { |
| 241 | switch w := tx.(type) { |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 242 | case *transaction: |
| 243 | w.mu.Lock() |
| 244 | defer w.mu.Unlock() |
| 245 | if w.err != nil { |
| 246 | return nil, convertError(w.err) |
| 247 | } |
| 248 | return getVersion(w.itx, key) |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 249 | } |
| 250 | return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type") |
| 251 | } |
| 252 | |
| 253 | // GetAtVersion returns the value of a managed key at the requested |
| 254 | // version. This method is used by the Sync module when the responder needs to |
| 255 | // send objects over the wire. At minimum, an object implementing the |
| 256 | // StoreReader interface is required since this is a Get operation. |
| 257 | func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) { |
| 258 | switch w := st.(type) { |
Sergey Rogulenko | 40402b5 | 2015-08-10 15:09:48 -0700 | [diff] [blame] | 259 | case *snapshot: |
| 260 | return getAtVersion(w.isn, key, valbuf, version) |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 261 | case *transaction: |
| 262 | w.mu.Lock() |
| 263 | defer w.mu.Unlock() |
| 264 | if w.err != nil { |
| 265 | return valbuf, convertError(w.err) |
| 266 | } |
| 267 | return getAtVersion(w.itx, key, valbuf, version) |
| 268 | case *wstore: |
| 269 | return getAtVersion(w.ist, key, valbuf, version) |
| 270 | } |
| 271 | return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type") |
| 272 | } |
| 273 | |
| 274 | // PutAtVersion puts a value for the managed key at the requested version. This |
| 275 | // method is used by the Sync module exclusively when the initiator adds objects |
| 276 | // with versions created on other Syncbases. At minimum, an object implementing |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 277 | // the Transaction interface is required since this is a Put operation. |
| 278 | func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error { |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 279 | wtx := tx.(*transaction) |
| 280 | |
| 281 | wtx.mu.Lock() |
| 282 | defer wtx.mu.Unlock() |
| 283 | if wtx.err != nil { |
| 284 | return convertError(wtx.err) |
| 285 | } |
| 286 | |
| 287 | // Note that we do not enqueue a PutOp in the log since this Put is not |
| 288 | // updating the current version of a key. |
| 289 | return wtx.itx.Put(makeAtVersionKey(key, version), valbuf) |
| 290 | } |
| 291 | |
| 292 | // PutVersion updates the version of a managed key to the requested |
| 293 | // version. This method is used by the Sync module exclusively when the |
| 294 | // initiator selects which of the already stored versions (via PutAtVersion |
| 295 | // calls) becomes the current version. At minimum, an object implementing |
Sergey Rogulenko | 1068b1a | 2015-08-03 16:53:27 -0700 | [diff] [blame] | 296 | // the Transaction interface is required since this is a Put operation. |
| 297 | func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error { |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 298 | wtx := tx.(*transaction) |
| 299 | |
| 300 | wtx.mu.Lock() |
| 301 | defer wtx.mu.Unlock() |
| 302 | if wtx.err != nil { |
| 303 | return convertError(wtx.err) |
| 304 | } |
| 305 | |
| 306 | if err := wtx.itx.Put(makeVersionKey(key), version); err != nil { |
| 307 | return err |
| 308 | } |
Adam Sadovsky | 4dcc353 | 2015-07-31 13:54:19 -0700 | [diff] [blame] | 309 | wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}}) |
Himabindu Pucha | 0ff9b0c | 2015-07-14 11:40:45 -0700 | [diff] [blame] | 310 | return nil |
| 311 | } |