blob: a9e7ac92bf13a9187a68af0b7cba3620b32ed087 [file] [log] [blame]
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package nosql
6
7import (
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -07008 "math/rand"
Adam Sadovsky232c3662015-06-04 15:00:09 -07009 "path"
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070010 "strconv"
11 "strings"
12 "sync"
13 "time"
Adam Sadovsky232c3662015-06-04 15:00:09 -070014
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070015 wire "v.io/syncbase/v23/services/syncbase/nosql"
John Klinece0de2b2015-06-17 09:06:56 -070016 "v.io/syncbase/v23/syncbase/nosql/query_db"
17 "v.io/syncbase/v23/syncbase/nosql/query_exec"
Raja Daoud7eea2da2015-07-17 19:27:56 -070018 "v.io/syncbase/x/ref/services/syncbase/localblobstore"
19 "v.io/syncbase/x/ref/services/syncbase/localblobstore/fs_cablobstore"
Adam Sadovskybc00bd62015-05-22 12:50:03 -070020 "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070021 "v.io/syncbase/x/ref/services/syncbase/server/util"
Adam Sadovsky8db74432015-05-29 17:37:32 -070022 "v.io/syncbase/x/ref/services/syncbase/server/watchable"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070023 "v.io/syncbase/x/ref/services/syncbase/store"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070024 "v.io/v23/context"
25 "v.io/v23/rpc"
26 "v.io/v23/security/access"
Sergey Rogulenko448a5a02015-07-17 17:54:07 -070027 "v.io/v23/services/watch"
John Klinece0de2b2015-06-17 09:06:56 -070028 "v.io/v23/vdl"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070029 "v.io/v23/verror"
John Klinece0de2b2015-06-17 09:06:56 -070030 "v.io/v23/vom"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070031 "v.io/x/lib/vlog"
32)
33
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070034// database is a per-database singleton (i.e. not per-request). It does not
35// directly handle RPCs.
36// Note: If a database does not exist at the time of a database RPC, the
37// dispatcher creates a short-lived database object to service that particular
38// request.
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070039type database struct {
40 name string
Adam Sadovskybc00bd62015-05-22 12:50:03 -070041 a interfaces.App
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070042 // The fields below are initialized iff this database exists.
Adam Sadovskyd6b9a232015-06-16 14:04:45 -070043 exists bool
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070044 // TODO(sadovsky): Make st point to a store.Store wrapper that handles paging,
45 // and do not actually open the store in NewDatabase.
46 st store.Store // stores all data for a single database
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070047
Raja Daoud7eea2da2015-07-17 19:27:56 -070048 // Local blob store associated with this database.
49 bst localblobstore.BlobStore
50
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070051 // Active snapshots and transactions corresponding to client batches.
52 // TODO(sadovsky): Add timeouts and GC.
53 mu sync.Mutex // protects the fields below
54 sns map[uint64]store.Snapshot
55 txs map[uint64]store.Transaction
56}
57
58// databaseReq is a per-request object that handles Database RPCs.
59// It embeds database and tracks request-specific batch state.
60type databaseReq struct {
61 *database
62 // If non-nil, sn or tx will be non-nil.
63 batchId *uint64
64 sn store.Snapshot
65 tx store.Transaction
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070066}
67
68var (
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070069 _ wire.DatabaseServerMethods = (*databaseReq)(nil)
Adam Sadovskybc00bd62015-05-22 12:50:03 -070070 _ interfaces.Database = (*database)(nil)
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070071)
72
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070073// DatabaseOptions configures a database.
Adam Sadovsky232c3662015-06-04 15:00:09 -070074type DatabaseOptions struct {
75 // Database-level permissions.
76 Perms access.Permissions
77 // Root dir for data storage.
78 RootDir string
79 // Storage engine to use.
80 Engine string
81}
82
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070083// OpenDatabase opens a database and returns a *database for it. Designed for
84// use from within NewDatabase and server.NewService.
85func OpenDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions, openOpts util.OpenOptions) (*database, error) {
86 st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine), openOpts)
Adam Sadovsky232c3662015-06-04 15:00:09 -070087 if err != nil {
88 return nil, err
89 }
90 st, err = watchable.Wrap(st, &watchable.Options{
Raja Daoudd4543072015-06-30 11:15:55 -070091 ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
Adam Sadovsky8db74432015-05-29 17:37:32 -070092 })
Adam Sadovsky63fafbb2015-05-26 14:22:31 -070093 if err != nil {
94 return nil, err
95 }
Raja Daoud7eea2da2015-07-17 19:27:56 -070096 // Open a co-located blob store, adjacent to the structured store.
97 bst, err := fs_cablobstore.Create(ctx, path.Join(opts.RootDir, "blobs"))
98 if err != nil {
99 return nil, err
100 }
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700101 return &database{
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700102 name: name,
103 a: a,
104 exists: true,
105 st: st,
Raja Daoud7eea2da2015-07-17 19:27:56 -0700106 bst: bst,
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700107 sns: make(map[uint64]store.Snapshot),
108 txs: make(map[uint64]store.Transaction),
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700109 }, nil
110}
111
112// NewDatabase creates a new database instance and returns it.
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700113// Designed for use from within App.CreateNoSQLDatabase.
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700114func NewDatabase(ctx *context.T, a interfaces.App, name string, metadata *wire.SchemaMetadata, opts DatabaseOptions) (*database, error) {
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700115 if opts.Perms == nil {
116 return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
117 }
118 d, err := OpenDatabase(ctx, a, name, opts, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
119 if err != nil {
120 return nil, err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700121 }
122 data := &databaseData{
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700123 Name: d.name,
124 Perms: opts.Perms,
125 SchemaMetadata: metadata,
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700126 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700127 if err := util.Put(ctx, d.st, d.stKey(), data); err != nil {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700128 return nil, err
129 }
130 return d, nil
131}
132
133////////////////////////////////////////
134// RPC methods
135
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700136func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *wire.SchemaMetadata) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700137 if d.exists {
138 return verror.New(verror.ErrExist, ctx, d.name)
139 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700140 if d.batchId != nil {
141 return wire.NewErrBoundToBatch(ctx)
142 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700143 // This database does not yet exist; d is just an ephemeral handle that holds
144 // {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
145 // handle and store it in d.a.dbs[d.name].
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700146 return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms, metadata)
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700147}
148
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700149func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
150 if d.batchId != nil {
151 return wire.NewErrBoundToBatch(ctx)
152 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700153 return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
154}
155
Ivan Pilat8e4a4ab2015-07-14 12:14:52 -0700156func (d *databaseReq) Exists(ctx *context.T, call rpc.ServerCall) (bool, error) {
157 if !d.exists {
158 return false, nil
159 }
160 return util.ErrorToExists(util.GetWithAuth(ctx, call, d.st, d.stKey(), &databaseData{}))
161}
162
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700163var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
164
165func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700166 if !d.exists {
167 return "", verror.New(verror.ErrNoExist, ctx, d.name)
168 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700169 if d.batchId != nil {
170 return "", wire.NewErrBoundToBatch(ctx)
171 }
172 d.mu.Lock()
173 defer d.mu.Unlock()
174 var id uint64
175 var batchType string
176 for {
177 id = uint64(rng.Int63())
178 if bo.ReadOnly {
179 if _, ok := d.sns[id]; !ok {
180 d.sns[id] = d.st.NewSnapshot()
181 batchType = "sn"
182 break
183 }
184 } else {
185 if _, ok := d.txs[id]; !ok {
186 d.txs[id] = d.st.NewTransaction()
187 batchType = "tx"
188 break
189 }
190 }
191 }
192 return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700193}
194
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700195func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700196 if !d.exists {
197 return verror.New(verror.ErrNoExist, ctx, d.name)
198 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700199 if d.batchId == nil {
200 return wire.NewErrNotBoundToBatch(ctx)
201 }
202 if d.tx == nil {
203 return wire.NewErrReadOnlyBatch(ctx)
204 }
205 var err error
206 if err = d.tx.Commit(); err == nil {
207 d.mu.Lock()
208 delete(d.txs, *d.batchId)
209 d.mu.Unlock()
210 }
Adam Sadovskyb5f88922015-07-26 17:41:37 -0700211 if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
212 return verror.New(wire.ErrConcurrentBatch, ctx, err)
213 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700214 return err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700215}
216
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700217func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700218 if !d.exists {
219 return verror.New(verror.ErrNoExist, ctx, d.name)
220 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700221 if d.batchId == nil {
222 return wire.NewErrNotBoundToBatch(ctx)
223 }
224 var err error
225 if d.tx != nil {
226 if err = d.tx.Abort(); err == nil {
227 d.mu.Lock()
228 delete(d.txs, *d.batchId)
229 d.mu.Unlock()
230 }
231 } else {
232 if err = d.sn.Close(); err == nil {
233 d.mu.Lock()
234 delete(d.sns, *d.batchId)
235 d.mu.Unlock()
236 }
237 }
238 return err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700239}
240
John Klinece0de2b2015-06-17 09:06:56 -0700241func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, q string) error {
242 impl := func(headers []string, rs ResultStream, err error) error {
243 if err != nil {
244 return err
245 }
246 sender := call.SendStream()
247 // Push the headers first -- the client will retrieve them and return
248 // them separately from the results.
249 var resultHeaders []*vdl.Value
250 for _, header := range headers {
251 resultHeaders = append(resultHeaders, vdl.ValueOf(header))
252 }
253 sender.Send(resultHeaders)
254 for rs.Advance() {
255 result := rs.Result()
256 sender.Send(result)
257 }
258 return rs.Err()
259 }
260 var st store.StoreReader
261 if d.batchId != nil {
262 st = d.batchReader()
263 } else {
264 sn := d.st.NewSnapshot()
265 st = sn
266 defer sn.Close()
267 }
268 // queryDb implements query_db.Database
269 // which is needed by the query package's
270 // Exec function.
271 db := &queryDb{
272 ctx: ctx,
273 call: call,
274 req: d,
275 st: st,
276 }
277
278 return impl(query_exec.Exec(db, q))
279}
280
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700281func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700282 if !d.exists {
283 return verror.New(verror.ErrNoExist, ctx, d.name)
284 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700285 if d.batchId != nil {
286 return wire.NewErrBoundToBatch(ctx)
287 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700288 return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
289}
290
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700291func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700292 if !d.exists {
293 return nil, "", verror.New(verror.ErrNoExist, ctx, d.name)
294 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700295 if d.batchId != nil {
296 return nil, "", wire.NewErrBoundToBatch(ctx)
297 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700298 data := &databaseData{}
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700299 if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), data); err != nil {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700300 return nil, "", err
301 }
302 return data.Perms, util.FormatVersion(data.Version), nil
303}
304
Sergey Rogulenko448a5a02015-07-17 17:54:07 -0700305func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
Sergey Rogulenko48857092015-07-16 20:38:53 -0700306 // TODO(rogulenko): Implement.
307 if !d.exists {
308 return verror.New(verror.ErrNoExist, ctx, d.name)
309 }
310 if d.batchId != nil {
311 return wire.NewErrBoundToBatch(ctx)
312 }
313 return verror.NewErrNotImplemented(ctx)
314}
315
Sergey Rogulenko448a5a02015-07-17 17:54:07 -0700316func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
Sergey Rogulenko48857092015-07-16 20:38:53 -0700317 // TODO(rogulenko): Implement.
318 if !d.exists {
Sergey Rogulenko448a5a02015-07-17 17:54:07 -0700319 return nil, verror.New(verror.ErrNoExist, ctx, d.name)
Sergey Rogulenko48857092015-07-16 20:38:53 -0700320 }
Sergey Rogulenko448a5a02015-07-17 17:54:07 -0700321 return nil, verror.NewErrNotImplemented(ctx)
Sergey Rogulenko48857092015-07-16 20:38:53 -0700322}
323
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700324func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700325 if !d.exists {
326 return nil, verror.New(verror.ErrNoExist, ctx, d.name)
327 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700328 if d.batchId != nil {
329 return nil, wire.NewErrBoundToBatch(ctx)
330 }
Adam Sadovsky49261192015-05-19 17:39:59 -0700331 // Check perms.
332 sn := d.st.NewSnapshot()
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700333 closeSnapshot := func() error {
334 return sn.Close()
335 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700336 if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700337 closeSnapshot()
Adam Sadovsky49261192015-05-19 17:39:59 -0700338 return nil, err
339 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700340 return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
Adam Sadovsky49261192015-05-19 17:39:59 -0700341}
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700342
343////////////////////////////////////////
John Klinece0de2b2015-06-17 09:06:56 -0700344// ResultStream interface
345
346// ResultStream is an interface for iterating through results (a.k.a, rows) returned from a
347// query. Each resulting rows are arrays of vdl objects.
348type ResultStream interface {
349 // Advance stages an element so the client can retrieve it with Result.
350 // Advance returns true iff there is a result to retrieve. The client must
351 // call Advance before calling Result. The client must call Cancel if it
352 // does not iterate through all elements (i.e. until Advance returns false).
353 // Advance may block if an element is not immediately available.
354 Advance() bool
355
356 // Result returns the row (i.e., array of vdl Values) that was staged by Advance.
357 // Result may panic if Advance returned false or was not called at all.
358 // Result does not block.
359 Result() []*vdl.Value
360
361 // Err returns a non-nil error iff the stream encountered any errors. Err does
362 // not block.
363 Err() error
364
365 // Cancel notifies the ResultStream provider that it can stop producing results.
366 // The client must call Cancel if it does not iterate through all results
367 // (i.e. until Advance returns false). Cancel is idempotent and can be called
368 // concurrently with a goroutine that is iterating via Advance/Result.
369 // Cancel causes Advance to subsequently return false. Cancel does not block.
370 Cancel()
371}
372
373////////////////////////////////////////
Adam Sadovskybc00bd62015-05-22 12:50:03 -0700374// interfaces.Database methods
375
376func (d *database) St() store.Store {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700377 if !d.exists {
Adam Sadovskybc00bd62015-05-22 12:50:03 -0700378 vlog.Fatalf("database %q does not exist", d.name)
379 }
380 return d.st
381}
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700382
Raja Daoud7eea2da2015-07-17 19:27:56 -0700383func (d *database) BlobSt() localblobstore.BlobStore {
384 if !d.exists {
385 vlog.Fatalf("database %q does not exist", d.name)
386 }
387 return d.bst
388}
389
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700390func (d *database) App() interfaces.App {
391 return d.a
392}
393
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700394func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700395 if !d.exists {
396 vlog.Fatalf("database %q does not exist", d.name)
397 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700398 return util.GetWithAuth(ctx, call, st, d.stKey(), &databaseData{})
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700399}
400
401func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700402 if !d.exists {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700403 vlog.Fatalf("database %q does not exist", d.name)
404 }
405 return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
406 data := &databaseData{}
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700407 return util.UpdateWithAuth(ctx, call, st, d.stKey(), data, func() error {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700408 if err := util.CheckVersion(ctx, version, data.Version); err != nil {
409 return err
410 }
411 data.Perms = perms
412 data.Version++
413 return nil
414 })
415 })
416}
417
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700418func (d *database) Name() string {
419 return d.name
420}
421
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700422////////////////////////////////////////
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700423// query_db implementation
John Klinece0de2b2015-06-17 09:06:56 -0700424
425// Implement query_db's Database, Table and KeyValueStream interfaces.
426type queryDb struct {
427 ctx *context.T
428 call wire.DatabaseExecServerCall
429 req *databaseReq
430 st store.StoreReader
431}
432
433func (db *queryDb) GetContext() *context.T {
434 return db.ctx
435}
436
437func (db *queryDb) GetTable(name string) (query_db.Table, error) {
438 tDb := &tableDb{
439 qdb: db,
440 req: &tableReq{
441 name: name,
442 d: db.req,
443 },
444 }
445 // Now that we have a table, we need to check permissions.
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700446 if err := util.GetWithAuth(db.ctx, db.call, db.st, tDb.req.stKey(), &tableData{}); err != nil {
John Klinece0de2b2015-06-17 09:06:56 -0700447 return nil, err
448 }
449 return tDb, nil
450}
451
452type tableDb struct {
453 qdb *queryDb
454 req *tableReq
455}
456
John Kline96b004f2015-06-19 15:20:00 -0700457func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
John Kline18834bd2015-06-26 10:07:46 -0700458 streams := []store.Stream{}
459 for _, keyRange := range keyRanges {
John Kline18834bd2015-06-26 10:07:46 -0700460 // TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
461 // Need a way to Scan multiple ranges at the same state of uncommitted changes.
John Kline8dbd2652015-07-07 09:27:36 -0700462 streams = append(streams, t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
John Kline18834bd2015-06-26 10:07:46 -0700463 }
John Klinece0de2b2015-06-17 09:06:56 -0700464 return &kvs{
John Kline8dbd2652015-07-07 09:27:36 -0700465 t: t,
466 curr: 0,
467 validRow: false,
468 it: streams,
469 err: nil,
John Klinece0de2b2015-06-17 09:06:56 -0700470 }, nil
471}
472
473type kvs struct {
474 t *tableDb
John Kline18834bd2015-06-26 10:07:46 -0700475 curr int
John Klinece0de2b2015-06-17 09:06:56 -0700476 validRow bool
477 currKey string
478 currValue *vdl.Value
John Kline18834bd2015-06-26 10:07:46 -0700479 it []store.Stream // array of store.Streams
John Klinece0de2b2015-06-17 09:06:56 -0700480 err error
481}
482
483func (s *kvs) Advance() bool {
484 if s.err != nil {
485 return false
486 }
John Kline8dbd2652015-07-07 09:27:36 -0700487 for s.curr < len(s.it) {
John Kline18834bd2015-06-26 10:07:46 -0700488 if s.it[s.curr].Advance() {
John Klinece0de2b2015-06-17 09:06:56 -0700489 // key
John Kline18834bd2015-06-26 10:07:46 -0700490 keyBytes := s.it[s.curr].Key(nil)
John Klinece0de2b2015-06-17 09:06:56 -0700491 parts := util.SplitKeyParts(string(keyBytes))
Sergey Rogulenko1b92b4f2015-06-18 23:06:11 -0700492 // TODO(rogulenko): Check access for the key.
John Klinece0de2b2015-06-17 09:06:56 -0700493 s.currKey = parts[len(parts)-1]
494 // value
John Kline18834bd2015-06-26 10:07:46 -0700495 valueBytes := s.it[s.curr].Value(nil)
John Klinece0de2b2015-06-17 09:06:56 -0700496 var currValue *vdl.Value
497 if err := vom.Decode(valueBytes, &currValue); err != nil {
498 s.validRow = false
499 s.err = err
500 return false
501 }
502 s.currValue = currValue
503 s.validRow = true
504 return true
505 }
506 // Advance returned false. It could be an err, or it could
507 // be we've reached the end.
John Kline18834bd2015-06-26 10:07:46 -0700508 if err := s.it[s.curr].Err(); err != nil {
John Klinece0de2b2015-06-17 09:06:56 -0700509 s.validRow = false
510 s.err = err
511 return false
512 }
John Kline18834bd2015-06-26 10:07:46 -0700513 // We've reached the end of the iterator for this keyRange.
John Klinece0de2b2015-06-17 09:06:56 -0700514 // Jump to the next one.
John Kline8fe4a402015-07-23 16:40:19 -0700515 s.it[s.curr] = nil
John Klinece0de2b2015-06-17 09:06:56 -0700516 s.curr++
John Klinece0de2b2015-06-17 09:06:56 -0700517 s.validRow = false
518 }
519 // There are no more prefixes to scan.
520 return false
521}
522
523func (s *kvs) KeyValue() (string, *vdl.Value) {
524 if !s.validRow {
525 return "", nil
526 }
527 return s.currKey, s.currValue
528}
529
530func (s *kvs) Err() error {
531 return s.err
532}
533
534func (s *kvs) Cancel() {
535 if s.it != nil {
John Kline18834bd2015-06-26 10:07:46 -0700536 for i := s.curr; i < len(s.it); i++ {
537 s.it[i].Cancel()
538 }
John Klinece0de2b2015-06-17 09:06:56 -0700539 s.it = nil
540 }
John Kline96b004f2015-06-19 15:20:00 -0700541 // set curr to end of keyRanges so Advance will return false
John Kline8dbd2652015-07-07 09:27:36 -0700542 s.curr = len(s.it)
John Klinece0de2b2015-06-17 09:06:56 -0700543}
544
545////////////////////////////////////////
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700546// Internal helpers
547
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700548func (d *database) stKey() string {
549 return util.DatabasePrefix
550}
551
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700552func (d *databaseReq) batchReader() store.StoreReader {
553 if d.batchId == nil {
554 return nil
555 } else if d.sn != nil {
556 return d.sn
557 } else {
558 return d.tx
559 }
560}
561
562func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
563 if d.batchId == nil {
564 return nil, nil
565 } else if d.tx != nil {
566 return d.tx, nil
567 } else {
568 return nil, wire.NewErrReadOnlyBatch(nil)
569 }
570}