syncbase: Integrate log and gen vector functionality (Part 1).
Change-Id: Id538ba542def314885120bc29d5d258d4c13e32c
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index ffb2b40..a050e45 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -5,6 +5,8 @@
package interfaces
import (
+ "time"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
)
@@ -12,6 +14,61 @@
NoGroupId = GroupId(0)
)
+// TODO(hpucha): These are not final yet. This is an intermediate step.
+
+const (
+ // NodeRec type log record adds a new node in the dag.
+ NodeRec = byte(0)
+ // LinkRec type log record adds a new link in the dag.
+ LinkRec = byte(1)
+)
+
+// PrefixGenVector is the generation vector for a data prefix, which maps each
+// device id to its last locally known generation in the scope of that prefix.
+type PrefixGenVector map[uint64]uint64
+
+// GenVector is the generation vector for a Database, and maps prefixes to their
+// generation vectors. Note that the prefixes in a GenVector are global prefixes
+// that include the appropriate Application and Database name.
+type GenVector map[string]PrefixGenVector
+
+// LogRecMetadata represents the metadata of a single log record that is
+// exchanged between two peers. Each log record represents a change made to an
+// object in the store. The log record metadata consists of Id, the device id
+// that created the log record, Gen, the generation number for that log record,
+// and RecType, the type of log record. It also contains information relevant to
+// the update to the object in the store: ObjId is the id of the object that was
+// updated. CurVers is the current version number of the object. Parents can
+// contain 0, 1 or 2 parent versions that the current version is derived from.
+// SyncTime is the timestamp when the update is committed to the Store.
+// Delete indicates whether the update resulted in the object being
+// deleted from the store. BatchId is the unique id of the Batch this update
+// belongs to. BatchCount is the number of objects in the Batch.
+//
+// TODO(hpucha): Add readset/scanset.
+type LogRecMetadata struct {
+ // Log related information.
+ Id uint64
+ Gen uint64
+ RecType byte
+
+ // Object related information.
+ ObjId string
+ CurVers string
+ Parents []string
+ SyncTime time.Time
+ Delete bool
+ BatchId uint64
+ BatchCount uint64
+}
+
+// LogRec represents the on-wire representation of an entire log record: its
+// metadata and data. Value is the actual value of a store object.
+type LogRec struct {
+ Metadata LogRecMetadata
+ Value []byte
+}
+
// GroupId is a globally unique SyncGroup ID.
type GroupId uint64
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index 3c7e2dd..e188a35 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -13,9 +13,76 @@
"v.io/v23/vdl"
// VDL user imports
+ "time"
"v.io/syncbase/v23/services/syncbase/nosql"
+ _ "v.io/v23/vdlroot/time"
)
+// PrefixGenVector is the generation vector for a data prefix, which maps each
+// device id to its last locally known generation in the scope of that prefix.
+type PrefixGenVector map[uint64]uint64
+
+func (PrefixGenVector) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.PrefixGenVector"`
+}) {
+}
+
+// GenVector is the generation vector for a Database, and maps prefixes to their
+// generation vectors. Note that the prefixes in a GenVector are global prefixes
+// that include the appropriate Application and Database name.
+type GenVector map[string]PrefixGenVector
+
+func (GenVector) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.GenVector"`
+}) {
+}
+
+// LogRecMetadata represents the metadata of a single log record that is
+// exchanged between two peers. Each log record represents a change made to an
+// object in the store. The log record metadata consists of Id, the device id
+// that created the log record, Gen, the generation number for that log record,
+// and RecType, the type of log record. It also contains information relevant to
+// the update to the object in the store: ObjId is the id of the object that was
+// updated. CurVers is the current version number of the object. Parents can
+// contain 0, 1 or 2 parent versions that the current version is derived from.
+// SyncTime is the timestamp when the update is committed to the Store.
+// Delete indicates whether the update resulted in the object being
+// deleted from the store. BatchId is the unique id of the Batch this update
+// belongs to. BatchCount is the number of objects in the Batch.
+//
+// TODO(hpucha): Add readset/scanset.
+type LogRecMetadata struct {
+ // Log related information.
+ Id uint64
+ Gen uint64
+ RecType byte
+ // Object related information.
+ ObjId string
+ CurVers string
+ Parents []string
+ SyncTime time.Time
+ Delete bool
+ BatchId uint64
+ BatchCount uint64
+}
+
+func (LogRecMetadata) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.LogRecMetadata"`
+}) {
+}
+
+// LogRec represents the on-wire representation of an entire log record: its
+// metadata and data. Value is the actual value of a store object.
+type LogRec struct {
+ Metadata LogRecMetadata
+ Value []byte
+}
+
+func (LogRec) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.LogRec"`
+}) {
+}
+
// GroupId is a globally unique SyncGroup ID.
type GroupId uint64
@@ -97,9 +164,19 @@
}
func init() {
+ vdl.Register((*PrefixGenVector)(nil))
+ vdl.Register((*GenVector)(nil))
+ vdl.Register((*LogRecMetadata)(nil))
+ vdl.Register((*LogRec)(nil))
vdl.Register((*GroupId)(nil))
vdl.Register((*SyncGroupStatus)(nil))
vdl.Register((*SyncGroup)(nil))
}
const NoGroupId = GroupId(0)
+
+// NodeRec type log record adds a new node in the dag.
+const NodeRec = byte(0)
+
+// LinkRec type log record adds a new link in the dag.
+const LinkRec = byte(1)
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index bded17b..92d3d23 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,7 +29,7 @@
// syncService contains the metadata for the sync module.
type syncService struct {
// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
- id int64 // globally unique id for this instance of Syncbase
+ id uint64 // globally unique id for this instance of Syncbase
name string // name derived from the global id.
sv interfaces.Service
@@ -48,6 +48,11 @@
// In-memory sync membership info aggregated across databases.
allMembers *memberView
+ // In-memory sync state per Database. This state is populated at
+ // startup, and periodically persisted by the initiator.
+ syncState map[string]*dbSyncStateInMem
+ syncStateLock sync.Mutex // lock to protect access to the sync state.
+
// In-memory tracking of batches during their construction.
// The sync Initiator and Watcher build batches incrementally here
// and then persist them in DAG batch entries. The mutex guards
@@ -94,7 +99,7 @@
}
// First invocation of vsync.New().
// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
- data.Id = rng.Int63()
+ data.Id = rand64()
if err := util.PutObject(sv.St(), s.StKey(), data); err != nil {
return nil, verror.New(verror.ErrInternal, ctx, err)
}
@@ -104,6 +109,11 @@
s.id = data.Id
s.name = fmt.Sprintf("%x", s.id)
+ // Initialize in-memory state for the sync module before starting any threads.
+ if err := s.initSync(ctx); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+
// Channel to propagate close event to all threads.
s.closed = make(chan struct{})
s.pending.Add(2)
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
new file mode 100644
index 0000000..4d4e271
--- /dev/null
+++ b/services/syncbase/vsync/sync_state.go
@@ -0,0 +1,215 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// New log records are created when objects in the local store are created,
+// updated or deleted. Local log records are also replayed to keep the
+// per-object dags consistent with the local store state. Sync module assigns
+// each log record created within a Database a unique sequence number, called
+// the generation number. Locally on each device, the position of each log
+// record is also recorded relative to other local and remote log records.
+//
+// When a device receives a request to send log records, it first computes the
+// missing generations between itself and the incoming request on a per-prefix
+// basis. It then sends all the log records belonging to the missing generations
+// in the order they occur locally (using the local log position). A device that
+// receives log records over the network replays all the records received from
+// another device in a single batch. Each replayed log record adds a new version
+// to the dag of the object contained in the log record. At the end of replaying
+// all the log records, conflict detection and resolution is carried out for all
+// the objects learned during this iteration. Conflict detection and resolution
+// is carried out after a batch of log records are replayed, instead of
+// incrementally after each record is replayed, to avoid repeating conflict
+// resolution already performed by other devices.
+//
+// Sync module tracks the current generation number and the current local log
+// position for each Database. In addition, it also tracks the current
+// generation vector for a Database. Log records are indexed such that they can
+// be selectively retrieved from the store for any missing generation from any
+// device.
+
+import (
+ "fmt"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
+// dbSyncStateInMem represents the in-memory sync state of a Database.
+type dbSyncStateInMem struct {
+ gen uint64
+ pos uint64
+ genvec interfaces.GenVector
+}
+
+// initSync initializes the sync module during startup. It scans all the
+// databases across all apps to a) initialize the in-memory sync state of a
+// Database consisting of the current generation number, log position and
+// generation vector; b) initialize the prefixes that are currently being
+// synced.
+//
+// TODO(hpucha): This is incomplete. Flesh this out further.
+func (s *syncService) initSync(ctx *context.T) error {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ var errFinal error
+ s.syncState = make(map[string]*dbSyncStateInMem)
+
+ // TODO(hpucha): Temporary hack.
+ return errFinal
+
+ /*s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+ sn := st.NewSnapshot()
+ defer sn.Close()
+
+ ds, err := getDbSyncState(ctx, sn)
+ if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
+ errFinal = err
+ return false
+ }
+
+ var scanStart, scanLimit []byte
+ // Figure out what to scan among local log records.
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
+ } else {
+ scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
+ }
+
+ var maxpos uint64
+ var dbName string
+ // Scan local log records to find the most recent one.
+ sn.Scan(scanStart, scanLimit)
+
+ // Scan remote log records using the persisted GenVector.
+
+ s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
+
+ return false
+ })
+
+ return errFinal*/
+}
+
+// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
+// positions in a Database's log. Used when local updates result in log
+// entries.
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName string, count uint64) (uint64, uint64) {
+ return s.reserveGenAndPosInternal(appName, dbName, count, count)
+}
+
+// reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
+// when remote log records are received.
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName string, count uint64) uint64 {
+ _, pos := s.reserveGenAndPosInternal(appName, dbName, 0, count)
+ return pos
+}
+
+func (s *syncService) reserveGenAndPosInternal(appName, dbName string, genCount, posCount uint64) (uint64, uint64) {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := globalDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ ds = &dbSyncStateInMem{gen: 1}
+ s.syncState[name] = ds
+ }
+
+ gen := ds.gen
+ pos := ds.pos
+
+ ds.gen += genCount
+ ds.pos += posCount
+
+ return gen, pos
+}
+
+// globalDbName returns the global name of a Database by combining the app and db names.
+func globalDbName(appName, dbName string) string {
+ return util.JoinKeyParts(appName, dbName)
+}
+
+////////////////////////////////////////////////////////////
+// Low-level utility functions to access sync state.
+
+// dbSyncStateKey returns the key used to access the sync state of a Database.
+func dbSyncStateKey() string {
+ return util.JoinKeyParts(util.SyncPrefix, "dbss")
+}
+
+// putDbSyncState persists the sync state object for a given Database.
+func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
+ _ = tx.(store.Transaction)
+ if err := util.PutObject(tx, dbSyncStateKey(), ds); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getDbSyncState retrieves the sync state object for a given Database.
+func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
+ var ds dbSyncState
+ if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return &ds, nil
+}
+
+////////////////////////////////////////////////////////////
+// Low-level utility functions to access log records.
+
+// logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device.
+func logRecsPerDeviceScanPrefix(id uint64) string {
+ return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id))
+}
+
+// logRecKey returns the key used to access a specific log record.
+func logRecKey(id, gen uint64) string {
+ return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
+}
+
+// hasLogRec returns true if the log record for (devid, gen) exists.
+func hasLogRec(st store.StoreReader, id, gen uint64) bool {
+ // TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
+ var rec localLogRec
+ if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
+ return false
+ }
+ return true
+}
+
+// putLogRec stores the log record.
+func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+ _ = tx.(store.Transaction)
+ if err := util.PutObject(tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getLogRec retrieves the log record for a given (devid, gen).
+func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
+ var rec localLogRec
+ if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
+ return nil, verror.New(verror.ErrInternal, ctx, err)
+ }
+ return &rec, nil
+}
+
+// delLogRec deletes the log record for a given (devid, gen).
+func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
+ _ = tx.(store.Transaction)
+
+ if err := tx.Delete([]byte(logRecKey(id, gen))); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
new file mode 100644
index 0000000..03cb268
--- /dev/null
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -0,0 +1,141 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// Tests for sync state management and storage in Syncbase.
+
+// TestReserveGenAndPos tests reserving generation numbers and log positions in a
+// Database log.
+func TestReserveGenAndPos(t *testing.T) {
+ svc := createService(t)
+ s := svc.sync
+
+ var wantGen, wantPos uint64 = 1, 0
+ for i := 0; i < 5; i++ {
+ gotGen, gotPos := s.reserveGenAndPosInternal("mockapp", "mockdb", 5, 10)
+ if gotGen != wantGen || gotPos != wantPos {
+ t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", gotGen, wantGen, gotPos, wantPos)
+ }
+ wantGen += 5
+ wantPos += 10
+
+ name := globalDbName("mockapp", "mockdb")
+ if s.syncState[name].gen != wantGen || s.syncState[name].pos != wantPos {
+ t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", s.syncState[name].gen, wantGen, s.syncState[name].pos, wantPos)
+ }
+ }
+}
+
+// TestPutGetDbSyncState tests setting and getting sync metadata.
+func TestPutGetDbSyncState(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+
+ checkDbSyncState(t, st, false, nil)
+
+ gv := interfaces.GenVector{
+ "mocktbl/foo": interfaces.PrefixGenVector{
+ 1: 2, 3: 4, 5: 6,
+ },
+ }
+
+ tx := st.NewTransaction()
+ wantSt := &dbSyncState{Gen: 40, GenVec: gv}
+ if err := putDbSyncState(nil, tx, wantSt); err != nil {
+ t.Fatalf("putDbSyncState failed, err %v", err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit putting db sync state, err %v", err)
+ }
+
+ checkDbSyncState(t, st, true, wantSt)
+}
+
+// TestPutGetDelLogRec tests setting, getting, and deleting a log record.
+func TestPutGetDelLogRec(t *testing.T) {
+ svc := createService(t)
+ st := svc.St()
+
+ var id uint64 = 10
+ var gen uint64 = 100
+
+ checkLogRec(t, st, id, gen, false, nil)
+
+ tx := st.NewTransaction()
+ wantRec := &localLogRec{
+ Metadata: interfaces.LogRecMetadata{
+ Id: id,
+ Gen: gen,
+ RecType: interfaces.NodeRec,
+ ObjId: "foo",
+ CurVers: "3",
+ Parents: []string{"1", "2"},
+ SyncTime: time.Now().UTC(),
+ Delete: false,
+ BatchId: 10000,
+ BatchCount: 1,
+ },
+ Pos: 10,
+ }
+ if err := putLogRec(nil, tx, wantRec); err != nil {
+ t.Fatalf("putLogRec(%d:%d) failed err %v", id, gen, err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit putting log rec, err %v", err)
+ }
+
+ checkLogRec(t, st, id, gen, true, wantRec)
+
+ tx = st.NewTransaction()
+ if err := delLogRec(nil, tx, id, gen); err != nil {
+ t.Fatalf("delLogRec(%d:%d) failed err %v", id, gen, err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit deleting log rec, err %v", err)
+ }
+
+ checkLogRec(t, st, id, gen, false, nil)
+}
+
+//////////////////////////////
+// Helpers
+
+func checkDbSyncState(t *testing.T, st store.StoreReader, exists bool, wantSt *dbSyncState) {
+ gotSt, err := getDbSyncState(nil, st)
+
+ if (!exists && err == nil) || (exists && err != nil) {
+ t.Fatalf("getDbSyncState failed, exists %v err %v", exists, err)
+ }
+
+ if !reflect.DeepEqual(gotSt, wantSt) {
+ t.Fatalf("getDbSyncState() failed, got %v, want %v", gotSt, wantSt)
+ }
+}
+
+func checkLogRec(t *testing.T, st store.StoreReader, id, gen uint64, exists bool, wantRec *localLogRec) {
+ gotRec, err := getLogRec(nil, st, id, gen)
+
+ if (!exists && err == nil) || (exists && err != nil) {
+ t.Fatalf("getLogRec(%d:%d) failed, exists %v err %v", id, gen, exists, err)
+ }
+
+ if !reflect.DeepEqual(gotRec, wantRec) {
+ t.Fatalf("getLogRec(%d:%d) failed, got %v, want %v", id, gen, gotRec, wantRec)
+ }
+
+ if hasLogRec(st, id, gen) != exists {
+ t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
+ }
+
+}
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 2434200..2810cd0 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -4,7 +4,23 @@
package vsync
+import (
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+)
+
// syncData represents the persistent state of the sync module.
type syncData struct {
- Id int64
+ Id uint64
+}
+
+// dbSyncState represents the persistent sync state of a Database.
+type dbSyncState struct {
+ Gen uint64 // local generation number
+ GenVec interfaces.GenVector // generation vector
+}
+
+// localLogRec represents the persistent local state of a log record.
+type localLogRec struct {
+ Metadata interfaces.LogRecMetadata
+ Pos uint64 // position in the Database log.
}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index cc15abe..17e8348 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -10,11 +10,14 @@
import (
// VDL system imports
"v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
)
// syncData represents the persistent state of the sync module.
type syncData struct {
- Id int64
+ Id uint64
}
func (syncData) __VDLReflect(struct {
@@ -22,6 +25,30 @@
}) {
}
+// dbSyncState represents the persistent sync state of a Database.
+type dbSyncState struct {
+ Gen uint64 // local generation number
+ GenVec interfaces.GenVector // generation vector
+}
+
+func (dbSyncState) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.dbSyncState"`
+}) {
+}
+
+// localLogRec represents the persistent local state of a log record.
+type localLogRec struct {
+ Metadata interfaces.LogRecMetadata
+ Pos uint64 // position in the Database log.
+}
+
+func (localLogRec) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.localLogRec"`
+}) {
+}
+
func init() {
vdl.Register((*syncData)(nil))
+ vdl.Register((*dbSyncState)(nil))
+ vdl.Register((*localLogRec)(nil))
}
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index a711078..5ac1b52 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -4,13 +4,95 @@
package vsync
+import (
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+
+ "v.io/v23/context"
+ "v.io/v23/verror"
+)
+
// When applications update objects in the local Store, the sync
// watcher thread learns about them asynchronously via the "watch"
// stream of object mutations. In turn, this sync watcher thread
-// updates the DAG and ILog records to track the object change
+// updates the DAG and log records to track the object change
// histories.
// watchStore processes updates obtained by watching the store.
func (s *syncService) watchStore() {
defer s.pending.Done()
}
+
+// TODO(hpucha): This is a skeleton only to drive the change for log.
+
+// processBatch applies a single batch of changes (object mutations) received
+// from watching a particular Database.
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, tx store.StoreReadWriter) error {
+ _ = tx.(store.Transaction)
+
+ count := uint64(len(batch))
+ if count == 0 {
+ return nil
+ }
+
+ // If the batch has more than one mutation, start a batch for it.
+ batchId := NoBatchId
+ if count > 1 {
+ batchId = s.startBatch(ctx, tx, batchId)
+ if batchId == NoBatchId {
+ return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
+ }
+ }
+
+ gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
+
+ for _, rec := range batch {
+
+ // Update the log record. Portions of the record Metadata must
+ // already be filled.
+ rec.Metadata.Id = s.id
+ rec.Metadata.Gen = gen
+ rec.Metadata.RecType = interfaces.NodeRec
+
+ rec.Metadata.BatchId = batchId
+ rec.Metadata.BatchCount = count
+
+ rec.Pos = pos
+
+ gen++
+ pos++
+
+ if err := s.processLocalLogRec(ctx, tx, rec); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ }
+
+ // End the batch if any.
+ if batchId != NoBatchId {
+ if err := s.endBatch(ctx, tx, batchId, count); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// processLogRec processes a local log record by adding to the Database and
+// suitably updating the DAG metadata.
+func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+ // Insert the new log record into the log.
+ if err := putLogRec(ctx, tx, rec); err != nil {
+ return err
+ }
+
+ m := rec.Metadata
+ logKey := logRecKey(m.Id, m.Gen)
+
+ // Insert the new log record into dag.
+ if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil {
+ return err
+ }
+
+ // Move the head.
+ return moveHead(ctx, tx, m.ObjId, m.CurVers)
+}