syncbase/vsync: Handle JoinSyncGroup RPC from client-syncbase.
Change-Id: Ic535383816b9db05bb96ab7ceadab56aa59fde65
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 7421b48..fe64066 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -4,6 +4,10 @@
package interfaces
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
// Sync defines methods for data exchange between Syncbases.
// TODO(hpucha): Flesh this out further.
type Sync interface {
@@ -18,9 +22,11 @@
// to publish the SyncGroup.
PublishSyncGroup(sg SyncGroup) error
- // JoinSyncGroup is invoked on a SyncGroup Admin and checks if
- // the requestor can join the SyncGroup.
- JoinSyncGroup() error
+ // JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+ // Syncbase on a SyncGroup admin. It checks whether the requestor is
+ // allowed to join the named SyncGroup, and if so, adds the requestor to
+ // the SyncGroup.
+ JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (SyncGroup | error)
// BlobSync methods.
// FetchBlob returns the requested blob.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 37c9a1e..de16181 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -12,6 +12,9 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
+
+ // VDL user imports
+ "v.io/syncbase/v23/services/syncbase/nosql"
)
// SyncClientMethods is the client interface
@@ -27,9 +30,11 @@
// PublishSyncGroup is typically invoked on a "central" peer
// to publish the SyncGroup.
PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
- // JoinSyncGroup is invoked on a SyncGroup Admin and checks if
- // the requestor can join the SyncGroup.
- JoinSyncGroup(*context.T, ...rpc.CallOpt) error
+ // JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+ // Syncbase on a SyncGroup admin. It checks whether the requestor is
+ // allowed to join the named SyncGroup, and if so, adds the requestor to
+ // the SyncGroup.
+ JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
// BlobSync methods.
// FetchBlob returns the requested blob.
FetchBlob(*context.T, ...rpc.CallOpt) error
@@ -60,8 +65,8 @@
return
}
-func (c implSyncClientStub) JoinSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroup", nil, nil, opts...)
+func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0}, opts...)
return
}
@@ -83,9 +88,11 @@
// PublishSyncGroup is typically invoked on a "central" peer
// to publish the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
- // JoinSyncGroup is invoked on a SyncGroup Admin and checks if
- // the requestor can join the SyncGroup.
- JoinSyncGroup(*context.T, rpc.ServerCall) error
+ // JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+ // Syncbase on a SyncGroup admin. It checks whether the requestor is
+ // allowed to join the named SyncGroup, and if so, adds the requestor to
+ // the SyncGroup.
+ JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
// BlobSync methods.
// FetchBlob returns the requested blob.
FetchBlob(*context.T, rpc.ServerCall) error
@@ -134,8 +141,8 @@
return s.impl.PublishSyncGroup(ctx, call, i0)
}
-func (s implSyncServerStub) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
- return s.impl.JoinSyncGroup(ctx, call)
+func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, error) {
+ return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
}
func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
@@ -171,8 +178,16 @@
},
},
{
- Name: "JoinSyncGroup",
- Doc: "// JoinSyncGroup is invoked on a SyncGroup Admin and checks if\n// the requestor can join the SyncGroup.",
+ Name: "JoinSyncGroupAtAdmin",
+ Doc: "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup.",
+ InArgs: []rpc.ArgDesc{
+ {"sgName", ``}, // string
+ {"joinerName", ``}, // string
+ {"myInfo", ``}, // nosql.SyncGroupMemberInfo
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // SyncGroup
+ },
},
{
Name: "FetchBlob",
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index b5797b0..62c905a 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -35,7 +35,8 @@
if d.batchId != nil {
return wire.SyncGroupSpec{}, wire.NewErrBoundToBatch(ctx)
}
- return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.JoinSyncGroup(ctx, call, sgName, myInfo)
}
func (d *databaseReq) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index b3f396b..567d588 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -12,6 +12,7 @@
// records in response to a GetDeltas request, it replays those log
// records to get in sync with the sender.
import (
+ "fmt"
"math/rand"
"sync"
"time"
@@ -27,9 +28,10 @@
// syncService contains the metadata for the sync module.
type syncService struct {
- // TODO(hpucha): see if uniqueid is a better fit. It is 128 bits.
- id int64 // globally unique id for this instance of Syncbase
- sv interfaces.Service
+ // TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
+ id int64 // globally unique id for this instance of Syncbase
+ name string // name derived from the global id.
+ sv interfaces.Service
// State to coordinate shutdown of spawned goroutines.
pending sync.WaitGroup
@@ -85,6 +87,7 @@
// data.Id is now guaranteed to be initialized.
s.id = data.Id
+ s.name = fmt.Sprintf("%x", s.id)
// Channel to propagate close event to all threads.
s.closed = make(chan struct{})
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index f7e5a04..72e8533 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -10,6 +10,9 @@
// to the higher levels of sync (Initiator, Watcher) to get membership
// information and map key/value changes to their matching SyncGroups.
+// TODO(hpucha): Add high level commentary about the logic behind create/join
+// etc.
+
import (
"fmt"
"strings"
@@ -23,6 +26,8 @@
"v.io/v23/context"
"v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
"v.io/v23/verror"
"v.io/v23/vom"
"v.io/x/lib/vlog"
@@ -345,22 +350,21 @@
// TODO(hpucha): Pass blessings along.
func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
- err := store.RunInTransaction(sd.db.St(), func(st store.StoreReadWriter) error {
+ err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
// Check permissions on Database.
- if err := sd.db.CheckPermsInternal(ctx, call, st); err != nil {
+ if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
return err
}
// TODO(hpucha): Check prefix ACLs on all SG prefixes.
// This may need another method on util.Database interface.
- // TODO(hpucha): Do some SGACL checking. Check creator
+ // TODO(hpucha): Do some SG ACL checking. Check creator
// has Admin privilege.
- // Get this Syncbase's id.
+ // Get this Syncbase's sync module handle.
ss := sd.db.App().Service().Sync().(*syncService)
- name := fmt.Sprintf("%x", ss.id)
// Instantiate sg. Add self as joiner.
sg := &interfaces.SyncGroup{
@@ -368,21 +372,21 @@
Name: sgName,
SpecVersion: newSyncGroupVersion(),
Spec: spec,
- Creator: name,
+ Creator: ss.name,
AppName: sd.db.App().Name(),
DbName: sd.db.Name(),
Status: interfaces.SyncGroupStatusPublishPending,
- Joiners: map[string]wire.SyncGroupMemberInfo{name: myInfo},
+ Joiners: map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
}
- if err := addSyncGroup(ctx, st, sg); err != nil {
+ if err := addSyncGroup(ctx, tx, sg); err != nil {
return err
}
// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
// Take a snapshot of the data to bootstrap the SyncGroup.
- if err := bootstrapSyncGroup(st, spec.Prefixes); err != nil {
+ if err := bootstrapSyncGroup(tx, spec.Prefixes); err != nil {
return err
}
@@ -404,6 +408,98 @@
return nil
}
+// TODO(hpucha): Pass blessings along.
+func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+ var sgErr error
+ var sg *interfaces.SyncGroup
+ nullSpec := wire.SyncGroupSpec{}
+
+ err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ // Check permissions on Database.
+ if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+ return err
+ }
+
+ // Check if SyncGroup already exists.
+ sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+ if sgErr != nil {
+ return sgErr
+ }
+
+ // SyncGroup already exists. Possibilities include created
+ // locally, already joined locally or published at the device as
+ // a result of SyncGroup creation on a different device.
+ //
+ // TODO(hpucha): Handle the above cases. If the SG was published
+ // locally, but not joined, we need to bootstrap the DAG and
+ // watcher. If multiple joins are done locally, we may want to
+ // ref count the SG state and track the leaves accordingly. So
+ // we may need to add some local state for each SyncGroup.
+
+ // Check SG ACL.
+ return authorize(ctx, call.Security(), sg)
+ })
+
+ // The presented blessing is allowed to make this Syncbase instance join
+ // the specified SyncGroup, but this Syncbase instance has in fact
+ // already joined the SyncGroup. Join is idempotent, so we simply return
+ // the spec to indicate success.
+ if err == nil {
+ return sg.Spec, nil
+ }
+
+ // Join is not allowed (possibilities include Database permissions check
+ // failed, SG ACL check failed or error during fetching SG information).
+ if verror.ErrorID(sgErr) != verror.ErrNoExist.ID {
+ return nullSpec, err
+ }
+
+ // Brand new join.
+
+ // Get this Syncbase's sync module handle.
+ ss := sd.db.App().Service().Sync().(*syncService)
+
+ // Contact a SyncGroup Admin to join the SyncGroup.
+ *sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+ if err != nil {
+ return nullSpec, err
+ }
+
+ // Verify that the app/db combination is valid for this SyncGroup.
+ if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+ return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
+ }
+
+ err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+
+ // TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
+
+ // TODO(hpucha): Get SG Deltas from Admin device.
+
+ if err := addSyncGroup(ctx, tx, sg); err != nil {
+ return err
+ }
+
+ // Take a snapshot of the data to bootstrap the SyncGroup.
+ if err := bootstrapSyncGroup(tx, sg.Spec.Prefixes); err != nil {
+ return err
+ }
+
+ // TODO(hpucha): Add a watch notification to signal new SG.
+
+ return nil
+ })
+
+ if err != nil {
+ return nullSpec, err
+ }
+
+ // Publish at the chosen mount table and in the neighborhood.
+ sd.publishInMountTables(ctx, call, sg.Spec)
+
+ return sg.Spec, nil
+}
+
//////////////////////////////
// Helper functions
@@ -436,15 +532,15 @@
// Publish rejected. Persist that to avoid retrying in the
// future and to remember the split universe scenario.
- err = store.RunInTransaction(sd.db.St(), func(st store.StoreReadWriter) error {
+ err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
// Ensure SG still exists.
- sg, err := getSyncGroupByName(ctx, st, sgName)
+ sg, err := getSyncGroupByName(ctx, tx, sgName)
if err != nil {
return err
}
sg.Status = interfaces.SyncGroupStatusPublishRejected
- return setSGDataEntry(ctx, st, sg.Id, sg)
+ return setSGDataEntry(ctx, tx, sg.Id, sg)
})
return err
}
@@ -487,6 +583,21 @@
return nil
}
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+ c := interfaces.SyncClient(sgName)
+ return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
+
+ // TODO(hpucha): Try to join using an Admin on neighborhood if the publisher is not reachable.
+}
+
+func authorize(ctx *context.T, call security.Call, sg *interfaces.SyncGroup) error {
+ auth := access.TypicalTagTypePermissionsAuthorizer(sg.Spec.Perms)
+ if err := auth.Authorize(ctx, call); err != nil {
+ return verror.New(verror.ErrNoAccess, ctx, err)
+ }
+ return nil
+}
+
////////////////////////////////////////////////////////////
// Methods for SyncGroup create/join between Syncbases.
@@ -502,8 +613,8 @@
return err
}
- err = store.RunInTransaction(db.St(), func(st store.StoreReadWriter) error {
- localSG, err := getSyncGroupByName(ctx, st, sg.Name)
+ err = store.RunInTransaction(db.St(), func(tx store.StoreReadWriter) error {
+ localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
@@ -523,7 +634,7 @@
if err == nil && localSG.Id == sg.Id {
if localSG.Status == interfaces.SyncGroupStatusPublishPending {
localSG.Status = interfaces.SyncGroupStatusRunning
- return setSGDataEntry(ctx, st, localSG.Id, localSG)
+ return setSGDataEntry(ctx, tx, localSG.Id, localSG)
}
return nil
}
@@ -533,16 +644,61 @@
// TODO(hpucha): Use some ACL check to allow/deny publishing.
// TODO(hpucha): Ensure node is on Admin ACL.
- name := fmt.Sprintf("%x", s.id)
// TODO(hpucha): Default priority?
- sg.Joiners[name] = wire.SyncGroupMemberInfo{}
+ sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
sg.Status = interfaces.SyncGroupStatusRunning
- return addSyncGroup(ctx, st, &sg)
+ return addSyncGroup(ctx, tx, &sg)
})
return err
}
-func (s *syncService) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+ var dbSt store.Store
+ var gid interfaces.GroupId
+ var err error
+
+ // Find the database store for this SyncGroup.
+ //
+ // TODO(hpucha): At a high level, we have yet to decide if the SG name
+ // is stand-alone or is derived from the app/db namespace, based on the
+ // feedback from app developers (see discussion in SyncGroup API
+ // doc). If we decide to keep the SG name as stand-alone, this scan can
+ // be optimized by a lazy cache of sgname to <app, db> info.
+ s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+ if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
+ // Found the SyncGroup being looked for.
+ dbSt = st
+ return true
+ }
+ return false
+ })
+
+ // SyncGroup not found.
+ if err != nil {
+ return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+ }
+
+ var sg *interfaces.SyncGroup
+ err = store.RunInTransaction(dbSt, func(tx store.StoreReadWriter) error {
+ var err error
+ sg, err = getSyncGroupById(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ // Check SG ACL.
+ if err := authorize(ctx, call.Security(), sg); err != nil {
+ return err
+ }
+
+ // Add to joiner list.
+ sg.Joiners[joinerName] = joinerInfo
+ return setSGDataEntry(ctx, tx, sg.Id, sg)
+ })
+
+ if err != nil {
+ return interfaces.SyncGroup{}, err
+ }
+ return *sg, nil
}