Initial Store API
Many changes in this revision. A few things still broken, mostly from the same vom/ipc error, but I would rather work on that after/in parallel with the review.
- Updated to match the go vstore API.
- JS RPC args are now transcoded. Types still need to be registered. They also need to be registered for the other direction. (In WSPR).
- Tests exist for the store and the stored process is started automatically.
- Fixed bug in VDL that did not set the type of struct fields
- Fixed bug in VOM that didn't set the elem field on byte slices (only seems to happen when they come from the signature)
- Changed a number of things in the transcoder, including enabling the ability to convert JSON strings to byte slices.
Change-Id: Ia1ba56518ab22a078ef8b711043263b098ec5f4c
diff --git a/examples/rockpaperscissors/service.vdl.go b/examples/rockpaperscissors/service.vdl.go
index 2e23af2..1d9354d 100644
--- a/examples/rockpaperscissors/service.vdl.go
+++ b/examples/rockpaperscissors/service.vdl.go
@@ -958,12 +958,13 @@
}
d = wt
case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
+ for i, fld := range wt.Fields {
if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
}
}
d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
}
result.TypeDefs = append(result.TypeDefs, d)
}
@@ -1010,12 +1011,13 @@
}
d = wt
case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
+ for i, fld := range wt.Fields {
if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
}
}
d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
}
result.TypeDefs = append(result.TypeDefs, d)
}
@@ -1062,12 +1064,13 @@
}
d = wt
case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
+ for i, fld := range wt.Fields {
if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
}
}
d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
}
result.TypeDefs = append(result.TypeDefs, d)
}
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index f758190..baedb01 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -284,7 +284,7 @@
// The empty request header indicates what follows is a streaming arg.
if err := fc.enc.Encode(ipc.Request{}); err != nil {
- return fc.close(verror.BadProtocolf("ipc: streaming request encoding failed: %v", err))
+ return fc.close(verror.BadProtocolf("ipc: streaming request header encoding failed: %v", err))
}
if err := fc.enc.Encode(item); err != nil {
return fc.close(verror.BadProtocolf("ipc: streaming arg encoding failed: %v", err))
@@ -302,7 +302,7 @@
// Decode the response header and handle errors and EOF.
if err := fc.dec.Decode(&fc.response); err != nil {
- return fc.close(verror.BadProtocolf("ipc: response decoding failed: %v", err))
+ return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
}
if fc.response.Error != nil {
return fc.response.Error
@@ -363,7 +363,7 @@
// Decode the response header, if it hasn't already been decoded by Recv.
if fc.response.Error == nil && !fc.response.EndStreamResults {
if err := fc.dec.Decode(&fc.response); err != nil {
- return fc.close(verror.BadProtocolf("ipc: response decoding failed: %v", err))
+ return fc.close(verror.BadProtocolf("ipc: response header decoding failed: %v", err))
}
// The response header must indicate the streaming results have ended.
if fc.response.Error == nil && !fc.response.EndStreamResults {
diff --git a/services/mgmt/application/application.vdl.go b/services/mgmt/application/application.vdl.go
index 0d5b605..1be1d59 100644
--- a/services/mgmt/application/application.vdl.go
+++ b/services/mgmt/application/application.vdl.go
@@ -294,12 +294,13 @@
}
d = wt
case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
+ for i, fld := range wt.Fields {
if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
}
}
d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
}
result.TypeDefs = append(result.TypeDefs, d)
}
diff --git a/services/mgmt/profile/profile.vdl.go b/services/mgmt/profile/profile.vdl.go
index fc8af95..8f460f0 100644
--- a/services/mgmt/profile/profile.vdl.go
+++ b/services/mgmt/profile/profile.vdl.go
@@ -333,12 +333,13 @@
}
d = wt
case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
+ for i, fld := range wt.Fields {
if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
}
}
d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
}
result.TypeDefs = append(result.TypeDefs, d)
}
diff --git a/services/store/raw/service.vdl.go b/services/store/raw/service.vdl.go
index ea7bb7b..2bed1f9 100644
--- a/services/store/raw/service.vdl.go
+++ b/services/store/raw/service.vdl.go
@@ -341,12 +341,13 @@
}
d = wt
case _gen_wiretype.StructType:
- for _, fld := range wt.Fields {
+ for i, fld := range wt.Fields {
if fld.Type >= _gen_wiretype.TypeIDFirst {
- fld.Type += _gen_wiretype.TypeID(firstAdded)
+ wt.Fields[i].Type += _gen_wiretype.TypeID(firstAdded)
}
}
d = wt
+ // NOTE: other types are missing, but we are upgrading anyways.
}
result.TypeDefs = append(result.TypeDefs, d)
}
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index f720f51..6fc0764 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -38,6 +38,8 @@
"veyron2/verror"
"veyron2/vlog"
"veyron2/vom"
+ vom_wiretype "veyron2/vom/wiretype"
+ wiretype_build "veyron2/wiretype/build"
"github.com/gorilla/websocket"
)
@@ -111,6 +113,16 @@
Type websocketMessageType
}
+// Temporary holder of RPC so that we can store the unprocessed args.
+type veyronTempRPC struct {
+ Name string
+ Method string
+ PrivateId string // base64(vom(security.PrivateID))
+ InArgs []json.RawMessage
+ NumOutArgs int32
+ IsStreaming bool
+}
+
type veyronRPC struct {
Name string
Method string
@@ -219,17 +231,6 @@
}
}
-func (ctx WSPR) parseVeyronRequest(r io.Reader) (*veyronRPC, error) {
- var msg veyronRPC
- decoder := json.NewDecoder(r)
- if err := decoder.Decode(&msg); err != nil {
- return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
- }
-
- ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, len(msg.PrivateId) > 0, msg.IsStreaming)
- return &msg, nil
-}
-
func (ctx WSPR) newClient(privateId string) (ipc.Client, error) {
id := decodeIdentity(ctx.logger, privateId)
client := ctx.clientCache.Get(id)
@@ -255,7 +256,7 @@
clientCall, err := client.StartCall(ctx.rt.TODOContext(), msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs)
if err != nil {
- return nil, fmt.Errorf("error starting call: %v", err)
+ return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs, err)
}
return clientCall, nil
@@ -397,7 +398,7 @@
}
func (wsp *websocketPipe) setup() {
- wsp.ctx.logger.Info("identity is", wsp.ctx.rt.Identity())
+ wsp.ctx.logger.Info("identity is ", wsp.ctx.rt.Identity())
wsp.signatureManager = newSignatureManager()
wsp.outstandingStreams = make(map[int64]sender)
wsp.flowMap = make(map[int64]*server)
@@ -514,8 +515,7 @@
// handleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
- veyronMsg, err := wsp.ctx.parseVeyronRequest(bytes.NewBufferString(data))
-
+ veyronMsg, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
if err != nil {
w.sendError(verror.Internalf("can't parse Veyron Request: %v", err))
return
@@ -686,6 +686,63 @@
server.handleServerResponse(id, data)
}
+// parseVeyronRequest parses a json rpc request into a veyronRPC object.
+func (wsp *websocketPipe) parseVeyronRequest(r io.Reader) (*veyronRPC, error) {
+ var tempMsg veyronTempRPC
+ decoder := json.NewDecoder(r)
+ if err := decoder.Decode(&tempMsg); err != nil {
+ return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+ }
+
+ client, err := wsp.ctx.newClient(tempMsg.PrivateId)
+ if err != nil {
+ return nil, verror.Internalf("error creating client: %v", err)
+ }
+
+ // Fetch and adapt signature from the SignatureManager
+ ctx := wsp.ctx.rt.TODOContext()
+ sig, err := wsp.signatureManager.signature(ctx, tempMsg.Name, client)
+ if err != nil {
+ return nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
+ }
+
+ methName := uppercaseFirstCharacter(tempMsg.Method)
+ methSig, ok := sig.Methods[methName]
+ if !ok {
+ return nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
+ }
+
+ var msg veyronRPC
+ if len(methSig.InArgs) != len(tempMsg.InArgs) {
+ return nil, fmt.Errorf("invalid number of arguments: %v vs. %v", methSig, tempMsg)
+ }
+ msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
+ td := wiretype_build.TypeDefs(sig.TypeDefs)
+
+ for i := 0; i < len(tempMsg.InArgs); i++ {
+ argTypeId := methSig.InArgs[i].Type
+ argType := vom_wiretype.Type{
+ ID: argTypeId,
+ Defs: &td,
+ }
+
+ val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
+ if err != nil {
+ return nil, fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err)
+ }
+ msg.InArgs[i] = val
+ }
+
+ msg.Name = tempMsg.Name
+ msg.Method = tempMsg.Method
+ msg.PrivateId = tempMsg.PrivateId
+ msg.NumOutArgs = tempMsg.NumOutArgs
+ msg.IsStreaming = tempMsg.IsStreaming
+
+ wsp.ctx.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", msg.Name, msg.Method, len(msg.PrivateId) > 0, msg.IsStreaming)
+ return &msg, nil
+}
+
type signatureRequest struct {
Name string
PrivateId string