blob: 81b51d0fb08c206b111f42e6f939a75bc32a74de [file] [log] [blame]
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package memstore
6
7import (
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -07008 "sync"
9 "time"
10
11 "v.io/syncbase/x/ref/services/syncbase/store"
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070012 "v.io/v23/verror"
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070013)
14
// txnTimeout is how long after its creation a transaction is considered
// expired (see transaction.expired).
const txnTimeout = 5 * time.Second
18
19type transaction struct {
Sergey Rogulenko95baa662015-05-22 15:07:06 -070020 mu sync.Mutex
21 node *store.ResourceNode
22 st *memstore
23 sn *snapshot
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070024 // The following fields are used to determine whether method calls should
25 // error out.
26 err error
27 seq uint64
28 createdTime time.Time
29 // The following fields track writes performed against this transaction.
30 puts map[string][]byte
31 deletes map[string]struct{}
32}
33
34var _ store.Transaction = (*transaction)(nil)
35
Sergey Rogulenko95baa662015-05-22 15:07:06 -070036func newTransaction(st *memstore, parent *store.ResourceNode, seq uint64) *transaction {
37 node := store.NewResourceNode()
38 sn := newSnapshot(st, node)
39 tx := &transaction{
40 node: node,
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070041 st: st,
Sergey Rogulenko95baa662015-05-22 15:07:06 -070042 sn: sn,
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070043 seq: seq,
44 createdTime: time.Now(),
45 puts: map[string][]byte{},
46 deletes: map[string]struct{}{},
47 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -070048 parent.AddChild(tx.node, func() {
49 tx.Abort()
50 })
51 return tx
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070052}
53
54func (tx *transaction) expired() bool {
55 return time.Now().After(tx.createdTime.Add(txnTimeout))
56}
57
58func (tx *transaction) error() error {
59 if tx.err != nil {
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070060 return store.WrapError(tx.err)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070061 }
62 if tx.expired() {
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070063 return verror.New(verror.ErrBadState, nil, "expired transaction")
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070064 }
65 return nil
66}
67
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070068// Get implements the store.StoreReader interface.
69func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
70 tx.mu.Lock()
71 defer tx.mu.Unlock()
72 if err := tx.error(); err != nil {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070073 return valbuf, err
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070074 }
75 return tx.sn.Get(key, valbuf)
76}
77
78// Scan implements the store.StoreReader interface.
Sergey Rogulenkodef3b302015-05-20 17:33:24 -070079func (tx *transaction) Scan(start, limit []byte) store.Stream {
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070080 tx.mu.Lock()
81 defer tx.mu.Unlock()
82 if err := tx.error(); err != nil {
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070083 return &store.InvalidStream{err}
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070084 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -070085 return tx.sn.Scan(start, limit)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070086}
87
88// Put implements the store.StoreWriter interface.
89func (tx *transaction) Put(key, value []byte) error {
90 tx.st.mu.Lock()
91 defer tx.st.mu.Unlock()
92 if err := tx.error(); err != nil {
93 return err
94 }
95 delete(tx.deletes, string(key))
96 tx.puts[string(key)] = value
97 return nil
98}
99
100// Delete implements the store.StoreWriter interface.
101func (tx *transaction) Delete(key []byte) error {
102 tx.st.mu.Lock()
103 defer tx.st.mu.Unlock()
104 if err := tx.error(); err != nil {
105 return err
106 }
107 delete(tx.puts, string(key))
108 tx.deletes[string(key)] = struct{}{}
109 return nil
110}
111
112// Commit implements the store.Transaction interface.
113func (tx *transaction) Commit() error {
114 tx.mu.Lock()
115 defer tx.mu.Unlock()
116 if err := tx.error(); err != nil {
117 return err
118 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -0700119 tx.node.Close()
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700120 tx.st.mu.Lock()
121 defer tx.st.mu.Unlock() // note, defer is last-in-first-out
122 if tx.seq <= tx.st.lastCommitSeq {
123 // Once Commit() has failed with store.ErrConcurrentTransaction, subsequent
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -0700124 // ops on the transaction will fail with the following error.
125 tx.err = verror.New(verror.ErrBadState, nil, "already attempted to commit transaction")
126 return store.NewErrConcurrentTransaction(nil)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700127 }
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -0700128 tx.err = verror.New(verror.ErrBadState, nil, "committed transaction")
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700129 for k, v := range tx.puts {
130 tx.st.data[k] = v
131 }
132 for k := range tx.deletes {
133 delete(tx.st.data, k)
134 }
135 tx.st.lastCommitSeq = tx.st.lastSeq
136 return nil
137}
138
139// Abort implements the store.Transaction interface.
140func (tx *transaction) Abort() error {
141 tx.mu.Lock()
142 defer tx.mu.Unlock()
143 if err := tx.error(); err != nil {
144 return err
145 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -0700146 tx.node.Close()
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -0700147 tx.err = verror.New(verror.ErrCanceled, nil, "aborted transaction")
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700148 return nil
149}