syncbase: Integrate log and gen vector functionality (Part 1).

Change-Id: Id538ba542def314885120bc29d5d258d4c13e32c
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index ffb2b40..a050e45 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -5,6 +5,8 @@
 package interfaces
 
 import (
+	"time"
+
 	wire "v.io/syncbase/v23/services/syncbase/nosql"
 )
 
@@ -12,6 +14,61 @@
 	NoGroupId = GroupId(0)
 )
 
+// TODO(hpucha): These are not final yet. This is an intermediate step.
+
+const (
+	// NodeRec type log record adds a new node in the dag.
+	NodeRec = byte(0)
+	// LinkRec type log record adds a new link in the dag.
+	LinkRec = byte(1)
+)
+
+// PrefixGenVector is the generation vector for a data prefix, which maps each
+// device id to its last locally known generation in the scope of that prefix.
+type PrefixGenVector map[uint64]uint64
+
+// GenVector is the generation vector for a Database, and maps prefixes to their
+// generation vectors. Note that the prefixes in a GenVector are global prefixes
+// that include the appropriate Application and Database name.
+type GenVector map[string]PrefixGenVector
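+
+// For example (illustrative values only, assuming ":"-separated global
+// prefixes): a Database's GenVector could be
+//   GenVector{"app1:db1:foo": PrefixGenVector{10: 34, 25: 12}}
+// meaning that, for the global prefix "app1:db1:foo", this device knows of
+// generations up through 34 from device 10 and up through 12 from device 25.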
+
+// LogRecMetadata represents the metadata of a single log record that is
+// exchanged between two peers. Each log record represents a change made to an
+// object in the store. The log record metadata consists of Id, the device id
+// that created the log record, Gen, the generation number for that log record,
+// and RecType, the type of log record. It also contains information relevant to
+// the update to the object in the store: ObjId is the id of the object that was
+// updated. CurVers is the current version number of the object. Parents can
+// contain 0, 1 or 2 parent versions that the current version is derived from.
+// SyncTime is the timestamp when the update is committed to the Store.
+// Delete indicates whether the update resulted in the object being
+// deleted from the store. BatchId is the unique id of the Batch this update
+// belongs to. BatchCount is the number of objects in the Batch.
+//
+// TODO(hpucha): Add readset/scanset.
+type LogRecMetadata struct {
+	// Log related information.
+	Id      uint64
+	Gen     uint64
+	RecType byte
+
+	// Object related information.
+	ObjId      string
+	CurVers    string
+	Parents    []string
+	SyncTime   time.Time
+	Delete     bool
+	BatchId    uint64
+	BatchCount uint64
+}
+
+// LogRec represents the on-wire representation of an entire log record: its
+// metadata and data. Value is the actual value of a store object.
+type LogRec struct {
+	Metadata LogRecMetadata
+	Value    []byte
+}
+
 // GroupId is a globally unique SyncGroup ID.
 type GroupId uint64
 
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index 3c7e2dd..e188a35 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -13,9 +13,76 @@
 	"v.io/v23/vdl"
 
 	// VDL user imports
+	"time"
 	"v.io/syncbase/v23/services/syncbase/nosql"
+	_ "v.io/v23/vdlroot/time"
 )
 
+// PrefixGenVector is the generation vector for a data prefix, which maps each
+// device id to its last locally known generation in the scope of that prefix.
+type PrefixGenVector map[uint64]uint64
+
+func (PrefixGenVector) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.PrefixGenVector"`
+}) {
+}
+
+// GenVector is the generation vector for a Database, and maps prefixes to their
+// generation vectors. Note that the prefixes in a GenVector are global prefixes
+// that include the appropriate Application and Database name.
+type GenVector map[string]PrefixGenVector
+
+func (GenVector) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.GenVector"`
+}) {
+}
+
+// LogRecMetadata represents the metadata of a single log record that is
+// exchanged between two peers. Each log record represents a change made to an
+// object in the store. The log record metadata consists of Id, the device id
+// that created the log record, Gen, the generation number for that log record,
+// and RecType, the type of log record. It also contains information relevant to
+// the update to the object in the store: ObjId is the id of the object that was
+// updated. CurVers is the current version number of the object. Parents can
+// contain 0, 1 or 2 parent versions that the current version is derived from.
+// SyncTime is the timestamp when the update is committed to the Store.
+// Delete indicates whether the update resulted in the object being
+// deleted from the store. BatchId is the unique id of the Batch this update
+// belongs to. BatchCount is the number of objects in the Batch.
+//
+// TODO(hpucha): Add readset/scanset.
+type LogRecMetadata struct {
+	// Log related information.
+	Id      uint64
+	Gen     uint64
+	RecType byte
+	// Object related information.
+	ObjId      string
+	CurVers    string
+	Parents    []string
+	SyncTime   time.Time
+	Delete     bool
+	BatchId    uint64
+	BatchCount uint64
+}
+
+func (LogRecMetadata) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.LogRecMetadata"`
+}) {
+}
+
+// LogRec represents the on-wire representation of an entire log record: its
+// metadata and data. Value is the actual value of a store object.
+type LogRec struct {
+	Metadata LogRecMetadata
+	Value    []byte
+}
+
+func (LogRec) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.LogRec"`
+}) {
+}
+
 // GroupId is a globally unique SyncGroup ID.
 type GroupId uint64
 
@@ -97,9 +164,19 @@
 }
 
 func init() {
+	vdl.Register((*PrefixGenVector)(nil))
+	vdl.Register((*GenVector)(nil))
+	vdl.Register((*LogRecMetadata)(nil))
+	vdl.Register((*LogRec)(nil))
 	vdl.Register((*GroupId)(nil))
 	vdl.Register((*SyncGroupStatus)(nil))
 	vdl.Register((*SyncGroup)(nil))
 }
 
 const NoGroupId = GroupId(0)
+
+// NodeRec type log record adds a new node in the dag.
+const NodeRec = byte(0)
+
+// LinkRec type log record adds a new link in the dag.
+const LinkRec = byte(1)
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index bded17b..92d3d23 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,7 +29,7 @@
 // syncService contains the metadata for the sync module.
 type syncService struct {
 	// TODO(hpucha): see if "v.io/v23/uniqueid" is a better fit. It is 128 bits.
-	id   int64  // globally unique id for this instance of Syncbase
+	id   uint64 // globally unique id for this instance of Syncbase
 	name string // name derived from the global id.
 	sv   interfaces.Service
 
@@ -48,6 +48,11 @@
 	// In-memory sync membership info aggregated across databases.
 	allMembers *memberView
 
+	// In-memory sync state per Database. This state is populated at
+	// startup, and periodically persisted by the initiator.
+	syncState     map[string]*dbSyncStateInMem
+	syncStateLock sync.Mutex // lock to protect access to the sync state.
+
 	// In-memory tracking of batches during their construction.
 	// The sync Initiator and Watcher build batches incrementally here
 	// and then persist them in DAG batch entries.  The mutex guards
@@ -94,7 +99,7 @@
 		}
 		// First invocation of vsync.New().
 		// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
-		data.Id = rng.Int63()
+		data.Id = rand64()
 		if err := util.PutObject(sv.St(), s.StKey(), data); err != nil {
 			return nil, verror.New(verror.ErrInternal, ctx, err)
 		}
@@ -104,6 +109,11 @@
 	s.id = data.Id
 	s.name = fmt.Sprintf("%x", s.id)
 
+	// Initialize in-memory state for the sync module before starting any threads.
+	if err := s.initSync(ctx); err != nil {
+		return nil, verror.New(verror.ErrInternal, ctx, err)
+	}
+
 	// Channel to propagate close event to all threads.
 	s.closed = make(chan struct{})
 	s.pending.Add(2)
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
new file mode 100644
index 0000000..4d4e271
--- /dev/null
+++ b/services/syncbase/vsync/sync_state.go
@@ -0,0 +1,215 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// New log records are created when objects in the local store are created,
+// updated or deleted. Local log records are also replayed to keep the
+// per-object dags consistent with the local store state. The sync module assigns
+// each log record created within a Database a unique sequence number, called
+// the generation number. Locally on each device, the position of each log
+// record is also recorded relative to other local and remote log records.
+//
+// When a device receives a request to send log records, it first computes the
+// missing generations between itself and the incoming request on a per-prefix
+// basis. It then sends all the log records belonging to the missing generations
+// in the order they occur locally (using the local log position). A device that
+// receives log records over the network replays all the records received from
+// another device in a single batch. Each replayed log record adds a new version
+// to the dag of the object contained in the log record. At the end of replaying
+// all the log records, conflict detection and resolution is carried out for all
+// the objects learned during this iteration. Conflict detection and resolution
+// is carried out after a batch of log records are replayed, instead of
+// incrementally after each record is replayed, to avoid repeating conflict
+// resolution already performed by other devices.
+//
+// The sync module tracks the current generation number and the current local
+// log position for each Database. In addition, it tracks the current
+// generation vector for a Database. Log records are indexed such that they can
+// be selectively retrieved from the store for any missing generation from any
+// device.
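+//
+// For example (illustrative values only): if the local generation vector for
+// prefix "foo" is {A: 10, B: 4} and a requesting peer sends {A: 6, B: 4} for
+// that prefix, then generations 7 through 10 created by device A are missing
+// at the peer, and the corresponding log records are sent back in local
+// log-position order.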
+
+import (
+	"fmt"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+// dbSyncStateInMem represents the in-memory sync state of a Database.
+type dbSyncStateInMem struct {
+	gen    uint64               // current generation number
+	pos    uint64               // current position in the Database log
+	genvec interfaces.GenVector // current generation vector
+}
+
+// initSync initializes the sync module during startup. It scans all the
+// databases across all apps to a) initialize the in-memory sync state of a
+// Database consisting of the current generation number, log position and
+// generation vector; b) initialize the prefixes that are currently being
+// synced.
+//
+// TODO(hpucha): This is incomplete. Flesh this out further.
+func (s *syncService) initSync(ctx *context.T) error {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	var errFinal error
+	s.syncState = make(map[string]*dbSyncStateInMem)
+
+	// TODO(hpucha): Temporary hack.
+	return errFinal
+
+	/*s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+		sn := st.NewSnapshot()
+		defer sn.Close()
+
+		ds, err := getDbSyncState(ctx, sn)
+		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
+			errFinal = err
+			return false
+		}
+
+		var scanStart, scanLimit []byte
+		// Figure out what to scan among local log records.
+		if verror.ErrorID(err) == verror.ErrNoExist.ID {
+			scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
+		} else {
+			scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
+		}
+
+		var maxpos uint64
+		var dbName string
+		// Scan local log records to find the most recent one.
+		sn.Scan(scanStart, scanLimit)
+
+		// Scan remote log records using the persisted GenVector.
+
+		s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
+
+		return false
+	})
+
+	return errFinal*/
+}
+
+// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
+// positions in a Database's log. Used when local updates result in log
+// entries.
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName string, count uint64) (uint64, uint64) {
+	return s.reserveGenAndPosInternal(appName, dbName, count, count)
+}
+
+// reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
+// when remote log records are received.
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName string, count uint64) uint64 {
+	_, pos := s.reserveGenAndPosInternal(appName, dbName, 0, count)
+	return pos
+}
+
+func (s *syncService) reserveGenAndPosInternal(appName, dbName string, genCount, posCount uint64) (uint64, uint64) {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := globalDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok {
+		ds = &dbSyncStateInMem{gen: 1}
+		s.syncState[name] = ds
+	}
+
+	gen := ds.gen
+	pos := ds.pos
+
+	ds.gen += genCount
+	ds.pos += posCount
+
+	return gen, pos
+}
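+
+// For example (illustrative values only): a Database entry starts at gen 1 and
+// pos 0, so reserveGenAndPosInternal(app, db, 5, 10) first returns (1, 0) and
+// advances the in-memory state so that the next call starts at gen 6 and
+// pos 10 (see TestReserveGenAndPos).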
+
+// globalDbName returns the global name of a Database by combining the app and db names.
+func globalDbName(appName, dbName string) string {
+	return util.JoinKeyParts(appName, dbName)
+}
+
+////////////////////////////////////////////////////////////
+// Low-level utility functions to access sync state.
+
+// dbSyncStateKey returns the key used to access the sync state of a Database.
+func dbSyncStateKey() string {
+	return util.JoinKeyParts(util.SyncPrefix, "dbss")
+}
+
+// putDbSyncState persists the sync state object for a given Database.
+func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
+	_ = tx.(store.Transaction)
+	if err := util.PutObject(tx, dbSyncStateKey(), ds); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getDbSyncState retrieves the sync state object for a given Database.
+func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
+	var ds dbSyncState
+	if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil {
+		return nil, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return &ds, nil
+}
+
+////////////////////////////////////////////////////////////
+// Low-level utility functions to access log records.
+
+// logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device.
+func logRecsPerDeviceScanPrefix(id uint64) string {
+	return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id))
+}
+
+// logRecKey returns the key used to access a specific log record.
+func logRecKey(id, gen uint64) string {
+	return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
+}
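+
+// For example (illustrative, assuming util.SyncPrefix is "$sync" and
+// util.JoinKeyParts joins with ":"): logRecKey(0xa, 0x10) would yield a key
+// like "$sync:log:a:0000000000000010", so all log records of a device share a
+// common prefix and sort by generation number within that device.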
+
+// hasLogRec returns true if the log record for (devid, gen) exists.
+func hasLogRec(st store.StoreReader, id, gen uint64) bool {
+	// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
+	var rec localLogRec
+	if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
+		return false
+	}
+	return true
+}
+
+// putLogRec stores the log record.
+func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+	_ = tx.(store.Transaction)
+	if err := util.PutObject(tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getLogRec retrieves the log record for a given (devid, gen).
+func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
+	var rec localLogRec
+	if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
+		return nil, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return &rec, nil
+}
+
+// delLogRec deletes the log record for a given (devid, gen).
+func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
+	_ = tx.(store.Transaction)
+
+	if err := tx.Delete([]byte(logRecKey(id, gen))); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
new file mode 100644
index 0000000..03cb268
--- /dev/null
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -0,0 +1,141 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+)
+
+// Tests for sync state management and storage in Syncbase.
+
+// TestReserveGenAndPos tests reserving generation numbers and log positions in a
+// Database log.
+func TestReserveGenAndPos(t *testing.T) {
+	svc := createService(t)
+	s := svc.sync
+
+	var wantGen, wantPos uint64 = 1, 0
+	for i := 0; i < 5; i++ {
+		gotGen, gotPos := s.reserveGenAndPosInternal("mockapp", "mockdb", 5, 10)
+		if gotGen != wantGen || gotPos != wantPos {
+			t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", gotGen, wantGen, gotPos, wantPos)
+		}
+		wantGen += 5
+		wantPos += 10
+
+		name := globalDbName("mockapp", "mockdb")
+		if s.syncState[name].gen != wantGen || s.syncState[name].pos != wantPos {
+			t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", s.syncState[name].gen, wantGen, s.syncState[name].pos, wantPos)
+		}
+	}
+}
+
+// TestPutGetDbSyncState tests setting and getting sync metadata.
+func TestPutGetDbSyncState(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+
+	checkDbSyncState(t, st, false, nil)
+
+	gv := interfaces.GenVector{
+		"mocktbl/foo": interfaces.PrefixGenVector{
+			1: 2, 3: 4, 5: 6,
+		},
+	}
+
+	tx := st.NewTransaction()
+	wantSt := &dbSyncState{Gen: 40, GenVec: gv}
+	if err := putDbSyncState(nil, tx, wantSt); err != nil {
+		t.Fatalf("putDbSyncState failed, err %v", err)
+	}
+	if err := tx.Commit(); err != nil {
+		t.Fatalf("cannot commit putting db sync state, err %v", err)
+	}
+
+	checkDbSyncState(t, st, true, wantSt)
+}
+
+// TestPutGetDelLogRec tests setting, getting, and deleting a log record.
+func TestPutGetDelLogRec(t *testing.T) {
+	svc := createService(t)
+	st := svc.St()
+
+	var id uint64 = 10
+	var gen uint64 = 100
+
+	checkLogRec(t, st, id, gen, false, nil)
+
+	tx := st.NewTransaction()
+	wantRec := &localLogRec{
+		Metadata: interfaces.LogRecMetadata{
+			Id:         id,
+			Gen:        gen,
+			RecType:    interfaces.NodeRec,
+			ObjId:      "foo",
+			CurVers:    "3",
+			Parents:    []string{"1", "2"},
+			SyncTime:   time.Now().UTC(),
+			Delete:     false,
+			BatchId:    10000,
+			BatchCount: 1,
+		},
+		Pos: 10,
+	}
+	if err := putLogRec(nil, tx, wantRec); err != nil {
+		t.Fatalf("putLogRec(%d:%d) failed err %v", id, gen, err)
+	}
+	if err := tx.Commit(); err != nil {
+		t.Fatalf("cannot commit putting log rec, err %v", err)
+	}
+
+	checkLogRec(t, st, id, gen, true, wantRec)
+
+	tx = st.NewTransaction()
+	if err := delLogRec(nil, tx, id, gen); err != nil {
+		t.Fatalf("delLogRec(%d:%d) failed err %v", id, gen, err)
+	}
+	if err := tx.Commit(); err != nil {
+		t.Fatalf("cannot commit deleting log rec, err %v", err)
+	}
+
+	checkLogRec(t, st, id, gen, false, nil)
+}
+
+//////////////////////////////
+// Helpers
+
+func checkDbSyncState(t *testing.T, st store.StoreReader, exists bool, wantSt *dbSyncState) {
+	gotSt, err := getDbSyncState(nil, st)
+
+	if (!exists && err == nil) || (exists && err != nil) {
+		t.Fatalf("getDbSyncState failed, exists %v err %v", exists, err)
+	}
+
+	if !reflect.DeepEqual(gotSt, wantSt) {
+		t.Fatalf("getDbSyncState() failed, got %v, want %v", gotSt, wantSt)
+	}
+}
+
+func checkLogRec(t *testing.T, st store.StoreReader, id, gen uint64, exists bool, wantRec *localLogRec) {
+	gotRec, err := getLogRec(nil, st, id, gen)
+
+	if (!exists && err == nil) || (exists && err != nil) {
+		t.Fatalf("getLogRec(%d:%d) failed, exists %v err %v", id, gen, exists, err)
+	}
+
+	if !reflect.DeepEqual(gotRec, wantRec) {
+		t.Fatalf("getLogRec(%d:%d) failed, got %v, want %v", id, gen, gotRec, wantRec)
+	}
+
+	if hasLogRec(st, id, gen) != exists {
+		t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
+	}
+}
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 2434200..2810cd0 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -4,7 +4,23 @@
 
 package vsync
 
+import (
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+)
+
 // syncData represents the persistent state of the sync module.
 type syncData struct {
-	Id int64
+	Id uint64
+}
+
+// dbSyncState represents the persistent sync state of a Database.
+type dbSyncState struct {
+	Gen    uint64               // local generation number
+	GenVec interfaces.GenVector // generation vector
+}
+
+// localLogRec represents the persistent local state of a log record.
+type localLogRec struct {
+	Metadata interfaces.LogRecMetadata
+	Pos      uint64 // position in the Database log.
 }
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index cc15abe..17e8348 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -10,11 +10,14 @@
 import (
 	// VDL system imports
 	"v.io/v23/vdl"
+
+	// VDL user imports
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 )
 
 // syncData represents the persistent state of the sync module.
 type syncData struct {
-	Id int64
+	Id uint64
 }
 
 func (syncData) __VDLReflect(struct {
@@ -22,6 +25,30 @@
 }) {
 }
 
+// dbSyncState represents the persistent sync state of a Database.
+type dbSyncState struct {
+	Gen    uint64               // local generation number
+	GenVec interfaces.GenVector // generation vector
+}
+
+func (dbSyncState) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.dbSyncState"`
+}) {
+}
+
+// localLogRec represents the persistent local state of a log record.
+type localLogRec struct {
+	Metadata interfaces.LogRecMetadata
+	Pos      uint64 // position in the Database log.
+}
+
+func (localLogRec) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.localLogRec"`
+}) {
+}
+
 func init() {
 	vdl.Register((*syncData)(nil))
+	vdl.Register((*dbSyncState)(nil))
+	vdl.Register((*localLogRec)(nil))
 }
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index a711078..5ac1b52 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -4,13 +4,95 @@
 
 package vsync
 
+import (
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
 // When applications update objects in the local Store, the sync
 // watcher thread learns about them asynchronously via the "watch"
 // stream of object mutations. In turn, this sync watcher thread
-// updates the DAG and ILog records to track the object change
+// updates the DAG and log records to track the object change
 // histories.
 
 // watchStore processes updates obtained by watching the store.
 func (s *syncService) watchStore() {
 	defer s.pending.Done()
 }
+
+// TODO(hpucha): This is a skeleton only to drive the change for log.
+
+// processBatch applies a single batch of changes (object mutations) received
+// from watching a particular Database.
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, tx store.StoreReadWriter) error {
+	_ = tx.(store.Transaction)
+
+	count := uint64(len(batch))
+	if count == 0 {
+		return nil
+	}
+
+	// If the batch has more than one mutation, start a batch for it.
+	batchId := NoBatchId
+	if count > 1 {
+		batchId = s.startBatch(ctx, tx, batchId)
+		if batchId == NoBatchId {
+			return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
+		}
+	}
+
+	gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
+
+	for _, rec := range batch {
+		// Update the log record. Portions of the record Metadata must
+		// already be filled.
+		rec.Metadata.Id = s.id
+		rec.Metadata.Gen = gen
+		rec.Metadata.RecType = interfaces.NodeRec
+
+		rec.Metadata.BatchId = batchId
+		rec.Metadata.BatchCount = count
+
+		rec.Pos = pos
+
+		gen++
+		pos++
+
+		if err := s.processLocalLogRec(ctx, tx, rec); err != nil {
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+	}
+
+	// End the batch if any.
+	if batchId != NoBatchId {
+		if err := s.endBatch(ctx, tx, batchId, count); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
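+
+// For example (illustrative values only): for a batch of 3 mutations with the
+// Database currently at gen 7 and pos 20, reserveGenAndPosInDbLog returns
+// (7, 20) and the records are assigned (gen, pos) = (7, 20), (8, 21), (9, 22),
+// all sharing the same BatchId and a BatchCount of 3.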
+
+// processLocalLogRec processes a local log record by adding it to the Database
+// log and suitably updating the DAG metadata.
+func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+	// Insert the new log record into the log.
+	if err := putLogRec(ctx, tx, rec); err != nil {
+		return err
+	}
+
+	m := rec.Metadata
+	logKey := logRecKey(m.Id, m.Gen)
+
+	// Insert the new log record into dag.
+	if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil {
+		return err
+	}
+
+	// Move the head.
+	return moveHead(ctx, tx, m.ObjId, m.CurVers)
+}