blob: 9a4bdf096512745437e113833f4ba70a2f799d2f [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.v23.syncbase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.v.impl.google.naming.NamingUtil;
import io.v.impl.google.services.syncbase.SyncbaseServer;
import io.v.v23.InputChannel;
import io.v.v23.InputChannels;
import io.v.v23.context.VContext;
import io.v.v23.naming.Endpoint;
import io.v.v23.rpc.ListenSpec;
import io.v.v23.services.syncbase.nosql.BatchOptions;
import io.v.v23.services.syncbase.nosql.BlobRef;
import io.v.v23.services.syncbase.nosql.KeyValue;
import io.v.v23.services.syncbase.nosql.ReadOnlyBatchException;
import io.v.v23.services.syncbase.nosql.SyncgroupMemberInfo;
import io.v.v23.services.syncbase.nosql.TableRow;
import io.v.v23.services.syncbase.nosql.SyncgroupSpec;
import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.BlobReader;
import io.v.v23.syncbase.nosql.BlobWriter;
import io.v.v23.syncbase.nosql.ChangeType;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.DatabaseCore;
import io.v.v23.syncbase.nosql.NoSql;
import io.v.v23.syncbase.nosql.Row;
import io.v.v23.syncbase.nosql.RowRange;
import io.v.v23.syncbase.nosql.Syncgroup;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.util.Util;
import io.v.v23.V;
import io.v.v23.rpc.Server;
import io.v.v23.security.BlessingPattern;
import io.v.v23.security.access.AccessList;
import io.v.v23.security.access.Constants;
import io.v.v23.security.access.Permissions;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.vdl.VdlAny;
import io.v.v23.verror.CanceledException;
import io.v.v23.verror.NoExistException;
import io.v.v23.verror.VException;
import io.v.v23.vom.VomUtil;
import junit.framework.TestCase;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static com.google.common.truth.Truth.assertThat;
import static io.v.v23.VFutures.sync;
/**
* Client-server syncbase tests.
*/
public class SyncbaseTest extends TestCase {
private static final String APP_NAME = "app/a#%b"; // symbols are okay
private static final String DB_NAME = "db";
private static final String TABLE_NAME = "table";
private static final String ROW_NAME = "row/a#%b"; // symbols are okay
private VContext ctx;
private Permissions allowAll;
private Endpoint serverEndpoint;
@Override
protected void setUp() throws Exception {
ctx = V.init();
ctx = V.withListenSpec(ctx, V.getListenSpec(ctx).withAddress(
new ListenSpec.Address("tcp", "localhost:0")));
AccessList acl = new AccessList(
ImmutableList.of(new BlessingPattern("...")), ImmutableList.<String>of());
allowAll = new Permissions(ImmutableMap.of(
Constants.READ.getValue(), acl,
Constants.WRITE.getValue(), acl,
Constants.ADMIN.getValue(), acl));
String tmpDir = Files.createTempDir().getAbsolutePath();
ctx = SyncbaseServer.withNewServer(ctx, new SyncbaseServer.Params()
.withPermissions(allowAll)
.withStorageRootDir(tmpDir));
Server server = V.getServer(ctx);
assertThat(server).isNotNull();
Endpoint[] endpoints = server.getStatus().getEndpoints();
assertThat(endpoints).isNotEmpty();
serverEndpoint = endpoints[0];
}
@Override
protected void tearDown() throws Exception {
ctx.cancel();
}
public void testService() throws Exception {
SyncbaseService service = createService();
assertThat(service.fullName()).isEqualTo(serverEndpoint.name());
assertThat(sync(service.listApps(ctx))).isEmpty();
}
public void testApp() throws Exception {
SyncbaseService service = createService();
SyncbaseApp app = service.getApp(APP_NAME);
assertThat(app).isNotNull();
assertThat(app.name()).isEqualTo(APP_NAME);
assertThat(app.fullName()).isEqualTo(
NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME)));
assertThat(sync(app.exists(ctx))).isFalse();
assertThat(sync(service.listApps(ctx))).isEmpty();
sync(app.create(ctx, allowAll));
assertThat(sync(app.exists(ctx))).isTrue();
assertThat(sync(service.listApps(ctx))).containsExactly(app.name());
assertThat(sync(app.listDatabases(ctx))).isEmpty();
sync(app.destroy(ctx));
assertThat(sync(app.exists(ctx))).isFalse();
assertThat(sync(service.listApps(ctx))).isEmpty();
}
public void testDatabase() throws Exception {
SyncbaseApp app = createApp(createService());
assertThat(app).isNotNull();
Database db = app.getNoSqlDatabase("db", null);
assertThat(db).isNotNull();
assertThat(db.name()).isEqualTo(DB_NAME);
assertThat(db.fullName()).isEqualTo(
NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME), DB_NAME));
assertThat(sync(db.exists(ctx))).isFalse();
assertThat(sync(app.listDatabases(ctx))).isEmpty();
sync(db.create(ctx, allowAll));
assertThat(sync(db.exists(ctx))).isTrue();
assertThat(sync(app.listDatabases(ctx))).containsExactly(db.name());
assertThat(sync(db.listTables(ctx))).isEmpty();
sync(db.destroy(ctx));
assertThat(sync(db.exists(ctx))).isFalse();
assertThat(sync(app.listDatabases(ctx))).isEmpty();
}
public void testTable() throws Exception {
Database db = createDatabase(createApp(createService()));
assertThat(db).isNotNull();
Table table = db.getTable(TABLE_NAME);
assertThat(table).isNotNull();
assertThat(table.name()).isEqualTo(TABLE_NAME);
assertThat(table.fullName()).isEqualTo(
NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME), DB_NAME, TABLE_NAME));
assertThat(sync(table.exists(ctx))).isFalse();
assertThat(sync(db.listTables(ctx))).isEmpty();
sync(table.create(ctx, allowAll));
assertThat(sync(table.exists(ctx))).isTrue();
assertThat(sync(db.listTables(ctx))).containsExactly(TABLE_NAME);
assertThat(sync(table.getRow("row1").exists(ctx))).isFalse();
sync(table.put(ctx, "row1", "value1", String.class));
assertThat(sync(table.getRow("row1").exists(ctx))).isTrue();
assertThat(sync(table.get(ctx, "row1", String.class))).isEqualTo("value1");
sync(table.delete(ctx, "row1"));
assertThat(sync(table.getRow("row1").exists(ctx))).isFalse();
sync(table.put(ctx, "row1", "value1", String.class));
sync(table.put(ctx, "row2", "value2", String.class));
assertThat(sync(table.getRow("row1").exists(ctx))).isTrue();
assertThat(sync(table.getRow("row2").exists(ctx))).isTrue();
assertThat(sync(table.get(ctx, "row1", String.class))).isEqualTo("value1");
assertThat(sync(table.get(ctx, "row2", String.class))).isEqualTo("value2");
assertThat(sync(InputChannels.asList(
table.scan(ctx, RowRange.range("row1", "row3"))))).containsExactly(
new KeyValue("row1", VomUtil.encode("value1", String.class)),
new KeyValue("row2", VomUtil.encode("value2", String.class)));
sync(table.deleteRange(ctx, RowRange.range("row1", "row3")));
assertThat(sync(table.getRow("row1").exists(ctx))).isFalse();
assertThat(sync(table.getRow("row2").exists(ctx))).isFalse();
sync(table.destroy(ctx));
assertThat(sync(table.exists(ctx))).isFalse();
assertThat(sync(db.listTables(ctx))).isEmpty();
}
public void testRow() throws Exception {
Table table = createTable(createDatabase(createApp(createService())));
Row row = table.getRow(ROW_NAME);
assertThat(row).isNotNull();
assertThat(row.key()).isEqualTo(ROW_NAME);
assertThat(row.fullName()).isEqualTo(
NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME), DB_NAME, TABLE_NAME,
Util.escape(ROW_NAME)));
assertThat(sync(row.exists(ctx))).isFalse();
sync(row.put(ctx, "value", String.class));
assertThat(sync(row.exists(ctx))).isTrue();
assertThat(sync(row.get(ctx, String.class))).isEqualTo("value");
assertThat(sync(table.get(ctx, ROW_NAME, String.class))).isEqualTo("value");
sync(row.delete(ctx));
assertThat(sync(row.exists(ctx))).isFalse();
sync(table.put(ctx, ROW_NAME, "value", String.class));
assertThat(sync(row.exists(ctx))).isTrue();
assertThat(sync(row.get(ctx, String.class))).isEqualTo("value");
assertThat(sync(table.get(ctx, ROW_NAME, String.class))).isEqualTo("value");
}
public void testDatabaseExec() throws Exception {
Database db = createDatabase(createApp(createService()));
Table table = createTable(db);
Foo foo = new Foo(4, "f");
Bar bar = new Bar(0.5f, "b");
Baz baz = new Baz("John Doe", true);
sync(table.put(ctx, "foo", foo, Foo.class));
sync(table.put(ctx, "bar", bar, Bar.class));
sync(table.put(ctx, "baz", baz, Baz.class));
{
DatabaseCore.QueryResults results = sync(db.exec(ctx,
"select k, v.Name from " + TABLE_NAME + " where Type(v) like \"%Baz\""));
assertThat(results.columnNames()).containsExactly("k", "v.Name");
assertThat(sync(InputChannels.asList(results))).containsExactly(ImmutableList.of(
new VdlAny(String.class, "baz"), new VdlAny(String.class, baz.name)));
}
{
DatabaseCore.QueryResults results =
sync(db.exec(ctx, "select k, v from " + TABLE_NAME));
assertThat(results.columnNames()).containsExactly("k", "v");
assertThat(sync(InputChannels.asList(results))).containsExactly(
ImmutableList.of(new VdlAny(String.class, "bar"), new VdlAny(Bar.class, bar)),
ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)),
ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo))
);
}
{
DatabaseCore.QueryResults results = sync(db.exec(ctx,
"select k, v from " + TABLE_NAME + " where k = ? or v.I = ?",
Arrays.<Object>asList("baz", 4),
Arrays.<Type>asList(String.class, int.class)));
assertThat(results.columnNames()).containsExactly("k", "v");
assertThat(sync(InputChannels.asList(results))).containsExactly(
ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)),
ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo))
);
}
}
public void testDatabaseWatch() throws Exception {
Database db = createDatabase(createApp(createService()));
Table table = createTable(db);
Foo foo = new Foo(4, "f");
Bar bar = new Bar(0.5f, "b");
Baz baz = new Baz("John Doe", true);
ResumeMarker marker = sync(db.getResumeMarker(ctx));
sync(table.put(ctx, "foo", foo, Foo.class));
sync(table.put(ctx, "bar", bar, Bar.class));
sync(table.put(ctx, "baz", baz, Baz.class));
sync(table.getRow("baz").delete(ctx));
ImmutableList<WatchChange> expectedChanges = ImmutableList.of(
new WatchChange(TABLE_NAME, "bar", ChangeType.PUT_CHANGE,
VomUtil.encode(bar, Bar.class), null, false, false),
new WatchChange(TABLE_NAME, "baz", ChangeType.PUT_CHANGE,
VomUtil.encode(baz, Baz.class), null, false, false),
new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
new byte[0], null, false, false));
VContext ctxC = ctx.withCancel();
Iterator<WatchChange> it = InputChannels.asIterable(
db.watch(ctxC, TABLE_NAME, "b", marker)).iterator();
checkWatch(it, expectedChanges);
ctxC.cancel();
}
public void testDatabaseWatchWithInitialState() throws Exception {
Database db = createDatabase(createApp(createService()));
Table table = createTable(db);
Foo foo = new Foo(4, "f");
Bar bar = new Bar(0.5f, "b");
Baz baz = new Baz("John Doe", true);
sync(table.put(ctx, "foo", foo, Foo.class));
sync(table.put(ctx, "barfoo", foo, Foo.class));
sync(table.put(ctx, "bar", bar, Bar.class));
VContext ctxC = ctx.withCancel();
Iterator<WatchChange> it = InputChannels.asIterable(
db.watch(ctxC, TABLE_NAME, "b")).iterator();
ImmutableList<WatchChange> expectedInitialChanges = ImmutableList.of(
new WatchChange(TABLE_NAME, "bar", ChangeType.PUT_CHANGE,
VomUtil.encode(bar, Bar.class), null, false, true),
new WatchChange(TABLE_NAME, "barfoo", ChangeType.PUT_CHANGE,
VomUtil.encode(foo, Foo.class), null, false, false));
checkWatch(it, expectedInitialChanges);
sync(table.put(ctx, "baz", baz, Baz.class));
sync(table.getRow("baz").delete(ctx));
ImmutableList<WatchChange> expectedChanges = ImmutableList.of(
new WatchChange(TABLE_NAME, "baz", ChangeType.PUT_CHANGE,
VomUtil.encode(baz, Baz.class), null, false, false),
new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE,
new byte[0], null, false, false));
checkWatch(it, expectedChanges);
ctxC.cancel();
}
public void testDatabaseWatchWithContextCancel() throws Exception {
final VContext ctxC = ctx.withCancel();
Database db = createDatabase(createApp(createService()));
createTable(db);
InputChannel<WatchChange> channel = db.watch(
ctxC, TABLE_NAME, "b", sync(db.getResumeMarker(ctx)));
new Thread(new Runnable() {
@Override
public void run() {
ctxC.cancel();
}
}).start();
try {
sync(InputChannels.asList(channel));
} catch (CanceledException e) {
// OK
}
}
public void testBatch() throws Exception {
Database db = createDatabase(createApp(createService()));
Table table = createTable(db);
assertThat(sync(InputChannels.asList(table.scan(ctx, RowRange.prefix(""))))).isEmpty();
BatchDatabase batchFoo = sync(db.beginBatch(ctx, null));
Table batchFooTable = batchFoo.getTable(TABLE_NAME);
assertThat(sync(batchFooTable.exists(ctx))).isTrue();
sync(batchFooTable.put(ctx, ROW_NAME, "foo", String.class));
// Assert that value is visible inside the batch but not outside.
assertThat(sync(batchFooTable.get(ctx, ROW_NAME, String.class))).isEqualTo("foo");
assertThat(sync(table.getRow(ROW_NAME).exists(ctx))).isFalse();
BatchDatabase batchBar = sync(db.beginBatch(ctx, null));
Table batchBarTable = batchBar.getTable(TABLE_NAME);
assertThat(sync(batchBarTable.exists(ctx))).isTrue();
sync(batchBarTable.put(ctx, ROW_NAME, "foo", String.class));
// Assert that value is visible inside the batch but not outside.
assertThat(sync(batchBarTable.get(ctx, ROW_NAME, String.class))).isEqualTo("foo");
assertThat(sync(table.getRow(ROW_NAME).exists(ctx))).isFalse();
sync(batchFoo.commit(ctx));
// Assert that the value is visible outside the batch.
assertThat(sync(table.get(ctx, ROW_NAME, String.class))).isEqualTo("foo");
try {
sync(batchBar.commit(ctx));
fail("Expected batchBar.commit() to fail");
} catch (VException e) {
// ok
}
}
public void testRunInBatch() throws Exception {
final Database d = createDatabase(createApp(createService()));
Table table = createTable(d);
sync(NoSql.runInBatch(ctx, d, new BatchOptions(), new NoSql.BatchOperation() {
private int retries = 0;
@Override
public ListenableFuture<Void> run(BatchDatabase b) {
++retries;
String fooKey = String.format("foo-%d", retries);
String barKey = String.format("bar-%d", retries);
try {
// Read foo. It does not exist.
try {
sync(b.getTable(TABLE_NAME).get(ctx, fooKey, String.class));
throw new VException("Expected b.get() to fail with NoExistException");
} catch (NoExistException e) {
// ok
}
// If we need to fail the commit, write to foo in a separate concurrent batch.
if (retries < 2) {
sync(d.getTable(TABLE_NAME).put(ctx, fooKey, "foo", String.class));
}
// Write to bar.
sync(b.getTable(TABLE_NAME).put(ctx, barKey, "bar", String.class));
} catch (VException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
}
}));
// First try failed (wrote foo), second succeeded (bar commit succeeded).
assertThat(sync(InputChannels.asList(table.scan(ctx, RowRange.prefix("")))))
.containsExactly(
new KeyValue("bar-2", VomUtil.encode("bar", String.class)),
new KeyValue("foo-1", VomUtil.encode("foo", String.class)));
}
public void testRunInBatchReadOnly() throws Exception {
final Database d = createDatabase(createApp(createService()));
Table table = createTable(d);
sync(table.put(ctx, "foo", "foo", String.class));
sync(NoSql.runInBatch(ctx, d, new BatchOptions("", true), new NoSql.BatchOperation() {
@Override
public ListenableFuture<Void> run(BatchDatabase b) {
try {
// Read foo.
Object before = sync(b.getTable(TABLE_NAME).get(ctx, "foo", String.class));
// Write to foo in a separate concurrent batch. It should not cause a retry
// since readonly batches are not committed.
sync(d.getTable(TABLE_NAME).put(ctx, "foo", "oof", String.class));
// Read foo again. Batch should not see the changed value.
Object after = sync(b.getTable(TABLE_NAME).get(ctx, "foo", String.class));
if (!before.equals(after)) {
throw new VException("batch should not see concurrently changed value");
}
// Try writing to bar. This should fail since the batch is readonly.
try {
sync(b.getTable(TABLE_NAME).put(ctx, "bar", "bar", String.class));
throw new VException("Expected b.put() to fail with ReadOnlyBatchException");
} catch (ReadOnlyBatchException e) {
// ok
}
} catch (VException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
}
}));
// Single uncommitted iteration.
assertThat(sync(InputChannels.asList(table.scan(ctx, RowRange.prefix("")))))
.containsExactly(
new KeyValue("foo", VomUtil.encode("oof", String.class)));
}
public void testSyncgroup() throws Exception {
Database db = createDatabase(createApp(createService()));
String groupName = "test";
// "A" creates the group.
SyncgroupSpec spec = new SyncgroupSpec("test", allowAll,
ImmutableList.of(new TableRow(TABLE_NAME, "")),
ImmutableList.<String>of(), false);
SyncgroupMemberInfo memberInfo = new SyncgroupMemberInfo();
memberInfo.setSyncPriority((byte) 1);
Syncgroup group = db.getSyncgroup(groupName);
{
sync(group.create(ctx, spec, memberInfo));
assertThat(sync(db.listSyncgroupNames(ctx))).containsExactly(groupName);
assertThat(sync(group.getSpec(ctx)).values()).containsExactly(spec);
assertThat(sync(group.getMembers(ctx)).values()).containsExactly(memberInfo);
assertThat(sync(group.join(ctx, memberInfo))).isEqualTo(spec);
}
// TODO(spetrovic): test leave() and destroy().
SyncgroupSpec specRMW = new SyncgroupSpec("testRMW", allowAll,
ImmutableList.of(new TableRow(TABLE_NAME, "")),
ImmutableList.<String>of(), false);
assertThat(sync(group.getSpec(ctx)).keySet()).isNotEmpty();
String version = sync(group.getSpec(ctx)).keySet().iterator().next();
sync(group.setSpec(ctx, specRMW, version));
assertThat(sync(group.getSpec(ctx)).values()).containsExactly(specRMW);
SyncgroupSpec specOverwrite = new SyncgroupSpec("testOverwrite", allowAll,
ImmutableList.of(new TableRow(TABLE_NAME, "")),
ImmutableList.<String>of(), false);
sync(group.setSpec(ctx, specOverwrite, ""));
assertThat(sync(group.getSpec(ctx)).values()).containsExactly(specOverwrite);
}
// TODO(spetrovic): Test Database.enforceSchema().
public void testBlobSmall() throws Exception {
byte[] data = new byte[]{ 1, 2, 3, 4, 5 };
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
OutputStream out = writer.stream(ctx);
out.write(data);
out.close();
assertThat(sync(writer.size(ctx))).isEqualTo(data.length);
sync(writer.commit(ctx));
BlobRef ref = writer.getRef();
BlobReader reader = db.readBlob(ctx, ref);
byte[] actual = new byte[data.length];
ByteStreams.readFully(reader.stream(ctx, 0), actual);
assertThat(actual).isEqualTo(data);
}
public void testBlobLarge() throws Exception {
byte[] data = new byte[1 << 17];
for (int i = 0; i < data.length; ++i) {
data[i] = (byte)(i & 0xFF);
}
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
OutputStream out = writer.stream(ctx);
out.write(data);
out.close();
assertThat(sync(writer.size(ctx))).isEqualTo(data.length);
sync(writer.commit(ctx));
BlobRef ref = writer.getRef();
BlobReader reader = db.readBlob(ctx, ref);
byte[] actual = new byte[data.length];
ByteStreams.readFully(reader.stream(ctx, 0), actual);
assertThat(actual).isEqualTo(data);
}
public void testBlobWriteResume() throws Exception {
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
BlobRef ref = writer.getRef();
byte[] data = new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
{
// Write, part 1.
OutputStream out = writer.stream(ctx);
out.write(data, 0, data.length / 2);
out.close();
assertThat(sync(writer.size(ctx))).isEqualTo(data.length / 2);
}
{
// Write, part 2.
writer = sync(db.writeBlob(ctx, ref));
assertThat(sync(writer.size(ctx))).isEqualTo(5);
OutputStream out = writer.stream(ctx);
out.write(data, data.length / 2, data.length / 2);
out.close();
assertThat(sync(writer.size(ctx))).isEqualTo(data.length);
sync(writer.commit(ctx));
}
// Read.
BlobReader reader = db.readBlob(ctx, ref);
byte[] actual = new byte[data.length];
ByteStreams.readFully(reader.stream(ctx, 0), actual);
assertThat(actual).isEqualTo(data);
}
public void testBlobWriteCommitted() throws Exception {
byte[] data = new byte[]{ 1, 2, 3, 4, 5 };
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
BlobRef ref = writer.getRef();
OutputStream out = writer.stream(ctx);
out.write(data);
out.close();
assertThat(sync(writer.size(ctx))).isEqualTo(data.length);
sync(writer.commit(ctx));
try {
out = writer.stream(ctx);
out.write(data);
out.close();
fail("write of a committed blob should fail");
} catch (Exception e) {
// OK
}
try {
sync(writer.commit(ctx));
fail("commit of a committed blob should fail");
} catch (VException e) {
// OK
}
BlobReader reader = db.readBlob(ctx, ref);
byte[] actual = new byte[data.length];
ByteStreams.readFully(reader.stream(ctx, 0), actual);
assertThat(actual).isEqualTo(data);
}
public void testBlobWriteCancelable() throws Exception {
Database db = createDatabase(createApp(createService()));
VContext ctxC = ctx.withCancel();
BlobWriter writer = sync(db.writeBlob(ctxC, null));
BlobRef ref = writer.getRef();
byte[] data = new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
// Write 1st chunk.
OutputStream out = writer.stream(ctxC);
out.write(data, 0, data.length / 2);
ctxC.cancel();
// Write 2nd chunk.
try {
out.write(data, data.length / 2, data.length / 2);
out.close();
fail("write on a canceled stream should fail");
} catch (IOException e) {
// OK
}
}
public void testBlobReadUncommitted() throws Exception {
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
BlobRef ref = writer.getRef();
byte[] data = new byte[]{ 1, 2, 3, 4, 5 };
OutputStream out = writer.stream(ctx);
out.write(data, 0, data.length);
out.close();
BlobReader reader = db.readBlob(ctx, ref);
try {
byte[] actual = new byte[data.length];
ByteStreams.readFully(reader.stream(ctx, 0), actual);
fail("read of an uncommitted blob should fail");
} catch (IOException e) {
// OK
}
try {
sync(reader.prefetch(ctx, 0).recv());
} catch (VException e) {
// OK
}
}
public void testBlobReadPrefetch() throws Exception {
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
BlobRef ref = writer.getRef();
byte[] data = new byte[]{ 1, 2, 3, 4, 5 };
OutputStream out = writer.stream(ctx);
out.write(data, 0, data.length);
out.close();
sync(writer.commit(ctx));
// Prefetch
BlobReader reader = db.readBlob(ctx, ref);
sync(InputChannels.asDone(reader.prefetch(ctx, 0)));
// Read
byte[] actual = new byte[data.length];
ByteStreams.readFully(reader.stream(ctx, 0), actual);
assertThat(actual).isEqualTo(data);
}
public void testBlobReadClosedStream() throws Exception {
byte[] data = new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
Database db = createDatabase(createApp(createService()));
BlobWriter writer = sync(db.writeBlob(ctx, null));
OutputStream out = writer.stream(ctx);
out.write(data);
out.close();
assertThat(sync(writer.size(ctx))).isEqualTo(data.length);
sync(writer.commit(ctx));
BlobRef ref = writer.getRef();
BlobReader reader = db.readBlob(ctx, ref);
byte[] actual = new byte[data.length / 2];
InputStream in = reader.stream(ctx, 0);
// Read 1st chunk.
ByteStreams.readFully(in, actual);
assertThat(actual).isEqualTo(new byte[]{1, 2, 3, 4, 5});
// Close the input stream.
in.close();
// Read 2nd chunk.
try {
ByteStreams.readFully(in, actual);
fail("read of a closed stream should fail");
} catch (IOException e) {
// OK
}
}
private SyncbaseService createService() throws Exception {
return Syncbase.newService(serverEndpoint.name());
}
private SyncbaseApp createApp(SyncbaseService service) throws Exception {
SyncbaseApp app = service.getApp(APP_NAME);
sync(app.create(ctx, allowAll));
return app;
}
private Database createDatabase(SyncbaseApp app) throws Exception {
Database db = app.getNoSqlDatabase(DB_NAME, null);
sync(db.create(ctx, allowAll));
return db;
}
private Table createTable(Database db) throws Exception {
Table table = db.getTable(TABLE_NAME);
sync(table.create(ctx, allowAll));
return table;
}
private void checkWatch(Iterator<WatchChange> it,
List<WatchChange> expectedChanges) throws Exception {
for (WatchChange expected : expectedChanges) {
assertThat(it.hasNext()).isTrue();
WatchChange actual = it.next();
assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
assertThat(actual.getRowName()).isEqualTo(expected.getRowName());
assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType());
assertThat(actual.getVomValue()).isEqualTo(expected.getVomValue());
assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync());
assertThat(actual.isContinued()).isEqualTo(expected.isContinued());
}
}
private static class Foo implements Serializable {
private int i;
private String s;
public Foo() {
this.i = 0;
this.s = "";
}
public Foo(int i, String s) {
this.i = i;
this.s = s;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Foo foo = (Foo) o;
if (i != foo.i) return false;
return !(s != null ? !s.equals(foo.s) : foo.s != null);
}
}
private static class Bar implements Serializable {
private float f;
private String s;
public Bar() {
this.f = 0f;
this.s = "";
}
public Bar(float f, String s) {
this.f = f;
this.s = s;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Bar bar = (Bar) o;
if (Float.compare(bar.f, f) != 0) return false;
return !(s != null ? !s.equals(bar.s) : bar.s != null);
}
}
private static class Baz implements Serializable {
private String name;
private boolean active;
public Baz() {
this.name = "";
this.active = false;
}
public Baz(String name, boolean active) {
this.name = name;
this.active = active;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Baz baz = (Baz) o;
if (active != baz.active) return false;
return !(name != null ? !name.equals(baz.name) : baz.name != null);
}
}
}