syncbase/vsync: logging & error handling
- Add debug logging to the initiator, responder, and watcher.
- Fix error handling in hasNode() and similar hasXYZ() calls so that store errors are no longer silently dropped.
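  The hasXYZ() helpers now return (bool, error) instead of bool, mapping
  ErrNoExist to (false, nil) and propagating any other store error. A
  minimal sketch of the resulting calling pattern, mirroring the changes
  below (the surrounding caller context is illustrative):

      // Old: any store error was reported as "does not exist".
      if hasNode(ctx, tx, oid, version) { ... }

      // New: only ErrNoExist maps to false; other errors are returned.
      if ok, err := hasNode(ctx, tx, oid, version); err != nil {
          return err
      } else if ok {
          // node already exists
      }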
Change-Id: Idb5596526b61079e9998bca5f530eac136e2d14c
diff --git a/x/ref/services/syncbase/vsync/conflict_resolution.go b/x/ref/services/syncbase/vsync/conflict_resolution.go
index 724e49f..39d63f1 100644
--- a/x/ref/services/syncbase/vsync/conflict_resolution.go
+++ b/x/ref/services/syncbase/vsync/conflict_resolution.go
@@ -103,7 +103,7 @@
case local.Metadata.CurVers < remote.Metadata.CurVers:
res.ty = pickRemote
default:
- vlog.Fatalf("resolveObjConflictByTime:: local and remote update times and versions are the same, local %v remote %v", local, remote)
+ vlog.Fatalf("sync: resolveObjConflictByTime: local and remote update times and versions are the same, local %v remote %v", local, remote)
}
return &res, nil
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 0f5adfd..728cb0f 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -279,7 +279,9 @@
}
// The new node must not exist.
- if hasNode(ctx, tx, oid, version) {
+ if ok, err := hasNode(ctx, tx, oid, version); err != nil {
+ return err
+ } else if ok {
return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
}
@@ -444,7 +446,9 @@
_ = tx.(store.Transaction)
// Verify that the node exists.
- if !hasNode(ctx, tx, oid, head) {
+ if ok, err := hasNode(ctx, tx, oid, head); err != nil {
+ return err
+ } else if !ok {
return verror.New(verror.ErrInternal, ctx, "node", oid, head, "does not exist")
}
@@ -742,12 +746,15 @@
}
// hasNode returns true if the node (oid, version) exists in the DAG.
-func hasNode(ctx *context.T, st store.StoreReader, oid, version string) bool {
+func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
if _, err := getNode(ctx, st, oid, version); err != nil {
- return false
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ err = nil
+ }
+ return false, err
}
- return true
+ return true, nil
}
// headKey returns the key used to access the DAG object head.
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 1aa75d5..3728635 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -31,7 +31,7 @@
t.Errorf("found non-existent object %s:%s: %v", oid, version, node)
}
- if hasNode(nil, st, oid, version) {
+ if ok, err := hasNode(nil, st, oid, version); err != nil || ok {
t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
}
@@ -52,7 +52,7 @@
t.Errorf("cannot find stored object %s:%s: %v", oid, version, err)
}
- if !hasNode(nil, st, oid, version) {
+ if ok, err := hasNode(nil, st, oid, version); err != nil || !ok {
t.Errorf("hasNode() did not find object %s:%s", oid, version)
}
@@ -92,7 +92,7 @@
t.Errorf("found deleted object %s:%s (%v)", oid, version, node2)
}
- if hasNode(nil, st, oid, version) {
+ if ok, err := hasNode(nil, st, oid, version); err != nil || ok {
t.Errorf("hasNode() found deleted object %s:%s", oid, version)
}
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 600af4b..e14dac6 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -60,17 +60,21 @@
//
// TODO(hpucha): Currently only does initiation. Add rest.
func (s *syncService) syncer(ctx *context.T) {
+ defer s.pending.Done()
+
// TODO(hpucha): Do we need context per initiator round?
ctx, cancel := context.WithRootCancel(ctx)
defer cancel()
ticker := time.NewTicker(peerSyncInterval)
+ defer ticker.Stop()
+
for {
select {
case <-s.closed:
- ticker.Stop()
- s.pending.Done()
+ vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
return
+
case <-ticker.C:
}
@@ -103,9 +107,12 @@
//
// TODO(hpucha): Check the idempotence, esp in addNode in DAG.
func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
+ vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %s", peer)
+ defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %s", peer)
+
info := s.allMembers.members[peer]
if info == nil {
- vlog.Fatalf("getDeltasFromPeer:: missing information in member view for %q", peer)
+ vlog.Fatalf("sync: getDeltasFromPeer: missing information in member view for %q", peer)
}
connected := false
var stream interfaces.SyncGetDeltasClientCall
@@ -117,12 +124,12 @@
// Initialize initiation state for syncing this Database.
iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo)
if err != nil {
- vlog.Errorf("getDeltasFromPeer:: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
+ vlog.Errorf("sync: getDeltasFromPeer: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
continue
}
if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 {
- vlog.Errorf("getDeltasFromPeer:: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
+ vlog.Errorf("sync: getDeltasFromPeer: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
continue
}
@@ -138,7 +145,7 @@
// Create local genvec so that it contains knowledge only about common prefixes.
if err := iSt.createLocalGenVec(ctx); err != nil {
- vlog.Errorf("getDeltasFromPeer:: error creating local genvec for gdb %s, err %v", gdbName, err)
+ vlog.Errorf("sync: getDeltasFromPeer: error creating local genvec for gdb %s, err %v", gdbName, err)
continue
}
@@ -150,20 +157,22 @@
InitVec: iSt.local,
}
+ vlog.VI(3).Infof("sync: getDeltasFromPeer: send request: %v", req)
sender := iSt.stream.SendStream()
sender.Send(req)
// Obtain deltas from the peer over the network.
if err := iSt.recvAndProcessDeltas(ctx); err != nil {
- vlog.Errorf("getDeltasFromPeer:: error receiving deltas for gdb %s, err %v", gdbName, err)
+ vlog.Errorf("sync: getDeltasFromPeer: error receiving deltas for gdb %s, err %v", gdbName, err)
// Returning here since something could be wrong with
// the connection, and no point in attempting the next
// Database.
return
}
+ vlog.VI(3).Infof("sync: getDeltasFromPeer: got reply: %v", iSt.remote)
if err := iSt.processUpdatedObjects(ctx); err != nil {
- vlog.Errorf("getDeltasFromPeer:: error processing objects for gdb %s, err %v", gdbName, err)
+ vlog.Errorf("sync: getDeltasFromPeer: error processing objects for gdb %s, err %v", gdbName, err)
// Move to the next Database even if processing updates
// failed.
continue
@@ -285,7 +294,7 @@
// obtained from the SyncGroups being synced in the current Database.
func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) {
if len(iSt.mtTables) < 1 {
- vlog.Errorf("getDeltasFromPeer:: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
+ vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
return nil, false
}
for mt := range iSt.mtTables {
@@ -293,6 +302,7 @@
c := interfaces.SyncClient(absName)
stream, err := c.GetDeltas(ctx)
if err == nil {
+ vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
return stream, true
}
}
@@ -469,7 +479,7 @@
if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
// Note: This might be triggered with memstore until it handles
// transactions in a more fine-grained fashion.
- vlog.Fatalf("recvAndProcessDeltas:: encountered concurrent transaction")
+ vlog.Fatalf("sync: recvAndProcessDeltas: encountered concurrent transaction")
}
if err == nil {
committed = true
@@ -559,11 +569,15 @@
}()
for {
+ vlog.VI(3).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
+
iSt.tx = iSt.st.NewTransaction()
watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
- if err := iSt.detectConflicts(ctx); err != nil {
+ if count, err := iSt.detectConflicts(ctx); err != nil {
return err
+ } else {
+ vlog.VI(3).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
}
if err := iSt.resolveConflicts(ctx); err != nil {
@@ -579,8 +593,9 @@
committed = true
// Update in-memory genvector since commit is successful.
if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil {
- vlog.Fatalf("processUpdatedObjects:: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
+ vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
}
+ vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed")
return nil
}
@@ -592,26 +607,30 @@
// solution. Next iteration will have coordination with watch
// thread to intelligently retry. Hence this value is not a
// config param.
+ vlog.VI(3).Info("sync: processUpdatedObjects: retry due to local mutations")
iSt.tx.Abort()
time.Sleep(1 * time.Second)
}
}
// detectConflicts iterates through all the updated objects to detect conflicts.
-func (iSt *initiationState) detectConflicts(ctx *context.T) error {
+func (iSt *initiationState) detectConflicts(ctx *context.T) (int, error) {
+ count := 0
for objid, confSt := range iSt.updObjects {
// Check if object has a conflict.
var err error
confSt.isConflict, confSt.newHead, confSt.oldHead, confSt.ancestor, err = hasConflict(ctx, iSt.tx, objid, iSt.dagGraft)
if err != nil {
- return err
+ return 0, err
}
if !confSt.isConflict {
confSt.res = &conflictResolution{ty: pickRemote}
+ } else {
+ count++
}
}
- return nil
+ return count, nil
}
// updateDbAndSync updates the Database, and if that is successful, updates log,
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 2049755..c0f2c9f 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -20,6 +20,9 @@
// GetDeltas implements the responder side of the GetDeltas RPC.
func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
+ vlog.VI(2).Infof("sync: GetDeltas: begin")
+ defer vlog.VI(2).Infof("sync: GetDeltas: end")
+
recvr := call.RecvStream()
for recvr.Advance() {
req := recvr.Value()
@@ -88,6 +91,13 @@
// responder learned them so that the initiator can reconstruct the DAG for the
// objects by learning older nodes first.
func (rSt *responderState) sendDeltasPerDatabase(ctx *context.T) error {
+ // TODO(rdaoud): for such vlog.VI() calls where the function name is
+ // embedded, consider using a helper function to auto-fill it instead
+ // (see http://goo.gl/mEa4L0) but only incur that overhead when the
+ // logging level specified is enabled.
+ vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
+ rSt.req.AppName, rSt.req.DbName, rSt.req.SgIds, rSt.req.InitVec)
+
// Phase 1 of sendDeltas: Authorize the initiator and respond to the
// caller only for the SyncGroups that allow access.
rSt.authorizeAndFilterSyncGroups(ctx)
@@ -234,6 +244,8 @@
rSt.outVec[pfx] = respgv
}
+ vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
+ rSt.req.AppName, rSt.req.DbName, rSt.diff, rSt.outVec)
return
}
@@ -395,7 +407,7 @@
// it with the SyncGroup prefixes which are defined by the application.
parts := util.SplitKeyParts(rec.Metadata.ObjId)
if len(parts) < 2 {
- vlog.Fatalf("filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
+ vlog.Fatalf("sync: filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
}
key := util.JoinKeyParts(parts[1:]...)
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index c0e1429..0c1adce 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -318,15 +318,16 @@
}
// hasLogRec returns true if the log record for (devid, gen) exists.
-func hasLogRec(st store.StoreReader, id, gen uint64) bool {
+func hasLogRec(st store.StoreReader, id, gen uint64) (bool, error) {
// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
var rec localLogRec
- // NOTE(sadovsky): This implementation doesn't explicitly handle
- // non-ErrNoExist errors. Is that intentional?
if err := util.Get(nil, st, logRecKey(id, gen), &rec); err != nil {
- return false
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ err = nil
+ }
+ return false, err
}
- return true
+ return true, nil
}
// putLogRec stores the log record.
diff --git a/x/ref/services/syncbase/vsync/sync_state_test.go b/x/ref/services/syncbase/vsync/sync_state_test.go
index 3929a05..7e9302b 100644
--- a/x/ref/services/syncbase/vsync/sync_state_test.go
+++ b/x/ref/services/syncbase/vsync/sync_state_test.go
@@ -168,7 +168,7 @@
t.Fatalf("getLogRec(%d:%d) failed, got %v, want %v", id, gen, gotRec, wantRec)
}
- if hasLogRec(st, id, gen) != exists {
+ if ok, err := hasLogRec(st, id, gen); err != nil || ok != exists {
t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
}
}
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 5be4af6..a06da74 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -118,10 +118,14 @@
return err
}
- if hasSGDataEntry(tx, sg.Id) {
+ if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
+ return err
+ } else if ok {
return verror.New(verror.ErrExist, ctx, "group id already exists")
}
- if hasSGNameEntry(tx, sg.Name) {
+ if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
+ return err
+ } else if ok {
return verror.New(verror.ErrExist, ctx, "group name already exists")
}
@@ -247,7 +251,7 @@
for stream.Advance() {
var sg interfaces.SyncGroup
if vom.Decode(stream.Value(nil), &sg) != nil {
- vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
+ vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
continue
}
@@ -257,7 +261,7 @@
}
if err := stream.Err(); err != nil {
- vlog.Errorf("forEachSyncGroup: scan stream error: %v", err)
+ vlog.Errorf("sync: forEachSyncGroup: scan stream error: %v", err)
}
}
@@ -298,27 +302,29 @@
}
// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) bool {
+func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
var sg interfaces.SyncGroup
- // NOTE(sadovsky): This implementation doesn't explicitly handle
- // non-ErrNoExist errors. Is that intentional?
if err := util.Get(nil, st, sgDataKey(gid), &sg); err != nil {
- return false
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ err = nil
+ }
+ return false, err
}
- return true
+ return true, nil
}
// hasSGNameEntry returns true if the SyncGroup name entry exists.
-func hasSGNameEntry(st store.StoreReader, name string) bool {
+func hasSGNameEntry(st store.StoreReader, name string) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
var gid interfaces.GroupId
- // NOTE(sadovsky): This implementation doesn't explicitly handle
- // non-ErrNoExist errors. Is that intentional?
if err := util.Get(nil, st, sgNameKey(name), &gid); err != nil {
- return false
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ err = nil
+ }
+ return false, err
}
- return true
+ return true, nil
}
// setSGDataEntry stores the SyncGroup data entry.
@@ -368,6 +374,9 @@
// TODO(hpucha): Pass blessings along.
func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+ vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
+ defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
+
err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -421,6 +430,9 @@
// TODO(hpucha): Pass blessings along.
func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+ vlog.VI(2).Infof("sync: JoinSyncGroup: begin: %s", sgName)
+ defer vlog.VI(2).Infof("sync: JoinSyncGroup: end: %s", sgName)
+
var sgErr error
var sg *interfaces.SyncGroup
nullSpec := wire.SyncGroupSpec{}
@@ -589,14 +601,13 @@
// tuples) and internally as strings that match the store's key format.
for _, mp := range opts.ManagedPrefixes {
for _, p := range prefixes {
- var k, v []byte
start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p)
stream := tx.Scan(start, limit)
for stream.Advance() {
- k, v = stream.Key(k), stream.Value(v)
+ k, v := stream.Key(nil), stream.Value(nil)
parts := util.SplitKeyParts(string(k))
if len(parts) < 2 {
- vlog.Fatalf("bootstrapSyncGroup: invalid version key %s", string(k))
+ vlog.Fatalf("sync: bootstrapSyncGroup: invalid version key %s", string(k))
}
key := []byte(util.JoinKeyParts(parts[1:]...))
diff --git a/x/ref/services/syncbase/vsync/util.go b/x/ref/services/syncbase/vsync/util.go
index 3622672..af0b158 100644
--- a/x/ref/services/syncbase/vsync/util.go
+++ b/x/ref/services/syncbase/vsync/util.go
@@ -29,7 +29,7 @@
// elsewhere).
appNames, err := s.sv.AppNames(ctx, nil)
if err != nil {
- vlog.Errorf("forEachDatabaseStore: cannot get all app names: %v", err)
+ vlog.Errorf("sync: forEachDatabaseStore: cannot get all app names: %v", err)
return
}
@@ -37,12 +37,12 @@
// For each app, get its databases and iterate over them.
app, err := s.sv.App(ctx, nil, a)
if err != nil {
- vlog.Errorf("forEachDatabaseStore: cannot get app %s: %v", a, err)
+ vlog.Errorf("sync: forEachDatabaseStore: cannot get app %s: %v", a, err)
continue
}
dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
if err != nil {
- vlog.Errorf("forEachDatabaseStore: cannot get all db names for app %s: %v", a, err)
+ vlog.Errorf("sync: forEachDatabaseStore: cannot get all db names for app %s: %v", a, err)
continue
}
@@ -50,7 +50,7 @@
// For each database, get its Store and invoke the callback.
db, err := app.NoSQLDatabase(ctx, nil, d)
if err != nil {
- vlog.Errorf("forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err)
+ vlog.Errorf("sync: forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err)
continue
}
@@ -77,7 +77,7 @@
// unixNanoToTime converts a Unix timestamp in nanoseconds to a Time object.
func unixNanoToTime(timestamp int64) time.Time {
if timestamp < 0 {
- vlog.Fatalf("unixNanoToTime: invalid timestamp %d", timestamp)
+ vlog.Fatalf("sync: unixNanoToTime: invalid timestamp %d", timestamp)
}
return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec)
}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 5e14292..3b745a8 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -67,7 +67,7 @@
for {
select {
case <-s.closed:
- vlog.VI(1).Info("watchStore: sync channel closed, stop watching and exit")
+ vlog.VI(1).Info("sync: watchStore: channel closed, stop watching and exit")
return
case <-ticker.C:
@@ -84,10 +84,13 @@
// from starving others. A batch is stored as a contiguous set of log records
// ending with one record having the "continued" flag set to false.
func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+ vlog.VI(2).Infof("sync: processDatabase: begin: %s, %s", appName, dbName)
+ defer vlog.VI(2).Infof("sync: processDatabase: end: %s, %s", appName, dbName)
+
resMark, err := getResMark(ctx, st)
if err != nil {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
- vlog.Errorf("processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
+ vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
return
}
resMark = ""
@@ -135,6 +138,8 @@
}
}
logs = logs[:i]
+ vlog.VI(3).Infof("sync: processWatchLogBatch: %s, %s: sg snap %t, syncable %d, total %d",
+ appName, dbName, !appBatch, len(logs), totalCount)
// Transactional processing of the batch: convert these syncable log
// records to a batch of sync log records, filling their parent versions
@@ -142,7 +147,9 @@
err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
batch := make([]*localLogRec, 0, len(logs))
for _, entry := range logs {
- if rec := convertLogRecord(ctx, tx, entry); rec != nil {
+ if rec, err := convertLogRecord(ctx, tx, entry); err != nil {
+ return err
+ } else if rec != nil {
batch = append(batch, rec)
}
}
@@ -155,7 +162,7 @@
if err != nil {
// TODO(rdaoud): don't crash, quarantine this app database.
- vlog.Fatalf("processDatabase: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
+ vlog.Fatalf("sync: processWatchLogBatch:: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
}
}
@@ -180,6 +187,9 @@
gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
+ vlog.VI(3).Infof("sync: processBatch: %s, %s: len %d, total %d, btid %x, gen %d, pos %d",
+ appName, dbName, count, totalCount, batchId, gen, pos)
+
for _, rec := range batch {
// Update the log record. Portions of the record Metadata must
// already be filled.
@@ -290,7 +300,7 @@
logKey := string(stream.Key(nil))
var logEnt watchable.LogEntry
if vom.Decode(stream.Value(nil), &logEnt) != nil {
- vlog.Fatalf("getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
+ vlog.Fatalf("sync: getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
appName, dbName, logKey, stream.Value(nil))
}
@@ -305,12 +315,12 @@
}
if err := stream.Err(); err != nil {
- vlog.Errorf("getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
+ vlog.Errorf("sync: getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
return nil, resMark
}
if !endOfBatch {
if len(logs) > 0 {
- vlog.Fatalf("processDatabase: %s, %s: end of batch not found after %d entries",
+ vlog.Fatalf("sync: getWatchLogBatch: %s, %s: end of batch not found after %d entries",
appName, dbName, len(logs))
}
return nil, resMark
@@ -326,7 +336,7 @@
// simplify the store-to-sync interaction. A deleted key would still have a
// version and its value entry would encode the "deleted" flag, either in the
// key or probably in a value wrapper that would contain other metadata.
-func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec {
+func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) (*localLogRec, error) {
_ = tx.(store.Transaction)
var rec *localLogRec
timestamp := logEnt.CommitTimestamp
@@ -345,7 +355,9 @@
// Create records for object versions not already in the DAG.
// Duplicates can appear here in cases of nested SyncGroups or
// peer SyncGroups.
- if !hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)) {
+ if ok, err := hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)); err != nil {
+ return nil, err
+ } else if !ok {
rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
}
@@ -353,13 +365,15 @@
rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
case watchable.OpSyncGroup:
- vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
+ vlog.Errorf("sync: convertLogRecord: watch LogEntry for SyncGroup should not be converted: %v", logEnt)
+ return nil, verror.New(verror.ErrInternal, ctx, "cannot convert a watch log OpSyncGroup entry")
default:
- vlog.Errorf("invalid watch LogEntry: %v", logEnt)
+ vlog.Errorf("sync: convertLogRecord: invalid watch LogEntry: %v", logEnt)
+ return nil, verror.New(verror.ErrInternal, ctx, "cannot convert unknown watch log entry")
}
- return rec
+ return rec, nil
}
// newLocalLogRec creates a local sync log record given its information: key,
@@ -377,7 +391,7 @@
if head, err := getHead(ctx, tx, oid); err == nil {
rec.Metadata.Parents = []string{head}
} else if deleted || (verror.ErrorID(err) != verror.ErrNoExist.ID) {
- vlog.Fatalf("cannot getHead to convert log record for %s: %v", oid, err)
+ vlog.Fatalf("sync: newLocalLogRec: cannot getHead to convert log record for %s: %v", oid, err)
}
rec.Metadata.UpdTime = unixNanoToTime(timestamp)
return &rec
@@ -397,6 +411,8 @@
incrWatchPrefix(appName, dbName, prefix)
}
}
+ vlog.VI(3).Infof("sync: processSyncGroupLogRecord: %s, %s: remove %t, prefixes: %q",
+ appName, dbName, remove, op.Value.Prefixes)
return true
default:
@@ -425,7 +441,7 @@
// it with the SyncGroup prefixes which are defined by the application.
parts := util.SplitKeyParts(key)
if len(parts) < 2 {
- vlog.Fatalf("syncable: %s: invalid entry key %s: %v", appdb, key, logEnt)
+ vlog.Fatalf("sync: syncable: %s: invalid entry key %s: %v", appdb, key, logEnt)
}
key = util.JoinKeyParts(parts[1:]...)
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index a2107d5..66aae8f 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -177,8 +177,11 @@
if res, err := getResMark(nil, st); err != nil && res != resmark {
t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
}
- if hasNode(nil, st, fooKey, "123") || hasNode(nil, st, barKey, "555") {
- t.Error("hasNode() found DAG entries for non-syncable logs")
+ if ok, err := hasNode(nil, st, fooKey, "123"); err != nil || ok {
+ t.Error("hasNode() found DAG entry for non-syncable log on foo")
+ }
+ if ok, err := hasNode(nil, st, barKey, "555"); err != nil || ok {
+ t.Error("hasNode() found DAG entry for non-syncable log on bar")
}
// Partially syncable logs.
@@ -212,8 +215,8 @@
if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != NoBatchId {
t.Errorf("invalid DAG node for fooxyz: %v", node2)
}
- if hasNode(nil, st, barKey, "222") {
- t.Error("hasNode() found DAG entries for non-syncable logs")
+ if ok, err := hasNode(nil, st, barKey, "222"); err != nil || ok {
+ t.Error("hasNode() found DAG entry for non-syncable log on bar")
}
// More partially syncable logs updating existing ones.
@@ -254,8 +257,8 @@
node2.Logrec == "" || node2.BatchId == NoBatchId {
t.Errorf("invalid DAG node for fooxyz: %v", node2)
}
- if hasNode(nil, st, barKey, "7") {
- t.Error("hasNode() found DAG entries for non-syncable logs")
+ if ok, err := hasNode(nil, st, barKey, "7"); err != nil || ok {
+ t.Error("hasNode() found DAG entry for non-syncable log on bar")
}
// Back to non-syncable logs (remove "f" prefix).
@@ -282,8 +285,8 @@
if node, err := getNode(nil, st, fooxyzKey, "888"); err == nil {
t.Errorf("getNode() should not have found fooxyz @ 888: %v", node)
}
- if hasNode(nil, st, barKey, "007") {
- t.Error("hasNode() found DAG entries for non-syncable logs")
+ if ok, err := hasNode(nil, st, barKey, "007"); err != nil || ok {
+ t.Error("hasNode() found DAG entry for non-syncable log on bar")
}
// Scan the batch records and verify that there is only 1 DAG batch