java/syncbase: Run all callbacks with an executor

Android/Default: Use an executor that will run stuff on the main
thread.

Note: The default executor won't work for tests, so we swap it out
for a more standard cached thread pool.

The strategy used matches the one we had for the old Java API.

Change-Id: Icf139eb5406347dcf41e2d44f2969fe48c349d23
diff --git a/syncbase/src/main/java/io/v/syncbase/Database.java b/syncbase/src/main/java/io/v/syncbase/Database.java
index c94820e..2da0828 100644
--- a/syncbase/src/main/java/io/v/syncbase/Database.java
+++ b/syncbase/src/main/java/io/v/syncbase/Database.java
@@ -5,12 +5,14 @@
 package io.v.syncbase;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import io.v.syncbase.core.CollectionRowPattern;
 import io.v.syncbase.core.SyncgroupMemberInfo;
@@ -162,8 +164,22 @@
                         new io.v.syncbase.internal.Database.SyncgroupInvitesCallbacks() {
 
                     @Override
-                    public void onInvite(io.v.syncbase.core.SyncgroupInvite invite) {
-                        h.onInvite(new SyncgroupInvite(new Id(invite.syncgroup), invite.blessingNames));
+                    public void onInvite(final io.v.syncbase.core.SyncgroupInvite invite) {
+                        final SettableFuture<Boolean> setFuture = SettableFuture.create();
+                        Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                h.onInvite(new SyncgroupInvite(new Id(invite.syncgroup),
+                                        invite.blessingNames));
+                                setFuture.set(true);
+                            }
+                        });
+                        try {
+                            setFuture.get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            e.printStackTrace();
+                            System.err.println(e.toString());
+                        }
                     }
                 });
                 mSyncgroupInviteHandlers.put(h, scanId);
@@ -388,11 +404,32 @@
                         // TODO(razvanm): Ignore changes to userdata collection.
                         mBatch.add(new WatchChange(coreWatchChange));
                         if (!coreWatchChange.continued) {
+                            final SettableFuture<Boolean> setFuture = SettableFuture.create();
                             if (!mGotFirstBatch) {
                                 mGotFirstBatch = true;
-                                h.onInitialState(mBatch.iterator());
+                                final List<WatchChange> cpBatch = mBatch;
+                                Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        h.onInitialState(cpBatch.iterator());
+                                        setFuture.set(true);
+                                    }
+                                });
                             } else {
-                                h.onChangeBatch(mBatch.iterator());
+                                final List<WatchChange> cpBatch = mBatch;
+                                Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        h.onChangeBatch(cpBatch.iterator());
+                                        setFuture.set(true);
+                                    }
+                                });
+                            }
+                            try {
+                                setFuture.get();
+                            } catch (InterruptedException | ExecutionException e) {
+                                e.printStackTrace();
+                                System.err.println(e.toString());
                             }
                             mBatch.clear();
                         }
diff --git a/syncbase/src/main/java/io/v/syncbase/Syncbase.java b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
index ca33935..c0e93b7 100644
--- a/syncbase/src/main/java/io/v/syncbase/Syncbase.java
+++ b/syncbase/src/main/java/io/v/syncbase/Syncbase.java
@@ -6,18 +6,21 @@
 
 import android.app.Activity;
 import android.app.FragmentTransaction;
+import android.os.Handler;
+import android.os.Looper;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.SettableFuture;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 
 import io.v.syncbase.android.LoginFragment;
 import io.v.syncbase.core.NeighborhoodPeer;
@@ -47,6 +50,8 @@
      * Options for opening a database.
      */
     public static class Options {
+        // The executor used to execute callbacks.
+        public Executor callbackExecutor = UiThreadExecutor.INSTANCE;
         // Where data should be persisted.
         public String rootDir;
         // We use an empty mountPoints to avoid talking to the global mounttabled.
@@ -75,6 +80,24 @@
         }
     }
 
+    /**
+     * Executor that executes all of its commands on the Android UI thread.
+     */
+    private static class UiThreadExecutor implements Executor {
+        /**
+         * Singleton instance of the UiThreadExecutor.
+         */
+        public static final UiThreadExecutor INSTANCE = new UiThreadExecutor();
+
+        private final Handler handler = new Handler(Looper.getMainLooper());
+
+        @Override
+        public void execute(Runnable runnable) {
+            handler.post(runnable);
+        }
+        private UiThreadExecutor() {}
+    }
+
     static Options sOpts;
     private static Database sDatabase;
     static Collection sUserdataCollection;
@@ -90,18 +113,6 @@
             DB_NAME = "db",
             USERDATA_SYNCGROUP_NAME = "userdata__";
 
-    private static void enqueue(final Runnable r) {
-        // Note, we use Timer rather than Handler because the latter must be mocked out for tests,
-        // which is rather annoying.
-        //new Handler().post(r);
-        new Timer().schedule(new TimerTask() {
-            @Override
-            public void run() {
-                r.run();
-            }
-        }, 0);
-    }
-
     private static Map selfAndCloud() throws VError {
         return ImmutableMap.of(Permissions.IN,
                 ImmutableList.of(getPersonalBlessingString(), sOpts.getCloudBlessingString()));
@@ -209,7 +220,7 @@
             throw new IllegalArgumentException("Unsupported provider: " + provider);
         }
 
-        Syncbase.enqueue(new Runnable() {
+        new Thread(new Runnable() {
             @Override
             public void run() {
                 try {
@@ -230,17 +241,17 @@
                         syncgroup.createIfMissing(ImmutableList.of(sUserdataCollection));
                         sDatabase.addWatchChangeHandler(new UserdataWatchHandler());
                     }
-                    Syncbase.enqueue(new Runnable() {
-                            @Override
-                            public void run() {
-                                cb.onSuccess();
-                            }
-                        });
+                    sOpts.callbackExecutor.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            cb.onSuccess();
+                        }
+                    });
                 } catch (VError vError) {
                     cb.onError(vError);
                 }
             }
-        });
+        }).start();
     }
 
     private static class UserdataWatchHandler extends Database.WatchChangeHandler {
@@ -279,11 +290,25 @@
             try {
                 long scanId = Neighborhood.NewScan(new Neighborhood.NeighborhoodScanCallbacks() {
                     @Override
-                    public void onPeer(NeighborhoodPeer peer) {
-                        if (peer.isLost) {
-                            cb.onLost(new User(getAliasFromBlessingPattern(peer.blessings)));
-                        } else {
-                            cb.onFound(new User(getAliasFromBlessingPattern(peer.blessings)));
+                    public void onPeer(final NeighborhoodPeer peer) {
+                        final SettableFuture<Boolean> setFuture = SettableFuture.create();
+                        Syncbase.sOpts.callbackExecutor.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                User u = new User(getAliasFromBlessingPattern(peer.blessings));
+                                if (peer.isLost) {
+                                    cb.onLost(u);
+                                } else {
+                                    cb.onFound(u);
+                                }
+                                setFuture.set(true);
+                            }
+                        });
+                        try {
+                            setFuture.get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            e.printStackTrace();
+                            System.err.println(e.toString());
                         }
                     }
                 });
diff --git a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
index 97bf1b0..6adc214 100644
--- a/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
+++ b/syncbase/src/test/java/io/v/syncbase/SyncbaseTest.java
@@ -16,6 +16,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import io.v.syncbase.core.Permissions;
@@ -43,6 +44,8 @@
         opts.disableUserdataSyncgroup = true;
         opts.disableSyncgroupPublishing = true;
         opts.testLogin = true;
+        // Unlike Android apps, the test doesn't have a looper/handler, so use a different executor.
+        opts.callbackExecutor = Executors.newCachedThreadPool();
         Syncbase.init(opts);
     }