TODOs: Userdata collection
This sets up the userdata syncgroup among the same user's devices.
When the user creates a new todo list, the other devices will also
join that todo list collection.
We currently shard by email address and use a common cloud.
Caveats:
- Since we can only create syncgroups underneath an existing syncbase,
and merging is not supported, the same user must be sure to start
one device a bit before the other to avoid a creation race.
- Discovery between syncbases has occasional errors saying that ads
are too large on both the creator and joiner sides.
This is expected, but we need a hack to syncbase to ensure that
the cloud is published too.
https://vanadium-review.googlesource.com/22043
- Similarly, the aforementioned CL is used to improve stability
after a user's 2nd device joins the todo list. It won't crash if
a local publish of Vanadium uses that patch.
https://github.com/vanadium/issues/issues/1314
- If you run with vlog, the app runs out of memory eventually and
crashes. This bug already existed though, IIRC.
- There are occasional errors in go-jni.
"Error while importing keypair: -1
panic: Reference 42975 doesn't exist"
as well as
"panic: runtime error: cgo argument has Go pointer to Go pointer"
https://github.com/vanadium/issues/issues/1315
This only starts to happen after syncing and seemingly occurs randomly.
Change-Id: If0c76ec52546f93fd1ab56e9ef0d8aeef06201f8
diff --git a/app/build.gradle b/app/build.gradle
index 89a014d..8cb1bb0 100644
--- a/app/build.gradle
+++ b/app/build.gradle
@@ -41,6 +41,10 @@
}
}
+repositories {
+ mavenLocal()
+}
+
dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
testCompile 'junit:junit:4.12'
@@ -75,5 +79,5 @@
'com.google.guava:guava:19.0'
)
firebaseCompile 'com.firebase:firebase-client-android:2.5.2'
- syncbaseCompile 'io.v:vanadium-android:2.0'
+ syncbaseCompile 'io.v:vanadium-android:2.+'
}
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
index b4ee310..dd05366 100644
--- a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
@@ -8,6 +8,7 @@
import android.support.annotation.NonNull;
import android.util.Log;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -29,8 +30,14 @@
import io.v.v23.InputChannelCallback;
import io.v.v23.InputChannels;
import io.v.v23.VFutures;
+import io.v.v23.security.access.Permissions;
import io.v.v23.services.syncbase.BatchOptions;
+import io.v.v23.services.syncbase.CollectionRow;
+import io.v.v23.services.syncbase.Id;
import io.v.v23.services.syncbase.KeyValue;
+import io.v.v23.services.syncbase.SyncgroupJoinFailedException;
+import io.v.v23.services.syncbase.SyncgroupMemberInfo;
+import io.v.v23.services.syncbase.SyncgroupSpec;
import io.v.v23.syncbase.Batch;
import io.v.v23.syncbase.BatchDatabase;
import io.v.v23.syncbase.ChangeType;
@@ -57,6 +64,8 @@
throws VException, SyncbaseServer.StartException {
super(activity);
+ // Prepare a watch on top of the userdata collection to determine which todo lists need to
+ // be tracked by this application.
InputChannel<WatchChange> watch = getDatabase().watch(
mVContext, getUserCollection().id(), LISTS_PREFIX);
trap(InputChannels.withCallback(watch, new InputChannelCallback<WatchChange>() {
@@ -71,6 +80,31 @@
} else {
mIdGenerator.registerId(change.getRowName().substring(LISTS_PREFIX.length()));
+ Log.d(TAG, "Found a list id from userdata watch: " + listId);
+ // TODO(alexfandrianto): Exponential backoff on joining this list syncgroup.
+ Futures.addCallback(joinListSyncgroup(listId), new TrappingCallback
+ <SyncgroupSpec>(mActivity) {
+ @Override
+ public void onFailure(@NonNull Throwable t) {
+ if (t instanceof SyncgroupJoinFailedException) {
+ // Let's try again...
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {}
+
+
+ // If this errors, then we will not get another chance to see this
+ // syncgroup until the app is restarted.
+ trap(joinListSyncgroup(listId));
+ } else {
+ super.onFailure(t);
+ }
+ }
+ });
+
+
+
+
MainListTracker listTracker = new MainListTracker(
mVContext, getDatabase(), listId, listener);
if (mTaskTrackers.put(listId, listTracker) != null) {
@@ -107,14 +141,39 @@
new TrappingCallback<Void>(mActivity) {
@Override
public void onSuccess(@Nullable Void result) {
- // These can happen in either order
- trap(getUserCollection().put(mVContext, listName, null, VdlAny.class));
- trap(listCollection.put(mVContext, SyncbaseTodoList.LIST_METADATA_ROW_NAME,
- listSpec, ListSpec.class));
+ Futures.addCallback(createListSyncgroup(listCollection.id()), new TrappingCallback<Void>(mActivity) {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ // These can happen in either order.
+ trap(getUserCollection().put(mVContext, listName, null, VdlAny.class));
+ trap(listCollection.put(mVContext, SyncbaseTodoList.LIST_METADATA_ROW_NAME,
+ listSpec, ListSpec.class));
+ }
+ });
}
});
}
+ private ListenableFuture<SyncgroupSpec> joinListSyncgroup(String listId) {
+ SyncgroupMemberInfo memberInfo = getDefaultMemberInfo();
+ String sgName = computeListSyncgroupName(listId);
+ return getDatabase().getSyncgroup(sgName).join(mVContext, memberInfo);
+ }
+
+ private ListenableFuture<Void> createListSyncgroup(Id id) {
+ String listId = id.getName();
+ final String sgName = computeListSyncgroupName(listId);
+ Permissions permissions = computePermissionsFromBlessings(getPersonalBlessings(mVContext));
+
+ SyncgroupMemberInfo memberInfo = getDefaultMemberInfo();
+
+ SyncgroupSpec spec = new SyncgroupSpec(
+ "TODOs User Data Collection", permissions,
+ ImmutableList.of(new CollectionRow(id, "")),
+ ImmutableList.of(MOUNTPOINT), false);
+ return getDatabase().getSyncgroup(sgName).create(mVContext, spec, memberInfo);
+ }
+
@Override
public void deleteTodoList(String key) {
MainListTracker tracker = mTaskTrackers.remove(key);
@@ -139,8 +198,10 @@
RowRange.prefix(SyncbaseTodoList.TASKS_PREFIX));
List<ListenableFuture<Void>> puts = new ArrayList<>();
+
for (KeyValue kv : InputChannels.asIterable(scan)) {
- TaskSpec taskSpec = (TaskSpec) VomUtil.decode(kv.getValue());
+ TaskSpec taskSpec = (TaskSpec) VomUtil.decode(kv.getValue(),
+ TaskSpec.class);
if (taskSpec.getDone() != done) {
taskSpec.setDone(done);
puts.add(list.put(mVContext, kv.getKey(), taskSpec,
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
index e475d3d..59b5f4c 100644
--- a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
@@ -33,6 +33,8 @@
import io.v.impl.google.services.syncbase.SyncbaseServer;
import io.v.todos.R;
import io.v.todos.persistence.Persistence;
+import io.v.v23.OptionDefs;
+import io.v.v23.Options;
import io.v.v23.VFutures;
import io.v.v23.context.VContext;
import io.v.v23.rpc.Server;
@@ -41,15 +43,21 @@
import io.v.v23.security.access.AccessList;
import io.v.v23.security.access.Constants;
import io.v.v23.security.access.Permissions;
+import io.v.v23.services.syncbase.CollectionRow;
+import io.v.v23.services.syncbase.SyncgroupJoinFailedException;
+import io.v.v23.services.syncbase.SyncgroupMemberInfo;
+import io.v.v23.services.syncbase.SyncgroupSpec;
import io.v.v23.syncbase.Collection;
import io.v.v23.syncbase.Database;
import io.v.v23.syncbase.Syncbase;
import io.v.v23.syncbase.SyncbaseService;
+import io.v.v23.syncbase.Syncgroup;
import io.v.v23.vdl.VdlStruct;
import io.v.v23.verror.CanceledException;
import io.v.v23.verror.ExistException;
import io.v.v23.verror.VException;
import io.v.v23.vom.VomUtil;
+import io.v.x.ref.lib.discovery.BadAdvertisementException;
/**
* TODO(rosswang): Move most of this to vanadium-android.
@@ -60,9 +68,15 @@
FILENAME = "syncbase",
PROXY = "proxy",
DATABASE = "db",
- BLESSINGS_KEY = "blessings";
+ BLESSINGS_KEY = "blessings",
+ USER_COLLECTION_SYNCGROUP_SUFFIX = "/%%sync/sg_",
+ LIST_COLLECTION_SYNCGROUP_SUFFIX = "/%%sync/list_",
+ // TODO(alexfandrianto): This shouldn't be me running the cloud.
+ CLOUD_BLESSING = "dev.v.io:u:alexfandrianto@google.com";
public static final String
- USER_COLLECTION_NAME = "userdata";
+ USER_COLLECTION_NAME = "userdata",
+ MOUNTPOINT = "/ns.dev.v.io:8101/tmp/todos/users/",
+ CLOUD_NAME = MOUNTPOINT + "cloud";
// BlessingPattern initialization has to be deferred until after V23 init due to native binding.
private static final Supplier<AccessList> OPEN_ACL = Suppliers.memoize(
new Supplier<AccessList>() {
@@ -93,10 +107,12 @@
storageRoot.mkdirs();
Log.i(TAG, "Starting Syncbase");
- VContext serverContext = SyncbaseServer.withNewServer(vContext,
- new SyncbaseServer.Params()
- .withPermissions(serverPermissions)
- .withStorageRootDir(storageRoot.getAbsolutePath()));
+ SyncbaseServer.Params params = new SyncbaseServer.Params()
+ .withPermissions(serverPermissions)
+ .withStorageRootDir(storageRoot.getAbsolutePath());
+
+
+ VContext serverContext = SyncbaseServer.withNewServer(vContext, params);
Server server = V.getServer(serverContext);
return "/" + server.getStatus().getEndpoints()[0];
@@ -115,24 +131,18 @@
synchronized (sSyncbaseMutex) {
if (sSyncbase == null) {
final Context appContext = androidContext.getApplicationContext();
- VContext singletonContext = V.init(appContext);
+ VContext singletonContext = V.init(appContext, new Options()
+ .set(OptionDefs.LOG_VLEVEL, 0)
+ .set(OptionDefs.LOG_VMODULE, "vsync*=0"));
+
try {
- Blessings clientBlessings = V.getPrincipal(singletonContext)
- .blessingStore().defaultBlessings();
- if (clientBlessings == null) {
+ // Retrieve this context's personal permissions to set ACLs on the server.
+ Blessings personalBlessings = getPersonalBlessings(singletonContext);
+ if (personalBlessings == null) {
throw new IllegalStateException("Blessings must be attached to the " +
"Vanadium principal before Syncbase initialization.");
}
-
- AccessList clientAcl = new AccessList(ImmutableList.of(
- new BlessingPattern(clientBlessings.toString())),
- ImmutableList.<String>of());
-
- Permissions permissions = new Permissions(ImmutableMap.of(
- Constants.RESOLVE.getValue(), OPEN_ACL.get(),
- Constants.READ.getValue(), clientAcl,
- Constants.WRITE.getValue(), clientAcl,
- Constants.ADMIN.getValue(), clientAcl));
+ Permissions permissions = computePermissionsFromBlessings(personalBlessings);
sSyncbase = Syncbase.newService(startSyncbaseServer(
singletonContext, appContext, permissions));
@@ -145,6 +155,22 @@
}
}
+ protected static Blessings getPersonalBlessings(VContext ctx) {
+ return V.getPrincipal(ctx).blessingStore().defaultBlessings();
+ }
+
+ protected static Permissions computePermissionsFromBlessings(Blessings blessings) {
+ AccessList clientAcl = new AccessList(ImmutableList.of(
+ new BlessingPattern(blessings.toString()), new BlessingPattern(CLOUD_BLESSING)),
+ ImmutableList.<String>of());
+
+ return new Permissions(ImmutableMap.of(
+ Constants.RESOLVE.getValue(), OPEN_ACL.get(),
+ Constants.READ.getValue(), clientAcl,
+ Constants.WRITE.getValue(), clientAcl,
+ Constants.ADMIN.getValue(), clientAcl));
+ }
+
private static final Object sDatabaseMutex = new Object();
private static Database sDatabase;
@@ -181,8 +207,86 @@
}
}
+ private static final Object sCloudDatabaseMutex = new Object();
+ private static volatile Database sCloudDatabase;
+
+ private static void ensureCloudDatabaseExists() {
+ synchronized (sCloudDatabaseMutex) {
+ if (sCloudDatabase == null) {
+ SyncbaseService cloudService = Syncbase.newService(CLOUD_NAME);
+ Database db = cloudService.getDatabase(sVContext, DATABASE, null);
+ try {
+ VFutures.sync(db.create(sVContext, null));
+ } catch (ExistException e) {
+ // This is acceptable. No need to do it again.
+ } catch (Exception e) {
+ Log.w(TAG, "Could not ensure cloud database exists: " + e.getMessage());
+ }
+ sCloudDatabase = db;
+ }
+ }
+ }
+
+ private static final Object sUserSyncgroupMutex = new Object();
+ private static volatile Syncgroup sUserSyncgroup;
+
+ private static void ensureUserSyncgroupExists() throws VException {
+ synchronized (sUserSyncgroupMutex) {
+ if (sUserSyncgroup == null) {
+ Blessings clientBlessings = getPersonalBlessings(sVContext);
+ String[] split = clientBlessings.toString().split(":");
+ String email = split[split.length - 1];
+ Log.d(TAG, email);
+
+ Permissions permissions = computePermissionsFromBlessings(clientBlessings);
+
+ String sgName = CLOUD_NAME + USER_COLLECTION_SYNCGROUP_SUFFIX + email;
+
+ final SyncgroupMemberInfo memberInfo = getDefaultMemberInfo();
+ final Syncgroup sgHandle = sDatabase.getSyncgroup(sgName);
+
+ try {
+ Log.d(TAG, "Trying to join the syncgroup: " + sgName);
+ VFutures.sync(sgHandle.join(sVContext, memberInfo));
+ Log.d(TAG, "JOINED the syncgroup: " + sgName);
+ } catch (SyncgroupJoinFailedException e) {
+ Log.w(TAG, "Failed join. Trying to create the syncgroup: " + sgName, e);
+ SyncgroupSpec spec = new SyncgroupSpec(
+ "TODOs User Data Collection", permissions,
+ ImmutableList.of(new CollectionRow(sUserCollection.id(), "")),
+ ImmutableList.of(MOUNTPOINT), false);
+ try {
+ VFutures.sync(sgHandle.create(sVContext, spec, memberInfo));
+ } catch (BadAdvertisementException e2) {
+ Log.d(TAG, "Bad advertisement exception. Can we fix this?");
+ }
+ Log.d(TAG, "CREATED the syncgroup: " + sgName);
+ } catch (Exception e) {
+ Log.d(TAG, "Failed to join or create the syncgroup: " + sgName);
+ if (!(e instanceof BadAdvertisementException)) { // joined, I guess
+ throw e;
+ }
+ }
+ sUserSyncgroup = sgHandle;
+ }
+ }
+ }
+
+ protected static SyncgroupMemberInfo getDefaultMemberInfo() {
+ SyncgroupMemberInfo memberInfo = new SyncgroupMemberInfo();
+ memberInfo.setSyncPriority((byte) 3);
+ return memberInfo;
+
+ }
+
+ protected String computeListSyncgroupName(String listId) {
+ return CLOUD_NAME + LIST_COLLECTION_SYNCGROUP_SUFFIX + listId;
+ }
+
+ private static volatile boolean sInitialized;
+
public static boolean isInitialized() {
- return sUserCollection != null;
+ return sInitialized;
}
/**
@@ -205,7 +309,7 @@
@Override
public void onFailure(@NonNull Throwable t) {
- if (!(t instanceof CanceledException)) {
+ if (!(t instanceof CanceledException || t instanceof ExistException)) {
Toast.makeText(mAndroidContext, R.string.err_sync, Toast.LENGTH_LONG).show();
StringBuilder traceBuilder = new StringBuilder(Throwables.getStackTraceAsString(t))
.append("\n invoked at ").append(mCaller[FIRST_SIGNIFICANT_STACK_ELEMENT]);
@@ -220,10 +324,9 @@
/**
* Extracts the value from a watch change.
* TODO(rosswang): This method is a tempory hack, awaiting resolution of the following issues:
- *
* <ul>
- * <li><a href="https://github.com/vanadium/issues/issues/1305">#1305</a>
- * <li><a href="https://github.com/vanadium/issues/issues/1310">#1310</a>
+ * <li><a href="https://github.com/vanadium/issues/issues/1305">#1305</a>
+ * <li><a href="https://github.com/vanadium/issues/issues/1310">#1310</a>
* </ul>
*/
@SuppressWarnings("unchecked")
@@ -264,7 +367,9 @@
public SyncbasePersistence(final Activity activity)
throws VException, SyncbaseServer.StartException {
mActivity = activity;
- mVContext = V.init(activity);
+ mVContext = V.init(activity, new Options()
+ .set(OptionDefs.LOG_VLEVEL, 0)
+ .set(OptionDefs.LOG_VMODULE, "vsync*=0"));
// We might not actually have to seek blessings each time, but getBlessings does not
// block if we already have blessings and this has better-behaved lifecycle
@@ -285,6 +390,9 @@
ensureSyncbaseStarted(activity);
ensureDatabaseExists();
ensureUserCollectionExists();
+ ensureCloudDatabaseExists();
+ ensureUserSyncgroupExists();
+ sInitialized = true;
}
@Override