// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vsync

import (
"bytes"
"fmt"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/flow"
"v.io/v23/naming"
"v.io/v23/verror"
"v.io/x/lib/set"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/ping"
"v.io/x/ref/services/syncbase/server/interfaces"
)
// Policies to pick a peer to sync with.
const (
// Picks a peer at random from the available set.
selectRandom = iota
// TODO(hpucha): implement these policies.
// Picks a peer with most differing generations.
selectMostDiff
// Picks a peer that was synced with the furthest in the past.
selectOldest
)
// peerManager defines the interface that a peer manager module must provide.
type peerManager interface {
// managePeers runs the feedback loop to manage and maintain a list of
// healthy peers available for syncing.
managePeers(ctx *context.T)
// pickPeer picks a Syncbase to sync with.
pickPeer(ctx *context.T) (connInfo, error)
// updatePeerFromSyncer updates information for a peer that the syncer
// attempts to connect to.
updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error
// updatePeerFromResponder updates information for a peer that the
// responder responds to.
updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gvs interfaces.Knowledge) error
exportStats(prefix string)
}
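
// A minimal sketch of how a syncer loop might drive this interface, given a
// peerManager pm obtained from newPeerManager (the real syncer lives elsewhere
// in this package; syncWithPeer below is a hypothetical placeholder for the
// actual sync logic):
//
//	peer, err := pm.pickPeer(ctx)
//	if err != nil {
//		return err // no healthy peer is currently available
//	}
//	attemptTs := time.Now()
//	err = syncWithPeer(ctx, peer) // hypothetical helper
//	pm.updatePeerFromSyncer(ctx, peer, attemptTs, err != nil)
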
////////////////////////////////////////
// peerManager implementation.
// Every 'peerManagementInterval', the peerManager thread wakes up, picks
// (based upon the configured policy) up to 'pingFanout' peers that are not
// already in its peer cache, and pings them to determine whether they are
// reachable. The reachable peers are added to the peer cache. When the syncer
// thread asks the peerManager to pick a peer to sync with, the peer is chosen
// from this cache, which improves the odds that we are contacting a peer that
// is actually available.
//
// peerManager selects peers to ping by incorporating the neighborhood
// information. The heuristic bootstraps by picking a set of random peers for
// each iteration and pinging them via their mount tables. This continues until
// all the peers in an iteration are unreachable via the mount tables. Upon such
// connection errors, the heuristic switches to picking a set of random peers
// from the neighborhood while still probing to see if peers are reachable via
// the syncgroup mount tables using an exponential backoff. For example, when
// the pings fail for the first time, the heuristic waits for two rounds before
// contacting peers via mount tables again. If this attempt also fails, it then
// backs off to wait four rounds before trying the mount tables, and so on. Upon
// success, these counters are reset, and the heuristic goes back to randomly
// selecting peers and communicating via the syncgroup mount tables.
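//
// The length of this wait, in rounds, is computed by roundsToBackoff below.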

// peerSyncInfo holds the running statistics collected per peer in both sync
// directions, i.e., for a peer that syncs with this node or with which this
// node syncs.
//
// TODO(hpucha): Incorporate ping related statistics here.
type peerSyncInfo struct {
// Number of consecutive failures noticed when attempting to connect to
// this peer, either via any of its advertised mount tables or via the
// neighborhood. These counters are reset when a connection to the
// peer succeeds.
numFailuresMountTable uint64
numFailuresNeighborhood uint64
// The most recent timestamp when a connection to this peer was attempted.
attemptTs time.Time
// The most recent timestamp when a connection to this peer succeeded.
successTs time.Time
// The most recent timestamp when this peer synced with this node.
fromTs time.Time
// Map of database names to their corresponding generation vectors for
// data and syncgroups.
gvs map[string]interfaces.Knowledge
}
// connInfo holds the information needed to connect to a peer.
//
// TODO(hpucha): Add hints to decide if both neighborhood and mount table must
// be tried. Currently, if addrs are set, only addrs are tried.
type connInfo struct {
// Name of the peer relative to the mount table chosen by the syncgroup
// creator.
relName string
// Mount tables via which this peer might be reachable.
mtTbls []string
// Network addresses of the peer if available. For example, this can be
// obtained from neighborhood discovery.
addrs []string
// pinned is a flow.PinnedConn to the remote end.
// pinned.Conn() returns the last successful connection to the remote end.
// If the channel returned by pinned.Conn().Closed() is closed, the connection
// can no longer be used.
// Once pinned.Unpin() is called, the connection is no longer pinned in the
// RPC connection cache, and health checks revert to the default RPC
// health-check interval.
pinned flow.PinnedConn
// addedTime is the time at which the connection was put into the peer cache.
addedTime time.Time
}
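
// For illustration, the liveness check used on a cached connInfo elsewhere in
// this file (see evictClosedPeerConnsLocked) looks roughly like:
//
//	select {
//	case <-c.pinned.Conn().Closed():
//		// The connection is dead; drop it from the cache and Unpin it.
//	default:
//		// The connection is still usable.
//	}
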
type peerManagerImpl struct {
sync.RWMutex
s *syncService
policy int // peer selection policy

// In-memory cache of information relevant to syncing with a peer. This
// information could potentially be used in peer selection.

// In-memory state to detect and handle the case when a peer has
// restricted connectivity.
numFailuresMountTable uint64 // total number of mount table failures across peers.
curCount uint64 // remaining number of sync rounds before any mount table will be retried.
// In-memory cache of peer specific information.
peerTbl map[string]*peerSyncInfo
// In-memory cache of healthy peers.
healthyPeerCache map[string]*connInfo
}
func newPeerManager(ctx *context.T, s *syncService, peerSelectionPolicy int) peerManager {
return &peerManagerImpl{
s: s,
policy: peerSelectionPolicy,
peerTbl: make(map[string]*peerSyncInfo),
healthyPeerCache: make(map[string]*connInfo),
}
}
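
// A rough wiring sketch (the actual setup is done by the sync service
// initialization elsewhere in this package; the stats prefix shown is
// illustrative, not the real one):
//
//	pm := newPeerManager(ctx, s, selectRandom)
//	pm.exportStats("example/prefix") // hypothetical prefix
//	s.pending.Add(1)                 // managePeers calls s.pending.Done on exit
//	go pm.managePeers(ctx)
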
func (pm *peerManagerImpl) exportStats(prefix string) {
stats.NewStringFunc(naming.Join(prefix, "peers"), pm.debugStringForPeers)
}
func (pm *peerManagerImpl) debugStringForPeers() string {
pm.Lock()
defer pm.Unlock()
buf := &bytes.Buffer{}
for _, c := range pm.healthyPeerCache {
fmt.Fprintf(buf, "%v\n", c.debugString())
fmt.Fprintln(buf)
}
return buf.String()
}
func (c *connInfo) debugString() string {
buf := &bytes.Buffer{}
fmt.Fprintf(buf, "RELNAME: %v\n", c.relName)
fmt.Fprintf(buf, "MTTBLS: %v\n", c.mtTbls)
fmt.Fprintf(buf, "ADDRS: %v\n", c.addrs)
fmt.Fprintf(buf, "ADDEDTIME: %v\n", c.addedTime)
return buf.String()
}
func (pm *peerManagerImpl) managePeers(ctx *context.T) {
defer pm.s.pending.Done()
ticker := time.NewTicker(peerManagementInterval)
defer ticker.Stop()
for !pm.s.isClosed() {
select {
case <-ticker.C:
if pm.s.isClosed() {
break
}
pm.managePeersInternal(ctx)
case <-pm.s.closed:
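// Note: this break only exits the select; the enclosing for loop is
// expected to terminate via its !pm.s.isClosed() condition.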
break
}
}
vlog.VI(1).Info("sync: managePeers: channel closed, stop work and exit")
}
func (pm *peerManagerImpl) managePeersInternal(ctx *context.T) {
vlog.VI(2).Info("sync: managePeersInternal: begin")
defer vlog.VI(2).Info("sync: managePeersInternal: end")
var peers []*connInfo
var viaMtTbl bool
switch pm.policy {
case selectRandom:
peers, viaMtTbl = pm.pickPeersToPingRandom(ctx)
default:
return
}
if len(peers) == 0 {
return
}
peers = pm.pingPeers(ctx, peers)
pm.Lock()
defer pm.Unlock()
if len(peers) == 0 {
if viaMtTbl {
// Since all pings via mount tables failed, we treat
// this as if the node lost general connectivity and
// operate in neighborhood-only mode for some time.
if pm.numFailuresMountTable == 0 {
// Drop the peer cache when we switch to using
// neighborhood.
for _, p := range pm.healthyPeerCache {
p.pinned.Unpin()
}
pm.healthyPeerCache = make(map[string]*connInfo)
}
pm.numFailuresMountTable++
pm.curCount = roundsToBackoff(pm.numFailuresMountTable)
}
return
}
if viaMtTbl {
pm.numFailuresMountTable = 0
// We do not drop the healthyPeerCache here and continue using the
// neighborhood peers until the cache entries expire.
}
now := time.Now()
for _, p := range peers {
if old, ok := pm.healthyPeerCache[p.relName]; ok {
old.pinned.Unpin()
}
p.addedTime = now
pm.healthyPeerCache[p.relName] = p
}
}
// pickPeersToPingRandom picks up to 'pingFanout' peers that are not already in
// the healthyPeerCache. It returns true if the picked peers are to be reached
// via the mount tables, and false if they are to be reached via the neighborhood.
func (pm *peerManagerImpl) pickPeersToPingRandom(ctx *context.T) ([]*connInfo, bool) {
pm.Lock()
defer pm.Unlock()
// Evict any closed connection from the cache.
pm.evictClosedPeerConnsLocked(ctx)
var peers []*connInfo
members := pm.s.getMembers(ctx)
// Remove myself from the set.
delete(members, pm.s.name)
if len(members) == 0 {
vlog.VI(4).Infof("sync: pickPeersToPingRandom: no sgmembers found")
return nil, true
}
if pm.curCount == 0 {
vlog.VI(4).Infof("sync: pickPeersToPingRandom: picking from all sgmembers")
// Compute number of available peers.
n := 0
for m := range members {
if _, ok := pm.healthyPeerCache[m]; !ok {
n++
}
}
// Pick peers at random up to the allowed fanout. Random selection
// is obtained as follows: as we iterate over the map, the
// current element is selected with a probability of (number of
// elements remaining to be chosen)/(number of elements remaining to
// be traversed). For example, to choose k elements out of a map
// of size n, the first element is selected with a probability
// of k/n. If the first element is included, the second element
// is selected with a probability of (k-1)/(n-1); if the first
// element was not included, the second element is selected with a
// probability of k/(n-1).
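// For instance (illustrative numbers only), with n = 3 eligible members
// and pingFanout = 2: the first member is kept with probability 2/3; if
// it was kept, the second is kept with probability 1/2, otherwise with
// probability 2/2 = 1.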
k := pingFanout
for m := range members {
if _, ok := pm.healthyPeerCache[m]; !ok {
// Decide if this member is to be chosen.
if pm.s.randIntn(n) < k {
info := pm.s.copyMemberInfo(ctx, m)
p := &connInfo{
relName: m,
mtTbls: set.String.ToSlice(info.mtTables),
}
peers = append(peers, p)
k--
if k == 0 {
break
}
}
n--
}
}
return peers, true
}
pm.curCount--
// Pick peers from the neighborhood if available.
neighbors := pm.s.filterDiscoveryPeers(members)
if len(neighbors) == 0 {
vlog.VI(4).Infof("sync: pickPeersToPingRandom: no neighbors found")
return nil, false
}
vlog.VI(4).Infof("sync: pickPeersToPingRandom: picking from neighbors")
// Compute number of available peers.
n := 0
for nbr := range neighbors {
if _, ok := pm.healthyPeerCache[nbr]; !ok {
n++
}
}
// Pick peers at random up to the allowed fanout. Random selection is
// done as described above.
k := pingFanout
for nbr, svc := range neighbors {
if _, ok := pm.healthyPeerCache[nbr]; !ok {
if pm.s.randIntn(n) < k {
p := &connInfo{relName: nbr, addrs: svc.Addresses}
peers = append(peers, p)
k--
if k == 0 {
break
}
}
n--
}
}
return peers, false
}
func (pm *peerManagerImpl) pickPeer(ctx *context.T) (connInfo, error) {
switch pm.policy {
case selectRandom:
return pm.pickPeerRandom(ctx)
default:
return connInfo{}, verror.New(verror.ErrInternal, ctx, "unimplemented peer selection policy")
}
}
func (pm *peerManagerImpl) pickPeerRandom(ctx *context.T) (connInfo, error) {
pm.Lock()
defer pm.Unlock()
pm.evictClosedPeerConnsLocked(ctx)
var nullPeer connInfo
if len(pm.healthyPeerCache) == 0 {
return nullPeer, verror.New(verror.ErrInternal, ctx, "no usable peer")
}
// Pick a peer at random.
ind := pm.s.randIntn(len(pm.healthyPeerCache))
for _, info := range pm.healthyPeerCache {
if ind == 0 {
return *info, nil
}
ind--
}
panic("random selection didn't succeed")
}
func (pm *peerManagerImpl) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
pm.Lock()
defer pm.Unlock()
info, ok := pm.peerTbl[peer.relName]
if !ok {
info = &peerSyncInfo{}
pm.peerTbl[peer.relName] = info
}
info.attemptTs = attemptTs
if failed { // Handle failed sync attempt.
// Evict the peer from healthyPeerCache.
delete(pm.healthyPeerCache, peer.relName)
if peer.pinned != nil {
peer.pinned.Unpin()
}
if peer.addrs != nil {
info.numFailuresNeighborhood++
} else {
info.numFailuresMountTable++
}
} else {
if peer.addrs != nil {
info.numFailuresNeighborhood = 0
} else {
info.numFailuresMountTable = 0
}
info.successTs = time.Now()
}
return nil
}
// TODO(hpucha): Implement this.
func (pm *peerManagerImpl) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.Knowledge) error {
return nil
}
////////////////////////////////////////
// Helpers.
// roundsToBackoff computes the number of rounds to back off, growing
// exponentially with the failure count and capped at 1 << pingFailuresCap.
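// For example, roundsToBackoff(1) == 2 and roundsToBackoff(3) == 8, while any
// failure count above pingFailuresCap yields 1 << pingFailuresCap rounds.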
func roundsToBackoff(failures uint64) uint64 {
if failures > pingFailuresCap {
failures = pingFailuresCap
}
return 1 << failures
}
// The caller must hold the lock, since this function manipulates the shared
// healthyPeerCache.
func (pm *peerManagerImpl) evictClosedPeerConnsLocked(ctx *context.T) {
for p, info := range pm.healthyPeerCache {
// TODO(suharshs): If having many goroutines is not a problem, we should have
// a goroutine monitoring each conn's status and reconnecting as needed.
select {
case <-info.pinned.Conn().Closed():
delete(pm.healthyPeerCache, p)
info.pinned.Unpin()
default:
}
}
}
func (pm *peerManagerImpl) pingPeers(ctx *context.T, peers []*connInfo) []*connInfo {
type nameInfo struct {
ci *connInfo
mtTbl bool
index int
}
nm := make(map[string]*nameInfo)
names := make([]string, 0, len(peers))
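// Build one candidate name per neighborhood address and per mount table:
// neighborhood names have the form <addr>/<SyncbaseSuffix>, while mount
// table names have the form <mtTbl>/<relName>/<SyncbaseSuffix>.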
for _, p := range peers {
for i, a := range p.addrs {
n := naming.Join(a, common.SyncbaseSuffix)
names = append(names, n)
nm[n] = &nameInfo{
ci: p,
mtTbl: false,
index: i,
}
}
for i, mt := range p.mtTbls {
n := naming.Join(mt, p.relName, common.SyncbaseSuffix)
names = append(names, n)
nm[n] = &nameInfo{
ci: p,
mtTbl: true,
index: i,
}
}
}
vlog.VI(4).Infof("sync: pingPeers: sending names %v", names)
res, err := ping.PingInParallel(ctx, names, NeighborConnectionTimeout, channelTimeout)
if err != nil {
return nil
}
vlog.VI(4).Infof("sync: pingPeers: returned result %v", res)
// Make a list of the peers that responded, recording which of their mount
// tables or neighborhood addresses succeeded.
speers := make(map[string]*connInfo)
var speersArr []*connInfo
for _, r := range res {
if r.Err != nil {
continue
}
info := nm[r.Name]
ci, ok := speers[info.ci.relName]
if !ok {
ci = &connInfo{relName: info.ci.relName, pinned: r.Conn}
speers[info.ci.relName] = ci
speersArr = append(speersArr, ci)
} else {
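// We already pinned a connection for this peer from an earlier
// successful ping; release the duplicate.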
r.Conn.Unpin()
}
if info.mtTbl {
ci.mtTbls = append(ci.mtTbls, info.ci.mtTbls[info.index])
} else {
ci.addrs = append(ci.addrs, info.ci.addrs[info.index])
}
}
return speersArr
}