Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package watchable |
| 6 | |
| 7 | import ( |
| 8 | "fmt" |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 9 | "math" |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 10 | "sync" |
| 11 | |
| 12 | "v.io/syncbase/x/ref/services/syncbase/server/util" |
| 13 | "v.io/syncbase/x/ref/services/syncbase/store" |
| 14 | "v.io/v23/verror" |
| 15 | ) |
| 16 | |
| 17 | type transaction struct { |
| 18 | itx store.Transaction |
| 19 | st *wstore |
| 20 | mu sync.Mutex // protects the fields below |
| 21 | err error |
| 22 | ops []Op |
| 23 | } |
| 24 | |
| 25 | var _ store.Transaction = (*transaction)(nil) |
| 26 | |
| 27 | func newTransaction(st *wstore) *transaction { |
| 28 | return &transaction{ |
| 29 | itx: st.ist.NewTransaction(), |
| 30 | st: st, |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | // Get implements the store.StoreReader interface. |
| 35 | func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) { |
| 36 | tx.mu.Lock() |
| 37 | defer tx.mu.Unlock() |
| 38 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame^] | 39 | return valbuf, convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 40 | } |
| 41 | var err error |
| 42 | if !tx.st.managesKey(key) { |
| 43 | valbuf, err = tx.itx.Get(key, valbuf) |
| 44 | } else { |
| 45 | valbuf, err = getVersioned(tx.itx, key, valbuf) |
| 46 | tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}}) |
| 47 | } |
| 48 | return valbuf, err |
| 49 | } |
| 50 | |
| 51 | // Scan implements the store.StoreReader interface. |
| 52 | func (tx *transaction) Scan(start, limit []byte) store.Stream { |
| 53 | tx.mu.Lock() |
| 54 | defer tx.mu.Unlock() |
| 55 | if tx.err != nil { |
| 56 | return &store.InvalidStream{tx.err} |
| 57 | } |
| 58 | var it store.Stream |
| 59 | if !tx.st.managesRange(start, limit) { |
| 60 | it = tx.itx.Scan(start, limit) |
| 61 | } else { |
| 62 | it = newStreamVersioned(tx.itx, start, limit) |
| 63 | tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}}) |
| 64 | } |
| 65 | return it |
| 66 | } |
| 67 | |
| 68 | // Put implements the store.StoreWriter interface. |
| 69 | func (tx *transaction) Put(key, value []byte) error { |
| 70 | tx.mu.Lock() |
| 71 | defer tx.mu.Unlock() |
| 72 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame^] | 73 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 74 | } |
| 75 | var err error |
| 76 | if !tx.st.managesKey(key) { |
| 77 | err = tx.itx.Put(key, value) |
| 78 | } else { |
| 79 | err = putVersioned(tx.itx, key, value) |
| 80 | tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}}) |
| 81 | } |
| 82 | return err |
| 83 | } |
| 84 | |
| 85 | // Delete implements the store.StoreWriter interface. |
| 86 | func (tx *transaction) Delete(key []byte) error { |
| 87 | tx.mu.Lock() |
| 88 | defer tx.mu.Unlock() |
| 89 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame^] | 90 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 91 | } |
| 92 | var err error |
| 93 | if !tx.st.managesKey(key) { |
| 94 | err = tx.itx.Delete(key) |
| 95 | } else { |
| 96 | err = deleteVersioned(tx.itx, key) |
| 97 | tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}}) |
| 98 | } |
| 99 | return err |
| 100 | } |
| 101 | |
| 102 | // Commit implements the store.Transaction interface. |
| 103 | func (tx *transaction) Commit() error { |
| 104 | tx.mu.Lock() |
| 105 | defer tx.mu.Unlock() |
| 106 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame^] | 107 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 108 | } |
| 109 | tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn) |
| 110 | tx.st.mu.Lock() |
| 111 | defer tx.st.mu.Unlock() |
| 112 | // Check sequence numbers. |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 113 | if uint64(len(tx.ops)) > math.MaxUint16 { |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 114 | return verror.New(verror.ErrInternal, nil, "too many ops") |
| 115 | } |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 116 | if tx.st.seq == math.MaxUint64 { |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 117 | return verror.New(verror.ErrInternal, nil, "seq maxed out") |
| 118 | } |
| 119 | // Write LogEntry records. |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 120 | // Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff. |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 121 | // TODO(sadovsky): Use a more space-efficient lexicographic number encoding. |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 122 | keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", tx.st.seq)) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 123 | for txSeq, op := range tx.ops { |
Adam Sadovsky | b2a6569 | 2015-05-29 21:57:40 -0700 | [diff] [blame] | 124 | key := join(keyPrefix, fmt.Sprintf("%04x", txSeq)) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 125 | value := &LogEntry{ |
| 126 | Op: op, |
| 127 | // TODO(sadovsky): This information is also captured in LogEntry keys. |
| 128 | // Optimize to avoid redundancy. |
| 129 | Continued: txSeq < len(tx.ops)-1, |
| 130 | } |
| 131 | if err := util.PutObject(tx.itx, key, value); err != nil { |
| 132 | return err |
| 133 | } |
| 134 | } |
| 135 | if err := tx.itx.Commit(); err != nil { |
| 136 | return err |
| 137 | } |
| 138 | tx.st.seq++ |
| 139 | return nil |
| 140 | } |
| 141 | |
| 142 | // Abort implements the store.Transaction interface. |
| 143 | func (tx *transaction) Abort() error { |
| 144 | tx.mu.Lock() |
| 145 | defer tx.mu.Unlock() |
| 146 | if tx.err != nil { |
Adam Sadovsky | a3fc33c | 2015-06-02 18:44:46 -0700 | [diff] [blame^] | 147 | return convertError(tx.err) |
Adam Sadovsky | 8db7443 | 2015-05-29 17:37:32 -0700 | [diff] [blame] | 148 | } |
| 149 | tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn) |
| 150 | return tx.itx.Abort() |
| 151 | } |