syncbase/watch: Return initial state for empty ResumeMarker.

Empty ResumeMarker is a convenience for Scan followed by Watch,
returning the initial state of the watched row set as the first
change batch, as specified in v.io/v23/watch.

Fixes v.io/i/693 and most of v.io/i/917

MultiPart: 4/5
Change-Id: Ib316fd61c0c45211836f532ac4677507002c5f05
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
index fe2d0f3..9518a58 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
@@ -113,21 +113,12 @@
 
     /**
      * Allows a client to watch for updates to the database. For each watch request, the client will
-     * receive a reliable {@link InputChannel} of watch events without re-ordering.
+     * receive a reliable {@link InputChannel} of watch events since the provided
+     * {@link ResumeMarker} without re-ordering.
      * <p>
      * See {@link io.v.v23.services.watch.GlobWatcherClient} for a detailed explanation of the
-     * watch behavior.
+     * watch behavior and additional {@link ResumeMarker} semantics.
      * <p>
-     * In order to not miss any changes, clients should obtain a {@link ResumeMarker} from a
-     * fixed snapshot of the database, and then listen to changes that occurred only after this
-     * snapshot.  In other words, a client should:
-     * <p><ol>
-     * <li>begin a read-only batch,</li>
-     * <li>read all information your app needs,</li>
-     * <li>read the ResumeMarker,</li>
-     * <li>abort the batch,</li>
-     * <li>start watching for changes to the data using the {@link ResumeMarker}.</li>
-     * </ol><p>
      * {@link io.v.v23.context.VContext#cancel Canceling} the provided context will
      * stop the watch operation and cause the channel to stop producing elements.  Note that to
      * avoid memory leaks, the caller should drain the channel after cancelling the context.
@@ -142,6 +133,20 @@
                                     String rowPrefix, ResumeMarker resumeMarker);
 
     /**
+     * Allows a client to watch for updates to the database. Same as
+     * {@link #watch(VContext, String, String, ResumeMarker)} with an empty {@link ResumeMarker}:
+     * the first batch on the returned stream represents the initial state of the watched row set
+     * at the time of the call.
+     *
+     * @param context             vanadium context
+     * @param tableRelativeName   relative name of the table to watch
+     * @param rowPrefix           prefix of the rows to watch
+     * @return                    a (potentially-infinite) {@link InputChannel} of changes
+     */
+    InputChannel<WatchChange> watch(VContext context, String tableRelativeName,
+                                    String rowPrefix);
+
+    /**
      * Returns a handle to a database {@link Syncgroup} with the given full (i.e., object) name.
      *
      * @param  name name of the synchronization group
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
index e5412b7..bf2f04a 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
@@ -167,6 +167,11 @@
                 });
     }
     @Override
+    public InputChannel<WatchChange> watch(VContext ctx, String tableRelativeName,
+                                           String rowPrefix) {
+        return this.watch(ctx, tableRelativeName, rowPrefix, null);
+    }
+    @Override
     public ListenableFuture<ResumeMarker> getResumeMarker(VContext ctx) {
         return client.getResumeMarker(ctx);
     }
diff --git a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
index 8ed0b85..4df616a 100644
--- a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
+++ b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
@@ -50,6 +50,7 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Iterator;
+import java.util.List;
 
 import static com.google.common.truth.Truth.assertThat;
 import static io.v.v23.VFutures.sync;
@@ -250,16 +251,38 @@
         VContext ctxC = ctx.withCancel();
         Iterator<WatchChange> it = InputChannels.asIterable(
                 db.watch(ctxC, TABLE_NAME, "b", marker)).iterator();
-        for (WatchChange expected : expectedChanges) {
-            assertThat(it.hasNext()).isTrue();
-            WatchChange actual = it.next();
-            assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
-            assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
-            assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
-            assertThat(actual.getVomValue()).isEqualTo(expected.getVomValue());
-            assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
-            assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
-        }
+        checkWatch(it, expectedChanges);
+        ctxC.cancel();
+    }
+
+    public void testDatabaseWatchWithInitialState() throws Exception {
+        Database db = createDatabase(createApp(createService()));
+        Table table = createTable(db);
+        Foo foo = new Foo(4, "f");
+        Bar bar = new Bar(0.5f, "b");
+        Baz baz = new Baz("John Doe", true);
+
+        sync(table.put(ctx, "foo", foo, Foo.class));
+        sync(table.put(ctx, "barfoo", foo, Foo.class));
+        sync(table.put(ctx, "bar", bar, Bar.class));
+
+        VContext ctxC = ctx.withCancel();
+        Iterator<WatchChange> it = InputChannels.asIterable(
+                db.watch(ctxC, TABLE_NAME, "b")).iterator();
+
+        sync(table.put(ctx, "baz", baz, Baz.class));
+        sync(table.getRow("baz").delete(ctx));
+
+        ImmutableList<WatchChange> expectedChanges = ImmutableList.of(
+                new WatchChange(TABLE_NAME, "bar", ChangeType.PUT_CHANGE,
+                        VomUtil.encode(bar, Bar.class), null, false, true),
+                new WatchChange(TABLE_NAME, "barfoo", ChangeType.PUT_CHANGE,
+                        VomUtil.encode(foo, Foo.class), null, false, false),
+                new WatchChange(TABLE_NAME, "baz", ChangeType.PUT_CHANGE,
+                        VomUtil.encode(baz, Baz.class), null, false, false),
+                new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
+                        new byte[0], null, false, false));
+        checkWatch(it, expectedChanges);
         ctxC.cancel();
     }
 
@@ -565,6 +588,20 @@
         return table;
     }
 
+    private void checkWatch(Iterator<WatchChange> it,
+                            List<WatchChange> expectedChanges) throws Exception {
+        for (WatchChange expected : expectedChanges) {
+            assertThat(it.hasNext()).isTrue();
+            WatchChange actual = it.next();
+            assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
+            assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
+            assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
+            assertThat(actual.getVomValue()).isEqualTo(expected.getVomValue());
+            assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
+            assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
+        }
+    }
+
     private static class Foo implements Serializable {
         private int i;
         private String s;