Merge "syncbase/vsync: Adding functionality to advertise syncbase via discovery and support for endpoints learned via neighborhood in the initiator."
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index d144928..823a2b5 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -78,5 +78,6 @@
 
 error (
 	DupSyncgroupPublish(name string) {"en": "duplicate publish on syncgroup: {name}"}
+	ConnFail() {"en": "connection to peer failed{:_}"}
 	BrokenCrConnection() {"en": "CrConnection stream to application does not exist or is broken."}
 )
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index a413d60..efe5af5 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -24,11 +24,13 @@
 
 var (
 	ErrDupSyncgroupPublish = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.DupSyncgroupPublish", verror.NoRetry, "{1:}{2:} duplicate publish on syncgroup: {3}")
+	ErrConnFail            = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.ConnFail", verror.NoRetry, "{1:}{2:} connection to peer failed{:_}")
 	ErrBrokenCrConnection  = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.BrokenCrConnection", verror.NoRetry, "{1:}{2:} CrConnection stream to application does not exist or is broken.")
 )
 
 func init() {
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDupSyncgroupPublish.ID), "{1:}{2:} duplicate publish on syncgroup: {3}")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrConnFail.ID), "{1:}{2:} connection to peer failed{:_}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBrokenCrConnection.ID), "{1:}{2:} CrConnection stream to application does not exist or is broken.")
 }
 
@@ -37,6 +39,11 @@
 	return verror.New(ErrDupSyncgroupPublish, ctx, name)
 }
 
+// NewErrConnFail returns an error with the ErrConnFail ID.
+func NewErrConnFail(ctx *context.T) error {
+	return verror.New(ErrConnFail, ctx)
+}
+
 // NewErrBrokenCrConnection returns an error with the ErrBrokenCrConnection ID.
 func NewErrBrokenCrConnection(ctx *context.T) error {
 	return verror.New(ErrBrokenCrConnection, ctx)
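
For orientation, here is a minimal sketch (not part of the change) of the retry idiom the new error enables: callers compare verror.ErrorID against interfaces.ErrConnFail.ID so that only connectivity failures trigger trying another route to the same peer, while any other outcome, including success, is returned immediately. The trySync helper and the syncViaAny wrapper are hypothetical names.

// Hypothetical sketch; trySync stands in for any per-name sync attempt.
func syncViaAny(ctx *context.T, names []string) error {
	var lastErr error
	for _, name := range names {
		err := trySync(ctx, name)
		if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
			return err // nil on success, or a genuine peer-side error
		}
		lastErr = err
	}
	return lastErr
}
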
diff --git a/services/syncbase/vsync/clock.go b/services/syncbase/vsync/clock.go
index 4076616..cddd285 100644
--- a/services/syncbase/vsync/clock.go
+++ b/services/syncbase/vsync/clock.go
@@ -72,31 +72,36 @@
 // syncClock syncs the syncbase clock with peer's syncbase clock.
 // TODO(jlodhia): Refactor the mount table entry search code based on the
 // unified solution for looking up peer once it exists.
-func (s *syncService) syncClock(ctx *context.T, peer string) {
-	vlog.VI(2).Infof("sync: syncClock: begin: contacting peer %s", peer)
-	defer vlog.VI(2).Infof("sync: syncClock: end: contacting peer %s", peer)
+func (s *syncService) syncClock(ctx *context.T, peer connInfo) error {
+	vlog.VI(2).Infof("sync: syncClock: begin: contacting peer %v", peer)
+	defer vlog.VI(2).Infof("sync: syncClock: end: contacting peer %v", peer)
 
-	info := s.copyMemberInfo(ctx, peer)
+	info := s.copyMemberInfo(ctx, peer.relName)
 	if info == nil {
-		vlog.Fatalf("sync: syncClock: missing information in member view for %q", peer)
+		vlog.Fatalf("sync: syncClock: missing information in member view for %v", peer)
 	}
 
-	// Preferred mount tables for this peer.
-	if len(info.mtTables) < 1 {
-		vlog.Errorf("sync: syncClock: no mount tables found to connect to peer %s", peer)
-		return
+	if len(info.mtTables) < 1 && peer.addr == "" {
+		vlog.Errorf("sync: syncClock: no mount tables or endpoint found to connect to peer %v", peer)
+		return verror.New(verror.ErrInternal, ctx, peer.relName, peer.addr, "no mount tables or endpoint found")
 	}
+
+	if peer.addr != "" {
+		vlog.VI(4).Infof("sync: syncClock: trying neighborhood addr for peer %v", peer)
+
+		absName := naming.Join(peer.addr, util.SyncbaseSuffix)
+		return syncWithPeer(ctx, s.vclock, absName, s.name)
+	}
+
 	for mt, _ := range info.mtTables {
-		absName := naming.Join(mt, peer, util.SyncbaseSuffix)
-		if err := syncWithPeer(ctx, s.vclock, absName, s.name); err == nil {
-			return
-		} else if (verror.ErrorID(err) == verror.ErrNoExist.ID) || (verror.ErrorID(err) == verror.ErrInternal.ID) {
-			vlog.Errorf("sync: syncClock: error returned by peer %s: %v", peer, err)
-			return
+		absName := naming.Join(mt, peer.relName, util.SyncbaseSuffix)
+		if err := syncWithPeer(ctx, s.vclock, absName, s.name); verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+			return err
 		}
 	}
-	vlog.Errorf("sync: syncClock: couldn't connect to peer %s", peer)
-	return
+
+	vlog.Errorf("sync: syncClock: couldn't connect to peer %v", peer)
+	return verror.New(interfaces.ErrConnFail, ctx, peer.relName, peer.addr, "all mount tables failed")
 }
 
 // syncWithPeer tries to sync local clock with peer syncbase clock.
@@ -156,8 +161,11 @@
 		if commitErr := tx.Commit(); commitErr != nil {
 			vlog.Errorf("sync: syncClock: error while commiting tx: %v", commitErr)
 		}
+	} else if (verror.ErrorID(reqErr) == verror.ErrNoExist.ID) || (verror.ErrorID(reqErr) == verror.ErrInternal.ID) {
+		vlog.Errorf("sync: syncClock: error returned by peer %s: %v", absPeerName, reqErr)
 	} else {
-		vlog.Errorf("sync: syncClock: received error: %v", reqErr)
+		reqErr = verror.New(interfaces.ErrConnFail, ctx, myName)
+		vlog.Errorf("sync: syncClock: received network error: %v", reqErr)
 	}
 	// Return error received while making request if any to the caller.
 	return reqErr
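
To make the two connection paths above concrete: with a neighborhood address the endpoint itself names the peer's server, so only the Syncbase suffix is appended, while with a mount table the peer's relative name is resolved under each table. A standalone sketch with illustrative values (the constant stands in for util.SyncbaseSuffix; the addresses are made up):

package main

import (
	"fmt"

	"v.io/v23/naming"
)

func main() {
	const syncbaseSuffix = "<SyncbaseSuffix>" // stand-in for util.SyncbaseSuffix
	relName := "alice-phone-sb"               // peer.relName, chosen by the syncgroup creator
	addr := "/192.168.1.7:4430"               // peer.addr learned via neighborhood discovery
	mt := "/ns.example.com:8101/tmp/sync"     // one of the peer's mount tables

	// Neighborhood path: endpoint plus suffix.
	fmt.Println(naming.Join(addr, syncbaseSuffix))

	// Mount-table path: mount table, relative name, suffix.
	fmt.Println(naming.Join(mt, relName, syncbaseSuffix))
}
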
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index f65c70f..f6a9bc0 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -48,13 +48,13 @@
 // initiation round), the work done by the initiator is idempotent.
 //
 // TODO(hpucha): Check the idempotence, esp in addNode in DAG.
-func (s *syncService) getDeltas(ctx *context.T, peer string) {
-	vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer %s", peer)
-	defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer %s", peer)
+func (s *syncService) getDeltas(ctx *context.T, peer connInfo) error {
+	vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer %v", peer)
+	defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer %v", peer)
 
-	info := s.copyMemberInfo(ctx, peer)
+	info := s.copyMemberInfo(ctx, peer.relName)
 	if info == nil {
-		vlog.Fatalf("sync: getDeltas: missing information in member view for %q", peer)
+		vlog.Fatalf("sync: getDeltas: missing information in member view for %v", peer)
 	}
 
 	// Preferred mount tables for this peer.
@@ -62,22 +62,24 @@
 
 	// Sync each Database that may have syncgroups common with this peer,
 	// one at a time.
+	var errFinal error // Any error encountered is returned to the caller.
 	for gdbName := range info.db2sg {
-		vlog.VI(4).Infof("sync: getDeltas: started for peer %s db %s", peer, gdbName)
+		vlog.VI(4).Infof("sync: getDeltas: started for peer %v db %s", peer, gdbName)
 
 		if len(prfMtTbls) < 1 {
-			vlog.Errorf("sync: getDeltas: no mount tables found to connect to peer %s", peer)
-			return
+			vlog.Errorf("sync: getDeltas: no mount tables found to connect to peer %v", peer)
+			return verror.New(verror.ErrInternal, ctx, peer.relName, peer.addr, "all mount tables failed")
 		}
 
 		c, err := newInitiationConfig(ctx, s, peer, gdbName, info, prfMtTbls)
 		if err != nil {
-			vlog.Errorf("sync: getDeltas: couldn't initialize initiator config for peer %s, gdb %s, err %v", peer, gdbName, err)
+			vlog.Errorf("sync: getDeltas: couldn't initialize initiator config for peer %v, gdb %s, err %v", peer, gdbName, err)
+			errFinal = err
 			continue
 		}
 
-		if err := s.getDBDeltas(ctx, peer, c, true); err == nil {
-			if err := s.getDBDeltas(ctx, peer, c, false); err != nil {
+		if err = s.getDBDeltas(ctx, c, true); err == nil {
+			if err = s.getDBDeltas(ctx, c, false); err != nil {
 				vlog.Errorf("sync: getDeltas: failed for data sync, err %v", err)
 			}
 		} else {
@@ -85,18 +87,25 @@
 			vlog.Errorf("sync: getDeltas: failed for syncgroup sync, err %v", err)
 		}
 
+		if verror.ErrorID(err) == interfaces.ErrConnFail.ID {
+			return err
+		} else if err != nil {
+			errFinal = err
+		}
+
 		// Cache the pruned mount table list for the next Database.
 		prfMtTbls = c.mtTables
 
-		vlog.VI(4).Infof("sync: getDeltas: done for peer %s db %s", peer, gdbName)
+		vlog.VI(4).Infof("sync: getDeltas: done for peer %v db %s", peer, gdbName)
 	}
+	return errFinal
 }
 
 // getDBDeltas gets the deltas from the chosen peer. If sg flag is set to true,
 // it will sync syncgroup metadata. If sg flag is false, it will sync data.
-func (s *syncService) getDBDeltas(ctxIn *context.T, peer string, c *initiationConfig, sg bool) error {
-	vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer sg %v %s", sg, peer)
-	defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer sg %v %s", sg, peer)
+func (s *syncService) getDBDeltas(ctxIn *context.T, c *initiationConfig, sg bool) error {
+	vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer sg %v %v", sg, c.peer)
+	defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer sg %v %v", sg, c.peer)
 
 	ctx, cancel := context.WithCancel(ctxIn)
 	// cancel() is idempotent.
@@ -109,7 +118,7 @@
 	if !sg {
 		iSt.peerSgInfo(ctx)
 		if len(iSt.config.sgPfxs) == 0 {
-			return verror.New(verror.ErrInternal, ctx, "no syncgroup prefixes found", peer, iSt.config.appName, iSt.config.dbName)
+			return verror.New(verror.ErrInternal, ctx, "no syncgroup prefixes found", c.peer.relName, iSt.config.appName, iSt.config.dbName)
 		}
 	}
 
@@ -130,7 +139,7 @@
 
 	// Make contact with the peer.
 	if !iSt.connectToPeer(ctx) {
-		return verror.New(verror.ErrInternal, ctx, "couldn't connect to peer", peer)
+		return verror.New(interfaces.ErrConnFail, ctx, "couldn't connect to peer", c.peer.relName, c.peer.addr)
 	}
 
 	// Obtain deltas from the peer over the network.
@@ -156,7 +165,7 @@
 // initiationConfig is the configuration information for a Database in an
 // initiation round.
 type initiationConfig struct {
-	peer string // relative name of the peer to sync with.
+	peer connInfo // connection info of the peer to sync with.
 
 	// Mount tables that this peer may have registered with. The first entry
 	// in this array is the mount table where the peer was successfully
@@ -206,13 +215,12 @@
 	oldHead     string
 	ancestor    string
 	res         *conflictResolution
-
 	// TODO(jlodhia): Add perms object and version for the row keys for pickNew
 }
 
 // newInitiatonConfig creates new initiation config. This will be shared between
 // the two sync rounds in the initiation round of a Database.
-func newInitiationConfig(ctx *context.T, s *syncService, peer string, name string, info *memberInfo, mtTables []string) (*initiationConfig, error) {
+func newInitiationConfig(ctx *context.T, s *syncService, peer connInfo, name string, info *memberInfo, mtTables []string) (*initiationConfig, error) {
 	c := &initiationConfig{}
 	c.peer = peer
 	c.mtTables = mtTables
@@ -221,7 +229,7 @@
 		c.sgIds[id] = struct{}{}
 	}
 	if len(c.sgIds) == 0 {
-		return nil, verror.New(verror.ErrInternal, ctx, "no syncgroups found", peer, name)
+		return nil, verror.New(verror.ErrInternal, ctx, "no syncgroups found", peer.relName, name)
 	}
 	// Note: sgPfxs will be inited when needed by the data sync.
 
@@ -270,7 +278,7 @@
 		if err != nil {
 			continue
 		}
-		if _, ok := sg.Joiners[iSt.config.peer]; !ok {
+		if _, ok := sg.Joiners[iSt.config.peer.relName]; !ok {
 			// Peer is no longer part of the syncgroup.
 			continue
 		}
@@ -413,45 +421,59 @@
 	return nil
 }
 
-// connectToPeer attempts to connect to the remote peer using the mount tables
-// obtained from all the common syncgroups.
-func (iSt *initiationState) connectToPeer(ctxIn *context.T) bool {
+// connectToPeer attempts to connect to the remote peer using the neighborhood
+// address when specified or the mount tables obtained from all the common
+// syncgroups.
+func (iSt *initiationState) connectToPeer(ctx *context.T) bool {
 	vlog.VI(4).Infof("sync: connectToPeer: begin")
 
-	if len(iSt.config.mtTables) < 1 {
-		vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.config.peer, iSt.config.appName, iSt.config.dbName)
+	if len(iSt.config.mtTables) < 1 && iSt.config.peer.addr == "" {
+		vlog.Errorf("sync: connectToPeer: no mount tables or endpoint found to connect to peer %s, app %s db %s", iSt.config.peer, iSt.config.appName, iSt.config.dbName)
 		return false
 	}
 
+	if iSt.config.peer.addr != "" {
+		absName := naming.Join(iSt.config.peer.addr, util.SyncbaseSuffix)
+		return iSt.connectToPeerInternal(ctx, absName)
+	}
+
 	for i, mt := range iSt.config.mtTables {
-		ctx, cancel := context.WithCancel(ctxIn)
-
-		// We start a timer to bound the amount of time we wait to
-		// initiate a connection.
-		t := time.AfterFunc(connectionTimeOut, cancel)
-
-		absName := naming.Join(mt, iSt.config.peer, util.SyncbaseSuffix)
-		c := interfaces.SyncClient(absName)
-
-		vlog.VI(4).Infof("sync: connectToPeer: trying %v", absName)
-
-		var err error
-		iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
-		t.Stop()
-
-		if err == nil {
-			vlog.VI(4).Infof("sync: connectToPeer: established on %s", absName)
-
+		absName := naming.Join(mt, iSt.config.peer.relName, util.SyncbaseSuffix)
+		if iSt.connectToPeerInternal(ctx, absName) {
 			// Prune out the unsuccessful mount tables.
 			iSt.config.mtTables = iSt.config.mtTables[i:]
 			return true
 		}
-		// When the RPC is successful, cancelling the parent context
-		// will take care of cancelling the child context.
-		cancel()
 	}
 	iSt.config.mtTables = nil
-	vlog.Errorf("sync: connectToPeer: couldn't connect to peer %s", iSt.config.peer)
+
+	vlog.Errorf("sync: connectToPeer: couldn't connect to peer %v", iSt.config.peer)
+	return false
+}
+
+func (iSt *initiationState) connectToPeerInternal(ctxIn *context.T, absName string) bool {
+	ctx, cancel := context.WithCancel(ctxIn)
+
+	// We start a timer to bound the amount of time we wait to
+	// initiate a connection.
+	t := time.AfterFunc(connectionTimeOut, cancel)
+
+	c := interfaces.SyncClient(absName)
+
+	vlog.VI(4).Infof("sync: connectToPeer: trying %v", absName)
+
+	var err error
+	iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
+	t.Stop()
+
+	if err == nil {
+		vlog.VI(4).Infof("sync: connectToPeer: established on %s", absName)
+		return true
+	}
+
+	// When the RPC is successful, cancelling the parent context
+	// will take care of cancelling the child context.
+	cancel()
 	return false
 }
 
@@ -653,7 +675,7 @@
 			}
 		}
 		vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.config.peer, srcPeer, sgIds)
-		info := &blobLocInfo{peer: iSt.config.peer, source: srcPeer, sgIds: sgIds}
+		info := &blobLocInfo{peer: iSt.config.peer.relName, source: srcPeer, sgIds: sgIds}
 		if err := iSt.config.sync.addBlobLocInfo(ctx, br, info); err != nil {
 			return err
 		}
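
connectToPeerInternal keeps the existing timeout idiom: derive a cancelable context and arm a timer that cancels it if GetDeltas has not completed in time. A small generic sketch of the same pattern; dialWithTimeout and its dial callback are hypothetical:

// Hypothetical wrapper illustrating the bounded-connection idiom.
func dialWithTimeout(ctxIn *context.T, timeout time.Duration, dial func(*context.T) error) error {
	ctx, cancel := context.WithCancel(ctxIn)
	// Cancel the derived context if dialing takes longer than the timeout.
	t := time.AfterFunc(timeout, cancel)
	err := dial(ctx)
	t.Stop()
	if err != nil {
		// On failure, cancel eagerly. On success, cancelling the parent
		// context later also cancels this derived context.
		cancel()
	}
	return err
}
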
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 07552f0..d77bbf5 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -424,7 +424,7 @@
 		return svc, nil, cleanup
 	}
 
-	c, err := newInitiationConfig(nil, s, "b", gdb, info, set.String.ToSlice(info.mtTables))
+	c, err := newInitiationConfig(nil, s, connInfo{relName: "b"}, gdb, info, set.String.ToSlice(info.mtTables))
 	if err != nil {
 		t.Fatalf("newInitiationConfig failed with err %v", err)
 	}
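
A hypothetical variant of the call above would exercise the neighborhood path by also setting addr on the connInfo; the endpoint string is made up:

c, err := newInitiationConfig(nil, s, connInfo{relName: "b", addr: "/192.168.1.7:4430"}, gdb, info, set.String.ToSlice(info.mtTables))
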
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 60b6b58..c2ccf69 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -37,10 +37,23 @@
 
 // syncService contains the metadata for the sync module.
 type syncService struct {
-	// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
-	id       uint64 // globally unique id for this instance of Syncbase.
-	name     string // name derived from the global id.
-	sv       interfaces.Service
+	// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128
+	// bits. Another alternative is to derive this from the blessing of
+	// Syncbase. Syncbase can append a uuid to the blessing it is given upon
+	// launch and use its hash as id. Note we cannot use public key since we
+	// want to support key rollover.
+	id   uint64 // globally unique id for this instance of Syncbase.
+	name string // name derived from the global id.
+	sv   interfaces.Service
+
+	// Root context to be used to create a context for advertising over
+	// neighborhood.
+	ctx *context.T
+
+	// Cancel function for a context derived from the root context when
+	// advertising over neighborhood. This is needed to stop advertising.
+	advCancel context.CancelFunc
+
 	nameLock sync.Mutex // lock needed to serialize adding and removing of Syncbase names.
 
 	// High-level lock to serialize the watcher and the initiator. This lock is
@@ -104,6 +117,9 @@
 
 	// Syncbase clock related variables.
 	vclock *clock.VClock
+
+	// Peer selector for picking a peer to sync with.
+	ps peerSelector
 }
 
 // syncDatabase contains the metadata for syncing a database. This struct is
@@ -147,6 +163,7 @@
 		batches:        make(batchSet),
 		sgPublishQueue: list.New(),
 		vclock:         vclock,
+		ctx:            ctx,
 	}
 
 	data := &syncData{}
@@ -280,34 +297,73 @@
 // syncbased is restarted so that it can republish itself at the names being
 // used in the syncgroups.
 func AddNames(ctx *context.T, ss interfaces.SyncServerMethods, svr rpc.Server) error {
-	vlog.VI(2).Infof("sync: AddNames:: begin")
-	defer vlog.VI(2).Infof("sync: AddNames:: end")
+	vlog.VI(2).Infof("sync: AddNames: begin")
+	defer vlog.VI(2).Infof("sync: AddNames: end")
 
 	s := ss.(*syncService)
 	s.nameLock.Lock()
 	defer s.nameLock.Unlock()
 
 	mInfo := s.copyMemberInfo(ctx, s.name)
-	if mInfo == nil {
-		vlog.VI(2).Infof("sync: GetNames:: end returning no names")
+	if mInfo == nil || len(mInfo.mtTables) == 0 {
+		vlog.VI(2).Infof("sync: AddNames: end returning no names")
 		return nil
 	}
 
 	for mt := range mInfo.mtTables {
 		name := naming.Join(mt, s.name)
 		if err := svr.AddName(name); err != nil {
+			vlog.VI(2).Infof("sync: AddNames: end returning err %v", err)
 			return err
 		}
 	}
 
-	return nil
+	return s.publishInNeighborhood(svr)
+}
+
+// publishInNeighborhood checks if the Syncbase service is already being
+// advertised over the neighborhood. If not, it begins advertising. The caller
+// must hold nameLock.
+func (s *syncService) publishInNeighborhood(svr rpc.Server) error {
+	// Syncbase is already being advertised.
+	if s.advCancel != nil {
+		return nil
+	}
+
+	ctx, stop := context.WithCancel(s.ctx)
+
+	advertiser := v23.GetDiscovery(ctx)
+	if advertiser == nil {
+		vlog.Fatal("sync: publishInNeighborhood: discovery not initialized.")
+	}
+
+	// TODO(hpucha): For now we grab the current address of the server. This
+	// will be replaced by library support that will take care of roaming.
+	var eps []string
+	for _, ep := range svr.Status().Endpoints {
+		eps = append(eps, ep.Name())
+	}
+
+	sbService := discovery.Service{
+		InstanceUuid:  []byte(s.name),
+		InstanceName:  s.name,
+		InterfaceName: interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name,
+		Addrs:         eps,
+	}
+
+	// Duplicate calls to advertise will return an error.
+	err := advertiser.Advertise(ctx, sbService, nil)
+	if err == nil {
+		s.advCancel = stop
+	}
+	return err
 }
 
 // Close waits for spawned sync threads (watcher and initiator) to shut down,
 // and closes the local blob store handle.
 func Close(ss interfaces.SyncServerMethods) {
-	vlog.VI(2).Infof("sync: Close:: begin")
-	defer vlog.VI(2).Infof("sync: Close:: end")
+	vlog.VI(2).Infof("sync: Close: begin")
+	defer vlog.VI(2).Infof("sync: Close: end")
 
 	s := ss.(*syncService)
 	close(s.closed)
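
Because the advertisement is bound to a context derived from the root context, stopping it (not part of this change) amounts to invoking the saved cancel function while holding nameLock. A hypothetical sketch mirroring publishInNeighborhood's bookkeeping:

// Hypothetical helper; not part of this change.
func (s *syncService) stopAdvertisingInNeighborhood() {
	s.nameLock.Lock()
	defer s.nameLock.Unlock()

	if s.advCancel != nil {
		// Cancelling the derived context stops the neighborhood advertisement.
		s.advCancel()
		s.advCancel = nil
	}
}
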
diff --git a/services/syncbase/vsync/syncer.go b/services/syncbase/vsync/syncer.go
index 2ec041b..6f1c507 100644
--- a/services/syncbase/vsync/syncer.go
+++ b/services/syncbase/vsync/syncer.go
@@ -5,11 +5,13 @@
 package vsync
 
 import (
+	"sync"
 	"time"
 
 	"v.io/v23/context"
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
+	"v.io/x/ref/services/syncbase/server/interfaces"
 )
 
 // Policies to pick a peer to sync with.
@@ -17,7 +19,11 @@
 	// Picks a peer at random from the available set.
 	selectRandom = iota
 
-	// TODO(hpucha): implement other policies.
+	// Picks a peer based on network availability and the Syncbases
+	// discovered in the neighborhood.
+	selectNeighborhoodAware
+
+	// TODO(hpucha): implement these policies.
 	// Picks a peer with most differing generations.
 	selectMostDiff
 
@@ -40,6 +46,34 @@
 	connectionTimeOut = 2 * time.Second
 )
 
+// connInfo holds the information needed to connect to a peer.
+//
+// TODO(hpucha): Add hints to decide if both neighborhood and mount table must
+// be tried. Currently, if the addr is set, only the addr is tried.
+type connInfo struct {
+	// Name of the peer relative to the mount table chosen by the syncgroup
+	// creator.
+	relName string
+	// Network address of the peer if available. For example, this can be
+	// obtained from neighborhood discovery.
+	addr string
+}
+
+// peerSelector defines the interface that a peer selection algorithm must
+// provide.
+type peerSelector interface {
+	// pickPeer picks a Syncbase to sync with.
+	pickPeer(ctx *context.T) (connInfo, error)
+
+	// updatePeerFromSyncer updates information for a peer that the syncer
+	// attempts to connect to.
+	updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error
+
+	// updatePeerFromResponder updates information for a peer that the
+	// responder responds to.
+	updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error
+}
+
 // syncer wakes up every peerSyncInterval to do work: (1) Refresh memberView if
 // needed and pick a peer from all the known remote peers to sync with. (2) Act
 // as an initiator and sync syncgroup metadata for all common syncgroups with
@@ -54,6 +88,7 @@
 func (s *syncService) syncer(ctx *context.T) {
 	defer s.pending.Done()
 
+	s.newPeerSelector(ctx)
 	ticker := time.NewTicker(peerSyncInterval)
 	defer ticker.Stop()
 
@@ -69,7 +104,6 @@
 			break
 		}
 	}
-
 	vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
 }
 
@@ -77,41 +111,141 @@
 	// TODO(hpucha): Cut a gen for the responder even if there is no
 	// one to initiate to?
 
-	peer, err := s.pickPeer(ctx)
+	// Do work.
+	attemptTs := time.Now()
+	peer, err := s.ps.pickPeer(ctx)
 	if err != nil {
 		return
 	}
 
-	s.syncClock(ctx, peer)
+	err = s.syncClock(ctx, peer)
+	// Skip fetching deltas if the clock sync hit a connection error with the peer.
+	if verror.ErrorID(err) != interfaces.ErrConnFail.ID {
+		err = s.getDeltas(ctx, peer)
+	}
 
-	// Sync syncgroup metadata and data.
-	s.getDeltas(ctx, peer)
+	s.ps.updatePeerFromSyncer(ctx, peer, attemptTs, verror.ErrorID(err) == interfaces.ErrConnFail.ID)
 }
 
 ////////////////////////////////////////
 // Peer selection policies.
 
-// pickPeer picks a Syncbase to sync with.
-func (s *syncService) pickPeer(ctx *context.T) (string, error) {
+func (s *syncService) newPeerSelector(ctx *context.T) error {
 	switch peerSelectionPolicy {
 	case selectRandom:
-		members := s.getMembers(ctx)
-		// Remove myself from the set.
-		delete(members, s.name)
-		if len(members) == 0 {
-			return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
-		}
-
-		// Pick a peer at random.
-		ind := randIntn(len(members))
-		for m := range members {
-			if ind == 0 {
-				return m, nil
-			}
-			ind--
-		}
-		return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+		s.ps = &randomPeerSelector{s: s}
+		return nil
+	case selectNeighborhoodAware:
+		s.ps = &neighborhoodAwarePeerSelector{s: s, peerTbl: make(map[string]*peerSyncInfo)}
+		return nil
 	default:
-		return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
+		return verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
 	}
 }
+
+////////////////////////////////////////
+// Random selector.
+
+type randomPeerSelector struct {
+	s *syncService
+}
+
+func (ps *randomPeerSelector) pickPeer(ctx *context.T) (connInfo, error) {
+	var peer connInfo
+
+	members := ps.s.getMembers(ctx)
+	// Remove myself from the set.
+	delete(members, ps.s.name)
+	if len(members) == 0 {
+		return peer, verror.New(verror.ErrInternal, ctx, "no useful peer")
+	}
+
+	// Pick a peer at random.
+	ind := randIntn(len(members))
+	for m := range members {
+		if ind == 0 {
+			peer.relName = m
+			return peer, nil
+		}
+		ind--
+	}
+	return peer, verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+}
+
+func (ps *randomPeerSelector) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
+	// Random selector does not care about this information.
+	return nil
+}
+
+func (ps *randomPeerSelector) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error {
+	// Random selector does not care about this information.
+	return nil
+}
+
+////////////////////////////////////////
+// NeighborhoodAware selector.
+
+// peerSyncInfo holds the running statistics collected per peer, covering both
+// peers that sync with this node and peers that this node syncs with.
+type peerSyncInfo struct {
+	// Number of continuous failures noticed when attempting to connect with
+	// this peer, either via its advertised mount table or via
+	// neighborhood. These counters are reset when the connection to the
+	// peer succeeds.
+	numFailuresMountTable   uint64
+	numFailuresNeighborhood uint64
+
+	// The most recent timestamp when a connection to this peer was attempted.
+	attemptTs time.Time
+	// The most recent timestamp when a connection to this peer succeeded.
+	successTs time.Time
+	// The most recent timestamp when this peer synced with this node.
+	fromTs time.Time
+	// Map of database names and their corresponding generation vectors for
+	// data and syncgroups.
+	gvs map[string]interfaces.GenVector
+}
+
+type neighborhoodAwarePeerSelector struct {
+	s *syncService
+	// In-memory cache of information relevant to syncing with a peer. This
+	// information could potentially be used in peer selection.
+	peerTbl     map[string]*peerSyncInfo
+	peerTblLock sync.RWMutex
+}
+
+func (ps *neighborhoodAwarePeerSelector) pickPeer(ctx *context.T) (connInfo, error) {
+	var peer connInfo
+	return peer, nil
+}
+
+func (ps *neighborhoodAwarePeerSelector) updatePeerFromSyncer(ctx *context.T, peer connInfo, attemptTs time.Time, failed bool) error {
+	ps.peerTblLock.Lock()
+	defer ps.peerTblLock.Unlock()
+
+	info, ok := ps.peerTbl[peer.relName]
+	if !ok {
+		info = &peerSyncInfo{}
+		ps.peerTbl[peer.relName] = info
+	}
+
+	info.attemptTs = attemptTs
+	if !failed {
+		info.numFailuresMountTable = 0
+		info.numFailuresNeighborhood = 0
+		info.successTs = time.Now()
+		return nil
+	}
+
+	if peer.addr != "" {
+		info.numFailuresNeighborhood++
+	} else {
+		info.numFailuresMountTable++
+	}
+
+	return nil
+}
+
+func (ps *neighborhoodAwarePeerSelector) updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gv interfaces.GenVector) error {
+	return nil
+}
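
The peerSelector indirection keeps the syncer policy-agnostic, and the per-peer failure counters are what the still-unimplemented neighborhood-aware pickPeer is meant to consume. A hypothetical sketch of one such heuristic; neighborAddr is an assumed lookup of addresses learned from discovery and the threshold is made up:

// Hypothetical heuristic, not part of this change: fall back to a peer's
// neighborhood address once its mount tables have failed repeatedly.
func (ps *neighborhoodAwarePeerSelector) chooseRoute(relName string, neighborAddr func(string) (string, bool)) connInfo {
	ps.peerTblLock.RLock()
	defer ps.peerTblLock.RUnlock()

	peer := connInfo{relName: relName}
	info, ok := ps.peerTbl[relName]
	if !ok {
		return peer // no history yet; try the mount tables first
	}

	const maxMtFailures = 2 // assumed threshold
	if info.numFailuresMountTable >= maxMtFailures {
		if addr, ok := neighborAddr(relName); ok {
			peer.addr = addr // switch to the neighborhood endpoint
		}
	}
	return peer
}
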
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 5433869..3d1f5fc 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -1144,9 +1144,7 @@
 		}
 	}
 
-	// TODO(hpucha): Do we have to publish in neighborhood explicitly?
-
-	return nil
+	return ss.publishInNeighborhood(call.Server())
 }
 
 func (sd *syncDatabase) joinSyncgroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncgroupMemberInfo) (interfaces.Syncgroup, string, interfaces.PrefixGenVector, error) {