TBR: Avoid bad peers

Keep track of peers that return a lot of fatal errors and exclude them
from when peer selection is required.

Change-Id: I0438b8cbeb7d08545740f2b54fe87e455825defa
diff --git a/internal/peers.go b/internal/peers.go
index 58e67f6..35bec5e 100644
--- a/internal/peers.go
+++ b/internal/peers.go
@@ -17,8 +17,11 @@
 	"v.io/v23/naming"
 	"v.io/v23/options"
 	"v.io/v23/rpc"
+	"v.io/v23/security/access"
 	"v.io/v23/verror"
 
+	"v.io/x/ref/lib/stats/counter"
+
 	"messenger/ifc"
 )
 
@@ -61,7 +64,10 @@
 }
 
 func (pm *peerManager) loop(ctx *context.T, updateChan <-chan discovery.Update) {
-	var changed bool
+	var (
+		changed   bool
+		lastCheck time.Time
+	)
 	for {
 		select {
 		case update, ok := <-updateChan:
@@ -77,9 +83,10 @@
 			// checkActivePeers on a timer makes its cost
 			// independent of the number of updates received in
 			// the interval.
-			if changed {
+			if changed || time.Since(lastCheck) >= time.Minute {
 				pm.checkActivePeers(ctx)
 				changed = false
+				lastCheck = time.Now()
 			}
 		}
 	}
@@ -105,6 +112,7 @@
 			params:   pm.params,
 			counters: pm.counters,
 		}
+		p.errors = counter.New()
 		pm.peers[id] = p
 		pm.counters.numPeers.Incr(1)
 		ctx.Infof("%s has new peer %s", pm.id, id)
@@ -143,24 +151,15 @@
 func (pm *peerManager) checkActivePeers(ctx *context.T) {
 	pm.mu.Lock()
 	defer pm.mu.Unlock()
-	if pm.params.MaxActivePeers == 0 || len(pm.peers) < pm.params.MaxActivePeers {
+	numPeers := len(pm.peers)
+	if pm.params.MaxActivePeers == 0 || numPeers <= pm.params.MaxActivePeers {
 		for _, p := range pm.peers {
 			pm.startPeer(ctx, p)
 		}
 		return
 	}
 
-	// Calculate which neighboring nodes to talk to.
-	active := make([]bool, len(pm.peers))
-	for i := 0; i < len(active) && i < pm.params.MaxActivePeers; i++ {
-		idx := (1<<uint(i%32) - 1) % len(active)
-		for active[idx] {
-			idx = (idx + 1) % len(active)
-		}
-		active[idx] = true
-	}
-
-	sortedPeers := make([]string, 0, len(pm.peers))
+	sortedPeers := make([]string, 0, numPeers)
 	for id := range pm.peers {
 		sortedPeers = append(sortedPeers, id)
 	}
@@ -168,15 +167,41 @@
 
 	// Find this node's (would-be) position in the list.
 	offset := 0
-	for ; offset < len(sortedPeers); offset++ {
+	for ; offset < numPeers; offset++ {
 		if pm.id < sortedPeers[offset] {
 			break
 		}
 	}
 
-	for i := 0; i < len(sortedPeers); i++ {
-		id := sortedPeers[(offset+i)%len(sortedPeers)]
-		p := pm.peers[id]
+	// Calculate which neighboring nodes to talk to.
+	const peerErrorThreshold = 10
+	bad := make([]bool, numPeers)
+	for i := 0; i < numPeers; i++ {
+		if pm.peers[sortedPeers[i]].errors.Delta1h() >= peerErrorThreshold {
+			bad[i] = true
+		}
+	}
+
+	active := make([]bool, numPeers)
+	for i := 0; i < len(active) && i < pm.params.MaxActivePeers; i++ {
+		idx := (offset + (1<<uint(i%32) - 1)) % numPeers
+		// Skip peers that are already selected or that are bad.
+		for j := 0; j < numPeers; j++ {
+			if !active[idx] && !bad[idx] {
+				break
+			}
+			idx = (idx + 1) % numPeers
+		}
+		// If we can't find a peer that isn't already selected or bad,
+		// just find one that isn't already selected.
+		for active[idx] {
+			idx = (idx + 1) % numPeers
+		}
+		active[idx] = true
+	}
+
+	for i := 0; i < numPeers; i++ {
+		p := pm.peers[sortedPeers[i]]
 		if active[i] {
 			pm.startPeer(ctx, p)
 		} else {
@@ -233,6 +258,7 @@
 	counters *Counters
 	psChan   chan *ifc.Message
 	done     chan struct{}
+	errors   *counter.Counter
 	// mu guards me
 	mu sync.Mutex
 	me *naming.MountEntry
@@ -290,14 +316,20 @@
 }
 
 func (p *peer) msgLoop() {
+	const maxBackoff = 5 * time.Minute
 	ctx := p.ctx
-	for {
+	for attempt := uint(0); ; attempt++ {
 		if err := backoff(p.ctx, func() error { return p.diffAndEnqueue(p.ctx) }); err != nil {
+			p.errors.Incr(1)
+			delay := time.Duration((5+rand.Intn(5))<<attempt) * time.Second
+			if delay > maxBackoff {
+				delay = maxBackoff
+			}
 			select {
 			case <-p.ctx.Done():
 				close(p.done)
 				return
-			case <-time.After(5 * time.Second):
+			case <-time.After(delay):
 			}
 			continue
 		}
@@ -343,8 +375,12 @@
 			pi.FatalError = true
 		case verror.ErrorID(err) == ifc.ErrTooBig.ID:
 			pi.FatalError = true
+		case verror.ErrorID(err) == access.ErrNoPermissions.ID:
+			pi.FatalError = true
 		case verror.ErrorID(err) == verror.ErrUnknownMethod.ID:
 			pi.FatalError = true
+		case verror.ErrorID(err) == verror.ErrBadArg.ID:
+			pi.FatalError = true
 		case verror.ErrorID(err) == verror.ErrNoAccess.ID:
 			pi.FatalError = true
 		case verror.ErrorID(err) == verror.ErrNotTrusted.ID:
@@ -352,7 +388,6 @@
 		case verror.ErrorID(err) == verror.ErrNoExist.ID:
 			pi.FatalError = true
 		default:
-			const maxBackoff = 5 * time.Minute
 			n := uint(pi.NumAttempts)
 			// This is (10 to 20) * 2^n sec.
 			backoff := time.Duration((10+rand.Intn(10))<<n) * time.Second
@@ -362,6 +397,9 @@
 			pi.NextAttempt = time.Now().Add(backoff)
 			p.queue.Insert(pi.NextAttempt, msgId)
 		}
+		if pi.FatalError || pi.NumAttempts > 2 {
+			p.errors.Incr(1)
+		}
 		if err := p.store.SetPeerInfo(ctx, msgId, p.peerId, pi); err != nil {
 			ctx.Infof("store.SetPeerInfo failed: %v", err)
 		}
diff --git a/internal/rateacl.go b/internal/rateacl.go
index 2439309..c882199 100644
--- a/internal/rateacl.go
+++ b/internal/rateacl.go
@@ -46,6 +46,10 @@
 		if !r[i].Acl.Includes(blessings...) {
 			continue
 		}
+		if r[i].Limit == 0 {
+			ctx.Infof("No permissions for %v matched by %v", r[i].Acl, blessings)
+			break
+		}
 		if rate := r[i].c.Rate1m(); rate >= float64(r[i].Limit) {
 			ctx.Infof("Rate limit exceeded for %v matched by %v, %f >= %f", r[i].Acl, blessings, rate, r[i].Limit)
 			return ifc.NewErrRateLimitExceeded(ctx, r[i].Limit)