veyron/services/wsprd: Changed wsprd to expose the dispatcher interface
used by the go servers to javascript.
Change-Id: Ie80ce552dc6bc522f165b4f5a0a516be9c35c035
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index 5037b62..242f4b1 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -60,7 +60,6 @@
type serveRequest struct {
Name string
ServerId uint64
- Service signature.JSONServiceSignature
}
type jsonCaveatValidator struct {
@@ -85,7 +84,8 @@
// Controller represents all the state of a Veyron Web App. This is the struct
// that is in charge performing all the veyron options.
type Controller struct {
- // Protects outstandingStreams and outstandingServerRequests.
+ // Protects everything.
+ // TODO(bjornick): We need to split this up.
sync.Mutex
logger vlog.Logger
@@ -201,8 +201,6 @@
}
func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
- c.Lock()
- defer c.Unlock()
if c.client == nil {
return nil, verror.BadArgf("no client created")
}
@@ -441,7 +439,7 @@
c.logger.VI(2).Infof("serving under name: %q", serveRequest.Name)
- endpoint, err := server.Serve(serveRequest.Name, serveRequest.Service)
+ endpoint, err := server.Serve(serveRequest.Name)
if err != nil {
w.Error(verror.Internalf("error serving service: %v", err))
return
@@ -466,6 +464,21 @@
c.serve(serveRequest, w)
}
+// HandleLookupResponse handles the result of a Dispatcher.Lookup call that was
+// run by the Javascript server.
+func (c *Controller) HandleLookupResponse(id int64, data string, w lib.ClientWriter) {
+ c.Lock()
+ server := c.flowMap[id]
+ c.Unlock()
+ if server == nil {
+ c.logger.Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results.", id)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ server.HandleLookupResponse(id, data)
+}
+
// HandleStopRequest takes a request to stop a server.
func (c *Controller) HandleStopRequest(data string, w lib.ClientWriter) {
var serverId uint64
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index adcf1a8..289bcaf 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -1,15 +1,12 @@
package app
import (
- "bytes"
"encoding/json"
"fmt"
"reflect"
- "strings"
- "sync"
"testing"
- "time"
"veyron.io/veyron/veyron/services/wsprd/lib"
+ "veyron.io/veyron/veyron/services/wsprd/lib/testwriter"
"veyron.io/veyron/veyron/services/wsprd/signature"
"veyron.io/veyron/veyron2"
"veyron.io/veyron/veyron2/ipc"
@@ -18,7 +15,6 @@
"veyron.io/veyron/veyron2/security"
"veyron.io/veyron/veyron2/vdl/vdlutil"
"veyron.io/veyron/veyron2/verror"
- "veyron.io/veyron/veyron2/vlog"
"veyron.io/veyron/veyron2/vom"
vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
"veyron.io/veyron/veyron2/wiretype"
@@ -27,12 +23,6 @@
mounttable "veyron.io/veyron/veyron/services/mounttable/lib"
)
-var (
- ctxFooAlice = makeMockSecurityContext("Foo", "test/alice")
- ctxBarAlice = makeMockSecurityContext("Bar", "test/alice")
- ctxFooBob = makeMockSecurityContext("Foo", "test/bob")
- ctxBarBob = makeMockSecurityContext("Bar", "test/bob")
-)
var r = rt.Init()
type simpleAdder struct{}
@@ -135,88 +125,6 @@
return startAnyServer(true, mt)
}
-type response struct {
- Type lib.ResponseType
- Message interface{}
-}
-
-type testWriter struct {
- sync.Mutex
- stream []response
- err error
- logger vlog.Logger
- // If this channel is set then a message will be sent
- // to this channel after recieving a call to FinishMessage()
- notifier chan bool
-}
-
-func (w *testWriter) Send(responseType lib.ResponseType, msg interface{}) error {
- w.Lock()
- defer w.Unlock()
- // We serialize and deserialize the reponse so that we can do deep equal with
- // messages that contain non-exported structs.
- var buf bytes.Buffer
- if err := json.NewEncoder(&buf).Encode(response{Type: responseType, Message: msg}); err != nil {
- return err
- }
-
- var r response
-
- if err := json.NewDecoder(&buf).Decode(&r); err != nil {
- return err
- }
-
- w.stream = append(w.stream, r)
- if w.notifier != nil {
- w.notifier <- true
- }
- return nil
-
-}
-
-func (w *testWriter) Error(err error) {
- w.err = err
-}
-
-func (w *testWriter) streamLength() int {
- w.Lock()
- defer w.Unlock()
- return len(w.stream)
-}
-
-// Waits until there is at least n messages in w.stream. Returns an error if we timeout
-// waiting for the given number of messages.
-func (w *testWriter) waitForMessage(n int) error {
- if w.streamLength() >= n {
- return nil
- }
- w.Lock()
- w.notifier = make(chan bool, 1)
- w.Unlock()
- for w.streamLength() < n {
- select {
- case <-w.notifier:
- continue
- case <-time.After(time.Second):
- return fmt.Errorf("timed out")
- }
- }
- w.Lock()
- w.notifier = nil
- w.Unlock()
- return nil
-}
-
-func checkResponses(w *testWriter, expectedStream []response, err error, t *testing.T) {
- if !reflect.DeepEqual(expectedStream, w.stream) {
- t.Errorf("streams don't match: expected %v, got %v", expectedStream, w.stream)
- }
-
- if !reflect.DeepEqual(err, w.err) {
- t.Errorf("unexpected error, got: %v, expected: %v", err, w.err)
- }
-}
-
var adderServiceSignature signature.JSONServiceSignature = signature.JSONServiceSignature{
"add": signature.JSONMethodSignature{
InArgs: []string{"A", "B"},
@@ -264,7 +172,7 @@
numOutArgs int32
streamingInputs []string
streamingInputType vom.Type
- expectedStream []response
+ expectedStream []testwriter.Response
expectedError error
}
@@ -285,9 +193,7 @@
return
}
- writer := testWriter{
- logger: controller.logger,
- }
+ writer := testwriter.Writer{}
var stream *outstandingStream
if len(test.streamingInputs) > 0 {
stream = newStream()
@@ -309,7 +215,7 @@
}
controller.sendVeyronRequest(r.NewContext(), 0, &request, &writer, stream)
- checkResponses(&writer, test.expectedStream, test.expectedError, t)
+ testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError, t)
}
func TestCallingGoServer(t *testing.T) {
@@ -317,8 +223,8 @@
method: "Add",
inArgs: []json.RawMessage{json.RawMessage("2"), json.RawMessage("3")},
numOutArgs: 2,
- expectedStream: []response{
- response{
+ expectedStream: []testwriter.Response{
+ testwriter.Response{
Message: []interface{}{5.0},
Type: lib.ResponseFinal,
},
@@ -342,28 +248,28 @@
streamingInputs: []string{"1", "2", "3", "4"},
streamingInputType: vom_wiretype.Type{ID: 36},
numOutArgs: 2,
- expectedStream: []response{
- response{
+ expectedStream: []testwriter.Response{
+ testwriter.Response{
Message: 1.0,
Type: lib.ResponseStream,
},
- response{
+ testwriter.Response{
Message: 3.0,
Type: lib.ResponseStream,
},
- response{
+ testwriter.Response{
Message: 6.0,
Type: lib.ResponseStream,
},
- response{
+ testwriter.Response{
Message: 10.0,
Type: lib.ResponseStream,
},
- response{
+ testwriter.Response{
Message: nil,
Type: lib.ResponseStreamClose,
},
- response{
+ testwriter.Response{
Message: []interface{}{10.0},
Type: lib.ResponseFinal,
},
@@ -373,7 +279,7 @@
type runningTest struct {
controller *Controller
- writer *testWriter
+ writer *testwriter.Writer
mounttableServer ipc.Server
proxyServer *proxy.Proxy
}
@@ -393,7 +299,7 @@
proxyEndpoint := proxyServer.Endpoint().String()
- writer := testWriter{}
+ writer := testwriter.Writer{}
writerCreator := func(int64) lib.ClientWriter {
return &writer
@@ -404,11 +310,8 @@
return nil, err
}
- writer.logger = controller.logger
-
controller.serve(serveRequest{
- Name: "adder",
- Service: adderServiceSignature,
+ Name: "adder",
}, &writer)
return &runningTest{
@@ -425,12 +328,12 @@
t.Fatalf("could not serve server %v", err)
}
- if len(rt.writer.stream) != 1 {
- t.Errorf("expected only one response, got %d", len(rt.writer.stream))
+ if len(rt.writer.Stream) != 1 {
+ t.Errorf("expected only one response, got %d", len(rt.writer.Stream))
return
}
- resp := rt.writer.stream[0]
+ resp := rt.writer.Stream[0]
if resp.Type != lib.ResponseFinal {
t.Errorf("unknown stream message Got: %v, expected: serve response", resp)
@@ -488,7 +391,7 @@
func sendServerStream(t *testing.T, controller *Controller, test *jsServerTestCase, w lib.ClientWriter) {
for _, msg := range test.serverStream {
- controller.SendOnStream(0, msg, w)
+ controller.SendOnStream(4, msg, w)
}
serverReply := map[string]interface{}{
@@ -500,7 +403,7 @@
if err != nil {
t.Fatalf("Failed to serialize the reply: %v", err)
}
- controller.HandleServerResponse(0, string(bytes))
+ controller.HandleServerResponse(4, string(bytes))
}
func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
@@ -513,12 +416,12 @@
t.Errorf("could not serve server %v", err)
}
- if len(rt.writer.stream) != 1 {
- t.Errorf("expected only on response, got %d", len(rt.writer.stream))
+ if len(rt.writer.Stream) != 1 {
+ t.Errorf("expected only on response, got %d", len(rt.writer.Stream))
return
}
- resp := rt.writer.stream[0]
+ resp := rt.writer.Stream[0]
if resp.Type != lib.ResponseFinal {
t.Errorf("unknown stream message Got: %v, expected: serve response", resp)
@@ -534,7 +437,7 @@
t.Errorf("invalid endpdoint returned from serve: %v", resp.Message)
}
- rt.writer.stream = nil
+ rt.writer.Stream = nil
// Create a client using app's runtime so it points to the right mounttable.
client, err := rt.controller.rt.NewClient()
@@ -543,52 +446,110 @@
t.Errorf("unable to create client: %v", err)
}
+ expectedWebsocketMessage := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "adder",
+ "method": "resolveStep",
+ },
+ },
+ }
+
+ // We have to have a go routine handle the resolveStep call because StartCall blocks until the
+ // resolve step is complete.
+ go func() {
+ // Wait until ResolveStep lookup has been called.
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+ fmt.Printf("writer data is %v", rt.writer)
+
+ // Handle the ResolveStep
+ dispatcherResponse := map[string]interface{}{
+ "Err": map[string]interface{}{
+ "id": "veyron2/verror.NotFound",
+ "message": "ResolveStep not found",
+ },
+ }
+ bytes, err := json.Marshal(dispatcherResponse)
+ if err != nil {
+ t.Errorf("failed to serailize the response: %v", err)
+ return
+ }
+ rt.controller.HandleLookupResponse(0, string(bytes), rt.writer)
+ }()
+
call, err := client.StartCall(rt.controller.rt.NewContext(), "/"+msg+"/adder", test.method, test.inArgs)
if err != nil {
t.Errorf("failed to start call: %v", err)
}
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "adder",
+ "method": lib.LowercaseFirstCharacter(test.method),
+ },
+ })
+
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ dispatcherResponse := map[string]interface{}{
+ "handle": 0,
+ "signature": adderServiceSignature,
+ }
+ bytes, err := json.Marshal(dispatcherResponse)
+ if err != nil {
+ t.Errorf("failed to serailize the response: %v", err)
+ return
+ }
+ rt.controller.HandleLookupResponse(2, string(bytes), rt.writer)
+
typedNames := rt.controller.rt.Identity().PublicID().Names()
names := []interface{}{}
for _, n := range typedNames {
names = append(names, n)
}
- expectedWebsocketMessage := []response{
- response{
- Type: lib.ResponseServerRequest,
- Message: map[string]interface{}{
- "ServerId": 0.0,
- "Method": lib.LowercaseFirstCharacter(test.method),
- "Args": test.inArgs,
- "Context": map[string]interface{}{
- "Name": "adder",
- "Suffix": "adder",
- "RemoteID": map[string]interface{}{
- "Handle": 1.0,
- "Names": names,
- },
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{
+ Type: lib.ResponseServerRequest,
+ Message: map[string]interface{}{
+ "ServerId": 0.0,
+ "Method": lib.LowercaseFirstCharacter(test.method),
+ "Handle": 0.0,
+ "Args": test.inArgs,
+ "Context": map[string]interface{}{
+ "Name": "adder",
+ "Suffix": "adder",
+ "RemoteID": map[string]interface{}{
+ "Handle": 1.0,
+ "Names": names,
},
},
},
- }
+ })
// Wait until the rpc has started.
- if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
- t.Errorf("didn't recieve expected message: %v", err)
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
}
for _, msg := range test.clientStream {
- expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: lib.ResponseStream, Message: msg})
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{Type: lib.ResponseStream, Message: msg})
if err := call.Send(msg); err != nil {
t.Errorf("unexpected error while sending %v: %v", msg, err)
}
}
// Wait until all the streaming messages have been acknowledged.
- if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
- t.Errorf("didn't recieve expected message: %v", err)
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
}
- expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: lib.ResponseStreamClose})
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{Type: lib.ResponseStreamClose})
expectedStream := test.expectedServerStream
go sendServerStream(t, rt.controller, &test, rt.writer)
@@ -625,11 +586,11 @@
}
// Wait until the close streaming messages have been acknowledged.
- if err := rt.writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
- t.Errorf("didn't recieve expected message: %v", err)
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
}
- checkResponses(rt.writer, expectedWebsocketMessage, nil, t)
+ testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
}
func TestSimpleJSServer(t *testing.T) {
@@ -721,219 +682,3 @@
}
}
}
-
-func createChain(r veyron2.Runtime, name string) security.PrivateID {
- id := r.Identity()
-
- for _, component := range strings.Split(name, "/") {
- newID, err := r.NewIdentity(component)
- if err != nil {
- panic(err)
- }
- if id == nil {
- id = newID
- continue
- }
- blessedID, err := id.Bless(newID.PublicID(), component, time.Hour, nil)
- if err != nil {
- panic(err)
- }
- id, err = newID.Derive(blessedID)
- if err != nil {
- panic(err)
- }
- }
- return id
-}
-
-type mockSecurityContext struct {
- method string
- localID security.PublicID
-}
-
-func makeMockSecurityContext(method string, name string) *mockSecurityContext {
- return &mockSecurityContext{
- method: method,
- localID: createChain(r, name).PublicID(),
- }
-}
-
-func (m *mockSecurityContext) Method() string { return m.method }
-
-func (m *mockSecurityContext) LocalID() security.PublicID { return m.localID }
-
-func (*mockSecurityContext) Name() string { return "" }
-
-func (*mockSecurityContext) Suffix() string { return "" }
-
-func (*mockSecurityContext) Label() security.Label { return 0 }
-
-func (*mockSecurityContext) Discharges() map[string]security.Discharge { return nil }
-
-func (*mockSecurityContext) RemoteID() security.PublicID { return nil }
-
-func (*mockSecurityContext) LocalEndpoint() naming.Endpoint { return nil }
-
-func (*mockSecurityContext) RemoteEndpoint() naming.Endpoint { return nil }
-
-type blessingTestCase struct {
- requestJSON map[string]interface{}
- expectedValidateResults map[*mockSecurityContext]bool
- expectedErr error
-}
-
-func runBlessingTest(c blessingTestCase, t *testing.T) {
- controller, err := NewController(nil, "mockVeyronProxyEP")
-
- if err != nil {
- t.Errorf("unable to create controller: %v", err)
- return
- }
- controller.AddIdentity(createChain(rt.R(), "test/bar").PublicID())
-
- bytes, err := json.Marshal(c.requestJSON)
-
- if err != nil {
- t.Errorf("failed to marshal request: %v", err)
- return
- }
-
- var request blessingRequest
- if err := json.Unmarshal(bytes, &request); err != nil {
- t.Errorf("failed to unmarshal request: %v", err)
- return
- }
-
- jsId, err := controller.bless(request)
-
- if !reflect.DeepEqual(err, c.expectedErr) {
- t.Errorf("error response does not match: expected %v, got %v", c.expectedErr, err)
- return
- }
-
- if err != nil {
- return
- }
-
- id := controller.idStore.Get(jsId.Handle)
-
- if id == nil {
- t.Errorf("couldn't get identity from store")
- return
- }
-
- for ctx, value := range c.expectedValidateResults {
- _, err := id.Authorize(ctx)
- if (err == nil) != value {
- t.Errorf("authorize failed to match expected value for %v: expected %v, got %v", ctx, value, err)
- }
- }
-}
-
-// The names of the identity in the mock contexts are root off the runtime's
-// identity. This function takes a name and prepends the runtime's identity's
-// name.
-func securityName(name string) string {
- return rt.R().Identity().PublicID().Names()[0] + "/" + name
-}
-
-func TestBlessingWithNoCaveats(t *testing.T) {
- runBlessingTest(blessingTestCase{
- requestJSON: map[string]interface{}{
- "handle": 1,
- "durationMs": 10000,
- "name": "foo",
- },
- expectedValidateResults: map[*mockSecurityContext]bool{
- ctxFooAlice: true,
- ctxFooBob: true,
- ctxBarAlice: true,
- ctxBarBob: true,
- },
- }, t)
-}
-
-func TestBlessingWithMethodRestrictions(t *testing.T) {
- runBlessingTest(blessingTestCase{
- requestJSON: map[string]interface{}{
- "handle": 1,
- "durationMs": 10000,
- "caveats": []map[string]interface{}{
- map[string]interface{}{
- "_type": "MethodCaveat",
- "service": security.AllPrincipals,
- "data": []string{"Foo"},
- },
- },
- "name": "foo",
- },
- expectedValidateResults: map[*mockSecurityContext]bool{
- ctxFooAlice: true,
- ctxFooBob: true,
- ctxBarAlice: false,
- ctxBarBob: false,
- },
- }, t)
-}
-
-func TestBlessingWithPeerRestrictions(t *testing.T) {
- runBlessingTest(blessingTestCase{
- requestJSON: map[string]interface{}{
- "handle": 1,
- "durationMs": 10000,
- "caveats": []map[string]interface{}{
- map[string]interface{}{
- "_type": "PeerBlessingsCaveat",
- "service": security.AllPrincipals,
- "data": []string{securityName("test/alice")},
- },
- },
- "name": "foo",
- },
- expectedValidateResults: map[*mockSecurityContext]bool{
- ctxFooAlice: true,
- ctxFooBob: false,
- ctxBarAlice: true,
- ctxBarBob: false,
- },
- }, t)
-}
-
-func TestBlessingWithMethodAndPeerRestrictions(t *testing.T) {
- runBlessingTest(blessingTestCase{
- requestJSON: map[string]interface{}{
- "handle": 1,
- "durationMs": 10000,
- "caveats": []map[string]interface{}{
- map[string]interface{}{
- "_type": "PeerBlessingsCaveat",
- "service": security.AllPrincipals,
- "data": []string{securityName("test/alice")},
- },
- map[string]interface{}{
- "_type": "MethodCaveat",
- "service": security.AllPrincipals,
- "data": []string{"Bar"},
- },
- },
- "name": "foo",
- },
- expectedValidateResults: map[*mockSecurityContext]bool{
- ctxFooAlice: false,
- ctxFooBob: false,
- ctxBarAlice: true,
- ctxBarBob: false,
- },
- }, t)
-}
-
-func TestBlessingWhereBlesseeDoesNotExist(t *testing.T) {
- runBlessingTest(blessingTestCase{
- requestJSON: map[string]interface{}{
- "handle": 2,
- "durationMs": 10000,
- "name": "foo",
- },
- expectedErr: verror.NoExistf("invalid PublicID handle"),
- }, t)
-}
diff --git a/services/wsprd/ipc/server/dispatcher.go b/services/wsprd/ipc/server/dispatcher.go
index c03c941..e50a6fc 100644
--- a/services/wsprd/ipc/server/dispatcher.go
+++ b/services/wsprd/ipc/server/dispatcher.go
@@ -1,24 +1,136 @@
package server
import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ vsecurity "veyron.io/veyron/veyron/security"
+ "veyron.io/veyron/veyron/services/wsprd/lib"
+ "veyron.io/veyron/veyron/services/wsprd/signature"
+
"veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vlog"
)
+type flowFactory interface {
+ createFlow() *Flow
+ cleanupFlow(id int64)
+}
+
+type invokerFactory interface {
+ createInvoker(handle int64, signature signature.JSONServiceSignature) (ipc.Invoker, error)
+}
+
+type lookupReply struct {
+ Handle int64
+ HasAuthorizer bool
+ Signature signature.JSONServiceSignature
+ Err *verror.Standard
+}
+
+type dispatcherRequest struct {
+ ServerID uint64 `json:"serverId"`
+ Suffix string `json:"suffix"`
+ Method string `json:"method"`
+}
+
// dispatcher holds the invoker and the authorizer to be used for lookup.
type dispatcher struct {
- invoker ipc.Invoker
- authorizer security.Authorizer
+ mu sync.Mutex
+ serverID uint64
+ flowFactory flowFactory
+ invokerFactory invokerFactory
+ logger vlog.Logger
+ outstandingLookups map[int64]chan lookupReply
}
// newDispatcher is a dispatcher factory.
-func newDispatcher(invoker ipc.Invoker, authorizer security.Authorizer) *dispatcher {
- return &dispatcher{invoker, authorizer}
+func newDispatcher(serverID uint64, flowFactory flowFactory, invokerFactory invokerFactory, logger vlog.Logger) *dispatcher {
+ return &dispatcher{
+ serverID: serverID,
+ flowFactory: flowFactory,
+ invokerFactory: invokerFactory,
+ logger: logger,
+ outstandingLookups: make(map[int64]chan lookupReply),
+ }
}
// Lookup implements dispatcher interface Lookup.
func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
- return d.invoker, d.authorizer, nil
+ flow := d.flowFactory.createFlow()
+ d.mu.Lock()
+ ch := make(chan lookupReply, 1)
+ d.outstandingLookups[flow.ID] = ch
+ d.mu.Unlock()
+
+ message := dispatcherRequest{
+ ServerID: d.serverID,
+ Suffix: suffix,
+ Method: lib.LowercaseFirstCharacter(method),
+ }
+ if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
+ ch <- lookupReply{
+ Err: &verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not marshal the method call data: %v", err),
+ },
+ }
+ }
+ request := <-ch
+
+ d.mu.Lock()
+ delete(d.outstandingLookups, flow.ID)
+ d.mu.Unlock()
+
+ d.flowFactory.cleanupFlow(flow.ID)
+
+ if request.Err != nil {
+ return nil, nil, request.Err
+ }
+
+ if request.Handle < 0 {
+ return nil, nil, verror.NoExistf("ipc: dispatcher for %s not found", suffix)
+ }
+
+ auth := vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
+ security.AllPrincipals: security.AllLabels,
+ }})
+
+ invoker, err := d.invokerFactory.createInvoker(request.Handle, request.Signature)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return invoker, auth, nil
+}
+
+func (d *dispatcher) handleLookupResponse(id int64, data string) {
+ d.mu.Lock()
+ ch := d.outstandingLookups[id]
+ d.mu.Unlock()
+
+ if ch == nil {
+ d.flowFactory.cleanupFlow(id)
+ d.logger.Errorf("unknown invoke request for flow: %d", id)
+ return
+ }
+
+ var request lookupReply
+ decoder := json.NewDecoder(bytes.NewBufferString(data))
+ if err := decoder.Decode(&request); err != nil {
+ request = lookupReply{
+ Err: &verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not unmarshal invoke request: %v", err),
+ },
+ }
+ d.logger.Errorf("unmarshaling invoke request failed: %v", err)
+ }
+ ch <- request
}
// StopServing implements dispatcher StopServing.
diff --git a/services/wsprd/ipc/server/dispatcher_test.go b/services/wsprd/ipc/server/dispatcher_test.go
new file mode 100644
index 0000000..16634eb
--- /dev/null
+++ b/services/wsprd/ipc/server/dispatcher_test.go
@@ -0,0 +1,119 @@
+package server
+
+import (
+ "reflect"
+ "testing"
+ "veyron.io/veyron/veyron/services/wsprd/lib"
+ "veyron.io/veyron/veyron/services/wsprd/lib/testwriter"
+ "veyron.io/veyron/veyron/services/wsprd/signature"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+)
+
+type mockFlowFactory struct {
+ writer testwriter.Writer
+}
+
+func (m *mockFlowFactory) createFlow() *Flow {
+ return &Flow{ID: 0, Writer: &m.writer}
+}
+
+func (*mockFlowFactory) cleanupFlow(int64) {}
+
+type mockInvoker struct {
+ handle int64
+ sig signature.JSONServiceSignature
+}
+
+func (mockInvoker) Prepare(string, int) ([]interface{}, security.Label, error) {
+ return nil, 0, nil
+}
+
+func (mockInvoker) Invoke(string, ipc.ServerCall, []interface{}) ([]interface{}, error) {
+ return nil, nil
+}
+
+type mockInvokerFactory struct{}
+
+func (mockInvokerFactory) createInvoker(handle int64, sig signature.JSONServiceSignature) (ipc.Invoker, error) {
+ return &mockInvoker{handle: handle, sig: sig}, nil
+}
+
+func init() {
+ rt.Init()
+}
+
+func TestSuccessfulLookup(t *testing.T) {
+ flowFactory := &mockFlowFactory{}
+ d := newDispatcher(0, flowFactory, mockInvokerFactory{}, rt.R().Logger())
+ go func() {
+ if err := flowFactory.writer.WaitForMessage(1); err != nil {
+ t.Errorf("failed to get dispatch request %v", err)
+ t.Fail()
+ }
+ signature := `{"add":{"inArgs":["foo","bar"],"numOutArgs":1,"isStreaming":false}}`
+ jsonResponse := `{"handle":1,"hasAuthorizer":false,"signature":` + signature + "}"
+ d.handleLookupResponse(0, jsonResponse)
+ }()
+
+ invoker, _, err := d.Lookup("a/b", "Read")
+
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ expectedSig := signature.JSONServiceSignature{
+ "add": signature.JSONMethodSignature{
+ InArgs: []string{"foo", "bar"},
+ NumOutArgs: 1,
+ },
+ }
+ expectedInvoker := &mockInvoker{handle: 1, sig: expectedSig}
+ if !reflect.DeepEqual(invoker, expectedInvoker) {
+ t.Errorf("wrong invoker returned, expected: %v, got :%v", expectedInvoker, invoker)
+ }
+
+ expectedResponses := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "a/b",
+ "method": "read",
+ },
+ },
+ }
+ testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+}
+
+func TestFailedLookup(t *testing.T) {
+ flowFactory := &mockFlowFactory{}
+ d := newDispatcher(0, flowFactory, mockInvokerFactory{}, rt.R().Logger())
+ go func() {
+ if err := flowFactory.writer.WaitForMessage(1); err != nil {
+ t.Errorf("failed to get dispatch request %v", err)
+ t.Fail()
+ }
+ jsonResponse := `{"err":{"id":"veyron2/verror.Exists","msg":"bad stuff"}}`
+ d.handleLookupResponse(0, jsonResponse)
+ }()
+
+ _, _, err := d.Lookup("a/b", "Read")
+
+ if err == nil {
+ t.Errorf("expected error, but got none", err)
+ }
+
+ expectedResponses := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "a/b",
+ "method": "read",
+ },
+ },
+ }
+ testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+}
diff --git a/services/wsprd/ipc/server/invoker.go b/services/wsprd/ipc/server/invoker.go
index b097b6d..dd8fb8a 100644
--- a/services/wsprd/ipc/server/invoker.go
+++ b/services/wsprd/ipc/server/invoker.go
@@ -20,14 +20,14 @@
}
// newInvoker is an invoker factory
-func newInvoker(sig ipc.ServiceSignature, invokeFunc remoteInvokeFunc) (ipc.Invoker, error) {
+func newInvoker(sig ipc.ServiceSignature, invokeFunc remoteInvokeFunc) ipc.Invoker {
predefinedInvokers := make(map[string]ipc.Invoker)
// Special handling for predefined "signature" method
predefinedInvokers["Signature"] = newSignatureInvoker(sig)
i := &invoker{sig, invokeFunc, predefinedInvokers}
- return i, nil
+ return i
}
// Prepare implements the Invoker interface.
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index 4e7b627..d965c58 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -8,7 +8,6 @@
"fmt"
"sync"
- vsecurity "veyron.io/veyron/veyron/security"
"veyron.io/veyron/veyron/services/wsprd/lib"
"veyron.io/veyron/veyron/services/wsprd/signature"
@@ -27,6 +26,7 @@
// A request from the proxy to javascript to handle an RPC
type serverRPCRequest struct {
ServerId uint64
+ Handle int64
Method string
Args []interface{}
Context serverRPCRequestContext
@@ -78,7 +78,7 @@
server ipc.Server
// The saved dispatcher to reuse when serve is called multiple times.
- dispatcher ipc.Dispatcher
+ dispatcher *dispatcher
// The endpoint of the server. This is empty until the server has been
// started and listen has been called on it.
@@ -113,7 +113,7 @@
// communicate the result back via a channel to the caller
type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
-func (s *Server) createRemoteInvokerFunc() remoteInvokeFunc {
+func (s *Server) createRemoteInvokerFunc(handle int64) remoteInvokeFunc {
return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
flow := s.helper.CreateNewFlow(s, call)
replyChan := make(chan *serverRPCReply, 1)
@@ -132,6 +132,7 @@
// Send a invocation request to JavaScript
message := serverRPCRequest{
ServerId: s.id,
+ Handle: handle,
Method: lib.LowercaseFirstCharacter(methodName),
Args: args,
Context: context,
@@ -171,27 +172,12 @@
}
}
-func (s *Server) Serve(name string, sig signature.JSONServiceSignature) (string, error) {
+func (s *Server) Serve(name string) (string, error) {
s.Lock()
defer s.Unlock()
- serviceSig, err := sig.ServiceSignature()
- if err != nil {
- return "", err
- }
-
- remoteInvokeFunc := s.createRemoteInvokerFunc()
- invoker, err := newInvoker(serviceSig, remoteInvokeFunc)
-
- if err != nil {
- return "", err
- }
-
if s.dispatcher == nil {
- s.dispatcher = newDispatcher(invoker,
- vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
- security.AllPrincipals: security.AllLabels,
- }}))
+ s.dispatcher = newDispatcher(s.id, s, s, s.helper.GetLogger())
}
if s.endpoint == "" {
@@ -237,6 +223,28 @@
ch <- &serverReply
}
+func (s *Server) HandleLookupResponse(id int64, data string) {
+ s.dispatcher.handleLookupResponse(id, data)
+}
+
+func (s *Server) createFlow() *Flow {
+ return s.helper.CreateNewFlow(s, nil)
+}
+
+func (s *Server) cleanupFlow(id int64) {
+ s.helper.CleanupFlow(id)
+}
+
+func (s *Server) createInvoker(handle int64, sig signature.JSONServiceSignature) (ipc.Invoker, error) {
+ serviceSig, err := sig.ServiceSignature()
+ if err != nil {
+ return nil, err
+ }
+
+ remoteInvokeFunc := s.createRemoteInvokerFunc(handle)
+ return newInvoker(serviceSig, remoteInvokeFunc), nil
+}
+
func (s *Server) Stop() {
result := serverRPCReply{
Results: []interface{}{nil},
diff --git a/services/wsprd/lib/testwriter/writer.go b/services/wsprd/lib/testwriter/writer.go
new file mode 100644
index 0000000..ab97dd3
--- /dev/null
+++ b/services/wsprd/lib/testwriter/writer.go
@@ -0,0 +1,97 @@
+package testwriter
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+ "veyron.io/veyron/veyron/services/wsprd/lib"
+)
+
+type TestHarness interface {
+ Errorf(fmt string, a ...interface{})
+}
+
+type Response struct {
+ Type lib.ResponseType
+ Message interface{}
+}
+
+type Writer struct {
+ sync.Mutex
+ Stream []Response
+ err error
+ // If this channel is set then a message will be sent
+ // to this channel after recieving a call to FinishMessage()
+ notifier chan bool
+}
+
+func (w *Writer) Send(responseType lib.ResponseType, msg interface{}) error {
+ // We serialize and deserialize the reponse so that we can do deep equal with
+ // messages that contain non-exported structs.
+ var buf bytes.Buffer
+ if err := json.NewEncoder(&buf).Encode(Response{Type: responseType, Message: msg}); err != nil {
+ return err
+ }
+
+ var r Response
+
+ if err := json.NewDecoder(&buf).Decode(&r); err != nil {
+ return err
+ }
+
+ w.Lock()
+ defer w.Unlock()
+ w.Stream = append(w.Stream, r)
+ if w.notifier != nil {
+ w.notifier <- true
+ }
+ return nil
+
+}
+
+func (w *Writer) Error(err error) {
+ w.err = err
+}
+
+func (w *Writer) streamLength() int {
+ w.Lock()
+ defer w.Unlock()
+ return len(w.Stream)
+}
+
+// Waits until there is at least n messages in w.Stream. Returns an error if we timeout
+// waiting for the given number of messages.
+func (w *Writer) WaitForMessage(n int) error {
+ if w.streamLength() >= n {
+ return nil
+ }
+ w.Lock()
+ w.notifier = make(chan bool, 1)
+ w.Unlock()
+ for w.streamLength() < n {
+ select {
+ case <-w.notifier:
+ continue
+ case <-time.After(time.Second):
+ return fmt.Errorf("timed out")
+ }
+ }
+ w.Lock()
+ fmt.Printf("Looking for %d, got %d: %v", n, len(w.Stream), w.Stream)
+ w.notifier = nil
+ w.Unlock()
+ return nil
+}
+
+func CheckResponses(w *Writer, expectedStream []Response, err error, t TestHarness) {
+ if !reflect.DeepEqual(expectedStream, w.Stream) {
+ t.Errorf("streams don't match: expected %v, got %v", expectedStream, w.Stream)
+ }
+
+ if !reflect.DeepEqual(err, w.err) {
+ t.Errorf("unexpected error, got: %v, expected: %v", w.err, err)
+ }
+}
diff --git a/services/wsprd/lib/writer.go b/services/wsprd/lib/writer.go
index ae18996..c9d5ab6 100644
--- a/services/wsprd/lib/writer.go
+++ b/services/wsprd/lib/writer.go
@@ -3,11 +3,12 @@
type ResponseType int
const (
- ResponseFinal ResponseType = 0
- ResponseStream = 1
- ResponseError = 2
- ResponseServerRequest = 3
- ResponseStreamClose = 4
+ ResponseFinal ResponseType = 0
+ ResponseStream = 1
+ ResponseError = 2
+ ResponseServerRequest = 3
+ ResponseStreamClose = 4
+ ResponseDispatcherLookup = 5
)
// This is basically an io.Writer interface, that allows passing error message
diff --git a/services/wsprd/wspr/pipe.go b/services/wsprd/wspr/pipe.go
index 27d9090..2247910 100644
--- a/services/wsprd/wspr/pipe.go
+++ b/services/wsprd/wspr/pipe.go
@@ -59,6 +59,8 @@
// A request to create a new random identity
websocketCreateIdentity = 10
+
+ websocketLookupResponse = 11
)
type websocketMessage struct {
@@ -268,6 +270,8 @@
// from javascript.
ctx := p.wspr.rt.NewContext()
go p.controller.HandleSignatureRequest(ctx, msg.Data, ww)
+ case websocketLookupResponse:
+ go p.controller.HandleLookupResponse(msg.Id, msg.Data, ww)
case websocketBlessIdentity:
go p.controller.HandleBlessing(msg.Data, ww)
case websocketCreateIdentity: