Syncbase: Set visibility on syncgroup advertisments and update
the advertisements whenever the read ACL of the syncgroup changes.
MultiPart: 2/2
Change-Id: I4b327df6b2c34f4c433f79c044a95fe34ac1ce3f
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index e941d22..7aaad54 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -24,6 +24,9 @@
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/lib/vlog"
+ idiscovery "v.io/x/ref/lib/discovery"
+ fdiscovery "v.io/x/ref/lib/discovery/factory"
+ "v.io/x/ref/lib/discovery/plugins/mock"
"v.io/x/ref/services/syncbase/server"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/test"
@@ -96,6 +99,8 @@
// TODO(sadovsky): Switch unit tests to v23test.Shell, then delete this.
func SetupOrDieCustom(clientSuffix, serverSuffix string, perms access.Permissions) (ctx, clientCtx *context.T, serverName string, rootp security.Principal, cleanup func()) {
ctx, shutdown := test.V23Init()
+ df, _ := idiscovery.NewFactory(ctx, mock.New())
+ fdiscovery.InjectFactory(df)
rootp = tsecurity.NewPrincipal("root")
clientCtx, serverCtx := NewCtx(ctx, rootp, clientSuffix), NewCtx(ctx, rootp, serverSuffix)
@@ -276,11 +281,10 @@
}
serverCtx, cancel := context.WithCancel(serverCtx)
service, err := server.NewService(serverCtx, server.ServiceOptions{
- Perms: perms,
- RootDir: rootDir,
- Engine: store.EngineForTest,
- DevMode: true,
- SkipPublishInNh: true,
+ Perms: perms,
+ RootDir: rootDir,
+ Engine: store.EngineForTest,
+ DevMode: true,
})
if err != nil {
vlog.Fatal("server.NewService() failed: ", err)
@@ -289,6 +293,7 @@
if err != nil {
vlog.Fatal("v23.WithNewDispatchingServer() failed: ", err)
}
+ service.AddNames(serverCtx, s)
name := s.Status().Endpoints[0].Name()
return name, func() {
cancel()
diff --git a/services/syncbase/vsync/constants.go b/services/syncbase/vsync/constants.go
index 2791c4c..a34e1be 100644
--- a/services/syncbase/vsync/constants.go
+++ b/services/syncbase/vsync/constants.go
@@ -22,11 +22,4 @@
// <logPrefix>:<sgoid> records (for syncgroup metadata), where <logDataPrefix>
// is defined below, and <sgoid> is <sgDataPrefix>:<GroupId>.
logDataPrefix = "d"
-
- // Types of discovery service attributes.
- discoveryAttrPeer = "p"
- discoveryAttrSyncgroupName = "s"
- discoveryAttrSyncgroupBlessing = "sb"
- discoveryAttrDatabaseName = "d"
- discoveryAttrDatabaseBlessing = "db"
)
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 0441dd8..c49f428 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -781,8 +781,9 @@
vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for db %v, err %v", iSt.config.dbId, err)
}
- // Ignore errors.
- iSt.advertiseSyncgroups(ctx)
+ // There is no need to wait for the new advertisement to finish so
+ // we just run it asynchronously in its own goroutine.
+ go iSt.advertiseSyncgroups(ctx)
vlog.VI(4).Info("sync: processUpdatedObjects: end: changes committed")
return nil
diff --git a/services/syncbase/vsync/peer_manager_test.go b/services/syncbase/vsync/peer_manager_test.go
index b8f74b2..1b07e96 100644
--- a/services/syncbase/vsync/peer_manager_test.go
+++ b/services/syncbase/vsync/peer_manager_test.go
@@ -59,11 +59,11 @@
// Add a few peers to simulate neighborhood.
s.updateDiscoveryInfo("a", &discovery.Advertisement{
- Attributes: discovery.Attributes{discoveryAttrPeer: "a"},
+ Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "a"},
Addresses: []string{"aa", "aaa"},
})
s.updateDiscoveryInfo("b", &discovery.Advertisement{
- Attributes: discovery.Attributes{discoveryAttrPeer: "b"},
+ Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "b"},
Addresses: []string{"bb", "bbb"},
})
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 776918e..8640fe6 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -24,7 +24,9 @@
"v.io/v23/discovery"
"v.io/v23/naming"
"v.io/v23/rpc"
+ "v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
+ "v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/lib/vlog"
idiscovery "v.io/x/ref/lib/discovery"
@@ -90,8 +92,9 @@
// Cancel functions for contexts derived from the root context when
// advertising over neighborhood. This is needed to stop advertising.
- cancelAdvSyncbase context.CancelFunc // cancels Syncbase advertising.
- cancelAdvSyncgroups map[wire.Id]context.CancelFunc // cancels syncgroup advertising.
+ cancelAdvSyncbase context.CancelFunc // cancels Syncbase advertising.
+ advSyncgroups map[interfaces.GroupId]syncAdvertisementState // describes syncgroup advertising.
+ advLock sync.Mutex
// Whether to enable neighborhood advertising.
publishInNh bool
@@ -131,6 +134,13 @@
sync interfaces.SyncServerMethods
}
+// syncAdvertisementState contains information about the most recent
+// advertisement for each advertised syncgroup.
+type syncAdvertisementState struct {
+ cancel context.CancelFunc // cancels advertising.
+ specVersion string // version of most recently advertised spec.
+}
+
var (
ifName = interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name
_ interfaces.SyncServerMethods = (*syncService)(nil)
@@ -148,20 +158,20 @@
// that the syncer can pick from. In addition, the sync module responds to
// incoming RPCs from remote sync modules and local clients.
func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) {
- discovery, err := v23.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
+ discovery, err := syncbase.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
if err != nil {
return nil, err
}
s := &syncService{
- sv: sv,
- batches: make(batchSet),
- sgPublishQueue: list.New(),
- vclock: cl,
- ctx: ctx,
- rng: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
- discovery: discovery,
- publishInNh: publishInNh,
- cancelAdvSyncgroups: make(map[wire.Id]context.CancelFunc),
+ sv: sv,
+ batches: make(batchSet),
+ sgPublishQueue: list.New(),
+ vclock: cl,
+ ctx: ctx,
+ rng: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+ discovery: discovery,
+ publishInNh: publishInNh,
+ advSyncgroups: make(map[interfaces.GroupId]syncAdvertisementState),
}
data := &SyncData{}
@@ -302,7 +312,7 @@
}
// handle peers
- if peer, ok := attrs[discoveryAttrPeer]; ok {
+ if peer, ok := attrs[wire.DiscoveryAttrPeer]; ok {
// The attribute value is the Syncbase peer name.
if ad != nil {
s.discoveryPeers[peer] = ad
@@ -312,10 +322,10 @@
}
// handle syngroups
- if sgName, ok := attrs[discoveryAttrSyncgroupName]; ok {
+ if sgName, ok := attrs[wire.DiscoveryAttrSyncgroupName]; ok {
// The attribute value is the syncgroup name.
- dbId := wire.Id{Name: attrs[discoveryAttrDatabaseName], Blessing: attrs[discoveryAttrDatabaseBlessing]}
- sgId := wire.Id{Name: sgName, Blessing: attrs[discoveryAttrSyncgroupBlessing]}
+ dbId := wire.Id{Name: attrs[wire.DiscoveryAttrDatabaseName], Blessing: attrs[wire.DiscoveryAttrDatabaseBlessing]}
+ sgId := wire.Id{Name: sgName, Blessing: attrs[wire.DiscoveryAttrSyncgroupBlessing]}
gid := SgIdToGid(dbId, sgId)
admins := s.discoverySyncgroups[gid]
if ad != nil {
@@ -453,6 +463,8 @@
vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: begin")
defer vlog.VI(4).Infof("sync: advertiseSyncbaseInNeighborhood: end")
+ s.advLock.Lock()
+ defer s.advLock.Unlock()
// Syncbase is already being advertised.
if s.cancelAdvSyncbase != nil {
return nil
@@ -461,7 +473,7 @@
sbService := discovery.Advertisement{
InterfaceName: ifName,
Attributes: discovery.Attributes{
- discoveryAttrPeer: s.name,
+ wire.DiscoveryAttrPeer: s.name,
},
}
@@ -487,41 +499,62 @@
if !s.publishInNh {
return nil
}
-
vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: begin")
defer vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: end")
- if !syncgroupAdmin(s.ctx, sg.Spec.Perms) {
- if cancel := s.cancelAdvSyncgroups[sg.Id]; cancel != nil {
- cancel()
- delete(s.cancelAdvSyncgroups, sg.Id)
- }
- return nil
+ s.advLock.Lock()
+ defer s.advLock.Unlock()
+
+ // Refresh the sg spec before advertising. This prevents trampling
+ // when concurrent spec updates apply transactions in one ordering,
+ // but advertise in another.
+ gid := SgIdToGid(sg.DbId, sg.Id)
+ st, err := s.getDbStore(s.ctx, nil, sg.DbId)
+ if err != nil {
+ return err
+ }
+ sg, err = getSyncgroupByGid(s.ctx, st, gid)
+ if err != nil {
+ return err
}
- // Syncgroup is already being advertised.
- if s.cancelAdvSyncgroups[sg.Id] != nil {
+ if state, advertising := s.advSyncgroups[gid]; advertising {
+ // The spec hasn't changed since the last advertisement.
+ if sg.SpecVersion == state.specVersion {
+ return nil
+ }
+ state.cancel()
+ delete(s.advSyncgroups, gid)
+ }
+
+ // We only advertise if potential members could join by contacting us.
+ // For that reason we only advertise if we are an admin.
+ if !syncgroupAdmin(s.ctx, sg.Spec.Perms) {
return nil
}
sbService := discovery.Advertisement{
InterfaceName: ifName,
Attributes: discovery.Attributes{
- discoveryAttrDatabaseName: sg.DbId.Name,
- discoveryAttrDatabaseBlessing: sg.DbId.Blessing,
- discoveryAttrSyncgroupName: sg.Id.Name,
- discoveryAttrSyncgroupBlessing: sg.Id.Blessing,
+ wire.DiscoveryAttrDatabaseName: sg.DbId.Name,
+ wire.DiscoveryAttrDatabaseBlessing: sg.DbId.Blessing,
+ wire.DiscoveryAttrSyncgroupName: sg.Id.Name,
+ wire.DiscoveryAttrSyncgroupBlessing: sg.Id.Blessing,
},
}
ctx, stop := context.WithCancel(s.ctx)
vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: advertising %v", sbService)
- // Note that duplicate calls to advertise will return an error.
- _, err := idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, nil)
+ // TODO(mattr): Unfortunately, discovery visibility isn't as powerful
+ // as an ACL. There's no way to represent the NotIn list. For now
+ // if you match the In list you can see the advertisement, though you
+ // might not be able to join.
+ visibility := sg.Spec.Perms[string(access.Read)].In
+ _, err = idiscovery.AdvertiseServer(ctx, s.discovery, s.svr, "", &sbService, visibility)
if err == nil {
vlog.VI(4).Infof("sync: advertiseSyncgroupInNeighborhood: successful")
- s.cancelAdvSyncgroups[sg.Id] = stop
+ s.advSyncgroups[gid] = syncAdvertisementState{cancel: stop, specVersion: sg.SpecVersion}
return nil
}
stop()
diff --git a/services/syncbase/vsync/sync_test.go b/services/syncbase/vsync/sync_test.go
index 6f801d3..d136268 100644
--- a/services/syncbase/vsync/sync_test.go
+++ b/services/syncbase/vsync/sync_test.go
@@ -38,11 +38,11 @@
// Add peer neighbors.
svcA := &discovery.Advertisement{
- Attributes: discovery.Attributes{discoveryAttrPeer: "a"},
+ Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "a"},
Addresses: []string{"aa", "aaa"},
}
svcB := &discovery.Advertisement{
- Attributes: discovery.Attributes{discoveryAttrPeer: "b"},
+ Attributes: discovery.Attributes{wire.DiscoveryAttrPeer: "b"},
Addresses: []string{"bb", "bbb"},
}
@@ -96,26 +96,26 @@
// Add syncgroup admin neighbors.
svcA := &discovery.Advertisement{
Attributes: discovery.Attributes{
- discoveryAttrDatabaseName: "dfoo",
- discoveryAttrDatabaseBlessing: "dblessing",
- discoveryAttrSyncgroupName: "foo",
- discoveryAttrSyncgroupBlessing: "blessing"},
+ wire.DiscoveryAttrDatabaseName: "dfoo",
+ wire.DiscoveryAttrDatabaseBlessing: "dblessing",
+ wire.DiscoveryAttrSyncgroupName: "foo",
+ wire.DiscoveryAttrSyncgroupBlessing: "blessing"},
Addresses: []string{"aa", "aaa"},
}
svcB := &discovery.Advertisement{
Attributes: discovery.Attributes{
- discoveryAttrDatabaseName: "dfoo",
- discoveryAttrDatabaseBlessing: "dblessing",
- discoveryAttrSyncgroupName: "foo",
- discoveryAttrSyncgroupBlessing: "blessing"},
+ wire.DiscoveryAttrDatabaseName: "dfoo",
+ wire.DiscoveryAttrDatabaseBlessing: "dblessing",
+ wire.DiscoveryAttrSyncgroupName: "foo",
+ wire.DiscoveryAttrSyncgroupBlessing: "blessing"},
Addresses: []string{"bb", "bbb"},
}
svcC := &discovery.Advertisement{
Attributes: discovery.Attributes{
- discoveryAttrDatabaseName: "dbar",
- discoveryAttrDatabaseBlessing: "dblessing",
- discoveryAttrSyncgroupName: "bar",
- discoveryAttrSyncgroupBlessing: "blessing"},
+ wire.DiscoveryAttrDatabaseName: "dbar",
+ wire.DiscoveryAttrDatabaseBlessing: "dblessing",
+ wire.DiscoveryAttrSyncgroupName: "bar",
+ wire.DiscoveryAttrSyncgroupBlessing: "blessing"},
Addresses: []string{"cc", "ccc"},
}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index fb251b4..241672a 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -963,6 +963,7 @@
ss := sd.sync.(*syncService)
dbId := sd.db.Id()
gid := SgIdToGid(dbId, sgId)
+ var sg *interfaces.Syncgroup
err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
// Check permissions on Database.
@@ -970,7 +971,8 @@
return err
}
- sg, err := getSyncgroupByGid(ctx, tx, gid)
+ var err error
+ sg, err = getSyncgroupByGid(ctx, tx, gid)
if err != nil {
return err
}
@@ -1017,7 +1019,9 @@
if err != nil {
return err
}
-
+ if err = ss.advertiseSyncgroupInNeighborhood(sg); err != nil {
+ return err
+ }
return ss.checkptSgLocalGen(ctx, dbId, gid)
}
@@ -1235,9 +1239,8 @@
// continue when members are online despite these errors.
vlog.Errorf("sync: advertiseSyncbaseInNeighborhood: failed with err %v", err)
}
-
// TODO(hpucha): In case of a joiner, this can be optimized such that we
- // don't advertise until the syncgroup is in pending state.
+ // don't advertise until the syncgroup is out of the pending state.
if err := s.advertiseSyncgroupInNeighborhood(sg); err != nil {
// We ignore errors on neighborhood advertising since sync when
// members are online can continue despite these errors.