TBR java/syncbase: Fix mistake with userdata watch

Wasn't seeing changes.

Change-Id: I8fa0bace8beada1fcda3ae9d4de189973872325a
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index e86141f..fc67c83 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -393,6 +393,7 @@
      */
     public static class AddWatchChangeHandlerOptions {
         public byte[] resumeMarker;
+        boolean showUserdataCollectionRow;
     }
 
     /**
@@ -428,7 +429,7 @@
     /**
      * Notifies {@code h} of initial state, and of all subsequent changes to this database.
      */
-    public void addWatchChangeHandler(final WatchChangeHandler h, AddWatchChangeHandlerOptions opts) {
+    public void addWatchChangeHandler(final WatchChangeHandler h, final AddWatchChangeHandlerOptions opts) {
         // Note: Eventually we'll add a watch variant that takes a query, where the query can be
         // constructed using some sort of query builder API.
         // TODO(sadovsky): Support specifying resumeMarker. Note, watch-from-resumeMarker may be
@@ -451,7 +452,7 @@
                                         io.v.syncbase.core.WatchChange.EntityType.ROW &&
                                 coreWatchChange.collection.name.equals(Syncbase.USERDATA_NAME) &&
                                 coreWatchChange.row.startsWith(Syncbase.USERDATA_COLLECTION_PREFIX);
-                        if (!isRoot && !isUserdataCollectionRow) {
+                        if (!isRoot && (opts.showUserdataCollectionRow || !isUserdataCollectionRow)) {
                             mBatch.add(new WatchChange(coreWatchChange));
                         }
                         if (!coreWatchChange.continued) {
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index 0f7caab..e026642 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -266,7 +266,10 @@
                         } catch(VError e) {
                             syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
                         }
-                        sDatabase.addWatchChangeHandler(new UserdataWatchHandler());
+                        Database.AddWatchChangeHandlerOptions opts = new Database
+                                .AddWatchChangeHandlerOptions();
+                        opts.showUserdataCollectionRow = true;
+                        sDatabase.addWatchChangeHandler(new UserdataWatchHandler(), opts);
                     }
                     sOpts.callbackExecutor.execute(new Runnable() {
                         @Override
@@ -298,18 +301,20 @@
         }
 
         private void onWatchChange(Iterator<WatchChange> changes) {
-            WatchChange watchChange = changes.next();
-            if (watchChange.getCollectionId().getName().equals(USERDATA_NAME) &&
-                    watchChange.getEntityType() == WatchChange.EntityType.ROW &&
-                    watchChange.getChangeType() == WatchChange.ChangeType.PUT &&
-                    watchChange.getRowKey().startsWith(USERDATA_COLLECTION_PREFIX)) {
-                try {
-                    String encodedId = watchChange.getRowKey().
-                            substring(Syncbase.USERDATA_COLLECTION_PREFIX.length());
-                    sDatabase.getSyncgroup(Id.decode(encodedId)).join();
-                } catch (VError vError) {
-                    vError.printStackTrace();
-                    System.err.println(vError.toString());
+            while (changes.hasNext()) {
+                WatchChange watchChange = changes.next();
+                if (watchChange.getCollectionId().getName().equals(USERDATA_NAME) &&
+                        watchChange.getEntityType() == WatchChange.EntityType.ROW &&
+                        watchChange.getChangeType() == WatchChange.ChangeType.PUT &&
+                        watchChange.getRowKey().startsWith(USERDATA_COLLECTION_PREFIX)) {
+                    try {
+                        String encodedId = watchChange.getRowKey().
+                                substring(Syncbase.USERDATA_COLLECTION_PREFIX.length());
+                        sDatabase.getSyncgroup(Id.decode(encodedId)).join();
+                    } catch (VError vError) {
+                        vError.printStackTrace();
+                        System.err.println(vError.toString());
+                    }
                 }
             }
         }