Merge "veyron/services/mounttable/lib: demonstrate effect of globbing "..." pattern against the root of the namespace for a mounttable."
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index 90cae68..6be5812 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -59,7 +59,6 @@
import (
"bytes"
"fmt"
- "io"
"net"
"os"
"runtime"
@@ -94,7 +93,7 @@
runtime veyron2.Runtime
store *storage.Server
ipc ipc.Server
- drawStream boxes.DrawInterfaceDrawStream
+ drawStream boxes.DrawInterfaceServiceDrawStream
signalling boxes.BoxSignalling
boxList chan boxes.Box
myIPAddr string
@@ -141,48 +140,14 @@
}
// Launch the sync service
initSyncService(endPt.Addr().String())
- // Watch the store for any box updates
- go gs.watchStore()
+ // Watch/Update the store for any box changes
+ go gs.monitorStore()
return nil
}
-func (gs *goState) watchStore() {
- rst, err := raw.BindStore(naming.JoinAddressName(gs.storeEndpoint, raw.RawStoreSuffix))
- if err != nil {
- panic(fmt.Errorf("Failed to Bind Store:%v", err))
- }
- ctx := gs.runtime.NewContext()
- req := watch.Request{Query: query.Query{}}
- stream, err := rst.Watch(ctx, req, veyron2.CallTimeout(ipc.NoTimeout))
- if err != nil {
- panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
- }
- for {
- cb, err := stream.Recv()
- if err != nil {
- panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
- }
- for _, change := range cb.Changes {
- if mu, ok := change.Value.(*raw.Mutation); ok && len(mu.Dir) == 0 {
- if box, ok := mu.Value.(boxes.Box); ok {
- nativeJava.addBox(&box)
- }
- }
- }
- }
-}
-
func (gs *goState) Draw(context ipc.ServerContext, stream boxes.DrawInterfaceServiceDrawStream) error {
- for {
- box, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
- nativeJava.addBox(&box)
- }
+ gs.drawStream = stream
+ gs.streamBoxesLoop()
return nil
}
@@ -190,7 +155,18 @@
gs.boxList <- box
}
-func (gs *goState) sendDrawLoop() {
+func (gs *goState) streamBoxesLoop() {
+ // Loop to receive boxes from remote peer
+ go func() {
+ for {
+ box, err := gs.drawStream.Recv()
+ if err != nil {
+ return
+ }
+ nativeJava.addBox(&box)
+ }
+ }()
+ // Loop to send boxes to remote peer
for {
if err := gs.drawStream.Send(<-gs.boxList); err != nil {
break
@@ -198,9 +174,35 @@
}
}
-func (gs *goState) updateStore(endpoint string) {
- initSyncService(endpoint)
+func (gs *goState) monitorStore() {
ctx := gs.runtime.NewContext()
+
+ // Watch for any box updates from the store
+ go func() {
+ rst, err := raw.BindStore(naming.JoinAddressName(gs.storeEndpoint, raw.RawStoreSuffix))
+ if err != nil {
+ panic(fmt.Errorf("Failed to raw.Bind Store:%v", err))
+ }
+ req := watch.Request{Query: query.Query{}}
+ stream, err := rst.Watch(ctx, req, veyron2.CallTimeout(ipc.NoTimeout))
+ if err != nil {
+ panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
+ }
+ for {
+ cb, err := stream.Recv()
+ if err != nil {
+ panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
+ }
+ for _, change := range cb.Changes {
+ if mu, ok := change.Value.(*raw.Mutation); ok && len(mu.Dir) == 0 {
+ if box, ok := mu.Value.(boxes.Box); ok && box.DeviceId != gs.myIPAddr {
+ nativeJava.addBox(&box)
+ }
+ }
+ }
+ }
+ }()
+ // Send any box updates to the store
vst, err := vstore.New(gs.storeEndpoint)
if err != nil {
panic(fmt.Errorf("Failed to init veyron store:%v", err))
@@ -232,7 +234,6 @@
if err != nil {
panic(fmt.Errorf("Failed runtime.NewServer:%v", err))
}
-
drawServer := boxes.NewServerDrawInterface(gs)
if err := srv.Register("draw", ipc.SoloDispatcher(drawServer, auth)); err != nil {
panic(fmt.Errorf("Failed Register:%v", err))
@@ -255,12 +256,11 @@
if err != nil {
panic(fmt.Errorf("failed BindDrawInterface:%v", err))
}
- if gs.drawStream, err = drawInterface.Draw(gs.runtime.TODOContext()); err != nil {
- panic(fmt.Errorf("failed to get handle to Draw stream:%v\n", err))
- }
- gs.boxList = make(chan boxes.Box, 256)
if !useStoreService {
- go gs.sendDrawLoop()
+ if gs.drawStream, err = drawInterface.Draw(gs.runtime.TODOContext()); err != nil {
+ panic(fmt.Errorf("failed to get handle to Draw stream:%v\n", err))
+ }
+ go gs.streamBoxesLoop()
} else {
// Initialize the store sync service that listens for updates from a peer
endpoint, err := inaming.NewEndpoint(endpointStr)
@@ -270,7 +270,8 @@
if err = drawInterface.SyncBoxes(gs.runtime.TODOContext()); err != nil {
panic(fmt.Errorf("failed to setup remote sync service:%v", err))
}
- go gs.updateStore(endpoint.Addr().String())
+ initSyncService(endpoint.Addr().String())
+ go gs.monitorStore()
}
}
@@ -293,7 +294,7 @@
//export Java_com_boxes_GoNative_sendDrawBox
func Java_com_boxes_GoNative_sendDrawBox(env *C.JNIEnv, class C.jclass,
boxId C.jstring, ox C.jfloat, oy C.jfloat, cx C.jfloat, cy C.jfloat) {
- gs.sendBox(boxes.Box{BoxId: C.GoString(C.JToCString(env, boxId)), Points: [4]float32{float32(ox), float32(oy), float32(cx), float32(cy)}})
+ gs.sendBox(boxes.Box{DeviceId: gs.myIPAddr, BoxId: C.GoString(C.JToCString(env, boxId)), Points: [4]float32{float32(ox), float32(oy), float32(cx), float32(cy)}})
}
//export Java_com_boxes_GoNative_registerAddBox
@@ -377,6 +378,8 @@
panic(fmt.Errorf("Failed to remove remnant store files:%v\n", err))
}
+ gs.boxList = make(chan boxes.Box, 256)
+
// Initialize veyron runtime and bind to the signalling server used to rendezvous with
// another peer device. TODO(gauthamt): Switch to using the nameserver for signalling.
gs.runtime = rt.Init(veyron2.LocalID(privateID))
diff --git a/examples/boxes/boxes.vdl b/examples/boxes/boxes.vdl
index b34341d..1736cd4 100644
--- a/examples/boxes/boxes.vdl
+++ b/examples/boxes/boxes.vdl
@@ -13,6 +13,8 @@
// Box describes the name and co-ordinates of a given box that
// is displayed in the View of a peer device.
type Box struct {
+ // DeviceID that generated the box
+ DeviceId string
// BoxId is a unique name for a box
BoxId string
// Points are the co-ordinates of a given box
diff --git a/examples/boxes/boxes.vdl.go b/examples/boxes/boxes.vdl.go
index f6a561c..5f73ee5 100644
--- a/examples/boxes/boxes.vdl.go
+++ b/examples/boxes/boxes.vdl.go
@@ -19,6 +19,8 @@
// Box describes the name and co-ordinates of a given box that
// is displayed in the View of a peer device.
type Box struct {
+ // DeviceID that generated the box
+ DeviceId string
// BoxId is a unique name for a box
BoxId string
// Points are the co-ordinates of a given box
@@ -464,6 +466,7 @@
result.TypeDefs = []_gen_vdl.Any{
_gen_wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}, _gen_wiretype.ArrayType{Elem: 0x19, Len: 0x4, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
+ _gen_wiretype.FieldType{Type: 0x3, Name: "DeviceId"},
_gen_wiretype.FieldType{Type: 0x3, Name: "BoxId"},
_gen_wiretype.FieldType{Type: 0x42, Name: "Points"},
},
diff --git a/lib/testutil/modules/ls.go b/lib/testutil/modules/ls.go
index daac425..b0e21cf 100644
--- a/lib/testutil/modules/ls.go
+++ b/lib/testutil/modules/ls.go
@@ -4,6 +4,7 @@
"fmt"
"io"
"strconv"
+ "strings"
"veyron2/rt"
"veyron2/services/mounttable"
@@ -71,12 +72,11 @@
return nil, nil, nil, err
} else {
vars := make(Variables)
- if len(r) > 1 {
- for i, v := range r[:len(r)-1] {
- k := "R" + strconv.Itoa(i)
- vars.Update(k, v)
- }
+ for i, v := range r {
+ k := "R" + strconv.Itoa(i)
+ vars.Update(k, v)
}
+ vars.Update("ALL", strings.Join(r, ","))
return vars, r, nil, nil
}
}
diff --git a/runtimes/google/lib/sshagent/sshagent.go b/runtimes/google/lib/sshagent/sshagent.go
deleted file mode 100644
index b7e4575..0000000
--- a/runtimes/google/lib/sshagent/sshagent.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Package sshagent implements a type to use ssh-agent to store ECDSA private keys and sign slices using them.
-//
-// For unix-based systems, the implementation is based on the OpenSSH implementation of ssh-agent, with the protocol defined in
-// revision 1.7 of the PROTOCOL.agent file (http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL.agent?rev=1.7;content-type=text%2Fplain)
-package sshagent
-
-import (
- "crypto/ecdsa"
- "math/big"
-)
-
-// Agent is the interface for communicating with an ssh-agent process.
-type Agent interface {
- // List returns the set of public keys and comments associated with them
- // stored in the SSH agent.
- List() (keys []*ecdsa.PublicKey, comments []string, err error)
- // Add adds a (private key, comment) pair to the SSH agent.
- Add(priv *ecdsa.PrivateKey, comment string) error
- // Remove removes the private key associated with the provided public key
- // from the SSH agent.
- Remove(pub *ecdsa.PublicKey) error
- // Sign signs the SHA-256 hash of data using the private key stored in the
- // SSH agent correponding to pub.
- Sign(pub *ecdsa.PublicKey, data []byte) (R, S *big.Int, err error)
-}
diff --git a/runtimes/google/lib/sshagent/sshagent_unix.go b/runtimes/google/lib/sshagent/sshagent_unix.go
deleted file mode 100644
index 325e624..0000000
--- a/runtimes/google/lib/sshagent/sshagent_unix.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package sshagent
-
-import (
- "bytes"
- "crypto/ecdsa"
- "crypto/elliptic"
- "encoding/binary"
- "fmt"
- "io"
- "math/big"
- "net"
- "os"
- "strings"
-)
-
-// New returns an Agent implementation that uses the environment variable SSH_AUTH_SOCK
-// to configure communication with a running ssh-agent.
-func New() (Agent, error) {
- sock := os.Getenv(SSH_AUTH_SOCK)
- if len(sock) == 0 {
- return nil, fmt.Errorf("SSH_AUTH_SOCK not set")
- }
- return &agent{addr: sock}, nil
-}
-
-const (
- // Requests from client to agent (Section 3.2 of PROTOCOL.agent)
- SSH2_AGENTC_REQUEST_IDENTITIES = 11
- SSH2_AGENTC_SIGN_REQUEST = 13
- SSH2_AGENTC_ADD_IDENTITY = 17
- SSH2_AGENTC_REMOVE_IDENTITY = 18
- SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19
- SSH2_AGENTC_ADD_ID_CONSTRAINED = 25
-
- // Key-type independent requests from client to agent (Section 3.3 of PROTOCOL.agent)
- SSH_AGENTC_ADD_SMARTCARD_KEY = 20
- SSH_AGENTC_REMOVE_SMARTCARD_KEY = 21
- SSH_AGENTC_LOCK = 22
- SSH_AGENTC_UNLOCK = 23
-
- // Generic replies from agent to client (Section 3.4)
- SSH2_AGENT_FAILURE = 5
- SSH2_AGENT_SUCCESS = 6
-
- // Replies from agent to client for protocol 1 key operations (Section 3.6 of PROTOCOL.agent)
- SSH2_AGENT_IDENTITIES_ANSWER = 12
- SSH2_AGENT_SIGN_RESPONSE = 14
- SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED = 26
-
- // Key constraint identifiers (Section 3.7 of PROTOCOL.agent)
- SSH_AGENT_CONSTRAIN_LIFETIME = 1
- SSH_AGENT_CONSTRAIN_CONFIRM = 2
-
- // SSH_AUTH_SOCK is the name of the environment variable containing the path to the
- // unix socket to communicate with the SSH agent.
- SSH_AUTH_SOCK = "SSH_AUTH_SOCK"
-
- ecdsaKeyPrefix = "ecdsa-sha2-"
-)
-
-var curvenames = map[string]elliptic.Curve{
- "nistp256": elliptic.P256(),
- "nistp384": elliptic.P384(),
- "nistp521": elliptic.P521(),
-}
-
-func name2curve(name string) (elliptic.Curve, error) {
- curve, exists := curvenames[name]
- if !exists {
- return nil, fmt.Errorf("unrecognized elliptic curve: %q", name)
- }
- return curve, nil
-}
-
-func curve2name(curve elliptic.Curve) (string, error) {
- for n, c := range curvenames {
- if c == curve {
- return n, nil
- }
- }
- return "", fmt.Errorf("unregistered elliptic curve: %v", curve)
-}
-
-func readU32(r io.Reader) (uint32, error) {
- var ret uint32
- err := binary.Read(r, binary.BigEndian, &ret)
- return ret, err
-}
-
-func read(r io.Reader) ([]byte, error) {
- size, err := readU32(r)
- if err != nil {
- return nil, fmt.Errorf("failed to read length header: %v", err)
- }
- msg := make([]byte, size)
- if _, err := io.ReadFull(r, msg); err != nil {
- return nil, fmt.Errorf("failed to read %d bytes: %v", size, err)
- }
- return msg, nil
-}
-
-func readString(r io.Reader) (string, error) {
- b, err := read(r)
- if err != nil {
- return "", err
- }
- return string(b), nil
-}
-
-func readcurve(r io.Reader) (elliptic.Curve, error) {
- str, err := readString(r)
- if err != nil {
- return nil, fmt.Errorf("failed to read ECDSA curve: %v", err)
- }
- return name2curve(str)
-}
-
-func write(msg []byte, w io.Writer) error {
- if err := binary.Write(w, binary.BigEndian, uint32(len(msg))); err != nil {
- return err
- }
- if n, err := w.Write(msg); err != nil {
- return err
- } else if n != len(msg) {
- return fmt.Errorf("wrote %d, wanted to write %d bytes", n, len(msg))
- }
- return nil
-}
-
-func blob2ecdsa(blob []byte) (*ecdsa.PublicKey, error) {
- buf := bytes.NewBuffer(blob)
- keytype, err := readString(buf)
- if err != nil {
- return nil, fmt.Errorf("failed to read key type: %v", err)
- }
- // the curve is specified as a key type (e.g. ecdsa-sha2-nistp256)
- // and then again as a string. The two should match.
- if !strings.HasPrefix(keytype, ecdsaKeyPrefix) {
- return nil, fmt.Errorf("not a recognized ECDSA key type: %q", keytype)
- }
- keycurve, err := name2curve(strings.TrimPrefix(keytype, ecdsaKeyPrefix))
- if err != nil {
- return nil, err
- }
- curve, err := readcurve(buf)
- if err != nil {
- return nil, err
- }
- if curve != keycurve {
- return nil, fmt.Errorf("ECDSA curve does not match key type")
- }
- marshaled, err := read(buf)
- if err != nil {
- return nil, fmt.Errorf("failed to read marshaled ECDSA point: %v", err)
- }
- x, y := elliptic.Unmarshal(curve, marshaled)
- if x == nil {
- return nil, fmt.Errorf("failed to unmarshal ECDSA point: %v", err)
- }
- return &ecdsa.PublicKey{Curve: curve, X: x, Y: y}, nil
-}
-
-type agent struct {
- addr string
- conn io.ReadWriter
-}
-
-func (a *agent) connect() error {
- if a.conn != nil {
- return nil
- }
- var err error
- if a.conn, err = net.Dial("unix", a.addr); err != nil {
- return fmt.Errorf("failed to connect to %v=%v", SSH_AUTH_SOCK, a.addr)
- }
- return nil
-}
-
-func (a *agent) call(request []byte) ([]byte, error) {
- if err := a.connect(); err != nil {
- return nil, err
- }
- if err := write(request, a.conn); err != nil {
- a.conn = nil
- return nil, fmt.Errorf("failed to send request: %v", err)
- }
- response, err := read(a.conn)
- if err != nil {
- a.conn = nil
- return nil, fmt.Errorf("failed to read response: %v", err)
- }
- return response, nil
-}
-
-// Section 2.5.2 of PROTOCOL.agent
-func (a *agent) List() (keys []*ecdsa.PublicKey, comments []string, err error) {
- reply, err := a.call([]byte{SSH2_AGENTC_REQUEST_IDENTITIES})
- if err != nil {
- return nil, nil, fmt.Errorf("SSH2_AGENTC_REQUEST_IDENTITIES call failed: %v", err)
- }
- if len(reply) == 0 {
- return nil, nil, fmt.Errorf("SSH2_AGENTC_REQUEST_IDENTITES call received empty reply")
- }
- if got, want := reply[0], byte(SSH2_AGENT_IDENTITIES_ANSWER); got != want {
- return nil, nil, fmt.Errorf("unexpected reply from ssh-agent: got %v, want SSH2_AGENT_IDENTITIES_ANSWER(%v)", got, want)
- }
- buf := bytes.NewBuffer(reply[1:])
- num, err := readU32(buf)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to read num identities from SSH2_AGENT_IDENTITIES_ANSWER: %v", err)
- }
- for i := uint32(0); i < num; i++ {
- blob, err := read(buf)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to read key blob for key %d of %d in SSH2_AGENT_IDENTITIES_ANSWER: %v", i, num, err)
- }
- comment, err := readString(buf)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to read comment for key %d of %d in SSH2_AGENT_IDENTITIES_ANSWER: %v", i, num, err)
- }
- key, _ := blob2ecdsa(blob)
- if key != nil {
- keys = append(keys, key)
- comments = append(comments, comment)
- }
- }
- return
-}
-
-func (a *agent) checkSSH2_AGENT_SUCCESS(request []byte, description string) error {
- reply, err := a.call(request)
- if err != nil {
- return fmt.Errorf("%v call failed: %v", description, err)
- }
- if len(reply) != 1 || reply[0] != SSH2_AGENT_SUCCESS {
- return fmt.Errorf("got reply %v for %v, wanted [SSH2_AGENT_SUCCESS(%d)]", reply, description, SSH2_AGENT_SUCCESS)
- }
- return nil
-}
-
-func bignum(n *big.Int) []byte {
- bytes := n.Bytes()
- // Prepend 0 if sign bit is set.
- // See sshbuf_put_bignum2 in ssh/sshbuf-getput-crypto.c revision 1.1:
- // http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/sshbuf-getput-crypto.c?rev=1.1;content-type=text%2Fx-cvsweb-markup
- if bytes[0]&0x80 != 0 {
- return append([]byte{0}, bytes...)
- }
- return bytes
-}
-
-// Section 2.2.3 of PROTOCOL.agent
-func (a *agent) Add(key *ecdsa.PrivateKey, comment string) error {
- if key.D.Sign() < 0 || key.PublicKey.X.Sign() < 0 || key.PublicKey.Y.Sign() < 0 {
- // As of June 2014, ssh did not like negative bignums.
- // See sshbuf_get_bignum2 in ssh/sshbuf-getput-crypto.c revision 1.1:
- // http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/sshbuf-getput-crypto.c?rev=1.1;content-type=text%2Fx-cvsweb-markup
- return fmt.Errorf("ssh-agent does not support negative big numbers")
- }
- curve, err := curve2name(key.PublicKey.Curve)
- if err != nil {
- return err
- }
- var request bytes.Buffer
- w := func(data []byte) {
- if err == nil {
- err = write(data, &request)
- }
- }
- if err := request.WriteByte(SSH2_AGENTC_ADD_IDENTITY); err != nil {
- return err
- }
- w([]byte(ecdsaKeyPrefix + curve)) // key type: e.g., ecdsa-sha2-nistp256
- w([]byte(curve)) // ecdsa curve: e.g. nistp256
- w(elliptic.Marshal(key.PublicKey.Curve, key.PublicKey.X, key.PublicKey.Y))
- w(bignum(key.D))
- w([]byte(comment))
- if err != nil {
- return err
- }
- return a.checkSSH2_AGENT_SUCCESS(request.Bytes(), "SSH2_AGENTC_ADD_IDENTITY")
-}
-
-// keyblob produces an encoding of an ecdsa.PublicKey as per Section 3.1 in RFC 5656,
-// (extending Section 6.6 of RFC 4253),
-// required by the SSH2_AGENTC_REMOVE_IDENTITY and SSH2_AGENTC_SIGN_REQUEST
-// (Section 2.4.2 and Section 2.6.2 in PROTOCOL.agent).
-func keyblob(key *ecdsa.PublicKey) ([]byte, error) {
- curve, err := curve2name(key.Curve)
- if err != nil {
- return nil, err
- }
- var blob bytes.Buffer
- if err := write([]byte(ecdsaKeyPrefix+curve), &blob); err != nil {
- return nil, err
- }
- if err := write([]byte(curve), &blob); err != nil {
- return nil, err
- }
- if err := write(elliptic.Marshal(key.Curve, key.X, key.Y), &blob); err != nil {
- return nil, err
- }
- return blob.Bytes(), nil
-}
-
-func (a *agent) Remove(key *ecdsa.PublicKey) error {
- keyblob, err := keyblob(key)
- if err != nil {
- return err
- }
- request := bytes.NewBuffer([]byte{SSH2_AGENTC_REMOVE_IDENTITY})
- if err := write(keyblob, request); err != nil {
- return err
- }
- return a.checkSSH2_AGENT_SUCCESS(request.Bytes(), "SSH2_AGENTC_REMOVE_IDENTITY")
-}
-
-// Section 2.6.2 of PROTOCOL.agent
-func (a *agent) Sign(pub *ecdsa.PublicKey, data []byte) (r, s *big.Int, err error) {
- keyblob, err := keyblob(pub)
- if err != nil {
- return nil, nil, err
- }
- request := bytes.NewBuffer([]byte{SSH2_AGENTC_SIGN_REQUEST})
- w := func(data []byte) {
- if err == nil {
- err = write(data, request)
- }
- }
- w(keyblob)
- w(data)
- w([]byte{0, 0, 0, 0}) // flags (see Section 2.6.2 of PROTOCOL.agent)
- if err != nil {
- return nil, nil, err
- }
- reply, err := a.call(request.Bytes())
- if err != nil {
- return nil, nil, err
- }
- if len(reply) == 0 {
- return nil, nil, fmt.Errorf("empty reply for SSH2_AGENT_SIGN_REQUEST")
- }
- if reply[0] != SSH2_AGENT_SIGN_RESPONSE {
- return nil, nil, fmt.Errorf("got reply %v, want SSH2_AGENT_SIGN_RESPONSE(%d)", reply[0], SSH2_AGENT_SIGN_RESPONSE)
- }
- reply, err = read(bytes.NewBuffer(reply[1:]))
- if err != nil {
- return nil, nil, err
- }
- reader := bytes.NewBuffer(reply)
- if _, err := read(reader); err != nil { // key, e.g. ecdsa-sha2-nistp256
- return nil, nil, err
- }
- signature, err := read(reader)
- if err != nil {
- return nil, nil, err
- }
- reader = bytes.NewBuffer(signature)
- R, err := read(reader)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to read signature's R: %v", err)
- }
- S, err := read(reader)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to read signature's S: %v", err)
- }
- r = new(big.Int)
- s = new(big.Int)
- return r.SetBytes(R), s.SetBytes(S), nil
-}
diff --git a/runtimes/google/lib/sshagent/sshagent_unix_test.go b/runtimes/google/lib/sshagent/sshagent_unix_test.go
deleted file mode 100644
index e3e34e7..0000000
--- a/runtimes/google/lib/sshagent/sshagent_unix_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Do not test on OS X as the default SSH installation that ships with that (at least up till 10.9.3) is not compiled with ECDSA key support.
-// +build !darwin
-
-package sshagent_test
-
-import (
- "bufio"
- "crypto/ecdsa"
- "crypto/elliptic"
- "crypto/rand"
- "crypto/sha256"
- "fmt"
- "os"
- "os/exec"
- "reflect"
- "strings"
- "testing"
-
- "veyron/runtimes/google/lib/sshagent"
-)
-
-func TestSSHAgent(t *testing.T) {
- if err := start(); err != nil {
- t.Fatal(err)
- }
- defer stop()
- agent, err := sshagent.New()
- if err != nil {
- t.Fatal(err)
- }
- key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
- if err != nil {
- t.Fatal(err)
- }
-
- if keys, comments, err := agent.List(); len(keys) != 0 || len(comments) != 0 || err != nil {
- t.Errorf("Got (%v, %v, %v) want ([], [], nil)", keys, comments, err)
- }
- if err := agent.Add(key, "bugs bunny"); err != nil {
- t.Error(err)
- }
- if keys, comments, err := agent.List(); len(keys) != 1 || len(comments) != 1 || err != nil || !samekey(keys[0], key) || comments[0] != "bugs bunny" {
- var match bool
- if len(keys) == 1 {
- match = samekey(keys[0], key)
- }
- t.Errorf("Got (%v, %v, %v) want ([<something>], [bugs bunny], nil), keys match: %v", keys, comments, err, match)
- }
- if r, s, err := agent.Sign(&key.PublicKey, []byte("looney tunes")); err != nil {
- t.Error(err)
- } else if !ecdsa.Verify(&key.PublicKey, sha256hash("looney tunes"), r, s) {
- t.Errorf("signature does not verify")
- }
- if err := agent.Remove(&key.PublicKey); err != nil {
- t.Error(err)
- }
- if keys, comments, err := agent.List(); len(keys) != 0 || len(comments) != 0 || err != nil {
- t.Errorf("Got (%v, %v, %v) want ([], [], nil)", keys, comments, err)
- }
-}
-
-func samekey(pub *ecdsa.PublicKey, priv *ecdsa.PrivateKey) bool {
- return reflect.DeepEqual(*pub, priv.PublicKey)
-}
-
-func sha256hash(s string) []byte {
- h := sha256.New()
- h.Write([]byte(s))
- return h.Sum(nil)
-}
-
-func start() error {
- os.Setenv(sshagent.SSH_AUTH_SOCK, "")
- os.Setenv("SSH_AGENT_PID", "")
- cmd := exec.Command("ssh-agent")
- stdout, err := cmd.StdoutPipe()
- if err != nil {
- return fmt.Errorf("failed to created stdout pipe for ssh-agent: %v", err)
- }
- if err := cmd.Start(); err != nil {
- return fmt.Errorf("failed to execute ssh-agent -d: %v", err)
- }
- reader := bufio.NewReader(stdout)
- var line1, line2 string
- if line1, err = reader.ReadString('\n'); err != nil {
- return fmt.Errorf("failed to read line1 from ssh-agent: %v", err)
- }
- if line2, err = reader.ReadString('\n'); err != nil {
- return fmt.Errorf("failed to read line2 from ssh-agent: %v", err)
- }
-
- const (
- prefix1 = sshagent.SSH_AUTH_SOCK + "="
- prefix2 = "SSH_AGENT_PID="
- )
- os.Setenv(sshagent.SSH_AUTH_SOCK, strings.Split(strings.TrimPrefix(line1, prefix1), ";")[0])
- os.Setenv("SSH_AGENT_PID", strings.Split(strings.TrimPrefix(line2, prefix2), ";")[0])
- return nil
-}
-
-func stop() {
- exec.Command("ssh-agent", "-k").Run()
-}
diff --git a/runtimes/google/vsync/dag.go b/runtimes/google/vsync/dag.go
index 9b47cc9..ac1a342 100644
--- a/runtimes/google/vsync/dag.go
+++ b/runtimes/google/vsync/dag.go
@@ -97,6 +97,7 @@
"fmt"
"veyron2/storage"
+ "veyron2/vlog"
)
type dag struct {
@@ -114,9 +115,9 @@
}
type graftInfo struct {
- newNodes map[storage.Version]bool // set of newly added nodes during a sync
- graftNodes map[storage.Version]uint64 // set of graft nodes and their level
- newHead storage.Version // new (pending) head node
+ newNodes map[storage.Version]struct{} // set of newly added nodes during a sync
+ graftNodes map[storage.Version]uint64 // set of graft nodes and their level
+ newHeads map[storage.Version]struct{} // set of candidate new head nodes
}
// openDAG opens or creates a DAG for the given filename.
@@ -177,6 +178,46 @@
}
}
+// getObjectGraft returns the graft structure for an object ID.
+// The graftInfo struct for an object is ephemeral (in-memory) and it
+// tracks the following information:
+// - newNodes: the set of newly added nodes used to detect the type of
+// edges between nodes (new-node to old-node or vice versa).
+// - newHeads: the set of new candidate head nodes used to detect conflicts.
+// - graftNodes: the set of nodes used to find common ancestors between
+// conflicting nodes.
+//
+// After the received Sync logs are applied, if there are two new heads in
+// the newHeads set, there is a conflict to be resolved for this object.
+// Otherwise if there is only one head, no conflict was triggered and the
+// new head becomes the current version for the object.
+//
+// In case of conflict, the graftNodes set is used to select the common
+// ancestor to pass to the conflict resolver.
+//
+// Note: if an object's graft structure does not exist only create it
+// if the "create" parameter is set to true.
+func (d *dag) getObjectGraft(oid storage.ID, create bool) *graftInfo {
+ graft := d.graft[oid]
+ if graft == nil && create {
+ graft = &graftInfo{
+ newNodes: make(map[storage.Version]struct{}),
+ graftNodes: make(map[storage.Version]uint64),
+ newHeads: make(map[storage.Version]struct{}),
+ }
+
+ // If a current head node exists for this object, initialize
+ // the set of candidate new heads to include it.
+ head, err := d.getHead(oid)
+ if err == nil {
+ graft.newHeads[head] = struct{}{}
+ }
+
+ d.graft[oid] = graft
+ }
+ return graft
+}
+
// addNode adds a new node for an object in the DAG, linking it to its parent nodes.
// It verifies that this node does not exist and that its parent nodes are valid.
// It also determines the DAG level of the node from its parent nodes (max() + 1).
@@ -237,21 +278,10 @@
// the origin root node), representing the most up-to-date common
// knowledge between this device and the divergent changes.
//
- // The graftInfo struct for an object is ephemeral (in-memory) and it
- // tracks the set of newly added nodes, which in turn is used to
- // detect an old-to-new edge which identifies the old node as a graft
- // node. The graft nodes and their DAG levels are tracked in a map
- // which is used to (1) detect conflicts and (2) select the common
- // ancestor to pass to the conflict resolver.
- //
// Note: at the end of a sync operation between 2 devices, the whole
// graft info is cleared (Syncd calls clearGraft()) to prepare it for
// the new pairwise sync operation.
- graft := d.graft[oid]
- if graft == nil && remote {
- graft = &graftInfo{newNodes: make(map[storage.Version]bool), graftNodes: make(map[storage.Version]uint64)}
- d.graft[oid] = graft
- }
+ graft := d.getObjectGraft(oid, remote)
// Verify the parents and determine the node level.
// Update the graft info in the DAG for this object.
@@ -267,16 +297,21 @@
if remote {
// If this parent is an old node, it's a graft point in the DAG
// and may be a common ancestor used during conflict resolution.
- if !graft.newNodes[parent] {
+ if _, ok := graft.newNodes[parent]; !ok {
graft.graftNodes[parent] = node.Level
}
+
+ // The parent nodes can no longer be candidates for new head versions.
+ if _, ok := graft.newHeads[parent]; ok {
+ delete(graft.newHeads, parent)
+ }
}
}
if remote {
- // This new node is so far the candidate for new head version.
- graft.newNodes[version] = true
- graft.newHead = version
+ // This new node is a candidate for new head version.
+ graft.newNodes[version] = struct{}{}
+ graft.newHeads[version] = struct{}{}
}
// Insert the new node in the kvdb.
@@ -293,6 +328,73 @@
return d.nodes.hasKey(key)
}
+// addParent adds to the DAG node (oid, version) linkage to this parent node.
+// If the parent linkage is due to a local change (from conflict resolution
+// by blessing an existing version), no need to update the grafting structure.
+// Otherwise a remote change (from the Sync protocol) updates the graft.
+//
+// TODO(rdaoud): add a check to avoid the creation of cycles in the DAG.
+// TODO(rdaoud): recompute the levels of reachable child-nodes if the new
+// parent's level is greater or equal to the node's current level.
+func (d *dag) addParent(oid storage.ID, version, parent storage.Version, remote bool) error {
+ node, err := d.getNode(oid, version)
+ if err != nil {
+ return err
+ }
+
+ pnode, err := d.getNode(oid, parent)
+ if err != nil {
+ vlog.VI(1).Infof("addParent: object %v, node %d, parent %d: parent node not found", oid, version, parent)
+ return err
+ }
+
+ // Check if the parent is already linked to this node.
+ found := false
+ for i := range node.Parents {
+ if node.Parents[i] == parent {
+ found = true
+ break
+ }
+ }
+
+ // If the parent is not yet linked (local or remote) add it.
+ if !found {
+ node.Parents = append(node.Parents, parent)
+ err = d.setNode(oid, version, node)
+ if err != nil {
+ return err
+ }
+ }
+
+ // For local changes we are done, the grafting structure is not updated.
+ if !remote {
+ return nil
+ }
+
+ // If the node and its parent are new/old or old/new then add
+ // the parent as a graft point (a potential common ancestor).
+ graft := d.getObjectGraft(oid, true)
+
+ _, nodeNew := graft.newNodes[version]
+ _, parentNew := graft.newNodes[parent]
+ if (nodeNew && !parentNew) || (!nodeNew && parentNew) {
+ graft.graftNodes[parent] = pnode.Level
+ }
+
+ // The parent node can no longer be a candidate for a new head version.
+ // The addParent() function only removes candidates from newHeads that
+ // have become parents. It does not add the child nodes to newHeads
+ // because they are not necessarily new-head candidates. If they are
+ // new nodes, the addNode() function handles adding them to newHeads.
+ // For old nodes, only the current head could be a candidate and it is
+ // added to newHeads when the graft struct is initialized.
+ if _, ok := graft.newHeads[parent]; ok {
+ delete(graft.newHeads, parent)
+ }
+
+ return nil
+}
+
// moveHead moves the object head node in the DAG.
func (d *dag) moveHead(oid storage.ID, head storage.Version) error {
if d.store == nil {
@@ -311,9 +413,10 @@
// new and old head nodes.
// - Yes: return (true, newHead, oldHead, ancestor)
// - No: return (false, newHead, oldHead, NoVersion)
-// A conflict exists if the current (old) head node is not one of the graft
-// point of the graph fragment just added. It means the newly added object
-// versions are not derived in part from this device's current knowledge.
+// A conflict exists when there are two new-head nodes. It means the newly
+// added object versions are not derived in part from this device's current
+// knowledge. If there is a single new-head, the object changes were applied
+// without triggering a conflict.
func (d *dag) hasConflict(oid storage.ID) (isConflict bool, newHead, oldHead, ancestor storage.Version, err error) {
oldHead = storage.NoVersion
newHead = storage.NoVersion
@@ -329,19 +432,33 @@
return
}
- newHead = graft.newHead
- oldHead, err2 := d.getHead(oid)
- if err2 != nil {
- // This is a new object not previously known on this device, so
- // it does not yet have a "head node" here. This means all the
- // DAG updates for it are new and to taken as-is (no conflict).
- // That's why we ignore the error which is only indicating the
- // lack of a prior head node.
+ numHeads := len(graft.newHeads)
+ if numHeads < 1 || numHeads > 2 {
+ err = fmt.Errorf("node %d has invalid number of new head candidates %d: %v", oid, numHeads, graft.newHeads)
return
}
- if _, ok := graft.graftNodes[oldHead]; ok {
- return // no conflict, current head is a graft point
+ // Fetch the current head for this object if it exists. The error from getHead()
+ // is ignored because a newly received object is not yet known on this device and
+ // will not trigger a conflict.
+ oldHead, _ = d.getHead(oid)
+
+ // If there is only one new head node there is no conflict.
+ // The new head is that single one, even if it might also be the same old node.
+ if numHeads == 1 {
+ for k := range graft.newHeads {
+ newHead = k
+ }
+ return
+ }
+
+ // With two candidate head nodes, the new one is the node that is
+ // not the current (old) head node.
+ for k := range graft.newHeads {
+ if k != oldHead {
+ newHead = k
+ break
+ }
}
// There is a conflict: the best choice ancestor is the graft point
@@ -514,10 +631,10 @@
// getHead retrieves the object head from the DAG.
func (d *dag) getHead(oid storage.ID) (storage.Version, error) {
- if d.store == nil {
- return 0, errors.New("invalid DAG")
- }
var version storage.Version
+ if d.store == nil {
+ return version, errors.New("invalid DAG")
+ }
key := objHeadKey(oid)
err := d.heads.get(key, &version)
if err != nil {
@@ -537,7 +654,9 @@
iterVersions = append(iterVersions, head)
}
if graft := d.graft[oid]; graft != nil {
- iterVersions = append(iterVersions, graft.newHead)
+ for k := range graft.newHeads {
+ iterVersions = append(iterVersions, k)
+ }
}
// Breadth-first traversal starting from the object head.
@@ -551,15 +670,15 @@
// getGraftNodes is a testing and debug helper function that returns for
// an object the graft information built and used during a sync operation.
-// The newHead version identifies the head node reported by the other device
-// during a sync operation. The graftNodes map identifies the set of old
-// nodes where the new DAG fragments were attached and their depth level
-// in the DAG.
-func (d *dag) getGraftNodes(oid storage.ID) (storage.Version, map[storage.Version]uint64) {
+// The newHeads map identifies the candidate head nodes based on the data
+// reported by the other device during a sync operation. The graftNodes map
+// identifies the set of old nodes where the new DAG fragments were attached
+// and their depth level in the DAG.
+func (d *dag) getGraftNodes(oid storage.ID) (map[storage.Version]struct{}, map[storage.Version]uint64) {
if d.store != nil {
if ginfo := d.graft[oid]; ginfo != nil {
- return ginfo.newHead, ginfo.graftNodes
+ return ginfo.newHeads, ginfo.graftNodes
}
}
- return 0, nil
+ return nil, nil
}
diff --git a/runtimes/google/vsync/dag_test.go b/runtimes/google/vsync/dag_test.go
index 0b5d4c8..5f94836 100644
--- a/runtimes/google/vsync/dag_test.go
+++ b/runtimes/google/vsync/dag_test.go
@@ -141,6 +141,11 @@
t.Errorf("delNode() did not fail on a closed DAG: %v", err)
}
+ err = dag.addParent(oid, 4, 2, true)
+ if err == nil || err.Error() != "invalid DAG" {
+ t.Errorf("addParent() did not fail on a closed DAG: %v", err)
+ }
+
err = dag.setHead(oid, 4)
if err == nil || err.Error() != "invalid DAG" {
t.Errorf("setHead() did not fail on a closed DAG: %v", err)
@@ -166,8 +171,8 @@
if pmap := dag.getParentMap(oid); len(pmap) != 0 {
t.Errorf("getParentMap() found data on a closed DAG: %v", pmap)
}
- if head, gmap := dag.getGraftNodes(oid); head != 0 || len(gmap) != 0 {
- t.Errorf("getGraftNodes() found data on a closed DAG: head: %d, map: %v", head, gmap)
+ if hmap, gmap := dag.getGraftNodes(oid); hmap != nil || gmap != nil {
+ t.Errorf("getGraftNodes() found data on a closed DAG: head map: %v, graft map: %v", hmap, gmap)
}
}
@@ -294,6 +299,65 @@
dag.close()
}
+// TestAddParent tests adding parents to a DAG node.
+func TestAddParent(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ version := storage.Version(7)
+ oid, err := strToObjID("789")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err = dag.addParent(oid, version, 1, true); err == nil {
+ t.Errorf("addParent() did not fail for an unknown object %d:%d in DAG file %s", oid, version, dagfile)
+ }
+
+ node := &dagNode{Level: 15, Logrec: "logrec-22"}
+ if err = dag.setNode(oid, version, node); err != nil {
+ t.Fatalf("Cannot set object %d:%d (%v) in DAG file %s", oid, version, node, dagfile)
+ }
+
+ for _, parent := range []storage.Version{1, 2, 3} {
+ if err = dag.addParent(oid, version, parent, true); err == nil {
+ t.Errorf("addParent() did not reject invalid parent %d for object %d:%d in DAG file %s",
+ parent, oid, version, dagfile)
+ }
+
+ pnode := &dagNode{Level: 11, Logrec: fmt.Sprint("logrec-%d", parent)}
+ if err = dag.setNode(oid, parent, pnode); err != nil {
+ t.Fatalf("Cannot set parent object %d:%d (%v) in DAG file %s", oid, parent, pnode, dagfile)
+ }
+
+ remote := parent%2 == 0
+ for i := 0; i < 2; i++ {
+ if err = dag.addParent(oid, version, parent, remote); err != nil {
+ t.Errorf("addParent() failed on parent %d, remote %d (i=%d) for object %d:%d in DAG file %s: %v",
+ parent, remote, i, oid, version, dagfile, err)
+ }
+ }
+ }
+
+ node2, err := dag.getNode(oid, version)
+ if err != nil || node2 == nil {
+ t.Errorf("Cannot find stored object %d:%d in DAG file %s", oid, version, dagfile)
+ }
+
+ expParents := []storage.Version{1, 2, 3}
+ if !reflect.DeepEqual(node2.Parents, expParents) {
+ t.Errorf("invalid parents for object %d:%d in DAG file %s: %v instead of %v",
+ oid, version, dagfile, node2.Parents, expParents)
+ }
+
+ dag.close()
+}
+
// TestSetHead tests setting and getting a DAG head node across DAG open/close/reopen.
func TestSetHead(t *testing.T) {
dagfile := dagFilename()
@@ -357,9 +421,9 @@
d.clearGraft()
// There should be no grafting info, and hasConflict() should fail.
- newHead, grafts := d.getGraftNodes(oid)
- if newHead != 0 || grafts != nil {
- return fmt.Errorf("Object %d: graft info not cleared: newhead (%d), grafts (%v)", oid, newHead, grafts)
+ newHeads, grafts := d.getGraftNodes(oid)
+ if newHeads != nil || grafts != nil {
+ return fmt.Errorf("Object %d: graft info not cleared: newHeads (%v), grafts (%v)", oid, newHeads, grafts)
}
isConflict, newHead, oldHead, ancestor, errConflict := d.hasConflict(oid)
@@ -472,9 +536,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 2 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{}
@@ -551,9 +617,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 5 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{5: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{2: 2}
@@ -640,9 +708,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 5 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{1: 1}
@@ -740,9 +810,11 @@
}
// Verify the grafting of remote nodes.
- newHead, grafts := dag.getGraftNodes(oid)
- if newHead != 5 {
- t.Errorf("Object %d has invalid newhead %d in graft info in DAG file %s", oid, newHead, dagfile)
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 5: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
}
expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
@@ -1090,3 +1162,339 @@
}
dag.close()
}
+
+// TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict. An object is created locally and
+// updated twice (v0 -> v1 -> v2). Another device has learned about v0, created
+// (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by selecting
+// v1 over v3. Now it sends that new info (v3 and the v1/v3 link) back to the
+// original (local) device. Instead of a v2/v3 conflict, the device sees that
+// v1 was chosen over v3 and resolves it as a no-conflict case.
+func TestRemoteLinkedNoConflictSameHead(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-noconf-link-00.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0, 3}, 2: {1}, 3: {0}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 3: 1}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(!isConflict && newHead == 2 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestRemoteLinkedConflict tests sync of remote updates that contain linked
+// nodes (conflict resolution by selecting an existing version) on top of a local
+// initial state triggering a local conflict. An object is created locally and
+// updated twice (v0 -> v1 -> v2). Another device has along the way learned about v0,
+// created (v0 -> v3), then learned about (v0 -> v1) and resolved that conflict by
+// selecting v3 over v1. Now it sends that new info (v3 and the v3/v1 link) back
+// to the original (local) device. The device sees a v2/v3 conflict.
+func TestRemoteLinkedConflict(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-conf-link.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 1}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{2: struct{}{}, 3: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 1: 1}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(isConflict && newHead == 3 && oldHead == 2 && ancestor == 1 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestRemoteLinkedNoConflictNewHead tests sync of remote updates that contain
+// linked nodes (conflict resolution by selecting an existing version) on top of
+// a local initial state without conflict, but moves the head node to a new one.
+// An object is created locally and updated twice (v0 -> v1 -> v2). Another device
+// has along the way learned about v0, created (v0 -> v3), then learned about
+// (v0 -> v1 -> v2) and resolved that conflict by selecting v3 over v2. Now it
+// sends that new info (v3 and the v3/v2 link) back to the original (local) device.
+// The device sees that the new head v3 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedConflictNewHead(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-noconf-link-01.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1}, 3: {0, 2}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{3: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 2: 2}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(!isConflict && newHead == 3 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// TestRemoteLinkedNoConflictNewHeadOvertake tests sync of remote updates that
+// contain linked nodes (conflict resolution by selecting an existing version)
+// on top of a local initial state without conflict, but moves the head node
+// to a new one that overtook the linked node.
+// An object is created locally and updated twice (v0 -> v1 -> v2). Another
+// device has along the way learned about v0, created (v0 -> v3), then learned
+// about (v0 -> v1 -> v2) and resolved that conflict by selecting v2 over v3.
+// Then it creates a new update v4 from v2 (v2 -> v4). Now it sends that new
+// info (v3, the v2/v3 link, and v4) back to the original (local) device.
+// The device sees that the new head v4 is "derived" from v2 thus no conflict.
+func TestRemoteLinkedConflictNewHeadOvertake(t *testing.T) {
+ dagfile := dagFilename()
+ defer os.Remove(dagfile)
+
+ dag, err := openDAG(dagfile)
+ if err != nil {
+ t.Fatalf("Cannot open new DAG file %s", dagfile)
+ }
+
+ if err = dagReplayCommands(dag, "local-init-00.sync"); err != nil {
+ t.Fatal(err)
+ }
+ if err = dagReplayCommands(dag, "remote-noconf-link-02.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ // The head must not have moved (i.e. still at v2) and the parent map
+ // shows the newly grafted DAG fragment on top of the prior DAG.
+ oid, err := strToObjID("12345")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 2 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ pmap := dag.getParentMap(oid)
+
+ exp := map[storage.Version][]storage.Version{0: nil, 1: {0}, 2: {1, 3}, 3: {0}, 4: {2}}
+
+ if !reflect.DeepEqual(pmap, exp) {
+ t.Errorf("Invalid object %d parent map in DAG file %s: (%v) instead of (%v)", oid, dagfile, pmap, exp)
+ }
+
+ // Verify the grafting of remote nodes.
+ newHeads, grafts := dag.getGraftNodes(oid)
+
+ expNewHeads := map[storage.Version]struct{}{4: struct{}{}}
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts := map[storage.Version]uint64{0: 0, 2: 2, 3: 1}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ // There should be no conflict.
+ isConflict, newHead, oldHead, ancestor, errConflict := dag.hasConflict(oid)
+ if !(!isConflict && newHead == 4 && oldHead == 2 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Then we can move the head and clear the grafting data.
+ if err = dag.moveHead(oid, newHead); err != nil {
+ t.Errorf("Object %d cannot move head to %d in DAG file %s: %v", oid, newHead, dagfile, err)
+ }
+
+ // Clear the grafting data and verify that hasConflict() fails without it.
+ dag.clearGraft()
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if errConflict == nil {
+ t.Errorf("hasConflict() did not fail w/o graft info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ // Now new info comes from another device repeating the v2/v3 link.
+ // Verify that it is a NOP (no changes).
+ if err = dagReplayCommands(dag, "remote-noconf-link-repeat.log.sync"); err != nil {
+ t.Fatal(err)
+ }
+
+ if head, e := dag.getHead(oid); e != nil || head != 4 {
+ t.Errorf("Object %d has wrong head in DAG file %s: %d", oid, dagfile, head)
+ }
+
+ newHeads, grafts = dag.getGraftNodes(oid)
+ if !reflect.DeepEqual(newHeads, expNewHeads) {
+ t.Errorf("Object %d has invalid newHeads in DAG file %s: (%v) instead of (%v)", oid, dagfile, newHeads, expNewHeads)
+ }
+
+ expgrafts = map[storage.Version]uint64{}
+ if !reflect.DeepEqual(grafts, expgrafts) {
+ t.Errorf("Invalid object %d graft in DAG file %s: (%v) instead of (%v)", oid, dagfile, grafts, expgrafts)
+ }
+
+ isConflict, newHead, oldHead, ancestor, errConflict = dag.hasConflict(oid)
+ if !(!isConflict && newHead == 4 && oldHead == 4 && ancestor == 0 && errConflict == nil) {
+ t.Errorf("Object %d wrong conflict info: flag %t, newHead %d, oldHead %d, ancestor %d, err %v",
+ oid, isConflict, newHead, oldHead, ancestor, errConflict)
+ }
+
+ if err := checkEndOfSync(dag, oid); err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/runtimes/google/vsync/gc.go b/runtimes/google/vsync/gc.go
index 726868c..9dc8dd1 100644
--- a/runtimes/google/vsync/gc.go
+++ b/runtimes/google/vsync/gc.go
@@ -96,7 +96,9 @@
// strictCheck when enabled performs strict checking of every
// log record being deleted to confirm that it should be in
// fact deleted.
- strictCheck = true
+ // TODO(hpucha): Support strictCheck in the presence
+ // of Link log records.
+ strictCheck = false
// Every compactCount iterations of garbage collection, kvdb
// is compacted. This value has performance implications as
@@ -304,6 +306,12 @@
return err
}
+ if rec.RecType == LinkRec {
+ // For a link log record, gc it right away.
+ g.dagPruneCallBack(logRecKey(devid, gnum, l))
+ continue
+ }
+
// Insert the object in this log record to the prune
// map if needed.
// If this object does not exist, create it.
@@ -441,19 +449,21 @@
if err != nil {
return err
}
- objHist, ok := g.verifyPruneMap[rec.ObjID]
- if !ok {
- return errors.New("obj not found in verifyMap")
- }
- _, found := objHist.versions[rec.CurVers]
- // If online consistency check is in progress, we
- // cannot strictly verify all the versions to be
- // deleted, and we ignore the failure to find a
- // version.
- if found {
- delete(objHist.versions, rec.CurVers)
- } else if !g.checkConsistency {
- return errors.New("verification failed")
+ if rec.RecType == NodeRec {
+ objHist, ok := g.verifyPruneMap[rec.ObjID]
+ if !ok {
+ return errors.New("obj not found in verifyMap")
+ }
+ _, found := objHist.versions[rec.CurVers]
+ // If online consistency check is in progress, we
+ // cannot strictly verify all the versions to be
+ // deleted, and we ignore the failure to find a
+ // version.
+ if found {
+ delete(objHist.versions, rec.CurVers)
+ } else if !g.checkConsistency {
+ return errors.New("verification failed")
+ }
}
}
diff --git a/runtimes/google/vsync/ilog.go b/runtimes/google/vsync/ilog.go
index feb92a8..930df89 100644
--- a/runtimes/google/vsync/ilog.go
+++ b/runtimes/google/vsync/ilog.go
@@ -45,6 +45,7 @@
"veyron/services/store/raw"
"veyron2/storage"
+ "veyron2/vlog"
)
var (
@@ -300,12 +301,13 @@
return l.gens.del(key)
}
-// createLocalLogRec creates a new local log record.
+// createLocalLogRec creates a new local log record of type NodeRec.
func (l *iLog) createLocalLogRec(obj storage.ID, vers storage.Version, par []storage.Version, val *LogValue) (*LogRec, error) {
rec := &LogRec{
- DevID: l.s.id,
- GNum: l.head.Curgen,
- LSN: l.head.Curlsn,
+ DevID: l.s.id,
+ GNum: l.head.Curgen,
+ LSN: l.head.Curlsn,
+ RecType: NodeRec,
ObjID: obj,
CurVers: vers,
@@ -319,6 +321,25 @@
return rec, nil
}
+// createLocalLinkLogRec creates a new local log record of type LinkRec.
+func (l *iLog) createLocalLinkLogRec(obj storage.ID, vers, par storage.Version) (*LogRec, error) {
+ rec := &LogRec{
+ DevID: l.s.id,
+ GNum: l.head.Curgen,
+ LSN: l.head.Curlsn,
+ RecType: LinkRec,
+
+ ObjID: obj,
+ CurVers: vers,
+ Parents: []storage.Version{par},
+ }
+
+ // Increment the LSN for the local log.
+ l.head.Curlsn++
+
+ return rec, nil
+}
+
// createRemoteGeneration adds a new remote generation.
func (l *iLog) createRemoteGeneration(dev DeviceID, gnum GenID, gen *genMetadata) error {
if l.db == nil {
@@ -357,6 +378,7 @@
}
err := l.putGenMetadata(l.s.id, g, val)
+ vlog.VI(2).Infof("createLocalGeneration:: created gen %d %v", g, val)
// Move to the next generation irrespective of err.
l.head.Curorder++
l.head.Curgen++
@@ -370,6 +392,8 @@
if l.db == nil {
return errInvalidLog
}
+
+ vlog.VI(2).Infof("processWatchRecord:: adding object %v %v", objID, vers)
// Check if the object version already exists in the DAG. if so return.
if l.s.dag.hasNode(objID, vers) {
return nil
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 67422d9..af7ece9 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -44,7 +44,7 @@
// peerSyncInterval is the duration between two consecutive
// sync events. In every sync event, the initiator contacts
// one of its peers to obtain any pending updates.
- peerSyncInterval = 100 * time.Millisecond
+ peerSyncInterval = 50 * time.Millisecond
// peerSelectionPolicy is the policy used to select a peer when
// the initiator gets a chance to sync.
@@ -191,6 +191,7 @@
return GenVector{}, err
}
+ vlog.VI(2).Infof("updateLocalGeneration:: created gen %d", gen)
// Update local generation vector in devTable.
if err = i.syncd.devtab.updateGeneration(i.syncd.id, i.syncd.id, gen); err != nil {
return GenVector{}, err
@@ -308,10 +309,16 @@
if err != nil {
return err
}
- if err = i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey); err != nil {
- return err
+
+ vlog.VI(2).Infof("insertRecInLogAndDag:: Adding log record %v", rec)
+ switch rec.RecType {
+ case NodeRec:
+ return i.syncd.dag.addNode(rec.ObjID, rec.CurVers, true, rec.Parents, logKey)
+ case LinkRec:
+ return i.syncd.dag.addParent(rec.ObjID, rec.CurVers, rec.Parents[0], true)
+ default:
+ return fmt.Errorf("unknown log record type")
}
- return nil
}
// createGenMetadataBatch inserts a batch of generations into the log.
@@ -337,12 +344,31 @@
// processUpdatedObjects processes all the updates received by the
// initiator, one object at a time. For each updated object, we first
-// check if the object has any conflicts. If there is a conflict, we
-// resolve the conflict and generate a new store mutation reflecting
-// the conflict resolution. If there is no conflict, we generate a
-// store mutation to simply update the store to the latest value. We
-// then put all these mutations in the store. If the put succeeds, we
-// update the log and dag state suitably (move the head ptr of the
+// check if the object has any conflicts, resulting in three
+// possibilities:
+//
+// * There is no conflict, and no updates are needed to the store
+// (isConflict=false, newHead == oldHead). All changes received convey
+// information that still keeps the local head as the most recent
+// version. This occurs when conflicts are resolved by blessings.
+//
+// * There is no conflict, but a remote version is discovered that
+// builds on the local head (isConflict=false, newHead != oldHead). In
+// this case, we generate a store mutation to simply update the store
+// to the latest value.
+//
+// * There is a conflict and we call into the app or the system to
+// resolve the conflict, resulting in three possibilties: (a) conflict
+// was resolved by blessing the local version. In this case, store
+// need not be updated, but a link is added to record the
+// blessing. (b) conflict was resolved by blessing the remote
+// version. In this case, store is updated with the remote version and
+// a link is added as well. (c) conflict was resolved by generating a
+// new store mutation. In this case, store is updated with the new
+// version.
+//
+// We then put all these mutations in the store. If the put succeeds,
+// we update the log and dag state suitably (move the head ptr of the
// object in the dag to the latest version, and create a new log
// record reflecting conflict resolution if any). Puts to store can
// fail since preconditions on the objects may have been violated. In
@@ -355,12 +381,11 @@
return err
}
- m, err := i.resolveConflicts()
- if err != nil {
+ if err := i.resolveConflicts(); err != nil {
return err
}
- err = i.updateStoreAndSync(ctx, m, local, minGens, remote, dID)
+ err := i.updateStoreAndSync(ctx, local, minGens, remote, dID)
if err == nil {
break
}
@@ -370,7 +395,7 @@
// solution. Next iteration will have coordination
// with watch thread to intelligently retry. Hence
// this value is not a config param.
- time.Sleep(10 * time.Second)
+ time.Sleep(1 * time.Second)
}
// Remove any pending state.
@@ -390,60 +415,28 @@
// Check if object has a conflict.
var err error
st.isConflict, st.newHead, st.oldHead, st.ancestor, err = i.syncd.dag.hasConflict(obj)
+ vlog.VI(2).Infof("detectConflicts:: object %v state %v err %v",
+ obj, st, err)
if err != nil {
return err
}
- if !st.isConflict {
- rec, err := i.getLogRec(obj, st.newHead)
- if err != nil {
- return err
- }
- st.resolvVal = &rec.Value
- // Sanity check.
- if st.resolvVal.Mutation.Version != st.newHead {
- return fmt.Errorf("bad mutation %d %d",
- st.resolvVal.Mutation.Version, st.newHead)
- }
- }
}
return nil
}
-// getLogRec returns the log record corresponding to a given object and its version.
-func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
- logKey, err := i.syncd.dag.getLogrec(obj, vers)
- if err != nil {
- return nil, err
- }
- dev, gen, lsn, err := splitLogRecKey(logKey)
- if err != nil {
- return nil, err
- }
- rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
- if err != nil {
- return nil, err
- }
- return rec, nil
-}
-
-// resolveConflicts resolves conflicts for updated objects.
-func (i *syncInitiator) resolveConflicts() ([]raw.Mutation, error) {
+// resolveConflicts resolves conflicts for updated objects. Conflicts
+// may be resolved by adding new versions or blessing either the local
+// or the remote version.
+func (i *syncInitiator) resolveConflicts() error {
switch conflictResolutionPolicy {
case useTime:
if err := i.resolveConflictsByTime(); err != nil {
- return nil, err
+ return err
}
default:
- return nil, fmt.Errorf("unknown conflict resolution policy")
+ return fmt.Errorf("unknown conflict resolution policy")
}
-
- var m []raw.Mutation
- for _, st := range i.updObjects {
- // Append to mutations.
- st.resolvVal.Mutation.PriorVersion = st.oldHead
- m = append(m, st.resolvVal.Mutation)
- }
- return m, nil
+ return nil
}
// resolveConflictsByTime resolves conflicts using the timestamps
@@ -482,11 +475,10 @@
res = 1
}
- m := lrecs[res].Value.Mutation
- m.Version = storage.NewVersion()
-
- // TODO(hpucha): handle continue and delete flags.
- st.resolvVal = &LogValue{Mutation: m}
+ // Instead of creating a new version that resolves the
+ // conflict, we are blessing an existing version as
+ // the conflict resolution.
+ st.resolvVal = &lrecs[res].Value
}
return nil
@@ -511,11 +503,38 @@
// updateStoreAndSync updates the store, and if that is successful,
// updates log and dag data structures.
-func (i *syncInitiator) updateStoreAndSync(ctx context.T, m []raw.Mutation, local, minGens, remote GenVector, dID DeviceID) error {
+func (i *syncInitiator) updateStoreAndSync(ctx context.T, local, minGens, remote GenVector, dID DeviceID) error {
// TODO(hpucha): Eliminate reaching into syncd's lock.
i.syncd.lock.Lock()
defer i.syncd.lock.Unlock()
+ var m []raw.Mutation
+ for obj, st := range i.updObjects {
+ if !st.isConflict {
+ rec, err := i.getLogRec(obj, st.newHead)
+ if err != nil {
+ return err
+ }
+ st.resolvVal = &rec.Value
+ // Sanity check.
+ if st.resolvVal.Mutation.Version != st.newHead {
+ return fmt.Errorf("bad mutation %d %d",
+ st.resolvVal.Mutation.Version, st.newHead)
+ }
+ }
+
+ // If the local version is picked, no further updates
+ // to the store are needed. If the remote version is
+ // picked, we put it in the store.
+ if st.resolvVal.Mutation.Version != st.oldHead {
+ // Append to mutations.
+ st.resolvVal.Mutation.PriorVersion = st.oldHead
+ vlog.VI(2).Infof("updateStoreAndSync:: appending mutation %v for obj %v",
+ st.resolvVal.Mutation, obj)
+ m = append(m, st.resolvVal.Mutation)
+ }
+ }
+
// TODO(hpucha): We will hold the lock across PutMutations rpc
// to prevent a race with watcher. The next iteration will
// clean up this coordination.
@@ -553,30 +572,68 @@
return nil
}
+// getLogRec returns the log record corresponding to a given object and its version.
+func (i *syncInitiator) getLogRec(obj storage.ID, vers storage.Version) (*LogRec, error) {
+ logKey, err := i.syncd.dag.getLogrec(obj, vers)
+ if err != nil {
+ return nil, err
+ }
+ dev, gen, lsn, err := splitLogRecKey(logKey)
+ if err != nil {
+ return nil, err
+ }
+ rec, err := i.syncd.log.getLogRec(dev, gen, lsn)
+ if err != nil {
+ return nil, err
+ }
+ return rec, nil
+}
+
// updateLogAndDag updates the log and dag data structures on a successful store put.
func (i *syncInitiator) updateLogAndDag() error {
for obj, st := range i.updObjects {
if st.isConflict {
// Object had a conflict, which was resolved successfully.
// Put is successful, create a log record.
- parents := []storage.Version{st.newHead, st.oldHead}
- rec, err := i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+ var err error
+ var rec *LogRec
+
+ switch {
+ case st.resolvVal.Mutation.Version == st.oldHead:
+ // Local version was blessed as the conflict resolution.
+ rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.oldHead, st.newHead)
+ case st.resolvVal.Mutation.Version == st.newHead:
+ // Remote version was blessed as the conflict resolution.
+ rec, err = i.syncd.log.createLocalLinkLogRec(obj, st.newHead, st.oldHead)
+ default:
+ // New version was created to resolve the conflict.
+ parents := []storage.Version{st.newHead, st.oldHead}
+ rec, err = i.syncd.log.createLocalLogRec(obj, st.resolvVal.Mutation.Version, parents, st.resolvVal)
+
+ }
if err != nil {
return err
}
-
logKey, err := i.syncd.log.putLogRec(rec)
if err != nil {
return err
}
-
- // Put is successful, add a new DAG node.
- if err = i.syncd.dag.addNode(obj, st.resolvVal.Mutation.Version, false, parents, logKey); err != nil {
+ // Add a new DAG node.
+ switch rec.RecType {
+ case NodeRec:
+ err = i.syncd.dag.addNode(obj, rec.CurVers, false, rec.Parents, logKey)
+ case LinkRec:
+ err = i.syncd.dag.addParent(obj, rec.CurVers, rec.Parents[0], false)
+ default:
+ return fmt.Errorf("unknown log record type")
+ }
+ if err != nil {
return err
}
}
- // Move the head.
+ // Move the head. This should be idempotent. We may
+ // move head to the local head in some cases.
if err := i.syncd.dag.moveHead(obj, st.resolvVal.Mutation.Version); err != nil {
return err
}
diff --git a/runtimes/google/vsync/initiator_test.go b/runtimes/google/vsync/initiator_test.go
index 3c9a063..468c882 100644
--- a/runtimes/google/vsync/initiator_test.go
+++ b/runtimes/google/vsync/initiator_test.go
@@ -172,22 +172,27 @@
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
}
}
-
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
if len(s.hdlInitiator.updObjects) != 1 {
t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 2 || st.oldHead != storage.NoVersion {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
- t.Errorf("Unexpected mutations %v", m)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -270,15 +275,21 @@
if len(s.hdlInitiator.updObjects) != 1 {
t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 2 || st.oldHead != storage.NoVersion {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if len(m) != 1 || m[0].PriorVersion != storage.NoVersion || m[0].Version != 2 {
- t.Errorf("Unexpected mutations %v", m)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if st.resolvVal.Mutation.PriorVersion != storage.NoVersion || st.resolvVal.Mutation.Version != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 0 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -362,22 +373,27 @@
v = v + 1
}
}
-
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
if len(s.hdlInitiator.updObjects) != 1 {
t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 5 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if len(m) != 1 || m[0].PriorVersion != 2 || m[0].Version != 5 {
- t.Errorf("Unexpected versions %v", m[0])
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
@@ -437,16 +453,14 @@
t.Errorf("Data mismatch for generation metadata: %v instead of %v",
curVal, expVal)
}
-
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
objid, err := strToObjID("12345")
@@ -466,6 +480,9 @@
if curRec.ObjID != objid {
t.Errorf("Data mismatch in log record %v", curRec)
}
+ if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
// Verify DAG state.
if _, err := s.dag.getNode(objid, v); err != nil {
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -473,19 +490,25 @@
v = v + 1
}
}
-
- if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
- t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- if m[0].PriorVersion != 2 {
- t.Errorf("Unexpected version %v", m[0])
+ st := s.hdlInitiator.updObjects[objid]
+ if !st.isConflict {
+ t.Errorf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != 5 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
// Curlsn == 4 for the log record that resolves conflict.
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 5 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
@@ -555,12 +578,11 @@
if err := s.hdlInitiator.detectConflicts(); err != nil {
t.Fatalf("detectConflicts failed with err %v", err)
}
- m, err := s.hdlInitiator.resolveConflicts()
- if err != nil {
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
t.Fatalf("resolveConflicts failed with err %v", err)
}
- if err = s.hdlInitiator.updateLogAndDag(); err != nil {
- t.Fatalf("updateLogAndDag failed with err %v", err)
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
}
objid, err := strToObjID("12345")
@@ -580,6 +602,9 @@
if curRec.ObjID != objid {
t.Errorf("Data mismatch in log record %v", curRec)
}
+ if devid == "VeyronTab" && index == 3 && curRec.RecType != LinkRec {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
// Verify DAG state.
if _, err := s.dag.getNode(objid, v); err != nil {
t.Errorf("GetNode() can not find object %d %d in DAG, err %v", objid, i, err)
@@ -587,18 +612,357 @@
v = v + 1
}
}
- if len(m) != 1 || len(s.hdlInitiator.updObjects) != 1 {
- t.Errorf("Unexpected number of updated objects %d %d", len(m), len(s.hdlInitiator.updObjects))
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Unexpected number of updated objects %d", len(s.hdlInitiator.updObjects))
}
- if m[0].PriorVersion != 2 {
- t.Errorf("Unexpected version %v", m[0])
+ st := s.hdlInitiator.updObjects[objid]
+ if !st.isConflict {
+ t.Errorf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != 5 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+ if st.resolvVal.Mutation.PriorVersion != 2 || st.resolvVal.Mutation.Version != 5 {
+ t.Errorf("Mutation generation is not accurate %v", st)
}
// Curlsn == 4 for the log record that resolves conflict.
if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 2 {
t.Errorf("Data mismatch in log header %v", s.log.head)
}
// Verify DAG state.
- if head, err := s.dag.getHead(objid); err != nil || head <= 5 {
+ if head, err := s.dag.getHead(objid); err != nil || head != 5 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessNoConf0 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// unchanged at the end of replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-00.log.sync>.
+func TestInitiatorBlessNoConf0(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-noconf-link-00.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 2 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 2 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessNoConf1 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// updated at the end of the replay. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-01.log.sync>.
+func TestInitiatorBlessNoConf1(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-noconf-link-01.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 3 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessNoConf2 tests that a local and a remote log
+// record stream can be correctly applied, when the conflict is
+// resolved by a blessing. In this test, local head of the object is
+// updated at the end of the first replay. In the second replay, a
+// conflict resolved locally is rediscovered since it was also
+// resolved remotely. Commands are in files
+// testdata/<local-init-00.sync,remote-noconf-link-02.log.sync,
+// remote-noconf-link-repeat.log.sync>.
+func TestInitiatorBlessNoConf2(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-noconf-link-02.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 4 || st.oldHead != 2 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{"VeyronTab": 0}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 4 || st.resolvVal.Mutation.PriorVersion != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 2 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+
+ // Test simultaneous conflict resolution.
+ stream, err = createReplayStream("remote-noconf-link-repeat.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ st = s.hdlInitiator.updObjects[objid]
+ if st.isConflict {
+ t.Errorf("Detected a conflict %v", st)
+ }
+ if st.newHead != 4 || st.oldHead != 4 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronLaptop"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 4 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // No new log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 3 || s.log.head.Curorder != 3 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 4 {
+ t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
+ }
+}
+
+// TestInitiatorBlessConf tests that a local and a remote log record
+// stream can be correctly applied, when the conflict is resolved by a
+// blessing. Commands are in files
+// testdata/<local-init-00.sync,remote-conf-link.log.sync>.
+func TestInitiatorBlessConf(t *testing.T) {
+ dir, err := createTempDir()
+ if err != nil {
+ t.Errorf("Could not create tempdir %v", err)
+ }
+ // Set a large value to prevent the threads from firing.
+ // Test is not thread safe.
+ peerSyncInterval = 1 * time.Hour
+ garbageCollectInterval = 1 * time.Hour
+ s := NewSyncd("", "", "VeyronTab", dir, "", 0)
+
+ defer s.Close()
+ defer os.RemoveAll(dir)
+
+ if _, err = logReplayCommands(s.log, "local-init-00.sync"); err != nil {
+ t.Error(err)
+ }
+ stream, err := createReplayStream("remote-conf-link.log.sync")
+ if err != nil {
+ t.Fatalf("createReplayStream failed with err %v", err)
+ }
+
+ var minGens GenVector
+ if minGens, err = s.hdlInitiator.processLogStream(stream); err != nil {
+ t.Fatalf("processLogStream failed with err %v", err)
+ }
+ if err := s.hdlInitiator.detectConflicts(); err != nil {
+ t.Fatalf("detectConflicts failed with err %v", err)
+ }
+ // Check that there are no conflicts.
+ if len(s.hdlInitiator.updObjects) != 1 {
+ t.Errorf("Too many objects %v", len(s.hdlInitiator.updObjects))
+ }
+ objid, err := strToObjID("12345")
+ if err != nil {
+ t.Errorf("Could not create objid %v", err)
+ }
+ st := s.hdlInitiator.updObjects[objid]
+ if !st.isConflict {
+ t.Errorf("Didn't detect a conflict %v", st)
+ }
+ if st.newHead != 3 || st.oldHead != 2 || st.ancestor != 1 {
+ t.Errorf("Conflict detection didn't succeed %v", st)
+ }
+
+ if err := s.hdlInitiator.resolveConflicts(); err != nil {
+ t.Fatalf("resolveConflicts failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 3 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+
+ if err := s.hdlInitiator.updateStoreAndSync(nil, GenVector{}, minGens, GenVector{}, "VeyronPhone"); err != nil {
+ t.Fatalf("updateStoreAndSync failed with err %v", err)
+ }
+ if st.resolvVal.Mutation.Version != 3 || st.resolvVal.Mutation.PriorVersion != 2 {
+ t.Errorf("Mutation generation is not accurate %v", st)
+ }
+ // New log records should be added.
+ if s.log.head.Curgen != 1 || s.log.head.Curlsn != 4 || s.log.head.Curorder != 1 {
+ t.Errorf("Data mismatch in log header %v", s.log.head)
+ }
+ curRec, err := s.log.getLogRec(s.id, GenID(1), LSN(3))
+ if err != nil || curRec == nil {
+ t.Fatalf("GetLogRec() can not find object %s:1:3 in log file err %v",
+ s.id, err)
+ }
+ if curRec.ObjID != objid || curRec.RecType != LinkRec {
+ t.Errorf("Data mismatch in log record %v", curRec)
+ }
+ // Verify DAG state.
+ if head, err := s.dag.getHead(objid); err != nil || head != 3 {
t.Errorf("Invalid object %d head in DAG %s, err %v", objid, head, err)
}
}
diff --git a/runtimes/google/vsync/replay_test.go b/runtimes/google/vsync/replay_test.go
index 41e82cd..02cd561 100644
--- a/runtimes/google/vsync/replay_test.go
+++ b/runtimes/google/vsync/replay_test.go
@@ -22,6 +22,8 @@
addLocal = iota
addRemote
setDevTable
+ linkLocal
+ linkRemote
)
type syncCommand struct {
@@ -140,6 +142,38 @@
cmd := syncCommand{cmd: setDevTable, devID: DeviceID(args[1]), genVec: genVec}
cmds = append(cmds, cmd)
+ case "linkl", "linkr":
+ expNargs := 6
+ if nargs != expNargs {
+ return nil, fmt.Errorf("%s:%d: need %d args instead of %d", file, lineno, expNargs, nargs)
+ }
+
+ version, err := strToVersion(args[2])
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid version: %s", file, lineno, args[2])
+ }
+ if args[3] == "" {
+ return nil, fmt.Errorf("%s:%d: parent (to-node) version not specified", file, lineno)
+ }
+ if args[4] != "" {
+ return nil, fmt.Errorf("%s:%d: cannot specify a 2nd parent (to-node): %s", file, lineno, args[4])
+ }
+ parent, err := strToVersion(args[3])
+ if err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid parent (to-node) version: %s", file, lineno, args[3])
+ }
+
+ cmd := syncCommand{version: version, parents: []storage.Version{parent}, logrec: args[5]}
+ if args[0] == "linkl" {
+ cmd.cmd = linkLocal
+ } else {
+ cmd.cmd = linkRemote
+ }
+ if cmd.objID, err = strToObjID(args[1]); err != nil {
+ return nil, fmt.Errorf("%s:%d: invalid object ID: %s", file, lineno, args[1])
+ }
+ cmds = append(cmds, cmd)
+
default:
return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
}
@@ -171,6 +205,20 @@
return fmt.Errorf("cannot add remote node %d:%d to DAG: %v", cmd.objID, cmd.version, err)
}
dag.flush()
+
+ case linkLocal:
+ if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], false); err != nil {
+ return fmt.Errorf("cannot add local parent %d to DAG node %d:%d: %v",
+ cmd.parents[0], cmd.objID, cmd.version, err)
+ }
+ dag.flush()
+
+ case linkRemote:
+ if err = dag.addParent(cmd.objID, cmd.version, cmd.parents[0], true); err != nil {
+ return fmt.Errorf("cannot add remote parent %d to DAG node %d:%d: %v",
+ cmd.parents[0], cmd.objID, cmd.version, err)
+ }
+ dag.flush()
}
}
return nil
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index 783d23f..bd9356c 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -78,7 +78,7 @@
for _, cmd := range cmds {
switch cmd.cmd {
case addLocal:
- err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{})
+ err = log.processWatchRecord(cmd.objID, cmd.version, cmd.parents, &LogValue{Mutation: raw.Mutation{Version: cmd.version}})
if err != nil {
return nil, fmt.Errorf("cannot replay local log records %d:%s err %v",
cmd.objID, cmd.version, err)
@@ -121,25 +121,31 @@
stream := newStream()
for _, cmd := range cmds {
+ id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
+ if err != nil {
+ return nil, err
+ }
+ rec := LogRec{
+ DevID: id,
+ GNum: gnum,
+ LSN: lsn,
+ ObjID: cmd.objID,
+ CurVers: cmd.version,
+ Parents: cmd.parents,
+ Value: LogValue{
+ Mutation: raw.Mutation{Version: cmd.version},
+ },
+ }
+
switch cmd.cmd {
case addRemote:
- id, gnum, lsn, err := splitLogRecKey(cmd.logrec)
- if err != nil {
- return nil, err
- }
- rec := LogRec{
- DevID: id,
- GNum: gnum,
- LSN: lsn,
- ObjID: cmd.objID,
- CurVers: cmd.version,
- Parents: cmd.parents,
- Value: LogValue{
- Mutation: raw.Mutation{Version: cmd.version},
- },
- }
- stream.add(rec)
+ rec.RecType = NodeRec
+ case linkRemote:
+ rec.RecType = LinkRec
+ default:
+ return nil, err
}
+ stream.add(rec)
}
return stream, nil
diff --git a/runtimes/google/vsync/vsync.vdl b/runtimes/google/vsync/vsync.vdl
index 4de7c9c..aa06ec9 100644
--- a/runtimes/google/vsync/vsync.vdl
+++ b/runtimes/google/vsync/vsync.vdl
@@ -15,13 +15,21 @@
// GenVector is the generation vector.
type GenVector map[DeviceID]GenID
+const (
+ // NodeRec type log record adds a new node in the dag.
+ NodeRec = byte(0)
+ // LinkRec type log record adds a new link in the dag.
+ LinkRec = byte(1)
+)
+
// LogRec represents a single log record that is exchanged between two
// peers.
//
// It contains log related metadata: DevID is the id of the
// device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of log record.
//
// It also contains information relevant to the updates to an object
// in the store: ObjID is the id of the object that was
@@ -34,6 +42,7 @@
DevID DeviceID
GNum GenID
LSN LSN
+ RecType byte
// Object related information.
ObjID storage.ID
CurVers storage.Version
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index 2055734..e2120db 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -35,8 +35,9 @@
//
// It contains log related metadata: DevID is the id of the
// device that created the log record, GNum is the ID of the
-// generation that the log record is part of, and LSN is the log
-// sequence number of the log record in the generation GNum.
+// generation that the log record is part of, LSN is the log
+// sequence number of the log record in the generation GNum,
+// and RecType is the type of log record.
//
// It also contains information relevant to the updates to an object
// in the store: ObjID is the id of the object that was
@@ -46,9 +47,10 @@
// the object mutation.
type LogRec struct {
// Log related information.
- DevID DeviceID
- GNum GenID
- LSN LSN
+ DevID DeviceID
+ GNum GenID
+ LSN LSN
+ RecType byte
// Object related information.
ObjID storage.ID
CurVers storage.Version
@@ -71,6 +73,14 @@
Continue bool
}
+const (
+ // NodeRec type log record adds a new node in the dag.
+ NodeRec = byte(0)
+
+ // LinkRec type log record adds a new link in the dag.
+ LinkRec = byte(1)
+)
+
// Sync allows a device to GetDeltas from another device.
// Sync is the interface the client binds and uses.
// Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -303,6 +313,7 @@
_gen_wiretype.FieldType{Type: 0x41, Name: "DevID"},
_gen_wiretype.FieldType{Type: 0x42, Name: "GNum"},
_gen_wiretype.FieldType{Type: 0x45, Name: "LSN"},
+ _gen_wiretype.FieldType{Type: 0x46, Name: "RecType"},
_gen_wiretype.FieldType{Type: 0x47, Name: "ObjID"},
_gen_wiretype.FieldType{Type: 0x48, Name: "CurVers"},
_gen_wiretype.FieldType{Type: 0x49, Name: "Parents"},
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index a0801a7..43b2238 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -183,18 +183,9 @@
}
}
- // Flush the DAG and Log DBs. If the resume marker changed, update the device table and flush it.
- // TODO(rdaoud): this should be conditional, e.g. don't flush if Initiator is in-progress.
- // TODO(rdaoud): flushes can also be batched across multiple change-batches.
- w.syncd.dag.flush()
- if err := w.syncd.log.flush(); err != nil {
- return fmt.Errorf("cannot flush log DB: %s", err)
- }
+ // If the resume marker changed, update the device table.
if lastResmark != nil {
w.syncd.devtab.head.Resmark = lastResmark
- if err := w.syncd.devtab.flush(); err != nil {
- return fmt.Errorf("cannot flush device table DB: %s", err)
- }
}
return nil
diff --git a/services/mgmt/node/impl/invoker.go b/services/mgmt/node/impl/invoker.go
index e32ff53..3a3df2e 100644
--- a/services/mgmt/node/impl/invoker.go
+++ b/services/mgmt/node/impl/invoker.go
@@ -430,7 +430,7 @@
return errOperationFailed
}
// Wait for the child process to start.
- testTimeout := 2 * time.Second
+ testTimeout := 10 * time.Second
if err := handle.WaitForReady(testTimeout); err != nil {
vlog.Errorf("WaitForReady(%v) failed: %v", testTimeout, err)
if err := cmd.Process.Kill(); err != nil {
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index 1ef8692..225e6f4 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -106,13 +106,12 @@
return err
}
- // Retrieve the initial timestamp. Changes that occured at or before the
- // initial timestamp will not be sent.
resumeMarker := req.ResumeMarker
- initialTimestamp, err := resumeMarkerToTimestamp(resumeMarker)
+ filter, err := newChangeFilter(resumeMarker)
if err != nil {
return err
}
+
if isNowResumeMarker(resumeMarker) {
sendChanges(stream, []watch.Change{initialStateSkippedChange})
}
@@ -127,9 +126,13 @@
if err != nil {
return err
}
- err = processChanges(stream, changes, initialTimestamp, st.Timestamp())
- if err != nil {
+ timestamp := st.Timestamp()
+ if send, err := filter.shouldProcessChanges(timestamp); err != nil {
return err
+ } else if send {
+ if err := processChanges(stream, changes, timestamp); err != nil {
+ return err
+ }
}
for {
@@ -142,9 +145,13 @@
if err != nil {
return err
}
- err = processChanges(stream, changes, initialTimestamp, mu.Timestamp)
- if err != nil {
+ timestamp := mu.Timestamp
+ if send, err := filter.shouldProcessChanges(timestamp); err != nil {
return err
+ } else if send {
+ if err := processChanges(stream, changes, timestamp); err != nil {
+ return err
+ }
}
}
}
@@ -172,10 +179,78 @@
return newSyncProcessor(client)
}
-func processChanges(stream watch.WatcherServiceWatchStream, changes []watch.Change, initialTimestamp, timestamp uint64) error {
- if timestamp <= initialTimestamp {
- return nil
+type changeFilter interface {
+ // shouldProcessChanges determines whether to process changes with the given
+ // timestamp. Changes should appear in the sequence of the store log, and
+ // timestamps should be monotonically increasing.
+ shouldProcessChanges(timestamp uint64) (bool, error)
+}
+
+type baseFilter struct {
+ // initialTimestamp is the minimum timestamp of the first change sent.
+ initialTimestamp uint64
+ // crossedInitialTimestamp is true if a change with timestamp >=
+ // initialTimestamp has already been sent.
+ crossedInitialTimestamp bool
+}
+
+// onOrAfterFilter accepts any change with timestamp >= initialTimestamp.
+type onOrAfterFilter struct {
+ baseFilter
+}
+
+// onAndAfterFilter accepts any change with timestamp >= initialTimestamp, but
+// requires the first change to have timestamp = initialTimestamp.
+type onAndAfterFilter struct {
+ baseFilter
+}
+
+// newChangeFilter creates a changeFilter that processes changes only
+// at or after the requested resumeMarker.
+func newChangeFilter(resumeMarker []byte) (changeFilter, error) {
+ if len(resumeMarker) == 0 {
+ return &onOrAfterFilter{baseFilter{0, false}}, nil
}
+ if isNowResumeMarker(resumeMarker) {
+ // TODO(tilaks): Get the current resume marker from the log.g
+ return &onOrAfterFilter{baseFilter{uint64(time.Now().UnixNano()), false}}, nil
+ }
+ if len(resumeMarker) != 8 {
+ return nil, ErrUnknownResumeMarker
+ }
+ return &onAndAfterFilter{baseFilter{binary.BigEndian.Uint64(resumeMarker), false}}, nil
+}
+
+func (f *onOrAfterFilter) shouldProcessChanges(timestamp uint64) (bool, error) {
+ // Bypass checks if a change with timestamp >= initialTimestamp has already
+ // been sent.
+ if !f.crossedInitialTimestamp {
+ if timestamp < f.initialTimestamp {
+ return false, nil
+ }
+ }
+ f.crossedInitialTimestamp = true
+ return true, nil
+}
+
+func (f *onAndAfterFilter) shouldProcessChanges(timestamp uint64) (bool, error) {
+ // Bypass checks if a change with timestamp >= initialTimestamp has already
+ // been sent.
+ if !f.crossedInitialTimestamp {
+ if timestamp < f.initialTimestamp {
+ return false, nil
+ }
+ if timestamp > f.initialTimestamp {
+ return false, ErrUnknownResumeMarker
+ }
+ // TODO(tilaks): if the most recent timestamp in the log is less than
+ // initialTimestamp, return ErrUnknownResumeMarker.
+ }
+ f.crossedInitialTimestamp = true
+ return true, nil
+}
+
+func processChanges(stream watch.WatcherServiceWatchStream, changes []watch.Change, timestamp uint64) error {
addContinued(changes)
addResumeMarkers(changes, timestampToResumeMarker(timestamp))
return sendChanges(stream, changes)
@@ -209,20 +284,6 @@
return bytes.Equal(resumeMarker, nowResumeMarker)
}
-func resumeMarkerToTimestamp(resumeMarker []byte) (uint64, error) {
- if len(resumeMarker) == 0 {
- return 0, nil
- }
- if isNowResumeMarker(resumeMarker) {
- // TODO(tilaks): Get the current resume marker from the log.
- return uint64(time.Now().UnixNano()), nil
- }
- if len(resumeMarker) != 8 {
- return 0, ErrUnknownResumeMarker
- }
- return binary.BigEndian.Uint64(resumeMarker), nil
-}
-
func timestampToResumeMarker(timestamp uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, timestamp)
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 1ec7c10..cb60b0e 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -157,7 +157,7 @@
id1 := put(t, st, tr, "/", "val1")
commit(t, tr)
- post1 := st.Snapshot().Find(id1).Version
+ post11 := st.Snapshot().Find(id1).Version
if err := st.Close(); err != nil {
t.Fatalf("Close() failed: %v", err)
@@ -177,9 +177,9 @@
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
- pre1 := post1
- post1 = st.Snapshot().Find(id1).Version
- post2 := st.Snapshot().Find(id2).Version
+ pre21 := post11
+ post21 := st.Snapshot().Find(id1).Version
+ post22 := st.Snapshot().Find(id2).Version
// Start a watch request.
req := watch.Request{}
@@ -198,11 +198,22 @@
ws.Cancel()
ws.Finish()
- // Start a watch request after the initial state.
+ // Start a watch request at the initial state.
req = watch.Request{ResumeMarker: resumeMarker1}
ws = watchtesting.Watch(rootPublicID, w.Watch, req)
- // Check that watch detects the changes the transaction.
+ // Check that watch detects the changes in the state and the transaction.
+ cb, err = ws.Recv()
+ if err != nil {
+ t.Error("Recv() failed: %v", err)
+ }
+ changes = cb.Changes
+ change = changes[0]
+ if change.Continued {
+ t.Error("Expected change to be the last in this transaction")
+ }
+ expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+
cb, err = ws.Recv()
if err != nil {
t.Error("Recv() failed: %v", err)
@@ -216,8 +227,8 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+ expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
+ expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
}
func TestTransactionResumeMarker(t *testing.T) {
@@ -234,16 +245,16 @@
id1 := put(t, st, tr, "/", "val1")
commit(t, tr)
- post1 := st.Snapshot().Find(id1).Version
+ post11 := st.Snapshot().Find(id1).Version
// Put /a
tr = memstore.NewTransaction()
id2 := put(t, st, tr, "/a", "val2")
commit(t, tr)
- pre1 := post1
- post1 = st.Snapshot().Find(id1).Version
- post2 := st.Snapshot().Find(id2).Version
+ pre21 := post11
+ post21 := st.Snapshot().Find(id1).Version
+ post22 := st.Snapshot().Find(id2).Version
// Start a watch request.
req := watch.Request{}
@@ -262,10 +273,47 @@
ws.Cancel()
ws.Finish()
- // Start a watch request after the first transaction.
+ // Start a watch request at the first transaction.
req = watch.Request{ResumeMarker: resumeMarker1}
ws = watchtesting.Watch(rootPublicID, w.Watch, req)
+ // Check that watch detects the changes in the first and second transaction.
+ cb, err = ws.Recv()
+ if err != nil {
+ t.Error("Recv() failed: %v", err)
+ }
+ changes = cb.Changes
+ change = changes[0]
+ if change.Continued {
+ t.Error("Expected change to be the last in this transaction")
+ }
+ expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+
+ cb, err = ws.Recv()
+ if err != nil {
+ t.Error("Recv() failed: %v", err)
+ }
+ changes = cb.Changes
+ change = changes[0]
+ if !change.Continued {
+ t.Error("Expected change to continue the transaction")
+ }
+ change = changes[1]
+ if change.Continued {
+ t.Error("Expected change to be the last in this transaction")
+ }
+ resumeMarker2 := change.ResumeMarker
+ expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
+ expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+
+ // Cancel the watch request.
+ ws.Cancel()
+ ws.Finish()
+
+ // Start a watch request at the second transaction.
+ req = watch.Request{ResumeMarker: resumeMarker2}
+ ws = watchtesting.Watch(rootPublicID, w.Watch, req)
+
// Check that watch detects the changes in the second transaction.
cb, err = ws.Recv()
if err != nil {
@@ -280,8 +328,8 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+ expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
+ expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
}
func TestNowResumeMarker(t *testing.T) {
@@ -295,10 +343,15 @@
// Put /
tr := memstore.NewTransaction()
- id1 := put(t, st, tr, "/", "val1")
+ put(t, st, tr, "/", "val1")
commit(t, tr)
- post1 := st.Snapshot().Find(id1).Version
+ // Put /a
+ tr = memstore.NewTransaction()
+ id2 := put(t, st, tr, "/a", "val2")
+ commit(t, tr)
+
+ post22 := st.Snapshot().Find(id2).Version
// Start a watch request with the "now" resume marker.
req := watch.Request{ResumeMarker: nowResumeMarker}
@@ -307,6 +360,15 @@
// Give watch some time to pick "now".
time.Sleep(time.Second)
+ // Put /a/b
+ tr = memstore.NewTransaction()
+ id3 := put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ pre32 := post22
+ post32 := st.Snapshot().Find(id2).Version
+ post33 := st.Snapshot().Find(id3).Version
+
// Check that watch announces that the initial state was skipped.
cb, err := ws.Recv()
if err != nil {
@@ -316,16 +378,7 @@
change := changes[0]
expectInitialStateSkipped(t, change)
- // Put /a
- tr = memstore.NewTransaction()
- id2 := put(t, st, tr, "/a", "val2")
- commit(t, tr)
-
- pre1 := post1
- post1 = st.Snapshot().Find(id1).Version
- post2 := st.Snapshot().Find(id2).Version
-
- // Check that watch detects the changes in the second transaction.
+ // Check that watch detects the changes in the third transaction.
cb, err = ws.Recv()
if err != nil {
t.Error("Recv() failed: %v", err)
@@ -339,25 +392,40 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+ expectExists(t, changes, id2, pre32, post32, false, "val2", dir("b", id3))
+ expectExists(t, changes, id3, storage.NoVersion, post33, false, "val3", empty)
}
func TestUnknownResumeMarkers(t *testing.T) {
// Create a new store.
- dbName, _, cleanup := createStore(t)
+ dbName, st, cleanup := createStore(t)
defer cleanup()
// Create a new watcher.
w, cleanup := createWatcher(t, dbName)
defer cleanup()
- // Start a watch request with a resume marker that is too long.
- resumeMarker := make([]byte, 9)
+ // Put /
+ tr := memstore.NewTransaction()
+ put(t, st, tr, "/", "val1")
+ commit(t, tr)
+
+ // Start a watch request with a resume marker that's too early.
+ resumeMarker := timestampToResumeMarker(1)
req := watch.Request{ResumeMarker: resumeMarker}
ws := watchtesting.Watch(rootPublicID, w.Watch, req)
- // The resume marker should be too long.
+ // The resume marker should be unknown.
+ if err := ws.Finish(); err != ErrUnknownResumeMarker {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ // Start a watch request with a resume marker that's too late.
+ resumeMarker = timestampToResumeMarker(2 ^ 63 - 1)
+ req = watch.Request{ResumeMarker: resumeMarker}
+ ws = watchtesting.Watch(rootPublicID, w.Watch, req)
+
+ // The resume marker should be unknown.
if err := ws.Finish(); err != ErrUnknownResumeMarker {
t.Errorf("Unexpected error: %v", err)
}
diff --git a/tools/naming/simulator/clock.scr b/tools/naming/simulator/clock.scr
new file mode 100644
index 0000000..2df07af
--- /dev/null
+++ b/tools/naming/simulator/clock.scr
@@ -0,0 +1,66 @@
+#
+# Simple example to show how names work and are used both without
+# and with a mount table.
+#
+
+# A 'stand-alone' server, with an internal namespace of 'mt/...'
+clockServer "" mt clock
+set STAND_ALONE_CLOCK_NAME=$NAME
+
+set N=$STAND_ALONE_CLOCK_NAME
+print "Stand alone clock server at" $N
+time $N "Using $N"
+set N=/$ADDR//mt
+time $N "Using $N"
+
+# Run a root MountTable.
+rootMT mt
+set ROOT_ADDR=$MT_ADDR ROOT_NAME=$MT_NAME
+print ""
+print "Root MountTable at $ROOT_NAME"
+
+clockServer $MT_NAME clocks home_clock
+set CLOCK_NAME=/$ADDR//clocks
+print "Running Clock Server at $CLOCK_NAME"
+
+# Still bypassing the MountTable
+time $CLOCK_NAME bar
+
+# Now, let's use the MountTable
+setLocalRoots $ROOT_NAME
+
+set N=home_clock
+resolve $N
+print $N -> $R0
+
+set N=home_clock/clocks
+resolve $N
+print $N -> $R0
+
+set N=/$ROOT_ADDR/home_clock/clocks
+resolve $N
+print $N -> $R0
+
+set N=home_clock/clocks
+time $N "Using $N"
+set N=$ROOT_NAME/home_clock/clocks
+time $N "Using $N"
+
+# ls * returns home_clock
+ls *
+# ls ... returns "" and home_clock - i.e two items. Is this a bug?
+ls ...
+
+# These all behave as above
+lsmt $ROOT_NAME *
+lsmt $ROOT_NAME ...
+
+# Conclusion: some of this behaviour seems a little awkward. In particular:
+#
+# The client neeeds to use a different form of the name depending on whether
+# a MountTable is used or not. If a MountTable is not used, then the internal
+# 'suffix' (/mt) in the examples above must be used. If a MountTable is used
+# then the internal suffix must not be included in the name.
+#
+# ls ... seems to always return an extra, zero length, string as entry
+#
diff --git a/tools/naming/simulator/driver.go b/tools/naming/simulator/driver.go
index 4a48737..4206289 100644
--- a/tools/naming/simulator/driver.go
+++ b/tools/naming/simulator/driver.go
@@ -78,6 +78,9 @@
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "#") && len(line) > 0 {
+ if line == "eof" {
+ break
+ }
if err := process(line, lineno); err != nil {
if debug {
fmt.Printf("%d> %s: %v\n", lineno, line, err)
@@ -122,17 +125,17 @@
if factory == nil {
return fmt.Errorf("unrecognised command %q", name)
}
-
if vars, output, _, err := factory().Run(sub); err != nil {
return err
} else {
if debug || interactive {
+ fmt.Printf("%d> %s\n", lineno, line)
+ }
+ if len(output) > 0 {
if !interactive {
- fmt.Printf("%d> %s\n", lineno, line)
+ fmt.Printf("%d> ", lineno)
}
- if len(output) > 0 {
- fmt.Printf("%s\n", strings.Join(output, " "))
- }
+ fmt.Printf("%s\n", strings.Join(output, " "))
}
if debug && len(vars) > 0 {
for k, v := range vars {
diff --git a/tools/naming/simulator/mt.scr b/tools/naming/simulator/mt.scr
new file mode 100644
index 0000000..4ebbb1a
--- /dev/null
+++ b/tools/naming/simulator/mt.scr
@@ -0,0 +1,90 @@
+#
+# Example showing multiple mount tables, servers and globing
+#
+
+rootMT ""
+set ROOT_MT_NAME=$MT_NAME ROOT_MT_ADDR=$MT_ADDR
+set ROOT=$ROOT_MT_NAME
+
+nodeMT $ROOT "mt" "usa"
+set USA_MT=$MT_NAME
+
+nodeMT $USA_MT_NAME "mt" "palo alto"
+set PA_MT=$MT_NAME
+
+
+print "--------- Resolve ROOT, ROOT/usa, ROOT/usa/mt/palo alto"
+print "--------- Each should return a different address"
+resolve $ROOT
+resolve $ROOT/usa
+resolve "$ROOT/usa/mt/palo alto"
+
+print "--------- ResolveMT ROOT, ROOT/usa, ROOT/usa/mt - should return the ROOT MT"
+resolveMT $ROOT
+resolveMT $ROOT/usa
+resolveMT $ROOT/usa/mt
+
+print "--------- ResolveMT ROOT/usa/mt/palo alto, palo alto/mt should return the USA MT"
+resolveMT "$ROOT/usa/mt/palo alto"
+resolveMT "$ROOT/usa/mt/palo alto/mt"
+
+print "--------- ResolveMT ROOT/usa/mt/palo alto/mt/bar should return the palo alto MT"
+resolveMT "$ROOT/usa/mt/palo alto/mt/bar"
+
+# should return a complete hiearchy....
+setLocalRoots $ROOT
+print "--------- setLocalRoots to global root"
+print "--------- ls ..."
+ls ...
+print "--------- ls *"
+ls *
+
+print "--------- setLocalRoots to usa"
+setLocalRoots $USA_MT
+ls ...
+
+print "--------- setLocalRoots to palo alto"
+setLocalRoots $PA_MT
+ls ...
+
+nodeMT $ROOT "mt" "uk"
+set UK_MT=$MT_NAME
+
+nodeMT $UK_MT "mt" "cambridge"
+set CAM_MT=$MT_NAME
+
+setLocalRoots $ROOT
+ls ...
+setLocalRoots $UK_MT
+ls ...
+setLocalRoots $CAM_MT
+ls ...
+
+# Create a MountTable tree without using the internal 'mt' suffix as in the
+# examples above.
+nodeMT $ROOT "" "france"
+set FRANCE_MT=$MT_NAME
+nodeMT $FRANCE_MT "" "paris"
+setLocalRoots $ROOT
+ls ...
+
+# Conclusion: some of this behaviour seems a little awkward. In particular:
+#
+# ls (i.e. glob) ... on the root doesn't seem to iterate down the name
+# space hierarchy, I was under the impression that the local MountTable
+# client would do so? Well, it turns out that this only works if the
+# internal suffix (mt in the USA/UK trees, empty in the France tree) is
+# empty!!
+#
+# ls using the local MountTable on a rooted name doesn't seem to work either,
+# thus making it impossible to see the whole name space without setting the
+# local MountTable's root which will clearly cause problems for concurrent
+# clients.
+#
+# I find the distinction between the "local MountTable client" and
+# "remote MountTable or local MountTable server" awkward. Maybe we should
+# call the "local MountTable client" the "Resolver?".
+#
+
+
+