Java: add a common iterator class for various iterators we use

Change the iterator behavior to never throw a RuntimeException
but to terminate early instead, reporting any errors encountered
via a separate method.

Also unified syncbase Stream and iterator.  The end result is that
the only way to close the syncbase Stream is to cancel the context.

MultiPart: 1/2

Change-Id: I7f49270f69aa805761a7cd14a0cda4e8d94e4ea7
diff --git a/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java b/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
index 309272f..36a7655 100644
--- a/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
+++ b/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
@@ -5,23 +5,30 @@
 package io.v.impl.google.channel;
 
 import com.google.common.collect.AbstractIterator;
+import io.v.v23.VIterable;
+import io.v.v23.verror.VException;
 
 import java.io.EOFException;
 import java.util.Iterator;
 
 /**
- * An implementation of {@link Iterable} that reads data from an underlying Go channel.
+ * An implementation of {@link VIterable} that reads data from an underlying Go channel.
  */
-public class ChannelIterable<T> implements Iterable<T> {
-    private final long nativePtr;
-    private final long sourceNativePtr;
+class ChannelIterable<T> implements VIterable<T> {
+    private final long nativeValuePtr;
+    private final long nativeErrorPtr;
+    private final long nativeSourcePtr;
+    private volatile VException error;
 
-    private native Object nativeReadValue(long nativePtr) throws EOFException;
-    private native void nativeFinalize(long nativePtr, long sourceNativePtr);
+    private native Object nativeReadValue(long nativeValuePtr, long nativeErrorPtr)
+            throws EOFException, VException;
+    private native void nativeFinalize(
+            long nativeValuePtr, long nativeErrorPtr, long nativeSourcePtr);
 
-    private ChannelIterable(long nativePtr, long sourceNativePtr) {
-        this.nativePtr = nativePtr;
-        this.sourceNativePtr = sourceNativePtr;
+    private ChannelIterable(long nativeValuePtr, long nativeErrorPtr, long nativeSourcePtr) {
+        this.nativeValuePtr = nativeValuePtr;
+        this.nativeErrorPtr = nativeErrorPtr;
+        this.nativeSourcePtr = nativeSourcePtr;
     }
 
     @Override
@@ -33,31 +40,25 @@
                     return readValue();
                 } catch (EOFException e) {
                     return endOfData();
+                } catch (VException e) {
+                    error = e;
+                    return endOfData();
                 }
             }
         };
     }
 
-    private T readValue() throws EOFException {
-        return (T) nativeReadValue(this.nativePtr);
+    @Override
+    public VException error() {
+        return error;
+    }
+
+    private T readValue() throws EOFException, VException {
+        return (T) nativeReadValue(nativeValuePtr, nativeErrorPtr);
     }
 
     @Override
     protected void finalize() {
-        nativeFinalize(this.nativePtr, this.sourceNativePtr);
+        nativeFinalize(nativeValuePtr, nativeErrorPtr, nativeSourcePtr);
     }
-
-    /**
-     * Returns the native pointer to the Go channel of Java objects.
-     *
-     * @return the native pointer to the Go channel of Java objects
-     */
-    public long getNativePtr() { return this.nativePtr; }
-
-    /**
-     * Returns the native pointer to the Go channel that feeds the above Go channel of Java object.
-     *
-     * @return the native pointer to the Go channel that feeds the above Go channel of Java object
-     */
-    public long getSourceNativePtr() { return this.sourceNativePtr; }
 }
\ No newline at end of file
diff --git a/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java b/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
index 5ceec7c..a2fae33 100644
--- a/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
+++ b/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
@@ -4,6 +4,7 @@
 
 package io.v.impl.google.namespace;
 
+import io.v.v23.VIterable;
 import io.v.v23.rpc.Callback;
 import org.joda.time.Duration;
 
@@ -25,11 +26,11 @@
 public class NamespaceImpl implements Namespace {
     private final long nativePtr;
 
-    private static native Iterable<GlobReply> nativeGlob(
+    private static native VIterable<GlobReply> nativeGlob(
             long nativePtr, VContext context, String pattern, Options options) throws VException;
 
     private static native void nativeGlobAsync(long nativePtr, VContext context, String pattern,
-                                               Options options, Callback<Iterable<GlobReply>>
+                                               Options options, Callback<VIterable<GlobReply>>
                                                        callback) throws VException;
 
     private static native void nativeMount(long nativePtr, VContext context, String name,
@@ -221,25 +222,25 @@
     }
 
     @Override
-    public Iterable<GlobReply> glob(VContext context, String pattern) throws VException {
+    public VIterable<GlobReply> glob(VContext context, String pattern) throws VException {
         return glob(context, pattern, (Options) null);
     }
 
     @Override
-    public Iterable<GlobReply> glob(VContext context, String pattern, Options options)
+    public VIterable<GlobReply> glob(VContext context, String pattern, Options options)
             throws VException {
         return nativeGlob(nativePtr, context, pattern, options);
     }
 
     @Override
-    public void glob(VContext context, String pattern, Callback<Iterable<GlobReply>> callback)
+    public void glob(VContext context, String pattern, Callback<VIterable<GlobReply>> callback)
             throws VException {
         glob(context, pattern, null, callback);
     }
 
     @Override
     public void glob(VContext context, String pattern, Options options,
-                     Callback<Iterable<GlobReply>> callback) throws VException {
+                     Callback<VIterable<GlobReply>> callback) throws VException {
         nativeGlobAsync(nativePtr, context, pattern, options, callback);
     }
 
diff --git a/lib/src/main/java/io/v/v23/VIterable.java b/lib/src/main/java/io/v/v23/VIterable.java
new file mode 100644
index 0000000..9e12368
--- /dev/null
+++ b/lib/src/main/java/io/v/v23/VIterable.java
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.v23;
+
+import io.v.v23.verror.VException;
+
+/**
+ * An interface for iterating through a collection of elements.
+ * <p>
+ * The {@link java.util.Iterator} returned by the {@link java.lang.Iterable#iterator iterator}:
+ * <p><ul>
+ *     <li>can be created <strong>only</strong> once,
+ *     <li>does not support {@link java.util.Iterator#remove remove}</li>,
+ *     <li>terminates early if the underlying iteration step fails; in that case, {@link #error}
+ *         method returns the error that caused the iteration step to fail.
+ * </ul>
+ */
+public interface VIterable<T> extends Iterable<T> {
+    /**
+     * Returns an error (if any) that caused the iterator to terminate early.  Returns {@code null}
+     * if the iterator terminated gracefully.
+     */
+    VException error();
+}
diff --git a/lib/src/main/java/io/v/v23/namespace/Namespace.java b/lib/src/main/java/io/v/v23/namespace/Namespace.java
index 408436f..4ad4138 100644
--- a/lib/src/main/java/io/v/v23/namespace/Namespace.java
+++ b/lib/src/main/java/io/v/v23/namespace/Namespace.java
@@ -4,6 +4,7 @@
 
 package io.v.v23.namespace;
 
+import io.v.v23.VIterable;
 import io.v.v23.rpc.Callback;
 import org.joda.time.Duration;
 
@@ -248,7 +249,7 @@
      * A shortcut for {@link #glob(VContext, String, Options)} with a {@code null} options
      * parameter.
      */
-    Iterable<GlobReply> glob(VContext context, String pattern) throws VException;
+    VIterable<GlobReply> glob(VContext context, String pattern) throws VException;
 
     /**
      * Returns the iterator over all names matching the provided pattern. Note that due to the
@@ -266,6 +267,9 @@
      * <ul>
      *     <li>{@link io.v.v23.OptionDefs#SKIP_SERVER_ENDPOINT_AUTHORIZATION}</li>
      * </ul>
+     * <p>
+     * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+     * stop the glob operation and terminate the iterator early.
      *
      * @param context a client context
      * @param pattern a pattern that should be matched
@@ -273,43 +277,22 @@
      * @return        an iterator over {@link GlobReply} objects matching the provided pattern
      * @throws VException if an error is encountered
      */
-    Iterable<GlobReply> glob(VContext context, String pattern, Options options)
-            throws VException;
+    VIterable<GlobReply> glob(VContext context, String pattern, Options options) throws VException;
 
     /**
      * A shortcut for {@link #glob(VContext, String, Options, Callback)} with a {@code null} options
      * parameter.
      */
-    void glob(VContext context, String pattern, Callback<Iterable<GlobReply>> callback)
+    void glob(VContext context, String pattern, Callback<VIterable<GlobReply>> callback)
             throws VException;
 
     /**
-     * Asynchronously returns the iterator over all names matching the provided pattern. This
-     * function returns immediately and the given non-{@code null} callback is called when the
-     * operation completes (either successfully or with a failure). Generally, the callback will
-     * be called when at least one entry can be read from the iterator. Subsequent calls to
-     * {@code next} may block. You should not use the iterator on threads that should not block.
-     * <p>
-     * You should be aware that the iterator:
-     * <p><ul>
-     *     <li>can be created <strong>only</strong> once</li>
-     *     <li>does not support {@link java.util.Iterator#remove remove}</li>
-     * </ul>
-     * <p>
-     * A particular implementation of this interface chooses which options to support, but at the
-     * minimum it must handle the following pre-defined options:
-     * <ul>
-     *     <li>{@link io.v.v23.OptionDefs#SKIP_SERVER_ENDPOINT_AUTHORIZATION}</li>
-     * </ul>
+     * Asynchronous version of {@link #glob(VContext, String, Options)}.
      *
-     * @param context a client context
-     * @param pattern a pattern that should be matched
-     * @param options options to pass to the implementation as described above, or {@code null}
-     * @param callback a callback whose {@code onSuccess} method will be passed the {@link Iterable}
-     *                 over {@link GlobReply} objects matching the provided pattern
-     * @throws VException if an error is encountered
+     * @throws VException if there was an error creating the asynchronous call. In this case, no
+     *                    methods on {@code callback} will be called.
      */
-    void glob(VContext context, String pattern, Options options, Callback<Iterable<GlobReply>>
+    void glob(VContext context, String pattern, Options options, Callback<VIterable<GlobReply>>
             callback) throws VException;
 
     /**
@@ -322,7 +305,8 @@
      *
      * @param roots the roots that will be used to turn relative paths into absolute paths, or
      *              {@code null} to clear the currently configured set of roots. Each entry should
-     *              be a Vanadium name, see also <a href="https://github.com/vanadium/docs/blob/master/glossary.md#object-name">
+     *              be a Vanadium name, see also
+     *              <a href="https://github.com/vanadium/docs/blob/master/glossary.md#object-name">
      *              the Name entry</a> in the glossary
      */
     void setRoots(List<String> roots) throws VException;
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
index 1409bd3..edf4961 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
@@ -4,6 +4,7 @@
 
 package io.v.v23.syncbase.nosql;
 
+import io.v.v23.VIterable;
 import io.v.v23.context.VContext;
 import io.v.v23.services.syncbase.nosql.BlobFetchStatus;
 import io.v.v23.services.syncbase.nosql.BlobRef;
@@ -55,10 +56,13 @@
      * priority blobs are prefetched before the lower priority ones.  However an ongoing blob
      * transfer is not interrupted.
      * <p>
-     * The return {@link Stream} can be used to track the progress of the prefetch.  When the
-     * {@link Stream} exhausts all of the iterable elements, the blob is guaranteed to have
+     * The returned iterator can be used to track the progress of the prefetch.  When the
+     * iterator exhausts all of the iterable elements, the blob is guaranteed to have
      * been entirely copied to a local cache.
      * <p>
+     * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+     * stop the prefetch and terminate the iterator early.
+     * <p>
      * This method doesn't block.
      *
      * @param ctx         vanadium context
@@ -66,7 +70,7 @@
      * @return            a stream used for tracking the progress of the prefetch
      * @throws VException if the blob couldn't be prefetched
      */
-    Stream<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException;
+    VIterable<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException;
 
     /**
      * Deletes the blob's local cached copy.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
index 63b69da..7323e39 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
@@ -4,11 +4,13 @@
 
 package io.v.v23.syncbase.nosql;
 
+import io.v.v23.VIterable;
 import io.v.v23.context.VContext;
 import io.v.v23.services.syncbase.nosql.BlobFetchStatus;
 import io.v.v23.services.syncbase.nosql.BlobRef;
 import io.v.v23.services.syncbase.nosql.DatabaseClient;
 import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
 import io.v.v23.vdl.VdlUint64;
 import io.v.v23.verror.VException;
 
@@ -35,10 +37,10 @@
         return new BlobInputStream(stream);
     }
     @Override
-    public Stream<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException {
+    public VIterable<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException {
         TypedClientStream<Void, BlobFetchStatus, Void> stream =
                 client.fetchBlob(ctx, ref, new VdlUint64(priority));
-        return new StreamImpl(ctx.withCancel(), stream);
+        return new TypedStreamIterable(stream);
     }
     @Override
     public long size(VContext ctx) throws VException {
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
index 924a5cd..1a5ebb6 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
@@ -3,6 +3,7 @@
 // license that can be found in the LICENSE file.
 package io.v.v23.syncbase.nosql;
 
+import io.v.v23.VIterable;
 import io.v.v23.context.VContext;
 import io.v.v23.security.access.Permissions;
 import io.v.v23.services.syncbase.nosql.BatchOptions;
@@ -82,7 +83,7 @@
 
     /**
      * Allows a client to watch for updates to the database. For each watch request, the client will
-     * receive a reliable stream of watch events without re-ordering.
+     * receive a reliable iterator of watch events without re-ordering.
      * <p>
      * See {@link io.v.v23.services.watch.GlobWatcherClient} for a detailed explanation of the
      * watch behavior.
@@ -97,16 +98,18 @@
      * <li>abort the batch,</li>
      * <li>start watching for changes to the data using the {@link ResumeMarker}.</li>
      * </ol><p>
+     * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+     * stop the watch operation and terminate the iterator early.
      *
      * @param ctx                 vanadium context
      * @param tableRelativeName   relative name of the table to watch
      * @param rowPrefix           prefix of the rows to watch
      * @param resumeMarker        {@link ResumeMarker} from which the changes will be monitored
-     * @return                    a stream of changes
+     * @return                    a (potentially-infinite) iterator of changes
      * @throws VException         if there was an error setting up this watch request
      */
-    Stream<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
-                              ResumeMarker resumeMarker) throws VException;
+    VIterable<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
+                                 ResumeMarker resumeMarker) throws VException;
 
     /**
      * Returns a handle to a database {@link Syncgroup} with the given full (i.e., object) name.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
index 1ef0b98..e5f465c 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
@@ -3,6 +3,7 @@
 // license that can be found in the LICENSE file.
 package io.v.v23.syncbase.nosql;
 
+import io.v.v23.VIterable;
 import io.v.v23.context.VContext;
 import io.v.v23.services.watch.ResumeMarker;
 import io.v.v23.vdl.VdlAny;
@@ -42,20 +43,24 @@
     String[] listTables(VContext ctx) throws VException;
 
     /**
-     * Executes a SyncQL query, returning a {@link ResultStream} object that allows the caller to
+     * Executes a SyncQL query, returning a {@link QueryResults} object that allows the caller to
      * iterate over arrays of values for each row that matches the query.
      * <p>
      * It is legal to perform writes concurrently with {@link #exec exec()}. The returned stream reads
      * from a consistent snapshot taken at the time of the method and will not reflect subsequent
      * writes to keys not yet reached by the stream.
+     * <p>
+     * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+     * stop the query execution and terminate the returned iterator early.
+
      *
      * @param  ctx        Vanadium context
      * @param  query      a SyncQL query
-     * @return            a {@link ResultStream} object that allows the caller to iterate over
+     * @return            a {@link QueryResults} object that allows the caller to iterate over
      *                    arrays of values for each row that matches the query
      * @throws VException if there was an error executing the query
      */
-    ResultStream exec(VContext ctx, String query) throws VException;
+    QueryResults exec(VContext ctx, String query) throws VException;
 
     /**
      * Returns the {@link ResumeMarker} that points to the current state of the database.
@@ -68,7 +73,7 @@
      * An interface for iterating through rows resulting from a
      * {@link DatabaseCore#exec DatabaseCore.exec()}.
      */
-    interface ResultStream extends Stream<List<VdlAny>> {
+    interface QueryResults extends VIterable<List<VdlAny>> {
         /**
          * Returns an array of column names that matched the query.  The size of the {@link VdlAny}
          * list returned in every iteration will match the size of this array.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
index cd063f2..501680c 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
@@ -18,6 +18,7 @@
 
 import com.google.common.collect.Iterators;
 import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
 import io.v.v23.services.syncbase.nosql.BatchOptions;
 import io.v.v23.services.syncbase.nosql.BlobRef;
 import io.v.v23.services.syncbase.nosql.DatabaseClient;
@@ -32,6 +33,7 @@
 import io.v.v23.context.VContext;
 import io.v.v23.security.access.Permissions;
 import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
 import io.v.v23.vdl.Types;
 import io.v.v23.vdl.VdlAny;
 import io.v.v23.vdl.VdlOptional;
@@ -77,10 +79,9 @@
         return x.toArray(new String[x.size()]);
     }
     @Override
-    public ResultStream exec(VContext ctx, String query) throws VException {
-        CancelableVContext ctxC = ctx.withCancel();
+    public QueryResults exec(VContext ctx, String query) throws VException {
         TypedClientStream<Void, List<VdlAny>, Void> stream =
-                this.client.exec(ctxC, getSchemaVersion(), query);
+                this.client.exec(ctx, getSchemaVersion(), query);
 
         // The first row contains column names, pull them off the stream.
         List<VdlAny> row = null;
@@ -99,7 +100,7 @@
                         "names (of type String), got type: " + elem.getClass());
             }
         }
-        return new ResultStreamImpl(ctxC, stream, Arrays.asList(columnNames));
+        return new QueryResultsImpl(stream, Arrays.asList(columnNames));
     }
 
     // Implements AccessController interface.
@@ -134,12 +135,11 @@
         return new DatabaseImpl(this.parentFullName, this.name, batchSuffix, this.schema);
     }
     @Override
-    public Stream<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
-                                     ResumeMarker resumeMarker) throws VException {
-        CancelableVContext ctxC = ctx.withCancel();
-        TypedClientStream<Void, Change, Void> stream = this.client.watchGlob(ctxC,
+    public VIterable<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
+                                        ResumeMarker resumeMarker) throws VException {
+        TypedClientStream<Void, Change, Void> stream = this.client.watchGlob(ctx,
                 new GlobRequest(NamingUtil.join(tableRelativeName, rowPrefix + "*"), resumeMarker));
-        return new StreamImpl(ctxC, stream) {
+        return new TypedStreamIterable(stream) {
             @Override
             public synchronized Iterator iterator() {
                 return Iterators.transform(super.iterator(), new Function<Change, WatchChange>() {
@@ -249,12 +249,13 @@
         return this.schema.getMetadata().getVersion();
     }
 
-    private static class ResultStreamImpl extends StreamImpl<List<VdlAny>> implements ResultStream {
+    private static class QueryResultsImpl
+            extends TypedStreamIterable<List<VdlAny>> implements QueryResults {
         private final List<String> columnNames;
 
-        private ResultStreamImpl(CancelableVContext ctxC, TypedClientStream<Void,
-                List<VdlAny>, Void> stream, List<String> columnNames) {
-            super(ctxC, stream);
+        private QueryResultsImpl(TypedClientStream<Void, List<VdlAny>, Void> stream,
+                            List<String> columnNames) {
+            super(stream);
             this.columnNames = columnNames;
         }
         @Override
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java
deleted file mode 100644
index 4429170..0000000
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package io.v.v23.syncbase.nosql;
-
-import io.v.v23.verror.VException;
-
-/**
- * An interface for iterating through a collection of elements.
- *
- * The {@link java.util.Iterator} returned by the {@link java.lang.Iterable#iterator iterator}:
- * <p><ul>
- *     <li>can be created <strong>only</strong> once,
- *     <li>does not support {@link java.util.Iterator#remove remove}</li>,
- *     <li>stops the iteration early and gracefully if the {@link #cancel} method is invoked,</li>
- *     <li>may throw {@link RuntimeException} if the underlying iteration step throws
- *         a {@link VException}.  The {@link RuntimeException#getCause cause} of the
- *         {@link RuntimeException} will be the said {@link VException}.</li>
- * </ul>
- */
-public interface Stream<T> extends Iterable<T> {
-    /**
-     * Notifies the stream provider that it can stop producing elements.  The client must call
-     * {@link #cancel} if it does not iterate through all the elements.
-     * <p>
-     * This method is idempotent and can be called concurrently with a thread that is iterating.
-     * <p>
-     * This method causes the iterator to (gracefully) terminate early.
-     */
-    void cancel() throws VException;
-}
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java
deleted file mode 100644
index f3a7ede..0000000
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package io.v.v23.syncbase.nosql;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.AbstractIterator;
-import io.v.v23.context.CancelableVContext;
-import io.v.v23.vdl.TypedClientStream;
-import io.v.v23.verror.Errors;
-import io.v.v23.verror.VException;
-
-import java.io.EOFException;
-import java.util.Iterator;
-
-/**
- * Implementation of the {@link Stream} interface that reads from a VDL stream.
- */
-class StreamImpl<T> implements Stream<T> {
-    private final CancelableVContext ctxC;
-    private final TypedClientStream<Void, T, Void> stream;
-    private volatile boolean isCanceled;
-    private volatile boolean isCreated;
-
-    StreamImpl(CancelableVContext ctxC, TypedClientStream<Void, T, Void> stream) {
-        this.ctxC = ctxC;
-        this.stream = stream;
-        this.isCanceled = this.isCreated = false;
-    }
-    @Override
-    public synchronized Iterator<T> iterator() {
-        Preconditions.checkState(!isCreated, "Can only create one Stream iterator.");
-        isCreated = true;
-        return new AbstractIterator<T>() {
-            @Override
-            protected T computeNext() {
-                synchronized (StreamImpl.this) {
-                    if (isCanceled) {  // client canceled the stream
-                        return endOfData();
-                    }
-                    try {
-                        return stream.recv();
-                    } catch (EOFException e) {  // legitimate end of stream
-                        return endOfData();
-                    } catch (VException e) {
-                        if (isCanceled || Errors.CANCELED.getID().equals(e.getID())) {
-                            return endOfData();
-                        }
-                        throw new RuntimeException("Error retrieving next stream element.", e);
-                    }
-                }
-            }
-        };
-    }
-    @Override
-    public synchronized void cancel() throws VException {
-        this.isCanceled = true;
-        this.ctxC.cancel();
-    }
-}
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
index 53dc329..dedbe9b 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
@@ -6,6 +6,7 @@
 
 import java.lang.reflect.Type;
 
+import io.v.v23.VIterable;
 import io.v.v23.context.VContext;
 import io.v.v23.security.access.Permissions;
 import io.v.v23.services.syncbase.nosql.KeyValue;
@@ -131,13 +132,16 @@
      * It is legal to perform writes concurrently with {@link #scan scan()}. The returned stream
      * reads from a consistent snapshot taken at the time of the method and will not reflect
      * subsequent writes to keys not yet reached by the stream.
+     * <p>
+     * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+     * stop the scan and terminate the iterator early.
      *
      * @param  ctx         Vanadium context
      * @param  range       range of rows to be read
      * @return             a stream used for iterating over the snapshot of the provided rows
      * @throws VException  if the scan stream couldn't be created
      */
-    Stream<KeyValue> scan(VContext ctx, RowRange range) throws VException;
+    VIterable<KeyValue> scan(VContext ctx, RowRange range) throws VException;
 
     /**
      * Returns an array of {@link PrefixPermissions} (i.e., {@code (prefix, perms)} pairs) for
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
index 1f08a27..0e5fc67 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
@@ -8,6 +8,7 @@
 import java.util.List;
 
 import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
 import io.v.v23.services.syncbase.nosql.KeyValue;
 import io.v.v23.services.syncbase.nosql.PrefixPermissions;
 import io.v.v23.services.syncbase.nosql.TableClient;
@@ -17,6 +18,7 @@
 import io.v.v23.context.VContext;
 import io.v.v23.security.access.Permissions;
 import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
 import io.v.v23.verror.VException;
 
 class TableImpl implements Table {
@@ -86,11 +88,10 @@
                 Util.getBytes(range.getStart()), Util.getBytes(range.getLimit()));
     }
     @Override
-    public Stream<KeyValue> scan(VContext ctx, RowRange range) throws VException {
-        CancelableVContext ctxC = ctx.withCancel();
-        TypedClientStream<Void, KeyValue, Void> stream = this.client.scan(ctxC, this.schemaVersion,
+    public VIterable<KeyValue> scan(VContext ctx, RowRange range) throws VException {
+        TypedClientStream<Void, KeyValue, Void> stream = this.client.scan(ctx, this.schemaVersion,
                 Util.getBytes(range.getStart()), Util.getBytes(range.getLimit()));
-        return new StreamImpl(ctxC, stream);
+        return new TypedStreamIterable(stream);
     }
     @Override
     public PrefixPermissions[] getPrefixPermissions(VContext ctx, String key) throws VException {
diff --git a/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java b/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java
new file mode 100644
index 0000000..9dbcb61
--- /dev/null
+++ b/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java
@@ -0,0 +1,54 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.v23.vdl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import io.v.v23.VIterable;
+import io.v.v23.verror.Errors;
+import io.v.v23.verror.VException;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VIterable} that reads from a {@link TypedClientStream}.
+ */
+public class TypedStreamIterable<T> implements VIterable<T> {
+    private final TypedClientStream<Void, T, Void> stream;
+    private volatile boolean isCreated;
+    protected VException error;
+
+    public TypedStreamIterable(TypedClientStream<Void, T, Void> stream) {
+        this.stream = stream;
+    }
+    @Override
+    public synchronized Iterator<T> iterator() {
+        Preconditions.checkState(!isCreated, "Can only create one Stream iterator.");
+        isCreated = true;
+        return new AbstractIterator<T>() {
+            @Override
+            protected T computeNext() {
+                synchronized (TypedStreamIterable.this) {
+                    try {
+                        return stream.recv();
+                    } catch (EOFException e) {  // legitimate end of stream
+                        return endOfData();
+                    } catch (VException e) {
+                        if (!Errors.CANCELED.getID().equals(e.getID())) {  // stream not canceled
+                            error = e;
+                        }
+                        return endOfData();
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    public VException error() {
+        return error;
+    }
+}
diff --git a/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java b/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
index 91f5c80..cee1c9e 100644
--- a/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
+++ b/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
@@ -12,6 +12,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 
+import io.v.v23.VIterable;
 import junit.framework.TestCase;
 
 import org.joda.time.Duration;
@@ -190,7 +191,7 @@
         Namespace n = V.getNamespace(ctx);
         n.mount(ctx, "test/test", dummyServerEndpoint.name(), Duration.standardDays(1));
         {
-            final FutureCallback<Iterable<GlobReply>> callback = new FutureCallback<>();
+            final FutureCallback<VIterable<GlobReply>> callback = new FutureCallback<>();
             n.glob(ctx, "test/*", callback);
 
             List<GlobReply> reply = ImmutableList.copyOf(
@@ -207,7 +208,7 @@
         n.mount(ctx, "test/test", dummyServerEndpoint.name(), Duration.standardDays(1));
 
         CancelableVContext cancelContext = ctx.withCancel();
-        FutureCallback<Iterable<GlobReply>> callback = new FutureCallback<>();
+        FutureCallback<VIterable<GlobReply>> callback = new FutureCallback<>();
         n.glob(cancelContext, "test/*", callback);
         cancelContext.cancel();
         List<GlobReply> replies = ImmutableList.copyOf(Uninterruptibles.getUninterruptibly(callback.getFuture(), 1, TimeUnit.SECONDS));
diff --git a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
index 9da28a8..b9ee6a3 100644
--- a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
+++ b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
@@ -10,6 +10,7 @@
 import com.google.common.io.Files;
 import io.v.impl.google.naming.NamingUtil;
 import io.v.impl.google.services.syncbase.SyncbaseServer;
+import io.v.v23.VIterable;
 import io.v.v23.context.CancelableVContext;
 import io.v.v23.naming.Endpoint;
 import io.v.v23.rpc.ListenSpec;
@@ -19,6 +20,7 @@
 import io.v.v23.services.syncbase.nosql.SyncgroupMemberInfo;
 import io.v.v23.services.syncbase.nosql.TableRow;
 import io.v.v23.services.syncbase.nosql.SyncgroupSpec;
+import io.v.v23.services.watch.ResumeMarker;
 import io.v.v23.syncbase.nosql.BatchDatabase;
 import io.v.v23.syncbase.nosql.BlobReader;
 import io.v.v23.syncbase.nosql.BlobWriter;
@@ -27,7 +29,6 @@
 import io.v.v23.syncbase.nosql.DatabaseCore;
 import io.v.v23.syncbase.nosql.Row;
 import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
 import io.v.v23.syncbase.nosql.Syncgroup;
 import io.v.v23.syncbase.nosql.Table;
 import io.v.v23.syncbase.util.Util;
@@ -211,16 +212,16 @@
         table.put(ctx, "baz", baz, Baz.class);
 
         {
-            DatabaseCore.ResultStream stream = db.exec(ctx,
+            DatabaseCore.QueryResults results = db.exec(ctx,
                     "select k, v.Name from " + TABLE_NAME + " where Type(v) like \"%Baz\"");
-            assertThat(stream.columnNames()).containsExactly("k", "v.Name");
-            assertThat(stream).containsExactly(ImmutableList.of(
+            assertThat(results.columnNames()).containsExactly("k", "v.Name");
+            assertThat(results).containsExactly(ImmutableList.of(
                     new VdlAny(String.class, "baz"), new VdlAny(String.class, baz.name)));
         }
         {
-            DatabaseCore.ResultStream stream = db.exec(ctx, "select k, v from " + TABLE_NAME);
-            assertThat(stream.columnNames()).containsExactly("k", "v");
-            assertThat(stream).containsExactly(
+            DatabaseCore.QueryResults results = db.exec(ctx, "select k, v from " + TABLE_NAME);
+            assertThat(results.columnNames()).containsExactly("k", "v");
+            assertThat(results).containsExactly(
                     ImmutableList.of(new VdlAny(String.class, "bar"), new VdlAny(Bar.class, bar)),
                     ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)),
                     ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo))
@@ -234,8 +235,8 @@
         Foo foo = new Foo(4, "f");
         Bar bar = new Bar(0.5f, "b");
         Baz baz = new Baz("John Doe", true);
+        ResumeMarker marker = db.getResumeMarker(ctx);
 
-        Stream<WatchChange> watchStream = db.watch(ctx, TABLE_NAME, "b", db.getResumeMarker(ctx));
         table.put(ctx, "foo", foo, Foo.class);
         table.put(ctx, "bar", bar, Bar.class);
         table.put(ctx, "baz", baz, Baz.class);
@@ -248,10 +249,11 @@
                         VomUtil.encode(baz, Baz.class), null, false, false),
                 new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
                         new byte[0], null, false, false ));
-        Iterator<WatchChange> iterator = watchStream.iterator();
+        CancelableVContext ctxC = ctx.withCancel();
+        Iterator<WatchChange> it = db.watch(ctxC, TABLE_NAME, "b", marker).iterator();
         for (WatchChange expected : expectedChanges) {
-            assertThat(iterator.hasNext());
-            WatchChange actual = iterator.next();
+            assertThat(it.hasNext());
+            WatchChange actual = it.next();
             assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
             assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
             assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
@@ -259,7 +261,7 @@
             assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
             assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
         }
-        watchStream.cancel();
+        ctxC.cancel();
     }
 
     public void testDatabaseWatchWithContextCancel() throws Exception {
@@ -267,7 +269,7 @@
         Database db = createDatabase(createApp(createService()));
         createTable(db);
 
-        Stream<WatchChange> watchStream = db.watch(
+        VIterable<WatchChange> it = db.watch(
                 cancelCtx, TABLE_NAME, "b", db.getResumeMarker(ctx));
         new Thread(new Runnable() {
             @Override
@@ -275,7 +277,7 @@
                 cancelCtx.cancel();
             }
         }).start();
-        assertThat(watchStream).isEmpty();
+        assertThat(it).isEmpty();
     }
 
     public void testBatch() throws Exception {
diff --git a/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java b/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
index e090389..005e860 100644
--- a/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
+++ b/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
@@ -29,7 +29,6 @@
     public static List<GlobReply> glob(String root, VContext ctx) throws VException {
         io.v.v23.namespace.Namespace n = V.getNamespace(ctx);
         VContext ctxT = ctx.withTimeout(new Duration(20000));  // 20s
-        Iterable<GlobReply> chan = n.glob(ctxT, root.isEmpty() ? "*" : root + "/*");
-        return ImmutableList.copyOf(chan);
+        return ImmutableList.copyOf(n.glob(ctxT, root.isEmpty() ? "*" : root + "/*"));
     }
 }
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
index beab3d1..675c086 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
@@ -13,12 +13,12 @@
 import java.util.List;
 
 import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
 import io.v.v23.context.CancelableVContext;
 import io.v.v23.context.VContext;
 import io.v.v23.syncbase.nosql.BatchDatabase;
 import io.v.v23.syncbase.nosql.ChangeType;
 import io.v.v23.syncbase.nosql.Database;
-import io.v.v23.syncbase.nosql.Stream;
 import io.v.v23.syncbase.nosql.Table;
 import io.v.v23.syncbase.nosql.WatchChange;
 import io.v.v23.verror.VException;
@@ -122,10 +122,10 @@
                     }
                 });
             }
-            Stream<WatchChange> changeStream;
-            changeStream = mDB.watch(mVContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey,
+            VIterable<WatchChange> changes =
+                    mDB.watch(mVContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey,
                     batch.getResumeMarker(mVContext));
-            for (WatchChange change : changeStream) {
+            for (WatchChange change : changes) {
                 if (!change.getTableName().equals(SyncbaseDB.PRESENTATIONS_TABLE)) {
                     Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
                             SyncbaseDB.PRESENTATIONS_TABLE);
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
index 0e29070..4fb43da 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
@@ -7,9 +7,9 @@
 import android.util.Log;
 
 import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
 import io.v.v23.syncbase.nosql.BatchDatabase;
 import io.v.v23.syncbase.nosql.ChangeType;
-import io.v.v23.syncbase.nosql.Stream;
 import io.v.v23.syncbase.nosql.Table;
 import io.v.v23.syncbase.nosql.WatchChange;
 import io.v.v23.verror.VException;
@@ -67,10 +67,10 @@
                 postInUiThread(presentation.getDriver().getElem());
             }
 
-            Stream<WatchChange> watch = mState.db.watch(
+            VIterable<WatchChange> changes = mState.db.watch(
                     mState.vContext, SyncbaseDB.PRESENTATIONS_TABLE, row,
                     batch.getResumeMarker(mState.vContext));
-            for (WatchChange change : watch) {
+            for (WatchChange change : changes) {
                 Log.i(TAG, "Found change " + change.getChangeType());
                 if (!change.getRowName().equals(row)) {
                     continue;
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
index 2c82486..ce929be 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
@@ -14,11 +14,11 @@
 
 import io.v.android.apps.syncslides.model.Question;
 import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
 import io.v.v23.services.syncbase.nosql.KeyValue;
 import io.v.v23.syncbase.nosql.BatchDatabase;
 import io.v.v23.syncbase.nosql.ChangeType;
 import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
 import io.v.v23.syncbase.nosql.Table;
 import io.v.v23.syncbase.nosql.WatchChange;
 import io.v.v23.verror.VException;
@@ -73,14 +73,14 @@
             Log.i(TAG, "Watching questions: " + prefix);
             BatchDatabase batch = mState.db.beginBatch(mState.vContext, null);
             Table presentations = batch.getTable(SyncbaseDB.PRESENTATIONS_TABLE);
-            Stream<KeyValue> stream = presentations.scan(mState.vContext, RowRange.prefix(prefix));
-            for (KeyValue keyValue : stream) {
-                VQuestion value = (VQuestion) VomUtil.decode(keyValue.getValue(), VQuestion.class);
+            VIterable<KeyValue> rows = presentations.scan(mState.vContext, RowRange.prefix(prefix));
+            for (KeyValue row : rows) {
+                VQuestion value = (VQuestion) VomUtil.decode(row.getValue(), VQuestion.class);
                 if (value.getAnswered()) {
                     continue;
                 }
                 final Question question = new Question(
-                        lastPart(keyValue.getKey()),
+                        lastPart(row.getKey()),
                         value.getQuestioner().getName(),
                         value.getTime());
                 mState.handler.post(new Runnable() {
@@ -91,10 +91,10 @@
                 });
             }
 
-            Stream<WatchChange> watch = mState.db.watch(
+            VIterable<WatchChange> changes = mState.db.watch(
                     mState.vContext, SyncbaseDB.PRESENTATIONS_TABLE, prefix,
                     batch.getResumeMarker(mState.vContext));
-            for (WatchChange change : watch) {
+            for (WatchChange change : changes) {
                 Log.i(TAG, "Found change " + change.getChangeType());
                 final String id = lastPart(change.getRowName());
                 if (change.getChangeType().equals(ChangeType.PUT_CHANGE)) {
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
index a8a8039..de35dc9 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import io.v.v23.VIterable;
 import org.joda.time.Duration;
 
 import java.io.ByteArrayOutputStream;
@@ -61,7 +62,6 @@
 import io.v.v23.syncbase.nosql.Database;
 import io.v.v23.syncbase.nosql.DatabaseCore;
 import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
 import io.v.v23.syncbase.nosql.Syncgroup;
 import io.v.v23.syncbase.nosql.Table;
 import io.v.v23.syncbase.nosql.WatchChange;
@@ -189,7 +189,7 @@
             if (!mPresentations.exists(mVContext)) {
                 mPresentations.create(mVContext, mPermissions);
             }
-            //importDecks();
+            importDecks();
         } catch (VException e) {
             handleError("Couldn't setup syncbase service: " + e.getMessage());
             return;
@@ -369,9 +369,9 @@
             try {
                 BatchDatabase batch = mDB.beginBatch(mVContext, null);
                 mWatchMarker = batch.getResumeMarker(mVContext);
-                DatabaseCore.ResultStream stream = batch.exec(mVContext,
+                DatabaseCore.QueryResults results = batch.exec(mVContext,
                         "SELECT k, v FROM Decks WHERE Type(v) like \"%VDeck\"");
-                for (List<VdlAny> row : stream) {
+                for (List<VdlAny> row : results) {
                     if (row.size() != 2) {
                         throw new VException("Wrong number of columns: " + row.size());
                     }
@@ -385,6 +385,9 @@
                         }
                     });
                 }
+                if (results.error() != null) {
+                    Log.e(TAG, "Couldn't get all decks due to error: " + results.error());
+                }
                 watchForDeckChanges();
             } catch (VException e) {
                 Log.e(TAG, e.toString());
@@ -392,14 +395,14 @@
         }
 
         private void watchForDeckChanges() {
-            Stream<WatchChange> changeStream = null;
+            VIterable<WatchChange> changes = null;
             try {
-                changeStream = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
+                changes = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
             } catch (VException e) {
                 Log.e(TAG, "Couldn't watch for changes to the Decks table: " + e.toString());
                 return;
             }
-            for (WatchChange change : changeStream) {
+            for (WatchChange change : changes) {
                 if (!change.getTableName().equals(DECKS_TABLE)) {
                     Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
                             DECKS_TABLE);
@@ -437,6 +440,9 @@
                     });
                 }
             }
+            if (changes.error() != null) {
+                Log.e(TAG, "Deck change thread exited early due to error: " + changes.error());
+            }
             Log.i(TAG, "Deck change thread exiting");
         }
 
@@ -538,11 +544,11 @@
 
                     String query = "SELECT k, v FROM Decks WHERE Type(v) LIKE \"%VSlide\" " +
                             "AND k LIKE \"" + NamingUtil.join(deckId, "slides") + "%\"";
-                    DatabaseCore.ResultStream stream = batch.exec(mVContext, query);
+                    DatabaseCore.QueryResults results = batch.exec(mVContext, query);
                     // TODO(kash): Abort execution if interrupted.  Perhaps we should derive
                     // a new VContext so it can be cancelled.
                     final List<Slide> slides = Lists.newArrayList();
-                    for (List<VdlAny> row : stream) {
+                    for (List<VdlAny> row : results) {
                         if (row.size() != 2) {
                             throw new VException("Wrong number of columns: " + row.size());
                         }
@@ -552,6 +558,9 @@
                         String note = notesForSlide(mVContext, table, key);
                         slides.add(new SlideImpl(slide.getThumbnail(), note));
                     }
+                    if (results.error() != null) {
+                        Log.e(TAG, "Couldn't get all slides due to error: " + results.error());
+                    }
                     Handler handler = new Handler(Looper.getMainLooper());
                     handler.post(new Runnable() {
                         @Override
@@ -616,8 +625,8 @@
 
                 String query = "SELECT k, v FROM Decks WHERE Type(v) LIKE \"%VSlide\" " +
                         "AND k LIKE \"" + NamingUtil.join(mDeckId, "slides") + "%\"";
-                DatabaseCore.ResultStream stream = batch.exec(mVContext, query);
-                for (List<VdlAny> row : stream) {
+                DatabaseCore.QueryResults results = batch.exec(mVContext, query);
+                for (List<VdlAny> row : results) {
                     if (row.size() != 2) {
                         throw new VException("Wrong number of columns: " + row.size());
                     }
@@ -633,6 +642,9 @@
                         }
                     });
                 }
+                if (results.error() != null) {
+                    Log.e(TAG, "Couldn't get all slides due to error: " + results.error());
+                }
                 watchForSlideChanges();
             } catch (VException e) {
                 Log.e(TAG, e.toString());
@@ -641,15 +653,15 @@
 
         private void watchForSlideChanges() {
             Table notesTable = mDB.getTable(NOTES_TABLE);
-            Stream<WatchChange> changeStream;
+            VIterable<WatchChange> changes;
             try {
-                changeStream = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
+                changes = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
             } catch (VException e) {
                 Log.e(TAG, "Couldn't watch for changes to the Decks table: " + e.toString());
                 return;
             }
 
-            for (WatchChange change : changeStream) {
+            for (WatchChange change : changes) {
                 if (!change.getTableName().equals(DECKS_TABLE)) {
                     Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
                             DECKS_TABLE);
@@ -687,7 +699,10 @@
                     });
                 }
             }
-            Log.i(TAG, "Slides watcher thread exiting");
+            if (changes.error() != null) {
+                Log.e(TAG, "Slides change thread exited early due to error: " + changes.error());
+            }
+            Log.i(TAG, "Slides change thread exiting");
         }
 
         @Override