TODOs: Userdata collection

This sets up the userdata syncgroup among the same user's devices.
When the user creates a new todo list, the other devices will also
join that todo list collection.

We currently shard by email address and use a common cloud.

Caveats:
- Since we can only create syncgroups underneath an existing syncbase,
  and merging is not supported, the same user must be sure to start
  one device a bit before the other to avoid a creation race.
- Discovery between syncbases has occasional errors saying that ads
  are too large on both the creator and joiner sides.
  This is expected, but we need a hack to syncbase to ensure that
  the cloud is published too.
  https://vanadium-review.googlesource.com/22043
- Similarly, the aforementioned CL is used to improve stability
  after a user's 2nd device joins the todo list. It won't crash if
  a local publish of Vanadium uses that patch.
  https://github.com/vanadium/issues/issues/1314

- If you run with vlog, the app runs out of memory eventually and
  crashes. This bug already existed though, IIRC.
- There are occasional errors in go-jni.
  "Error while importing keypair: -1
   panic: Reference 42975 doesn't exist"
  as well as
  "panic: runtime error: cgo argument has Go pointer to Go pointer"
  https://github.com/vanadium/issues/issues/1315

  This only starts to happen after syncing and seemingly occurs randomly.

Change-Id: If0c76ec52546f93fd1ab56e9ef0d8aeef06201f8
diff --git a/app/build.gradle b/app/build.gradle
index 89a014d..8cb1bb0 100644
--- a/app/build.gradle
+++ b/app/build.gradle
@@ -41,6 +41,10 @@
     }
 }
 
+repositories {
+    mavenLocal()
+}
+
 dependencies {
     compile fileTree(dir: 'libs', include: ['*.jar'])
     testCompile 'junit:junit:4.12'
@@ -75,5 +79,5 @@
             'com.google.guava:guava:19.0'
     )
     firebaseCompile 'com.firebase:firebase-client-android:2.5.2'
-    syncbaseCompile 'io.v:vanadium-android:2.0'
+    syncbaseCompile 'io.v:vanadium-android:2.+'
 }
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
index b4ee310..dd05366 100644
--- a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
@@ -8,6 +8,7 @@
 import android.support.annotation.NonNull;
 import android.util.Log;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -29,8 +30,14 @@
 import io.v.v23.InputChannelCallback;
 import io.v.v23.InputChannels;
 import io.v.v23.VFutures;
+import io.v.v23.security.access.Permissions;
 import io.v.v23.services.syncbase.BatchOptions;
+import io.v.v23.services.syncbase.CollectionRow;
+import io.v.v23.services.syncbase.Id;
 import io.v.v23.services.syncbase.KeyValue;
+import io.v.v23.services.syncbase.SyncgroupJoinFailedException;
+import io.v.v23.services.syncbase.SyncgroupMemberInfo;
+import io.v.v23.services.syncbase.SyncgroupSpec;
 import io.v.v23.syncbase.Batch;
 import io.v.v23.syncbase.BatchDatabase;
 import io.v.v23.syncbase.ChangeType;
@@ -57,6 +64,8 @@
             throws VException, SyncbaseServer.StartException {
         super(activity);
 
+        // Prepare a watch on top of the userdata collection to determine which todo lists need to
+        // be tracked by this application.
         InputChannel<WatchChange> watch = getDatabase().watch(
                 mVContext, getUserCollection().id(), LISTS_PREFIX);
         trap(InputChannels.withCallback(watch, new InputChannelCallback<WatchChange>() {
@@ -71,6 +80,31 @@
                 } else {
                     mIdGenerator.registerId(change.getRowName().substring(LISTS_PREFIX.length()));
 
+                    Log.d(TAG, "Found a list id from userdata watch: " + listId);
+                    // TODO(alexfandrianto): Exponential backoff on joining this list syncgroup.
+                    Futures.addCallback(joinListSyncgroup(listId), new TrappingCallback
+                            <SyncgroupSpec>(mActivity) {
+                        @Override
+                        public void onFailure(@NonNull Throwable t) {
+                            if (t instanceof SyncgroupJoinFailedException) {
+                                // Let's try again...
+                                try {
+                                    Thread.sleep(2000);
+                                } catch (InterruptedException e) {}
+
+
+                                // If this errors, then we will not get another chance to see this
+                                // syncgroup until the app is restarted.
+                                trap(joinListSyncgroup(listId));
+                            } else {
+                                super.onFailure(t);
+                            }
+                        }
+                    });
+
+
+
+
                     MainListTracker listTracker = new MainListTracker(
                             mVContext, getDatabase(), listId, listener);
                     if (mTaskTrackers.put(listId, listTracker) != null) {
@@ -107,14 +141,39 @@
                 new TrappingCallback<Void>(mActivity) {
                     @Override
                     public void onSuccess(@Nullable Void result) {
-                        // These can happen in either order
-                        trap(getUserCollection().put(mVContext, listName, null, VdlAny.class));
-                        trap(listCollection.put(mVContext, SyncbaseTodoList.LIST_METADATA_ROW_NAME,
-                                listSpec, ListSpec.class));
+                        Futures.addCallback(createListSyncgroup(listCollection.id()), new TrappingCallback<Void>(mActivity) {
+                            @Override
+                            public void onSuccess(@Nullable Void result) {
+                                // These can happen in either order.
+                                trap(getUserCollection().put(mVContext, listName, null, VdlAny.class));
+                                trap(listCollection.put(mVContext, SyncbaseTodoList.LIST_METADATA_ROW_NAME,
+                                        listSpec, ListSpec.class));
+                            }
+                        });
                     }
                 });
     }
 
+    private ListenableFuture<SyncgroupSpec> joinListSyncgroup(String listId) {
+        SyncgroupMemberInfo memberInfo = getDefaultMemberInfo();
+        String sgName = computeListSyncgroupName(listId);
+        return getDatabase().getSyncgroup(sgName).join(mVContext, memberInfo);
+    }
+
+    private ListenableFuture<Void> createListSyncgroup(Id id) {
+        String listId = id.getName();
+        final String sgName = computeListSyncgroupName(listId);
+        Permissions permissions = computePermissionsFromBlessings(getPersonalBlessings(mVContext));
+
+        SyncgroupMemberInfo memberInfo = getDefaultMemberInfo();
+
+        SyncgroupSpec spec = new SyncgroupSpec(
+                "TODOs User Data Collection", permissions,
+                ImmutableList.of(new CollectionRow(id, "")),
+                ImmutableList.of(MOUNTPOINT), false);
+        return getDatabase().getSyncgroup(sgName).create(mVContext, spec, memberInfo);
+    }
+
     @Override
     public void deleteTodoList(String key) {
         MainListTracker tracker = mTaskTrackers.remove(key);
@@ -139,8 +198,10 @@
                                         RowRange.prefix(SyncbaseTodoList.TASKS_PREFIX));
 
                                 List<ListenableFuture<Void>> puts = new ArrayList<>();
+
                                 for (KeyValue kv : InputChannels.asIterable(scan)) {
-                                    TaskSpec taskSpec = (TaskSpec) VomUtil.decode(kv.getValue());
+                                    TaskSpec taskSpec = (TaskSpec) VomUtil.decode(kv.getValue(),
+                                            TaskSpec.class);
                                     if (taskSpec.getDone() != done) {
                                         taskSpec.setDone(done);
                                         puts.add(list.put(mVContext, kv.getKey(), taskSpec,
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
index e475d3d..59b5f4c 100644
--- a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
@@ -33,6 +33,8 @@
 import io.v.impl.google.services.syncbase.SyncbaseServer;
 import io.v.todos.R;
 import io.v.todos.persistence.Persistence;
+import io.v.v23.OptionDefs;
+import io.v.v23.Options;
 import io.v.v23.VFutures;
 import io.v.v23.context.VContext;
 import io.v.v23.rpc.Server;
@@ -41,15 +43,21 @@
 import io.v.v23.security.access.AccessList;
 import io.v.v23.security.access.Constants;
 import io.v.v23.security.access.Permissions;
+import io.v.v23.services.syncbase.CollectionRow;
+import io.v.v23.services.syncbase.SyncgroupJoinFailedException;
+import io.v.v23.services.syncbase.SyncgroupMemberInfo;
+import io.v.v23.services.syncbase.SyncgroupSpec;
 import io.v.v23.syncbase.Collection;
 import io.v.v23.syncbase.Database;
 import io.v.v23.syncbase.Syncbase;
 import io.v.v23.syncbase.SyncbaseService;
+import io.v.v23.syncbase.Syncgroup;
 import io.v.v23.vdl.VdlStruct;
 import io.v.v23.verror.CanceledException;
 import io.v.v23.verror.ExistException;
 import io.v.v23.verror.VException;
 import io.v.v23.vom.VomUtil;
+import io.v.x.ref.lib.discovery.BadAdvertisementException;
 
 /**
  * TODO(rosswang): Move most of this to vanadium-android.
@@ -60,9 +68,15 @@
             FILENAME = "syncbase",
             PROXY = "proxy",
             DATABASE = "db",
-            BLESSINGS_KEY = "blessings";
+            BLESSINGS_KEY = "blessings",
+            USER_COLLECTION_SYNCGROUP_SUFFIX = "/%%sync/sg_",
+            LIST_COLLECTION_SYNCGROUP_SUFFIX = "/%%sync/list_",
+    // TODO(alexfandrianto): This shouldn't be me running the cloud.
+    CLOUD_BLESSING = "dev.v.io:u:alexfandrianto@google.com";
     public static final String
-            USER_COLLECTION_NAME = "userdata";
+            USER_COLLECTION_NAME = "userdata",
+            MOUNTPOINT = "/ns.dev.v.io:8101/tmp/todos/users/",
+            CLOUD_NAME = MOUNTPOINT + "cloud";
     // BlessingPattern initialization has to be deferred until after V23 init due to native binding.
     private static final Supplier<AccessList> OPEN_ACL = Suppliers.memoize(
             new Supplier<AccessList>() {
@@ -93,10 +107,12 @@
         storageRoot.mkdirs();
 
         Log.i(TAG, "Starting Syncbase");
-        VContext serverContext = SyncbaseServer.withNewServer(vContext,
-                new SyncbaseServer.Params()
-                        .withPermissions(serverPermissions)
-                        .withStorageRootDir(storageRoot.getAbsolutePath()));
+        SyncbaseServer.Params params = new SyncbaseServer.Params()
+                .withPermissions(serverPermissions)
+                .withStorageRootDir(storageRoot.getAbsolutePath());
+
+
+        VContext serverContext = SyncbaseServer.withNewServer(vContext, params);
 
         Server server = V.getServer(serverContext);
         return "/" + server.getStatus().getEndpoints()[0];
@@ -115,24 +131,18 @@
         synchronized (sSyncbaseMutex) {
             if (sSyncbase == null) {
                 final Context appContext = androidContext.getApplicationContext();
-                VContext singletonContext = V.init(appContext);
+                VContext singletonContext = V.init(appContext, new Options()
+                        .set(OptionDefs.LOG_VLEVEL, 0)
+                        .set(OptionDefs.LOG_VMODULE, "vsync*=0"));
+
                 try {
-                    Blessings clientBlessings = V.getPrincipal(singletonContext)
-                            .blessingStore().defaultBlessings();
-                    if (clientBlessings == null) {
+                    // Retrieve this context's personal permissions to set ACLs on the server.
+                    Blessings personalBlessings = getPersonalBlessings(singletonContext);
+                    if (personalBlessings == null) {
                         throw new IllegalStateException("Blessings must be attached to the " +
                                 "Vanadium principal before Syncbase initialization.");
                     }
-
-                    AccessList clientAcl = new AccessList(ImmutableList.of(
-                            new BlessingPattern(clientBlessings.toString())),
-                            ImmutableList.<String>of());
-
-                    Permissions permissions = new Permissions(ImmutableMap.of(
-                            Constants.RESOLVE.getValue(), OPEN_ACL.get(),
-                            Constants.READ.getValue(), clientAcl,
-                            Constants.WRITE.getValue(), clientAcl,
-                            Constants.ADMIN.getValue(), clientAcl));
+                    Permissions permissions = computePermissionsFromBlessings(personalBlessings);
 
                     sSyncbase = Syncbase.newService(startSyncbaseServer(
                             singletonContext, appContext, permissions));
@@ -145,6 +155,22 @@
         }
     }
 
+    protected static Blessings getPersonalBlessings(VContext ctx) {
+        return V.getPrincipal(ctx).blessingStore().defaultBlessings();
+    }
+
+    protected static Permissions computePermissionsFromBlessings(Blessings blessings) {
+        AccessList clientAcl = new AccessList(ImmutableList.of(
+                new BlessingPattern(blessings.toString()), new BlessingPattern(CLOUD_BLESSING)),
+                ImmutableList.<String>of());
+
+        return new Permissions(ImmutableMap.of(
+                Constants.RESOLVE.getValue(), OPEN_ACL.get(),
+                Constants.READ.getValue(), clientAcl,
+                Constants.WRITE.getValue(), clientAcl,
+                Constants.ADMIN.getValue(), clientAcl));
+    }
+
     private static final Object sDatabaseMutex = new Object();
     private static Database sDatabase;
 
@@ -181,8 +207,86 @@
         }
     }
 
+    private static final Object sCloudDatabaseMutex = new Object();
+    private static volatile Database sCloudDatabase;
+
+    private static void ensureCloudDatabaseExists() {
+        synchronized (sCloudDatabaseMutex) {
+            if (sCloudDatabase == null) {
+                SyncbaseService cloudService = Syncbase.newService(CLOUD_NAME);
+                Database db = cloudService.getDatabase(sVContext, DATABASE, null);
+                try {
+                    VFutures.sync(db.create(sVContext, null));
+                } catch (ExistException e) {
+                    // This is acceptable. No need to do it again.
+                } catch (Exception e) {
+                    Log.w(TAG, "Could not ensure cloud database exists: " + e.getMessage());
+                }
+                sCloudDatabase = db;
+            }
+        }
+    }
+
+    private static final Object sUserSyncgroupMutex = new Object();
+    private static volatile Syncgroup sUserSyncgroup;
+
+    private static void ensureUserSyncgroupExists() throws VException {
+        synchronized (sUserSyncgroupMutex) {
+            if (sUserSyncgroup == null) {
+                Blessings clientBlessings = getPersonalBlessings(sVContext);
+                String[] split = clientBlessings.toString().split(":");
+                String email = split[split.length - 1];
+                Log.d(TAG, email);
+
+                Permissions permissions = computePermissionsFromBlessings(clientBlessings);
+
+                String sgName = CLOUD_NAME + USER_COLLECTION_SYNCGROUP_SUFFIX + email;
+
+                final SyncgroupMemberInfo memberInfo = getDefaultMemberInfo();
+                final Syncgroup sgHandle = sDatabase.getSyncgroup(sgName);
+
+                try {
+                    Log.d(TAG, "Trying to join the syncgroup: " + sgName);
+                    VFutures.sync(sgHandle.join(sVContext, memberInfo));
+                    Log.d(TAG, "JOINED the syncgroup: " + sgName);
+                } catch (SyncgroupJoinFailedException e) {
+                    Log.w(TAG, "Failed join. Trying to create the syncgroup: " + sgName, e);
+                    SyncgroupSpec spec = new SyncgroupSpec(
+                            "TODOs User Data Collection", permissions,
+                            ImmutableList.of(new CollectionRow(sUserCollection.id(), "")),
+                            ImmutableList.of(MOUNTPOINT), false);
+                    try {
+                        VFutures.sync(sgHandle.create(sVContext, spec, memberInfo));
+                    } catch (BadAdvertisementException e2) {
+                        Log.d(TAG, "Bad advertisement exception. Can we fix this?");
+                    }
+                    Log.d(TAG, "CREATED the syncgroup: " + sgName);
+                } catch (Exception e) {
+                    Log.d(TAG, "Failed to join or create the syncgroup: " + sgName);
+                    if (!(e instanceof BadAdvertisementException)) { // joined, I guess
+                        throw e;
+                    }
+                }
+                sUserSyncgroup = sgHandle;
+            }
+        }
+    }
+
+    protected static SyncgroupMemberInfo getDefaultMemberInfo() {
+        SyncgroupMemberInfo memberInfo = new SyncgroupMemberInfo();
+        memberInfo.setSyncPriority((byte) 3);
+        return memberInfo;
+
+    }
+
+    protected String computeListSyncgroupName(String listId) {
+        return CLOUD_NAME + LIST_COLLECTION_SYNCGROUP_SUFFIX + listId;
+    }
+
+    private static volatile boolean sInitialized;
+
     public static boolean isInitialized() {
-        return sUserCollection != null;
+        return sInitialized;
     }
 
     /**
@@ -205,7 +309,7 @@
 
         @Override
         public void onFailure(@NonNull Throwable t) {
-            if (!(t instanceof CanceledException)) {
+            if (!(t instanceof CanceledException || t instanceof ExistException)) {
                 Toast.makeText(mAndroidContext, R.string.err_sync, Toast.LENGTH_LONG).show();
                 StringBuilder traceBuilder = new StringBuilder(Throwables.getStackTraceAsString(t))
                         .append("\n invoked at ").append(mCaller[FIRST_SIGNIFICANT_STACK_ELEMENT]);
@@ -220,10 +324,9 @@
     /**
      * Extracts the value from a watch change.
      * TODO(rosswang): This method is a tempory hack, awaiting resolution of the following issues:
-     *
      * <ul>
-     *  <li><a href="https://github.com/vanadium/issues/issues/1305">#1305</a>
-     *  <li><a href="https://github.com/vanadium/issues/issues/1310">#1310</a>
+     * <li><a href="https://github.com/vanadium/issues/issues/1305">#1305</a>
+     * <li><a href="https://github.com/vanadium/issues/issues/1310">#1310</a>
      * </ul>
      */
     @SuppressWarnings("unchecked")
@@ -264,7 +367,9 @@
     public SyncbasePersistence(final Activity activity)
             throws VException, SyncbaseServer.StartException {
         mActivity = activity;
-        mVContext = V.init(activity);
+        mVContext = V.init(activity, new Options()
+                .set(OptionDefs.LOG_VLEVEL, 0)
+                .set(OptionDefs.LOG_VMODULE, "vsync*=0"));
 
         // We might not actually have to seek blessings each time, but getBlessings does not
         // block if we already have blessings and this has better-behaved lifecycle
@@ -285,6 +390,9 @@
         ensureSyncbaseStarted(activity);
         ensureDatabaseExists();
         ensureUserCollectionExists();
+        ensureCloudDatabaseExists();
+        ensureUserSyncgroupExists();
+        sInitialized = true;
     }
 
     @Override