// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
"fmt"
"io"
"math/rand"
"sort"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security/access"
"v.io/v23/verror"
"v.io/x/ref/lib/stats/counter"
"messenger/ifc"
)
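// startPeerManager creates a peerManager and starts the goroutine that
// processes discovery updates and maintains the set of active peers.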
func startPeerManager(ctx *context.T, id string, updateChan <-chan discovery.Update, ps *PubSub, store MessengerStorage, params Params, counters *Counters) *peerManager {
pm := &peerManager{
id: id,
store: store,
params: params,
peers: make(map[string]*peer),
ps: ps,
counters: counters,
done: make(chan struct{}),
}
go pm.loop(ctx, updateChan)
return pm
}
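// peerManager keeps track of the peers discovered on the network and decides
// which of them this node actively exchanges messages with.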
type peerManager struct {
mu sync.Mutex
id string
store MessengerStorage
params Params
peers map[string]*peer
ps *PubSub
counters *Counters
done chan struct{}
}
func (pm *peerManager) debugString() string {
pm.mu.Lock()
defer pm.mu.Unlock()
active := []string{}
for _, p := range pm.peers {
if p.ctx != nil {
active = append(active, p.debugString())
}
}
return fmt.Sprintf("PeerManager ID:%s MaxActivePeers:%d NumPeers:%d Active:%v",
pm.id, pm.params.MaxActivePeers, len(pm.peers), active)
}
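// loop processes discovery updates until updateChan is closed, and
// periodically recomputes the set of active peers.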
func (pm *peerManager) loop(ctx *context.T, updateChan <-chan discovery.Update) {
var (
changed bool
lastCheck time.Time
)
for {
select {
case update, ok := <-updateChan:
			if !ok {
				// updateChan was closed: stop all the peers and exit.
				pm.mu.Lock()
				pm.removePeer(ctx, "")
				pm.mu.Unlock()
				close(pm.done)
				return
			}
pm.processDiscoveryUpdate(ctx, update)
changed = true
case <-time.After(time.Second):
// Updates tend to arrive in batches. Calling
// checkActivePeers on a timer makes its cost
// independent of the number of updates received in
// the interval.
if changed || time.Since(lastCheck) >= time.Minute {
pm.checkActivePeers(ctx)
changed = false
lastCheck = time.Now()
}
}
}
}
// processDiscoveryUpdate adds and removes peers as reported in the discovery
// update.
func (pm *peerManager) processDiscoveryUpdate(ctx *context.T, update discovery.Update) {
pm.mu.Lock()
defer pm.mu.Unlock()
id := update.Id().String()
if update.IsLost() {
ctx.Infof("%s lost peer %s", pm.id, id)
pm.removePeer(ctx, id)
return
}
p, exists := pm.peers[id]
if !exists {
p = &peer{
peerId: id,
localId: pm.id,
store: pm.store,
params: pm.params,
counters: pm.counters,
}
p.errors = counter.New()
pm.peers[id] = p
pm.counters.numPeers.Incr(1)
ctx.Infof("%s has new peer %s", pm.id, id)
}
me := &naming.MountEntry{}
for _, a := range update.Addresses() {
me.Servers = append(me.Servers, naming.MountedServer{Server: a})
}
p.setMountEntry(me)
}
// checkActivePeers decides which peers this node should talk to.
//
// The MaxActivePeers parameter controls how many peers this node should talk
// to. If MaxActivePeers >= the number of peers, this node can talk to all of
// them.
//
// When MaxActivePeers is less than the number of peers, we need to choose which
// peers to talk to such that message propagation to all the peers is likely.
//
// Each node has a unique ID, which is known by all of its peers. So peers can
// be sorted and collectively treated as a ring.
//
// Each node then picks N peers to talk to, where N <= MaxActivePeers. These
// N peers are always the first N power-of-two neighbors to the right, i.e.
//
// Ring: A->B->C->D->E->F->G->H->I->J->K->L->M (->A)
//
// If N=1, Node C talks to D (distance 1)
// If N=2, Node C talks to D, and E (distance 2)
// If N=3, Node C talks to D, E, and G (distance 4)
// If N=4, Node C talks to D, E, G, and K (distance 8)
// etc.
// When a power of 2 falls on a neighbor that was already selected, or on one
// that has had too many recent errors, the next unselected healthy neighbor is
// selected instead (or, failing that, the next unselected one). See the
// ringNeighbors sketch after this function for the distance arithmetic.
func (pm *peerManager) checkActivePeers(ctx *context.T) {
pm.mu.Lock()
defer pm.mu.Unlock()
numPeers := len(pm.peers)
if pm.params.MaxActivePeers == 0 || numPeers <= pm.params.MaxActivePeers {
for _, p := range pm.peers {
pm.startPeer(ctx, p)
}
return
}
sortedPeers := make([]string, 0, numPeers)
for id := range pm.peers {
sortedPeers = append(sortedPeers, id)
}
sort.Strings(sortedPeers)
// Find this node's (would-be) position in the list.
offset := 0
for ; offset < numPeers; offset++ {
if pm.id < sortedPeers[offset] {
break
}
}
// Calculate which neighboring nodes to talk to.
const peerErrorThreshold = 10
bad := make([]bool, numPeers)
for i := 0; i < numPeers; i++ {
if pm.peers[sortedPeers[i]].errors.Delta1h() >= peerErrorThreshold {
bad[i] = true
}
}
active := make([]bool, numPeers)
for i := 0; i < len(active) && i < pm.params.MaxActivePeers; i++ {
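		// sortedPeers[offset] is this node's immediate neighbor to the right
		// (distance 1), so a neighbor at distance 2^i sits at offset + 2^i - 1.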
idx := (offset + (1<<uint(i%32) - 1)) % numPeers
// Skip peers that are already selected or that are bad.
for j := 0; j < numPeers; j++ {
if !active[idx] && !bad[idx] {
break
}
idx = (idx + 1) % numPeers
}
// If we can't find a peer that isn't already selected or bad,
// just find one that isn't already selected.
for active[idx] {
idx = (idx + 1) % numPeers
}
active[idx] = true
}
for i := 0; i < numPeers; i++ {
p := pm.peers[sortedPeers[i]]
if active[i] {
pm.startPeer(ctx, p)
} else {
pm.stopPeer(ctx, p)
}
}
}
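// ringNeighbors is a small, self-contained sketch of the neighbor selection
// arithmetic documented above checkActivePeers. It is illustrative only: it is
// not called by the code in this file and it ignores the bad-peer and
// duplicate handling. With self "C", others A,B,D..M, and n=4 it returns
// [D E G K], matching the example in the comment.
func ringNeighbors(self string, others []string, n int) []string {
	sorted := append([]string(nil), others...)
	sort.Strings(sorted)
	// offset is where self would sit in the sorted ring, i.e. the index of
	// its first neighbor to the right.
	offset := sort.SearchStrings(sorted, self)
	picked := make([]string, 0, n)
	for i := 0; i < n && i < len(sorted); i++ {
		// Distance 2^i to the right: sorted[offset] is already at
		// distance 1, hence the -1.
		idx := (offset + (1 << uint(i%32)) - 1) % len(sorted)
		picked = append(picked, sorted[idx])
	}
	return picked
}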
// startPeer starts sending messages to the peer.
func (pm *peerManager) startPeer(ctx *context.T, p *peer) {
if p.ctx == nil {
ctx.Infof("start peer %s", p.peerId)
p.ctx, p.cancel = context.WithCancel(ctx)
p.queue = newMsgQueue(p.ctx)
p.psChan = pm.ps.Sub()
p.done = make(chan struct{})
go p.subLoop()
go p.msgLoop()
}
}
// stopPeer stops sending messages to the peer.
func (pm *peerManager) stopPeer(ctx *context.T, p *peer) {
if p.ctx != nil {
ctx.Infof("stop peer %s", p.peerId)
pm.ps.Unsub(p.psChan)
p.cancel()
<-p.done
p.ctx = nil
p.cancel = nil
p.psChan = nil
p.queue = nil
p.done = nil
}
}
// removePeer stops the peer with the given id and removes it from the peer
// list. If id is empty, all the peers are stopped and removed. The caller must
// hold pm.mu.
func (pm *peerManager) removePeer(ctx *context.T, id string) {
	for pid, p := range pm.peers {
		if id != "" && pid != id {
			continue
		}
		delete(pm.peers, pid)
		pm.stopPeer(ctx, p)
		pm.counters.numPeers.Incr(-1)
	}
}
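// peer holds the state used to exchange messages with a single remote peer.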
type peer struct {
ctx *context.T
cancel func()
localId string
peerId string
store MessengerStorage
params Params
queue *MsgQueue
counters *Counters
psChan chan *ifc.Message
done chan struct{}
errors *counter.Counter
// mu guards me
mu sync.Mutex
me *naming.MountEntry
}
func (p *peer) debugString() string {
l := 0
if p.queue != nil {
l = p.queue.Len()
}
return fmt.Sprintf("Peer ID:%s QueueLen:%d", p.peerId, l)
}
func (p *peer) setMountEntry(me *naming.MountEntry) {
p.mu.Lock()
defer p.mu.Unlock()
p.me = me
}
func (p *peer) mountEntry() *naming.MountEntry {
p.mu.Lock()
defer p.mu.Unlock()
me := *p.me
return &me
}
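// checkHops returns an error if the peer already appears in the message's hop
// list, or if the message has already traveled MaxHops hops.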
func (p *peer) checkHops(ctx *context.T, msg *ifc.Message) error {
for _, h := range msg.Hops {
if h == p.peerId {
return ifc.NewErrAlreadySeen(ctx, msg.Id)
}
}
if p.params.MaxHops > 0 && len(msg.Hops) >= p.params.MaxHops {
return ifc.NewErrTooManyHops(ctx, int32(p.params.MaxHops))
}
return nil
}
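// subLoop forwards messages published on the local PubSub to this peer's
// delivery queue, skipping those that fail the hop checks.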
func (p *peer) subLoop() {
ctx := p.ctx
for {
select {
case <-ctx.Done():
return
case msg := <-p.psChan:
if msg == nil {
return
}
if p.checkHops(ctx, msg) != nil {
continue
}
p.queue.Insert(msg.CreationTime, msg.Id)
}
}
}
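// msgLoop first reconciles the local store with the peer via diffAndEnqueue,
// then pushes queued messages one at a time, retrying transient failures with
// randomized exponential backoff and recording fatal ones so they are not
// retried.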
func (p *peer) msgLoop() {
const maxBackoff = 5 * time.Minute
ctx := p.ctx
for attempt := uint(0); ; attempt++ {
if err := backoff(p.ctx, func() error { return p.diffAndEnqueue(p.ctx) }); err != nil {
p.errors.Incr(1)
			// Bound the shift so the delay cannot overflow after many failures.
			if attempt > 6 {
				attempt = 6
			}
			delay := time.Duration((5+rand.Intn(5))<<attempt) * time.Second
			if delay > maxBackoff {
				delay = maxBackoff
			}
select {
case <-p.ctx.Done():
close(p.done)
return
case <-time.After(delay):
}
continue
}
break
}
for {
var msgId string
select {
case <-ctx.Done():
close(p.done)
return
case msgId = <-p.queue.Pop():
}
pi := p.store.PeerInfo(ctx, msgId, p.peerId)
if pi.Sent || pi.FatalError {
continue
}
if time.Now().Before(pi.NextAttempt) {
p.queue.Insert(pi.NextAttempt, msgId)
continue
}
pi.NumAttempts++
		switch err := p.pushMessage(ctx, msgId); {
		case err == nil:
			pi.Sent = true
			p.counters.numMessagesSent.Incr(1)
		case verror.ErrorID(err) == ifc.ErrAlreadySeen.ID:
			pi.Sent = true
		case verror.ErrorID(err) == ifc.ErrContentMismatch.ID,
			verror.ErrorID(err) == ifc.ErrExpired.ID,
			verror.ErrorID(err) == ifc.ErrInvalidSignature.ID,
			verror.ErrorID(err) == ifc.ErrMessageIdCollision.ID,
			verror.ErrorID(err) == ifc.ErrNoRoute.ID,
			verror.ErrorID(err) == ifc.ErrTooManyHops.ID,
			verror.ErrorID(err) == ifc.ErrTooBig.ID,
			verror.ErrorID(err) == access.ErrNoPermissions.ID,
			verror.ErrorID(err) == verror.ErrUnknownMethod.ID,
			verror.ErrorID(err) == verror.ErrBadArg.ID,
			verror.ErrorID(err) == verror.ErrNoAccess.ID,
			verror.ErrorID(err) == verror.ErrNotTrusted.ID,
			verror.ErrorID(err) == verror.ErrNoExist.ID:
			// Retrying won't help with these errors. Give up on this
			// message for this peer.
			pi.FatalError = true
		default:
			// Transient failure. Retry after (10 to 20) * 2^n sec, capped
			// at maxBackoff. Bound the shift so it cannot overflow.
			n := uint(pi.NumAttempts)
			if n > 5 {
				n = 5
			}
			backoff := time.Duration((10+rand.Intn(10))<<n) * time.Second
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			pi.NextAttempt = time.Now().Add(backoff)
			p.queue.Insert(pi.NextAttempt, msgId)
		}
if pi.FatalError || pi.NumAttempts > 2 {
p.errors.Incr(1)
}
if err := p.store.SetPeerInfo(ctx, msgId, p.peerId, pi); err != nil {
ctx.Infof("store.SetPeerInfo failed: %v", err)
}
}
}
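// pushMessage sends the message with the given id to the peer, resuming from
// the offset reported by the peer if an earlier transfer was interrupted.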
func (p *peer) pushMessage(ctx *context.T, msgId string) error {
ctx.Infof("Pushing %s to %s", msgId, p.peerId)
msg, r, err := p.store.OpenRead(ctx, msgId)
if err != nil {
return err
}
defer r.Close()
if err := msg.Expired(ctx); err != nil {
return err
}
msg.Hops = append(msg.Hops, p.localId)
opts := []rpc.CallOpt{
options.Preresolved{p.mountEntry()},
options.ServerAuthorizer{p.params.RateAclOut},
}
offset := int64(0)
fPush := func() error {
return p.doPush(ctx, msg, offset, r, opts)
}
push:
if err = backoff(ctx, fPush); verror.ErrorID(err) == ifc.ErrIncorrectOffset.ID {
fOffset := func() error {
var err error
offset, err = ifc.MessengerClient("").ResumeOffset(ctx, msg, opts...)
return err
}
if err = backoff(ctx, fOffset); err != nil {
return err
}
if _, err := r.Seek(offset, 0); err != nil {
return err
}
goto push
}
return err
}
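// backoff calls f until it returns nil or an error whose action is not
// RetryBackoff, sleeping with randomized exponential backoff between attempts.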
func backoff(ctx *context.T, f func() error) (err error) {
for n := uint(0); ; n++ {
if err = f(); verror.Action(err) == verror.RetryBackoff {
			const maxBackoff = time.Minute
			// (100 to 200) * 2^n ms, capped at maxBackoff. Bound the shift
			// so the delay cannot overflow after many retries.
			if n > 10 {
				n = 10
			}
			backoff := time.Duration((100+rand.Intn(100))<<n) * time.Millisecond
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
select {
case <-ctx.Done():
case <-time.After(backoff):
}
continue
}
return
}
}
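// doPush streams the message content from r to the peer, telling the peer
// that the stream starts at the given offset.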
func (p *peer) doPush(ctx *context.T, msg ifc.Message, offset int64, r io.Reader, opts []rpc.CallOpt) error {
call, err := ifc.MessengerClient("").Push(ctx, msg, offset, opts...)
if err != nil {
return err
}
buf := make([]byte, 2048)
for {
n, err := r.Read(buf)
if n > 0 {
if err := call.SendStream().Send(buf[:n]); err != nil {
return err
}
p.counters.numBytesSent.Incr(int64(n))
}
if err == io.EOF {
break
}
if err != nil {
return err
}
}
return call.Finish()
}
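// diffAndEnqueue walks the local message manifest in batches, asks the peer
// which of those messages it already has, and enqueues the missing ones for
// delivery.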
func (p *peer) diffAndEnqueue(ctx *context.T) error {
opts := []rpc.CallOpt{
options.Preresolved{p.mountEntry()},
options.ServerAuthorizer{p.params.RateAclOut},
}
mctx, cancel := context.WithCancel(ctx)
defer cancel()
ch, err := p.store.Manifest(mctx)
if err != nil {
return err
}
const batchSize = 256
type batchItem struct {
id string
pi PeerInfo
}
var call ifc.MessengerDiffClientCall
for {
batch := make([]string, 0, batchSize)
batchInfo := make([]batchItem, 0, cap(batch))
for i := 0; i < cap(batch); i++ {
var msg *ifc.Message
select {
case <-ctx.Done():
return verror.NewErrCanceled(ctx)
case msg = <-ch:
}
if msg == nil {
break
}
pi := p.store.PeerInfo(ctx, msg.Id, p.peerId)
if pi.Sent || pi.FatalError {
continue
}
if p.checkHops(ctx, msg) != nil {
continue
}
batch = append(batch, msg.Id)
if pi.NextAttempt.IsZero() {
// Make an effort to deliver the messages in the
// order they were created.
pi.NextAttempt = msg.CreationTime
}
batchInfo = append(batchInfo, batchItem{msg.Id, pi})
}
if len(batch) == 0 {
break
}
if call == nil {
if call, err = ifc.MessengerClient("").Diff(ctx, opts...); err != nil {
return err
}
}
if err := call.SendStream().Send(batch); err != nil {
return err
}
if !call.RecvStream().Advance() {
return verror.New(verror.ErrBadProtocol, ctx)
}
responses := call.RecvStream().Value()
if len(batch) != len(responses) {
return verror.New(verror.ErrBadProtocol, ctx)
}
for i, hasIt := range responses {
id, pi := batchInfo[i].id, batchInfo[i].pi
if hasIt {
pi.Sent = true
if err := p.store.SetPeerInfo(ctx, id, p.peerId, pi); err != nil {
ctx.Infof("store.SetPeerInfo failed: %v", err)
}
continue
}
p.queue.Insert(pi.NextAttempt, id)
}
}
if call == nil {
return nil
}
if err := call.SendStream().Close(); err != nil {
return err
}
return call.Finish()
}