| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package io.v.v23.syncbase; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.io.ByteStreams; |
| import com.google.common.io.Files; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import io.v.impl.google.naming.NamingUtil; |
| import io.v.impl.google.services.syncbase.SyncbaseServer; |
| import io.v.v23.InputChannel; |
| import io.v.v23.InputChannels; |
| import io.v.v23.context.VContext; |
| import io.v.v23.naming.Endpoint; |
| import io.v.v23.rpc.ListenSpec; |
| import io.v.v23.services.syncbase.nosql.BatchOptions; |
| import io.v.v23.services.syncbase.nosql.BlobRef; |
| import io.v.v23.services.syncbase.nosql.KeyValue; |
| import io.v.v23.services.syncbase.nosql.ReadOnlyBatchException; |
| import io.v.v23.services.syncbase.nosql.SyncgroupMemberInfo; |
| import io.v.v23.services.syncbase.nosql.TableRow; |
| import io.v.v23.services.syncbase.nosql.SyncgroupSpec; |
| import io.v.v23.services.watch.ResumeMarker; |
| import io.v.v23.syncbase.nosql.BatchDatabase; |
| import io.v.v23.syncbase.nosql.BlobReader; |
| import io.v.v23.syncbase.nosql.BlobWriter; |
| import io.v.v23.syncbase.nosql.ChangeType; |
| import io.v.v23.syncbase.nosql.Database; |
| import io.v.v23.syncbase.nosql.DatabaseCore; |
| import io.v.v23.syncbase.nosql.NoSql; |
| import io.v.v23.syncbase.nosql.Row; |
| import io.v.v23.syncbase.nosql.RowRange; |
| import io.v.v23.syncbase.nosql.Syncgroup; |
| import io.v.v23.syncbase.nosql.Table; |
| import io.v.v23.syncbase.util.Util; |
| import io.v.v23.V; |
| import io.v.v23.rpc.Server; |
| import io.v.v23.security.BlessingPattern; |
| import io.v.v23.security.access.AccessList; |
| import io.v.v23.security.access.Constants; |
| import io.v.v23.security.access.Permissions; |
| import io.v.v23.syncbase.nosql.WatchChange; |
| import io.v.v23.vdl.VdlAny; |
| import io.v.v23.verror.CanceledException; |
| import io.v.v23.verror.NoExistException; |
| import io.v.v23.verror.VException; |
| import io.v.v23.vom.VomUtil; |
| import junit.framework.TestCase; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.lang.reflect.Type; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| import static io.v.v23.VFutures.sync; |
| |
| /** |
| * Client-server syncbase tests. |
| */ |
| public class SyncbaseTest extends TestCase { |
| private static final String APP_NAME = "app/a#%b"; // symbols are okay |
| private static final String DB_NAME = "db"; |
| private static final String TABLE_NAME = "table"; |
| private static final String ROW_NAME = "row/a#%b"; // symbols are okay |
| |
| private VContext ctx; |
| private Permissions allowAll; |
| private Endpoint serverEndpoint; |
| |
| @Override |
| protected void setUp() throws Exception { |
| ctx = V.init(); |
| ctx = V.withListenSpec(ctx, V.getListenSpec(ctx).withAddress( |
| new ListenSpec.Address("tcp", "localhost:0"))); |
| AccessList acl = new AccessList( |
| ImmutableList.of(new BlessingPattern("...")), ImmutableList.<String>of()); |
| allowAll = new Permissions(ImmutableMap.of( |
| Constants.READ.getValue(), acl, |
| Constants.WRITE.getValue(), acl, |
| Constants.ADMIN.getValue(), acl)); |
| String tmpDir = Files.createTempDir().getAbsolutePath(); |
| ctx = SyncbaseServer.withNewServer(ctx, new SyncbaseServer.Params() |
| .withPermissions(allowAll) |
| .withStorageRootDir(tmpDir)); |
| Server server = V.getServer(ctx); |
| assertThat(server).isNotNull(); |
| Endpoint[] endpoints = server.getStatus().getEndpoints(); |
| assertThat(endpoints).isNotEmpty(); |
| serverEndpoint = endpoints[0]; |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| ctx.cancel(); |
| } |
| |
| public void testService() throws Exception { |
| SyncbaseService service = createService(); |
| assertThat(service.fullName()).isEqualTo(serverEndpoint.name()); |
| assertThat(sync(service.listApps(ctx))).isEmpty(); |
| } |
| |
| public void testApp() throws Exception { |
| SyncbaseService service = createService(); |
| SyncbaseApp app = service.getApp(APP_NAME); |
| assertThat(app).isNotNull(); |
| assertThat(app.name()).isEqualTo(APP_NAME); |
| assertThat(app.fullName()).isEqualTo( |
| NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME))); |
| assertThat(sync(app.exists(ctx))).isFalse(); |
| assertThat(sync(service.listApps(ctx))).isEmpty(); |
| sync(app.create(ctx, allowAll)); |
| assertThat(sync(app.exists(ctx))).isTrue(); |
| assertThat(sync(service.listApps(ctx))).containsExactly(app.name()); |
| assertThat(sync(app.listDatabases(ctx))).isEmpty(); |
| sync(app.destroy(ctx)); |
| assertThat(sync(app.exists(ctx))).isFalse(); |
| assertThat(sync(service.listApps(ctx))).isEmpty(); |
| } |
| |
| public void testDatabase() throws Exception { |
| SyncbaseApp app = createApp(createService()); |
| assertThat(app).isNotNull(); |
| Database db = app.getNoSqlDatabase("db", null); |
| assertThat(db).isNotNull(); |
| assertThat(db.name()).isEqualTo(DB_NAME); |
| assertThat(db.fullName()).isEqualTo( |
| NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME), DB_NAME)); |
| assertThat(sync(db.exists(ctx))).isFalse(); |
| assertThat(sync(app.listDatabases(ctx))).isEmpty(); |
| sync(db.create(ctx, allowAll)); |
| assertThat(sync(db.exists(ctx))).isTrue(); |
| assertThat(sync(app.listDatabases(ctx))).containsExactly(db.name()); |
| assertThat(sync(db.listTables(ctx))).isEmpty(); |
| sync(db.destroy(ctx)); |
| assertThat(sync(db.exists(ctx))).isFalse(); |
| assertThat(sync(app.listDatabases(ctx))).isEmpty(); |
| } |
| |
| public void testTable() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| assertThat(db).isNotNull(); |
| Table table = db.getTable(TABLE_NAME); |
| assertThat(table).isNotNull(); |
| assertThat(table.name()).isEqualTo(TABLE_NAME); |
| assertThat(table.fullName()).isEqualTo( |
| NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME), DB_NAME, TABLE_NAME)); |
| assertThat(sync(table.exists(ctx))).isFalse(); |
| assertThat(sync(db.listTables(ctx))).isEmpty(); |
| sync(table.create(ctx, allowAll)); |
| assertThat(sync(table.exists(ctx))).isTrue(); |
| assertThat(sync(db.listTables(ctx))).containsExactly(TABLE_NAME); |
| |
| assertThat(sync(table.getRow("row1").exists(ctx))).isFalse(); |
| sync(table.put(ctx, "row1", "value1", String.class)); |
| assertThat(sync(table.getRow("row1").exists(ctx))).isTrue(); |
| assertThat(sync(table.get(ctx, "row1", String.class))).isEqualTo("value1"); |
| sync(table.delete(ctx, "row1")); |
| assertThat(sync(table.getRow("row1").exists(ctx))).isFalse(); |
| sync(table.put(ctx, "row1", "value1", String.class)); |
| sync(table.put(ctx, "row2", "value2", String.class)); |
| assertThat(sync(table.getRow("row1").exists(ctx))).isTrue(); |
| assertThat(sync(table.getRow("row2").exists(ctx))).isTrue(); |
| assertThat(sync(table.get(ctx, "row1", String.class))).isEqualTo("value1"); |
| assertThat(sync(table.get(ctx, "row2", String.class))).isEqualTo("value2"); |
| assertThat(sync(InputChannels.asList( |
| table.scan(ctx, RowRange.range("row1", "row3"))))).containsExactly( |
| new KeyValue("row1", VomUtil.encode("value1", String.class)), |
| new KeyValue("row2", VomUtil.encode("value2", String.class))); |
| sync(table.deleteRange(ctx, RowRange.range("row1", "row3"))); |
| assertThat(sync(table.getRow("row1").exists(ctx))).isFalse(); |
| assertThat(sync(table.getRow("row2").exists(ctx))).isFalse(); |
| |
| sync(table.destroy(ctx)); |
| assertThat(sync(table.exists(ctx))).isFalse(); |
| assertThat(sync(db.listTables(ctx))).isEmpty(); |
| } |
| |
| public void testRow() throws Exception { |
| Table table = createTable(createDatabase(createApp(createService()))); |
| Row row = table.getRow(ROW_NAME); |
| assertThat(row).isNotNull(); |
| assertThat(row.key()).isEqualTo(ROW_NAME); |
| assertThat(row.fullName()).isEqualTo( |
| NamingUtil.join(serverEndpoint.name(), Util.escape(APP_NAME), DB_NAME, TABLE_NAME, |
| Util.escape(ROW_NAME))); |
| assertThat(sync(row.exists(ctx))).isFalse(); |
| sync(row.put(ctx, "value", String.class)); |
| assertThat(sync(row.exists(ctx))).isTrue(); |
| assertThat(sync(row.get(ctx, String.class))).isEqualTo("value"); |
| assertThat(sync(table.get(ctx, ROW_NAME, String.class))).isEqualTo("value"); |
| sync(row.delete(ctx)); |
| assertThat(sync(row.exists(ctx))).isFalse(); |
| sync(table.put(ctx, ROW_NAME, "value", String.class)); |
| assertThat(sync(row.exists(ctx))).isTrue(); |
| assertThat(sync(row.get(ctx, String.class))).isEqualTo("value"); |
| assertThat(sync(table.get(ctx, ROW_NAME, String.class))).isEqualTo("value"); |
| } |
| |
| public void testDatabaseExec() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| Table table = createTable(db); |
| Foo foo = new Foo(4, "f"); |
| Bar bar = new Bar(0.5f, "b"); |
| Baz baz = new Baz("John Doe", true); |
| |
| sync(table.put(ctx, "foo", foo, Foo.class)); |
| sync(table.put(ctx, "bar", bar, Bar.class)); |
| sync(table.put(ctx, "baz", baz, Baz.class)); |
| |
| { |
| DatabaseCore.QueryResults results = sync(db.exec(ctx, |
| "select k, v.Name from " + TABLE_NAME + " where Type(v) like \"%Baz\"")); |
| assertThat(results.columnNames()).containsExactly("k", "v.Name"); |
| assertThat(sync(InputChannels.asList(results))).containsExactly(ImmutableList.of( |
| new VdlAny(String.class, "baz"), new VdlAny(String.class, baz.name))); |
| } |
| { |
| DatabaseCore.QueryResults results = |
| sync(db.exec(ctx, "select k, v from " + TABLE_NAME)); |
| assertThat(results.columnNames()).containsExactly("k", "v"); |
| assertThat(sync(InputChannels.asList(results))).containsExactly( |
| ImmutableList.of(new VdlAny(String.class, "bar"), new VdlAny(Bar.class, bar)), |
| ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)), |
| ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo)) |
| ); |
| } |
| { |
| DatabaseCore.QueryResults results = sync(db.exec(ctx, |
| "select k, v from " + TABLE_NAME + " where k = ? or v.I = ?", |
| Arrays.<Object>asList("baz", 4), |
| Arrays.<Type>asList(String.class, int.class))); |
| assertThat(results.columnNames()).containsExactly("k", "v"); |
| assertThat(sync(InputChannels.asList(results))).containsExactly( |
| ImmutableList.of(new VdlAny(String.class, "baz"), new VdlAny(Baz.class, baz)), |
| ImmutableList.of(new VdlAny(String.class, "foo"), new VdlAny(Foo.class, foo)) |
| ); |
| } |
| } |
| |
| public void testDatabaseWatch() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| Table table = createTable(db); |
| Foo foo = new Foo(4, "f"); |
| Bar bar = new Bar(0.5f, "b"); |
| Baz baz = new Baz("John Doe", true); |
| ResumeMarker marker = sync(db.getResumeMarker(ctx)); |
| |
| sync(table.put(ctx, "foo", foo, Foo.class)); |
| sync(table.put(ctx, "bar", bar, Bar.class)); |
| sync(table.put(ctx, "baz", baz, Baz.class)); |
| sync(table.getRow("baz").delete(ctx)); |
| |
| ImmutableList<WatchChange> expectedChanges = ImmutableList.of( |
| new WatchChange(TABLE_NAME, "bar", ChangeType.PUT_CHANGE, |
| VomUtil.encode(bar, Bar.class), null, false, false), |
| new WatchChange(TABLE_NAME, "baz", ChangeType.PUT_CHANGE, |
| VomUtil.encode(baz, Baz.class), null, false, false), |
| new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE, |
| new byte[0], null, false, false)); |
| VContext ctxC = ctx.withCancel(); |
| Iterator<WatchChange> it = InputChannels.asIterable( |
| db.watch(ctxC, TABLE_NAME, "b", marker)).iterator(); |
| checkWatch(it, expectedChanges); |
| ctxC.cancel(); |
| } |
| |
| public void testDatabaseWatchWithInitialState() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| Table table = createTable(db); |
| Foo foo = new Foo(4, "f"); |
| Bar bar = new Bar(0.5f, "b"); |
| Baz baz = new Baz("John Doe", true); |
| |
| sync(table.put(ctx, "foo", foo, Foo.class)); |
| sync(table.put(ctx, "barfoo", foo, Foo.class)); |
| sync(table.put(ctx, "bar", bar, Bar.class)); |
| |
| VContext ctxC = ctx.withCancel(); |
| Iterator<WatchChange> it = InputChannels.asIterable( |
| db.watch(ctxC, TABLE_NAME, "b")).iterator(); |
| |
| ImmutableList<WatchChange> expectedInitialChanges = ImmutableList.of( |
| new WatchChange(TABLE_NAME, "bar", ChangeType.PUT_CHANGE, |
| VomUtil.encode(bar, Bar.class), null, false, true), |
| new WatchChange(TABLE_NAME, "barfoo", ChangeType.PUT_CHANGE, |
| VomUtil.encode(foo, Foo.class), null, false, false)); |
| checkWatch(it, expectedInitialChanges); |
| |
| sync(table.put(ctx, "baz", baz, Baz.class)); |
| sync(table.getRow("baz").delete(ctx)); |
| |
| ImmutableList<WatchChange> expectedChanges = ImmutableList.of( |
| new WatchChange(TABLE_NAME, "baz", ChangeType.PUT_CHANGE, |
| VomUtil.encode(baz, Baz.class), null, false, false), |
| new WatchChange(TABLE_NAME, "baz", ChangeType.DELETE_CHANGE, |
| new byte[0], null, false, false)); |
| checkWatch(it, expectedChanges); |
| ctxC.cancel(); |
| } |
| |
| public void testDatabaseWatchWithContextCancel() throws Exception { |
| final VContext ctxC = ctx.withCancel(); |
| Database db = createDatabase(createApp(createService())); |
| createTable(db); |
| |
| InputChannel<WatchChange> channel = db.watch( |
| ctxC, TABLE_NAME, "b", sync(db.getResumeMarker(ctx))); |
| new Thread(new Runnable() { |
| @Override |
| public void run() { |
| ctxC.cancel(); |
| } |
| }).start(); |
| try { |
| sync(InputChannels.asList(channel)); |
| } catch (CanceledException e) { |
| // OK |
| } |
| } |
| |
| public void testBatch() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| Table table = createTable(db); |
| assertThat(sync(InputChannels.asList(table.scan(ctx, RowRange.prefix(""))))).isEmpty(); |
| |
| BatchDatabase batchFoo = sync(db.beginBatch(ctx, null)); |
| Table batchFooTable = batchFoo.getTable(TABLE_NAME); |
| assertThat(sync(batchFooTable.exists(ctx))).isTrue(); |
| sync(batchFooTable.put(ctx, ROW_NAME, "foo", String.class)); |
| // Assert that value is visible inside the batch but not outside. |
| assertThat(sync(batchFooTable.get(ctx, ROW_NAME, String.class))).isEqualTo("foo"); |
| assertThat(sync(table.getRow(ROW_NAME).exists(ctx))).isFalse(); |
| |
| BatchDatabase batchBar = sync(db.beginBatch(ctx, null)); |
| Table batchBarTable = batchBar.getTable(TABLE_NAME); |
| assertThat(sync(batchBarTable.exists(ctx))).isTrue(); |
| sync(batchBarTable.put(ctx, ROW_NAME, "foo", String.class)); |
| // Assert that value is visible inside the batch but not outside. |
| assertThat(sync(batchBarTable.get(ctx, ROW_NAME, String.class))).isEqualTo("foo"); |
| assertThat(sync(table.getRow(ROW_NAME).exists(ctx))).isFalse(); |
| |
| sync(batchFoo.commit(ctx)); |
| // Assert that the value is visible outside the batch. |
| assertThat(sync(table.get(ctx, ROW_NAME, String.class))).isEqualTo("foo"); |
| |
| try { |
| sync(batchBar.commit(ctx)); |
| fail("Expected batchBar.commit() to fail"); |
| } catch (VException e) { |
| // ok |
| } |
| } |
| |
| public void testRunInBatch() throws Exception { |
| final Database d = createDatabase(createApp(createService())); |
| Table table = createTable(d); |
| |
| sync(NoSql.runInBatch(ctx, d, new BatchOptions(), new NoSql.BatchOperation() { |
| private int retries = 0; |
| @Override |
| public ListenableFuture<Void> run(BatchDatabase b) { |
| ++retries; |
| String fooKey = String.format("foo-%d", retries); |
| String barKey = String.format("bar-%d", retries); |
| try { |
| // Read foo. It does not exist. |
| try { |
| sync(b.getTable(TABLE_NAME).get(ctx, fooKey, String.class)); |
| throw new VException("Expected b.get() to fail with NoExistException"); |
| } catch (NoExistException e) { |
| // ok |
| } |
| // If we need to fail the commit, write to foo in a separate concurrent batch. |
| if (retries < 2) { |
| sync(d.getTable(TABLE_NAME).put(ctx, fooKey, "foo", String.class)); |
| } |
| // Write to bar. |
| sync(b.getTable(TABLE_NAME).put(ctx, barKey, "bar", String.class)); |
| } catch (VException e) { |
| return Futures.immediateFailedFuture(e); |
| } |
| return Futures.immediateFuture(null); |
| } |
| })); |
| |
| // First try failed (wrote foo), second succeeded (bar commit succeeded). |
| assertThat(sync(InputChannels.asList(table.scan(ctx, RowRange.prefix(""))))) |
| .containsExactly( |
| new KeyValue("bar-2", VomUtil.encode("bar", String.class)), |
| new KeyValue("foo-1", VomUtil.encode("foo", String.class))); |
| } |
| |
| public void testRunInBatchReadOnly() throws Exception { |
| final Database d = createDatabase(createApp(createService())); |
| Table table = createTable(d); |
| sync(table.put(ctx, "foo", "foo", String.class)); |
| |
| sync(NoSql.runInBatch(ctx, d, new BatchOptions("", true), new NoSql.BatchOperation() { |
| @Override |
| public ListenableFuture<Void> run(BatchDatabase b) { |
| try { |
| // Read foo. |
| Object before = sync(b.getTable(TABLE_NAME).get(ctx, "foo", String.class)); |
| // Write to foo in a separate concurrent batch. It should not cause a retry |
| // since readonly batches are not committed. |
| sync(d.getTable(TABLE_NAME).put(ctx, "foo", "oof", String.class)); |
| // Read foo again. Batch should not see the changed value. |
| Object after = sync(b.getTable(TABLE_NAME).get(ctx, "foo", String.class)); |
| if (!before.equals(after)) { |
| throw new VException("batch should not see concurrently changed value"); |
| } |
| // Try writing to bar. This should fail since the batch is readonly. |
| try { |
| sync(b.getTable(TABLE_NAME).put(ctx, "bar", "bar", String.class)); |
| throw new VException("Expected b.put() to fail with ReadOnlyBatchException"); |
| } catch (ReadOnlyBatchException e) { |
| // ok |
| } |
| } catch (VException e) { |
| return Futures.immediateFailedFuture(e); |
| } |
| return Futures.immediateFuture(null); |
| } |
| })); |
| |
| // Single uncommitted iteration. |
| assertThat(sync(InputChannels.asList(table.scan(ctx, RowRange.prefix(""))))) |
| .containsExactly( |
| new KeyValue("foo", VomUtil.encode("oof", String.class))); |
| } |
| |
| public void testSyncgroup() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| String groupName = "test"; |
| |
| // "A" creates the group. |
| SyncgroupSpec spec = new SyncgroupSpec("test", allowAll, |
| ImmutableList.of(new TableRow(TABLE_NAME, "")), |
| ImmutableList.<String>of(), false); |
| SyncgroupMemberInfo memberInfo = new SyncgroupMemberInfo(); |
| memberInfo.setSyncPriority((byte) 1); |
| Syncgroup group = db.getSyncgroup(groupName); |
| { |
| sync(group.create(ctx, spec, memberInfo)); |
| assertThat(sync(db.listSyncgroupNames(ctx))).containsExactly(groupName); |
| assertThat(sync(group.getSpec(ctx)).values()).containsExactly(spec); |
| assertThat(sync(group.getMembers(ctx)).values()).containsExactly(memberInfo); |
| assertThat(sync(group.join(ctx, memberInfo))).isEqualTo(spec); |
| } |
| // TODO(spetrovic): test leave() and destroy(). |
| |
| SyncgroupSpec specRMW = new SyncgroupSpec("testRMW", allowAll, |
| ImmutableList.of(new TableRow(TABLE_NAME, "")), |
| ImmutableList.<String>of(), false); |
| assertThat(sync(group.getSpec(ctx)).keySet()).isNotEmpty(); |
| String version = sync(group.getSpec(ctx)).keySet().iterator().next(); |
| sync(group.setSpec(ctx, specRMW, version)); |
| assertThat(sync(group.getSpec(ctx)).values()).containsExactly(specRMW); |
| SyncgroupSpec specOverwrite = new SyncgroupSpec("testOverwrite", allowAll, |
| ImmutableList.of(new TableRow(TABLE_NAME, "")), |
| ImmutableList.<String>of(), false); |
| sync(group.setSpec(ctx, specOverwrite, "")); |
| assertThat(sync(group.getSpec(ctx)).values()).containsExactly(specOverwrite); |
| } |
| |
| // TODO(spetrovic): Test Database.enforceSchema(). |
| |
| public void testBlobSmall() throws Exception { |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5 }; |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| OutputStream out = writer.stream(ctx); |
| out.write(data); |
| out.close(); |
| assertThat(sync(writer.size(ctx))).isEqualTo(data.length); |
| sync(writer.commit(ctx)); |
| BlobRef ref = writer.getRef(); |
| |
| BlobReader reader = db.readBlob(ctx, ref); |
| byte[] actual = new byte[data.length]; |
| ByteStreams.readFully(reader.stream(ctx, 0), actual); |
| assertThat(actual).isEqualTo(data); |
| } |
| |
| public void testBlobLarge() throws Exception { |
| byte[] data = new byte[1 << 17]; |
| for (int i = 0; i < data.length; ++i) { |
| data[i] = (byte)(i & 0xFF); |
| } |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| OutputStream out = writer.stream(ctx); |
| out.write(data); |
| out.close(); |
| assertThat(sync(writer.size(ctx))).isEqualTo(data.length); |
| sync(writer.commit(ctx)); |
| BlobRef ref = writer.getRef(); |
| |
| BlobReader reader = db.readBlob(ctx, ref); |
| byte[] actual = new byte[data.length]; |
| ByteStreams.readFully(reader.stream(ctx, 0), actual); |
| assertThat(actual).isEqualTo(data); |
| } |
| |
| public void testBlobWriteResume() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| BlobRef ref = writer.getRef(); |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; |
| { |
| // Write, part 1. |
| OutputStream out = writer.stream(ctx); |
| out.write(data, 0, data.length / 2); |
| out.close(); |
| assertThat(sync(writer.size(ctx))).isEqualTo(data.length / 2); |
| } |
| { |
| // Write, part 2. |
| writer = sync(db.writeBlob(ctx, ref)); |
| assertThat(sync(writer.size(ctx))).isEqualTo(5); |
| OutputStream out = writer.stream(ctx); |
| out.write(data, data.length / 2, data.length / 2); |
| out.close(); |
| assertThat(sync(writer.size(ctx))).isEqualTo(data.length); |
| sync(writer.commit(ctx)); |
| } |
| // Read. |
| BlobReader reader = db.readBlob(ctx, ref); |
| byte[] actual = new byte[data.length]; |
| ByteStreams.readFully(reader.stream(ctx, 0), actual); |
| assertThat(actual).isEqualTo(data); |
| } |
| |
| public void testBlobWriteCommitted() throws Exception { |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5 }; |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| BlobRef ref = writer.getRef(); |
| OutputStream out = writer.stream(ctx); |
| out.write(data); |
| out.close(); |
| assertThat(sync(writer.size(ctx))).isEqualTo(data.length); |
| sync(writer.commit(ctx)); |
| |
| try { |
| out = writer.stream(ctx); |
| out.write(data); |
| out.close(); |
| fail("write of a committed blob should fail"); |
| } catch (Exception e) { |
| // OK |
| } |
| try { |
| sync(writer.commit(ctx)); |
| fail("commit of a committed blob should fail"); |
| } catch (VException e) { |
| // OK |
| } |
| |
| BlobReader reader = db.readBlob(ctx, ref); |
| byte[] actual = new byte[data.length]; |
| ByteStreams.readFully(reader.stream(ctx, 0), actual); |
| assertThat(actual).isEqualTo(data); |
| } |
| |
| public void testBlobWriteCancelable() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| VContext ctxC = ctx.withCancel(); |
| BlobWriter writer = sync(db.writeBlob(ctxC, null)); |
| BlobRef ref = writer.getRef(); |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; |
| |
| // Write 1st chunk. |
| OutputStream out = writer.stream(ctxC); |
| out.write(data, 0, data.length / 2); |
| ctxC.cancel(); |
| // Write 2nd chunk. |
| try { |
| out.write(data, data.length / 2, data.length / 2); |
| out.close(); |
| fail("write on a canceled stream should fail"); |
| } catch (IOException e) { |
| // OK |
| } |
| } |
| |
| public void testBlobReadUncommitted() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| BlobRef ref = writer.getRef(); |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5 }; |
| OutputStream out = writer.stream(ctx); |
| out.write(data, 0, data.length); |
| out.close(); |
| |
| BlobReader reader = db.readBlob(ctx, ref); |
| try { |
| byte[] actual = new byte[data.length]; |
| ByteStreams.readFully(reader.stream(ctx, 0), actual); |
| fail("read of an uncommitted blob should fail"); |
| } catch (IOException e) { |
| // OK |
| } |
| try { |
| sync(reader.prefetch(ctx, 0).recv()); |
| } catch (VException e) { |
| // OK |
| } |
| } |
| |
| public void testBlobReadPrefetch() throws Exception { |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| BlobRef ref = writer.getRef(); |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5 }; |
| OutputStream out = writer.stream(ctx); |
| out.write(data, 0, data.length); |
| out.close(); |
| sync(writer.commit(ctx)); |
| |
| // Prefetch |
| BlobReader reader = db.readBlob(ctx, ref); |
| sync(InputChannels.asDone(reader.prefetch(ctx, 0))); |
| // Read |
| byte[] actual = new byte[data.length]; |
| ByteStreams.readFully(reader.stream(ctx, 0), actual); |
| assertThat(actual).isEqualTo(data); |
| } |
| |
| public void testBlobReadClosedStream() throws Exception { |
| byte[] data = new byte[]{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; |
| Database db = createDatabase(createApp(createService())); |
| BlobWriter writer = sync(db.writeBlob(ctx, null)); |
| OutputStream out = writer.stream(ctx); |
| out.write(data); |
| out.close(); |
| assertThat(sync(writer.size(ctx))).isEqualTo(data.length); |
| sync(writer.commit(ctx)); |
| BlobRef ref = writer.getRef(); |
| |
| BlobReader reader = db.readBlob(ctx, ref); |
| byte[] actual = new byte[data.length / 2]; |
| InputStream in = reader.stream(ctx, 0); |
| // Read 1st chunk. |
| ByteStreams.readFully(in, actual); |
| assertThat(actual).isEqualTo(new byte[]{1, 2, 3, 4, 5}); |
| // Close the input stream. |
| in.close(); |
| // Read 2nd chunk. |
| try { |
| ByteStreams.readFully(in, actual); |
| fail("read of a closed stream should fail"); |
| } catch (IOException e) { |
| // OK |
| } |
| } |
| |
| |
| private SyncbaseService createService() throws Exception { |
| return Syncbase.newService(serverEndpoint.name()); |
| } |
| |
| private SyncbaseApp createApp(SyncbaseService service) throws Exception { |
| SyncbaseApp app = service.getApp(APP_NAME); |
| sync(app.create(ctx, allowAll)); |
| return app; |
| } |
| |
| private Database createDatabase(SyncbaseApp app) throws Exception { |
| Database db = app.getNoSqlDatabase(DB_NAME, null); |
| sync(db.create(ctx, allowAll)); |
| return db; |
| } |
| |
| private Table createTable(Database db) throws Exception { |
| Table table = db.getTable(TABLE_NAME); |
| sync(table.create(ctx, allowAll)); |
| return table; |
| } |
| |
| private void checkWatch(Iterator<WatchChange> it, |
| List<WatchChange> expectedChanges) throws Exception { |
| for (WatchChange expected : expectedChanges) { |
| assertThat(it.hasNext()).isTrue(); |
| WatchChange actual = it.next(); |
| assertThat(actual.getTableName()).isEqualTo(expected.getTableName()); |
| assertThat(actual.getRowName()).isEqualTo(expected.getRowName()); |
| assertThat(actual.getChangeType()).isEqualTo(expected.getChangeType()); |
| assertThat(actual.getVomValue()).isEqualTo(expected.getVomValue()); |
| assertThat(actual.isFromSync()).isEqualTo(expected.isFromSync()); |
| assertThat(actual.isContinued()).isEqualTo(expected.isContinued()); |
| } |
| } |
| |
| private static class Foo implements Serializable { |
| private int i; |
| private String s; |
| |
| public Foo() { |
| this.i = 0; |
| this.s = ""; |
| } |
| |
| public Foo(int i, String s) { |
| this.i = i; |
| this.s = s; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| |
| Foo foo = (Foo) o; |
| |
| if (i != foo.i) return false; |
| return !(s != null ? !s.equals(foo.s) : foo.s != null); |
| |
| } |
| } |
| |
| private static class Bar implements Serializable { |
| private float f; |
| private String s; |
| |
| public Bar() { |
| this.f = 0f; |
| this.s = ""; |
| } |
| |
| public Bar(float f, String s) { |
| this.f = f; |
| this.s = s; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| |
| Bar bar = (Bar) o; |
| |
| if (Float.compare(bar.f, f) != 0) return false; |
| return !(s != null ? !s.equals(bar.s) : bar.s != null); |
| |
| } |
| } |
| |
| private static class Baz implements Serializable { |
| private String name; |
| private boolean active; |
| |
| public Baz() { |
| this.name = ""; |
| this.active = false; |
| } |
| |
| public Baz(String name, boolean active) { |
| this.name = name; |
| this.active = active; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| |
| Baz baz = (Baz) o; |
| |
| if (active != baz.active) return false; |
| return !(name != null ? !name.equals(baz.name) : baz.name != null); |
| |
| } |
| } |
| } |