Java: add a common iterator class for various iterators we use
Change the iterator behavior to never throw a RuntimeException
but to terminate early instead, reporting any errors encountered
via a separate method.
Also unified syncbase Stream and iterator. The end result is that
the only way to close the syncbase Stream is to cancel the context.
MultiPart: 1/2
Change-Id: I7f49270f69aa805761a7cd14a0cda4e8d94e4ea7
diff --git a/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java b/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
index 309272f..36a7655 100644
--- a/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
+++ b/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
@@ -5,23 +5,30 @@
package io.v.impl.google.channel;
import com.google.common.collect.AbstractIterator;
+import io.v.v23.VIterable;
+import io.v.v23.verror.VException;
import java.io.EOFException;
import java.util.Iterator;
/**
- * An implementation of {@link Iterable} that reads data from an underlying Go channel.
+ * An implementation of {@link VIterable} that reads data from an underlying Go channel.
*/
-public class ChannelIterable<T> implements Iterable<T> {
- private final long nativePtr;
- private final long sourceNativePtr;
+class ChannelIterable<T> implements VIterable<T> {
+ private final long nativeValuePtr;
+ private final long nativeErrorPtr;
+ private final long nativeSourcePtr;
+ private volatile VException error;
- private native Object nativeReadValue(long nativePtr) throws EOFException;
- private native void nativeFinalize(long nativePtr, long sourceNativePtr);
+ private native Object nativeReadValue(long nativeValuePtr, long nativeErrorPtr)
+ throws EOFException, VException;
+ private native void nativeFinalize(
+ long nativeValuePtr, long nativeErrorPtr, long nativeSourcePtr);
- private ChannelIterable(long nativePtr, long sourceNativePtr) {
- this.nativePtr = nativePtr;
- this.sourceNativePtr = sourceNativePtr;
+ private ChannelIterable(long nativeValuePtr, long nativeErrorPtr, long nativeSourcePtr) {
+ this.nativeValuePtr = nativeValuePtr;
+ this.nativeErrorPtr = nativeErrorPtr;
+ this.nativeSourcePtr = nativeSourcePtr;
}
@Override
@@ -33,31 +40,25 @@
return readValue();
} catch (EOFException e) {
return endOfData();
+ } catch (VException e) {
+ error = e;
+ return endOfData();
}
}
};
}
- private T readValue() throws EOFException {
- return (T) nativeReadValue(this.nativePtr);
+ @Override
+ public VException error() {
+ return error;
+ }
+
+ private T readValue() throws EOFException, VException {
+ return (T) nativeReadValue(nativeValuePtr, nativeErrorPtr);
}
@Override
protected void finalize() {
- nativeFinalize(this.nativePtr, this.sourceNativePtr);
+ nativeFinalize(nativeValuePtr, nativeErrorPtr, nativeSourcePtr);
}
-
- /**
- * Returns the native pointer to the Go channel of Java objects.
- *
- * @return the native pointer to the Go channel of Java objects
- */
- public long getNativePtr() { return this.nativePtr; }
-
- /**
- * Returns the native pointer to the Go channel that feeds the above Go channel of Java object.
- *
- * @return the native pointer to the Go channel that feeds the above Go channel of Java object
- */
- public long getSourceNativePtr() { return this.sourceNativePtr; }
}
\ No newline at end of file
diff --git a/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java b/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
index 5ceec7c..a2fae33 100644
--- a/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
+++ b/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
@@ -4,6 +4,7 @@
package io.v.impl.google.namespace;
+import io.v.v23.VIterable;
import io.v.v23.rpc.Callback;
import org.joda.time.Duration;
@@ -25,11 +26,11 @@
public class NamespaceImpl implements Namespace {
private final long nativePtr;
- private static native Iterable<GlobReply> nativeGlob(
+ private static native VIterable<GlobReply> nativeGlob(
long nativePtr, VContext context, String pattern, Options options) throws VException;
private static native void nativeGlobAsync(long nativePtr, VContext context, String pattern,
- Options options, Callback<Iterable<GlobReply>>
+ Options options, Callback<VIterable<GlobReply>>
callback) throws VException;
private static native void nativeMount(long nativePtr, VContext context, String name,
@@ -221,25 +222,25 @@
}
@Override
- public Iterable<GlobReply> glob(VContext context, String pattern) throws VException {
+ public VIterable<GlobReply> glob(VContext context, String pattern) throws VException {
return glob(context, pattern, (Options) null);
}
@Override
- public Iterable<GlobReply> glob(VContext context, String pattern, Options options)
+ public VIterable<GlobReply> glob(VContext context, String pattern, Options options)
throws VException {
return nativeGlob(nativePtr, context, pattern, options);
}
@Override
- public void glob(VContext context, String pattern, Callback<Iterable<GlobReply>> callback)
+ public void glob(VContext context, String pattern, Callback<VIterable<GlobReply>> callback)
throws VException {
glob(context, pattern, null, callback);
}
@Override
public void glob(VContext context, String pattern, Options options,
- Callback<Iterable<GlobReply>> callback) throws VException {
+ Callback<VIterable<GlobReply>> callback) throws VException {
nativeGlobAsync(nativePtr, context, pattern, options, callback);
}
diff --git a/lib/src/main/java/io/v/v23/VIterable.java b/lib/src/main/java/io/v/v23/VIterable.java
new file mode 100644
index 0000000..9e12368
--- /dev/null
+++ b/lib/src/main/java/io/v/v23/VIterable.java
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.v23;
+
+import io.v.v23.verror.VException;
+
+/**
+ * An interface for iterating through a collection of elements.
+ * <p>
+ * The {@link java.util.Iterator} returned by the {@link java.lang.Iterable#iterator iterator}:
+ * <p><ul>
+ * <li>can be created <strong>only</strong> once,
+ * <li>does not support {@link java.util.Iterator#remove remove}</li>,
+ * <li>terminates early if the underlying iteration step fails; in that case, {@link #error}
+ * method returns the error that caused the iteration step to fail.
+ * </ul>
+ */
+public interface VIterable<T> extends Iterable<T> {
+ /**
+ * Returns an error (if any) that caused the iterator to terminate early. Returns {@code null}
+ * if the iterator terminated gracefully.
+ */
+ VException error();
+}
diff --git a/lib/src/main/java/io/v/v23/namespace/Namespace.java b/lib/src/main/java/io/v/v23/namespace/Namespace.java
index 408436f..4ad4138 100644
--- a/lib/src/main/java/io/v/v23/namespace/Namespace.java
+++ b/lib/src/main/java/io/v/v23/namespace/Namespace.java
@@ -4,6 +4,7 @@
package io.v.v23.namespace;
+import io.v.v23.VIterable;
import io.v.v23.rpc.Callback;
import org.joda.time.Duration;
@@ -248,7 +249,7 @@
* A shortcut for {@link #glob(VContext, String, Options)} with a {@code null} options
* parameter.
*/
- Iterable<GlobReply> glob(VContext context, String pattern) throws VException;
+ VIterable<GlobReply> glob(VContext context, String pattern) throws VException;
/**
* Returns the iterator over all names matching the provided pattern. Note that due to the
@@ -266,6 +267,9 @@
* <ul>
* <li>{@link io.v.v23.OptionDefs#SKIP_SERVER_ENDPOINT_AUTHORIZATION}</li>
* </ul>
+ * <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the glob operation and terminate the iterator early.
*
* @param context a client context
* @param pattern a pattern that should be matched
@@ -273,43 +277,22 @@
* @return an iterator over {@link GlobReply} objects matching the provided pattern
* @throws VException if an error is encountered
*/
- Iterable<GlobReply> glob(VContext context, String pattern, Options options)
- throws VException;
+ VIterable<GlobReply> glob(VContext context, String pattern, Options options) throws VException;
/**
* A shortcut for {@link #glob(VContext, String, Options, Callback)} with a {@code null} options
* parameter.
*/
- void glob(VContext context, String pattern, Callback<Iterable<GlobReply>> callback)
+ void glob(VContext context, String pattern, Callback<VIterable<GlobReply>> callback)
throws VException;
/**
- * Asynchronously returns the iterator over all names matching the provided pattern. This
- * function returns immediately and the given non-{@code null} callback is called when the
- * operation completes (either successfully or with a failure). Generally, the callback will
- * be called when at least one entry can be read from the iterator. Subsequent calls to
- * {@code next} may block. You should not use the iterator on threads that should not block.
- * <p>
- * You should be aware that the iterator:
- * <p><ul>
- * <li>can be created <strong>only</strong> once</li>
- * <li>does not support {@link java.util.Iterator#remove remove}</li>
- * </ul>
- * <p>
- * A particular implementation of this interface chooses which options to support, but at the
- * minimum it must handle the following pre-defined options:
- * <ul>
- * <li>{@link io.v.v23.OptionDefs#SKIP_SERVER_ENDPOINT_AUTHORIZATION}</li>
- * </ul>
+ * Asynchronous version of {@link #glob(VContext, String, Options)}.
*
- * @param context a client context
- * @param pattern a pattern that should be matched
- * @param options options to pass to the implementation as described above, or {@code null}
- * @param callback a callback whose {@code onSuccess} method will be passed the {@link Iterable}
- * over {@link GlobReply} objects matching the provided pattern
- * @throws VException if an error is encountered
+ * @throws VException if there was an error creating the asynchronous call. In this case, no
+ * methods on {@code callback} will be called.
*/
- void glob(VContext context, String pattern, Options options, Callback<Iterable<GlobReply>>
+ void glob(VContext context, String pattern, Options options, Callback<VIterable<GlobReply>>
callback) throws VException;
/**
@@ -322,7 +305,8 @@
*
* @param roots the roots that will be used to turn relative paths into absolute paths, or
* {@code null} to clear the currently configured set of roots. Each entry should
- * be a Vanadium name, see also <a href="https://github.com/vanadium/docs/blob/master/glossary.md#object-name">
+ * be a Vanadium name, see also
+ * <a href="https://github.com/vanadium/docs/blob/master/glossary.md#object-name">
* the Name entry</a> in the glossary
*/
void setRoots(List<String> roots) throws VException;
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
index 1409bd3..edf4961 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
@@ -4,6 +4,7 @@
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.services.syncbase.nosql.BlobFetchStatus;
import io.v.v23.services.syncbase.nosql.BlobRef;
@@ -55,10 +56,13 @@
* priority blobs are prefetched before the lower priority ones. However an ongoing blob
* transfer is not interrupted.
* <p>
- * The return {@link Stream} can be used to track the progress of the prefetch. When the
- * {@link Stream} exhausts all of the iterable elements, the blob is guaranteed to have
+ * The returned iterator can be used to track the progress of the prefetch. When the
+ * iterator exhausts all of the iterable elements, the blob is guaranteed to have
* been entirely copied to a local cache.
* <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the prefetch and terminate the iterator early.
+ * <p>
* This method doesn't block.
*
* @param ctx vanadium context
@@ -66,7 +70,7 @@
* @return a stream used for tracking the progress of the prefetch
* @throws VException if the blob couldn't be prefetched
*/
- Stream<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException;
+ VIterable<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException;
/**
* Deletes the blob's local cached copy.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
index 63b69da..7323e39 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
@@ -4,11 +4,13 @@
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.services.syncbase.nosql.BlobFetchStatus;
import io.v.v23.services.syncbase.nosql.BlobRef;
import io.v.v23.services.syncbase.nosql.DatabaseClient;
import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
import io.v.v23.vdl.VdlUint64;
import io.v.v23.verror.VException;
@@ -35,10 +37,10 @@
return new BlobInputStream(stream);
}
@Override
- public Stream<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException {
+ public VIterable<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException {
TypedClientStream<Void, BlobFetchStatus, Void> stream =
client.fetchBlob(ctx, ref, new VdlUint64(priority));
- return new StreamImpl(ctx.withCancel(), stream);
+ return new TypedStreamIterable(stream);
}
@Override
public long size(VContext ctx) throws VException {
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
index 924a5cd..1a5ebb6 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
@@ -3,6 +3,7 @@
// license that can be found in the LICENSE file.
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.services.syncbase.nosql.BatchOptions;
@@ -82,7 +83,7 @@
/**
* Allows a client to watch for updates to the database. For each watch request, the client will
- * receive a reliable stream of watch events without re-ordering.
+ * receive a reliable iterator of watch events without re-ordering.
* <p>
* See {@link io.v.v23.services.watch.GlobWatcherClient} for a detailed explanation of the
* watch behavior.
@@ -97,16 +98,18 @@
* <li>abort the batch,</li>
* <li>start watching for changes to the data using the {@link ResumeMarker}.</li>
* </ol><p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the watch operation and terminate the iterator early.
*
* @param ctx vanadium context
* @param tableRelativeName relative name of the table to watch
* @param rowPrefix prefix of the rows to watch
* @param resumeMarker {@link ResumeMarker} from which the changes will be monitored
- * @return a stream of changes
+ * @return a (potentially-infinite) iterator of changes
* @throws VException if there was an error setting up this watch request
*/
- Stream<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
- ResumeMarker resumeMarker) throws VException;
+ VIterable<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
+ ResumeMarker resumeMarker) throws VException;
/**
* Returns a handle to a database {@link Syncgroup} with the given full (i.e., object) name.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
index 1ef0b98..e5f465c 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
@@ -3,6 +3,7 @@
// license that can be found in the LICENSE file.
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.vdl.VdlAny;
@@ -42,20 +43,24 @@
String[] listTables(VContext ctx) throws VException;
/**
- * Executes a SyncQL query, returning a {@link ResultStream} object that allows the caller to
+ * Executes a SyncQL query, returning a {@link QueryResults} object that allows the caller to
* iterate over arrays of values for each row that matches the query.
* <p>
* It is legal to perform writes concurrently with {@link #exec exec()}. The returned stream reads
* from a consistent snapshot taken at the time of the method and will not reflect subsequent
* writes to keys not yet reached by the stream.
+ * <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the query execution and terminate the returned iterator early.
+
*
* @param ctx Vanadium context
* @param query a SyncQL query
- * @return a {@link ResultStream} object that allows the caller to iterate over
+ * @return a {@link QueryResults} object that allows the caller to iterate over
* arrays of values for each row that matches the query
* @throws VException if there was an error executing the query
*/
- ResultStream exec(VContext ctx, String query) throws VException;
+ QueryResults exec(VContext ctx, String query) throws VException;
/**
* Returns the {@link ResumeMarker} that points to the current state of the database.
@@ -68,7 +73,7 @@
* An interface for iterating through rows resulting from a
* {@link DatabaseCore#exec DatabaseCore.exec()}.
*/
- interface ResultStream extends Stream<List<VdlAny>> {
+ interface QueryResults extends VIterable<List<VdlAny>> {
/**
* Returns an array of column names that matched the query. The size of the {@link VdlAny}
* list returned in every iteration will match the size of this array.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
index cd063f2..501680c 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Iterators;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.services.syncbase.nosql.BatchOptions;
import io.v.v23.services.syncbase.nosql.BlobRef;
import io.v.v23.services.syncbase.nosql.DatabaseClient;
@@ -32,6 +33,7 @@
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
import io.v.v23.vdl.Types;
import io.v.v23.vdl.VdlAny;
import io.v.v23.vdl.VdlOptional;
@@ -77,10 +79,9 @@
return x.toArray(new String[x.size()]);
}
@Override
- public ResultStream exec(VContext ctx, String query) throws VException {
- CancelableVContext ctxC = ctx.withCancel();
+ public QueryResults exec(VContext ctx, String query) throws VException {
TypedClientStream<Void, List<VdlAny>, Void> stream =
- this.client.exec(ctxC, getSchemaVersion(), query);
+ this.client.exec(ctx, getSchemaVersion(), query);
// The first row contains column names, pull them off the stream.
List<VdlAny> row = null;
@@ -99,7 +100,7 @@
"names (of type String), got type: " + elem.getClass());
}
}
- return new ResultStreamImpl(ctxC, stream, Arrays.asList(columnNames));
+ return new QueryResultsImpl(stream, Arrays.asList(columnNames));
}
// Implements AccessController interface.
@@ -134,12 +135,11 @@
return new DatabaseImpl(this.parentFullName, this.name, batchSuffix, this.schema);
}
@Override
- public Stream<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
- ResumeMarker resumeMarker) throws VException {
- CancelableVContext ctxC = ctx.withCancel();
- TypedClientStream<Void, Change, Void> stream = this.client.watchGlob(ctxC,
+ public VIterable<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
+ ResumeMarker resumeMarker) throws VException {
+ TypedClientStream<Void, Change, Void> stream = this.client.watchGlob(ctx,
new GlobRequest(NamingUtil.join(tableRelativeName, rowPrefix + "*"), resumeMarker));
- return new StreamImpl(ctxC, stream) {
+ return new TypedStreamIterable(stream) {
@Override
public synchronized Iterator iterator() {
return Iterators.transform(super.iterator(), new Function<Change, WatchChange>() {
@@ -249,12 +249,13 @@
return this.schema.getMetadata().getVersion();
}
- private static class ResultStreamImpl extends StreamImpl<List<VdlAny>> implements ResultStream {
+ private static class QueryResultsImpl
+ extends TypedStreamIterable<List<VdlAny>> implements QueryResults {
private final List<String> columnNames;
- private ResultStreamImpl(CancelableVContext ctxC, TypedClientStream<Void,
- List<VdlAny>, Void> stream, List<String> columnNames) {
- super(ctxC, stream);
+ private QueryResultsImpl(TypedClientStream<Void, List<VdlAny>, Void> stream,
+ List<String> columnNames) {
+ super(stream);
this.columnNames = columnNames;
}
@Override
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java
deleted file mode 100644
index 4429170..0000000
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package io.v.v23.syncbase.nosql;
-
-import io.v.v23.verror.VException;
-
-/**
- * An interface for iterating through a collection of elements.
- *
- * The {@link java.util.Iterator} returned by the {@link java.lang.Iterable#iterator iterator}:
- * <p><ul>
- * <li>can be created <strong>only</strong> once,
- * <li>does not support {@link java.util.Iterator#remove remove}</li>,
- * <li>stops the iteration early and gracefully if the {@link #cancel} method is invoked,</li>
- * <li>may throw {@link RuntimeException} if the underlying iteration step throws
- * a {@link VException}. The {@link RuntimeException#getCause cause} of the
- * {@link RuntimeException} will be the said {@link VException}.</li>
- * </ul>
- */
-public interface Stream<T> extends Iterable<T> {
- /**
- * Notifies the stream provider that it can stop producing elements. The client must call
- * {@link #cancel} if it does not iterate through all the elements.
- * <p>
- * This method is idempotent and can be called concurrently with a thread that is iterating.
- * <p>
- * This method causes the iterator to (gracefully) terminate early.
- */
- void cancel() throws VException;
-}
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java
deleted file mode 100644
index f3a7ede..0000000
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package io.v.v23.syncbase.nosql;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.AbstractIterator;
-import io.v.v23.context.CancelableVContext;
-import io.v.v23.vdl.TypedClientStream;
-import io.v.v23.verror.Errors;
-import io.v.v23.verror.VException;
-
-import java.io.EOFException;
-import java.util.Iterator;
-
-/**
- * Implementation of the {@link Stream} interface that reads from a VDL stream.
- */
-class StreamImpl<T> implements Stream<T> {
- private final CancelableVContext ctxC;
- private final TypedClientStream<Void, T, Void> stream;
- private volatile boolean isCanceled;
- private volatile boolean isCreated;
-
- StreamImpl(CancelableVContext ctxC, TypedClientStream<Void, T, Void> stream) {
- this.ctxC = ctxC;
- this.stream = stream;
- this.isCanceled = this.isCreated = false;
- }
- @Override
- public synchronized Iterator<T> iterator() {
- Preconditions.checkState(!isCreated, "Can only create one Stream iterator.");
- isCreated = true;
- return new AbstractIterator<T>() {
- @Override
- protected T computeNext() {
- synchronized (StreamImpl.this) {
- if (isCanceled) { // client canceled the stream
- return endOfData();
- }
- try {
- return stream.recv();
- } catch (EOFException e) { // legitimate end of stream
- return endOfData();
- } catch (VException e) {
- if (isCanceled || Errors.CANCELED.getID().equals(e.getID())) {
- return endOfData();
- }
- throw new RuntimeException("Error retrieving next stream element.", e);
- }
- }
- }
- };
- }
- @Override
- public synchronized void cancel() throws VException {
- this.isCanceled = true;
- this.ctxC.cancel();
- }
-}
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
index 53dc329..dedbe9b 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
@@ -6,6 +6,7 @@
import java.lang.reflect.Type;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.services.syncbase.nosql.KeyValue;
@@ -131,13 +132,16 @@
* It is legal to perform writes concurrently with {@link #scan scan()}. The returned stream
* reads from a consistent snapshot taken at the time of the method and will not reflect
* subsequent writes to keys not yet reached by the stream.
+ * <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the scan and terminate the iterator early.
*
* @param ctx Vanadium context
* @param range range of rows to be read
* @return a stream used for iterating over the snapshot of the provided rows
* @throws VException if the scan stream couldn't be created
*/
- Stream<KeyValue> scan(VContext ctx, RowRange range) throws VException;
+ VIterable<KeyValue> scan(VContext ctx, RowRange range) throws VException;
/**
* Returns an array of {@link PrefixPermissions} (i.e., {@code (prefix, perms)} pairs) for
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
index 1f08a27..0e5fc67 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
@@ -8,6 +8,7 @@
import java.util.List;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.services.syncbase.nosql.KeyValue;
import io.v.v23.services.syncbase.nosql.PrefixPermissions;
import io.v.v23.services.syncbase.nosql.TableClient;
@@ -17,6 +18,7 @@
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
import io.v.v23.verror.VException;
class TableImpl implements Table {
@@ -86,11 +88,10 @@
Util.getBytes(range.getStart()), Util.getBytes(range.getLimit()));
}
@Override
- public Stream<KeyValue> scan(VContext ctx, RowRange range) throws VException {
- CancelableVContext ctxC = ctx.withCancel();
- TypedClientStream<Void, KeyValue, Void> stream = this.client.scan(ctxC, this.schemaVersion,
+ public VIterable<KeyValue> scan(VContext ctx, RowRange range) throws VException {
+ TypedClientStream<Void, KeyValue, Void> stream = this.client.scan(ctx, this.schemaVersion,
Util.getBytes(range.getStart()), Util.getBytes(range.getLimit()));
- return new StreamImpl(ctxC, stream);
+ return new TypedStreamIterable(stream);
}
@Override
public PrefixPermissions[] getPrefixPermissions(VContext ctx, String key) throws VException {
diff --git a/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java b/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java
new file mode 100644
index 0000000..9dbcb61
--- /dev/null
+++ b/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java
@@ -0,0 +1,54 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.v23.vdl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import io.v.v23.VIterable;
+import io.v.v23.verror.Errors;
+import io.v.v23.verror.VException;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VIterable} that reads from a {@link TypedClientStream}.
+ */
+public class TypedStreamIterable<T> implements VIterable<T> {
+ private final TypedClientStream<Void, T, Void> stream;
+ private volatile boolean isCreated;
+ protected VException error;
+
+ public TypedStreamIterable(TypedClientStream<Void, T, Void> stream) {
+ this.stream = stream;
+ }
+ @Override
+ public synchronized Iterator<T> iterator() {
+ Preconditions.checkState(!isCreated, "Can only create one Stream iterator.");
+ isCreated = true;
+ return new AbstractIterator<T>() {
+ @Override
+ protected T computeNext() {
+ synchronized (TypedStreamIterable.this) {
+ try {
+ return stream.recv();
+ } catch (EOFException e) { // legitimate end of stream
+ return endOfData();
+ } catch (VException e) {
+ if (!Errors.CANCELED.getID().equals(e.getID())) { // stream not canceled
+ error = e;
+ }
+ return endOfData();
+ }
+ }
+ }
+ };
+ }
+
+ @Override
+ public VException error() {
+ return error;
+ }
+}
diff --git a/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java b/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
index 91f5c80..cee1c9e 100644
--- a/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
+++ b/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
@@ -12,6 +12,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import io.v.v23.VIterable;
import junit.framework.TestCase;
import org.joda.time.Duration;
@@ -190,7 +191,7 @@
Namespace n = V.getNamespace(ctx);
n.mount(ctx, "test/test", dummyServerEndpoint.name(), Duration.standardDays(1));
{
- final FutureCallback<Iterable<GlobReply>> callback = new FutureCallback<>();
+ final FutureCallback<VIterable<GlobReply>> callback = new FutureCallback<>();
n.glob(ctx, "test/*", callback);
List<GlobReply> reply = ImmutableList.copyOf(
@@ -207,7 +208,7 @@
n.mount(ctx, "test/test", dummyServerEndpoint.name(), Duration.standardDays(1));
CancelableVContext cancelContext = ctx.withCancel();
- FutureCallback<Iterable<GlobReply>> callback = new FutureCallback<>();
+ FutureCallback<VIterable<GlobReply>> callback = new FutureCallback<>();
n.glob(cancelContext, "test/*", callback);
cancelContext.cancel();
List<GlobReply> replies = ImmutableList.copyOf(Uninterruptibles.getUninterruptibly(callback.getFuture(), 1, TimeUnit.SECONDS));
diff --git a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
index 9da28a8..b9ee6a3 100644
--- a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
+++ b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
@@ -10,6 +10,7 @@
import com.google.common.io.Files;
import io.v.impl.google.naming.NamingUtil;
import io.v.impl.google.services.syncbase.SyncbaseServer;
+import io.v.v23.VIterable;
import io.v.v23.context.CancelableVContext;
import io.v.v23.naming.Endpoint;
import io.v.v23.rpc.ListenSpec;
@@ -19,6 +20,7 @@
import io.v.v23.services.syncbase.nosql.SyncgroupMemberInfo;
import io.v.v23.services.syncbase.nosql.TableRow;
import io.v.v23.services.syncbase.nosql.SyncgroupSpec;
+import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.BlobReader;
import io.v.v23.syncbase.nosql.BlobWriter;
@@ -27,7 +29,6 @@
import io.v.v23.syncbase.nosql.DatabaseCore;
import io.v.v23.syncbase.nosql.Row;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Syncgroup;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.util.Util;
@@ -211,16 +212,16 @@
table.put(ctx, "baz", baz, Baz.class);
{
- DatabaseCore.ResultStream stream = db.exec(ctx,
+ DatabaseCore.QueryResults results = db.exec(ctx,
"select k, v.Name from " + TABLE_NAME + " where Type(v) like \"%Baz\"");
- assertThat(stream.columnNames()).containsExactly("k", "v.Name");
- assertThat(stream).containsExactly(ImmutableList.of(
+ assertThat(results.columnNames()).containsExactly("k", "v.Name");
+ assertThat(results).containsExactly(ImmutableList.of(
new VdlAny(String.class, "baz"), new VdlAny(String.class, baz.name)));
}
{
- DatabaseCore.ResultStream stream = db.exec(ctx, "select k, v from " + TABLE_NAME);
- assertThat(stream.columnNames()).containsExactly("k", "v");
- assertThat(stream).containsExactly(
+ DatabaseCore.QueryResults results = db.exec(ctx, "select k, v from " + TABLE_NAME);
+ assertThat(results.columnNames()).containsExactly("k", "v");
+ assertThat(results).containsExactly(
ImmutableList.of(new VdlAny(String.class, "bar"), new VdlAny(Bar.class, bar)),
ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)),
ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo))
@@ -234,8 +235,8 @@
Foo foo = new Foo(4, "f");
Bar bar = new Bar(0.5f, "b");
Baz baz = new Baz("John Doe", true);
+ ResumeMarker marker = db.getResumeMarker(ctx);
- Stream<WatchChange> watchStream = db.watch(ctx, TABLE_NAME, "b", db.getResumeMarker(ctx));
table.put(ctx, "foo", foo, Foo.class);
table.put(ctx, "bar", bar, Bar.class);
table.put(ctx, "baz", baz, Baz.class);
@@ -248,10 +249,11 @@
VomUtil.encode(baz, Baz.class), null, false, false),
new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
new byte[0], null, false, false ));
- Iterator<WatchChange> iterator = watchStream.iterator();
+ CancelableVContext ctxC = ctx.withCancel();
+ Iterator<WatchChange> it = db.watch(ctxC, TABLE_NAME, "b", marker).iterator();
for (WatchChange expected : expectedChanges) {
- assertThat(iterator.hasNext());
- WatchChange actual = iterator.next();
+ assertThat(it.hasNext());
+ WatchChange actual = it.next();
assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
@@ -259,7 +261,7 @@
assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
}
- watchStream.cancel();
+ ctxC.cancel();
}
public void testDatabaseWatchWithContextCancel() throws Exception {
@@ -267,7 +269,7 @@
Database db = createDatabase(createApp(createService()));
createTable(db);
- Stream<WatchChange> watchStream = db.watch(
+ VIterable<WatchChange> it = db.watch(
cancelCtx, TABLE_NAME, "b", db.getResumeMarker(ctx));
new Thread(new Runnable() {
@Override
@@ -275,7 +277,7 @@
cancelCtx.cancel();
}
}).start();
- assertThat(watchStream).isEmpty();
+ assertThat(it).isEmpty();
}
public void testBatch() throws Exception {
diff --git a/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java b/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
index e090389..005e860 100644
--- a/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
+++ b/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
@@ -29,7 +29,6 @@
public static List<GlobReply> glob(String root, VContext ctx) throws VException {
io.v.v23.namespace.Namespace n = V.getNamespace(ctx);
VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s
- Iterable<GlobReply> chan = n.glob(ctxT, root.isEmpty() ? "*" : root + "/*");
- return ImmutableList.copyOf(chan);
+ return ImmutableList.copyOf(n.glob(ctxT, root.isEmpty() ? "*" : root + "/*"));
}
}
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
index beab3d1..675c086 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
@@ -13,12 +13,12 @@
import java.util.List;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.context.CancelableVContext;
import io.v.v23.context.VContext;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.ChangeType;
import io.v.v23.syncbase.nosql.Database;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.VException;
@@ -122,10 +122,10 @@
}
});
}
- Stream<WatchChange> changeStream;
- changeStream = mDB.watch(mVContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey,
+ VIterable<WatchChange> changes =
+ mDB.watch(mVContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey,
batch.getResumeMarker(mVContext));
- for (WatchChange change : changeStream) {
+ for (WatchChange change : changes) {
if (!change.getTableName().equals(SyncbaseDB.PRESENTATIONS_TABLE)) {
Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
SyncbaseDB.PRESENTATIONS_TABLE);
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
index 0e29070..4fb43da 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
@@ -7,9 +7,9 @@
import android.util.Log;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.ChangeType;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.VException;
@@ -67,10 +67,10 @@
postInUiThread(presentation.getDriver().getElem());
}
- Stream<WatchChange> watch = mState.db.watch(
+ VIterable<WatchChange> changes = mState.db.watch(
mState.vContext, SyncbaseDB.PRESENTATIONS_TABLE, row,
batch.getResumeMarker(mState.vContext));
- for (WatchChange change : watch) {
+ for (WatchChange change : changes) {
Log.i(TAG, "Found change " + change.getChangeType());
if (!change.getRowName().equals(row)) {
continue;
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
index 2c82486..ce929be 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
@@ -14,11 +14,11 @@
import io.v.android.apps.syncslides.model.Question;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.services.syncbase.nosql.KeyValue;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.ChangeType;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.VException;
@@ -73,14 +73,14 @@
Log.i(TAG, "Watching questions: " + prefix);
BatchDatabase batch = mState.db.beginBatch(mState.vContext, null);
Table presentations = batch.getTable(SyncbaseDB.PRESENTATIONS_TABLE);
- Stream<KeyValue> stream = presentations.scan(mState.vContext, RowRange.prefix(prefix));
- for (KeyValue keyValue : stream) {
- VQuestion value = (VQuestion) VomUtil.decode(keyValue.getValue(), VQuestion.class);
+ VIterable<KeyValue> rows = presentations.scan(mState.vContext, RowRange.prefix(prefix));
+ for (KeyValue row : rows) {
+ VQuestion value = (VQuestion) VomUtil.decode(row.getValue(), VQuestion.class);
if (value.getAnswered()) {
continue;
}
final Question question = new Question(
- lastPart(keyValue.getKey()),
+ lastPart(row.getKey()),
value.getQuestioner().getName(),
value.getTime());
mState.handler.post(new Runnable() {
@@ -91,10 +91,10 @@
});
}
- Stream<WatchChange> watch = mState.db.watch(
+ VIterable<WatchChange> changes = mState.db.watch(
mState.vContext, SyncbaseDB.PRESENTATIONS_TABLE, prefix,
batch.getResumeMarker(mState.vContext));
- for (WatchChange change : watch) {
+ for (WatchChange change : changes) {
Log.i(TAG, "Found change " + change.getChangeType());
final String id = lastPart(change.getRowName());
if (change.getChangeType().equals(ChangeType.PUT_CHANGE)) {
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
index a8a8039..de35dc9 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.v.v23.VIterable;
import org.joda.time.Duration;
import java.io.ByteArrayOutputStream;
@@ -61,7 +62,6 @@
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.DatabaseCore;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Syncgroup;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
@@ -189,7 +189,7 @@
if (!mPresentations.exists(mVContext)) {
mPresentations.create(mVContext, mPermissions);
}
- //importDecks();
+ importDecks();
} catch (VException e) {
handleError("Couldn't setup syncbase service: " + e.getMessage());
return;
@@ -369,9 +369,9 @@
try {
BatchDatabase batch = mDB.beginBatch(mVContext, null);
mWatchMarker = batch.getResumeMarker(mVContext);
- DatabaseCore.ResultStream stream = batch.exec(mVContext,
+ DatabaseCore.QueryResults results = batch.exec(mVContext,
"SELECT k, v FROM Decks WHERE Type(v) like \"%VDeck\"");
- for (List<VdlAny> row : stream) {
+ for (List<VdlAny> row : results) {
if (row.size() != 2) {
throw new VException("Wrong number of columns: " + row.size());
}
@@ -385,6 +385,9 @@
}
});
}
+ if (results.error() != null) {
+ Log.e(TAG, "Couldn't get all decks due to error: " + results.error());
+ }
watchForDeckChanges();
} catch (VException e) {
Log.e(TAG, e.toString());
@@ -392,14 +395,14 @@
}
private void watchForDeckChanges() {
- Stream<WatchChange> changeStream = null;
+ VIterable<WatchChange> changes = null;
try {
- changeStream = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
+ changes = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
} catch (VException e) {
Log.e(TAG, "Couldn't watch for changes to the Decks table: " + e.toString());
return;
}
- for (WatchChange change : changeStream) {
+ for (WatchChange change : changes) {
if (!change.getTableName().equals(DECKS_TABLE)) {
Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
DECKS_TABLE);
@@ -437,6 +440,9 @@
});
}
}
+ if (changes.error() != null) {
+ Log.e(TAG, "Deck change thread exited early due to error: " + changes.error());
+ }
Log.i(TAG, "Deck change thread exiting");
}
@@ -538,11 +544,11 @@
String query = "SELECT k, v FROM Decks WHERE Type(v) LIKE \"%VSlide\" " +
"AND k LIKE \"" + NamingUtil.join(deckId, "slides") + "%\"";
- DatabaseCore.ResultStream stream = batch.exec(mVContext, query);
+ DatabaseCore.QueryResults results = batch.exec(mVContext, query);
// TODO(kash): Abort execution if interrupted. Perhaps we should derive
// a new VContext so it can be cancelled.
final List<Slide> slides = Lists.newArrayList();
- for (List<VdlAny> row : stream) {
+ for (List<VdlAny> row : results) {
if (row.size() != 2) {
throw new VException("Wrong number of columns: " + row.size());
}
@@ -552,6 +558,9 @@
String note = notesForSlide(mVContext, table, key);
slides.add(new SlideImpl(slide.getThumbnail(), note));
}
+ if (results.error() != null) {
+ Log.e(TAG, "Couldn't get all slides due to error: " + results.error());
+ }
Handler handler = new Handler(Looper.getMainLooper());
handler.post(new Runnable() {
@Override
@@ -616,8 +625,8 @@
String query = "SELECT k, v FROM Decks WHERE Type(v) LIKE \"%VSlide\" " +
"AND k LIKE \"" + NamingUtil.join(mDeckId, "slides") + "%\"";
- DatabaseCore.ResultStream stream = batch.exec(mVContext, query);
- for (List<VdlAny> row : stream) {
+ DatabaseCore.QueryResults results = batch.exec(mVContext, query);
+ for (List<VdlAny> row : results) {
if (row.size() != 2) {
throw new VException("Wrong number of columns: " + row.size());
}
@@ -633,6 +642,9 @@
}
});
}
+ if (results.error() != null) {
+ Log.e(TAG, "Couldn't get all slides due to error: " + results.error());
+ }
watchForSlideChanges();
} catch (VException e) {
Log.e(TAG, e.toString());
@@ -641,15 +653,15 @@
private void watchForSlideChanges() {
Table notesTable = mDB.getTable(NOTES_TABLE);
- Stream<WatchChange> changeStream;
+ VIterable<WatchChange> changes;
try {
- changeStream = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
+ changes = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
} catch (VException e) {
Log.e(TAG, "Couldn't watch for changes to the Decks table: " + e.toString());
return;
}
- for (WatchChange change : changeStream) {
+ for (WatchChange change : changes) {
if (!change.getTableName().equals(DECKS_TABLE)) {
Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
DECKS_TABLE);
@@ -687,7 +699,10 @@
});
}
}
- Log.i(TAG, "Slides watcher thread exiting");
+ if (changes.error() != null) {
+ Log.e(TAG, "Slides change thread exited early due to error: " + changes.error());
+ }
+ Log.i(TAG, "Slides change thread exiting");
}
@Override