java/syncbase: Expose more ways to Watch

AddWatchChangeHandlerOptions now has a builder, though it might
be somewhat verbose to use.

Watch can now use these filters:
- specific collection
- specific row
- collection prefix
- row prefix

Tests added for some of them (not exhaustive)

Change-Id: Ib823421932cc6c26c999a93a33865bbf95a0380e
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index fc67c83..00c0858 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -392,8 +392,72 @@
      * Options for {@code addWatchChangeHandler}.
      */
     public static class AddWatchChangeHandlerOptions {
-        public byte[] resumeMarker;
-        boolean showUserdataCollectionRow;
+        final byte[] resumeMarker;
+        final String name;
+        final String blessing;
+        final String row;
+        final boolean showUserdataCollectionRow;
+
+        AddWatchChangeHandlerOptions(Builder builder) {
+            resumeMarker = builder.resumeMarker;
+            name = builder.name;
+            blessing = builder.blessing;
+            row = builder.row;
+            showUserdataCollectionRow = builder.showUserdataCollectionRow;
+        }
+
+        CollectionRowPattern getCollectionRowPattern() {
+            return new CollectionRowPattern(blessing, name, row);
+        }
+
+        public static class Builder {
+            private byte[] resumeMarker;
+            private String name = WILDCARD;
+            private String blessing = WILDCARD;
+            private String row = WILDCARD;
+            private boolean showUserdataCollectionRow;
+
+            private static final String WILDCARD = "%";
+
+            public Builder setResumeMarker(byte[] resumeMarker) {
+                this.resumeMarker = resumeMarker;
+                return this;
+            }
+
+            public Builder setCollectionNamePrefix(String prefix) {
+                // TODO(alexfandrianto): Unsafe. The prefix was not escaped. Incorrect if it has a
+                // trailing backslash.
+                name = prefix + WILDCARD;
+                return this;
+            }
+
+            public Builder setCollectionId(Id id) {
+                name = id.getName();
+                blessing = id.getBlessing();
+                return this;
+            }
+
+            public Builder setRowKeyPrefix(String prefix) {
+                // TODO(alexfandrianto): Unsafe. The prefix was not escaped. Incorrect if it has a
+                // trailing backslash.
+                row = prefix + WILDCARD;
+                return this;
+            }
+
+            public Builder setRowKey(String rowKey) {
+                row = rowKey;
+                return this;
+            }
+
+            Builder setShowUserdataCollectionRow(boolean shouldShow) {
+                showUserdataCollectionRow = shouldShow;
+                return this;
+            }
+
+            public AddWatchChangeHandlerOptions build() {
+                return new AddWatchChangeHandlerOptions(this);
+            }
+        }
     }
 
     /**
@@ -438,7 +502,7 @@
             throw new UnsupportedOperationException("Specifying resumeMarker is not yet supported");
         }
 
-        mCoreDatabase.watch(null, ImmutableList.of(new CollectionRowPattern("%", "%", "%")),
+        mCoreDatabase.watch(null, ImmutableList.of(opts.getCollectionRowPattern()),
                 new io.v.syncbase.core.Database.WatchPatternsCallbacks() {
                     private boolean mGotFirstBatch = false;
                     private final List<WatchChange> mBatch = new ArrayList<>();
@@ -509,7 +573,7 @@
      * {@code AddWatchChangeHandlerOptions}.
      */
     public void addWatchChangeHandler(WatchChangeHandler h) {
-        addWatchChangeHandler(h, new AddWatchChangeHandlerOptions());
+        addWatchChangeHandler(h, new AddWatchChangeHandlerOptions.Builder().build());
     }
 
     /**
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index e026642..56083da 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -267,8 +267,8 @@
                             syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
                         }
                         Database.AddWatchChangeHandlerOptions opts = new Database
-                                .AddWatchChangeHandlerOptions();
-                        opts.showUserdataCollectionRow = true;
+                                .AddWatchChangeHandlerOptions.Builder().
+                                setShowUserdataCollectionRow(true).build();
                         sDatabase.addWatchChangeHandler(new UserdataWatchHandler(), opts);
                     }
                     sOpts.callbackExecutor.execute(new Runnable() {
diff --git a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
index f8fd471..8513424 100644
--- a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
+++ b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
@@ -215,8 +215,8 @@
                 WatchChange watchChange = changes.next();
                 assertEquals(WatchChange.ChangeType.DELETE, watchChange.getChangeType());
                 assertTrue(watchChange.getCollectionId().getName().startsWith("c"));
-                // TODO(razvanm): Uncomment after the POJO start working.
-                //assertEquals(1, watchChange.getValue());
+                assertEquals("foo", watchChange.getRowKey());
+
                 assertFalse(changes.hasNext());
                 waitOnChangeBatch.set(null);
             }
@@ -235,6 +235,140 @@
     }
 
     @Test
+    public void testWatchSpecificCollection() throws Exception {
+        Database db = createDatabase();
+        final SettableFuture<Void> waitOnInitialState = SettableFuture.create();
+        final SettableFuture<Void> waitOnChangeBatch = SettableFuture.create();
+        Collection collection = db.createCollection();
+        collection.put("foo", 1);
+        // Note: For ease of test writing, "gar" is a key lexicographically after "foo".
+        collection.put("gar", 2);
+        final String collectionName = collection.getId().getName();
+        Database.AddWatchChangeHandlerOptions opts = new Database.AddWatchChangeHandlerOptions.
+                Builder().setCollectionId(collection.getId()).build();
+        db.addWatchChangeHandler(new Database.WatchChangeHandler() {
+            @Override
+            public void onInitialState(Iterator<WatchChange> values) {
+                // TODO(razvanm): Check the entire contents of each change.
+                // 1st change: the collection entity for the "c" collection.
+                assertTrue(values.hasNext());
+                WatchChange watchChange = (WatchChange) values.next();
+                assertEquals(WatchChange.EntityType.COLLECTION, watchChange.getEntityType());
+                assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+                // 2nd change: the row for the "foo" key.
+                assertTrue(values.hasNext());
+                watchChange = (WatchChange) values.next();
+                assertEquals(WatchChange.EntityType.ROW, watchChange.getEntityType());
+                assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+                assertEquals("foo", watchChange.getRowKey());
+                // TODO(razvanm): Uncomment after the POJO start working.
+                //assertEquals(1, watchChange.getValue());
+                // 3rd change: the row for the "gar" key.
+                assertTrue(values.hasNext());
+                watchChange = (WatchChange) values.next();
+                assertEquals(WatchChange.EntityType.ROW, watchChange.getEntityType());
+                assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+                assertEquals("gar", watchChange.getRowKey());
+                // TODO(razvanm): Uncomment after the POJO start working.
+                //assertEquals(2, watchChange.getValue());
+
+                // No more changes.
+                assertFalse(values.hasNext());
+                waitOnInitialState.set(null);
+            }
+
+            @Override
+            public void onChangeBatch(Iterator<WatchChange> changes) {
+                assertTrue(changes.hasNext());
+                WatchChange watchChange = changes.next();
+                assertEquals(WatchChange.ChangeType.DELETE, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().startsWith("c"));
+                assertEquals("foo", watchChange.getRowKey());
+
+                assertFalse(changes.hasNext());
+                waitOnChangeBatch.set(null);
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                VError vError = (VError) e;
+                assertEquals("v.io/v23/verror.Unknown", vError.id);
+                assertEquals("context canceled", vError.message);
+                assertEquals(0, vError.actionCode);
+            }
+        }, opts);
+        waitOnInitialState.get(1, TimeUnit.SECONDS);
+        collection.delete("foo");
+        waitOnChangeBatch.get(1, TimeUnit.SECONDS);
+    }
+
+
+    @Test
+    public void testWatchSpecificCollectionWithRowPrefix() throws Exception {
+        Database db = createDatabase();
+        final SettableFuture<Void> waitOnInitialState = SettableFuture.create();
+        final SettableFuture<Void> waitOnChangeBatch = SettableFuture.create();
+        Collection collection = db.createCollection();
+        collection.put("foo", 1);
+        collection.put("bar", 1); // not seen, due to filter
+        final String collectionName = collection.getId().getName();
+        Database.AddWatchChangeHandlerOptions opts = new Database.AddWatchChangeHandlerOptions.
+                Builder().setCollectionId(collection.getId()).setRowKeyPrefix("f").build();
+        db.addWatchChangeHandler(new Database.WatchChangeHandler() {
+            @Override
+            public void onInitialState(Iterator<WatchChange> values) {
+                // TODO(razvanm): Check the entire contents of each change.
+                // 1st change: the collection entity for the "c" collection.
+                assertTrue(values.hasNext());
+                WatchChange watchChange = (WatchChange) values.next();
+                assertEquals(WatchChange.EntityType.COLLECTION, watchChange.getEntityType());
+                assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+                // 2nd change: the row for the "foo" key.
+                assertTrue(values.hasNext());
+                watchChange = (WatchChange) values.next();
+                assertEquals(WatchChange.EntityType.ROW, watchChange.getEntityType());
+                assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+                assertEquals("foo", watchChange.getRowKey());
+                // TODO(razvanm): Uncomment after the POJO start working.
+                //assertEquals(1, watchChange.getValue());
+
+                // No more changes.
+                assertFalse(values.hasNext());
+                waitOnInitialState.set(null);
+            }
+
+            @Override
+            public void onChangeBatch(Iterator<WatchChange> changes) {
+                assertTrue(changes.hasNext());
+                WatchChange watchChange = changes.next();
+                assertEquals(WatchChange.ChangeType.DELETE, watchChange.getChangeType());
+                assertTrue(watchChange.getCollectionId().getName().startsWith("c"));
+                assertEquals("foo", watchChange.getRowKey());
+
+                assertFalse(changes.hasNext());
+                waitOnChangeBatch.set(null);
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                VError vError = (VError) e;
+                assertEquals("v.io/v23/verror.Unknown", vError.id);
+                assertEquals("context canceled", vError.message);
+                assertEquals(0, vError.actionCode);
+            }
+        }, opts);
+        waitOnInitialState.get(1, TimeUnit.SECONDS);
+        collection.delete("foo");
+        collection.delete("bar"); // not noticed due to filter.
+        waitOnChangeBatch.get(1, TimeUnit.SECONDS);
+    }
+
+    @Test
     public void testRunInBatch() throws Exception {
         Database db = createDatabase();