blob: 2818a8c7214fe44d6495c91a791b718d59ee0e3b [file] [log] [blame]
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package nosql
6
7import (
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -07008 "math/rand"
Adam Sadovsky232c3662015-06-04 15:00:09 -07009 "path"
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070010 "strconv"
11 "strings"
12 "sync"
13 "time"
Adam Sadovsky232c3662015-06-04 15:00:09 -070014
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070015 wire "v.io/syncbase/v23/services/syncbase/nosql"
John Klinece0de2b2015-06-17 09:06:56 -070016 "v.io/syncbase/v23/syncbase/nosql/query_db"
17 "v.io/syncbase/v23/syncbase/nosql/query_exec"
Jatin Lodhiad70777b2015-08-19 16:59:47 -070018 "v.io/syncbase/x/ref/services/syncbase/clock"
Adam Sadovskybc00bd62015-05-22 12:50:03 -070019 "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070020 "v.io/syncbase/x/ref/services/syncbase/server/util"
Adam Sadovsky8db74432015-05-29 17:37:32 -070021 "v.io/syncbase/x/ref/services/syncbase/server/watchable"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070022 "v.io/syncbase/x/ref/services/syncbase/store"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070023 "v.io/v23/context"
Robin Thellend2ff665b2015-08-06 13:41:02 -070024 "v.io/v23/glob"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070025 "v.io/v23/rpc"
26 "v.io/v23/security/access"
John Klinece0de2b2015-06-17 09:06:56 -070027 "v.io/v23/vdl"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070028 "v.io/v23/verror"
John Klinece0de2b2015-06-17 09:06:56 -070029 "v.io/v23/vom"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070030 "v.io/x/lib/vlog"
31)
32
// database is a per-database singleton (i.e. not per-request). It does not
// directly handle RPCs.
// Note: If a database does not exist at the time of a database RPC, the
// dispatcher creates a short-lived database object to service that particular
// request.
type database struct {
	// name is the database name; it is what Name() returns and what is used in
	// "does not exist" / "already exists" errors.
	name string
	// a is the app handle supplied to OpenDatabase; database-level create,
	// delete and set-permissions requests are forwarded to it.
	a interfaces.App
	// The fields below are initialized iff this database exists.
	exists bool
	// TODO(sadovsky): Make st point to a store.Store wrapper that handles paging,
	// and do not actually open the store in NewDatabase.
	st store.Store // stores all data for a single database

	// Active snapshots and transactions corresponding to client batches.
	// Both maps are keyed by the random batch id generated in BeginBatch.
	// TODO(sadovsky): Add timeouts and GC.
	mu  sync.Mutex // protects the fields below
	sns map[uint64]store.Snapshot
	txs map[uint64]store.Transaction
}
53
// databaseReq is a per-request object that handles Database RPCs.
// It embeds database and tracks request-specific batch state.
type databaseReq struct {
	*database
	// batchId is nil when the request is not bound to a batch. If batchId is
	// non-nil, sn or tx will be non-nil: the methods in this file treat a nil tx
	// as "read-only batch" and fall back to sn in that case.
	batchId *uint64
	sn      store.Snapshot
	tx      store.Transaction
}
63
// Compile-time assertions: *databaseReq must satisfy the generated RPC server
// interface and *database must satisfy interfaces.Database.
var (
	_ wire.DatabaseServerMethods = (*databaseReq)(nil)
	_ interfaces.Database        = (*database)(nil)
)
68
// DatabaseOptions configures a database.
type DatabaseOptions struct {
	// Database-level permissions. NewDatabase rejects a nil value.
	Perms access.Permissions
	// Root dir for data storage. OpenDatabase opens the store at
	// path.Join(RootDir, Engine).
	RootDir string
	// Storage engine to use. Passed through to util.OpenStore.
	Engine string
}
78
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070079// OpenDatabase opens a database and returns a *database for it. Designed for
80// use from within NewDatabase and server.NewService.
81func OpenDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions, openOpts util.OpenOptions) (*database, error) {
82 st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine), openOpts)
Adam Sadovsky232c3662015-06-04 15:00:09 -070083 if err != nil {
84 return nil, err
85 }
Jatin Lodhiad70777b2015-08-19 16:59:47 -070086 vclock := clock.NewVClock(a.Service().St())
87 st, err = watchable.Wrap(st, vclock, &watchable.Options{
Raja Daoudd4543072015-06-30 11:15:55 -070088 ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
Adam Sadovsky8db74432015-05-29 17:37:32 -070089 })
Adam Sadovsky63fafbb2015-05-26 14:22:31 -070090 if err != nil {
91 return nil, err
92 }
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070093 return &database{
Adam Sadovskyd6b9a232015-06-16 14:04:45 -070094 name: name,
95 a: a,
96 exists: true,
97 st: st,
98 sns: make(map[uint64]store.Snapshot),
99 txs: make(map[uint64]store.Transaction),
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700100 }, nil
101}
102
103// NewDatabase creates a new database instance and returns it.
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700104// Designed for use from within App.CreateNoSQLDatabase.
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700105func NewDatabase(ctx *context.T, a interfaces.App, name string, metadata *wire.SchemaMetadata, opts DatabaseOptions) (*database, error) {
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700106 if opts.Perms == nil {
107 return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
108 }
109 d, err := OpenDatabase(ctx, a, name, opts, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
110 if err != nil {
111 return nil, err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700112 }
113 data := &databaseData{
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700114 Name: d.name,
115 Perms: opts.Perms,
116 SchemaMetadata: metadata,
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700117 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700118 if err := util.Put(ctx, d.st, d.stKey(), data); err != nil {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700119 return nil, err
120 }
121 return d, nil
122}
123
124////////////////////////////////////////
125// RPC methods
126
Jatin Lodhia28417552015-07-28 14:17:56 -0700127func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, metadata *wire.SchemaMetadata, perms access.Permissions) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700128 if d.exists {
129 return verror.New(verror.ErrExist, ctx, d.name)
130 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700131 if d.batchId != nil {
132 return wire.NewErrBoundToBatch(ctx)
133 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700134 // This database does not yet exist; d is just an ephemeral handle that holds
135 // {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
136 // handle and store it in d.a.dbs[d.name].
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700137 return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms, metadata)
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700138}
139
Jatin Lodhia28417552015-07-28 14:17:56 -0700140func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700141 if d.batchId != nil {
142 return wire.NewErrBoundToBatch(ctx)
143 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700144 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
145 return err
146 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700147 return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
148}
149
Jatin Lodhia28417552015-07-28 14:17:56 -0700150func (d *databaseReq) Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error) {
Ivan Pilat8e4a4ab2015-07-14 12:14:52 -0700151 if !d.exists {
152 return false, nil
153 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700154 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
155 return false, err
156 }
Ivan Pilat8e4a4ab2015-07-14 12:14:52 -0700157 return util.ErrorToExists(util.GetWithAuth(ctx, call, d.st, d.stKey(), &databaseData{}))
158}
159
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700160var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
161
Jatin Lodhia28417552015-07-28 14:17:56 -0700162func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, schemaVersion int32, bo wire.BatchOptions) (string, error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700163 if !d.exists {
164 return "", verror.New(verror.ErrNoExist, ctx, d.name)
165 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700166 if d.batchId != nil {
167 return "", wire.NewErrBoundToBatch(ctx)
168 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700169 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
170 return "", err
171 }
172
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700173 d.mu.Lock()
174 defer d.mu.Unlock()
175 var id uint64
176 var batchType string
177 for {
178 id = uint64(rng.Int63())
179 if bo.ReadOnly {
180 if _, ok := d.sns[id]; !ok {
181 d.sns[id] = d.st.NewSnapshot()
182 batchType = "sn"
183 break
184 }
185 } else {
186 if _, ok := d.txs[id]; !ok {
187 d.txs[id] = d.st.NewTransaction()
188 batchType = "tx"
189 break
190 }
191 }
192 }
193 return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700194}
195
Jatin Lodhia28417552015-07-28 14:17:56 -0700196func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700197 if !d.exists {
198 return verror.New(verror.ErrNoExist, ctx, d.name)
199 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700200 if d.batchId == nil {
201 return wire.NewErrNotBoundToBatch(ctx)
202 }
203 if d.tx == nil {
204 return wire.NewErrReadOnlyBatch(ctx)
205 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700206 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
207 return err
208 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700209 var err error
210 if err = d.tx.Commit(); err == nil {
211 d.mu.Lock()
212 delete(d.txs, *d.batchId)
213 d.mu.Unlock()
214 }
Adam Sadovskyb5f88922015-07-26 17:41:37 -0700215 if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
216 return verror.New(wire.ErrConcurrentBatch, ctx, err)
217 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700218 return err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700219}
220
Jatin Lodhia28417552015-07-28 14:17:56 -0700221func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700222 if !d.exists {
223 return verror.New(verror.ErrNoExist, ctx, d.name)
224 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700225 if d.batchId == nil {
226 return wire.NewErrNotBoundToBatch(ctx)
227 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700228 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
229 return err
230 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700231 var err error
232 if d.tx != nil {
233 if err = d.tx.Abort(); err == nil {
234 d.mu.Lock()
235 delete(d.txs, *d.batchId)
236 d.mu.Unlock()
237 }
238 } else {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700239 if err = d.sn.Abort(); err == nil {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700240 d.mu.Lock()
241 delete(d.sns, *d.batchId)
242 d.mu.Unlock()
243 }
244 }
245 return err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700246}
247
Jatin Lodhia28417552015-07-28 14:17:56 -0700248func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
249 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
250 return err
251 }
John Klinece0de2b2015-06-17 09:06:56 -0700252 impl := func(headers []string, rs ResultStream, err error) error {
253 if err != nil {
254 return err
255 }
256 sender := call.SendStream()
257 // Push the headers first -- the client will retrieve them and return
258 // them separately from the results.
259 var resultHeaders []*vdl.Value
260 for _, header := range headers {
261 resultHeaders = append(resultHeaders, vdl.ValueOf(header))
262 }
263 sender.Send(resultHeaders)
264 for rs.Advance() {
265 result := rs.Result()
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700266 if err := sender.Send(result); err != nil {
267 rs.Cancel()
268 return err
269 }
John Klinece0de2b2015-06-17 09:06:56 -0700270 }
271 return rs.Err()
272 }
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700273 var sntx store.SnapshotOrTransaction
John Klinece0de2b2015-06-17 09:06:56 -0700274 if d.batchId != nil {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700275 sntx = d.batchReader()
John Klinece0de2b2015-06-17 09:06:56 -0700276 } else {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700277 sntx = d.st.NewSnapshot()
278 defer sntx.Abort()
John Klinece0de2b2015-06-17 09:06:56 -0700279 }
280 // queryDb implements query_db.Database
281 // which is needed by the query package's
282 // Exec function.
283 db := &queryDb{
284 ctx: ctx,
285 call: call,
286 req: d,
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700287 sntx: sntx,
John Klinece0de2b2015-06-17 09:06:56 -0700288 }
289
290 return impl(query_exec.Exec(db, q))
291}
292
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700293func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700294 if !d.exists {
295 return verror.New(verror.ErrNoExist, ctx, d.name)
296 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700297 if d.batchId != nil {
298 return wire.NewErrBoundToBatch(ctx)
299 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700300 return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
301}
302
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700303func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700304 if !d.exists {
305 return nil, "", verror.New(verror.ErrNoExist, ctx, d.name)
306 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700307 if d.batchId != nil {
308 return nil, "", wire.NewErrBoundToBatch(ctx)
309 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700310 data := &databaseData{}
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700311 if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), data); err != nil {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700312 return nil, "", err
313 }
314 return data.Perms, util.FormatVersion(data.Version), nil
315}
316
Robin Thellend2ff665b2015-08-06 13:41:02 -0700317func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700318 if !d.exists {
Robin Thellend2ff665b2015-08-06 13:41:02 -0700319 return verror.New(verror.ErrNoExist, ctx, d.name)
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700320 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700321 if d.batchId != nil {
Robin Thellend2ff665b2015-08-06 13:41:02 -0700322 return wire.NewErrBoundToBatch(ctx)
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700323 }
Adam Sadovsky49261192015-05-19 17:39:59 -0700324 // Check perms.
325 sn := d.st.NewSnapshot()
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700326 if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700327 sn.Abort()
Robin Thellend2ff665b2015-08-06 13:41:02 -0700328 return err
Adam Sadovsky49261192015-05-19 17:39:59 -0700329 }
Robin Thellend2ff665b2015-08-06 13:41:02 -0700330 return util.Glob(ctx, call, matcher, sn, sn.Abort, util.TablePrefix)
Adam Sadovsky49261192015-05-19 17:39:59 -0700331}
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700332
333////////////////////////////////////////
John Klinece0de2b2015-06-17 09:06:56 -0700334// ResultStream interface
335
// ResultStream is an interface for iterating through the results (a.k.a. rows)
// returned from a query. Each resulting row is an array of vdl values.
type ResultStream interface {
	// Advance stages an element so the client can retrieve it with Result.
	// Advance returns true iff there is a result to retrieve. The client must
	// call Advance before calling Result. The client must call Cancel if it
	// does not iterate through all elements (i.e. until Advance returns false).
	// Advance may block if an element is not immediately available.
	Advance() bool

	// Result returns the row (i.e., array of vdl Values) that was staged by Advance.
	// Result may panic if Advance returned false or was not called at all.
	// Result does not block.
	Result() []*vdl.Value

	// Err returns a non-nil error iff the stream encountered any errors. Err does
	// not block.
	Err() error

	// Cancel notifies the ResultStream provider that it can stop producing results.
	// The client must call Cancel if it does not iterate through all results
	// (i.e. until Advance returns false). Cancel is idempotent and can be called
	// concurrently with a goroutine that is iterating via Advance/Result.
	// Cancel causes Advance to subsequently return false. Cancel does not block.
	Cancel()
}
362
363////////////////////////////////////////
Adam Sadovskybc00bd62015-05-22 12:50:03 -0700364// interfaces.Database methods
365
366func (d *database) St() store.Store {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700367 if !d.exists {
Adam Sadovskybc00bd62015-05-22 12:50:03 -0700368 vlog.Fatalf("database %q does not exist", d.name)
369 }
370 return d.st
371}
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700372
// App returns the interfaces.App handle that was supplied when this database
// object was constructed.
func (d *database) App() interfaces.App {
	return d.a
}
376
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700377func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700378 if !d.exists {
379 vlog.Fatalf("database %q does not exist", d.name)
380 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700381 return util.GetWithAuth(ctx, call, st, d.stKey(), &databaseData{})
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700382}
383
384func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700385 if !d.exists {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700386 vlog.Fatalf("database %q does not exist", d.name)
387 }
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700388 return store.RunInTransaction(d.st, func(tx store.Transaction) error {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700389 data := &databaseData{}
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700390 return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700391 if err := util.CheckVersion(ctx, version, data.Version); err != nil {
392 return err
393 }
394 data.Perms = perms
395 data.Version++
396 return nil
397 })
398 })
399}
400
// Name returns the name this database object was constructed with.
func (d *database) Name() string {
	return d.name
}
404
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700405////////////////////////////////////////
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700406// query_db implementation
John Klinece0de2b2015-06-17 09:06:56 -0700407
// Implement query_db's Database, Table and KeyValueStream interfaces.
//
// queryDb is the Database side: it is built once per Exec call and carries
// everything the query engine needs to read on behalf of that call.
type queryDb struct {
	ctx  *context.T                  // context of the Exec request
	call wire.DatabaseExecServerCall // the Exec call; used for permission checks
	req  *databaseReq                // the request object that is running Exec
	sntx store.SnapshotOrTransaction // reader that all table reads go through
}
415
// GetContext returns the context of the Exec request this queryDb was built
// for.
func (db *queryDb) GetContext() *context.T {
	return db.ctx
}
419
420func (db *queryDb) GetTable(name string) (query_db.Table, error) {
421 tDb := &tableDb{
422 qdb: db,
423 req: &tableReq{
424 name: name,
425 d: db.req,
426 },
427 }
428 // Now that we have a table, we need to check permissions.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700429 if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &tableData{}); err != nil {
John Klinece0de2b2015-06-17 09:06:56 -0700430 return nil, err
431 }
432 return tDb, nil
433}
434
// tableDb is the value queryDb.GetTable returns as a query_db.Table. It pairs
// the owning queryDb (whose reader is used for scans) with a tableReq naming
// the table.
type tableDb struct {
	qdb *queryDb
	req *tableReq
}
439
John Kline96b004f2015-06-19 15:20:00 -0700440func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
John Kline18834bd2015-06-26 10:07:46 -0700441 streams := []store.Stream{}
442 for _, keyRange := range keyRanges {
John Kline18834bd2015-06-26 10:07:46 -0700443 // TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
444 // Need a way to Scan multiple ranges at the same state of uncommitted changes.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700445 streams = append(streams, t.qdb.sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
John Kline18834bd2015-06-26 10:07:46 -0700446 }
John Klinece0de2b2015-06-17 09:06:56 -0700447 return &kvs{
John Kline8dbd2652015-07-07 09:27:36 -0700448 t: t,
449 curr: 0,
450 validRow: false,
451 it: streams,
452 err: nil,
John Klinece0de2b2015-06-17 09:06:56 -0700453 }, nil
454}
455
// kvs is the KeyValueStream returned by tableDb.Scan. It iterates over a list
// of store streams (one per scanned key range), exhausting each in turn.
type kvs struct {
	t         *tableDb
	curr      int            // index into it of the stream currently being read
	validRow  bool           // true iff currKey/currValue hold a staged row
	currKey   string         // last key part of the staged row's store key
	currValue *vdl.Value     // decoded value of the staged row
	it        []store.Stream // array of store.Streams
	err       error          // first error encountered; once set, Advance returns false
}
465
466func (s *kvs) Advance() bool {
467 if s.err != nil {
468 return false
469 }
John Kline8dbd2652015-07-07 09:27:36 -0700470 for s.curr < len(s.it) {
John Kline18834bd2015-06-26 10:07:46 -0700471 if s.it[s.curr].Advance() {
John Klinece0de2b2015-06-17 09:06:56 -0700472 // key
John Kline18834bd2015-06-26 10:07:46 -0700473 keyBytes := s.it[s.curr].Key(nil)
John Klinece0de2b2015-06-17 09:06:56 -0700474 parts := util.SplitKeyParts(string(keyBytes))
Sergey Rogulenko1b92b4f2015-06-18 23:06:11 -0700475 // TODO(rogulenko): Check access for the key.
John Klinece0de2b2015-06-17 09:06:56 -0700476 s.currKey = parts[len(parts)-1]
477 // value
John Kline18834bd2015-06-26 10:07:46 -0700478 valueBytes := s.it[s.curr].Value(nil)
John Klinece0de2b2015-06-17 09:06:56 -0700479 var currValue *vdl.Value
480 if err := vom.Decode(valueBytes, &currValue); err != nil {
481 s.validRow = false
482 s.err = err
483 return false
484 }
485 s.currValue = currValue
486 s.validRow = true
487 return true
488 }
489 // Advance returned false. It could be an err, or it could
490 // be we've reached the end.
John Kline18834bd2015-06-26 10:07:46 -0700491 if err := s.it[s.curr].Err(); err != nil {
John Klinece0de2b2015-06-17 09:06:56 -0700492 s.validRow = false
493 s.err = err
494 return false
495 }
John Kline18834bd2015-06-26 10:07:46 -0700496 // We've reached the end of the iterator for this keyRange.
John Klinece0de2b2015-06-17 09:06:56 -0700497 // Jump to the next one.
John Kline8fe4a402015-07-23 16:40:19 -0700498 s.it[s.curr] = nil
John Klinece0de2b2015-06-17 09:06:56 -0700499 s.curr++
John Klinece0de2b2015-06-17 09:06:56 -0700500 s.validRow = false
501 }
502 // There are no more prefixes to scan.
503 return false
504}
505
506func (s *kvs) KeyValue() (string, *vdl.Value) {
507 if !s.validRow {
508 return "", nil
509 }
510 return s.currKey, s.currValue
511}
512
// Err returns the error recorded by Advance (a stream error or a value decode
// error), or nil if none has occurred.
func (s *kvs) Err() error {
	return s.err
}
516
517func (s *kvs) Cancel() {
518 if s.it != nil {
John Kline18834bd2015-06-26 10:07:46 -0700519 for i := s.curr; i < len(s.it); i++ {
520 s.it[i].Cancel()
521 }
John Klinece0de2b2015-06-17 09:06:56 -0700522 s.it = nil
523 }
John Kline96b004f2015-06-19 15:20:00 -0700524 // set curr to end of keyRanges so Advance will return false
John Kline8dbd2652015-07-07 09:27:36 -0700525 s.curr = len(s.it)
John Klinece0de2b2015-06-17 09:06:56 -0700526}
527
528////////////////////////////////////////
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700529// Internal helpers
530
// stKey returns the store key under which this database's databaseData record
// is read and written. The key is the constant util.DatabasePrefix; it does not
// depend on the database name.
func (d *database) stKey() string {
	return util.DatabasePrefix
}
534
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700535func (d *databaseReq) batchReader() store.SnapshotOrTransaction {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700536 if d.batchId == nil {
537 return nil
538 } else if d.sn != nil {
539 return d.sn
540 } else {
541 return d.tx
542 }
543}
544
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700545func (d *databaseReq) batchTransaction() (store.Transaction, error) {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700546 if d.batchId == nil {
547 return nil, nil
548 } else if d.tx != nil {
549 return d.tx, nil
550 } else {
551 return nil, wire.NewErrReadOnlyBatch(nil)
552 }
553}
Jatin Lodhia28417552015-07-28 14:17:56 -0700554
555// TODO(jlodhia): Schema check should happen within a transaction for each
556// operation in database, table and row. Do schema check along with permissions
557// check when fully-specified permission model is implemented.
558func (d *databaseReq) checkSchemaVersion(ctx *context.T, schemaVersion int32) error {
559 if !d.exists {
560 // database does not exist yet and hence there is no schema to check.
561 // This can happen if delete is called twice on the same database.
562 return nil
563 }
564 schemaMetadata, err := d.getSchemaMetadataWithoutAuth(ctx)
565 if err != nil {
566 return err
567 }
568 if (schemaMetadata == nil) || (schemaMetadata.Version == schemaVersion) {
569 return nil
570 }
571 return wire.NewErrSchemaVersionMismatch(ctx)
572}