Merge "internal storage engine: improve compile-time typing"
diff --git a/v23/syncbase/nosql/internal/query/query_functions/date_funcs.go b/v23/syncbase/nosql/internal/query/query_functions/date_funcs.go
index 1a1ed91..f62fb78 100644
--- a/v23/syncbase/nosql/internal/query/query_functions/date_funcs.go
+++ b/v23/syncbase/nosql/internal/query/query_functions/date_funcs.go
@@ -80,7 +80,7 @@
// Input: "YYYY-MM-DD TZ"
// "2015-03-17 PDT"
-func date(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func date(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
dateStrOp, err := conversions.ConvertValueToString(args[0])
if err != nil {
return nil, err
@@ -95,7 +95,7 @@
// "YYYY-MM-DD HH:MI:SS TZ"
// "2015-03-17 13:22:17 PDT"
-func dateTime(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func dateTime(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
dateStrOp, err := conversions.ConvertValueToString(args[0])
if err != nil {
return nil, err
@@ -109,7 +109,7 @@
}
// y(v.InvoiceDate, "America/Los_Angeles")
-func y(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func y(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
var timeOp *query_parser.Operand
var locOp *query_parser.Operand
var err error
@@ -132,7 +132,7 @@
}
// ym(v.InvoiceDate, "America/Los_Angeles")
-func ym(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func ym(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
var timeOp *query_parser.Operand
var locOp *query_parser.Operand
var err error
@@ -155,7 +155,7 @@
}
// ymd(v.InvoiceDate, "America/Los_Angeles")
-func ymd(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func ymd(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
var timeOp *query_parser.Operand
var locOp *query_parser.Operand
var err error
@@ -178,7 +178,7 @@
}
// ymdh(v.InvoiceDate, "America/Los_Angeles")
-func ymdh(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func ymdh(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
var timeOp *query_parser.Operand
var locOp *query_parser.Operand
var err error
@@ -201,7 +201,7 @@
}
// ymdhm(v.InvoiceDate, "America/Los_Angeles")
-func ymdhm(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func ymdhm(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
var timeOp *query_parser.Operand
var locOp *query_parser.Operand
var err error
@@ -224,7 +224,7 @@
}
// ymdhms(v.InvoiceDate, "America/Los_Angeles")
-func ymdhms(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func ymdhms(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
var timeOp *query_parser.Operand
var locOp *query_parser.Operand
var err error
@@ -247,7 +247,7 @@
}
// now()
-func now(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func now(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
return makeTimeOp(off, time.Now()), nil
}
diff --git a/v23/syncbase/nosql/internal/query/query_functions/math_funcs.go b/v23/syncbase/nosql/internal/query/query_functions/math_funcs.go
index 5628180..903a842 100644
--- a/v23/syncbase/nosql/internal/query/query_functions/math_funcs.go
+++ b/v23/syncbase/nosql/internal/query/query_functions/math_funcs.go
@@ -11,7 +11,7 @@
"v.io/syncbase/v23/syncbase/nosql/syncql"
)
-func complexFunc(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func complexFunc(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
r, err := conversions.ConvertValueToFloat(args[0])
if err != nil {
return nil, err
diff --git a/v23/syncbase/nosql/internal/query/query_functions/obj_funcs.go b/v23/syncbase/nosql/internal/query/query_functions/obj_funcs.go
new file mode 100644
index 0000000..4372c85
--- /dev/null
+++ b/v23/syncbase/nosql/internal/query/query_functions/obj_funcs.go
@@ -0,0 +1,35 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package query_functions
+
+import (
+ "v.io/syncbase/v23/syncbase/nosql/internal/query/query_parser"
+ "v.io/syncbase/v23/syncbase/nosql/query_db"
+ "v.io/syncbase/v23/syncbase/nosql/syncql"
+ "v.io/v23/vdl"
+)
+
+// lenFunc implements the query function Len: it returns the size of its
+// single argument (args[0]) as a TypInt operand.
+// For a TypObject whose vdl kind is Array, List, Map or Set, it returns the
+// number of entries.
+// For TypStr, it returns the number of bytes in the string.
+// For TypNil, it returns 0.
+// For all other types, including objects of any other kind, it returns
+// ErrFunctionLenInvalidArg reported at the argument's offset.
+// The returned operand carries the argument's offset (args[0].Off), not off.
+// e.g., Len("abc") returns 3
+func lenFunc(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+ switch args[0].Type {
+ case query_parser.TypNil:
+ return makeIntOp(args[0].Off, 0), nil
+ case query_parser.TypObject:
+ switch args[0].Object.Kind() {
+ case vdl.Array, vdl.List, vdl.Map, vdl.Set:
+ // If array, list, set, map, call Value.Len()
+ return makeIntOp(args[0].Off, int64(args[0].Object.Len())), nil
+ }
+ // Any other object kind leaves the switch and reaches the error below.
+ case query_parser.TypStr:
+ // If string, call go's built-in len().
+ return makeIntOp(args[0].Off, int64(len(args[0].Str))), nil
+ }
+ return nil, syncql.NewErrFunctionLenInvalidArg(db.GetContext(), args[0].Off)
+}
diff --git a/v23/syncbase/nosql/internal/query/query_functions/query_functions.go b/v23/syncbase/nosql/internal/query/query_functions/query_functions.go
index 2919ad2..250c089 100644
--- a/v23/syncbase/nosql/internal/query/query_functions/query_functions.go
+++ b/v23/syncbase/nosql/internal/query/query_functions/query_functions.go
@@ -14,7 +14,7 @@
"v.io/v23/vdl"
)
-type queryFunc func(int64, []*query_parser.Operand) (*query_parser.Operand, error)
+type queryFunc func(query_db.Database, int64, []*query_parser.Operand) (*query_parser.Operand, error)
type checkArgsFunc func(query_db.Database, int64, []*query_parser.Operand) error
type function struct {
@@ -40,10 +40,13 @@
functions["now"] = function{[]query_parser.OperandType{}, query_parser.TypTime, now, nil}
functions["lowercase"] = function{[]query_parser.OperandType{query_parser.TypStr}, query_parser.TypStr, lowerCase, singleStringArgCheck}
+ functions["split"] = function{[]query_parser.OperandType{query_parser.TypStr, query_parser.TypStr}, query_parser.TypObject, split, twoStringArgsCheck}
functions["type"] = function{[]query_parser.OperandType{query_parser.TypObject}, query_parser.TypStr, typeFunc, singleFieldArgCheck}
functions["uppercase"] = function{[]query_parser.OperandType{query_parser.TypStr}, query_parser.TypStr, upperCase, singleStringArgCheck}
functions["complex"] = function{[]query_parser.OperandType{query_parser.TypFloat, query_parser.TypFloat}, query_parser.TypComplex, complexFunc, twoFloatsArgsCheck}
+
+ functions["len"] = function{[]query_parser.OperandType{query_parser.TypObject}, query_parser.TypInt, lenFunc, nil}
}
// Check that function exists and that the number of args passed matches the spec.
@@ -115,7 +118,7 @@
if entry, ok := functions[strings.ToLower(f.Name)]; !ok {
return nil, syncql.NewErrFunctionNotFound(db.GetContext(), f.Off, f.Name)
} else {
- retValue, err := entry.funcAddr(f.Off, args)
+ retValue, err := entry.funcAddr(db, f.Off, args)
if err != nil {
return nil, err
} else {
@@ -169,10 +172,25 @@
return &o
}
+// makeIntOp returns a pointer to a new operand of type TypInt whose offset is
+// off and whose integer value is i.
+func makeIntOp(off int64, i int64) *query_parser.Operand {
+ var o query_parser.Operand
+ o.Off = off
+ o.Type = query_parser.TypInt
+ o.Int = i
+ return &o
+}
+
func singleStringArgCheck(db query_db.Database, off int64, args []*query_parser.Operand) error {
return checkIfPossibleThatArgIsConvertableToString(db, args[0])
}
+// twoStringArgsCheck is a checkArgsFunc for functions taking two string
+// arguments. It runs checkIfPossibleThatArgIsConvertableToString on args[0]
+// and then on args[1], returning the first error encountered (nil if both
+// pass). The off parameter is not used.
+func twoStringArgsCheck(db query_db.Database, off int64, args []*query_parser.Operand) error {
+ if err := checkIfPossibleThatArgIsConvertableToString(db, args[0]); err != nil {
+ return err
+ }
+ return checkIfPossibleThatArgIsConvertableToString(db, args[1])
+}
+
func singleFieldArgCheck(db query_db.Database, off int64, args []*query_parser.Operand) error {
// single argument must be of type field
// It must begin with a v segment.
diff --git a/v23/syncbase/nosql/internal/query/query_functions/query_functions_test.go b/v23/syncbase/nosql/internal/query/query_functions/query_functions_test.go
index eef9604..fc5eae7 100644
--- a/v23/syncbase/nosql/internal/query/query_functions/query_functions_test.go
+++ b/v23/syncbase/nosql/internal/query/query_functions/query_functions_test.go
@@ -15,6 +15,7 @@
"v.io/syncbase/v23/syncbase/nosql/query_db"
"v.io/v23"
"v.io/v23/context"
+ "v.io/v23/vdl"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test"
)
@@ -401,6 +402,153 @@
Str: "FOOBAR",
},
},
+ // Split
+ functionsTest{
+ &query_parser.Function{
+ Name: "Split",
+ Args: []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypStr,
+ Str: "alpha.bravo.charlie.delta.echo",
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypStr,
+ Str: ".",
+ },
+ },
+ ArgTypes: []query_parser.OperandType{
+ query_parser.TypStr,
+ query_parser.TypStr,
+ },
+ RetType: query_parser.TypObject,
+ Computed: false,
+ RetValue: nil,
+ },
+ []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypStr,
+ Str: "alpha.bravo.charlie.delta.echo",
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypStr,
+ Str: ".",
+ },
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypObject,
+ Object: vdl.ValueOf([]string{"alpha", "bravo", "charlie", "delta", "echo"}),
+ },
+ },
+ // Len (of list)
+ functionsTest{
+ &query_parser.Function{
+ Name: "Len",
+ Args: []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypObject,
+ Object: vdl.ValueOf([]string{"alpha", "bravo"}),
+ },
+ },
+ ArgTypes: []query_parser.OperandType{
+ query_parser.TypObject,
+ },
+ RetType: query_parser.TypInt,
+ Computed: false,
+ RetValue: nil,
+ },
+ []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypObject,
+ Object: vdl.ValueOf([]string{"alpha", "bravo"}),
+ },
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypInt,
+ Int: 2,
+ },
+ },
+ // Len (of nil)
+ functionsTest{
+ &query_parser.Function{
+ Name: "Len",
+ Args: []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypNil,
+ },
+ },
+ ArgTypes: []query_parser.OperandType{
+ query_parser.TypObject,
+ },
+ RetType: query_parser.TypInt,
+ Computed: false,
+ RetValue: nil,
+ },
+ []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypNil,
+ },
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypInt,
+ Int: 0,
+ },
+ },
+ // Len (of string)
+ functionsTest{
+ &query_parser.Function{
+ Name: "Len",
+ Args: []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypStr,
+ Str: "foo",
+ },
+ },
+ ArgTypes: []query_parser.OperandType{
+ query_parser.TypObject,
+ },
+ RetType: query_parser.TypInt,
+ Computed: false,
+ RetValue: nil,
+ },
+ []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypStr,
+ Str: "foo",
+ },
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypInt,
+ Int: 3,
+ },
+ },
+ // Len (of map)
+ functionsTest{
+ &query_parser.Function{
+ Name: "Len",
+ Args: []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypObject,
+ Object: vdl.ValueOf(map[string]string{"alpha": "ALPHA", "bravo": "BRAVO"}),
+ },
+ },
+ ArgTypes: []query_parser.OperandType{
+ query_parser.TypObject,
+ },
+ RetType: query_parser.TypInt,
+ Computed: false,
+ RetValue: nil,
+ },
+ []*query_parser.Operand{
+ &query_parser.Operand{
+ Type: query_parser.TypObject,
+ Object: vdl.ValueOf(map[string]string{"alpha": "ALPHA", "bravo": "BRAVO"}),
+ },
+ },
+ &query_parser.Operand{
+ Type: query_parser.TypInt,
+ Int: 2,
+ },
+ },
}
for _, test := range tests {
diff --git a/v23/syncbase/nosql/internal/query/query_functions/str_funcs.go b/v23/syncbase/nosql/internal/query/query_functions/str_funcs.go
index 38d74aa..ec20b59 100644
--- a/v23/syncbase/nosql/internal/query/query_functions/str_funcs.go
+++ b/v23/syncbase/nosql/internal/query/query_functions/str_funcs.go
@@ -5,15 +5,16 @@
package query_functions
import (
- "errors"
"strings"
"v.io/syncbase/v23/syncbase/nosql/internal/query/conversions"
"v.io/syncbase/v23/syncbase/nosql/internal/query/query_parser"
+ "v.io/syncbase/v23/syncbase/nosql/query_db"
+ "v.io/syncbase/v23/syncbase/nosql/syncql"
"v.io/v23/vdl"
)
-func lowerCase(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func lowerCase(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
strOp, err := conversions.ConvertValueToString(args[0])
if err != nil {
return nil, err
@@ -21,7 +22,7 @@
return makeStrOp(off, strings.ToLower(strOp.Str)), nil
}
-func upperCase(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func upperCase(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
strOp, err := conversions.ConvertValueToString(args[0])
if err != nil {
return nil, err
@@ -29,12 +30,31 @@
return makeStrOp(off, strings.ToUpper(strOp.Str)), nil
}
-func typeFunc(off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+func typeFunc(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
// If operand is not an object, we can't get a type
if args[0].Type != query_parser.TypObject {
- return nil, errors.New("Type function argument must be object.")
+ return nil, syncql.NewErrFunctionTypeInvalidArg(db.GetContext(), args[0].Off)
}
- t := args[0].Object.Type()
- pkg, name := vdl.SplitIdent(t.Name())
- return makeStrOp(off, pkg+"."+name), nil
+ return makeStrOp(off, args[0].Object.Type().Name()), nil
+}
+
+// split implements the query function Split. It converts args[0] (the string)
+// and args[1] (the separator) to strings and returns a TypObject operand
+// holding the list of substrings between those separators. If the separator is
+// empty, the string is split after each UTF-8 sequence.
+// An error from either conversion is returned as is.
+// The returned operand carries the first argument's offset (args[0].Off).
+// e.g., Split("abc.def.ghi", ".") returns a list of "abc", "def", "ghi"
+func split(db query_db.Database, off int64, args []*query_parser.Operand) (*query_parser.Operand, error) {
+ strArg, err := conversions.ConvertValueToString(args[0])
+ if err != nil {
+ return nil, err
+ }
+ sepArg, err := conversions.ConvertValueToString(args[1])
+ if err != nil {
+ return nil, err
+ }
+
+ var o query_parser.Operand
+ o.Off = args[0].Off
+ o.Type = query_parser.TypObject
+ o.Object = vdl.ValueOf(strings.Split(strArg.Str, sepArg.Str))
+ return &o, nil
 }
diff --git a/v23/syncbase/nosql/internal/query/test/query_test.go b/v23/syncbase/nosql/internal/query/test/query_test.go
index f2bf178..b8c2e9c 100644
--- a/v23/syncbase/nosql/internal/query/test/query_test.go
+++ b/v23/syncbase/nosql/internal/query/test/query_test.go
@@ -1877,6 +1877,65 @@
[]string{"k", "v.Key"},
svPairs(100, 300),
},
+ {
+ // Split on .
+ "select Split(Type(v), \".\") from Customer where k = \"001\"",
+ []string{"Split"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf([]string{"v", "io/syncbase/v23/syncbase/nosql/internal/query/test", "Customer"})},
+ },
+ },
+ {
+ // Split on /
+ "select Split(Type(v), \"/\") from Customer where k = \"001\"",
+ []string{"Split"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf([]string{"v.io", "syncbase", "v23", "syncbase", "nosql", "internal", "query", "test.Customer"})},
+ },
+ },
+ {
+ // Split on /, Len of array
+ "select Len(Split(Type(v), \"/\")) from Customer where k = \"001\"",
+ []string{"Len"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf(int64(8))},
+ },
+ },
+ {
+ // Split on empty string, Len of array
+ // Split with sep == empty string splits on chars.
+ "select Len(Split(Type(v), \"\")) from Customer where k = \"001\"",
+ []string{"Len"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf(int64(len("v.io/syncbase/v23/syncbase/nosql/internal/query/test.Customer")))},
+ },
+ },
+ {
+ // Len of string, list and struct.
+ // returns len of string, elements in list and nil for struct.
+ "select Len(v.Name), Len(v.PreviousAddresses), Len(v.Credit) from Customer where k = \"002\"",
+ []string{"Len", "Len", "Len"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf(int64(13)), vdl.ValueOf(int64(2)), vdl.ValueOf(nil)},
+ },
+ },
+ {
+ // Len of set
+ "select Len(v.Credit.Report.ExperianReport.TdhApprovals) from Customer where Type(v) like \"%.Customer\" and k = \"003\"",
+ []string{"Len"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf(int64(2))},
+ },
+ },
+ {
+ // Len of map
+ "select Len(v.Map) from FunWithMaps",
+ []string{"Len"},
+ [][]*vdl.Value{
+ []*vdl.Value{vdl.ValueOf(int64(2))},
+ []*vdl.Value{vdl.ValueOf(int64(2))},
+ },
+ },
}
for _, test := range basic {
@@ -2645,6 +2704,10 @@
"select k, v.Key from BigTable where Type(k) = \"BigData\"",
syncql.NewErrArgMustBeField(db.GetContext(), 41),
},
+ {
+ "select v from Customer where Len(10) = 3",
+ syncql.NewErrFunctionLenInvalidArg(db.GetContext(), 33),
+ },
}
for _, test := range basic {
diff --git a/v23/syncbase/nosql/syncql/syncql.vdl b/v23/syncbase/nosql/syncql/syncql.vdl
index f6267bf..810c3e0 100644
--- a/v23/syncbase/nosql/syncql/syncql.vdl
+++ b/v23/syncbase/nosql/syncql/syncql.vdl
@@ -45,6 +45,12 @@
FunctionArgCount(off int64, name string, expected int64, found int64) {
"en": "[{off}]Function '{name}' expects {expected} args, found: {found}.",
}
+ FunctionTypeInvalidArg(off int64) {
+ "en": "[{off}]Function 'Type()' cannot get type of argument -- expecting object.",
+ }
+ FunctionLenInvalidArg(off int64) {
+ "en": "[{off}]Function 'Len()' expects array, list, set, map, string or nil.",
+ }
FunctionArgBad(off int64, funcName, argName string) {
"en": "[{off}]Function '{funcName}' arg '{argName}' could not be resolved.",
}
diff --git a/v23/syncbase/nosql/syncql/syncql.vdl.go b/v23/syncbase/nosql/syncql/syncql.vdl.go
index 778646d..1d4e2b4 100644
--- a/v23/syncbase/nosql/syncql/syncql.vdl.go
+++ b/v23/syncbase/nosql/syncql/syncql.vdl.go
@@ -28,6 +28,8 @@
ErrExpectedOperand = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.ExpectedOperand", verror.NoRetry, "{1:}{2:} [{3}]Expected operand, found {4}.")
ErrExpectedOperator = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.ExpectedOperator", verror.NoRetry, "{1:}{2:} [{3}]Expected operator, found {4}.")
ErrFunctionArgCount = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.FunctionArgCount", verror.NoRetry, "{1:}{2:} [{3}]Function '{4}' expects {5} args, found: {6}.")
+ ErrFunctionTypeInvalidArg = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.FunctionTypeInvalidArg", verror.NoRetry, "{1:}{2:} [{3}]Function 'Type()' cannot get type of argument -- expecting object.")
+ ErrFunctionLenInvalidArg = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.FunctionLenInvalidArg", verror.NoRetry, "{1:}{2:} [{3}]Function 'Len()' expects array, list, set, map, string or nil.")
ErrFunctionArgBad = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.FunctionArgBad", verror.NoRetry, "{1:}{2:} [{3}]Function '{4}' arg '{5}' could not be resolved.")
ErrFunctionNotFound = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.FunctionNotFound", verror.NoRetry, "{1:}{2:} [{3}]Function '{4}' not found.")
ErrArgMustBeField = verror.Register("v.io/syncbase/v23/syncbase/nosql/syncql.ArgMustBeField", verror.NoRetry, "{1:}{2:} [{3}]Argument must be a value field (i.e., must begin with 'v').")
@@ -66,6 +68,8 @@
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrExpectedOperand.ID), "{1:}{2:} [{3}]Expected operand, found {4}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrExpectedOperator.ID), "{1:}{2:} [{3}]Expected operator, found {4}.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFunctionArgCount.ID), "{1:}{2:} [{3}]Function '{4}' expects {5} args, found: {6}.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFunctionTypeInvalidArg.ID), "{1:}{2:} [{3}]Function 'Type()' cannot get type of argument -- expecting object.")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFunctionLenInvalidArg.ID), "{1:}{2:} [{3}]Function 'Len()' expects array, list, set, map, string or nil.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFunctionArgBad.ID), "{1:}{2:} [{3}]Function '{4}' arg '{5}' could not be resolved.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrFunctionNotFound.ID), "{1:}{2:} [{3}]Function '{4}' not found.")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrArgMustBeField.ID), "{1:}{2:} [{3}]Argument must be a value field (i.e., must begin with 'v').")
@@ -155,6 +159,16 @@
return verror.New(ErrFunctionArgCount, ctx, off, name, expected, found)
}
+// NewErrFunctionTypeInvalidArg returns an error with the ErrFunctionTypeInvalidArg ID.
+func NewErrFunctionTypeInvalidArg(ctx *context.T, off int64) error {
+ return verror.New(ErrFunctionTypeInvalidArg, ctx, off)
+}
+
+// NewErrFunctionLenInvalidArg returns an error with the ErrFunctionLenInvalidArg ID.
+func NewErrFunctionLenInvalidArg(ctx *context.T, off int64) error {
+ return verror.New(ErrFunctionLenInvalidArg, ctx, off)
+}
+
// NewErrFunctionArgBad returns an error with the ErrFunctionArgBad ID.
func NewErrFunctionArgBad(ctx *context.T, off int64, funcName string, argName string) error {
return verror.New(ErrFunctionArgBad, ctx, off, funcName, argName)
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl b/x/ref/services/syncbase/server/interfaces/sync.vdl
index 470d5b9..ee15a16 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl
@@ -22,7 +22,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
+ GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
// SyncGroup-related methods.
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl.go b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
index adb964f..d378170 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
@@ -36,7 +36,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+ GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -65,9 +65,9 @@
name string
}
-func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
var call rpc.ClientCall
- if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", nil, opts...); err != nil {
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
return
}
ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -207,7 +207,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas(*context.T, SyncGetDeltasServerCall) error
+ GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -236,7 +236,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
+ GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -279,8 +279,8 @@
gs *rpc.GlobState
}
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub) error {
- return s.impl.GetDeltas(ctx, call)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
+ return s.impl.GetDeltas(ctx, call, i0)
}
func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -315,6 +315,9 @@
{
Name: "GetDeltas",
Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+ InArgs: []rpc.ArgDesc{
+ {"initiator", ``}, // string
+ },
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
{
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index e237277..5d3d9b8 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -300,7 +300,7 @@
for mt := range iSt.mtTables {
absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
c := interfaces.SyncClient(absName)
- stream, err := c.GetDeltas(ctx)
+ stream, err := c.GetDeltas(ctx, iSt.sync.name)
if err == nil {
vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
return stream, true
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 8d5b384..8ba9cd9 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -9,6 +9,7 @@
"sort"
"strings"
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -19,9 +20,9 @@
)
// GetDeltas implements the responder side of the GetDeltas RPC.
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
- vlog.VI(2).Infof("sync: GetDeltas: begin")
- defer vlog.VI(2).Infof("sync: GetDeltas: end")
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
+ vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
+ defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
recvr := call.RecvStream()
for recvr.Advance() {
@@ -31,7 +32,7 @@
// the failure might be genuine. For example, the responder is
// no longer part of the requested SyncGroups, or the app/db is
// locally deleted, or a permission change has denied access.
- rSt := newResponderState(ctx, call, s, req)
+ rSt := newResponderState(ctx, call, s, req, initiator)
rSt.sendDeltasPerDatabase(ctx)
}
@@ -42,17 +43,18 @@
// responderState is state accumulated per Database by the responder during an
// initiation round.
type responderState struct {
- req interfaces.DeltaReq
- call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
- errState error // Captures the error from the first two phases of the responder.
- sync *syncService
- st store.Store // Store handle to the Database.
- diff genRangeVector
- outVec interfaces.GenVector
+ req interfaces.DeltaReq
+ call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
+ initiator string
+ errState error // Captures the error from the first two phases of the responder.
+ sync *syncService
+ st store.Store // Store handle to the Database.
+ diff genRangeVector
+ outVec interfaces.GenVector
}
-func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq) *responderState {
- rSt := &responderState{call: call, sync: sync, req: req}
+func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
+ rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
return rSt
}
@@ -138,6 +140,12 @@
for _, p := range sg.Spec.Prefixes {
allowedPfxs[p] = struct{}{}
}
+
+ // Add the initiator to the SyncGroup membership if not already
+ // in it. It is a temporary solution until SyncGroup metadata
+ // is synchronized peer to peer.
+ // TODO(rdaoud): remove this when SyncGroups are synced.
+ rSt.addInitiatorToSyncGroup(ctx, sgid)
}
// Filter the initiator's prefixes to what is allowed.
@@ -159,6 +167,39 @@
return
}
+// addInitiatorToSyncGroup adds the request initiator to the membership of the
+// given SyncGroup if the initiator is not already a member. It is a temporary
+// solution until SyncGroup metadata starts being synchronized, at which time
+// peers will learn of new members through mutations of the SyncGroup metadata
+// by the SyncGroup administrators.
+// Note: the joiner metadata is fake because the responder does not have it.
+//
+// The update runs in a transaction on rSt.st. An empty initiator name is
+// ignored. Failures are logged rather than returned; the "already a member"
+// case is signalled internally with verror.ErrExist and is not logged.
+func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
+ if rSt.initiator == "" {
+ return
+ }
+
+ err := store.RunInTransaction(rSt.st, func(tx store.StoreReadWriter) error {
+ sg, err := getSyncGroupById(ctx, tx, gid)
+ if err != nil {
+ return err
+ }
+
+ // If the initiator is already a member of the SyncGroup abort
+ // the transaction with a special error code.
+ if _, ok := sg.Joiners[rSt.initiator]; ok {
+ return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
+ }
+
+ // Writing to a nil map panics, so make sure the membership map
+ // exists before adding the initiator to it.
+ if sg.Joiners == nil {
+ sg.Joiners = make(map[string]wire.SyncGroupMemberInfo)
+ }
+
+ vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
+ sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
+ return setSGDataEntry(ctx, tx, gid, sg)
+ })
+
+ if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
+ vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
+ }
+}
+
// computeDeltaBound computes the bound on missing generations across all
// requested prefixes (phase 2 of sendDeltas).
func (rSt *responderState) computeDeltaBound(ctx *context.T) {
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index cf67ca8..986f9b3 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -111,7 +111,7 @@
for _, test := range tests {
want := test.genDiffWant
got := test.genDiffIn
- rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{})
+ rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{}, "fakeInitiator")
rSt.diff = got
rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
checkEqualDevRanges(t, got, want)
@@ -353,7 +353,7 @@
s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
- rSt := newResponderState(nil, nil, s, req)
+ rSt := newResponderState(nil, nil, s, req, "fakeInitiator")
rSt.computeDeltaBound(nil)
if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 2f341e4..bf4e3aa 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -618,16 +618,15 @@
return err
}
- gid, err := getSyncGroupId(ctx, tx, sgName)
+ sg, err := getSyncGroupByName(ctx, tx, sgName)
if err != nil {
return err
}
- sg, err := getSyncGroupById(ctx, tx, gid)
// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
sg.Spec = spec
- return setSGDataEntry(ctx, tx, gid, sg)
+ return setSGDataEntry(ctx, tx, sg.Id, sg)
})
return err
}