// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package io.v.rx.syncbase;

import android.content.Context;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

import io.v.baku.toolkit.BakuActivityTrait;
import io.v.baku.toolkit.R;
import io.v.baku.toolkit.VAndroidContextTrait;
import io.v.baku.toolkit.blessings.BlessingsUtils;
import io.v.v23.context.VContext;
import io.v.v23.security.Blessings;
import io.v.v23.security.access.AccessList;
import io.v.v23.security.access.Permissions;
import io.v.v23.services.syncbase.nosql.SyncgroupJoinFailedException;
import io.v.v23.services.syncbase.nosql.SyncgroupMemberInfo;
import io.v.v23.services.syncbase.nosql.SyncgroupSpec;
import io.v.v23.services.syncbase.nosql.TableRow;
import io.v.v23.syncbase.nosql.Database;
import io.v.v23.syncbase.nosql.Syncgroup;
import java8.util.function.Function;
import java8.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action2;

import static net.javacrumbs.futureconverter.guavarx.FutureConverter.toObservable;

/**
 * Joins (or, failing that, creates) one syncgroup per username found in the current blessings.
 * <p>
 * Instances are normally obtained through {@link #forActivity(BakuActivityTrait)} or the
 * Lombok-generated {@code builder()}. Calling {@link #rxJoin()} or {@link #join()} starts the
 * actual work; nothing is done at construction time.
 */
@Accessors(prefix = "m")
@AllArgsConstructor
@Builder(builderClassName = "Builder")
@Slf4j
public class GlobalUserSyncgroup {
    // Note: within this class only DEFAULT_SYNCGROUP_SUFFIX is referenced (as the builder default
    // for sgSuffix); the other two names are exposed for use elsewhere.
    public static final String
            DEFAULT_SYNCGROUP_HOST_NAME = "usersync",
            DEFAULT_SYNCGROUP_SUFFIX = "user",
            DEFAULT_RENDEZVOUS_MOUNT = "sgmt";
    /**
     * Member info used by the builder when none is supplied explicitly. This single instance is
     * shared by every builder that keeps the default.
     */
    public static final SyncgroupMemberInfo
            DEFAULT_SYNCGROUP_MEMBER_INFO = new SyncgroupMemberInfo();

    /**
     * Convenience factory equivalent to {@code builder().activity(t).build()}.
     *
     * @param t the activity trait supplying the Vanadium context, blessings, Syncbase client,
     *          database and table name
     * @return a fully configured instance using the builder defaults for everything else
     */
    public static GlobalUserSyncgroup forActivity(final BakuActivityTrait<?> t) {
        return builder().activity(t).build();
    }

    /**
     * Customizations on top of the Lombok-generated builder: default values for optional fields,
     * additive prefix setters, and composite setters that populate several fields from a trait.
     */
    public static class Builder {
        private String sgSuffix = DEFAULT_SYNCGROUP_SUFFIX;
        private Function<String, String> descriptionForUsername = u -> "User syncgroup for " + u;
        private Function<AccessList, Permissions> permissionsForUserAcl =
                BlessingsUtils::syncgroupPermissions;
        private List<TableRow> prefixes = new ArrayList<>();
        private SyncgroupMemberInfo memberInfo = DEFAULT_SYNCGROUP_MEMBER_INFO;

        /**
         * Replaces the current set of prefixes with a copy of the given list.
         * <p>
         * This is declared explicitly (rather than left to Lombok) so that the builder always
         * owns a mutable list: the additive {@code prefix(...)} setters must be able to append
         * to it even when the caller passed a fixed-size or immutable list, and must not modify
         * the caller's list as a side effect.
         *
         * @param prefixes the prefixes to use; {@code null} is treated as an empty list
         */
        public Builder prefixes(final List<TableRow> prefixes) {
            this.prefixes = new ArrayList<>();
            if (prefixes != null) {
                this.prefixes.addAll(prefixes);
            }
            return this;
        }

        /**
         * This is an additive setter to {@link #prefixes(List)}.
         */
        public Builder prefix(final TableRow prefix) {
            prefixes.add(prefix);
            return this;
        }

        /**
         * This is an additive setter to {@link #prefixes(List)}.
         */
        public Builder prefix(final String tableName, final String rowPrefix) {
            return prefix(new TableRow(tableName, rowPrefix));
        }

        /**
         * This is an additive setter to {@link #prefixes(List)}. It adds the given table with an
         * empty row prefix.
         */
        public Builder prefix(final String tableName) {
            return prefix(tableName, "");
        }

        /**
         * This is a composite setter for:
         * <ul>
         * <li>{@code vContext}</li>
         * <li>{@code rxBlessings}</li>
         * <li>{@code syncHostLevel} (as a new
         * {@link UserAppSyncHost#UserAppSyncHost(Context)})</li>
         * <li>{@code onError}</li>
         * </ul>
         * and should be called prior to any overrides for those fields.
         */
        public Builder activity(final VAndroidContextTrait<?> t) {
            return vContext(t.getVContext())
                    .rxBlessings(t.getBlessingsProvider().getRxBlessings())
                    .syncHostLevel(new UserAppSyncHost(t.getAndroidContext()))
                    .onError(t.getErrorReporter()::onError);
        }

        /**
         * In addition to those fields in {@link #activity(VAndroidContextTrait)}, this
         * additionally sets:
         * <ul>
         * <li>{@code syncbase}</li>
         * <li>{@code db}</li>
         * <li>and adds to {@code prefixes}</li>
         * </ul>
         */
        public Builder activity(final BakuActivityTrait<?> t) {
            return activity(t.getVAndroidContextTrait())
                    .syncbase(t.getSyncbase())
                    .db(t.getSyncbaseDb())
                    .prefix(t.getSyncbaseTableName());
        }
    }

    private final VContext mVContext;
    /** Stream of blessings; each emission restarts the join process in {@link #rxJoin()}. */
    private final Observable<Blessings> mRxBlessings;
    /** Supplies the per-user syncgroup host name and the rendezvous table names. */
    private final SyncHostLevel mSyncHostLevel;
    private final String mSgSuffix;
    private final RxSyncbase mSyncbase;
    private final RxDb mDb;
    private final Function<String, String> mDescriptionForUsername;
    private final Function<AccessList, Permissions> mPermissionsForUserAcl;
    private final List<TableRow> mPrefixes;
    private final SyncgroupMemberInfo mMemberInfo;
    /**
     * @see io.v.baku.toolkit.ErrorReporter#onError(int, Throwable)
     */
    private final Action2<Integer, Throwable> mOnError;

    /**
     * Builds the spec used when the syncgroup has to be created.
     *
     * @param username the username the syncgroup belongs to
     * @param userAcl  the access list from which the syncgroup permissions are derived
     */
    private SyncgroupSpec createSpec(final String username, final AccessList userAcl) {
        // NOTE(review): the trailing `false` is presumably the "private syncgroup" flag of
        // SyncgroupSpec; confirm against the SyncgroupSpec definition.
        return new SyncgroupSpec(mDescriptionForUsername.apply(username),
                mPermissionsForUserAcl.apply(userAcl), mPrefixes,
                mSyncHostLevel.getRendezvousTableNames(username), false);
    }

    /**
     * Attempts to join the named syncgroup on the given database, and falls back to creating it
     * with {@code spec} if the join fails with a {@link SyncgroupJoinFailedException}. Any other
     * error is propagated unchanged.
     *
     * @return an observable of the spec returned by the join or, when the syncgroup was created
     * instead, of the {@code spec} argument
     */
    private Observable<SyncgroupSpec> createOrJoinSyncgroup(final Database db, final String sgName,
                                                            final SyncgroupSpec spec) {
        final Syncgroup sg = db.getSyncgroup(sgName);
        // defer so that the join is not issued until (and each time) the result is subscribed to.
        return Observable.defer(() -> toObservable(sg.join(mVContext, mMemberInfo)))
                .doOnCompleted(() -> log.info("Joined syncgroup {}", sgName))
                .onErrorResumeNext(t -> t instanceof SyncgroupJoinFailedException ?
                        toObservable(sg.create(mVContext, spec, mMemberInfo))
                                .doOnCompleted(() -> log.info("Created syncgroup {}", sgName))
                                .map(x -> spec) :
                        Observable.error(t));
    }

    /**
     * Ensures the syncgroup host for {@code username} and then joins or creates that user's
     * syncgroup on every database emitted by {@code mDb}.
     *
     * @param username the username whose syncgroup should be joined or created
     * @param acl      the access list used to derive permissions if the syncgroup is created
     */
    private Observable<Object> createOrJoinSyncgroup(final String username, final AccessList acl) {
        final String sgHost = mSyncHostLevel.getSyncgroupHostName(username);
        final String sgName = RxSyncbase.syncgroupName(sgHost, mSgSuffix);
        final SyncgroupSpec spec = createSpec(username, acl);

        // Shared so that the two uses of `mount` below go through a single upstream subscription.
        final Observable<Object> mount = SgHostUtil.ensureSyncgroupHost(
                mVContext, mSyncbase.getRxServer(), sgHost).share();

        // For each database: wait for the first emission of `mount` before attempting the
        // join/create, while also merging in `mount` itself so that its emissions and errors
        // reach the subscriber.
        return mDb.getObservable()
                .switchMap(db -> Observable.merge(mount.first().ignoreElements().concatWith(
                        createOrJoinSyncgroup(db, sgName, spec)), mount));
    }

    /**
     * Returns an observable that, once subscribed, joins or creates a syncgroup for each distinct
     * username derived from the latest blessings. A new blessings emission replaces the
     * in-progress work for the previous one.
     * <p>
     * If a blessings value yields no usernames, the observable terminates with a
     * {@link NoSuchElementException}.
     */
    public Observable<?> rxJoin() {
        return Observable.switchOnNext(mRxBlessings
                .map(b -> {
                    final AccessList acl = BlessingsUtils.blessingsToAcl(mVContext, b);
                    final List<Observable<?>> createOrJoins =
                            BlessingsUtils.blessingsToUsernameStream(mVContext, b)
                                    .distinct()
                                    .map(u -> createOrJoinSyncgroup(u, acl))
                                    .collect(Collectors.toList());
                    if (createOrJoins.isEmpty()) {
                        throw new NoSuchElementException("GlobalUserSyncgroup requires a " +
                                "username; no username blessings found. Blessings: " + b);
                    }
                    return Observable.merge(createOrJoins);
                }));
    }

    /**
     * Subscribes to {@link #rxJoin()}, discarding emitted items and routing any error to the
     * configured {@code onError} handler with {@code R.string.err_syncgroup_join}.
     * <p>
     * It is not generally necessary to unsubscribe explicitly from this subscription since the
     * lifecycle of the Syncbase client is generally tied to a Baku Activity.
     */
    public Subscription join() {
        return rxJoin().subscribe(x -> {
            // Emitted items carry no information the caller needs; only errors are reported.
        }, t -> mOnError.call(R.string.err_syncgroup_join, t));
    }
}
