sensorlog: syncbase: Change Collection to use Id instead of name.
Part 3 of Syncbase API simplification.
MultiPart: 3/3
Change-Id: Ia6287cb7b4f1176d8788b3f20a1c1d36bf568764
diff --git a/go/src/v.io/x/sensorlog/internal/client/device.go b/go/src/v.io/x/sensorlog/internal/client/device.go
index e3838bd..d67333c 100644
--- a/go/src/v.io/x/sensorlog/internal/client/device.go
+++ b/go/src/v.io/x/sensorlog/internal/client/device.go
@@ -25,7 +25,7 @@
if err := keyutil.ValidateId(devId); err != nil {
return nil, fmt.Errorf("invalid devId: %v", err)
}
- devKey := sbmodel.KDeviceCfg{
+ devKey := &sbmodel.KDeviceCfg{
DevId: devId,
}
if desc == "" {
@@ -36,7 +36,7 @@
SgPublishSb: sgPublishSb,
}
devSgName := config.SyncgroupName(sgPublishSb, devKey.Key())
- devRow := db.Collection(devKey.Collection()).Row(devKey.Key())
+ devRow := db.CollectionForId(sbmodel.CollectionId(devKey)).Row(devKey.Key())
if exists, err := devRow.Exists(ctx); err != nil {
return nil, err
@@ -51,5 +51,5 @@
return nil, err
}
- return &devKey, devRow.Put(ctx, &devVal)
+ return devKey, devRow.Put(ctx, &devVal)
}
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index 795480b..cab7e19 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -24,39 +24,35 @@
// in chronological order, calling listCb for each.
// TODO(ivanpi): Allow specifying time interval.
func ListStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
- collectionName := sbmodel.KDataPoint{}.Collection()
+ dataCollectionId := sbmodel.CollectionId(&sbmodel.KDataPoint{})
dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
- bdb, err := db.BeginBatch(ctx, wire.BatchOptions{ReadOnly: true})
- if err != nil {
- return err
- }
- defer bdb.Abort(ctx)
-
- streamRow := bdb.Collection(streamKey.Collection()).Row(streamKey.Key())
- if exists, err := streamRow.Exists(ctx); err != nil {
- return err
- } else if !exists {
- return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
- }
-
- sstr := bdb.Collection(collectionName).Scan(ctx, syncbase.Prefix(dataPrefix))
- defer sstr.Cancel()
-
- for sstr.Advance() {
- key := &sbmodel.KDataPoint{}
- if err := key.Parse(sstr.Key()); err != nil {
- return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
- }
- var val sbmodel.VDataPoint
- if err := sstr.Value(&val); err != nil {
- return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
- }
- if err := listCb(key, val); err != nil {
+ return syncbase.RunInBatch(ctx, db, wire.BatchOptions{ReadOnly: true}, func(bdb syncbase.BatchDatabase) error {
+ streamRow := bdb.CollectionForId(sbmodel.CollectionId(streamKey)).Row(streamKey.Key())
+ if exists, err := streamRow.Exists(ctx); err != nil {
return err
+ } else if !exists {
+ return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
}
- }
- return sstr.Err()
+
+ sstr := bdb.CollectionForId(dataCollectionId).Scan(ctx, syncbase.Prefix(dataPrefix))
+ defer sstr.Cancel()
+
+ for sstr.Advance() {
+ key := &sbmodel.KDataPoint{}
+ if err := key.Parse(sstr.Key()); err != nil {
+ return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+ }
+ var val sbmodel.VDataPoint
+ if err := sstr.Value(&val); err != nil {
+ return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+ }
+ if err := listCb(key, val); err != nil {
+ return err
+ }
+ }
+ return sstr.Err()
+ })
}
// FollowStreamData lists all data points for the stream specified by streamKey
@@ -64,11 +60,12 @@
// points until ctx is cancelled.
// TODO(ivanpi): Allow specifying time interval.
func FollowStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
- collectionName := sbmodel.KDataPoint{}.Collection()
+ dataCollectionId := sbmodel.CollectionId(&sbmodel.KDataPoint{})
dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
// Watch for DataPoints, existing followed by new.
- ws, err := db.Watch(ctx, collectionName, dataPrefix, nil)
+ // TODO(ivanpi): Check if stream exists.
+ ws, err := db.Watch(ctx, dataCollectionId, dataPrefix, nil)
if err != nil {
return err
}
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
index 237ad00..2733b44 100644
--- a/go/src/v.io/x/sensorlog/internal/client/stream.go
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -34,7 +34,7 @@
if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
}
- stmKey := sbmodel.KStreamDef{
+ stmKey := &sbmodel.KStreamDef{
DevId: devKey.DevId,
StreamId: streamId,
}
@@ -52,8 +52,8 @@
}
if err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, func(db syncbase.BatchDatabase) error {
- devRow := db.Collection(devKey.Collection()).Row(devKey.Key())
- stmRow := db.Collection(stmKey.Collection()).Row(stmKey.Key())
+ devRow := db.CollectionForId(sbmodel.CollectionId(devKey)).Row(devKey.Key())
+ stmRow := db.CollectionForId(sbmodel.CollectionId(stmKey)).Row(stmKey.Key())
if exists, err := devRow.Exists(ctx); err != nil {
return err
@@ -72,7 +72,7 @@
return nil, err
}
- return &stmKey, nil
+ return stmKey, nil
}
// TODO(ivanpi): Implement pausing and resuming streams.
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
index 28d0b40..a1472ce 100644
--- a/go/src/v.io/x/sensorlog/internal/config/defaults.go
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -16,6 +16,7 @@
AppName = "sensorlog"
DBName = "sldb"
+ User = "sluser"
SyncPriority = 42
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 2974096..97eabb3 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -55,8 +55,8 @@
// StreamDef : <devId>
// Admin client has full permissions, measured drops to readonly.
prefixStreamDef := wire.CollectionRow{
- CollectionName: sbmodel.KStreamDef{}.Collection(),
- Row: devId,
+ CollectionId: sbmodel.CollectionId(&sbmodel.KStreamDef{}),
+ Row: devId,
}
aclStreamDef := access.Permissions{}
sbutil.AddPermsForPrincipal(&aclStreamDef, v23.GetPrincipal(ctx), access.Resolve, access.Read)
@@ -66,8 +66,8 @@
// DataPoint : <devId>
// Admin client has full permissions, measured drops to read/write.
prefixDataPoint := wire.CollectionRow{
- CollectionName: sbmodel.KDataPoint{}.Collection(),
- Row: devId,
+ CollectionId: sbmodel.CollectionId(&sbmodel.KDataPoint{}),
+ Row: devId,
}
aclDataPoint := access.Permissions{}
sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index c9260a6..284a41c 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -27,11 +27,11 @@
// WatchForStreams is synchronous and runs until the context is cancelled or
// an error is encountered.
func WatchForStreams(ctx *context.T, db syncbase.Database, devId string, register RegisterWorker) error {
- collectionName := sbmodel.KStreamDef{}.Collection()
+ collectionId := sbmodel.CollectionId(&sbmodel.KStreamDef{})
watchPrefix := keyutil.Join(devId, "")
// Watch for StreamDef changes and register samplers as needed.
- ws, err := db.Watch(ctx, collectionName, watchPrefix, nil)
+ ws, err := db.Watch(ctx, collectionId, watchPrefix, nil)
if err != nil {
return err
}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
index 9b6d9e6..da9015f 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -124,9 +124,9 @@
}
func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) {
- key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
+ key := &sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
val := sbmodel.VStreamDef{Desc: desc}
- if err := db.Collection(key.Collection()).Put(ctx, key.Key(), &val); err != nil {
+ if err := db.CollectionForId(sbmodel.CollectionId(key)).Put(ctx, key.Key(), &val); err != nil {
t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
}
}
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
index 7d5007d..ce33aab 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
@@ -7,6 +7,8 @@
package sbmodel
import (
+ wire "v.io/v23/services/syncbase"
+ "v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
@@ -85,3 +87,9 @@
{Prototype: &KStreamDef{}, ReadOnly: true},
{Prototype: &KDataPoint{}},
}
+
+// CollectionId returns the collection id for the data type.
+// TODO(ivanpi): Pass in user instead of using default.
+func CollectionId(prototype PersistentDataKey) wire.Id {
+ return wire.Id{Blessing: config.User, Name: prototype.Collection()}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
index e076030..1d0af06 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -19,6 +19,9 @@
// CreateOrOpenDB opens the Sensor Log database hosted on specified Syncbase
// instance, creating it if missing, and initializing specified collections.
+// TODO(ivanpi): Collection ids currently use a hardwired user blessing. The
+// blessing should instead be provided as a command-line argument, or taken
+// from ctx if not provided.
func CreateOrOpenDB(ctx *context.T, sbService string, collections []sbmodel.CollectionSpec) (syncbase.Database, error) {
aclFull := access.Permissions{}
// Allow everyone to resolve to allow joining syncgroups.
@@ -43,7 +46,7 @@
// TODO(ivanpi): Add schemas when available.
for _, cs := range collections {
- c := db.Collection(cs.Prototype.Collection())
+ c := db.CollectionForId(sbmodel.CollectionId(cs.Prototype))
acl := aclReadOnly
if !cs.ReadOnly {
acl = aclFull
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
index 7f248e2..d2b6c17 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
@@ -47,9 +47,9 @@
}
// Check that all collections exist.
for _, cs := range sbmodel.MasterCollections {
- c := dbGuest.Collection(cs.Prototype.Collection())
+ c := dbGuest.CollectionForId(sbmodel.CollectionId(cs.Prototype))
if exists, err := c.Exists(ctxGuest); err != nil || !exists {
- t.Errorf("Expected collection %s to exist, got: %v (error: %v)", c.Name(), exists, err)
+ t.Errorf("Expected collection %v to exist, got: %v (error: %v)", c.Id(), exists, err)
}
}
}
@@ -73,9 +73,9 @@
// Check that all collections have correct permissions (full or readonly).
for _, cs := range sbmodel.MeasuredCollections {
- c := dbOwner.Collection(cs.Prototype.Collection())
+ c := dbOwner.CollectionForId(sbmodel.CollectionId(cs.Prototype))
if exists, err := c.Exists(ctxOwner); err != nil || !exists {
- t.Errorf("Expected collection %s to exist, got: %v (error: %v)", c.Name(), exists, err)
+ t.Errorf("Expected collection %v to exist, got: %v (error: %v)", c.Id(), exists, err)
}
want := expectPermsFull
if cs.ReadOnly {
@@ -84,7 +84,7 @@
if got, err := c.GetPermissions(ctxOwner); err != nil {
t.Errorf("GetPermissions should have succeeded, got error: %v", err)
} else if got, want = got.Normalize(), want.Normalize(); !reflect.DeepEqual(got, want) {
- t.Errorf("Unexpected collection %s permissions: got %v, want %v", c.Name(), got, want)
+ t.Errorf("Unexpected collection %v permissions: got %v, want %v", c.Id(), got, want)
}
}
}
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
index c0c0b45..ff93284 100644
--- a/go/src/v.io/x/sensorlog/measured/measured.go
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -79,7 +79,7 @@
})
writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
- return db.Collection(key.Collection()).Put(ctx, key.Key(), val)
+ return db.CollectionForId(sbmodel.CollectionId(key)).Put(ctx, key.Key(), val)
}
waitWatch := util.AsyncRun(func() error {