// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package io.v.rx.syncbase;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;

import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.v.baku.toolkit.blessings.BlessingsUtils;
import io.v.baku.toolkit.blessings.ClientUser;
import io.v.debug.SyncbaseAndroidClient;
import io.v.rx.RxMountState;
import io.v.rx.RxTestCase;
import io.v.v23.context.VContext;
import io.v.v23.rpc.MountStatus;
import io.v.v23.rpc.Server;
import io.v.v23.rpc.ServerStatus;
import io.v.v23.security.Blessings;
import io.v.v23.syncbase.SyncbaseApp;
import io.v.v23.syncbase.SyncbaseService;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.Syncgroup;
import java8.util.stream.RefStreams;
import rx.Observable;
import rx.Subscription;
import rx.subjects.ReplaySubject;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(PowerMockRunner.class)
@SuppressStaticInitializationFor("io.v.baku.toolkit.blessings.BlessingsUtils")
@PrepareForTest({BlessingsUtils.class, SgHostUtil.class})
public class UserPeerSyncgroupTest extends RxTestCase {
    private static final long STATUS_POLLING_DELAY_MS = verificationDelay(
            RxMountState.DEFAULT_POLLING_INTERVAL);

    private final VContext mVContext = mock(VContext.class);

    private final ReplaySubject<Blessings> mRxBlessings = ReplaySubject.create();
    private final ReplaySubject<Server> mRxServer = ReplaySubject.create();
    private final ReplaySubject<SyncbaseService> mRxClient = ReplaySubject.create();

    private final Server mServer = mock(Server.class);
    private final SyncbaseService mClient = mock(SyncbaseService.class);
    private final Database mDb = mock(Database.class);
    private final Syncgroup mSg = mock(Syncgroup.class);

    private RxAndroidSyncbase mSb;

    @Before
    public void setUp() throws Exception {
        final SyncbaseAndroidClient sbClient = mock(SyncbaseAndroidClient.class);
        final SyncbaseApp app = mock(SyncbaseApp.class);

        when(sbClient.getRxServer()).thenReturn(mRxServer);
        when(sbClient.getRxClient()).thenReturn(mRxClient);

        mRxServer.onNext(mServer);
        mRxClient.onNext(mClient);

        when(mClient.getApp("app")).thenReturn(app);
        when(app.getNoSqlDatabase(eq("db"), any())).thenReturn(mDb);

        when(app.exists(any())).thenReturn(Futures.immediateFuture(true));
        when(mDb.exists(any())).thenReturn(Futures.immediateFuture(true));

        when(mSg.join(any(), any())).thenReturn(Futures.immediateFuture(null));

        mSb = new RxAndroidSyncbase(null, sbClient);

        PowerMockito.spy(SgHostUtil.class);
    }

    private Subscription joinMockSyncgroup() throws Exception {
        when(mDb.getSyncgroup(RxSyncbase.syncgroupName("users/foo@bar.com/app/sghost", "sg")))
                .thenReturn(mSg);
        final Stopwatch t = Stopwatch.createStarted();

        final Subscription subscription = UserPeerSyncgroup.builder()
                .vContext(mVContext)
                .rxBlessings(mRxBlessings)
                .syncHostLevel(new UserAppSyncHost("app", "sghost", "sgmt"))
                .sgSuffix("sg")
                .db(mSb.rxApp("app").rxDb("db"))
                .onError((m, e) -> catchAsync(e))
                .buildPeer()
                .join();

        PowerMockito.spy(BlessingsUtils.class);
        PowerMockito.doReturn(RefStreams.of(new ClientUser("fooclient", "foo@bar.com")))
                .when(BlessingsUtils.class, "blessingsToClientUserStream", any(), any());
        PowerMockito.doReturn(null)
                .when(BlessingsUtils.class, "blessingsToAcl", any(), any());
        PowerMockito.doReturn(null)
                .when(BlessingsUtils.class, "homogeneousPermissions", any(), any());

        long elapsed = t.elapsed(TimeUnit.MILLISECONDS);
        if (elapsed > BLOCKING_DELAY_MS) {
            fail("UserPeerSyncgroup.join should not block; took " + elapsed + " ms (threshold " +
                    BLOCKING_DELAY_MS + " ms)");
        }
        return subscription;
    }

    @Test
    public void testJoinAlreadyMounted() throws Exception {
        PowerMockito.doReturn(Observable.just(true))
                .when(SgHostUtil.class, "isSyncbaseOnline", any(), any());
        joinMockSyncgroup();
        mRxBlessings.onNext(null);
        Thread.sleep(STATUS_POLLING_DELAY_MS +
                SgHostUtil.SYNCBASE_PING_TIMEOUT.getMillis());
        verify(mServer, never()).addName(any());
        verify(mServer, never()).getStatus();
        verify(mSg).join(any(), any());
    }

    @Test
    public void testJoinMountLifecycle() throws Exception {
        PowerMockito.doReturn(Observable.just(false))
                .when(SgHostUtil.class, "isSyncbaseOnline", any(), any());
        final Subscription subscription = joinMockSyncgroup();

        final AtomicInteger statusPolls = new AtomicInteger();
        final ServerStatus statusNone = mock(ServerStatus.class);
        when(statusNone.getMounts()).then(i -> {
            statusPolls.incrementAndGet();
            return new MountStatus[0];
        });
        when(mServer.getStatus()).thenReturn(statusNone);

        Thread.sleep(STATUS_POLLING_DELAY_MS);
        verify(mServer, never()).addName(any());
        assertEquals("Polling should not start until blessings are resolved", 0, statusPolls.get());

        mRxBlessings.onNext(null);
        //Verify 3 polls + initial to ensure polling loop is working.
        Thread.sleep(RxMountState.DEFAULT_POLLING_INTERVAL.getMillis() +
                SgHostUtil.SYNCBASE_PING_TIMEOUT.getMillis());
        verify(mSg, never()).join(any(), any());
        final String name = "users/foo@bar.com/app/sghost";
        verify(mServer).addName(name);

        final MountStatus mountedMountStatus = new MountStatus(name, "foo", new DateTime(), null,
                        Duration.standardMinutes(5), new DateTime(0), null);
        final ServerStatus statusMounted = mock(ServerStatus.class);
        when(statusMounted.getMounts()).then(i -> {
            statusPolls.incrementAndGet();
            return new MountStatus[]{mountedMountStatus};
        });
        when(mServer.getStatus()).thenReturn(statusMounted);

        Thread.sleep(3 * RxMountState.DEFAULT_POLLING_INTERVAL.getMillis());
        final int nPolls = statusPolls.get();
        if (nPolls < 4) {
            fail("Polling should start and continue after blessings are resolved (" + nPolls +
                    "/4-5 expected polls)");
        }

        verify(mSg).join(any(), any());

        try {
            subscription.unsubscribe();
        } catch (final IllegalArgumentException expected) {
            // https://github.com/ReactiveX/RxJava/pull/3167
            // This should be fixed in the next version of RxJava.
        }
        statusPolls.set(0);
        Thread.sleep(STATUS_POLLING_DELAY_MS);
        assertEquals("Polling should stop after Syncgroup join is unsubscribed",
                0, statusPolls.get());
    }
}
