// blob: 0d3e4eb2a2bebd94c18f1d7a9cf12b8c06591812 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.v23.syncbase;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.v.impl.google.ListenableFutureCallback;
import io.v.impl.google.naming.NamingUtil;
import io.v.v23.InputChannel;
import io.v.v23.InputChannels;
import io.v.v23.VFutures;
import io.v.v23.context.VContext;
import io.v.v23.security.BlessingPattern;
import io.v.v23.security.access.AccessList;
import io.v.v23.security.access.Constants;
import io.v.v23.security.access.Permissions;
import io.v.v23.services.permissions.ObjectClient;
import io.v.v23.services.syncbase.BatchOptions;
import io.v.v23.services.syncbase.BatchHandle;
import io.v.v23.services.syncbase.BlobRef;
import io.v.v23.services.syncbase.CollectionRowPattern;
import io.v.v23.services.syncbase.Id;
import io.v.v23.services.syncbase.DatabaseClient;
import io.v.v23.services.syncbase.DatabaseClientFactory;
import io.v.v23.services.syncbase.SchemaMetadata;
import io.v.v23.services.syncbase.StoreChange;
import io.v.v23.services.watch.Change;
import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.syncbase.util.Util;
import io.v.v23.vdl.ClientRecvStream;
import io.v.v23.vdl.Types;
import io.v.v23.vdl.VdlAny;
import io.v.v23.vdl.VdlOptional;
import io.v.v23.verror.NotImplementedException;
import io.v.v23.verror.VException;
import io.v.v23.vom.VomUtil;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * Implementation of both {@link Database} and {@link BatchDatabase}.
 *
 * <p>All remote operations go through a {@link DatabaseClient} obtained from
 * {@link DatabaseClientFactory} for this database's full name.  The {@code batchHandle}
 * supplied at construction is forwarded to the batch-aware client calls
 * (listing collections, exec, resume marker, commit, abort) and to the
 * {@link Collection} objects created by this instance.
 */
class DatabaseImpl implements Database, BatchDatabase {
    private final String parentFullName;
    private final String fullName;
    private final Id id;
    private final BatchHandle batchHandle;
    private final Schema schema;
    private final DatabaseClient client;

    /**
     * Creates a database handle.
     *
     * @param parentFullName full name of the parent object; joined with the encoded
     *                       {@code id} to form this database's full name
     * @param id             id of this database
     * @param schema         schema whose metadata is sent on {@link #create}; may be
     *                       {@code null}
     * @param batchHandle    handle forwarded to batch-aware client calls
     */
    DatabaseImpl(String parentFullName, Id id, Schema schema, BatchHandle batchHandle) {
        this.parentFullName = parentFullName;
        this.fullName = NamingUtil.join(parentFullName, Util.encodeId(id));
        this.id = id;
        this.batchHandle = batchHandle;
        this.schema = schema;
        this.client = DatabaseClientFactory.getDatabaseClient(this.fullName);
    }

    /**
     * Splits {@code str} at the first occurrence of {@code separator} and returns exactly
     * two elements; a part that is absent is returned as the empty string.
     *
     * <p>Note: this helper is not referenced anywhere else in this class.
     */
    private static List<String> splitInTwo(String str, String separator) {
        Iterator<String> iter = Splitter.on(separator).limit(2).split(str).iterator();
        String first = iter.hasNext() ? iter.next() : "";
        String second = iter.hasNext() ? iter.next() : "";
        return ImmutableList.of(first, second);
    }

    // Implements DatabaseCore interface.

    /** Returns the id this database was constructed with. */
    @Override
    public Id id() {
        return this.id;
    }

    /** Returns the parent's full name joined with the encoded database id. */
    @Override
    public String fullName() {
        return this.fullName;
    }

    /**
     * Returns a handle to the collection called {@code name}, using the blessing that
     * {@link Util#UserBlessingFromContext} derives from {@code ctx} to build the
     * collection id.
     */
    @Override
    public Collection getCollection(VContext ctx, String name) {
        String blessing = Util.UserBlessingFromContext(ctx);
        return getCollection(new Id(blessing, name));
    }

    /**
     * Returns a handle to the collection with the given id.  The handle shares this
     * database's full name and batch handle.
     */
    public Collection getCollection(Id collectionId) {
        return new CollectionImpl(this.fullName, collectionId, this.batchHandle);
    }

    /** Lists the ids of the collections in this database via the database client. */
    @Override
    public ListenableFuture<List<Id>> listCollections(VContext ctx) {
        // See comment in v.io/v23/services/syncbase/nosql/service.vdl for why
        // we can't implement listCollections using Glob (via Util.listChildren).
        return client.listCollections(ctx, this.batchHandle);
    }

    /** Executes {@code query} without any query parameters. */
    @Override
    public ListenableFuture<QueryResults> exec(VContext ctx, String query) {
        return this.execInternal(ctx, query, null);
    }

    /**
     * Executes {@code query} with positional parameters.
     *
     * <p>Each value in {@code paramValues} is converted with the type at the same index in
     * {@code paramTypes}.  If a conversion throws a {@link VException}, no request is sent
     * and a failed future carrying that exception is returned.
     *
     * @throws NullPointerException     if {@code paramValues} or {@code paramTypes} is
     *                                  {@code null}
     * @throws IllegalArgumentException if the two lists differ in length
     */
    @Override
    public ListenableFuture<QueryResults> exec(VContext ctx, String query,
            List<Object> paramValues, List<Type> paramTypes) {
        Preconditions.checkNotNull(paramValues);
        Preconditions.checkNotNull(paramTypes);
        if (paramValues.size() != paramTypes.size()) {
            throw new IllegalArgumentException("Length of paramValues and paramTypes is not equal");
        }
        List<VdlAny> params = new ArrayList<>(paramValues.size());
        try {
            // Walk both lists in lock-step; the sizes were verified to match above.
            Iterator<Object> values = paramValues.iterator();
            Iterator<Type> types = paramTypes.iterator();
            while (values.hasNext() && types.hasNext()) {
                params.add(new VdlAny(VomUtil.valueOf(values.next(), types.next())));
            }
        } catch (VException e) {
            return VFutures.withUserLandChecks(ctx, Futures.<QueryResults>immediateFailedFuture(e));
        }
        return this.execInternal(ctx, query, params);
    }

    // Implements AccessController interface.

    /** Sets the permissions on this database via the database client. */
    @Override
    public ListenableFuture<Void> setPermissions(VContext ctx, Permissions perms, String version) {
        return client.setPermissions(ctx, perms, version);
    }

    /**
     * Fetches the permissions of this database.
     *
     * @return a future holding a single-entry map from the permissions version to the
     *         permissions themselves
     */
    @Override
    public ListenableFuture<Map<String, Permissions>> getPermissions(VContext ctx) {
        ListenableFuture<Map<String, Permissions>> result = Futures.transform(
                client.getPermissions(ctx),
                new Function<ObjectClient.GetPermissionsOut, Map<String, Permissions>>() {
                    @Override
                    public Map<String, Permissions> apply(ObjectClient.GetPermissionsOut out) {
                        return ImmutableMap.of(out.version, out.perms);
                    }
                });
        return VFutures.withUserLandChecks(ctx, result);
    }

    // Implements Database interface.

    /** Asks the database client whether this database exists. */
    @Override
    public ListenableFuture<Boolean> exists(VContext ctx) {
        return client.exists(ctx);
    }

    /**
     * Creates this database.
     *
     * <p>The schema metadata is sent when a schema was supplied at construction; otherwise
     * an optional built from the optional {@link SchemaMetadata} VDL type alone is sent.
     *
     * @param perms permissions for the new database; when {@code null}, the blessing
     *              derived from {@code ctx} is granted the Resolve, Read, Write and Admin
     *              tags
     */
    @Override
    public ListenableFuture<Void> create(VContext ctx, Permissions perms) {
        VdlOptional metadataOpt = schema != null
                ? VdlOptional.of(schema.getMetadata())
                : new VdlOptional<SchemaMetadata>(Types.optionalOf(SchemaMetadata.VDL_TYPE));
        if (perms == null) {
            // Default to giving full permissions to the creator.
            String blessing = Util.UserBlessingFromContext(ctx);
            AccessList acl = new AccessList(
                    ImmutableList.of(new BlessingPattern(blessing)), ImmutableList.<String>of());
            perms = new Permissions(ImmutableMap.of(
                    Constants.RESOLVE.getValue(), acl,
                    Constants.READ.getValue(), acl,
                    Constants.WRITE.getValue(), acl,
                    Constants.ADMIN.getValue(), acl));
        }
        return client.create(ctx, metadataOpt, perms);
    }

    /** Destroys this database via the database client. */
    @Override
    public ListenableFuture<Void> destroy(VContext ctx) {
        return client.destroy(ctx);
    }

    /**
     * Begins a new batch.
     *
     * @return a future holding a new {@link DatabaseImpl} that shares this database's
     *         parent name, id and schema, and carries the batch handle returned by the
     *         client
     */
    public ListenableFuture<BatchDatabase> beginBatch(VContext ctx, BatchOptions opts) {
        ListenableFuture<BatchHandle> batchFuture = client.beginBatch(ctx, opts);
        return VFutures.withUserLandChecks(ctx,
                Futures.transform(batchFuture, new Function<BatchHandle, BatchDatabase>() {
                    @Override
                    public BatchDatabase apply(BatchHandle newBatchHandle) {
                        // The enclosing instance's fields are final, so they can be read
                        // directly when the future completes.
                        return new DatabaseImpl(parentFullName, id, schema, newBatchHandle);
                    }
                }));
    }

    /**
     * Watches for changes matching {@code patterns}, starting from {@code resumeMarker}.
     * Each {@link Change} received from the client is converted to a {@link WatchChange};
     * a change that cannot be converted surfaces as a {@link VException}.
     */
    @Override
    public InputChannel<WatchChange> watch(VContext ctx, ResumeMarker resumeMarker,
            List<CollectionRowPattern> patterns) {
        return InputChannels.transform(ctx, client.watchPatterns(ctx, resumeMarker, patterns),
                new InputChannels.TransformFunction<Change, WatchChange>() {
                    @Override
                    public WatchChange apply(Change change) throws VException {
                        return convertToWatchChange(change);
                    }
                });
    }

    /** Same as {@link #watch(VContext, ResumeMarker, List)} with a {@code null} resume marker. */
    @Override
    public InputChannel<WatchChange> watch(VContext ctx, List<CollectionRowPattern> patterns) {
        return this.watch(ctx, null, patterns);
    }

    /** Fetches the current resume marker via the database client. */
    @Override
    public ListenableFuture<ResumeMarker> getResumeMarker(VContext ctx) {
        return client.getResumeMarker(ctx, this.batchHandle);
    }

    /** Returns a handle to the syncgroup with the given id under this database. */
    @Override
    public Syncgroup getSyncgroup(Id sgId) {
        return new SyncgroupImpl(fullName, sgId);
    }

    /** Lists the ids of the syncgroups of this database via the database client. */
    @Override
    public ListenableFuture<List<Id>> listSyncgroups(VContext ctx) {
        return client.listSyncgroups(ctx);
    }

    /**
     * Returns a writer for a blob.
     *
     * @param ref reference of the blob to write; when {@code null}, a new blob is first
     *            created through the client and its reference is used
     */
    @Override
    public ListenableFuture<BlobWriter> writeBlob(VContext ctx, BlobRef ref) {
        ListenableFuture<BlobRef> refFuture = ref == null
                ? client.createBlob(ctx)
                : Futures.immediateFuture(ref);
        return VFutures.withUserLandChecks(ctx,
                Futures.transform(refFuture, new Function<BlobRef, BlobWriter>() {
                    @Override
                    public BlobWriter apply(BlobRef blobRef) {
                        return new BlobWriterImpl(client, blobRef);
                    }
                }));
    }

    /**
     * Returns a reader for the blob identified by {@code ref}.
     *
     * @throws VException if {@code ref} is {@code null}
     */
    @Override
    public BlobReader readBlob(VContext ctx, BlobRef ref) throws VException {
        if (ref == null) {
            throw new VException("Must pass a non-null blob ref.");
        }
        return new BlobReaderImpl(client, ref);
    }

    /**
     * Not implemented: a {@link NotImplementedException} is reported to the callback
     * whose future is returned.
     */
    @Override
    public ListenableFuture<Void> enforceSchema(final VContext ctx) {
        ListenableFutureCallback<Void> callback = new ListenableFutureCallback<>();
        VException error = new NotImplementedException(ctx);
        callback.onFailure(error);
        return callback.getFuture(ctx);
    }

    // Implements BatchDatabase.

    /** Commits the batch identified by this instance's batch handle. */
    @Override
    public ListenableFuture<Void> commit(VContext ctx) {
        return client.commit(ctx, this.batchHandle);
    }

    /** Aborts the batch identified by this instance's batch handle. */
    @Override
    public ListenableFuture<Void> abort(VContext ctx) {
        return client.abort(ctx, this.batchHandle);
    }

    /**
     * Converts a raw watch {@link Change} into a {@link WatchChange}.
     *
     * <p>The change's name must have the form {@code <encoded collection id>/<row name>};
     * only the first {@code '/'} is treated as the separator.
     *
     * @throws VException if the change's value is not a {@link StoreChange}, its state is
     *                    neither EXISTS nor DOES_NOT_EXIST, or its name contains no
     *                    {@code '/'}
     */
    private WatchChange convertToWatchChange(Change watchChange) throws VException {
        Object value = watchChange.getValue().getElem();
        if (!(value instanceof StoreChange)) {
            throw new VException("Expected watch data to contain StoreChange, instead got: "
                    + value);
        }
        StoreChange storeChange = (StoreChange) value;
        ChangeType changeType;
        switch (watchChange.getState()) {
            case io.v.v23.services.watch.Constants.EXISTS:
                changeType = ChangeType.PUT_CHANGE;
                break;
            case io.v.v23.services.watch.Constants.DOES_NOT_EXIST:
                changeType = ChangeType.DELETE_CHANGE;
                break;
            default:
                throw new VException(
                        "Unsupported watch change state: " + watchChange.getState());
        }
        String[] parts = watchChange.getName().split("/", 2);
        if (parts.length != 2) {
            throw new VException("Invalid collection-row pair: " + watchChange.getName());
        }
        Id collectionId = Util.decodeId(parts[0]);
        String rowName = parts[1];
        return new WatchChange(collectionId, rowName, changeType, storeChange.getValue(),
                watchChange.getResumeMarker(), storeChange.getFromSync(),
                watchChange.getContinued());
    }

    /**
     * Sends {@code query} to the client and wraps the resulting stream.
     *
     * <p>The first element received from the stream is interpreted as the list of column
     * names; the remaining elements are exposed through the returned
     * {@link QueryResults}.
     *
     * @param params query parameters; {@code null} when the caller supplied none
     */
    private ListenableFuture<QueryResults> execInternal(VContext ctx, String query, List<VdlAny> params) {
        final ClientRecvStream<List<VdlAny>, Void> stream =
                client.exec(ctx, this.batchHandle, query, params);
        // An AsyncFunction is used because constructing QueryResultsImpl may throw the
        // checked VException, which a plain Function cannot propagate.
        return VFutures.withUserLandChecks(ctx, Futures.transform(stream.recv(),
                new AsyncFunction<List<VdlAny>, QueryResults>() {
                    @Override
                    public ListenableFuture<QueryResults> apply(List<VdlAny> columnNames)
                            throws Exception {
                        return Futures.immediateFuture(
                                (QueryResults) new QueryResultsImpl(columnNames, stream));
                    }
                }));
    }

    /**
     * {@link QueryResults} backed by an input channel of rows plus the column names taken
     * from the first row of the exec stream.
     */
    private static class QueryResultsImpl implements QueryResults {
        private final InputChannel<List<VdlAny>> input;
        private final List<String> columnNames;

        /**
         * @param columnNames first row of the exec stream; every element must wrap a
         *                    {@link String}
         * @param input       channel yielding the remaining rows
         * @throws VException if any element of {@code columnNames} does not wrap a
         *                    {@link String} (including when it wraps {@code null})
         */
        private QueryResultsImpl(List<VdlAny> columnNames,
                InputChannel<List<VdlAny>> input) throws VException {
            this.input = input;
            ImmutableList.Builder<String> names = ImmutableList.builder();
            for (VdlAny column : columnNames) {
                Object elem = column.getElem();
                if (!(elem instanceof String)) {
                    // instanceof is false for null, so guard getClass() to report the
                    // problem as a VException instead of a NullPointerException.
                    String actualType = elem == null ? "null" : elem.getClass().toString();
                    throw new VException("Expected first row in exec() stream to contain column " +
                            "names (of type String), got type: " + actualType);
                }
                names.add((String) elem);
            }
            this.columnNames = names.build();
        }

        /** Receives the next result row from the underlying channel. */
        @Override
        public ListenableFuture<List<VdlAny>> recv() {
            return input.recv();
        }

        /** Returns the column names as an immutable list. */
        @Override
        public List<String> columnNames() {
            return columnNames;
        }
    }
}