veyron/services/wsprd: moving wspr to a separate repository
Change-Id: I09bd1418547f38ec70fcfa2e7943f8d6e78c5fb5
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
new file mode 100644
index 0000000..c0fa829
--- /dev/null
+++ b/services/wsprd/app/app.go
@@ -0,0 +1,693 @@
+// The app package contains the struct that keeps per javascript app state and handles translating
+// javascript requests to veyron requests and vice versa.
+package app
+
+import (
+ "bytes"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/context"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vom"
+ vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
+ wiretype_build "veyron.io/veyron/veyron2/wiretype/build"
+ "veyron.io/wspr/veyron/services/wsprd/identity"
+ "veyron.io/wspr/veyron/services/wsprd/ipc/server"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+ "veyron.io/wspr/veyron/services/wsprd/signature"
+)
+
+// TODO(bjornick,nlacasse): Remove the retryTimeout flag once we able
+// to pass it in from javascript. For now all RPCs have the same
+// retryTimeout, set by command line flag.
+var retryTimeout *int
+
+func init() {
+ // TODO(bjornick,nlacasse): Remove the retryTimeout flag once we able
+ // to pass it in from javascript. For now all RPCs have the same
+ // retryTimeout, set by command line flag.
+ retryTimeout = flag.Int("retry-timeout", 0, "Duration in seconds to retry starting an RPC call. 0 means never retry.")
+}
+
+// Temporary holder of RPC so that we can store the unprocessed args.
+type veyronTempRPC struct {
+ Name string
+ Method string
+ InArgs []json.RawMessage
+ NumOutArgs int32
+ IsStreaming bool
+}
+
+type veyronRPC struct {
+ Name string
+ Method string
+ InArgs []interface{}
+ NumOutArgs int32
+ IsStreaming bool
+}
+
+// A request javascript to serve undern a particular name
+type serveRequest struct {
+ Name string
+ ServerId uint64
+}
+
+type jsonCaveatValidator struct {
+ Type string `json:"_type"`
+ Data json.RawMessage
+}
+
+type blessingRequest struct {
+ Handle int64
+ Caveats []jsonCaveatValidator
+ DurationMs int64
+ Name string
+}
+
+// PublicIDHandle is a handle given to Javascript that is linked
+// to a PublicID in go.
+type PublicIDHandle struct {
+ Handle int64
+ Names []string
+}
+
+// Controller represents all the state of a Veyron Web App. This is the struct
+// that is in charge performing all the veyron options.
+type Controller struct {
+ // Protects everything.
+ // TODO(bjornick): We need to split this up.
+ sync.Mutex
+
+ logger vlog.Logger
+
+ // The runtime to use to create new clients.
+ rt veyron2.Runtime
+
+ // The ipc.ListenSpec to use with server.Listen
+ listenSpec *ipc.ListenSpec
+
+ // Used to generate unique ids for requests initiated by the proxy.
+ // These ids will be even so they don't collide with the ids generated
+ // by the client.
+ lastGeneratedId int64
+
+ // Streams for the outstanding requests.
+ outstandingStreams map[int64]*outstandingStream
+
+ // Maps flowids to the server that owns them.
+ flowMap map[int64]*server.Server
+
+ // A manager that Handles fetching and caching signature of remote services
+ signatureManager lib.SignatureManager
+
+ // We maintain multiple Veyron server per websocket pipe for serving JavaScript
+ // services.
+ servers map[uint64]*server.Server
+
+ // Creates a client writer for a given flow. This is a member so that tests can override
+ // the default implementation.
+ writerCreator func(id int64) lib.ClientWriter
+
+ // There is only one client per Controller since there is only one identity per app.
+ client ipc.Client
+
+ veyronProxyEP string
+
+ // Store for all the PublicIDs that javascript has a handle to.
+ idStore *identity.JSPublicIDHandles
+}
+
+// NewController creates a new Controller. writerCreator will be used to create a new flow for rpcs to
+// javascript server. veyronProxyEP is an endpoint for the veyron proxy to serve through. It can't be empty.
+// opts are any options that should be passed to the rt.New(), such as the mounttable root.
+func NewController(writerCreator func(id int64) lib.ClientWriter,
+ listenSpec *ipc.ListenSpec, opts ...veyron2.ROpt) (*Controller, error) {
+ r, err := rt.New(opts...)
+ if err != nil {
+ return nil, err
+ }
+ client, err := r.NewClient()
+ if err != nil {
+ return nil, err
+ }
+
+ controller := &Controller{
+ rt: r,
+ logger: r.Logger(),
+ client: client,
+ writerCreator: writerCreator,
+ listenSpec: listenSpec,
+ idStore: identity.NewJSPublicIDHandles(),
+ }
+ controller.setup()
+ return controller, nil
+}
+
+// finishCall waits for the call to finish and write out the response to w.
+func (c *Controller) finishCall(w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
+ if msg.IsStreaming {
+ for {
+ var item interface{}
+ if err := clientCall.Recv(&item); err != nil {
+ if err == io.EOF {
+ break
+ }
+ w.Error(err) // Send streaming error as is
+ return
+ }
+ if err := w.Send(lib.ResponseStream, item); err != nil {
+ w.Error(verror.Internalf("unable to marshal: %v", item))
+ }
+ }
+
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.Internalf("unable to marshal close stream message"))
+ }
+ }
+
+ results := make([]interface{}, msg.NumOutArgs)
+ // This array will have pointers to the values in result.
+ resultptrs := make([]interface{}, msg.NumOutArgs)
+ for ax := range results {
+ resultptrs[ax] = &results[ax]
+ }
+ if err := clientCall.Finish(resultptrs...); err != nil {
+ // return the call system error as is
+ w.Error(err)
+ return
+ }
+ // for now we assume last out argument is always error
+ if len(results) < 1 {
+ w.Error(verror.Internalf("client call did not return any results"))
+ return
+ }
+
+ if err, ok := results[len(results)-1].(error); ok {
+ // return the call Application error as is
+ w.Error(err)
+ return
+ }
+
+ if err := w.Send(lib.ResponseFinal, results[0:len(results)-1]); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
+ }
+}
+
+func (c *Controller) startCall(ctx context.T, w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
+ if c.client == nil {
+ return nil, verror.BadArgf("no client created")
+ }
+ methodName := lib.UppercaseFirstCharacter(msg.Method)
+ retryTimeoutOpt := veyron2.RetryTimeoutOpt(time.Duration(*retryTimeout) * time.Second)
+ clientCall, err := c.client.StartCall(ctx, msg.Name, methodName, msg.InArgs, retryTimeoutOpt)
+ if err != nil {
+ return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
+ }
+
+ return clientCall, nil
+}
+
+// Implements the serverHelper interface
+
+// CreateNewFlow creats a new server flow that will be used to write out
+// streaming messages to Javascript.
+func (c *Controller) CreateNewFlow(s *server.Server, stream ipc.Stream) *server.Flow {
+ c.Lock()
+ defer c.Unlock()
+ id := c.lastGeneratedId
+ c.lastGeneratedId += 2
+ c.flowMap[id] = s
+ os := newStream()
+ os.init(stream, vom_wiretype.Type{ID: 1})
+ c.outstandingStreams[id] = os
+ return &server.Flow{ID: id, Writer: c.writerCreator(id)}
+}
+
+// CleanupFlow removes the bookkeping for a previously created flow.
+func (c *Controller) CleanupFlow(id int64) {
+ c.Lock()
+ stream := c.outstandingStreams[id]
+ delete(c.outstandingStreams, id)
+ delete(c.flowMap, id)
+ c.Unlock()
+ if stream != nil {
+ stream.end()
+ stream.waitUntilDone()
+ }
+}
+
+// GetLogger returns a Veyron logger to use.
+func (c *Controller) GetLogger() vlog.Logger {
+ return c.logger
+}
+
+// RT returns the runtime of the app.
+func (c *Controller) RT() veyron2.Runtime {
+ return c.rt
+}
+
+// AddIdentity adds the PublicID to the local id store and returns
+// the handle to it. This function exists because JS only has
+// a handle to the PublicID to avoid shipping the blessing forest
+// to JS and back.
+func (c *Controller) AddIdentity(id security.PublicID) int64 {
+ return c.idStore.Add(id)
+}
+
+// Cleanup cleans up any outstanding rpcs.
+func (c *Controller) Cleanup() {
+ c.logger.VI(0).Info("Cleaning up websocket")
+ c.Lock()
+ defer c.Unlock()
+
+ for _, stream := range c.outstandingStreams {
+ stream.end()
+ }
+
+ for _, server := range c.servers {
+ server.Stop()
+ }
+}
+
+func (c *Controller) setup() {
+ c.signatureManager = lib.NewSignatureManager()
+ c.outstandingStreams = make(map[int64]*outstandingStream)
+ c.flowMap = make(map[int64]*server.Server)
+ c.servers = make(map[uint64]*server.Server)
+}
+
+// SendOnStream writes data on id's stream. The actual network write will be
+// done asynchronously. If there is an error, it will be sent to w.
+func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
+ c.Lock()
+ stream := c.outstandingStreams[id]
+ c.Unlock()
+
+ if stream == nil {
+ vlog.Errorf("unknown stream: %d", id)
+ return
+ }
+ stream.send(data, w)
+}
+
+// SendVeyronRequest makes a veyron request for the given flowId. If signal is non-nil, it will receive
+// the call object after it has been constructed.
+func (c *Controller) sendVeyronRequest(ctx context.T, id int64, tempMsg *veyronTempRPC, w lib.ClientWriter, stream *outstandingStream) {
+ // Fetch and adapt signature from the SignatureManager
+ retryTimeoutOpt := veyron2.RetryTimeoutOpt(time.Duration(*retryTimeout) * time.Second)
+ sig, err := c.signatureManager.Signature(ctx, tempMsg.Name, c.client, retryTimeoutOpt)
+ if err != nil {
+ w.Error(verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err))
+ return
+ }
+
+ methName := lib.UppercaseFirstCharacter(tempMsg.Method)
+ methSig, ok := sig.Methods[methName]
+ if !ok {
+ w.Error(fmt.Errorf("method not found in signature: %v (full sig: %v)", methName, sig))
+ return
+ }
+
+ var msg veyronRPC
+ if len(methSig.InArgs) != len(tempMsg.InArgs) {
+ w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, tempMsg))
+ return
+ }
+ msg.InArgs = make([]interface{}, len(tempMsg.InArgs))
+ td := wiretype_build.TypeDefs(sig.TypeDefs)
+
+ for i := 0; i < len(tempMsg.InArgs); i++ {
+ argTypeId := methSig.InArgs[i].Type
+ argType := vom_wiretype.Type{
+ ID: argTypeId,
+ Defs: &td,
+ }
+
+ val, err := vom.JSONToObject(string(tempMsg.InArgs[i]), argType)
+ if err != nil {
+ w.Error(fmt.Errorf("error while converting json to object for arg %d (%s): %v", i, methSig.InArgs[i].Name, err))
+ return
+ }
+ msg.InArgs[i] = val
+ }
+
+ msg.Name = tempMsg.Name
+ msg.Method = tempMsg.Method
+ msg.NumOutArgs = tempMsg.NumOutArgs
+ msg.IsStreaming = tempMsg.IsStreaming
+
+ inStreamType := vom_wiretype.Type{
+ ID: methSig.InStream,
+ Defs: &td,
+ }
+
+ // We have to make the start call synchronous so we can make sure that we populate
+ // the call map before we can Handle a recieve call.
+ call, err := c.startCall(ctx, w, &msg)
+ if err != nil {
+ w.Error(verror.Internalf("can't start Veyron Request: %v", err))
+ return
+ }
+
+ if stream != nil {
+ stream.init(call, inStreamType)
+ }
+
+ c.finishCall(w, call, &msg)
+ if stream != nil {
+ c.Lock()
+ delete(c.outstandingStreams, id)
+ c.Unlock()
+ }
+}
+
+// HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(ctx context.T, id int64, data string, w lib.ClientWriter) {
+ veyronTempMsg, err := c.parseVeyronRequest(ctx, bytes.NewBufferString(data))
+ if err != nil {
+ w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
+ return
+ }
+
+ // If this rpc is streaming, we would expect that the client would try to send
+ // on this stream. Since the initial handshake is done asynchronously, we have
+ // to put the outstanding stream in the map before we make the async call so that
+ // the future send know which queue to write to, even if the client call isn't
+ // actually ready yet.
+ var stream *outstandingStream
+ if veyronTempMsg.IsStreaming {
+ stream = newStream()
+ c.Lock()
+ c.outstandingStreams[id] = stream
+ c.Unlock()
+ }
+ go c.sendVeyronRequest(ctx, id, veyronTempMsg, w, stream)
+}
+
+// CloseStream closes the stream for a given id.
+func (c *Controller) CloseStream(id int64) {
+ c.Lock()
+ defer c.Unlock()
+ stream := c.outstandingStreams[id]
+ if stream == nil {
+ c.logger.Errorf("close called on non-existent call: %v", id)
+ return
+ }
+
+ stream.end()
+}
+
+func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {
+ c.Lock()
+ defer c.Unlock()
+ if server, ok := c.servers[serverId]; ok {
+ return server, nil
+ }
+ server, err := server.NewServer(serverId, c.listenSpec, c)
+ if err != nil {
+ return nil, err
+ }
+ c.servers[serverId] = server
+ return server, nil
+}
+
+func (c *Controller) removeServer(serverId uint64) {
+ c.Lock()
+ server := c.servers[serverId]
+ if server == nil {
+ c.Unlock()
+ return
+ }
+ delete(c.servers, serverId)
+ c.Unlock()
+
+ server.Stop()
+}
+
+func (c *Controller) serve(serveRequest serveRequest, w lib.ClientWriter) {
+ // Create a server for the websocket pipe, if it does not exist already
+ server, err := c.maybeCreateServer(serveRequest.ServerId)
+ if err != nil {
+ w.Error(verror.Internalf("error creating server: %v", err))
+ }
+
+ c.logger.VI(2).Infof("serving under name: %q", serveRequest.Name)
+
+ endpoint, err := server.Serve(serveRequest.Name)
+ if err != nil {
+ w.Error(verror.Internalf("error serving service: %v", err))
+ return
+ }
+ // Send the endpoint back
+ if err := w.Send(lib.ResponseFinal, endpoint); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
+ return
+ }
+}
+
+// HandleServeRequest takes a request to serve a server, creates
+// a server, registers the provided services and sends the endpoint back.
+func (c *Controller) HandleServeRequest(data string, w lib.ClientWriter) {
+ // Decode the serve request which includes IDL, registered services and name
+ var serveRequest serveRequest
+ if err := json.Unmarshal([]byte(data), &serveRequest); err != nil {
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ return
+ }
+ c.serve(serveRequest, w)
+}
+
+// HandleLookupResponse handles the result of a Dispatcher.Lookup call that was
+// run by the Javascript server.
+func (c *Controller) HandleLookupResponse(id int64, data string) {
+ c.Lock()
+ server := c.flowMap[id]
+ c.Unlock()
+ if server == nil {
+ c.logger.Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results.", id)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ server.HandleLookupResponse(id, data)
+}
+
+// HandleAuthResponse handles the result of a Authorizer.Authorize call that was
+// run by the Javascript server.
+func (c *Controller) HandleAuthResponse(id int64, data string) {
+ c.Lock()
+ server := c.flowMap[id]
+ c.Unlock()
+ if server == nil {
+ c.logger.Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results.", id)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ server.HandleAuthResponse(id, data)
+}
+
+// HandleStopRequest takes a request to stop a server.
+func (c *Controller) HandleStopRequest(data string, w lib.ClientWriter) {
+ var serverId uint64
+ if err := json.Unmarshal([]byte(data), &serverId); err != nil {
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ return
+ }
+
+ c.removeServer(serverId)
+
+ // Send true to indicate stop has finished
+ if err := w.Send(lib.ResponseFinal, true); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
+ return
+ }
+}
+
+// HandleServerResponse handles the completion of outstanding calls to JavaScript services
+// by filling the corresponding channel with the result from JavaScript.
+func (c *Controller) HandleServerResponse(id int64, data string) {
+ c.Lock()
+ server := c.flowMap[id]
+ c.Unlock()
+ if server == nil {
+ c.logger.Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results.", id)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ server.HandleServerResponse(id, data)
+}
+
+// parseVeyronRequest parses a json rpc request into a veyronRPC object.
+func (c *Controller) parseVeyronRequest(ctx context.T, r io.Reader) (*veyronTempRPC, error) {
+ var tempMsg veyronTempRPC
+ decoder := json.NewDecoder(r)
+ if err := decoder.Decode(&tempMsg); err != nil {
+ return nil, fmt.Errorf("can't unmarshall JSONMessage: %v", err)
+ }
+ c.logger.VI(2).Infof("VeyronRPC: %s.%s(id=%v, ..., streaming=%v)", tempMsg.Name, tempMsg.Method, tempMsg.IsStreaming)
+ return &tempMsg, nil
+}
+
+type signatureRequest struct {
+ Name string
+}
+
+func (c *Controller) getSignature(ctx context.T, name string) (signature.JSONServiceSignature, error) {
+ // Fetch and adapt signature from the SignatureManager
+ retryTimeoutOpt := veyron2.RetryTimeoutOpt(time.Duration(*retryTimeout) * time.Second)
+ sig, err := c.signatureManager.Signature(ctx, name, c.client, retryTimeoutOpt)
+ if err != nil {
+ return nil, verror.Internalf("error getting service signature for %s: %v", name, err)
+ }
+
+ return signature.NewJSONServiceSignature(*sig), nil
+}
+
+// HandleSignatureRequest uses signature manager to get and cache signature of a remote server
+func (c *Controller) HandleSignatureRequest(ctx context.T, data string, w lib.ClientWriter) {
+ // Decode the request
+ var request signatureRequest
+ if err := json.Unmarshal([]byte(data), &request); err != nil {
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ return
+ }
+
+ c.logger.VI(2).Infof("requesting Signature for %q", request.Name)
+ jsSig, err := c.getSignature(ctx, request.Name)
+ if err != nil {
+ w.Error(err)
+ return
+ }
+
+ // Send the signature back
+ if err := w.Send(lib.ResponseFinal, jsSig); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
+ return
+ }
+}
+
+// HandleUnlinkJSIdentity removes an identity from the JS identity store.
+// data should be JSON encoded number
+func (c *Controller) HandleUnlinkJSIdentity(data string, w lib.ClientWriter) {
+ var handle int64
+ if err := json.Unmarshal([]byte(data), &handle); err != nil {
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ return
+ }
+ c.idStore.Remove(handle)
+}
+
+// Convert the json wire format of a caveat into the right go object
+func decodeCaveat(c jsonCaveatValidator) (security.Caveat, error) {
+ var failed security.Caveat
+ switch c.Type {
+ case "MethodCaveat":
+ var methods []string
+ if err := json.Unmarshal(c.Data, &methods); err != nil {
+ return failed, err
+ }
+ if len(methods) == 0 {
+ return failed, fmt.Errorf("must provide at least one method")
+ }
+ return security.MethodCaveat(methods[0], methods[1:]...)
+ case "PeerBlessingsCaveat":
+ var patterns []security.BlessingPattern
+ if err := json.Unmarshal(c.Data, &patterns); err != nil {
+ return failed, err
+ }
+ if len(patterns) == 0 {
+ return failed, fmt.Errorf("must provide at least one BlessingPattern")
+ }
+ return security.PeerBlessingsCaveat(patterns[0], patterns[1:]...)
+ default:
+ return failed, verror.BadArgf("unknown caveat type %s", c.Type)
+ }
+}
+
+func (c *Controller) getPublicIDHandle(handle int64) (*PublicIDHandle, error) {
+ id := c.idStore.Get(handle)
+ if id == nil {
+ return nil, verror.NoExistf("uknown public id")
+ }
+ return &PublicIDHandle{Handle: handle, Names: id.Names()}, nil
+}
+
+func (c *Controller) bless(request blessingRequest) (*PublicIDHandle, error) {
+ var caveats []security.Caveat
+ for _, c := range request.Caveats {
+ cav, err := decodeCaveat(c)
+ if err != nil {
+ return nil, verror.BadArgf("failed to create caveat: %v", err)
+ }
+ caveats = append(caveats, cav)
+ }
+ duration := time.Duration(request.DurationMs) * time.Millisecond
+
+ blessee := c.idStore.Get(request.Handle)
+
+ if blessee == nil {
+ return nil, verror.NoExistf("invalid PublicID handle")
+ }
+ blessor := c.rt.Identity()
+
+ blessed, err := blessor.Bless(blessee, request.Name, duration, caveats)
+ if err != nil {
+ return nil, err
+ }
+
+ return &PublicIDHandle{Handle: c.idStore.Add(blessed), Names: blessed.Names()}, nil
+}
+
+// HandleBlessing handles a blessing request from JS.
+func (c *Controller) HandleBlessing(data string, w lib.ClientWriter) {
+ var request blessingRequest
+ if err := json.Unmarshal([]byte(data), &request); err != nil {
+ w.Error(verror.Internalf("can't unmarshall message: %v", err))
+ return
+ }
+
+ handle, err := c.bless(request)
+
+ if err != nil {
+ w.Error(err)
+ return
+ }
+
+ // Send the id back.
+ if err := w.Send(lib.ResponseFinal, handle); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
+ return
+ }
+}
+
+func (c *Controller) HandleCreateIdentity(data string, w lib.ClientWriter) {
+ var name string
+ if err := json.Unmarshal([]byte(data), &name); err != nil {
+ w.Error(verror.Internalf("can't unmarshall message: %v", err))
+ return
+ }
+ id, err := c.rt.NewIdentity(name)
+ if err != nil {
+ w.Error(err)
+ return
+ }
+
+ publicID := id.PublicID()
+ jsID := &PublicIDHandle{Handle: c.idStore.Add(publicID), Names: publicID.Names()}
+ if err := w.Send(lib.ResponseFinal, jsID); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
+ return
+ }
+}
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
new file mode 100644
index 0000000..9b20dfa
--- /dev/null
+++ b/services/wsprd/app/app_test.go
@@ -0,0 +1,855 @@
+package app
+
+import (
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vdl/vdlutil"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vom"
+ vom_wiretype "veyron.io/veyron/veyron2/vom/wiretype"
+ "veyron.io/veyron/veyron2/wiretype"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+ "veyron.io/wspr/veyron/services/wsprd/lib/testwriter"
+ "veyron.io/wspr/veyron/services/wsprd/signature"
+
+ "veyron.io/veyron/veyron/profiles"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
+ mounttable "veyron.io/veyron/veyron/services/mounttable/lib"
+)
+
+var r = rt.Init()
+
+type simpleAdder struct{}
+
+func (s simpleAdder) Add(_ ipc.ServerCall, a, b int32) (int32, error) {
+ return a + b, nil
+}
+
+func (s simpleAdder) Divide(_ ipc.ServerCall, a, b int32) (int32, error) {
+ if b == 0 {
+ return 0, verror.BadArgf("can't divide by zero")
+ }
+ return a / b, nil
+}
+
+func (s simpleAdder) StreamingAdd(call ipc.ServerCall) (int32, error) {
+ total := int32(0)
+ var value int32
+ for err := call.Recv(&value); err == nil; err = call.Recv(&value) {
+ total += value
+ call.Send(total)
+ }
+ return total, nil
+}
+
+func (s simpleAdder) Signature(call ipc.ServerCall) (ipc.ServiceSignature, error) {
+ result := ipc.ServiceSignature{Methods: make(map[string]ipc.MethodSignature)}
+ result.Methods["Add"] = ipc.MethodSignature{
+ InArgs: []ipc.MethodArgument{
+ {Name: "A", Type: 36},
+ {Name: "B", Type: 36},
+ },
+ OutArgs: []ipc.MethodArgument{
+ {Name: "Value", Type: 36},
+ {Name: "Err", Type: 65},
+ },
+ }
+
+ result.Methods["Divide"] = ipc.MethodSignature{
+ InArgs: []ipc.MethodArgument{
+ {Name: "A", Type: 36},
+ {Name: "B", Type: 36},
+ },
+ OutArgs: []ipc.MethodArgument{
+ {Name: "Value", Type: 36},
+ {Name: "Err", Type: 65},
+ },
+ }
+
+ result.Methods["StreamingAdd"] = ipc.MethodSignature{
+ InArgs: []ipc.MethodArgument{},
+ OutArgs: []ipc.MethodArgument{
+ {Name: "Value", Type: 36},
+ {Name: "Err", Type: 65},
+ },
+ InStream: 36,
+ OutStream: 36,
+ }
+ result.TypeDefs = []vdlutil.Any{
+ wiretype.NamedPrimitiveType{Type: 0x1, Name: "error", Tags: []string(nil)}}
+
+ return result, nil
+}
+
+func startAnyServer(servesMT bool, dispatcher ipc.Dispatcher) (ipc.Server, naming.Endpoint, error) {
+ // Create a new server instance.
+ s, err := r.NewServer(veyron2.ServesMountTableOpt(servesMT))
+ if err != nil {
+ return nil, nil, err
+ }
+
+ endpoint, err := s.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if err := s.Serve("", dispatcher); err != nil {
+ return nil, nil, err
+ }
+ return s, endpoint, nil
+}
+
+func startAdderServer() (ipc.Server, naming.Endpoint, error) {
+ return startAnyServer(false, ipc.LeafDispatcher(simpleAdder{}, nil))
+}
+
+func startProxy() (*proxy.Proxy, error) {
+ rid, err := naming.NewRoutingID()
+ if err != nil {
+ return nil, err
+ }
+ return proxy.New(rid, nil, "tcp", "127.0.0.1:0", "")
+}
+
+func startMountTableServer() (ipc.Server, naming.Endpoint, error) {
+ mt, err := mounttable.NewMountTable("")
+ if err != nil {
+ return nil, nil, err
+ }
+ return startAnyServer(true, mt)
+}
+
+var adderServiceSignature signature.JSONServiceSignature = signature.JSONServiceSignature{
+ "add": signature.JSONMethodSignature{
+ InArgs: []string{"A", "B"},
+ NumOutArgs: 2,
+ IsStreaming: false,
+ },
+ "divide": signature.JSONMethodSignature{
+ InArgs: []string{"A", "B"},
+ NumOutArgs: 2,
+ IsStreaming: false,
+ },
+ "streamingAdd": signature.JSONMethodSignature{
+ InArgs: []string{},
+ NumOutArgs: 2,
+ IsStreaming: true,
+ },
+}
+
+func TestGetGoServerSignature(t *testing.T) {
+ s, endpoint, err := startAdderServer()
+ if err != nil {
+ t.Errorf("unable to start server: %v", err)
+ t.Fail()
+ return
+ }
+ defer s.Stop()
+ spec := *profiles.LocalListenSpec
+ spec.Proxy = "mockVeyronProxyEP"
+ controller, err := NewController(nil, &spec)
+
+ if err != nil {
+ t.Errorf("Failed to create controller: %v", err)
+ }
+ jsSig, err := controller.getSignature(r.NewContext(), "/"+endpoint.String())
+ if err != nil {
+ t.Errorf("Failed to get signature: %v", err)
+ }
+
+ if !reflect.DeepEqual(jsSig, adderServiceSignature) {
+ t.Errorf("Unexpected signature, got :%v, expected: %v", jsSig, adderServiceSignature)
+ }
+}
+
+type goServerTestCase struct {
+ method string
+ inArgs []json.RawMessage
+ numOutArgs int32
+ streamingInputs []string
+ streamingInputType vom.Type
+ expectedStream []testwriter.Response
+ expectedError error
+}
+
+func runGoServerTestCase(t *testing.T, test goServerTestCase) {
+ s, endpoint, err := startAdderServer()
+ if err != nil {
+ t.Errorf("unable to start server: %v", err)
+ t.Fail()
+ return
+ }
+ defer s.Stop()
+
+ spec := *profiles.LocalListenSpec
+ spec.Proxy = "mockVeyronProxyEP"
+ controller, err := NewController(nil, &spec)
+
+ if err != nil {
+ t.Errorf("unable to create controller: %v", err)
+ t.Fail()
+ return
+ }
+
+ writer := testwriter.Writer{}
+ var stream *outstandingStream
+ if len(test.streamingInputs) > 0 {
+ stream = newStream()
+ controller.outstandingStreams[0] = stream
+ go func() {
+ for _, value := range test.streamingInputs {
+ controller.SendOnStream(0, value, &writer)
+ }
+ controller.CloseStream(0)
+ }()
+ }
+
+ request := veyronTempRPC{
+ Name: "/" + endpoint.String(),
+ Method: test.method,
+ InArgs: test.inArgs,
+ NumOutArgs: test.numOutArgs,
+ IsStreaming: stream != nil,
+ }
+ controller.sendVeyronRequest(r.NewContext(), 0, &request, &writer, stream)
+
+ testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError, t)
+}
+
+func TestCallingGoServer(t *testing.T) {
+ runGoServerTestCase(t, goServerTestCase{
+ method: "Add",
+ inArgs: []json.RawMessage{json.RawMessage("2"), json.RawMessage("3")},
+ numOutArgs: 2,
+ expectedStream: []testwriter.Response{
+ testwriter.Response{
+ Message: []interface{}{5.0},
+ Type: lib.ResponseFinal,
+ },
+ },
+ })
+}
+
+func TestCallingGoServerWithError(t *testing.T) {
+ runGoServerTestCase(t, goServerTestCase{
+ method: "Divide",
+ inArgs: []json.RawMessage{json.RawMessage("1"), json.RawMessage("0")},
+ numOutArgs: 2,
+ expectedError: verror.BadArgf("can't divide by zero"),
+ })
+}
+
+func TestCallingGoWithStreaming(t *testing.T) {
+ runGoServerTestCase(t, goServerTestCase{
+ method: "StreamingAdd",
+ inArgs: []json.RawMessage{},
+ streamingInputs: []string{"1", "2", "3", "4"},
+ streamingInputType: vom_wiretype.Type{ID: 36},
+ numOutArgs: 2,
+ expectedStream: []testwriter.Response{
+ testwriter.Response{
+ Message: 1.0,
+ Type: lib.ResponseStream,
+ },
+ testwriter.Response{
+ Message: 3.0,
+ Type: lib.ResponseStream,
+ },
+ testwriter.Response{
+ Message: 6.0,
+ Type: lib.ResponseStream,
+ },
+ testwriter.Response{
+ Message: 10.0,
+ Type: lib.ResponseStream,
+ },
+ testwriter.Response{
+ Message: nil,
+ Type: lib.ResponseStreamClose,
+ },
+ testwriter.Response{
+ Message: []interface{}{10.0},
+ Type: lib.ResponseFinal,
+ },
+ },
+ })
+}
+
+type runningTest struct {
+ controller *Controller
+ writer *testwriter.Writer
+ mounttableServer ipc.Server
+ proxyServer *proxy.Proxy
+}
+
+func serveServer() (*runningTest, error) {
+ mounttableServer, endpoint, err := startMountTableServer()
+
+ if err != nil {
+ return nil, fmt.Errorf("unable to start mounttable: %v", err)
+ }
+
+ proxyServer, err := startProxy()
+
+ if err != nil {
+ return nil, fmt.Errorf("unable to start proxy: %v", err)
+ }
+
+ proxyEndpoint := proxyServer.Endpoint().String()
+
+ writer := testwriter.Writer{}
+
+ writerCreator := func(int64) lib.ClientWriter {
+ return &writer
+ }
+ spec := *profiles.LocalListenSpec
+ spec.Proxy = "/" + proxyEndpoint
+ controller, err := NewController(writerCreator, &spec, veyron2.NamespaceRoots{"/" + endpoint.String()})
+
+ if err != nil {
+ return nil, err
+ }
+
+ controller.serve(serveRequest{
+ Name: "adder",
+ }, &writer)
+
+ return &runningTest{
+ controller, &writer, mounttableServer, proxyServer,
+ }, nil
+}
+
+func TestJavascriptServeServer(t *testing.T) {
+ rt, err := serveServer()
+ defer rt.mounttableServer.Stop()
+ defer rt.proxyServer.Shutdown()
+ defer rt.controller.Cleanup()
+ if err != nil {
+ t.Fatalf("could not serve server %v", err)
+ }
+
+ if len(rt.writer.Stream) != 1 {
+ t.Errorf("expected only one response, got %d", len(rt.writer.Stream))
+ return
+ }
+
+ resp := rt.writer.Stream[0]
+
+ if resp.Type != lib.ResponseFinal {
+ t.Errorf("unknown stream message Got: %v, expected: serve response", resp)
+ return
+ }
+
+ if msg, ok := resp.Message.(string); ok {
+ if _, err := r.NewEndpoint(msg); err == nil {
+ return
+ }
+ }
+ t.Errorf("invalid endpdoint returned from serve: %v", resp.Message)
+}
+
+func TestJavascriptStopServer(t *testing.T) {
+ rt, err := serveServer()
+ defer rt.mounttableServer.Stop()
+ defer rt.proxyServer.Shutdown()
+ defer rt.controller.Cleanup()
+
+ if err != nil {
+ t.Errorf("could not serve server %v", err)
+ return
+ }
+
+ // ensure there is only one server and then stop the server
+ if len(rt.controller.servers) != 1 {
+ t.Errorf("expected only one server but got: %d", len(rt.controller.servers))
+ return
+ }
+ for serverId := range rt.controller.servers {
+ rt.controller.removeServer(serverId)
+ }
+
+ // ensure there is no more servers now
+ if len(rt.controller.servers) != 0 {
+ t.Errorf("expected no server after stopping the only one but got: %d", len(rt.controller.servers))
+ return
+ }
+
+ return
+}
+
+// A test case to simulate a Javascript server talking to the App. All the
+// responses from Javascript are mocked and sent back through the method calls.
+// All messages from the client are sent using a go client.
+type jsServerTestCase struct {
+ method string
+ inArgs []interface{}
+ // The set of streaming inputs from the client to the server.
+ // This is passed to the client, which then passes it to the app.
+ clientStream []interface{}
+ // The set of JSON streaming messages sent from Javascript to the
+ // app.
+ serverStream []string
+ // The stream that we expect the client to see.
+ expectedServerStream []interface{}
+ // The final response sent by the Javascript server to the
+ // app.
+ finalResponse interface{}
+ // The final error sent by the Javascript server to the app.
+ err *verror.Standard
+
+ // Whether or not the Javascript server has an authorizer or not.
+ // If it does have an authorizer, then authError is sent back from the server
+ // to the app.
+ hasAuthorizer bool
+ authError *verror.Standard
+}
+
+func sendServerStream(t *testing.T, controller *Controller, test *jsServerTestCase, w lib.ClientWriter, expectedFlow int64) {
+ for _, msg := range test.serverStream {
+ controller.SendOnStream(4, msg, w)
+ }
+
+ serverReply := map[string]interface{}{
+ "Results": []interface{}{test.finalResponse},
+ "Err": test.err,
+ }
+
+ bytes, err := json.Marshal(serverReply)
+ if err != nil {
+ t.Fatalf("Failed to serialize the reply: %v", err)
+ }
+ controller.HandleServerResponse(expectedFlow, string(bytes))
+}
+
+// Replaces the "remoteEndpoint" in security context of the message
+// passed in with a constant string "remoteEndpoint" since there is
+// no way to get the endpoint of the client.
+func cleanUpAuthRequest(message *testwriter.Response, t *testing.T) {
+ // We should make sure that remoteEndpoint exists in the last message and
+ // change it to a fixed string.
+ if message.Type != lib.ResponseAuthRequest {
+ t.Errorf("Unexpected auth message %v", message)
+ return
+ }
+ context := message.Message.(map[string]interface{})["context"].(map[string]interface{})
+ if context["remoteEndpoint"] == nil || context["remoteEndpoint"] == "" {
+ t.Errorf("Unexpected auth message %v", message)
+ }
+ context["remoteEndpoint"] = "remoteEndpoint"
+}
+
+func runJsServerTestCase(t *testing.T, test jsServerTestCase) {
+ rt, err := serveServer()
+ defer rt.mounttableServer.Stop()
+ defer rt.proxyServer.Shutdown()
+ defer rt.controller.Cleanup()
+
+ if err != nil {
+ t.Errorf("could not serve server %v", err)
+ }
+
+ if len(rt.writer.Stream) != 1 {
+ t.Errorf("expected only on response, got %d", len(rt.writer.Stream))
+ return
+ }
+
+ resp := rt.writer.Stream[0]
+
+ if resp.Type != lib.ResponseFinal {
+ t.Errorf("unknown stream message Got: %v, expected: serve response", resp)
+ return
+ }
+
+ msg, ok := resp.Message.(string)
+ if !ok {
+ t.Errorf("invalid endpdoint returned from serve: %v", resp.Message)
+ }
+ endpoint, err := r.NewEndpoint(msg)
+ if err != nil {
+ t.Errorf("invalid endpdoint returned from serve: %v", resp.Message)
+ }
+
+ rt.writer.Stream = nil
+
+ // Create a client using app's runtime so it points to the right mounttable.
+ client, err := rt.controller.rt.NewClient()
+
+ if err != nil {
+ t.Errorf("unable to create client: %v", err)
+ }
+
+ expectedWebsocketMessage := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "adder",
+ "method": "resolveStep",
+ },
+ },
+ }
+
+ // We have to have a go routine handle the resolveStep call because StartCall blocks until the
+ // resolve step is complete.
+ go func() {
+ // Wait until ResolveStep lookup has been called.
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ // Handle the ResolveStep
+ dispatcherResponse := map[string]interface{}{
+ "Err": map[string]interface{}{
+ "id": "veyron2/verror.NotFound",
+ "message": "ResolveStep not found",
+ },
+ }
+ bytes, err := json.Marshal(dispatcherResponse)
+ if err != nil {
+ t.Errorf("failed to serailize the response: %v", err)
+ return
+ }
+ rt.controller.HandleLookupResponse(0, string(bytes))
+ }()
+
+ call, err := client.StartCall(rt.controller.rt.NewContext(), "/"+msg+"/adder", test.method, test.inArgs)
+ if err != nil {
+ t.Errorf("failed to start call: %v", err)
+ }
+
+ // This is lookup call for dispatcher.
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "adder",
+ "method": lib.LowercaseFirstCharacter(test.method),
+ },
+ })
+
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ dispatcherResponse := map[string]interface{}{
+ "handle": 0,
+ "signature": adderServiceSignature,
+ "hasAuthorizer": test.hasAuthorizer,
+ }
+
+ bytes, err := json.Marshal(dispatcherResponse)
+ if err != nil {
+ t.Errorf("failed to serailize the response: %v", err)
+ return
+ }
+ rt.controller.HandleLookupResponse(2, string(bytes))
+
+ typedNames := rt.controller.rt.Identity().PublicID().Names()
+ names := []interface{}{}
+ for _, n := range typedNames {
+ names = append(names, n)
+ }
+
+ // The expectedHandle for the javascript ID. Since we don't always call the authorizer
+ // this handle could be different by the time we make the start rpc call.
+ expectedIDHandle := 1.0
+ expectedFlowCount := int64(4)
+ if test.hasAuthorizer {
+ // If an authorizer exists, it gets called twice. The first time to see if the
+ // client is actually able to make this rpc and a second time to see if the server
+ // is ok with the client getting trace information for the rpc.
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{
+ Type: lib.ResponseAuthRequest,
+ Message: map[string]interface{}{
+ "serverID": 0.0,
+ "handle": 0.0,
+ "context": map[string]interface{}{
+ "method": lib.LowercaseFirstCharacter(test.method),
+ "name": "adder",
+ "suffix": "adder",
+ "label": 8.0, // This is a read label.
+ "localId": map[string]interface{}{
+ "Handle": 1.0,
+ "Names": names,
+ },
+ "remoteId": map[string]interface{}{
+ "Handle": 2.0,
+ "Names": names,
+ },
+ "localEndpoint": endpoint.String(),
+ "remoteEndpoint": "remoteEndpoint",
+ },
+ },
+ })
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ cleanUpAuthRequest(&rt.writer.Stream[len(expectedWebsocketMessage)-1], t)
+ authResponse := map[string]interface{}{
+ "err": test.authError,
+ }
+
+ bytes, err := json.Marshal(authResponse)
+ if err != nil {
+ t.Errorf("failed to serailize the response: %v", err)
+ return
+ }
+ rt.controller.HandleAuthResponse(4, string(bytes))
+
+ // If we expected an auth error, we should go ahead and finish this rpc to get back
+ // the auth error.
+ if test.authError != nil {
+ var result interface{}
+ var err2 error
+ err := call.Finish(&result, &err2)
+ testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
+ // We can't do a deep equal with authError because the error returned by the
+ // authorizer is wrapped into another error by the ipc framework.
+ if err == nil {
+ t.Errorf("unexpected auth error, expected %v, got %v", test.authError, err)
+ }
+ return
+ }
+
+ // The debug authorize call is identical to the regular auth call with a different
+ // label.
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{
+ Type: lib.ResponseAuthRequest,
+ Message: map[string]interface{}{
+ "serverID": 0.0,
+ "handle": 0.0,
+ "context": map[string]interface{}{
+ "method": lib.LowercaseFirstCharacter(test.method),
+ "name": "adder",
+ "suffix": "adder",
+ "label": 16.0,
+ "localId": map[string]interface{}{
+ "Handle": 3.0,
+ "Names": names,
+ },
+ "remoteId": map[string]interface{}{
+ "Handle": 4.0,
+ "Names": names,
+ },
+ "localEndpoint": endpoint.String(),
+ "remoteEndpoint": "remoteEndpoint",
+ },
+ },
+ })
+
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ cleanUpAuthRequest(&rt.writer.Stream[len(expectedWebsocketMessage)-1], t)
+ authResponse = map[string]interface{}{}
+
+ bytes, err = json.Marshal(authResponse)
+ if err != nil {
+ t.Errorf("failed to serailize the response: %v", err)
+ return
+ }
+ rt.controller.HandleAuthResponse(6, string(bytes))
+
+ expectedIDHandle += 4
+ expectedFlowCount += 4
+ }
+
+ // Now we expect the rpc to be invoked on the Javascript server.
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{
+ Type: lib.ResponseServerRequest,
+ Message: map[string]interface{}{
+ "ServerId": 0.0,
+ "Method": lib.LowercaseFirstCharacter(test.method),
+ "Handle": 0.0,
+ "Args": test.inArgs,
+ "Context": map[string]interface{}{
+ "Name": "adder",
+ "Suffix": "adder",
+ "RemoteID": map[string]interface{}{
+ "Handle": expectedIDHandle,
+ "Names": names,
+ },
+ },
+ },
+ })
+
+ // Wait until the rpc has started.
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+ for _, msg := range test.clientStream {
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{Type: lib.ResponseStream, Message: msg})
+ if err := call.Send(msg); err != nil {
+ t.Errorf("unexpected error while sending %v: %v", msg, err)
+ }
+ }
+
+ // Wait until all the streaming messages have been acknowledged.
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ expectedWebsocketMessage = append(expectedWebsocketMessage, testwriter.Response{Type: lib.ResponseStreamClose})
+
+ expectedStream := test.expectedServerStream
+ go sendServerStream(t, rt.controller, &test, rt.writer, expectedFlowCount)
+ for {
+ var data interface{}
+ if err := call.Recv(&data); err != nil {
+ break
+ }
+ if len(expectedStream) == 0 {
+ t.Errorf("unexpected stream value: %v", data)
+ continue
+ }
+ if !reflect.DeepEqual(data, expectedStream[0]) {
+ t.Errorf("unexpected stream value: got %v, expected %v", data, expectedStream[0])
+ }
+ expectedStream = expectedStream[1:]
+ }
+ var result interface{}
+ var err2 error
+
+ if err := call.Finish(&result, &err2); err != nil {
+ t.Errorf("unexpected err :%v", err)
+ }
+
+ if !reflect.DeepEqual(result, test.finalResponse) {
+ t.Errorf("unexected final response: got %v, expected %v", result, test.finalResponse)
+ }
+
+ // If err2 is nil and test.err is nil reflect.DeepEqual will return false because the
+ // types are different. Because of this, we only use reflect.DeepEqual if one of
+ // the values is non-nil. If both values are nil, then we consider them equal.
+ if (err2 != nil || test.err != nil) && !reflect.DeepEqual(err2, test.err) {
+ t.Errorf("unexected error: got %v, expected %v", err2, test.err)
+ }
+
+ // Wait until the close streaming messages have been acknowledged.
+ if err := rt.writer.WaitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't receive expected message: %v", err)
+ }
+
+ testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
+}
+
+func TestSimpleJSServer(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "Add",
+ inArgs: []interface{}{1.0, 2.0},
+ finalResponse: 3.0,
+ })
+}
+
+func TestJSServerWithAuthorizer(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "Add",
+ inArgs: []interface{}{1.0, 2.0},
+ finalResponse: 3.0,
+ hasAuthorizer: true,
+ })
+}
+
+func TestJSServerWithError(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "Add",
+ inArgs: []interface{}{1.0, 2.0},
+ finalResponse: 3.0,
+ err: &verror.Standard{
+ ID: verror.Internal,
+ Msg: "JS Server failed",
+ },
+ })
+}
+
+func TestJSServerWithAuthorizerAndAuthError(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "Add",
+ inArgs: []interface{}{1.0, 2.0},
+ finalResponse: 3.0,
+ hasAuthorizer: true,
+ authError: &verror.Standard{
+ ID: verror.Internal,
+ Msg: "JS Server failed",
+ },
+ })
+}
+func TestJSServerWihStreamingInputs(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "StreamingAdd",
+ inArgs: []interface{}{},
+ clientStream: []interface{}{3.0, 4.0},
+ finalResponse: 10.0,
+ })
+}
+
+func TestJSServerWihStreamingOutputs(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "StreamingAdd",
+ inArgs: []interface{}{},
+ serverStream: []string{"3", "4"},
+ expectedServerStream: []interface{}{3.0, 4.0},
+ finalResponse: 10.0,
+ })
+}
+
+func TestJSServerWihStreamingInputsAndOutputs(t *testing.T) {
+ runJsServerTestCase(t, jsServerTestCase{
+ method: "StreamingAdd",
+ inArgs: []interface{}{},
+ clientStream: []interface{}{1.0, 2.0},
+ serverStream: []string{"3", "4"},
+ expectedServerStream: []interface{}{3.0, 4.0},
+ finalResponse: 10.0,
+ })
+}
+
+func TestDeserializeCaveat(t *testing.T) {
+ C := func(cav security.Caveat, err error) security.Caveat {
+ if err != nil {
+ t.Fatal(err)
+ }
+ return cav
+ }
+ testCases := []struct {
+ json string
+ expectedValue security.Caveat
+ }{
+ {
+ json: `{"_type":"MethodCaveat","service":"...","data":["Get","MultiGet"]}`,
+ expectedValue: C(security.MethodCaveat("Get", "MultiGet")),
+ },
+ {
+ json: `{"_type":"PeerBlessingsCaveat","service":"...","data":["veyron.io/veyron/veyron/batman","veyron.io/veyron/veyron/brucewayne"]}`,
+ expectedValue: C(security.PeerBlessingsCaveat("veyron.io/veyron/veyron/batman", "veyron.io/veyron/veyron/brucewayne")),
+ },
+ }
+
+ for _, c := range testCases {
+ var s jsonCaveatValidator
+ if err := json.Unmarshal([]byte(c.json), &s); err != nil {
+ t.Errorf("Failed to deserialize object: %v", err)
+ return
+ }
+
+ caveat, err := decodeCaveat(s)
+ if err != nil {
+ t.Errorf("Failed to convert json caveat to go object: %v")
+ return
+ }
+
+ if !reflect.DeepEqual(caveat, c.expectedValue) {
+ t.Errorf("decoded produced the wrong value: got %v, expected %v", caveat, c.expectedValue)
+ }
+ }
+}
diff --git a/services/wsprd/app/stream.go b/services/wsprd/app/stream.go
new file mode 100644
index 0000000..74819b6
--- /dev/null
+++ b/services/wsprd/app/stream.go
@@ -0,0 +1,84 @@
+package app
+
+import (
+ "fmt"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/vom"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+)
+
+type initConfig struct {
+ stream ipc.Stream
+ inType vom.Type
+}
+
+type message struct {
+ data string
+ writer lib.ClientWriter
+}
+
+// oustandingStream provides a stream-like api with the added ability to
+// queue up messages if the stream hasn't been initialized first. send
+// can be called before init has been called, but no data will be sent
+// until init is called.
+type outstandingStream struct {
+ // The channel on which the stream and the type
+ // of data on the stream is sent after the stream
+ // has been constructed.
+ initChan chan *initConfig
+ // The queue of messages to write out.
+ messages chan *message
+ // done will be notified when the stream has been closed.
+ done chan bool
+}
+
+func newStream() *outstandingStream {
+ os := &outstandingStream{
+ initChan: make(chan *initConfig, 1),
+ // We allow queueing up to 100 messages before init is called.
+ // TODO(bjornick): Deal with the case that the queue is full.
+ messages: make(chan *message, 100),
+ done: make(chan bool),
+ }
+ go os.loop()
+ return os
+}
+
+func (os *outstandingStream) send(data string, w lib.ClientWriter) {
+ os.messages <- &message{data, w}
+}
+
+func (os *outstandingStream) end() {
+ close(os.messages)
+}
+
+// Waits until the stream has been closed and all the messages
+// have been drained.
+func (os *outstandingStream) waitUntilDone() {
+ <-os.done
+}
+
+func (os *outstandingStream) loop() {
+ config := <-os.initChan
+ for msg := range os.messages {
+ payload, err := vom.JSONToObject(msg.data, config.inType)
+ if err != nil {
+ msg.writer.Error(fmt.Errorf("error while converting json to InStreamType (%s): %v", msg.data, err))
+ continue
+ }
+ if err := config.stream.Send(payload); err != nil {
+ msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err))
+ }
+
+ }
+ close(os.done)
+ // If this is a client rpc, we need to call CloseSend on it.
+ if call, ok := config.stream.(ipc.Call); ok {
+ call.CloseSend()
+ }
+}
+
+func (os *outstandingStream) init(stream ipc.Stream, inType vom.Type) {
+ os.initChan <- &initConfig{stream, inType}
+}
diff --git a/services/wsprd/identity/identity.go b/services/wsprd/identity/identity.go
new file mode 100644
index 0000000..1c20413
--- /dev/null
+++ b/services/wsprd/identity/identity.go
@@ -0,0 +1,214 @@
+// Implements an identity manager that maps origins to veyron identities. Each instance
+// of wspr is expected to have only one user at a time that will be signed in. In this case,
+// the user means the person using the app. Each user may use many different names, which in
+// practice will be done by having multiple accounts across many identity providers (i.e google,
+// facebook,etc). This is similar to having a master identity that is linked to multiple identities
+// in today's technology. For each app/origin, the user may choose which name to provide that app,
+// which results in a blessed identity for that app. Each blessed identity will have a different
+// private key and ideally, all accounts will share the same private key, but for now they are also
+// separate. The identity manager only serializes the mapping of app to account and the account
+// information, but not the private keys for each app.
+// TODO(bjornick,ataly,ashankar): Have all the accounts share the same private key which will be stored
+// in a TPM, so no private key gets serialized to disk.
+package identity
+
+import (
+ "io"
+ "net/url"
+ "sync"
+ "time"
+
+ "veyron.io/veyron/veyron/security/serialization"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vom"
+)
+
+// permissions is a set of a permissions given to an app, containing the account
+// the app has access to and the caveats associated with it.
+type permissions struct {
+ // The account name that is given to an app.
+ Account string
+ Caveats []security.Caveat
+}
+
+// persistentState is the state of the manager that will be persisted to disk.
+type persistentState struct {
+ // A mapping of origins to the permissions provide for the origin (such as
+ // caveats and the account given to the origin)
+ Origins map[string]permissions
+
+ // A set of accounts that maps from a name to the account.
+ Accounts map[string]security.PrivateID
+}
+
+// Serializer is a factory for managing the readers and writers used by the
+// IDManager for serialization and deserialization
+type Serializer interface {
+ // Readers returns io.Readers for reading the IDManager's serialized
+ // data and its signature.
+ Readers() (data io.Reader, signature io.Reader, err error)
+ // Writers returns io.WriteClosers for writing the IDManager's
+ // serialized data and integrity its signature.
+ Writers() (data io.WriteCloser, signature io.WriteCloser, err error)
+}
+
+var OriginDoesNotExist = verror.NoExistf("origin not found")
+
+// IDManager manages app identities. We only serialize the accounts associated with
+// this id manager and the mapping of apps to permissions that they were given.
+type IDManager struct {
+ mu sync.Mutex
+ state persistentState
+
+ // The runtime that will be used to create new identities
+ rt veyron2.Runtime
+
+ serializer Serializer
+}
+
+// NewIDManager creates a new IDManager by reading it from the serializer passed in.
+// serializer can't be nil
+func NewIDManager(rt veyron2.Runtime, serializer Serializer) (*IDManager, error) {
+ result := &IDManager{
+ rt: rt,
+ state: persistentState{
+ Origins: map[string]permissions{},
+ Accounts: map[string]security.PrivateID{},
+ },
+ serializer: serializer,
+ }
+
+ data, signature, err := serializer.Readers()
+ if err != nil {
+ return nil, err
+ }
+ if (data == nil) || (signature == nil) {
+ // No serialized data exists, returning an empty IDManager.
+ return result, nil
+ }
+ vr, err := serialization.NewVerifyingReader(data, signature, rt.Identity().PublicKey())
+ if err != nil {
+ return nil, err
+ }
+ if err := vom.NewDecoder(vr).Decode(&result.state); err != nil {
+ return nil, err
+ }
+ return result, nil
+}
+
+func (i *IDManager) save() error {
+ data, signature, err := i.serializer.Writers()
+ if err != nil {
+ return err
+ }
+
+ swc, err := serialization.NewSigningWriteCloser(data, signature, i.rt.Identity(), nil)
+ if err != nil {
+ return err
+ }
+ if err := vom.NewEncoder(swc).Encode(i.state); err != nil {
+ return err
+ }
+ return swc.Close()
+}
+
+// Identity returns the identity for an origin. Returns OriginDoesNotExist if
+// there is no identity for the origin.
+func (i *IDManager) Identity(origin string) (security.PrivateID, error) {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ perm, found := i.state.Origins[origin]
+ if !found {
+ return nil, OriginDoesNotExist
+ }
+ blessedID, err := i.generateBlessedID(origin, perm.Account, perm.Caveats)
+ if err != nil {
+ return nil, err
+ }
+ return blessedID, nil
+}
+
+// AccountsMatching returns a list of accounts that match the given pattern.
+func (i *IDManager) AccountsMatching(trustedRoot security.BlessingPattern) []string {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ result := []string{}
+ for name, id := range i.state.Accounts {
+ if trustedRoot.MatchedBy(id.PublicID().Names()...) {
+ result = append(result, name)
+ }
+ }
+ return result
+}
+
+// AddAccount associates a PrivateID with an account name.
+func (i *IDManager) AddAccount(name string, id security.PrivateID) error {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ old, existed := i.state.Accounts[name]
+ i.state.Accounts[name] = id
+ if err := i.save(); err != nil {
+ delete(i.state.Accounts, name)
+ if existed {
+ i.state.Accounts[name] = old
+ }
+ return err
+ }
+ return nil
+}
+
+// AddOrigin adds an origin to the manager linked to a the given account.
+func (i *IDManager) AddOrigin(origin string, account string, caveats []security.Caveat) error {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ if _, found := i.state.Accounts[account]; !found {
+ return verror.NoExistf("unknown account %s", account)
+ }
+
+ old, existed := i.state.Origins[origin]
+
+ i.state.Origins[origin] = permissions{account, caveats}
+
+ if err := i.save(); err != nil {
+ delete(i.state.Origins, origin)
+ if existed {
+ i.state.Origins[origin] = old
+ }
+
+ return err
+ }
+
+ return nil
+}
+
+func (i *IDManager) generateBlessedID(origin string, account string, caveats []security.Caveat) (security.PrivateID, error) {
+ blessor := i.state.Accounts[account]
+ if blessor == nil {
+ return nil, verror.NoExistf("unknown account %s", account)
+ }
+ // Origins have the form protocol://hostname:port, which is not a valid
+ // blessing name. Hence we must url-encode.
+ name := url.QueryEscape(origin)
+ blessee, err := i.rt.NewIdentity(name)
+ if err != nil {
+ return nil, err
+ }
+
+ blessed, err := blessor.Bless(blessee.PublicID(), name, 24*time.Hour, caveats)
+
+ if err != nil {
+ return nil, verror.NoAccessf("failed to bless id: %v", err)
+ }
+
+ if blessee, err = blessee.Derive(blessed); err != nil {
+ return nil, verror.Internalf("failed to derive private id: %v", err)
+ }
+ return blessee, nil
+}
+
+func init() {
+ vom.Register(&persistentState{})
+}
diff --git a/services/wsprd/identity/identity_test.go b/services/wsprd/identity/identity_test.go
new file mode 100644
index 0000000..447faed
--- /dev/null
+++ b/services/wsprd/identity/identity_test.go
@@ -0,0 +1,137 @@
+package identity
+
+import (
+ "net/url"
+ "reflect"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+)
+
+func createChain(r veyron2.Runtime, name string) security.PrivateID {
+ id := r.Identity()
+
+ for _, component := range strings.Split(name, "/") {
+ newID, err := r.NewIdentity(component)
+ if err != nil {
+ panic(err)
+ }
+ if id == nil {
+ id = newID
+ continue
+ }
+ blessedID, err := id.Bless(newID.PublicID(), component, time.Hour, nil)
+ if err != nil {
+ panic(err)
+ }
+ id, err = newID.Derive(blessedID)
+ if err != nil {
+ panic(err)
+ }
+ }
+ return id
+}
+
+func TestSavingAndFetchingIdentity(t *testing.T) {
+ r := rt.Init()
+ manager, err := NewIDManager(r, &InMemorySerializer{})
+ if err != nil {
+ t.Fatalf("creating identity manager failed with: %v", err)
+ }
+ origin := "http://sampleapp.com:80"
+ account := "google/user1"
+ manager.AddAccount(account, createChain(r, account))
+ if err := manager.AddOrigin(origin, account, nil); err != nil {
+ t.Fatalf("failed to generate id: %v", err)
+ }
+
+ id, err := manager.Identity(origin)
+ if err != nil {
+ t.Errorf("failed to get an identity for %v: %v", origin, err)
+ }
+ want := []string{createChain(r, account).PublicID().Names()[0] + "/" + url.QueryEscape(origin)}
+ if got := id.PublicID().Names(); !reflect.DeepEqual(got, want) {
+ t.Errorf("unexpected identity name. got: %v, wanted: %v", got, want)
+ }
+
+ unknownOrigin := "http://unknown.com:80"
+ if _, err := manager.Identity(unknownOrigin); err != OriginDoesNotExist {
+ t.Error("should not have found an identity for %v", unknownOrigin)
+ }
+}
+
+func TestAccountsMatching(t *testing.T) {
+ r := rt.Init()
+ topLevelName := r.Identity().PublicID().Names()[0]
+ manager, err := NewIDManager(r, &InMemorySerializer{})
+ if err != nil {
+ t.Fatalf("creating identity manager failed with: %v", err)
+ }
+ googleAccount1 := "google/user1"
+ googleAccount2 := "google/user2"
+ facebookAccount := "facebook/user1"
+ manager.AddAccount(googleAccount1, createChain(r, googleAccount1))
+ manager.AddAccount(googleAccount2, createChain(r, googleAccount2))
+ manager.AddAccount(facebookAccount, createChain(r, facebookAccount))
+
+ result := manager.AccountsMatching(security.BlessingPattern(topLevelName + "/google/..."))
+ sort.StringSlice(result).Sort()
+ expected := []string{googleAccount1, googleAccount2}
+ if !reflect.DeepEqual(result, expected) {
+ t.Errorf("unexpected result from AccountsMatching, expected :%v, got: %v", expected, result)
+ }
+}
+
+func TestGenerateIDWithUnknownBlesser(t *testing.T) {
+ r := rt.Init()
+ manager, err := NewIDManager(r, &InMemorySerializer{})
+ if err != nil {
+ t.Fatalf("creating identity manager failed with: %v", err)
+ }
+
+ err = manager.AddOrigin("http://sampleapp.com:80", "google/user1", nil)
+
+ if err == nil {
+ t.Errorf("should have failed to generated an id blessed by google/user1")
+ }
+}
+
+func TestSerializingAndDeserializing(t *testing.T) {
+ r := rt.Init()
+ var serializer InMemorySerializer
+
+ manager, err := NewIDManager(r, &serializer)
+ if err != nil {
+ t.Fatalf("creating identity manager failed with: %v", err)
+ }
+ manager.AddAccount("google/user1", createChain(r, "google/user1"))
+ origin1 := "https://sampleapp-1.com:443"
+ account := "google/user1"
+ if err = manager.AddOrigin(origin1, account, nil); err != nil {
+ t.Fatalf("failed to generate id: %v", err)
+ }
+
+ newManager, err := NewIDManager(r, &serializer)
+
+ if err != nil {
+ t.Fatalf("failed to deserialize data: %v", err)
+ }
+ id, err := newManager.Identity(origin1)
+ if err != nil {
+ t.Errorf("can't find the %v identity: %v", origin1, err)
+ }
+ want := []string{createChain(r, account).PublicID().Names()[0] + "/" + url.QueryEscape(origin1)}
+ if got := id.PublicID().Names(); !reflect.DeepEqual(got, want) {
+ t.Errorf("unexpected identity name. got: %v, wanted: %v", got, want)
+ }
+
+ origin2 := "https://sampleapp-2.com:443"
+ if err := newManager.AddOrigin(origin2, account, nil); err != nil {
+ t.Errorf("can't find the %v identity: %v", origin2, err)
+ }
+}
diff --git a/services/wsprd/identity/in_memory_serializer.go b/services/wsprd/identity/in_memory_serializer.go
new file mode 100644
index 0000000..99a565d
--- /dev/null
+++ b/services/wsprd/identity/in_memory_serializer.go
@@ -0,0 +1,40 @@
+package identity
+
+import (
+ "bytes"
+ "io"
+)
+
+// bufferCloser implements io.ReadWriteCloser.
+type bufferCloser struct {
+ bytes.Buffer
+}
+
+func (*bufferCloser) Close() error {
+ return nil
+}
+
+// InMemorySerializer implements Serializer. This Serializer should only be
+// used in tests.
+// TODO(ataly, bjornick): Get rid of all uses of this Serializer from non-test
+// code and use a file backed (or some persistent storage backed) Serializer there
+// instead.
+type InMemorySerializer struct {
+ data bufferCloser
+ signature bufferCloser
+ hasData bool
+}
+
+func (s *InMemorySerializer) Readers() (io.Reader, io.Reader, error) {
+ if !s.hasData {
+ return nil, nil, nil
+ }
+ return &s.data, &s.signature, nil
+}
+
+func (s *InMemorySerializer) Writers() (io.WriteCloser, io.WriteCloser, error) {
+ s.hasData = true
+ s.data.Reset()
+ s.signature.Reset()
+ return &s.data, &s.signature, nil
+}
diff --git a/services/wsprd/identity/js_identity_store.go b/services/wsprd/identity/js_identity_store.go
new file mode 100644
index 0000000..21c5fb9
--- /dev/null
+++ b/services/wsprd/identity/js_identity_store.go
@@ -0,0 +1,49 @@
+package identity
+
+import (
+ "sync"
+
+ "veyron.io/veyron/veyron2/security"
+)
+
+// JSPublicIDHandles is a store for PublicIDs in use by JS code.
+// We don't pass the full PublicID to avoid serializing and deserializing a
+// potentially huge forest of blessings. Instead we pass to JS a handle to a public
+// identity and have all operations involve cryptographic operations call into go.
+type JSPublicIDHandles struct {
+ mu sync.Mutex
+ lastHandle int64
+ store map[int64]security.PublicID
+}
+
+// NewJSPublicIDHandles returns a newly initialized JSPublicIDHandles
+func NewJSPublicIDHandles() *JSPublicIDHandles {
+ return &JSPublicIDHandles{
+ store: map[int64]security.PublicID{},
+ }
+}
+
+// Add adds a PublicID to the store and returns the handle to it.
+func (s *JSPublicIDHandles) Add(identity security.PublicID) int64 {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.lastHandle++
+ handle := s.lastHandle
+ s.store[handle] = identity
+ return handle
+}
+
+// Remove removes the PublicID associated with the handle.
+func (s *JSPublicIDHandles) Remove(handle int64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.store, handle)
+}
+
+// Get returns the PublicID represented by the handle. Returns nil
+// if no PublicID exists for the handle.
+func (s *JSPublicIDHandles) Get(handle int64) security.PublicID {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.store[handle]
+}
diff --git a/services/wsprd/ipc/server/authorizer.go b/services/wsprd/ipc/server/authorizer.go
new file mode 100644
index 0000000..52046ee
--- /dev/null
+++ b/services/wsprd/ipc/server/authorizer.go
@@ -0,0 +1,13 @@
+package server
+
+import (
+ "veyron.io/veyron/veyron2/security"
+)
+
+type authorizer struct {
+ authFunc remoteAuthFunc
+}
+
+func (a *authorizer) Authorize(ctx security.Context) error {
+ return a.authFunc(ctx)
+}
diff --git a/services/wsprd/ipc/server/dispatcher.go b/services/wsprd/ipc/server/dispatcher.go
new file mode 100644
index 0000000..f3d5df1
--- /dev/null
+++ b/services/wsprd/ipc/server/dispatcher.go
@@ -0,0 +1,144 @@
+package server
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+ "veyron.io/wspr/veyron/services/wsprd/signature"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vlog"
+)
+
+type flowFactory interface {
+ createFlow() *Flow
+ cleanupFlow(id int64)
+}
+
+type invokerFactory interface {
+ createInvoker(handle int64, signature signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error)
+}
+
+type authFactory interface {
+ createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error)
+}
+
+type lookupReply struct {
+ Handle int64
+ HasAuthorizer bool
+ Label security.Label
+ Signature signature.JSONServiceSignature
+ Err *verror.Standard
+}
+
+type dispatcherRequest struct {
+ ServerID uint64 `json:"serverId"`
+ Suffix string `json:"suffix"`
+ Method string `json:"method"`
+}
+
+// dispatcher holds the invoker and the authorizer to be used for lookup.
+type dispatcher struct {
+ mu sync.Mutex
+ serverID uint64
+ flowFactory flowFactory
+ invokerFactory invokerFactory
+ authFactory authFactory
+ logger vlog.Logger
+ outstandingLookups map[int64]chan lookupReply
+}
+
+var _ ipc.Dispatcher = (*dispatcher)(nil)
+
+// newDispatcher is a dispatcher factory.
+func newDispatcher(serverID uint64, flowFactory flowFactory, invokerFactory invokerFactory, authFactory authFactory, logger vlog.Logger) *dispatcher {
+ return &dispatcher{
+ serverID: serverID,
+ flowFactory: flowFactory,
+ invokerFactory: invokerFactory,
+ authFactory: authFactory,
+ logger: logger,
+ outstandingLookups: make(map[int64]chan lookupReply),
+ }
+}
+
+// Lookup implements dispatcher interface Lookup.
+func (d *dispatcher) Lookup(suffix, method string) (ipc.Invoker, security.Authorizer, error) {
+ flow := d.flowFactory.createFlow()
+ d.mu.Lock()
+ ch := make(chan lookupReply, 1)
+ d.outstandingLookups[flow.ID] = ch
+ d.mu.Unlock()
+
+ message := dispatcherRequest{
+ ServerID: d.serverID,
+ Suffix: suffix,
+ Method: lib.LowercaseFirstCharacter(method),
+ }
+ if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
+ ch <- lookupReply{
+ Err: &verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not marshal the method call data: %v", err),
+ },
+ }
+ }
+ request := <-ch
+
+ d.mu.Lock()
+ delete(d.outstandingLookups, flow.ID)
+ d.mu.Unlock()
+
+ d.flowFactory.cleanupFlow(flow.ID)
+
+ if request.Err != nil {
+ return nil, nil, request.Err
+ }
+
+ if request.Handle < 0 {
+ return nil, nil, verror.NoExistf("ipc: dispatcher for %s not found", suffix)
+ }
+
+ invoker, err := d.invokerFactory.createInvoker(request.Handle, request.Signature, request.Label)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ auth, err := d.authFactory.createAuthorizer(request.Handle, request.HasAuthorizer)
+
+ return invoker, auth, err
+}
+
+func (d *dispatcher) handleLookupResponse(id int64, data string) {
+ d.mu.Lock()
+ ch := d.outstandingLookups[id]
+ d.mu.Unlock()
+
+ if ch == nil {
+ d.flowFactory.cleanupFlow(id)
+ d.logger.Errorf("unknown invoke request for flow: %d", id)
+ return
+ }
+
+ var request lookupReply
+ decoder := json.NewDecoder(bytes.NewBufferString(data))
+ if err := decoder.Decode(&request); err != nil {
+ request = lookupReply{
+ Err: &verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not unmarshal invoke request: %v", err),
+ },
+ }
+ d.logger.Errorf("unmarshaling invoke request failed: %v", err)
+ }
+ ch <- request
+}
+
+// StopServing implements dispatcher StopServing.
+func (*dispatcher) StopServing() {
+}
diff --git a/services/wsprd/ipc/server/dispatcher_test.go b/services/wsprd/ipc/server/dispatcher_test.go
new file mode 100644
index 0000000..2530fe2
--- /dev/null
+++ b/services/wsprd/ipc/server/dispatcher_test.go
@@ -0,0 +1,187 @@
+package server
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+ "veyron.io/wspr/veyron/services/wsprd/lib/testwriter"
+ "veyron.io/wspr/veyron/services/wsprd/signature"
+)
+
+type mockFlowFactory struct {
+ writer testwriter.Writer
+}
+
+func (m *mockFlowFactory) createFlow() *Flow {
+ return &Flow{ID: 0, Writer: &m.writer}
+}
+
+func (*mockFlowFactory) cleanupFlow(int64) {}
+
+type mockInvoker struct {
+ handle int64
+ sig signature.JSONServiceSignature
+ label security.Label
+}
+
+func (m mockInvoker) Prepare(string, int) ([]interface{}, security.Label, error) {
+ return nil, m.label, nil
+}
+
+func (mockInvoker) Invoke(string, ipc.ServerCall, []interface{}) ([]interface{}, error) {
+ return nil, nil
+}
+
+type mockInvokerFactory struct{}
+
+func (mockInvokerFactory) createInvoker(handle int64, sig signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error) {
+ return &mockInvoker{handle: handle, sig: sig, label: label}, nil
+}
+
+type mockAuthorizer struct {
+ handle int64
+ hasAuthorizer bool
+}
+
+func (mockAuthorizer) Authorize(security.Context) error { return nil }
+
+type mockAuthorizerFactory struct{}
+
+func (mockAuthorizerFactory) createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error) {
+ return mockAuthorizer{handle: handle, hasAuthorizer: hasAuthorizer}, nil
+}
+
+func init() {
+ rt.Init()
+}
+
+func TestSuccessfulLookup(t *testing.T) {
+ flowFactory := &mockFlowFactory{}
+ d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{}, rt.R().Logger())
+ go func() {
+ if err := flowFactory.writer.WaitForMessage(1); err != nil {
+ t.Errorf("failed to get dispatch request %v", err)
+ t.Fail()
+ }
+ signature := `{"add":{"inArgs":["foo","bar"],"numOutArgs":1,"isStreaming":false}}`
+ jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":false,"label":%d,"signature":%s}`, security.WriteLabel, signature)
+ d.handleLookupResponse(0, jsonResponse)
+ }()
+
+ invoker, auth, err := d.Lookup("a/b", "Read")
+
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ expectedSig := signature.JSONServiceSignature{
+ "add": signature.JSONMethodSignature{
+ InArgs: []string{"foo", "bar"},
+ NumOutArgs: 1,
+ },
+ }
+ expectedInvoker := &mockInvoker{handle: 1, sig: expectedSig, label: security.WriteLabel}
+ if !reflect.DeepEqual(invoker, expectedInvoker) {
+ t.Errorf("wrong invoker returned, expected: %v, got :%v", expectedInvoker, invoker)
+ }
+
+ expectedAuth := mockAuthorizer{handle: 1, hasAuthorizer: false}
+ if !reflect.DeepEqual(auth, expectedAuth) {
+ t.Errorf("wrong authorizer returned, expected: %v, got :%v", expectedAuth, auth)
+ }
+
+ expectedResponses := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "a/b",
+ "method": "read",
+ },
+ },
+ }
+ testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+}
+
+func TestSuccessfulLookupWithAuthorizer(t *testing.T) {
+ flowFactory := &mockFlowFactory{}
+ d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{}, rt.R().Logger())
+ go func() {
+ if err := flowFactory.writer.WaitForMessage(1); err != nil {
+ t.Errorf("failed to get dispatch request %v", err)
+ t.Fail()
+ }
+ signature := `{"add":{"inArgs":["foo","bar"],"numOutArgs":1,"isStreaming":false}}`
+ jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":true,"label":%d,"signature":%s}`, security.ReadLabel, signature)
+ d.handleLookupResponse(0, jsonResponse)
+ }()
+
+ invoker, auth, err := d.Lookup("a/b", "Read")
+
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ }
+
+ expectedSig := signature.JSONServiceSignature{
+ "add": signature.JSONMethodSignature{
+ InArgs: []string{"foo", "bar"},
+ NumOutArgs: 1,
+ },
+ }
+ expectedInvoker := &mockInvoker{handle: 1, sig: expectedSig, label: security.ReadLabel}
+ if !reflect.DeepEqual(invoker, expectedInvoker) {
+ t.Errorf("wrong invoker returned, expected: %v, got :%v", expectedInvoker, invoker)
+ }
+
+ expectedAuth := mockAuthorizer{handle: 1, hasAuthorizer: true}
+ if !reflect.DeepEqual(auth, expectedAuth) {
+ t.Errorf("wrong authorizer returned, expected: %v, got :%v", expectedAuth, auth)
+ }
+
+ expectedResponses := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "a/b",
+ "method": "read",
+ },
+ },
+ }
+ testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+}
+
+func TestFailedLookup(t *testing.T) {
+ flowFactory := &mockFlowFactory{}
+ d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{}, rt.R().Logger())
+ go func() {
+ if err := flowFactory.writer.WaitForMessage(1); err != nil {
+ t.Errorf("failed to get dispatch request %v", err)
+ t.Fail()
+ }
+ jsonResponse := `{"err":{"id":"veyron2/verror.Exists","msg":"bad stuff"}}`
+ d.handleLookupResponse(0, jsonResponse)
+ }()
+
+ _, _, err := d.Lookup("a/b", "Read")
+
+ if err == nil {
+ t.Errorf("expected error, but got none", err)
+ }
+
+ expectedResponses := []testwriter.Response{
+ testwriter.Response{
+ Type: lib.ResponseDispatcherLookup,
+ Message: map[string]interface{}{
+ "serverId": 0.0,
+ "suffix": "a/b",
+ "method": "read",
+ },
+ },
+ }
+ testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+}
diff --git a/services/wsprd/ipc/server/invoker.go b/services/wsprd/ipc/server/invoker.go
new file mode 100644
index 0000000..9b6c099
--- /dev/null
+++ b/services/wsprd/ipc/server/invoker.go
@@ -0,0 +1,100 @@
+package server
+
+import (
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/verror"
+)
+
+var typedNil []int
+
+// invoker holds a delegate function to call on invoke and a list of methods that
+// are available for be called.
+type invoker struct {
+ // signature of the service this invoker belogs to
+ sig ipc.ServiceSignature
+ // delegate function to call when an invoke request comes in
+ invokeFunc remoteInvokeFunc
+ // map of special methods like "Signature" which invoker handles on behalf of the actual service
+ predefinedInvokers map[string]ipc.Invoker
+
+ label security.Label
+}
+
+var _ ipc.Invoker = (*invoker)(nil)
+
+// newInvoker is an invoker factory
+func newInvoker(sig ipc.ServiceSignature, label security.Label, invokeFunc remoteInvokeFunc) ipc.Invoker {
+ predefinedInvokers := make(map[string]ipc.Invoker)
+
+ // Special handling for predefined "signature" method
+ predefinedInvokers["Signature"] = newSignatureInvoker(sig)
+
+ i := &invoker{sig, invokeFunc, predefinedInvokers, label}
+ return i
+}
+
+// Prepare implements the Invoker interface.
+func (i *invoker) Prepare(methodName string, numArgs int) ([]interface{}, security.Label, error) {
+
+ if pi := i.predefinedInvokers[methodName]; pi != nil {
+ return pi.Prepare(methodName, numArgs)
+ }
+
+ method, ok := i.sig.Methods[methodName]
+ if !ok {
+ return nil, security.AdminLabel, verror.NoExistf("method name not found in IDL: %s", methodName)
+ }
+
+ argptrs := make([]interface{}, len(method.InArgs))
+
+ for ix := range method.InArgs {
+ var x interface{}
+ argptrs[ix] = &x // Accept AnyData
+ }
+
+ securityLabel := i.label
+
+ if !security.IsValidLabel(securityLabel) {
+ securityLabel = security.AdminLabel
+ }
+
+ return argptrs, securityLabel, nil
+}
+
+// Invoke implements the Invoker interface.
+func (i *invoker) Invoke(methodName string, call ipc.ServerCall, argptrs []interface{}) ([]interface{}, error) {
+
+ if pi := i.predefinedInvokers[methodName]; pi != nil {
+ return pi.Invoke(methodName, call, argptrs)
+ }
+
+ if _, ok := i.sig.Methods[methodName]; !ok {
+ return nil, verror.NoExistf("method name not found in IDL: %s", methodName)
+ }
+
+ replychan := i.invokeFunc(methodName, argptrs, call)
+
+ // Wait for the result
+ reply := <-replychan
+
+ var err error = nil
+ if reply.Err != nil {
+ err = reply.Err
+ }
+
+ for i, v := range reply.Results {
+ if v == nil {
+ reply.Results[i] = typedNil
+ }
+ }
+
+ // We always assume JavaScript services might return error.
+ // JavaScript returns non-error results in reply.Results & error in reply.Err
+ // We add the error as the last result of the ipc invoke call since last
+ // out arg is where application error is expected to be.
+ results := make([]interface{}, len(reply.Results)+1)
+ results = append(reply.Results, err)
+
+ return results, nil
+}
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
new file mode 100644
index 0000000..3151419
--- /dev/null
+++ b/services/wsprd/ipc/server/server.go
@@ -0,0 +1,376 @@
+// An implementation of a server for WSPR
+
+package server
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ vsecurity "veyron.io/veyron/veyron/security"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+ "veyron.io/wspr/veyron/services/wsprd/signature"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vlog"
+)
+
+type Flow struct {
+ ID int64
+ Writer lib.ClientWriter
+}
+
+// A request from the proxy to javascript to handle an RPC
+type serverRPCRequest struct {
+ ServerId uint64
+ Handle int64
+ Method string
+ Args []interface{}
+ Context serverRPCRequestContext
+}
+
+type publicID struct {
+ Handle int64
+ Names []string
+}
+
+// call context for a serverRPCRequest
+type serverRPCRequestContext struct {
+ Suffix string
+ Name string
+ RemoteID publicID
+}
+
+// The response from the javascript server to the proxy.
+type serverRPCReply struct {
+ Results []interface{}
+ Err *verror.Standard
+}
+
+type FlowHandler interface {
+ CreateNewFlow(server *Server, sender ipc.Stream) *Flow
+
+ CleanupFlow(id int64)
+}
+
+type HandleStore interface {
+ // Adds an identity to the store and returns handle to the identity
+ AddIdentity(identity security.PublicID) int64
+}
+
+type ServerHelper interface {
+ FlowHandler
+ HandleStore
+
+ GetLogger() vlog.Logger
+
+ RT() veyron2.Runtime
+}
+
+type authReply struct {
+ Err *verror.Standard
+}
+
+type context struct {
+ Method string `json:"method"`
+ Name string `json:"name"`
+ Suffix string `json:"suffix"`
+ Label security.Label `json:"label"`
+ LocalID publicID `json:"localId"`
+ RemoteID publicID `json:"remoteId"`
+ LocalEndpoint string `json:"localEndpoint"`
+ RemoteEndpoint string `json:"remoteEndpoint"`
+}
+
+type authRequest struct {
+ ServerID uint64 `json:"serverID"`
+ Handle int64 `json:"handle"`
+ Context context `json:"context"`
+}
+
+type Server struct {
+ mu sync.Mutex
+
+ // The ipc.ListenSpec to use with server.Listen
+ listenSpec *ipc.ListenSpec
+
+ // The server that handles the ipc layer. Listen on this server is
+ // lazily started.
+ server ipc.Server
+
+ // The saved dispatcher to reuse when serve is called multiple times.
+ dispatcher *dispatcher
+
+ // The endpoint of the server. This is empty until the server has been
+ // started and listen has been called on it.
+ endpoint string
+
+ // The server id.
+ id uint64
+ helper ServerHelper
+
+ // The set of outstanding server requests.
+ outstandingServerRequests map[int64]chan *serverRPCReply
+
+ outstandingAuthRequests map[int64]chan error
+}
+
+func NewServer(id uint64, listenSpec *ipc.ListenSpec, helper ServerHelper) (*Server, error) {
+ server := &Server{
+ id: id,
+ helper: helper,
+ listenSpec: listenSpec,
+ outstandingServerRequests: make(map[int64]chan *serverRPCReply),
+ outstandingAuthRequests: make(map[int64]chan error),
+ }
+ var err error
+ if server.server, err = helper.RT().NewServer(); err != nil {
+ return nil, err
+ }
+ return server, nil
+}
+
+// remoteInvokeFunc is a type of function that can invoke a remote method and
+// communicate the result back via a channel to the caller
+type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
+
+func (s *Server) createRemoteInvokerFunc(handle int64) remoteInvokeFunc {
+ return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
+ flow := s.helper.CreateNewFlow(s, call)
+ replyChan := make(chan *serverRPCReply, 1)
+ s.mu.Lock()
+ s.outstandingServerRequests[flow.ID] = replyChan
+ s.mu.Unlock()
+ remoteID := call.RemoteID()
+ context := serverRPCRequestContext{
+ Suffix: call.Suffix(),
+ Name: call.Name(),
+ RemoteID: publicID{
+ Handle: s.helper.AddIdentity(remoteID),
+ Names: remoteID.Names(),
+ },
+ }
+ // Send a invocation request to JavaScript
+ message := serverRPCRequest{
+ ServerId: s.id,
+ Handle: handle,
+ Method: lib.LowercaseFirstCharacter(methodName),
+ Args: args,
+ Context: context,
+ }
+
+ if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
+ // Error in marshaling, pass the error through the channel immediately
+ replyChan <- &serverRPCReply{nil,
+ &verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not marshal the method call data: %v", err)},
+ }
+ return replyChan
+ }
+
+ s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
+ "JavaScript server with args %v, MessageId %d was assigned.",
+ methodName, args, flow.ID)
+
+ go proxyStream(call, flow.Writer, s.helper.GetLogger())
+ return replyChan
+ }
+}
+
+func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
+ var item interface{}
+ for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
+ if err := w.Send(lib.ResponseStream, item); err != nil {
+ w.Error(verror.Internalf("error marshalling stream: %v:", err))
+ return
+ }
+ }
+
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.Internalf("error closing stream: %v:", err))
+ return
+ }
+}
+
+func (s *Server) convertPublicID(id security.PublicID) publicID {
+ return publicID{
+ Handle: s.helper.AddIdentity(id),
+ Names: id.Names(),
+ }
+
+}
+
+type remoteAuthFunc func(security.Context) error
+
+func (s *Server) createRemoteAuthFunc(handle int64) remoteAuthFunc {
+ return func(ctx security.Context) error {
+ flow := s.helper.CreateNewFlow(s, nil)
+ replyChan := make(chan error, 1)
+ s.mu.Lock()
+ s.outstandingAuthRequests[flow.ID] = replyChan
+ s.mu.Unlock()
+ message := authRequest{
+ ServerID: s.id,
+ Handle: handle,
+ Context: context{
+ Method: lib.LowercaseFirstCharacter(ctx.Method()),
+ Name: ctx.Name(),
+ Suffix: ctx.Suffix(),
+ Label: ctx.Label(),
+ LocalID: s.convertPublicID(ctx.LocalID()),
+ RemoteID: s.convertPublicID(ctx.RemoteID()),
+ LocalEndpoint: ctx.LocalEndpoint().String(),
+ RemoteEndpoint: ctx.RemoteEndpoint().String(),
+ },
+ }
+ s.helper.GetLogger().VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
+
+ if err := flow.Writer.Send(lib.ResponseAuthRequest, message); err != nil {
+ replyChan <- verror.Internalf("failed to find authorizer %v", err)
+ }
+
+ err := <-replyChan
+ s.helper.GetLogger().VI(0).Infof("going to respond with %v", err)
+ s.mu.Lock()
+ delete(s.outstandingAuthRequests, flow.ID)
+ s.mu.Unlock()
+ s.helper.CleanupFlow(flow.ID)
+ return err
+ }
+}
+
+func (s *Server) Serve(name string) (string, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.dispatcher == nil {
+ s.dispatcher = newDispatcher(s.id, s, s, s, s.helper.GetLogger())
+ }
+
+ if s.endpoint == "" {
+ endpoint, err := s.server.ListenX(s.listenSpec)
+ if err != nil {
+ return "", err
+ }
+ s.endpoint = endpoint.String()
+ }
+ if err := s.server.Serve(name, s.dispatcher); err != nil {
+ return "", err
+ }
+ s.helper.GetLogger().VI(1).Infof("endpoint is %s", s.endpoint)
+ return s.endpoint, nil
+}
+
+func (s *Server) HandleServerResponse(id int64, data string) {
+ s.mu.Lock()
+ ch := s.outstandingServerRequests[id]
+ delete(s.outstandingServerRequests, id)
+ s.mu.Unlock()
+ if ch == nil {
+ s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results.", id)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ // Decode the result and send it through the channel
+ var serverReply serverRPCReply
+ if decoderErr := json.Unmarshal([]byte(data), &serverReply); decoderErr != nil {
+ err := verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not unmarshal the result from the server: %v", decoderErr),
+ }
+ serverReply = serverRPCReply{nil, &err}
+ }
+
+ s.helper.GetLogger().VI(3).Infof("response received from JavaScript server for "+
+ "MessageId %d with result %v", id, serverReply)
+ s.helper.CleanupFlow(id)
+ ch <- &serverReply
+}
+
+func (s *Server) HandleLookupResponse(id int64, data string) {
+ s.dispatcher.handleLookupResponse(id, data)
+}
+
+func (s *Server) HandleAuthResponse(id int64, data string) {
+ s.mu.Lock()
+ ch := s.outstandingAuthRequests[id]
+ s.mu.Unlock()
+ if ch == nil {
+ s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results.", id)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ // Decode the result and send it through the channel
+ var reply authReply
+ if decoderErr := json.Unmarshal([]byte(data), &reply); decoderErr != nil {
+ reply = authReply{Err: &verror.Standard{
+ ID: verror.Internal,
+ Msg: fmt.Sprintf("could not unmarshal the result from the server: %v", decoderErr),
+ }}
+ }
+
+ s.helper.GetLogger().VI(0).Infof("response received from JavaScript server for "+
+ "MessageId %d with result %v", id, reply)
+ s.helper.CleanupFlow(id)
+ // A nil verror.Standard does not result in an nil error. Instead, we have create
+ // a variable for the error interface and only set it's value if the struct is non-
+ // nil.
+ var err error
+ if reply.Err != nil {
+ err = reply.Err
+ }
+ ch <- err
+}
+
+func (s *Server) createFlow() *Flow {
+ return s.helper.CreateNewFlow(s, nil)
+}
+
+func (s *Server) cleanupFlow(id int64) {
+ s.helper.CleanupFlow(id)
+}
+
+func (s *Server) createInvoker(handle int64, sig signature.JSONServiceSignature, label security.Label) (ipc.Invoker, error) {
+ serviceSig, err := sig.ServiceSignature()
+ if err != nil {
+ return nil, err
+ }
+
+ remoteInvokeFunc := s.createRemoteInvokerFunc(handle)
+ return newInvoker(serviceSig, label, remoteInvokeFunc), nil
+}
+
+func (s *Server) createAuthorizer(handle int64, hasAuthorizer bool) (security.Authorizer, error) {
+ if hasAuthorizer {
+ return &authorizer{authFunc: s.createRemoteAuthFunc(handle)}, nil
+ }
+ return vsecurity.NewACLAuthorizer(security.ACL{In: map[security.BlessingPattern]security.LabelSet{
+ security.AllPrincipals: security.AllLabels,
+ }}), nil
+}
+
+func (s *Server) Stop() {
+ result := serverRPCReply{
+ Results: []interface{}{nil},
+ Err: &verror.Standard{
+ ID: verror.Aborted,
+ Msg: "timeout",
+ },
+ }
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for _, ch := range s.outstandingServerRequests {
+ select {
+ case ch <- &result:
+ default:
+ }
+ }
+ s.outstandingServerRequests = make(map[int64]chan *serverRPCReply)
+ s.server.Stop()
+}
diff --git a/services/wsprd/ipc/server/signature_invoker.go b/services/wsprd/ipc/server/signature_invoker.go
new file mode 100644
index 0000000..bd2d786
--- /dev/null
+++ b/services/wsprd/ipc/server/signature_invoker.go
@@ -0,0 +1,34 @@
+package server
+
+import (
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/security"
+)
+
+// signatureInvoker acts as the signature() method and is used to handle calls
+// to signature() on behalf of the service
+type signatureInvoker struct {
+ // signature of the service
+ sig ipc.ServiceSignature
+}
+
+var _ ipc.Invoker = (*signatureInvoker)(nil)
+
+func (i *signatureInvoker) signature() ipc.ServiceSignature {
+ return i.sig
+}
+
+// newSignatureInvoker is an invoker factory
+func newSignatureInvoker(sig ipc.ServiceSignature) ipc.Invoker {
+ return &signatureInvoker{sig}
+}
+
+// Prepare implements the Invoker interface.
+func (i *signatureInvoker) Prepare(methodName string, _ int) ([]interface{}, security.Label, error) {
+ return []interface{}{}, security.ReadLabel, nil
+}
+
+// Invoke implements the Invoker interface.
+func (i *signatureInvoker) Invoke(methodName string, call ipc.ServerCall, argptrs []interface{}) ([]interface{}, error) {
+ return []interface{}{i.signature(), nil}, nil
+}
diff --git a/services/wsprd/lib/case.go b/services/wsprd/lib/case.go
new file mode 100644
index 0000000..e6ec0d0
--- /dev/null
+++ b/services/wsprd/lib/case.go
@@ -0,0 +1,17 @@
+package lib
+
+import "unicode"
+
+func LowercaseFirstCharacter(s string) string {
+ for _, r := range s {
+ return string(unicode.ToLower(r)) + s[1:]
+ }
+ return ""
+}
+
+func UppercaseFirstCharacter(s string) string {
+ for _, r := range s {
+ return string(unicode.ToUpper(r)) + s[1:]
+ }
+ return ""
+}
diff --git a/services/wsprd/lib/remove_this.go b/services/wsprd/lib/remove_this.go
new file mode 100644
index 0000000..487a708
--- /dev/null
+++ b/services/wsprd/lib/remove_this.go
@@ -0,0 +1,30 @@
+package lib
+
+import (
+ // Any non-release package imports are not allowed to make
+ // sure the repository can be downloaded using "go get".
+ // rps "veyron.io/examples/rockpaperscissors"
+ // "veyron.io/store/veyron2/services/store"
+ // roadmap_watchtypes "veyron.io/store/veyron2/services/watch/types"
+ // "veyron.io/store/veyron2/storage"
+
+ mttypes "veyron.io/veyron/veyron2/services/mounttable/types"
+ watchtypes "veyron.io/veyron/veyron2/services/watch/types"
+ "veyron.io/veyron/veyron2/vom"
+)
+
+func init() {
+ vom.Register(mttypes.MountEntry{})
+ vom.Register(watchtypes.GlobRequest{})
+ vom.Register(watchtypes.Change{})
+ // vom.Register(storage.Entry{})
+ // vom.Register(storage.Stat{})
+ // vom.Register(store.NestedResult(0))
+ // vom.Register(store.QueryResult{})
+ // vom.Register(roadmap_watchtypes.QueryRequest{})
+ // vom.Register(rps.GameOptions{})
+ // vom.Register(rps.GameID{})
+ // vom.Register(rps.PlayResult{})
+ // vom.Register(rps.PlayerAction{})
+ // vom.Register(rps.JudgeAction{})
+}
diff --git a/services/wsprd/lib/signature_manager.go b/services/wsprd/lib/signature_manager.go
new file mode 100644
index 0000000..1d38641
--- /dev/null
+++ b/services/wsprd/lib/signature_manager.go
@@ -0,0 +1,79 @@
+package lib
+
+import (
+ "sync"
+ "time"
+
+ "veyron.io/veyron/veyron2/context"
+ "veyron.io/veyron/veyron2/ipc"
+)
+
+type SignatureManager interface {
+ Signature(ctx context.T, name string, client ipc.Client, opts ...ipc.CallOpt) (*ipc.ServiceSignature, error)
+}
+
+// signatureManager can be used to discover the signature of a remote service
+// It has built-in caching and TTL support.
+type signatureManager struct {
+ // protects the cache and initialization
+ sync.Mutex
+
+ // map of name to service signature and last-accessed time
+ // TODO(aghassemi) GC for expired cache entries
+ cache map[string]*cacheEntry
+}
+
+// NewSignatureManager creates and initialized a new Signature Manager
+func NewSignatureManager() SignatureManager {
+ return &signatureManager{cache: make(map[string]*cacheEntry)}
+}
+
+const (
+ // ttl from the last-accessed time.
+ ttl = time.Duration(time.Hour)
+)
+
+type cacheEntry struct {
+ signature ipc.ServiceSignature
+ lastAccessed time.Time
+}
+
+// expired returns whether the cache entry is expired or not
+func (c cacheEntry) expired() bool {
+ return time.Now().Sub(c.lastAccessed) > ttl
+}
+
+// signature uses the given client to fetch the signature for the given service name.
+// It locks until it fetches the service signature from the remote server, if not a cache hit.
+func (sm *signatureManager) Signature(ctx context.T, name string, client ipc.Client, opts ...ipc.CallOpt) (*ipc.ServiceSignature, error) {
+ sm.Lock()
+ defer sm.Unlock()
+
+ if cashedSig := sm.cache[name]; cashedSig != nil && !cashedSig.expired() {
+ cashedSig.lastAccessed = time.Now()
+ return &cashedSig.signature, nil
+ }
+
+ // cache expired or not found, fetch it from the remote server
+ signatureCall, err := client.StartCall(ctx, name, "Signature", []interface{}{}, opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ var result ipc.ServiceSignature
+ var appErr error
+ if err := signatureCall.Finish(&result, &appErr); err != nil {
+ return nil, err
+ }
+ if appErr != nil {
+ return nil, appErr
+ }
+
+ // cache the result
+ sm.cache[name] = &cacheEntry{
+ signature: result,
+ lastAccessed: time.Now(),
+ }
+
+ return &result, nil
+}
diff --git a/services/wsprd/lib/signature_manager_test.go b/services/wsprd/lib/signature_manager_test.go
new file mode 100644
index 0000000..81460de
--- /dev/null
+++ b/services/wsprd/lib/signature_manager_test.go
@@ -0,0 +1,151 @@
+package lib
+
+import (
+ "reflect"
+ "testing"
+
+ mocks_ipc "veyron.io/veyron/veyron/runtimes/google/testing/mocks/ipc"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/vdl/vdlutil"
+ "veyron.io/veyron/veyron2/wiretype"
+)
+
+const (
+ name = "/veyron/name"
+)
+
+func init() {
+ rt.Init()
+}
+
+func expectedSignature() ipc.ServiceSignature {
+ return ipc.ServiceSignature{
+ Methods: make(map[string]ipc.MethodSignature),
+ TypeDefs: []vdlutil.Any{
+ wiretype.NamedPrimitiveType{
+ Name: "veyron.io/veyron/veyron2/vdlutil.AnyData",
+ Type: wiretype.TypeIDInterface,
+ },
+ },
+ }
+}
+
+func client() *mocks_ipc.SimpleMockClient {
+ return mocks_ipc.NewSimpleClient(
+ map[string][]interface{}{
+ "Signature": []interface{}{expectedSignature(), nil},
+ },
+ )
+}
+
+func assertMethodSignatureAsExpected(t *testing.T, got, expected ipc.MethodSignature) {
+ if !reflect.DeepEqual(got.InArgs, expected.InArgs) {
+ t.Errorf(`InArgs do not match: result "%v", want "%v"`, got.InArgs, expected.InArgs)
+ return
+ }
+ if !reflect.DeepEqual(got.OutArgs, expected.OutArgs) {
+ t.Errorf(`OutArgs do not match: result "%v", want "%v"`, got.OutArgs, expected.OutArgs)
+ return
+ }
+ if got.InStream != expected.InStream {
+ t.Errorf(`InStreams do not match: result "%v", want "%v"`, got.InStream, expected.InStream)
+ return
+ }
+ if got.OutStream != expected.OutStream {
+ t.Errorf(`OutStream do not match: result "%v", want "%v"`, got.OutStream, expected.OutStream)
+ return
+ }
+}
+
+func assertSignatureAsExpected(t *testing.T, got, expected *ipc.ServiceSignature) {
+ if !reflect.DeepEqual(got.TypeDefs, expected.TypeDefs) {
+ t.Errorf(`TypeDefs do not match: result "%v", want "%v"`, got.TypeDefs, expected.TypeDefs)
+ return
+ }
+ if n, m := len(got.Methods), len(expected.Methods); n != m {
+ t.Errorf(`Wrong number of signature methods: result "%d", want "%d"`, n, m)
+ return
+ }
+ for gotName, gotMethod := range got.Methods {
+ expectedMethod, ok := expected.Methods[gotName]
+ if !ok {
+ t.Errorf(`Method "%v" was expected but not found`, gotName)
+ return
+ }
+
+ assertMethodSignatureAsExpected(t, gotMethod, expectedMethod)
+ }
+}
+
+func TestFetching(t *testing.T) {
+ sm := NewSignatureManager()
+ got, err := sm.Signature(rt.R().NewContext(), name, client())
+ if err != nil {
+ t.Errorf(`Did not expect an error but got %v`, err)
+ return
+ }
+ expected := expectedSignature()
+ assertSignatureAsExpected(t, got, &expected)
+}
+
+func TestThatCachedAfterFetching(t *testing.T) {
+ sm := NewSignatureManager().(*signatureManager)
+ sig, _ := sm.Signature(rt.R().NewContext(), name, client())
+ cache, ok := sm.cache[name]
+ if !ok {
+ t.Errorf(`Signature manager did not cache the results`)
+ return
+ }
+ assertSignatureAsExpected(t, &cache.signature, sig)
+}
+
+func TestThatCacheIsUsed(t *testing.T) {
+ client := client()
+ sm := NewSignatureManager()
+
+ // call twice
+ sm.Signature(rt.R().NewContext(), name, client)
+ sm.Signature(rt.R().NewContext(), name, client)
+
+ // expect number of calls to Signature method of client to still be 1 since cache
+ // should have been used despite the second call
+ if client.TimesCalled("Signature") != 1 {
+ t.Errorf("Signature cache was not used for the second call")
+ }
+}
+
+func TestThatLastAccessedGetUpdated(t *testing.T) {
+ client := client()
+ sm := NewSignatureManager().(*signatureManager)
+ sm.Signature(rt.R().NewContext(), name, client)
+ // make last accessed be in the past to account for the fact that
+ // two consecutive calls to time.Now() can return identical values.
+ sm.cache[name].lastAccessed = sm.cache[name].lastAccessed.Add(-ttl / 2)
+ prevAccess := sm.cache[name].lastAccessed
+
+ // access again
+ sm.Signature(rt.R().NewContext(), name, client)
+ newAccess := sm.cache[name].lastAccessed
+
+ if !newAccess.After(prevAccess) {
+ t.Errorf("LastAccessed was not updated for cache entry")
+ }
+}
+
+func TestThatTTLExpires(t *testing.T) {
+ client := client()
+ sm := NewSignatureManager().(*signatureManager)
+ sm.Signature(rt.R().NewContext(), name, client)
+
+ // make last accessed go over the ttl
+ sm.cache[name].lastAccessed = sm.cache[name].lastAccessed.Add(-2 * ttl)
+
+ // make a second call
+ sm.Signature(rt.R().NewContext(), name, client)
+
+ // expect number of calls to Signature method of client to be 2 since cache should have expired
+ if client.TimesCalled("Signature") != 2 {
+ t.Errorf("Cache was still used but TTL had passed. It should have been fetched again")
+ }
+}
diff --git a/services/wsprd/lib/testwriter/writer.go b/services/wsprd/lib/testwriter/writer.go
new file mode 100644
index 0000000..de1a49a
--- /dev/null
+++ b/services/wsprd/lib/testwriter/writer.go
@@ -0,0 +1,96 @@
+package testwriter
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+)
+
+type TestHarness interface {
+ Errorf(fmt string, a ...interface{})
+}
+
+type Response struct {
+ Type lib.ResponseType
+ Message interface{}
+}
+
+type Writer struct {
+ sync.Mutex
+ Stream []Response
+ err error
+ // If this channel is set then a message will be sent
+ // to this channel after recieving a call to FinishMessage()
+ notifier chan bool
+}
+
+func (w *Writer) Send(responseType lib.ResponseType, msg interface{}) error {
+ // We serialize and deserialize the reponse so that we can do deep equal with
+ // messages that contain non-exported structs.
+ var buf bytes.Buffer
+ if err := json.NewEncoder(&buf).Encode(Response{Type: responseType, Message: msg}); err != nil {
+ return err
+ }
+
+ var r Response
+
+ if err := json.NewDecoder(&buf).Decode(&r); err != nil {
+ return err
+ }
+
+ w.Lock()
+ defer w.Unlock()
+ w.Stream = append(w.Stream, r)
+ if w.notifier != nil {
+ w.notifier <- true
+ }
+ return nil
+
+}
+
+func (w *Writer) Error(err error) {
+ w.err = err
+}
+
+func (w *Writer) streamLength() int {
+ w.Lock()
+ defer w.Unlock()
+ return len(w.Stream)
+}
+
+// Waits until there is at least n messages in w.Stream. Returns an error if we timeout
+// waiting for the given number of messages.
+func (w *Writer) WaitForMessage(n int) error {
+ if w.streamLength() >= n {
+ return nil
+ }
+ w.Lock()
+ w.notifier = make(chan bool, 1)
+ w.Unlock()
+ for w.streamLength() < n {
+ select {
+ case <-w.notifier:
+ continue
+ case <-time.After(10 * time.Second):
+ return fmt.Errorf("timed out")
+ }
+ }
+ w.Lock()
+ w.notifier = nil
+ w.Unlock()
+ return nil
+}
+
+func CheckResponses(w *Writer, expectedStream []Response, err error, t TestHarness) {
+ if !reflect.DeepEqual(expectedStream, w.Stream) {
+ t.Errorf("streams don't match: expected %v, got %v", expectedStream, w.Stream)
+ }
+
+ if !reflect.DeepEqual(err, w.err) {
+ t.Errorf("unexpected error, got: %v, expected: %v", w.err, err)
+ }
+}
diff --git a/services/wsprd/lib/writer.go b/services/wsprd/lib/writer.go
new file mode 100644
index 0000000..7c6c18f
--- /dev/null
+++ b/services/wsprd/lib/writer.go
@@ -0,0 +1,21 @@
+package lib
+
+type ResponseType int
+
+const (
+ ResponseFinal ResponseType = 0
+ ResponseStream = 1
+ ResponseError = 2
+ ResponseServerRequest = 3
+ ResponseStreamClose = 4
+ ResponseDispatcherLookup = 5
+ ResponseAuthRequest = 6
+)
+
+// This is basically an io.Writer interface, that allows passing error message
+// strings. This is how the proxy will talk to the javascript/java clients.
+type ClientWriter interface {
+ Send(messageType ResponseType, data interface{}) error
+
+ Error(err error)
+}
diff --git a/services/wsprd/signature/signature.go b/services/wsprd/signature/signature.go
new file mode 100644
index 0000000..d32dac1
--- /dev/null
+++ b/services/wsprd/signature/signature.go
@@ -0,0 +1,93 @@
+package signature
+
+import (
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/vdl/vdlutil"
+ "veyron.io/veyron/veyron2/wiretype"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+)
+
+var (
+ anydataType = wiretype.NamedPrimitiveType{
+ Name: "veyron.io/veyron/veyron2/vdlutil.AnyData",
+ Type: wiretype.TypeIDInterface,
+ }
+ errType = wiretype.NamedPrimitiveType{
+ Name: "error",
+ Type: wiretype.TypeIDInterface,
+ }
+ anydataTypeID = wiretype.TypeIDFirst
+ errTypeID = wiretype.TypeIDFirst
+)
+
+// JSONServiceSignature represents the information about a service signature that is used by JSON.
+type JSONServiceSignature map[string]JSONMethodSignature
+
+// JSONMethodSignature represents the information about a method signature that is used by JSON.
+type JSONMethodSignature struct {
+ InArgs []string // InArgs is a list of argument names.
+ NumOutArgs int
+ IsStreaming bool
+}
+
+// NewJSONServiceSignature converts an ipc service signature to the format used by JSON.
+func NewJSONServiceSignature(sig ipc.ServiceSignature) JSONServiceSignature {
+ jsig := JSONServiceSignature{}
+
+ for name, methSig := range sig.Methods {
+ jmethSig := JSONMethodSignature{
+ InArgs: make([]string, len(methSig.InArgs)),
+ NumOutArgs: len(methSig.OutArgs),
+ IsStreaming: methSig.InStream != wiretype.TypeIDInvalid || methSig.OutStream != wiretype.TypeIDInvalid,
+ }
+
+ for i, inarg := range methSig.InArgs {
+ jmethSig.InArgs[i] = inarg.Name
+ }
+
+ jsig[lib.LowercaseFirstCharacter(name)] = jmethSig
+ }
+
+ return jsig
+}
+
+// ServiceSignature converts a JSONServiceSignature to an ipc service signature.
+func (jss JSONServiceSignature) ServiceSignature() (ipc.ServiceSignature, error) {
+ ss := ipc.ServiceSignature{
+ Methods: make(map[string]ipc.MethodSignature),
+ }
+
+ for name, sig := range jss {
+ ms := ipc.MethodSignature{}
+
+ ms.InArgs = make([]ipc.MethodArgument, len(sig.InArgs))
+ for i, argName := range sig.InArgs {
+ ms.InArgs[i] = ipc.MethodArgument{
+ Name: argName,
+ Type: anydataTypeID,
+ }
+ }
+
+ ms.OutArgs = make([]ipc.MethodArgument, sig.NumOutArgs)
+ for i := 0; i < sig.NumOutArgs-1; i++ {
+ ms.OutArgs[i] = ipc.MethodArgument{
+ Type: anydataTypeID,
+ }
+ }
+ ms.OutArgs[sig.NumOutArgs-1] = ipc.MethodArgument{
+ Name: "err",
+ Type: errTypeID,
+ }
+
+ if sig.IsStreaming {
+ ms.InStream = anydataTypeID
+ ms.OutStream = anydataTypeID
+ }
+
+ ss.Methods[lib.UppercaseFirstCharacter(name)] = ms
+ }
+
+ ss.TypeDefs = []vdlutil.Any{anydataType, errType}
+
+ return ss, nil
+}
diff --git a/services/wsprd/signature/signature_test.go b/services/wsprd/signature/signature_test.go
new file mode 100644
index 0000000..dd9e85a
--- /dev/null
+++ b/services/wsprd/signature/signature_test.go
@@ -0,0 +1,117 @@
+package signature
+
+import (
+ "reflect"
+ "testing"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/vdl/vdlutil"
+ "veyron.io/veyron/veyron2/wiretype"
+)
+
+func TestNewJSONServiceSignature(t *testing.T) {
+ sigIn := ipc.ServiceSignature{
+ Methods: map[string]ipc.MethodSignature{
+ "FirstMethod": ipc.MethodSignature{
+ InArgs: []ipc.MethodArgument{
+ ipc.MethodArgument{
+ Name: "FirstArg",
+ Type: wiretype.TypeIDFloat64,
+ },
+ ipc.MethodArgument{
+ Name: "SecondArg",
+ Type: wiretype.TypeIDUintptr,
+ },
+ },
+ OutArgs: []ipc.MethodArgument{
+ ipc.MethodArgument{
+ Name: "FirstOutArg",
+ Type: wiretype.TypeIDFloat64,
+ },
+ ipc.MethodArgument{
+ Name: "SecondOutArg",
+ Type: anydataTypeID,
+ },
+ ipc.MethodArgument{
+ Name: "ThirdOutArg",
+ Type: wiretype.TypeIDInt32,
+ },
+ ipc.MethodArgument{
+ Name: "err",
+ Type: wiretype.TypeIDFirst,
+ },
+ },
+ OutStream: wiretype.TypeIDString,
+ },
+ },
+ TypeDefs: []vdlutil.Any{
+ anydataType,
+ errType,
+ },
+ }
+
+ sigOut := NewJSONServiceSignature(sigIn)
+
+ expectedSigOut := JSONServiceSignature{
+ "firstMethod": JSONMethodSignature{
+ InArgs: []string{"FirstArg", "SecondArg"},
+ NumOutArgs: 4,
+ IsStreaming: true,
+ },
+ }
+
+ if !reflect.DeepEqual(sigOut, expectedSigOut) {
+ t.Errorf("Signature differed from expectation. got: %v but expected %v", sigOut, expectedSigOut)
+ }
+}
+
+func TestServiceSignature(t *testing.T) {
+ sigIn := JSONServiceSignature{
+ "firstMethod": JSONMethodSignature{
+ InArgs: []string{"FirstArg", "SecondArg"},
+ NumOutArgs: 2,
+ IsStreaming: true,
+ },
+ }
+
+ sigOut, err := sigIn.ServiceSignature()
+ if err != nil {
+ t.Fatal("error in service signature", err)
+ }
+
+ expectedSigOut := ipc.ServiceSignature{
+ Methods: map[string]ipc.MethodSignature{
+ "FirstMethod": ipc.MethodSignature{
+ InArgs: []ipc.MethodArgument{
+ ipc.MethodArgument{
+ Name: "FirstArg",
+ Type: anydataTypeID,
+ },
+ ipc.MethodArgument{
+ Name: "SecondArg",
+ Type: anydataTypeID,
+ },
+ },
+ OutArgs: []ipc.MethodArgument{
+ ipc.MethodArgument{
+ Type: anydataTypeID,
+ },
+ ipc.MethodArgument{
+ Name: "err",
+ Type: errTypeID,
+ },
+ },
+ InStream: anydataTypeID,
+ OutStream: anydataTypeID,
+ },
+ },
+ TypeDefs: []vdlutil.Any{
+ anydataType,
+ errType,
+ },
+ }
+
+ if !reflect.DeepEqual(sigOut, expectedSigOut) {
+ t.Error("Signature differed from expectation. got: %v but expected %v", sigOut, expectedSigOut)
+ }
+}
diff --git a/services/wsprd/wspr.go b/services/wsprd/wspr.go
new file mode 100644
index 0000000..fed9ea0
--- /dev/null
+++ b/services/wsprd/wspr.go
@@ -0,0 +1,26 @@
+package main
+
+import (
+ "flag"
+
+ "veyron.io/veyron/veyron/lib/signals"
+ // TODO(cnicolaou,benj): figure out how to support roaming as a chrome plugi
+ "veyron.io/veyron/veyron/profiles/roaming"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/wspr/veyron/services/wsprd/wspr"
+)
+
+func main() {
+ identd := flag.String("identd", "", "The endpoint for the identd server. This must be set.")
+ flag.Parse()
+
+ rt.Init()
+
+ proxy := wspr.NewWSPR(*roaming.ListenSpec, *identd)
+ defer proxy.Shutdown()
+ go func() {
+ proxy.Run()
+ }()
+
+ <-signals.ShutdownOnSignals()
+}
diff --git a/services/wsprd/wspr/pipe.go b/services/wsprd/wspr/pipe.go
new file mode 100644
index 0000000..0a538a9
--- /dev/null
+++ b/services/wsprd/wspr/pipe.go
@@ -0,0 +1,296 @@
+package wspr
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ _ "net/http/pprof"
+ "os"
+ "strings"
+ "time"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vom"
+ "veyron.io/wspr/veyron/services/wsprd/app"
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+
+ "github.com/gorilla/websocket"
+)
+
+// The type of message sent by the JS client to the wspr.
+type websocketMessageType int
+
+const (
+ // Making a veyron client request, streaming or otherwise
+ websocketVeyronRequest websocketMessageType = 0
+
+ // Serving this websocket under an object name
+ websocketServe = 1
+
+ // A response from a service in javascript to a request
+ // from the proxy.
+ websocketServerResponse = 2
+
+ // Sending streaming data, either from a JS client or JS service.
+ websocketStreamingValue = 3
+
+ // A response that means the stream is closed by the client.
+ websocketStreamClose = 4
+
+ // A request to get signature of a remote server
+ websocketSignatureRequest = 5
+
+ // A request to stop a server
+ websocketStopServer = 6
+
+ // A request to associate an identity with an origin
+ websocketAssocIdentity = 7
+
+ // A request to bless an identity
+ websocketBlessIdentity = 8
+
+ // A request to unlink an identity. This request means that
+ // we can remove the given handle from the handle store.
+ websocketUnlinkIdentity = 9
+
+ // A request to create a new random identity
+ websocketCreateIdentity = 10
+
+ // A request to run the lookup function on a dispatcher.
+ websocketLookupResponse = 11
+
+ // A request to run the authorizer for an rpc.
+ websocketAuthResponse = 12
+)
+
+type websocketMessage struct {
+ Id int64
+ // This contains the json encoded payload.
+ Data string
+
+ // Whether it is an rpc request or a serve request.
+ Type websocketMessageType
+}
+
+// wsMessage is the struct that is put on the write queue.
+type wsMessage struct {
+ buf []byte
+ messageType int
+}
+
+type pipe struct {
+ // The struct that handles the translation of javascript request to veyron requests.
+ controller *app.Controller
+
+ ws *websocket.Conn
+
+ logger vlog.Logger
+
+ wspr *WSPR
+
+ // Creates a client writer for a given flow. This is a member so that tests can override
+ // the default implementation.
+ writerCreator func(id int64) lib.ClientWriter
+
+ // There is a single write goroutine because ws.NewWriter() creates a new writer that
+ // writes to a shared buffer in the websocket, so it is not safe to have multiple go
+ // routines writing to different websocket writers.
+ writeQueue chan wsMessage
+
+ // This request is used to tell WSPR which pipe to remove when we shutdown.
+ req *http.Request
+}
+
+func newPipe(w http.ResponseWriter, req *http.Request, wspr *WSPR, creator func(id int64) lib.ClientWriter) *pipe {
+ pipe := &pipe{logger: wspr.rt.Logger(), wspr: wspr, req: req}
+
+ if creator == nil {
+ creator = func(id int64) lib.ClientWriter {
+ return &websocketWriter{p: pipe, id: id, logger: pipe.logger}
+ }
+ }
+ pipe.writerCreator = creator
+ origin := req.Header.Get("Origin")
+ if origin == "" {
+ wspr.rt.Logger().Errorf("Could not read origin from the request")
+ http.Error(w, "Could not read origin from the request", http.StatusBadRequest)
+ return nil
+ }
+
+ id, err := wspr.idManager.Identity(origin)
+
+ if err != nil {
+ id = wspr.rt.Identity()
+ wspr.rt.Logger().Errorf("no identity associated with origin %s: %v", origin, err)
+ // TODO(bjornick): Send an error to the client when all of the identity stuff is set up.
+ }
+
+ pipe.controller, err = app.NewController(creator, &wspr.listenSpec, veyron2.RuntimeID(id))
+
+ if err != nil {
+ wspr.rt.Logger().Errorf("Could not create controller: %v", err)
+ http.Error(w, fmt.Sprintf("Failed to create controller: %v", err), http.StatusInternalServerError)
+ return nil
+ }
+
+ pipe.start(w, req)
+ return pipe
+}
+
+// cleans up any outstanding rpcs.
+func (p *pipe) cleanup() {
+ p.logger.VI(0).Info("Cleaning up websocket")
+ p.controller.Cleanup()
+ p.ws.Close()
+ p.wspr.CleanUpPipe(p.req)
+}
+
+func (p *pipe) setup() {
+ p.writeQueue = make(chan wsMessage, 50)
+ go p.writeLoop()
+}
+
+func (p *pipe) writeLoop() {
+ for {
+ msg, ok := <-p.writeQueue
+ if !ok {
+ p.logger.Errorf("write queue was closed")
+ return
+ }
+
+ if msg.messageType == websocket.PingMessage {
+ p.logger.Infof("sending ping")
+ }
+ if err := p.ws.WriteMessage(msg.messageType, msg.buf); err != nil {
+ p.logger.Errorf("failed to write bytes: %s", err)
+ }
+ }
+}
+
+func (p *pipe) start(w http.ResponseWriter, req *http.Request) {
+ ws, err := websocket.Upgrade(w, req, nil, 1024, 1024)
+ if _, ok := err.(websocket.HandshakeError); ok {
+ http.Error(w, "Not a websocket handshake", 400)
+ return
+ } else if err != nil {
+ http.Error(w, "Internal Error", 500)
+ p.logger.Errorf("websocket upgrade failed: %s", err)
+ return
+ }
+
+ p.ws = ws
+ p.ws.SetPongHandler(p.pongHandler)
+ p.setup()
+ p.sendInitialMessage()
+
+ go p.readLoop()
+ go p.pingLoop()
+}
+
+// Upon first connect, we send a message with the wsprConfig.
+func (p *pipe) sendInitialMessage() {
+ // TODO(bprosnitz) Use the same root lookup as the rest of veyron so that this is consistent.
+ mounttableRoots := strings.Split(os.Getenv("NAMESPACE_ROOT"), ",")
+ if len(mounttableRoots) == 1 && mounttableRoots[0] == "" {
+ mounttableRoots = []string{}
+ }
+ if mtRoot := os.Getenv("MOUNTTABLE_ROOT"); mtRoot != "" {
+ mounttableRoots = append(mounttableRoots, mtRoot)
+ }
+ msg := wsprConfig{
+ MounttableRoot: mounttableRoots,
+ }
+
+ var buf bytes.Buffer
+ if err := vom.ObjToJSON(&buf, vom.ValueOf(msg)); err != nil {
+ p.logger.Errorf("failed to convert wspr config to json: %s", err)
+ return
+ }
+ p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf.Bytes()}
+}
+
+func (p *pipe) pingLoop() {
+ for {
+ time.Sleep(pingInterval)
+ p.logger.VI(2).Info("ws: ping")
+ p.writeQueue <- wsMessage{messageType: websocket.PingMessage, buf: []byte{}}
+ }
+}
+
+func (p *pipe) pongHandler(msg string) error {
+ p.logger.VI(2).Infof("ws: pong")
+ p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
+ return nil
+}
+
+func (p *pipe) readLoop() {
+ p.ws.SetReadDeadline(time.Now().Add(pongTimeout))
+ for {
+ op, r, err := p.ws.NextReader()
+ if err == io.ErrUnexpectedEOF { // websocket disconnected
+ break
+ }
+ if err != nil {
+ p.logger.VI(1).Infof("websocket receive: %s", err)
+ break
+ }
+
+ if op != websocket.TextMessage {
+ p.logger.Errorf("unexpected websocket op: %v", op)
+ }
+
+ var msg websocketMessage
+ decoder := json.NewDecoder(r)
+ if err := decoder.Decode(&msg); err != nil {
+ errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
+ p.logger.Error(errMsg)
+ p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: []byte(errMsg)}
+ continue
+ }
+
+ ww := p.writerCreator(msg.Id)
+
+ switch msg.Type {
+ case websocketVeyronRequest:
+ // TODO(mattr): Get the proper context information
+ // from javascript.
+ ctx := p.wspr.rt.NewContext()
+ p.controller.HandleVeyronRequest(ctx, msg.Id, msg.Data, ww)
+ case websocketStreamingValue:
+ // SendOnStream queues up the message to be sent, but doesn't do the send
+ // on this goroutine. We need to queue the messages synchronously so that
+ // the order is preserved.
+ p.controller.SendOnStream(msg.Id, msg.Data, ww)
+ case websocketStreamClose:
+ p.controller.CloseStream(msg.Id)
+ case websocketServe:
+ go p.controller.HandleServeRequest(msg.Data, ww)
+ case websocketStopServer:
+ go p.controller.HandleStopRequest(msg.Data, ww)
+ case websocketServerResponse:
+ go p.controller.HandleServerResponse(msg.Id, msg.Data)
+ case websocketSignatureRequest:
+ // TODO(mattr): Get the proper context information
+ // from javascript.
+ ctx := p.wspr.rt.NewContext()
+ go p.controller.HandleSignatureRequest(ctx, msg.Data, ww)
+ case websocketLookupResponse:
+ go p.controller.HandleLookupResponse(msg.Id, msg.Data)
+ case websocketBlessIdentity:
+ go p.controller.HandleBlessing(msg.Data, ww)
+ case websocketCreateIdentity:
+ go p.controller.HandleCreateIdentity(msg.Data, ww)
+ case websocketUnlinkIdentity:
+ go p.controller.HandleUnlinkJSIdentity(msg.Data, ww)
+ case websocketAuthResponse:
+ go p.controller.HandleAuthResponse(msg.Id, msg.Data)
+ default:
+ ww.Error(verror.Unknownf("unknown message type: %v", msg.Type))
+ }
+ }
+ p.cleanup()
+}
diff --git a/services/wsprd/wspr/writer.go b/services/wsprd/wspr/writer.go
new file mode 100644
index 0000000..a5e2514
--- /dev/null
+++ b/services/wsprd/wspr/writer.go
@@ -0,0 +1,72 @@
+package wspr
+
+import (
+ "bytes"
+ "fmt"
+ "path/filepath"
+ "runtime"
+
+ "veyron.io/wspr/veyron/services/wsprd/lib"
+
+ "veyron.io/veyron/veyron2/verror"
+ "veyron.io/veyron/veyron2/vlog"
+ "veyron.io/veyron/veyron2/vom"
+
+ "github.com/gorilla/websocket"
+)
+
+// Wraps a response to the proxy client and adds a message type.
+type response struct {
+ Type lib.ResponseType
+ Message interface{}
+}
+
+// Implements clientWriter interface for sending messages over websockets.
+type websocketWriter struct {
+ p *pipe
+ logger vlog.Logger
+ id int64
+}
+
+func (w *websocketWriter) Send(messageType lib.ResponseType, data interface{}) error {
+ var buf bytes.Buffer
+ if err := vom.ObjToJSON(&buf, vom.ValueOf(response{Type: messageType, Message: data})); err != nil {
+ w.logger.Error("Failed to marshal with", err)
+ return err
+ }
+
+ var buf2 bytes.Buffer
+
+ if err := vom.ObjToJSON(&buf2, vom.ValueOf(websocketMessage{Id: w.id, Data: buf.String()})); err != nil {
+ w.logger.Error("Failed to write the message", err)
+ return err
+ }
+
+ w.p.writeQueue <- wsMessage{messageType: websocket.TextMessage, buf: buf2.Bytes()}
+
+ return nil
+}
+
+func (w *websocketWriter) Error(err error) {
+ verr := verror.ToStandard(err)
+
+ // Also log the error but write internal errors at a more severe log level
+ var logLevel vlog.Level = 2
+ logErr := fmt.Sprintf("%v", verr)
+ // We want to look at the stack three frames up to find where the error actually
+ // occurred. (caller -> websocketErrorResponse/sendError -> generateErrorMessage).
+ if _, file, line, ok := runtime.Caller(3); ok {
+ logErr = fmt.Sprintf("%s:%d: %s", filepath.Base(file), line, logErr)
+ }
+ if verror.Is(verr, verror.Internal) {
+ logLevel = 2
+ }
+ w.logger.VI(logLevel).Info(logErr)
+
+ var errMsg = verror.Standard{
+ ID: verr.ErrorID(),
+ Msg: verr.Error(),
+ }
+
+ w.Send(lib.ResponseError, errMsg)
+}
diff --git a/services/wsprd/wspr/wspr.go b/services/wsprd/wspr/wspr.go
new file mode 100644
index 0000000..dd9cbf5
--- /dev/null
+++ b/services/wsprd/wspr/wspr.go
@@ -0,0 +1,285 @@
+// A simple WebSocket proxy (WSPR) that takes in a Veyron RPC message, encoded in JSON
+// and stored in a WebSocket message, and sends it to the specified Veyron
+// endpoint.
+//
+// Input arguments must be provided as a JSON message in the following format:
+//
+// {
+// "Address" : String, //EndPoint Address
+// "Name" : String, //Service Name
+// "Method" : String, //Method Name
+// "InArgs" : { "ArgName1" : ArgVal1, "ArgName2" : ArgVal2, ... },
+// "IsStreaming" : true/false
+// }
+//
+package wspr
+
+import (
+ "bytes"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ _ "net/http/pprof"
+ "sync"
+ "time"
+
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vlog"
+
+ veyron_identity "veyron.io/veyron/veyron/services/identity"
+ "veyron.io/wspr/veyron/services/wsprd/identity"
+)
+
+const (
+ pingInterval = 50 * time.Second // how often the server pings the client.
+ pongTimeout = pingInterval + 10*time.Second // maximum wait for pong.
+)
+
+type wsprConfig struct {
+ MounttableRoot []string
+}
+
+type WSPR struct {
+ mu sync.Mutex
+ tlsCert *tls.Certificate
+ rt veyron2.Runtime
+ httpPort int // HTTP port for WSPR to serve on. Port rather than address to discourage serving in a way that isn't local.
+ logger vlog.Logger
+ listenSpec ipc.ListenSpec
+ identdEP string
+ idManager *identity.IDManager
+ blesserService veyron_identity.OAuthBlesser
+ pipes map[*http.Request]*pipe
+}
+
+var logger vlog.Logger
+
+func readFromRequest(r *http.Request) (*bytes.Buffer, error) {
+ var buf bytes.Buffer
+ if readBytes, err := io.Copy(&buf, r.Body); err != nil {
+ return nil, fmt.Errorf("error copying message out of request: %v", err)
+ } else if wantBytes := r.ContentLength; readBytes != wantBytes {
+ return nil, fmt.Errorf("read %d bytes, wanted %d", readBytes, wantBytes)
+ }
+ return &buf, nil
+}
+
+func setAccessControl(w http.ResponseWriter) {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+}
+
+// Starts the proxy and listens for requests. This method is blocking.
+func (ctx WSPR) Run() {
+ // Bind to the OAuth Blesser service
+ blesserService, err := veyron_identity.BindOAuthBlesser(ctx.identdEP)
+ if err != nil {
+ log.Fatalf("Failed to bind to identity service at %v: %v", ctx.identdEP, err)
+ }
+ ctx.blesserService = blesserService
+
+ // HTTP routes
+ http.HandleFunc("/debug", ctx.handleDebug)
+ http.HandleFunc("/create-account", ctx.handleCreateAccount)
+ http.HandleFunc("/assoc-account", ctx.handleAssocAccount)
+ http.HandleFunc("/ws", ctx.handleWS)
+ // Everything else is a 404.
+ // Note: the pattern "/" matches all paths not matched by other
+ // registered patterns, not just the URL with Path == "/".'
+ // (http://golang.org/pkg/net/http/#ServeMux)
+ http.Handle("/", http.NotFoundHandler())
+ ctx.logger.VI(1).Infof("Listening at port %d.", ctx.httpPort)
+ httpErr := http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", ctx.httpPort), nil)
+ if httpErr != nil {
+ log.Fatalf("Failed to HTTP serve: %s", httpErr)
+ }
+}
+
+func (ctx WSPR) Shutdown() {
+ ctx.rt.Cleanup()
+}
+
+func (ctx WSPR) CleanUpPipe(req *http.Request) {
+ ctx.mu.Lock()
+ defer ctx.mu.Unlock()
+ delete(ctx.pipes, req)
+}
+
+// Creates a new WebSocket Proxy object.
+func NewWSPR(httpPort int, listenSpec ipc.ListenSpec, identdEP string, opts ...veyron2.ROpt) *WSPR {
+ if listenSpec.Proxy == "" {
+ log.Fatalf("a veyron proxy must be set")
+ }
+ if identdEP == "" {
+ log.Fatalf("an identd server must be set")
+ }
+
+ newrt, err := rt.New(opts...)
+ if err != nil {
+ log.Fatalf("rt.New failed: %s", err)
+ }
+
+ // TODO(nlacasse, bjornick) use a serializer that can actually persist.
+ idManager, err := identity.NewIDManager(newrt, &identity.InMemorySerializer{})
+ if err != nil {
+ log.Fatalf("identity.NewIDManager failed: %s", err)
+ }
+
+ return &WSPR{
+ httpPort: httpPort,
+ listenSpec: listenSpec,
+ identdEP: identdEP,
+ rt: newrt,
+ logger: newrt.Logger(),
+ idManager: idManager,
+ pipes: map[*http.Request]*pipe{},
+ }
+}
+
+// HTTP Handlers
+
+func (ctx WSPR) handleDebug(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ fmt.Fprintf(w, "")
+ return
+ }
+ w.Header().Set("Content-Type", "text/html")
+ w.Write([]byte(`<html>
+<head>
+<title>/debug</title>
+</head>
+<body>
+<ul>
+<li><a href="/debug/pprof">/debug/pprof</a></li>
+</li></ul></body></html>
+`))
+}
+
+func (ctx WSPR) handleWS(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
+ return
+ }
+ ctx.logger.VI(0).Info("Creating a new websocket")
+ p := newPipe(w, r, &ctx, nil)
+
+ if p == nil {
+ return
+ }
+ ctx.mu.Lock()
+ defer ctx.mu.Unlock()
+ ctx.pipes[r] = p
+}
+
+// Structs for marshalling input/output to create-account route.
+type createAccountInput struct {
+ AccessToken string `json:"access_token"`
+}
+
+type createAccountOutput struct {
+ Names []string `json:"names"`
+}
+
+// Handler for creating an account in the identity manager.
+// A valid OAuth2 access token must be supplied in the request body. That
+// access token is exchanged for a blessing from the identd server. A new
+// privateID is then derived from WSPR's privateID and the blessing. That
+// privateID is stored in the identity manager. The name of the new privateID
+// is returned to the client.
+func (ctx WSPR) handleCreateAccount(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse request body.
+ var data createAccountInput
+ if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
+ msg := fmt.Sprintf("Error parsing body: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ }
+
+ // Get a blessing for the access token from identity server.
+ rctx, cancel := ctx.rt.NewContext().WithTimeout(time.Minute)
+ defer cancel()
+ blessingAny, err := ctx.blesserService.BlessUsingAccessToken(rctx, data.AccessToken)
+ if err != nil {
+ msg := fmt.Sprintf("Error getting blessing for access token: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ return
+ }
+ blessing := blessingAny.(security.PublicID)
+
+ // Derive a new identity from the runtime's identity and the blessing.
+ identity, err := ctx.rt.Identity().Derive(blessing)
+ if err != nil {
+ msg := fmt.Sprintf("Error deriving identity: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ return
+ }
+
+ for _, name := range blessing.Names() {
+ // Store identity in identity manager.
+ if err := ctx.idManager.AddAccount(name, identity); err != nil {
+ msg := fmt.Sprintf("Error storing identity: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusBadRequest)
+ return
+ }
+ }
+
+ // Return the names to the client.
+ out := createAccountOutput{
+ Names: blessing.Names(),
+ }
+ outJson, err := json.Marshal(out)
+ if err != nil {
+ msg := fmt.Sprintf("Error mashalling names: %v", err)
+ ctx.logger.Error(msg)
+ http.Error(w, msg, http.StatusInternalServerError)
+ return
+ }
+
+ // Success.
+ w.Header().Set("Content-Type", "application/json")
+ fmt.Fprintf(w, string(outJson))
+}
+
+// Struct for marshalling input to assoc-account route.
+type assocAccountInput struct {
+ Name string `json:"name"`
+ Origin string `json:"origin"`
+}
+
+// Handler for associating an existing privateID with an origin.
+func (ctx WSPR) handleAssocAccount(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ http.Error(w, "Method not allowed.", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Parse request body.
+ var data assocAccountInput
+ if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
+ http.Error(w, fmt.Sprintf("Error parsing body: %v", err), http.StatusBadRequest)
+ }
+
+ // Store the origin.
+ // TODO(nlacasse, bjornick): determine what the caveats should be.
+ if err := ctx.idManager.AddOrigin(data.Origin, data.Name, nil); err != nil {
+ http.Error(w, fmt.Sprintf("Error associating account: %v", err), http.StatusBadRequest)
+ return
+ }
+
+ // Success.
+ fmt.Fprintf(w, "")
+}
diff --git a/services/wsprd/wspr/wspr_test.go b/services/wsprd/wspr/wspr_test.go
new file mode 100644
index 0000000..61dde63
--- /dev/null
+++ b/services/wsprd/wspr/wspr_test.go
@@ -0,0 +1,226 @@
+package wspr
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "veyron.io/veyron/veyron2/context"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/security"
+ "veyron.io/veyron/veyron2/vdl/vdlutil"
+
+ "veyron.io/veyron/veyron/profiles"
+)
+
+// BEGIN MOCK BLESSER SERVICE
+// TODO(nlacasse): Is there a better way to mock this?!
+type mockBlesserService struct {
+ id security.PrivateID
+ count int
+}
+
+func newMockBlesserService(id security.PrivateID) *mockBlesserService {
+ return &mockBlesserService{
+ id: id,
+ count: 0,
+ }
+}
+
+func (m *mockBlesserService) BlessUsingAccessToken(c context.T, accessToken string, co ...ipc.CallOpt) (vdlutil.Any, error) {
+ m.count = m.count + 1
+ name := fmt.Sprintf("mock-blessing-%v", m.count)
+ return m.id.Bless(m.id.PublicID(), name, 5*time.Minute, nil)
+}
+
+// This is never used. Only needed for mock.
+func (m *mockBlesserService) GetMethodTags(c context.T, s string, co ...ipc.CallOpt) ([]interface{}, error) {
+ return nil, nil
+}
+
+// This is never used. Only needed for mock.
+func (m *mockBlesserService) Signature(c context.T, co ...ipc.CallOpt) (ipc.ServiceSignature, error) {
+ return ipc.ServiceSignature{}, nil
+}
+
+// This is never used. Only needed for mock.
+func (m *mockBlesserService) UnresolveStep(c context.T, co ...ipc.CallOpt) ([]string, error) {
+ return []string{}, nil
+}
+
+// END MOCK BLESSER SERVICE
+
+func setup(t *testing.T) (*WSPR, func()) {
+ spec := *profiles.LocalListenSpec
+ spec.Proxy = "/mock/proxy"
+ wspr := NewWSPR(0, spec, "/mock/identd")
+ providerId := wspr.rt.Identity()
+
+ wspr.blesserService = newMockBlesserService(providerId)
+ return wspr, func() {
+ wspr.Shutdown()
+ }
+}
+
+func TestHandleCreateAccount(t *testing.T) {
+ wspr, teardown := setup(t)
+ defer teardown()
+
+ method := "POST"
+ path := "/create-account"
+
+ // Add one account
+ data1 := createAccountInput{
+ AccessToken: "mock-access-token-1",
+ }
+ data1Json, err := json.Marshal(data1)
+ if err != nil {
+ t.Fatalf("json.Marshal(%v) failed: %v", data1, err)
+ }
+
+ data1JsonReader := bytes.NewReader(data1Json)
+ req, err := http.NewRequest(method, path, (data1JsonReader))
+ if err != nil {
+ t.Fatalf("http.NewRequest(%v, %v, %v,) failed: %v", method, path, data1JsonReader, err)
+ }
+
+ resp1 := httptest.NewRecorder()
+ wspr.handleCreateAccount(resp1, req)
+ if resp1.Code != 200 {
+ t.Fatalf("Expected handleCreateAccount to return 200 OK, instead got %v", resp1)
+ }
+
+ // Verify that idManager has the new account
+ topLevelName := wspr.rt.Identity().PublicID().Names()[0]
+ expectedAccountName := topLevelName + "/mock-blessing-1"
+ gotAccounts := wspr.idManager.AccountsMatching(security.BlessingPattern(expectedAccountName))
+ if len(gotAccounts) != 1 {
+ t.Fatalf("Expected to have 1 account with name %v, but got %v: %v", expectedAccountName, len(gotAccounts), gotAccounts)
+ }
+
+ // Add another account
+ data2 := createAccountInput{
+ AccessToken: "mock-access-token-2",
+ }
+ data2Json, err := json.Marshal(data2)
+ if err != nil {
+ t.Fatalf("json.Marshal(%v) failed: %v", data2, err)
+ }
+ data2JsonReader := bytes.NewReader(data2Json)
+ req, err = http.NewRequest(method, path, data2JsonReader)
+ if err != nil {
+ t.Fatalf("http.NewRequest(%v, %v, %v,) failed: %v", method, path, data2JsonReader, err)
+ }
+
+ resp2 := httptest.NewRecorder()
+ wspr.handleCreateAccount(resp2, req)
+ if resp2.Code != 200 {
+ t.Fatalf("Expected handleCreateAccount to return 200 OK, instead got %v", resp2)
+ }
+
+ // Verify that idManager has both accounts
+ gotAccounts = wspr.idManager.AccountsMatching(security.BlessingPattern(fmt.Sprintf("%s%s%v", topLevelName, security.ChainSeparator, security.AllPrincipals)))
+ if len(gotAccounts) != 2 {
+ t.Fatalf("Expected to have 2 accounts, but got %v: %v", len(gotAccounts), gotAccounts)
+ }
+}
+
+func TestHandleAssocAccount(t *testing.T) {
+ wspr, teardown := setup(t)
+ defer teardown()
+
+ // First create an accounts.
+ accountName := "mock-account"
+ identityName := "mock-id"
+ privateID, err := wspr.rt.NewIdentity(identityName)
+ if err != nil {
+ t.Fatalf("wspr.rt.NewIdentity(%v) failed: %v", identityName, err)
+ }
+ if err := wspr.idManager.AddAccount(accountName, privateID); err != nil {
+ t.Fatalf("wspr.idManager.AddAccount(%v, %v) failed; %v", accountName, privateID, err)
+ }
+
+ // Associate with that account
+ method := "POST"
+ path := "/assoc-account"
+
+ origin := "https://my.webapp.com:443"
+ data := assocAccountInput{
+ Name: accountName,
+ Origin: origin,
+ }
+
+ dataJson, err := json.Marshal(data)
+ if err != nil {
+ t.Fatalf("json.Marshal(%v) failed: %v", data, err)
+ }
+
+ dataJsonReader := bytes.NewReader(dataJson)
+ req, err := http.NewRequest(method, path, (dataJsonReader))
+ if err != nil {
+ t.Fatalf("http.NewRequest(%v, %v, %v,) failed: %v", method, path, dataJsonReader, err)
+ }
+
+ resp := httptest.NewRecorder()
+ wspr.handleAssocAccount(resp, req)
+ if resp.Code != 200 {
+ t.Fatalf("Expected handleAssocAccount to return 200 OK, instead got %v", resp)
+ }
+
+ // Verify that idManager has the correct identity for the origin
+ gotID, err := wspr.idManager.Identity(origin)
+ if err != nil {
+ t.Fatalf("wspr.idManager.Identity(%v) failed: %v", origin, err)
+ }
+
+ if gotID == nil {
+ t.Fatalf("Expected wspr.idManager.Identity(%v) to return an valid identity, but got %v", origin, gotID)
+ }
+}
+
+func TestHandleAssocAccountWithMissingAccount(t *testing.T) {
+ wspr, teardown := setup(t)
+ defer teardown()
+
+ method := "POST"
+ path := "/assoc-account"
+
+ accountName := "mock-account"
+ origin := "https://my.webapp.com:443"
+ data := assocAccountInput{
+ Name: accountName,
+ Origin: origin,
+ }
+
+ dataJson, err := json.Marshal(data)
+ if err != nil {
+ t.Fatalf("json.Marshal(%v) failed: %v", data, err)
+ }
+
+ dataJsonReader := bytes.NewReader(dataJson)
+ req, err := http.NewRequest(method, path, (dataJsonReader))
+ if err != nil {
+ t.Fatalf("http.NewRequest(%v, %v, %v,) failed: %v", method, path, dataJsonReader, err)
+ }
+
+ // Verify that the request fails with 400 Bad Request error
+ resp := httptest.NewRecorder()
+ wspr.handleAssocAccount(resp, req)
+ if resp.Code != 400 {
+ t.Fatalf("Expected handleAssocAccount to return 400 error, but got %v", resp)
+ }
+
+ // Verify that idManager has no identities for the origin
+ gotID, err := wspr.idManager.Identity(origin)
+ if err == nil {
+ t.Fatalf("Expected wspr.idManager.Identity(%v) to fail, but got: %v", origin, gotID)
+ }
+
+ if gotID != nil {
+ t.Fatalf("Expected wspr.idManager.Identity(%v) not to return an identity, but got %v", origin, gotID)
+ }
+}
diff --git a/services/wsprd/wspr_nacl/main_nacl.go b/services/wsprd/wspr_nacl/main_nacl.go
new file mode 100644
index 0000000..9696356
--- /dev/null
+++ b/services/wsprd/wspr_nacl/main_nacl.go
@@ -0,0 +1,160 @@
+package main
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "runtime/ppapi"
+ "syscall"
+
+ _ "veyron.io/veyron/veyron/runtimes/google/security"
+ "veyron.io/veyron/veyron/services/wsprd/wspr"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/rt"
+)
+
+func main() {
+ ppapi.Init(newWsprInstance)
+}
+
+// WSPR instance represents an instance on a PPAPI client and receives callbacks from PPAPI to handle events.
+type wsprInstance struct {
+ ppapi.Instance
+}
+
+var _ ppapi.InstanceHandlers = wsprInstance{}
+
+func (inst wsprInstance) DidCreate(args map[string]string) bool {
+ fmt.Printf("Got to DidCreate")
+ return true
+}
+
+func (wsprInstance) DidDestroy() {
+ fmt.Printf("Got to DidDestroy()")
+}
+
+func (wsprInstance) DidChangeView(view ppapi.View) {
+ fmt.Printf("Got to DidChangeView(%v)", view)
+}
+
+func (wsprInstance) DidChangeFocus(has_focus bool) {
+ fmt.Printf("Got to DidChangeFocus(%v)", has_focus)
+}
+
+func (wsprInstance) HandleDocumentLoad(url_loader ppapi.Resource) bool {
+ fmt.Printf("Got to HandleDocumentLoad(%v)", url_loader)
+ return true
+}
+
+func (wsprInstance) HandleInputEvent(event ppapi.InputEvent) bool {
+ fmt.Printf("Got to HandleInputEvent(%v)", event)
+ return true
+}
+
+func (wsprInstance) Graphics3DContextLost() {
+ fmt.Printf("Got to Graphics3DContextLost()")
+}
+
+// StartWSPR handles starting WSPR.
+func (wsprInstance) StartWSPR(message ppapi.Var) {
+ identityContents, err := message.LookupStringValuedKey("identityContents")
+ if err != nil {
+ panic(err.Error())
+ }
+ file, err := ioutil.TempFile(os.TempDir(), "veyron_id")
+ if err != nil {
+ panic(err.Error())
+ }
+ _, err = file.WriteString(identityContents)
+ if err != nil {
+ panic(err.Error())
+ }
+ if err := file.Close(); err != nil {
+ panic(err.Error())
+ }
+ f, err := os.Open(file.Name())
+ if err != nil {
+ panic(err.Error())
+ }
+ b, err := ioutil.ReadAll(f)
+ if err != nil {
+ panic(err.Error())
+ }
+ fmt.Printf("IDENTITY: %s", string(b))
+ f.Close()
+ syscall.Setenv("VEYRON_IDENTITY", file.Name())
+
+ rt.Init()
+
+ veyronProxy, err := message.LookupStringValuedKey("proxy")
+ if err != nil {
+ panic(err.Error())
+ }
+ if veyronProxy == "" {
+ panic("Empty proxy")
+ }
+
+ mounttable, err := message.LookupStringValuedKey("mounttable")
+ if err != nil {
+ panic(err.Error())
+ }
+ syscall.Setenv("MOUNTTABLE_ROOT", mounttable)
+ syscall.Setenv("NAMESPACE_ROOT", mounttable)
+
+ identd, err := message.LookupStringValuedKey("identityd")
+ if err != nil {
+ panic(err.Error())
+ }
+
+ wsprHttpPort, err := message.LookupIntValuedKey("wsprHttpPort")
+ if err != nil {
+ panic(err.Error())
+ }
+
+ // TODO(cnicolaou,bprosnitz) Should we use the roaming profile?
+ // It uses flags. We should change that.
+ listenSpec := ipc.ListenSpec{
+ Proxy: veyronProxy,
+ Protocol: "tcp",
+ Address: ":0",
+ }
+
+ fmt.Printf("Starting WSPR with config: proxy=%q mounttable=%q identityd=%q port=%d", veyronProxy, mounttable, identd, wsprHttpPort)
+ proxy := wspr.NewWSPR(wsprHttpPort, listenSpec, identd)
+ go func() {
+ proxy.Run()
+ }()
+}
+
+// HandleMessage receives messages from Javascript and uses them to perform actions.
+// A message is of the form {"type": "typeName", "body": { stuff here }},
+// where the body is passed to the message handler.
+func (inst wsprInstance) HandleMessage(message ppapi.Var) {
+ type handlerType func(ppapi.Var)
+ handlerMap := map[string]handlerType{
+ "start": inst.StartWSPR,
+ }
+ fmt.Printf("Got to HandleMessage(%v)", message)
+ ty, err := message.LookupStringValuedKey("type")
+ if err != nil {
+ panic(err.Error())
+ }
+ h, ok := handlerMap[ty]
+ if !ok {
+ panic(fmt.Sprintf("No handler found for message type: %q", ty))
+ }
+ body, err := message.LookupKey("body")
+ if err != nil {
+ body = ppapi.VarFromString("INVALID")
+ }
+ h(body)
+ body.Release()
+}
+
+func (wsprInstance) MouseLockLost() {
+ fmt.Printf("Got to MouseLockLost()")
+}
+
+func newWsprInstance(inst ppapi.Instance) ppapi.InstanceHandlers {
+ return wsprInstance{Instance: inst}
+}