Merge "mojo/syncbase: Implementing ListApps and ListDatabases."
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 1cad267..4681bde 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -254,7 +254,6 @@
Flags: d.Flags,
Payload: d.Payload,
})
- f.conn.unopenedFlows.Done()
}
sent += size
@@ -262,6 +261,12 @@
// opened. Note that since we've definitely sent a message now opened is surely
// true.
f.conn.mu.Lock()
+ // We need to ensure that we only call Done() exactly once, so we need to
+ // recheck f.opened, to ensure that f.close didn't decrement the wait group
+ // while we were not holding the lock.
+ if !f.opened {
+ f.conn.unopenedFlows.Done()
+ }
f.opened = true
}
f.writing = false
@@ -435,6 +440,9 @@
if !f.opened {
// Closing a flow that was never opened.
f.conn.unopenedFlows.Done()
+ // We mark the flow as opened to prevent mulitple calls to
+ // f.conn.unopenedFlows.Done().
+ f.opened = true
} else if !closedRemotely && !connClosing {
// Note: If the conn is closing there is no point in trying to
// send the flow close message as it will fail. This is racy
diff --git a/runtime/internal/rpc/full_test.go b/runtime/internal/rpc/full_test.go
index bd1c492..e6f0cad 100644
--- a/runtime/internal/rpc/full_test.go
+++ b/runtime/internal/rpc/full_test.go
@@ -510,6 +510,36 @@
}
}
+func TestAddNameLater(t *testing.T) {
+ ctx, shutdown := initForTest()
+ defer shutdown()
+ sm := imanager.InternalNew(ctx, naming.FixedRoutingID(0x66666666))
+ defer sm.Shutdown()
+ ns := tnaming.NewSimpleNamespace()
+ server, err := testInternalNewServer(ctx, sm, ns, nil, options.SecurityNone)
+ if err != nil {
+ t.Fatalf("InternalNewServer failed: %v", err)
+ }
+ if _, err = server.Listen(listenSpec); err != nil {
+ t.Fatalf("server.Listen failed: %v", err)
+ }
+ disp := &testServerDisp{&testServer{}}
+ if err := server.ServeDispatcher("", disp); err != nil {
+ t.Fatalf("server.Serve failed: %v", err)
+ }
+ if err := server.AddName("mp/server"); err != nil {
+ t.Fatalf("server.AddName failed: %v", err)
+ }
+ client := DeprecatedNewClient(sm, ns)
+ var got string
+ if err := client.Call(ctx, "mp/server", "Unauthorized", nil, []interface{}{&got}, options.SecurityNone); err != nil {
+ t.Fatalf("client.Call failed: %v", err)
+ }
+ if want := "UnauthorizedResult"; got != want {
+ t.Errorf("got (%v), want (%v)", got, want)
+ }
+}
+
func TestNoPrincipal(t *testing.T) {
ctx, shutdown := initForTest()
defer shutdown()
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 7cb149d..e770aa5 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -909,12 +909,12 @@
}
vtrace.GetSpan(s.ctx).Annotate("Serving under name: " + name)
s.disp = disp
- if len(name) > 0 {
- for ls, _ := range s.listenState {
- for _, iep := range ls.ieps {
- s.publisher.AddServer(iep.String())
- }
+ for ls, _ := range s.listenState {
+ for _, iep := range ls.ieps {
+ s.publisher.AddServer(iep.String())
}
+ }
+ if len(name) > 0 {
s.publisher.AddName(name, s.servesMountTable, s.isLeaf)
}
return nil
diff --git a/services/syncbase/server/db_info_test.go b/services/syncbase/server/db_info_test.go
index 386cd4c..0268f82 100644
--- a/services/syncbase/server/db_info_test.go
+++ b/services/syncbase/server/db_info_test.go
@@ -14,7 +14,7 @@
dbName string
stKey string
}{
- {"app1", "db1", "$dbInfo\xfeapp1\xfedb1"},
+ {"app1", "db1", "i\xfeapp1\xfedb1"},
}
for _, test := range tests {
got, want := dbInfoStKey(&app{name: test.appName}, test.dbName), test.stKey
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index aa79fa5..7b695fb 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -17,14 +17,14 @@
// TODO(hpucha): These are not final yet. This is an intermediate step.
const (
- // NodeRec type log record adds a new node in the dag.
- NodeRec = byte(0)
+ // NodeRec type log record adds a new node in the dag.
+ NodeRec = byte(0)
- // LinkRec type log record adds a new link in the dag. Link records are
- // added when a conflict is resolved by picking the local or the remote
- // version as the resolution of a conflict, instead of creating a new
- // version.
- LinkRec = byte(1)
+ // LinkRec type log record adds a new link in the dag. Link records are
+ // added when a conflict is resolved by picking the local or the remote
+ // version as the resolution of a conflict, instead of creating a new
+ // version.
+ LinkRec = byte(1)
)
// PrefixGenVector is the generation vector for a data prefix, which maps each
@@ -44,26 +44,26 @@
// TODO(hpucha): Add readset/scanset. Look into sending tx metadata only once
// per transaction.
type LogRecMetadata struct {
- // Log related information.
- Id uint64 // device id that created the log record.
- Gen uint64 // generation number for the log record.
- RecType byte // type of log record.
+ // Log related information.
+ Id uint64 // device id that created the log record.
+ Gen uint64 // generation number for the log record.
+ RecType byte // type of log record.
- // Object related information.
+ // Object related information.
// Id of the object that was updated. This id is relative to Application
// and Database names and is the store key for a particular row in a
// table.
- ObjId string
- CurVers string // current version number of the object.
- Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
- UpdTime time.Time // timestamp when the update is generated.
- PermId string // id of the permissions object controlling this version.
- PermVers string // current version of the permissions object.
- Shell bool // true when the mutation data is hidden due to permissions.
- Delete bool // indicates whether the update resulted in object being deleted from the store.
- BatchId uint64 // unique id of the Batch this update belongs to.
- BatchCount uint64 // number of objects in the Batch.
+ ObjId string
+ CurVers string // current version number of the object.
+ Parents []string // 0, 1 or 2 parent versions that the current version is derived from.
+ UpdTime time.Time // timestamp when the update is generated.
+ PermId string // id of the permissions object controlling this version.
+ PermVers string // current version of the permissions object.
+ Shell bool // true when the mutation data is hidden due to permissions.
+ Delete bool // indicates whether the update resulted in object being deleted from the store.
+ BatchId uint64 // unique id of the Batch this update belongs to.
+ BatchCount uint64 // number of objects in the Batch.
}
// LogRec represents the on-wire representation of an entire log record: its
@@ -107,7 +107,7 @@
// DeltaReq contains a request to sync either data or syncgroup metadata for a
// Database.
type DeltaReq union {
- Sgs SgDeltaReq
+ Sgs SgDeltaReq
Data DataDeltaReq
}
@@ -116,7 +116,7 @@
// requesting deltas for that Database.
type DataDeltaReq struct {
AppName string
- DbName string
+ DbName string
SgIds set[GroupId]
InitVec GenVector
}
@@ -126,7 +126,7 @@
// requesting deltas for those syncgroups.
type SgDeltaReq struct {
AppName string
- DbName string
+ DbName string
InitVec GenVector // Contains a genvector per syncgroup.
}
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index 71f027b..d276923 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -106,10 +106,10 @@
if err != nil {
return nosqlwire.SyncgroupSpec{}, err
}
- prefixes := make([]nosqlwire.SyncgroupPrefix, len(mSpec.Prefixes))
+ prefixes := make([]nosqlwire.TableRow, len(mSpec.Prefixes))
for i, v := range mSpec.Prefixes {
prefixes[i].TableName = v.TableName
- prefixes[i].RowPrefix = v.RowPrefix
+ prefixes[i].Row = v.Row
}
return nosqlwire.SyncgroupSpec{
Description: mSpec.Description,
@@ -125,10 +125,10 @@
if err != nil {
return mojom.SyncgroupSpec{}, err
}
- prefixes := make([]mojom.SyncgroupPrefix, len(vSpec.Prefixes))
+ prefixes := make([]mojom.TableRow, len(vSpec.Prefixes))
for i, v := range vSpec.Prefixes {
prefixes[i].TableName = v.TableName
- prefixes[i].RowPrefix = v.RowPrefix
+ prefixes[i].Row = v.Row
}
return mojom.SyncgroupSpec{
Description: vSpec.Description,
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 23ab144..f6bd7f6 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -9,16 +9,22 @@
// preserve privacy.
import (
+ "bytes"
+ "fmt"
"path"
+ "reflect"
"sync"
+ "v.io/v23"
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/rpc"
+ "v.io/v23/security"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
"v.io/v23/verror"
"v.io/v23/vom"
+ "v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/clock"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/nosql"
@@ -46,7 +52,7 @@
// ServiceOptions configures a service.
type ServiceOptions struct {
- // Service-level permissions.
+ // Service-level permissions. Used only when creating a brand-new storage instance.
Perms access.Permissions
// Root dir for data storage.
RootDir string
@@ -54,12 +60,31 @@
Engine string
}
+// defaultPerms returns a permissions object that grants all permissions to the
+// provided blessing patterns.
+func defaultPerms(blessingPatterns []security.BlessingPattern) access.Permissions {
+ perms := access.Permissions{}
+ for _, tag := range access.AllTypicalTags() {
+ for _, bp := range blessingPatterns {
+ perms.Add(bp, string(tag))
+ }
+ }
+ return perms
+}
+
+// PermsString returns a JSON-based string representation of the permissions.
+func PermsString(perms access.Permissions) string {
+ var buf bytes.Buffer
+ if err := access.WritePermissions(&buf, perms); err != nil {
+ vlog.Errorf("Failed to serialize permissions %+v: %v", perms, err)
+ return fmt.Sprintf("[unserializable] %+v", perms)
+ }
+ return buf.String()
+}
+
// NewService creates a new service instance and returns it.
// TODO(sadovsky): If possible, close all stores when the server is stopped.
func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
- if opts.Perms == nil {
- return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
- }
st, err := util.OpenStore(opts.Engine, path.Join(opts.RootDir, opts.Engine), util.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
if err != nil {
return nil, err
@@ -69,13 +94,18 @@
opts: opts,
apps: map[string]*app{},
}
- data := &serviceData{
- Perms: opts.Perms,
- }
- if err := util.Get(ctx, st, s.stKey(), &serviceData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ var sd serviceData
+ if err := util.Get(ctx, st, s.stKey(), &sd); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return nil, err
}
+ readPerms := sd.Perms.Normalize()
+ if opts.Perms != nil {
+ if givenPerms := opts.Perms.Copy().Normalize(); !reflect.DeepEqual(givenPerms, readPerms) {
+ vlog.Infof("Warning: configured permissions will be ignored: %v", PermsString(givenPerms))
+ }
+ }
+ vlog.Infof("Using persisted permissions: %v", PermsString(readPerms))
// Service exists. Initialize in-memory data structures.
// Read all apps, populate apps map.
aIt := st.Scan(util.ScanPrefixArgs(util.AppPrefix, ""))
@@ -122,7 +152,16 @@
return nil, verror.New(verror.ErrInternal, ctx, err)
}
} else {
+ perms := opts.Perms
// Service does not exist.
+ if perms == nil {
+ vlog.Info("Permissions flag not set. Giving local principal all permissions.")
+ perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
+ }
+ vlog.Infof("Using permissions: %v", PermsString(perms))
+ data := &serviceData{
+ Perms: perms,
+ }
if err := util.Put(ctx, st, s.stKey(), data); err != nil {
return nil, err
}
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 272233e..fc0b6a5 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -10,21 +10,19 @@
// Constants related to storage engine keys.
// Note, these are persisted and therefore must not be modified.
-// TODO(sadovsky): Use one-byte strings. Changing these prefixes breaks various
-// tests. Tests generally shouldn't depend on the values of these constants.
const (
- AppPrefix = "$app"
- ClockPrefix = "$clock"
- DatabasePrefix = "$database"
- DbInfoPrefix = "$dbInfo"
- LogPrefix = "$log"
- PermsPrefix = "$perms"
- PermsIndexPrefix = "$iperms"
- RowPrefix = "$row"
- ServicePrefix = "$service"
- SyncPrefix = "$sync"
- TablePrefix = "$table"
- VersionPrefix = "$version"
+ AppPrefix = "a"
+ ClockPrefix = "c"
+ DatabasePrefix = "d"
+ DbInfoPrefix = "i"
+ LogPrefix = "l"
+ PermsPrefix = "p"
+ RowPrefix = "r"
+ ServicePrefix = "s"
+ TablePrefix = "t"
+ VersionPrefix = "v"
+ PermsIndexPrefix = "x"
+ SyncPrefix = "y"
// KeyPartSep is a separator for parts of storage engine keys, e.g. separating
// table name from row key.
diff --git a/services/syncbase/server/watchable/watcher.go b/services/syncbase/server/watchable/watcher.go
index ab24d81..80db6eb 100644
--- a/services/syncbase/server/watchable/watcher.go
+++ b/services/syncbase/server/watchable/watcher.go
@@ -146,6 +146,7 @@
}
func parseResumeMarker(resumeMarker string) (uint64, error) {
+ // See logEntryKey() for the structure of a resume marker key.
parts := util.SplitNKeyParts(resumeMarker, 2)
if len(parts) != 2 {
return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
diff --git a/services/syncbase/syncbased/main.go b/services/syncbase/syncbased/main.go
index ebe3fb7..c6a9e85 100644
--- a/services/syncbase/syncbased/main.go
+++ b/services/syncbase/syncbased/main.go
@@ -10,8 +10,6 @@
"v.io/v23"
"v.io/v23/context"
"v.io/v23/rpc"
- "v.io/v23/security"
- "v.io/v23/security/access"
"v.io/x/lib/vlog"
"v.io/x/ref/lib/security/securityflag"
_ "v.io/x/ref/runtime/factories/roaming"
@@ -24,18 +22,6 @@
engine = flag.String("engine", "leveldb", "Storage engine to use. Currently supported: memstore and leveldb.")
)
-// defaultPerms returns a permissions object that grants all permissions to the
-// provided blessing patterns.
-func defaultPerms(blessingPatterns []security.BlessingPattern) access.Permissions {
- perms := access.Permissions{}
- for _, tag := range access.AllTypicalTags() {
- for _, bp := range blessingPatterns {
- perms.Add(bp, string(tag))
- }
- }
- return perms
-}
-
// TODO(sadovsky): We return rpc.Server and rpc.Dispatcher as a quick hack to
// support Mojo.
func Serve(ctx *context.T) (rpc.Server, rpc.Dispatcher, func()) {
@@ -44,12 +30,8 @@
vlog.Fatal("securityflag.PermissionsFromFlag() failed: ", err)
}
if perms != nil {
- vlog.Info("Using perms from command line flag.")
- } else {
- vlog.Info("Perms flag not set. Giving local principal all perms.")
- perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
+ vlog.Infof("Read permissions from command line flag: %v", server.PermsString(perms))
}
- vlog.Infof("Perms: %v", perms)
service, err := server.NewService(ctx, nil, server.ServiceOptions{
Perms: perms,
RootDir: *rootDir,
@@ -65,6 +47,9 @@
if err != nil {
vlog.Fatal("v23.WithNewDispatchingServer() failed: ", err)
}
+ if eps := s.Status().Endpoints; len(eps) > 0 {
+ vlog.Info("Serving as: ", eps[0].Name())
+ }
if *name != "" {
vlog.Info("Mounted at: ", *name)
}
diff --git a/services/syncbase/vsync/constants.go b/services/syncbase/vsync/constants.go
new file mode 100644
index 0000000..ddefac5
--- /dev/null
+++ b/services/syncbase/vsync/constants.go
@@ -0,0 +1,26 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import "v.io/x/ref/services/syncbase/server/util"
+
+// Key prefixes for sync-related metadata.
+var (
+ dagNodePrefix = util.JoinKeyParts(util.SyncPrefix, "a")
+ dagHeadPrefix = util.JoinKeyParts(util.SyncPrefix, "b")
+ dagBatchPrefix = util.JoinKeyParts(util.SyncPrefix, "c")
+ dbssKey = util.JoinKeyParts(util.SyncPrefix, "d") // database sync state
+ sgIdPrefix = util.JoinKeyParts(util.SyncPrefix, "i") // syncgroup ID --> syncgroup local state
+ logPrefix = util.JoinKeyParts(util.SyncPrefix, "l") // log state
+ sgNamePrefix = util.JoinKeyParts(util.SyncPrefix, "n") // syncgroup name --> syncgroup ID
+ sgDataPrefix = util.JoinKeyParts(util.SyncPrefix, "s") // syncgroup (ID, version) --> syncgroup synced state
+)
+
+const (
+ // The sync log contains <logPrefix>:<logDataPrefix> records (for data) and
+ // <logPrefix>:<sgoid> records (for syncgroup metadata), where <logDataPrefix>
+ // is defined below, and <sgoid> is <sgDataPrefix>:<GroupId>.
+ logDataPrefix = "d"
+)
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 9739330..55b0043 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -723,7 +723,7 @@
// nodeKey returns the key used to access a DAG node (oid, version).
func nodeKey(oid, version string) string {
- return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "n", oid, version)
+ return util.JoinKeyParts(dagNodePrefix, oid, version)
}
// setNode stores the DAG node entry.
@@ -769,7 +769,7 @@
// headKey returns the key used to access the DAG object head.
func headKey(oid string) string {
- return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "h", oid)
+ return util.JoinKeyParts(dagHeadPrefix, oid)
}
// setHead stores version as the DAG object head.
@@ -798,7 +798,7 @@
// batchKey returns the key used to access the DAG batch info.
func batchKey(btid uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, dagPrefix, "b", fmt.Sprintf("%d", btid))
+ return util.JoinKeyParts(dagBatchPrefix, fmt.Sprintf("%d", btid))
}
// setBatch stores the DAG batch entry.
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index 8ec653b..05c841e 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -331,7 +331,7 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe11\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -407,10 +407,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe10\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe11\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
}
@@ -490,13 +490,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe10\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe11\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync\xfelog\xfedata\xfe10\xfe2" {
+ if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "y\xfel\xfed\xfe10\xfe2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -583,13 +583,13 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe10\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe11\xfe2" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe2" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync\xfelog\xfedata\xfe10\xfe2" {
+ if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "y\xfel\xfed\xfe10\xfe2" {
t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
}
@@ -670,10 +670,10 @@
oid, isConflict, newHead, oldHead, ancestor, errConflict)
}
- if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe10\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "y\xfel\xfed\xfe10\xfe3" {
t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
}
- if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync\xfelog\xfedata\xfe11\xfe3" {
+ if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "y\xfel\xfed\xfe11\xfe3" {
t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
}
@@ -989,11 +989,11 @@
}
// TestRemoteLinkedConflict tests sync of remote updates that contain linked
-// nodes (conflict resolution by selecting an existing version) on top of a local
-// initial state triggering a local conflict. An object is created locally and
-// updated twice (v1 -> v2 -> v3). Another device has along the way learned
+// nodes (conflict resolution by selecting an existing version) on top of a
+// local initial state triggering a local conflict. An object is created locally
+// and updated twice (v1 -> v2 -> v3). Another device has along the way learned
// about v1, created (v1 -> v4), then learned about (v1 -> v2) and resolved that
-// conflict by selecting v4 over v2. Now it sends that new info (v4 and the
+// conflict by selecting v4 over v2. Now it sends that new info (v4 and the
// v4/v2 link) back to the original (local) device which sees a v3/v4 conflict.
func TestRemoteLinkedConflict(t *testing.T) {
svc := createService(t)
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index d39590a..16b217b 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -1059,7 +1059,7 @@
if state.SyncPending {
curgv := genvec[rpfx]
res := curgv.Compare(state.PendingGenVec)
- vlog.VI(4).Infof("sync: updateSyncSt:: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
+ vlog.VI(4).Infof("sync: updateSyncSt: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
if res >= 0 {
state.SyncPending = false
if err := setSGIdEntry(ctx, iSt.tx, gid, state); err != nil {
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index fd37fb6..41f5272 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -402,7 +402,7 @@
Creator: "mockCreator",
SpecVersion: "etag-0",
Spec: wire.SyncgroupSpec{
- Prefixes: []wire.SyncgroupPrefix{{TableName: "foo", RowPrefix: ""}, {TableName: "bar", RowPrefix: ""}},
+ Prefixes: []wire.TableRow{{TableName: "foo", Row: ""}, {TableName: "bar", Row: ""}},
MountTables: []string{"1/2/3/4", "5/6/7/8"},
},
Joiners: map[string]wire.SyncgroupMemberInfo{
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index 2547966..02337fe 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -397,28 +397,21 @@
if len(parts) != 5 && len(parts) != 7 {
return "", 0, 0, verr
}
- if parts[0] != util.SyncPrefix || parts[1] != logPrefix {
+ if util.JoinKeyParts(parts[:2]...) != logPrefix {
return "", 0, 0, verr
}
var idStr, genStr, prefix string
- if parts[2] == logDataPrefix {
- if len(parts) != 5 {
+ if len(parts) == 5 {
+ if parts[2] != logDataPrefix {
return "", 0, 0, verr
}
- prefix = parts[2]
- idStr = parts[3]
- genStr = parts[4]
- } else {
- if len(parts) != 7 {
+ prefix, idStr, genStr = parts[2], parts[3], parts[4]
+ } else { // len(parts) == 7
+ if _, err := strconv.ParseUint(parts[4], 10, 64); err != nil { // GroupId
return "", 0, 0, verr
}
- prefix = util.JoinKeyParts(parts[2:5]...)
- if _, err := strconv.ParseUint(parts[4], 10, 64); err != nil {
- return "", 0, 0, verr
- }
- idStr = parts[5]
- genStr = parts[6]
+ prefix, idStr, genStr = util.JoinKeyParts(parts[2:5]...), parts[5], parts[6]
}
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
@@ -433,13 +426,14 @@
func TestSplitLogRecKey(t *testing.T) {
invalid := []string{
- "$sync\xfe100\xfebb",
- "log\xfe100\xfebb",
- "$sync\xfelog\xfedata\xfe100\xfexx",
- "$sync\xfelog\xfedata\xfeaa\xfebb",
- "$sync\xfelog\xfexx\xfe100\xfebb",
- "$sync\xfelog\xfedata\xfeaa\xfe100\xfebb",
- "$sync\xfelog\xfe$sync\xfesgd\xfexx\xfe100\xfebb",
+ "y\xfe100\xfebb",
+ "l\xfe100\xfebb",
+ "y\xfel\xfed\xfe100\xfexx",
+ "y\xfel\xfed\xfeaa\xfe100",
+ "y\xfel\xfex\xfe100\xfe100",
+ "y\xfel\xfed\xfe100",
+ "y\xfel\xfed\xfe100\xfe100\xfebb",
+ "y\xfel\xfey\xfes\xfexx\xfe100\xfe100",
}
for _, k := range invalid {
@@ -454,14 +448,14 @@
gen uint64
}{
{logDataPrefix, 10, 20},
- {"$sync\xfesgd\xfe2500", 190, 540},
- {"$sync\xfesgd\xfe4200", 9999, 999999},
+ {"y\xfes\xfe2500", 190, 540},
+ {"y\xfes\xfe4200", 9999, 999999},
}
for _, v := range valid {
gotPfx, gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.pfx, v.id, v.gen))
if gotPfx != v.pfx || gotId != v.id || gotGen != v.gen || err != nil {
- t.Fatalf("failed key conversion pfx got %v want %v, id got %v want %v, gen got %v want %v, err %v", gotPfx, v.pfx, gotId, v.id, gotGen, v.gen, err)
+ t.Fatalf("failed key conversion: pfx got %v want %v, id got %v want %v, gen got %v want %v, err %v", gotPfx, v.pfx, gotId, v.id, gotGen, v.gen, err)
}
}
}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index fe389d1..7b571c4 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -35,11 +35,10 @@
// the data log records, these log records are used to sync syncgroup metadata.
//
// The generations for the data mutations and mutations for each syncgroup are
-// in separate spaces. Data mutations in a Database start at gen 1, and
-// grow. Mutations for each syncgroup start at gen 1, and grow. Thus, for the
-// local data log records, the keys are of the form
-// $sync:log:data:<devid>:<gen>, and the keys for local syncgroup log record are
-// of the form $sync:log:<sgid>:<devid>:<gen>.
+// in separate spaces. Data mutations in a Database start at gen 1, and grow.
+// Mutations for each syncgroup start at gen 1, and grow. Thus, for the local
+// data log records, the keys are of the form y:l:d:<devid>:<gen>, and the keys
+// for local syncgroup log record are of the form y:l:<sgoid>:<devid>:<gen>.
// TODO(hpucha): Should this space be separate from the data or not? If it is
// not, it can provide consistency between data and syncgroup metadata. For
@@ -85,7 +84,7 @@
data *localGenInfoInMem // info for data.
// Info for syncgroups. The key here is the syncgroup oid of the form
- // $sync:sgd:<group id>. More details in syncgroup.go.
+ // y:s:<groupId>. More details in syncgroup.go.
sgs map[string]*localGenInfoInMem
// Note: Generation vector contains state from remote devices only.
@@ -126,8 +125,8 @@
// c) republish names in mount tables for all syncgroups.
// d) in-memory queue of syncgroups to be published.
func (s *syncService) initSync(ctx *context.T) error {
- vlog.VI(2).Infof("sync: initSync:: begin")
- defer vlog.VI(2).Infof("sync: initSync:: end")
+ vlog.VI(2).Infof("sync: initSync: begin")
+ defer vlog.VI(2).Infof("sync: initSync: end")
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -154,7 +153,7 @@
dsInMem.sggenvec = ds.SgGenVec
}
- vlog.VI(2).Infof("sync: initSync:: initing app %v db %v, dsInMem %v", appName, dbName, dsInMem)
+ vlog.VI(2).Infof("sync: initSync: initing app %v db %v, dsInMem %v", appName, dbName, dsInMem)
sgCount := 0
name := appDbName(appName, dbName)
@@ -201,13 +200,13 @@
}
info.checkptGen = info.gen - 1
- vlog.VI(4).Infof("sync: initSync:: initing app %v db %v sg %v info %v", appName, dbName, sgoid, info)
+ vlog.VI(4).Infof("sync: initSync: initing app %v db %v sg %v info %v", appName, dbName, sgoid, info)
return false
})
if sgCount == 0 {
- vlog.VI(2).Infof("sync: initSync:: initing app %v db %v done (no sgs found)", appName, dbName)
+ vlog.VI(2).Infof("sync: initSync: initing app %v db %v done (no sgs found)", appName, dbName)
return false
}
@@ -231,7 +230,7 @@
s.syncState[name] = dsInMem
- vlog.VI(2).Infof("sync: initSync:: initing app %v db %v done dsInMem %v (data %v)", appName, dbName, dsInMem, dsInMem.data)
+ vlog.VI(2).Infof("sync: initSync: initing app %v db %v done dsInMem %v (data %v)", appName, dbName, dsInMem, dsInMem.data)
return false
})
@@ -517,20 +516,15 @@
////////////////////////////////////////////////////////////
// Low-level utility functions to access sync state.
-// dbSyncStateKey returns the key used to access the sync state of a Database.
-func dbSyncStateKey() string {
- return util.JoinKeyParts(util.SyncPrefix, dbssPrefix)
-}
-
// putDbSyncState persists the sync state object for a given Database.
func putDbSyncState(ctx *context.T, tx store.Transaction, ds *dbSyncState) error {
- return util.Put(ctx, tx, dbSyncStateKey(), ds)
+ return util.Put(ctx, tx, dbssKey, ds)
}
// getDbSyncState retrieves the sync state object for a given Database.
func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
var ds dbSyncState
- if err := util.Get(ctx, st, dbSyncStateKey(), &ds); err != nil {
+ if err := util.Get(ctx, st, dbssKey, &ds); err != nil {
return nil, err
}
return &ds, nil
@@ -541,12 +535,12 @@
// logRecsPerDeviceScanPrefix returns the prefix used to scan log records for a particular device.
func logRecsPerDeviceScanPrefix(pfx string, id uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, logPrefix, pfx, fmt.Sprintf("%d", id))
+ return util.JoinKeyParts(logPrefix, pfx, fmt.Sprintf("%d", id))
}
// logRecKey returns the key used to access a specific log record.
func logRecKey(pfx string, id, gen uint64) string {
- return util.JoinKeyParts(util.SyncPrefix, logPrefix, pfx, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
+ return util.JoinKeyParts(logPrefix, pfx, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
}
// hasLogRec returns true if the log record for (devid, gen) exists.
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 243fc01..4ded8b1 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -125,10 +125,10 @@
prefixes := make(map[string]bool, len(spec.Prefixes))
for _, p := range spec.Prefixes {
if !pubutil.ValidTableName(p.TableName) {
- return verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("group has a SyncgroupPrefix with invalid table name %q", p.TableName))
+ return verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("group has a TableRow with invalid table name %q", p.TableName))
}
- if p.RowPrefix != "" && !pubutil.ValidRowKey(p.RowPrefix) {
- return verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("group has a SyncgroupPrefix with invalid row prefix %q", p.RowPrefix))
+ if p.Row != "" && !pubutil.ValidRowKey(p.Row) {
+ return verror.New(verror.ErrBadArg, ctx, fmt.Sprintf("group has a TableRow with invalid row prefix %q", p.Row))
}
prefixes[toTableRowPrefixStr(p)] = true
}
@@ -139,7 +139,7 @@
}
// samePrefixes returns true if the two sets of prefixes are the same.
-func samePrefixes(pfx1, pfx2 []wire.SyncgroupPrefix) bool {
+func samePrefixes(pfx1, pfx2 []wire.TableRow) bool {
pfxMap := make(map[string]uint8)
for _, p := range pfx1 {
pfxMap[toTableRowPrefixStr(p)] |= 0x01
@@ -419,7 +419,7 @@
// make forEachSyncgroup() stop the iteration earlier; otherwise the function
// loops across all Syncgroups in the Database.
func forEachSyncgroup(st store.StoreReader, callback func(*interfaces.Syncgroup) bool) {
- stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ stream := st.Scan(util.ScanPrefixArgs(sgNamePrefix, ""))
defer stream.Cancel()
for stream.Advance() {
@@ -497,47 +497,34 @@
// relationships.
// Use the functions above to manipulate syncgroups.
-var (
- // Prefixes used to store the different mappings of a syncgroup:
- // sgNameKeyPrefix: name --> ID
- // sgIdKeyPrefix: ID --> syncgroup local state
- // sgDataKeyPrefix: (ID, version) --> syncgroup data (synchronized)
- //
- // Note: as with other syncable objects, the DAG "heads" table contains
- // a reference to the current syncgroup version, and the DAG "nodes"
- // table tracks its history of mutations.
- sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
- sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
- sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
-)
+// Note: as with other syncable objects, the DAG "heads" table contains a
+// reference to the current syncgroup version, and the DAG "nodes" table tracks
+// its history of mutations.
// sgNameKey returns the key used to access the syncgroup name entry.
func sgNameKey(name string) string {
- return util.JoinKeyParts(sgNameKeyPrefix, name)
+ return util.JoinKeyParts(sgNamePrefix, name)
}
// sgIdKey returns the key used to access the syncgroup ID entry.
func sgIdKey(gid interfaces.GroupId) string {
- return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
+ return util.JoinKeyParts(sgIdPrefix, fmt.Sprintf("%d", gid))
}
// sgOID converts a group id into an oid string.
func sgOID(gid interfaces.GroupId) string {
- return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
+ return util.JoinKeyParts(sgDataPrefix, fmt.Sprintf("%d", gid))
}
-// sgID is approximately the inverse of sgOID: it converts an oid string into a
-// group id, but assumes that oid is prefixed with util.SyncPrefix (whereas
-// sgOID does not prepend util.SyncPrefix).
-// TODO(hpucha): Add unittests that cover sgOID/sgID (and other such helpers).
-// In CL v.io/c/16919, an incorrect change to the implementation of sgID was
-// only caught by integration tests.
+// sgID is the inverse of sgOID.
+// TODO(hpucha): Add unittests for sgOID/sgID and other such helpers. In CLs
+// v.io/c/16919 and v.io/c/17043, bugs in sgID were only caught by integration
+// tests.
func sgID(oid string) (interfaces.GroupId, error) {
- parts := util.SplitKeyParts(oid)
+ parts := util.SplitNKeyParts(oid, 3)
if len(parts) != 3 {
return 0, fmt.Errorf("invalid sgoid %s", oid)
}
-
id, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
return 0, err
@@ -547,7 +534,7 @@
// sgDataKey returns the key used to access a version of the syncgroup data.
func sgDataKey(gid interfaces.GroupId, version string) string {
- return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
+ return sgDataKeyByOID(sgOID(gid), version)
}
// sgDataKeyByOID returns the key used to access a version of the syncgroup
@@ -560,7 +547,7 @@
func splitSgNameKey(ctx *context.T, key string) (string, error) {
// Note that the actual syncgroup name may contain ":" as a separator.
// So don't split the key on the separator, instead trim its prefix.
- prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
+ prefix := util.JoinKeyParts(sgNamePrefix, "")
name := strings.TrimPrefix(key, prefix)
if name == key {
return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
@@ -618,11 +605,7 @@
// getSGDataEntry retrieves the syncgroup data for a given group ID and version.
func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.Syncgroup, error) {
- var sg interfaces.Syncgroup
- if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
- return nil, err
- }
- return &sg, nil
+ return getSGDataEntryByOID(ctx, st, sgOID(gid), version)
}
// getSGDataEntryByOID retrieves the syncgroup data for a given group OID and
@@ -845,7 +828,7 @@
}
// Scan all the syncgroup names found in the Database.
- stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+ stream := sn.Scan(util.ScanPrefixArgs(sgNamePrefix, ""))
var sgNames []string
var key []byte
for stream.Advance() {
@@ -1082,7 +1065,7 @@
// be time consuming. Consider doing it asynchronously and letting the server
// reply to the client earlier. However it must happen within the scope of this
// transaction (and its snapshot view).
-func (sd *syncDatabase) bootstrapSyncgroup(ctx *context.T, tx store.Transaction, sgId interfaces.GroupId, prefixes []wire.SyncgroupPrefix) error {
+func (sd *syncDatabase) bootstrapSyncgroup(ctx *context.T, tx store.Transaction, sgId interfaces.GroupId, prefixes []wire.TableRow) error {
if len(prefixes) == 0 {
return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
}
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 6975eff..c26ac2a 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -69,7 +69,7 @@
Creator: "mockCreator",
SpecVersion: "etag-0",
Spec: wire.SyncgroupSpec{
- Prefixes: []wire.SyncgroupPrefix{{TableName: "foo", RowPrefix: ""}, {TableName: "bar", RowPrefix: ""}},
+ Prefixes: []wire.TableRow{{TableName: "foo", Row: ""}, {TableName: "bar", Row: ""}},
},
Joiners: map[string]wire.SyncgroupMemberInfo{
"phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
@@ -210,7 +210,7 @@
Creator: "mockCreator",
SpecVersion: "etag-0",
Spec: wire.SyncgroupSpec{
- Prefixes: []wire.SyncgroupPrefix{{TableName: "foo", RowPrefix: ""}, {TableName: "bar", RowPrefix: ""}},
+ Prefixes: []wire.TableRow{{TableName: "foo", Row: ""}, {TableName: "bar", Row: ""}},
},
Joiners: map[string]wire.SyncgroupMemberInfo{
"phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
@@ -253,15 +253,15 @@
checkBadAddSyncgroup(t, st, sg, "SG w/o Prefixes")
sg = mkSg()
- sg.Spec.Prefixes = []wire.SyncgroupPrefix{{TableName: "foo", RowPrefix: ""}, {TableName: "bar", RowPrefix: ""}, {TableName: "foo", RowPrefix: ""}}
+ sg.Spec.Prefixes = []wire.TableRow{{TableName: "foo", Row: ""}, {TableName: "bar", Row: ""}, {TableName: "foo", Row: ""}}
checkBadAddSyncgroup(t, st, sg, "SG with duplicate Prefixes")
sg = mkSg()
- sg.Spec.Prefixes = []wire.SyncgroupPrefix{{TableName: "", RowPrefix: ""}}
+ sg.Spec.Prefixes = []wire.TableRow{{TableName: "", Row: ""}}
checkBadAddSyncgroup(t, st, sg, "SG with invalid (empty) table name")
sg = mkSg()
- sg.Spec.Prefixes = []wire.SyncgroupPrefix{{TableName: "a", RowPrefix: "\xfe"}}
+ sg.Spec.Prefixes = []wire.TableRow{{TableName: "a", Row: "\xfe"}}
checkBadAddSyncgroup(t, st, sg, "SG with invalid row prefix")
}
@@ -300,7 +300,7 @@
Creator: "mockCreator",
SpecVersion: "etag-0",
Spec: wire.SyncgroupSpec{
- Prefixes: []wire.SyncgroupPrefix{{TableName: "foo", RowPrefix: ""}, {TableName: "bar", RowPrefix: ""}},
+ Prefixes: []wire.TableRow{{TableName: "foo", Row: ""}, {TableName: "bar", Row: ""}},
},
Joiners: map[string]wire.SyncgroupMemberInfo{
"phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
@@ -385,7 +385,7 @@
SpecVersion: "etag-1",
Spec: wire.SyncgroupSpec{
MountTables: []string{"mt1"},
- Prefixes: []wire.SyncgroupPrefix{{TableName: "foo", RowPrefix: ""}},
+ Prefixes: []wire.TableRow{{TableName: "foo", Row: ""}},
},
Joiners: map[string]wire.SyncgroupMemberInfo{
"phone": wire.SyncgroupMemberInfo{SyncPriority: 10},
@@ -402,7 +402,7 @@
SpecVersion: "etag-2",
Spec: wire.SyncgroupSpec{
MountTables: []string{"mt2", "mt3"},
- Prefixes: []wire.SyncgroupPrefix{{TableName: "bar", RowPrefix: ""}},
+ Prefixes: []wire.TableRow{{TableName: "bar", Row: ""}},
},
Joiners: map[string]wire.SyncgroupMemberInfo{
"tablet": wire.SyncgroupMemberInfo{SyncPriority: 111},
@@ -568,14 +568,14 @@
// TestPrefixCompare tests the prefix comparison utility.
func TestPrefixCompare(t *testing.T) {
- mksgps := func(strs []string) []wire.SyncgroupPrefix {
- res := make([]wire.SyncgroupPrefix, len(strs))
+ mksgps := func(strs []string) []wire.TableRow {
+ res := make([]wire.TableRow, len(strs))
for i, v := range strs {
parts := strings.SplitN(v, ":", 2)
if len(parts) != 2 {
- t.Fatalf("invalid SyncgroupPrefix string: %s", v)
+ t.Fatalf("invalid TableRow string: %s", v)
}
- res[i] = wire.SyncgroupPrefix{TableName: parts[0], RowPrefix: parts[1]}
+ res[i] = wire.TableRow{TableName: parts[0], Row: parts[1]}
}
return res
}
diff --git a/services/syncbase/vsync/testdata/local-init-00.log.sync b/services/syncbase/vsync/testdata/local-init-00.log.sync
index 59fb856..543536f 100644
--- a/services/syncbase/vsync/testdata/local-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -1,6 +1,6 @@
# Create an object locally and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addl|tb\xfefoo1|1|||$sync\xfelog\xfedata\xfe10\xfe1|0|1|false
-addl|tb\xfefoo1|2|1||$sync\xfelog\xfedata\xfe10\xfe2|0|1|false
-addl|tb\xfefoo1|3|2||$sync\xfelog\xfedata\xfe10\xfe3|0|1|false
+addl|tb\xfefoo1|1|||y\xfel\xfed\xfe10\xfe1|0|1|false
+addl|tb\xfefoo1|2|1||y\xfel\xfed\xfe10\xfe2|0|1|false
+addl|tb\xfefoo1|3|2||y\xfel\xfed\xfe10\xfe3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
index 261f520..985885b 100644
--- a/services/syncbase/vsync/testdata/remote-conf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -3,6 +3,6 @@
# it from the local sync at v2, then updated separately).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|2||$sync\xfelog\xfedata\xfe11\xfe1|0|1|false
-addr|tb\xfefoo1|5|4||$sync\xfelog\xfedata\xfe11\xfe2|0|1|false
-addr|tb\xfefoo1|6|5||$sync\xfelog\xfedata\xfe11\xfe3|0|1|false
+addr|tb\xfefoo1|4|2||y\xfel\xfed\xfe11\xfe1|0|1|false
+addr|tb\xfefoo1|5|4||y\xfel\xfed\xfe11\xfe2|0|1|false
+addr|tb\xfefoo1|6|5||y\xfel\xfed\xfe11\xfe3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
index ee1b836..a2f230f 100644
--- a/services/syncbase/vsync/testdata/remote-conf-01.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -5,6 +5,6 @@
# sees 2 graft points: v1-v4 and v2-v5.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|1||$sync\xfelog\xfedata\xfe12\xfe1|0|1|false
-addr|tb\xfefoo1|5|2|4|$sync\xfelog\xfedata\xfe11\xfe1|0|1|false
-addr|tb\xfefoo1|6|5||$sync\xfelog\xfedata\xfe11\xfe2|0|1|false
+addr|tb\xfefoo1|4|1||y\xfel\xfed\xfe12\xfe1|0|1|false
+addr|tb\xfefoo1|5|2|4|y\xfel\xfed\xfe11\xfe1|0|1|false
+addr|tb\xfefoo1|6|5||y\xfel\xfed\xfe11\xfe2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-03.log.sync b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
index 8aed9dc..2a68ea7 100644
--- a/services/syncbase/vsync/testdata/remote-conf-03.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
@@ -1,6 +1,6 @@
# Create the same object remotely from scratch and update it twice (linked-list).
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|||$sync\xfelog\xfedata\xfe11\xfe1|0|1|false
-addr|tb\xfefoo1|5|4||$sync\xfelog\xfedata\xfe11\xfe2|0|1|false
-addr|tb\xfefoo1|6|5||$sync\xfelog\xfedata\xfe11\xfe3|0|1|false
+addr|tb\xfefoo1|4|||y\xfel\xfed\xfe11\xfe1|0|1|false
+addr|tb\xfefoo1|5|4||y\xfel\xfed\xfe11\xfe2|0|1|false
+addr|tb\xfefoo1|6|5||y\xfel\xfed\xfe11\xfe3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-link.log.sync b/services/syncbase/vsync/testdata/remote-conf-link.log.sync
index d3be9e4..10d083c 100644
--- a/services/syncbase/vsync/testdata/remote-conf-link.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-link.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the local version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|1||$sync\xfelog\xfe11\xfe1|0|1|false
-linkr|tb\xfefoo1|4|2||$sync\xfelog\xfe11\xfe2
+addr|tb\xfefoo1|4|1||logrec-01|0|1|false
+linkr|tb\xfefoo1|4|2||logrec-02
diff --git a/services/syncbase/vsync/testdata/remote-init-00.log.sync b/services/syncbase/vsync/testdata/remote-init-00.log.sync
index 1683d08..d77bf85 100644
--- a/services/syncbase/vsync/testdata/remote-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -2,7 +2,7 @@
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
# TODO(rdaoud): The above comment is incorrect for the 'genvec' line.
-addr|tb\xfefoo1|1|||$sync\xfelog\xfedata\xfe11\xfe1|0|1|false
-addr|tb\xfefoo1|2|1||$sync\xfelog\xfedata\xfe11\xfe2|0|1|false
-addr|tb\xfefoo1|3|2||$sync\xfelog\xfedata\xfe11\xfe3|0|1|false
+addr|tb\xfefoo1|1|||y\xfel\xfed\xfe11\xfe1|0|1|false
+addr|tb\xfefoo1|2|1||y\xfel\xfed\xfe11\xfe2|0|1|false
+addr|tb\xfefoo1|3|2||y\xfel\xfed\xfe11\xfe3|0|1|false
genvec|tb\xfefoo1|10:0,11:3|tb\xfebar|11:0
diff --git a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
index e54a5ec..4f0b2e4 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -4,7 +4,7 @@
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
# TODO(rdaoud): The above comment is incorrect for the 'genvec' line.
-addr|tb\xfefoo1|4|3||$sync\xfelog\xfedata\xfe11\xfe1|0|1|false
-addr|tb\xfefoo1|5|4||$sync\xfelog\xfedata\xfe11\xfe2|0|1|false
-addr|tb\xfefoo1|6|5||$sync\xfelog\xfedata\xfe11\xfe3|0|1|false
+addr|tb\xfefoo1|4|3||y\xfel\xfed\xfe11\xfe1|0|1|false
+addr|tb\xfefoo1|5|4||y\xfel\xfed\xfe11\xfe2|0|1|false
+addr|tb\xfefoo1|6|5||y\xfel\xfed\xfe11\xfe3|0|1|false
genvec|tb\xfefoo1|10:0,11:3|tb\xfebar|11:0
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
index 11e0df5..c9b6b25 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the remote version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|1||$sync\xfelog\xfe11\xfe1|0|1|false
-linkr|tb\xfefoo1|2|4||$sync\xfelog\xfe11\xfe2
+addr|tb\xfefoo1|4|1||logrec-01|0|1|false
+linkr|tb\xfefoo1|2|4||logrec-02
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
index 840514c..3606a13 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
@@ -1,5 +1,5 @@
# Update an object remotely, detect conflict, and bless the local version.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|1||$sync\xfelog\xfe11\xfe1|0|1|false
-linkr|tb\xfefoo1|4|3||$sync\xfelog\xfe11\xfe2
+addr|tb\xfefoo1|4|1||logrec-01|0|1|false
+linkr|tb\xfefoo1|4|3||logrec-02
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
index cd42033..a1db71a 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
@@ -1,6 +1,6 @@
# Update an object remotely, detect conflict, and bless the remote version, and continue updating.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-addr|tb\xfefoo1|4|1||$sync\xfelog\xfe11\xfe1|0|1|false
-linkr|tb\xfefoo1|3|4||$sync\xfelog\xfe11\xfe2
-addr|tb\xfefoo1|5|3||$sync\xfelog\xfe11\xfe3|0|1|false
+addr|tb\xfefoo1|4|1||logrec-01|0|1|false
+linkr|tb\xfefoo1|3|4||logrec-02
+addr|tb\xfefoo1|5|3||logrec-03|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync b/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
index cbcdd58..6c08648 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
@@ -1,4 +1,4 @@
# Resolve the same conflict on two different devices.
# The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-linkr|tb\xfefoo1|3|4||$sync\xfelog\xfe12\xfe1
+linkr|tb\xfefoo1|3|4||logrec-01
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 419d59a..12f8cbf 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -8,17 +8,6 @@
"v.io/x/ref/services/syncbase/server/interfaces"
)
-// Key prefixes for sync data structures. All these prefixes are prepended with
-// util.SyncPrefix.
-const (
- logPrefix = "log" // log state.
- logDataPrefix = "data" // data log state.
- dbssPrefix = "dbss" // database sync state.
- dagPrefix = "dag" // dag state.
- sgPrefix = "sg" // local syncgroup state.
- sgDataPrefix = "sgd" // synced syncgroup state.
-)
-
// syncData represents the persistent state of the sync module.
type syncData struct {
Id uint64
@@ -105,7 +94,7 @@
// - The total count of batch objects, including non-syncable ones.
// TODO(rdaoud): add support to track the read and scan sets.
type batchInfo struct {
- Objects map[string]string
+ Objects map[string]string
LinkedObjects map[string]string
- Count uint64
+ Count uint64
}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index c61e061..ec04be8 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -141,15 +141,3 @@
vdl.Register((*dagNode)(nil))
vdl.Register((*batchInfo)(nil))
}
-
-const logPrefix = "log" // log state.
-
-const logDataPrefix = "data" // data log state.
-
-const dbssPrefix = "dbss" // database sync state.
-
-const dagPrefix = "dag" // dag state.
-
-const sgPrefix = "sg" // local syncgroup state.
-
-const sgDataPrefix = "sgd" // synced syncgroup state.
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index 4842a0c..b41887c 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -90,17 +90,17 @@
return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec)
}
-// toTableRowPrefixStr converts a SyncgroupPrefix (tableName-rowPrefix pair) to
-// a string of the form used for storing perms and row data in the underlying
-// storage engine.
-func toTableRowPrefixStr(p wire.SyncgroupPrefix) string {
- return util.JoinKeyParts(p.TableName, p.RowPrefix)
+// toTableRowPrefixStr converts a TableRow (table name and row key or prefix
+// pair) to a string of the form used for storing perms and row data in the
+// underlying storage engine.
+func toTableRowPrefixStr(p wire.TableRow) string {
+ return util.JoinKeyParts(p.TableName, p.Row)
}
// toRowKey prepends RowPrefix to what is presumably a "<table>:<row>" string,
// yielding a storage engine key for a row.
// TODO(sadovsky): Only used by CR code. Should go away once CR stores table
-// name and row key as separate fields in a "TableAndRow" struct.
+// name and row key as separate fields in a "TableRow" struct.
func toRowKey(tableRow string) string {
return util.JoinKeyParts(util.RowPrefix, tableRow)
}
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index c61ff55..e43d8d1 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -203,7 +203,7 @@
if err != nil {
// TODO(rdaoud): don't crash, quarantine this app database.
- vlog.Fatalf("sync: processWatchLogBatch:: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
+ vlog.Fatalf("sync: processWatchLogBatch: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
}
}
diff --git a/services/syncbase/vsync/watcher_test.go b/services/syncbase/vsync/watcher_test.go
index 66a6c55..86d9bb2 100644
--- a/services/syncbase/vsync/watcher_test.go
+++ b/services/syncbase/vsync/watcher_test.go
@@ -306,7 +306,7 @@
// batch is an application batch with 3 keys of which 2 are syncable.
// The 3rd batch is also a syncgroup snapshot.
count := 0
- start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.SyncPrefix, "dag", "b"), "")
+ start, limit := util.ScanPrefixArgs(dagBatchPrefix, "")
stream := st.Scan(start, limit)
for stream.Advance() {
count++
@@ -323,6 +323,6 @@
}
}
if count != 1 {
- t.Errorf("wrong count of batches: got %d instead of 2", count)
+ t.Errorf("wrong count of batches: got %d instead of 1", count)
}
}