Merge "veyron/services/mounttable/lib: demonstrate effect of globbing "..." pattern against the root of the namespace for a mounttable."
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index 90cae68..6be5812 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -59,7 +59,6 @@
 import (
 	"bytes"
 	"fmt"
-	"io"
 	"net"
 	"os"
 	"runtime"
@@ -94,7 +93,7 @@
 	runtime       veyron2.Runtime
 	store         *storage.Server
 	ipc           ipc.Server
-	drawStream    boxes.DrawInterfaceDrawStream
+	drawStream    boxes.DrawInterfaceServiceDrawStream
 	signalling    boxes.BoxSignalling
 	boxList       chan boxes.Box
 	myIPAddr      string
@@ -141,48 +140,14 @@
 	}
 	// Launch the sync service
 	initSyncService(endPt.Addr().String())
-	// Watch the store for any box updates
-	go gs.watchStore()
+	// Watch/Update the store for any box changes
+	go gs.monitorStore()
 	return nil
 }
 
-func (gs *goState) watchStore() {
-	rst, err := raw.BindStore(naming.JoinAddressName(gs.storeEndpoint, raw.RawStoreSuffix))
-	if err != nil {
-		panic(fmt.Errorf("Failed to Bind Store:%v", err))
-	}
-	ctx := gs.runtime.NewContext()
-	req := watch.Request{Query: query.Query{}}
-	stream, err := rst.Watch(ctx, req, veyron2.CallTimeout(ipc.NoTimeout))
-	if err != nil {
-		panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
-	}
-	for {
-		cb, err := stream.Recv()
-		if err != nil {
-			panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
-		}
-		for _, change := range cb.Changes {
-			if mu, ok := change.Value.(*raw.Mutation); ok && len(mu.Dir) == 0 {
-				if box, ok := mu.Value.(boxes.Box); ok {
-					nativeJava.addBox(&box)
-				}
-			}
-		}
-	}
-}
-
 func (gs *goState) Draw(context ipc.ServerContext, stream boxes.DrawInterfaceServiceDrawStream) error {
-	for {
-		box, err := stream.Recv()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return err
-		}
-		nativeJava.addBox(&box)
-	}
+	gs.drawStream = stream
+	gs.streamBoxesLoop()
 	return nil
 }
 
@@ -190,7 +155,18 @@
 	gs.boxList <- box
 }
 
-func (gs *goState) sendDrawLoop() {
+func (gs *goState) streamBoxesLoop() {
+	// Loop to receive boxes from remote peer
+	go func() {
+		for {
+			box, err := gs.drawStream.Recv()
+			if err != nil {
+				return
+			}
+			nativeJava.addBox(&box)
+		}
+	}()
+	// Loop to send boxes to remote peer
 	for {
 		if err := gs.drawStream.Send(<-gs.boxList); err != nil {
 			break
@@ -198,9 +174,35 @@
 	}
 }
 
-func (gs *goState) updateStore(endpoint string) {
-	initSyncService(endpoint)
+func (gs *goState) monitorStore() {
 	ctx := gs.runtime.NewContext()
+
+	// Watch for any box updates from the store
+	go func() {
+		rst, err := raw.BindStore(naming.JoinAddressName(gs.storeEndpoint, raw.RawStoreSuffix))
+		if err != nil {
+			panic(fmt.Errorf("Failed to raw.Bind Store:%v", err))
+		}
+		req := watch.Request{Query: query.Query{}}
+		stream, err := rst.Watch(ctx, req, veyron2.CallTimeout(ipc.NoTimeout))
+		if err != nil {
+			panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
+		}
+		for {
+			cb, err := stream.Recv()
+			if err != nil {
+				panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
+			}
+			for _, change := range cb.Changes {
+				if mu, ok := change.Value.(*raw.Mutation); ok && len(mu.Dir) == 0 {
+					if box, ok := mu.Value.(boxes.Box); ok && box.DeviceId != gs.myIPAddr {
+						nativeJava.addBox(&box)
+					}
+				}
+			}
+		}
+	}()
+	// Send any box updates to the store
 	vst, err := vstore.New(gs.storeEndpoint)
 	if err != nil {
 		panic(fmt.Errorf("Failed to init veyron store:%v", err))
@@ -232,7 +234,6 @@
 	if err != nil {
 		panic(fmt.Errorf("Failed runtime.NewServer:%v", err))
 	}
-
 	drawServer := boxes.NewServerDrawInterface(gs)
 	if err := srv.Register("draw", ipc.SoloDispatcher(drawServer, auth)); err != nil {
 		panic(fmt.Errorf("Failed Register:%v", err))
@@ -255,12 +256,11 @@
 	if err != nil {
 		panic(fmt.Errorf("failed BindDrawInterface:%v", err))
 	}
-	if gs.drawStream, err = drawInterface.Draw(gs.runtime.TODOContext()); err != nil {
-		panic(fmt.Errorf("failed to get handle to Draw stream:%v\n", err))
-	}
-	gs.boxList = make(chan boxes.Box, 256)
 	if !useStoreService {
-		go gs.sendDrawLoop()
+		if gs.drawStream, err = drawInterface.Draw(gs.runtime.TODOContext()); err != nil {
+			panic(fmt.Errorf("failed to get handle to Draw stream:%v\n", err))
+		}
+		go gs.streamBoxesLoop()
 	} else {
 		// Initialize the store sync service that listens for updates from a peer
 		endpoint, err := inaming.NewEndpoint(endpointStr)
@@ -270,7 +270,8 @@
 		if err = drawInterface.SyncBoxes(gs.runtime.TODOContext()); err != nil {
 			panic(fmt.Errorf("failed to setup remote sync service:%v", err))
 		}
-		go gs.updateStore(endpoint.Addr().String())
+		initSyncService(endpoint.Addr().String())
+		go gs.monitorStore()
 	}
 }
 
@@ -293,7 +294,7 @@
 //export Java_com_boxes_GoNative_sendDrawBox
 func Java_com_boxes_GoNative_sendDrawBox(env *C.JNIEnv, class C.jclass,
 	boxId C.jstring, ox C.jfloat, oy C.jfloat, cx C.jfloat, cy C.jfloat) {
-	gs.sendBox(boxes.Box{BoxId: C.GoString(C.JToCString(env, boxId)), Points: [4]float32{float32(ox), float32(oy), float32(cx), float32(cy)}})
+	gs.sendBox(boxes.Box{DeviceId: gs.myIPAddr, BoxId: C.GoString(C.JToCString(env, boxId)), Points: [4]float32{float32(ox), float32(oy), float32(cx), float32(cy)}})
 }
 
 //export Java_com_boxes_GoNative_registerAddBox
@@ -377,6 +378,8 @@
 		panic(fmt.Errorf("Failed to remove remnant store files:%v\n", err))
 	}
 
+	gs.boxList = make(chan boxes.Box, 256)
+
 	// Initialize veyron runtime and bind to the signalling server used to rendezvous with
 	// another peer device. TODO(gauthamt): Switch to using the nameserver for signalling.
 	gs.runtime = rt.Init(veyron2.LocalID(privateID))
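
Note on the stream handling above: streamBoxesLoop now owns both directions of the Draw stream, pairing a background receive goroutine with a blocking send loop fed by gs.boxList. A minimal self-contained sketch of the pattern (boxStream, streamBoxes, and onBox are illustrative names; Box is the VDL type defined in the next file, and the generated Draw stream is assumed to expose Send/Recv like this):

	// boxStream abstracts the generated bidirectional Draw stream.
	type boxStream interface {
		Send(b Box) error
		Recv() (Box, error)
	}

	// streamBoxes receives peer boxes in the background and sends local
	// boxes until either direction of the stream fails.
	func streamBoxes(stream boxStream, outgoing <-chan Box, onBox func(*Box)) {
		go func() {
			for {
				box, err := stream.Recv()
				if err != nil {
					return // EOF or stream error ends the receive loop
				}
				onBox(&box)
			}
		}()
		for box := range outgoing {
			if err := stream.Send(box); err != nil {
				return
			}
		}
	}
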
diff --git a/examples/boxes/boxes.vdl b/examples/boxes/boxes.vdl
index b34341d..1736cd4 100644
--- a/examples/boxes/boxes.vdl
+++ b/examples/boxes/boxes.vdl
@@ -13,6 +13,8 @@
 // Box describes the name and co-ordinates of a given box that
 // is displayed in the View of a peer device.
 type Box struct {
+ // DeviceId identifies the device that generated the box
+ DeviceId string
  // BoxId is a unique name for a box
  BoxId string
  // Points are the co-ordinates of a given box
diff --git a/examples/boxes/boxes.vdl.go b/examples/boxes/boxes.vdl.go
index f6a561c..5f73ee5 100644
--- a/examples/boxes/boxes.vdl.go
+++ b/examples/boxes/boxes.vdl.go
@@ -19,6 +19,8 @@
 // Box describes the name and co-ordinates of a given box that
 // is displayed in the View of a peer device.
 type Box struct {
+	// DeviceId identifies the device that generated the box
+	DeviceId string
 	// BoxId is a unique name for a box
 	BoxId string
 	// Points are the co-ordinates of a given box
@@ -464,6 +466,7 @@
 	result.TypeDefs = []_gen_vdl.Any{
 		_gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x19, Len: 0x4, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
+				_gen_wiretype.FieldType{Type: 0x3, Name: "DeviceId"},
 				_gen_wiretype.FieldType{Type: 0x3, Name: "BoxId"},
 				_gen_wiretype.FieldType{Type: 0x42, Name: "Points"},
 			},
diff --git a/lib/testutil/modules/ls.go b/lib/testutil/modules/ls.go
index daac425..b0e21cf 100644
--- a/lib/testutil/modules/ls.go
+++ b/lib/testutil/modules/ls.go
@@ -4,6 +4,7 @@
 	"fmt"
 	"io"
 	"strconv"
+	"strings"
 
 	"veyron2/rt"
 	"veyron2/services/mounttable"
@@ -71,12 +72,11 @@
 		return nil, nil, nil, err
 	} else {
 		vars := make(Variables)
-		if len(r) > 1 {
-			for i, v := range r[:len(r)-1] {
-				k := "R" + strconv.Itoa(i)
-				vars.Update(k, v)
-			}
+		for i, v := range r {
+			k := "R" + strconv.Itoa(i)
+			vars.Update(k, v)
 		}
+		vars.Update("ALL", strings.Join(r, ","))
 		return vars, r, nil, nil
 	}
 }
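
Note on the ls.go change: the old loop silently dropped the final result (it ranged over r[:len(r)-1]) and exported nothing when len(r) <= 1; the new loop exports every result as R0..R(n-1) and adds an ALL variable joining them. For example, with r = []string{"a", "b"} the Variables end up as R0="a", R1="b", ALL="a,b". A minimal sketch of the same logic, with a plain map standing in for the Variables type:

	vars := map[string]string{}
	r := []string{"a", "b"}
	for i, v := range r {
		vars["R"+strconv.Itoa(i)] = v // R0="a", R1="b"
	}
	vars["ALL"] = strings.Join(r, ",") // ALL="a,b"
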
diff --git a/runtimes/google/lib/sshagent/sshagent.go b/runtimes/google/lib/sshagent/sshagent.go
deleted file mode 100644
index b7e4575..0000000
--- a/runtimes/google/lib/sshagent/sshagent.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Package sshagent implements a type to use ssh-agent to store ECDSA private keys and sign slices using them.
-//
-// For unix-based systems, the implementation is based on the OpenSSH implementation of ssh-agent, with the protocol defined in
-// revision 1.7 of the PROTOCOL.agent file (http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL.agent?rev=1.7;content-type=text%2Fplain)
-package sshagent
-
-import (
-	"crypto/ecdsa"
-	"math/big"
-)
-
-// Agent is the interface for communicating with an ssh-agent process.
-type Agent interface {
-	// List returns the set of public keys and comments associated with them
-	// stored in the SSH agent.
-	List() (keys []*ecdsa.PublicKey, comments []string, err error)
-	// Add adds a (private key, comment) pair to the SSH agent.
-	Add(priv *ecdsa.PrivateKey, comment string) error
-	// Remove removes the private key associated with the provided public key
-	// from the SSH agent.
-	Remove(pub *ecdsa.PublicKey) error
-	// Sign signs the SHA-256 hash of data using the private key stored in the
-	// SSH agent correponding to pub.
-	Sign(pub *ecdsa.PublicKey, data []byte) (R, S *big.Int, err error)
-}
diff --git a/runtimes/google/lib/sshagent/sshagent_unix.go b/runtimes/google/lib/sshagent/sshagent_unix.go
deleted file mode 100644
index 325e624..0000000
--- a/runtimes/google/lib/sshagent/sshagent_unix.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package sshagent
-
-import (
-	"bytes"
-	"crypto/ecdsa"
-	"crypto/elliptic"
-	"encoding/binary"
-	"fmt"
-	"io"
-	"math/big"
-	"net"
-	"os"
-	"strings"
-)
-
-// New returns an Agent implementation that uses the environment variable SSH_AUTH_SOCK
-// to configure communication with a running ssh-agent.
-func New() (Agent, error) {
-	sock := os.Getenv(SSH_AUTH_SOCK)
-	if len(sock) == 0 {
-		return nil, fmt.Errorf("SSH_AUTH_SOCK not set")
-	}
-	return &agent{addr: sock}, nil
-}
-
-const (
-	// Requests from client to agent (Section 3.2 of PROTOCOL.agent)
-	SSH2_AGENTC_REQUEST_IDENTITIES    = 11
-	SSH2_AGENTC_SIGN_REQUEST          = 13
-	SSH2_AGENTC_ADD_IDENTITY          = 17
-	SSH2_AGENTC_REMOVE_IDENTITY       = 18
-	SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19
-	SSH2_AGENTC_ADD_ID_CONSTRAINED    = 25
-
-	// Key-type independent requests from client to agent (Section 3.3 of PROTOCOL.agent)
-	SSH_AGENTC_ADD_SMARTCARD_KEY    = 20
-	SSH_AGENTC_REMOVE_SMARTCARD_KEY = 21
-	SSH_AGENTC_LOCK                 = 22
-	SSH_AGENTC_UNLOCK               = 23
-
-	// Generic replies from agent to client (Section 3.4)
-	SSH2_AGENT_FAILURE = 5
-	SSH2_AGENT_SUCCESS = 6
-
-	// Replies from agent to client for protocol 1 key operations (Section 3.6 of PROTOCOL.agent)
-	SSH2_AGENT_IDENTITIES_ANSWER             = 12
-	SSH2_AGENT_SIGN_RESPONSE                 = 14
-	SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED = 26
-
-	// Key constraint identifiers (Section 3.7 of PROTOCOL.agent)
-	SSH_AGENT_CONSTRAIN_LIFETIME = 1
-	SSH_AGENT_CONSTRAIN_CONFIRM  = 2
-
-	// SSH_AUTH_SOCK is the name of the environment variable containing the path to the
-	// unix socket to communicate with the SSH agent.
-	SSH_AUTH_SOCK = "SSH_AUTH_SOCK"
-
-	ecdsaKeyPrefix = "ecdsa-sha2-"
-)
-
-var curvenames = map[string]elliptic.Curve{
-	"nistp256": elliptic.P256(),
-	"nistp384": elliptic.P384(),
-	"nistp521": elliptic.P521(),
-}
-
-func name2curve(name string) (elliptic.Curve, error) {
-	curve, exists := curvenames[name]
-	if !exists {
-		return nil, fmt.Errorf("unrecognized elliptic curve: %q", name)
-	}
-	return curve, nil
-}
-
-func curve2name(curve elliptic.Curve) (string, error) {
-	for n, c := range curvenames {
-		if c == curve {
-			return n, nil
-		}
-	}
-	return "", fmt.Errorf("unregistered elliptic curve: %v", curve)
-}
-
-func readU32(r io.Reader) (uint32, error) {
-	var ret uint32
-	err := binary.Read(r, binary.BigEndian, &ret)
-	return ret, err
-}
-
-func read(r io.Reader) ([]byte, error) {
-	size, err := readU32(r)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read length header: %v", err)
-	}
-	msg := make([]byte, size)
-	if _, err := io.ReadFull(r, msg); err != nil {
-		return nil, fmt.Errorf("failed to read %d bytes: %v", size, err)
-	}
-	return msg, nil
-}
-
-func readString(r io.Reader) (string, error) {
-	b, err := read(r)
-	if err != nil {
-		return "", err
-	}
-	return string(b), nil
-}
-
-func readcurve(r io.Reader) (elliptic.Curve, error) {
-	str, err := readString(r)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read ECDSA curve: %v", err)
-	}
-	return name2curve(str)
-}
-
-func write(msg []byte, w io.Writer) error {
-	if err := binary.Write(w, binary.BigEndian, uint32(len(msg))); err != nil {
-		return err
-	}
-	if n, err := w.Write(msg); err != nil {
-		return err
-	} else if n != len(msg) {
-		return fmt.Errorf("wrote %d, wanted to write %d bytes", n, len(msg))
-	}
-	return nil
-}
-
-func blob2ecdsa(blob []byte) (*ecdsa.PublicKey, error) {
-	buf := bytes.NewBuffer(blob)
-	keytype, err := readString(buf)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read key type: %v", err)
-	}
-	// the curve is specified as a key type (e.g. ecdsa-sha2-nistp256)
-	// and then again as a string. The two should match.
-	if !strings.HasPrefix(keytype, ecdsaKeyPrefix) {
-		return nil, fmt.Errorf("not a recognized ECDSA key type: %q", keytype)
-	}
-	keycurve, err := name2curve(strings.TrimPrefix(keytype, ecdsaKeyPrefix))
-	if err != nil {
-		return nil, err
-	}
-	curve, err := readcurve(buf)
-	if err != nil {
-		return nil, err
-	}
-	if curve != keycurve {
-		return nil, fmt.Errorf("ECDSA curve does not match key type")
-	}
-	marshaled, err := read(buf)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read marshaled ECDSA point: %v", err)
-	}
-	x, y := elliptic.Unmarshal(curve, marshaled)
-	if x == nil {
-		return nil, fmt.Errorf("failed to unmarshal ECDSA point: %v", err)
-	}
-	return &ecdsa.PublicKey{Curve: curve, X: x, Y: y}, nil
-}
-
-type agent struct {
-	addr string
-	conn io.ReadWriter
-}
-
-func (a *agent) connect() error {
-	if a.conn != nil {
-		return nil
-	}
-	var err error
-	if a.conn, err = net.Dial("unix", a.addr); err != nil {
-		return fmt.Errorf("failed to connect to %v=%v", SSH_AUTH_SOCK, a.addr)
-	}
-	return nil
-}
-
-func (a *agent) call(request []byte) ([]byte, error) {
-	if err := a.connect(); err != nil {
-		return nil, err
-	}
-	if err := write(request, a.conn); err != nil {
-		a.conn = nil
-		return nil, fmt.Errorf("failed to send request: %v", err)
-	}
-	response, err := read(a.conn)
-	if err != nil {
-		a.conn = nil
-		return nil, fmt.Errorf("failed to read response: %v", err)
-	}
-	return response, nil
-}
-
-// Section 2.5.2 of PROTOCOL.agent
-func (a *agent) List() (keys []*ecdsa.PublicKey, comments []string, err error) {
-	reply, err := a.call([]byte{SSH2_AGENTC_REQUEST_IDENTITIES})
-	if err != nil {
-		return nil, nil, fmt.Errorf("SSH2_AGENTC_REQUEST_IDENTITIES call failed: %v", err)
-	}
-	if len(reply) == 0 {
-		return nil, nil, fmt.Errorf("SSH2_AGENTC_REQUEST_IDENTITES call received empty reply")
-	}
-	if got, want := reply[0], byte(SSH2_AGENT_IDENTITIES_ANSWER); got != want {
-		return nil, nil, fmt.Errorf("unexpected reply from ssh-agent: got %v, want SSH2_AGENT_IDENTITIES_ANSWER(%v)", got, want)
-	}
-	buf := bytes.NewBuffer(reply[1:])
-	num, err := readU32(buf)
-	if err != nil {
-		return nil, nil, fmt.Errorf("failed to read num identities from SSH2_AGENT_IDENTITIES_ANSWER: %v", err)
-	}
-	for i := uint32(0); i < num; i++ {
-		blob, err := read(buf)
-		if err != nil {
-			return nil, nil, fmt.Errorf("failed to read key blob for key %d of %d in SSH2_AGENT_IDENTITIES_ANSWER: %v", i, num, err)
-		}
-		comment, err := readString(buf)
-		if err != nil {
-			return nil, nil, fmt.Errorf("failed to read comment for key %d of %d in SSH2_AGENT_IDENTITIES_ANSWER: %v", i, num, err)
-		}
-		key, _ := blob2ecdsa(blob)
-		if key != nil {
-			keys = append(keys, key)
-			comments = append(comments, comment)
-		}
-	}
-	return
-}
-
-func (a *agent) checkSSH2_AGENT_SUCCESS(request []byte, description string) error {
-	reply, err := a.call(request)
-	if err != nil {
-		return fmt.Errorf("%v call failed: %v", description, err)
-	}
-	if len(reply) != 1 || reply[0] != SSH2_AGENT_SUCCESS {
-		return fmt.Errorf("got reply %v for %v, wanted [SSH2_AGENT_SUCCESS(%d)]", reply, description, SSH2_AGENT_SUCCESS)
-	}
-	return nil
-}
-
-func bignum(n *big.Int) []byte {
-	bytes := n.Bytes()
-	// Prepend 0 if sign bit is set.
-	// See sshbuf_put_bignum2 in ssh/sshbuf-getput-crypto.c revision 1.1:
-	// http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/sshbuf-getput-crypto.c?rev=1.1;content-type=text%2Fx-cvsweb-markup
-	if bytes[0]&0x80 != 0 {
-		return append([]byte{0}, bytes...)
-	}
-	return bytes
-}
-
-// Section 2.2.3 of PROTOCOL.agent
-func (a *agent) Add(key *ecdsa.PrivateKey, comment string) error {
-	if key.D.Sign() < 0 || key.PublicKey.X.Sign() < 0 || key.PublicKey.Y.Sign() < 0 {
-		// As of June 2014, ssh did not like negative bignums.
-		// See sshbuf_get_bignum2 in ssh/sshbuf-getput-crypto.c revision 1.1:
-		// http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/sshbuf-getput-crypto.c?rev=1.1;content-type=text%2Fx-cvsweb-markup
-		return fmt.Errorf("ssh-agent does not support negative big numbers")
-	}
-	curve, err := curve2name(key.PublicKey.Curve)
-	if err != nil {
-		return err
-	}
-	var request bytes.Buffer
-	w := func(data []byte) {
-		if err == nil {
-			err = write(data, &request)
-		}
-	}
-	if err := request.WriteByte(SSH2_AGENTC_ADD_IDENTITY); err != nil {
-		return err
-	}
-	w([]byte(ecdsaKeyPrefix + curve)) // key type: e.g., ecdsa-sha2-nistp256
-	w([]byte(curve))                  // ecdsa curve: e.g. nistp256
-	w(elliptic.Marshal(key.PublicKey.Curve, key.PublicKey.X, key.PublicKey.Y))
-	w(bignum(key.D))
-	w([]byte(comment))
-	if err != nil {
-		return err
-	}
-	return a.checkSSH2_AGENT_SUCCESS(request.Bytes(), "SSH2_AGENTC_ADD_IDENTITY")
-}
-
-// keyblob produces an encoding of an ecdsa.PublicKey as per Section 3.1 in RFC 5656,
-// (extending Section 6.6 of RFC 4253),
-// required by the SSH2_AGENTC_REMOVE_IDENTITY and SSH2_AGENTC_SIGN_REQUEST
-// (Section 2.4.2 and Section 2.6.2 in PROTOCOL.agent).
-func keyblob(key *ecdsa.PublicKey) ([]byte, error) {
-	curve, err := curve2name(key.Curve)
-	if err != nil {
-		return nil, err
-	}
-	var blob bytes.Buffer
-	if err := write([]byte(ecdsaKeyPrefix+curve), &blob); err != nil {
-		return nil, err
-	}
-	if err := write([]byte(curve), &blob); err != nil {
-		return nil, err
-	}
-	if err := write(elliptic.Marshal(key.Curve, key.X, key.Y), &blob); err != nil {
-		return nil, err
-	}
-	return blob.Bytes(), nil
-}
-
-func (a *agent) Remove(key *ecdsa.PublicKey) error {
-	keyblob, err := keyblob(key)
-	if err != nil {
-		return err
-	}
-	request := bytes.NewBuffer([]byte{SSH2_AGENTC_REMOVE_IDENTITY})
-	if err := write(keyblob, request); err != nil {
-		return err
-	}
-	return a.checkSSH2_AGENT_SUCCESS(request.Bytes(), "SSH2_AGENTC_REMOVE_IDENTITY")
-}
-
-// Section 2.6.2 of PROTOCOL.agent
-func (a *agent) Sign(pub *ecdsa.PublicKey, data []byte) (r, s *big.Int, err error) {
-	keyblob, err := keyblob(pub)
-	if err != nil {
-		return nil, nil, err
-	}
-	request := bytes.NewBuffer([]byte{SSH2_AGENTC_SIGN_REQUEST})
-	w := func(data []byte) {
-		if err == nil {
-			err = write(data, request)
-		}
-	}
-	w(keyblob)
-	w(data)
-	w([]byte{0, 0, 0, 0}) // flags (see Section 2.6.2 of PROTOCOL.agent)
-	if err != nil {
-		return nil, nil, err
-	}
-	reply, err := a.call(request.Bytes())
-	if err != nil {
-		return nil, nil, err
-	}
-	if len(reply) == 0 {
-		return nil, nil, fmt.Errorf("empty reply for SSH2_AGENT_SIGN_REQUEST")
-	}
-	if reply[0] != SSH2_AGENT_SIGN_RESPONSE {
-		return nil, nil, fmt.Errorf("got reply %v, want SSH2_AGENT_SIGN_RESPONSE(%d)", reply[0], SSH2_AGENT_SIGN_RESPONSE)
-	}
-	reply, err = read(bytes.NewBuffer(reply[1:]))
-	if err != nil {
-		return nil, nil, err
-	}
-	reader := bytes.NewBuffer(reply)
-	if _, err := read(reader); err != nil { // key, e.g. ecdsa-sha2-nistp256
-		return nil, nil, err
-	}
-	signature, err := read(reader)
-	if err != nil {
-		return nil, nil, err
-	}
-	reader = bytes.NewBuffer(signature)
-	R, err := read(reader)
-	if err != nil {
-		return nil, nil, fmt.Errorf("failed to read signature's R: %v", err)
-	}
-	S, err := read(reader)
-	if err != nil {
-		return nil, nil, fmt.Errorf("failed to read signature's S: %v", err)
-	}
-	r = new(big.Int)
-	s = new(big.Int)
-	return r.SetBytes(R), s.SetBytes(S), nil
-}
diff --git a/runtimes/google/lib/sshagent/sshagent_unix_test.go b/runtimes/google/lib/sshagent/sshagent_unix_test.go
deleted file mode 100644
index e3e34e7..0000000
--- a/runtimes/google/lib/sshagent/sshagent_unix_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Do not test on OS X as the default SSH installation that ships with that (at least up till 10.9.3) is not compiled with ECDSA key support.
-// +build !darwin
-
-package sshagent_test
-
-import (
-	"bufio"
-	"crypto/ecdsa"
-	"crypto/elliptic"
-	"crypto/rand"
-	"crypto/sha256"
-	"fmt"
-	"os"
-	"os/exec"
-	"reflect"
-	"strings"
-	"testing"
-
-	"veyron/runtimes/google/lib/sshagent"
-)
-
-func TestSSHAgent(t *testing.T) {
-	if err := start(); err != nil {
-		t.Fatal(err)
-	}
-	defer stop()
-	agent, err := sshagent.New()
-	if err != nil {
-		t.Fatal(err)
-	}
-	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if keys, comments, err := agent.List(); len(keys) != 0 || len(comments) != 0 || err != nil {
-		t.Errorf("Got (%v, %v, %v) want ([], [], nil)", keys, comments, err)
-	}
-	if err := agent.Add(key, "bugs bunny"); err != nil {
-		t.Error(err)
-	}
-	if keys, comments, err := agent.List(); len(keys) != 1 || len(comments) != 1 || err != nil || !samekey(keys[0], key) || comments[0] != "bugs bunny" {
-		var match bool
-		if len(keys) == 1 {
-			match = samekey(keys[0], key)
-		}
-		t.Errorf("Got (%v, %v, %v) want ([<something>], [bugs bunny], nil), keys match: %v", keys, comments, err, match)
-	}
-	if r, s, err := agent.Sign(&key.PublicKey, []byte("looney tunes")); err != nil {
-		t.Error(err)
-	} else if !ecdsa.Verify(&key.PublicKey, sha256hash("looney tunes"), r, s) {
-		t.Errorf("signature does not verify")
-	}
-	if err := agent.Remove(&key.PublicKey); err != nil {
-		t.Error(err)
-	}
-	if keys, comments, err := agent.List(); len(keys) != 0 || len(comments) != 0 || err != nil {
-		t.Errorf("Got (%v, %v, %v) want ([], [], nil)", keys, comments, err)
-	}
-}
-
-func samekey(pub *ecdsa.PublicKey, priv *ecdsa.PrivateKey) bool {
-	return reflect.DeepEqual(*pub, priv.PublicKey)
-}
-
-func sha256hash(s string) []byte {
-	h := sha256.New()
-	h.Write([]byte(s))
-	return h.Sum(nil)
-}
-
-func start() error {
-	os.Setenv(sshagent.SSH_AUTH_SOCK, "")
-	os.Setenv("SSH_AGENT_PID", "")
-	cmd := exec.Command("ssh-agent")
-	stdout, err := cmd.StdoutPipe()
-	if err != nil {
-		return fmt.Errorf("failed to created stdout pipe for ssh-agent: %v", err)
-	}
-	if err := cmd.Start(); err != nil {
-		return fmt.Errorf("failed to execute ssh-agent -d: %v", err)
-	}
-	reader := bufio.NewReader(stdout)
-	var line1, line2 string
-	if line1, err = reader.ReadString('\n'); err != nil {
-		return fmt.Errorf("failed to read line1 from ssh-agent: %v", err)
-	}
-	if line2, err = reader.ReadString('\n'); err != nil {
-		return fmt.Errorf("failed to read line2 from ssh-agent: %v", err)
-	}
-
-	const (
-		prefix1 = sshagent.SSH_AUTH_SOCK + "="
-		prefix2 = "SSH_AGENT_PID="
-	)
-	os.Setenv(sshagent.SSH_AUTH_SOCK, strings.Split(strings.TrimPrefix(line1, prefix1), ";")[0])
-	os.Setenv("SSH_AGENT_PID", strings.Split(strings.TrimPrefix(line2, prefix2), ";")[0])
-	return nil
-}
-
-func stop() {
-	exec.Command("ssh-agent", "-k").Run()
-}
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9b47cc9..ac1a342 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -97,6 +97,7 @@
 	"fmt"
 
 	"veyron2/storage"
+	"veyron2/vlog"
 )
 
 type dag struct {
@@ -114,9 +115,9 @@
 }
 
 type graftInfo struct {
-	newNodes   map[storage.Version]bool   // set of newly added nodes during a sync
-	graftNodes map[storage.Version]uint64 // set of graft nodes and their level
-	newHead    storage.Version            // new (pending) head node
+	newNodes   map[storage.Version]struct{} // set of newly added nodes during a sync
+	graftNodes map[storage.Version]uint64   // set of graft nodes and their level
+	newHeads   map[storage.Version]struct{} // set of candidate new head nodes
 }
 
 // openDAG opens or creates a DAG for the given filename.
@@ -177,6 +178,46 @@
 	}
 }
 
+// getObjectGraft returns the graft structure for an object ID.
+// The graftInfo struct for an object is ephemeral (in-memory) and it
+// tracks the following information:
+// - newNodes:   the set of newly added nodes used to detect the type of
+//               edges between nodes (new-node to old-node or vice versa).
+// - newHeads:   the set of new candidate head nodes used to detect conflicts.
+// - graftNodes: the set of nodes used to find common ancestors between
+//               conflicting nodes.
+//
+// After the received Sync logs are applied, if there are two new heads in
+// the newHeads set, there is a conflict to be resolved for this object.
+// Otherwise if there is only one head, no conflict was triggered and the
+// new head becomes the current version for the object.
+//
+// In case of conflict, the graftNodes set is used to select the common
+// ancestor to pass to the conflict resolver.
+//
+// Note: if an object's graft structure does not exist, it is only created
+// when the "create" parameter is set to true.
+func (d *dag) getObjectGraft(oid storage.ID, create bool) *graftInfo {
+	graft := d.graft[oid]
+	if graft == nil && create {
+		graft = &graftInfo{
+			newNodes:   make(map[storage.Version]struct{}),
+			graftNodes: make(map[storage.Version]uint64),
+			newHeads:   make(map[storage.Version]struct{}),
+		}
+
+		// If a current head node exists for this object, initialize
+		// the set of candidate new heads to include it.
+		head, err := d.getHead(oid)
+		if err == nil {
+			graft.newHeads[head] = struct{}{}
+		}
+
+		d.graft[oid] = graft
+	}
+	return graft
+}
+
 // addNode adds a new node for an object in the DAG, linking it to its parent nodes.
 // It verifies that this node does not exist and that its parent nodes are valid.
 // It also determines the DAG level of the node from its parent nodes (max() + 1).
@@ -237,21 +278,10 @@
 	//    the origin root node), representing the most up-to-date common
 	//    knowledge between this device and the divergent changes.
 	//
-	// The graftInfo struct for an object is ephemeral (in-memory) and it
-	// tracks the set of newly added nodes, which in turn is used to
-	// detect an old-to-new edge which identifies the old node as a graft
-	// node.  The graft nodes and their DAG levels are tracked in a map
-	// which is used to (1) detect conflicts and (2) select the common
-	// ancestor to pass to the conflict resolver.
-	//
 	// Note: at the end of a sync operation between 2 devices, the whole
 	// graft info is cleared (Syncd calls clearGraft()) to prepare it for
 	// the new pairwise sync operation.
-	graft := d.graft[oid]
-	if graft == nil && remote {
-		graft = &graftInfo{newNodes: make(map[storage.Version]bool), graftNodes: make(map[storage.Version]uint64)}
-		d.graft[oid] = graft
-	}
+	graft := d.getObjectGraft(oid, remote)
 
 	// Verify the parents and determine the node level.
 	// Update the graft info in the DAG for this object.
@@ -267,16 +297,21 @@
 		if remote {
 			// If this parent is an old node, it's a graft point in the DAG
 			// and may be a common ancestor used during conflict resolution.
-			if !graft.newNodes[parent] {
+			if _, ok := graft.newNodes[parent]; !ok {
 				graft.graftNodes[parent] = node.Level
 			}
+
+			// The parent nodes can no longer be candidates for new head versions.
+			if _, ok := graft.newHeads[parent]; ok {
+				delete(graft.newHeads, parent)
+			}
 		}
 	}
 
 	if remote {
-		// This new node is so far the candidate for new head version.
-		graft.newNodes[version] = true
-		graft.newHead = version
+		// This new node is a candidate for new head version.
+		graft.newNodes[version] = struct{}{}
+		graft.newHeads[version] = struct{}{}
 	}
 
 	// Insert the new node in the kvdb.
@@ -293,6 +328,73 @@
 	return d.nodes.hasKey(key)
 }
 
+// addParent adds a parent linkage to the DAG node (oid, version).
+// If the parent linkage is due to a local change (from conflict resolution
+// by blessing an existing version), no need to update the grafting structure.
+// Otherwise a remote change (from the Sync protocol) updates the graft.
+//
+// TODO(rdaoud): add a check to avoid the creation of cycles in the DAG.
+// TODO(rdaoud): recompute the levels of reachable child-nodes if the new
+// parent's level is greater or equal to the node's current level.
+func (d *dag) addParent(oid storage.ID, version, parent storage.Version, remote bool) error {
+	node, err := d.getNode(oid, version)
+	if err != nil {
+		return err
+	}
+
+	pnode, err := d.getNode(oid, parent)
+	if err != nil {
+		vlog.VI(1).Infof("addParent: object %v, node %d, parent %d: parent node not found", oid, version, parent)
+		return err
+	}
+
+	// Check if the parent is already linked to this node.
+	found := false
+	for i := range node.Parents {
+		if node.Parents[i] == parent {
+			found = true
+			break
+		}
+	}
+
+	// If the parent is not yet linked (local or remote), add it.
+	if !found {
+		node.Parents = append(node.Parents, parent)
+		err = d.setNode(oid, version, node)
+		if err != nil {
+			return err
+		}
+	}
+
+	// For local changes we are done, the grafting structure is not updated.
+	if !remote {
+		return nil
+	}
+
+	// If the node and its parent are new/old or old/new then add
+	// the parent as a graft point (a potential common ancestor).
+	graft := d.getObjectGraft(oid, true)
+
+	_, nodeNew := graft.newNodes[version]
+	_, parentNew := graft.newNodes[parent]
+	if (nodeNew && !parentNew) || (!nodeNew && parentNew) {
+		graft.graftNodes[parent] = pnode.Level
+	}
+
+	// The parent node can no longer be a candidate for a new head version.
+	// The addParent() function only removes candidates from newHeads that
+	// have become parents.  It does not add the child nodes to newHeads
+	// because they are not necessarily new-head candidates.  If they are
+	// new nodes, the addNode() function handles adding them to newHeads.
+	// For old nodes, only the current head could be a candidate and it is
+	// added to newHeads when the graft struct is initialized.
+	if _, ok := graft.newHeads[parent]; ok {
+		delete(graft.newHeads, parent)
+	}
+
+	return nil
+}
+
 // moveHead moves the object head node in the DAG.
 func (d *dag) moveHead(oid storage.ID, head storage.Version) error {
 	if d.store == nil {
@@ -311,9 +413,10 @@
 // new and old head nodes.
 // - Yes: return (true, newHead, oldHead, ancestor)
 // - No:  return (false, newHead, oldHead, NoVersion)
-// A conflict exists if the current (old) head node is not one of the graft
-// point of the graph fragment just added.  It means the newly added object
-// versions are not derived in part from this device's current knowledge.
+// A conflict exists when there are two new-head nodes.  It means the newly
+// added object versions are not derived in part from this device's current
+// knowledge.  If there is a single new-head, the object changes were applied
+// without triggering a conflict.
 func (d *dag) hasConflict(oid storage.ID) (isConflict bool, newHead, oldHead, ancestor storage.Version, err error) {
 	oldHead = storage.NoVersion
 	newHead = storage.NoVersion
@@ -329,19 +432,33 @@
 		return
 	}
 
-	newHead = graft.newHead
-	oldHead, err2 := d.getHead(oid)
-	if err2 != nil {
-		// This is a new object not previously known on this device, so
-		// it does not yet have a "head node" here.  This means all the
-		// DAG updates for it are new and to taken as-is (no conflict).
-		// That's why we ignore the error which is only indicating the
-		// lack of a prior head node.
+	numHeads := len(graft.newHeads)
+	if numHeads < 1 || numHeads > 2 {
+		err = fmt.Errorf("node %d has invalid number of new head candidates %d: %v", oid, numHeads, graft.newHeads)
 		return
 	}
 
-	if _, ok := graft.graftNodes[oldHead]; ok {
-		return // no conflict, current head is a graft point
+	// Fetch the current head for this object if it exists.  The error from getHead()
+	// is ignored because a newly received object is not yet known on this device and
+	// will not trigger a conflict.
+	oldHead, _ = d.getHead(oid)
+
+	// If there is only one new head node there is no conflict.
+	// The new head is that single candidate, even if it is also the old head node.
+	if numHeads == 1 {
+		for k := range graft.newHeads {
+			newHead = k
+		}
+		return
+	}
+
+	// With two candidate head nodes, the new one is the node that is
+	// not the current (old) head node.
+	for k := range graft.newHeads {
+		if k != oldHead {
+			newHead = k
+			break
+		}
 	}
 
 	// There is a conflict: the best choice ancestor is the graft point
@@ -514,10 +631,10 @@
 
 // getHead retrieves the object head from the DAG.
 func (d *dag) getHead(oid storage.ID) (storage.Version, error) {
-	if d.store == nil {
-		return 0, errors.New("invalid DAG")
-	}
 	var version storage.Version
+	if d.store == nil {
+		return version, errors.New("invalid DAG")
+	}
 	key := objHeadKey(oid)
 	err := d.heads.get(key, &version)
 	if err != nil {
@@ -537,7 +654,9 @@
 		iterVersions = append(iterVersions, head)
 	}
 	if graft := d.graft[oid]; graft != nil {
-		iterVersions = append(iterVersions, graft.newHead)
+		for k := range graft.newHeads {
+			iterVersions = append(iterVersions, k)
+		}
 	}
 
 	// Breadth-first traversal starting from the object head.
@@ -551,15 +670,15 @@
 
 // getGraftNodes is a testing and debug helper function that returns for
 // an object the graft information built and used during a sync operation.
-// The newHead version identifies the head node reported by the other device
-// during a sync operation.  The graftNodes map identifies the set of old
-// nodes where the new DAG fragments were attached and their depth level
-// in the DAG.
-func (d *dag) getGraftNodes(oid storage.ID) (storage.Version, map[storage.Version]uint64) {
+// The newHeads map identifies the candidate head nodes based on the data
+// reported by the other device during a sync operation.  The graftNodes map
+// identifies the set of old nodes where the new DAG fragments were attached
+// and their depth level in the DAG.
+func (d *dag) getGraftNodes(oid storage.ID) (map[storage.Version]struct{}, map[storage.Version]uint64) {
 	if d.store != nil {
 		if ginfo := d.graft[oid]; ginfo != nil {
-			return ginfo.newHead, ginfo.graftNodes
+			return ginfo.newHeads, ginfo.graftNodes
 		}
 	}
-	return 0, nil
+	return nil, nil
 }
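
Note on the dag.go change: conflict detection no longer compares the old head against graft points; it reduces to counting candidate heads after grafting. One candidate means the remote fragment fast-forwards (or matches) the local head; two candidates mean divergent histories. A minimal sketch of the decision hasConflict() now makes, with Version standing in for storage.Version:

	type Version uint64

	// decide returns whether a conflict exists and which candidate is the
	// new head, given the candidate set and the current (old) head.
	func decide(newHeads map[Version]struct{}, oldHead Version) (conflict bool, newHead Version) {
		if len(newHeads) == 1 {
			for h := range newHeads {
				return false, h // single candidate: no conflict
			}
		}
		// Two candidates: the one that is not the current head is the
		// remote head that must be reconciled against oldHead.
		for h := range newHeads {
			if h != oldHead {
				return true, h
			}
		}
		return false, oldHead
	}
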
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0b5d4c8..5f94836 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -141,6 +141,11 @@
 		t.Errorf("delNode() did not fail on a closed DAG: %v", err)
 	}
 
+	err = dag.addParent(oid, 4, 2, true)
+	if err == nil || err.Error() != "invalid DAG" {
+		t.Errorf("addParent() did not fail on a closed DAG: %v", err)
+	}
+
 	err = dag.setHead(oid, 4)
 	if err == nil || err.Error() != "invalid DAG" {
 		t.Errorf("setHead() did not fail on a closed DAG: %v", err)
@@ -166,8 +171,8 @@
 	if pmap := dag.getParentMap(oid); len(pmap) != 0 {
 		t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
 	}
-	if head, gmap := dag.getGraftNodes(oid); head != 0 || len(gmap) != 0 {
-		t.Errorf("getGraftNodes() found data on a closed DAG: head: %d, map: %v", head, gmap)
+	if hmap, gmap := dag.getGraftNodes(oid); hmap != nil || gmap != nil {
+		t.Errorf("getGraftNodes() found data on a closed DAG: head map: %v, graft map: %v", hmap, gmap)
 	}
 }
 
@@ -294,6 +299,65 @@
 	dag.close()
 }
 
+// TestAddParent tests adding parents to a DAG node.
+func TestAddParent(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	version := storage.Version(7)
+	oid, err := strToObjID("789")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err = dag.addParent(oid, version, 1, true); err == nil {
+		t.Errorf("addParent() did not fail for an unknown object %d:%d in DAG file %s", oid, version, dagfile)
+	}
+
+	node := &dagNode{Level: 15, Logrec: "logrec-22"}
+	if err = dag.setNode(oid, version, node); err != nil {
+		t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, version, node, dagfile)
+	}
+
+	for _, parent := range []storage.Version{1, 2, 3} {
+		if err = dag.addParent(oid, version, parent, true); err == nil {
+			t.Errorf("addParent() did not reject invalid parent %d for object %d:%d in DAG file %s",
+				parent, oid, version, dagfile)
+		}
+
+		pnode := &dagNode{Level: 11, Logrec: fmt.Sprint("logrec-%d", parent)}
+		if err = dag.setNode(oid, parent, pnode); err != nil {
+			t.Fatalf("Cannot set parent object %d:%d (%v) in DAG file %s", oid, parent, pnode, dagfile)
+		}
+
+		remote := parent%2 == 0
+		for i := 0; i < 2; i++ {
+			if err = dag.addParent(oid, version, parent, remote); err != nil {
+				t.Errorf("addParent() failed on parent %d, remote %d (i=%d) for object %d:%d in DAG file %s: %v",
+					parent, remote, i, oid, version, dagfile, err)
+			}
+		}
+	}
+
+	node2, err := dag.getNode(oid, version)
+	if err != nil || node2 == nil {
+		t.Errorf("Cannot find stored object %d:%d in DAG file %s", oid, version, dagfile)
+	}
+
+	expParents := []storage.Version{1, 2, 3}
+	if !reflect.DeepEqual(node2.Parents, expParents) {
+		t.Errorf("invalid parents for object %d:%d in DAG file %s: %v instead of %v",
+			oid, version, dagfile, node2.Parents, expParents)
+	}
+
+	dag.close()
+}
+
 // TestSetHead tests setting and getting a DAG head node across DAG open/close/reopen.
 func TestSetHead(t *testing.T) {
 	dagfile := dagFilename()
@@ -357,9 +421,9 @@
 	d.clearGraft()
 
 	// There should be no grafting info, and hasConflict() should fail.
-	newHead, grafts := d.getGraftNodes(oid)
-	if newHead != 0 || grafts != nil {
-		return fmt.Errorf("Object %d: graft info not cleared: newhead (%d), grafts (%v)", oid, newHead, grafts)
+	newHeads, grafts := d.getGraftNodes(oid)
+	if newHeads != nil || grafts != nil {
+		return fmt.Errorf("Object %d: graft info not cleared: newHeads (%v), grafts (%v)", oid, newHeads, grafts)
 	}
 
 	isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
@@ -472,9 +536,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 2 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{}
@@ -551,9 +617,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 5 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{5: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{2: 2}
@@ -640,9 +708,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 5 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{1: 1}
@@ -740,9 +810,11 @@
 	}
 
 	// Verify the grafting of remote nodes.
-	newHead, grafts := dag.getGraftNodes(oid)
-	if newHead != 5 {
-		t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
 	}
 
 	expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
@@ -1090,3 +1162,339 @@
 	}
 	dag.close()
 }
+
+// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict.  An object is created locally and
+// updated twice (v0 -> v1 -> v2).  Another device has learned about v0, created
+// (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by selecting
+// v1 over v3.  Now it sends that new info (v3 and the v1/v3 link) back to the
+// original (local) device.  Instead of a v2/v3 conflict, the device sees that
+// v1 was chosen over v3 and resolves it as a no-conflict case.
+func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-noconf-link-00.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0, 3}, 2: {1}, 3: {0}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 3: 1}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(!isConflict && newHead == 2 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestRemoteLinkedConflict tests sync of remote updates that contain linked
+// nodes (conflict resolution by selecting an existing version) on top of a local
+// initial state triggering a local conflict.  An object is created locally and
+// updated twice (v0 -> v1 -> v2).  Another device has along the way learned about v0,
+// created (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by
+// selecting v3 over v1.  Now it sends that new info (v3 and the v3/v1 link) back
+// to the original (local) device.  The device sees a v2/v3 conflict.
+func TestRemoteLinkedConflict(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-conf-link.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 1}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 3: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be a conflict (v2 vs v3) with v1 as the common ancestor.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(isConflict && newHead == 3 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict, but moves the head node to a new one.
+// An object is created locally and updated twice (v0 -> v1 -> v2).  Another device
+// has along the way learned about v0, created (v0 -> v3), then learned about
+// (v0 -> v1 -> v2) and resolved that conflict by selecting v3 over v2.  Now it
+// sends that new info (v3 and the v3/v2 link) back to the original (local) device.
+// The device sees that the new head v3 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedNoConflictNewHead(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-noconf-link-01.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 2}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{3: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 2: 2}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(!isConflict && newHead == 3 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestRemoteLinkedNoConflictNewHeadOvertake tests sync of remote updates that
+// contain linked nodes (conflict resolution by selecting an existing version)
+// on top of a local initial state without conflict, but moves the head node
+// to a new one that overtook the linked node.
+// An object is created locally and updated twice (v0 -> v1 -> v2).  Another
+// device has along the way learned about v0, created (v0 -> v3), then learned
+// about (v0 -> v1 -> v2) and resolved that conflict by selecting v2 over v3.
+// Then it creates a new update v4 from v2 (v2 -> v4).  Now it sends that new
+// info (v3, the v2/v3 link, and v4) back to the original (local) device.
+// The device sees that the new head v4 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedNoConflictNewHeadOvertake(t *testing.T) {
+	dagfile := dagFilename()
+	defer os.Remove(dagfile)
+
+	dag, err := openDAG(dagfile)
+	if err != nil {
+		t.Fatalf("Cannot open new DAG file %s", dagfile)
+	}
+
+	if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+		t.Fatal(err)
+	}
+	if err = dagReplayCommands(dag, "remote-noconf-link-02.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	// The head must not have moved (i.e. still at v2) and the parent map
+	// shows the newly grafted DAG fragment on top of the prior DAG.
+	oid, err := strToObjID("12345")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 2 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	pmap := dag.getParentMap(oid)
+
+	exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1, 3}, 3: {0}, 4: {2}}
+
+	if !reflect.DeepEqual(pmap, exp) {
+		t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+	}
+
+	// Verify the grafting of remote nodes.
+	newHeads, grafts := dag.getGraftNodes(oid)
+
+	expNewHeads := map[storage.Version]struct{}{4: struct{}{}}
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts := map[storage.Version]uint64{0: 0, 2: 2, 3: 1}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	// There should be no conflict.
+	isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+	if !(!isConflict && newHead == 4 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Then we can move the head and clear the grafting data.
+	if err = dag.moveHead(oid, newHead); err != nil {
+		t.Errorf("Object %d cannot move head to %d in DAG file %s: %v", oid, newHead, dagfile, err)
+	}
+
+	// Clear the grafting data and verify that hasConflict() fails without it.
+	dag.clearGraft()
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if errConflict == nil {
+		t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	// Now new info comes from another device repeating the v2/v3 link.
+	// Verify that it is a NOP (no changes).
+	if err = dagReplayCommands(dag, "remote-noconf-link-repeat.log.sync"); err != nil {
+		t.Fatal(err)
+	}
+
+	if head, e := dag.getHead(oid); e != nil || head != 4 {
+		t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+	}
+
+	newHeads, grafts = dag.getGraftNodes(oid)
+	if !reflect.DeepEqual(newHeads, expNewHeads) {
+		t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+	}
+
+	expgrafts = map[storage.Version]uint64{}
+	if !reflect.DeepEqual(grafts, expgrafts) {
+		t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+	}
+
+	isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+	if !(!isConflict && newHead == 4 && oldHead == 4 && ancestor == 0 && errConflict == nil) {
+		t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+			oid, isConflict, newHead, oldHead, ancestor, errConflict)
+	}
+
+	if err := checkEndOfSync(dag, oid); err != nil {
+		t.Fatal(err)
+	}
+}
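
For orientation, the four new linked-node tests share the same local history (v0 -> v1 -> v2) and differ only in which side of the link won the remote conflict resolution. In the same-head case, for example, the remote fragment grafts as follows (per the expected parent map {0: nil, 1: {0, 3}, 2: {1}, 3: {0}}):

	v0 ──► v1 ──► v2    local chain; v2 remains head, no conflict
	 │      ▲
	 └─► v3 ┘           remote v3, linked under v1 by the peer's resolution
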
diff --git a/runtimes/google/vsync/gc.go b/runtimes/google/vsync/gc.go
index 726868c..9dc8dd1 100644
--- a/runtimes/google/vsync/gc.go
+++ b/runtimes/google/vsync/gc.go
@@ -96,7 +96,9 @@
 	// strictCheck when enabled performs strict checking of every
 	// log record being deleted to confirm that it should be in
 	// fact deleted.
-	strictCheck = true
+	// TODO(hpucha): Support strictCheck in the presence
+	// of Link log records.
+	strictCheck = false
 
 	// Every compactCount iterations of garbage collection, kvdb
 	// is compacted.  This value has performance implications as
@@ -304,6 +306,12 @@
 			return err
 		}
 
+		if rec.RecType == LinkRec {
+			// For a link log record, gc it right away.
+			g.dagPruneCallBack(logRecKey(devid, gnum, l))
+			continue
+		}
+
 		// Insert the object in this log record to the prune
 		// map if needed.
 		// If this object does not exist, create it.
@@ -441,19 +449,21 @@
 		if err != nil {
 			return err
 		}
-		objHist, ok := g.verifyPruneMap[rec.ObjID]
-		if !ok {
-			return errors.New("obj not found in verifyMap")
-		}
-		_, found := objHist.versions[rec.CurVers]
-		// If online consistency check is in progress, we
-		// cannot strictly verify all the versions to be
-		// deleted, and we ignore the failure to find a
-		// version.
-		if found {
-			delete(objHist.versions, rec.CurVers)
-		} else if !g.checkConsistency {
-			return errors.New("verification failed")
+		if rec.RecType == NodeRec {
+			objHist, ok := g.verifyPruneMap[rec.ObjID]
+			if !ok {
+				return errors.New("obj not found in verifyMap")
+			}
+			_, found := objHist.versions[rec.CurVers]
+			// If online consistency check is in progress, we
+			// cannot strictly verify all the versions to be
+			// deleted, and we ignore the failure to find a
+			// version.
+			if found {
+				delete(objHist.versions, rec.CurVers)
+			} else if !g.checkConsistency {
+				return errors.New("verification failed")
+			}
 		}
 	}
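
Note on the gc.go change: a link record does not create a DAG node of its own, so GC reclaims it as soon as it is scanned, and the strict verifier only accounts for NodeRec versions (hence strictCheck stays off until links are supported there). The scan-side dispatch boils down to a sketch like this, where trackVersion is a hypothetical stand-in for the existing prune-map bookkeeping:

	switch rec.RecType {
	case LinkRec:
		// No DAG node to prune for a link; reclaim the record immediately.
		g.dagPruneCallBack(logRecKey(devid, gnum, l))
	case NodeRec:
		trackVersion(rec.ObjID, rec.CurVers) // hypothetical helper
	}
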
 
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index feb92a8..930df89 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -45,6 +45,7 @@
 	"veyron/services/store/raw"
 
 	"veyron2/storage"
+	"veyron2/vlog"
 )
 
 var (
@@ -300,12 +301,13 @@
 	return l.gens.del(key)
 }
 
-// createLocalLogRec creates a new local log record.
+// createLocalLogRec creates a new local log record of type NodeRec.
 func (l *iLog) createLocalLogRec(obj storage.ID, vers storage.Version, par []storage.Version, val *LogValue) (*LogRec, error) {
 	rec := &LogRec{
-		DevID: l.s.id,
-		GNum:  l.head.Curgen,
-		LSN:   l.head.Curlsn,
+		DevID:   l.s.id,
+		GNum:    l.head.Curgen,
+		LSN:     l.head.Curlsn,
+		RecType: NodeRec,
 
 		ObjID:   obj,
 		CurVers: vers,
@@ -319,6 +321,25 @@
 	return rec, nil
 }
 
+// createLocalLinkLogRec creates a new local log record of type LinkRec.
+func (l *iLog) createLocalLinkLogRec(obj storage.ID, vers, par storage.Version) (*LogRec, error) {
+	rec := &LogRec{
+		DevID:   l.s.id,
+		GNum:    l.head.Curgen,
+		LSN:     l.head.Curlsn,
+		RecType: LinkRec,
+
+		ObjID:   obj,
+		CurVers: vers,
+		Parents: []storage.Version{par},
+	}
+
+	// Increment the LSN for the local log.
+	l.head.Curlsn++
+
+	return rec, nil
+}
+
 // createRemoteGeneration adds a new remote generation.
 func (l *iLog) createRemoteGeneration(dev DeviceID, gnum GenID, gen *genMetadata) error {
 	if l.db == nil {
@@ -357,6 +378,7 @@
 	}
 	err := l.putGenMetadata(l.s.id, g, val)
 
+	vlog.VI(2).Infof("createLocalGeneration:: created gen %d %v", g, val)
 	// Move to the next generation irrespective of err.
 	l.head.Curorder++
 	l.head.Curgen++
@@ -370,6 +392,8 @@
 	if l.db == nil {
 		return errInvalidLog
 	}
+
+	vlog.VI(2).Infof("processWatchRecord:: adding object %v %v", objID, vers)
 	// Check if the object version already exists in the DAG. if so return.
 	if l.s.dag.hasNode(objID, vers) {
 		return nil
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 67422d9..af7ece9 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -44,7 +44,7 @@
 	// peerSyncInterval is the duration between two consecutive
 	// sync events.  In every sync event, the initiator contacts
 	// one of its peers to obtain any pending updates.
-	peerSyncInterval = 100 * time.Millisecond
+	peerSyncInterval = 50 * time.Millisecond
 
 	// peerSelectionPolicy is the policy used to select a peer when
 	// the initiator gets a chance to sync.
@@ -191,6 +191,7 @@
 		return GenVector{}, err
 	}
 
+	vlog.VI(2).Infof("updateLocalGeneration:: created gen %d", gen)
 	// Update local generation vector in devTable.
 	if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
 		return GenVector{}, err
@@ -308,10 +309,16 @@
 	if err != nil {
 		return err
 	}
-	if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
-		return err
+
+	vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v", rec)
+	switch rec.RecType {
+	case NodeRec:
+		return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey)
+	case LinkRec:
+		return i.syncd.dag.addParent(rec.ObjID, rec.CurVers, rec.Parents[0], true)
+	default:
+		return fmt.Errorf("unknown log record type")
 	}
-	return nil
 }
 
 // createGenMetadataBatch inserts a batch of generations into the log.
@@ -337,12 +344,31 @@
 
 // processUpdatedObjects processes all the updates received by the
 // initiator, one object at a time. For each updated object, we first
-// check if the object has any conflicts.  If there is a conflict, we
-// resolve the conflict and generate a new store mutation reflecting
-// the conflict resolution. If there is no conflict, we generate a
-// store mutation to simply update the store to the latest value. We
-// then put all these mutations in the store. If the put succeeds, we
-// update the log and dag state suitably (move the head ptr of the
+// check if the object has any conflicts, resulting in three
+// possibilities:
+//
+// * There is no conflict, and no updates are needed to the store
+// (isConflict=false, newHead == oldHead). All changes received convey
+// information that still keeps the local head as the most recent
+// version. This occurs when conflicts are resolved by blessings.
+//
+// * There is no conflict, but a remote version is discovered that
+// builds on the local head (isConflict=false, newHead != oldHead). In
+// this case, we generate a store mutation to simply update the store
+// to the latest value.
+//
+// * There is a conflict and we call into the app or the system to
+// resolve the conflict, resulting in three possibilities: (a) conflict
+// was resolved by blessing the local version. In this case, store
+// need not be updated, but a link is added to record the
+// blessing. (b) conflict was resolved by blessing the remote
+// version. In this case, store is updated with the remote version and
+// a link is added as well. (c) conflict was resolved by generating a
+// new store mutation. In this case, store is updated with the new
+// version.
+//
+// We then put all these mutations in the store. If the put succeeds,
+// we update the log and dag state suitably (move the head ptr of the
 // object in the dag to the latest version, and create a new log
 // record reflecting conflict resolution if any). Puts to store can
 // fail since preconditions on the objects may have been violated. In
@@ -355,12 +381,11 @@
 			return err
 		}
 
-		m, err := i.resolveConflicts()
-		if err != nil {
+		if err := i.resolveConflicts(); err != nil {
 			return err
 		}
 
-		err = i.updateStoreAndSync(ctx, m, local, minGens, remote, dID)
+		err := i.updateStoreAndSync(ctx, local, minGens, remote, dID)
 		if err == nil {
 			break
 		}
@@ -370,7 +395,7 @@
 		// solution. Next iteration will have coordination
 		// with watch thread to intelligently retry. Hence
 		// this value is not a config param.
-		time.Sleep(10 * time.Second)
+		time.Sleep(1 * time.Second)
 	}
 
 	// Remove any pending state.
@@ -390,60 +415,28 @@
 		// Check if object has a conflict.
 		var err error
 		st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
+		vlog.VI(2).Infof("detectConflicts:: object %v state %v err %v",
+			obj, st, err)
 		if err != nil {
 			return err
 		}
-		if !st.isConflict {
-			rec, err := i.getLogRec(obj, st.newHead)
-			if err != nil {
-				return err
-			}
-			st.resolvVal = &rec.Value
-			// Sanity check.
-			if st.resolvVal.Mutation.Version != st.newHead {
-				return fmt.Errorf("bad mutation %d %d",
-					st.resolvVal.Mutation.Version, st.newHead)
-			}
-		}
 	}
 	return nil
 }
 
-// getLogRec returns the log record corresponding to a given object and its version.
-func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
-	logKey, err := i.syncd.dag.getLogrec(obj, vers)
-	if err != nil {
-		return nil, err
-	}
-	dev, gen, lsn, err := splitLogRecKey(logKey)
-	if err != nil {
-		return nil, err
-	}
-	rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
-	if err != nil {
-		return nil, err
-	}
-	return rec, nil
-}
-
-// resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
+// resolveConflicts resolves conflicts for updated objects. Conflicts
+// may be resolved by adding new versions or blessing either the local
+// or the remote version.
+func (i *syncInitiator) resolveConflicts() error {
 	switch conflictResolutionPolicy {
 	case useTime:
 		if err := i.resolveConflictsByTime(); err != nil {
-			return nil, err
+			return err
 		}
 	default:
-		return nil, fmt.Errorf("unknown conflict resolution policy")
+		return fmt.Errorf("unknown conflict resolution policy")
 	}
-
-	var m []raw.Mutation
-	for _, st := range i.updObjects {
-		// Append to mutations.
-		st.resolvVal.Mutation.PriorVersion = st.oldHead
-		m = append(m, st.resolvVal.Mutation)
-	}
-	return m, nil
+	return nil
 }
 
 // resolveConflictsByTime resolves conflicts using the timestamps
@@ -482,11 +475,10 @@
 			res = 1
 		}
 
-		m := lrecs[res].Value.Mutation
-		m.Version = storage.NewVersion()
-
-		// TODO(hpucha): handle continue and delete flags.
-		st.resolvVal = &LogValue{Mutation: m}
+		// Instead of creating a new version that resolves the
+		// conflict, we are blessing an existing version as
+		// the conflict resolution.
+		st.resolvVal = &lrecs[res].Value
 	}
 
 	return nil
@@ -511,11 +503,38 @@
 
 // updateStoreAndSync updates the store, and if that is successful,
 // updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(ctx context.T, m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
+func (i *syncInitiator) updateStoreAndSync(ctx context.T, local, minGens, remote GenVector, dID DeviceID) error {
 	// TODO(hpucha): Eliminate reaching into syncd's lock.
 	i.syncd.lock.Lock()
 	defer i.syncd.lock.Unlock()
 
+	var m []raw.Mutation
+	for obj, st := range i.updObjects {
+		if !st.isConflict {
+			rec, err := i.getLogRec(obj, st.newHead)
+			if err != nil {
+				return err
+			}
+			st.resolvVal = &rec.Value
+			// Sanity check.
+			if st.resolvVal.Mutation.Version != st.newHead {
+				return fmt.Errorf("bad mutation %d %d",
+					st.resolvVal.Mutation.Version, st.newHead)
+			}
+		}
+
+		// If the local version is picked, no further updates
+		// to the store are needed. If the remote version is
+		// picked, we put it in the store.
+		if st.resolvVal.Mutation.Version != st.oldHead {
+			// Append to mutations.
+			st.resolvVal.Mutation.PriorVersion = st.oldHead
+			vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
+				st.resolvVal.Mutation, obj)
+			m = append(m, st.resolvVal.Mutation)
+		}
+	}
+
 	// TODO(hpucha): We will hold the lock across PutMutations rpc
 	// to prevent a race with watcher. The next iteration will
 	// clean up this coordination.
@@ -553,30 +572,68 @@
 	return nil
 }
 
+// getLogRec returns the log record corresponding to a given object and its version.
+func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
+	logKey, err := i.syncd.dag.getLogrec(obj, vers)
+	if err != nil {
+		return nil, err
+	}
+	dev, gen, lsn, err := splitLogRecKey(logKey)
+	if err != nil {
+		return nil, err
+	}
+	rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
+	if err != nil {
+		return nil, err
+	}
+	return rec, nil
+}
+
 // updateLogAndDag updates the log and dag data structures on a successful store put.
 func (i *syncInitiator) updateLogAndDag() error {
 	for obj, st := range i.updObjects {
 		if st.isConflict {
 			// Object had a conflict, which was resolved successfully.
 			// Put is successful, create a log record.
-			parents := []storage.Version{st.newHead, st.oldHead}
-			rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+			var err error
+			var rec *LogRec
+
+			switch {
+			case st.resolvVal.Mutation.Version == st.oldHead:
+				// Local version was blessed as the conflict resolution.
+				rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.oldHead, st.newHead)
+			case st.resolvVal.Mutation.Version == st.newHead:
+				// Remote version was blessed as the conflict resolution.
+				rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.newHead, st.oldHead)
+			default:
+				// New version was created to resolve the conflict.
+				parents := []storage.Version{st.newHead, st.oldHead}
+				rec, err = i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+
+			}
 			if err != nil {
 				return err
 			}
-
 			logKey, err := i.syncd.log.putLogRec(rec)
 			if err != nil {
 				return err
 			}
-
-			// Put is successful, add a new DAG node.
-			if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
+			// Add a new DAG node.
+			switch rec.RecType {
+			case NodeRec:
+				err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey)
+			case LinkRec:
+				err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
+			default:
+				return fmt.Errorf("unknown log record type")
+			}
+			if err != nil {
 				return err
 			}
 		}
 
-		// Move the head.
+		// Move the head. This should be idempotent. We may
+		// move the head to the local head in some cases.
 		if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
 			return err
 		}
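
To recap the three resolution outcomes spelled out in the processUpdatedObjects comment and encoded in updateLogAndDag above: blessing either existing version produces a LinkRec with a single parent edge, while a freshly created version produces a NodeRec with both heads as parents. A hedged sketch of that decision as a pure function (plain integers stand in for storage.Version; the struct is illustrative, not the real API):

package main

import "fmt"

type resolution struct {
	recType string // "NodeRec" or "LinkRec"
	curVers uint64 // CurVers of the log record
	parents []uint64
}

// classify maps the resolved version onto the kind of log record
// that updateLogAndDag would create for it.
func classify(resolved, oldHead, newHead uint64) resolution {
	switch resolved {
	case oldHead:
		// Local version blessed: link it to the remote head.
		return resolution{"LinkRec", oldHead, []uint64{newHead}}
	case newHead:
		// Remote version blessed: link it to the local head.
		return resolution{"LinkRec", newHead, []uint64{oldHead}}
	default:
		// New version: a node with both heads as parents.
		return resolution{"NodeRec", resolved, []uint64{newHead, oldHead}}
	}
}

func main() {
	fmt.Println(classify(2, 2, 5)) // local blessed  -> {LinkRec 2 [5]}
	fmt.Println(classify(5, 2, 5)) // remote blessed -> {LinkRec 5 [2]}
	fmt.Println(classify(6, 2, 5)) // new resolution -> {NodeRec 6 [5 2]}
}
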
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 3c9a063..468c882 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -172,22 +172,27 @@
 			t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
 		}
 	}
-
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
 	if len(s.hdlInitiator.updObjects) != 1 {
 		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 2 || st.oldHead != storage.NoVersion {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
-		t.Errorf("Unexpected mutations %v", m)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -270,15 +275,21 @@
 	if len(s.hdlInitiator.updObjects) != 1 {
 		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 2 || st.oldHead != storage.NoVersion {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
-		t.Errorf("Unexpected mutations %v", m)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -362,22 +373,27 @@
 			v = v + 1
 		}
 	}
-
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
 	if len(s.hdlInitiator.updObjects) != 1 {
 		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 5 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if len(m) != 1 || m[0].PriorVersion != 2 || m[0].Version != 5 {
-		t.Errorf("Unexpected versions %v", m[0])
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -437,16 +453,14 @@
 		t.Errorf("Data mismatch for generation metadata: %v instead of %v",
 			curVal, expVal)
 	}
-
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
 
 	objid, err := strToObjID("12345")
@@ -466,6 +480,9 @@
 			if curRec.ObjID != objid {
 				t.Errorf("Data mismatch in log record %v", curRec)
 			}
+			if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+				t.Errorf("Data mismatch in log record %v", curRec)
+			}
 			// Verify DAG state.
 			if _, err := s.dag.getNode(objid, v); err != nil {
 				t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -473,19 +490,25 @@
 			v = v + 1
 		}
 	}
-
-	if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
-		t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	if m[0].PriorVersion != 2 {
-		t.Errorf("Unexpected version %v", m[0])
+	st := s.hdlInitiator.updObjects[objid]
+	if !st.isConflict {
+		t.Errorf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != 5 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	// Curlsn == 4 for the log record that resolves conflict.
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
 	}
 	// Verify DAG state.
-	if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+	if head, err := s.dag.getHead(objid); err != nil || head != 5 {
 		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
 	}
 }
@@ -555,12 +578,11 @@
 	if err := s.hdlInitiator.detectConflicts(); err != nil {
 		t.Fatalf("detectConflicts failed with err %v", err)
 	}
-	m, err := s.hdlInitiator.resolveConflicts()
-	if err != nil {
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
 		t.Fatalf("resolveConflicts failed with err %v", err)
 	}
-	if err = s.hdlInitiator.updateLogAndDag(); err != nil {
-		t.Fatalf("updateLogAndDag failed with err %v", err)
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
 	}
 
 	objid, err := strToObjID("12345")
@@ -580,6 +602,9 @@
 			if curRec.ObjID != objid {
 				t.Errorf("Data mismatch in log record %v", curRec)
 			}
+			if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+				t.Errorf("Data mismatch in log record %v", curRec)
+			}
 			// Verify DAG state.
 			if _, err := s.dag.getNode(objid, v); err != nil {
 				t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -587,18 +612,357 @@
 			v = v + 1
 		}
 	}
-	if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
-		t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
 	}
-	if m[0].PriorVersion != 2 {
-		t.Errorf("Unexpected version %v", m[0])
+	st := s.hdlInitiator.updObjects[objid]
+	if !st.isConflict {
+		t.Errorf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != 5 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+	if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+		t.Errorf("Mutation generation is not accurate %v", st)
 	}
 	// Curlsn == 4 for the log record that resolves conflict.
 	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 2 {
 		t.Errorf("Data mismatch in log header %v", s.log.head)
 	}
 	// Verify DAG state.
-	if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+	if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessNoConf0 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, the local head of the object is
+// unchanged at the end of the replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-00.log.sync>.
+func TestInitiatorBlessNoConf0(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-noconf-link-00.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 2 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessNoConf1 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, the local head of the object is
+// updated at the end of the replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-01.log.sync>.
+func TestInitiatorBlessNoConf1(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-noconf-link-01.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 3 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessNoConf2 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, the local head of the object is
+// updated at the end of the first replay. In the second replay, a
+// conflict resolved locally is rediscovered since it was also
+// resolved remotely. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-02.log.sync,
+// remote-noconf-link-repeat.log.sync>.
+func TestInitiatorBlessNoConf2(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-noconf-link-02.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 4 || st.oldHead != 2 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{"VeyronTab": 0}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 2 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+
+	// Test simultaneous conflict resolution.
+	stream, err = createReplayStream("remote-noconf-link-repeat.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that there are no conflicts.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	st = s.hdlInitiator.updObjects[objid]
+	if st.isConflict {
+		t.Errorf("Detected a conflict %v", st)
+	}
+	if st.newHead != 4 || st.oldHead != 4 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronLaptop"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 4 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// No new log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 3 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+	}
+}
+
+// TestInitiatorBlessConf tests that a local and a remote log record
+// stream can be correctly applied, when the conflict is resolved by a
+// blessing. Commands are in files
+// testdata/<local-init-00.sync,remote-conf-link.log.sync>.
+func TestInitiatorBlessConf(t *testing.T) {
+	dir, err := createTempDir()
+	if err != nil {
+		t.Errorf("Could not create tempdir %v", err)
+	}
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	garbageCollectInterval = 1 * time.Hour
+	s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+	defer s.Close()
+	defer os.RemoveAll(dir)
+
+	if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+		t.Error(err)
+	}
+	stream, err := createReplayStream("remote-conf-link.log.sync")
+	if err != nil {
+		t.Fatalf("createReplayStream failed with err %v", err)
+	}
+
+	var minGens GenVector
+	if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+		t.Fatalf("processLogStream failed with err %v", err)
+	}
+	if err := s.hdlInitiator.detectConflicts(); err != nil {
+		t.Fatalf("detectConflicts failed with err %v", err)
+	}
+	// Check that the conflict is detected.
+	if len(s.hdlInitiator.updObjects) != 1 {
+		t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+	}
+	objid, err := strToObjID("12345")
+	if err != nil {
+		t.Errorf("Could not create objid %v", err)
+	}
+	st := s.hdlInitiator.updObjects[objid]
+	if !st.isConflict {
+		t.Errorf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != 3 || st.oldHead != 2 || st.ancestor != 1 {
+		t.Errorf("Conflict detection didn't succeed %v", st)
+	}
+
+	if err := s.hdlInitiator.resolveConflicts(); err != nil {
+		t.Fatalf("resolveConflicts failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 3 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+
+	if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+		t.Fatalf("updateStoreAndSync failed with err %v", err)
+	}
+	if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+		t.Errorf("Mutation generation is not accurate %v", st)
+	}
+	// New log records should be added.
+	if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
+		t.Errorf("Data mismatch in log header %v", s.log.head)
+	}
+	curRec, err := s.log.getLogRec(s.id, GenID(1), LSN(3))
+	if err != nil || curRec == nil {
+		t.Fatalf("GetLogRec() can not find object %s:1:3 in log file err %v",
+			s.id, err)
+	}
+	if curRec.ObjID != objid || curRec.RecType != LinkRec {
+		t.Errorf("Data mismatch in log record %v", curRec)
+	}
+	// Verify DAG state.
+	if head, err := s.dag.getHead(objid); err != nil || head != 3 {
 		t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
 	}
 }
diff --git a/runtimes/google/vsync/replay_test.go b/runtimes/google/vsync/replay_test.go
index 41e82cd..02cd561 100644
--- a/runtimes/google/vsync/replay_test.go
+++ b/runtimes/google/vsync/replay_test.go
@@ -22,6 +22,8 @@
 	addLocal = iota
 	addRemote
 	setDevTable
+	linkLocal
+	linkRemote
 )
 
 type syncCommand struct {
@@ -140,6 +142,38 @@
 			cmd := syncCommand{cmd: setDevTable, devID: DeviceID(args[1]), genVec: genVec}
 			cmds = append(cmds, cmd)
 
+		case "linkl", "linkr":
+			expNargs := 6
+			if nargs != expNargs {
+				return nil, fmt.Errorf("%s:%d: need %d args instead of %d", file, lineno, expNargs, nargs)
+			}
+
+			version, err := strToVersion(args[2])
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid version: %s", file, lineno, args[2])
+			}
+			if args[3] == "" {
+				return nil, fmt.Errorf("%s:%d: parent (to-node) version not specified", file, lineno)
+			}
+			if args[4] != "" {
+				return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent (to-node): %s", file, lineno, args[4])
+			}
+			parent, err := strToVersion(args[3])
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid parent (to-node) version: %s", file, lineno, args[3])
+			}
+
+			cmd := syncCommand{version: version, parents: []storage.Version{parent}, logrec: args[5]}
+			if args[0] == "linkl" {
+				cmd.cmd = linkLocal
+			} else {
+				cmd.cmd = linkRemote
+			}
+			if cmd.objID, err = strToObjID(args[1]); err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid object ID: %s", file, lineno, args[1])
+			}
+			cmds = append(cmds, cmd)
+
 		default:
 			return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
 		}
@@ -171,6 +205,20 @@
 				return fmt.Errorf("cannot add remote node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
 			}
 			dag.flush()
+
+		case linkLocal:
+			if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], false); err != nil {
+				return fmt.Errorf("cannot add local parent %d to DAG node %d:%d: %v",
+					cmd.parents[0], cmd.objID, cmd.version, err)
+			}
+			dag.flush()
+
+		case linkRemote:
+			if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], true); err != nil {
+				return fmt.Errorf("cannot add remote parent %d to DAG node %d:%d: %v",
+					cmd.parents[0], cmd.objID, cmd.version, err)
+			}
+			dag.flush()
 		}
 	}
 	return nil
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 783d23f..bd9356c 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -78,7 +78,7 @@
 	for _, cmd := range cmds {
 		switch cmd.cmd {
 		case addLocal:
-			err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{})
+			err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{Mutation: raw.Mutation{Version: cmd.version}})
 			if err != nil {
 				return nil, fmt.Errorf("cannot replay local log records %d:%s err %v",
 					cmd.objID, cmd.version, err)
@@ -121,25 +121,31 @@
 
 	stream := newStream()
 	for _, cmd := range cmds {
+		id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
+		if err != nil {
+			return nil, err
+		}
+		rec := LogRec{
+			DevID:   id,
+			GNum:    gnum,
+			LSN:     lsn,
+			ObjID:   cmd.objID,
+			CurVers: cmd.version,
+			Parents: cmd.parents,
+			Value: LogValue{
+				Mutation: raw.Mutation{Version: cmd.version},
+			},
+		}
+
 		switch cmd.cmd {
 		case addRemote:
-			id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
-			if err != nil {
-				return nil, err
-			}
-			rec := LogRec{
-				DevID:   id,
-				GNum:    gnum,
-				LSN:     lsn,
-				ObjID:   cmd.objID,
-				CurVers: cmd.version,
-				Parents: cmd.parents,
-				Value: LogValue{
-					Mutation: raw.Mutation{Version: cmd.version},
-				},
-			}
-			stream.add(rec)
+			rec.RecType = NodeRec
+		case linkRemote:
+			rec.RecType = LinkRec
+		default:
+			return nil, fmt.Errorf("unknown command type: %v", cmd.cmd)
 		}
+		stream.add(rec)
 	}
 
 	return stream, nil
diff --git a/runtimes/google/vsync/vsync.vdl b/runtimes/google/vsync/vsync.vdl
index 4de7c9c..aa06ec9 100644
--- a/runtimes/google/vsync/vsync.vdl
+++ b/runtimes/google/vsync/vsync.vdl
@@ -15,13 +15,21 @@
 // GenVector is the generation vector.
 type GenVector map[DeviceID]GenID
 
+const (
+      // A NodeRec-type log record adds a new node to the DAG.
+      NodeRec = byte(0)
+      // A LinkRec-type log record adds a new link to the DAG.
+      LinkRec = byte(1)
+)
+
 // LogRec represents a single log record that is exchanged between two
 // peers.
 //
 // It contains log related metadata: DevID is the id of the
 // device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of the log record.
 //
 // It also contains information relevant to the updates to an object
 // in the store: ObjID is the id of the object that was
@@ -34,6 +42,7 @@
   DevID    DeviceID
   GNum     GenID
   LSN      LSN
+  RecType  byte
   // Object related information.
   ObjID    storage.ID
   CurVers  storage.Version
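
For concreteness, a blessing shows up on the wire as the same LogRec shape as a node update, with RecType set to LinkRec and a single parent. A self-contained illustration with trimmed-down types (storage.ID and storage.Version replaced by plain Go types; all field values are invented):

package main

import "fmt"

const (
	NodeRec = byte(0)
	LinkRec = byte(1)
)

// LogRec mirrors the VDL struct above with simplified field types.
type LogRec struct {
	DevID   string
	GNum    uint64
	LSN     uint64
	RecType byte
	ObjID   string
	CurVers uint64
	Parents []uint64
}

func main() {
	// A hypothetical record blessing local version 2 over remote head 5.
	rec := LogRec{
		DevID:   "VeyronTab",
		GNum:    1,
		LSN:     3,
		RecType: LinkRec,
		ObjID:   "12345",
		CurVers: 2,
		Parents: []uint64{5},
	}
	fmt.Printf("%+v\n", rec)
}
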
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index 2055734..e2120db 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -35,8 +35,9 @@
 //
 // It contains log related metadata: DevID is the id of the
 // device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of the log record.
 //
 // It also contains information relevant to the updates to an object
 // in the store: ObjID is the id of the object that was
@@ -46,9 +47,10 @@
 // the object mutation.
 type LogRec struct {
 	// Log related information.
-	DevID DeviceID
-	GNum  GenID
-	LSN   LSN
+	DevID   DeviceID
+	GNum    GenID
+	LSN     LSN
+	RecType byte
 	// Object related information.
 	ObjID   storage.ID
 	CurVers storage.Version
@@ -71,6 +73,14 @@
 	Continue bool
 }
 
+const (
+	// A NodeRec-type log record adds a new node to the DAG.
+	NodeRec = byte(0)
+
+	// A LinkRec-type log record adds a new link to the DAG.
+	LinkRec = byte(1)
+)
+
 // Sync allows a device to GetDeltas from another device.
 // Sync is the interface the client binds and uses.
 // Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -303,6 +313,7 @@
 				_gen_wiretype.FieldType{Type: 0x41, Name: "DevID"},
 				_gen_wiretype.FieldType{Type: 0x42, Name: "GNum"},
 				_gen_wiretype.FieldType{Type: 0x45, Name: "LSN"},
+				_gen_wiretype.FieldType{Type: 0x46, Name: "RecType"},
 				_gen_wiretype.FieldType{Type: 0x47, Name: "ObjID"},
 				_gen_wiretype.FieldType{Type: 0x48, Name: "CurVers"},
 				_gen_wiretype.FieldType{Type: 0x49, Name: "Parents"},
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index a0801a7..43b2238 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -183,18 +183,9 @@
 		}
 	}
 
-	// Flush the DAG and Log DBs.  If the resume marker changed, update the device table and flush it.
-	// TODO(rdaoud): this should be conditional, e.g. don't flush if Initiator is in-progress.
-	// TODO(rdaoud): flushes can also be batched across multiple change-batches.
-	w.syncd.dag.flush()
-	if err := w.syncd.log.flush(); err != nil {
-		return fmt.Errorf("cannot flush log DB: %s", err)
-	}
+	// If the resume marker changed, update the device table.
 	if lastResmark != nil {
 		w.syncd.devtab.head.Resmark = lastResmark
-		if err := w.syncd.devtab.flush(); err != nil {
-			return fmt.Errorf("cannot flush device table DB: %s", err)
-		}
 	}
 
 	return nil
diff --git a/services/mgmt/node/impl/invoker.go b/services/mgmt/node/impl/invoker.go
index e32ff53..3a3df2e 100644
--- a/services/mgmt/node/impl/invoker.go
+++ b/services/mgmt/node/impl/invoker.go
@@ -430,7 +430,7 @@
 		return errOperationFailed
 	}
 	// Wait for the child process to start.
-	testTimeout := 2 * time.Second
+	testTimeout := 10 * time.Second
 	if err := handle.WaitForReady(testTimeout); err != nil {
 		vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
 		if err := cmd.Process.Kill(); err != nil {
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index 1ef8692..225e6f4 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -106,13 +106,12 @@
 		return err
 	}
 
-	// Retrieve the initial timestamp. Changes that occured at or before the
-	// initial timestamp will not be sent.
 	resumeMarker := req.ResumeMarker
-	initialTimestamp, err := resumeMarkerToTimestamp(resumeMarker)
+	filter, err := newChangeFilter(resumeMarker)
 	if err != nil {
 		return err
 	}
+
 	if isNowResumeMarker(resumeMarker) {
 		sendChanges(stream, []watch.Change{initialStateSkippedChange})
 	}
@@ -127,9 +126,13 @@
 	if err != nil {
 		return err
 	}
-	err = processChanges(stream, changes, initialTimestamp, st.Timestamp())
-	if err != nil {
+	timestamp := st.Timestamp()
+	if send, err := filter.shouldProcessChanges(timestamp); err != nil {
 		return err
+	} else if send {
+		if err := processChanges(stream, changes, timestamp); err != nil {
+			return err
+		}
 	}
 
 	for {
@@ -142,9 +145,13 @@
 		if err != nil {
 			return err
 		}
-		err = processChanges(stream, changes, initialTimestamp, mu.Timestamp)
-		if err != nil {
+		timestamp := mu.Timestamp
+		if send, err := filter.shouldProcessChanges(timestamp); err != nil {
 			return err
+		} else if send {
+			if err := processChanges(stream, changes, timestamp); err != nil {
+				return err
+			}
 		}
 	}
 }
@@ -172,10 +179,78 @@
 	return newSyncProcessor(client)
 }
 
-func processChanges(stream watch.WatcherServiceWatchStream, changes []watch.Change, initialTimestamp, timestamp uint64) error {
-	if timestamp <= initialTimestamp {
-		return nil
+type changeFilter interface {
+	// shouldProcessChanges determines whether to process changes with the given
+	// timestamp. Changes should appear in the sequence of the store log, and
+	// timestamps should be monotonically increasing.
+	shouldProcessChanges(timestamp uint64) (bool, error)
+}
+
+type baseFilter struct {
+	// initialTimestamp is the minimum timestamp of the first change sent.
+	initialTimestamp uint64
+	// crossedInitialTimestamp is true if a change with timestamp >=
+	// initialTimestamp has already been sent.
+	crossedInitialTimestamp bool
+}
+
+// onOrAfterFilter accepts any change with timestamp >= initialTimestamp.
+type onOrAfterFilter struct {
+	baseFilter
+}
+
+// onAndAfterFilter accepts any change with timestamp >= initialTimestamp, but
+// requires the first change to have timestamp = initialTimestamp.
+type onAndAfterFilter struct {
+	baseFilter
+}
+
+// newChangeFilter creates a changeFilter that processes changes only
+// at or after the requested resumeMarker.
+func newChangeFilter(resumeMarker []byte) (changeFilter, error) {
+	if len(resumeMarker) == 0 {
+		return &onOrAfterFilter{baseFilter{0, false}}, nil
 	}
+	if isNowResumeMarker(resumeMarker) {
+		// TODO(tilaks): Get the current resume marker from the log.
+		return &onOrAfterFilter{baseFilter{uint64(time.Now().UnixNano()), false}}, nil
+	}
+	if len(resumeMarker) != 8 {
+		return nil, ErrUnknownResumeMarker
+	}
+	return &onAndAfterFilter{baseFilter{binary.BigEndian.Uint64(resumeMarker), false}}, nil
+}
+
+func (f *onOrAfterFilter) shouldProcessChanges(timestamp uint64) (bool, error) {
+	// Bypass checks if a change with timestamp >= initialTimestamp has already
+	// been sent.
+	if !f.crossedInitialTimestamp {
+		if timestamp < f.initialTimestamp {
+			return false, nil
+		}
+	}
+	f.crossedInitialTimestamp = true
+	return true, nil
+}
+
+func (f *onAndAfterFilter) shouldProcessChanges(timestamp uint64) (bool, error) {
+	// Bypass checks if a change with timestamp >= initialTimestamp has already
+	// been sent.
+	if !f.crossedInitialTimestamp {
+		if timestamp < f.initialTimestamp {
+			return false, nil
+		}
+		if timestamp > f.initialTimestamp {
+			return false, ErrUnknownResumeMarker
+		}
+		// TODO(tilaks): if the most recent timestamp in the log is less than
+		// initialTimestamp, return ErrUnknownResumeMarker.
+	}
+	f.crossedInitialTimestamp = true
+	return true, nil
+}
+
+func processChanges(stream watch.WatcherServiceWatchStream, changes []watch.Change, timestamp uint64) error {
 	addContinued(changes)
 	addResumeMarkers(changes, timestampToResumeMarker(timestamp))
 	return sendChanges(stream, changes)
@@ -209,20 +284,6 @@
 	return bytes.Equal(resumeMarker, nowResumeMarker)
 }
 
-func resumeMarkerToTimestamp(resumeMarker []byte) (uint64, error) {
-	if len(resumeMarker) == 0 {
-		return 0, nil
-	}
-	if isNowResumeMarker(resumeMarker) {
-		// TODO(tilaks): Get the current resume marker from the log.
-		return uint64(time.Now().UnixNano()), nil
-	}
-	if len(resumeMarker) != 8 {
-		return 0, ErrUnknownResumeMarker
-	}
-	return binary.BigEndian.Uint64(resumeMarker), nil
-}
-
 func timestampToResumeMarker(timestamp uint64) []byte {
 	buf := make([]byte, 8)
 	binary.BigEndian.PutUint64(buf, timestamp)
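
The two filters above differ only in how they treat the first change at the resume point: onOrAfterFilter accepts the first change with timestamp >= initialTimestamp, while onAndAfterFilter insists the stream resume exactly at initialTimestamp and reports an unknown marker otherwise. A self-contained sketch of that state machine (names simplified; the exact flag stands in for the onAndAfterFilter variant):

package main

import (
	"errors"
	"fmt"
)

var errUnknownResumeMarker = errors.New("unknown resume marker")

// filter condenses baseFilter plus the two variants from the diff:
// before the initial timestamp is crossed, earlier changes are
// dropped; with exact set, the first accepted change must land
// exactly on the initial timestamp.
type filter struct {
	initial uint64
	crossed bool
	exact   bool // true mimics onAndAfterFilter
}

func (f *filter) shouldProcess(ts uint64) (bool, error) {
	if !f.crossed {
		if ts < f.initial {
			return false, nil
		}
		if f.exact && ts > f.initial {
			return false, errUnknownResumeMarker
		}
	}
	f.crossed = true
	return true, nil
}

func main() {
	f := &filter{initial: 100, exact: true}
	for _, ts := range []uint64{50, 100, 150} {
		ok, err := f.shouldProcess(ts)
		fmt.Println(ts, ok, err) // 50 dropped; 100 and 150 accepted
	}
}
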
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 1ec7c10..cb60b0e 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -157,7 +157,7 @@
 	id1 := put(t, st, tr, "/", "val1")
 	commit(t, tr)
 
-	post1 := st.Snapshot().Find(id1).Version
+	post11 := st.Snapshot().Find(id1).Version
 
 	if err := st.Close(); err != nil {
 		t.Fatalf("Close() failed: %v", err)
@@ -177,9 +177,9 @@
 	id2 := put(t, st, tr, "/a", "val2")
 	commit(t, tr)
 
-	pre1 := post1
-	post1 = st.Snapshot().Find(id1).Version
-	post2 := st.Snapshot().Find(id2).Version
+	pre21 := post11
+	post21 := st.Snapshot().Find(id1).Version
+	post22 := st.Snapshot().Find(id2).Version
 
 	// Start a watch request.
 	req := watch.Request{}
@@ -198,11 +198,22 @@
 	ws.Cancel()
 	ws.Finish()
 
-	// Start a watch request after the initial state.
+	// Start a watch request at the initial state.
 	req = watch.Request{ResumeMarker: resumeMarker1}
 	ws = watchtesting.Watch(rootPublicID, w.Watch, req)
 
-	// Check that watch detects the changes the transaction.
+	// Check that watch detects the changes in the state and the transaction.
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Error("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	change = changes[0]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+
 	cb, err = ws.Recv()
 	if err != nil {
 		t.Error("Recv() failed: %v", err)
@@ -216,8 +227,8 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+	expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
+	expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
 }
 
 func TestTransactionResumeMarker(t *testing.T) {
@@ -234,16 +245,16 @@
 	id1 := put(t, st, tr, "/", "val1")
 	commit(t, tr)
 
-	post1 := st.Snapshot().Find(id1).Version
+	post11 := st.Snapshot().Find(id1).Version
 
 	// Put /a
 	tr = memstore.NewTransaction()
 	id2 := put(t, st, tr, "/a", "val2")
 	commit(t, tr)
 
-	pre1 := post1
-	post1 = st.Snapshot().Find(id1).Version
-	post2 := st.Snapshot().Find(id2).Version
+	pre21 := post11
+	post21 := st.Snapshot().Find(id1).Version
+	post22 := st.Snapshot().Find(id2).Version
 
 	// Start a watch request.
 	req := watch.Request{}
@@ -262,10 +273,47 @@
 	ws.Cancel()
 	ws.Finish()
 
-	// Start a watch request after the first transaction.
+	// Start a watch request at the first transaction.
 	req = watch.Request{ResumeMarker: resumeMarker1}
 	ws = watchtesting.Watch(rootPublicID, w.Watch, req)
 
+	// Check that watch detects the changes in the first and second transaction.
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Error("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	change = changes[0]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Error("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	change = changes[0]
+	if !change.Continued {
+		t.Error("Expected change to continue the transaction")
+	}
+	change = changes[1]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	resumeMarker2 := change.ResumeMarker
+	expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
+	expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+
+	// Cancel the watch request.
+	ws.Cancel()
+	ws.Finish()
+
+	// Start a watch request at the second transaction.
+	req = watch.Request{ResumeMarker: resumeMarker2}
+	ws = watchtesting.Watch(rootPublicID, w.Watch, req)
+
 	// Check that watch detects the changes in the second transaction.
 	cb, err = ws.Recv()
 	if err != nil {
@@ -280,8 +328,8 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+	expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
+	expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
 }
 
 func TestNowResumeMarker(t *testing.T) {
@@ -295,10 +343,15 @@
 
 	// Put /
 	tr := memstore.NewTransaction()
-	id1 := put(t, st, tr, "/", "val1")
+	put(t, st, tr, "/", "val1")
 	commit(t, tr)
 
-	post1 := st.Snapshot().Find(id1).Version
+	// Put /a
+	tr = memstore.NewTransaction()
+	id2 := put(t, st, tr, "/a", "val2")
+	commit(t, tr)
+
+	post22 := st.Snapshot().Find(id2).Version
 
 	// Start a watch request with the "now" resume marker.
 	req := watch.Request{ResumeMarker: nowResumeMarker}
@@ -307,6 +360,15 @@
 	// Give watch some time to pick "now".
 	time.Sleep(time.Second)
 
+	// Put /a/b
+	tr = memstore.NewTransaction()
+	id3 := put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	pre32 := post22
+	post32 := st.Snapshot().Find(id2).Version
+	post33 := st.Snapshot().Find(id3).Version
+
 	// Check that watch announces that the initial state was skipped.
 	cb, err := ws.Recv()
 	if err != nil {
@@ -316,16 +378,7 @@
 	change := changes[0]
 	expectInitialStateSkipped(t, change)
 
-	// Put /a
-	tr = memstore.NewTransaction()
-	id2 := put(t, st, tr, "/a", "val2")
-	commit(t, tr)
-
-	pre1 := post1
-	post1 = st.Snapshot().Find(id1).Version
-	post2 := st.Snapshot().Find(id2).Version
-
-	// Check that watch detects the changes in the second transaction.
+	// Check that watch detects the changes in the third transaction.
 	cb, err = ws.Recv()
 	if err != nil {
 		t.Error("Recv() failed: %v", err)
@@ -339,25 +392,40 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+	expectExists(t, changes, id2, pre32, post32, false, "val2", dir("b", id3))
+	expectExists(t, changes, id3, storage.NoVersion, post33, false, "val3", empty)
 }
 
 func TestUnknownResumeMarkers(t *testing.T) {
 	// Create a new store.
-	dbName, _, cleanup := createStore(t)
+	dbName, st, cleanup := createStore(t)
 	defer cleanup()
 
 	// Create a new watcher.
 	w, cleanup := createWatcher(t, dbName)
 	defer cleanup()
 
-	// Start a watch request with a resume marker that is too long.
-	resumeMarker := make([]byte, 9)
+	// Put /
+	tr := memstore.NewTransaction()
+	put(t, st, tr, "/", "val1")
+	commit(t, tr)
+
+	// Start a watch request with a resume marker that's too early.
+	resumeMarker := timestampToResumeMarker(1)
 	req := watch.Request{ResumeMarker: resumeMarker}
 	ws := watchtesting.Watch(rootPublicID, w.Watch, req)
 
-	// The resume marker should be too long.
+	// The resume marker should be unknown.
+	if err := ws.Finish(); err != ErrUnknownResumeMarker {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	// Start a watch request with a resume marker that's too late.
+	resumeMarker = timestampToResumeMarker(1<<63 - 1)
+	req = watch.Request{ResumeMarker: resumeMarker}
+	ws = watchtesting.Watch(rootPublicID, w.Watch, req)
+
+	// The resume marker should be unknown.
 	if err := ws.Finish(); err != ErrUnknownResumeMarker {
 		t.Errorf("Unexpected error: %v", err)
 	}
diff --git a/tools/naming/simulator/clock.scr b/tools/naming/simulator/clock.scr
new file mode 100644
index 0000000..2df07af
--- /dev/null
+++ b/tools/naming/simulator/clock.scr
@@ -0,0 +1,66 @@
+#
+# Simple example to show how names work and are used both without
+# and with a mount table.
+#
+
+# A 'stand-alone' server, with an internal namespace of 'mt/...'
+clockServer "" mt clock
+set STAND_ALONE_CLOCK_NAME=$NAME
+
+set N=$STAND_ALONE_CLOCK_NAME
+print "Stand alone clock server at" $N
+time $N "Using $N"
+set N=/$ADDR//mt
+time $N "Using $N"
+
+# Run a root MountTable.
+rootMT mt
+set ROOT_ADDR=$MT_ADDR ROOT_NAME=$MT_NAME
+print ""
+print "Root MountTable at $ROOT_NAME"
+
+clockServer $MT_NAME clocks home_clock
+set CLOCK_NAME=/$ADDR//clocks
+print "Running Clock Server at $CLOCK_NAME"
+
+# Still bypassing the MountTable
+time $CLOCK_NAME bar
+
+# Now, let's use the MountTable
+setLocalRoots $ROOT_NAME
+
+set N=home_clock
+resolve $N
+print $N -> $R0
+
+set N=home_clock/clocks
+resolve $N
+print $N -> $R0
+
+set N=/$ROOT_ADDR/home_clock/clocks
+resolve $N
+print $N -> $R0
+
+set N=home_clock/clocks
+time $N "Using $N"
+set N=$ROOT_NAME/home_clock/clocks
+time $N "Using $N"
+
+# ls * returns home_clock
+ls *
+# ls ... returns "" and home_clock - i.e two items. Is this a bug?
+ls ...
+
+# These all behave as above
+lsmt $ROOT_NAME *
+lsmt $ROOT_NAME ...
+
+# Conclusion: some of this behaviour seems a little awkward. In particular:
+#
+# The client needs to use a different form of the name depending on whether
+# a MountTable is used or not. If a MountTable is not used, then the internal
+# 'suffix' (/mt) in the examples above must be used. If a MountTable is used
+# then the internal suffix must not be included in the name.
+#
+# ls ... seems to always return an extra, zero-length string as an entry.
+#
diff --git a/tools/naming/simulator/driver.go b/tools/naming/simulator/driver.go
index 4a48737..4206289 100644
--- a/tools/naming/simulator/driver.go
+++ b/tools/naming/simulator/driver.go
@@ -78,6 +78,9 @@
 	for scanner.Scan() {
 		line := scanner.Text()
 		if !strings.HasPrefix(line, "#") && len(line) > 0 {
+			if line == "eof" {
+				break
+			}
 			if err := process(line, lineno); err != nil {
 				if debug {
 					fmt.Printf("%d> %s: %v\n", lineno, line, err)
@@ -122,17 +125,17 @@
 	if factory == nil {
 		return fmt.Errorf("unrecognised command %q", name)
 	}
-
 	if vars, output, _, err := factory().Run(sub); err != nil {
 		return err
 	} else {
 		if debug || interactive {
+			fmt.Printf("%d> %s\n", lineno, line)
+		}
+		if len(output) > 0 {
 			if !interactive {
-				fmt.Printf("%d> %s\n", lineno, line)
+				fmt.Printf("%d> ", lineno)
 			}
-			if len(output) > 0 {
-				fmt.Printf("%s\n", strings.Join(output, " "))
-			}
+			fmt.Printf("%s\n", strings.Join(output, " "))
 		}
 		if debug && len(vars) > 0 {
 			for k, v := range vars {
diff --git a/tools/naming/simulator/mt.scr b/tools/naming/simulator/mt.scr
new file mode 100644
index 0000000..4ebbb1a
--- /dev/null
+++ b/tools/naming/simulator/mt.scr
@@ -0,0 +1,90 @@
+#
+# Example showing multiple mount tables, servers, and globbing
+#
+
+rootMT ""
+set ROOT_MT_NAME=$MT_NAME ROOT_MT_ADDR=$MT_ADDR
+set ROOT=$ROOT_MT_NAME
+
+nodeMT $ROOT "mt" "usa"
+set USA_MT=$MT_NAME
+
+nodeMT $USA_MT_NAME "mt" "palo alto"
+set PA_MT=$MT_NAME
+
+
+print "--------- Resolve ROOT, ROOT/usa, ROOT/usa/mt/palo alto"
+print "--------- Each should return a different address"
+resolve $ROOT
+resolve $ROOT/usa
+resolve "$ROOT/usa/mt/palo alto"
+
+print "--------- ResolveMT ROOT, ROOT/usa, ROOT/usa/mt - should return the ROOT MT"
+resolveMT $ROOT
+resolveMT $ROOT/usa
+resolveMT $ROOT/usa/mt
+
+print "--------- ResolveMT ROOT/usa/mt/palo alto, palo alto/mt should return the USA MT"
+resolveMT "$ROOT/usa/mt/palo alto"
+resolveMT "$ROOT/usa/mt/palo alto/mt"
+
+print "--------- ResolveMT ROOT/usa/mt/palo alto/mt/bar should return the palo alto MT"
+resolveMT "$ROOT/usa/mt/palo alto/mt/bar"
+
+# should return a complete hierarchy...
+setLocalRoots $ROOT
+print "--------- setLocalRoots to global root"
+print "--------- ls ..."
+ls ...
+print "--------- ls *"
+ls *
+
+print "--------- setLocalRoots to usa"
+setLocalRoots $USA_MT
+ls ...
+
+print "--------- setLocalRoots to palo alto"
+setLocalRoots $PA_MT
+ls ...
+
+nodeMT $ROOT "mt" "uk"
+set UK_MT=$MT_NAME
+
+nodeMT $UK_MT "mt" "cambridge"
+set CAM_MT=$MT_NAME
+
+setLocalRoots $ROOT
+ls ...
+setLocalRoots $UK_MT
+ls ...
+setLocalRoots $CAM_MT
+ls ...
+
+# Create a MountTable tree without using the internal 'mt' suffix as in the
+# examples above.
+nodeMT $ROOT "" "france"
+set FRANCE_MT=$MT_NAME
+nodeMT $FRANCE_MT "" "paris"
+setLocalRoots $ROOT
+ls ...
+
+# Conclusion: some of this behaviour seems a little awkward. In particular:
+#
+# ls (i.e. glob) ... on the root doesn't seem to iterate down the name
+# space hierarchy; I was under the impression that the local MountTable
+# client would do so. Well, it turns out that this only works if the
+# internal suffix (mt in the USA/UK trees, empty in the France tree) is
+# empty!
+#
+# ls using the local MountTable on a rooted name doesn't seem to work either,
+# thus making it impossible to see the whole name space without setting the
+# local MountTable's root which will clearly cause problems for concurrent
+# clients.
+#
+# I find the distinction between the "local MountTable client" and
+# "remote MountTable or local MountTable server" awkward. Maybe we should
+# call the "local MountTable client" the "Resolver?".
+#
+
+
+