TBR: Avoid bad peers
Keep track of peers that return a lot of fatal errors and exclude them
when peer selection is required.
Change-Id: I0438b8cbeb7d08545740f2b54fe87e455825defa
diff --git a/internal/peers.go b/internal/peers.go
index 58e67f6..35bec5e 100644
--- a/internal/peers.go
+++ b/internal/peers.go
@@ -17,8 +17,11 @@
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
+ "v.io/v23/security/access"
"v.io/v23/verror"
+ "v.io/x/ref/lib/stats/counter"
+
"messenger/ifc"
)
@@ -61,7 +64,10 @@
}
func (pm *peerManager) loop(ctx *context.T, updateChan <-chan discovery.Update) {
- var changed bool
+ var (
+ changed bool
+ lastCheck time.Time
+ )
for {
select {
case update, ok := <-updateChan:
@@ -77,9 +83,10 @@
// checkActivePeers on a timer makes its cost
// independent of the number of updates received in
// the interval.
- if changed {
+ if changed || time.Since(lastCheck) >= time.Minute {
pm.checkActivePeers(ctx)
changed = false
+ lastCheck = time.Now()
}
}
}
@@ -105,6 +112,7 @@
params: pm.params,
counters: pm.counters,
}
+ p.errors = counter.New()
pm.peers[id] = p
pm.counters.numPeers.Incr(1)
ctx.Infof("%s has new peer %s", pm.id, id)
@@ -143,24 +151,15 @@
func (pm *peerManager) checkActivePeers(ctx *context.T) {
pm.mu.Lock()
defer pm.mu.Unlock()
- if pm.params.MaxActivePeers == 0 || len(pm.peers) < pm.params.MaxActivePeers {
+ numPeers := len(pm.peers)
+ if pm.params.MaxActivePeers == 0 || numPeers <= pm.params.MaxActivePeers {
for _, p := range pm.peers {
pm.startPeer(ctx, p)
}
return
}
- // Calculate which neighboring nodes to talk to.
- active := make([]bool, len(pm.peers))
- for i := 0; i < len(active) && i < pm.params.MaxActivePeers; i++ {
- idx := (1<<uint(i%32) - 1) % len(active)
- for active[idx] {
- idx = (idx + 1) % len(active)
- }
- active[idx] = true
- }
-
- sortedPeers := make([]string, 0, len(pm.peers))
+ sortedPeers := make([]string, 0, numPeers)
for id := range pm.peers {
sortedPeers = append(sortedPeers, id)
}
@@ -168,15 +167,41 @@
// Find this node's (would-be) position in the list.
offset := 0
- for ; offset < len(sortedPeers); offset++ {
+ for ; offset < numPeers; offset++ {
if pm.id < sortedPeers[offset] {
break
}
}
- for i := 0; i < len(sortedPeers); i++ {
- id := sortedPeers[(offset+i)%len(sortedPeers)]
- p := pm.peers[id]
+ // Calculate which neighboring nodes to talk to.
+ const peerErrorThreshold = 10
+ bad := make([]bool, numPeers)
+ for i := 0; i < numPeers; i++ {
+ if pm.peers[sortedPeers[i]].errors.Delta1h() >= peerErrorThreshold {
+ bad[i] = true
+ }
+ }
+
+ active := make([]bool, numPeers)
+ for i := 0; i < len(active) && i < pm.params.MaxActivePeers; i++ {
+ idx := (offset + (1<<uint(i%32) - 1)) % numPeers
+ // Skip peers that are already selected or that are bad.
+ for j := 0; j < numPeers; j++ {
+ if !active[idx] && !bad[idx] {
+ break
+ }
+ idx = (idx + 1) % numPeers
+ }
+ // If every candidate is either already selected or bad, fall back
+ // to the next peer that isn't already selected, even if it is bad.
+ for active[idx] {
+ idx = (idx + 1) % numPeers
+ }
+ active[idx] = true
+ }
+
+ for i := 0; i < numPeers; i++ {
+ p := pm.peers[sortedPeers[i]]
if active[i] {
pm.startPeer(ctx, p)
} else {
@@ -233,6 +258,7 @@
counters *Counters
psChan chan *ifc.Message
done chan struct{}
+ errors *counter.Counter
// mu guards me
mu sync.Mutex
me *naming.MountEntry
@@ -290,14 +316,20 @@
}
func (p *peer) msgLoop() {
+ const maxBackoff = 5 * time.Minute
ctx := p.ctx
- for {
+ for attempt := uint(0); ; attempt++ {
if err := backoff(p.ctx, func() error { return p.diffAndEnqueue(p.ctx) }); err != nil {
+ p.errors.Incr(1)
+ delay := time.Duration((5+rand.Intn(5))<<attempt) * time.Second
+ if delay > maxBackoff {
+ delay = maxBackoff
+ }
select {
case <-p.ctx.Done():
close(p.done)
return
- case <-time.After(5 * time.Second):
+ case <-time.After(delay):
}
continue
}
@@ -343,8 +375,12 @@
pi.FatalError = true
case verror.ErrorID(err) == ifc.ErrTooBig.ID:
pi.FatalError = true
+ case verror.ErrorID(err) == access.ErrNoPermissions.ID:
+ pi.FatalError = true
case verror.ErrorID(err) == verror.ErrUnknownMethod.ID:
pi.FatalError = true
+ case verror.ErrorID(err) == verror.ErrBadArg.ID:
+ pi.FatalError = true
case verror.ErrorID(err) == verror.ErrNoAccess.ID:
pi.FatalError = true
case verror.ErrorID(err) == verror.ErrNotTrusted.ID:
@@ -352,7 +388,6 @@
case verror.ErrorID(err) == verror.ErrNoExist.ID:
pi.FatalError = true
default:
- const maxBackoff = 5 * time.Minute
n := uint(pi.NumAttempts)
// This is (10 to 20) * 2^n sec.
backoff := time.Duration((10+rand.Intn(10))<<n) * time.Second
@@ -362,6 +397,9 @@
pi.NextAttempt = time.Now().Add(backoff)
p.queue.Insert(pi.NextAttempt, msgId)
}
+ if pi.FatalError || pi.NumAttempts > 2 {
+ p.errors.Incr(1)
+ }
if err := p.store.SetPeerInfo(ctx, msgId, p.peerId, pi); err != nil {
ctx.Infof("store.SetPeerInfo failed: %v", err)
}
diff --git a/internal/rateacl.go b/internal/rateacl.go
index 2439309..c882199 100644
--- a/internal/rateacl.go
+++ b/internal/rateacl.go
@@ -46,6 +46,10 @@
if !r[i].Acl.Includes(blessings...) {
continue
}
+ if r[i].Limit == 0 {
+ ctx.Infof("No permissions for %v matched by %v", r[i].Acl, blessings)
+ break
+ }
if rate := r[i].c.Rate1m(); rate >= float64(r[i].Limit) {
ctx.Infof("Rate limit exceeded for %v matched by %v, %f >= %f", r[i].Acl, blessings, rate, r[i].Limit)
return ifc.NewErrRateLimitExceeded(ctx, r[i].Limit)