blob: abeb8070f12eb47c0a5a55f88199c5126ea0af63 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package io.v.rx;
import org.joda.time.DateTime;
import org.joda.time.DateTimeConstants;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import io.v.v23.rpc.MountStatus;
import io.v.v23.rpc.Server;
import io.v.v23.rpc.ServerStatus;
import java8.util.function.Predicate;
import lombok.experimental.Accessors;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.ReplaySubject;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Exercises {@code RxNamespace.mount} against a mocked {@link Server} whose
 * {@link Server#getStatus()} result is stubbed per test. Mount events are checked by a
 * {@link TestSubscriber} holding a set of expectations that must all be consumed by the time the
 * test is torn down.
 */
public class RxNamespaceTest extends RxTestCase {
    /**
     * There is a hack in use in {@link RxNamespace} that compares any timestamp against an
     * arbitrary value after 1754, currently 1970. For this test, use 2000 as the time in the past.
     */
    private static final DateTime THE_PAST =
            new DateTime(2000, DateTimeConstants.JANUARY, 1, 0, 0);

    /**
     * A timestamp strictly earlier than {@link #THE_PAST}, used to order mount and unmount times.
     */
    private static final DateTime THE_DISTANT_PAST =
            new DateTime(1990, DateTimeConstants.JANUARY, 1, 0, 0);

    /**
     * How long to sleep when waiting on status polling, derived from the default polling
     * interval. NOTE(review): presumably long enough for at least one poll to have run; confirm
     * against {@code verificationDelay}.
     */
    private static final long STATUS_POLLING_DELAY_MS = verificationDelay(
            RxMountState.DEFAULT_POLLING_INTERVAL);

    /**
     * Subscriber that checks every incoming {@link MountEvent} against the outstanding
     * expectations. Each event must satisfy one expectation, which is then consumed; an event
     * matching none of them is a failure. Errors and unexpected completion are handed to
     * {@code catchAsync}.
     */
    @Accessors(prefix = "m")
    private class TestSubscriber extends Subscriber<MountEvent> {
        /**
         * Whether completion of the observable is currently acceptable. The test thread flips
         * this while the subscription is live, so it is volatile to make the write visible to
         * whichever thread delivers {@link #onCompleted()}.
         */
        public volatile boolean mayComplete;

        /**
         * This is a queue only as an implementation detail; we do not actually verify order here.
         */
        public final Collection<Predicate<MountEvent>> expectedEvents =
                new ConcurrentLinkedQueue<>();

        /**
         * Records an async error if completion arrives while {@link #mayComplete} is false.
         */
        @Override
        public void onCompleted() {
            try {
                assertTrue("Unexpectedly unsubscribed", mayComplete);
            } catch (final Throwable t) {
                catchAsync(t);
            }
        }

        /**
         * Forwards any stream error to {@code catchAsync}.
         */
        @Override
        public void onError(final Throwable t) {
            catchAsync(t);
        }

        /**
         * Consumes the first outstanding expectation that accepts {@code e}; fails if none does.
         */
        @Override
        public void onNext(final MountEvent e) {
            final Iterator<Predicate<MountEvent>> tests = expectedEvents.iterator();
            while (tests.hasNext()) {
                if (tests.next().test(e)) {
                    // Each expectation may be satisfied only once.
                    tests.remove();
                    return;
                }
            }
            // No expectation accepted this event.
            fail("Unexpected mount event: " + e);
        }

        /**
         * Asserts that every registered expectation has been consumed by some event.
         */
        public void assertNoLeftoverExpectations() {
            // It would be nice to print these, but lambdas don't stringize into anything
            // meaningful.
            assertTrue("Leftover expectations", expectedEvents.isEmpty());
        }
    }

    /**
     * Builds a {@link ServerStatus} carrying the given mount statuses; every other constructor
     * argument is left as {@code null} or {@code false}.
     */
    private static ServerStatus mockStatus(final MountStatus... ms) {
        return new ServerStatus(null, false, ms, null, null);
    }

    /**
     * Returns whether {@code e} is a mount event that carries neither a server nor an error.
     */
    private static boolean isInitialMountAttemptStart(final MountEvent e) {
        return e.isMount() && !e.getServer().isPresent() && !e.getError().isPresent();
    }

    private final Server mServer = mock(Server.class);
    private final ReplaySubject<Server> mRxServer = ReplaySubject.create();
    private final TestSubscriber mSubscriber = new TestSubscriber();

    /**
     * Publishes the mocked server on the replay subject before each test.
     */
    @Before
    public void setUp() {
        mRxServer.onNext(mServer);
    }

    /**
     * Verifies after each test that all expected mount events were observed.
     */
    @After
    public void tearDown() throws Exception {
        mSubscriber.assertNoLeftoverExpectations();
    }

    /**
     * Subscribes the shared test subscriber to a shared view of {@code mount}.
     */
    private void attachSubscriber(final Observable<MountEvent> mount) {
        mount.share().subscribe(mSubscriber);
    }

    // NOTE(review): the MountStatus arguments below are presumably
    // (name, server, lastMount, lastMountError, ttl, lastUnmount, lastUnmountError);
    // confirm against the MountStatus constructor.

    /**
     * A status for the requested name whose third timestamp argument is later than its sixth
     * should yield a successful mount event.
     */
    @Test
    public void testAlreadyMounted() {
        when(mServer.getStatus()).thenReturn(mockStatus(new MountStatus("foo", "bar",
                THE_PAST, null, null, THE_DISTANT_PAST, null)));
        mSubscriber.expectedEvents.add(MountEvent::isSuccessfulMount);
        attachSubscriber(RxNamespace.mount(mRxServer, "foo"));
    }

    /**
     * A status that only mentions a different name ("baz") should yield the start of an initial
     * mount attempt for "foo".
     */
    @Test
    public void testExistingOtherNames() {
        when(mServer.getStatus()).thenReturn(mockStatus(new MountStatus("baz", "bar",
                THE_PAST, null, null, THE_DISTANT_PAST, null)));
        mSubscriber.expectedEvents.add(RxNamespaceTest::isInitialMountAttemptStart);
        attachSubscriber(RxNamespace.mount(mRxServer, "foo"));
    }

    /**
     * Stubs a status for "foo" whose sixth timestamp argument is later than its third, expects
     * the start of an initial mount attempt, and subscribes to the mount observable.
     */
    private void mountWithOldStatus() {
        when(mServer.getStatus()).thenReturn(mockStatus(new MountStatus("foo", "bar",
                THE_DISTANT_PAST, null, null, THE_PAST, null)));
        mSubscriber.expectedEvents.add(RxNamespaceTest::isInitialMountAttemptStart);
        attachSubscriber(RxNamespace.mount(mRxServer, "foo"));
    }

    /**
     * With only old timestamps in the status, waiting through the polling delay must not produce
     * any async error (including unexpected events or completion).
     */
    @Test
    public void testMountWithOlderEvents() throws InterruptedException {
        mountWithOldStatus();
        Thread.sleep(STATUS_POLLING_DELAY_MS);
        assertNoAsyncErrors();
    }

    /**
     * Once the stubbed status reports a current timestamp in the sixth argument, the subscriber
     * should become unsubscribed and the server status should no longer be queried.
     */
    @Test
    public void testUnsubscribeOnUnmount() throws InterruptedException {
        mountWithOldStatus();
        Thread.sleep(BLOCKING_DELAY_MS);
        assertNoAsyncErrors();

        // From here on, completion is the expected outcome.
        mSubscriber.mayComplete = true;
        when(mServer.getStatus()).thenReturn(mockStatus(new MountStatus("foo", "bar",
                THE_DISTANT_PAST, null, null, DateTime.now(), null)));
        Thread.sleep(STATUS_POLLING_DELAY_MS);
        assertTrue("Mount should unsubscribe after unmount", mSubscriber.isUnsubscribed());

        when(mServer.getStatus()).then(i -> {
            try {
                fail("Polling should stop after an unmount");
            } catch (final Throwable t) {
                // The subscriber is already unsubscribed at this point, so a failure thrown on
                // whatever thread queries the status may never reach onError. Record it directly
                // so that assertNoAsyncErrors() below can see it, then rethrow as before.
                catchAsync(t);
                throw t;
            }
            return null;
        });
        Thread.sleep(STATUS_POLLING_DELAY_MS);
        assertNoAsyncErrors();
    }
}