Merge "syncbase/vsync: Adding functionality to advertise syncbase via discovery and support for endpoints learned via neighborhood in the initiator."
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index d144928..823a2b5 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -78,5 +78,6 @@
error (
DupSyncgroupPublish(name string) {"en": "duplicate publish on syncgroup: {name}"}
+ ConnFail() {"en": "connection to peer failed{:_}"}
BrokenCrConnection() {"en": "CrConnection stream to application does not exist or is broken."}
)
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index a413d60..efe5af5 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -24,11 +24,13 @@
var (
ErrDupSyncgroupPublish = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.DupSyncgroupPublish", verror.NoRetry, "{1:}{2:} duplicate publish on syncgroup: {3}")
+ ErrConnFail = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.ConnFail", verror.NoRetry, "{1:}{2:} connection to peer failed{:_}")
ErrBrokenCrConnection = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.BrokenCrConnection", verror.NoRetry, "{1:}{2:} CrConnection stream to application does not exist or is broken.")
)
func init() {
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDupSyncgroupPublish.ID), "{1:}{2:} duplicate publish on syncgroup: {3}")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnFail.ID), "{1:}{2:} connection to peer failed{:_}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBrokenCrConnection.ID), "{1:}{2:} CrConnection stream to application does not exist or is broken.")
}
@@ -37,6 +39,11 @@
return verror.New(ErrDupSyncgroupPublish, ctx, name)
}
+// NewErrConnFail returns an error with the ErrConnFail ID.
+func NewErrConnFail(ctx *context.T) error {
+ return verror.New(ErrConnFail, ctx)
+}
+
// NewErrBrokenCrConnection returns an error with the ErrBrokenCrConnection ID.
func NewErrBrokenCrConnection(ctx *context.T) error {
return verror.New(ErrBrokenCrConnection, ctx)
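Note: the new ErrConnFail ID lets callers separate network-level connection
failures from errors returned by the peer itself. A minimal sketch of the
dispatch pattern this change uses throughout (syncOnePeer is a hypothetical
stand-in for syncClock/getDeltas):

    if err := syncOnePeer(ctx, peer); verror.ErrorID(err) == interfaces.ErrConnFail.ID {
        // Network-level failure: try another route to the peer, or back off.
    } else if err != nil {
        // Application-level error reported by the peer itself.
    }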
diff --git a/services/syncbase/vsync/clock.go b/services/syncbase/vsync/clock.go
index 4076616..cddd285 100644
--- a/services/syncbase/vsync/clock.go
+++ b/services/syncbase/vsync/clock.go
@@ -72,31 +72,36 @@
// syncClock syncs the syncbase clock with peer's syncbase clock.
// TODO(jlodhia): Refactor the mount table entry search code based on the
// unified solution for looking up peer once it exists.
-func (s *syncService) syncClock(ctx *context.T, peer string) {
- vlog.VI(2).Infof("sync: syncClock: begin: contacting peer %s", peer)
- defer vlog.VI(2).Infof("sync: syncClock: end: contacting peer %s", peer)
+func (s *syncService) syncClock(ctx *context.T, peer connInfo) error {
+ vlog.VI(2).Infof("sync: syncClock: begin: contacting peer %v", peer)
+ defer vlog.VI(2).Infof("sync: syncClock: end: contacting peer %v", peer)
- info := s.copyMemberInfo(ctx, peer)
+ info := s.copyMemberInfo(ctx, peer.relName)
if info == nil {
- vlog.Fatalf("sync: syncClock: missing information in member view for %q", peer)
+ vlog.Fatalf("sync: syncClock: missing information in member view for %v", peer)
}
- // Preferred mount tables for this peer.
- if len(info.mtTables) < 1 {
- vlog.Errorf("sync: syncClock: no mount tables found to connect to peer %s", peer)
- return
+ if len(info.mtTables) < 1 && peer.addr == "" {
+ vlog.Errorf("sync: syncClock: no mount tables or endpoint found to connect to peer %v", peer)
+ return verror.New(verror.ErrInternal, ctx, peer.relName, peer.addr, "no mount tables or endpoint found")
}
+
+ if peer.addr != "" {
+ vlog.VI(4).Infof("sync: syncClock: trying neighborhood addr for peer %v", peer)
+
+ absName := naming.Join(peer.addr, util.SyncbaseSuffix)
+ return syncWithPeer(ctx, s.vclock, absName, s.name)
+ }
+
for mt, _ := range info.mtTables {
- absName := naming.Join(mt, peer, util.SyncbaseSuffix)
- if err := syncWithPeer(ctx, s.vclock, absName, s.name); err == nil {
- return
- } else if (verror.ErrorID(err) == verror.ErrNoExist.ID) || (verror.ErrorID(err) == verror.ErrInternal.ID) {
- vlog.Errorf("sync: syncClock: error returned by peer %s: %v", peer, err)
- return
+ absName := naming.Join(mt, peer.relName, util.SyncbaseSuffix)
+ if err := syncWithPeer(ctx, s.vclock, absName, s.name); verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+ return err
}
}
- vlog.Errorf("sync: syncClock: couldn't connect to peer %s", peer)
- return
+
+ vlog.Errorf("sync: syncClock: couldn't connect to peer %v", peer)
+ return verror.New(interfaces.ErrConnFail, ctx, peer.relName, peer.addr, "all mount tables failed")
}
// syncWithPeer tries to sync local clock with peer syncbase clock.
@@ -156,8 +161,11 @@
if commitErr := tx.Commit(); commitErr != nil {
vlog.Errorf("sync: syncClock: error while commiting tx: %v", commitErr)
}
+ } else if (verror.ErrorID(reqErr) == verror.ErrNoExist.ID) || (verror.ErrorID(reqErr) == verror.ErrInternal.ID) {
+ vlog.Errorf("sync: syncClock: error returned by peer %s: %v", absPeerName, err)
} else {
- vlog.Errorf("sync: syncClock: received error: %v", reqErr)
+ reqErr = verror.New(interfaces.ErrConnFail, ctx, myName)
+ vlog.Errorf("sync: syncClock: received network error: %v", reqErr)
}
// Return error received while making request if any to the caller.
return reqErr
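Note: syncClock now dials the peer in one of two ways, depending on whether a
neighborhood address is known. A sketch of the two name shapes (mtName stands
for one entry of info.mtTables):

    // Via a neighborhood endpoint learned from discovery (peer.addr set):
    absName := naming.Join(peer.addr, util.SyncbaseSuffix)

    // Via a mount table the peer registered with (peer.addr empty):
    absName = naming.Join(mtName, peer.relName, util.SyncbaseSuffix)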
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index f65c70f..f6a9bc0 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -48,13 +48,13 @@
// initiation round), the work done by the initiator is idempotent.
//
// TODO(hpucha): Check the idempotence, esp in addNode in DAG.
-func (s *syncService) getDeltas(ctx *context.T, peer string) {
- vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer %s", peer)
- defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer %s", peer)
+func (s *syncService) getDeltas(ctx *context.T, peer connInfo) error {
+ vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer %v", peer)
+ defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer %v", peer)
- info := s.copyMemberInfo(ctx, peer)
+ info := s.copyMemberInfo(ctx, peer.relName)
if info == nil {
- vlog.Fatalf("sync: getDeltas: missing information in member view for %q", peer)
+ vlog.Fatalf("sync: getDeltas: missing information in member view for %v", peer)
}
// Preferred mount tables for this peer.
@@ -62,22 +62,24 @@
// Sync each Database that may have syncgroups common with this peer,
// one at a time.
+ var errFinal error // The last error encountered is returned to the caller.
for gdbName := range info.db2sg {
- vlog.VI(4).Infof("sync: getDeltas: started for peer %s db %s", peer, gdbName)
+ vlog.VI(4).Infof("sync: getDeltas: started for peer %v db %s", peer, gdbName)
- if len(prfMtTbls) < 1 {
+ if len(prfMtTbls) < 1 && peer.addr == "" {
- vlog.Errorf("sync: getDeltas: no mount tables found to connect to peer %s", peer)
- return
+ vlog.Errorf("sync: getDeltas: no mount tables or endpoint found to connect to peer %v", peer)
+ return verror.New(verror.ErrInternal, ctx, peer.relName, peer.addr, "no mount tables or endpoint found")
}
c, err := newInitiationConfig(ctx, s, peer, gdbName, info, prfMtTbls)
if err != nil {
- vlog.Errorf("sync: getDeltas: couldn't initialize initiator config for peer %s, gdb %s, err %v", peer, gdbName, err)
+ vlog.Errorf("sync: getDeltas: couldn't initialize initiator config for peer %v, gdb %s, err %v", peer, gdbName, err)
+ errFinal = err
continue
}
- if err := s.getDBDeltas(ctx, peer, c, true); err == nil {
- if err := s.getDBDeltas(ctx, peer, c, false); err != nil {
+ if err = s.getDBDeltas(ctx, c, true); err == nil {
+ if err = s.getDBDeltas(ctx, c, false); err != nil {
vlog.Errorf("sync: getDeltas: failed for data sync, err %v", err)
}
} else {
@@ -85,18 +87,25 @@
vlog.Errorf("sync: getDeltas: failed for syncgroup sync, err %v", err)
}
+ if verror.ErrorID(err) == interfaces.ErrConnFail.ID {
+ return err
+ } else if err != nil {
+ errFinal = err
+ }
+
// Cache the pruned mount table list for the next Database.
prfMtTbls = c.mtTables
- vlog.VI(4).Infof("sync: getDeltas: done for peer %s db %s", peer, gdbName)
+ vlog.VI(4).Infof("sync: getDeltas: done for peer %v db %s", peer, gdbName)
}
+ return errFinal
}
// getDBDeltas gets the deltas from the chosen peer. If sg flag is set to true,
// it will sync syncgroup metadata. If sg flag is false, it will sync data.
-func (s *syncService) getDBDeltas(ctxIn *context.T, peer string, c *initiationConfig, sg bool) error {
- vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer sg %v %s", sg, peer)
- defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer sg %v %s", sg, peer)
+func (s *syncService) getDBDeltas(ctxIn *context.T, c *initiationConfig, sg bool) error {
+ vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer sg %v %v", sg, c.peer)
+ defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer sg %v %v", sg, c.peer)
ctx, cancel := context.WithCancel(ctxIn)
// cancel() is idempotent.
@@ -109,7 +118,7 @@
if !sg {
iSt.peerSgInfo(ctx)
if len(iSt.config.sgPfxs) == 0 {
- return verror.New(verror.ErrInternal, ctx, "no syncgroup prefixes found", peer, iSt.config.appName, iSt.config.dbName)
+ return verror.New(verror.ErrInternal, ctx, "no syncgroup prefixes found", c.peer.relName, iSt.config.appName, iSt.config.dbName)
}
}
@@ -130,7 +139,7 @@
// Make contact with the peer.
if !iSt.connectToPeer(ctx) {
- return verror.New(verror.ErrInternal, ctx, "couldn't connect to peer", peer)
+ return verror.New(interfaces.ErrConnFail, ctx, "couldn't connect to peer", c.peer.relName, c.peer.addr)
}
// Obtain deltas from the peer over the network.
@@ -156,7 +165,7 @@
// initiationConfig is the configuration information for a Database in an
// initiation round.
type initiationConfig struct {
- peer string // relative name of the peer to sync with.
+ peer connInfo // connection info of the peer to sync with.
// Mount tables that this peer may have registered with. The first entry
// in this array is the mount table where the peer was successfully
@@ -206,13 +215,12 @@
oldHead string
ancestor string
res *conflictResolution
-
// TODO(jlodhia): Add perms object and version for the row keys for pickNew
}
// newInitiationConfig creates a new initiation config. This will be shared between
// the two sync rounds in the initiation round of a Database.
-func newInitiationConfig(ctx *context.T, s *syncService, peer string, name string, info *memberInfo, mtTables []string) (*initiationConfig, error) {
+func newInitiationConfig(ctx *context.T, s *syncService, peer connInfo, name string, info *memberInfo, mtTables []string) (*initiationConfig, error) {
c := &initiationConfig{}
c.peer = peer
c.mtTables = mtTables
@@ -221,7 +229,7 @@
c.sgIds[id] = struct{}{}
}
if len(c.sgIds) == 0 {
- return nil, verror.New(verror.ErrInternal, ctx, "no syncgroups found", peer, name)
+ return nil, verror.New(verror.ErrInternal, ctx, "no syncgroups found", peer.relName, name)
}
// Note: sgPfxs will be inited when needed by the data sync.
@@ -270,7 +278,7 @@
if err != nil {
continue
}
- if _, ok := sg.Joiners[iSt.config.peer]; !ok {
+ if _, ok := sg.Joiners[iSt.config.peer.relName]; !ok {
// Peer is no longer part of the syncgroup.
continue
}
@@ -413,45 +421,59 @@
return nil
}
-// connectToPeer attempts to connect to the remote peer using the mount tables
-// obtained from all the common syncgroups.
-func (iSt *initiationState) connectToPeer(ctxIn *context.T) bool {
+// connectToPeer attempts to connect to the remote peer using the neighborhood
+// address when specified or the mount tables obtained from all the common
+// syncgroups.
+func (iSt *initiationState) connectToPeer(ctx *context.T) bool {
vlog.VI(4).Infof("sync: connectToPeer: begin")
- if len(iSt.config.mtTables) < 1 {
- vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.config.peer, iSt.config.appName, iSt.config.dbName)
+ if len(iSt.config.mtTables) < 1 && iSt.config.peer.addr == "" {
+ vlog.Errorf("sync: connectToPeer: no mount tables or endpoint found to connect to peer %s, app %s db %s", iSt.config.peer, iSt.config.appName, iSt.config.dbName)
return false
}
+ if iSt.config.peer.addr != "" {
+ absName := naming.Join(iSt.config.peer.addr, util.SyncbaseSuffix)
+ return iSt.connectToPeerInternal(ctx, absName)
+ }
+
for i, mt := range iSt.config.mtTables {
- ctx, cancel := context.WithCancel(ctxIn)
-
- // We start a timer to bound the amount of time we wait to
- // initiate a connection.
- t := time.AfterFunc(connectionTimeOut, cancel)
-
- absName := naming.Join(mt, iSt.config.peer, util.SyncbaseSuffix)
- c := interfaces.SyncClient(absName)
-
- vlog.VI(4).Infof("sync: connectToPeer: trying %v", absName)
-
- var err error
- iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
- t.Stop()
-
- if err == nil {
- vlog.VI(4).Infof("sync: connectToPeer: established on %s", absName)
-
- // Prune out the unsuccessful mount tables.
- iSt.config.mtTables = iSt.config.mtTables[i:]
+ absName := naming.Join(mt, iSt.config.peer.relName, util.SyncbaseSuffix)
+ if iSt.connectToPeerInternal(ctx, absName) {
+ // Prune out the mount tables that failed before this one so
+ // that the next Database starts from the one that worked.
+ iSt.config.mtTables = iSt.config.mtTables[i:]
return true
}
}
iSt.config.mtTables = nil
- vlog.Errorf("sync: connectToPeer: couldn't connect to peer %s", iSt.config.peer)
+
+ vlog.Errorf("sync: connectToPeer: couldn't connect to peer %v", iSt.config.peer)
+ return false
+}
+
+func (iSt *initiationState) connectToPeerInternal(ctxIn *context.T, absName string) bool {
+ ctx, cancel := context.WithCancel(ctxIn)
+
+ // We start a timer to bound the amount of time we wait to
+ // initiate a connection.
+ t := time.AfterFunc(connectionTimeOut, cancel)
+
+ c := interfaces.SyncClient(absName)
+
+ vlog.VI(4).Infof("sync: connectToPeer: trying %v", absName)
+
+ var err error
+ iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
+ t.Stop()
+
+ if err == nil {
+ vlog.VI(4).Infof("sync: connectToPeer: established on %s", absName)
+ return true
+ }
+
+ // When the RPC is successful, cancelling the parent context
+ // will take care of cancelling the child context.
+ cancel()
return false
}
@@ -653,7 +675,7 @@
}
}
vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.config.peer, srcPeer, sgIds)
- info := &blobLocInfo{peer: iSt.config.peer, source: srcPeer, sgIds: sgIds}
+ info := &blobLocInfo{peer: iSt.config.peer.relName, source: srcPeer, sgIds: sgIds}
if err := iSt.config.sync.addBlobLocInfo(ctx, br, info); err != nil {
return err
}
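Note: the initiator's per-database loop in getDeltas above distinguishes fatal
connectivity errors from per-database errors. Distilled (dbNames and syncDB
are hypothetical stand-ins for the loop body):

    var errFinal error
    for _, db := range dbNames {
        err := syncDB(ctx, db)
        if verror.ErrorID(err) == interfaces.ErrConnFail.ID {
            return err // peer unreachable; later databases would fail too
        }
        if err != nil {
            errFinal = err // remember the error, but try the other databases
        }
    }
    return errFinal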
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 07552f0..d77bbf5 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -424,7 +424,7 @@
return svc, nil, cleanup
}
- c, err := newInitiationConfig(nil, s, "b", gdb, info, set.String.ToSlice(info.mtTables))
+ c, err := newInitiationConfig(nil, s, connInfo{relName: "b"}, gdb, info, set.String.ToSlice(info.mtTables))
if err != nil {
t.Fatalf("newInitiationConfig failed with err %v", err)
}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 60b6b58..c2ccf69 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -37,10 +37,23 @@
// syncService contains the metadata for the sync module.
type syncService struct {
- // TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
- id uint64 // globally unique id for this instance of Syncbase.
- name string // name derived from the global id.
- sv interfaces.Service
+ // TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128
+ // bits. Another alternative is to derive this from the blessing of
+ // Syncbase. Syncbase can append a uuid to the blessing it is given upon
+ // launch and use its hash as the id. Note that we cannot use the public
+ // key since we want to support key rollover.
+ id uint64 // globally unique id for this instance of Syncbase.
+ name string // name derived from the global id.
+ sv interfaces.Service
+
+ // Root context to be used to create a context for advertising over
+ // neighborhood.
+ ctx *context.T
+
+ // Cancel function for a context derived from the root context when
+ // advertising over neighborhood. This is needed to stop advertising.
+ advCancel context.CancelFunc
+
nameLock sync.Mutex // lock needed to serialize adding and removing of Syncbase names.
// High-level lock to serialize the watcher and the initiator. This lock is
@@ -104,6 +117,9 @@
// Syncbase clock related variables.
vclock *clock.VClock
+
+ // Peer selector for picking a peer to sync with.
+ ps peerSelector
}
// syncDatabase contains the metadata for syncing a database. This struct is
@@ -147,6 +163,7 @@
batches: make(batchSet),
sgPublishQueue: list.New(),
vclock: vclock,
+ ctx: ctx,
}
data := &syncData{}
@@ -280,34 +297,73 @@
// syncbased is restarted so that it can republish itself at the names being
// used in the syncgroups.
func AddNames(ctx *context.T, ss interfaces.SyncServerMethods, svr rpc.Server) error {
- vlog.VI(2).Infof("sync: AddNames:: begin")
- defer vlog.VI(2).Infof("sync: AddNames:: end")
+ vlog.VI(2).Infof("sync: AddNames: begin")
+ defer vlog.VI(2).Infof("sync: AddNames: end")
s := ss.(*syncService)
s.nameLock.Lock()
defer s.nameLock.Unlock()
mInfo := s.copyMemberInfo(ctx, s.name)
- if mInfo == nil {
- vlog.VI(2).Infof("sync: GetNames:: end returning no names")
+ if mInfo == nil || len(mInfo.mtTables) == 0 {
+ vlog.VI(2).Infof("sync: AddNames: end returning no names")
return nil
}
for mt := range mInfo.mtTables {
name := naming.Join(mt, s.name)
if err := svr.AddName(name); err != nil {
+ vlog.VI(2).Infof("sync: AddNames: end returning err %v", err)
return err
}
}
- return nil
+ return s.publishInNeighborhood(svr)
+}
+
+// publishInNeighborhood checks if the Syncbase service is already being
+// advertised over the neighborhood. If not, it begins advertising. The caller
+// of this function must hold nameLock.
+func (s *syncService) publishInNeighborhood(svr rpc.Server) error {
+ // Syncbase is already being advertised.
+ if s.advCancel != nil {
+ return nil
+ }
+
+ ctx, stop := context.WithCancel(s.ctx)
+
+ advertiser := v23.GetDiscovery(ctx)
+ if advertiser == nil {
+ vlog.Fatal("sync: publishInNeighborhood: discovery not initialized.")
+ }
+
+ // TODO(hpucha): For now we grab the current address of the server. This
+ // will be replaced by library support that will take care of roaming.
+ var eps []string
+ for _, ep := range svr.Status().Endpoints {
+ eps = append(eps, ep.Name())
+ }
+
+ sbService := discovery.Service{
+ InstanceUuid: []byte(s.name),
+ InstanceName: s.name,
+ InterfaceName: interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name,
+ Addrs: eps,
+ }
+
+ // Duplicate calls to advertise will return an error.
+ err := advertiser.Advertise(ctx, sbService, nil)
+ if err == nil {
+ s.advCancel = stop
+ }
+ return err
}
// Close waits for spawned sync threads (watcher and initiator) to shut down,
// and closes the local blob store handle.
func Close(ss interfaces.SyncServerMethods) {
- vlog.VI(2).Infof("sync: Close:: begin")
- defer vlog.VI(2).Infof("sync: Close:: end")
+ vlog.VI(2).Infof("sync: Close: begin")
+ defer vlog.VI(2).Infof("sync: Close: end")
s := ss.(*syncService)
close(s.closed)
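Note: the advCancel field exists so that advertising can later be stopped by
cancelling the context that was passed to Advertise. A hypothetical companion
helper (not part of this change) might look like:

    // stopAdvertising stops the neighborhood advertisement, if any.
    func (s *syncService) stopAdvertising() {
        s.nameLock.Lock()
        defer s.nameLock.Unlock()
        if s.advCancel != nil {
            s.advCancel() // cancels the context passed to Advertise
            s.advCancel = nil
        }
    }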
diff --git a/services/syncbase/vsync/syncer.go b/services/syncbase/vsync/syncer.go
index 2ec041b..6f1c507 100644
--- a/services/syncbase/vsync/syncer.go
+++ b/services/syncbase/vsync/syncer.go
@@ -5,11 +5,13 @@
package vsync
import (
+ "sync"
"time"
"v.io/v23/context"
"v.io/v23/verror"
"v.io/x/lib/vlog"
+ "v.io/x/ref/services/syncbase/server/interfaces"
)
// Policies to pick a peer to sync with.
@@ -17,7 +19,11 @@
// Picks a peer at random from the available set.
selectRandom = iota
- // TODO(hpucha): implement other policies.
+ // Picks a peer based on network availability and the Syncbases
+ // available in the neighborhood via discovery.
+ selectNeighborhoodAware
+
+ // TODO(hpucha): implement these policies.
// Picks a peer with most differing generations.
selectMostDiff
@@ -40,6 +46,34 @@
connectionTimeOut = 2 * time.Second
)
+// connInfo holds the information needed to connect to a peer.
+//
+// TODO(hpucha): Add hints to decide if both neighborhood and mount table must
+// be tried. Currently, if the addr is set, only the addr is tried.
+type connInfo struct {
+ // Name of the peer relative to the mount table chosen by the syncgroup
+ // creator.
+ relName string
+ // Network address of the peer if available. For example, this can be
+ // obtained from neighborhood discovery.
+ addr string
+}
+
+// peerSelector defines the interface that a peer selection algorithm must
+// provide.
+type peerSelector interface {
+ // pickPeer picks a Syncbase to sync with.
+ pickPeer(ctx *context.T) (connInfo, error)
+
+ // updatePeerFromSyncer updates information for a peer that the syncer
+ // attempts to connect to.
+ updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error
+
+ // updatePeerFromResponder updates information for a peer that the
+ // responder responds to.
+ updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error
+}
+
// syncer wakes up every peerSyncInterval to do work: (1) Refresh memberView if
// needed and pick a peer from all the known remote peers to sync with. (2) Act
// as an initiator and sync syncgroup metadata for all common syncgroups with
@@ -54,6 +88,7 @@
func (s *syncService) syncer(ctx *context.T) {
defer s.pending.Done()
+ if err := s.newPeerSelector(ctx); err != nil {
+ vlog.Fatalf("sync: syncer: failed to create peer selector, err %v", err)
+ }
ticker := time.NewTicker(peerSyncInterval)
defer ticker.Stop()
@@ -69,7 +104,6 @@
break
}
}
-
vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
}
@@ -77,41 +111,141 @@
// TODO(hpucha): Cut a gen for the responder even if there is no
// one to initiate to?
- peer, err := s.pickPeer(ctx)
+ // Do work.
+ attemptTs := time.Now()
+ peer, err := s.ps.pickPeer(ctx)
if err != nil {
return
}
- s.syncClock(ctx, peer)
+ err = s.syncClock(ctx, peer)
+ // Abort syncing if there is a connection error with the peer.
+ if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+ err = s.getDeltas(ctx, peer)
+ }
- // Sync syncgroup metadata and data.
- s.getDeltas(ctx, peer)
+ s.ps.updatePeerFromSyncer(ctx, peer, attemptTs, verror.ErrorID(err) == interfaces.ErrConnFail.ID)
}
////////////////////////////////////////
// Peer selection policies.
-// pickPeer picks a Syncbase to sync with.
-func (s *syncService) pickPeer(ctx *context.T) (string, error) {
+func (s *syncService) newPeerSelector(ctx *context.T) error {
switch peerSelectionPolicy {
case selectRandom:
- members := s.getMembers(ctx)
- // Remove myself from the set.
- delete(members, s.name)
- if len(members) == 0 {
- return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
- }
-
- // Pick a peer at random.
- ind := randIntn(len(members))
- for m := range members {
- if ind == 0 {
- return m, nil
- }
- ind--
- }
- return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+ s.ps = &randomPeerSelector{s: s}
+ return nil
+ case selectNeighborhoodAware:
+ s.ps = &neighborhoodAwarePeerSelector{s: s, peerTbl: make(map[string]*peerSyncInfo)}
+ return nil
default:
- return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
+ return verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
}
}
+
+////////////////////////////////////////
+// Random selector.
+
+type randomPeerSelector struct {
+ s *syncService
+}
+
+func (ps *randomPeerSelector) pickPeer(ctx *context.T) (connInfo, error) {
+ var peer connInfo
+
+ members := ps.s.getMembers(ctx)
+ // Remove myself from the set.
+ delete(members, ps.s.name)
+ if len(members) == 0 {
+ return peer, verror.New(verror.ErrInternal, ctx, "no useful peer")
+ }
+
+ // Pick a peer at random.
+ ind := randIntn(len(members))
+ for m := range members {
+ if ind == 0 {
+ peer.relName = m
+ return peer, nil
+ }
+ ind--
+ }
+ return peer, verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+}
+
+func (ps *randomPeerSelector) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
+ // Random selector does not care about this information.
+ return nil
+}
+
+func (ps *randomPeerSelector) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error {
+ // Random selector does not care about this information.
+ return nil
+}
+
+////////////////////////////////////////
+// NeighborhoodAware selector.
+
+// peerSyncInfo is the running set of statistics collected per peer, for a
+// peer that syncs with this node or with which this node syncs.
+type peerSyncInfo struct {
+ // Number of consecutive failures noticed when attempting to connect with
+ // this peer, either via its advertised mount tables or via the
+ // neighborhood. These counters are reset when a connection to the
+ // peer succeeds.
+ numFailuresMountTable uint64
+ numFailuresNeighborhood uint64
+
+ // The most recent timestamp when a connection to this peer was attempted.
+ attemptTs time.Time
+ // The most recent timestamp when a connection to this peer succeeded.
+ successTs time.Time
+ // The most recent timestamp when this peer synced with this node.
+ fromTs time.Time
+ // Map of database names and their corresponding generation vectors for
+ // data and syncgroups.
+ gvs map[string]interfaces.GenVector
+}
+
+type neighborhoodAwarePeerSelector struct {
+ s *syncService
+ // In-memory cache of information relevant to syncing with a peer. This
+ // information could potentially be used in peer selection.
+ peerTbl map[string]*peerSyncInfo
+ peerTblLock sync.RWMutex
+}
+
+func (ps *neighborhoodAwarePeerSelector) pickPeer(ctx *context.T) (connInfo, error) {
+ var peer connInfo
+ return peer, nil
+}
+
+func (ps *neighborhoodAwarePeerSelector) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
+ ps.peerTblLock.Lock()
+ defer ps.peerTblLock.Unlock()
+
+ info, ok := ps.peerTbl[peer.relName]
+ if !ok {
+ info = &peerSyncInfo{}
+ ps.peerTbl[peer.relName] = info
+ }
+
+ info.attemptTs = attemptTs
+ if !failed {
+ info.numFailuresMountTable = 0
+ info.numFailuresNeighborhood = 0
+ info.successTs = time.Now()
+ return nil
+ }
+
+ if peer.addr != "" {
+ info.numFailuresNeighborhood++
+ } else {
+ info.numFailuresMountTable++
+ }
+
+ return nil
+}
+
+func (ps *neighborhoodAwarePeerSelector) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error {
+ return nil
+}
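Note: peerTbl must be initialized before updatePeerFromSyncer writes to it
(hence the make in newPeerSelector above); in Go, reading from a nil map
yields the zero value, but writing to one panics:

    var nilTbl map[string]*peerSyncInfo
    _ = nilTbl["peerA"]                                 // ok: returns nil
    tbl := make(map[string]*peerSyncInfo)
    tbl["peerA"] = &peerSyncInfo{attemptTs: time.Now()} // ok: map was made
    // nilTbl["peerA"] = &peerSyncInfo{}                // would panic: nil map write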
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 5433869..3d1f5fc 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -1144,9 +1144,7 @@
}
}
- // TODO(hpucha): Do we have to publish in neighborhood explicitly?
-
- return nil
+ return ss.publishInNeighborhood(call.Server())
}
func (sd *syncDatabase) joinSyncgroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncgroupMemberInfo) (interfaces.Syncgroup, string, interfaces.PrefixGenVector, error) {