Implement watch functionality

Change-Id: Ibb62df0546f8ff801cb775da6eeae9aaaefe1817
diff --git a/cmd/sb/commands/acl.go b/cmd/sb/commands/acl.go
index 6afdc5f..f68e64e 100644
--- a/cmd/sb/commands/acl.go
+++ b/cmd/sb/commands/acl.go
@@ -33,8 +33,8 @@
 
 func init() {
 	cmdAcl.Flags.StringVar(&flagTarget, "target", "db", "The access controller type to act on (service, db, database, cx, collection, sg, syncgroup).")
-	cmdAcl.Flags.StringVar(&flagCollection, "cx", "", "The collection to act on. (note: requires -target=collection or cx)")
-	cmdAcl.Flags.StringVar(&flagSyncgroup, "sg", "", "The syncgroup to act on. (note: requires -target=syncgroup or sg)")
+	cmdAcl.Flags.StringVar(&flagCollection, "cx", "", "The collection to act on, in format <blessing>,<collection>. (note: requires -target=collection or cx)")
+	cmdAcl.Flags.StringVar(&flagSyncgroup, "sg", "", "The syncgroup to act on, in format <blessing>,<syncgroup>. (note: requires -target=syncgroup or sg)")
 }
 
 func updatePerms(ctx *context.T, db syncbase.Database, env *cmdline.Env, callback func(access.Permissions) (access.Permissions, error)) error {
@@ -61,6 +61,7 @@
 		if err := ac.SetPermissions(ctx, perms, version); err != nil {
 			return fmt.Errorf("error setting permissions: %q", err)
 		}
+		return nil
 	case "cx", "collection":
 		bdb, err := db.BeginBatch(ctx, wire.BatchOptions{})
 		if err != nil {
@@ -68,7 +69,11 @@
 		}
 		defer bdb.Abort(ctx)
 
-		collection := bdb.Collection(ctx, flagCollection)
+		id, err := wire.ParseId(flagCollection)
+		if err != nil {
+			return err
+		}
+		collection := bdb.CollectionForId(id)
 		perms, err := collection.GetPermissions(ctx)
 		if err != nil {
 			return err
@@ -86,7 +91,11 @@
 
 		return bdb.Commit(ctx)
 	case "sg", "syncgroup":
-		sg := db.Syncgroup(ctx, flagSyncgroup)
+		id, err := wire.ParseId(flagSyncgroup)
+		if err != nil {
+			return err
+		}
+		sg := db.SyncgroupForId(id)
 		spec, version, err := sg.GetSpec(ctx)
 		if err != nil {
 			return err
@@ -126,7 +135,7 @@
 	blessing := args[0]
 	return updatePerms(ctx, db, env,
 		func(perms access.Permissions) (access.Permissions, error) {
-			// Pretty-print the groups that blessing is in
+			// Pretty-print the groups that blessing is in.
 			var groupsIn []string
 			for groupName, acl := range perms {
 				for _, b := range acl.In {
@@ -138,7 +147,7 @@
 			}
 			fmt.Printf("[%s]\n", strings.Join(groupsIn, ", "))
 
-			// Pretty-print the groups that blessing is blacklisted from
+			// Pretty-print the groups that blessing is blacklisted from.
 			var groupsNotIn []string
 			for groupName, acl := range perms {
 				for _, b := range acl.NotIn {
@@ -252,7 +261,7 @@
 }
 
 // parseAccessList returns a map from blessing patterns to structs containing lists
-// of tags which the pattern is white/blacklisted on
+// of tags which the pattern is white/blacklisted on.
 func parseAccessList(args []string) (map[string]userTags, error) {
 	if got := len(args); got%2 != 0 {
 		return nil, fmt.Errorf("incorrect number of arguments %d, must be even", got)
diff --git a/cmd/sb/commands/commands.go b/cmd/sb/commands/commands.go
index 5900016..ac324ed 100644
--- a/cmd/sb/commands/commands.go
+++ b/cmd/sb/commands/commands.go
@@ -27,6 +27,8 @@
 	cmdSelect,
 	cmdSg,
 	cmdAcl,
+	cmdWatch,
+	cmdDestroy,
 }
 
 var (
diff --git a/cmd/sb/commands/destroy.go b/cmd/sb/commands/destroy.go
new file mode 100644
index 0000000..15fa2f1
--- /dev/null
+++ b/cmd/sb/commands/destroy.go
@@ -0,0 +1,32 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package commands
+
+import (
+	"v.io/v23/context"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
+	"v.io/x/lib/cmdline"
+)
+
+var cmdDestroy = &cmdline.Command{
+	Name:     "destroy",
+	Short:    "Destroy a specified collection",
+	Long:     "Destroy a specified collection.",
+	ArgsName: "<collection_blessing>,<collection_name>",
+	Runner:   SbRunner(runDestroy),
+}
+
+func runDestroy(ctx *context.T, db syncbase.Database, env *cmdline.Env, args []string) error {
+	if got := len(args); got != 1 {
+		return env.UsageErrorf("destroy: expected 1 arg, got %d", got)
+	}
+
+	if id, err := wire.ParseId(args[0]); err != nil {
+		return err
+	} else {
+		return db.CollectionForId(id).Destroy(ctx)
+	}
+}
diff --git a/cmd/sb/commands/watch.go b/cmd/sb/commands/watch.go
new file mode 100644
index 0000000..07c0b05
--- /dev/null
+++ b/cmd/sb/commands/watch.go
@@ -0,0 +1,89 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package commands
+
+import (
+	"fmt"
+	"strings"
+
+	"v.io/v23/context"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/services/watch"
+	"v.io/v23/syncbase"
+	"v.io/v23/vdl"
+	"v.io/x/lib/cmdline"
+)
+
+var cmdWatch = &cmdline.Command{
+	Name:     "watch",
+	Short:    "Watch for updates to the database",
+	Long:     "Watch for updates to the database.",
+	ArgsName: "(<collection_blessing>,<collection_name>[,<row_key>] )*",
+	ArgsLong: `
+If no patterns are specified, watch defaults to watching for updates to any row
+of any collection.
+`,
+	Runner: SbRunner(runWatch),
+}
+
+func runWatch(ctx *context.T, db syncbase.Database, env *cmdline.Env, args []string) error {
+	// Get list of patterns to watch for.
+	var patterns []wire.CollectionRowPattern
+	if len(args) == 0 {
+		patterns = []wire.CollectionRowPattern{
+			wire.CollectionRowPattern{"%", "%", "%"},
+		}
+	} else {
+		patterns = make([]wire.CollectionRowPattern, len(args))
+		for i, arg := range args {
+			patterns[i] = parsePattern(arg)
+		}
+	}
+
+	// Watch for and print out changes.
+	ws := db.Watch(ctx, watch.ResumeMarker("now"), patterns)
+	for ws.Advance() {
+		ch := ws.Change()
+		switch ch.EntityType {
+		case syncbase.EntityRow:
+			fmt.Printf("change on row %s of collection %s:\n", ch.Row, ch.Collection)
+
+			switch ch.ChangeType {
+			case syncbase.PutChange:
+				var v vdl.Value
+				if err := ch.Value(&v); err != nil {
+					fmt.Fprintf(env.Stderr, "\terror reading new value: %v\n", err)
+				} else {
+					fmt.Printf("\tnew value: %s\n", v.String())
+				}
+			case syncbase.DeleteChange:
+				fmt.Printf("\trow removed\n")
+			default:
+				fmt.Fprintf(env.Stderr, "\tbad change type\n")
+			}
+
+		case syncbase.EntityCollection:
+			fmt.Printf("change on collection %s:\n", ch.Collection)
+			switch ch.ChangeType {
+			case syncbase.PutChange:
+				fmt.Printf("\tnew info: %v\n", ch.CollectionInfo())
+			case syncbase.DeleteChange:
+				fmt.Printf("\tcollection deleted\n")
+			default:
+				fmt.Fprintf(env.Stderr, "\tbad change type\n")
+			}
+
+		default:
+			fmt.Fprintln(env.Stderr, "malformed change detected")
+		}
+	}
+
+	return ws.Err()
+}
+
+func parsePattern(p string) wire.CollectionRowPattern {
+	fields := strings.SplitN(p, ",", 3)
+	return wire.CollectionRowPattern{fields[0], fields[1], fields[2]}
+}
diff --git a/cmd/sb/dbutil/dbutil.go b/cmd/sb/dbutil/dbutil.go
index 34f0e33..039ee45 100644
--- a/cmd/sb/dbutil/dbutil.go
+++ b/cmd/sb/dbutil/dbutil.go
@@ -31,7 +31,7 @@
 
 // OpenDB is a user-friendly wrapper for openDB.
 func OpenDB(ctx *context.T) (syncbase.Database, error) {
-	// Open a connection to syncbase
+	// Open a connection to syncbase.
 	sbService := syncbase.NewService(flagService)
 	dbId, err := wire.ParseId(flagDbId)
 	if err != nil {
diff --git a/cmd/sb/doc.go b/cmd/sb/doc.go
index 182e1b0..b5d2a32 100644
--- a/cmd/sb/doc.go
+++ b/cmd/sb/doc.go
@@ -19,6 +19,8 @@
    select      Display particular rows, or parts of rows
    sg          Manipulate syncgroups
    acl         Read or mutate the ACLs
+   watch       Watch for updates to the database
+   destroy     Destroy a specified collection
    help        Display help for commands or topics
 
 The global flags are: