syncbase/vsync: Handle JoinSyncGroup RPC from client-syncbase.

Change-Id: Ic535383816b9db05bb96ab7ceadab56aa59fde65
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 7421b48..fe64066 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -4,6 +4,10 @@
 
 package interfaces
 
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type Sync interface {
@@ -18,9 +22,11 @@
 	// to publish the SyncGroup.
 	PublishSyncGroup(sg SyncGroup) error
 
-	// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
-	// the requestor can join the SyncGroup.
-	JoinSyncGroup() error
+	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+	// Syncbase on a SyncGroup admin. It checks whether the requestor is
+	// allowed to join the named SyncGroup, and if so, adds the requestor to
+	// the SyncGroup.
+	JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (SyncGroup | error)
 
 	// BlobSync methods.
 	// FetchBlob returns the requested blob.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 37c9a1e..de16181 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -12,6 +12,9 @@
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
+
+	// VDL user imports
+	"v.io/syncbase/v23/services/syncbase/nosql"
 )
 
 // SyncClientMethods is the client interface
@@ -27,9 +30,11 @@
 	// PublishSyncGroup is typically invoked on a "central" peer
 	// to publish the SyncGroup.
 	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
-	// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
-	// the requestor can join the SyncGroup.
-	JoinSyncGroup(*context.T, ...rpc.CallOpt) error
+	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+	// Syncbase on a SyncGroup admin. It checks whether the requestor is
+	// allowed to join the named SyncGroup, and if so, adds the requestor to
+	// the SyncGroup.
+	JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
 	// BlobSync methods.
 	// FetchBlob returns the requested blob.
 	FetchBlob(*context.T, ...rpc.CallOpt) error
@@ -60,8 +65,8 @@
 	return
 }
 
-func (c implSyncClientStub) JoinSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroup", nil, nil, opts...)
+func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0}, opts...)
 	return
 }
 
@@ -83,9 +88,11 @@
 	// PublishSyncGroup is typically invoked on a "central" peer
 	// to publish the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
-	// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
-	// the requestor can join the SyncGroup.
-	JoinSyncGroup(*context.T, rpc.ServerCall) error
+	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+	// Syncbase on a SyncGroup admin. It checks whether the requestor is
+	// allowed to join the named SyncGroup, and if so, adds the requestor to
+	// the SyncGroup.
+	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
 	// BlobSync methods.
 	// FetchBlob returns the requested blob.
 	FetchBlob(*context.T, rpc.ServerCall) error
@@ -134,8 +141,8 @@
 	return s.impl.PublishSyncGroup(ctx, call, i0)
 }
 
-func (s implSyncServerStub) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
-	return s.impl.JoinSyncGroup(ctx, call)
+func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, error) {
+	return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
 }
 
 func (s implSyncServerStub) FetchBlob(ctx *context.T, call rpc.ServerCall) error {
@@ -171,8 +178,16 @@
 			},
 		},
 		{
-			Name: "JoinSyncGroup",
-			Doc:  "// JoinSyncGroup is invoked on a SyncGroup Admin and checks if\n// the requestor can join the SyncGroup.",
+			Name: "JoinSyncGroupAtAdmin",
+			Doc:  "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup.",
+			InArgs: []rpc.ArgDesc{
+				{"sgName", ``},     // string
+				{"joinerName", ``}, // string
+				{"myInfo", ``},     // nosql.SyncGroupMemberInfo
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // SyncGroup
+			},
 		},
 		{
 			Name: "FetchBlob",
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index b5797b0..62c905a 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -35,7 +35,8 @@
 	if d.batchId != nil {
 		return wire.SyncGroupSpec{}, wire.NewErrBoundToBatch(ctx)
 	}
-	return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.JoinSyncGroup(ctx, call, sgName, myInfo)
 }
 
 func (d *databaseReq) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index b3f396b..567d588 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -12,6 +12,7 @@
 // records in response to a GetDeltas request, it replays those log
 // records to get in sync with the sender.
 import (
+	"fmt"
 	"math/rand"
 	"sync"
 	"time"
@@ -27,9 +28,10 @@
 
 // syncService contains the metadata for the sync module.
 type syncService struct {
-	// TODO(hpucha): see if uniqueid is a better fit. It is 128 bits.
-	id int64 // globally unique id for this instance of Syncbase
-	sv interfaces.Service
+	// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
+	id   int64  // globally unique id for this instance of Syncbase
+	name string // name derived from the global id.
+	sv   interfaces.Service
 
 	// State to coordinate shutdown of spawned goroutines.
 	pending sync.WaitGroup
@@ -85,6 +87,7 @@
 
 	// data.Id is now guaranteed to be initialized.
 	s.id = data.Id
+	s.name = fmt.Sprintf("%x", s.id)
 
 	// Channel to propagate close event to all threads.
 	s.closed = make(chan struct{})
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index f7e5a04..72e8533 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -10,6 +10,9 @@
 // to the higher levels of sync (Initiator, Watcher) to get membership
 // information and map key/value changes to their matching SyncGroups.
 
+// TODO(hpucha): Add high level commentary about the logic behind create/join
+// etc.
+
 import (
 	"fmt"
 	"strings"
@@ -23,6 +26,8 @@
 
 	"v.io/v23/context"
 	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
 	"v.io/x/lib/vlog"
@@ -345,22 +350,21 @@
 
 // TODO(hpucha): Pass blessings along.
 func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
-	err := store.RunInTransaction(sd.db.St(), func(st store.StoreReadWriter) error {
+	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
 
 		// Check permissions on Database.
-		if err := sd.db.CheckPermsInternal(ctx, call, st); err != nil {
+		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
 			return err
 		}
 
 		// TODO(hpucha): Check prefix ACLs on all SG prefixes.
 		// This may need another method on util.Database interface.
 
-		// TODO(hpucha): Do some SGACL checking. Check creator
+		// TODO(hpucha): Do some SG ACL checking. Check creator
 		// has Admin privilege.
 
-		// Get this Syncbase's id.
+		// Get this Syncbase's sync module handle.
 		ss := sd.db.App().Service().Sync().(*syncService)
-		name := fmt.Sprintf("%x", ss.id)
 
 		// Instantiate sg. Add self as joiner.
 		sg := &interfaces.SyncGroup{
@@ -368,21 +372,21 @@
 			Name:        sgName,
 			SpecVersion: newSyncGroupVersion(),
 			Spec:        spec,
-			Creator:     name,
+			Creator:     ss.name,
 			AppName:     sd.db.App().Name(),
 			DbName:      sd.db.Name(),
 			Status:      interfaces.SyncGroupStatusPublishPending,
-			Joiners:     map[string]wire.SyncGroupMemberInfo{name: myInfo},
+			Joiners:     map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
 		}
 
-		if err := addSyncGroup(ctx, st, sg); err != nil {
+		if err := addSyncGroup(ctx, tx, sg); err != nil {
 			return err
 		}
 
 		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
 
 		// Take a snapshot of the data to bootstrap the SyncGroup.
-		if err := bootstrapSyncGroup(st, spec.Prefixes); err != nil {
+		if err := bootstrapSyncGroup(tx, spec.Prefixes); err != nil {
 			return err
 		}
 
@@ -404,6 +408,98 @@
 	return nil
 }
 
+// TODO(hpucha): Pass blessings along.
+func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+	var sgErr error
+	var sg *interfaces.SyncGroup
+	nullSpec := wire.SyncGroupSpec{}
+
+	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+		// Check permissions on Database.
+		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+			return err
+		}
+
+		// Check if SyncGroup already exists.
+		sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+		if sgErr != nil {
+			return sgErr
+		}
+
+		// SyncGroup already exists. Possibilities include created
+		// locally, already joined locally or published at the device as
+		// a result of SyncGroup creation on a different device.
+		//
+		// TODO(hpucha): Handle the above cases. If the SG was published
+		// locally, but not joined, we need to bootstrap the DAG and
+		// watcher. If multiple joins are done locally, we may want to
+		// ref count the SG state and track the leaves accordingly. So
+		// we may need to add some local state for each SyncGroup.
+
+		// Check SG ACL.
+		return authorize(ctx, call.Security(), sg)
+	})
+
+	// The presented blessing is allowed to make this Syncbase instance join
+	// the specified SyncGroup, but this Syncbase instance has in fact
+	// already joined the SyncGroup. Join is idempotent, so we simply return
+	// the spec to indicate success.
+	if err == nil {
+		return sg.Spec, nil
+	}
+
+	// Join is not allowed (possibilities include Database permissions check
+	// failed, SG ACL check failed or error during fetching SG information).
+	if verror.ErrorID(sgErr) != verror.ErrNoExist.ID {
+		return nullSpec, err
+	}
+
+	// Brand new join.
+
+	// Get this Syncbase's sync module handle.
+	ss := sd.db.App().Service().Sync().(*syncService)
+
+	// Contact a SyncGroup Admin to join the SyncGroup.
+	*sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+	if err != nil {
+		return nullSpec, err
+	}
+
+	// Verify that the app/db combination is valid for this SyncGroup.
+	if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+		return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
+	}
+
+	err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+
+		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
+
+		// TODO(hpucha): Get SG Deltas from Admin device.
+
+		if err := addSyncGroup(ctx, tx, sg); err != nil {
+			return err
+		}
+
+		// Take a snapshot of the data to bootstrap the SyncGroup.
+		if err := bootstrapSyncGroup(tx, sg.Spec.Prefixes); err != nil {
+			return err
+		}
+
+		// TODO(hpucha): Add a watch notification to signal new SG.
+
+		return nil
+	})
+
+	if err != nil {
+		return nullSpec, err
+	}
+
+	// Publish at the chosen mount table and in the neighborhood.
+	sd.publishInMountTables(ctx, call, sg.Spec)
+
+	return sg.Spec, nil
+}
+
 //////////////////////////////
 // Helper functions
 
@@ -436,15 +532,15 @@
 
 	// Publish rejected. Persist that to avoid retrying in the
 	// future and to remember the split universe scenario.
-	err = store.RunInTransaction(sd.db.St(), func(st store.StoreReadWriter) error {
+	err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
 		// Ensure SG still exists.
-		sg, err := getSyncGroupByName(ctx, st, sgName)
+		sg, err := getSyncGroupByName(ctx, tx, sgName)
 		if err != nil {
 			return err
 		}
 
 		sg.Status = interfaces.SyncGroupStatusPublishRejected
-		return setSGDataEntry(ctx, st, sg.Id, sg)
+		return setSGDataEntry(ctx, tx, sg.Id, sg)
 	})
 	return err
 }
@@ -487,6 +583,21 @@
 	return nil
 }
 
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+	c := interfaces.SyncClient(sgName)
+	return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
+
+	// TODO(hpucha): Try to join using an Admin on neighborhood if the publisher is not reachable.
+}
+
+func authorize(ctx *context.T, call security.Call, sg *interfaces.SyncGroup) error {
+	auth := access.TypicalTagTypePermissionsAuthorizer(sg.Spec.Perms)
+	if err := auth.Authorize(ctx, call); err != nil {
+		return verror.New(verror.ErrNoAccess, ctx, err)
+	}
+	return nil
+}
+
 ////////////////////////////////////////////////////////////
 // Methods for SyncGroup create/join between Syncbases.
 
@@ -502,8 +613,8 @@
 		return err
 	}
 
-	err = store.RunInTransaction(db.St(), func(st store.StoreReadWriter) error {
-		localSG, err := getSyncGroupByName(ctx, st, sg.Name)
+	err = store.RunInTransaction(db.St(), func(tx store.StoreReadWriter) error {
+		localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
 
 		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return err
@@ -523,7 +634,7 @@
 		if err == nil && localSG.Id == sg.Id {
 			if localSG.Status == interfaces.SyncGroupStatusPublishPending {
 				localSG.Status = interfaces.SyncGroupStatusRunning
-				return setSGDataEntry(ctx, st, localSG.Id, localSG)
+				return setSGDataEntry(ctx, tx, localSG.Id, localSG)
 			}
 			return nil
 		}
@@ -533,16 +644,61 @@
 		// TODO(hpucha): Use some ACL check to allow/deny publishing.
 		// TODO(hpucha): Ensure node is on Admin ACL.
 
-		name := fmt.Sprintf("%x", s.id)
 		// TODO(hpucha): Default priority?
-		sg.Joiners[name] = wire.SyncGroupMemberInfo{}
+		sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
 		sg.Status = interfaces.SyncGroupStatusRunning
-		return addSyncGroup(ctx, st, &sg)
+		return addSyncGroup(ctx, tx, &sg)
 	})
 
 	return err
 }
 
-func (s *syncService) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
-	return verror.NewErrNotImplemented(ctx)
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+	var dbSt store.Store
+	var gid interfaces.GroupId
+	var err error
+
+	// Find the database store for this SyncGroup.
+	//
+	// TODO(hpucha): At a high level, we have yet to decide if the SG name
+	// is stand-alone or is derived from the app/db namespace, based on the
+	// feedback from app developers (see discussion in SyncGroup API
+	// doc). If we decide to keep the SG name as stand-alone, this scan can
+	// be optimized by a lazy cache of sgname to <app, db> info.
+	s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+		if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
+			// Found the SyncGroup being looked for.
+			dbSt = st
+			return true
+		}
+		return false
+	})
+
+	// SyncGroup not found.
+	if err != nil {
+		return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+	}
+
+	var sg *interfaces.SyncGroup
+	err = store.RunInTransaction(dbSt, func(tx store.StoreReadWriter) error {
+		var err error
+		sg, err = getSyncGroupById(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+
+		// Check SG ACL.
+		if err := authorize(ctx, call.Security(), sg); err != nil {
+			return err
+		}
+
+		// Add to joiner list.
+		sg.Joiners[joinerName] = joinerInfo
+		return setSGDataEntry(ctx, tx, sg.Id, sg)
+	})
+
+	if err != nil {
+		return interfaces.SyncGroup{}, err
+	}
+	return *sg, nil
 }