blob: 1c81abd07cb6fe74cc12f5e0661be1d2d3090d0b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
Sergey Rogulenko054b4db2015-08-20 11:06:52 -07005package transactions
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -07006
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -07007import (
Raja Daouda3e42302015-06-10 18:49:02 -07008 "bytes"
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -07009 "container/list"
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070010 "sync"
11
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070012 "v.io/v23/verror"
Adam Sadovskyb5f88922015-07-26 17:41:37 -070013 "v.io/x/lib/vlog"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070014 "v.io/x/ref/services/syncbase/store"
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070015)
16
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070017// transaction is a wrapper on top of a BatchWriter and a store.Snapshot that
18// implements the store.Transaction interface.
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070019type transaction struct {
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070020 // mu protects the state of the transaction.
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070021 mu sync.Mutex
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070022 mg *manager
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070023 seq uint64
Sergey Rogulenko7f77da02015-09-25 18:28:51 -070024 event *list.Element // pointer to element of mg.events
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070025 snapshot store.Snapshot
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070026 reads readSet
27 writes []WriteOp
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070028 err error
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070029}
30
31var _ store.Transaction = (*transaction)(nil)
32
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070033func newTransaction(mg *manager) *transaction {
Sergey Rogulenkodef3b302015-05-20 17:33:24 -070034 tx := &transaction{
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070035 mg: mg,
36 snapshot: mg.BatchStore.NewSnapshot(),
37 seq: mg.seq,
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070038 }
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070039 tx.event = mg.events.PushFront(tx)
Sergey Rogulenkodef3b302015-05-20 17:33:24 -070040 return tx
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070041}
42
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070043// removeEvent removes this transaction from the mg.events queue.
Sergey Rogulenko7f77da02015-09-25 18:28:51 -070044// Assumes mu and mg.mu are held.
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070045func (tx *transaction) removeEvent() {
Sergey Rogulenko7f77da02015-09-25 18:28:51 -070046 if tx.event != nil {
47 tx.mg.events.Remove(tx.event)
48 tx.event = nil
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070049 }
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070050}
51
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070052// Get implements the store.StoreReader interface.
Sergey Rogulenko802fe1e2015-05-08 12:51:22 -070053func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070054 tx.mu.Lock()
55 defer tx.mu.Unlock()
56 if tx.err != nil {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070057 return valbuf, store.ConvertError(tx.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070058 }
John Kline18834bd2015-06-26 10:07:46 -070059 tx.reads.Keys = append(tx.reads.Keys, key)
Raja Daouda3e42302015-06-10 18:49:02 -070060
61 // Reflect the state of the transaction: the "writes" (puts and
62 // deletes) override the values in the transaction snapshot.
63 // Find the last "writes" entry for this key, if one exists.
64 // Note: this step could be optimized by using maps (puts and
65 // deletes) instead of an array.
66 for i := len(tx.writes) - 1; i >= 0; i-- {
67 op := &tx.writes[i]
John Kline18834bd2015-06-26 10:07:46 -070068 if bytes.Equal(op.Key, key) {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070069 if op.T == PutOp {
John Kline18834bd2015-06-26 10:07:46 -070070 return op.Value, nil
Raja Daouda3e42302015-06-10 18:49:02 -070071 }
72 return valbuf, verror.New(store.ErrUnknownKey, nil, string(key))
73 }
74 }
75
Sergey Rogulenko802fe1e2015-05-08 12:51:22 -070076 return tx.snapshot.Get(key, valbuf)
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070077}
78
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070079// Scan implements the store.StoreReader interface.
Adam Sadovskyf437f332015-05-19 23:03:22 -070080func (tx *transaction) Scan(start, limit []byte) store.Stream {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070081 tx.mu.Lock()
82 defer tx.mu.Unlock()
83 if tx.err != nil {
Jiri Simsad88e9ad2015-08-14 10:12:27 -070084 return &store.InvalidStream{Error: tx.err}
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070085 }
John Kline18834bd2015-06-26 10:07:46 -070086
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070087 tx.reads.Ranges = append(tx.reads.Ranges, scanRange{
John Kline18834bd2015-06-26 10:07:46 -070088 Start: start,
89 Limit: limit,
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -070090 })
John Kline18834bd2015-06-26 10:07:46 -070091
92 // Return a stream which merges the snaphot stream with the uncommitted changes.
Sergey Rogulenko054b4db2015-08-20 11:06:52 -070093 return mergeWritesWithStream(tx.snapshot, tx.writes, start, limit)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070094}
95
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -070096// Put implements the store.StoreWriter interface.
Sergey Rogulenko802fe1e2015-05-08 12:51:22 -070097func (tx *transaction) Put(key, value []byte) error {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070098 tx.mu.Lock()
99 defer tx.mu.Unlock()
100 if tx.err != nil {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700101 return store.ConvertError(tx.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700102 }
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700103 tx.writes = append(tx.writes, WriteOp{
104 T: PutOp,
John Kline18834bd2015-06-26 10:07:46 -0700105 Key: key,
106 Value: value,
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700107 })
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700108 return nil
109}
110
111// Delete implements the store.StoreWriter interface.
Sergey Rogulenko802fe1e2015-05-08 12:51:22 -0700112func (tx *transaction) Delete(key []byte) error {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700113 tx.mu.Lock()
114 defer tx.mu.Unlock()
115 if tx.err != nil {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700116 return store.ConvertError(tx.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700117 }
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700118 tx.writes = append(tx.writes, WriteOp{
119 T: DeleteOp,
John Kline18834bd2015-06-26 10:07:46 -0700120 Key: key,
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700121 })
Sergey Rogulenkob0081cf2015-05-05 22:39:37 -0700122 return nil
123}
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700124
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700125// validateReadSet returns true iff the read set of this transaction has not
126// been invalidated by other transactions.
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700127// Assumes tx.mg.mu is held.
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700128func (tx *transaction) validateReadSet() bool {
John Kline18834bd2015-06-26 10:07:46 -0700129 for _, key := range tx.reads.Keys {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700130 if tx.mg.txTable.get(key) > tx.seq {
Adam Sadovskyb5f88922015-07-26 17:41:37 -0700131 vlog.VI(3).Infof("key conflict: %q", key)
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700132 return false
133 }
134 }
John Kline18834bd2015-06-26 10:07:46 -0700135 for _, r := range tx.reads.Ranges {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700136 if tx.mg.txTable.rangeMax(r.Start, r.Limit) > tx.seq {
Adam Sadovskyb5f88922015-07-26 17:41:37 -0700137 vlog.VI(3).Infof("range conflict: {%q, %q}", r.Start, r.Limit)
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700138 return false
139 }
140
141 }
142 return true
143}
144
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700145// Commit implements the store.Transaction interface.
146func (tx *transaction) Commit() error {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700147 tx.mu.Lock()
148 defer tx.mu.Unlock()
149 if tx.err != nil {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700150 return store.ConvertError(tx.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700151 }
Adam Sadovsky8db74432015-05-29 17:37:32 -0700152 tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
Sergey Rogulenko7f77da02015-09-25 18:28:51 -0700153 tx.snapshot.Abort()
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700154 tx.mg.mu.Lock()
155 defer tx.mg.mu.Unlock()
Sergey Rogulenko7f77da02015-09-25 18:28:51 -0700156 if tx.mg.txTable == nil {
157 return verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedStore)
158 }
159 // Explicitly remove this transaction from the event queue. If this was the
160 // only active transaction, the event queue becomes empty and trackBatch will
161 // not add this transaction's write set to txTable.
162 tx.removeEvent()
Sergey Rogulenko8a1ae3a2015-05-29 17:13:44 -0700163 if !tx.validateReadSet() {
164 return store.NewErrConcurrentTransaction(nil)
165 }
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700166 if err := tx.mg.BatchStore.WriteBatch(tx.writes...); err != nil {
167 return err
168 }
169 tx.mg.trackBatch(tx.writes...)
170 return nil
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700171}
172
173// Abort implements the store.Transaction interface.
174func (tx *transaction) Abort() error {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700175 tx.mu.Lock()
176 defer tx.mu.Unlock()
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -0700177 if tx.err != nil {
Sergey Rogulenko054b4db2015-08-20 11:06:52 -0700178 return store.ConvertError(tx.err)
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -0700179 }
Adam Sadovsky8db74432015-05-29 17:37:32 -0700180 tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
Sergey Rogulenko7f77da02015-09-25 18:28:51 -0700181 tx.snapshot.Abort()
182 tx.mg.mu.Lock()
183 tx.removeEvent()
184 tx.mg.mu.Unlock()
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700185 return nil
186}