Merge branch 'ble' into discovery
Change-Id: Ifd34c03eba21fd4738940f64e7624a5edbddd9ed
diff --git a/android-lib/build.gradle b/android-lib/build.gradle
index f2d5260..d21fcc6 100644
--- a/android-lib/build.gradle
+++ b/android-lib/build.gradle
@@ -157,7 +157,7 @@
// Copy the shared library to its ultimate destination.
task copyVanadiumLib(type: Copy, dependsOn: goBuildVanadiumLib) {
- from jiriRoot + '/release/go/pkg/android_arm_shared/v.io/x/jni'
+ from jiriRoot + '/release/go/pkg/android_arm_shared_shared/v.io/x/jni'
into 'src/main/jniLibs/armeabi-v7a'
include 'main.a'
rename 'main.a', 'libv23.so'
diff --git a/android-lib/src/main/java/io/v/android/libs/discovery/ble/BlePlugin.java b/android-lib/src/main/java/io/v/android/libs/discovery/ble/BlePlugin.java
index ddff860..a729bf8 100644
--- a/android-lib/src/main/java/io/v/android/libs/discovery/ble/BlePlugin.java
+++ b/android-lib/src/main/java/io/v/android/libs/discovery/ble/BlePlugin.java
@@ -50,7 +50,7 @@
import io.v.x.ref.lib.discovery.Advertisement;
/**
- * The Discovery Plugin Interface for Bluetooth.
+ * The discovery plugin interface for Bluetooth.
*/
public class BlePlugin {
// We are using a constant for the MTU because Android and paypal/gatt don't get along
@@ -59,21 +59,21 @@
private static final int MTU = 23;
// Object used to lock advertisement objects.
- private final Object advertisementLock;
+ private final Object advertisementLock = new Object();
// The id to assign to the next advertisment.
private int nextAdv;
// A map of advertisement ids to the advertisement that corresponds to them.
- private Map<Integer, BluetoothGattService> advertisements;
+ private final Map<Integer, BluetoothGattService> advertisements = new HashMap<>();
// A map of advertisement ids to the thread waiting for cancellation of the context.
- private Map<Integer, Thread> advCancellationThreads;
+ private final Map<Integer, Thread> advCancellationThreads = new HashMap<>();
// Object used to lock scanner objects
- private final Object scannerLock;
+ private final Object scannerLock = new Object();
// A map of scanner ids to the thread waiting for cancellation of the context.
- private Map<Integer, Thread> scanCancellationThreads;
- private DeviceCache cachedDevices;
+ private final Map<Integer, Thread> scanCancellationThreads = new HashMap<>();
+ private final DeviceCache cachedDevices;
// Used to track the set of devices we currently talking to.
- private Set<String> pendingCalls;
+ private final Set<String> pendingCalls = new HashSet<>();
// Set of Ble objects that will be interacted with to perform operations.
private BluetoothLeAdvertiser bluetoothLeAdvertise;
@@ -87,15 +87,15 @@
private boolean isScanning;
- private Context androidContext;
+ private final Context androidContext;
// A thread to wait for the cancellation of a particular advertisement. VContext.done().await()
// is blocking so have to spin up a thread per outstanding advertisement.
private class AdvertisementCancellationRunner implements Runnable{
- private VContext ctx;
+ private final VContext ctx;
- private int id;
+ private final int id;
AdvertisementCancellationRunner(VContext ctx, int id) {
this.id = id;
this.ctx = ctx;
@@ -126,18 +126,7 @@
}
public BlePlugin(Context androidContext) {
- advertisementLock = new Object();
- nextAdv = 0;
- advertisements = new TreeMap<>();
- advCancellationThreads = new HashMap<>();
-
- scannerLock = new Object();
- cachedDevices = new DeviceCache(Duration.standardMinutes(5));
- ;
- scanCancellationThreads = new HashMap<>();
- isScanning = false;
- pendingCalls = new HashSet<>();
-
+ cachedDevices = new DeviceCache(Duration.standardMinutes(1));
bluetoothLeAdvertise = BluetoothAdapter.getDefaultAdapter().getBluetoothLeAdvertiser();
bluetoothLeScanner = BluetoothAdapter.getDefaultAdapter().getBluetoothLeScanner();
this.androidContext = androidContext;
diff --git a/android-lib/src/main/java/io/v/android/libs/discovery/ble/BluetoothGattClientCallback.java b/android-lib/src/main/java/io/v/android/libs/discovery/ble/BluetoothGattClientCallback.java
index 62e3c3e..991ed60 100644
--- a/android-lib/src/main/java/io/v/android/libs/discovery/ble/BluetoothGattClientCallback.java
+++ b/android-lib/src/main/java/io/v/android/libs/discovery/ble/BluetoothGattClientCallback.java
@@ -21,24 +21,31 @@
* A handler for responses from a GattServer.
*/
public class BluetoothGattClientCallback extends BluetoothGattCallback {
- // We want to ignore the GATT and GAP services, which are 1800 and 1801 respectively.
- static final String GATT_AND_GAP_PREFIX = "0000180";
+ /**
+ * A handler that will get called when all the services from a gatt service
+ * are read.
+ */
public interface Callback {
+ /**
+ * Called with the map of service ids to their attributes.
+ * @param services A map from service id to (characteristics uuid to values).
+ */
void handle(Map<UUID, Map<UUID, byte[]>> services);
}
- private Callback callback;
+ // We want to ignore the GATT and GAP services, which are 1800 and 1801 respectively.
+ static final String GATT_AND_GAP_PREFIX = "0000180";
- private Map<UUID, Map<UUID, byte[]>> services;
+ private final Callback callback;
+
+ private final Map<UUID, Map<UUID, byte[]>> services = new HashMap<>();
private BluetoothGatt gatt;
- private List<BluetoothGattCharacteristic> chars;
+ private final List<BluetoothGattCharacteristic> chars = new ArrayList<>();
private int pos;
BluetoothGattClientCallback(Callback cb) {
callback = cb;
- chars = new ArrayList<>();
- services = new Hashtable<>();
}
@Override
diff --git a/gradle-plugin/src/main/groovy/io/v/vdl/vdl.groovy b/gradle-plugin/src/main/groovy/io/v/vdl/vdl.groovy
index 296c6a9..d6faed1 100644
--- a/gradle-plugin/src/main/groovy/io/v/vdl/vdl.groovy
+++ b/gradle-plugin/src/main/groovy/io/v/vdl/vdl.groovy
@@ -31,7 +31,16 @@
def prepareTask = project.task('prepareVdl') {
doLast {
generateTask.environment(VDLPATH: project.vdl.inputPaths.join(":"))
- generateTask.commandLine('build/vdltool/vdl-' + getOsName(), '--builtin_vdlroot', 'generate', '--lang=java', "--java-out-dir=${project.vdl.outputPath}", 'all')
+ List<String> commandLine = ['build/vdltool/vdl-' + getOsName(),
+ '--builtin_vdlroot', 'generate',
+ '--lang=java',
+ "--java-out-dir=${project.vdl.outputPath}"
+ ]
+ if (!project.vdl.packageTranslations.isEmpty()) {
+ commandLine.add('--java-out-pkg=' + project.vdl.packageTranslations.join(','))
+ }
+ commandLine.add('all')
+ generateTask.commandLine(commandLine)
}
}
def removeVdlRootTask = project.task('removeVdlRoot', type: Delete) {
@@ -96,6 +105,7 @@
List<String> inputPaths = []
String outputPath = "generated-src/vdl"
String vdlToolPath = ""
+ List<String> packageTranslations = []
// If true, code generated for the vdlroot vdl package will be emitted.
// Typically, users will want to leave this set to false as they will
diff --git a/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java b/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
index 309272f..36a7655 100644
--- a/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
+++ b/lib/src/main/java/io/v/impl/google/channel/ChannelIterable.java
@@ -5,23 +5,30 @@
package io.v.impl.google.channel;
import com.google.common.collect.AbstractIterator;
+import io.v.v23.VIterable;
+import io.v.v23.verror.VException;
import java.io.EOFException;
import java.util.Iterator;
/**
- * An implementation of {@link Iterable} that reads data from an underlying Go channel.
+ * An implementation of {@link VIterable} that reads data from an underlying Go channel.
*/
-public class ChannelIterable<T> implements Iterable<T> {
- private final long nativePtr;
- private final long sourceNativePtr;
+class ChannelIterable<T> implements VIterable<T> {
+ private final long nativeValuePtr;
+ private final long nativeErrorPtr;
+ private final long nativeSourcePtr;
+ private volatile VException error;
- private native Object nativeReadValue(long nativePtr) throws EOFException;
- private native void nativeFinalize(long nativePtr, long sourceNativePtr);
+ private native Object nativeReadValue(long nativeValuePtr, long nativeErrorPtr)
+ throws EOFException, VException;
+ private native void nativeFinalize(
+ long nativeValuePtr, long nativeErrorPtr, long nativeSourcePtr);
- private ChannelIterable(long nativePtr, long sourceNativePtr) {
- this.nativePtr = nativePtr;
- this.sourceNativePtr = sourceNativePtr;
+ private ChannelIterable(long nativeValuePtr, long nativeErrorPtr, long nativeSourcePtr) {
+ this.nativeValuePtr = nativeValuePtr;
+ this.nativeErrorPtr = nativeErrorPtr;
+ this.nativeSourcePtr = nativeSourcePtr;
}
@Override
@@ -33,31 +40,25 @@
return readValue();
} catch (EOFException e) {
return endOfData();
+ } catch (VException e) {
+ error = e;
+ return endOfData();
}
}
};
}
- private T readValue() throws EOFException {
- return (T) nativeReadValue(this.nativePtr);
+ @Override
+ public VException error() {
+ return error;
+ }
+
+ private T readValue() throws EOFException, VException {
+ return (T) nativeReadValue(nativeValuePtr, nativeErrorPtr);
}
@Override
protected void finalize() {
- nativeFinalize(this.nativePtr, this.sourceNativePtr);
+ nativeFinalize(nativeValuePtr, nativeErrorPtr, nativeSourcePtr);
}
-
- /**
- * Returns the native pointer to the Go channel of Java objects.
- *
- * @return the native pointer to the Go channel of Java objects
- */
- public long getNativePtr() { return this.nativePtr; }
-
- /**
- * Returns the native pointer to the Go channel that feeds the above Go channel of Java object.
- *
- * @return the native pointer to the Go channel that feeds the above Go channel of Java object
- */
- public long getSourceNativePtr() { return this.sourceNativePtr; }
}
\ No newline at end of file
diff --git a/lib/src/main/java/io/v/impl/google/lib/discovery/DeviceCache.java b/lib/src/main/java/io/v/impl/google/lib/discovery/DeviceCache.java
index 14ecf28..0cef869 100644
--- a/lib/src/main/java/io/v/impl/google/lib/discovery/DeviceCache.java
+++ b/lib/src/main/java/io/v/impl/google/lib/discovery/DeviceCache.java
@@ -22,29 +22,25 @@
import io.v.x.ref.lib.discovery.Advertisement;
/**
- * A Cache of ble devices that were seen recently. The current Vanadium BLE protocol requires
+ * A cache of ble devices that were seen recently. The current Vanadium BLE protocol requires
* connecting to the advertiser to grab the attributes and the addrs. This can be expensive
* so we only refetch the data if it hash changed.
*/
public class DeviceCache {
- private Map<Long, CacheEntry> cachedDevices;
- private Map<String, CacheEntry> knownIds;
+
+ final private Map<Long, CacheEntry> cachedDevices = new HashMap<>();
+ final private Map<String, CacheEntry> knownIds = new HashMap<>();
final AtomicInteger nextScanner = new AtomicInteger(0);
- private SetMultimap<UUID, Advertisement> knownServices;
- private Map<Integer, VScanner> scannersById;
- private SetMultimap<UUID, VScanner> scannersByUUID;
+ final private SetMultimap<UUID, Advertisement> knownServices = HashMultimap.create();
+ final private Map<Integer, VScanner> scannersById = new HashMap<>();
+ final private SetMultimap<UUID, VScanner> scannersByUUID = HashMultimap.create();
- private Duration maxAge;
+ final private Duration maxAge;
public DeviceCache(Duration maxAge) {
- cachedDevices = new HashMap<>();
- knownIds = new HashMap<>();
- knownServices = HashMultimap.create();
- scannersById = new HashMap<>();
- scannersByUUID = HashMultimap.create();
this.maxAge = maxAge;
}
@@ -135,8 +131,8 @@
synchronized (this) {
int id = nextScanner.addAndGet(1);
scannersById.put(id, scanner);
- scannersByUUID.put(scanner.getmServiceUUID(), scanner);
- Set<Advertisement> knownAdvs = knownServices.get(scanner.getmServiceUUID());
+ scannersByUUID.put(scanner.getServiceUUID(), scanner);
+ Set<Advertisement> knownAdvs = knownServices.get(scanner.getServiceUUID());
if (knownAdvs != null) {
for (Advertisement adv : knownAdvs) {
scanner.getHandler().handleUpdate(adv);
@@ -153,7 +149,7 @@
synchronized (this) {
VScanner scanner = scannersById.get(id);
if (scanner != null) {
- scannersByUUID.remove(scanner.getmServiceUUID(), scanner);
+ scannersByUUID.remove(scanner.getServiceUUID(), scanner);
scannersById.remove(id);
}
}
diff --git a/lib/src/main/java/io/v/impl/google/lib/discovery/EncodingUtil.java b/lib/src/main/java/io/v/impl/google/lib/discovery/EncodingUtil.java
index 0441eb6..ca88cad 100644
--- a/lib/src/main/java/io/v/impl/google/lib/discovery/EncodingUtil.java
+++ b/lib/src/main/java/io/v/impl/google/lib/discovery/EncodingUtil.java
@@ -25,9 +25,10 @@
/**
* Encodes the addresses passed in.
- * @param addrs The list of addresses to encode.
- * @return The byte representation of the encoded addresses.
- * @throws IOException
+ *
+ * @param addrs the list of addresses to encode.
+ * @return the byte representation of the encoded addresses.
+ * @throws IOException if the address can't be encoded.
*/
public static byte[] packAddresses(List<String> addrs) throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -40,9 +41,10 @@
/**
* Decodes addresses from a byte array that was encoded by packAddresses
- * @param input The byte array toe decode
- * @return The list of addresses.
- * @throws IOException
+ *
+ * @param input the byte array toe decode
+ * @return the list of addresses.
+ * @throws IOException if the addresses can't be decoded.
*/
public static List<String> unpackAddresses(byte[] input) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(input);
@@ -61,11 +63,12 @@
/**
* Encode the encryption keys and algorithm passed in.
- * @param encryptionAlgorithm The encryption algorithm to use.
+ *
+ * @param encryptionAlgorithm the encryption algorithm to use.
* See io.v.x.ref.lib.discovery.Constants for valid values.
- * @param keys The keys to encode
- * @return The byte array that is the encoded form.
- * @throws IOException
+ * @param keys the keys to encode
+ * @return the byte array that is the encoded form.
+ * @throws IOException if the keys can't be encoded.
*/
public static byte[] packEncryptionKeys(int encryptionAlgorithm, List<EncryptionKey> keys)
throws IOException {
@@ -82,9 +85,10 @@
/**
* Decodes the encryption algorithm and keys that was encoded by packEncryptionKeys.
- * @param input The byte array containg the keys.
- * @return The keys and the encryption algorithm in input.
- * @throws IOException
+ *
+ * @param input the byte array containg the keys.
+ * @return the keys and the encryption algorithm in input.
+ * @throws IOException if the keys can't be decoded.
*/
public static KeysAndAlgorithm unpackEncryptionKeys(byte[] input) throws IOException {
ByteArrayInputStream stream = new ByteArrayInputStream(input);
@@ -103,21 +107,21 @@
}
/**
- * An object with EncryptionKeys and the encryption algorithm.
+ * Stores {@link EncryptionKey}s and the encryption algorithm.
*/
public static class KeysAndAlgorithm {
int encryptionAlgorithm;
List<EncryptionKey> keys;
/**
- * Gets the stored encryption algorithm.
+ * Returns the stored encryption algorithm.
*/
public int getEncryptionAlgorithm() {
return encryptionAlgorithm;
}
/**
- * Gets the stored keys.
+ * Returns the stored keys.
*/
public List<EncryptionKey> getKeys() {
return keys;
diff --git a/lib/src/main/java/io/v/impl/google/lib/discovery/ScanHandler.java b/lib/src/main/java/io/v/impl/google/lib/discovery/ScanHandler.java
index 20414b7..ec4268b 100644
--- a/lib/src/main/java/io/v/impl/google/lib/discovery/ScanHandler.java
+++ b/lib/src/main/java/io/v/impl/google/lib/discovery/ScanHandler.java
@@ -11,8 +11,7 @@
*/
public interface ScanHandler {
/**
- * Called when there is a new advertisement or an update
- * to and old advertisement
+ * Called when there is a new advertisement or an update to and old advertisement
*/
void handleUpdate(Advertisement advertisement);
}
diff --git a/lib/src/main/java/io/v/impl/google/lib/discovery/UUIDUtil.java b/lib/src/main/java/io/v/impl/google/lib/discovery/UUIDUtil.java
index 9413ce9..da95af9 100644
--- a/lib/src/main/java/io/v/impl/google/lib/discovery/UUIDUtil.java
+++ b/lib/src/main/java/io/v/impl/google/lib/discovery/UUIDUtil.java
@@ -12,8 +12,8 @@
import io.v.x.ref.lib.discovery.Uuid;
/**
- * A class for generating v5 UUIDs and converting from {@link java.util.UUID} and
- * {@link io.v.x.ref.lib.discovery.Uuid}. T{@link java.util.UUID} does not provide
+ * A class for generating v5 UUIDs and converting from {@link UUID} and
+ * {@link Uuid}. {@link UUID} does not provide
* any mechanism to compute v5 UUIDs which are used to convert InterfaceNames to service
* UUIDs. Conversion from {@link Uuid} and {@link UUID} is necessary because the VDL type
* and the native type are annoying to convert to.
@@ -24,7 +24,7 @@
public static native UUID UUIDForAttributeKey(String key);
/**
- * Converts from {@link java.util.UUID} to {@link Uuid}.
+ * Converts from {@link UUID} to {@link Uuid}.
*/
public static Uuid UUIDToUuid(UUID id) {
ByteBuffer b = ByteBuffer.allocate(16);
diff --git a/lib/src/main/java/io/v/impl/google/lib/discovery/VScanner.java b/lib/src/main/java/io/v/impl/google/lib/discovery/VScanner.java
index 7b6e411..1f99a4d 100644
--- a/lib/src/main/java/io/v/impl/google/lib/discovery/VScanner.java
+++ b/lib/src/main/java/io/v/impl/google/lib/discovery/VScanner.java
@@ -9,7 +9,7 @@
import io.v.impl.google.lib.discovery.ScanHandler;
/**
- * Wraps a {@link UUID} and a {@link ScanHandler}.
+ * Stores a {@link UUID} and a {@link ScanHandler}.
*/
public class VScanner {
private UUID serviceUUID;
@@ -21,10 +21,12 @@
this.handler = handler;
}
- public UUID getmServiceUUID() {
+ /** Returns the {@link UUID} */
+ public UUID getServiceUUID() {
return serviceUUID;
}
+ /** Returns the {@link ScanHandler} */
public ScanHandler getHandler() {
return handler;
}
diff --git a/lib/src/main/java/io/v/impl/google/lib/discovery/ble/BleAdvertisementConverter.java b/lib/src/main/java/io/v/impl/google/lib/discovery/ble/BleAdvertisementConverter.java
index fc1de67..03c966f 100644
--- a/lib/src/main/java/io/v/impl/google/lib/discovery/ble/BleAdvertisementConverter.java
+++ b/lib/src/main/java/io/v/impl/google/lib/discovery/ble/BleAdvertisementConverter.java
@@ -22,8 +22,7 @@
import io.v.x.ref.lib.discovery.plugins.ble.Constants;
/**
- * BleAdvertisementConverter converts from {@link Advertisement} to
- * the gatt Services and vice-versa.
+ * Converts from {@link Advertisement} to the gatt services and vice-versa.
*/
public class BleAdvertisementConverter {
private static Charset utf8 = Charset.forName("UTF-8");
@@ -65,8 +64,7 @@
}
/**
- * Converts from Map of Characteristic UUIDs -> values to a
- * {@link Advertisement}
+ * Converts from Map of Characteristic UUIDs -> values to a {@link Advertisement}
*
* @param attr the map of characteristic uuids to their values
* @return the Vanadium Advertisement based on characteristics.
diff --git a/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java b/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
index 5ceec7c..a2fae33 100644
--- a/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
+++ b/lib/src/main/java/io/v/impl/google/namespace/NamespaceImpl.java
@@ -4,6 +4,7 @@
package io.v.impl.google.namespace;
+import io.v.v23.VIterable;
import io.v.v23.rpc.Callback;
import org.joda.time.Duration;
@@ -25,11 +26,11 @@
public class NamespaceImpl implements Namespace {
private final long nativePtr;
- private static native Iterable<GlobReply> nativeGlob(
+ private static native VIterable<GlobReply> nativeGlob(
long nativePtr, VContext context, String pattern, Options options) throws VException;
private static native void nativeGlobAsync(long nativePtr, VContext context, String pattern,
- Options options, Callback<Iterable<GlobReply>>
+ Options options, Callback<VIterable<GlobReply>>
callback) throws VException;
private static native void nativeMount(long nativePtr, VContext context, String name,
@@ -221,25 +222,25 @@
}
@Override
- public Iterable<GlobReply> glob(VContext context, String pattern) throws VException {
+ public VIterable<GlobReply> glob(VContext context, String pattern) throws VException {
return glob(context, pattern, (Options) null);
}
@Override
- public Iterable<GlobReply> glob(VContext context, String pattern, Options options)
+ public VIterable<GlobReply> glob(VContext context, String pattern, Options options)
throws VException {
return nativeGlob(nativePtr, context, pattern, options);
}
@Override
- public void glob(VContext context, String pattern, Callback<Iterable<GlobReply>> callback)
+ public void glob(VContext context, String pattern, Callback<VIterable<GlobReply>> callback)
throws VException {
glob(context, pattern, null, callback);
}
@Override
public void glob(VContext context, String pattern, Options options,
- Callback<Iterable<GlobReply>> callback) throws VException {
+ Callback<VIterable<GlobReply>> callback) throws VException {
nativeGlobAsync(nativePtr, context, pattern, options, callback);
}
diff --git a/lib/src/main/java/io/v/v23/VIterable.java b/lib/src/main/java/io/v/v23/VIterable.java
new file mode 100644
index 0000000..9e12368
--- /dev/null
+++ b/lib/src/main/java/io/v/v23/VIterable.java
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.v23;
+
+import io.v.v23.verror.VException;
+
+/**
+ * An interface for iterating through a collection of elements.
+ * <p>
+ * The {@link java.util.Iterator} returned by the {@link java.lang.Iterable#iterator iterator}:
+ * <p><ul>
+ * <li>can be created <strong>only</strong> once,
+ * <li>does not support {@link java.util.Iterator#remove remove}</li>,
+ * <li>terminates early if the underlying iteration step fails; in that case, {@link #error}
+ * method returns the error that caused the iteration step to fail.
+ * </ul>
+ */
+public interface VIterable<T> extends Iterable<T> {
+ /**
+ * Returns an error (if any) that caused the iterator to terminate early. Returns {@code null}
+ * if the iterator terminated gracefully.
+ */
+ VException error();
+}
diff --git a/lib/src/main/java/io/v/v23/namespace/Namespace.java b/lib/src/main/java/io/v/v23/namespace/Namespace.java
index 408436f..4ad4138 100644
--- a/lib/src/main/java/io/v/v23/namespace/Namespace.java
+++ b/lib/src/main/java/io/v/v23/namespace/Namespace.java
@@ -4,6 +4,7 @@
package io.v.v23.namespace;
+import io.v.v23.VIterable;
import io.v.v23.rpc.Callback;
import org.joda.time.Duration;
@@ -248,7 +249,7 @@
* A shortcut for {@link #glob(VContext, String, Options)} with a {@code null} options
* parameter.
*/
- Iterable<GlobReply> glob(VContext context, String pattern) throws VException;
+ VIterable<GlobReply> glob(VContext context, String pattern) throws VException;
/**
* Returns the iterator over all names matching the provided pattern. Note that due to the
@@ -266,6 +267,9 @@
* <ul>
* <li>{@link io.v.v23.OptionDefs#SKIP_SERVER_ENDPOINT_AUTHORIZATION}</li>
* </ul>
+ * <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the glob operation and terminate the iterator early.
*
* @param context a client context
* @param pattern a pattern that should be matched
@@ -273,43 +277,22 @@
* @return an iterator over {@link GlobReply} objects matching the provided pattern
* @throws VException if an error is encountered
*/
- Iterable<GlobReply> glob(VContext context, String pattern, Options options)
- throws VException;
+ VIterable<GlobReply> glob(VContext context, String pattern, Options options) throws VException;
/**
* A shortcut for {@link #glob(VContext, String, Options, Callback)} with a {@code null} options
* parameter.
*/
- void glob(VContext context, String pattern, Callback<Iterable<GlobReply>> callback)
+ void glob(VContext context, String pattern, Callback<VIterable<GlobReply>> callback)
throws VException;
/**
- * Asynchronously returns the iterator over all names matching the provided pattern. This
- * function returns immediately and the given non-{@code null} callback is called when the
- * operation completes (either successfully or with a failure). Generally, the callback will
- * be called when at least one entry can be read from the iterator. Subsequent calls to
- * {@code next} may block. You should not use the iterator on threads that should not block.
- * <p>
- * You should be aware that the iterator:
- * <p><ul>
- * <li>can be created <strong>only</strong> once</li>
- * <li>does not support {@link java.util.Iterator#remove remove}</li>
- * </ul>
- * <p>
- * A particular implementation of this interface chooses which options to support, but at the
- * minimum it must handle the following pre-defined options:
- * <ul>
- * <li>{@link io.v.v23.OptionDefs#SKIP_SERVER_ENDPOINT_AUTHORIZATION}</li>
- * </ul>
+ * Asynchronous version of {@link #glob(VContext, String, Options)}.
*
- * @param context a client context
- * @param pattern a pattern that should be matched
- * @param options options to pass to the implementation as described above, or {@code null}
- * @param callback a callback whose {@code onSuccess} method will be passed the {@link Iterable}
- * over {@link GlobReply} objects matching the provided pattern
- * @throws VException if an error is encountered
+ * @throws VException if there was an error creating the asynchronous call. In this case, no
+ * methods on {@code callback} will be called.
*/
- void glob(VContext context, String pattern, Options options, Callback<Iterable<GlobReply>>
+ void glob(VContext context, String pattern, Options options, Callback<VIterable<GlobReply>>
callback) throws VException;
/**
@@ -322,7 +305,8 @@
*
* @param roots the roots that will be used to turn relative paths into absolute paths, or
* {@code null} to clear the currently configured set of roots. Each entry should
- * be a Vanadium name, see also <a href="https://github.com/vanadium/docs/blob/master/glossary.md#object-name">
+ * be a Vanadium name, see also
+ * <a href="https://github.com/vanadium/docs/blob/master/glossary.md#object-name">
* the Name entry</a> in the glossary
*/
void setRoots(List<String> roots) throws VException;
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
index 1409bd3..edf4961 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReader.java
@@ -4,6 +4,7 @@
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.services.syncbase.nosql.BlobFetchStatus;
import io.v.v23.services.syncbase.nosql.BlobRef;
@@ -55,10 +56,13 @@
* priority blobs are prefetched before the lower priority ones. However an ongoing blob
* transfer is not interrupted.
* <p>
- * The return {@link Stream} can be used to track the progress of the prefetch. When the
- * {@link Stream} exhausts all of the iterable elements, the blob is guaranteed to have
+ * The returned iterator can be used to track the progress of the prefetch. When the
+ * iterator exhausts all of the iterable elements, the blob is guaranteed to have
* been entirely copied to a local cache.
* <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the prefetch and terminate the iterator early.
+ * <p>
* This method doesn't block.
*
* @param ctx vanadium context
@@ -66,7 +70,7 @@
* @return a stream used for tracking the progress of the prefetch
* @throws VException if the blob couldn't be prefetched
*/
- Stream<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException;
+ VIterable<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException;
/**
* Deletes the blob's local cached copy.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
index 63b69da..7323e39 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/BlobReaderImpl.java
@@ -4,11 +4,13 @@
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.services.syncbase.nosql.BlobFetchStatus;
import io.v.v23.services.syncbase.nosql.BlobRef;
import io.v.v23.services.syncbase.nosql.DatabaseClient;
import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
import io.v.v23.vdl.VdlUint64;
import io.v.v23.verror.VException;
@@ -35,10 +37,10 @@
return new BlobInputStream(stream);
}
@Override
- public Stream<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException {
+ public VIterable<BlobFetchStatus> prefetch(VContext ctx, long priority) throws VException {
TypedClientStream<Void, BlobFetchStatus, Void> stream =
client.fetchBlob(ctx, ref, new VdlUint64(priority));
- return new StreamImpl(ctx.withCancel(), stream);
+ return new TypedStreamIterable(stream);
}
@Override
public long size(VContext ctx) throws VException {
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
index 924a5cd..1a5ebb6 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Database.java
@@ -3,6 +3,7 @@
// license that can be found in the LICENSE file.
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.services.syncbase.nosql.BatchOptions;
@@ -82,7 +83,7 @@
/**
* Allows a client to watch for updates to the database. For each watch request, the client will
- * receive a reliable stream of watch events without re-ordering.
+ * receive a reliable iterator of watch events without re-ordering.
* <p>
* See {@link io.v.v23.services.watch.GlobWatcherClient} for a detailed explanation of the
* watch behavior.
@@ -97,16 +98,18 @@
* <li>abort the batch,</li>
* <li>start watching for changes to the data using the {@link ResumeMarker}.</li>
* </ol><p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the watch operation and terminate the iterator early.
*
* @param ctx vanadium context
* @param tableRelativeName relative name of the table to watch
* @param rowPrefix prefix of the rows to watch
* @param resumeMarker {@link ResumeMarker} from which the changes will be monitored
- * @return a stream of changes
+ * @return a (potentially-infinite) iterator of changes
* @throws VException if there was an error setting up this watch request
*/
- Stream<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
- ResumeMarker resumeMarker) throws VException;
+ VIterable<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
+ ResumeMarker resumeMarker) throws VException;
/**
* Returns a handle to a database {@link Syncgroup} with the given full (i.e., object) name.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
index 1ef0b98..e5f465c 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseCore.java
@@ -3,6 +3,7 @@
// license that can be found in the LICENSE file.
package io.v.v23.syncbase.nosql;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.vdl.VdlAny;
@@ -42,20 +43,24 @@
String[] listTables(VContext ctx) throws VException;
/**
- * Executes a SyncQL query, returning a {@link ResultStream} object that allows the caller to
+ * Executes a SyncQL query, returning a {@link QueryResults} object that allows the caller to
* iterate over arrays of values for each row that matches the query.
* <p>
* It is legal to perform writes concurrently with {@link #exec exec()}. The returned stream reads
* from a consistent snapshot taken at the time of the method and will not reflect subsequent
* writes to keys not yet reached by the stream.
+ * <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the query execution and terminate the returned iterator early.
+
*
* @param ctx Vanadium context
* @param query a SyncQL query
- * @return a {@link ResultStream} object that allows the caller to iterate over
+ * @return a {@link QueryResults} object that allows the caller to iterate over
* arrays of values for each row that matches the query
* @throws VException if there was an error executing the query
*/
- ResultStream exec(VContext ctx, String query) throws VException;
+ QueryResults exec(VContext ctx, String query) throws VException;
/**
* Returns the {@link ResumeMarker} that points to the current state of the database.
@@ -68,7 +73,7 @@
* An interface for iterating through rows resulting from a
* {@link DatabaseCore#exec DatabaseCore.exec()}.
*/
- interface ResultStream extends Stream<List<VdlAny>> {
+ interface QueryResults extends VIterable<List<VdlAny>> {
/**
* Returns an array of column names that matched the query. The size of the {@link VdlAny}
* list returned in every iteration will match the size of this array.
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
index cd063f2..501680c 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/DatabaseImpl.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Iterators;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.services.syncbase.nosql.BatchOptions;
import io.v.v23.services.syncbase.nosql.BlobRef;
import io.v.v23.services.syncbase.nosql.DatabaseClient;
@@ -32,6 +33,7 @@
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
import io.v.v23.vdl.Types;
import io.v.v23.vdl.VdlAny;
import io.v.v23.vdl.VdlOptional;
@@ -77,10 +79,9 @@
return x.toArray(new String[x.size()]);
}
@Override
- public ResultStream exec(VContext ctx, String query) throws VException {
- CancelableVContext ctxC = ctx.withCancel();
+ public QueryResults exec(VContext ctx, String query) throws VException {
TypedClientStream<Void, List<VdlAny>, Void> stream =
- this.client.exec(ctxC, getSchemaVersion(), query);
+ this.client.exec(ctx, getSchemaVersion(), query);
// The first row contains column names, pull them off the stream.
List<VdlAny> row = null;
@@ -99,7 +100,7 @@
"names (of type String), got type: " + elem.getClass());
}
}
- return new ResultStreamImpl(ctxC, stream, Arrays.asList(columnNames));
+ return new QueryResultsImpl(stream, Arrays.asList(columnNames));
}
// Implements AccessController interface.
@@ -134,12 +135,11 @@
return new DatabaseImpl(this.parentFullName, this.name, batchSuffix, this.schema);
}
@Override
- public Stream<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
- ResumeMarker resumeMarker) throws VException {
- CancelableVContext ctxC = ctx.withCancel();
- TypedClientStream<Void, Change, Void> stream = this.client.watchGlob(ctxC,
+ public VIterable<WatchChange> watch(VContext ctx, String tableRelativeName, String rowPrefix,
+ ResumeMarker resumeMarker) throws VException {
+ TypedClientStream<Void, Change, Void> stream = this.client.watchGlob(ctx,
new GlobRequest(NamingUtil.join(tableRelativeName, rowPrefix + "*"), resumeMarker));
- return new StreamImpl(ctxC, stream) {
+ return new TypedStreamIterable(stream) {
@Override
public synchronized Iterator iterator() {
return Iterators.transform(super.iterator(), new Function<Change, WatchChange>() {
@@ -249,12 +249,13 @@
return this.schema.getMetadata().getVersion();
}
- private static class ResultStreamImpl extends StreamImpl<List<VdlAny>> implements ResultStream {
+ private static class QueryResultsImpl
+ extends TypedStreamIterable<List<VdlAny>> implements QueryResults {
private final List<String> columnNames;
- private ResultStreamImpl(CancelableVContext ctxC, TypedClientStream<Void,
- List<VdlAny>, Void> stream, List<String> columnNames) {
- super(ctxC, stream);
+ private QueryResultsImpl(TypedClientStream<Void, List<VdlAny>, Void> stream,
+ List<String> columnNames) {
+ super(stream);
this.columnNames = columnNames;
}
@Override
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java
deleted file mode 100644
index 4429170..0000000
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Stream.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package io.v.v23.syncbase.nosql;
-
-import io.v.v23.verror.VException;
-
-/**
- * An interface for iterating through a collection of elements.
- *
- * The {@link java.util.Iterator} returned by the {@link java.lang.Iterable#iterator iterator}:
- * <p><ul>
- * <li>can be created <strong>only</strong> once,
- * <li>does not support {@link java.util.Iterator#remove remove}</li>,
- * <li>stops the iteration early and gracefully if the {@link #cancel} method is invoked,</li>
- * <li>may throw {@link RuntimeException} if the underlying iteration step throws
- * a {@link VException}. The {@link RuntimeException#getCause cause} of the
- * {@link RuntimeException} will be the said {@link VException}.</li>
- * </ul>
- */
-public interface Stream<T> extends Iterable<T> {
- /**
- * Notifies the stream provider that it can stop producing elements. The client must call
- * {@link #cancel} if it does not iterate through all the elements.
- * <p>
- * This method is idempotent and can be called concurrently with a thread that is iterating.
- * <p>
- * This method causes the iterator to (gracefully) terminate early.
- */
- void cancel() throws VException;
-}
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java
deleted file mode 100644
index f3a7ede..0000000
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/StreamImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package io.v.v23.syncbase.nosql;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.AbstractIterator;
-import io.v.v23.context.CancelableVContext;
-import io.v.v23.vdl.TypedClientStream;
-import io.v.v23.verror.Errors;
-import io.v.v23.verror.VException;
-
-import java.io.EOFException;
-import java.util.Iterator;
-
-/**
- * Implementation of the {@link Stream} interface that reads from a VDL stream.
- */
-class StreamImpl<T> implements Stream<T> {
- private final CancelableVContext ctxC;
- private final TypedClientStream<Void, T, Void> stream;
- private volatile boolean isCanceled;
- private volatile boolean isCreated;
-
- StreamImpl(CancelableVContext ctxC, TypedClientStream<Void, T, Void> stream) {
- this.ctxC = ctxC;
- this.stream = stream;
- this.isCanceled = this.isCreated = false;
- }
- @Override
- public synchronized Iterator<T> iterator() {
- Preconditions.checkState(!isCreated, "Can only create one Stream iterator.");
- isCreated = true;
- return new AbstractIterator<T>() {
- @Override
- protected T computeNext() {
- synchronized (StreamImpl.this) {
- if (isCanceled) { // client canceled the stream
- return endOfData();
- }
- try {
- return stream.recv();
- } catch (EOFException e) { // legitimate end of stream
- return endOfData();
- } catch (VException e) {
- if (isCanceled || Errors.CANCELED.getID().equals(e.getID())) {
- return endOfData();
- }
- throw new RuntimeException("Error retrieving next stream element.", e);
- }
- }
- }
- };
- }
- @Override
- public synchronized void cancel() throws VException {
- this.isCanceled = true;
- this.ctxC.cancel();
- }
-}
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java b/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
index 53dc329..dedbe9b 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/Table.java
@@ -6,6 +6,7 @@
import java.lang.reflect.Type;
+import io.v.v23.VIterable;
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.services.syncbase.nosql.KeyValue;
@@ -131,13 +132,16 @@
* It is legal to perform writes concurrently with {@link #scan scan()}. The returned stream
* reads from a consistent snapshot taken at the time of the method and will not reflect
* subsequent writes to keys not yet reached by the stream.
+ * <p>
+ * {@link io.v.v23.context.CancelableVContext#cancel Canceling} the provided context will
+ * stop the scan and terminate the iterator early.
*
* @param ctx Vanadium context
* @param range range of rows to be read
* @return a stream used for iterating over the snapshot of the provided rows
* @throws VException if the scan stream couldn't be created
*/
- Stream<KeyValue> scan(VContext ctx, RowRange range) throws VException;
+ VIterable<KeyValue> scan(VContext ctx, RowRange range) throws VException;
/**
* Returns an array of {@link PrefixPermissions} (i.e., {@code (prefix, perms)} pairs) for
diff --git a/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java b/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
index 1f08a27..0e5fc67 100644
--- a/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
+++ b/lib/src/main/java/io/v/v23/syncbase/nosql/TableImpl.java
@@ -8,6 +8,7 @@
import java.util.List;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.services.syncbase.nosql.KeyValue;
import io.v.v23.services.syncbase.nosql.PrefixPermissions;
import io.v.v23.services.syncbase.nosql.TableClient;
@@ -17,6 +18,7 @@
import io.v.v23.context.VContext;
import io.v.v23.security.access.Permissions;
import io.v.v23.vdl.TypedClientStream;
+import io.v.v23.vdl.TypedStreamIterable;
import io.v.v23.verror.VException;
class TableImpl implements Table {
@@ -86,11 +88,10 @@
Util.getBytes(range.getStart()), Util.getBytes(range.getLimit()));
}
@Override
- public Stream<KeyValue> scan(VContext ctx, RowRange range) throws VException {
- CancelableVContext ctxC = ctx.withCancel();
- TypedClientStream<Void, KeyValue, Void> stream = this.client.scan(ctxC, this.schemaVersion,
+ public VIterable<KeyValue> scan(VContext ctx, RowRange range) throws VException {
+ TypedClientStream<Void, KeyValue, Void> stream = this.client.scan(ctx, this.schemaVersion,
Util.getBytes(range.getStart()), Util.getBytes(range.getLimit()));
- return new StreamImpl(ctxC, stream);
+ return new TypedStreamIterable(stream);
}
@Override
public PrefixPermissions[] getPrefixPermissions(VContext ctx, String key) throws VException {
diff --git a/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java b/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java
new file mode 100644
index 0000000..9dbcb61
--- /dev/null
+++ b/lib/src/main/java/io/v/v23/vdl/TypedStreamIterable.java
@@ -0,0 +1,54 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package io.v.v23.vdl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import io.v.v23.VIterable;
+import io.v.v23.verror.Errors;
+import io.v.v23.verror.VException;
+
+import java.io.EOFException;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link VIterable} that reads from a {@link TypedClientStream}.
+ */
+public class TypedStreamIterable<T> implements VIterable<T> {
+ private final TypedClientStream<Void, T, Void> stream;
+ private volatile boolean isCreated;
+ protected VException error;
+
+ public TypedStreamIterable(TypedClientStream<Void, T, Void> stream) {
+ this.stream = stream;
+ }
+ @Override
+ public synchronized Iterator<T> iterator() {
+ Preconditions.checkState(!isCreated, "Can only create one Stream iterator.");
+ isCreated = true;
+ return new AbstractIterator<T>() {
+ @Override
+ protected T computeNext() {
+ synchronized (TypedStreamIterable.this) {
+ try {
+ return stream.recv();
+ } catch (EOFException e) { // legitimate end of stream
+ return endOfData();
+ } catch (VException e) {
+ if (!Errors.CANCELED.getID().equals(e.getID())) { // stream not canceled
+ error = e;
+ }
+ return endOfData();
+ }
+ }
+ }
+ };
+ }
+
+ @Override
+ public VException error() {
+ return error;
+ }
+}
diff --git a/lib/src/test/java/io/v/impl/google/lib/discovery/DeviceCacheTest.java b/lib/src/test/java/io/v/impl/google/lib/discovery/DeviceCacheTest.java
index 5cf666e..b3c0479 100644
--- a/lib/src/test/java/io/v/impl/google/lib/discovery/DeviceCacheTest.java
+++ b/lib/src/test/java/io/v/impl/google/lib/discovery/DeviceCacheTest.java
@@ -17,7 +17,7 @@
import io.v.x.ref.lib.discovery.Uuid;
/**
- * Test for {@link DeviceCache}
+ * Tests for {@link DeviceCache}.
*/
public class DeviceCacheTest extends TestCase {
private abstract class CountingHandler implements ScanHandler {
diff --git a/lib/src/test/java/io/v/impl/google/lib/discovery/EncodingUtilTest.java b/lib/src/test/java/io/v/impl/google/lib/discovery/EncodingUtilTest.java
index 55a9d35..0c135ca 100644
--- a/lib/src/test/java/io/v/impl/google/lib/discovery/EncodingUtilTest.java
+++ b/lib/src/test/java/io/v/impl/google/lib/discovery/EncodingUtilTest.java
@@ -4,23 +4,19 @@
package io.v.impl.google.lib.discovery;
-import com.google.common.primitives.Bytes;
-
import junit.framework.TestCase;
import java.io.IOException;
-import java.util.ArrayList;
+
import java.util.Arrays;
-import java.util.List;
import io.v.x.ref.lib.discovery.EncryptionAlgorithm;
-import io.v.x.ref.lib.discovery.EncryptionKey;
import io.v.x.ref.lib.discovery.testdata.PackAddressTest;
import io.v.x.ref.lib.discovery.testdata.Constants;
import io.v.x.ref.lib.discovery.testdata.PackEncryptionKeysTest;
/**
- * Test for {@link EncodingUtil}
+ * Tests for {@link EncodingUtil}.
*/
public class EncodingUtilTest extends TestCase {
public void testPackAddresses() throws IOException {
diff --git a/lib/src/test/java/io/v/impl/google/lib/discovery/UUIDUtilTest.java b/lib/src/test/java/io/v/impl/google/lib/discovery/UUIDUtilTest.java
index f9e5db9..f59c3cd 100644
--- a/lib/src/test/java/io/v/impl/google/lib/discovery/UUIDUtilTest.java
+++ b/lib/src/test/java/io/v/impl/google/lib/discovery/UUIDUtilTest.java
@@ -14,7 +14,11 @@
import io.v.x.ref.lib.discovery.testdata.UuidTestData;
/**
+<<<<<<< HEAD
* Tests for {@link UUIDUtil}
+=======
+ * Tests for {@link UUIDUtil}.
+>>>>>>> ble
*/
public class UUIDUtilTest extends TestCase {
public void testInterfaceNameUUID() {
diff --git a/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java b/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
index 91f5c80..cee1c9e 100644
--- a/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
+++ b/lib/src/test/java/io/v/impl/google/namespace/NamespaceTest.java
@@ -12,6 +12,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import io.v.v23.VIterable;
import junit.framework.TestCase;
import org.joda.time.Duration;
@@ -190,7 +191,7 @@
Namespace n = V.getNamespace(ctx);
n.mount(ctx, "test/test", dummyServerEndpoint.name(), Duration.standardDays(1));
{
- final FutureCallback<Iterable<GlobReply>> callback = new FutureCallback<>();
+ final FutureCallback<VIterable<GlobReply>> callback = new FutureCallback<>();
n.glob(ctx, "test/*", callback);
List<GlobReply> reply = ImmutableList.copyOf(
@@ -207,7 +208,7 @@
n.mount(ctx, "test/test", dummyServerEndpoint.name(), Duration.standardDays(1));
CancelableVContext cancelContext = ctx.withCancel();
- FutureCallback<Iterable<GlobReply>> callback = new FutureCallback<>();
+ FutureCallback<VIterable<GlobReply>> callback = new FutureCallback<>();
n.glob(cancelContext, "test/*", callback);
cancelContext.cancel();
List<GlobReply> replies = ImmutableList.copyOf(Uninterruptibles.getUninterruptibly(callback.getFuture(), 1, TimeUnit.SECONDS));
diff --git a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
index 9da28a8..b9ee6a3 100644
--- a/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
+++ b/lib/src/test/java/io/v/v23/syncbase/SyncbaseTest.java
@@ -10,6 +10,7 @@
import com.google.common.io.Files;
import io.v.impl.google.naming.NamingUtil;
import io.v.impl.google.services.syncbase.SyncbaseServer;
+import io.v.v23.VIterable;
import io.v.v23.context.CancelableVContext;
import io.v.v23.naming.Endpoint;
import io.v.v23.rpc.ListenSpec;
@@ -19,6 +20,7 @@
import io.v.v23.services.syncbase.nosql.SyncgroupMemberInfo;
import io.v.v23.services.syncbase.nosql.TableRow;
import io.v.v23.services.syncbase.nosql.SyncgroupSpec;
+import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.BlobReader;
import io.v.v23.syncbase.nosql.BlobWriter;
@@ -27,7 +29,6 @@
import io.v.v23.syncbase.nosql.DatabaseCore;
import io.v.v23.syncbase.nosql.Row;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Syncgroup;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.util.Util;
@@ -211,16 +212,16 @@
table.put(ctx, "baz", baz, Baz.class);
{
- DatabaseCore.ResultStream stream = db.exec(ctx,
+ DatabaseCore.QueryResults results = db.exec(ctx,
"select k, v.Name from " + TABLE_NAME + " where Type(v) like \"%Baz\"");
- assertThat(stream.columnNames()).containsExactly("k", "v.Name");
- assertThat(stream).containsExactly(ImmutableList.of(
+ assertThat(results.columnNames()).containsExactly("k", "v.Name");
+ assertThat(results).containsExactly(ImmutableList.of(
new VdlAny(String.class, "baz"), new VdlAny(String.class, baz.name)));
}
{
- DatabaseCore.ResultStream stream = db.exec(ctx, "select k, v from " + TABLE_NAME);
- assertThat(stream.columnNames()).containsExactly("k", "v");
- assertThat(stream).containsExactly(
+ DatabaseCore.QueryResults results = db.exec(ctx, "select k, v from " + TABLE_NAME);
+ assertThat(results.columnNames()).containsExactly("k", "v");
+ assertThat(results).containsExactly(
ImmutableList.of(new VdlAny(String.class, "bar"), new VdlAny(Bar.class, bar)),
ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)),
ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo))
@@ -234,8 +235,8 @@
Foo foo = new Foo(4, "f");
Bar bar = new Bar(0.5f, "b");
Baz baz = new Baz("John Doe", true);
+ ResumeMarker marker = db.getResumeMarker(ctx);
- Stream<WatchChange> watchStream = db.watch(ctx, TABLE_NAME, "b", db.getResumeMarker(ctx));
table.put(ctx, "foo", foo, Foo.class);
table.put(ctx, "bar", bar, Bar.class);
table.put(ctx, "baz", baz, Baz.class);
@@ -248,10 +249,11 @@
VomUtil.encode(baz, Baz.class), null, false, false),
new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
new byte[0], null, false, false ));
- Iterator<WatchChange> iterator = watchStream.iterator();
+ CancelableVContext ctxC = ctx.withCancel();
+ Iterator<WatchChange> it = db.watch(ctxC, TABLE_NAME, "b", marker).iterator();
for (WatchChange expected : expectedChanges) {
- assertThat(iterator.hasNext());
- WatchChange actual = iterator.next();
+ assertThat(it.hasNext());
+ WatchChange actual = it.next();
assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
@@ -259,7 +261,7 @@
assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
}
- watchStream.cancel();
+ ctxC.cancel();
}
public void testDatabaseWatchWithContextCancel() throws Exception {
@@ -267,7 +269,7 @@
Database db = createDatabase(createApp(createService()));
createTable(db);
- Stream<WatchChange> watchStream = db.watch(
+ VIterable<WatchChange> it = db.watch(
cancelCtx, TABLE_NAME, "b", db.getResumeMarker(ctx));
new Thread(new Runnable() {
@Override
@@ -275,7 +277,7 @@
cancelCtx.cancel();
}
}).start();
- assertThat(watchStream).isEmpty();
+ assertThat(it).isEmpty();
}
public void testBatch() throws Exception {
diff --git a/lib/src/test/java/io/v/x/jni/test/fortune/FortuneTest.java b/lib/src/test/java/io/v/x/jni/test/fortune/FortuneTest.java
index 20eaffe..1bfcd94 100644
--- a/lib/src/test/java/io/v/x/jni/test/fortune/FortuneTest.java
+++ b/lib/src/test/java/io/v/x/jni/test/fortune/FortuneTest.java
@@ -6,6 +6,21 @@
import com.google.common.collect.ImmutableList;
import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import junit.framework.TestCase;
+
+import org.joda.time.Duration;
+
+import java.io.EOFException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import io.v.v23.OutputChannel;
import io.v.v23.V;
import io.v.v23.context.CancelableVContext;
@@ -26,17 +41,6 @@
import io.v.v23.vdlroot.signature.Interface;
import io.v.v23.vdlroot.signature.Method;
import io.v.v23.verror.VException;
-import junit.framework.TestCase;
-import org.joda.time.Duration;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.truth.Truth.assertThat;
@@ -93,24 +97,12 @@
ctx = V.withNewServer(ctx, "", server, null);
FortuneClient client = FortuneClientFactory.getFortuneClient(name());
- final AtomicReference<String> result = new AtomicReference<>();
- final CountDownLatch latch = new CountDownLatch(1);
VContext ctxT = ctx.withTimeout(new Duration(2000000)); // 20s
client.add(ctxT, "Hello world");
- client.get(ctxT, new Callback<String>() {
- @Override
- public void onSuccess(String fortune) {
- result.set(fortune);
- latch.countDown();
- }
-
- @Override
- public void onFailure(VException error) {
- throw new RuntimeException(error);
- }
- });
- assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
- assertThat(result.get()).isNotEmpty();
+ FutureCallback<String> result = new FutureCallback<>();
+ client.get(ctxT, result);
+ assertThat(Uninterruptibles.getUninterruptibly(
+ result.getFuture(), 5, TimeUnit.SECONDS)).isNotEmpty();
}
public void testAsyncFortuneWithCancel() throws Exception {
@@ -120,34 +112,23 @@
CancelableVContext cancelCtx = ctx.withCancel();
FortuneClient client = FortuneClientFactory.getFortuneClient(name());
- final AtomicReference<String> result = new AtomicReference<>();
- final AtomicReference<VException> errorResult = new AtomicReference<>();
- final CountDownLatch latch = new CountDownLatch(1);
VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s
client.add(ctxT, "Hello world");
- client.get(cancelCtx, new Callback<String>() {
- @Override
- public void onSuccess(String fortune) {
- result.set(fortune);
- latch.countDown();
- }
-
- @Override
- public void onFailure(VException error) {
- errorResult.set(error);
- latch.countDown();
- }
- });
+ FutureCallback<String> result = new FutureCallback<>();
+ client.get(cancelCtx, result);
// Cancel the RPC.
cancelCtx.cancel();
// Allow the server RPC impl to finish.
callLatch.countDown();
// The call should have failed, it was canceled before it completed.
- assertThat(result.get()).isNull();
- assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
- assertThat(errorResult.get()).isNotNull();
- assertThat(errorResult.get().getAction()).isEqualTo(io.v.v23.verror.Errors.CANCELED
- .getAction());
+ try {
+ result.getFuture().get();
+ fail("Should have failed!");
+ } catch (ExecutionException e) {
+ assertThat(e.getCause()).isInstanceOf(VException.class);
+ assertThat(((VException) e.getCause()).getAction())
+ .isEqualTo(io.v.v23.verror.Errors.CANCELED.getAction());
+ }
}
public void testStreaming() throws Exception {
@@ -179,37 +160,17 @@
VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s
final String msg = "The only fortune";
client.add(ctxT, msg);
-
- final AtomicReference<Throwable> errorResult = new AtomicReference<>();
- final CountDownLatch latch = new CountDownLatch(1);
- client.streamingGet(ctxT, new Callback<TypedClientStream<Boolean,
- String, Integer>>() {
- @Override
- public void onSuccess(TypedClientStream<Boolean, String, Integer> stream) {
- try {
- for (int i = 0; i < 5; i++) {
- stream.send(true);
- assertEquals(msg, stream.recv());
- }
- int total = stream.finish();
- assertEquals(5, total);
- } catch (VException | IOException | AssertionError error) {
- errorResult.set(error);
- } finally {
- latch.countDown();
- }
- }
-
- @Override
- public void onFailure(VException error) {
- errorResult.set(error);
- latch.countDown();
- }
- });
- assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
- if (errorResult.get() != null) {
- throw errorResult.get();
+ FutureCallback<TypedClientStream<Boolean, String, Integer>> callback
+ = new FutureCallback<>();
+ client.streamingGet(ctxT, callback);
+ TypedClientStream<Boolean, String, Integer> stream
+ = Uninterruptibles.getUninterruptibly(callback.getFuture(), 1, TimeUnit.SECONDS);
+ for (int i = 0; i < 5; i++) {
+ stream.send(true);
+ assertEquals(msg, stream.recv());
}
+ int total = stream.finish();
+ assertEquals(5, total);
}
public void testMultiple() throws Exception {
@@ -237,7 +198,8 @@
fail("Expected exception during call to getComplexError()");
} catch (VException e) {
if (!FortuneServerImpl.COMPLEX_ERROR.deepEquals(e)) {
- fail(String.format("Expected error %s, got %s", FortuneServerImpl.COMPLEX_ERROR, e));
+ fail(String.format("Expected error %s, got %s",
+ FortuneServerImpl.COMPLEX_ERROR, e));
}
}
}
@@ -376,4 +338,23 @@
responseChannel.close();
}
}
+
+ private static class FutureCallback<T> implements Callback<T> {
+ private final SettableFuture<T> future = SettableFuture.create();
+
+ public Future<T> getFuture() {
+ return future;
+ }
+
+ @Override
+ public void onSuccess(T result) {
+ future.set(result);
+ }
+
+ @Override
+ public void onFailure(VException error) {
+ future.setException(error);
+ }
+ }
+
}
diff --git a/projects/discovery_sample/app/src/main/java/io/v/discoverysample/AttrAdapter.java b/projects/discovery_sample/app/src/main/java/io/v/discoverysample/AttrAdapter.java
index 23c0947..13219ab 100644
--- a/projects/discovery_sample/app/src/main/java/io/v/discoverysample/AttrAdapter.java
+++ b/projects/discovery_sample/app/src/main/java/io/v/discoverysample/AttrAdapter.java
@@ -21,9 +21,6 @@
import io.v.v23.discovery.Attributes;
-/**
- * Created by bjornick on 10/14/15.
- */
public class AttrAdapter extends BaseAdapter implements ListAdapter {
private class Entry implements Comparable<Entry> {
String key;
diff --git a/projects/discovery_sample/app/src/main/java/io/v/discoverysample/ScanHandlerAdapter.java b/projects/discovery_sample/app/src/main/java/io/v/discoverysample/ScanHandlerAdapter.java
index c6bb1cd..3f1999f 100644
--- a/projects/discovery_sample/app/src/main/java/io/v/discoverysample/ScanHandlerAdapter.java
+++ b/projects/discovery_sample/app/src/main/java/io/v/discoverysample/ScanHandlerAdapter.java
@@ -26,9 +26,6 @@
import io.v.impl.google.lib.discovery.ScanHandler;
import io.v.x.ref.lib.discovery.Advertisement;
-/**
- * Created by bjornick on 10/14/15.
- */
public class ScanHandlerAdapter extends BaseAdapter implements ScanHandler{
List<Advertisement> knownAdvertisements;
diff --git a/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java b/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
index e090389..005e860 100644
--- a/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
+++ b/projects/namespace_browser/src/main/java/io/v/android/apps/namespace_browser/Namespace.java
@@ -29,7 +29,6 @@
public static List<GlobReply> glob(String root, VContext ctx) throws VException {
io.v.v23.namespace.Namespace n = V.getNamespace(ctx);
VContext ctxT = ctx.withTimeout(new Duration(20000)); // 20s
- Iterable<GlobReply> chan = n.glob(ctxT, root.isEmpty() ? "*" : root + "/*");
- return ImmutableList.copyOf(chan);
+ return ImmutableList.copyOf(n.glob(ctxT, root.isEmpty() ? "*" : root + "/*"));
}
}
diff --git a/projects/syncslidepresenter/src/main/java/io/v/syncslidepresenter/Main.java b/projects/syncslidepresenter/src/main/java/io/v/syncslidepresenter/Main.java
index 73ba305..e8f0a1c 100644
--- a/projects/syncslidepresenter/src/main/java/io/v/syncslidepresenter/Main.java
+++ b/projects/syncslidepresenter/src/main/java/io/v/syncslidepresenter/Main.java
@@ -12,6 +12,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.v.v23.VIterable;
import org.joda.time.Duration;
import java.awt.Dimension;
@@ -78,7 +79,6 @@
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Syncgroup;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
@@ -251,14 +251,13 @@
String rowKey = Joiner.on("/").join(presentation.getDeckId(), presentation
.getPresentationId(), "CurrentSlide");
logger.info("going to watch row key " + rowKey);
- Stream<WatchChange> watchStream = db.watch(context, presentations.name(), rowKey,
- marker);
+ VIterable<WatchChange> changes = db.watch(context, presentations.name(), rowKey, marker);
- for (WatchChange w : watchStream) {
- logger.info("Change detected in " + w.getRowName());
- logger.info("Type: " + w.getChangeType());
+ for (WatchChange change : changes) {
+ logger.info("Change detected in " + change.getRowName());
+ logger.info("Type: " + change.getChangeType());
try {
- VCurrentSlide currentSlide = (VCurrentSlide) VomUtil.decode(w.getVomValue(),
+ VCurrentSlide currentSlide = (VCurrentSlide) VomUtil.decode(change.getVomValue(),
VCurrentSlide.class);
logger.info("Current slide: " + currentSlide);
// Read the corresponding slide.
@@ -270,6 +269,10 @@
logger.log(Level.WARNING, "exception encountered while handling change event", e);
}
}
+
+ if (changes.error() != null) {
+ logger.log(Level.WARNING, "Premature end of slide changes: " + changes.error());
+ }
}
private static void enableOSXFullscreen(Window window) {
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
index beab3d1..675c086 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/CurrentSlideWatcher.java
@@ -13,12 +13,12 @@
import java.util.List;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.context.CancelableVContext;
import io.v.v23.context.VContext;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.ChangeType;
import io.v.v23.syncbase.nosql.Database;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.VException;
@@ -122,10 +122,10 @@
}
});
}
- Stream<WatchChange> changeStream;
- changeStream = mDB.watch(mVContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey,
+ VIterable<WatchChange> changes =
+ mDB.watch(mVContext, SyncbaseDB.PRESENTATIONS_TABLE, rowKey,
batch.getResumeMarker(mVContext));
- for (WatchChange change : changeStream) {
+ for (WatchChange change : changes) {
if (!change.getTableName().equals(SyncbaseDB.PRESENTATIONS_TABLE)) {
Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
SyncbaseDB.PRESENTATIONS_TABLE);
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
index 0e29070..4fb43da 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/DriverWatcher.java
@@ -7,9 +7,9 @@
import android.util.Log;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.ChangeType;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.VException;
@@ -67,10 +67,10 @@
postInUiThread(presentation.getDriver().getElem());
}
- Stream<WatchChange> watch = mState.db.watch(
+ VIterable<WatchChange> changes = mState.db.watch(
mState.vContext, SyncbaseDB.PRESENTATIONS_TABLE, row,
batch.getResumeMarker(mState.vContext));
- for (WatchChange change : watch) {
+ for (WatchChange change : changes) {
Log.i(TAG, "Found change " + change.getChangeType());
if (!change.getRowName().equals(row)) {
continue;
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
index 2c82486..ce929be 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/QuestionWatcher.java
@@ -14,11 +14,11 @@
import io.v.android.apps.syncslides.model.Question;
import io.v.impl.google.naming.NamingUtil;
+import io.v.v23.VIterable;
import io.v.v23.services.syncbase.nosql.KeyValue;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.ChangeType;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.VException;
@@ -73,14 +73,14 @@
Log.i(TAG, "Watching questions: " + prefix);
BatchDatabase batch = mState.db.beginBatch(mState.vContext, null);
Table presentations = batch.getTable(SyncbaseDB.PRESENTATIONS_TABLE);
- Stream<KeyValue> stream = presentations.scan(mState.vContext, RowRange.prefix(prefix));
- for (KeyValue keyValue : stream) {
- VQuestion value = (VQuestion) VomUtil.decode(keyValue.getValue(), VQuestion.class);
+ VIterable<KeyValue> rows = presentations.scan(mState.vContext, RowRange.prefix(prefix));
+ for (KeyValue row : rows) {
+ VQuestion value = (VQuestion) VomUtil.decode(row.getValue(), VQuestion.class);
if (value.getAnswered()) {
continue;
}
final Question question = new Question(
- lastPart(keyValue.getKey()),
+ lastPart(row.getKey()),
value.getQuestioner().getName(),
value.getTime());
mState.handler.post(new Runnable() {
@@ -91,10 +91,10 @@
});
}
- Stream<WatchChange> watch = mState.db.watch(
+ VIterable<WatchChange> changes = mState.db.watch(
mState.vContext, SyncbaseDB.PRESENTATIONS_TABLE, prefix,
batch.getResumeMarker(mState.vContext));
- for (WatchChange change : watch) {
+ for (WatchChange change : changes) {
Log.i(TAG, "Found change " + change.getChangeType());
final String id = lastPart(change.getRowName());
if (change.getChangeType().equals(ChangeType.PUT_CHANGE)) {
diff --git a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
index a8a8039..de35dc9 100644
--- a/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
+++ b/projects/syncslides/app/src/main/java/io/v/android/apps/syncslides/db/SyncbaseDB.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.v.v23.VIterable;
import org.joda.time.Duration;
import java.io.ByteArrayOutputStream;
@@ -61,7 +62,6 @@
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.DatabaseCore;
import io.v.v23.syncbase.nosql.RowRange;
-import io.v.v23.syncbase.nosql.Stream;
import io.v.v23.syncbase.nosql.Syncgroup;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
@@ -189,7 +189,7 @@
if (!mPresentations.exists(mVContext)) {
mPresentations.create(mVContext, mPermissions);
}
- //importDecks();
+ importDecks();
} catch (VException e) {
handleError("Couldn't setup syncbase service: " + e.getMessage());
return;
@@ -369,9 +369,9 @@
try {
BatchDatabase batch = mDB.beginBatch(mVContext, null);
mWatchMarker = batch.getResumeMarker(mVContext);
- DatabaseCore.ResultStream stream = batch.exec(mVContext,
+ DatabaseCore.QueryResults results = batch.exec(mVContext,
"SELECT k, v FROM Decks WHERE Type(v) like \"%VDeck\"");
- for (List<VdlAny> row : stream) {
+ for (List<VdlAny> row : results) {
if (row.size() != 2) {
throw new VException("Wrong number of columns: " + row.size());
}
@@ -385,6 +385,9 @@
}
});
}
+ if (results.error() != null) {
+ Log.e(TAG, "Couldn't get all decks due to error: " + results.error());
+ }
watchForDeckChanges();
} catch (VException e) {
Log.e(TAG, e.toString());
@@ -392,14 +395,14 @@
}
private void watchForDeckChanges() {
- Stream<WatchChange> changeStream = null;
+ VIterable<WatchChange> changes = null;
try {
- changeStream = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
+ changes = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
} catch (VException e) {
Log.e(TAG, "Couldn't watch for changes to the Decks table: " + e.toString());
return;
}
- for (WatchChange change : changeStream) {
+ for (WatchChange change : changes) {
if (!change.getTableName().equals(DECKS_TABLE)) {
Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
DECKS_TABLE);
@@ -437,6 +440,9 @@
});
}
}
+ if (changes.error() != null) {
+ Log.e(TAG, "Deck change thread exited early due to error: " + changes.error());
+ }
Log.i(TAG, "Deck change thread exiting");
}
@@ -538,11 +544,11 @@
String query = "SELECT k, v FROM Decks WHERE Type(v) LIKE \"%VSlide\" " +
"AND k LIKE \"" + NamingUtil.join(deckId, "slides") + "%\"";
- DatabaseCore.ResultStream stream = batch.exec(mVContext, query);
+ DatabaseCore.QueryResults results = batch.exec(mVContext, query);
// TODO(kash): Abort execution if interrupted. Perhaps we should derive
// a new VContext so it can be cancelled.
final List<Slide> slides = Lists.newArrayList();
- for (List<VdlAny> row : stream) {
+ for (List<VdlAny> row : results) {
if (row.size() != 2) {
throw new VException("Wrong number of columns: " + row.size());
}
@@ -552,6 +558,9 @@
String note = notesForSlide(mVContext, table, key);
slides.add(new SlideImpl(slide.getThumbnail(), note));
}
+ if (results.error() != null) {
+ Log.e(TAG, "Couldn't get all slides due to error: " + results.error());
+ }
Handler handler = new Handler(Looper.getMainLooper());
handler.post(new Runnable() {
@Override
@@ -616,8 +625,8 @@
String query = "SELECT k, v FROM Decks WHERE Type(v) LIKE \"%VSlide\" " +
"AND k LIKE \"" + NamingUtil.join(mDeckId, "slides") + "%\"";
- DatabaseCore.ResultStream stream = batch.exec(mVContext, query);
- for (List<VdlAny> row : stream) {
+ DatabaseCore.QueryResults results = batch.exec(mVContext, query);
+ for (List<VdlAny> row : results) {
if (row.size() != 2) {
throw new VException("Wrong number of columns: " + row.size());
}
@@ -633,6 +642,9 @@
}
});
}
+ if (results.error() != null) {
+ Log.e(TAG, "Couldn't get all slides due to error: " + results.error());
+ }
watchForSlideChanges();
} catch (VException e) {
Log.e(TAG, e.toString());
@@ -641,15 +653,15 @@
private void watchForSlideChanges() {
Table notesTable = mDB.getTable(NOTES_TABLE);
- Stream<WatchChange> changeStream;
+ VIterable<WatchChange> changes;
try {
- changeStream = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
+ changes = mDB.watch(mVContext, DECKS_TABLE, "", mWatchMarker);
} catch (VException e) {
Log.e(TAG, "Couldn't watch for changes to the Decks table: " + e.toString());
return;
}
- for (WatchChange change : changeStream) {
+ for (WatchChange change : changes) {
if (!change.getTableName().equals(DECKS_TABLE)) {
Log.e(TAG, "Wrong change table name: " + change.getTableName() + ", wanted: " +
DECKS_TABLE);
@@ -687,7 +699,10 @@
});
}
}
- Log.i(TAG, "Slides watcher thread exiting");
+ if (changes.error() != null) {
+ Log.e(TAG, "Slides change thread exited early due to error: " + changes.error());
+ }
+ Log.i(TAG, "Slides change thread exiting");
}
@Override