Merge "syncbase/store: store in-flight mutations of a transaction in a ptrie"
diff --git a/cmd/sb51/internal/demodb/db.go b/cmd/sb51/internal/demodb/db.go
index 88b84f5..0429b4f 100644
--- a/cmd/sb51/internal/demodb/db.go
+++ b/cmd/sb51/internal/demodb/db.go
@@ -107,6 +107,37 @@
},
},
},
+ table{
+ name: "Students",
+ rows: []kv{
+ kv{
+ "1",
+ vdl.ValueOf(Student{Name: "John Smith", TestTime: t("Jul 22 12:34:56 PDT 2015"), Score: ActOrSatScoreActScore{Value: 36}}),
+ },
+ kv{
+ "2",
+ vdl.ValueOf(Student{Name: "Mary Jones", TestTime: t("Sep 4 01:23:45 PDT 2015"), Score: ActOrSatScoreSatScore{Value: 1200}}),
+ },
+ },
+ },
+ table{
+ name: "AnythingGoes",
+ rows: []kv{
+ kv{
+ "bar",
+ vdl.ValueOf(AnythingGoes{NameOfType: "Student", Anything: vdl.ValueOf(Student{Name: "John Smith", Score: ActOrSatScoreActScore{Value: 36}})}),
+ },
+ kv{
+ "baz",
+ vdl.ValueOf(AnythingGoes{NameOfType: "Customer", Anything: vdl.ValueOf(Customer{"Bat Masterson", 2, true, AddressInfo{"777 Any St.", "Collins", "IA", "50055"}, CreditReport{Agency: CreditAgencyTransUnion, Report: AgencyReportTransUnionReport{TransUnionCreditReport{80}}}})}),
+ },
+ },
+ },
+}
+
+// t converts timeStr, written in the layout "Jan 2 15:04:05 MST 2006", into a
+// time.Time. The error returned by time.Parse is discarded, so a string that
+// does not match the layout yields the zero time.Time.
+func t(timeStr string) time.Time {
+	parsed, _ := time.Parse("Jan 2 15:04:05 MST 2006", timeStr)
+	return parsed
}
// Creates demo tables in the provided database. Tables are destroyed and
diff --git a/cmd/sb51/internal/demodb/db_objects.vdl b/cmd/sb51/internal/demodb/db_objects.vdl
index cbf119a..eb32dcc 100644
--- a/cmd/sb51/internal/demodb/db_objects.vdl
+++ b/cmd/sb51/internal/demodb/db_objects.vdl
@@ -113,3 +113,19 @@
Maybe ?Times
Rec map[Array2String]Recursive
}
+
+type ActOrSatScore union {
+ ActScore uint16
+ SatScore uint16
+}
+
+type Student struct {
+ Name string
+ TestTime time.Time
+ Score ActOrSatScore
+}
+
+type AnythingGoes struct {
+ NameOfType string
+ Anything any
+}
diff --git a/cmd/sb51/internal/demodb/db_objects.vdl.go b/cmd/sb51/internal/demodb/db_objects.vdl.go
index ebdda02..6298c30 100644
--- a/cmd/sb51/internal/demodb/db_objects.vdl.go
+++ b/cmd/sb51/internal/demodb/db_objects.vdl.go
@@ -360,6 +360,64 @@
}) {
}
+type (
+ // ActOrSatScore represents any single field of the ActOrSatScore union type.
+ ActOrSatScore interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the ActOrSatScore union type.
+ __VDLReflect(__ActOrSatScoreReflect)
+ }
+ // ActOrSatScoreActScore represents field ActScore of the ActOrSatScore union type.
+ ActOrSatScoreActScore struct{ Value uint16 }
+ // ActOrSatScoreSatScore represents field SatScore of the ActOrSatScore union type.
+ ActOrSatScoreSatScore struct{ Value uint16 }
+ // __ActOrSatScoreReflect describes the ActOrSatScore union type.
+ __ActOrSatScoreReflect struct {
+ Name string `vdl:"v.io/x/ref/cmd/sb51/internal/demodb.ActOrSatScore"`
+ Type ActOrSatScore
+ Union struct {
+ ActScore ActOrSatScoreActScore
+ SatScore ActOrSatScoreSatScore
+ }
+ }
+)
+
+func (x ActOrSatScoreActScore) Index() int { return 0 }
+func (x ActOrSatScoreActScore) Interface() interface{} { return x.Value }
+func (x ActOrSatScoreActScore) Name() string { return "ActScore" }
+func (x ActOrSatScoreActScore) __VDLReflect(__ActOrSatScoreReflect) {}
+
+func (x ActOrSatScoreSatScore) Index() int { return 1 }
+func (x ActOrSatScoreSatScore) Interface() interface{} { return x.Value }
+func (x ActOrSatScoreSatScore) Name() string { return "SatScore" }
+func (x ActOrSatScoreSatScore) __VDLReflect(__ActOrSatScoreReflect) {}
+
+type Student struct {
+ Name string
+ TestTime time.Time
+ Score ActOrSatScore
+}
+
+func (Student) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/cmd/sb51/internal/demodb.Student"`
+}) {
+}
+
+type AnythingGoes struct {
+ NameOfType string
+ Anything *vdl.Value
+}
+
+func (AnythingGoes) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/cmd/sb51/internal/demodb.AnythingGoes"`
+}) {
+}
+
func init() {
vdl.Register((*AddressInfo)(nil))
vdl.Register((*CreditAgency)(nil))
@@ -380,4 +438,7 @@
vdl.Register((*Composite)(nil))
vdl.Register((*Times)(nil))
vdl.Register((*Recursive)(nil))
+ vdl.Register((*ActOrSatScore)(nil))
+ vdl.Register((*Student)(nil))
+ vdl.Register((*AnythingGoes)(nil))
}
diff --git a/cmd/sb51/shell.go b/cmd/sb51/shell.go
index fb28d3c..e97419a 100644
--- a/cmd/sb51/shell.go
+++ b/cmd/sb51/shell.go
@@ -82,7 +82,7 @@
}
if flagMakeDemoTables {
- if err := makeDemoDB(ctx, d); err != nil {
+ if err := makeDemoDB(ctx, env.Stdout, d); err != nil {
return err
}
}
@@ -122,7 +122,7 @@
err = dumpDB(ctx, env.Stdout, d)
case "make-demo":
// TODO(jkline): add an "Are you sure prompt" to give the user a 2nd chance.
- err = makeDemoDB(ctx, d)
+ err = makeDemoDB(ctx, env.Stdout, d)
case "select":
err = queryExec(ctx, env.Stdout, d, q)
default:
@@ -191,8 +191,10 @@
return nil
}
-func makeDemoDB(ctx *context.T, d nosql.Database) error {
- if err := demodb.PopulateDemoDB(ctx, d); err != nil {
+func makeDemoDB(ctx *context.T, w io.Writer, d nosql.Database) error {
+ if err := demodb.PopulateDemoDB(ctx, d); err == nil {
+ fmt.Fprintln(w, "Demo tables created and populated.")
+ } else {
return fmt.Errorf("failed making demo tables: %v", err)
}
return nil
diff --git a/lib/discovery/advertise.go b/lib/discovery/advertise.go
index 3930f63..5abdbe4 100644
--- a/lib/discovery/advertise.go
+++ b/lib/discovery/advertise.go
@@ -25,15 +25,13 @@
if len(service.InterfaceName) == 0 {
return verror.New(errNoInterfaceName, ctx)
}
- if !IsAttributePackable(service.Attrs) {
- return verror.New(errNotPackableAttributes, ctx)
- }
if len(service.Addrs) == 0 {
return verror.New(errNoAddresses, ctx)
}
- if !IsAddressPackable(service.Addrs) {
- return verror.New(errNotPackableAddresses, ctx)
+ if err := validateAttributes(service.Attrs); err != nil {
+ return err
}
+
if len(service.InstanceUuid) == 0 {
service.InstanceUuid = NewInstanceUUID()
}
diff --git a/lib/discovery/discovery.go b/lib/discovery/discovery.go
index 4f2210b..e260074 100644
--- a/lib/discovery/discovery.go
+++ b/lib/discovery/discovery.go
@@ -36,7 +36,7 @@
Lost bool
}
-type EncryptionAlgorithm byte
+type EncryptionAlgorithm int
type EncryptionKey []byte
const (
diff --git a/lib/discovery/discovery_test.go b/lib/discovery/discovery_test.go
index 3875fa2..fe0b55f 100644
--- a/lib/discovery/discovery_test.go
+++ b/lib/discovery/discovery_test.go
@@ -5,6 +5,7 @@
package discovery_test
import (
+ "bytes"
"fmt"
"reflect"
"runtime"
@@ -83,18 +84,12 @@
for _, want := range wants {
matched := false
for i, update := range updates {
- var service discovery.Service
switch u := update.(type) {
case discovery.UpdateFound:
- if !lost {
- service = u.Value.Service
- }
+ matched = !lost && reflect.DeepEqual(u.Value.Service, want)
case discovery.UpdateLost:
- if lost {
- service = u.Value.Service
- }
+ matched = lost && bytes.Equal(u.Value.InstanceUuid, want.InstanceUuid)
}
- matched = reflect.DeepEqual(service, want)
if matched {
updates = append(updates[:i], updates[i+1:]...)
break
diff --git a/lib/discovery/encoding.go b/lib/discovery/encoding.go
index 0f27651..d58b315 100644
--- a/lib/discovery/encoding.go
+++ b/lib/discovery/encoding.go
@@ -6,91 +6,102 @@
import (
"bytes"
- "fmt"
+ "encoding/binary"
+ "errors"
+ "io"
"strings"
"v.io/v23/discovery"
)
-// TODO(jhahn): Figure out how to overcome the size limit.
-
-// isAttributePackage returns false if the provided attributes cannot be serialized safely.
-func IsAttributePackable(attrs discovery.Attributes) bool {
- for k, v := range attrs {
- if strings.HasPrefix(k, "_") || strings.Contains(k, "=") {
- return false
+// validateAttributes returns an error if the attributes are not suitable for advertising.
+func validateAttributes(attrs discovery.Attributes) error {
+ for k, _ := range attrs {
+ if len(k) == 0 {
+ return errors.New("empty key")
}
- if len(k)+len(v) > 254 {
- return false
+ if strings.HasPrefix(k, "_") {
+ return errors.New("key starts with '_'")
+ }
+ for _, c := range k {
+ if c < 0x20 || c > 0x7e {
+ return errors.New("key is not printable US-ASCII")
+ }
+ if c == '=' {
+ return errors.New("key includes '='")
+ }
}
}
- return true
+ return nil
}
-// IsAddressPackable returns false if any address is larger than 250 bytes.
-//
-// go-mdns-sd package limits the size of each txt record to 255 bytes. We use
-// 5 bytes for tag, so we limit the address to 250 bytes.
-func IsAddressPackable(addrs []string) bool {
- for _, a := range addrs {
- if len(a) > 250 {
- return false
- }
- }
- return true
-}
-
-// PackAddresses packs addresses into a byte slice. If any address exceeds
-// 255 bytes, it will panic.
+// PackAddresses packs addresses into a byte slice.
func PackAddresses(addrs []string) []byte {
- var b bytes.Buffer
+ var buf bytes.Buffer
for _, a := range addrs {
- n := len(a)
- if n > 255 {
- panic(fmt.Sprintf("too large address %d: %s", n, a))
- }
- b.WriteByte(byte(n))
- b.WriteString(a)
+ writeInt(&buf, len(a))
+ buf.WriteString(a)
}
- return b.Bytes()
+ return buf.Bytes()
}
// UnpackAddresses unpacks addresses from a byte slice.
-func UnpackAddresses(data []byte) []string {
- addrs := []string{}
- for off := 0; off < len(data); {
- n := int(data[off])
- off++
- addrs = append(addrs, string(data[off:off+n]))
- off += n
- }
- return addrs
-}
-
-// PackEncryptionKeys packs keys into a byte slice.
-func PackEncryptionKeys(algo EncryptionAlgorithm, keys []EncryptionKey) []byte {
- var b bytes.Buffer
- b.WriteByte(byte(algo))
- for _, k := range keys {
- n := len(k)
- if n > 255 {
- panic(fmt.Sprintf("too large key %d", n))
+func UnpackAddresses(data []byte) ([]string, error) {
+ var addrs []string
+ for r := bytes.NewBuffer(data); r.Len() > 0; {
+ n, err := readInt(r)
+ if err != nil {
+ return nil, err
}
- b.WriteByte(byte(n))
- b.Write(k)
+ b := r.Next(n)
+ if len(b) != n {
+ return nil, errors.New("invalid addresses")
+ }
+ addrs = append(addrs, string(b))
}
- return b.Bytes()
+ return addrs, nil
}
-// UnpackEncryptionKeys unpacks keys from a byte slice.
-func UnpackEncryptionKeys(data []byte) (EncryptionAlgorithm, []EncryptionKey) {
- algo := EncryptionAlgorithm(data[0])
- keys := []EncryptionKey{}
- for off := 1; off < len(data); {
- n := int(data[off])
- off++
- keys = append(keys, EncryptionKey(data[off:off+n]))
- off += n
+// PackEncryptionKeys packs encryption algorithm and keys into a byte slice.
+func PackEncryptionKeys(algo EncryptionAlgorithm, keys []EncryptionKey) []byte {
+ var buf bytes.Buffer
+ writeInt(&buf, int(algo))
+ for _, k := range keys {
+ writeInt(&buf, len(k))
+ buf.Write(k)
}
- return algo, keys
+ return buf.Bytes()
+}
+
+// UnpackEncryptionKeys decodes data as an encryption algorithm followed by
+// zero or more length-prefixed keys, each integer being read with readInt.
+// On any failure it returns NoEncryption, a nil key list and the error.
+// The returned keys are sub-slices of data rather than copies.
+func UnpackEncryptionKeys(data []byte) (EncryptionAlgorithm, []EncryptionKey, error) {
+	r := bytes.NewBuffer(data)
+	rawAlgo, err := readInt(r)
+	if err != nil {
+		return NoEncryption, nil, err
+	}
+	var keys []EncryptionKey
+	for r.Len() > 0 {
+		size, err := readInt(r)
+		if err != nil {
+			return NoEncryption, nil, err
+		}
+		// Next returns fewer bytes than requested when the input is truncated.
+		key := r.Next(size)
+		if len(key) != size {
+			return NoEncryption, nil, errors.New("invalid encryption keys")
+		}
+		keys = append(keys, EncryptionKey(key))
+	}
+	return EncryptionAlgorithm(rawAlgo), keys, nil
+}
+
+// writeInt writes x to w as an unsigned varint (binary.PutUvarint applied to
+// uint64(x)). The result of w.Write is not checked.
+func writeInt(w io.Writer, x int) {
+	var scratch [binary.MaxVarintLen64]byte
+	size := binary.PutUvarint(scratch[:], uint64(x))
+	w.Write(scratch[:size])
+}
+
+// readInt reads one unsigned varint from r and returns it as an int.
+//
+// It returns the error from binary.ReadUvarint unchanged, and an error of its
+// own when the decoded value does not fit in a non-negative int. The callers
+// in this file hand the result to bytes.Buffer.Next as a byte count, and a
+// negative count would make that call panic on malformed input.
+func readInt(r io.ByteReader) (int, error) {
+	x, err := binary.ReadUvarint(r)
+	if err != nil {
+		return 0, err
+	}
+	n := int(x)
+	// n < 0 catches wrap-around on 64-bit ints; the round-trip comparison
+	// catches truncation where int is narrower than 64 bits.
+	if n < 0 || uint64(n) != x {
+		return 0, errors.New("integer out of range")
+	}
+	return n, nil
}
diff --git a/lib/discovery/encoding_test.go b/lib/discovery/encoding_test.go
index ca74b57..488bc77 100644
--- a/lib/discovery/encoding_test.go
+++ b/lib/discovery/encoding_test.go
@@ -6,41 +6,31 @@
import (
"reflect"
- "strings"
"testing"
"v.io/v23/discovery"
)
-func TestAttributePackable(t *testing.T) {
- tests := []struct {
- addrs discovery.Attributes
- want bool
- }{
- {discovery.Attributes{"k": "v"}, true},
- {discovery.Attributes{"_k": "v"}, false},
- {discovery.Attributes{"k=": "v"}, false},
- {discovery.Attributes{strings.Repeat("k", 100): strings.Repeat("v", 154)}, true},
- {discovery.Attributes{strings.Repeat("k", 100): strings.Repeat("v", 155)}, false},
+func TestValidateAttributes(t *testing.T) {
+ valids := []discovery.Attributes{
+ discovery.Attributes{"key": "v"},
+ discovery.Attributes{"k_e.y": "v"},
+ discovery.Attributes{"k!": "v"},
}
- for i, test := range tests {
- if got := IsAttributePackable(test.addrs); got != test.want {
- t.Errorf("[%d]: packable %v, but want %v", i, got, test.want)
+ for i, attrs := range valids {
+ if err := validateAttributes(attrs); err != nil {
+ t.Errorf("[%d]: valid attributes got error: %v", i, err)
}
}
-}
-func TestAddressPackable(t *testing.T) {
- tests := []struct {
- addrs []string
- want bool
- }{
- {[]string{strings.Repeat("a", 250)}, true},
- {[]string{strings.Repeat("a", 10), strings.Repeat("a", 251)}, false},
+ invalids := []discovery.Attributes{
+ discovery.Attributes{"_key": "v"},
+ discovery.Attributes{"k=ey": "v"},
+ discovery.Attributes{"key\n": "v"},
}
- for i, test := range tests {
- if got := IsAddressPackable(test.addrs); got != test.want {
- t.Errorf("[%d]: packable %v, but want %v", i, got, test.want)
+ for i, attrs := range invalids {
+ if err := validateAttributes(attrs); err == nil {
+ t.Errorf("[%d]: invalid attributes didn't get error", i)
}
}
}
@@ -49,12 +39,16 @@
tests := [][]string{
[]string{"a12345"},
[]string{"a1234", "b5678", "c9012"},
- []string{},
+ nil,
}
for _, test := range tests {
pack := PackAddresses(test)
- unpack := UnpackAddresses(pack)
+ unpack, err := UnpackAddresses(pack)
+ if err != nil {
+ t.Errorf("unpacked error: %v", err)
+ continue
+ }
if !reflect.DeepEqual(test, unpack) {
t.Errorf("unpacked to %v, but want %v", unpack, test)
}
@@ -68,12 +62,16 @@
}{
{TestEncryption, []EncryptionKey{EncryptionKey("0123456789")}},
{IbeEncryption, []EncryptionKey{EncryptionKey("012345"), EncryptionKey("123456"), EncryptionKey("234567")}},
- {NoEncryption, []EncryptionKey{}},
+ {NoEncryption, nil},
}
for _, test := range tests {
pack := PackEncryptionKeys(test.algo, test.keys)
- algo, keys := UnpackEncryptionKeys(pack)
+ algo, keys, err := UnpackEncryptionKeys(pack)
+ if err != nil {
+ t.Errorf("unpacked error: %v", err)
+ continue
+ }
if algo != test.algo || !reflect.DeepEqual(keys, test.keys) {
t.Errorf("unpacked to (%d, %v), but want (%d, %v)", algo, keys, test.algo, test.keys)
}
diff --git a/lib/discovery/plugins/ble/advertisement.go b/lib/discovery/plugins/ble/advertisement.go
index b64ade1..7a8a3f9 100644
--- a/lib/discovery/plugins/ble/advertisement.go
+++ b/lib/discovery/plugins/ble/advertisement.go
@@ -25,6 +25,7 @@
// This uuids are v5 uuid generated out of band. These constants need
// to be accessible in all the languages that have a ble implementation
instanceUUID = "12db9a9c-1c7c-5560-bc6b-73a115c93413" // NewAttributeUUID("_instanceuuid")
+ instanceNameUUID = "ffbdcff3-e56f-58f0-8c1a-e416c39aac0d" // NewAttributeUUID("_instancename")
interfaceNameUUID = "b2cadfd4-d003-576c-acad-58b8e3a9cbc8" // NewAttributeUUID("_interfacename")
addrsUUID = "ad2566b7-59d8-50ae-8885-222f43f65fdc" // NewAttributeUUID("_addrs")
encryptionUUID = "6286d80a-adaa-519a-8a06-281a4645a607" // NewAttributeUUID("_encryption")
@@ -34,8 +35,15 @@
attrs := map[string][]byte{
instanceUUID: adv.InstanceUuid,
interfaceNameUUID: []byte(adv.InterfaceName),
- addrsUUID: discovery.PackAddresses(adv.Addrs),
- encryptionUUID: discovery.PackEncryptionKeys(adv.EncryptionAlgorithm, adv.EncryptionKeys),
+ }
+ if len(adv.InstanceName) > 0 {
+ attrs[instanceNameUUID] = []byte(adv.InstanceName)
+ }
+ if len(adv.Addrs) > 0 {
+ attrs[addrsUUID] = discovery.PackAddresses(adv.Addrs)
+ }
+ if adv.EncryptionAlgorithm != discovery.NoEncryption {
+ attrs[encryptionUUID] = discovery.PackEncryptionKeys(adv.EncryptionAlgorithm, adv.EncryptionKeys)
}
for k, v := range adv.Attrs {
@@ -58,16 +66,23 @@
ServiceUuid: a.serviceUUID,
}
+ var err error
for k, v := range a.attrs {
switch k {
case instanceUUID:
adv.InstanceUuid = v
+ case instanceNameUUID:
+ adv.InstanceName = string(v)
case interfaceNameUUID:
adv.InterfaceName = string(v)
case addrsUUID:
- adv.Addrs = discovery.UnpackAddresses(v)
+ if adv.Addrs, err = discovery.UnpackAddresses(v); err != nil {
+ return nil, err
+ }
case encryptionUUID:
- adv.EncryptionAlgorithm, adv.EncryptionKeys = discovery.UnpackEncryptionKeys(v)
+ if adv.EncryptionAlgorithm, adv.EncryptionKeys, err = discovery.UnpackEncryptionKeys(v); err != nil {
+ return nil, err
+ }
default:
parts := strings.SplitN(string(v), "=", 2)
if len(parts) != 2 {
diff --git a/lib/discovery/plugins/ble/advertisement_test.go b/lib/discovery/plugins/ble/advertisement_test.go
index 3fe95db..7525be1 100644
--- a/lib/discovery/plugins/ble/advertisement_test.go
+++ b/lib/discovery/plugins/ble/advertisement_test.go
@@ -19,6 +19,7 @@
v23Adv := discovery.Advertisement{
Service: vdiscovery.Service{
InstanceUuid: []byte(discovery.NewInstanceUUID()),
+ InstanceName: "service",
Attrs: vdiscovery.Attributes{
"key1": "value1",
"key2": "value2",
@@ -27,7 +28,7 @@
},
ServiceUuid: uuid.NewUUID(),
EncryptionAlgorithm: discovery.TestEncryption,
- EncryptionKeys: []discovery.EncryptionKey{discovery.EncryptionKey("k1"), discovery.EncryptionKey("k2")},
+ EncryptionKeys: []discovery.EncryptionKey{discovery.EncryptionKey("k")},
}
adv := newAdvertisment(v23Adv)
diff --git a/lib/discovery/plugins/ble/scanner.go b/lib/discovery/plugins/ble/scanner.go
index 9b379a1..a05e809 100644
--- a/lib/discovery/plugins/ble/scanner.go
+++ b/lib/discovery/plugins/ble/scanner.go
@@ -5,12 +5,13 @@
package ble
import (
- "log"
"sync"
- "v.io/x/ref/lib/discovery"
-
"github.com/pborman/uuid"
+
+ vdiscovery "v.io/v23/discovery"
+
+ "v.io/x/ref/lib/discovery"
)
type scanner struct {
@@ -26,13 +27,16 @@
if s.done {
return nil
}
+
+ // TODO(bjornick,jhahn): Revisit this strategy to provide the consistent behavior
+ // for updated advertisements across all plugins.
if oldAdv != nil {
- a, err := oldAdv.toDiscoveryAdvertisement()
- if err != nil {
- log.Println("failed to convert advertisement:", err)
+ s.ch <- &discovery.Advertisement{
+ Service: vdiscovery.Service{
+ InstanceUuid: oldAdv.instanceID,
+ },
+ Lost: true,
}
- a.Lost = true
- s.ch <- a
}
if newAdv != nil {
diff --git a/lib/discovery/plugins/mdns/encoding.go b/lib/discovery/plugins/mdns/encoding.go
new file mode 100644
index 0000000..2610dae
--- /dev/null
+++ b/lib/discovery/plugins/mdns/encoding.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package mdns
+
+import (
+ "encoding/base32"
+ "errors"
+ "regexp"
+ "sort"
+ "strings"
+)
+
+const (
+ // Limit the maximum large txt records to the maximum total txt records size.
+ maxLargeTxtRecordLen = maxTotalTxtRecordsLen
+)
+
+var (
+ // The key of encoded large txt records is "_x<i><j>", where 'i' and 'j' will
+ // be one digit numbers since we limit the large txt record to 1300 bytes.
+ reLargeTxtRecord = regexp.MustCompile("^" + attrLargeTxtPrefix + "[0-9][0-9]=")
+
+ errInvalidLargeTxtRecord = errors.New("invalid large txt record")
+)
+
+// encodeInstanceUuid renders instanceUuid as a string usable as a host name:
+// it is base32-encoded with the "Extended Hex Alphabet" defined in RFC 4648
+// and any trailing '=' padding characters are stripped.
+func encodeInstanceUuid(instanceUuid []byte) string {
+	padded := base32.HexEncoding.EncodeToString(instanceUuid)
+	return strings.TrimRight(padded, "=")
+}
+
+// decodeInstanceUuid turns a host name back into an instance uuid by decoding
+// it as base32 with the RFC 4648 "Extended Hex Alphabet". It returns the
+// decoder's error when hostname is not valid base32.
+func decodeInstanceUuid(hostname string) ([]byte, error) {
+	// The decoder expects input padded to a multiple of 8 characters, so
+	// put back the '=' characters that were trimmed when encoding.
+	padded := hostname
+	if rem := len(hostname) % 8; rem != 0 {
+		padded += strings.Repeat("=", 8-rem)
+	}
+	return base32.HexEncoding.DecodeString(padded)
+}
+
+// maybeSplitLargeTXT slices txt records larger than maxTxtRecordLen bytes into
+// multiple txt records; smaller records are passed through unchanged.
+//
+// Each piece of a large record is prefixed with attrLargeTxtPrefix, one digit
+// numbering the large record, one digit numbering the piece within it, and
+// '='. It returns errMaxTxtRecordLenExceeded when a record is longer than
+// maxLargeTxtRecordLen or when there are more large records than a single
+// decimal digit can number.
+func maybeSplitLargeTXT(txt []string) ([]string, error) {
+	splitted := make([]string, 0, len(txt))
+	xno := 0
+	for _, v := range txt {
+		switch n := len(v); {
+		case n > maxLargeTxtRecordLen:
+			return nil, errMaxTxtRecordLenExceeded
+		case n > maxTxtRecordLen:
+			if xno > 9 {
+				// The record number is written as a single digit; going past
+				// '9' would produce a key that maybeJoinLargeTXT rejects.
+				return nil, errMaxTxtRecordLenExceeded
+			}
+			var buf [maxTxtRecordLen]byte
+			copy(buf[:], attrLargeTxtPrefix)
+			buf[2] = byte(xno + '0')
+			buf[4] = '='
+			for i, off := 0, 0; off < n; i++ {
+				buf[3] = byte(i + '0')
+				// copy stops at the end of buf, so every piece fits in maxTxtRecordLen.
+				c := copy(buf[5:], v[off:])
+				splitted = append(splitted, string(buf[:5+c]))
+				off += c
+			}
+			xno++
+		default:
+			splitted = append(splitted, v)
+		}
+	}
+	return splitted, nil
+}
+
+// maybeJoinLargeTXT joins the splitted large txt records.
+//
+// Records that do not start with attrLargeTxtPrefix are returned unchanged and
+// in their original order; the reassembled large records follow them. It
+// returns errInvalidLargeTxtRecord when a prefixed record does not match
+// reLargeTxtRecord or when a reassembled record is longer than
+// maxLargeTxtRecordLen.
+func maybeJoinLargeTXT(txt []string) ([]string, error) {
+	joined, splitted := make([]string, 0, len(txt)), make([]string, 0)
+	for _, v := range txt {
+		if !strings.HasPrefix(v, attrLargeTxtPrefix) {
+			joined = append(joined, v)
+			continue
+		}
+		if !reLargeTxtRecord.MatchString(v) {
+			return nil, errInvalidLargeTxtRecord
+		}
+		splitted = append(splitted, v)
+	}
+	if len(splitted) == 0 {
+		return joined, nil
+	}
+
+	// Sorting orders the pieces by record digit and then by piece digit.
+	sort.Strings(splitted)
+
+	buf := make([]byte, 0, maxLargeTxtRecordLen)
+	// Group on the record digit itself instead of counting up from zero, so
+	// that a gap in the numbering cannot emit empty or mis-split records.
+	xno := splitted[0][2]
+	for _, v := range splitted {
+		if v[2] != xno {
+			// A new large txt record started.
+			joined = append(joined, string(buf))
+			buf, xno = buf[:0], v[2]
+		}
+		buf = append(buf, v[5:]...)
+		if len(buf) > maxLargeTxtRecordLen {
+			// Reject oversized payloads rather than silently truncating them.
+			return nil, errInvalidLargeTxtRecord
+		}
+	}
+	return append(joined, string(buf)), nil
+}
diff --git a/lib/discovery/plugins/mdns/encoding_test.go b/lib/discovery/plugins/mdns/encoding_test.go
new file mode 100644
index 0000000..cbfa213
--- /dev/null
+++ b/lib/discovery/plugins/mdns/encoding_test.go
@@ -0,0 +1,87 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package mdns
+
+import (
+ "crypto/rand"
+ "encoding/base64"
+ "reflect"
+ "sort"
+ "testing"
+)
+
+// TestEncodeInstanceUuid checks that random instance uuids of several lengths
+// survive an encodeInstanceUuid / decodeInstanceUuid round trip.
+func TestEncodeInstanceUuid(t *testing.T) {
+	for i, size := range []int{1, 10, 16, 32} {
+		want := randInstanceUuid(size)
+		got, err := decodeInstanceUuid(encodeInstanceUuid(want))
+		if err != nil {
+			t.Errorf("[%d]: decodeInstanceUuid failed: %v", i, err)
+			continue
+		}
+		if !reflect.DeepEqual(got, want) {
+			t.Errorf("[%d]: decoded to %v, but want %v", i, got, want)
+		}
+	}
+}
+
+// randInstanceUuid returns n bytes filled by crypto/rand and panics if the
+// random source reports an error.
+func randInstanceUuid(n int) []byte {
+	uuid := make([]byte, n)
+	if _, err := rand.Read(uuid); err != nil {
+		panic(err)
+	}
+	return uuid
+}
+
+// TestSplitLargeTxt checks that maybeSplitLargeTXT never emits a record longer
+// than maxTxtRecordLen and that maybeJoinLargeTXT restores the original
+// records. The comparison ignores order because both sides are sorted first.
+func TestSplitLargeTxt(t *testing.T) {
+	tests := [][]string{
+		[]string{randTxt(maxTxtRecordLen / 2)},
+		[]string{randTxt(maxTxtRecordLen / 2), randTxt(maxTxtRecordLen / 3)},
+		[]string{randTxt(maxTxtRecordLen * 2)},
+		[]string{randTxt(maxTxtRecordLen * 2), randTxt(maxTxtRecordLen * 3)},
+		[]string{randTxt(maxTxtRecordLen / 2), randTxt(maxTxtRecordLen * 3), randTxt(maxTxtRecordLen * 2), randTxt(maxTxtRecordLen / 3)},
+	}
+
+	for i, test := range tests {
+		splitted, err := maybeSplitLargeTXT(test)
+		if err != nil {
+			t.Errorf("[%d]: maybeSplitLargeTXT failed: %v", i, err)
+			continue
+		}
+		for _, v := range splitted {
+			if len(v) > maxTxtRecordLen {
+				t.Errorf("[%d]: too large encoded txt %d - %v", i, len(v), v)
+			}
+		}
+
+		txt, err := maybeJoinLargeTXT(splitted)
+		if err != nil {
+			t.Errorf("[%d]: maybeJoinLargeTXT failed: %v", i, err)
+			continue
+		}
+
+		// Joined large records are appended after the small ones, so
+		// compare the two lists as sorted sets.
+		sort.Strings(txt)
+		sort.Strings(test)
+		if !reflect.DeepEqual(txt, test) {
+			t.Errorf("[%d]: decoded to %#v, but want %#v", i, txt, test)
+		}
+	}
+}
+
+// randTxt returns a random string of exactly n characters: the first n
+// characters of the unpadded standard base64 encoding of (n*3+3)/4 bytes
+// read from crypto/rand. It panics if the random source reports an error.
+func randTxt(n int) string {
+	raw := make([]byte, (n*3+3)/4)
+	if _, err := rand.Read(raw); err != nil {
+		panic(err)
+	}
+	encoded := base64.RawStdEncoding.EncodeToString(raw)
+	return encoded[:n]
+}
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index b036716..5056621 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -15,9 +15,9 @@
package mdns
import (
- "encoding/hex"
+ "bytes"
+ "errors"
"fmt"
- "strconv"
"strings"
"sync"
"time"
@@ -35,14 +35,25 @@
v23ServiceName = "v23"
serviceNameSuffix = "._sub._" + v23ServiceName
- // The attribute names should not exceed 4 bytes due to the txt record
- // size limit.
- attrServiceUuid = "_srv"
- attrInterface = "_itf"
- attrAddr = "_adr"
- // TODO(jhahn): Remove attrEncryptionAlgorithm.
- attrEncryptionAlgorithm = "_xxx"
- attrEncryptionKeys = "_key"
+ // Use short attribute names due to the txt record size limit.
+ attrName = "_n"
+ attrInterface = "_i"
+ attrAddrs = "_a"
+ attrEncryption = "_e"
+
+ // The prefix for attribute names for encoded large txt records.
+ attrLargeTxtPrefix = "_x"
+
+ // RFC 6763 limits each DNS txt record to 255 bytes and recommends to not have
+ // the cumulative size be larger than 1300 bytes.
+ //
+ // TODO(jhahn): Figure out how to overcome this limit.
+ maxTxtRecordLen = 255
+ maxTotalTxtRecordsLen = 1300
+)
+
+var (
+ errMaxTxtRecordLenExceeded = errors.New("max txt record size exceeded")
)
type plugin struct {
@@ -64,8 +75,8 @@
serviceName := ad.ServiceUuid.String() + serviceNameSuffix
// We use the instance uuid as the host name so that we can get the instance uuid
// from the lost service instance, which has no txt records at all.
- hostName := hex.EncodeToString(ad.InstanceUuid)
- txt, err := createTXTRecords(&ad)
+ hostName := encodeInstanceUuid(ad.InstanceUuid)
+ txt, err := createTxtRecords(&ad)
if err != nil {
return err
}
@@ -134,7 +145,7 @@
case <-ctx.Done():
return
}
- ad, err := decodeAdvertisement(service)
+ ad, err := createAdvertisement(service)
if err != nil {
ctx.Error(err)
continue
@@ -149,65 +160,89 @@
return nil
}
-func createTXTRecords(ad *ldiscovery.Advertisement) ([]string, error) {
- // Prepare a TXT record with attributes and addresses to announce.
- //
- // TODO(jhahn): Currently, the packet size is limited to 2000 bytes in
- // go-mdns-sd package. Think about how to handle a large number of TXT
- // records.
- txt := make([]string, 0, len(ad.Attrs)+4)
- txt = append(txt, fmt.Sprintf("%s=%s", attrServiceUuid, ad.ServiceUuid))
- txt = append(txt, fmt.Sprintf("%s=%s", attrInterface, ad.InterfaceName))
+func createTxtRecords(ad *ldiscovery.Advertisement) ([]string, error) {
+ // Prepare a txt record with attributes and addresses to announce.
+ txt := appendTxtRecord(nil, attrInterface, ad.InterfaceName)
+ if len(ad.InstanceName) > 0 {
+ txt = appendTxtRecord(txt, attrName, ad.InstanceName)
+ }
+ if len(ad.Addrs) > 0 {
+ addrs := ldiscovery.PackAddresses(ad.Addrs)
+ txt = appendTxtRecord(txt, attrAddrs, string(addrs))
+ }
+ if ad.EncryptionAlgorithm != ldiscovery.NoEncryption {
+ enc := ldiscovery.PackEncryptionKeys(ad.EncryptionAlgorithm, ad.EncryptionKeys)
+ txt = appendTxtRecord(txt, attrEncryption, string(enc))
+ }
for k, v := range ad.Attrs {
- txt = append(txt, fmt.Sprintf("%s=%s", k, v))
+ txt = appendTxtRecord(txt, k, v)
}
- for _, a := range ad.Addrs {
- txt = append(txt, fmt.Sprintf("%s=%s", attrAddr, a))
+ txt, err := maybeSplitLargeTXT(txt)
+ if err != nil {
+ return nil, err
}
- txt = append(txt, fmt.Sprintf("%s=%d", attrEncryptionAlgorithm, ad.EncryptionAlgorithm))
- for _, k := range ad.EncryptionKeys {
- txt = append(txt, fmt.Sprintf("%s=%s", attrEncryptionKeys, k))
+ n := 0
+ for _, v := range txt {
+ n += len(v)
+ if n > maxTotalTxtRecordsLen {
+ return nil, errMaxTxtRecordLenExceeded
+ }
}
return txt, nil
}
-func decodeAdvertisement(service mdns.ServiceInstance) (ldiscovery.Advertisement, error) {
+// appendTxtRecord appends the txt record "k=v" to txt and returns the
+// extended slice.
+func appendTxtRecord(txt []string, k, v string) []string {
+	var record bytes.Buffer
+	record.WriteString(k)
+	record.WriteByte('=')
+	record.WriteString(v)
+	return append(txt, record.String())
+}
+
+func createAdvertisement(service mdns.ServiceInstance) (ldiscovery.Advertisement, error) {
// Note that service.Name starts with a host name, which is the instance uuid.
p := strings.SplitN(service.Name, ".", 2)
if len(p) < 1 {
- return ldiscovery.Advertisement{}, fmt.Errorf("invalid host name: %s", service.Name)
+ return ldiscovery.Advertisement{}, fmt.Errorf("invalid service name: %s", service.Name)
}
- instanceUuid, err := hex.DecodeString(p[0])
+ instanceUuid, err := decodeInstanceUuid(p[0])
if err != nil {
return ldiscovery.Advertisement{}, fmt.Errorf("invalid host name: %v", err)
}
- ad := ldiscovery.Advertisement{
- Service: discovery.Service{
- InstanceUuid: instanceUuid,
- Attrs: make(discovery.Attributes),
- },
- Lost: len(service.SrvRRs) == 0 && len(service.TxtRRs) == 0,
+ ad := ldiscovery.Advertisement{Service: discovery.Service{InstanceUuid: instanceUuid}}
+ if len(service.SrvRRs) == 0 && len(service.TxtRRs) == 0 {
+ ad.Lost = true
+ return ad, nil
}
+ ad.Attrs = make(discovery.Attributes)
for _, rr := range service.TxtRRs {
- for _, txt := range rr.Txt {
- kv := strings.SplitN(txt, "=", 2)
- if len(kv) != 2 {
+ txt, err := maybeJoinLargeTXT(rr.Txt)
+ if err != nil {
+ return ldiscovery.Advertisement{}, err
+ }
+
+ for _, kv := range txt {
+ p := strings.SplitN(kv, "=", 2)
+ if len(p) != 2 {
return ldiscovery.Advertisement{}, fmt.Errorf("invalid txt record: %s", txt)
}
- switch k, v := kv[0], kv[1]; k {
- case attrServiceUuid:
- ad.ServiceUuid = uuid.Parse(v)
+ switch k, v := p[0], p[1]; k {
+ case attrName:
+ ad.InstanceName = v
case attrInterface:
ad.InterfaceName = v
- case attrAddr:
- ad.Addrs = append(ad.Addrs, v)
- case attrEncryptionAlgorithm:
- a, _ := strconv.Atoi(v)
- ad.EncryptionAlgorithm = ldiscovery.EncryptionAlgorithm(a)
- case attrEncryptionKeys:
- ad.EncryptionKeys = append(ad.EncryptionKeys, ldiscovery.EncryptionKey(v))
+ case attrAddrs:
+ if ad.Addrs, err = ldiscovery.UnpackAddresses([]byte(v)); err != nil {
+ return ldiscovery.Advertisement{}, err
+ }
+ case attrEncryption:
+ if ad.EncryptionAlgorithm, ad.EncryptionKeys, err = ldiscovery.UnpackEncryptionKeys([]byte(v)); err != nil {
+ return ldiscovery.Advertisement{}, err
+ }
default:
ad.Attrs[k] = v
}
diff --git a/lib/discovery/plugins/mdns/mdns_test.go b/lib/discovery/plugins/mdns/mdns_test.go
index 5b6a7c3..27956dc 100644
--- a/lib/discovery/plugins/mdns/mdns_test.go
+++ b/lib/discovery/plugins/mdns/mdns_test.go
@@ -8,6 +8,7 @@
"fmt"
"reflect"
"runtime"
+ "strings"
"testing"
"time"
@@ -128,6 +129,7 @@
services := []discovery.Service{
{
InstanceUuid: ldiscovery.NewInstanceUUID(),
+ InstanceName: "service1",
InterfaceName: "v.io/x",
Attrs: discovery.Attributes{
"a": "a1234",
@@ -139,6 +141,7 @@
},
{
InstanceUuid: ldiscovery.NewInstanceUUID(),
+ InstanceName: "service2",
InterfaceName: "v.io/x",
Attrs: discovery.Attributes{
"a": "a5678",
@@ -150,6 +153,7 @@
},
{
InstanceUuid: ldiscovery.NewInstanceUUID(),
+ InstanceName: "service3",
InterfaceName: "v.io/y",
Attrs: discovery.Attributes{
"c": "c1234",
@@ -229,3 +233,40 @@
t.Error(err)
}
}
+
+// TestLargeTxt advertises a service whose interface name, attribute value and
+// packed addresses are each longer than the 255-byte maxTxtRecordLen on one
+// plugin, then scans from a second plugin and expects the same service back.
+func TestLargeTxt(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ // Every variable-length field is sized past a single txt record.
+ service := discovery.Service{
+ InstanceUuid: ldiscovery.NewInstanceUUID(),
+ InstanceName: "service2",
+ InterfaceName: strings.Repeat("i", 280),
+ Attrs: discovery.Attributes{
+ "k": strings.Repeat("v", 280),
+ },
+ Addrs: []string{
+ strings.Repeat("a1", 100),
+ strings.Repeat("a2", 100),
+ },
+ }
+
+ // p1 is the advertising side.
+ p1, err := newWithLoopback("m1", true)
+ if err != nil {
+ t.Fatalf("New() failed: %v", err)
+ }
+ stop, err := advertise(ctx, p1, service)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer stop()
+
+ // p2 is the scanning side; it must see the service exactly as advertised.
+ p2, err := newWithLoopback("m2", true)
+ if err != nil {
+ t.Fatalf("New() failed: %v", err)
+ }
+
+ if err := scanAndMatch(ctx, p2, "", service); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index 94da490..430c199 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -71,7 +71,7 @@
if ad.Lost {
if _, ok := found[id]; ok {
delete(found, id)
- updateCh <- discovery.UpdateLost{discovery.Lost{Service: ad.Service}}
+ updateCh <- discovery.UpdateLost{discovery.Lost{InstanceUuid: ad.InstanceUuid}}
}
} else {
found[id] = struct{}{}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 185ff12..80099f4 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -73,7 +73,8 @@
// Periodically kill closed connections.
m.cache.KillConnections(ctx, 0)
case e := <-events:
- if e.Status.Closed && !e.Status.LocalLameDuck || e.Status.LocalLameDuck {
+ if e.Status.Closed && !e.Status.LocalLameDuck ||
+ !e.Status.Closed && e.Status.LocalLameDuck {
m.ls.activeConns.Done()
}
}
diff --git a/services/binary/tidy/impl_test.go b/services/binary/tidy/impl_test.go
index 9fbd5c6..9a80966 100644
--- a/services/binary/tidy/impl_test.go
+++ b/services/binary/tidy/impl_test.go
@@ -163,6 +163,11 @@
application.Envelope{},
fmt.Errorf("no applications.Match(darwin-amd64)"),
},
+ // applications.Match(android-arm)
+ appd.MatchResult{
+ application.Envelope{},
+ fmt.Errorf("no applications.Match(android-arm)"),
+ },
// applications/applicationd.Match(linux-amd64)
appd.MatchResult{
application.Envelope{
@@ -191,6 +196,11 @@
},
nil,
},
+ // applications/applicationd.Match(android-arm)
+ appd.MatchResult{
+ application.Envelope{},
+ fmt.Errorf("no applications/applicationd.Match(android-arm)"),
+ },
// applications/applicationd/0.Match(linux-amd64)
appd.MatchResult{
application.Envelope{
@@ -219,6 +229,11 @@
},
nil,
},
+ // applications/applicationd/0.Match(android-arm)
+ appd.MatchResult{
+ application.Envelope{},
+ fmt.Errorf("no applications/applicationd/0.Match(android-arm)"),
+ },
// applications/binaryd.Match(linux-amd64)
appd.MatchResult{
application.Envelope{
@@ -253,6 +268,11 @@
},
nil,
},
+ // applications/binaryd.Match(android-arm)
+ appd.MatchResult{
+ application.Envelope{},
+ fmt.Errorf("no applications/binaryd.Match(android-arm)"),
+ },
// applications/binaryd/1.Match(linux-amd64)
appd.MatchResult{
application.Envelope{
@@ -287,6 +307,11 @@
},
nil,
},
+ // applications/binaryd/1.Match(android-arm)
+ appd.MatchResult{
+ application.Envelope{},
+ fmt.Errorf("no applications/binaryd/1.Match(android-arm)"),
+ },
)
if err := v23cmd.ParseAndRunForTest(cmdBinaryTidy, ctx, env, []string{applicationName, binaryName}); err != nil {
@@ -333,22 +358,27 @@
appd.MatchStimulus{Name: "Match", Suffix: "applications", Profiles: []string{"linux-386"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications", Profiles: []string{"linux-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications", Profiles: []string{"darwin-amd64"}},
+ appd.MatchStimulus{Name: "Match", Suffix: "applications", Profiles: []string{"android-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd", Profiles: []string{"linux-amd64"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd", Profiles: []string{"linux-386"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd", Profiles: []string{"linux-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd", Profiles: []string{"darwin-amd64"}},
+ appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd", Profiles: []string{"android-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd/0", Profiles: []string{"linux-amd64"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd/0", Profiles: []string{"linux-386"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd/0", Profiles: []string{"linux-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd/0", Profiles: []string{"darwin-amd64"}},
+ appd.MatchStimulus{Name: "Match", Suffix: "applications/applicationd/0", Profiles: []string{"android-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd", Profiles: []string{"linux-amd64"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd", Profiles: []string{"linux-386"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd", Profiles: []string{"linux-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd", Profiles: []string{"darwin-amd64"}},
+ appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd", Profiles: []string{"android-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd/1", Profiles: []string{"linux-amd64"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd/1", Profiles: []string{"linux-386"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd/1", Profiles: []string{"linux-arm"}},
appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd/1", Profiles: []string{"darwin-amd64"}},
+ appd.MatchStimulus{Name: "Match", Suffix: "applications/binaryd/1", Profiles: []string{"android-arm"}},
}; !reflect.DeepEqual(expected, got) {
t.Errorf("apptape invalid call sequence. Got %#v, want %#v", got, expected)
}
diff --git a/services/device/deviced/internal/impl/device_service.go b/services/device/deviced/internal/impl/device_service.go
index 76a1a7f..6ee780b 100644
--- a/services/device/deviced/internal/impl/device_service.go
+++ b/services/device/deviced/internal/impl/device_service.go
@@ -416,7 +416,7 @@
}
stderrLog, stdoutLog := filepath.Join(logs, "STDERR"), filepath.Join(logs, "STDOUT")
- output := "#!/bin/bash\n"
+ output := "#!" + ShellPath + "\n"
output += "if [ -z \"$DEVICE_MANAGER_DONT_REDIRECT_STDOUT_STDERR\" ]; then\n"
output += fmt.Sprintf(" TIMESTAMP=$(%s)\n", DateCommand)
output += fmt.Sprintf(" exec > %s-$TIMESTAMP 2> %s-$TIMESTAMP\n", stdoutLog, stderrLog)
diff --git a/services/device/deviced/internal/impl/helper_manager.go b/services/device/deviced/internal/impl/helper_manager.go
index 2d65694..4d635d2 100644
--- a/services/device/deviced/internal/impl/helper_manager.go
+++ b/services/device/deviced/internal/impl/helper_manager.go
@@ -26,16 +26,21 @@
var suidHelper *suidHelperState
func InitSuidHelper(ctx *context.T, helperPath string) {
- if suidHelper != nil || helperPath == "" {
+ if suidHelper != nil {
return
}
+ if helperPath == "" {
+ ctx.Panicf("suidhelper path needs to be specified")
+ }
- u, err := user.Current()
- if err != nil {
- ctx.Panicf("devicemanager has no current user: %v", err)
+ var userName string
+ if user, _ := user.Current(); user != nil && len(user.Username) > 0 {
+ userName = user.Username
+ } else {
+ userName = "anonymous"
}
suidHelper = &suidHelperState{
- dmUser: u.Username,
+ dmUser: userName,
helperPath: helperPath,
}
}
diff --git a/services/device/deviced/internal/impl/impl_test.go b/services/device/deviced/internal/impl/impl_test.go
index 81fdeba..b210344 100644
--- a/services/device/deviced/internal/impl/impl_test.go
+++ b/services/device/deviced/internal/impl/impl_test.go
@@ -53,7 +53,7 @@
// of how device manager implementation sets up its updated versions.
func generateDeviceManagerScript(t *testing.T, root string, args, env []string) string {
env = impl.VanadiumEnvironment(env)
- output := "#!/bin/bash\n"
+ output := "#!" + impl.ShellPath + "\n"
output += strings.Join(config.QuoteEnv(env), " ") + " exec "
output += strings.Join(args, " ")
if err := os.MkdirAll(filepath.Join(root, "factory"), 0755); err != nil {
diff --git a/services/device/deviced/internal/impl/profile.go b/services/device/deviced/internal/impl/profile.go
index 9908666..5c978b1 100644
--- a/services/device/deviced/internal/impl/profile.go
+++ b/services/device/deviced/internal/impl/profile.go
@@ -40,6 +40,8 @@
result.Format = build.FormatElf
case build.OperatingSystemWindows:
result.Format = build.FormatPe
+ case build.OperatingSystemAndroid:
+ result.Format = build.FormatElf
default:
return nil, errors.New("Unsupported operating system: " + os.String())
}
@@ -91,6 +93,8 @@
// TODO(jsimsa): Implement.
case "windows":
// TODO(jsimsa): Implement.
+ case "android":
+ // TODO(caprita): Implement.
default:
return nil, errors.New("Unsupported operating system: " + runtime.GOOS)
}
diff --git a/services/device/deviced/internal/impl/shell_android.go b/services/device/deviced/internal/impl/shell_android.go
new file mode 100644
index 0000000..f45cf69
--- /dev/null
+++ b/services/device/deviced/internal/impl/shell_android.go
@@ -0,0 +1,10 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package impl
+
+const (
+ // ShellPath is the interpreter path written after "#!" at the top of
+ // the shell scripts generated by the device manager.
+ ShellPath = "/system/bin/sh"
+ // DateCommand is the shell command the generated scripts run inside
+ // TIMESTAMP=$(...) to build the suffix of the STDOUT/STDERR log names.
+ DateCommand = "/system/bin/date +%s.$RANDOM"
+)
diff --git a/services/device/deviced/internal/impl/shell_darwin.go b/services/device/deviced/internal/impl/shell_darwin.go
index fc0d92a..e1b54a9 100644
--- a/services/device/deviced/internal/impl/shell_darwin.go
+++ b/services/device/deviced/internal/impl/shell_darwin.go
@@ -5,5 +5,6 @@
package impl
const (
+ ShellPath = "/bin/bash"
DateCommand = "/bin/date +%s.$RANDOM"
)
diff --git a/services/device/deviced/internal/impl/shell_linux.go b/services/device/deviced/internal/impl/shell_linux.go
index 8e02672..3b81a07 100644
--- a/services/device/deviced/internal/impl/shell_linux.go
+++ b/services/device/deviced/internal/impl/shell_linux.go
@@ -2,8 +2,11 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+// +build linux,!android
+
package impl
const (
+ ShellPath = "/bin/bash"
DateCommand = "/bin/date +%s%N"
)
diff --git a/services/device/deviced/internal/impl/utiltest/helpers.go b/services/device/deviced/internal/impl/utiltest/helpers.go
index b8e855e..d66576a 100644
--- a/services/device/deviced/internal/impl/utiltest/helpers.go
+++ b/services/device/deviced/internal/impl/utiltest/helpers.go
@@ -498,7 +498,7 @@
// GenerateSuidHelperScript builds a script to execute the test target as
// a suidhelper instance and returns the path to the script.
func GenerateSuidHelperScript(t *testing.T, root string) string {
- output := "#!/bin/bash\n"
+ output := "#!" + impl.ShellPath + "\n"
output += "V23_SUIDHELPER_TEST=1"
output += " "
output += "exec " + os.Args[0] + " -minuid=1 -test.run=TestSuidHelper \"$@\""
@@ -519,8 +519,8 @@
// GenerateAgentScript creates a simple script that acts as the security agent
// for tests. It blackholes arguments meant for the agent.
func GenerateAgentScript(t *testing.T, root string) string {
- output := `
-#!/bin/bash
+ output := "#!" + impl.ShellPath + "\n" +
+ `
ARGS=$*
for ARG in ${ARGS[@]}; do
if [[ ${ARG} = -- ]]; then
diff --git a/services/device/deviced/internal/installer/device_installer.go b/services/device/deviced/internal/installer/device_installer.go
index fbdc36b..282dc51 100644
--- a/services/device/deviced/internal/installer/device_installer.go
+++ b/services/device/deviced/internal/installer/device_installer.go
@@ -218,7 +218,7 @@
}
stdoutLog, stderrLog := filepath.Join(logs, "STDOUT"), filepath.Join(logs, "STDERR")
// TODO(caprita): Switch all our generated bash scripts to use templates.
- output := "#!/bin/bash\n"
+ output := "#!" + impl.ShellPath + "\n"
output += "if [ -z \"$DEVICE_MANAGER_DONT_REDIRECT_STDOUT_STDERR\" ]; then\n"
output += fmt.Sprintf(" TIMESTAMP=$(%s)\n", impl.DateCommand)
output += fmt.Sprintf(" exec > %s-$TIMESTAMP 2> %s-$TIMESTAMP\n", stdoutLog, stderrLog)
diff --git a/services/device/internal/suid/args.go b/services/device/internal/suid/args.go
index d9ab2fb..8ba2eeb 100644
--- a/services/device/internal/suid/args.go
+++ b/services/device/internal/suid/args.go
@@ -166,34 +166,37 @@
return nil
}
- username := *flagUsername
- if username == "" {
- return verror.New(errUserNameMissing, nil)
- }
+ if *flagDryrun {
+ wp.uid, wp.gid = -1, -1
+ } else {
+ username := *flagUsername
+ if username == "" {
+ return verror.New(errUserNameMissing, nil)
+ }
- usr, err := user.Lookup(username)
- if err != nil {
- return verror.New(errUnknownUser, nil, username)
- }
+ usr, err := user.Lookup(username)
+ if err != nil {
+ return verror.New(errUnknownUser, nil, username)
+ }
- uid, err := strconv.ParseInt(usr.Uid, 0, 32)
- if err != nil {
- return verror.New(errInvalidUID, nil, usr.Uid)
- }
- gid, err := strconv.ParseInt(usr.Gid, 0, 32)
- if err != nil {
- return verror.New(errInvalidGID, nil, usr.Gid)
- }
- warnMissingSuidPrivs(int(uid))
+ uid, err := strconv.ParseInt(usr.Uid, 0, 32)
+ if err != nil {
+ return verror.New(errInvalidUID, nil, usr.Uid)
+ }
+ gid, err := strconv.ParseInt(usr.Gid, 0, 32)
+ if err != nil {
+ return verror.New(errInvalidGID, nil, usr.Gid)
+ }
+ warnMissingSuidPrivs(int(uid))
- // Uids less than 501 can be special so we forbid running as them.
- if uid < *flagMinimumUid {
- return verror.New(errUIDTooLow, nil,
- uid, *flagMinimumUid)
+ // Uids less than 501 can be special so we forbid running as them.
+ if uid < *flagMinimumUid {
+ return verror.New(errUIDTooLow, nil,
+ uid, *flagMinimumUid)
+ }
+ wp.uid = int(uid)
+ wp.gid = int(gid)
}
- wp.uid = int(uid)
- wp.gid = int(gid)
-
wp.dryrun = *flagDryrun
// At this point, all flags allowed by --chown have been processed
diff --git a/services/device/internal/suid/args_test.go b/services/device/internal/suid/args_test.go
index d910b51..b7ae0a8 100644
--- a/services/device/internal/suid/args_test.go
+++ b/services/device/internal/suid/args_test.go
@@ -103,8 +103,8 @@
[]string{"A=B"},
"",
WorkParameters{
- uid: testUid,
- gid: testGid,
+ uid: -1,
+ gid: -1,
workspace: "",
agentsock: "",
logDir: "",
@@ -167,8 +167,8 @@
[]string{"A=B"},
"",
WorkParameters{
- uid: testUid,
- gid: testGid,
+ uid: -1,
+ gid: -1,
workspace: "/hello",
agentsock: "/tmp/2981298123/s",
logDir: "/logging",
diff --git a/services/internal/profiles/listprofiles.go b/services/internal/profiles/listprofiles.go
index 6542bbe..66b6eb5 100644
--- a/services/internal/profiles/listprofiles.go
+++ b/services/internal/profiles/listprofiles.go
@@ -56,6 +56,13 @@
Os: build.OperatingSystemDarwin,
Format: build.FormatMach,
},
+ {
+ Label: "android-arm",
+ Description: "",
+ Arch: build.ArchitectureArm,
+ Os: build.OperatingSystemAndroid,
+ Format: build.FormatElf,
+ },
}, nil
// TODO(jsimsa): This function assumes the existence of a profile
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
index 39732b1..a37ddc5 100644
--- a/services/syncbase/server/interfaces/sync_types.go
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -19,3 +19,49 @@
}
return out
}
+
+// Compare orders two prefix generation vectors. It returns 0 if a == b, -1 if
+// a < b, +1 if a > b, and +2 if a and b are uncomparable (some entries favor
+// a while others favor b). A key held by only one vector counts in that
+// vector's favor, whatever generation is stored under it.
+func (a PrefixGenVector) Compare(b PrefixGenVector) int {
+	if len(a) == 0 && len(b) == 0 {
+		return 0
+	}
+
+	// unset marks that no entry of a has been examined yet.
+	const unset = -2
+	order := unset
+
+	for id, aGen := range a {
+		bGen, inB := b[id]
+
+		// cur is the verdict for this key alone.
+		var cur int
+		switch {
+		case !inB || aGen > bGen:
+			cur = 1
+		case aGen < bGen:
+			cur = -1
+		}
+
+		switch {
+		case order == unset || order == 0:
+			// Nothing decisive so far; adopt the current verdict.
+			order = cur
+		case cur != 0 && cur != order:
+			// Uncomparable, since some entries are less and others
+			// are greater.
+			return 2
+		}
+	}
+
+	for id := range b {
+		if _, inA := a[id]; inA {
+			continue
+		}
+
+		// b holds a key that a lacks, so a cannot be greater than b.
+		if order == 1 {
+			return 2
+		}
+
+		return -1
+	}
+
+	return order
+}
diff --git a/services/syncbase/server/interfaces/sync_types_test.go b/services/syncbase/server/interfaces/sync_types_test.go
new file mode 100644
index 0000000..4386bf5
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types_test.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+import "testing"
+
+// TestPrefixGenVectorCompare runs Compare in both directions over a table of
+// vector pairs. resAB is the expected result of a.Compare(b) and resBA the
+// expected result of b.Compare(a). Every mismatch is reported, labelled with
+// the direction that failed, rather than stopping at the first one.
+func TestPrefixGenVectorCompare(t *testing.T) {
+	tests := []struct {
+		a, b  PrefixGenVector
+		resAB int
+		resBA int
+	}{
+		{ // a = b.
+			a:     PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			b:     PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			resAB: 0,
+			resBA: 0,
+		},
+		{ // a = b.
+			a:     PrefixGenVector{},
+			b:     PrefixGenVector{},
+			resAB: 0,
+			resBA: 0,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 2, 12: 3, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 11, 12: 2, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 11, 12: 12, 13: 13},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 1, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 5, 12: 23, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 0, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{11: 5, 12: 23, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a != b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{10: 56, 11: 5, 12: 23, 13: 4},
+			resAB: 2,
+			resBA: 2,
+		},
+		{ // a != b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 50, 12: 23, 13: 4},
+			resAB: 2,
+			resBA: 2,
+		},
+		{ // a != b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 11, 12: 2, 13: 4, 15: 40},
+			resAB: 2,
+			resBA: 2,
+		},
+	}
+
+	for pos, test := range tests {
+		// Errorf (not Fatalf) so that one bad entry does not hide the
+		// results for the remaining entries.
+		if got, want := test.a.Compare(test.b), test.resAB; got != want {
+			t.Errorf("a.Compare(b) failed for pos %d (a=%v, b=%v), got %v, want %v", pos, test.a, test.b, got, want)
+		}
+		if got, want := test.b.Compare(test.a), test.resBA; got != want {
+			t.Errorf("b.Compare(a) failed for pos %d (a=%v, b=%v), got %v, want %v", pos, test.a, test.b, got, want)
+		}
+	}
+}
diff --git a/services/syncbase/server/mojo_impl.go b/services/syncbase/server/mojo_impl.go
index ecbbc0c..9c1b654 100644
--- a/services/syncbase/server/mojo_impl.go
+++ b/services/syncbase/server/mojo_impl.go
@@ -409,7 +409,15 @@
Continued: vc.Continued,
}
- return s.proxy.OnChange(mc)
+ // Block until client acks receiving and processing the previous change before sending more.
+ // This effectively creates a flow control mechanism.
+ // TODO(aghassemi): Consider sending more than a single change event before
+ // blocking until receiving ack.
+ ack, err := s.proxy.OnChange(mc)
+ if !ack && err != nil {
+ err = verror.NewErrInternal(s.ctx)
+ }
+ return err
}
func (s *watchGlobStreamImpl) Recv(_ interface{}) error {
@@ -646,10 +654,20 @@
return verror.NewErrInternal(s.ctx)
}
- return s.proxy.OnKeyValue(mojom.KeyValue{
+ // Block until client acks receiving and processing the previous change before sending more.
+ // This effectively creates a flow control mechanism.
+ // TODO(aghassemi): Consider sending more than a single KeyValue before
+ // blocking until receiving ack.
+ ack, err := s.proxy.OnKeyValue(mojom.KeyValue{
Key: kv.Key,
Value: kv.Value,
})
+
+ if !ack && err != nil {
+ err = verror.NewErrInternal(s.ctx)
+ }
+
+ return err
}
func (s *scanStreamImpl) Recv(_ interface{}) error {
diff --git a/services/syncbase/testutil/v23util.go b/services/syncbase/testutil/v23util.go
index 8e370a8..42496a7 100644
--- a/services/syncbase/testutil/v23util.go
+++ b/services/syncbase/testutil/v23util.go
@@ -31,6 +31,7 @@
}
rmRootDir = true
}
+
// Start syncbased.
invocation := syncbased.WithStartOpts(syncbased.StartOpts().WithCustomCredentials(creds)).Start(
"--v23.tcp.address=127.0.0.1:0",
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 94f79e1..0727cca 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -494,6 +494,9 @@
if iSt.sg {
// Add the SyncGroup value to the Database.
+ if err := iSt.insertSgRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
+ return err
+ }
} else {
if err := iSt.insertRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
return err
@@ -564,6 +567,16 @@
return err
}
+// insertSgRecInDb stores the versioned value of a SyncGroup in the Database.
+// It VOM-decodes valbuf into an interfaces.SyncGroup and writes it through
+// setSGDataEntryByOID within tx, keyed by the log record's object id and
+// current version. A decode failure is returned before anything is written.
+func (iSt *initiationState) insertSgRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
+	meta := rec.Metadata
+	group := new(interfaces.SyncGroup)
+	if err := vom.Decode(valbuf, group); err != nil {
+		return err
+	}
+	return setSGDataEntryByOID(ctx, tx, meta.ObjId, meta.CurVers, group)
+}
+
// insertRecInDb inserts the versioned value in the Database.
func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
m := rec.Metadata
@@ -944,7 +957,7 @@
Gen: dsInMem.data.gen,
CheckptGen: dsInMem.data.checkptGen,
},
- Sgs: make(map[interfaces.GroupId]localGenInfo),
+ Sgs: make(map[string]localGenInfo),
GenVec: dsInMem.genvec,
SgGenVec: dsInMem.sggenvec,
}
@@ -970,6 +983,31 @@
if _, ok := genvec[rpfx]; !ok {
genvec[rpfx] = respgv
}
+
+ if iSt.sg {
+ // Flip sync pending if needed in case of SyncGroup
+ // syncing. See explanation for SyncPending flag in
+ // types.vdl.
+ gid, err := sgID(rpfx)
+ if err != nil {
+ return err
+ }
+ state, err := getSGIdEntry(ctx, iSt.tx, gid)
+ if err != nil {
+ return err
+ }
+ if state.SyncPending {
+ curgv := genvec[rpfx]
+ res := curgv.Compare(state.PendingGenVec)
+ vlog.VI(4).Infof("sync: updateSyncSt:: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
+ if res >= 0 {
+ state.SyncPending = false
+ if err := setSGIdEntry(ctx, iSt.tx, gid, state); err != nil {
+ return err
+ }
+ }
+ }
+ }
}
iSt.updLocal = genvec
@@ -979,8 +1017,6 @@
delete(pgv, iSt.config.sync.id)
}
- // TODO(hpucha): Flip join pending if needed.
-
// TODO(hpucha): Add knowledge compaction.
return putDbSyncState(ctx, iSt.tx, ds)
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 7c1a045..c740d95 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -490,7 +490,7 @@
t.Fatalf("Mount tables are not equal config %v, spec %v", iSt.config.mtTables, sg1.Spec.MountTables)
}
- s.initSyncStateInMem(nil, "mockapp", "mockdb", sgId1)
+ s.initSyncStateInMem(nil, "mockapp", "mockdb", sgOID(sgId1))
iSt.stream = createReplayStream(t, rfile)
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 0c78f52..8cdc47a 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -8,12 +8,11 @@
"container/heap"
"fmt"
"sort"
- "strconv"
"strings"
"v.io/v23/context"
- wire "v.io/v23/services/syncbase/nosql"
"v.io/v23/verror"
+ "v.io/v23/vom"
"v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/watchable"
@@ -69,10 +68,10 @@
rSt.initVec = v.Value.InitVec
rSt.sgIds = make(sgSet)
// Populate the sgids from the initvec.
- for id := range rSt.initVec {
- gid, err := strconv.ParseUint(id, 10, 64)
+ for oid := range rSt.initVec {
+ gid, err := sgID(oid)
if err != nil {
- vlog.Fatalf("sync: newResponderState: invalid syncgroup id", gid)
+ vlog.Fatalf("sync: newResponderState: invalid syncgroup key", oid)
}
rSt.sgIds[interfaces.GroupId(gid)] = struct{}{}
}
@@ -119,8 +118,8 @@
// embedded, consider using a helper function to auto-fill it instead
// (see http://goo.gl/mEa4L0) but only incur that overhead when the
// logging level specified is enabled.
- vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
- rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
+ vlog.VI(3).Infof("sync: sendDeltasPerDatabase: recvd %s, %s: sgids %v, genvec %v, sg %v",
+ rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec, rSt.sg)
// Phase 1 of sendDeltas: Authorize the initiator and respond to the
// caller only for the SyncGroups that allow access.
@@ -179,12 +178,6 @@
for _, p := range sg.Spec.Prefixes {
allowedPfxs[p] = struct{}{}
}
-
- // Add the initiator to the SyncGroup membership if not already
- // in it. It is a temporary solution until SyncGroup metadata
- // is synchronized peer to peer.
- // TODO(rdaoud): remove this when SyncGroups are synced.
- rSt.addInitiatorToSyncGroup(ctx, sgid)
}
if err != nil {
@@ -214,49 +207,9 @@
return nil
}
-// addInitiatorToSyncGroup adds the request initiator to the membership of the
-// given SyncGroup if the initiator is not already a member. It is a temporary
-// solution until SyncGroup metadata starts being synchronized, at which time
-// peers will learn of new members through mutations of the SyncGroup metadata
-// by the SyncGroup administrators.
-// Note: the joiner metadata is fake because the responder does not have it.
-func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
- if rSt.initiator == "" {
- return
- }
-
- err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
- version, err := getSyncGroupVersion(ctx, tx, gid)
- if err != nil {
- return err
- }
- sg, err := getSGDataEntry(ctx, tx, gid, version)
- if err != nil {
- return err
- }
-
- // If the initiator is already a member of the SyncGroup abort
- // the transaction with a special error code.
- if _, ok := sg.Joiners[rSt.initiator]; ok {
- return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
- }
-
- vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
- sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
- return setSGDataEntry(ctx, tx, gid, version, sg)
- })
-
- if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
- vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
- }
-}
-
// sendSgDeltas computes the bound on missing generations, and sends the missing
// log records across all requested SyncGroups (phases 2 and 3 of sendDeltas).
func (rSt *responderState) sendSgDeltas(ctx *context.T) error {
- vlog.VI(3).Infof("sync: sendSgDeltas: %s, %s: sgids %v, genvec %v",
- rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
-
respVec, _, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, rSt.sgIds)
if err != nil {
return err
@@ -367,7 +320,7 @@
rSt.outVec[pfx] = respgv
}
- vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
+ vlog.VI(3).Infof("sync: computeDataDeltas: %s, %s: diff %v, outvec %v",
rSt.appName, rSt.dbName, rSt.diff, rSt.outVec)
return nil
}
@@ -411,7 +364,7 @@
if rSt.sg || !filterLogRec(rec, rSt.initVec, initPfxs) {
// Send on the wire.
- wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
+ wireRec, err := makeWireLogRec(ctx, rSt.sg, rSt.st, rec)
if err != nil {
return err
}
@@ -434,6 +387,8 @@
}
func (rSt *responderState) sendGenVec(ctx *context.T) error {
+ vlog.VI(3).Infof("sync: sendGenVec: sending genvec %v", rSt.outVec)
+
sender := rSt.call.SendStream()
sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
return nil
@@ -553,11 +508,20 @@
// makeWireLogRec creates a sync log record to send on the wire from a given
// local sync record.
-func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
+func makeWireLogRec(ctx *context.T, sg bool, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
// Get the object value at the required version.
key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
var value []byte
- if !rec.Metadata.Delete {
+ if sg {
+ sg, err := getSGDataEntryByOID(ctx, st, key, version)
+ if err != nil {
+ return nil, err
+ }
+ value, err = vom.Encode(sg)
+ if err != nil {
+ return nil, err
+ }
+ } else if !rec.Metadata.Delete {
var err error
value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
if err != nil {
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index d820d8f..953f150 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -57,7 +57,6 @@
"v.io/v23/context"
"v.io/v23/verror"
- "v.io/x/lib/vlog"
"v.io/x/ref/services/syncbase/server/interfaces"
"v.io/x/ref/services/syncbase/server/util"
"v.io/x/ref/services/syncbase/store"
@@ -82,8 +81,11 @@
// dbSyncStateInMem represents the in-memory sync state of a Database and all
// its SyncGroups.
type dbSyncStateInMem struct {
- data *localGenInfoInMem // info for data.
- sgs map[interfaces.GroupId]*localGenInfoInMem // info for SyncGroups.
+ data *localGenInfoInMem // info for data.
+
+ // Info for SyncGroups. The key here is the SyncGroup oid of the form
+ // $sync:sgd:<group id>. More details in syncgroup.go.
+ sgs map[string]*localGenInfoInMem
// Note: Generation vector contains state from remote devices only.
genvec interfaces.GenVector
@@ -94,9 +96,9 @@
out := &dbSyncStateInMem{}
out.data = in.data.deepCopy()
- out.sgs = make(map[interfaces.GroupId]*localGenInfoInMem)
- for id, info := range in.sgs {
- out.sgs[id] = info.deepCopy()
+ out.sgs = make(map[string]*localGenInfoInMem)
+ for oid, info := range in.sgs {
+ out.sgs[oid] = info.deepCopy()
}
out.genvec = in.genvec.DeepCopy()
@@ -200,25 +202,22 @@
// Note: For all the utilities below, if the sgid parameter is non-nil, the
// operation is performed in the SyncGroup space. If nil, it is performed in the
// data space for the Database.
-//
-// TODO(hpucha): Once GroupId is changed to string, clean up these function
-// signatures.
// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
// positions in a Database's log. Used when local updates result in log
// entries.
-func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) (uint64, uint64) {
- return s.reserveGenAndPosInternal(appName, dbName, sgid, count, count)
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgoid string, count uint64) (uint64, uint64) {
+ return s.reserveGenAndPosInternal(appName, dbName, sgoid, count, count)
}
// reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
// when remote log records are received.
-func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) uint64 {
- _, pos := s.reserveGenAndPosInternal(appName, dbName, sgid, 0, count)
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgoid string, count uint64) uint64 {
+ _, pos := s.reserveGenAndPosInternal(appName, dbName, sgoid, 0, count)
return pos
}
-func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgid string, genCount, posCount uint64) (uint64, uint64) {
+func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgoid string, genCount, posCount uint64) (uint64, uint64) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -227,23 +226,18 @@
if !ok {
ds = &dbSyncStateInMem{
data: &localGenInfoInMem{gen: 1},
- sgs: make(map[interfaces.GroupId]*localGenInfoInMem),
+ sgs: make(map[string]*localGenInfoInMem),
}
s.syncState[name] = ds
}
var info *localGenInfoInMem
- if sgid != "" {
- id, err := strconv.ParseUint(sgid, 10, 64)
- if err != nil {
- vlog.Fatalf("sync: reserveGenAndPosInternal: invalid syncgroup id", sgid)
- }
-
+ if sgoid != "" {
var ok bool
- info, ok = ds.sgs[interfaces.GroupId(id)]
+ info, ok = ds.sgs[sgoid]
if !ok {
info = &localGenInfoInMem{gen: 1}
- ds.sgs[interfaces.GroupId(id)] = info
+ ds.sgs[sgoid] = info
}
} else {
info = ds.data
@@ -273,7 +267,7 @@
if len(sgs) > 0 {
// Checkpoint requested SyncGroups.
for id := range sgs {
- info, ok := ds.sgs[id]
+ info, ok := ds.sgs[sgOID(id)]
if !ok {
return verror.New(verror.ErrInternal, ctx, "sg state not found", name, id)
}
@@ -286,7 +280,7 @@
}
// initSyncStateInMem initializes the in memory sync state of the Database/SyncGroup if needed.
-func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgid interfaces.GroupId) {
+func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgoid string) {
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
@@ -294,13 +288,13 @@
if s.syncState[name] == nil {
s.syncState[name] = &dbSyncStateInMem{
data: &localGenInfoInMem{gen: 1},
- sgs: make(map[interfaces.GroupId]*localGenInfoInMem),
+ sgs: make(map[string]*localGenInfoInMem),
}
}
- if sgid != interfaces.NoGroupId {
+ if sgoid != "" {
ds := s.syncState[name]
- if _, ok := ds.sgs[sgid]; !ok {
- ds.sgs[sgid] = &localGenInfoInMem{gen: 1}
+ if _, ok := ds.sgs[sgoid]; !ok {
+ ds.sgs[sgoid] = &localGenInfoInMem{gen: 1}
}
}
return
@@ -335,10 +329,10 @@
if len(sgs) > 0 {
genvec = make(interfaces.GenVector)
for id := range sgs {
- sid := fmt.Sprintf("%d", id)
- gv := ds.sggenvec[sid]
- genvec[sid] = gv.DeepCopy()
- genvec[sid][s.id] = ds.sgs[id].checkptGen
+ sgoid := sgOID(id)
+ gv := ds.sggenvec[sgoid]
+ genvec[sgoid] = gv.DeepCopy()
+ genvec[sgoid][s.id] = ds.sgs[sgoid].checkptGen
}
} else {
genvec = ds.genvec.DeepCopy()
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 3036611..5a3b2c8 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -6,7 +6,6 @@
import (
"reflect"
- "strconv"
"testing"
"time"
@@ -40,11 +39,7 @@
if sgid == "" {
info = s.syncState[name].data
} else {
- id, err := strconv.ParseUint(sgid, 10, 64)
- if err != nil {
- t.Fatalf("reserveGenAndPosInternal failed, invalid sgid %v", sgid)
- }
- info = s.syncState[name].sgs[interfaces.GroupId(id)]
+ info = s.syncState[name].sgs[sgid]
}
if info.gen != wantGen || info.pos != wantPos {
t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", info.gen, wantGen, info.pos, wantPos)
@@ -74,9 +69,9 @@
100: 200, 300: 400, 500: 600,
},
}
- localsgs := make(map[interfaces.GroupId]localGenInfo)
- localsgs[interfaces.GroupId(8888)] = localGenInfo{Gen: 56, CheckptGen: 2000}
- localsgs[interfaces.GroupId(1008888)] = localGenInfo{Gen: 25890, CheckptGen: 100}
+ localsgs := make(map[string]localGenInfo)
+ localsgs["8888"] = localGenInfo{Gen: 56, CheckptGen: 2000}
+ localsgs["1008888"] = localGenInfo{Gen: 25890, CheckptGen: 100}
tx := st.NewTransaction()
wantSt := &dbSyncState{
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2fb5479..98e0e84 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
import (
"fmt"
+ "strconv"
"strings"
"time"
@@ -212,28 +213,30 @@
version = newSyncGroupVersion()
}
+ oid := sgOID(sg.Id)
+
// Add the SyncGroup versioned data entry.
- if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+ if err := setSGDataEntryByOID(ctx, tx, oid, version, sg); err != nil {
return err
}
- // Add a sync log record for the SyncGroup if needed.
- oid := sgIdKey(sg.Id)
- logKey := ""
- if withLog {
- if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
- return err
- }
- logKey = logRecKey(oid, servId, gen)
- }
-
- // Add the SyncGroup to the DAG.
var parents []string
if head, err := getHead(ctx, tx, oid); err == nil {
parents = []string{head}
} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
return err
}
+
+ // Add a sync log record for the SyncGroup if needed.
+ logKey := ""
+ if withLog {
+ if err := addSyncGroupLogRec(ctx, tx, oid, version, parents, servId, gen, pos); err != nil {
+ return err
+ }
+ logKey = logRecKey(oid, servId, gen)
+ }
+
+ // Add the SyncGroup to the DAG.
if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
return err
}
@@ -241,12 +244,12 @@
}
// addSyncGroupLogRec adds a new local log record for a SyncGroup.
-func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
- oid := sgIdKey(gid)
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, oid, version string, parents []string, servId, gen, pos uint64) error {
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{
ObjId: oid,
CurVers: version,
+ Parents: parents,
Delete: false,
UpdTime: watchable.GetStoreTime(ctx, tx),
Id: servId,
@@ -267,7 +270,7 @@
// getSyncGroupVersion retrieves the current version of the SyncGroup.
func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
- return getHead(ctx, st, sgIdKey(gid))
+ return getHead(ctx, st, sgOID(gid))
}
// getSyncGroupById retrieves the SyncGroup given its ID.
@@ -321,7 +324,7 @@
// nodes). This is done separately from pruning the DAG nodes because
// some nodes may have no log record pointing back to the SyncGroup data
// entries (loose coupling to support the pending SyncGroup state).
- oid := sgIdKey(gid)
+ oid := sgOID(gid)
err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
return delSGDataEntry(ctx, tx, gid, v)
})
@@ -491,21 +494,11 @@
// Note: as with other syncable objects, the DAG "heads" table contains
// a reference to the current SyncGroup version, and the DAG "nodes"
// table tracks its history of mutations.
- // TODO(rdaoud): change the data key prefix to use the SG OID instead
- // of its ID, to be similar to the versioned user data keys. The OID
- // would use another SG-data prefix: "$sync:sgd:<gid>" and the data
- // entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
sgIdKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
- sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+ sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
)
-// sgIdStr returns the SyncGroup ID in string format.
-// TODO(rdaoud): delete when the SG ID becomes a string throughout.
-func sgIdStr(gid interfaces.GroupId) string {
- return fmt.Sprintf("%d", uint64(gid))
-}
-
// sgNameKey returns the key used to access the SyncGroup name entry.
func sgNameKey(name string) string {
return util.JoinKeyParts(sgNameKeyPrefix, name)
@@ -516,11 +509,35 @@
return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
}
+// sgOID converts a group id into an oid string by joining sgDataKeyPrefix with the decimal group id.
+func sgOID(gid interfaces.GroupId) string {
+ return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgID is the inverse of sgOID: it parses the group id from an oid string, returning an error if the oid does not split into exactly three key parts or its third part is not a base-10 uint64.
+func sgID(oid string) (interfaces.GroupId, error) {
+ parts := util.SplitKeyParts(oid)
+ if len(parts) != 3 { // sgOID joins the two-part sgDataKeyPrefix with the group id.
+ return 0, fmt.Errorf("invalid sgoid %s", oid)
+ }
+
+ id, err := strconv.ParseUint(parts[2], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ return interfaces.GroupId(id), nil
+}
+
// sgDataKey returns the key used to access a version of the SyncGroup data.
func sgDataKey(gid interfaces.GroupId, version string) string {
return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
}
+// sgDataKeyByOID returns the key used to access a version of the SyncGroup data, formed by joining the SyncGroup oid and the version.
+func sgDataKeyByOID(oid, version string) string {
+ return util.JoinKeyParts(oid, version)
+}
+
// splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
func splitSgNameKey(ctx *context.T, key string) (string, error) {
// Note that the actual SyncGroup name may contain ":" as a separator.
@@ -558,9 +575,9 @@
return util.Put(ctx, tx, sgIdKey(gid), state)
}
-// setSGDataEntry stores the SyncGroup versioned data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
- return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+// setSGDataEntryByOID stores the SyncGroup versioned data entry under the key built from the SyncGroup oid and version.
+func setSGDataEntryByOID(ctx *context.T, tx store.Transaction, sgoid, version string, sg *interfaces.SyncGroup) error {
+ return util.Put(ctx, tx, sgDataKeyByOID(sgoid, version), sg)
}
// getSGNameEntry retrieves the SyncGroup ID for a given name.
@@ -590,6 +607,15 @@
return &sg, nil
}
+// getSGDataEntryByOID retrieves the SyncGroup data for a given group OID and version, returning a nil SyncGroup and the util.Get error on failure.
+func getSGDataEntryByOID(ctx *context.T, st store.StoreReader, sgoid string, version string) (*interfaces.SyncGroup, error) {
+ var sg interfaces.SyncGroup
+ if err := util.Get(ctx, st, sgDataKeyByOID(sgoid, version), &sg); err != nil {
+ return nil, err
+ }
+ return &sg, nil
+}
+
// delSGNameEntry deletes the SyncGroup name entry.
func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
@@ -642,8 +668,7 @@
// has Admin privilege.
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
return err
@@ -657,7 +682,7 @@
return err
}
- ss.initSyncStateInMem(ctx, appName, dbName, gid)
+ ss.initSyncStateInMem(ctx, appName, dbName, sgOID(gid))
// Local SG create succeeded. Publish the SG at the chosen server, or if
// that fails, enqueue it for later publish retries.
@@ -780,7 +805,7 @@
return nullSpec, err
}
- ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
+ ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sgOID(sg2.Id))
// Publish at the chosen mount table and in the neighborhood.
sd.publishInMountTables(ctx, call, sg2.Spec)
@@ -878,7 +903,7 @@
}
ss := sd.sync.(*syncService)
- //appName, dbName := sd.db.App().Name(), sd.db.Name()
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
@@ -909,8 +934,7 @@
}
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(sg.Id), 1)
// TODO(hpucha): Check SyncGroup ACL.
@@ -935,7 +959,7 @@
func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
st := sd.db.St()
ss := sd.sync.(*syncService)
- //appName, dbName := sd.db.App().Name(), sd.db.Name()
+ appName, dbName := sd.db.App().Name(), sd.db.Name()
gid, err := getSyncGroupId(ctx, st, sgName)
if err != nil {
@@ -959,11 +983,18 @@
// and joiner list of the SyncGroup get updated. This is functionally
// correct, just not symmetrical with what happens at joiner, which
// receives the SyncGroup state post-join.
- // TODO(rdaoud): send the SyncGroup genvec to the remote peer.
status := interfaces.SyncGroupStatusPublishRejected
+ sgs := sgSet{gid: struct{}{}}
+ gv, _, err := ss.copyDbGenInfo(ctx, appName, dbName, sgs)
+ if err != nil {
+ return err
+ }
+ // TODO(hpucha): Do we want to pick the head version corresponding to
+ // the local gen of the sg? It appears that it shouldn't matter.
+
c := interfaces.SyncClient(sgName)
- peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
+ peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, gv[sgOID(gid)])
if err == nil {
status = interfaces.SyncGroupStatusRunning
@@ -1007,8 +1038,7 @@
// Reserve a log generation and position counts for the new
// SyncGroup version.
- //gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
sg.Status = status
if status == interfaces.SyncGroupStatusRunning {
@@ -1176,12 +1206,15 @@
})
if err == nil {
- s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+ s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sgOID(sg.Id))
}
return s.name, err
}
func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
+ vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: begin: %s from peer %s", sgName, joinerName)
+ defer vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: end: %s from peer %s", sgName, joinerName)
+
var dbSt store.Store
var gid interfaces.GroupId
var err error
@@ -1212,6 +1245,7 @@
version := newSyncGroupVersion()
var sg *interfaces.SyncGroup
+ var gen, pos uint64
err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
@@ -1235,8 +1269,7 @@
}
// Reserve a log generation and position counts for the new SyncGroup.
- //gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
- gen, pos := uint64(1), uint64(1)
+ gen, pos = s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgOID(gid), 1)
// Add to joiner list.
sg.Joiners[joinerName] = joinerInfo
@@ -1246,6 +1279,17 @@
if err != nil {
return nullSG, "", nullGV, err
}
- // TODO(rdaoud): return the SyncGroup genvec
- return *sg, version, nullGV, nil
+
+ sgs := sgSet{gid: struct{}{}}
+ gv, _, err := s.copyDbGenInfo(ctx, stAppName, stDbName, sgs)
+ if err != nil {
+ return nullSG, "", nullGV, err
+ }
+ // The retrieved genvector does not contain the mutation that adds the
+ // joiner to the list since the initiator is the one checkpointing the
+ // generations. Add that generation to this genvector.
+ gv[sgOID(gid)][s.id] = gen
+
+ vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgOID(gid)])
+ return *sg, version, gv[sgOID(gid)], nil
}
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 820dec6..054eb20 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -15,7 +15,8 @@
logDataPrefix = "data" // data log state.
dbssPrefix = "dbss" // database sync state.
dagPrefix = "dag" // dag state.
- sgPrefix = "sg" // syncgroup state.
+ sgPrefix = "sg" // local syncgroup state.
+ sgDataPrefix = "sgd" // synced syncgroup state.
)
// syncData represents the persistent state of the sync module.
@@ -32,7 +33,7 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
Data localGenInfo
- Sgs map[interfaces.GroupId]localGenInfo
+ Sgs map[string]localGenInfo
GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index e352de4..3726fb0 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -39,7 +39,7 @@
// dbSyncState represents the persistent sync state of a Database.
type dbSyncState struct {
Data localGenInfo
- Sgs map[interfaces.GroupId]localGenInfo
+ Sgs map[string]localGenInfo
GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
}
@@ -156,4 +156,6 @@
const dagPrefix = "dag" // dag state.
-const sgPrefix = "sg" // syncgroup state.
+const sgPrefix = "sg" // local syncgroup state.
+
+const sgDataPrefix = "sgd" // synced syncgroup state.
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 9c16294..5b8b469 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -120,7 +120,7 @@
}
// Initialize Database sync state if needed.
- s.initSyncStateInMem(ctx, appName, dbName, interfaces.NoGroupId)
+ s.initSyncStateInMem(ctx, appName, dbName, "")
// Get a batch of watch log entries, if any, after this resume marker.
logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)