syncbase: syncQL: integrate syncQL with key value store
This change implements Exec on the store's database object.
Later, when it is ready, Exec will also be added to BatchRequest.
They query package interacts with the store via implementaitons
of query's Database, Table and KeyValueStream interfaces (these
implemenations reside, for now, in
x/ref/services/syncbase/server/nosql/database.go. The query_db
package needed to be moved out of internal to accomodate this.
Unlike Scan, which returns all errors via the stream, exec
needs to return execution errors on the function itself
(so forr example, the client doesn't attempt to print
headers before iterating over the stream).
Note: Rhis change requires a resolution to the error wrapping
issue. See:
https://vanadium-review.googlesource.com/#/c/12651/
but we might as well start the review for this change.
Change-Id: I76d7fcee1557d36094c4e66cfee4cb6c9f8eedca
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index b9f7d7b..0971923 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -13,6 +13,9 @@
"time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/v23/syncbase/nosql/query_db"
+ "v.io/syncbase/v23/syncbase/nosql/query_exec"
+ prefixutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -20,7 +23,9 @@
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
+ "v.io/v23/vdl"
"v.io/v23/verror"
+ "v.io/v23/vom"
"v.io/x/lib/vlog"
)
@@ -202,6 +207,46 @@
return err
}
+func (d *databaseReq) Exec(ctx *context.T, call wire.DatabaseExecServerCall, q string) error {
+ impl := func(headers []string, rs ResultStream, err error) error {
+ if err != nil {
+ return err
+ }
+ sender := call.SendStream()
+ // Push the headers first -- the client will retrieve them and return
+ // them separately from the results.
+ var resultHeaders []*vdl.Value
+ for _, header := range headers {
+ resultHeaders = append(resultHeaders, vdl.ValueOf(header))
+ }
+ sender.Send(resultHeaders)
+ for rs.Advance() {
+ result := rs.Result()
+ sender.Send(result)
+ }
+ return rs.Err()
+ }
+ var st store.StoreReader
+ if d.batchId != nil {
+ st = d.batchReader()
+ } else {
+ sn := d.st.NewSnapshot()
+ st = sn
+ defer sn.Close()
+ }
+ // queryDb implements query_db.Database
+ // which is needed by the query package's
+ // Exec function.
+ db := &queryDb{
+ ctx: ctx,
+ call: call,
+ req: d,
+ st: st,
+ }
+
+ return impl(query_exec.Exec(db, q))
+}
+
func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
if !d.exists {
return verror.New(verror.ErrNoExist, ctx, d.name)
@@ -246,6 +291,36 @@
}
////////////////////////////////////////
+// ResultStream interface
+
+// ResultStream is an interface for iterating through results (a.k.a, rows) returned from a
+// query. Each resulting rows are arrays of vdl objects.
+type ResultStream interface {
+ // Advance stages an element so the client can retrieve it with Result.
+ // Advance returns true iff there is a result to retrieve. The client must
+ // call Advance before calling Result. The client must call Cancel if it
+ // does not iterate through all elements (i.e. until Advance returns false).
+ // Advance may block if an element is not immediately available.
+ Advance() bool
+
+ // Result returns the row (i.e., array of vdl Values) that was staged by Advance.
+ // Result may panic if Advance returned false or was not called at all.
+ // Result does not block.
+ Result() []*vdl.Value
+
+ // Err returns a non-nil error iff the stream encountered any errors. Err does
+ // not block.
+ Err() error
+
+ // Cancel notifies the ResultStream provider that it can stop producing results.
+ // The client must call Cancel if it does not iterate through all results
+ // (i.e. until Advance returns false). Cancel is idempotent and can be called
+ // concurrently with a goroutine that is iterating via Advance/Result.
+ // Cancel causes Advance to subsequently return false. Cancel does not block.
+ Cancel()
+}
+
+////////////////////////////////////////
// interfaces.Database methods
func (d *database) St() store.Store {
@@ -295,6 +370,130 @@
}
////////////////////////////////////////
+// query_db implementations
+
+// Implement query_db's Database, Table and KeyValueStream interfaces.
+type queryDb struct {
+ ctx *context.T
+ call wire.DatabaseExecServerCall
+ req *databaseReq
+ st store.StoreReader
+}
+
+func (db *queryDb) GetContext() *context.T {
+ return db.ctx
+}
+
+func (db *queryDb) GetTable(name string) (query_db.Table, error) {
+ tDb := &tableDb{
+ qdb: db,
+ req: &tableReq{
+ name: name,
+ d: db.req,
+ },
+ }
+ // Now that we have a table, we need to check permissions.
+ if err := util.Get(db.ctx, db.call, db.st, tDb.req, &tableData{}); err != nil {
+ return nil, err
+ }
+ return tDb, nil
+}
+
+type tableDb struct {
+ qdb *queryDb
+ req *tableReq
+}
+
+func (t *tableDb) Scan(prefixes []string) (query_db.KeyValueStream, error) {
+ return &kvs{
+ t: t,
+ prefixes: prefixes,
+ curr: -1,
+ validRow: false,
+ it: nil,
+ err: nil,
+ }, nil
+}
+
+type kvs struct {
+ t *tableDb
+ prefixes []string
+ curr int // current index into prefixes, -1 at start
+ validRow bool
+ currKey string
+ currValue *vdl.Value
+ it store.Stream // current prefix key value stream
+ err error
+}
+
+func (s *kvs) Advance() bool {
+ if s.err != nil {
+ return false
+ }
+ if s.curr == -1 {
+ s.curr++
+ }
+ for s.curr < len(s.prefixes) {
+ if s.it == nil {
+ start := prefixutil.PrefixRangeStart(s.prefixes[s.curr])
+ limit := prefixutil.PrefixRangeLimit(s.prefixes[s.curr])
+ s.it = s.t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, s.t.req.name), string(start), string(limit)))
+ }
+ if s.it.Advance() {
+ // key
+ keyBytes := s.it.Key(nil)
+ parts := util.SplitKeyParts(string(keyBytes))
+ s.currKey = parts[len(parts)-1]
+ // value
+ valueBytes := s.it.Value(nil)
+ var currValue *vdl.Value
+ if err := vom.Decode(valueBytes, &currValue); err != nil {
+ s.validRow = false
+ s.err = err
+ return false
+ }
+ s.currValue = currValue
+ s.validRow = true
+ return true
+ }
+ // Advance returned false. It could be an err, or it could
+ // be we've reached the end.
+ if err := s.it.Err(); err != nil {
+ s.validRow = false
+ s.err = err
+ return false
+ }
+ // We've reached the end of the iterator for this prefix.
+ // Jump to the next one.
+ s.curr++
+ s.it = nil
+ s.validRow = false
+ }
+ // There are no more prefixes to scan.
+ return false
+}
+
+func (s *kvs) KeyValue() (string, *vdl.Value) {
+ if !s.validRow {
+ return "", nil
+ }
+ return s.currKey, s.currValue
+}
+
+func (s *kvs) Err() error {
+ return s.err
+}
+
+func (s *kvs) Cancel() {
+ if s.it != nil {
+ s.it.Cancel()
+ s.it = nil
+ }
+ // set curr to end of prefixes so Advance will return false
+ s.curr = len(s.prefixes)
+}
+
+////////////////////////////////////////
// Internal helpers
func (d *databaseReq) batchReader() store.StoreReader {