// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package vsync

import (
	"time"

	"v.io/v23/context"
	"v.io/v23/verror"
	"v.io/x/lib/vlog"
	"v.io/x/ref/services/syncbase/server/interfaces"
)

// syncer wakes up every peerSyncInterval to do work: (1) Refresh memberView if
// needed and pick a peer from all the known remote peers to sync with. (2) Act
// as an initiator and sync syncgroup metadata for all common syncgroups with
// the chosen peer (getting updates from the remote peer, detecting and
// resolving conflicts). (3) Act as an initiator and sync data corresponding to
// all common syncgroups across all Databases with the chosen peer. (4) Fetch
// any queued blobs. (5) Transfer ownership of blobs if needed. (6) Act as a
// syncgroup publisher to publish pending syncgroups. (7) Garbage collect older
// generations.
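//
// syncer is meant to run as a long-lived goroutine: s.pending lets the sync
// service wait for it to exit, and closing s.closed signals it to stop.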
//
// TODO(hpucha): Currently only does initiation. Add rest.
func (s *syncService) syncer(ctx *context.T) {
	defer s.pending.Done()

	ticker := time.NewTicker(peerSyncInterval)
	defer ticker.Stop()
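	// Note: the bare break statements below exit only the select; the loop
	// itself terminates via the isClosed() check once s.closed is closed.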
	for !s.isClosed() {
		select {
		case <-ticker.C:
			if s.isClosed() {
				break
			}
			s.syncerWork(ctx)
		case <-s.closed:
			break
		}
	}

	vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
}

func (s *syncService) syncerWork(ctx *context.T) {
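	// A single round of sync work: pick a peer, sync the vclock with it,
	// fetch deltas for the common syncgroups, and report the outcome to
	// the peer manager.
	//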
	// TODO(hpucha): Cut a gen for the responder even if there is no
	// one to initiate to?

	// Note the attempt time up front; it is reported to the peer manager
	// at the end of this round.
	attemptTs := time.Now()

	// TODO(hpucha): Change the peer passed into initiator and clock to be
	// just one name since we now have a parallel ping stage.
	peer, err := s.pm.pickPeer(ctx)
	if err != nil {
		return
	}

	err = s.syncVClock(ctx, peer)
	// Abort syncing if there is a connection error with the peer.
	if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
		err = s.getDeltasFromPeer(ctx, peer)
	}
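	// Report the outcome of this attempt to the peer manager, noting
	// whether it ended in a connection failure.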
	s.pm.updatePeerFromSyncer(ctx, peer, attemptTs, verror.ErrorID(err) == interfaces.ErrConnFail.ID)
}