WSPR VOM2 + Signature
All tests pass.
MultiPart: 3/4
Change-Id: I34647d2df9e876de084d20eaf09ebb14cf3bd1f3
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 3322afb..44c58ff 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -3,8 +3,6 @@
package app
import (
- "bytes"
- "encoding/hex"
"encoding/json"
"flag"
"fmt"
@@ -19,17 +17,13 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
- "veyron.io/veyron/veyron2/vom"
- vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
- "veyron.io/veyron/veyron2/vom2"
- wiretype_build "veyron.io/veyron/veyron2/wiretype/build"
"veyron.io/wspr/veyron/services/wsprd/ipc/server"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/namespace"
"veyron.io/wspr/veyron/services/wsprd/principal"
- "veyron.io/wspr/veyron/services/wsprd/signature"
)
// pkgPath is the prefix os errors in this package.
@@ -39,7 +33,6 @@
var (
marshallingError = verror2.Register(pkgPath+".marshallingError", verror2.NoRetry, "{1} {2} marshalling error {_}")
noResults = verror2.Register(pkgPath+".noResults", verror2.NoRetry, "{1} {2} no results from call {_}")
- signatureError = verror2.Register(pkgPath+".signatureError", verror2.NoRetry, "{1} {2} signature error {_}")
badCaveatType = verror2.Register(pkgPath+".badCaveatType", verror2.NoRetry, "{1} {2} bad caveat type {_}")
unknownBlessings = verror2.Register(pkgPath+".unknownBlessings", verror2.NoRetry, "{1} {2} unknown public id {_}")
invalidBlessingsHandle = verror2.Register(pkgPath+".invalidBlessingsHandle", verror2.NoRetry, "{1} {2} invalid blessings handle {_}")
@@ -57,22 +50,13 @@
retryTimeout = flag.Int("retry-timeout", 2, "Duration in seconds to retry starting an RPC call. 0 means never retry.")
}
-// Temporary holder of RPC so that we can store the unprocessed args.
-type veyronTempRPC struct {
- Name string
- Method string
- InArgs []json.RawMessage
- NumOutArgs int32
- IsStreaming bool
- Timeout int64
-}
-
-type veyronRPC struct {
+type VeyronRPC struct {
Name string
Method string
InArgs []interface{}
NumOutArgs int32
IsStreaming bool
+ Timeout int64
}
type serveRequest struct {
@@ -182,7 +166,7 @@
}
// finishCall waits for the call to finish and write out the response to w.
-func (c *Controller) finishCall(ctx context.T, w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
+func (c *Controller) finishCall(ctx context.T, w lib.ClientWriter, clientCall ipc.Call, msg *VeyronRPC) {
if msg.IsStreaming {
for {
var item interface{}
@@ -193,24 +177,15 @@
w.Error(err) // Send streaming error as is
return
}
-
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ vomItem, err := lib.VomEncode(item)
if err != nil {
- w.Error(verror2.Make(marshallingError, ctx, item))
+ w.Error(verror2.Make(marshallingError, ctx, item, err))
continue
}
-
- if err := encoder.Encode(item); err != nil {
- w.Error(verror2.Make(marshallingError, ctx, item))
- continue
- }
-
- if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseStream, vomItem); err != nil {
w.Error(verror2.Make(marshallingError, ctx, item))
}
}
-
if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
w.Error(verror2.Make(marshallingError, ctx, "ResponseStreamClose"))
}
@@ -227,36 +202,29 @@
w.Error(err)
return
}
+
// for now we assume last out argument is always error
if len(results) < 1 {
w.Error(verror2.Make(noResults, ctx))
return
}
-
if err, ok := results[len(results)-1].(error); ok {
// return the call Application error as is
w.Error(err)
return
}
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ vomResults, err := lib.VomEncode(results[:len(results)-1])
if err != nil {
w.Error(err)
return
}
-
- if err := encoder.Encode(results[:len(results)-1]); err != nil {
- w.Error(err)
- return
- }
-
- if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseFinal, vomResults); err != nil {
w.Error(verror2.Convert(marshallingError, ctx, err))
}
}
-func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
+func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *VeyronRPC) (ipc.Call, error) {
if c.client == nil {
return nil, verror2.Make(verror2.BadArg, ctx, "app.Controller.client")
}
@@ -281,7 +249,7 @@
c.lastGeneratedId += 2
c.flowMap[id] = s
os := newStream()
- os.init(stream, vom_wiretype.Type{ID: 1})
+ os.init(stream)
c.outstandingRequests[id] = &outstandingRequest{
stream: os,
}
@@ -364,67 +332,36 @@
// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
-func (c *Controller) sendVeyronRequest(ctx context.T, id int64, tempMsg *veyronTempRPC, w lib.ClientWriter, stream *outstandingStream) {
- // Fetch and adapt signature from the SignatureManager
- retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
- sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client, retryTimeoutOpt)
+func (c *Controller) sendVeyronRequest(ctx context.T, id int64, msg *VeyronRPC, w lib.ClientWriter, stream *outstandingStream) {
+ sig, err := c.getSignature(ctx, msg.Name)
if err != nil {
- w.Error(verror2.Make(signatureError, ctx, tempMsg.Name, err))
+ w.Error(err)
return
}
- methName := lib.UppercaseFirstCharacter(tempMsg.Method)
- methSig, ok := sig.Methods[methName]
+ methName := lib.UppercaseFirstCharacter(msg.Method)
+ methSig, ok := signature.FirstMethod(sig, methName)
if !ok {
- w.Error(fmt.Errorf("method not found in signature: %v (full sig: %v)", methName, sig))
+ w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
return
}
-
- var msg veyronRPC
- if len(methSig.InArgs) != len(tempMsg.InArgs) {
- w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, tempMsg))
+ if len(methSig.InArgs) != len(msg.InArgs) {
+ w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
return
}
- msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
- td := wiretype_build.TypeDefs(sig.TypeDefs)
-
- for i := 0; i < len(tempMsg.InArgs); i++ {
- argTypeId := methSig.InArgs[i].Type
- argType := vom_wiretype.Type{
- ID: argTypeId,
- Defs: &td,
- }
-
- val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
- if err != nil {
- w.Error(fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err))
- return
- }
- msg.InArgs[i] = val
- }
-
- msg.Name = tempMsg.Name
- msg.Method = tempMsg.Method
- msg.NumOutArgs = tempMsg.NumOutArgs
- msg.IsStreaming = tempMsg.IsStreaming
-
- inStreamType := vom_wiretype.Type{
- ID: methSig.InStream,
- Defs: &td,
- }
// We have to make the start call synchronous so we can make sure that we populate
// the call map before we can Handle a recieve call.
- call, err := c.startCall(ctx, w, &msg)
+ call, err := c.startCall(ctx, w, msg)
if err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
}
if stream != nil {
- stream.init(call, inStreamType)
+ stream.init(call)
}
- c.finishCall(ctx, w, call, &msg)
+ c.finishCall(ctx, w, call, msg)
c.Lock()
if request, ok := c.outstandingRequests[id]; ok {
delete(c.outstandingRequests, id)
@@ -437,7 +374,7 @@
// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
func (c *Controller) HandleVeyronRequest(ctx context.T, id int64, data string, w lib.ClientWriter) {
- veyronTempMsg, err := c.parseVeyronRequest(ctx, bytes.NewBufferString(data))
+ msg, err := c.parseVeyronRequest(data)
if err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
@@ -449,16 +386,16 @@
// TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
// However as a rollout strategy we must, otherwise there is a circular
// dependency between the WSPR change and the JS change that will follow.
- if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+ if msg.Timeout == lib.JSIPCNoTimeout || msg.Timeout == 0 {
cctx, cancel = ctx.WithCancel()
} else {
- cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
+ cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(msg.Timeout))
}
request := &outstandingRequest{
cancel: cancel,
}
- if veyronTempMsg.IsStreaming {
+ if msg.IsStreaming {
// If this rpc is streaming, we would expect that the client would try to send
// on this stream. Since the initial handshake is done asynchronously, we have
// to put the outstanding stream in the map before we make the async call so that
@@ -468,7 +405,7 @@
}
c.Lock()
c.outstandingRequests[id] = request
- go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, request.stream)
+ go c.sendVeyronRequest(cctx, id, msg, w, request.stream)
c.Unlock()
}
@@ -543,7 +480,7 @@
// HandleServeRequest takes a request to serve a server, creates a server,
// registers the provided services and sends true if everything succeeded.
func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
- // Decode the serve request which includes IDL, registered services and name
+ // Decode the serve request which includes VDL, registered services and name
var serveRequest serveRequest
if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
w.Error(verror2.Convert(verror2.Internal, nil, err))
@@ -673,30 +610,23 @@
server.HandleServerResponse(id, data)
}
-// parseVeyronRequest parses a json rpc request into a veyronRPC object.
-func (c *Controller) parseVeyronRequest(ctx context.T, r io.Reader) (*veyronTempRPC, error) {
- var tempMsg veyronTempRPC
- decoder := json.NewDecoder(r)
- if err := decoder.Decode(&tempMsg); err != nil {
- return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+// parseVeyronRequest parses a json rpc request into a VeyronRPC object.
+func (c *Controller) parseVeyronRequest(data string) (*VeyronRPC, error) {
+ var msg VeyronRPC
+ if err := lib.VomDecode(data, &msg); err != nil {
+ return nil, err
}
- c.logger.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", tempMsg.Name, tempMsg.Method, tempMsg.IsStreaming)
- return &tempMsg, nil
+ c.logger.VI(2).Infof("VeyronRPC: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ return &msg, nil
}
type signatureRequest struct {
Name string
}
-func (c *Controller) getSignature(ctx context.T, name string) (signature.JSONServiceSignature, error) {
- // Fetch and adapt signature from the SignatureManager
+func (c *Controller) getSignature(ctx context.T, name string) ([]signature.Interface, error) {
retryTimeoutOpt := options.RetryTimeout(time.Duration(*retryTimeout) * time.Second)
- sig, err := c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
- if err != nil {
- return nil, verror2.Convert(verror2.Internal, ctx, err)
- }
-
- return signature.NewJSONServiceSignature(*sig), nil
+ return c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
}
// HandleSignatureRequest uses signature manager to get and cache signature of a remote server
@@ -709,26 +639,18 @@
}
c.logger.VI(2).Infof("requesting Signature for %q", request.Name)
- jsSig, err := c.getSignature(ctx, request.Name)
- if err != nil {
- w.Error(verror2.Convert(verror2.Internal, ctx, err))
- return
- }
-
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ sig, err := c.getSignature(ctx, request.Name)
if err != nil {
w.Error(err)
return
}
-
- if err := encoder.Encode(jsSig); err != nil {
+ vomSig, err := lib.VomEncode(sig)
+ if err != nil {
w.Error(err)
return
}
-
// Send the signature back
- if err := w.Send(lib.ResponseFinal, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseFinal, vomSig); err != nil {
w.Error(verror2.Convert(verror2.Internal, ctx, err))
return
}
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 80a6072..fcf6a91 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -12,14 +12,11 @@
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
- "veyron.io/veyron/veyron2/vdl/vdlutil"
+ "veyron.io/veyron/veyron2/vdl"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/veyron/veyron2/verror2"
- "veyron.io/veyron/veyron2/vom"
- vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
- "veyron.io/veyron/veyron2/wiretype"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/lib/testwriter"
- "veyron.io/wspr/veyron/services/wsprd/signature"
tsecurity "veyron.io/veyron/veyron/lib/testutil/security"
"veyron.io/veyron/veyron/profiles"
@@ -75,11 +72,11 @@
type simpleAdder struct{}
-func (s simpleAdder) Add(_ ipc.ServerCall, a, b int32) (int32, error) {
+func (s simpleAdder) Add(_ ipc.ServerContext, a, b int32) (int32, error) {
return a + b, nil
}
-func (s simpleAdder) Divide(_ ipc.ServerCall, a, b int32) (int32, error) {
+func (s simpleAdder) Divide(_ ipc.ServerContext, a, b int32) (int32, error) {
if b == 0 {
return 0, verror2.Make(verror2.BadArg, nil, "div 0")
}
@@ -96,43 +93,28 @@
return total, nil
}
-func (s simpleAdder) Signature(call ipc.ServerCall) (ipc.ServiceSignature, error) {
- result := ipc.ServiceSignature{Methods: make(map[string]ipc.MethodSignature)}
- result.Methods["Add"] = ipc.MethodSignature{
- InArgs: []ipc.MethodArgument{
- {Name: "A", Type: 36},
- {Name: "B", Type: 36},
+var simpleAddrSig = []signature.Interface{
+ {
+ Doc: "The empty interface contains methods not attached to any interface.",
+ Methods: []signature.Method{
+ {
+ Name: "Add",
+ InArgs: []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.Int32Type}},
+ OutArgs: []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.ErrorType}},
+ },
+ {
+ Name: "Divide",
+ InArgs: []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.Int32Type}},
+ OutArgs: []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.ErrorType}},
+ },
+ {
+ Name: "StreamingAdd",
+ OutArgs: []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.ErrorType}},
+ InStream: &signature.Arg{Type: vdl.AnyType},
+ OutStream: &signature.Arg{Type: vdl.AnyType},
+ },
},
- OutArgs: []ipc.MethodArgument{
- {Name: "Value", Type: 36},
- {Name: "Err", Type: 65},
- },
- }
-
- result.Methods["Divide"] = ipc.MethodSignature{
- InArgs: []ipc.MethodArgument{
- {Name: "A", Type: 36},
- {Name: "B", Type: 36},
- },
- OutArgs: []ipc.MethodArgument{
- {Name: "Value", Type: 36},
- {Name: "Err", Type: 65},
- },
- }
-
- result.Methods["StreamingAdd"] = ipc.MethodSignature{
- InArgs: []ipc.MethodArgument{},
- OutArgs: []ipc.MethodArgument{
- {Name: "Value", Type: 36},
- {Name: "Err", Type: 65},
- },
- InStream: 36,
- OutStream: 36,
- }
- result.TypeDefs = []vdlutil.Any{
- wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
-
- return result, nil
+ },
}
func startAnyServer(servesMT bool, dispatcher ipc.Dispatcher) (ipc.Server, naming.Endpoint, error) {
@@ -173,24 +155,6 @@
return startAnyServer(true, mt)
}
-var adderServiceSignature signature.JSONServiceSignature = signature.JSONServiceSignature{
- "add": signature.JSONMethodSignature{
- InArgs: []string{"A", "B"},
- NumOutArgs: 2,
- IsStreaming: false,
- },
- "divide": signature.JSONMethodSignature{
- InArgs: []string{"A", "B"},
- NumOutArgs: 2,
- IsStreaming: false,
- },
- "streamingAdd": signature.JSONMethodSignature{
- InArgs: []string{},
- NumOutArgs: 2,
- IsStreaming: true,
- },
-}
-
func TestGetGoServerSignature(t *testing.T) {
s, endpoint, err := startAdderServer()
if err != nil {
@@ -206,24 +170,22 @@
if err != nil {
t.Fatalf("Failed to create controller: %v", err)
}
- jsSig, err := controller.getSignature(r.NewContext(), "/"+endpoint.String())
+ sig, err := controller.getSignature(r.NewContext(), "/"+endpoint.String())
if err != nil {
t.Fatalf("Failed to get signature: %v", err)
}
-
- if !reflect.DeepEqual(jsSig, adderServiceSignature) {
- t.Fatalf("Unexpected signature, got :%v, expected: %v", jsSig, adderServiceSignature)
+ if got, want := sig, simpleAddrSig; !reflect.DeepEqual(got, want) {
+ t.Fatalf("Unexpected signature, got :%#v, want: %#v", got, want)
}
}
type goServerTestCase struct {
- method string
- inArgs []json.RawMessage
- numOutArgs int32
- streamingInputs []string
- streamingInputType vom.Type
- expectedStream []testwriter.Response
- expectedError error
+ method string
+ inArgs []interface{}
+ numOutArgs int32
+ streamingInputs []interface{}
+ expectedStream []lib.Response
+ expectedError error
}
func runGoServerTestCase(t *testing.T, test goServerTestCase) {
@@ -253,13 +215,13 @@
}
go func() {
for _, value := range test.streamingInputs {
- controller.SendOnStream(0, value, &writer)
+ controller.SendOnStream(0, lib.VomEncodeOrDie(value), &writer)
}
controller.CloseStream(0)
}()
}
- request := veyronTempRPC{
+ request := VeyronRPC{
Name: "/" + endpoint.String(),
Method: test.method,
InArgs: test.inArgs,
@@ -273,21 +235,14 @@
}
}
-func vomEncodeOrDie(v interface{}) string {
- s, err := lib.VomEncode(v)
- if err != nil {
- panic(err)
- }
- return s
-}
func TestCallingGoServer(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
method: "Add",
- inArgs: []json.RawMessage{json.RawMessage("2"), json.RawMessage("3")},
+ inArgs: []interface{}{2, 3},
numOutArgs: 2,
- expectedStream: []testwriter.Response{
- testwriter.Response{
- Message: vomEncodeOrDie([]interface{}{int32(5)}),
+ expectedStream: []lib.Response{
+ lib.Response{
+ Message: lib.VomEncodeOrDie([]interface{}{int32(5)}),
Type: lib.ResponseFinal,
},
},
@@ -297,7 +252,7 @@
func TestCallingGoServerWithError(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
method: "Divide",
- inArgs: []json.RawMessage{json.RawMessage("1"), json.RawMessage("0")},
+ inArgs: []interface{}{1, 0},
numOutArgs: 2,
expectedError: verror2.Make(verror2.BadArg, nil, "div 0"),
})
@@ -305,34 +260,32 @@
func TestCallingGoWithStreaming(t *testing.T) {
runGoServerTestCase(t, goServerTestCase{
- method: "StreamingAdd",
- inArgs: []json.RawMessage{},
- streamingInputs: []string{"1", "2", "3", "4"},
- streamingInputType: vom_wiretype.Type{ID: 36},
- numOutArgs: 2,
- expectedStream: []testwriter.Response{
- testwriter.Response{
- Message: vomEncodeOrDie(int32(1)),
+ method: "StreamingAdd",
+ streamingInputs: []interface{}{1, 2, 3, 4},
+ numOutArgs: 2,
+ expectedStream: []lib.Response{
+ lib.Response{
+ Message: lib.VomEncodeOrDie(int32(1)),
Type: lib.ResponseStream,
},
- testwriter.Response{
- Message: vomEncodeOrDie(int32(3)),
+ lib.Response{
+ Message: lib.VomEncodeOrDie(int32(3)),
Type: lib.ResponseStream,
},
- testwriter.Response{
- Message: vomEncodeOrDie(int32(6)),
+ lib.Response{
+ Message: lib.VomEncodeOrDie(int32(6)),
Type: lib.ResponseStream,
},
- testwriter.Response{
- Message: vomEncodeOrDie(int32(10)),
+ lib.Response{
+ Message: lib.VomEncodeOrDie(int32(10)),
Type: lib.ResponseStream,
},
- testwriter.Response{
+ lib.Response{
Message: nil,
Type: lib.ResponseStreamClose,
},
- testwriter.Response{
- Message: vomEncodeOrDie([]interface{}{int32(10)}),
+ lib.Response{
+ Message: lib.VomEncodeOrDie([]interface{}{int32(10)}),
Type: lib.ResponseFinal,
},
},
@@ -446,9 +399,7 @@
clientStream []interface{}
// The set of JSON streaming messages sent from Javascript to the
// app.
- serverStream []string
- // The stream that we expect the client to see.
- expectedServerStream []interface{}
+ serverStream []interface{}
// The final response sent by the Javascript server to the
// app.
finalResponse interface{}
@@ -462,43 +413,6 @@
authError verror2.E
}
-func sendServerStream(t *testing.T, controller *Controller, test *jsServerTestCase, w lib.ClientWriter, expectedFlow int64) {
- for _, msg := range test.serverStream {
- controller.SendOnStream(expectedFlow, msg, w)
- }
-
- serverReply := map[string]interface{}{
- "Results": []interface{}{test.finalResponse},
- "Err": test.err,
- }
-
- bytes, err := json.Marshal(serverReply)
- if err != nil {
- t.Fatalf("Failed to serialize the reply: %v", err)
- }
- controller.HandleServerResponse(expectedFlow, string(bytes))
-}
-
-// Replaces "localEndpoint" and "remoteEndpoint" in security context of the
-// message passed in with constant strings "localEndpoint" and "remoteEndpoint".
-func cleanUpAuthRequest(message *testwriter.Response, t *testing.T) {
- // We should make sure that remoteEndpoint exists in the last message and
- // change it to a fixed string.
- if message.Type != lib.ResponseAuthRequest {
- t.Errorf("Unexpected auth message %v", message)
- return
- }
- context := message.Message.(map[string]interface{})["context"].(map[string]interface{})
-
- keysToReplace := []string{"localEndpoint", "remoteEndpoint"}
- for _, key := range keysToReplace {
- if context[key] == nil || context[key] == "" {
- t.Errorf("Unexpected auth message %v", message)
- }
- context[key] = key
- }
-}
-
func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
rt, err := serveServer()
defer rt.mounttableServer.Stop()
@@ -525,13 +439,13 @@
vomClientStream := []string{}
for _, m := range test.clientStream {
- vomClientStream = append(vomClientStream, vomEncodeOrDie(m))
+ vomClientStream = append(vomClientStream, lib.VomEncodeOrDie(m))
}
mock := &mockJSServer{
controller: rt.controller,
t: t,
method: test.method,
- serviceSignature: adderServiceSignature,
+ serviceSignature: simpleAddrSig,
expectedClientStream: vomClientStream,
serverStream: test.serverStream,
hasAuthorizer: test.hasAuthorizer,
@@ -566,7 +480,7 @@
t.Errorf("unexpected error on close: %v", err)
}
- expectedStream := test.expectedServerStream
+ expectedStream := test.serverStream
for {
var data interface{}
if err := call.Recv(&data); err != nil {
@@ -587,7 +501,7 @@
err = call.Finish(&result, &err2)
if (err == nil && test.authError != nil) || (err != nil && test.authError == nil) {
- t.Errorf("unexpected err :%v, %v", err, test.authError)
+ t.Errorf("unexpected err: %v, %v", err, test.authError)
}
if err != nil {
@@ -608,16 +522,16 @@
func TestSimpleJSServer(t *testing.T) {
runJsServerTestCase(t, jsServerTestCase{
method: "Add",
- inArgs: []interface{}{1.0, 2.0},
- finalResponse: 3.0,
+ inArgs: []interface{}{int32(1), int32(2)},
+ finalResponse: int32(3),
})
}
func TestJSServerWithAuthorizer(t *testing.T) {
runJsServerTestCase(t, jsServerTestCase{
method: "Add",
- inArgs: []interface{}{1.0, 2.0},
- finalResponse: 3.0,
+ inArgs: []interface{}{int32(1), int32(2)},
+ finalResponse: int32(3),
hasAuthorizer: true,
})
}
@@ -626,8 +540,8 @@
err := verror2.Make(verror2.Internal, nil)
runJsServerTestCase(t, jsServerTestCase{
method: "Add",
- inArgs: []interface{}{1.0, 2.0},
- finalResponse: 3.0,
+ inArgs: []interface{}{int32(1), int32(2)},
+ finalResponse: int32(3),
err: err,
})
}
@@ -636,8 +550,8 @@
err := verror2.Make(verror2.Internal, nil)
runJsServerTestCase(t, jsServerTestCase{
method: "Add",
- inArgs: []interface{}{1.0, 2.0},
- finalResponse: 3.0,
+ inArgs: []interface{}{int32(1), int32(2)},
+ finalResponse: int32(3),
hasAuthorizer: true,
authError: err,
})
@@ -645,27 +559,25 @@
func TestJSServerWihStreamingInputs(t *testing.T) {
runJsServerTestCase(t, jsServerTestCase{
method: "StreamingAdd",
- clientStream: []interface{}{3.0, 4.0},
- finalResponse: 10.0,
+ clientStream: []interface{}{int32(3), int32(4)},
+ finalResponse: int32(10),
})
}
func TestJSServerWihStreamingOutputs(t *testing.T) {
runJsServerTestCase(t, jsServerTestCase{
- method: "StreamingAdd",
- serverStream: []string{"3", "4"},
- expectedServerStream: []interface{}{3.0, 4.0},
- finalResponse: 10.0,
+ method: "StreamingAdd",
+ serverStream: []interface{}{int32(3), int32(4)},
+ finalResponse: int32(10),
})
}
func TestJSServerWihStreamingInputsAndOutputs(t *testing.T) {
runJsServerTestCase(t, jsServerTestCase{
- method: "StreamingAdd",
- clientStream: []interface{}{1.0, 2.0},
- serverStream: []string{"3", "4"},
- expectedServerStream: []interface{}{3.0, 4.0},
- finalResponse: 10.0,
+ method: "StreamingAdd",
+ clientStream: []interface{}{int32(1), int32(2)},
+ serverStream: []interface{}{int32(3), int32(4)},
+ finalResponse: int32(10),
})
}
diff --git a/services/wsprd/app/messaging.go b/services/wsprd/app/messaging.go
index 2f78d0d..d0cf3c4 100644
--- a/services/wsprd/app/messaging.go
+++ b/services/wsprd/app/messaging.go
@@ -75,11 +75,6 @@
WebsocketRemoveName = 19
)
-type Response struct {
- Type lib.ResponseType
- Message interface{}
-}
-
type Message struct {
Id int64
// This contains the json encoded payload.
@@ -93,7 +88,6 @@
func (c *Controller) HandleIncomingMessage(msg Message, w lib.ClientWriter) {
// TODO(mattr): Get the proper context information from javascript.
ctx := c.RT().NewContext()
-
switch msg.Type {
case VeyronRequestMessage:
c.HandleVeyronRequest(ctx, msg.Id, msg.Data, w)
@@ -139,7 +133,7 @@
// TODO(bprosnitz) Don't double-encode
func ConstructOutgoingMessage(messageId int64, messageType lib.ResponseType, data interface{}) (string, error) {
var buf bytes.Buffer
- if err := vom.ObjToJSON(&buf, vom.ValueOf(Response{Type: messageType, Message: data})); err != nil {
+ if err := vom.ObjToJSON(&buf, vom.ValueOf(lib.Response{Type: messageType, Message: data})); err != nil {
return "", err
}
diff --git a/services/wsprd/app/mock_jsServer_test.go b/services/wsprd/app/mock_jsServer_test.go
index fbfc17d..a9c127d 100644
--- a/services/wsprd/app/mock_jsServer_test.go
+++ b/services/wsprd/app/mock_jsServer_test.go
@@ -2,28 +2,26 @@
import (
"bytes"
- "encoding/hex"
"encoding/json"
"fmt"
"reflect"
"sync"
"testing"
- "veyron.io/veyron/veyron2/vom2"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/wspr/veyron/services/wsprd/ipc/server"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/principal"
- "veyron.io/wspr/veyron/services/wsprd/signature"
)
type mockJSServer struct {
controller *Controller
t *testing.T
method string
- serviceSignature signature.JSONServiceSignature
+ serviceSignature []signature.Interface
sender sync.WaitGroup
expectedClientStream []string
- serverStream []string
+ serverStream []interface{}
hasAuthorizer bool
authError error
inArgs []interface{}
@@ -69,19 +67,6 @@
panic(err)
}
-func vomDecode(s string, v interface{}) error {
- b, err := hex.DecodeString(s)
- if err != nil {
- return err
- }
- decoder, err := vom2.NewDecoder(bytes.NewBuffer(b))
- if err != nil {
- return err
- }
-
- return decoder.Decode(v)
-}
-
func normalize(msg interface{}) (map[string]interface{}, error) {
// We serialize and deserialize the reponse so that we can do deep equal with
// messages that contain non-exported structs.
@@ -114,7 +99,7 @@
}
bytes, err := json.Marshal(map[string]interface{}{
"handle": 0,
- "signature": vomEncodeOrDie(m.serviceSignature),
+ "signature": lib.VomEncodeOrDie(m.serviceSignature),
"hasAuthorizer": m.hasAuthorizer,
})
if err != nil {
@@ -146,7 +131,7 @@
}
var msg server.AuthRequest
- if err := vomDecode(v.(string), &msg); err != nil {
+ if err := lib.VomDecode(v.(string), &msg); err != nil {
m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("error decoding %v:", err)))
return nil
}
@@ -221,7 +206,7 @@
}
var msg server.ServerRPCRequest
- if err := vomDecode(v.(string), &msg); err != nil {
+ if err := lib.VomDecode(v.(string), &msg); err != nil {
m.controller.HandleServerResponse(m.flowCount, internalErrJSON(err))
return nil
@@ -272,8 +257,8 @@
func (m *mockJSServer) sendServerStream() {
defer m.sender.Done()
- for _, msg := range m.serverStream {
- m.controller.SendOnStream(m.rpcFlow, msg, m)
+ for _, v := range m.serverStream {
+ m.controller.SendOnStream(m.rpcFlow, lib.VomEncodeOrDie(v), m)
}
}
@@ -294,15 +279,14 @@
func (m *mockJSServer) handleStreamClose(msg interface{}) error {
m.sender.Wait()
- serverReply := map[string]interface{}{
- "Results": []interface{}{m.finalResponse},
- "Err": m.finalError,
+ reply := lib.ServerRPCReply{
+ Results: []interface{}{m.finalResponse},
+ Err: m.finalError,
}
-
- bytes, err := json.Marshal(serverReply)
+ vomReply, err := lib.VomEncode(reply)
if err != nil {
m.t.Fatalf("Failed to serialize the reply: %v", err)
}
- m.controller.HandleServerResponse(m.rpcFlow, string(bytes))
+ m.controller.HandleServerResponse(m.rpcFlow, vomReply)
return nil
}
diff --git a/services/wsprd/app/stream.go b/services/wsprd/app/stream.go
index e277be8..f1fa4ff 100644
--- a/services/wsprd/app/stream.go
+++ b/services/wsprd/app/stream.go
@@ -4,13 +4,12 @@
"fmt"
"veyron.io/veyron/veyron2/ipc"
- "veyron.io/veyron/veyron2/vom"
+ "veyron.io/veyron/veyron2/vdl"
"veyron.io/wspr/veyron/services/wsprd/lib"
)
type initConfig struct {
stream ipc.Stream
- inType vom.Type
}
type message struct {
@@ -69,15 +68,14 @@
func (os *outstandingStream) loop() {
config := <-os.initChan
for msg := range os.messages {
- payload, err := vom.JSONToObject(msg.data, config.inType)
- if err != nil {
- msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, err))
- continue
+ var item *vdl.Value
+ if err := lib.VomDecode(msg.data, &item); err != nil {
+ msg.writer.Error(fmt.Errorf("failed to decode stream arg from %v: %v", msg.data, err))
+ break
}
- if err := config.stream.Send(payload); err != nil {
+ if err := config.stream.Send(item); err != nil {
msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err))
}
-
}
close(os.done)
// If this is a client rpc, we need to call CloseSend on it.
@@ -86,6 +84,6 @@
}
}
-func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) {
- os.initChan <- &initConfig{stream, inType}
+func (os *outstandingStream) init(stream ipc.Stream) {
+ os.initChan <- &initConfig{stream}
}
diff --git a/services/wsprd/browspr/browspr_test.go b/services/wsprd/browspr/browspr_test.go
index e4a3cc3..8192702 100644
--- a/services/wsprd/browspr/browspr_test.go
+++ b/services/wsprd/browspr/browspr_test.go
@@ -1,9 +1,8 @@
package browspr
import (
- "bytes"
- "encoding/hex"
"encoding/json"
+ "reflect"
"strings"
"testing"
@@ -15,10 +14,7 @@
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/options"
"veyron.io/veyron/veyron2/rt"
- "veyron.io/veyron/veyron2/vdl/vdlutil"
"veyron.io/veyron/veyron2/vlog"
- "veyron.io/veyron/veyron2/vom2"
- "veyron.io/veyron/veyron2/wiretype"
"veyron.io/wspr/veyron/services/wsprd/app"
"veyron.io/wspr/veyron/services/wsprd/lib"
)
@@ -69,23 +65,6 @@
return "[" + txt + "]", nil
}
-func (s mockServer) Signature(call ipc.ServerCall) (ipc.ServiceSignature, error) {
- result := ipc.ServiceSignature{Methods: make(map[string]ipc.MethodSignature)}
- result.Methods["BasicCall"] = ipc.MethodSignature{
- InArgs: []ipc.MethodArgument{
- {Name: "Txt", Type: 3},
- },
- OutArgs: []ipc.MethodArgument{
- {Name: "Value", Type: 3},
- {Name: "Err", Type: 65},
- },
- }
- result.TypeDefs = []vdlutil.Any{
- wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
-
- return result, nil
-}
-
func startMockServer(desiredName string) (ipc.Server, naming.Endpoint, error) {
// Create a new server instance.
s, err := r.NewServer()
@@ -105,15 +84,6 @@
return s, endpoint, nil
}
-type veyronTempRPC struct {
- Name string
- Method string
- InArgs []json.RawMessage
- NumOutArgs int32
- IsStreaming bool
- Timeout int64
-}
-
func TestBrowspr(t *testing.T) {
proxy, err := startProxy()
if err != nil {
@@ -198,25 +168,23 @@
msgInstanceId := int32(11)
- rpcMessage := veyronTempRPC{
- Name: mockServerName,
- Method: "BasicCall",
- InArgs: []json.RawMessage{
- json.RawMessage([]byte("\"InputValue\"")),
- },
+ rpc := app.VeyronRPC{
+ Name: mockServerName,
+ Method: "BasicCall",
+ InArgs: []interface{}{"InputValue"},
NumOutArgs: 2,
IsStreaming: false,
Timeout: (1 << 31) - 1,
}
- jsonRpcMessage, err := json.Marshal(rpcMessage)
+ vomRPC, err := lib.VomEncode(rpc)
if err != nil {
- t.Fatalf("Failed to marshall rpc message to json: %v", err)
+ t.Fatalf("Failed to vom encode rpc message: %v", err)
}
msg, err := json.Marshal(app.Message{
Id: 1,
- Data: string(jsonRpcMessage),
+ Data: vomRPC,
Type: app.VeyronRequestMessage,
})
if err != nil {
@@ -248,7 +216,7 @@
t.Errorf("Message type was %v, expected %v", outMsg.Type, app.MessageType(0))
}
- var responseMsg app.Response
+ var responseMsg lib.Response
if err := json.Unmarshal([]byte(outMsg.Data), &responseMsg); err != nil {
t.Fatalf("Failed to unmarshall outgoing response: %v", err)
}
@@ -261,15 +229,10 @@
t.Errorf("Got unexpected response message body of type %T, expected type string", responseMsg.Message)
}
var result []string
- arg, err := hex.DecodeString(outArg)
- if err != nil {
- t.Errorf("failed to hex decode string: %v", err)
+ if err := lib.VomDecode(outArg, &result); err != nil {
+ t.Errorf("Failed to vom decode args from %v: %v", outArg, err)
}
- decoder, err := vom2.NewDecoder(bytes.NewBuffer(arg))
- if err != nil {
- t.Fatalf("failed to construct new decoder: %v", err)
- }
- if err := decoder.Decode(&result); err != nil || result[0] != "[InputValue]" {
- t.Errorf("got %s with err: %v, expected %s", result[0], err, "[InputValue]")
+ if got, want := result, []string{"[InputValue]"}; !reflect.DeepEqual(got, want) {
+ t.Errorf("Result got %v, want %v", got, want)
}
}
diff --git a/services/wsprd/ipc/server/dispatcher.go b/services/wsprd/ipc/server/dispatcher.go
index 9cb48f2..851430d 100644
--- a/services/wsprd/ipc/server/dispatcher.go
+++ b/services/wsprd/ipc/server/dispatcher.go
@@ -2,18 +2,16 @@
import (
"bytes"
- "encoding/hex"
"encoding/json"
"sync"
"veyron.io/wspr/veyron/services/wsprd/lib"
- "veyron.io/wspr/veyron/services/wsprd/signature"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
- "veyron.io/veyron/veyron2/vom2"
)
type flowFactory interface {
@@ -22,20 +20,27 @@
}
type invokerFactory interface {
- createInvoker(handle int64, signature signature.JSONServiceSignature) (ipc.Invoker, error)
+ createInvoker(handle int64, signature []signature.Interface) (ipc.Invoker, error)
}
type authFactory interface {
createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error)
}
-type lookupReply struct {
+type lookupIntermediateReply struct {
Handle int64
HasAuthorizer bool
Signature string
Err *verror2.Standard
}
+type lookupReply struct {
+ Handle int64
+ HasAuthorizer bool
+ Signature []signature.Interface
+ Err *verror2.Standard
+}
+
type dispatcherRequest struct {
ServerID uint64 `json:"serverId"`
Suffix string `json:"suffix"`
@@ -81,7 +86,7 @@
if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
ch <- lookupReply{Err: verror2.Convert(verror2.Internal, nil, err).(*verror2.Standard)}
}
- request := <-ch
+ reply := <-ch
d.mu.Lock()
delete(d.outstandingLookups, flow.ID)
@@ -89,37 +94,23 @@
d.flowFactory.cleanupFlow(flow.ID)
- if request.Err != nil {
- return nil, nil, request.Err
+ if reply.Err != nil {
+ return nil, nil, reply.Err
}
-
- if request.Handle < 0 {
+ if reply.Handle < 0 {
return nil, nil, verror2.Make(verror2.NoExist, nil, "Dispatcher", suffix)
}
- var sig signature.JSONServiceSignature
- b, err := hex.DecodeString(request.Signature)
+ invoker, err := d.invokerFactory.createInvoker(reply.Handle, reply.Signature)
if err != nil {
- return nil, nil, verror2.Convert(verror2.Internal, nil, err)
+ return nil, nil, err
}
- buf := bytes.NewBuffer(b)
- decoder, err := vom2.NewDecoder(buf)
- if err != nil {
- return nil, nil, verror2.Convert(verror2.Internal, nil, err)
- }
-
- if err := decoder.Decode(&sig); err != nil {
- return nil, nil, verror2.Convert(verror2.Internal, nil, err)
- }
-
- invoker, err := d.invokerFactory.createInvoker(request.Handle, sig)
+ auth, err := d.authFactory.createAuthorizer(reply.Handle, reply.HasAuthorizer)
if err != nil {
return nil, nil, err
}
- auth, err := d.authFactory.createAuthorizer(request.Handle, request.HasAuthorizer)
-
- return invoker, auth, err
+ return invoker, auth, nil
}
func (d *dispatcher) handleLookupResponse(id int64, data string) {
@@ -133,14 +124,26 @@
return
}
- var request lookupReply
+ var intermediateReply lookupIntermediateReply
decoder := json.NewDecoder(bytes.NewBufferString(data))
- if err := decoder.Decode(&request); err != nil {
+ if err := decoder.Decode(&intermediateReply); err != nil {
err2 := verror2.Convert(verror2.Internal, nil, err).(verror2.Standard)
- request = lookupReply{Err: &err2}
+ intermediateReply = lookupIntermediateReply{Err: &err2}
d.logger.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
}
- ch <- request
+
+ reply := lookupReply{
+ Handle: intermediateReply.Handle,
+ HasAuthorizer: intermediateReply.HasAuthorizer,
+ Err: intermediateReply.Err,
+ }
+ if reply.Err == nil && intermediateReply.Signature != "" {
+ if err := lib.VomDecode(intermediateReply.Signature, &reply.Signature); err != nil {
+ err2 := verror2.Convert(verror2.Internal, nil, err).(verror2.Standard)
+ reply.Err = &err2
+ }
+ }
+ ch <- reply
}
// StopServing implements dispatcher StopServing.
diff --git a/services/wsprd/ipc/server/dispatcher_test.go b/services/wsprd/ipc/server/dispatcher_test.go
index 6c923d6..62038a1 100644
--- a/services/wsprd/ipc/server/dispatcher_test.go
+++ b/services/wsprd/ipc/server/dispatcher_test.go
@@ -9,10 +9,9 @@
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron2/security"
- vdlsig "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/lib/testwriter"
- "veyron.io/wspr/veyron/services/wsprd/signature"
)
type mockFlowFactory struct {
@@ -27,7 +26,7 @@
type mockInvoker struct {
handle int64
- sig signature.JSONServiceSignature
+ sig []signature.Interface
}
func (m mockInvoker) Prepare(string, int) ([]interface{}, []interface{}, error) {
@@ -42,17 +41,21 @@
return nil
}
-func (mockInvoker) Signature(ctx ipc.ServerContext) ([]vdlsig.Interface, error) {
- return nil, nil
+func (m mockInvoker) Signature(ctx ipc.ServerContext) ([]signature.Interface, error) {
+ return m.sig, nil
}
-func (mockInvoker) MethodSignature(ctx ipc.ServerContext, method string) (vdlsig.Method, error) {
- return vdlsig.Method{}, nil
+func (m mockInvoker) MethodSignature(ctx ipc.ServerContext, methodName string) (signature.Method, error) {
+ method, found := m.sig[0].FindMethod(methodName)
+ if !found {
+ return signature.Method{}, fmt.Errorf("Method %q not found", methodName)
+ }
+ return method, nil
}
type mockInvokerFactory struct{}
-func (mockInvokerFactory) createInvoker(handle int64, sig signature.JSONServiceSignature) (ipc.Invoker, error) {
+func (mockInvokerFactory) createInvoker(handle int64, sig []signature.Interface) (ipc.Invoker, error) {
return &mockInvoker{handle: handle, sig: sig}, nil
}
@@ -69,14 +72,6 @@
return mockAuthorizer{handle: handle, hasAuthorizer: hasAuthorizer}, nil
}
-func vomEncodeOrDie(v interface{}) string {
- s, err := lib.VomEncode(v)
- if err != nil {
- panic(err)
- }
- return s
-}
-
func TestSuccessfulLookup(t *testing.T) {
runtime, err := rt.New()
if err != nil {
@@ -86,19 +81,15 @@
flowFactory := &mockFlowFactory{}
d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{}, runtime.Logger())
- expectedSig := signature.JSONServiceSignature{
- "add": signature.JSONMethodSignature{
- InArgs: []string{"foo", "bar"},
- NumOutArgs: 1,
- },
+ expectedSig := []signature.Interface{
+ {Name: "AName"},
}
-
go func() {
if err := flowFactory.writer.WaitForMessage(1); err != nil {
t.Errorf("failed to get dispatch request %v", err)
t.Fail()
}
- jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":false,"signature":"%s"}`, vomEncodeOrDie(expectedSig))
+ jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":false,"signature":"%s"}`, lib.VomEncodeOrDie(expectedSig))
d.handleLookupResponse(0, jsonResponse)
}()
@@ -118,8 +109,8 @@
t.Errorf("wrong authorizer returned, expected: %v, got :%v", expectedAuth, auth)
}
- expectedResponses := []testwriter.Response{
- testwriter.Response{
+ expectedResponses := []lib.Response{
+ {
Type: lib.ResponseDispatcherLookup,
Message: map[string]interface{}{
"serverId": 0.0,
@@ -141,19 +132,15 @@
flowFactory := &mockFlowFactory{}
d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{}, runtime.Logger())
- expectedSig := signature.JSONServiceSignature{
- "add": signature.JSONMethodSignature{
- InArgs: []string{"foo", "bar"},
- NumOutArgs: 1,
- },
+ expectedSig := []signature.Interface{
+ {Name: "AName"},
}
-
go func() {
if err := flowFactory.writer.WaitForMessage(1); err != nil {
t.Errorf("failed to get dispatch request %v", err)
t.Fail()
}
- jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":true,"signature":"%s"}`, vomEncodeOrDie(expectedSig))
+ jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":true,"signature":"%s"}`, lib.VomEncodeOrDie(expectedSig))
d.handleLookupResponse(0, jsonResponse)
}()
@@ -173,8 +160,8 @@
t.Errorf("wrong authorizer returned, expected: %v, got :%v", expectedAuth, auth)
}
- expectedResponses := []testwriter.Response{
- testwriter.Response{
+ expectedResponses := []lib.Response{
+ {
Type: lib.ResponseDispatcherLookup,
Message: map[string]interface{}{
"serverId": 0.0,
@@ -211,8 +198,8 @@
t.Errorf("expected error, but got none", err)
}
- expectedResponses := []testwriter.Response{
- testwriter.Response{
+ expectedResponses := []lib.Response{
+ {
Type: lib.ResponseDispatcherLookup,
Message: map[string]interface{}{
"serverId": 0.0,
diff --git a/services/wsprd/ipc/server/invoker.go b/services/wsprd/ipc/server/invoker.go
index 6044cb3..c923a18 100644
--- a/services/wsprd/ipc/server/invoker.go
+++ b/services/wsprd/ipc/server/invoker.go
@@ -2,11 +2,9 @@
import (
"veyron.io/veyron/veyron2/ipc"
- vdlsig "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
+ "veyron.io/veyron/veyron2/vdl"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/veyron/veyron2/verror"
-
- "veyron.io/wspr/veyron/services/wsprd/lib"
- "veyron.io/wspr/veyron/services/wsprd/signature"
)
var typedNil []int
@@ -14,84 +12,59 @@
// invoker holds a delegate function to call on invoke and a list of methods that
// are available for be called.
type invoker struct {
- // signature of the service this invoker belogs to
- sig ipc.ServiceSignature
// delegate function to call when an invoke request comes in
invokeFunc remoteInvokeFunc
- // map of special methods like "Signature" which invoker handles on behalf of the actual service
- predefinedInvokers map[string]ipc.Invoker
- // This is to get the method tags. TODO(bjornick): Remove this when vom2 signatures
- // has tags.
- jsonSig signature.JSONServiceSignature
+ signature []signature.Interface
}
var _ ipc.Invoker = (*invoker)(nil)
// newInvoker is an invoker factory
-func newInvoker(sig ipc.ServiceSignature, jsonSig signature.JSONServiceSignature, invokeFunc remoteInvokeFunc) ipc.Invoker {
- predefinedInvokers := make(map[string]ipc.Invoker)
-
- // Special handling for predefined "signature" method
- predefinedInvokers["Signature"] = newSignatureInvoker(sig)
-
- i := &invoker{sig, invokeFunc, predefinedInvokers, jsonSig}
+func newInvoker(signature []signature.Interface, invokeFunc remoteInvokeFunc) ipc.Invoker {
+ i := &invoker{invokeFunc, signature}
return i
}
// Prepare implements the Invoker interface.
func (i *invoker) Prepare(methodName string, numArgs int) ([]interface{}, []interface{}, error) {
- if pi := i.predefinedInvokers[methodName]; pi != nil {
- return pi.Prepare(methodName, numArgs)
+ method, err := i.MethodSignature(nil, methodName)
+ if err != nil {
+ return nil, nil, err
}
-
- method, ok := i.jsonSig[lib.LowercaseFirstCharacter(methodName)]
- if !ok {
- return nil, nil, verror.NoExistf("method name not found in IDL: %s", methodName)
+ if got, want := numArgs, len(method.InArgs); got != want {
+ return nil, nil, verror.NoExistf("Method %q got %d args, want %d", methodName, got, want)
}
-
argptrs := make([]interface{}, len(method.InArgs))
-
- for ix := range method.InArgs {
- var x interface{}
- argptrs[ix] = &x // Accept AnyData
+ for ix, arg := range method.InArgs {
+ argptrs[ix] = vdl.ZeroValue(arg.Type)
}
- return argptrs, method.Tags, nil
+ tags := make([]interface{}, len(method.Tags))
+ for ix, tag := range method.Tags {
+ tags[ix] = (interface{})(tag)
+ }
+
+ return argptrs, tags, nil
}
// Invoke implements the Invoker interface.
func (i *invoker) Invoke(methodName string, call ipc.ServerCall, argptrs []interface{}) ([]interface{}, error) {
-
- if pi := i.predefinedInvokers[methodName]; pi != nil {
- return pi.Invoke(methodName, call, argptrs)
- }
-
- if _, ok := i.sig.Methods[methodName]; !ok {
- return nil, verror.NoExistf("method name not found in IDL: %s", methodName)
- }
-
replychan := i.invokeFunc(methodName, argptrs, call)
// Wait for the result
reply := <-replychan
- var err error = nil
+ var err error
if reply.Err != nil {
err = reply.Err
}
- for i, v := range reply.Results {
- if v == nil {
- reply.Results[i] = typedNil
- }
- }
-
// We always assume JavaScript services might return error.
// JavaScript returns non-error results in reply.Results & error in reply.Err
// We add the error as the last result of the ipc invoke call since last
// out arg is where application error is expected to be.
- results := make([]interface{}, len(reply.Results)+1)
+ results := make([]interface{}, len(reply.Results))
results = append(reply.Results, err)
return results, nil
@@ -102,12 +75,13 @@
return nil
}
-// TODO(bjornick,toddw): Implement this for JS.
-func (i *invoker) Signature(ctx ipc.ServerContext) ([]vdlsig.Interface, error) {
- return nil, nil
+func (i *invoker) Signature(ctx ipc.ServerContext) ([]signature.Interface, error) {
+ return i.signature, nil
}
-// TODO(bjornick,toddw): Implement this for JS.
-func (i *invoker) MethodSignature(ctx ipc.ServerContext, method string) (vdlsig.Method, error) {
- return vdlsig.Method{}, nil
+func (i *invoker) MethodSignature(ctx ipc.ServerContext, method string) (signature.Method, error) {
+ if methodSig, ok := signature.FirstMethod(i.signature, method); ok {
+ return methodSig, nil
+ }
+ return signature.Method{}, verror.NoExistf("Method %q not found in signature", method)
}
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index a92e710..355749c 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -3,22 +3,19 @@
package server
import (
- "bytes"
- "encoding/hex"
"encoding/json"
"sync"
"time"
"veyron.io/wspr/veyron/services/wsprd/lib"
"veyron.io/wspr/veyron/services/wsprd/principal"
- "veyron.io/wspr/veyron/services/wsprd/signature"
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
"veyron.io/veyron/veyron2/verror2"
"veyron.io/veyron/veyron2/vlog"
- "veyron.io/veyron/veyron2/vom2"
)
type Flow struct {
@@ -46,12 +43,6 @@
MethodTags []interface{}
}
-// The response from the javascript server to the proxy.
-type serverRPCReply struct {
- Results []interface{}
- Err *verror2.Standard
-}
-
type FlowHandler interface {
CreateNewFlow(server *Server, sender ipc.Stream) *Flow
@@ -120,7 +111,7 @@
helper ServerHelper
// The set of outstanding server requests.
- outstandingServerRequests map[int64]chan *serverRPCReply
+ outstandingServerRequests map[int64]chan *lib.ServerRPCReply
outstandingAuthRequests map[int64]chan error
}
@@ -130,7 +121,7 @@
id: id,
helper: helper,
listenSpec: listenSpec,
- outstandingServerRequests: make(map[int64]chan *serverRPCReply),
+ outstandingServerRequests: make(map[int64]chan *lib.ServerRPCReply),
outstandingAuthRequests: make(map[int64]chan error),
}
var err error
@@ -142,12 +133,12 @@
// remoteInvokeFunc is a type of function that can invoke a remote method and
// communicate the result back via a channel to the caller
-type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
+type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *lib.ServerRPCReply
func (s *Server) createRemoteInvokerFunc(handle int64) remoteInvokeFunc {
- return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
+ return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *lib.ServerRPCReply {
flow := s.helper.CreateNewFlow(s, call)
- replyChan := make(chan *serverRPCReply, 1)
+ replyChan := make(chan *lib.ServerRPCReply, 1)
s.mu.Lock()
s.outstandingServerRequests[flow.ID] = replyChan
s.mu.Unlock()
@@ -157,10 +148,10 @@
timeout = lib.GoToJSDuration(deadline.Sub(time.Now()))
}
- errHandler := func(err error) <-chan *serverRPCReply {
+ errHandler := func(err error) <-chan *lib.ServerRPCReply {
if ch := s.popServerRequest(flow.ID); ch != nil {
stdErr := verror2.Convert(verror2.Internal, call, err).(verror2.Standard)
- ch <- &serverRPCReply{nil, &stdErr}
+ ch <- &lib.ServerRPCReply{nil, &stdErr}
s.helper.CleanupFlow(flow.ID)
}
return replyChan
@@ -184,20 +175,15 @@
Args: args,
Context: context,
}
-
vomMessage, err := lib.VomEncode(message)
-
if err != nil {
return errHandler(err)
}
-
if err := flow.Writer.Send(lib.ResponseServerRequest, vomMessage); err != nil {
return errHandler(err)
}
- s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
- "JavaScript server with args %v, MessageId %d was assigned.",
- methodName, args, flow.ID)
+ s.helper.GetLogger().VI(3).Infof("calling method %q with args %v, MessageID %d assigned\n", methodName, args, flow.ID)
// Watch for cancellation.
go func() {
@@ -212,7 +198,7 @@
s.helper.CleanupFlow(flow.ID)
err := verror2.Convert(verror2.Aborted, call, call.Err()).(verror2.Standard)
- ch <- &serverRPCReply{nil, &err}
+ ch <- &lib.ServerRPCReply{nil, &err}
}()
go proxyStream(call, flow.Writer, s.helper.GetLogger())
@@ -224,24 +210,16 @@
func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
var item interface{}
for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
+ vomItem, err := lib.VomEncode(item)
if err != nil {
w.Error(verror2.Convert(verror2.Internal, nil, err))
return
}
-
- if err := encoder.Encode(item); err != nil {
- w.Error(verror2.Convert(verror2.Internal, nil, err))
- return
- }
-
- if err := w.Send(lib.ResponseStream, hex.EncodeToString(buf.Bytes())); err != nil {
+ if err := w.Send(lib.ResponseStream, vomItem); err != nil {
w.Error(verror2.Convert(verror2.Internal, nil, err))
return
}
}
-
if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
w.Error(verror2.Convert(verror2.Internal, nil, err))
return
@@ -317,7 +295,7 @@
return nil
}
-func (s *Server) popServerRequest(id int64) chan *serverRPCReply {
+func (s *Server) popServerRequest(id int64) chan *lib.ServerRPCReply {
s.mu.Lock()
defer s.mu.Unlock()
ch := s.outstandingServerRequests[id]
@@ -331,21 +309,20 @@
if ch == nil {
s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results.", id)
- //Ignore unknown responses that don't belong to any channel
+ // Ignore unknown responses that don't belong to any channel
return
}
// Decode the result and send it through the channel
- var serverReply serverRPCReply
- if decoderErr := json.Unmarshal([]byte(data), &serverReply); decoderErr != nil {
- err := verror2.Convert(verror2.Internal, nil, decoderErr).(verror2.Standard)
- serverReply = serverRPCReply{nil, &err}
+ var reply lib.ServerRPCReply
+ if err := lib.VomDecode(data, &reply); err != nil {
+ reply.Err = err
}
s.helper.GetLogger().VI(3).Infof("response received from JavaScript server for "+
- "MessageId %d with result %v", id, serverReply)
+ "MessageId %d with result %v", id, reply)
s.helper.CleanupFlow(id)
- ch <- &serverReply
+ ch <- &reply
}
func (s *Server) HandleLookupResponse(id int64, data string) {
@@ -390,14 +367,9 @@
s.helper.CleanupFlow(id)
}
-func (s *Server) createInvoker(handle int64, sig signature.JSONServiceSignature) (ipc.Invoker, error) {
- serviceSig, err := sig.ServiceSignature()
- if err != nil {
- return nil, err
- }
-
+func (s *Server) createInvoker(handle int64, sig []signature.Interface) (ipc.Invoker, error) {
remoteInvokeFunc := s.createRemoteInvokerFunc(handle)
- return newInvoker(serviceSig, sig, remoteInvokeFunc), nil
+ return newInvoker(sig, remoteInvokeFunc), nil
}
func (s *Server) createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error) {
@@ -409,8 +381,8 @@
func (s *Server) Stop() {
stdErr := verror2.Make(verror2.Timeout, nil).(verror2.Standard)
- result := serverRPCReply{
- Results: []interface{}{nil},
+ result := lib.ServerRPCReply{
+ Results: nil,
Err: &stdErr,
}
s.mu.Lock()
@@ -421,7 +393,7 @@
default:
}
}
- s.outstandingServerRequests = make(map[int64]chan *serverRPCReply)
+ s.outstandingServerRequests = make(map[int64]chan *lib.ServerRPCReply)
s.server.Stop()
}
diff --git a/services/wsprd/ipc/server/signature_invoker.go b/services/wsprd/ipc/server/signature_invoker.go
deleted file mode 100644
index f3eaa39..0000000
--- a/services/wsprd/ipc/server/signature_invoker.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package server
-
-import (
- "veyron.io/veyron/veyron2/ipc"
- "veyron.io/veyron/veyron2/security"
- vdlsig "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
-)
-
-// signatureInvoker acts as the signature() method and is used to handle calls
-// to signature() on behalf of the service
-//
-// TODO(toddw): Replace this with the new Signature call.
-type signatureInvoker struct {
- // signature of the service
- sig ipc.ServiceSignature
-}
-
-var _ ipc.Invoker = (*signatureInvoker)(nil)
-
-func (i *signatureInvoker) signature() ipc.ServiceSignature {
- return i.sig
-}
-
-// newSignatureInvoker is an invoker factory
-func newSignatureInvoker(sig ipc.ServiceSignature) ipc.Invoker {
- return &signatureInvoker{sig}
-}
-
-// Prepare implements the Invoker interface.
-func (i *signatureInvoker) Prepare(methodName string, _ int) (argptrs, tags []interface{}, err error) {
- return []interface{}{}, []interface{}{security.ReadLabel}, nil
-}
-
-// Invoke implements the Invoker interface.
-func (i *signatureInvoker) Invoke(methodName string, call ipc.ServerCall, argptrs []interface{}) ([]interface{}, error) {
- return []interface{}{i.signature(), nil}, nil
-}
-
-func (i *signatureInvoker) Globber() *ipc.GlobState {
- return nil
-}
-
-func (i *signatureInvoker) Signature(ctx ipc.ServerContext) ([]vdlsig.Interface, error) {
- return nil, nil
-}
-
-func (i *signatureInvoker) MethodSignature(ctx ipc.ServerContext, method string) (vdlsig.Method, error) {
- return vdlsig.Method{}, nil
-}
diff --git a/services/wsprd/lib/signature_manager.go b/services/wsprd/lib/signature_manager.go
index fa78195..9a2856d 100644
--- a/services/wsprd/lib/signature_manager.go
+++ b/services/wsprd/lib/signature_manager.go
@@ -6,10 +6,13 @@
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/ipc/reserved"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
+ "veyron.io/veyron/veyron2/verror2"
)
type SignatureManager interface {
- Signature(ctx context.T, name string, client ipc.Client, opts ...ipc.CallOpt) (*ipc.ServiceSignature, error)
+ Signature(ctx context.T, name string, client ipc.Client, opts ...ipc.CallOpt) ([]signature.Interface, error)
FlushCacheEntry(name string)
}
@@ -35,7 +38,7 @@
)
type cacheEntry struct {
- signature ipc.ServiceSignature
+ sig []signature.Interface
lastAccessed time.Time
}
@@ -44,39 +47,32 @@
return time.Now().Sub(c.lastAccessed) > ttl
}
-// Signature uses the given client to fetch the signature for the given service name.
-// It locks until it fetches the service signature from the remote server, if not a cache hit.
-func (sm *signatureManager) Signature(ctx context.T, name string, client ipc.Client, opts ...ipc.CallOpt) (*ipc.ServiceSignature, error) {
+const pkgPath = "veyron.io/wspr/veyron/services/wsprd/lib"
+
+// Signature uses the given client to fetch the signature for the given service
+// name. It either returns the signature from the cache, or blocks until it
+// fetches the signature from the remote server.
+func (sm *signatureManager) Signature(ctx context.T, name string, client ipc.Client, opts ...ipc.CallOpt) ([]signature.Interface, error) {
sm.Lock()
defer sm.Unlock()
- if cashedSig := sm.cache[name]; cashedSig != nil && !cashedSig.expired() {
- cashedSig.lastAccessed = time.Now()
- return &cashedSig.signature, nil
+ if entry := sm.cache[name]; entry != nil && !entry.expired() {
+ entry.lastAccessed = time.Now()
+ return entry.sig, nil
}
- // cache expired or not found, fetch it from the remote server
- signatureCall, err := client.StartCall(ctx, name, "Signature", []interface{}{}, opts...)
+ // Fetch from the remote server.
+ sig, err := reserved.Signature(ctx, client, name, opts...)
if err != nil {
- return nil, err
+ return nil, verror2.Make(verror2.NoServers, ctx, name, err)
}
- var result ipc.ServiceSignature
- var appErr error
- if err := signatureCall.Finish(&result, &appErr); err != nil {
- return nil, err
- }
- if appErr != nil {
- return nil, appErr
- }
-
- // cache the result
+ // Add to the cache.
sm.cache[name] = &cacheEntry{
- signature: result,
+ sig: sig,
lastAccessed: time.Now(),
}
-
- return &result, nil
+ return sig, nil
}
// FlushCacheEntry removes the cached signature for the given name
diff --git a/services/wsprd/lib/signature_manager_test.go b/services/wsprd/lib/signature_manager_test.go
index 16cdbe0..a6874ea 100644
--- a/services/wsprd/lib/signature_manager_test.go
+++ b/services/wsprd/lib/signature_manager_test.go
@@ -6,23 +6,24 @@
_ "veyron.io/veyron/veyron/profiles"
mocks_ipc "veyron.io/veyron/veyron/runtimes/google/testing/mocks/ipc"
- "veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/rt"
- "veyron.io/veyron/veyron2/vdl/vdlutil"
- "veyron.io/veyron/veyron2/wiretype"
+ "veyron.io/veyron/veyron2/vdl"
+ "veyron.io/veyron/veyron2/vdl/vdlroot/src/signature"
)
const (
name = "/veyron/name"
)
-func expectedSignature() ipc.ServiceSignature {
- return ipc.ServiceSignature{
- Methods: make(map[string]ipc.MethodSignature),
- TypeDefs: []vdlutil.Any{
- wiretype.NamedPrimitiveType{
- Name: "veyron.io/veyron/veyron2/vdlutil.AnyData",
- Type: wiretype.TypeIDInterface,
+func wantSignature() []signature.Interface {
+ return []signature.Interface{
+ {
+ Methods: []signature.Method{
+ {
+ Name: "Method1",
+ InArgs: []signature.Arg{{Type: vdl.StringType}},
+ OutArgs: []signature.Arg{{Type: vdl.ErrorType}},
+ },
},
},
}
@@ -31,50 +32,11 @@
func client() *mocks_ipc.SimpleMockClient {
return mocks_ipc.NewSimpleClient(
map[string][]interface{}{
- "Signature": []interface{}{expectedSignature(), nil},
+ "__Signature": []interface{}{wantSignature(), nil},
},
)
}
-func assertMethodSignatureAsExpected(t *testing.T, got, expected ipc.MethodSignature) {
- if !reflect.DeepEqual(got.InArgs, expected.InArgs) {
- t.Errorf(`InArgs do not match: result "%v", want "%v"`, got.InArgs, expected.InArgs)
- return
- }
- if !reflect.DeepEqual(got.OutArgs, expected.OutArgs) {
- t.Errorf(`OutArgs do not match: result "%v", want "%v"`, got.OutArgs, expected.OutArgs)
- return
- }
- if got.InStream != expected.InStream {
- t.Errorf(`InStreams do not match: result "%v", want "%v"`, got.InStream, expected.InStream)
- return
- }
- if got.OutStream != expected.OutStream {
- t.Errorf(`OutStream do not match: result "%v", want "%v"`, got.OutStream, expected.OutStream)
- return
- }
-}
-
-func assertSignatureAsExpected(t *testing.T, got, expected *ipc.ServiceSignature) {
- if !reflect.DeepEqual(got.TypeDefs, expected.TypeDefs) {
- t.Errorf(`TypeDefs do not match: result "%v", want "%v"`, got.TypeDefs, expected.TypeDefs)
- return
- }
- if n, m := len(got.Methods), len(expected.Methods); n != m {
- t.Errorf(`Wrong number of signature methods: result "%d", want "%d"`, n, m)
- return
- }
- for gotName, gotMethod := range got.Methods {
- expectedMethod, ok := expected.Methods[gotName]
- if !ok {
- t.Errorf(`Method "%v" was expected but not found`, gotName)
- return
- }
-
- assertMethodSignatureAsExpected(t, gotMethod, expectedMethod)
- }
-}
-
func TestFetching(t *testing.T) {
runtime, err := rt.New()
if err != nil {
@@ -88,8 +50,9 @@
t.Errorf(`Did not expect an error but got %v`, err)
return
}
- expected := expectedSignature()
- assertSignatureAsExpected(t, got, &expected)
+ if want := wantSignature(); !reflect.DeepEqual(got, want) {
+ t.Errorf(`Signature got %v, want %v`, got, want)
+ }
}
func TestThatCachedAfterFetching(t *testing.T) {
@@ -106,7 +69,9 @@
t.Errorf(`Signature manager did not cache the results`)
return
}
- assertSignatureAsExpected(t, &cache.signature, sig)
+ if got, want := cache.sig, sig; !reflect.DeepEqual(got, want) {
+ t.Errorf(`Cached signature got %v, want %v`, got, want)
+ }
}
func TestThatCacheIsUsed(t *testing.T) {
@@ -125,7 +90,7 @@
// expect number of calls to Signature method of client to still be 1 since cache
// should have been used despite the second call
- if client.TimesCalled("Signature") != 1 {
+ if client.TimesCalled("__Signature") != 1 {
t.Errorf("Signature cache was not used for the second call")
}
}
@@ -172,7 +137,7 @@
sm.Signature(runtime.NewContext(), name, client)
// expect number of calls to Signature method of client to be 2 since cache should have expired
- if client.TimesCalled("Signature") != 2 {
+ if client.TimesCalled("__Signature") != 2 {
t.Errorf("Cache was still used but TTL had passed. It should have been fetched again")
}
}
diff --git a/services/wsprd/lib/testwriter/writer.go b/services/wsprd/lib/testwriter/writer.go
index 093445f..f0766cc 100644
--- a/services/wsprd/lib/testwriter/writer.go
+++ b/services/wsprd/lib/testwriter/writer.go
@@ -7,6 +7,7 @@
"reflect"
"sync"
"time"
+
"veyron.io/veyron/veyron2/verror2"
"veyron.io/wspr/veyron/services/wsprd/lib"
)
@@ -15,14 +16,9 @@
Errorf(fmt string, a ...interface{})
}
-type Response struct {
- Type lib.ResponseType
- Message interface{}
-}
-
type Writer struct {
sync.Mutex
- Stream []Response
+ Stream []lib.Response
err error
// If this channel is set then a message will be sent
// to this channel after recieving a call to FinishMessage()
@@ -33,11 +29,11 @@
// We serialize and deserialize the reponse so that we can do deep equal with
// messages that contain non-exported structs.
var buf bytes.Buffer
- if err := json.NewEncoder(&buf).Encode(Response{Type: responseType, Message: msg}); err != nil {
+ if err := json.NewEncoder(&buf).Encode(lib.Response{Type: responseType, Message: msg}); err != nil {
return err
}
- var r Response
+ var r lib.Response
if err := json.NewDecoder(&buf).Decode(&r); err != nil {
return err
@@ -86,7 +82,7 @@
return nil
}
-func CheckResponses(w *Writer, wantStream []Response, wantErr error) error {
+func CheckResponses(w *Writer, wantStream []lib.Response, wantErr error) error {
if got, want := w.Stream, wantStream; !reflect.DeepEqual(got, want) {
return fmt.Errorf("streams don't match: got %v, want %v", got, want)
}
diff --git a/services/wsprd/lib/vom.go b/services/wsprd/lib/vom.go
index b145009..493376d 100644
--- a/services/wsprd/lib/vom.go
+++ b/services/wsprd/lib/vom.go
@@ -3,6 +3,7 @@
import (
"bytes"
"encoding/hex"
+
"veyron.io/veyron/veyron2/vom2"
)
@@ -12,9 +13,28 @@
if err != nil {
return "", err
}
-
if err := encoder.Encode(v); err != nil {
return "", err
}
return hex.EncodeToString(buf.Bytes()), nil
}
+
+func VomEncodeOrDie(v interface{}) string {
+ s, err := VomEncode(v)
+ if err != nil {
+ panic(err)
+ }
+ return s
+}
+
+func VomDecode(data string, v interface{}) error {
+ binbytes, err := hex.DecodeString(data)
+ if err != nil {
+ return err
+ }
+ decoder, err := vom2.NewDecoder(bytes.NewReader(binbytes))
+ if err != nil {
+ return err
+ }
+ return decoder.Decode(v)
+}
diff --git a/services/wsprd/lib/writer.go b/services/wsprd/lib/writer.go
index 61a2cd5..bcf1f8d 100644
--- a/services/wsprd/lib/writer.go
+++ b/services/wsprd/lib/writer.go
@@ -13,6 +13,11 @@
ResponseCancel = 7
)
+type Response struct {
+ Type ResponseType
+ Message interface{}
+}
+
// This is basically an io.Writer interface, that allows passing error message
// strings. This is how the proxy will talk to the javascript/java clients.
type ClientWriter interface {
@@ -20,3 +25,9 @@
Error(err error)
}
+
+// The response from the javascript server to the proxy.
+type ServerRPCReply struct {
+ Results []interface{}
+ Err error
+}
diff --git a/services/wsprd/namespace/request_handler.go b/services/wsprd/namespace/request_handler.go
index 857af50..48112c3 100644
--- a/services/wsprd/namespace/request_handler.go
+++ b/services/wsprd/namespace/request_handler.go
@@ -1,8 +1,6 @@
package namespace
import (
- "bytes"
- "encoding/hex"
"encoding/json"
"time"
@@ -10,7 +8,6 @@
"veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/verror2"
- "veyron.io/veyron/veyron2/vom2"
"veyron.io/wspr/veyron/services/wsprd/lib"
)
@@ -119,20 +116,6 @@
}
}
-func encodeVom2(value interface{}) (string, error) {
- var buf bytes.Buffer
- encoder, err := vom2.NewBinaryEncoder(&buf)
- if err != nil {
- return "", err
- }
-
- if err := encoder.Encode(value); err != nil {
- return "", err
- }
- return hex.EncodeToString(buf.Bytes()), nil
-
-}
-
func convertToVDLEntry(value naming.MountEntry) naming.VDLMountEntry {
result := naming.VDLMountEntry{
Name: value.Name,
@@ -172,7 +155,7 @@
continue
}
- val, err := encodeVom2(convertToVDLEntry(mp))
+ val, err := lib.VomEncode(convertToVDLEntry(mp))
if err != nil {
w.Error(verror2.Make(verror2.Internal, ctx, err))
return
diff --git a/services/wsprd/signature/signature.go b/services/wsprd/signature/signature.go
deleted file mode 100644
index 5c89bfc..0000000
--- a/services/wsprd/signature/signature.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package signature
-
-import (
- "veyron.io/veyron/veyron2/ipc"
- "veyron.io/veyron/veyron2/vdl/vdlutil"
- "veyron.io/veyron/veyron2/wiretype"
- "veyron.io/wspr/veyron/services/wsprd/lib"
-)
-
-var (
- anydataType = wiretype.NamedPrimitiveType{
- Name: "veyron.io/veyron/veyron2/vdlutil.AnyData",
- Type: wiretype.TypeIDInterface,
- }
- errType = wiretype.NamedPrimitiveType{
- Name: "error",
- Type: wiretype.TypeIDInterface,
- }
- anydataTypeID = wiretype.TypeIDFirst
- errTypeID = wiretype.TypeIDFirst
-)
-
-// JSONServiceSignature represents the information about a service signature that is used by JSON.
-type JSONServiceSignature map[string]JSONMethodSignature
-
-// JSONMethodSignature represents the information about a method signature that is used by JSON.
-type JSONMethodSignature struct {
- InArgs []string // InArgs is a list of argument names.
- NumOutArgs int
- IsStreaming bool
- Tags []interface{}
-}
-
-// NewJSONServiceSignature converts an ipc service signature to the format used by JSON.
-func NewJSONServiceSignature(sig ipc.ServiceSignature) JSONServiceSignature {
- jsig := JSONServiceSignature{}
-
- for name, methSig := range sig.Methods {
- jmethSig := JSONMethodSignature{
- InArgs: make([]string, len(methSig.InArgs)),
- NumOutArgs: len(methSig.OutArgs),
- IsStreaming: methSig.InStream != wiretype.TypeIDInvalid || methSig.OutStream != wiretype.TypeIDInvalid,
- }
-
- for i, inarg := range methSig.InArgs {
- jmethSig.InArgs[i] = inarg.Name
- }
-
- jsig[lib.LowercaseFirstCharacter(name)] = jmethSig
- }
-
- return jsig
-}
-
-// ServiceSignature converts a JSONServiceSignature to an ipc service signature.
-func (jss JSONServiceSignature) ServiceSignature() (ipc.ServiceSignature, error) {
- ss := ipc.ServiceSignature{
- Methods: make(map[string]ipc.MethodSignature),
- }
-
- for name, sig := range jss {
- ms := ipc.MethodSignature{}
-
- ms.InArgs = make([]ipc.MethodArgument, len(sig.InArgs))
- for i, argName := range sig.InArgs {
- ms.InArgs[i] = ipc.MethodArgument{
- Name: argName,
- Type: anydataTypeID,
- }
- }
-
- ms.OutArgs = make([]ipc.MethodArgument, sig.NumOutArgs)
- for i := 0; i < sig.NumOutArgs-1; i++ {
- ms.OutArgs[i] = ipc.MethodArgument{
- Type: anydataTypeID,
- }
- }
- ms.OutArgs[sig.NumOutArgs-1] = ipc.MethodArgument{
- Name: "err",
- Type: errTypeID,
- }
-
- if sig.IsStreaming {
- ms.InStream = anydataTypeID
- ms.OutStream = anydataTypeID
- }
-
- ss.Methods[lib.UppercaseFirstCharacter(name)] = ms
- }
-
- ss.TypeDefs = []vdlutil.Any{anydataType, errType}
-
- return ss, nil
-}
diff --git a/services/wsprd/signature/signature_test.go b/services/wsprd/signature/signature_test.go
deleted file mode 100644
index dd9e85a..0000000
--- a/services/wsprd/signature/signature_test.go
+++ /dev/null
@@ -1,117 +0,0 @@
-package signature
-
-import (
- "reflect"
- "testing"
-
- "veyron.io/veyron/veyron2/ipc"
- "veyron.io/veyron/veyron2/vdl/vdlutil"
- "veyron.io/veyron/veyron2/wiretype"
-)
-
-func TestNewJSONServiceSignature(t *testing.T) {
- sigIn := ipc.ServiceSignature{
- Methods: map[string]ipc.MethodSignature{
- "FirstMethod": ipc.MethodSignature{
- InArgs: []ipc.MethodArgument{
- ipc.MethodArgument{
- Name: "FirstArg",
- Type: wiretype.TypeIDFloat64,
- },
- ipc.MethodArgument{
- Name: "SecondArg",
- Type: wiretype.TypeIDUintptr,
- },
- },
- OutArgs: []ipc.MethodArgument{
- ipc.MethodArgument{
- Name: "FirstOutArg",
- Type: wiretype.TypeIDFloat64,
- },
- ipc.MethodArgument{
- Name: "SecondOutArg",
- Type: anydataTypeID,
- },
- ipc.MethodArgument{
- Name: "ThirdOutArg",
- Type: wiretype.TypeIDInt32,
- },
- ipc.MethodArgument{
- Name: "err",
- Type: wiretype.TypeIDFirst,
- },
- },
- OutStream: wiretype.TypeIDString,
- },
- },
- TypeDefs: []vdlutil.Any{
- anydataType,
- errType,
- },
- }
-
- sigOut := NewJSONServiceSignature(sigIn)
-
- expectedSigOut := JSONServiceSignature{
- "firstMethod": JSONMethodSignature{
- InArgs: []string{"FirstArg", "SecondArg"},
- NumOutArgs: 4,
- IsStreaming: true,
- },
- }
-
- if !reflect.DeepEqual(sigOut, expectedSigOut) {
- t.Errorf("Signature differed from expectation. got: %v but expected %v", sigOut, expectedSigOut)
- }
-}
-
-func TestServiceSignature(t *testing.T) {
- sigIn := JSONServiceSignature{
- "firstMethod": JSONMethodSignature{
- InArgs: []string{"FirstArg", "SecondArg"},
- NumOutArgs: 2,
- IsStreaming: true,
- },
- }
-
- sigOut, err := sigIn.ServiceSignature()
- if err != nil {
- t.Fatal("error in service signature", err)
- }
-
- expectedSigOut := ipc.ServiceSignature{
- Methods: map[string]ipc.MethodSignature{
- "FirstMethod": ipc.MethodSignature{
- InArgs: []ipc.MethodArgument{
- ipc.MethodArgument{
- Name: "FirstArg",
- Type: anydataTypeID,
- },
- ipc.MethodArgument{
- Name: "SecondArg",
- Type: anydataTypeID,
- },
- },
- OutArgs: []ipc.MethodArgument{
- ipc.MethodArgument{
- Type: anydataTypeID,
- },
- ipc.MethodArgument{
- Name: "err",
- Type: errTypeID,
- },
- },
- InStream: anydataTypeID,
- OutStream: anydataTypeID,
- },
- },
- TypeDefs: []vdlutil.Any{
- anydataType,
- errType,
- },
- }
-
- if !reflect.DeepEqual(sigOut, expectedSigOut) {
- t.Error("Signature differed from expectation. got: %v but expected %v", sigOut, expectedSigOut)
- }
-}