blob: e09b0a5d1e072a7c988fbf0065a97dd58827ed69 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.baku.toolkit.syncbase;
import io.v.baku.toolkit.BakuActivityTrait;
import io.v.rx.syncbase.RangeWatchBatch;
import io.v.rx.syncbase.RxTable;
import io.v.rx.syncbase.SingleWatchEvent;
import io.v.v23.syncbase.nosql.PrefixRange;
import io.v.v23.syncbase.nosql.Table;
import lombok.Getter;
import lombok.experimental.Accessors;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func1;
@Accessors(prefix = "m")
public class BakuTable extends RxTable {
private final BakuActivityTrait<?> mBakuContext;
public BakuTable(final RxTable t, final BakuActivityTrait bakuContext) {
mBakuContext = bakuContext;
private <T> Observable<T> withActivityLifecycle(final Observable<T> o) {
return o.takeUntil(x -> mBakuContext.getSubscriptions().isUnsubscribed());
public <T> Observable<RangeWatchBatch<T>> watch(
final PrefixRange prefix, final @Nullable Func1<String, Boolean> keyFilter,
final Class<T> type) {
return withActivityLifecycle(, keyFilter, type));
public <T> Observable<SingleWatchEvent<T>> watch(
final String key, final Class<T> type, final T defaultValue) {
return withActivityLifecycle(, type, defaultValue));
* While the default implementation of {@link RxTable#exec(Func1)} provides an autoConnect
* observable that requires an explicit subscription, this implementation auto-subscribes the
* default Baku error handler and immediately begins the requested operation. An explicit
* subscription unsubscribes the default error handler, and should include its own error
* handler, which may still be the default error handler.
public <T> Observable<T> exec(final Func1<Table, ListenableFuture<T>> op) {
final Observable<T> exec = super.exec(op);
final Subscription defaultSubscription = exec.subscribe(x -> {
}, mBakuContext::onSyncError);
// TODO(rosswang): This is a bit fragile, as it can result in the default onSyncError being
// called in (edge edge) cases where the async completes before ... wait a second, does the
// above have to observeOn the main thread?
return withActivityLifecycle(exec.doOnSubscribe(defaultSubscription::unsubscribe));