java: Add support for the userdata syncgroup

This change also adds a toString to the WatchChange.

Change-Id: I1d9d423d1b3f372bb700f47a5a89e5234c6e3a1c
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index 0ef2804..c94820e 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -87,6 +87,10 @@
         }
         Syncgroup syncgroup = new Syncgroup(mCoreDatabase.syncgroup(id.toCoreId()), this);
         syncgroup.createIfMissing(collections);
+        // Remember this syncgroup in the userdata collection. The value doesn't matter, but since
+        // VOM won't accept null, use a boolean.
+        // Note: We may eventually want to use the value to deal with rejected invitations.
+        Syncbase.sUserdataCollection.put(id.encode(), true);
         return syncgroup;
     }
 
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index 9df5fae..7af2289 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -9,6 +9,7 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
@@ -18,8 +19,8 @@
 import io.v.syncbase.core.Permissions;
 import io.v.syncbase.core.Service;
 import io.v.syncbase.core.VError;
-import io.v.syncbase.internal.Neighborhood;
 import io.v.syncbase.internal.Blessings;
+import io.v.syncbase.internal.Neighborhood;
 
 // FIXME(sadovsky): Currently, various methods throw RuntimeException on any error. We need to
 // decide which error types to surface to clients, and define specific Exception subclasses for
@@ -71,6 +72,7 @@
 
     static Options sOpts;
     private static Database sDatabase;
+    static Collection sUserdataCollection;
     private static final Object sScanMappingMu = new Object();
     private static final Map<ScanNeighborhoodForUsersCallback, Long> sScanMapping = new HashMap<>();
 
@@ -192,23 +194,22 @@
                         return;
                     }
                     sDatabase.createIfMissing();
-                    if (sOpts.disableUserdataSyncgroup) {
-                        Database.CollectionOptions cxOpts = new DatabaseHandle.CollectionOptions();
-                        cxOpts.withoutSyncgroup = true;
-                        sDatabase.collection(USERDATA_SYNCGROUP_NAME, cxOpts);
-                        Syncbase.enqueue(new Runnable() {
+                    sUserdataCollection = sDatabase.collection(
+                            USERDATA_SYNCGROUP_NAME,
+                            new DatabaseHandle.CollectionOptions().setWithoutSyncgroup(true));
+                    if (!sOpts.disableUserdataSyncgroup) {
+                        Syncgroup syncgroup = sUserdataCollection.getSyncgroup();
+                        // TODO(razvanm): First we need to try to join, then we need to try to
+                        // create the syncgroup.
+                        syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
+                        sDatabase.addWatchChangeHandler(new UserdataWatchHandler());
+                    }
+                    Syncbase.enqueue(new Runnable() {
                             @Override
                             public void run() {
                                 cb.onSuccess();
                             }
                         });
-                    } else {
-                        // FIXME(sadovsky): Implement create-or-join (and watch) of userdata
-                        // syncgroup. For the new JNI API, we'll need to add Go code for this,
-                        // since Java can't make RPCs.
-                        cb.onError(new UnsupportedOperationException(
-                                "Synced userdata collection is not yet supported"));
-                    }
                 } catch (VError vError) {
                     cb.onError(vError);
                 }
@@ -216,6 +217,32 @@
         });
     }
 
+    private static class UserdataWatchHandler extends Database.WatchChangeHandler {
+        @Override
+        public void onInitialState(Iterator<WatchChange> values) {
+            onWatchChange(values);
+        }
+
+        @Override
+        public void onChangeBatch(Iterator<WatchChange> changes) {
+            onWatchChange(changes);
+        }
+
+        private void onWatchChange(Iterator<WatchChange> changes) {
+            WatchChange watchChange = changes.next();
+            if (watchChange.getCollectionId().getName().equals(USERDATA_SYNCGROUP_NAME) &&
+                    watchChange.getEntityType() == WatchChange.EntityType.ROW &&
+                    watchChange.getChangeType() == WatchChange.ChangeType.PUT) {
+                try {
+                    sDatabase.getSyncgroup(Id.decode(watchChange.getRowKey())).join();
+                } catch (VError vError) {
+                    vError.printStackTrace();
+                    System.err.println(vError.toString());
+                }
+            }
+        }
+    }
+
     /**
      * Scans the neighborhood for nearby users.
      *
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncgroup.java b/syncbase/src/main/java/io/v/syncbase/Syncgroup.java
index 68ef623..d8f0dda 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncgroup.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncgroup.java
@@ -4,6 +4,8 @@
 
 package io.v.syncbase;
 
+import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -54,6 +56,12 @@
         }
     }
 
+    protected void join() throws VError {
+        // TODO(razvanm): Find a way to restrict the remote blessing. Cloud is one thing the remote
+        // blessings should include.
+        mCoreSyncgroup.join("", ImmutableList.of("..."), new SyncgroupMemberInfo());
+    }
+
     /**
      * Returns the id of this syncgroup.
      */
diff --git a/syncbase/src/main/java/io/v/syncbase/WatchChange.java b/syncbase/src/main/java/io/v/syncbase/WatchChange.java
index 499852f..46c236c 100644
--- a/syncbase/src/main/java/io/v/syncbase/WatchChange.java
+++ b/syncbase/src/main/java/io/v/syncbase/WatchChange.java
@@ -94,4 +94,11 @@
     public boolean isContinued() {
         return mContinued;
     }
+
+    public String toString() {
+        return String.format("{\n  entityType: \"%s\"\n  changeType: \"%s\"\n" +
+                "  collectionId: %s\n  rowKey: \"%s\"\n  fromSync: %s\n" +
+                "  continued: %s\n}",
+                mEntityType, mChangeType, mCollectionId, mRowKey, mFromSync, mContinued);
+    }
 }
\ No newline at end of file