blob: 60aa58bb7ffdb5ad73d1843d92ad0f592a45f335 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.rx.syncbase;
import io.v.rx.RxVIterable;
import io.v.rx.VFn;
import io.v.v23.VIterable;
import io.v.v23.context.CancelableVContext;
import io.v.v23.context.VContext;
import io.v.v23.services.syncbase.nosql.BatchOptions;
import io.v.v23.services.watch.ResumeMarker;
import io.v.v23.syncbase.nosql.BatchDatabase;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.DatabaseCore;
import io.v.v23.syncbase.nosql.Table;
import io.v.v23.syncbase.nosql.WatchChange;
import io.v.v23.verror.NoExistException;
import io.v.v23.verror.VException;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
@Accessors(prefix = "m")
@Getter
@Slf4j
public class RxTable extends RxEntity<Table, DatabaseCore> {
private final VContext mVContext;
private final String mName;
private final RxDb mRxDb;
private final Observable<Table> mObservable;
public RxTable(final String name, final RxDb rxDb) {
mVContext = rxDb.getVContext();
mName = name;
mRxDb = rxDb;
mObservable = rxDb.getObservable().map(VFn.unchecked(this::mapFrom));
}
@Override
public Table mapFrom(final DatabaseCore db) throws VException {
final Table t = db.getTable(mName);
SyncbaseEntity.compose(t::exists, t::create).ensureExists(mVContext, null);
return t;
}
private <T> T getInitial(final BatchDatabase db, final String tableName, final String key,
final Class<T> type, final T defaultValue) throws VException {
try {
@SuppressWarnings("unchecked")
final T fromGet = (T) db.getTable(tableName).get(
mVContext, key, type);
return fromGet;
} catch (final NoExistException e) {
return defaultValue;
}
}
/**
* Wraps a prefix watch stream in a key-specific observable. It remains to be seen whether it
* will be better to feature-request an exact-match watch API from Syncbase or consolidate all
* watches into one stream. Exact-match presents a cleaner API boundary but results in more
* underlying streams, whereas consolidating at the library level will usually be more efficient
* unless large portions of data won't need to be watched, and also it opens up questions of
* whether we should computationally optimize the prefix query.
*
* @return an observable wrapping the watch stream. This observable should only be subscribed to
* once, as we can only iterate over the underlying stream once.
*/
private static <T> Observable<WatchEvent<T>> observeWatchStream(
final VIterable<WatchChange> s, final String key, final T defaultValue) {
return RxVIterable.wrap(s)
.filter(c -> c.getRowName().equals(key))
/*
About the Vfn.unchecked, on error, the wrapping replay will disconnect, calling
cancellation (see cancelOnDisconnect). The possible source of VException here is VOM
decoding.
*/
.map(VFn.unchecked(c -> {
return WatchEvent.fromWatchChange(c, defaultValue);
}))
.distinctUntilChanged();
}
private void cancelContextOnDisconnect(final Subscriber<?> subscriber,
final CancelableVContext cancelable,
final String key) {
subscriber.add(Subscriptions.create(() -> {
log.debug("Cancelling watch on {}: {}", mName, key);
cancelable.cancel();
}));
}
private <T> void subscribeWatch(final Subscriber<? super WatchEvent<T>> subscriber,
final Database db, final String key, final Class<T> type,
final T defaultValue) throws VException {
/*
Watch will not work properly unless the table exists (sync will not create the table),
and table creation must happen outside the batch.
https://github.com/vanadium/issues/issues/857
*/
mapFrom(db);
final BatchDatabase batch = db.beginBatch(mVContext, new BatchOptions("", true));
final T initial = getInitial(batch, mName, key, type, defaultValue);
final ResumeMarker r = batch.getResumeMarker(mVContext);
subscriber.onNext(new WatchEvent<>(initial, r, false));
batch.abort(mVContext);
final CancelableVContext cancelable = mVContext.withCancel();
final VIterable<WatchChange> s = db.watch(cancelable, mName, key, r);
log.debug("Watching {}: {}", mName, key);
cancelContextOnDisconnect(subscriber, cancelable, key);
observeWatchStream(s, key, defaultValue).subscribe(subscriber);
}
/**
* Watches a specific Syncbase row for changes.
*
* TODO(rosswang): Cache this by args.
*/
public <T> Observable<WatchEvent<T>> watch(final String key, final Class<T> type,
final T defaultValue) {
return Observable.<WatchEvent<T>>create(subscriber -> mRxDb.getObservable()
.observeOn(Schedulers.io())
.subscribe(
VFn.unchecked(db -> {
/*
Could be an expression lambda, but that confuses both RetroLambda and
AndroidStudio.
*/
subscribeWatch(subscriber, db, key, type, defaultValue);
}),
subscriber::onError
//onComplete is connected by subscribeWatch/observeWatchStream.subscribe
))
/*
Don't create new watch streams for subsequent subscribers, but do cancel the stream
if no subscribers are listening (and restart if new subscriptions happen).
*/
.replay(1)
.refCount();
}
}