blob: b895f9f9dfa5d74fd590852668c9a0d4fbcc4f7a [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.rx.syncbase;
import com.google.common.collect.Iterables;
import org.joda.time.Duration;
import io.v.rx.MountEvent;
import io.v.rx.RxNamespace;
import io.v.v23.context.VContext;
import io.v.v23.rpc.Server;
import io.v.v23.syncbase.Syncbase;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.verror.TimeoutException;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import static net.javacrumbs.futureconverter.guavarx.FutureConverter.toObservable;
/**
* This utility class is a short-term solution until a better solution for distributed syncgroup
* hosting is available.
*/
@Slf4j
@UtilityClass
public class SgHostUtil {
public static final Duration SYNCBASE_PING_TIMEOUT = Duration.standardSeconds(5);
/**
* @return an observable that emits a single boolean indicating whether a Syncbase instance
* mounted at {@code name} is responsive within {@link #SYNCBASE_PING_TIMEOUT}.
*/
public static Observable<Boolean> isSyncbaseOnline(final VContext vContext, final String name) {
final VContext pingContext = vContext.withTimeout(SYNCBASE_PING_TIMEOUT);
/*
It would be nice if there were a more straightforward ping. We can't just query the mount
table because the server might not have shut down cleanly.
TODO(rosswang): I think sadovsky@ has added this, but it doesn't appear exposed yet.
*/
return toObservable(Syncbase.newService(name).getApp("ping").exists(pingContext))
.map(e -> true)
.onErrorResumeNext(t -> t instanceof TimeoutException ?
Observable.just(false) : Observable.error(t));
}
/**
* @return {@code true} iff the mount event represents a successful mount.
*/
private static boolean processMountEvent(final MountEvent e) {
e.getServer().ifPresentOrElse(
s -> e.getError().ifPresentOrElse(
err -> log.error(String.format("Could not %s local Syncbase instance %s " +
"as syncgroup host %s",
e.isMount() ? "mount" : "unmount", s, e.getName()), err),
() -> log.info("{} local Syncbase instance {} as syncgroup host {}",
e.isMount() ? "Mounted" : "Unmounted", s, e.getName())
),
() -> e.getError().ifPresentOrElse(
err -> log.error("Could not mount local Syncbase instance as syncgroup " +
"host " + e.getName(), err),
() -> log.info("Mounting local Syncbase instance as syncgroup host " +
e.getName()))
);
return e.isSuccessfulMount();
}
private static Observable<MountEvent> mountSgHost(final Observable<Server> rxServer,
final String name) {
return RxNamespace.mount(rxServer, name)
.filter(SgHostUtil::processMountEvent)
.retry((i, t) -> {
log.error("Error maintaining mount of local Syncbase instance as " +
"syncgroup host " + name, t);
return t instanceof Exception;
})
.replay().refCount();
}
/**
* @return an observable that emits an item when a Syncbase instance is known to be hosted at
* the given name. The mount is updated for any new server instances until this observable has
* been unsubscribed.
*/
public static Observable<Object> ensureSyncgroupHost(
final VContext vContext, final Observable<Server> rxServer, final String name) {
return rxServer.switchMap(s -> isSyncbaseOnline(vContext, name)
.flatMap(online -> online ? Observable.just(0) : mountSgHost(rxServer, name)));
}
/**
* @return an observable that echos the db after the each db emitted by
* {@link RxDb#getObservable()} has been ensured to possess the given table names. Upon
* subscription, for each db emitted, the observable will create these app/db/table hierarchies
* if not already present.
*/
public static Observable<Database> ensureSyncgroupHierarchies(
final RxDb rxDb, final Iterable<String> tableNames) {
return rxDb.getObservable().switchMap(db -> Observable.merge(Iterables.transform(tableNames,
t -> rxDb.rxTable(t)
.mapFrom(db)
.map(rxt -> db)))
.lastOrDefault(db));
}
}