blob: 565f7d528bc846c22c0caebfca4ace3639feeb72 [file] [log] [blame]
package io.v.rx.syncbase;// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
import com.google.common.base.Stopwatch;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.v.baku.toolkit.BlessingsUtils;
import io.v.debug.SyncbaseClient;
import io.v.rx.RxMountState;
import io.v.rx.RxTestCase;
import io.v.v23.context.VContext;
import io.v.v23.rpc.MountStatus;
import io.v.v23.rpc.Server;
import io.v.v23.rpc.ServerStatus;
import io.v.v23.security.Blessings;
import io.v.v23.syncbase.SyncbaseApp;
import io.v.v23.syncbase.SyncbaseService;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.Syncgroup;
import java8.util.stream.RefStreams;
import rx.Subscription;
import rx.subjects.ReplaySubject;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@SuppressStaticInitializationFor("io.v.baku.toolkit.BlessingsUtils")
@PrepareForTest({BlessingsUtils.class, SgHostUtil.class})
public class GlobalUserSyncgroupTest extends RxTestCase {
private static final long STATUS_POLLING_DELAY_MS = verificationDelay(
RxMountState.DEFAULT_POLLING_INTERVAL);
private final VContext mVContext = mock(VContext.class);
private final ReplaySubject<Blessings> mRxBlessings = ReplaySubject.create();
private final ReplaySubject<Server> mRxServer = ReplaySubject.create();
private final ReplaySubject<SyncbaseService> mRxClient = ReplaySubject.create();
private final Server mServer = mock(Server.class);
private final SyncbaseService mClient = mock(SyncbaseService.class);
private final Database mDb = mock(Database.class);
private final Syncgroup mSg = mock(Syncgroup.class);
private RxSyncbase mSb;
@Before
public void setUp() throws Exception {
final SyncbaseClient sbClient = mock(SyncbaseClient.class);
final SyncbaseApp app = mock(SyncbaseApp.class);
when(sbClient.getRxServer()).thenReturn(mRxServer);
when(sbClient.getRxClient()).thenReturn(mRxClient);
mRxServer.onNext(mServer);
mRxClient.onNext(mClient);
when(mClient.getApp("app")).thenReturn(app);
when(app.getNoSqlDatabase(eq("db"), any())).thenReturn(mDb);
mSb = new RxSyncbase(null, sbClient);
PowerMockito.spy(SgHostUtil.class);
}
private Subscription joinMockSyncgroup() throws Exception {
when(mDb.getSyncgroup(RxSyncbase.syncgroupName("users/foo@bar.com/app/sghost", "sg")))
.thenReturn(mSg);
final Stopwatch t = Stopwatch.createStarted();
final Subscription subscription = GlobalUserSyncgroup.builder()
.vContext(mVContext)
.rxBlessings(mRxBlessings)
.syncHostLevel(new UserAppSyncHost("app", "sghost", "sgmt"))
.sgSuffix("sg")
.syncbase(mSb)
.db(mSb.rxApp("app").rxDb("db"))
.onError((m, e) -> catchAsync(e))
.build()
.join();
PowerMockito.spy(BlessingsUtils.class);
PowerMockito.doReturn(RefStreams.of("foo@bar.com"))
.when(BlessingsUtils.class, "blessingsToUsernameStream", any(), any());
PowerMockito.doReturn(null)
.when(BlessingsUtils.class, "blessingsToAcl", any(), any());
PowerMockito.doReturn(null)
.when(BlessingsUtils.class, "homogeneousPermissions", any(), any());
long elapsed = t.elapsed(TimeUnit.MILLISECONDS);
if (elapsed > BLOCKING_DELAY_MS) {
fail("GlobalUserSyncgroup.join should not block; took " + elapsed + " ms (threshold " +
BLOCKING_DELAY_MS + " ms)");
}
return subscription;
}
@Test
public void testJoinAlreadyMounted() throws Exception {
PowerMockito.doReturn(true)
.when(SgHostUtil.class, "isSyncbaseOnline", any(), any());
joinMockSyncgroup();
mRxBlessings.onNext(null);
Thread.sleep(STATUS_POLLING_DELAY_MS +
SgHostUtil.SYNCBASE_PING_TIMEOUT.getMillis());
verify(mServer, never()).addName(any());
verify(mServer, never()).getStatus();
verify(mSg).join(any(), any());
}
@Test
public void testJoinMountLifecycle() throws Exception {
PowerMockito.doReturn(false)
.when(SgHostUtil.class, "isSyncbaseOnline", any(), any());
final Subscription subscription = joinMockSyncgroup();
final AtomicInteger statusPolls = new AtomicInteger();
final ServerStatus serverStatus = mock(ServerStatus.class);
when(serverStatus.getMounts()).then(i -> {
statusPolls.incrementAndGet();
return new MountStatus[0];
});
when(mServer.getStatus()).thenReturn(serverStatus);
Thread.sleep(STATUS_POLLING_DELAY_MS);
verify(mServer, never()).addName(any());
assertEquals("Polling should not start until blessings are resolved", 0, statusPolls.get());
mRxBlessings.onNext(null);
//Verify 3 polls + initial to ensure polling loop is working.
Thread.sleep(4 * RxMountState.DEFAULT_POLLING_INTERVAL.getMillis() +
SgHostUtil.SYNCBASE_PING_TIMEOUT.getMillis());
verify(mServer).addName("users/foo@bar.com/app/sghost");
final int nPolls = statusPolls.get();
if (nPolls < 4) {
fail("Polling should start and continue after blessings are resolved (" + nPolls +
"/4-5 expected polls)");
}
try {
subscription.unsubscribe();
} catch (final IllegalArgumentException expected) {
// https://github.com/ReactiveX/RxJava/pull/3167
// This should be fixed in the next version of RxJava.
}
Thread.sleep(BLOCKING_DELAY_MS);
statusPolls.set(0);
Thread.sleep(STATUS_POLLING_DELAY_MS);
assertEquals("Polling should stop after Syncgroup join is unsubscribed",
0, statusPolls.get());
}
}