blob: 6f1c507ba01a394190c1ad0e2423b8cf304b0237 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vsync
import (
"sync"
"time"
"v.io/v23/context"
"v.io/v23/verror"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
)
// Policies to pick a peer to sync with.
const (
// Picks a peer at random from the available set.
selectRandom = iota
// Picks a peer based on network availability and available Syncbases
// over the neighborhood via discovery.
selectNeighborhoodAware
// TODO(hpucha): implement these policies.
// Picks a peer with most differing generations.
selectMostDiff
// Picks a peer that was synced with the furthest in the past.
selectOldest
)
var (
// peerSyncInterval is the duration between two consecutive peer
// contacts. During every peer contact, the initiator obtains any
// pending updates from that peer.
peerSyncInterval = 50 * time.Millisecond
// peerSelectionPolicy is the policy used to select a peer when
// the initiator gets a chance to sync.
peerSelectionPolicy = selectRandom
// We wait for connectionTimeOut duration for a connection to be
// established with a peer reachable via a chosen mount table.
connectionTimeOut = 2 * time.Second
)
// connInfo holds the information needed to connect to a peer.
//
// TODO(hpucha): Add hints to decide if both neighborhood and mount table must
// be tried. Currently, if the addr is set, only the addr is tried.
type connInfo struct {
// Name of the peer relative to the mount table chosen by the syncgroup
// creator.
relName string
// Network address of the peer if available. For example, this can be
// obtained from neighborhood discovery.
addr string
}
// peerSelector defines the interface that a peer selection algorithm must
// provide.
type peerSelector interface {
// pickPeer picks a Syncbase to sync with.
pickPeer(ctx *context.T) (connInfo, error)
// updatePeerFromSyncer updates information for a peer that the syncer
// attempts to connect to.
updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error
// updatePeerFromResponder updates information for a peer that the
// responder responds to.
updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error
}
// syncer wakes up every peerSyncInterval to do work: (1) Refresh memberView if
// needed and pick a peer from all the known remote peers to sync with. (2) Act
// as an initiator and sync syncgroup metadata for all common syncgroups with
// the chosen peer (getting updates from the remote peer, detecting and
// resolving conflicts) (3) Act as an initiator and sync data corresponding to
// all common syncgroups across all Apps/Databases with the chosen peer; (4)
// Fetch any queued blobs. (5) Transfer ownership of blobs if needed. (6) Act as
// a syncgroup publisher to publish pending syncgroups; (6) Garbage collect
// older generations.
//
// TODO(hpucha): Currently only does initiation. Add rest.
func (s *syncService) syncer(ctx *context.T) {
defer s.pending.Done()
s.newPeerSelector(ctx)
ticker := time.NewTicker(peerSyncInterval)
defer ticker.Stop()
for !s.Closed() {
select {
case <-ticker.C:
if s.Closed() {
break
}
s.syncerWork(ctx)
case <-s.closed:
break
}
}
vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
}
func (s *syncService) syncerWork(ctx *context.T) {
// TODO(hpucha): Cut a gen for the responder even if there is no
// one to initiate to?
// Do work.
attemptTs := time.Now()
peer, err := s.ps.pickPeer(ctx)
if err != nil {
return
}
err = s.syncClock(ctx, peer)
// Abort syncing if there is a connection error with peer.
if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
err = s.getDeltas(ctx, peer)
}
s.ps.updatePeerFromSyncer(ctx, peer, attemptTs, verror.ErrorID(err) == interfaces.ErrConnFail.ID)
}
////////////////////////////////////////
// Peer selection policies.
func (s *syncService) newPeerSelector(ctx *context.T) error {
switch peerSelectionPolicy {
case selectRandom:
s.ps = &randomPeerSelector{s: s}
return nil
case selectNeighborhoodAware:
s.ps = &neighborhoodAwarePeerSelector{s: s}
return nil
default:
return verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
}
}
////////////////////////////////////////
// Random selector.
type randomPeerSelector struct {
s *syncService
}
func (ps *randomPeerSelector) pickPeer(ctx *context.T) (connInfo, error) {
var peer connInfo
members := ps.s.getMembers(ctx)
// Remove myself from the set.
delete(members, ps.s.name)
if len(members) == 0 {
return peer, verror.New(verror.ErrInternal, ctx, "no useful peer")
}
// Pick a peer at random.
ind := randIntn(len(members))
for m := range members {
if ind == 0 {
peer.relName = m
return peer, nil
}
ind--
}
return peer, verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
}
func (ps *randomPeerSelector) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
// Random selector does not care about this information.
return nil
}
func (ps *randomPeerSelector) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error {
// Random selector does not care about this information.
return nil
}
////////////////////////////////////////
// NeighborhoodAware selector.
// peerSyncInfo is the running statistics collected per peer, for a peer which
// syncs with this node or with which this node syncs with.
type peerSyncInfo struct {
// Number of continuous failures noticed when attempting to connect with
// this peer, either via its advertised mount table or via
// neighborhood. These counters are reset when the connection to the
// peer succeeds.
numFailuresMountTable uint64
numFailuresNeighborhood uint64
// The most recent timestamp when a connection to this peer was attempted.
attemptTs time.Time
// The most recent timestamp when a connection to this peer succeeded.
successTs time.Time
// The most recent timestamp when this peer synced with this node.
fromTs time.Time
// Map of database names and their corresponding generation vectors for
// data and syncgroups.
gvs map[string]interfaces.GenVector
}
type neighborhoodAwarePeerSelector struct {
s *syncService
// In-memory cache of information relevant to syncing with a peer. This
// information could potentially be used in peer selection.
peerTbl map[string]*peerSyncInfo
peerTblLock sync.RWMutex
}
func (ps *neighborhoodAwarePeerSelector) pickPeer(ctx *context.T) (connInfo, error) {
var peer connInfo
return peer, nil
}
func (ps *neighborhoodAwarePeerSelector) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
ps.peerTblLock.Lock()
defer ps.peerTblLock.Unlock()
info, ok := ps.peerTbl[peer.relName]
if !ok {
info = &peerSyncInfo{}
ps.peerTbl[peer.relName] = info
}
info.attemptTs = attemptTs
if !failed {
info.numFailuresMountTable = 0
info.numFailuresNeighborhood = 0
info.successTs = time.Now()
return nil
}
if peer.addr != "" {
info.numFailuresNeighborhood++
} else {
info.numFailuresMountTable++
}
return nil
}
func (ps *neighborhoodAwarePeerSelector) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error {
return nil
}