java: Add support for the userdata syncgroup
This change also adds a toString to the WatchChange.
Change-Id: I1d9d423d1b3f372bb700f47a5a89e5234c6e3a1c
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index 0ef2804..c94820e 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -87,6 +87,10 @@
}
Syncgroup syncgroup = new Syncgroup(mCoreDatabase.syncgroup(id.toCoreId()), this);
syncgroup.createIfMissing(collections);
+ // Remember this syncgroup in the userdata collection. The value doesn't matter, but since
+ // VOM won't accept null, use a boolean.
+ // Note: We may eventually want to use the value to deal with rejected invitations.
+ Syncbase.sUserdataCollection.put(id.encode(), true);
return syncgroup;
}
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index 9df5fae..7af2289 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
@@ -18,8 +19,8 @@
import io.v.syncbase.core.Permissions;
import io.v.syncbase.core.Service;
import io.v.syncbase.core.VError;
-import io.v.syncbase.internal.Neighborhood;
import io.v.syncbase.internal.Blessings;
+import io.v.syncbase.internal.Neighborhood;
// FIXME(sadovsky): Currently, various methods throw RuntimeException on any error. We need to
// decide which error types to surface to clients, and define specific Exception subclasses for
@@ -71,6 +72,7 @@
static Options sOpts;
private static Database sDatabase;
+ static Collection sUserdataCollection;
private static final Object sScanMappingMu = new Object();
private static final Map<ScanNeighborhoodForUsersCallback, Long> sScanMapping = new HashMap<>();
@@ -192,23 +194,22 @@
return;
}
sDatabase.createIfMissing();
- if (sOpts.disableUserdataSyncgroup) {
- Database.CollectionOptions cxOpts = new DatabaseHandle.CollectionOptions();
- cxOpts.withoutSyncgroup = true;
- sDatabase.collection(USERDATA_SYNCGROUP_NAME, cxOpts);
- Syncbase.enqueue(new Runnable() {
+ sUserdataCollection = sDatabase.collection(
+ USERDATA_SYNCGROUP_NAME,
+ new DatabaseHandle.CollectionOptions().setWithoutSyncgroup(true));
+ if (!sOpts.disableUserdataSyncgroup) {
+ Syncgroup syncgroup = sUserdataCollection.getSyncgroup();
+ // TODO(razvanm): First we need to try to join, then we need to try to
+ // create the syncgroup.
+ syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
+ sDatabase.addWatchChangeHandler(new UserdataWatchHandler());
+ }
+ Syncbase.enqueue(new Runnable() {
@Override
public void run() {
cb.onSuccess();
}
});
- } else {
- // FIXME(sadovsky): Implement create-or-join (and watch) of userdata
- // syncgroup. For the new JNI API, we'll need to add Go code for this,
- // since Java can't make RPCs.
- cb.onError(new UnsupportedOperationException(
- "Synced userdata collection is not yet supported"));
- }
} catch (VError vError) {
cb.onError(vError);
}
@@ -216,6 +217,32 @@
});
}
+ private static class UserdataWatchHandler extends Database.WatchChangeHandler {
+ @Override
+ public void onInitialState(Iterator<WatchChange> values) {
+ onWatchChange(values);
+ }
+
+ @Override
+ public void onChangeBatch(Iterator<WatchChange> changes) {
+ onWatchChange(changes);
+ }
+
+ private void onWatchChange(Iterator<WatchChange> changes) {
+ WatchChange watchChange = changes.next();
+ if (watchChange.getCollectionId().getName().equals(USERDATA_SYNCGROUP_NAME) &&
+ watchChange.getEntityType() == WatchChange.EntityType.ROW &&
+ watchChange.getChangeType() == WatchChange.ChangeType.PUT) {
+ try {
+ sDatabase.getSyncgroup(Id.decode(watchChange.getRowKey())).join();
+ } catch (VError vError) {
+ vError.printStackTrace();
+ System.err.println(vError.toString());
+ }
+ }
+ }
+ }
+
/**
* Scans the neighborhood for nearby users.
*
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncgroup.java b/syncbase/src/main/java/io/v/syncbase/Syncgroup.java
index 68ef623..d8f0dda 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncgroup.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncgroup.java
@@ -4,6 +4,8 @@
package io.v.syncbase;
+import com.google.common.collect.ImmutableList;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -54,6 +56,12 @@
}
}
+ protected void join() throws VError {
+ // TODO(razvanm): Find a way to restrict the remote blessing. Cloud is one thing the remote
+ // blessings should include.
+ mCoreSyncgroup.join("", ImmutableList.of("..."), new SyncgroupMemberInfo());
+ }
+
/**
* Returns the id of this syncgroup.
*/
diff --git a/syncbase/src/main/java/io/v/syncbase/WatchChange.java b/syncbase/src/main/java/io/v/syncbase/WatchChange.java
index 499852f..46c236c 100644
--- a/syncbase/src/main/java/io/v/syncbase/WatchChange.java
+++ b/syncbase/src/main/java/io/v/syncbase/WatchChange.java
@@ -94,4 +94,11 @@
public boolean isContinued() {
return mContinued;
}
+
+ public String toString() {
+ return String.format("{\n entityType: \"%s\"\n changeType: \"%s\"\n" +
+ " collectionId: %s\n rowKey: \"%s\"\n fromSync: %s\n" +
+ " continued: %s\n}",
+ mEntityType, mChangeType, mCollectionId, mRowKey, mFromSync, mContinued);
+ }
}
\ No newline at end of file