syncbase/watch: Return initial state for empty ResumeMarker.
Empty ResumeMarker is a convenience for Scan followed by Watch,
returning the initial state of the watched row set as the first
change batch, as specified in v.io/v23/watch.
Fixes v.io/i/693 and most of v.io/i/917
MultiPart: 4/5
Change-Id: Ib316fd61c0c45211836f532ac4677507002c5f05
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
index fe2d0f3..9518a58 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
@@ -113,21 +113,12 @@
/**
* Allows a client to watch for updates to the database. For each watch request, the client will
- * receive a reliable {@link InputChannel} of watch events without re-ordering.
+ * receive a reliable {@link InputChannel} of watch events since the provided
+ * {@link ResumeMarker} without re-ordering.
* <p>
* See {@link io.v.v23.services.watch.GlobWatcherClient} for a detailed explanation of the
- * watch behavior.
+ * watch behavior and additional {@link ResumeMarker} semantics.
* <p>
- * In order to not miss any changes, clients should obtain a {@link ResumeMarker} from a
- * fixed snapshot of the database, and then listen to changes that occurred only after this
- * snapshot. In other words, a client should:
- * <p><ol>
- * <li>begin a read-only batch,</li>
- * <li>read all information your app needs,</li>
- * <li>read the ResumeMarker,</li>
- * <li>abort the batch,</li>
- * <li>start watching for changes to the data using the {@link ResumeMarker}.</li>
- * </ol><p>
* {@link io.v.v23.context.VContext#cancel Canceling} the provided context will
* stop the watch operation and cause the channel to stop producing elements. Note that to
* avoid memory leaks, the caller should drain the channel after cancelling the context.
@@ -142,6 +133,20 @@
String rowPrefix, ResumeMarker resumeMarker);
/**
+ * Allows a client to watch for updates to the database. Same as
+ * {@link #watch(VContext, String, String, ResumeMarker)} with an empty {@link ResumeMarker}:
+ * the first batch on the returned stream represents the initial state of the watched row set
+ * at the time of the call.
+ *
+ * @param context vanadium context
+ * @param tableRelativeName relative name of the table to watch
+ * @param rowPrefix prefix of the rows to watch
+ * @return a (potentially-infinite) {@link InputChannel} of changes
+ */
+ InputChannel<WatchChange> watch(VContext context, String tableRelativeName,
+ String rowPrefix);
+
+ /**
* Returns a handle to a database {@link Syncgroup} with the given full (i.e., object) name.
*
* @param name name of the synchronization group
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
index e5412b7..bf2f04a 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
@@ -167,6 +167,11 @@
});
}
@Override
+ public InputChannel<WatchChange> watch(VContext ctx, String tableRelativeName,
+ String rowPrefix) {
+ return this.watch(ctx, tableRelativeName, rowPrefix, null);
+ }
+ @Override
public ListenableFuture<ResumeMarker> getResumeMarker(VContext ctx) {
return client.getResumeMarker(ctx);
}
diff --git a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
index 8ed0b85..4df616a 100644
--- a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
+++ b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
@@ -50,6 +50,7 @@
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Iterator;
+import java.util.List;
import static com.google.common.truth.Truth.assertThat;
import static io.v.v23.VFutures.sync;
@@ -250,16 +251,38 @@
VContext ctxC = ctx.withCancel();
Iterator<WatchChange> it = InputChannels.asIterable(
db.watch(ctxC, TABLE_NAME, "b", marker)).iterator();
- for (WatchChange expected : expectedChanges) {
- assertThat(it.hasNext()).isTrue();
- WatchChange actual = it.next();
- assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
- assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
- assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
- assertThat(actual.getVomValue()).isEqualTo(expected.getVomValue());
- assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
- assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
- }
+ checkWatch(it, expectedChanges);
+ ctxC.cancel();
+ }
+
+ public void testDatabaseWatchWithInitialState() throws Exception {
+ Database db = createDatabase(createApp(createService()));
+ Table table = createTable(db);
+ Foo foo = new Foo(4, "f");
+ Bar bar = new Bar(0.5f, "b");
+ Baz baz = new Baz("John Doe", true);
+
+ sync(table.put(ctx, "foo", foo, Foo.class));
+ sync(table.put(ctx, "barfoo", foo, Foo.class));
+ sync(table.put(ctx, "bar", bar, Bar.class));
+
+ VContext ctxC = ctx.withCancel();
+ Iterator<WatchChange> it = InputChannels.asIterable(
+ db.watch(ctxC, TABLE_NAME, "b")).iterator();
+
+ sync(table.put(ctx, "baz", baz, Baz.class));
+ sync(table.getRow("baz").delete(ctx));
+
+ ImmutableList<WatchChange> expectedChanges = ImmutableList.of(
+ new WatchChange(TABLE_NAME, "bar", ChangeType.PUT_CHANGE,
+ VomUtil.encode(bar, Bar.class), null, false, true),
+ new WatchChange(TABLE_NAME, "barfoo", ChangeType.PUT_CHANGE,
+ VomUtil.encode(foo, Foo.class), null, false, false),
+ new WatchChange(TABLE_NAME, "baz", ChangeType.PUT_CHANGE,
+ VomUtil.encode(baz, Baz.class), null, false, false),
+ new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
+ new byte[0], null, false, false));
+ checkWatch(it, expectedChanges);
ctxC.cancel();
}
@@ -565,6 +588,20 @@
return table;
}
+ private void checkWatch(Iterator<WatchChange> it,
+ List<WatchChange> expectedChanges) throws Exception {
+ for (WatchChange expected : expectedChanges) {
+ assertThat(it.hasNext()).isTrue();
+ WatchChange actual = it.next();
+ assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
+ assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
+ assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
+ assertThat(actual.getVomValue()).isEqualTo(expected.getVomValue());
+ assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
+ assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
+ }
+ }
+
private static class Foo implements Serializable {
private int i;
private String s;