syncbase/vsync: logging & error handling

- Add debug logging to the initiator, responder, and watcher.
- Fix error handling in hasNode() and the similar hasXYZ() helpers:
  propagate non-ErrNoExist errors to the caller instead of silently
  treating them as "not found" (see the sketch below).
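
The hasXYZ() helpers now return (bool, error) so that callers can
distinguish "does not exist" from a store read failure. A minimal
sketch of the new pattern, mirroring the hasNode() change in dag.go:

    // hasNode reports whether the DAG node (oid, version) exists.
    // Only ErrNoExist is mapped to (false, nil); any other store
    // error is returned to the caller.
    func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) {
        if _, err := getNode(ctx, st, oid, version); err != nil {
            if verror.ErrorID(err) == verror.ErrNoExist.ID {
                err = nil
            }
            return false, err
        }
        return true, nil
    }

    // Caller side: check the error before acting on the boolean.
    if ok, err := hasNode(ctx, tx, oid, version); err != nil {
        return err
    } else if ok {
        return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
    }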

Change-Id: Idb5596526b61079e9998bca5f530eac136e2d14c
diff --git a/x/ref/services/syncbase/vsync/conflict_resolution.go b/x/ref/services/syncbase/vsync/conflict_resolution.go
index 724e49f..39d63f1 100644
--- a/x/ref/services/syncbase/vsync/conflict_resolution.go
+++ b/x/ref/services/syncbase/vsync/conflict_resolution.go
@@ -103,7 +103,7 @@
 	case local.Metadata.CurVers < remote.Metadata.CurVers:
 		res.ty = pickRemote
 	default:
-		vlog.Fatalf("resolveObjConflictByTime:: local and remote update times and versions are the same, local %v remote %v", local, remote)
+		vlog.Fatalf("sync: resolveObjConflictByTime: local and remote update times and versions are the same, local %v remote %v", local, remote)
 	}
 
 	return &res, nil
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 0f5adfd..728cb0f 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -279,7 +279,9 @@
 	}
 
 	// The new node must not exist.
-	if hasNode(ctx, tx, oid, version) {
+	if ok, err := hasNode(ctx, tx, oid, version); err != nil {
+		return err
+	} else if ok {
 		return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
 	}
 
@@ -444,7 +446,9 @@
 	_ = tx.(store.Transaction)
 
 	// Verify that the node exists.
-	if !hasNode(ctx, tx, oid, head) {
+	if ok, err := hasNode(ctx, tx, oid, head); err != nil {
+		return err
+	} else if !ok {
 		return verror.New(verror.ErrInternal, ctx, "node", oid, head, "does not exist")
 	}
 
@@ -742,12 +746,15 @@
 }
 
 // hasNode returns true if the node (oid, version) exists in the DAG.
-func hasNode(ctx *context.T, st store.StoreReader, oid, version string) bool {
+func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	if _, err := getNode(ctx, st, oid, version); err != nil {
-		return false
+		if verror.ErrorID(err) == verror.ErrNoExist.ID {
+			err = nil
+		}
+		return false, err
 	}
-	return true
+	return true, nil
 }
 
 // headKey returns the key used to access the DAG object head.
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 1aa75d5..3728635 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -31,7 +31,7 @@
 		t.Errorf("found non-existent object %s:%s: %v", oid, version, node)
 	}
 
-	if hasNode(nil, st, oid, version) {
+	if ok, err := hasNode(nil, st, oid, version); err != nil || ok {
 		t.Errorf("hasNode() found non-existent object %s:%s", oid, version)
 	}
 
@@ -52,7 +52,7 @@
 		t.Errorf("cannot find stored object %s:%s: %v", oid, version, err)
 	}
 
-	if !hasNode(nil, st, oid, version) {
+	if ok, err := hasNode(nil, st, oid, version); err != nil || !ok {
 		t.Errorf("hasNode() did not find object %s:%s", oid, version)
 	}
 
@@ -92,7 +92,7 @@
 		t.Errorf("found deleted object %s:%s (%v)", oid, version, node2)
 	}
 
-	if hasNode(nil, st, oid, version) {
+	if ok, err := hasNode(nil, st, oid, version); err != nil || ok {
 		t.Errorf("hasNode() found deleted object %s:%s", oid, version)
 	}
 
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 600af4b..e14dac6 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -60,17 +60,21 @@
 //
 // TODO(hpucha): Currently only does initiation. Add rest.
 func (s *syncService) syncer(ctx *context.T) {
+	defer s.pending.Done()
+
 	// TODO(hpucha): Do we need context per initiator round?
 	ctx, cancel := context.WithRootCancel(ctx)
 	defer cancel()
 
 	ticker := time.NewTicker(peerSyncInterval)
+	defer ticker.Stop()
+
 	for {
 		select {
 		case <-s.closed:
-			ticker.Stop()
-			s.pending.Done()
+			vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
 			return
+
 		case <-ticker.C:
 		}
 
@@ -103,9 +107,12 @@
 //
 // TODO(hpucha): Check the idempotence, esp in addNode in DAG.
 func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
+	vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %s", peer)
+	defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %s", peer)
+
 	info := s.allMembers.members[peer]
 	if info == nil {
-		vlog.Fatalf("getDeltasFromPeer:: missing information in member view for %q", peer)
+		vlog.Fatalf("sync: getDeltasFromPeer: missing information in member view for %q", peer)
 	}
 	connected := false
 	var stream interfaces.SyncGetDeltasClientCall
@@ -117,12 +124,12 @@
 		// Initialize initiation state for syncing this Database.
 		iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo)
 		if err != nil {
-			vlog.Errorf("getDeltasFromPeer:: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
+			vlog.Errorf("sync: getDeltasFromPeer: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
 			continue
 		}
 
 		if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 {
-			vlog.Errorf("getDeltasFromPeer:: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
+			vlog.Errorf("sync: getDeltasFromPeer: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
 			continue
 		}
 
@@ -138,7 +145,7 @@
 
 		// Create local genvec so that it contains knowledge only about common prefixes.
 		if err := iSt.createLocalGenVec(ctx); err != nil {
-			vlog.Errorf("getDeltasFromPeer:: error creating local genvec for gdb %s, err %v", gdbName, err)
+			vlog.Errorf("sync: getDeltasFromPeer: error creating local genvec for gdb %s, err %v", gdbName, err)
 			continue
 		}
 
@@ -150,20 +157,22 @@
 			InitVec: iSt.local,
 		}
 
+		vlog.VI(3).Infof("sync: getDeltasFromPeer: send request: %v", req)
 		sender := iSt.stream.SendStream()
 		sender.Send(req)
 
 		// Obtain deltas from the peer over the network.
 		if err := iSt.recvAndProcessDeltas(ctx); err != nil {
-			vlog.Errorf("getDeltasFromPeer:: error receiving deltas for gdb %s, err %v", gdbName, err)
+			vlog.Errorf("sync: getDeltasFromPeer: error receiving deltas for gdb %s, err %v", gdbName, err)
 			// Returning here since something could be wrong with
 			// the connection, and no point in attempting the next
 			// Database.
 			return
 		}
+		vlog.VI(3).Infof("sync: getDeltasFromPeer: got reply: %v", iSt.remote)
 
 		if err := iSt.processUpdatedObjects(ctx); err != nil {
-			vlog.Errorf("getDeltasFromPeer:: error processing objects for gdb %s, err %v", gdbName, err)
+			vlog.Errorf("sync: getDeltasFromPeer: error processing objects for gdb %s, err %v", gdbName, err)
 			// Move to the next Database even if processing updates
 			// failed.
 			continue
@@ -285,7 +294,7 @@
 // obtained from the SyncGroups being synced in the current Database.
 func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) {
 	if len(iSt.mtTables) < 1 {
-		vlog.Errorf("getDeltasFromPeer:: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
+		vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
 		return nil, false
 	}
 	for mt := range iSt.mtTables {
@@ -293,6 +302,7 @@
 		c := interfaces.SyncClient(absName)
 		stream, err := c.GetDeltas(ctx)
 		if err == nil {
+			vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
 			return stream, true
 		}
 	}
@@ -469,7 +479,7 @@
 	if verror.ErrorID(err) == store.ErrConcurrentTransaction.ID {
 		// Note: This might be triggered with memstore until it handles
 		// transactions in a more fine-grained fashion.
-		vlog.Fatalf("recvAndProcessDeltas:: encountered concurrent transaction")
+		vlog.Fatalf("sync: recvAndProcessDeltas: encountered concurrent transaction")
 	}
 	if err == nil {
 		committed = true
@@ -559,11 +569,15 @@
 	}()
 
 	for {
+		vlog.VI(3).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
+
 		iSt.tx = iSt.st.NewTransaction()
 		watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
 
-		if err := iSt.detectConflicts(ctx); err != nil {
+		if count, err := iSt.detectConflicts(ctx); err != nil {
 			return err
+		} else {
+			vlog.VI(3).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
 		}
 
 		if err := iSt.resolveConflicts(ctx); err != nil {
@@ -579,8 +593,9 @@
 			committed = true
 			// Update in-memory genvector since commit is successful.
 			if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil {
-				vlog.Fatalf("processUpdatedObjects:: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
+				vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
 			}
+			vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed")
 			return nil
 		}
 
@@ -592,26 +607,30 @@
 		// solution. Next iteration will have coordination with watch
 		// thread to intelligently retry. Hence this value is not a
 		// config param.
+		vlog.VI(3).Info("sync: processUpdatedObjects: retry due to local mutations")
 		iSt.tx.Abort()
 		time.Sleep(1 * time.Second)
 	}
 }
 
 // detectConflicts iterates through all the updated objects to detect conflicts.
-func (iSt *initiationState) detectConflicts(ctx *context.T) error {
+func (iSt *initiationState) detectConflicts(ctx *context.T) (int, error) {
+	count := 0
 	for objid, confSt := range iSt.updObjects {
 		// Check if object has a conflict.
 		var err error
 		confSt.isConflict, confSt.newHead, confSt.oldHead, confSt.ancestor, err = hasConflict(ctx, iSt.tx, objid, iSt.dagGraft)
 		if err != nil {
-			return err
+			return 0, err
 		}
 
 		if !confSt.isConflict {
 			confSt.res = &conflictResolution{ty: pickRemote}
+		} else {
+			count++
 		}
 	}
-	return nil
+	return count, nil
 }
 
 // updateDbAndSync updates the Database, and if that is successful, updates log,
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 2049755..c0f2c9f 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -20,6 +20,9 @@
 
 // GetDeltas implements the responder side of the GetDeltas RPC.
 func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
+	vlog.VI(2).Infof("sync: GetDeltas: begin")
+	defer vlog.VI(2).Infof("sync: GetDeltas: end")
+
 	recvr := call.RecvStream()
 	for recvr.Advance() {
 		req := recvr.Value()
@@ -88,6 +91,13 @@
 // responder learned them so that the initiator can reconstruct the DAG for the
 // objects by learning older nodes first.
 func (rSt *responderState) sendDeltasPerDatabase(ctx *context.T) error {
+	// TODO(rdaoud): for such vlog.VI() calls where the function name is
+	// embedded, consider using a helper function to auto-fill it instead
+	// (see http://goo.gl/mEa4L0) but only incur that overhead when the
+	// logging level specified is enabled.
+	vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
+		rSt.req.AppName, rSt.req.DbName, rSt.req.SgIds, rSt.req.InitVec)
+
 	// Phase 1 of sendDeltas: Authorize the initiator and respond to the
 	// caller only for the SyncGroups that allow access.
 	rSt.authorizeAndFilterSyncGroups(ctx)
@@ -234,6 +244,8 @@
 		rSt.outVec[pfx] = respgv
 	}
 
+	vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
+		rSt.req.AppName, rSt.req.DbName, rSt.diff, rSt.outVec)
 	return
 }
 
@@ -395,7 +407,7 @@
 	// it with the SyncGroup prefixes which are defined by the application.
 	parts := util.SplitKeyParts(rec.Metadata.ObjId)
 	if len(parts) < 2 {
-		vlog.Fatalf("filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
+		vlog.Fatalf("sync: filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
 	}
 	key := util.JoinKeyParts(parts[1:]...)
 
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index c0e1429..0c1adce 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -318,15 +318,16 @@
 }
 
 // hasLogRec returns true if the log record for (devid, gen) exists.
-func hasLogRec(st store.StoreReader, id, gen uint64) bool {
+func hasLogRec(st store.StoreReader, id, gen uint64) (bool, error) {
 	// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
 	var rec localLogRec
-	// NOTE(sadovsky): This implementation doesn't explicitly handle
-	// non-ErrNoExist errors. Is that intentional?
 	if err := util.Get(nil, st, logRecKey(id, gen), &rec); err != nil {
-		return false
+		if verror.ErrorID(err) == verror.ErrNoExist.ID {
+			err = nil
+		}
+		return false, err
 	}
-	return true
+	return true, nil
 }
 
 // putLogRec stores the log record.
diff --git a/x/ref/services/syncbase/vsync/sync_state_test.go b/x/ref/services/syncbase/vsync/sync_state_test.go
index 3929a05..7e9302b 100644
--- a/x/ref/services/syncbase/vsync/sync_state_test.go
+++ b/x/ref/services/syncbase/vsync/sync_state_test.go
@@ -168,7 +168,7 @@
 		t.Fatalf("getLogRec(%d:%d) failed, got %v, want %v", id, gen, gotRec, wantRec)
 	}
 
-	if hasLogRec(st, id, gen) != exists {
+	if ok, err := hasLogRec(st, id, gen); err != nil || ok != exists {
 		t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
 	}
 }
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 5be4af6..a06da74 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -118,10 +118,14 @@
 		return err
 	}
 
-	if hasSGDataEntry(tx, sg.Id) {
+	if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
+		return err
+	} else if ok {
 		return verror.New(verror.ErrExist, ctx, "group id already exists")
 	}
-	if hasSGNameEntry(tx, sg.Name) {
+	if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
+		return err
+	} else if ok {
 		return verror.New(verror.ErrExist, ctx, "group name already exists")
 	}
 
@@ -247,7 +251,7 @@
 	for stream.Advance() {
 		var sg interfaces.SyncGroup
 		if vom.Decode(stream.Value(nil), &sg) != nil {
-			vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
+			vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
 			continue
 		}
 
@@ -257,7 +261,7 @@
 	}
 
 	if err := stream.Err(); err != nil {
-		vlog.Errorf("forEachSyncGroup: scan stream error: %v", err)
+		vlog.Errorf("sync: forEachSyncGroup: scan stream error: %v", err)
 	}
 }
 
@@ -298,27 +302,29 @@
 }
 
 // hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) bool {
+func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	var sg interfaces.SyncGroup
-	// NOTE(sadovsky): This implementation doesn't explicitly handle
-	// non-ErrNoExist errors. Is that intentional?
 	if err := util.Get(nil, st, sgDataKey(gid), &sg); err != nil {
-		return false
+		if verror.ErrorID(err) == verror.ErrNoExist.ID {
+			err = nil
+		}
+		return false, err
 	}
-	return true
+	return true, nil
 }
 
 // hasSGNameEntry returns true if the SyncGroup name entry exists.
-func hasSGNameEntry(st store.StoreReader, name string) bool {
+func hasSGNameEntry(st store.StoreReader, name string) (bool, error) {
 	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
 	var gid interfaces.GroupId
-	// NOTE(sadovsky): This implementation doesn't explicitly handle
-	// non-ErrNoExist errors. Is that intentional?
 	if err := util.Get(nil, st, sgNameKey(name), &gid); err != nil {
-		return false
+		if verror.ErrorID(err) == verror.ErrNoExist.ID {
+			err = nil
+		}
+		return false, err
 	}
-	return true
+	return true, nil
 }
 
 // setSGDataEntry stores the SyncGroup data entry.
@@ -368,6 +374,9 @@
 
 // TODO(hpucha): Pass blessings along.
 func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+	vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
+	defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
+
 	err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -421,6 +430,9 @@
 
 // TODO(hpucha): Pass blessings along.
 func (sd *syncDatabase) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+	vlog.VI(2).Infof("sync: JoinSyncGroup: begin: %s", sgName)
+	defer vlog.VI(2).Infof("sync: JoinSyncGroup: end: %s", sgName)
+
 	var sgErr error
 	var sg *interfaces.SyncGroup
 	nullSpec := wire.SyncGroupSpec{}
@@ -589,14 +601,13 @@
 	// tuples) and internally as strings that match the store's key format.
 	for _, mp := range opts.ManagedPrefixes {
 		for _, p := range prefixes {
-			var k, v []byte
 			start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p)
 			stream := tx.Scan(start, limit)
 			for stream.Advance() {
-				k, v = stream.Key(k), stream.Value(v)
+				k, v := stream.Key(nil), stream.Value(nil)
 				parts := util.SplitKeyParts(string(k))
 				if len(parts) < 2 {
-					vlog.Fatalf("bootstrapSyncGroup: invalid version key %s", string(k))
+					vlog.Fatalf("sync: bootstrapSyncGroup: invalid version key %s", string(k))
 
 				}
 				key := []byte(util.JoinKeyParts(parts[1:]...))
diff --git a/x/ref/services/syncbase/vsync/util.go b/x/ref/services/syncbase/vsync/util.go
index 3622672..af0b158 100644
--- a/x/ref/services/syncbase/vsync/util.go
+++ b/x/ref/services/syncbase/vsync/util.go
@@ -29,7 +29,7 @@
 	// elsewhere).
 	appNames, err := s.sv.AppNames(ctx, nil)
 	if err != nil {
-		vlog.Errorf("forEachDatabaseStore: cannot get all app names: %v", err)
+		vlog.Errorf("sync: forEachDatabaseStore: cannot get all app names: %v", err)
 		return
 	}
 
@@ -37,12 +37,12 @@
 		// For each app, get its databases and iterate over them.
 		app, err := s.sv.App(ctx, nil, a)
 		if err != nil {
-			vlog.Errorf("forEachDatabaseStore: cannot get app %s: %v", a, err)
+			vlog.Errorf("sync: forEachDatabaseStore: cannot get app %s: %v", a, err)
 			continue
 		}
 		dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
 		if err != nil {
-			vlog.Errorf("forEachDatabaseStore: cannot get all db names for app %s: %v", a, err)
+			vlog.Errorf("sync: forEachDatabaseStore: cannot get all db names for app %s: %v", a, err)
 			continue
 		}
 
@@ -50,7 +50,7 @@
 			// For each database, get its Store and invoke the callback.
 			db, err := app.NoSQLDatabase(ctx, nil, d)
 			if err != nil {
-				vlog.Errorf("forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err)
+				vlog.Errorf("sync: forEachDatabaseStore: cannot get db %s for app %s: %v", d, a, err)
 				continue
 			}
 
@@ -77,7 +77,7 @@
 // unixNanoToTime converts a Unix timestamp in nanoseconds to a Time object.
 func unixNanoToTime(timestamp int64) time.Time {
 	if timestamp < 0 {
-		vlog.Fatalf("unixNanoToTime: invalid timestamp %d", timestamp)
+		vlog.Fatalf("sync: unixNanoToTime: invalid timestamp %d", timestamp)
 	}
 	return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec)
 }
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 5e14292..3b745a8 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -67,7 +67,7 @@
 	for {
 		select {
 		case <-s.closed:
-			vlog.VI(1).Info("watchStore: sync channel closed, stop watching and exit")
+			vlog.VI(1).Info("sync: watchStore: channel closed, stop watching and exit")
 			return
 
 		case <-ticker.C:
@@ -84,10 +84,13 @@
 // from starving others.  A batch is stored as a contiguous set of log records
 // ending with one record having the "continued" flag set to false.
 func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+	vlog.VI(2).Infof("sync: processDatabase: begin: %s, %s", appName, dbName)
+	defer vlog.VI(2).Infof("sync: processDatabase: end: %s, %s", appName, dbName)
+
 	resMark, err := getResMark(ctx, st)
 	if err != nil {
 		if verror.ErrorID(err) != verror.ErrNoExist.ID {
-			vlog.Errorf("processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
+			vlog.Errorf("sync: processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
 			return
 		}
 		resMark = ""
@@ -135,6 +138,8 @@
 		}
 	}
 	logs = logs[:i]
+	vlog.VI(3).Infof("sync: processWatchLogBatch: %s, %s: sg snap %t, syncable %d, total %d",
+		appName, dbName, !appBatch, len(logs), totalCount)
 
 	// Transactional processing of the batch: convert these syncable log
 	// records to a batch of sync log records, filling their parent versions
@@ -142,7 +147,9 @@
 	err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
 		batch := make([]*localLogRec, 0, len(logs))
 		for _, entry := range logs {
-			if rec := convertLogRecord(ctx, tx, entry); rec != nil {
+			if rec, err := convertLogRecord(ctx, tx, entry); err != nil {
+				return err
+			} else if rec != nil {
 				batch = append(batch, rec)
 			}
 		}
@@ -155,7 +162,7 @@
 
 	if err != nil {
 		// TODO(rdaoud): don't crash, quarantine this app database.
-		vlog.Fatalf("processDatabase: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
+		vlog.Fatalf("sync: processWatchLogBatch:: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
 	}
 }
 
@@ -180,6 +187,9 @@
 
 	gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
 
+	vlog.VI(3).Infof("sync: processBatch: %s, %s: len %d, total %d, btid %x, gen %d, pos %d",
+		appName, dbName, count, totalCount, batchId, gen, pos)
+
 	for _, rec := range batch {
 		// Update the log record. Portions of the record Metadata must
 		// already be filled.
@@ -290,7 +300,7 @@
 		logKey := string(stream.Key(nil))
 		var logEnt watchable.LogEntry
 		if vom.Decode(stream.Value(nil), &logEnt) != nil {
-			vlog.Fatalf("getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
+			vlog.Fatalf("sync: getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
 				appName, dbName, logKey, stream.Value(nil))
 		}
 
@@ -305,12 +315,12 @@
 	}
 
 	if err := stream.Err(); err != nil {
-		vlog.Errorf("getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
+		vlog.Errorf("sync: getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
 		return nil, resMark
 	}
 	if !endOfBatch {
 		if len(logs) > 0 {
-			vlog.Fatalf("processDatabase: %s, %s: end of batch not found after %d entries",
+			vlog.Fatalf("sync: getWatchLogBatch: %s, %s: end of batch not found after %d entries",
 				appName, dbName, len(logs))
 		}
 		return nil, resMark
@@ -326,7 +336,7 @@
 // simplify the store-to-sync interaction.  A deleted key would still have a
 // version and its value entry would encode the "deleted" flag, either in the
 // key or probably in a value wrapper that would contain other metadata.
-func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec {
+func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) (*localLogRec, error) {
 	_ = tx.(store.Transaction)
 	var rec *localLogRec
 	timestamp := logEnt.CommitTimestamp
@@ -345,7 +355,9 @@
 		// Create records for object versions not already in the DAG.
 		// Duplicates can appear here in cases of nested SyncGroups or
 		// peer SyncGroups.
-		if !hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)) {
+		if ok, err := hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)); err != nil {
+			return nil, err
+		} else if !ok {
 			rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
 		}
 
@@ -353,13 +365,15 @@
 		rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
 
 	case watchable.OpSyncGroup:
-		vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
+		vlog.Errorf("sync: convertLogRecord: watch LogEntry for SyncGroup should not be converted: %v", logEnt)
+		return nil, verror.New(verror.ErrInternal, ctx, "cannot convert a watch log OpSyncGroup entry")
 
 	default:
-		vlog.Errorf("invalid watch LogEntry: %v", logEnt)
+		vlog.Errorf("sync: convertLogRecord: invalid watch LogEntry: %v", logEnt)
+		return nil, verror.New(verror.ErrInternal, ctx, "cannot convert unknown watch log entry")
 	}
 
-	return rec
+	return rec, nil
 }
 
 // newLocalLogRec creates a local sync log record given its information: key,
@@ -377,7 +391,7 @@
 	if head, err := getHead(ctx, tx, oid); err == nil {
 		rec.Metadata.Parents = []string{head}
 	} else if deleted || (verror.ErrorID(err) != verror.ErrNoExist.ID) {
-		vlog.Fatalf("cannot getHead to convert log record for %s: %v", oid, err)
+		vlog.Fatalf("sync: newLocalLogRec: cannot getHead to convert log record for %s: %v", oid, err)
 	}
 	rec.Metadata.UpdTime = unixNanoToTime(timestamp)
 	return &rec
@@ -397,6 +411,8 @@
 				incrWatchPrefix(appName, dbName, prefix)
 			}
 		}
+		vlog.VI(3).Infof("sync: processSyncGroupLogRecord: %s, %s: remove %t, prefixes: %q",
+			appName, dbName, remove, op.Value.Prefixes)
 		return true
 
 	default:
@@ -425,7 +441,7 @@
 	// it with the SyncGroup prefixes which are defined by the application.
 	parts := util.SplitKeyParts(key)
 	if len(parts) < 2 {
-		vlog.Fatalf("syncable: %s: invalid entry key %s: %v", appdb, key, logEnt)
+		vlog.Fatalf("sync: syncable: %s: invalid entry key %s: %v", appdb, key, logEnt)
 	}
 	key = util.JoinKeyParts(parts[1:]...)
 
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index a2107d5..66aae8f 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -177,8 +177,11 @@
 	if res, err := getResMark(nil, st); err != nil && res != resmark {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
-	if hasNode(nil, st, fooKey, "123") || hasNode(nil, st, barKey, "555") {
-		t.Error("hasNode() found DAG entries for non-syncable logs")
+	if ok, err := hasNode(nil, st, fooKey, "123"); err != nil || ok {
+		t.Error("hasNode() found DAG entry for non-syncable log on foo")
+	}
+	if ok, err := hasNode(nil, st, barKey, "555"); err != nil || ok {
+		t.Error("hasNode() found DAG entry for non-syncable log on bar")
 	}
 
 	// Partially syncable logs.
@@ -212,8 +215,8 @@
 	if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != NoBatchId {
 		t.Errorf("invalid DAG node for fooxyz: %v", node2)
 	}
-	if hasNode(nil, st, barKey, "222") {
-		t.Error("hasNode() found DAG entries for non-syncable logs")
+	if ok, err := hasNode(nil, st, barKey, "222"); err != nil || ok {
+		t.Error("hasNode() found DAG entry for non-syncable log on bar")
 	}
 
 	// More partially syncable logs updating existing ones.
@@ -254,8 +257,8 @@
 		node2.Logrec == "" || node2.BatchId == NoBatchId {
 		t.Errorf("invalid DAG node for fooxyz: %v", node2)
 	}
-	if hasNode(nil, st, barKey, "7") {
-		t.Error("hasNode() found DAG entries for non-syncable logs")
+	if ok, err := hasNode(nil, st, barKey, "7"); err != nil || ok {
+		t.Error("hasNode() found DAG entry for non-syncable log on bar")
 	}
 
 	// Back to non-syncable logs (remove "f" prefix).
@@ -282,8 +285,8 @@
 	if node, err := getNode(nil, st, fooxyzKey, "888"); err == nil {
 		t.Errorf("getNode() should not have found fooxyz @ 888: %v", node)
 	}
-	if hasNode(nil, st, barKey, "007") {
-		t.Error("hasNode() found DAG entries for non-syncable logs")
+	if ok, err := hasNode(nil, st, barKey, "007"); err != nil || ok {
+		t.Error("hasNode() found DAG entry for non-syncable log on bar")
 	}
 
 	// Scan the batch records and verify that there is only 1 DAG batch