java/syncbase: Expose more ways to Watch
AddWatchChangeHandlerOptions now has a builder, though it might
be somewhat verbose to use.
Watch can now use these filters:
- specific collection
- specific row
- collection prefix
- row prefix
Tests added for some of them (not exhaustive)
Change-Id: Ib823421932cc6c26c999a93a33865bbf95a0380e
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index fc67c83..00c0858 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -392,8 +392,72 @@
* Options for {@code addWatchChangeHandler}.
*/
public static class AddWatchChangeHandlerOptions {
- public byte[] resumeMarker;
- boolean showUserdataCollectionRow;
+ final byte[] resumeMarker;
+ final String name;
+ final String blessing;
+ final String row;
+ final boolean showUserdataCollectionRow;
+
+ AddWatchChangeHandlerOptions(Builder builder) {
+ resumeMarker = builder.resumeMarker;
+ name = builder.name;
+ blessing = builder.blessing;
+ row = builder.row;
+ showUserdataCollectionRow = builder.showUserdataCollectionRow;
+ }
+
+ CollectionRowPattern getCollectionRowPattern() {
+ return new CollectionRowPattern(blessing, name, row);
+ }
+
+ public static class Builder {
+ private byte[] resumeMarker;
+ private String name = WILDCARD;
+ private String blessing = WILDCARD;
+ private String row = WILDCARD;
+ private boolean showUserdataCollectionRow;
+
+ private static final String WILDCARD = "%";
+
+ public Builder setResumeMarker(byte[] resumeMarker) {
+ this.resumeMarker = resumeMarker;
+ return this;
+ }
+
+ public Builder setCollectionNamePrefix(String prefix) {
+ // TODO(alexfandrianto): Unsafe. The prefix was not escaped. Incorrect if it has a
+ // trailing backslash.
+ name = prefix + WILDCARD;
+ return this;
+ }
+
+ public Builder setCollectionId(Id id) {
+ name = id.getName();
+ blessing = id.getBlessing();
+ return this;
+ }
+
+ public Builder setRowKeyPrefix(String prefix) {
+ // TODO(alexfandrianto): Unsafe. The prefix was not escaped. Incorrect if it has a
+ // trailing backslash.
+ row = prefix + WILDCARD;
+ return this;
+ }
+
+ public Builder setRowKey(String rowKey) {
+ row = rowKey;
+ return this;
+ }
+
+ Builder setShowUserdataCollectionRow(boolean shouldShow) {
+ showUserdataCollectionRow = shouldShow;
+ return this;
+ }
+
+ public AddWatchChangeHandlerOptions build() {
+ return new AddWatchChangeHandlerOptions(this);
+ }
+ }
}
/**
@@ -438,7 +502,7 @@
throw new UnsupportedOperationException("Specifying resumeMarker is not yet supported");
}
- mCoreDatabase.watch(null, ImmutableList.of(new CollectionRowPattern("%", "%", "%")),
+ mCoreDatabase.watch(null, ImmutableList.of(opts.getCollectionRowPattern()),
new io.v.syncbase.core.Database.WatchPatternsCallbacks() {
private boolean mGotFirstBatch = false;
private final List<WatchChange> mBatch = new ArrayList<>();
@@ -509,7 +573,7 @@
* {@code AddWatchChangeHandlerOptions}.
*/
public void addWatchChangeHandler(WatchChangeHandler h) {
- addWatchChangeHandler(h, new AddWatchChangeHandlerOptions());
+ addWatchChangeHandler(h, new AddWatchChangeHandlerOptions.Builder().build());
}
/**
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index e026642..56083da 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -267,8 +267,8 @@
syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
}
Database.AddWatchChangeHandlerOptions opts = new Database
- .AddWatchChangeHandlerOptions();
- opts.showUserdataCollectionRow = true;
+ .AddWatchChangeHandlerOptions.Builder().
+ setShowUserdataCollectionRow(true).build();
sDatabase.addWatchChangeHandler(new UserdataWatchHandler(), opts);
}
sOpts.callbackExecutor.execute(new Runnable() {
diff --git a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
index f8fd471..8513424 100644
--- a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
+++ b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
@@ -215,8 +215,8 @@
WatchChange watchChange = changes.next();
assertEquals(WatchChange.ChangeType.DELETE, watchChange.getChangeType());
assertTrue(watchChange.getCollectionId().getName().startsWith("c"));
- // TODO(razvanm): Uncomment after the POJO start working.
- //assertEquals(1, watchChange.getValue());
+ assertEquals("foo", watchChange.getRowKey());
+
assertFalse(changes.hasNext());
waitOnChangeBatch.set(null);
}
@@ -235,6 +235,140 @@
}
@Test
+ public void testWatchSpecificCollection() throws Exception {
+ Database db = createDatabase();
+ final SettableFuture<Void> waitOnInitialState = SettableFuture.create();
+ final SettableFuture<Void> waitOnChangeBatch = SettableFuture.create();
+ Collection collection = db.createCollection();
+ collection.put("foo", 1);
+ // Note: For ease of test writing, "gar" is a key lexicographically after "foo".
+ collection.put("gar", 2);
+ final String collectionName = collection.getId().getName();
+ Database.AddWatchChangeHandlerOptions opts = new Database.AddWatchChangeHandlerOptions.
+ Builder().setCollectionId(collection.getId()).build();
+ db.addWatchChangeHandler(new Database.WatchChangeHandler() {
+ @Override
+ public void onInitialState(Iterator<WatchChange> values) {
+ // TODO(razvanm): Check the entire contents of each change.
+ // 1st change: the collection entity for the "c" collection.
+ assertTrue(values.hasNext());
+ WatchChange watchChange = (WatchChange) values.next();
+ assertEquals(WatchChange.EntityType.COLLECTION, watchChange.getEntityType());
+ assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+ // 2nd change: the row for the "foo" key.
+ assertTrue(values.hasNext());
+ watchChange = (WatchChange) values.next();
+ assertEquals(WatchChange.EntityType.ROW, watchChange.getEntityType());
+ assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+ assertEquals("foo", watchChange.getRowKey());
+ // TODO(razvanm): Uncomment after the POJO start working.
+ //assertEquals(1, watchChange.getValue());
+ // 3rd change: the row for the "gar" key.
+ assertTrue(values.hasNext());
+ watchChange = (WatchChange) values.next();
+ assertEquals(WatchChange.EntityType.ROW, watchChange.getEntityType());
+ assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+ assertEquals("gar", watchChange.getRowKey());
+ // TODO(razvanm): Uncomment after the POJO start working.
+ //assertEquals(2, watchChange.getValue());
+
+ // No more changes.
+ assertFalse(values.hasNext());
+ waitOnInitialState.set(null);
+ }
+
+ @Override
+ public void onChangeBatch(Iterator<WatchChange> changes) {
+ assertTrue(changes.hasNext());
+ WatchChange watchChange = changes.next();
+ assertEquals(WatchChange.ChangeType.DELETE, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().startsWith("c"));
+ assertEquals("foo", watchChange.getRowKey());
+
+ assertFalse(changes.hasNext());
+ waitOnChangeBatch.set(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ VError vError = (VError) e;
+ assertEquals("v.io/v23/verror.Unknown", vError.id);
+ assertEquals("context canceled", vError.message);
+ assertEquals(0, vError.actionCode);
+ }
+ }, opts);
+ waitOnInitialState.get(1, TimeUnit.SECONDS);
+ collection.delete("foo");
+ waitOnChangeBatch.get(1, TimeUnit.SECONDS);
+ }
+
+
+ @Test
+ public void testWatchSpecificCollectionWithRowPrefix() throws Exception {
+ Database db = createDatabase();
+ final SettableFuture<Void> waitOnInitialState = SettableFuture.create();
+ final SettableFuture<Void> waitOnChangeBatch = SettableFuture.create();
+ Collection collection = db.createCollection();
+ collection.put("foo", 1);
+ collection.put("bar", 1); // not seen, due to filter
+ final String collectionName = collection.getId().getName();
+ Database.AddWatchChangeHandlerOptions opts = new Database.AddWatchChangeHandlerOptions.
+ Builder().setCollectionId(collection.getId()).setRowKeyPrefix("f").build();
+ db.addWatchChangeHandler(new Database.WatchChangeHandler() {
+ @Override
+ public void onInitialState(Iterator<WatchChange> values) {
+ // TODO(razvanm): Check the entire contents of each change.
+ // 1st change: the collection entity for the "c" collection.
+ assertTrue(values.hasNext());
+ WatchChange watchChange = (WatchChange) values.next();
+ assertEquals(WatchChange.EntityType.COLLECTION, watchChange.getEntityType());
+ assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+ // 2nd change: the row for the "foo" key.
+ assertTrue(values.hasNext());
+ watchChange = (WatchChange) values.next();
+ assertEquals(WatchChange.EntityType.ROW, watchChange.getEntityType());
+ assertEquals(WatchChange.ChangeType.PUT, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().equals(collectionName));
+ assertEquals("foo", watchChange.getRowKey());
+ // TODO(razvanm): Uncomment after the POJO start working.
+ //assertEquals(1, watchChange.getValue());
+
+ // No more changes.
+ assertFalse(values.hasNext());
+ waitOnInitialState.set(null);
+ }
+
+ @Override
+ public void onChangeBatch(Iterator<WatchChange> changes) {
+ assertTrue(changes.hasNext());
+ WatchChange watchChange = changes.next();
+ assertEquals(WatchChange.ChangeType.DELETE, watchChange.getChangeType());
+ assertTrue(watchChange.getCollectionId().getName().startsWith("c"));
+ assertEquals("foo", watchChange.getRowKey());
+
+ assertFalse(changes.hasNext());
+ waitOnChangeBatch.set(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ VError vError = (VError) e;
+ assertEquals("v.io/v23/verror.Unknown", vError.id);
+ assertEquals("context canceled", vError.message);
+ assertEquals(0, vError.actionCode);
+ }
+ }, opts);
+ waitOnInitialState.get(1, TimeUnit.SECONDS);
+ collection.delete("foo");
+ collection.delete("bar"); // not noticed due to filter.
+ waitOnChangeBatch.get(1, TimeUnit.SECONDS);
+ }
+
+ @Test
public void testRunInBatch() throws Exception {
Database db = createDatabase();