java/syncbase: Run all callbacks with an executor
Android/Default: Use an executor that will run stuff on the main
thread.
Note: The default executor won't work for tests, so we swap it out
for a more standard cached thread pool.
The strategy used matches the one we had for the old Java API.
Change-Id: Icf139eb5406347dcf41e2d44f2969fe48c349d23
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index c94820e..2da0828 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -5,12 +5,14 @@
package io.v.syncbase;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import io.v.syncbase.core.CollectionRowPattern;
import io.v.syncbase.core.SyncgroupMemberInfo;
@@ -162,8 +164,22 @@
new io.v.syncbase.internal.Database.SyncgroupInvitesCallbacks() {
@Override
- public void onInvite(io.v.syncbase.core.SyncgroupInvite invite) {
- h.onInvite(new SyncgroupInvite(new Id(invite.syncgroup), invite.blessingNames));
+ public void onInvite(final io.v.syncbase.core.SyncgroupInvite invite) {
+ final SettableFuture<Boolean> setFuture = SettableFuture.create();
+ Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ h.onInvite(new SyncgroupInvite(new Id(invite.syncgroup),
+ invite.blessingNames));
+ setFuture.set(true);
+ }
+ });
+ try {
+ setFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ System.err.println(e.toString());
+ }
}
});
mSyncgroupInviteHandlers.put(h, scanId);
@@ -388,11 +404,32 @@
// TODO(razvanm): Ignore changes to userdata collection.
mBatch.add(new WatchChange(coreWatchChange));
if (!coreWatchChange.continued) {
+ final SettableFuture<Boolean> setFuture = SettableFuture.create();
if (!mGotFirstBatch) {
mGotFirstBatch = true;
- h.onInitialState(mBatch.iterator());
+ final List<WatchChange> cpBatch = mBatch;
+ Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ h.onInitialState(cpBatch.iterator());
+ setFuture.set(true);
+ }
+ });
} else {
- h.onChangeBatch(mBatch.iterator());
+ final List<WatchChange> cpBatch = mBatch;
+ Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ h.onChangeBatch(cpBatch.iterator());
+ setFuture.set(true);
+ }
+ });
+ }
+ try {
+ setFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ System.err.println(e.toString());
}
mBatch.clear();
}
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index ca33935..c0e93b7 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -6,18 +6,21 @@
import android.app.Activity;
import android.app.FragmentTransaction;
+import android.os.Handler;
+import android.os.Looper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import io.v.syncbase.android.LoginFragment;
import io.v.syncbase.core.NeighborhoodPeer;
@@ -47,6 +50,8 @@
* Options for opening a database.
*/
public static class Options {
+ // The executor used to execute callbacks.
+ public Executor callbackExecutor = UiThreadExecutor.INSTANCE;
// Where data should be persisted.
public String rootDir;
// We use an empty mountPoints to avoid talking to the global mounttabled.
@@ -75,6 +80,24 @@
}
}
+ /**
+ * Executor that executes all of its commands on the Android UI thread.
+ */
+ private static class UiThreadExecutor implements Executor {
+ /**
+ * Singleton instance of the UiThreadExecutor.
+ */
+ public static final UiThreadExecutor INSTANCE = new UiThreadExecutor();
+
+ private final Handler handler = new Handler(Looper.getMainLooper());
+
+ @Override
+ public void execute(Runnable runnable) {
+ handler.post(runnable);
+ }
+ private UiThreadExecutor() {}
+ }
+
static Options sOpts;
private static Database sDatabase;
static Collection sUserdataCollection;
@@ -90,18 +113,6 @@
DB_NAME = "db",
USERDATA_SYNCGROUP_NAME = "userdata__";
- private static void enqueue(final Runnable r) {
- // Note, we use Timer rather than Handler because the latter must be mocked out for tests,
- // which is rather annoying.
- //new Handler().post(r);
- new Timer().schedule(new TimerTask() {
- @Override
- public void run() {
- r.run();
- }
- }, 0);
- }
-
private static Map selfAndCloud() throws VError {
return ImmutableMap.of(Permissions.IN,
ImmutableList.of(getPersonalBlessingString(), sOpts.getCloudBlessingString()));
@@ -209,7 +220,7 @@
throw new IllegalArgumentException("Unsupported provider: " + provider);
}
- Syncbase.enqueue(new Runnable() {
+ new Thread(new Runnable() {
@Override
public void run() {
try {
@@ -230,17 +241,17 @@
syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
sDatabase.addWatchChangeHandler(new UserdataWatchHandler());
}
- Syncbase.enqueue(new Runnable() {
- @Override
- public void run() {
- cb.onSuccess();
- }
- });
+ sOpts.callbackExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ cb.onSuccess();
+ }
+ });
} catch (VError vError) {
cb.onError(vError);
}
}
- });
+ }).start();
}
private static class UserdataWatchHandler extends Database.WatchChangeHandler {
@@ -279,11 +290,25 @@
try {
long scanId = Neighborhood.NewScan(new Neighborhood.NeighborhoodScanCallbacks() {
@Override
- public void onPeer(NeighborhoodPeer peer) {
- if (peer.isLost) {
- cb.onLost(new User(getAliasFromBlessingPattern(peer.blessings)));
- } else {
- cb.onFound(new User(getAliasFromBlessingPattern(peer.blessings)));
+ public void onPeer(final NeighborhoodPeer peer) {
+ final SettableFuture<Boolean> setFuture = SettableFuture.create();
+ Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ User u = new User(getAliasFromBlessingPattern(peer.blessings));
+ if (peer.isLost) {
+ cb.onLost(u);
+ } else {
+ cb.onFound(u);
+ }
+ setFuture.set(true);
+ }
+ });
+ try {
+ setFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ System.err.println(e.toString());
}
}
});
diff --git a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
index 97bf1b0..6adc214 100644
--- a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
+++ b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.v.syncbase.core.Permissions;
@@ -43,6 +44,8 @@
opts.disableUserdataSyncgroup = true;
opts.disableSyncgroupPublishing = true;
opts.testLogin = true;
+ // Unlike Android apps, the test doesn't have a looper/handler, so use a different executor.
+ opts.callbackExecutor = Executors.newCachedThreadPool();
Syncbase.init(opts);
}