Initial attempt at SyncbaseMain persistence

Change-Id: Ic280802b901b22491d72f7f3cfc8d1b5bba37d10
diff --git a/app/src/main/java/io/v/todos/model/ListMetadata.java b/app/src/main/java/io/v/todos/model/ListMetadata.java
index 46b89ec..074a46d 100644
--- a/app/src/main/java/io/v/todos/model/ListMetadata.java
+++ b/app/src/main/java/io/v/todos/model/ListMetadata.java
@@ -6,6 +6,8 @@
 
 import android.support.annotation.NonNull;
 
+import java.util.Date;
+
 /**
  * Tracks information regarding a particular todo list.
  */
@@ -69,4 +71,10 @@
     public ListSpec toSpec() {
         return new ListSpec(name, updatedAt);
     }
+
+    @Override
+    public String toString() {
+        return key + ": " + name + " (" + numCompleted + "/" + numTasks + "), last updated " +
+                new Date(updatedAt);
+    }
 }
diff --git a/app/src/main/java/io/v/todos/model/ListSpec.java b/app/src/main/java/io/v/todos/model/ListSpec.java
index 06302d8..44c3bbd 100644
--- a/app/src/main/java/io/v/todos/model/ListSpec.java
+++ b/app/src/main/java/io/v/todos/model/ListSpec.java
@@ -4,10 +4,12 @@
 
 package io.v.todos.model;
 
+import java.io.Serializable;
+
 /**
  * POJO of persisted information regarding a particular todo list.
  */
-public class ListSpec {
+public class ListSpec implements Serializable {
     private String mName;
     private long mUpdatedAt;
 
diff --git a/app/src/main/java/io/v/todos/model/TaskSpec.java b/app/src/main/java/io/v/todos/model/TaskSpec.java
index f3f243d..dc3858a 100644
--- a/app/src/main/java/io/v/todos/model/TaskSpec.java
+++ b/app/src/main/java/io/v/todos/model/TaskSpec.java
@@ -4,12 +4,14 @@
 
 package io.v.todos.model;
 
+import java.io.Serializable;
+
 /**
  * POJO of persisted information regarding a particular task.
  *
  * @author alexfandrianto
  */
-public class TaskSpec {
+public class TaskSpec implements Serializable {
     private String mText;
     private long mAddedAt;
     private boolean mDone;
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/MainListTracker.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/MainListTracker.java
new file mode 100644
index 0000000..744ee81
--- /dev/null
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/MainListTracker.java
@@ -0,0 +1,119 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.todos.persistence.syncbase;
+
+import android.util.Log;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import io.v.todos.model.ListMetadata;
+import io.v.todos.model.ListSpec;
+import io.v.todos.model.TaskSpec;
+import io.v.todos.persistence.ListEventListener;
+import io.v.v23.InputChannel;
+import io.v.v23.InputChannelCallback;
+import io.v.v23.InputChannels;
+import io.v.v23.context.VContext;
+import io.v.v23.syncbase.ChangeType;
+import io.v.v23.syncbase.Collection;
+import io.v.v23.syncbase.Database;
+import io.v.v23.syncbase.WatchChange;
+
+/**
+ * This class aggregates Todo-list watch data from Syncbase into {@link ListMetadata}.
+ */
+public class MainListTracker {
+    private static final String TAG = MainListTracker.class.getSimpleName();
+
+    private final VContext mWatchContext;
+    private final Collection mList;
+    private final ListEventListener<ListMetadata> mListener;
+    private ListSpec mListSpec;
+
+    private final Map<String, Boolean> mIsTaskCompleted = new HashMap<>();
+    private int mNumCompletedTasks;
+    private boolean mFireUpdates;
+
+    public final ListenableFuture<Void> watchFuture;
+
+    public MainListTracker(VContext vContext, Database database, String listId,
+                           ListEventListener<ListMetadata> listener) {
+        mList = database.getCollection(vContext, listId);
+        mListener = listener;
+
+        mWatchContext = vContext.withCancel();
+        InputChannel<WatchChange> watch = database.watch(mWatchContext, mList.id(), "");
+        watchFuture = InputChannels.withCallback(watch, new InputChannelCallback<WatchChange>() {
+            @Override
+            public ListenableFuture<Void> onNext(WatchChange change) {
+                processWatchChange(change);
+                return null;
+            }
+        });
+    }
+
+    public ListenableFuture<Void> deleteList(VContext vContext) {
+        // The watch context has to be cancelled first or else we may run into race conditions as
+        // the collection is destroyed while the watch is still ongoing, which fails the watch with
+        // a NoExistException. Alternatively we could just ignore that exception and not bother
+        // cancelling the watch at all.
+        mWatchContext.cancel();
+        return Futures.transform(mList.destroy(vContext),
+                new Function<Void, Void>() {
+                    @Override
+                    public Void apply(@Nullable Void input) {
+                        mListener.onItemDelete(mList.id().getName());
+                        return null;
+                    }
+                });
+    }
+
+    public ListMetadata getListMetadata() {
+        return new ListMetadata(mList.id().getName(), mListSpec, mNumCompletedTasks,
+                mIsTaskCompleted.size());
+    }
+
+    private void processWatchChange(WatchChange change) {
+        String rowName = change.getRowName();
+
+        if (rowName.equals(SyncbaseTodoList.LIST_ROW_NAME)) {
+            mListSpec = SyncbasePersistence.castWatchValue(change.getValue(), ListSpec.class);
+        } else if (change.getChangeType() == ChangeType.DELETE_CHANGE) {
+            if (mIsTaskCompleted.remove(rowName)) {
+                mNumCompletedTasks--;
+            }
+        } else {
+            boolean isDone = SyncbasePersistence.castWatchValue(change.getValue(), TaskSpec.class)
+                    .getDone();
+            Boolean rawWasDone = mIsTaskCompleted.put(rowName, isDone);
+            boolean wasDone = rawWasDone != null && rawWasDone;
+            if (!wasDone && isDone) {
+                mNumCompletedTasks++;
+            } else if (wasDone && !isDone) {
+                mNumCompletedTasks--;
+            }
+        }
+
+        // Don't fire events until we've processed the entire batch of watch events.
+        if (!change.isContinued()) {
+            ListMetadata listMetadata = getListMetadata();
+            Log.d(TAG, listMetadata.toString());
+
+            if (mFireUpdates) {
+                mListener.onItemUpdate(listMetadata);
+            } else {
+                mFireUpdates = true;
+                mListener.onItemAdd(listMetadata);
+            }
+        }
+    }
+}
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
index 8c7ae05..14c1ddf 100644
--- a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseMain.java
@@ -5,19 +5,47 @@
 package io.v.todos.persistence.syncbase;
 
 import android.app.Activity;
+import android.util.Log;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
 
 import io.v.impl.google.services.syncbase.SyncbaseServer;
 import io.v.todos.model.ListMetadata;
 import io.v.todos.model.ListSpec;
+import io.v.todos.model.TaskSpec;
 import io.v.todos.persistence.ListEventListener;
 import io.v.todos.persistence.MainPersistence;
+import io.v.v23.InputChannel;
+import io.v.v23.InputChannelCallback;
+import io.v.v23.InputChannels;
 import io.v.v23.VFutures;
+import io.v.v23.services.syncbase.BatchOptions;
+import io.v.v23.services.syncbase.KeyValue;
+import io.v.v23.syncbase.Batch;
+import io.v.v23.syncbase.BatchDatabase;
+import io.v.v23.syncbase.ChangeType;
 import io.v.v23.syncbase.Collection;
+import io.v.v23.syncbase.RowRange;
+import io.v.v23.syncbase.WatchChange;
+import io.v.v23.vdl.VdlAny;
 import io.v.v23.verror.ExistException;
 import io.v.v23.verror.VException;
+import io.v.v23.vom.VomUtil;
 
 public class SyncbaseMain extends SyncbasePersistence implements MainPersistence {
-    public static final String MAIN_COLLECTION_NAME = "userdata";
+    private static final String
+            TAG = SyncbaseMain.class.getSimpleName(),
+            MAIN_COLLECTION_NAME = "userdata",
+            LISTS_PREFIX = "lists_";
 
     private static final Object sMainCollectionMutex = new Object();
     private static volatile Collection sMainCollection;
@@ -26,10 +54,12 @@
         return sMainCollection != null;
     }
 
+    private final Map<String, MainListTracker> mTaskTrackers = new HashMap<>();
+
     /**
      * This constructor blocks until the instance is ready for use.
      */
-    public SyncbaseMain(Activity activity, ListEventListener<ListMetadata> listener)
+    public SyncbaseMain(Activity activity, final ListEventListener<ListMetadata> listener)
             throws VException, SyncbaseServer.StartException {
         super(activity);
 
@@ -45,20 +75,88 @@
                 sMainCollection = mainCollection;
             }
         }
+
+        InputChannel<WatchChange> watch = getDatabase().watch(
+                mVContext, sMainCollection.id(), LISTS_PREFIX);
+        trap(InputChannels.withCallback(watch, new InputChannelCallback<WatchChange>() {
+            @Override
+            public ListenableFuture<Void> onNext(WatchChange change) {
+                String listId = change.getRowName();
+
+                if (change.getChangeType() == ChangeType.DELETE_CHANGE) {
+                    trap(mTaskTrackers.remove(listId).deleteList(mVContext));
+                } else {
+                    MainListTracker listTracker = new MainListTracker(
+                            mVContext, getDatabase(), listId, listener);
+                    if (mTaskTrackers.put(listId, listTracker) != null) {
+                        // List entries in the main collection are just ( list ID => nil ), so we
+                        // never expect updates other than an initial add...
+                        Log.w(TAG, "Unexpected update to " + MAIN_COLLECTION_NAME + " collection " +
+                                "for list " + listId);
+                    }
+
+                    trap(listTracker.watchFuture);
+                }
+                return null;
+            }
+        }));
     }
 
     @Override
-    public void addTodoList(ListSpec listSpec) {
-
+    public void addTodoList(final ListSpec listSpec) {
+        final String listName = LISTS_PREFIX + UUID.randomUUID().toString().replace('-', '_');
+        final Collection listCollection = getDatabase().getCollection(mVContext, listName);
+        Futures.addCallback(listCollection.create(mVContext, null),
+                new TrappingCallback<Void>(mActivity) {
+                    @Override
+                    public void onSuccess(@Nullable Void result) {
+                        // These can happen in either order
+                        trap(sMainCollection.put(mVContext, listName, null, VdlAny.class));
+                        trap(listCollection.put(mVContext, SyncbaseTodoList.LIST_ROW_NAME, listSpec,
+                                ListSpec.class));
+                    }
+                });
     }
 
     @Override
     public void deleteTodoList(String key) {
-
+        trap(sMainCollection.delete(mVContext, key));
     }
 
     @Override
     public void completeAllTasks(ListMetadata listMetadata) {
+        final String listId = listMetadata.key;
+        trap(Batch.runInBatch(mVContext, getDatabase(), new BatchOptions(),
+                new Batch.BatchOperation() {
+                    @Override
+                    public ListenableFuture<Void> run(BatchDatabase db) {
+                        final Collection list = db.getCollection(mVContext, listId);
 
+                        InputChannel<KeyValue> scan = list.scan(mVContext,
+                                RowRange.prefix(SyncbaseTodoList.TASKS_PREFIX));
+                        InputChannel<ListenableFuture<Void>> puts = InputChannels.transform(
+                                mVContext, scan, new InputChannels.TransformFunction<KeyValue,
+                                        ListenableFuture<Void>>() {
+                                    @Override
+                                    public ListenableFuture<Void> apply(KeyValue kv)
+                                            throws VException {
+                                        TaskSpec taskSpec =
+                                                (TaskSpec) VomUtil.decode(kv.getValue());
+                                        taskSpec.setDone(true);
+                                        return list.put(mVContext, kv.getKey(), taskSpec,
+                                                TaskSpec.class);
+                                    }
+                                });
+
+                        return Futures.transform(Futures.allAsList(InputChannels.asIterable(puts)),
+                                new Function<List<Void>, Void>() {
+                                    @Nullable
+                                    @Override
+                                    public Void apply(@Nullable List<Void> input) {
+                                        return null;
+                                    }
+                                });
+                    }
+                }));
     }
 }
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
index 63ecc2c..75b9342 100644
--- a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbasePersistence.java
@@ -7,12 +7,16 @@
 import android.app.Activity;
 import android.content.Context;
 import android.os.Handler;
+import android.support.annotation.NonNull;
 import android.util.Log;
+import android.widget.Toast;
 
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -22,9 +26,12 @@
 import java.io.File;
 import java.util.concurrent.Executors;
 
+import javax.annotation.Nullable;
+
 import io.v.android.libs.security.BlessingsManager;
 import io.v.android.v23.V;
 import io.v.impl.google.services.syncbase.SyncbaseServer;
+import io.v.todos.R;
 import io.v.todos.persistence.Persistence;
 import io.v.v23.VFutures;
 import io.v.v23.context.VContext;
@@ -37,8 +44,11 @@
 import io.v.v23.syncbase.Database;
 import io.v.v23.syncbase.Syncbase;
 import io.v.v23.syncbase.SyncbaseService;
+import io.v.v23.vdl.VdlStruct;
+import io.v.v23.verror.CanceledException;
 import io.v.v23.verror.ExistException;
 import io.v.v23.verror.VException;
+import io.v.v23.vom.VomUtil;
 
 /**
  * TODO(rosswang): Move most of this to vanadium-android.
@@ -152,6 +162,61 @@
         return sDatabase;
     }
 
+    /**
+     * A {@link FutureCallback} that reports persistence errors by toasting a short message to the
+     * user and logging the exception trace and the call stack from where the future was invoked.
+     */
+    public static class TrappingCallback<T> implements FutureCallback<T> {
+        private static final int FIRST_SIGNIFICANT_STACK_ELEMENT = 3;
+        private final Context mAndroidContext;
+        private final StackTraceElement[] mCaller;
+
+        public TrappingCallback(Context androidContext) {
+            mAndroidContext = androidContext;
+            mCaller = Thread.currentThread().getStackTrace();
+        }
+
+        @Override
+        public void onSuccess(@Nullable T result) {
+        }
+
+        @Override
+        public void onFailure(@NonNull Throwable t) {
+            if (!(t instanceof CanceledException)) {
+                Toast.makeText(mAndroidContext, R.string.err_sync, Toast.LENGTH_LONG).show();
+                StringBuilder traceBuilder = new StringBuilder(Throwables.getStackTraceAsString(t))
+                        .append("\n invoked at ").append(mCaller[FIRST_SIGNIFICANT_STACK_ELEMENT]);
+                for (int i = FIRST_SIGNIFICANT_STACK_ELEMENT + 1; i < mCaller.length; i++) {
+                    traceBuilder.append("\n\tat ").append(mCaller[i]);
+                }
+                Log.e(TAG, traceBuilder.toString());
+            }
+        }
+    }
+
+    /**
+     * Extracts the value from a watch change.
+     * TODO(rosswang): This method is a tempory hack, awaiting resolution of the following issues:
+     *
+     * <ul>
+     *  <li><a href="https://github.com/vanadium/issues/issues/1305">#1305</a>
+     *  <li><a href="https://github.com/vanadium/issues/issues/1310">#1310</a>
+     * </ul>
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T castWatchValue(Object watchValue, Class<T> type) {
+        if (type.isInstance(watchValue)) {
+            return (T) watchValue;
+        }
+
+        try {
+            return (T) VomUtil.decode(VomUtil.encode((VdlStruct) watchValue), type);
+        } catch (VException e) {
+            Log.e(TAG, Throwables.getStackTraceAsString(e));
+            throw new ClassCastException("Could not cast " + watchValue + " to " + type);
+        }
+    }
+
     protected final Activity mActivity;
     protected final VContext mVContext;
 
@@ -160,6 +225,13 @@
     }
 
     /**
+     * @see TrappingCallback
+     */
+    protected void trap(ListenableFuture<?> future) {
+        Futures.addCallback(future, new TrappingCallback<>(mActivity));
+    }
+
+    /**
      * This constructor is blocking for simplicity.
      */
     public SyncbasePersistence(final Activity activity)
diff --git a/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseTodoList.java b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseTodoList.java
new file mode 100644
index 0000000..56f56e9
--- /dev/null
+++ b/app/src/syncbase/java/io/v/todos/persistence/syncbase/SyncbaseTodoList.java
@@ -0,0 +1,11 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.todos.persistence.syncbase;
+
+public class SyncbaseTodoList {
+    public static final String
+            LIST_ROW_NAME = "list",
+            TASKS_PREFIX = "tasks_";
+}