blob: a013effc9fe31f3d5943da8cff1d7fec7110d1b2 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package util
import (
"reflect"
"v.io/v23/context"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/vdl"
)
// Row represents a syncbase Row, Collection, or Database.
type Row struct {
DatabaseId wire.Id
CollectionId wire.Id
// If the Row represents a Collection or Database, then Key and Value will
// be "" and nil respectively.
Key string
Value *vdl.Value
// If the Row represents a real Row, then Permissions will be nil,
// otherwise it will be the Permissions of the Collection or Database.
Permissions access.Permissions
}
// Equals returns true if the rows have the same properties.
func (r *Row) Equals(rr *Row) bool {
return reflect.DeepEqual(r.DatabaseId, rr.DatabaseId) &&
reflect.DeepEqual(r.CollectionId, rr.CollectionId) &&
r.Key == rr.Key &&
vdl.EqualValue(r.Value, rr.Value) &&
reflect.DeepEqual(r.Permissions, rr.Permissions)
}
// DumpStream iterates through all collections in all database, and all rows in
// each collection, emiting Rows for each row in the collection. The stream is
// ordered first by database, then by collection, then by row. In addition, a
// Row is emitted for each database before all of its collections, and a Row is
// emitted for each collections before its rows.
type DumpStream struct {
// ServiceName is the name of the syncbase service from which Rows will be
// dumped.
ServiceName string
ctx *context.T
databaseCollections []databaseCollection
err error
nextRow *Row
scanStream syncbase.ScanStream
}
var _ (syncbase.Stream) = (*DumpStream)(nil)
// databaseCollection wraps a database and collection.
type databaseCollection struct {
database syncbase.Database
collection syncbase.Collection
}
func NewDumpStream(ctx *context.T, service syncbase.Service) (*DumpStream, error) {
// Get all database ids.
dbIds, err := service.ListDatabases(ctx)
if err != nil {
return nil, err
}
// Get collections for each database.
dbCols := []databaseCollection{}
for _, dbId := range dbIds {
db := service.DatabaseForId(dbId, nil)
// Add a databaseCollection for just this database.
dbCols = append(dbCols, databaseCollection{database: db})
colIds, err := db.ListCollections(ctx)
if err != nil {
return nil, err
}
for _, colId := range colIds {
dbCols = append(dbCols, databaseCollection{
database: db,
collection: db.CollectionForId(colId),
})
}
}
return &DumpStream{
ServiceName: service.FullName(),
ctx: ctx,
databaseCollections: dbCols,
}, nil
}
// Row returns the next row in the DumpStream.
func (s *DumpStream) Row() *Row {
return s.nextRow
}
func (s *DumpStream) Advance() bool {
// Loop while we either have a stream or collections left.
for s.scanStream != nil || len(s.databaseCollections) > 0 {
if s.scanStream == nil {
dbCol := s.databaseCollections[0]
if dbCol.collection == nil {
// A database with no collection. Emit a Row for just the
// database, and remove it from the databaseCollections slice.
perms, _, err := dbCol.database.GetPermissions(s.ctx)
if err != nil {
s.setErr(err)
return false
}
s.nextRow = &Row{
DatabaseId: dbCol.database.Id(),
Permissions: perms,
}
s.databaseCollections = s.databaseCollections[1:]
return true
}
// Get new scan stream for the collection.
s.scanStream = dbCol.collection.Scan(s.ctx, syncbase.Prefix(""))
// Emit a Row for just the collection.
perms, err := dbCol.collection.GetPermissions(s.ctx)
if err != nil {
s.setErr(err)
return false
}
s.nextRow = &Row{
CollectionId: dbCol.collection.Id(),
DatabaseId: dbCol.database.Id(),
Permissions: perms,
}
return true
}
// We have a stream. Pull rows from it.
if !s.scanStream.Advance() {
// Current stream has ended. Loop to get a new stream from the
// next collection.
s.scanStream = nil
s.databaseCollections = s.databaseCollections[1:]
continue
}
// Get next row key and value from current scan stream.
key := s.scanStream.Key()
var val *vdl.Value
if err := s.scanStream.Value(&val); err != nil {
s.setErr(err)
return false
}
dbCol := s.databaseCollections[0]
s.nextRow = &Row{
Key: key,
Value: val,
CollectionId: dbCol.collection.Id(),
DatabaseId: dbCol.database.Id(),
}
return true
}
// No more stream or collections. We are done.
s.nextRow = nil
return false
}
func (s *DumpStream) setErr(err error) {
s.err = err
s.databaseCollections = nil
s.nextRow = nil
s.scanStream = nil
}
func (s *DumpStream) Err() error {
return s.err
}
func (s *DumpStream) Cancel() {
if s.scanStream != nil {
s.scanStream.Cancel()
}
s.databaseCollections = []databaseCollection{}
s.scanStream = nil
}