// blob: 87626ba8d7f9b5186c55ca33445a244e6d1d4e61 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.rx;
import org.robotninjas.concurrent.FluentFutures;
import io.v.v23.InputChannel;
import io.v.v23.verror.EndOfFileException;
import lombok.experimental.UtilityClass;
import rx.Observable;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
public class RxInputChannel {
/**
 * Wraps an {@link io.v.v23.InputChannel} in a connectable observable that produces the same
 * elements.
 */
public static <T> ConnectableObservable<T> wrap(final InputChannel<T> i) {
// Each subscriber s handed to the create() callback is forwarded to connect(i, s);
// publish() converts the resulting observable into the ConnectableObservable returned here.
// NOTE(review): with publish(), reading from the channel presumably starts only once the
// caller invokes connect() on the returned observable — confirm against the RxJava docs.
return Observable.<T>create(s -> connect(i, s)).publish();
private static <T> void connect(final InputChannel<T> i, final Subscriber<? super T> s) {
.onSuccess(r -> {
connect(i, s);
.onFailure(t -> {
if (t instanceof EndOfFileException) {
} else {