blob: a9f503f02da921ded84f1a10935e66f84bc21d45 [file] [log] [blame]
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package nosql
6
7import (
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -07008 "math/rand"
Adam Sadovsky232c3662015-06-04 15:00:09 -07009 "path"
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070010 "strconv"
11 "strings"
12 "sync"
13 "time"
Adam Sadovsky232c3662015-06-04 15:00:09 -070014
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070015 wire "v.io/syncbase/v23/services/syncbase/nosql"
John Klinece0de2b2015-06-17 09:06:56 -070016 "v.io/syncbase/v23/syncbase/nosql/query_db"
17 "v.io/syncbase/v23/syncbase/nosql/query_exec"
Jatin Lodhiad70777b2015-08-19 16:59:47 -070018 "v.io/syncbase/x/ref/services/syncbase/clock"
Adam Sadovskybc00bd62015-05-22 12:50:03 -070019 "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070020 "v.io/syncbase/x/ref/services/syncbase/server/util"
Adam Sadovsky8db74432015-05-29 17:37:32 -070021 "v.io/syncbase/x/ref/services/syncbase/server/watchable"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070022 "v.io/syncbase/x/ref/services/syncbase/store"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070023 "v.io/v23/context"
Robin Thellend2ff665b2015-08-06 13:41:02 -070024 "v.io/v23/glob"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070025 "v.io/v23/rpc"
26 "v.io/v23/security/access"
John Klinece0de2b2015-06-17 09:06:56 -070027 "v.io/v23/vdl"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070028 "v.io/v23/verror"
John Klinece0de2b2015-06-17 09:06:56 -070029 "v.io/v23/vom"
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070030 "v.io/x/lib/vlog"
31)
32
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070033// database is a per-database singleton (i.e. not per-request). It does not
34// directly handle RPCs.
35// Note: If a database does not exist at the time of a database RPC, the
36// dispatcher creates a short-lived database object to service that particular
37// request.
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070038type database struct {
39 name string
Adam Sadovskybc00bd62015-05-22 12:50:03 -070040 a interfaces.App
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070041 // The fields below are initialized iff this database exists.
Adam Sadovskyd6b9a232015-06-16 14:04:45 -070042 exists bool
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070043 // TODO(sadovsky): Make st point to a store.Store wrapper that handles paging,
44 // and do not actually open the store in NewDatabase.
45 st store.Store // stores all data for a single database
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070046
47 // Active snapshots and transactions corresponding to client batches.
48 // TODO(sadovsky): Add timeouts and GC.
49 mu sync.Mutex // protects the fields below
50 sns map[uint64]store.Snapshot
51 txs map[uint64]store.Transaction
Jatin Lodhiac1735112015-08-28 10:52:14 -070052
53 // Active ConflictResolver connection from the app to this database.
54 // NOTE: For now, we assume there's only one open conflict resolution stream
55 // per database (typically, from the app that owns the database).
56 resolver wire.ConflictManagerStartConflictResolverServerCall
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070057}
58
59// databaseReq is a per-request object that handles Database RPCs.
60// It embeds database and tracks request-specific batch state.
61type databaseReq struct {
62 *database
63 // If non-nil, sn or tx will be non-nil.
64 batchId *uint64
65 sn store.Snapshot
66 tx store.Transaction
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070067}
68
69var (
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070070 _ wire.DatabaseServerMethods = (*databaseReq)(nil)
Adam Sadovskybc00bd62015-05-22 12:50:03 -070071 _ interfaces.Database = (*database)(nil)
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -070072)
73
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -070074// DatabaseOptions configures a database.
Adam Sadovsky232c3662015-06-04 15:00:09 -070075type DatabaseOptions struct {
76 // Database-level permissions.
77 Perms access.Permissions
78 // Root dir for data storage.
79 RootDir string
80 // Storage engine to use.
81 Engine string
82}
83
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070084// OpenDatabase opens a database and returns a *database for it. Designed for
85// use from within NewDatabase and server.NewService.
86func OpenDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions, openOpts util.OpenOptions) (*database, error) {
87 st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine), openOpts)
Adam Sadovsky232c3662015-06-04 15:00:09 -070088 if err != nil {
89 return nil, err
90 }
Jatin Lodhiad70777b2015-08-19 16:59:47 -070091 vclock := clock.NewVClock(a.Service().St())
92 st, err = watchable.Wrap(st, vclock, &watchable.Options{
Raja Daoudd4543072015-06-30 11:15:55 -070093 ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
Adam Sadovsky8db74432015-05-29 17:37:32 -070094 })
Adam Sadovsky63fafbb2015-05-26 14:22:31 -070095 if err != nil {
96 return nil, err
97 }
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -070098 return &database{
Adam Sadovskyd6b9a232015-06-16 14:04:45 -070099 name: name,
100 a: a,
101 exists: true,
102 st: st,
103 sns: make(map[uint64]store.Snapshot),
104 txs: make(map[uint64]store.Transaction),
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700105 }, nil
106}
107
108// NewDatabase creates a new database instance and returns it.
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700109// Designed for use from within App.CreateNoSQLDatabase.
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700110func NewDatabase(ctx *context.T, a interfaces.App, name string, metadata *wire.SchemaMetadata, opts DatabaseOptions) (*database, error) {
Adam Sadovskyb6a5aa32015-07-07 13:05:26 -0700111 if opts.Perms == nil {
112 return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
113 }
114 d, err := OpenDatabase(ctx, a, name, opts, util.OpenOptions{CreateIfMissing: true, ErrorIfExists: true})
115 if err != nil {
116 return nil, err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700117 }
118 data := &databaseData{
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700119 Name: d.name,
120 Perms: opts.Perms,
121 SchemaMetadata: metadata,
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700122 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700123 if err := util.Put(ctx, d.st, d.stKey(), data); err != nil {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700124 return nil, err
125 }
126 return d, nil
127}
128
129////////////////////////////////////////
130// RPC methods
131
Jatin Lodhia28417552015-07-28 14:17:56 -0700132func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, metadata *wire.SchemaMetadata, perms access.Permissions) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700133 if d.exists {
134 return verror.New(verror.ErrExist, ctx, d.name)
135 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700136 if d.batchId != nil {
137 return wire.NewErrBoundToBatch(ctx)
138 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700139 // This database does not yet exist; d is just an ephemeral handle that holds
140 // {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
141 // handle and store it in d.a.dbs[d.name].
Jatin Lodhiaf6486d42015-07-17 15:57:36 -0700142 return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms, metadata)
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700143}
144
Jatin Lodhia28417552015-07-28 14:17:56 -0700145func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700146 if d.batchId != nil {
147 return wire.NewErrBoundToBatch(ctx)
148 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700149 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
150 return err
151 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700152 return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
153}
154
Jatin Lodhia28417552015-07-28 14:17:56 -0700155func (d *databaseReq) Exists(ctx *context.T, call rpc.ServerCall, schemaVersion int32) (bool, error) {
Ivan Pilat8e4a4ab2015-07-14 12:14:52 -0700156 if !d.exists {
157 return false, nil
158 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700159 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
160 return false, err
161 }
Ivan Pilat8e4a4ab2015-07-14 12:14:52 -0700162 return util.ErrorToExists(util.GetWithAuth(ctx, call, d.st, d.stKey(), &databaseData{}))
163}
164
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700165var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
166
Jatin Lodhia28417552015-07-28 14:17:56 -0700167func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, schemaVersion int32, bo wire.BatchOptions) (string, error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700168 if !d.exists {
169 return "", verror.New(verror.ErrNoExist, ctx, d.name)
170 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700171 if d.batchId != nil {
172 return "", wire.NewErrBoundToBatch(ctx)
173 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700174 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
175 return "", err
176 }
177
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700178 d.mu.Lock()
179 defer d.mu.Unlock()
180 var id uint64
181 var batchType string
182 for {
183 id = uint64(rng.Int63())
184 if bo.ReadOnly {
185 if _, ok := d.sns[id]; !ok {
186 d.sns[id] = d.st.NewSnapshot()
187 batchType = "sn"
188 break
189 }
190 } else {
191 if _, ok := d.txs[id]; !ok {
192 d.txs[id] = d.st.NewTransaction()
193 batchType = "tx"
194 break
195 }
196 }
197 }
198 return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700199}
200
Jatin Lodhia28417552015-07-28 14:17:56 -0700201func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700202 if !d.exists {
203 return verror.New(verror.ErrNoExist, ctx, d.name)
204 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700205 if d.batchId == nil {
206 return wire.NewErrNotBoundToBatch(ctx)
207 }
208 if d.tx == nil {
209 return wire.NewErrReadOnlyBatch(ctx)
210 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700211 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
212 return err
213 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700214 var err error
215 if err = d.tx.Commit(); err == nil {
216 d.mu.Lock()
217 delete(d.txs, *d.batchId)
218 d.mu.Unlock()
219 }
Adam Sadovskyb5f88922015-07-26 17:41:37 -0700220 if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
221 return verror.New(wire.ErrConcurrentBatch, ctx, err)
222 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700223 return err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700224}
225
Jatin Lodhia28417552015-07-28 14:17:56 -0700226func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700227 if !d.exists {
228 return verror.New(verror.ErrNoExist, ctx, d.name)
229 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700230 if d.batchId == nil {
231 return wire.NewErrNotBoundToBatch(ctx)
232 }
Jatin Lodhia28417552015-07-28 14:17:56 -0700233 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
234 return err
235 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700236 var err error
237 if d.tx != nil {
238 if err = d.tx.Abort(); err == nil {
239 d.mu.Lock()
240 delete(d.txs, *d.batchId)
241 d.mu.Unlock()
242 }
243 } else {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700244 if err = d.sn.Abort(); err == nil {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700245 d.mu.Lock()
246 delete(d.sns, *d.batchId)
247 d.mu.Unlock()
248 }
249 }
250 return err
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700251}
252
Jatin Lodhia28417552015-07-28 14:17:56 -0700253func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, schemaVersion int32, q string) error {
254 if err := d.checkSchemaVersion(ctx, schemaVersion); err != nil {
255 return err
256 }
John Klinece0de2b2015-06-17 09:06:56 -0700257 impl := func(headers []string, rs ResultStream, err error) error {
258 if err != nil {
259 return err
260 }
261 sender := call.SendStream()
262 // Push the headers first -- the client will retrieve them and return
263 // them separately from the results.
264 var resultHeaders []*vdl.Value
265 for _, header := range headers {
266 resultHeaders = append(resultHeaders, vdl.ValueOf(header))
267 }
268 sender.Send(resultHeaders)
269 for rs.Advance() {
270 result := rs.Result()
Sergey Rogulenko40402b52015-08-10 15:09:48 -0700271 if err := sender.Send(result); err != nil {
272 rs.Cancel()
273 return err
274 }
John Klinece0de2b2015-06-17 09:06:56 -0700275 }
276 return rs.Err()
277 }
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700278 var sntx store.SnapshotOrTransaction
John Klinece0de2b2015-06-17 09:06:56 -0700279 if d.batchId != nil {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700280 sntx = d.batchReader()
John Klinece0de2b2015-06-17 09:06:56 -0700281 } else {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700282 sntx = d.st.NewSnapshot()
283 defer sntx.Abort()
John Klinece0de2b2015-06-17 09:06:56 -0700284 }
285 // queryDb implements query_db.Database
286 // which is needed by the query package's
287 // Exec function.
288 db := &queryDb{
289 ctx: ctx,
290 call: call,
291 req: d,
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700292 sntx: sntx,
John Klinece0de2b2015-06-17 09:06:56 -0700293 }
294
295 return impl(query_exec.Exec(db, q))
296}
297
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700298func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700299 if !d.exists {
300 return verror.New(verror.ErrNoExist, ctx, d.name)
301 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700302 if d.batchId != nil {
303 return wire.NewErrBoundToBatch(ctx)
304 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700305 return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
306}
307
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700308func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700309 if !d.exists {
310 return nil, "", verror.New(verror.ErrNoExist, ctx, d.name)
311 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700312 if d.batchId != nil {
313 return nil, "", wire.NewErrBoundToBatch(ctx)
314 }
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700315 data := &databaseData{}
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700316 if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), data); err != nil {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700317 return nil, "", err
318 }
319 return data.Perms, util.FormatVersion(data.Version), nil
320}
321
Robin Thellend2ff665b2015-08-06 13:41:02 -0700322func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.GlobChildrenServerCall, matcher *glob.Element) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700323 if !d.exists {
Robin Thellend2ff665b2015-08-06 13:41:02 -0700324 return verror.New(verror.ErrNoExist, ctx, d.name)
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700325 }
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700326 if d.batchId != nil {
Robin Thellend2ff665b2015-08-06 13:41:02 -0700327 return wire.NewErrBoundToBatch(ctx)
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700328 }
Adam Sadovsky49261192015-05-19 17:39:59 -0700329 // Check perms.
330 sn := d.st.NewSnapshot()
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700331 if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700332 sn.Abort()
Robin Thellend2ff665b2015-08-06 13:41:02 -0700333 return err
Adam Sadovsky49261192015-05-19 17:39:59 -0700334 }
Robin Thellend2ff665b2015-08-06 13:41:02 -0700335 return util.Glob(ctx, call, matcher, sn, sn.Abort, util.TablePrefix)
Adam Sadovsky49261192015-05-19 17:39:59 -0700336}
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700337
338////////////////////////////////////////
John Klinece0de2b2015-06-17 09:06:56 -0700339// ResultStream interface
340
341// ResultStream is an interface for iterating through results (a.k.a, rows) returned from a
342// query. Each resulting rows are arrays of vdl objects.
343type ResultStream interface {
344 // Advance stages an element so the client can retrieve it with Result.
345 // Advance returns true iff there is a result to retrieve. The client must
346 // call Advance before calling Result. The client must call Cancel if it
347 // does not iterate through all elements (i.e. until Advance returns false).
348 // Advance may block if an element is not immediately available.
349 Advance() bool
350
351 // Result returns the row (i.e., array of vdl Values) that was staged by Advance.
352 // Result may panic if Advance returned false or was not called at all.
353 // Result does not block.
354 Result() []*vdl.Value
355
356 // Err returns a non-nil error iff the stream encountered any errors. Err does
357 // not block.
358 Err() error
359
360 // Cancel notifies the ResultStream provider that it can stop producing results.
361 // The client must call Cancel if it does not iterate through all results
362 // (i.e. until Advance returns false). Cancel is idempotent and can be called
363 // concurrently with a goroutine that is iterating via Advance/Result.
364 // Cancel causes Advance to subsequently return false. Cancel does not block.
365 Cancel()
366}
367
368////////////////////////////////////////
Adam Sadovskybc00bd62015-05-22 12:50:03 -0700369// interfaces.Database methods
370
371func (d *database) St() store.Store {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700372 if !d.exists {
Adam Sadovskybc00bd62015-05-22 12:50:03 -0700373 vlog.Fatalf("database %q does not exist", d.name)
374 }
375 return d.st
376}
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700377
Himabindu Puchaf9ec56f2015-06-02 11:34:05 -0700378func (d *database) App() interfaces.App {
379 return d.a
380}
381
Himabindu Pucha25a1d022015-07-21 20:46:33 -0700382func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700383 if !d.exists {
384 vlog.Fatalf("database %q does not exist", d.name)
385 }
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700386 return util.GetWithAuth(ctx, call, st, d.stKey(), &databaseData{})
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700387}
388
389func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
Adam Sadovskyd6b9a232015-06-16 14:04:45 -0700390 if !d.exists {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700391 vlog.Fatalf("database %q does not exist", d.name)
392 }
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700393 return store.RunInTransaction(d.st, func(tx store.Transaction) error {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700394 data := &databaseData{}
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700395 return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error {
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700396 if err := util.CheckVersion(ctx, version, data.Version); err != nil {
397 return err
398 }
399 data.Perms = perms
400 data.Version++
401 return nil
402 })
403 })
404}
405
Adam Sadovskyf3b7abc2015-05-04 15:33:22 -0700406func (d *database) Name() string {
407 return d.name
408}
409
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700410////////////////////////////////////////
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700411// query_db implementation
John Klinece0de2b2015-06-17 09:06:56 -0700412
413// Implement query_db's Database, Table and KeyValueStream interfaces.
414type queryDb struct {
415 ctx *context.T
416 call wire.DatabaseExecServerCall
417 req *databaseReq
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700418 sntx store.SnapshotOrTransaction
John Klinece0de2b2015-06-17 09:06:56 -0700419}
420
421func (db *queryDb) GetContext() *context.T {
422 return db.ctx
423}
424
425func (db *queryDb) GetTable(name string) (query_db.Table, error) {
426 tDb := &tableDb{
427 qdb: db,
428 req: &tableReq{
429 name: name,
430 d: db.req,
431 },
432 }
433 // Now that we have a table, we need to check permissions.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700434 if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &tableData{}); err != nil {
John Klinece0de2b2015-06-17 09:06:56 -0700435 return nil, err
436 }
437 return tDb, nil
438}
439
440type tableDb struct {
441 qdb *queryDb
442 req *tableReq
443}
444
John Kline96b004f2015-06-19 15:20:00 -0700445func (t *tableDb) Scan(keyRanges query_db.KeyRanges) (query_db.KeyValueStream, error) {
John Kline18834bd2015-06-26 10:07:46 -0700446 streams := []store.Stream{}
447 for _, keyRange := range keyRanges {
John Kline18834bd2015-06-26 10:07:46 -0700448 // TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
449 // Need a way to Scan multiple ranges at the same state of uncommitted changes.
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700450 streams = append(streams, t.qdb.sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
John Kline18834bd2015-06-26 10:07:46 -0700451 }
John Klinece0de2b2015-06-17 09:06:56 -0700452 return &kvs{
John Kline8dbd2652015-07-07 09:27:36 -0700453 t: t,
454 curr: 0,
455 validRow: false,
456 it: streams,
457 err: nil,
John Klinece0de2b2015-06-17 09:06:56 -0700458 }, nil
459}
460
461type kvs struct {
462 t *tableDb
John Kline18834bd2015-06-26 10:07:46 -0700463 curr int
John Klinece0de2b2015-06-17 09:06:56 -0700464 validRow bool
465 currKey string
466 currValue *vdl.Value
John Kline18834bd2015-06-26 10:07:46 -0700467 it []store.Stream // array of store.Streams
John Klinece0de2b2015-06-17 09:06:56 -0700468 err error
469}
470
471func (s *kvs) Advance() bool {
472 if s.err != nil {
473 return false
474 }
John Kline8dbd2652015-07-07 09:27:36 -0700475 for s.curr < len(s.it) {
John Kline18834bd2015-06-26 10:07:46 -0700476 if s.it[s.curr].Advance() {
John Klinece0de2b2015-06-17 09:06:56 -0700477 // key
John Kline18834bd2015-06-26 10:07:46 -0700478 keyBytes := s.it[s.curr].Key(nil)
John Klinece0de2b2015-06-17 09:06:56 -0700479 parts := util.SplitKeyParts(string(keyBytes))
Sergey Rogulenko1b92b4f2015-06-18 23:06:11 -0700480 // TODO(rogulenko): Check access for the key.
John Klinece0de2b2015-06-17 09:06:56 -0700481 s.currKey = parts[len(parts)-1]
482 // value
John Kline18834bd2015-06-26 10:07:46 -0700483 valueBytes := s.it[s.curr].Value(nil)
John Klinece0de2b2015-06-17 09:06:56 -0700484 var currValue *vdl.Value
485 if err := vom.Decode(valueBytes, &currValue); err != nil {
486 s.validRow = false
487 s.err = err
488 return false
489 }
490 s.currValue = currValue
491 s.validRow = true
492 return true
493 }
494 // Advance returned false. It could be an err, or it could
495 // be we've reached the end.
John Kline18834bd2015-06-26 10:07:46 -0700496 if err := s.it[s.curr].Err(); err != nil {
John Klinece0de2b2015-06-17 09:06:56 -0700497 s.validRow = false
498 s.err = err
499 return false
500 }
John Kline18834bd2015-06-26 10:07:46 -0700501 // We've reached the end of the iterator for this keyRange.
John Klinece0de2b2015-06-17 09:06:56 -0700502 // Jump to the next one.
John Kline8fe4a402015-07-23 16:40:19 -0700503 s.it[s.curr] = nil
John Klinece0de2b2015-06-17 09:06:56 -0700504 s.curr++
John Klinece0de2b2015-06-17 09:06:56 -0700505 s.validRow = false
506 }
507 // There are no more prefixes to scan.
508 return false
509}
510
511func (s *kvs) KeyValue() (string, *vdl.Value) {
512 if !s.validRow {
513 return "", nil
514 }
515 return s.currKey, s.currValue
516}
517
518func (s *kvs) Err() error {
519 return s.err
520}
521
522func (s *kvs) Cancel() {
523 if s.it != nil {
John Kline18834bd2015-06-26 10:07:46 -0700524 for i := s.curr; i < len(s.it); i++ {
525 s.it[i].Cancel()
526 }
John Klinece0de2b2015-06-17 09:06:56 -0700527 s.it = nil
528 }
John Kline96b004f2015-06-19 15:20:00 -0700529 // set curr to end of keyRanges so Advance will return false
John Kline8dbd2652015-07-07 09:27:36 -0700530 s.curr = len(s.it)
John Klinece0de2b2015-06-17 09:06:56 -0700531}
532
533////////////////////////////////////////
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700534// Internal helpers
535
Adam Sadovskya31a7cd2015-07-08 10:44:07 -0700536func (d *database) stKey() string {
537 return util.DatabasePrefix
538}
539
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700540func (d *databaseReq) batchReader() store.SnapshotOrTransaction {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700541 if d.batchId == nil {
542 return nil
543 } else if d.sn != nil {
544 return d.sn
545 } else {
546 return d.tx
547 }
548}
549
Sergey Rogulenko1068b1a2015-08-03 16:53:27 -0700550func (d *databaseReq) batchTransaction() (store.Transaction, error) {
Adam Sadovsky1c91f2a2015-06-04 22:23:51 -0700551 if d.batchId == nil {
552 return nil, nil
553 } else if d.tx != nil {
554 return d.tx, nil
555 } else {
556 return nil, wire.NewErrReadOnlyBatch(nil)
557 }
558}
Jatin Lodhia28417552015-07-28 14:17:56 -0700559
560// TODO(jlodhia): Schema check should happen within a transaction for each
561// operation in database, table and row. Do schema check along with permissions
562// check when fully-specified permission model is implemented.
563func (d *databaseReq) checkSchemaVersion(ctx *context.T, schemaVersion int32) error {
564 if !d.exists {
565 // database does not exist yet and hence there is no schema to check.
566 // This can happen if delete is called twice on the same database.
567 return nil
568 }
569 schemaMetadata, err := d.getSchemaMetadataWithoutAuth(ctx)
570 if err != nil {
571 return err
572 }
573 if (schemaMetadata == nil) || (schemaMetadata.Version == schemaVersion) {
574 return nil
575 }
576 return wire.NewErrSchemaVersionMismatch(ctx)
577}