blob: b4556c34395f019f579304e07e221b8e4ed034a3 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package watchable
6
7import (
8 "fmt"
Adam Sadovskyb2a65692015-05-29 21:57:40 -07009 "math"
Adam Sadovsky8db74432015-05-29 17:37:32 -070010 "sync"
Raja Daoud52851362015-09-14 15:50:40 -070011 "time"
Adam Sadovsky8db74432015-05-29 17:37:32 -070012
Raja Daoudd4543072015-06-30 11:15:55 -070013 "v.io/v23/context"
Adam Sadovsky8db74432015-05-29 17:37:32 -070014 "v.io/v23/verror"
Adam Sadovskyf2efeb52015-08-31 14:17:49 -070015 "v.io/x/ref/services/syncbase/server/util"
16 "v.io/x/ref/services/syncbase/store"
Adam Sadovsky8db74432015-05-29 17:37:32 -070017)
18
19type transaction struct {
20 itx store.Transaction
21 st *wstore
22 mu sync.Mutex // protects the fields below
23 err error
24 ops []Op
Raja Daoudd4543072015-06-30 11:15:55 -070025 // fromSync is true when a transaction is created by sync. This causes
26 // the log entries written at commit time to have their "FromSync" field
27 // set to true. That in turn causes the sync watcher to filter out such
28 // updates since sync already knows about them (echo suppression).
29 fromSync bool
Adam Sadovsky8db74432015-05-29 17:37:32 -070030}
31
32var _ store.Transaction = (*transaction)(nil)
33
// cp returns a freshly allocated copy of src. The returned slice shares no
// storage with src, and is a non-nil slice of length zero when src is nil or
// empty.
func cp(src []byte) []byte {
	dst := make([]byte, len(src))
	// The builtin copy replaces the former element-by-element loop.
	copy(dst, src)
	return dst
}
41
// cpStrings returns a freshly allocated copy of src. The returned slice has
// its own backing array, and is a non-nil slice of length zero when src is
// nil or empty.
func cpStrings(src []string) []string {
	dst := make([]string, len(src))
	// The builtin copy replaces the former element-by-element loop.
	copy(dst, src)
	return dst
}
49
Adam Sadovsky8db74432015-05-29 17:37:32 -070050func newTransaction(st *wstore) *transaction {
51 return &transaction{
52 itx: st.ist.NewTransaction(),
53 st: st,
54 }
55}
56
57// Get implements the store.StoreReader interface.
58func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
59 tx.mu.Lock()
60 defer tx.mu.Unlock()
61 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070062 return valbuf, convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -070063 }
64 var err error
65 if !tx.st.managesKey(key) {
66 valbuf, err = tx.itx.Get(key, valbuf)
67 } else {
68 valbuf, err = getVersioned(tx.itx, key, valbuf)
Adam Sadovsky4dcc3532015-07-31 13:54:19 -070069 tx.ops = append(tx.ops, &OpGet{GetOp{Key: cp(key)}})
Adam Sadovsky8db74432015-05-29 17:37:32 -070070 }
71 return valbuf, err
72}
73
74// Scan implements the store.StoreReader interface.
75func (tx *transaction) Scan(start, limit []byte) store.Stream {
76 tx.mu.Lock()
77 defer tx.mu.Unlock()
78 if tx.err != nil {
Jiri Simsad88e9ad2015-08-14 10:12:27 -070079 return &store.InvalidStream{Error: tx.err}
Adam Sadovsky8db74432015-05-29 17:37:32 -070080 }
81 var it store.Stream
82 if !tx.st.managesRange(start, limit) {
83 it = tx.itx.Scan(start, limit)
84 } else {
85 it = newStreamVersioned(tx.itx, start, limit)
Adam Sadovsky4dcc3532015-07-31 13:54:19 -070086 tx.ops = append(tx.ops, &OpScan{ScanOp{Start: cp(start), Limit: cp(limit)}})
Adam Sadovsky8db74432015-05-29 17:37:32 -070087 }
88 return it
89}
90
91// Put implements the store.StoreWriter interface.
92func (tx *transaction) Put(key, value []byte) error {
93 tx.mu.Lock()
94 defer tx.mu.Unlock()
95 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070096 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -070097 }
Adam Sadovsky8db74432015-05-29 17:37:32 -070098 if !tx.st.managesKey(key) {
Raja Daoudcb50b5d2015-06-26 18:37:24 -070099 return tx.itx.Put(key, value)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700100 }
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700101 version, err := putVersioned(tx.itx, key, value)
102 if err != nil {
103 return err
104 }
Adam Sadovsky4dcc3532015-07-31 13:54:19 -0700105 tx.ops = append(tx.ops, &OpPut{PutOp{Key: cp(key), Version: version}})
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700106 return nil
Adam Sadovsky8db74432015-05-29 17:37:32 -0700107}
108
109// Delete implements the store.StoreWriter interface.
110func (tx *transaction) Delete(key []byte) error {
111 tx.mu.Lock()
112 defer tx.mu.Unlock()
113 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -0700114 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700115 }
116 var err error
117 if !tx.st.managesKey(key) {
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700118 return tx.itx.Delete(key)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700119 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700120 err = deleteVersioned(tx.itx, key)
121 if err != nil {
122 return err
123 }
Adam Sadovsky4dcc3532015-07-31 13:54:19 -0700124 tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: cp(key)}})
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700125 return nil
Adam Sadovsky8db74432015-05-29 17:37:32 -0700126}
127
128// Commit implements the store.Transaction interface.
129func (tx *transaction) Commit() error {
130 tx.mu.Lock()
131 defer tx.mu.Unlock()
132 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -0700133 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700134 }
135 tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
136 tx.st.mu.Lock()
137 defer tx.st.mu.Unlock()
Raja Daoudd4543072015-06-30 11:15:55 -0700138 // Check if there is enough space left in the sequence number.
139 if (math.MaxUint64 - tx.st.seq) < uint64(len(tx.ops)) {
Adam Sadovsky8db74432015-05-29 17:37:32 -0700140 return verror.New(verror.ErrInternal, nil, "seq maxed out")
141 }
142 // Write LogEntry records.
Jatin Lodhia456b81f2015-10-05 17:21:28 -0700143 timestamp := tx.st.clock.Now().UnixNano()
Raja Daoudd4543072015-06-30 11:15:55 -0700144 seq := tx.st.seq
145 for i, op := range tx.ops {
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700146 key := logEntryKey(seq)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700147 value := &LogEntry{
Jatin Lodhia45a1fa82015-06-18 11:51:04 -0700148 Op: op,
149 CommitTimestamp: timestamp,
Raja Daoudd4543072015-06-30 11:15:55 -0700150 FromSync: tx.fromSync,
151 Continued: i < len(tx.ops)-1,
Adam Sadovsky8db74432015-05-29 17:37:32 -0700152 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700153 if err := util.Put(nil, tx.itx, key, value); err != nil {
Adam Sadovsky8db74432015-05-29 17:37:32 -0700154 return err
155 }
Raja Daoudd4543072015-06-30 11:15:55 -0700156 seq++
Adam Sadovsky8db74432015-05-29 17:37:32 -0700157 }
158 if err := tx.itx.Commit(); err != nil {
159 return err
160 }
Raja Daoudd4543072015-06-30 11:15:55 -0700161 tx.st.seq = seq
Sergey Rogulenko8bf641c2015-08-14 17:00:09 -0700162 tx.st.watcher.broadcastUpdates()
Adam Sadovsky8db74432015-05-29 17:37:32 -0700163 return nil
164}
165
166// Abort implements the store.Transaction interface.
167func (tx *transaction) Abort() error {
168 tx.mu.Lock()
169 defer tx.mu.Unlock()
170 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -0700171 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700172 }
173 tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
174 return tx.itx.Abort()
175}
Jatin Lodhia45a1fa82015-06-18 11:51:04 -0700176
Raja Daoud52851362015-09-14 15:50:40 -0700177// GetStoreTime returns the current time from the given transaction store.
178func GetStoreTime(ctx *context.T, tx store.Transaction) time.Time {
179 wtx := tx.(*transaction)
Jatin Lodhia456b81f2015-10-05 17:21:28 -0700180 return wtx.st.clock.Now()
Raja Daoud52851362015-09-14 15:50:40 -0700181}
182
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700183// AddSyncGroupOp injects a SyncGroup operation notification in the log entries
184// that the transaction writes when it is committed. It allows the SyncGroup
185// operations (create, join, leave, destroy) to notify the sync watcher of the
186// change at its proper position in the timeline (the transaction commit).
187// Note: this is an internal function used by sync, not part of the interface.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700188func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700189 wtx := tx.(*transaction)
190 wtx.mu.Lock()
191 defer wtx.mu.Unlock()
192 if wtx.err != nil {
193 return convertError(wtx.err)
194 }
Adam Sadovsky4dcc3532015-07-31 13:54:19 -0700195 // Make a defensive copy of prefixes slice.
196 wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: cpStrings(prefixes), Remove: remove}})
Raja Daoudcb50b5d2015-06-26 18:37:24 -0700197 return nil
198}
199
Raja Daoudd4543072015-06-30 11:15:55 -0700200// AddSyncSnapshotOp injects a sync snapshot operation notification in the log
201// entries that the transaction writes when it is committed. It allows the
202// SyncGroup create or join operations to notify the sync watcher of the
203// current keys and their versions to use when initializing the sync metadata
204// at the point in the timeline when these keys become syncable (at commit).
205// Note: this is an internal function used by sync, not part of the interface.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700206func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
Raja Daoudd4543072015-06-30 11:15:55 -0700207 wtx := tx.(*transaction)
208 wtx.mu.Lock()
209 defer wtx.mu.Unlock()
210 if wtx.err != nil {
211 return convertError(wtx.err)
212 }
213 if !wtx.st.managesKey(key) {
Adam Sadovsky4dcc3532015-07-31 13:54:19 -0700214 return verror.New(verror.ErrInternal, ctx, fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
Raja Daoudd4543072015-06-30 11:15:55 -0700215 }
Adam Sadovsky4dcc3532015-07-31 13:54:19 -0700216 wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: cp(key), Version: cp(version)}})
Raja Daoudd4543072015-06-30 11:15:55 -0700217 return nil
218}
219
220// SetTransactionFromSync marks this transaction as created by sync as opposed
221// to one created by an application. The net effect is that, at commit time,
222// the log entries written are marked as made by sync. This allows the sync
223// Watcher to ignore them (echo suppression) because it made these updates.
224// Note: this is an internal function used by sync, not part of the interface.
225// TODO(rdaoud): support a generic echo-suppression mechanism for apps as well
226// maybe by having a creator ID in the transaction and log entries.
227// TODO(rdaoud): fold this flag (or creator ID) into Tx options when available.
228func SetTransactionFromSync(tx store.Transaction) {
229 wtx := tx.(*transaction)
230 wtx.mu.Lock()
231 defer wtx.mu.Unlock()
232 wtx.fromSync = true
233}
234
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700235// GetVersion returns the current version of a managed key. This method is used
236// by the Sync module when the initiator is attempting to add new versions of
237// objects. Reading the version key is used for optimistic concurrency
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700238// control. At minimum, an object implementing the Transaction interface is
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700239// required since this is a Get operation.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700240func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) {
241 switch w := tx.(type) {
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700242 case *transaction:
243 w.mu.Lock()
244 defer w.mu.Unlock()
245 if w.err != nil {
246 return nil, convertError(w.err)
247 }
248 return getVersion(w.itx, key)
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700249 }
250 return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
251}
252
253// GetAtVersion returns the value of a managed key at the requested
254// version. This method is used by the Sync module when the responder needs to
255// send objects over the wire. At minimum, an object implementing the
256// StoreReader interface is required since this is a Get operation.
257func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
258 switch w := st.(type) {
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700259 case *snapshot:
260 return getAtVersion(w.isn, key, valbuf, version)
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700261 case *transaction:
262 w.mu.Lock()
263 defer w.mu.Unlock()
264 if w.err != nil {
265 return valbuf, convertError(w.err)
266 }
267 return getAtVersion(w.itx, key, valbuf, version)
268 case *wstore:
269 return getAtVersion(w.ist, key, valbuf, version)
270 }
271 return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
272}
273
274// PutAtVersion puts a value for the managed key at the requested version. This
275// method is used by the Sync module exclusively when the initiator adds objects
276// with versions created on other Syncbases. At minimum, an object implementing
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700277// the Transaction interface is required since this is a Put operation.
278func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700279 wtx := tx.(*transaction)
280
281 wtx.mu.Lock()
282 defer wtx.mu.Unlock()
283 if wtx.err != nil {
284 return convertError(wtx.err)
285 }
286
287 // Note that we do not enqueue a PutOp in the log since this Put is not
288 // updating the current version of a key.
289 return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
290}
291
292// PutVersion updates the version of a managed key to the requested
293// version. This method is used by the Sync module exclusively when the
294// initiator selects which of the already stored versions (via PutAtVersion
295// calls) becomes the current version. At minimum, an object implementing
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700296// the Transaction interface is required since this is a Put operation.
297func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700298 wtx := tx.(*transaction)
299
300 wtx.mu.Lock()
301 defer wtx.mu.Unlock()
302 if wtx.err != nil {
303 return convertError(wtx.err)
304 }
305
306 if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
307 return err
308 }
Adam Sadovsky4dcc3532015-07-31 13:54:19 -0700309 wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: cp(key), Version: cp(version)}})
Himabindu Pucha0ff9b0c2015-07-14 11:40:45 -0700310 return nil
311}