blob: b6e256ba05941395da0969350225e949919f5964 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.rx;
import io.v.v23.VIterable;
import io.v.v23.verror.VException;
import lombok.experimental.UtilityClass;
import rx.Observable;
@UtilityClass
public class RxVIterable {
/**
* Wraps a {@link VIterable} in an observable that produces the same elements and checks for
* an error from the {@code VIterable} at the end if present. This is a thin wrapper that does
* not ensure that the {@code VIterable} is only iterated over once, so the returned
* {@link Observable} should be subscribed to at most once. If multiple subscriptions are
* needed, consider a connectable observable operator, such as {@link Observable#publish()},
* {@link Observable#replay()}, {@link Observable#share()}, or {@link Observable#cache()}.
* However, if using a replay/cache, be cognizant of buffer growth.
*
* @return an observable wrapping the {@link VIterable}. This observable should only be
* subscribed to once, as we can only iterate over the underlying stream once.
*/
public static <T> Observable<T> wrap(final VIterable<T> vi) {
return Observable.from(vi)
.concatWith(Observable.defer(() -> {
final VException e = vi.error();
return e == null ? Observable.empty() : Observable.error(e);
}));
}
}