blob: ace48de86116e3609581edff8d00da82699ae790 [file] [log] [blame]
Adam Sadovsky8db74432015-05-29 17:37:32 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package watchable
6
7import (
8 "fmt"
Adam Sadovskyb2a65692015-05-29 21:57:40 -07009 "math"
Adam Sadovsky8db74432015-05-29 17:37:32 -070010 "sync"
11
12 "v.io/syncbase/x/ref/services/syncbase/server/util"
13 "v.io/syncbase/x/ref/services/syncbase/store"
14 "v.io/v23/verror"
15)
16
17type transaction struct {
18 itx store.Transaction
19 st *wstore
20 mu sync.Mutex // protects the fields below
21 err error
22 ops []Op
23}
24
25var _ store.Transaction = (*transaction)(nil)
26
27func newTransaction(st *wstore) *transaction {
28 return &transaction{
29 itx: st.ist.NewTransaction(),
30 st: st,
31 }
32}
33
34// Get implements the store.StoreReader interface.
35func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
36 tx.mu.Lock()
37 defer tx.mu.Unlock()
38 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070039 return valbuf, convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -070040 }
41 var err error
42 if !tx.st.managesKey(key) {
43 valbuf, err = tx.itx.Get(key, valbuf)
44 } else {
45 valbuf, err = getVersioned(tx.itx, key, valbuf)
46 tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
47 }
48 return valbuf, err
49}
50
51// Scan implements the store.StoreReader interface.
52func (tx *transaction) Scan(start, limit []byte) store.Stream {
53 tx.mu.Lock()
54 defer tx.mu.Unlock()
55 if tx.err != nil {
56 return &store.InvalidStream{tx.err}
57 }
58 var it store.Stream
59 if !tx.st.managesRange(start, limit) {
60 it = tx.itx.Scan(start, limit)
61 } else {
62 it = newStreamVersioned(tx.itx, start, limit)
63 tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
64 }
65 return it
66}
67
68// Put implements the store.StoreWriter interface.
69func (tx *transaction) Put(key, value []byte) error {
70 tx.mu.Lock()
71 defer tx.mu.Unlock()
72 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070073 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -070074 }
75 var err error
76 if !tx.st.managesKey(key) {
77 err = tx.itx.Put(key, value)
78 } else {
79 err = putVersioned(tx.itx, key, value)
80 tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
81 }
82 return err
83}
84
85// Delete implements the store.StoreWriter interface.
86func (tx *transaction) Delete(key []byte) error {
87 tx.mu.Lock()
88 defer tx.mu.Unlock()
89 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070090 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -070091 }
92 var err error
93 if !tx.st.managesKey(key) {
94 err = tx.itx.Delete(key)
95 } else {
96 err = deleteVersioned(tx.itx, key)
97 tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
98 }
99 return err
100}
101
102// Commit implements the store.Transaction interface.
103func (tx *transaction) Commit() error {
104 tx.mu.Lock()
105 defer tx.mu.Unlock()
106 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -0700107 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700108 }
109 tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
110 tx.st.mu.Lock()
111 defer tx.st.mu.Unlock()
112 // Check sequence numbers.
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700113 if uint64(len(tx.ops)) > math.MaxUint16 {
Adam Sadovsky8db74432015-05-29 17:37:32 -0700114 return verror.New(verror.ErrInternal, nil, "too many ops")
115 }
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700116 if tx.st.seq == math.MaxUint64 {
Adam Sadovsky8db74432015-05-29 17:37:32 -0700117 return verror.New(verror.ErrInternal, nil, "seq maxed out")
118 }
119 // Write LogEntry records.
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700120 // Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
Adam Sadovsky8db74432015-05-29 17:37:32 -0700121 // TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700122 keyPrefix := join(util.LogPrefix, fmt.Sprintf("%016x", tx.st.seq))
Adam Sadovsky8db74432015-05-29 17:37:32 -0700123 for txSeq, op := range tx.ops {
Adam Sadovskyb2a65692015-05-29 21:57:40 -0700124 key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
Adam Sadovsky8db74432015-05-29 17:37:32 -0700125 value := &LogEntry{
126 Op: op,
127 // TODO(sadovsky): This information is also captured in LogEntry keys.
128 // Optimize to avoid redundancy.
129 Continued: txSeq < len(tx.ops)-1,
130 }
131 if err := util.PutObject(tx.itx, key, value); err != nil {
132 return err
133 }
134 }
135 if err := tx.itx.Commit(); err != nil {
136 return err
137 }
138 tx.st.seq++
139 return nil
140}
141
142// Abort implements the store.Transaction interface.
143func (tx *transaction) Abort() error {
144 tx.mu.Lock()
145 defer tx.mu.Unlock()
146 if tx.err != nil {
Adam Sadovskya3fc33c2015-06-02 18:44:46 -0700147 return convertError(tx.err)
Adam Sadovsky8db74432015-05-29 17:37:32 -0700148 }
149 tx.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedTxn)
150 return tx.itx.Abort()
151}