blob: a669fd794378d757a9c58da4952bdbd17cdbbb1e [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.syncbase;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import io.v.v23.InputChannel;
import io.v.v23.InputChannelCallback;
import io.v.v23.InputChannels;
import io.v.v23.VFutures;
import io.v.v23.services.syncbase.CollectionRowPattern;
import io.v.v23.services.syncbase.SyncgroupSpec;
import io.v.v23.syncbase.Batch;
import io.v.v23.verror.ExistException;
import io.v.v23.verror.VException;
/**
* A set of collections and syncgroups.
* To get a Database handle, call {@code Syncbase.database}.
*/
public class Database extends DatabaseHandle {
private final io.v.v23.syncbase.Database mVDatabase;
private final Object mSyncgroupInviteHandlersMu = new Object();
private final Object mWatchChangeHandlersMu = new Object();
private Map<SyncgroupInviteHandler, Runnable> mSyncgroupInviteHandlers = new HashMap<>();
private Map<WatchChangeHandler, Runnable> mWatchChangeHandlers = new HashMap<>();
protected void createIfMissing() {
try {
VFutures.sync(mVDatabase.create(Syncbase.getVContext(), Syncbase.defaultPerms()));
} catch (ExistException e) {
// Database already exists, presumably from a previous run of the app.
} catch (VException e) {
throw new RuntimeException("Failed to create database", e);
}
}
protected Database(io.v.v23.syncbase.Database vDatabase) {
super(vDatabase);
mVDatabase = vDatabase;
}
@Override
public Collection collection(String name, CollectionOptions opts) {
Collection res = getCollection(new Id(Syncbase.getPersonalBlessingString(), name));
res.createIfMissing();
// TODO(sadovsky): Unwind collection creation on syncgroup creation failure? It would be
// nice if we could create the collection and syncgroup in a batch.
if (!opts.withoutSyncgroup) {
syncgroup(name, ImmutableList.of(res), new SyncgroupOptions());
}
return res;
}
/**
* FOR ADVANCED USERS. Options for syncgroup creation.
*/
public static class SyncgroupOptions {
// TODO(sadovsky): Fill this in.
}
/**
* FOR ADVANCED USERS. Creates syncgroup and adds it to the user's "userdata" collection, as
* needed. Idempotent. The id of the new syncgroup will include the creator's user id and the
* given syncgroup name. Requires that all collections were created by the current user.
*
* @param name name of the syncgroup
* @param collections collections in the syncgroup
* @param opts options for syncgroup creation
* @return the syncgroup
*/
public Syncgroup syncgroup(String name, List<Collection> collections, SyncgroupOptions opts) {
if (collections.isEmpty()) {
throw new RuntimeException("No collections specified");
}
Id id = new Id(collections.get(0).getId().getBlessing(), name);
for (Collection cx : collections) {
if (!cx.getId().getBlessing().equals(id.getBlessing())) {
throw new RuntimeException("Collections must all have the same creator");
}
}
Syncgroup res = new Syncgroup(mVDatabase.getSyncgroup(id.toVId()), this, id);
res.createIfMissing(collections);
return res;
}
/**
* Calls {@code syncgroup(name, collections, opts)} with default {@code SyncgroupOptions}.
*/
public Syncgroup syncgroup(String name, List<Collection> collections) {
return syncgroup(name, collections, new SyncgroupOptions());
}
/**
* Returns the syncgroup with the given id.
*/
public Syncgroup getSyncgroup(Id id) {
// TODO(sadovsky): Consider throwing an exception or returning null if the syncgroup does
// not exist. But note, a syncgroup can get destroyed via sync after a client obtains a
// handle for it, so perhaps we should instead add an 'exists' method.
return new Syncgroup(mVDatabase.getSyncgroup(id.toVId()), this, id);
}
/**
* Returns an iterator over all syncgroups in the database.
*/
public Iterator<Syncgroup> getSyncgroups() {
List<io.v.v23.services.syncbase.Id> vIds;
try {
vIds = VFutures.sync(mVDatabase.listSyncgroups(Syncbase.getVContext()));
} catch (VException e) {
throw new RuntimeException("listSyncgroups failed", e);
}
ArrayList<Syncgroup> sgs = new ArrayList<>(vIds.size());
for (io.v.v23.services.syncbase.Id vId : vIds) {
sgs.add(new Syncgroup(mVDatabase.getSyncgroup(vId), this, new Id(vId)));
}
return sgs.iterator();
}
/**
* Options for {@code addSyncgroupInviteHandler}.
*/
public static class AddSyncgroupInviteHandlerOptions {
// TODO(sadovsky): Fill this in.
}
/**
* Handles discovered syncgroup invites.
*/
public static abstract class SyncgroupInviteHandler {
/**
* Called when a syncgroup invitation is discovered. Clients typically handle invites by
* calling {@code acceptSyncgroupInvite} or {@code ignoreSyncgroupInvite}.
*/
public void onInvite(SyncgroupInvite invite) {
}
/**
* Called when an error occurs while scanning for syncgroup invitations. Once
* {@code onError} is called, no other methods will be called on this handler.
*/
public void onError(Throwable e) {
throw new RuntimeException(e);
}
}
// TODO(sadovsky): Document which thread the handler methods are called on.
/**
* Notifies {@code h} of any existing syncgroup invites, and of all subsequent new invites.
*/
public void addSyncgroupInviteHandler(SyncgroupInviteHandler h, AddSyncgroupInviteHandlerOptions opts) {
synchronized (mSyncgroupInviteHandlersMu) {
throw new RuntimeException("Not implemented");
}
}
/**
* Calls {@code addSyncgroupInviteHandler(h, opts)} with default
* {@code AddSyncgroupInviteHandlerOptions}.
*/
public void addSyncgroupInviteHandler(SyncgroupInviteHandler h) {
addSyncgroupInviteHandler(h, new AddSyncgroupInviteHandlerOptions());
}
/**
* Makes it so {@code h} stops receiving notifications.
*/
public void removeSyncgroupInviteHandler(SyncgroupInviteHandler h) {
synchronized (mSyncgroupInviteHandlersMu) {
Runnable cancel = mSyncgroupInviteHandlers.remove(h);
if (cancel != null) {
cancel.run();
}
}
}
/**
* Makes it so all syncgroup invite handlers stop receiving notifications.
*/
public void removeAllSyncgroupInviteHandlers() {
synchronized (mSyncgroupInviteHandlersMu) {
for (Runnable cancel : mSyncgroupInviteHandlers.values()) {
cancel.run();
}
mSyncgroupInviteHandlers.clear();
}
}
public static abstract class AcceptSyncgroupInviteCallback {
public void onSuccess(Syncgroup sg) {
}
public void onFailure(Throwable e) {
throw new RuntimeException(e);
}
}
/**
* Joins the syncgroup associated with the given invite and adds it to the user's "userdata"
* collection, as needed. The passed callback is called on the current thread.
*
* @param invite the syncgroup invite
* @param cb the callback to call with the syncgroup handle
*/
public void acceptSyncgroupInvite(SyncgroupInvite invite, final AcceptSyncgroupInviteCallback cb) {
// TODO(sadovsky): Should we add "accept" and "ignore" methods to the SyncgroupInvite class,
// or should we treat it as a POJO (with no reference to Database)?
io.v.v23.syncbase.Syncgroup vSyncgroup = mVDatabase.getSyncgroup(invite.getId().toVId());
final Syncgroup syncgroup = new Syncgroup(vSyncgroup, this, invite.getId());
ListenableFuture<SyncgroupSpec> future = vSyncgroup.join(Syncbase.getVContext(), invite.getRemoteSyncbaseName(), invite.getExpectedSyncbaseBlessings(), Syncgroup.newSyncgroupMemberInfo());
Futures.addCallback(future, new FutureCallback<SyncgroupSpec>() {
@Override
public void onSuccess(@Nullable SyncgroupSpec result) {
cb.onSuccess(syncgroup);
}
@Override
public void onFailure(Throwable e) {
cb.onFailure(e);
}
});
}
/**
* Records that the user has ignored this invite, such that it's never surfaced again.
*
* @param invite the syncgroup invite
*/
public void ignoreSyncgroupInvite(SyncgroupInvite invite) {
// Note: This will be one of the last things we implement.
// TODO(sadovsky): Maybe document how to read/write rejection metadata in the userdata
// collection, for advanced users.
throw new RuntimeException("Not implemented");
}
/**
* Options for a batch.
*/
public static class BatchOptions {
public boolean readOnly;
protected io.v.v23.services.syncbase.BatchOptions toVBatchOptions() {
io.v.v23.services.syncbase.BatchOptions res = new io.v.v23.services.syncbase.BatchOptions();
res.setReadOnly(true);
return res;
}
}
/**
* Designed for use in {@code runInBatch}.
*/
public interface BatchOperation {
void run(BatchDatabase db);
}
/**
* Runs the given operation in a batch, managing retries and commit/abort. Writable batches are
* committed, retrying if commit fails due to a concurrent batch. Read-only batches are aborted.
*
* @param op the operation to run
* @param opts options for this batch
*/
public void runInBatch(final BatchOperation op, BatchOptions opts) {
ListenableFuture<Void> future = Batch.runInBatch(Syncbase.getVContext(), mVDatabase, opts.toVBatchOptions(), new Batch.BatchOperation() {
@Override
public ListenableFuture<Void> run(io.v.v23.syncbase.BatchDatabase vBatchDatabase) {
final SettableFuture<Void> res = SettableFuture.create();
try {
op.run(new BatchDatabase(vBatchDatabase));
res.set(null);
} catch (Exception e) {
res.setException(e);
}
return res;
}
});
try {
VFutures.sync(future);
} catch (VException e) {
throw new RuntimeException("runInBatch failed", e);
}
}
/**
* Creates a new batch. Instead of calling this function directly, clients are encouraged to use
* the {@code runInBatch} helper function, which detects "concurrent batch" errors and handles
* retries internally.
* <p>
* Default concurrency semantics:
* <ul>
* <li>Reads (e.g. gets, scans) inside a batch operate over a consistent snapshot taken during
* {@code beginBatch}, and will see the effects of prior writes performed inside the batch.</li>
* <li>{@code commit} may fail with {@code ConcurrentBatchException}, indicating that after
* {@code beginBatch} but before {@code commit}, some concurrent routine wrote to a key that
* matches a key or row-range read inside this batch.</li>
* <li>Other methods will never fail with error {@code ConcurrentBatchException}, even if it is
* known that {@code commit} will fail with this error.</li>
* </ul>
* <p>
* Once a batch has been committed or aborted, subsequent method calls will fail with no
* effect.
* <p>
* Concurrency semantics can be configured using BatchOptions.
*
* @param opts options for this batch
* @return the batch handle
*/
public BatchDatabase beginBatch(BatchOptions opts) {
io.v.v23.syncbase.BatchDatabase vBatchDatabase;
try {
vBatchDatabase = VFutures.sync(mVDatabase.beginBatch(Syncbase.getVContext(), opts.toVBatchOptions()));
} catch (VException e) {
throw new RuntimeException("beginBatch failed", e);
}
return new BatchDatabase(vBatchDatabase);
}
/**
* Options for {@code addWatchChangeHandler}.
*/
public static class AddWatchChangeHandlerOptions {
public byte[] resumeMarker;
}
/**
* Handles observed changes to the database.
*/
public static abstract class WatchChangeHandler {
// TODO(sadovsky): Consider adopting Aaron's suggestion of combining onInitialState and
// onChangeBatch into a single method, to make things simpler for developers who don't want
// to apply deltas to their in-memory data structures:
// void onChangeBatch(Iterator<WatchChange> values, Iterator<WatchChange> changes)
/**
* Called once, when a watch change handler is added, to provide the initial state of the
* values being watched.
*/
public void onInitialState(Iterator<WatchChange> values) {
}
/**
* Called whenever a batch of changes is committed to the database. Individual puts/deletes
* surface as a single-change batch.
*/
public void onChangeBatch(Iterator<WatchChange> changes) {
}
/**
* Called when an error occurs while watching for changes. Once {@code onError} is called,
* no other methods will be called on this handler.
*/
public void onError(Throwable e) {
throw new RuntimeException(e);
}
}
// TODO(sadovsky): Document which thread the handler methods are called on.
/**
* Notifies {@code h} of initial state, and of all subsequent changes to this database.
*/
public void addWatchChangeHandler(final WatchChangeHandler h, AddWatchChangeHandlerOptions opts) {
// Note: Eventually we'll add a watch variant that takes a query, where the query can be
// constructed using some sort of query builder API.
// TODO(sadovsky): Support specifying resumeMarker. Note, watch-from-resumeMarker may be
// problematic in that we don't track the governing ACL for changes in the watch log.
if (opts.resumeMarker != null && opts.resumeMarker.length != 0) {
throw new RuntimeException("Specifying resumeMarker is not yet supported");
}
InputChannel<io.v.v23.syncbase.WatchChange> ic = mVDatabase.watch(Syncbase.getVContext(), ImmutableList.of(new CollectionRowPattern("%", "%", "%")));
ListenableFuture<Void> future = InputChannels.withCallback(ic, new InputChannelCallback<io.v.v23.syncbase.WatchChange>() {
private boolean mGotFirstBatch = false;
private List<WatchChange> mBatch = new ArrayList<>();
@Override
public ListenableFuture<Void> onNext(io.v.v23.syncbase.WatchChange vChange) {
WatchChange change = new WatchChange(vChange);
// Ignore changes to userdata collection.
if (change.getCollectionId().getName().equals(Syncbase.USERDATA_SYNCGROUP_NAME)) {
return null;
}
mBatch.add(change);
if (!change.isContinued()) {
if (!mGotFirstBatch) {
mGotFirstBatch = true;
h.onInitialState(mBatch.iterator());
} else {
h.onChangeBatch(mBatch.iterator());
}
mBatch.clear();
}
return null;
}
});
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
}
@Override
public void onFailure(Throwable e) {
// TODO(sadovsky): Make sure cancellations are surfaced as such (or ignored).
h.onError(e);
}
});
synchronized (mWatchChangeHandlersMu) {
mWatchChangeHandlers.put(h, new Runnable() {
@Override
public void run() {
throw new RuntimeException("Not implemented");
}
});
}
}
/**
* Calls {@code addWatchChangeHandler(h, opts)} with default
* {@code AddWatchChangeHandlerOptions}.
*/
public void addWatchChangeHandler(WatchChangeHandler h) {
addWatchChangeHandler(h, new AddWatchChangeHandlerOptions());
}
/**
* Makes it so {@code h} stops receiving notifications.
*/
public void removeWatchChangeHandler(WatchChangeHandler h) {
synchronized (mWatchChangeHandlersMu) {
Runnable cancel = mWatchChangeHandlers.remove(h);
if (cancel != null) {
cancel.run();
}
}
}
/**
* Makes it so all watch change handlers stop receiving notifications.
*/
public void removeAllWatchChangeHandlers() {
synchronized (mWatchChangeHandlersMu) {
for (Runnable cancel : mWatchChangeHandlers.values()) {
cancel.run();
}
mWatchChangeHandlers.clear();
}
}
}