// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vsync

import (
	"sync"
	"time"

	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/v23/verror"
	"v.io/x/lib/set"
	"v.io/x/lib/vlog"
	"v.io/x/ref/services/syncbase/ping"
	"v.io/x/ref/services/syncbase/server/interfaces"
	"v.io/x/ref/services/syncbase/server/util"
)

// Peer selection policies understood by the peerManager when choosing a peer
// to sync with. Only selectRandom is handled by pickPeer today.
const (
	// selectRandom chooses a peer uniformly at random from the available set.
	selectRandom = iota

	// selectMostDiff chooses the peer with the most differing generations.
	// TODO(hpucha): implement this policy.
	selectMostDiff

	// selectOldest chooses the peer that was synced with furthest in the
	// past.
	// TODO(hpucha): implement this policy.
	selectOldest
)

// peerManager is the contract a peer manager module must satisfy: it keeps
// track of peers that are healthy enough to sync with and hands one out when
// the syncer asks for it.
type peerManager interface {
	// managePeers runs the feedback loop that finds, pings, and maintains
	// the list of healthy peers available for syncing.
	managePeers(ctx *context.T)

	// pickPeer returns the connection information of a Syncbase to sync
	// with.
	pickPeer(ctx *context.T) (connInfo, error)

	// updatePeerFromSyncer records the outcome of a connection attempt that
	// the syncer made to peer at attemptTs; failed reports whether the
	// attempt failed.
	updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error

	// updatePeerFromResponder records information about a peer that the
	// responder responded to at connTs.
	updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gvs interfaces.Knowledge) error
}

////////////////////////////////////////
// peerManager implementation.

// Every 'peerManagementInterval', the peerManager thread wakes up, picks up to
// 'pingFanout' peers that are not already in its peer cache (according to the
// configured selection policy), and pings them to determine whether they are
// reachable. The reachable peers are added to the peer cache. When the syncer
// thread asks the peerManager for a peer to sync with, the peer is chosen from
// the peer cache. This improves the odds of contacting a peer that is
// available.
//
// peerManager selects peers to ping by incorporating the neighborhood
// information. The heuristic bootstraps by picking a set of random peers for
// each iteration and pinging them via their mount tables. This continues until
// all the peers in an iteration are unreachable via the mount tables. Upon such
// connection errors, the heuristic switches to picking a set of random peers
// from the neighborhood while still probing to see if peers are reachable via
// the syncgroup mount tables using an exponential backoff. For example, when
// the pings fail for the first time, the heuristic waits for two rounds before
// contacting peers via mount tables again. If this attempt also fails, it then
// backs off to wait four rounds before trying the mount tables, and so on. Upon
// success, these counters are reset, and the heuristic goes back to randomly
// selecting peers and communicating via the syncgroup mount tables.

// peerSyncInfo holds running statistics about a single peer, gathered in both
// sync directions: when this node syncs with the peer, and when the peer syncs
// with this node.
//
// TODO(hpucha): Incorporate ping related statistics here.
type peerSyncInfo struct {
	// Consecutive failed connection attempts to this peer, counted
	// separately for attempts made through its advertised mount tables and
	// through the neighborhood. A counter is reset when a connection in the
	// corresponding mode succeeds.
	numFailuresMountTable   uint64
	numFailuresNeighborhood uint64

	attemptTs time.Time // most recent connection attempt to this peer
	successTs time.Time // most recent successful connection to this peer
	fromTs    time.Time // most recent time this peer synced with this node

	// gvs maps database names to their generation vectors for data and
	// syncgroups.
	gvs map[string]interfaces.Knowledge
}

// connInfo describes how to reach a peer.
//
// TODO(hpucha): Add hints to decide if both neighborhood and mount table must
// be tried. Currently, if addrs are set, only addrs are tried.
type connInfo struct {
	// relName is the peer's name relative to the mount table chosen by the
	// syncgroup creator.
	relName string

	// mtTbls lists the mount tables through which the peer may be
	// reachable.
	mtTbls []string

	// addrs lists the peer's network addresses when they are known, for
	// example from neighborhood discovery.
	addrs []string
}

// peerHealthInfo records how to reach a peer that answered a ping, together
// with the time after which that information is considered stale.
type peerHealthInfo struct {
	// peer holds the mount tables or endpoints through which the peer was
	// successfully reached, depending on whether the selection ran in
	// neighborhood or mount table mode.
	peer connInfo

	// expiry is the time after which this entry is evicted from the cache.
	expiry time.Time
}

// peerManagerImpl is the peerManager implementation. The embedded RWMutex
// guards the mutable fields below.
type peerManagerImpl struct {
	sync.RWMutex

	s      *syncService
	policy int // peer selection policy

	// Mount table backoff state, used to detect and handle the case where
	// this node has restricted connectivity.
	numFailuresMountTable uint64 // consecutive mount-table rounds in which every ping failed.
	curCount              uint64 // peer-management rounds left before mount tables are retried.

	// peerTbl caches per-peer sync statistics. This information could
	// potentially be used in peer selection.
	peerTbl map[string]*peerSyncInfo

	// healthyPeerCache caches peers that recently answered a ping, keyed by
	// relative name; pickPeer chooses peers from here.
	healthyPeerCache map[string]*peerHealthInfo
}

// newPeerManager builds a peerManager for the sync service s that selects
// peers using peerSelectionPolicy. Both internal caches start out empty. The
// ctx argument is currently unused.
func newPeerManager(ctx *context.T, s *syncService, peerSelectionPolicy int) peerManager {
	pm := &peerManagerImpl{
		s:      s,
		policy: peerSelectionPolicy,
	}
	pm.peerTbl = make(map[string]*peerSyncInfo)
	pm.healthyPeerCache = make(map[string]*peerHealthInfo)
	return pm
}

// managePeers runs the peer-management loop until the sync service is closed.
// Every peerManagementInterval it calls managePeersInternal to ping candidate
// peers and refresh the healthy peer cache. On exit it calls
// pm.s.pending.Done().
//
// The loop is labeled on purpose: a bare "break" inside a select statement
// only leaves the select, not the enclosing for loop.
func (pm *peerManagerImpl) managePeers(ctx *context.T) {
	defer pm.s.pending.Done()

	ticker := time.NewTicker(peerManagementInterval)
	defer ticker.Stop()

loop:
	for !pm.s.isClosed() {
		select {
		case <-ticker.C:
			// The service may have been closed while we waited for the tick.
			if pm.s.isClosed() {
				break loop
			}
			pm.managePeersInternal(ctx)

		case <-pm.s.closed:
			break loop
		}
	}
	vlog.VI(1).Info("sync: managePeers: channel closed, stop work and exit")
}

// managePeersInternal performs one round of peer management. It picks
// candidate peers per the configured policy, pings them, and records the
// outcome:
//   - reachable peers are (re)inserted into healthyPeerCache with an expiry of
//     now + healthInfoTimeOut;
//   - if every ping made via mount tables failed, the manager switches to
//     neighborhood-only mode for roundsToBackoff(failures) rounds.
func (pm *peerManagerImpl) managePeersInternal(ctx *context.T) {
	vlog.VI(2).Info("sync: managePeersInternal: begin")
	defer vlog.VI(2).Info("sync: managePeersInternal: end")

	// Only the random policy is implemented; any other policy is a no-op.
	if pm.policy != selectRandom {
		return
	}
	candidates, viaMtTbl := pm.pickPeersToPingRandom(ctx)
	if len(candidates) == 0 {
		return
	}

	// Ping without holding the lock, since this goes over the network.
	reachable := pm.pingPeers(ctx, candidates)

	pm.Lock()
	defer pm.Unlock()

	if len(reachable) == 0 {
		if !viaMtTbl {
			return
		}
		// Every mount-table ping failed: treat this as a loss of general
		// connectivity and operate in neighborhood-only mode for a while.
		// The cache is dropped only when first switching modes.
		if pm.numFailuresMountTable == 0 {
			pm.healthyPeerCache = make(map[string]*peerHealthInfo)
		}
		pm.numFailuresMountTable++
		pm.curCount = roundsToBackoff(pm.numFailuresMountTable)
		return
	}

	if viaMtTbl {
		// Mount tables work again, so reset the backoff. Neighborhood peers
		// already in healthyPeerCache stay until their entries expire.
		pm.numFailuresMountTable = 0
	}

	// TODO(hpucha): Change these times to vclock??
	expiry := time.Now().Add(healthInfoTimeOut)
	for _, p := range reachable {
		pm.healthyPeerCache[p.relName] = &peerHealthInfo{peer: *p, expiry: expiry}
	}
}

// pickPeersToPingRandom chooses, uniformly at random, up to 'pingFanout' peers
// that are not already in healthyPeerCache, after evicting expired cache
// entries. This node itself is never chosen.
//
// When curCount is 0, candidates are drawn from all syncgroup members and
// carry their mount tables, and the boolean result is true. Otherwise one
// backoff round is consumed (curCount is decremented), candidates are drawn
// from the neighborhood and carry their addresses, and the boolean result is
// false. If there are no other members at all, it returns (nil, true).
func (pm *peerManagerImpl) pickPeersToPingRandom(ctx *context.T) ([]*connInfo, bool) {
	pm.Lock()
	defer pm.Unlock()

	// Expired peers become candidates again.
	pm.evictExpiredPeerCacheEntries(ctx)

	members := pm.s.getMembers(ctx)
	delete(members, pm.s.name) // never ping ourselves
	if len(members) == 0 {
		vlog.VI(4).Infof("sync: pickPeersToPingRandom: no sgmembers found")
		return nil, true
	}

	// sample keeps up to pingFanout names from candidates via selection
	// sampling: each candidate is kept with probability
	// (slots still to fill)/(candidates not yet examined). For k out of n,
	// the first is kept with probability k/n; the next with (k-1)/(n-1) if
	// the first was kept, or k/(n-1) otherwise. The result is a uniformly
	// random subset.
	sample := func(candidates []string) []string {
		var chosen []string
		want := pingFanout
		left := len(candidates)
		for _, name := range candidates {
			if randIntn(left) < want {
				chosen = append(chosen, name)
				want--
				if want == 0 {
					break
				}
			}
			left--
		}
		return chosen
	}

	if pm.curCount == 0 {
		vlog.VI(4).Infof("sync: pickPeersToPingRandom: picking from all sgmembers")

		var candidates []string
		for name := range members {
			if _, cached := pm.healthyPeerCache[name]; !cached {
				candidates = append(candidates, name)
			}
		}

		var peers []*connInfo
		for _, name := range sample(candidates) {
			info := pm.s.copyMemberInfo(ctx, name)
			peers = append(peers, &connInfo{
				relName: name,
				mtTbls:  set.String.ToSlice(info.mtTables),
			})
		}
		return peers, true
	}

	// Mount tables are backing off: spend one round and use the
	// neighborhood instead.
	pm.curCount--

	neighbors := pm.s.filterDiscoveryPeers(members)
	if len(neighbors) == 0 {
		vlog.VI(4).Infof("sync: pickPeersToPingRandom: no neighbors found")
		return nil, false
	}

	vlog.VI(4).Infof("sync: pickPeersToPingRandom: picking from neighbors")

	var candidates []string
	for name := range neighbors {
		if _, cached := pm.healthyPeerCache[name]; !cached {
			candidates = append(candidates, name)
		}
	}

	var peers []*connInfo
	for _, name := range sample(candidates) {
		peers = append(peers, &connInfo{relName: name, addrs: neighbors[name].Addrs})
	}
	return peers, false
}

// pickPeer returns connection information for a peer to sync with, chosen
// according to the configured selection policy. Only selectRandom is
// supported; any other policy yields an ErrInternal error.
func (pm *peerManagerImpl) pickPeer(ctx *context.T) (connInfo, error) {
	if pm.policy == selectRandom {
		return pm.pickPeerRandom(ctx)
	}
	return connInfo{}, verror.New(verror.ErrInternal, ctx, "unimplemented peer selection policy")
}

// pickPeerRandom evicts expired entries from healthyPeerCache and then
// returns a peer chosen at random from what remains. It returns an
// ErrInternal error when the cache is empty.
func (pm *peerManagerImpl) pickPeerRandom(ctx *context.T) (connInfo, error) {
	pm.Lock()
	defer pm.Unlock()

	pm.evictExpiredPeerCacheEntries(ctx)

	total := len(pm.healthyPeerCache)
	if total == 0 {
		return connInfo{}, verror.New(verror.ErrInternal, ctx, "no usable peer")
	}

	// Walk the cache and stop at a randomly chosen position.
	target := randIntn(total)
	pos := 0
	for _, hinfo := range pm.healthyPeerCache {
		if pos == target {
			return hinfo.peer, nil
		}
		pos++
	}
	// Unreachable, assuming randIntn(n) returns a value in [0, n).
	panic("random selection didn't succeed")
}

// updatePeerFromSyncer records the outcome of a sync attempt that the syncer
// made to peer at attemptTs. The connection mode is taken to be the
// neighborhood when peer.addrs is non-nil, and the mount table otherwise.
//
// On failure, the peer is evicted from healthyPeerCache and its failure
// counter for that mode is incremented. On success, that counter is reset
// and successTs is set to the current time. If the peer is still cached, its
// entry is refreshed with peer and an expiry of attemptTs + healthInfoTimeOut,
// provided that expiry is later than both the existing one and now.
// It always returns nil.
func (pm *peerManagerImpl) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
	pm.Lock()
	defer pm.Unlock()

	stats, known := pm.peerTbl[peer.relName]
	if !known {
		stats = &peerSyncInfo{}
		pm.peerTbl[peer.relName] = stats
	}
	stats.attemptTs = attemptTs

	viaNeighborhood := peer.addrs != nil

	if failed {
		delete(pm.healthyPeerCache, peer.relName)
		if viaNeighborhood {
			stats.numFailuresNeighborhood++
		} else {
			stats.numFailuresMountTable++
		}
		return nil
	}

	if viaNeighborhood {
		stats.numFailuresNeighborhood = 0
	} else {
		stats.numFailuresMountTable = 0
	}

	now := time.Now()
	stats.successTs = now

	hinfo, cached := pm.healthyPeerCache[peer.relName]
	if !cached {
		return nil
	}
	newExpiry := attemptTs.Add(healthInfoTimeOut)
	if hinfo.expiry.Before(newExpiry) && newExpiry.After(now) {
		hinfo.peer = peer
		hinfo.expiry = newExpiry
	}
	return nil
}

// updatePeerFromResponder is meant to record information about a peer that
// the responder responded to at connTs. It is a placeholder: it currently
// ignores all of its arguments and always returns nil.
//
// TODO(hpucha): Implement this.
func (pm *peerManagerImpl) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.Knowledge) error {
	return nil
}

////////////////////////////////////////
// Helpers.

// roundsToBackoff returns how many peer-management rounds to wait before
// retrying the mount tables after 'failures' consecutive failures. The value
// is 2^failures, with the exponent capped at pingFailuresCap.
func roundsToBackoff(failures uint64) uint64 {
	exponent := failures
	if exponent > pingFailuresCap {
		exponent = pingFailuresCap
	}
	return uint64(1) << exponent
}

// evictExpiredPeerCacheEntries removes every healthyPeerCache entry whose
// expiry lies strictly before the current time. Deleting entries while
// ranging over a map is permitted in Go. The ctx argument is currently
// unused.
//
// The caller must hold pm's lock, because this mutates the shared
// healthyPeerCache.
func (pm *peerManagerImpl) evictExpiredPeerCacheEntries(ctx *context.T) {
	now := time.Now()
	for name, hinfo := range pm.healthyPeerCache {
		if hinfo.expiry.Before(now) {
			delete(pm.healthyPeerCache, name)
		}
	}
}

// pingPeers pings, in parallel, every address and mount table listed in peers.
// It returns one connInfo per peer that answered at least once. Each returned
// connInfo contains the peer's relName plus only the mount tables (mtTbls) or
// addresses (addrs) through which a ping succeeded. It returns nil when
// there is nothing to ping, when nothing answered, or when the parallel ping
// itself fails. The last case is logged and treated as "no peer reachable".
func (pm *peerManagerImpl) pingPeers(ctx *context.T, peers []*connInfo) []*connInfo {
	// nameInfo maps a pinged object name back to the peer it was built from.
	type nameInfo struct {
		ci    *connInfo
		mtTbl bool // true if built from ci.mtTbls, false if from ci.addrs.
		index int  // index into ci.mtTbls or ci.addrs.
	}

	nm := make(map[string]*nameInfo)

	names := make([]string, 0, len(peers))
	for _, p := range peers {
		for i, a := range p.addrs {
			n := naming.Join(a, util.SyncbaseSuffix)
			names = append(names, n)
			nm[n] = &nameInfo{ci: p, mtTbl: false, index: i}
		}
		for i, mt := range p.mtTbls {
			n := naming.Join(mt, p.relName, util.SyncbaseSuffix)
			names = append(names, n)
			nm[n] = &nameInfo{ci: p, mtTbl: true, index: i}
		}
	}

	if len(names) == 0 {
		// No peer has an address or a mount table; nothing to ping.
		return nil
	}

	vlog.VI(4).Infof("sync: pingPeers: sending names %v", names)

	res, err := ping.PingInParallel(ctx, names, connectionTimeOut, connectionTimeOut)
	if err != nil {
		// Best effort: report no reachable peers, but do not drop the error
		// silently.
		vlog.VI(2).Infof("sync: pingPeers: PingInParallel failed: %v", err)
		return nil
	}

	vlog.VI(4).Infof("sync: pingPeers: returned result %v", res)

	// Collect the successful peers, keeping only the mount tables or
	// neighborhood addresses that actually answered.
	speers := make(map[string]*connInfo)
	var speersArr []*connInfo

	for _, r := range res {
		if r.Err != nil {
			continue
		}

		info, ok := nm[r.Name]
		if !ok {
			// A result for a name we did not send; dereferencing the
			// missing entry would panic.
			vlog.VI(2).Infof("sync: pingPeers: ignoring result for unknown name %q", r.Name)
			continue
		}

		ci, ok := speers[info.ci.relName]
		if !ok {
			ci = &connInfo{relName: info.ci.relName}
			speers[info.ci.relName] = ci
			speersArr = append(speersArr, ci)
		}

		if info.mtTbl {
			ci.mtTbls = append(ci.mtTbls, info.ci.mtTbls[info.index])
		} else {
			ci.addrs = append(ci.addrs, info.ci.addrs[info.index])
		}
	}

	return speersArr
}
