sensorlog: syncbase: Change Collection to use Id instead of name.

Part 3 of Syncbase API simplification.

MultiPart: 3/3

Change-Id: Ia6287cb7b4f1176d8788b3f20a1c1d36bf568764
diff --git a/go/src/v.io/x/sensorlog/internal/client/device.go b/go/src/v.io/x/sensorlog/internal/client/device.go
index e3838bd..d67333c 100644
--- a/go/src/v.io/x/sensorlog/internal/client/device.go
+++ b/go/src/v.io/x/sensorlog/internal/client/device.go
@@ -25,7 +25,7 @@
 	if err := keyutil.ValidateId(devId); err != nil {
 		return nil, fmt.Errorf("invalid devId: %v", err)
 	}
-	devKey := sbmodel.KDeviceCfg{
+	devKey := &sbmodel.KDeviceCfg{
 		DevId: devId,
 	}
 	if desc == "" {
@@ -36,7 +36,7 @@
 		SgPublishSb: sgPublishSb,
 	}
 	devSgName := config.SyncgroupName(sgPublishSb, devKey.Key())
-	devRow := db.Collection(devKey.Collection()).Row(devKey.Key())
+	devRow := db.CollectionForId(sbmodel.CollectionId(devKey)).Row(devKey.Key())
 
 	if exists, err := devRow.Exists(ctx); err != nil {
 		return nil, err
@@ -51,5 +51,5 @@
 		return nil, err
 	}
 
-	return &devKey, devRow.Put(ctx, &devVal)
+	return devKey, devRow.Put(ctx, &devVal)
 }
diff --git a/go/src/v.io/x/sensorlog/internal/client/list.go b/go/src/v.io/x/sensorlog/internal/client/list.go
index 795480b..cab7e19 100644
--- a/go/src/v.io/x/sensorlog/internal/client/list.go
+++ b/go/src/v.io/x/sensorlog/internal/client/list.go
@@ -24,39 +24,35 @@
 // in chronological order, calling listCb for each.
 // TODO(ivanpi): Allow specifying time interval.
 func ListStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
-	collectionName := sbmodel.KDataPoint{}.Collection()
+	dataCollectionId := sbmodel.CollectionId(&sbmodel.KDataPoint{})
 	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
 
-	bdb, err := db.BeginBatch(ctx, wire.BatchOptions{ReadOnly: true})
-	if err != nil {
-		return err
-	}
-	defer bdb.Abort(ctx)
-
-	streamRow := bdb.Collection(streamKey.Collection()).Row(streamKey.Key())
-	if exists, err := streamRow.Exists(ctx); err != nil {
-		return err
-	} else if !exists {
-		return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
-	}
-
-	sstr := bdb.Collection(collectionName).Scan(ctx, syncbase.Prefix(dataPrefix))
-	defer sstr.Cancel()
-
-	for sstr.Advance() {
-		key := &sbmodel.KDataPoint{}
-		if err := key.Parse(sstr.Key()); err != nil {
-			return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
-		}
-		var val sbmodel.VDataPoint
-		if err := sstr.Value(&val); err != nil {
-			return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
-		}
-		if err := listCb(key, val); err != nil {
+	return syncbase.RunInBatch(ctx, db, wire.BatchOptions{ReadOnly: true}, func(bdb syncbase.BatchDatabase) error {
+		streamRow := bdb.CollectionForId(sbmodel.CollectionId(streamKey)).Row(streamKey.Key())
+		if exists, err := streamRow.Exists(ctx); err != nil {
 			return err
+		} else if !exists {
+			return verror.New(verror.ErrNoExist, ctx, "Stream '"+streamKey.Key()+"' does not exist")
 		}
-	}
-	return sstr.Err()
+
+		sstr := bdb.CollectionForId(dataCollectionId).Scan(ctx, syncbase.Prefix(dataPrefix))
+		defer sstr.Cancel()
+
+		for sstr.Advance() {
+			key := &sbmodel.KDataPoint{}
+			if err := key.Parse(sstr.Key()); err != nil {
+				return fmt.Errorf("invalid DataPoint key for prefix %s: %v", dataPrefix, err)
+			}
+			var val sbmodel.VDataPoint
+			if err := sstr.Value(&val); err != nil {
+				return fmt.Errorf("invalid DataPoint value for key %s: %v", sstr.Key(), err)
+			}
+			if err := listCb(key, val); err != nil {
+				return err
+			}
+		}
+		return sstr.Err()
+	})
 }
 
 // FollowStreamData lists all data points for the stream specified by streamKey
@@ -64,11 +60,12 @@
 // points until ctx is cancelled.
 // TODO(ivanpi): Allow specifying time interval.
 func FollowStreamData(ctx *context.T, db syncbase.Database, streamKey *sbmodel.KStreamDef, listCb ListCallback) error {
-	collectionName := sbmodel.KDataPoint{}.Collection()
+	dataCollectionId := sbmodel.CollectionId(&sbmodel.KDataPoint{})
 	dataPrefix := keyutil.Join(streamKey.DevId, streamKey.StreamId, "")
 
 	// Watch for DataPoints, existing followed by new.
-	ws, err := db.Watch(ctx, collectionName, dataPrefix, nil)
+	// TODO(ivanpi): Check if stream exists.
+	ws, err := db.Watch(ctx, dataCollectionId, dataPrefix, nil)
 	if err != nil {
 		return err
 	}
diff --git a/go/src/v.io/x/sensorlog/internal/client/stream.go b/go/src/v.io/x/sensorlog/internal/client/stream.go
index 237ad00..2733b44 100644
--- a/go/src/v.io/x/sensorlog/internal/client/stream.go
+++ b/go/src/v.io/x/sensorlog/internal/client/stream.go
@@ -34,7 +34,7 @@
 	if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
 		return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
 	}
-	stmKey := sbmodel.KStreamDef{
+	stmKey := &sbmodel.KStreamDef{
 		DevId:    devKey.DevId,
 		StreamId: streamId,
 	}
@@ -52,8 +52,8 @@
 	}
 
 	if err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, func(db syncbase.BatchDatabase) error {
-		devRow := db.Collection(devKey.Collection()).Row(devKey.Key())
-		stmRow := db.Collection(stmKey.Collection()).Row(stmKey.Key())
+		devRow := db.CollectionForId(sbmodel.CollectionId(devKey)).Row(devKey.Key())
+		stmRow := db.CollectionForId(sbmodel.CollectionId(stmKey)).Row(stmKey.Key())
 
 		if exists, err := devRow.Exists(ctx); err != nil {
 			return err
@@ -72,7 +72,7 @@
 		return nil, err
 	}
 
-	return &stmKey, nil
+	return stmKey, nil
 }
 
 // TODO(ivanpi): Implement pausing and resuming streams.
diff --git a/go/src/v.io/x/sensorlog/internal/config/defaults.go b/go/src/v.io/x/sensorlog/internal/config/defaults.go
index 28d0b40..a1472ce 100644
--- a/go/src/v.io/x/sensorlog/internal/config/defaults.go
+++ b/go/src/v.io/x/sensorlog/internal/config/defaults.go
@@ -16,6 +16,7 @@
 
 	AppName = "sensorlog"
 	DBName  = "sldb"
+	User    = "sluser"
 
 	SyncPriority = 42
 
diff --git a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
index 2974096..97eabb3 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/syncgroup.go
@@ -55,8 +55,8 @@
 	// StreamDef : <devId>
 	// Admin client has full permissions, measured drops to readonly.
 	prefixStreamDef := wire.CollectionRow{
-		CollectionName: sbmodel.KStreamDef{}.Collection(),
-		Row:            devId,
+		CollectionId: sbmodel.CollectionId(&sbmodel.KStreamDef{}),
+		Row:          devId,
 	}
 	aclStreamDef := access.Permissions{}
 	sbutil.AddPermsForPrincipal(&aclStreamDef, v23.GetPrincipal(ctx), access.Resolve, access.Read)
@@ -66,8 +66,8 @@
 	// DataPoint : <devId>
 	// Admin client has full permissions, measured drops to read/write.
 	prefixDataPoint := wire.CollectionRow{
-		CollectionName: sbmodel.KDataPoint{}.Collection(),
-		Row:            devId,
+		CollectionId: sbmodel.CollectionId(&sbmodel.KDataPoint{}),
+		Row:          devId,
 	}
 	aclDataPoint := access.Permissions{}
 	sbutil.AddPermsForPrincipal(&aclDataPoint, v23.GetPrincipal(ctx), access.Resolve, access.Read, access.Write)
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher.go b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
index c9260a6..284a41c 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher.go
@@ -27,11 +27,11 @@
 // WatchForStreams is synchronous and runs until the context is cancelled or
 // an error is encountered.
 func WatchForStreams(ctx *context.T, db syncbase.Database, devId string, register RegisterWorker) error {
-	collectionName := sbmodel.KStreamDef{}.Collection()
+	collectionId := sbmodel.CollectionId(&sbmodel.KStreamDef{})
 	watchPrefix := keyutil.Join(devId, "")
 
 	// Watch for StreamDef changes and register samplers as needed.
-	ws, err := db.Watch(ctx, collectionName, watchPrefix, nil)
+	ws, err := db.Watch(ctx, collectionId, watchPrefix, nil)
 	if err != nil {
 		return err
 	}
diff --git a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
index 9b6d9e6..da9015f 100644
--- a/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
+++ b/go/src/v.io/x/sensorlog/internal/measure/watcher_test.go
@@ -124,9 +124,9 @@
 }
 
 func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) {
-	key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
+	key := &sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
 	val := sbmodel.VStreamDef{Desc: desc}
-	if err := db.Collection(key.Collection()).Put(ctx, key.Key(), &val); err != nil {
+	if err := db.CollectionForId(sbmodel.CollectionId(key)).Put(ctx, key.Key(), &val); err != nil {
 		t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
 	}
 }
diff --git a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
index 7d5007d..ce33aab 100644
--- a/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
+++ b/go/src/v.io/x/sensorlog/internal/sbmodel/types.go
@@ -7,6 +7,8 @@
 package sbmodel
 
 import (
+	wire "v.io/v23/services/syncbase"
+	"v.io/x/sensorlog/internal/config"
 	"v.io/x/sensorlog/internal/sbmodel/keyutil"
 )
 
@@ -85,3 +87,9 @@
 	{Prototype: &KStreamDef{}, ReadOnly: true},
 	{Prototype: &KDataPoint{}},
 }
+
+// CollectionId returns the collection id for the data type.
+// TODO(ivanpi): Pass in user instead of using default.
+func CollectionId(prototype PersistentDataKey) wire.Id {
+	return wire.Id{Blessing: config.User, Name: prototype.Collection()}
+}
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
index e076030..1d0af06 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase.go
@@ -19,6 +19,9 @@
 
 // CreateOrOpenDB opens the Sensor Log database hosted on specified Syncbase
 // instance, creating it if missing, and initializing specified collections.
+// TODO(ivanpi): Collection ids currently use a hardwired user blessing. The
+// blessing should instead be provided as a command-line argument, or taken
+// from ctx if not provided.
 func CreateOrOpenDB(ctx *context.T, sbService string, collections []sbmodel.CollectionSpec) (syncbase.Database, error) {
 	aclFull := access.Permissions{}
 	// Allow everyone to resolve to allow joining syncgroups.
@@ -43,7 +46,7 @@
 
 	// TODO(ivanpi): Add schemas when available.
 	for _, cs := range collections {
-		c := db.Collection(cs.Prototype.Collection())
+		c := db.CollectionForId(sbmodel.CollectionId(cs.Prototype))
 		acl := aclReadOnly
 		if !cs.ReadOnly {
 			acl = aclFull
diff --git a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
index 7f248e2..d2b6c17 100644
--- a/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
+++ b/go/src/v.io/x/sensorlog/internal/sbutil/syncbase_test.go
@@ -47,9 +47,9 @@
 	}
 	// Check that all collections exist.
 	for _, cs := range sbmodel.MasterCollections {
-		c := dbGuest.Collection(cs.Prototype.Collection())
+		c := dbGuest.CollectionForId(sbmodel.CollectionId(cs.Prototype))
 		if exists, err := c.Exists(ctxGuest); err != nil || !exists {
-			t.Errorf("Expected collection %s to exist, got: %v (error: %v)", c.Name(), exists, err)
+			t.Errorf("Expected collection %v to exist, got: %v (error: %v)", c.Id(), exists, err)
 		}
 	}
 }
@@ -73,9 +73,9 @@
 
 	// Check that all collections have correct permissions (full or readonly).
 	for _, cs := range sbmodel.MeasuredCollections {
-		c := dbOwner.Collection(cs.Prototype.Collection())
+		c := dbOwner.CollectionForId(sbmodel.CollectionId(cs.Prototype))
 		if exists, err := c.Exists(ctxOwner); err != nil || !exists {
-			t.Errorf("Expected collection %s to exist, got: %v (error: %v)", c.Name(), exists, err)
+			t.Errorf("Expected collection %v to exist, got: %v (error: %v)", c.Id(), exists, err)
 		}
 		want := expectPermsFull
 		if cs.ReadOnly {
@@ -84,7 +84,7 @@
 		if got, err := c.GetPermissions(ctxOwner); err != nil {
 			t.Errorf("GetPermissions should have succeeded, got error: %v", err)
 		} else if got, want = got.Normalize(), want.Normalize(); !reflect.DeepEqual(got, want) {
-			t.Errorf("Unexpected collection %s permissions: got %v, want %v", c.Name(), got, want)
+			t.Errorf("Unexpected collection %v permissions: got %v, want %v", c.Id(), got, want)
 		}
 	}
 }
diff --git a/go/src/v.io/x/sensorlog/measured/measured.go b/go/src/v.io/x/sensorlog/measured/measured.go
index c0c0b45..ff93284 100644
--- a/go/src/v.io/x/sensorlog/measured/measured.go
+++ b/go/src/v.io/x/sensorlog/measured/measured.go
@@ -79,7 +79,7 @@
 	})
 
 	writer := func(ctx *context.T, key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
-		return db.Collection(key.Collection()).Put(ctx, key.Key(), val)
+		return db.CollectionForId(sbmodel.CollectionId(key)).Put(ctx, key.Key(), val)
 	}
 
 	waitWatch := util.AsyncRun(func() error {