blob: 8b7aa8d0502d28cdb168a457072a2b7e2314b42c [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.rx;
import org.joda.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.v.v23.rpc.PublisherEntry;
import io.v.v23.rpc.PublisherEntryKey;
import io.v.v23.rpc.PublisherEntryValue;
import io.v.v23.rpc.Server;
import java8.util.J8Arrays;
import java8.util.stream.Collectors;
import java8.util.stream.Stream;
import lombok.experimental.UtilityClass;
import rx.Observable;
@UtilityClass
public class RxPublisherState {
/**
* Time interval for mount status polling.
*/
public static final Duration DEFAULT_POLLING_INTERVAL = Duration.standardSeconds(1);
public static Observable<Stream<PublisherEntry>> poll(final Server s, final Duration interval) {
return Observable.interval(0, interval.getMillis(), TimeUnit.MILLISECONDS)
.map(i -> J8Arrays.stream(s.getStatus().getPublisherStatus()));
}
public static Observable<Stream<PublisherEntry>> poll(final Server s) {
return poll(s, DEFAULT_POLLING_INTERVAL);
}
public static Map<PublisherEntryKey, PublisherEntryValue> index(
final Stream<PublisherEntry> state) {
return state.collect(Collectors.toMap(
PublisherEntryKey::fromPublisherEntry,
PublisherEntryValue::fromPublisherEntry));
}
public static Map<PublisherEntryKey, PublisherEntryValue> index(
final PublisherEntry[] state) {
return index(J8Arrays.stream(state));
}
public static Observable<Map<PublisherEntryKey, PublisherEntryValue>> index(
final Observable<Stream<PublisherEntry>> rx) {
return rx.map(RxPublisherState::index)
.distinctUntilChanged();
}
}