Syncbase: Set visibility on syncgroup advertisments and update
the advertisements whenever the read ACL of the syncgroup changes.

MultiPart: 2/2
Change-Id: I4b327df6b2c34f4c433f79c044a95fe34ac1ce3f
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index e941d22..7aaad54 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -24,6 +24,9 @@
 	"v.io/v23/verror"
 	"v.io/v23/vom"
 	"v.io/x/lib/vlog"
+	idiscovery "v.io/x/ref/lib/discovery"
+	fdiscovery "v.io/x/ref/lib/discovery/factory"
+	"v.io/x/ref/lib/discovery/plugins/mock"
 	"v.io/x/ref/services/syncbase/server"
 	"v.io/x/ref/services/syncbase/store"
 	"v.io/x/ref/test"
@@ -96,6 +99,8 @@
 // TODO(sadovsky): Switch unit tests to v23test.Shell, then delete this.
 func SetupOrDieCustom(clientSuffix, serverSuffix string, perms access.Permissions) (ctx, clientCtx *context.T, serverName string, rootp security.Principal, cleanup func()) {
 	ctx, shutdown := test.V23Init()
+	df, _ := idiscovery.NewFactory(ctx, mock.New())
+	fdiscovery.InjectFactory(df)
 	rootp = tsecurity.NewPrincipal("root")
 	clientCtx, serverCtx := NewCtx(ctx, rootp, clientSuffix), NewCtx(ctx, rootp, serverSuffix)
 
@@ -276,11 +281,10 @@
 	}
 	serverCtx, cancel := context.WithCancel(serverCtx)
 	service, err := server.NewService(serverCtx, server.ServiceOptions{
-		Perms:           perms,
-		RootDir:         rootDir,
-		Engine:          store.EngineForTest,
-		DevMode:         true,
-		SkipPublishInNh: true,
+		Perms:   perms,
+		RootDir: rootDir,
+		Engine:  store.EngineForTest,
+		DevMode: true,
 	})
 	if err != nil {
 		vlog.Fatal("server.NewService() failed: ", err)
@@ -289,6 +293,7 @@
 	if err != nil {
 		vlog.Fatal("v23.WithNewDispatchingServer() failed: ", err)
 	}
+	service.AddNames(serverCtx, s)
 	name := s.Status().Endpoints[0].Name()
 	return name, func() {
 		cancel()
diff --git a/services/syncbase/vsync/constants.go b/services/syncbase/vsync/constants.go
index 2791c4c..a34e1be 100644
--- a/services/syncbase/vsync/constants.go
+++ b/services/syncbase/vsync/constants.go
@@ -22,11 +22,4 @@
 	// <logPrefix>:<sgoid> records (for syncgroup metadata), where <logDataPrefix>
 	// is defined below, and <sgoid> is <sgDataPrefix>:<GroupId>.
 	logDataPrefix = "d"
-
-	// Types of discovery service attributes.
-	discoveryAttrPeer              = "p"
-	discoveryAttrSyncgroupName     = "s"
-	discoveryAttrSyncgroupBlessing = "sb"
-	discoveryAttrDatabaseName      = "d"
-	discoveryAttrDatabaseBlessing  = "db"
 )
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 0441dd8..c49f428 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -781,8 +781,9 @@
 				vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for db %v, err %v", iSt.config.dbId, err)
 			}
 
-			// Ignore errors.
-			iSt.advertiseSyncgroups(ctx)
+			// There is no need to wait for the new advertisement to finish so
+			// we just run it asynchronously in its own goroutine.
+			go iSt.advertiseSyncgroups(ctx)
 
 			vlog.VI(4).Info("sync: processUpdatedObjects: end: changes committed")
 			return nil
diff --git a/services/syncbase/vsync/peer_manager_test.go b/services/syncbase/vsync/peer_manager_test.go
index b8f74b2..1b07e96 100644
--- a/services/syncbase/vsync/peer_manager_test.go
+++ b/services/syncbase/vsync/peer_manager_test.go
@@ -59,11 +59,11 @@
 
 	// Add a few peers to simulate neighborhood.
 	s.updateDiscoveryInfo("a", &discovery.Advertisement{
-		Attributes: discovery.Attributes{discoveryAttrPeer: "a"},
+		Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "a"},
 		Addresses:  []string{"aa", "aaa"},
 	})
 	s.updateDiscoveryInfo("b", &discovery.Advertisement{
-		Attributes: discovery.Attributes{discoveryAttrPeer: "b"},
+		Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "b"},
 		Addresses:  []string{"bb", "bbb"},
 	})
 
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 776918e..8640fe6 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -24,7 +24,9 @@
 	"v.io/v23/discovery"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
+	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
 	idiscovery "v.io/x/ref/lib/discovery"
@@ -90,8 +92,9 @@
 
 	// Cancel functions for contexts derived from the root context when
 	// advertising over neighborhood. This is needed to stop advertising.
-	cancelAdvSyncbase   context.CancelFunc             // cancels Syncbase advertising.
-	cancelAdvSyncgroups map[wire.Id]context.CancelFunc // cancels syncgroup advertising.
+	cancelAdvSyncbase context.CancelFunc                            // cancels Syncbase advertising.
+	advSyncgroups     map[interfaces.GroupId]syncAdvertisementState // describes syncgroup advertising.
+	advLock           sync.Mutex
 
 	// Whether to enable neighborhood advertising.
 	publishInNh bool
@@ -131,6 +134,13 @@
 	sync interfaces.SyncServerMethods
 }
 
+// syncAdvertisementState contains information about the most recent
+// advertisement for each advertised syncgroup.
+type syncAdvertisementState struct {
+	cancel      context.CancelFunc // cancels advertising.
+	specVersion string             // version of most recently advertised spec.
+}
+
 var (
 	ifName                              = interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name
 	_      interfaces.SyncServerMethods = (*syncService)(nil)
@@ -148,20 +158,20 @@
 // that the syncer can pick from. In addition, the sync module responds to
 // incoming RPCs from remote sync modules and local clients.
 func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) {
-	discovery, err := v23.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
+	discovery, err := syncbase.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
 	if err != nil {
 		return nil, err
 	}
 	s := &syncService{
-		sv:                  sv,
-		batches:             make(batchSet),
-		sgPublishQueue:      list.New(),
-		vclock:              cl,
-		ctx:                 ctx,
-		rng:                 rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
-		discovery:           discovery,
-		publishInNh:         publishInNh,
-		cancelAdvSyncgroups: make(map[wire.Id]context.CancelFunc),
+		sv:             sv,
+		batches:        make(batchSet),
+		sgPublishQueue: list.New(),
+		vclock:         cl,
+		ctx:            ctx,
+		rng:            rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+		discovery:      discovery,
+		publishInNh:    publishInNh,
+		advSyncgroups:  make(map[interfaces.GroupId]syncAdvertisementState),
 	}
 
 	data := &SyncData{}
@@ -302,7 +312,7 @@
 	}
 
 	// handle peers
-	if peer, ok := attrs[discoveryAttrPeer]; ok {
+	if peer, ok := attrs[wire.DiscoveryAttrPeer]; ok {
 		// The attribute value is the Syncbase peer name.
 		if ad != nil {
 			s.discoveryPeers[peer] = ad
@@ -312,10 +322,10 @@
 	}
 
 	// handle syngroups
-	if sgName, ok := attrs[discoveryAttrSyncgroupName]; ok {
+	if sgName, ok := attrs[wire.DiscoveryAttrSyncgroupName]; ok {
 		// The attribute value is the syncgroup name.
-		dbId := wire.Id{Name: attrs[discoveryAttrDatabaseName], Blessing: attrs[discoveryAttrDatabaseBlessing]}
-		sgId := wire.Id{Name: sgName, Blessing: attrs[discoveryAttrSyncgroupBlessing]}
+		dbId := wire.Id{Name: attrs[wire.DiscoveryAttrDatabaseName], Blessing: attrs[wire.DiscoveryAttrDatabaseBlessing]}
+		sgId := wire.Id{Name: sgName, Blessing: attrs[wire.DiscoveryAttrSyncgroupBlessing]}
 		gid := SgIdToGid(dbId, sgId)
 		admins := s.discoverySyncgroups[gid]
 		if ad != nil {
@@ -453,6 +463,8 @@
 	vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: begin")
 	defer vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: end")
 
+	s.advLock.Lock()
+	defer s.advLock.Unlock()
 	// Syncbase is already being advertised.
 	if s.cancelAdvSyncbase != nil {
 		return nil
@@ -461,7 +473,7 @@
 	sbService := discovery.Advertisement{
 		InterfaceName: ifName,
 		Attributes: discovery.Attributes{
-			discoveryAttrPeer: s.name,
+			wire.DiscoveryAttrPeer: s.name,
 		},
 	}
 
@@ -487,41 +499,62 @@
 	if !s.publishInNh {
 		return nil
 	}
-
 	vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: begin")
 	defer vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: end")
 
-	if !syncgroupAdmin(s.ctx, sg.Spec.Perms) {
-		if cancel := s.cancelAdvSyncgroups[sg.Id]; cancel != nil {
-			cancel()
-			delete(s.cancelAdvSyncgroups, sg.Id)
-		}
-		return nil
+	s.advLock.Lock()
+	defer s.advLock.Unlock()
+
+	// Refresh the sg spec before advertising.  This prevents trampling
+	// when concurrent spec updates apply transactions in one ordering,
+	// but advertise in another.
+	gid := SgIdToGid(sg.DbId, sg.Id)
+	st, err := s.getDbStore(s.ctx, nil, sg.DbId)
+	if err != nil {
+		return err
+	}
+	sg, err = getSyncgroupByGid(s.ctx, st, gid)
+	if err != nil {
+		return err
 	}
 
-	// Syncgroup is already being advertised.
-	if s.cancelAdvSyncgroups[sg.Id] != nil {
+	if state, advertising := s.advSyncgroups[gid]; advertising {
+		// The spec hasn't changed since the last advertisement.
+		if sg.SpecVersion == state.specVersion {
+			return nil
+		}
+		state.cancel()
+		delete(s.advSyncgroups, gid)
+	}
+
+	// We only advertise if potential members could join by contacting us.
+	// For that reason we only advertise if we are an admin.
+	if !syncgroupAdmin(s.ctx, sg.Spec.Perms) {
 		return nil
 	}
 
 	sbService := discovery.Advertisement{
 		InterfaceName: ifName,
 		Attributes: discovery.Attributes{
-			discoveryAttrDatabaseName:      sg.DbId.Name,
-			discoveryAttrDatabaseBlessing:  sg.DbId.Blessing,
-			discoveryAttrSyncgroupName:     sg.Id.Name,
-			discoveryAttrSyncgroupBlessing: sg.Id.Blessing,
+			wire.DiscoveryAttrDatabaseName:      sg.DbId.Name,
+			wire.DiscoveryAttrDatabaseBlessing:  sg.DbId.Blessing,
+			wire.DiscoveryAttrSyncgroupName:     sg.Id.Name,
+			wire.DiscoveryAttrSyncgroupBlessing: sg.Id.Blessing,
 		},
 	}
 	ctx, stop := context.WithCancel(s.ctx)
 
 	vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: advertising %v", sbService)
 
-	// Note that duplicate calls to advertise will return an error.
-	_, err := idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, nil)
+	// TODO(mattr): Unfortunately, discovery visibility isn't as powerful
+	// as an ACL.  There's no way to represent the NotIn list.  For now
+	// if you match the In list you can see the advertisement, though you
+	// might not be able to join.
+	visibility := sg.Spec.Perms[string(access.Read)].In
+	_, err = idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, visibility)
 	if err == nil {
 		vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: successful")
-		s.cancelAdvSyncgroups[sg.Id] = stop
+		s.advSyncgroups[gid] = syncAdvertisementState{cancel: stop, specVersion: sg.SpecVersion}
 		return nil
 	}
 	stop()
diff --git a/services/syncbase/vsync/sync_test.go b/services/syncbase/vsync/sync_test.go
index 6f801d3..d136268 100644
--- a/services/syncbase/vsync/sync_test.go
+++ b/services/syncbase/vsync/sync_test.go
@@ -38,11 +38,11 @@
 
 	// Add peer neighbors.
 	svcA := &discovery.Advertisement{
-		Attributes: discovery.Attributes{discoveryAttrPeer: "a"},
+		Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "a"},
 		Addresses:  []string{"aa", "aaa"},
 	}
 	svcB := &discovery.Advertisement{
-		Attributes: discovery.Attributes{discoveryAttrPeer: "b"},
+		Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "b"},
 		Addresses:  []string{"bb", "bbb"},
 	}
 
@@ -96,26 +96,26 @@
 	// Add syncgroup admin neighbors.
 	svcA := &discovery.Advertisement{
 		Attributes: discovery.Attributes{
-			discoveryAttrDatabaseName:      "dfoo",
-			discoveryAttrDatabaseBlessing:  "dblessing",
-			discoveryAttrSyncgroupName:     "foo",
-			discoveryAttrSyncgroupBlessing: "blessing"},
+			wire.DiscoveryAttrDatabaseName:      "dfoo",
+			wire.DiscoveryAttrDatabaseBlessing:  "dblessing",
+			wire.DiscoveryAttrSyncgroupName:     "foo",
+			wire.DiscoveryAttrSyncgroupBlessing: "blessing"},
 		Addresses: []string{"aa", "aaa"},
 	}
 	svcB := &discovery.Advertisement{
 		Attributes: discovery.Attributes{
-			discoveryAttrDatabaseName:      "dfoo",
-			discoveryAttrDatabaseBlessing:  "dblessing",
-			discoveryAttrSyncgroupName:     "foo",
-			discoveryAttrSyncgroupBlessing: "blessing"},
+			wire.DiscoveryAttrDatabaseName:      "dfoo",
+			wire.DiscoveryAttrDatabaseBlessing:  "dblessing",
+			wire.DiscoveryAttrSyncgroupName:     "foo",
+			wire.DiscoveryAttrSyncgroupBlessing: "blessing"},
 		Addresses: []string{"bb", "bbb"},
 	}
 	svcC := &discovery.Advertisement{
 		Attributes: discovery.Attributes{
-			discoveryAttrDatabaseName:      "dbar",
-			discoveryAttrDatabaseBlessing:  "dblessing",
-			discoveryAttrSyncgroupName:     "bar",
-			discoveryAttrSyncgroupBlessing: "blessing"},
+			wire.DiscoveryAttrDatabaseName:      "dbar",
+			wire.DiscoveryAttrDatabaseBlessing:  "dblessing",
+			wire.DiscoveryAttrSyncgroupName:     "bar",
+			wire.DiscoveryAttrSyncgroupBlessing: "blessing"},
 		Addresses: []string{"cc", "ccc"},
 	}
 
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index fb251b4..241672a 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -963,6 +963,7 @@
 	ss := sd.sync.(*syncService)
 	dbId := sd.db.Id()
 	gid := SgIdToGid(dbId, sgId)
+	var sg *interfaces.Syncgroup
 
 	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
 		// Check permissions on Database.
@@ -970,7 +971,8 @@
 			return err
 		}
 
-		sg, err := getSyncgroupByGid(ctx, tx, gid)
+		var err error
+		sg, err = getSyncgroupByGid(ctx, tx, gid)
 		if err != nil {
 			return err
 		}
@@ -1017,7 +1019,9 @@
 	if err != nil {
 		return err
 	}
-
+	if err = ss.advertiseSyncgroupInNeighborhood(sg); err != nil {
+		return err
+	}
 	return ss.checkptSgLocalGen(ctx, dbId, gid)
 }
 
@@ -1235,9 +1239,8 @@
 		// continue when members are online despite these errors.
 		vlog.Errorf("sync: advertiseSyncbaseInNeighborhood: failed with err %v", err)
 	}
-
 	// TODO(hpucha): In case of a joiner, this can be optimized such that we
-	// don't advertise until the syncgroup is in pending state.
+	// don't advertise until the syncgroup is out of the pending state.
 	if err := s.advertiseSyncgroupInNeighborhood(sg); err != nil {
 		// We ignore errors on neighborhood advertising since sync when
 		// members are online can continue despite these errors.