x/ref: Restructure services/wspr
The high-level renaming is services/wsprd -> services/wspr
The other renamings are:
services/wsprd/wspr.go -> services/wspr/wsprd/main.go
services/wsprd/browspr/main -> services/wspr/browsprd
services/wsprd/wspr -> services/wspr/wsprlib
services/wsprd/account -> services/wspr/internal/account
services/wsprd/app -> services/wspr/internal/app
services/wsprd/browspr -> services/wspr/internal/browspr
services/wsprd/channel -> services/wspr/internal/channel
services/wsprd/lib -> services/wspr/internal/lib
services/wsprd/lib/testwriter -> services/wspr/internal/lib/testwriter
services/wsprd/namespace -> services/wspr/internal/namespace
services/wsprd/principal -> services/wspr/internal/principal
services/wsprd/rpc -> services/wspr/internal/rpc
services/wsprd/rpc/server -> services/wspr/internal/rpc/server
MultiPart: 1/3
Change-Id: I056b527f6f17495d6c2fffc2dc68d7ac126a5068
diff --git a/services/wspr/internal/account/account.go b/services/wspr/internal/account/account.go
new file mode 100644
index 0000000..f577b59
--- /dev/null
+++ b/services/wspr/internal/account/account.go
@@ -0,0 +1,162 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The account package contains logic for creating accounts and associating them with origins.
+package account
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+type BlesserService interface {
+	BlessUsingAccessToken(ctx *context.T, token string, opts ...rpc.CallOpt) (blessingObj security.Blessings, account string, err error) // exchanges an access token for blessings plus the account name they belong to
+}
+
+type bs struct {
+	name string // object name the "BlessUsingAccessToken" call is started on
+}
+
+func (s *bs) BlessUsingAccessToken(ctx *context.T, token string, opts ...rpc.CallOpt) (blessingObj security.Blessings, account string, err error) {
+	call, err := v23.GetClient(ctx).StartCall(ctx, s.name, "BlessUsingAccessToken", []interface{}{token}, opts...)
+	if err != nil {
+		return blessingObj, account, err
+	}
+	var email string
+	if finishErr := call.Finish(&blessingObj, &email); finishErr != nil {
+		return security.Blessings{}, "", finishErr
+	}
+	// The account name combines the server's blessing names with the returned email.
+	serverBlessings, _ := call.RemoteBlessings()
+	return blessingObj, accountName(serverBlessings, email), nil
+}
+
+func accountName(serverBlessings []string, email string) string {
+	return fmt.Sprintf("%s%s%s", strings.Join(serverBlessings, "#"), security.ChainSeparator, email) // "#"-joined blessings, separator, email
+}
+
+type AccountManager struct {
+	ctx              *context.T
+	blesser          BlesserService              // exchanges access tokens for blessings; replaceable via SetMockBlesser
+	principalManager *principal.PrincipalManager // stores accounts and origin associations
+	accounts         []string                    // account names appended by CreateAccount
+}
+
+func NewAccountManager(identitydEndpoint string, principalManager *principal.PrincipalManager) *AccountManager {
+	am := new(AccountManager)
+	am.blesser = &bs{name: identitydEndpoint} // default blesser issues its call against identitydEndpoint
+	am.principalManager = principalManager
+	return am
+}
+
+func (am *AccountManager) CreateAccount(ctx *context.T, accessToken string) (string, error) {
+	// Bound the exchange of the access token for blessings to one minute.
+	blessCtx, cancel := context.WithTimeout(ctx, time.Minute)
+	defer cancel()
+	blessings, account, blessErr := am.blesser.BlessUsingAccessToken(blessCtx, accessToken)
+	if blessErr != nil {
+		return "", fmt.Errorf("Error getting blessing for access token: %v", blessErr)
+	}
+
+	// Register the blessings with the principal manager under the
+	// account name the blesser reported.
+	if addErr := am.principalManager.AddAccount(account, blessings); addErr != nil {
+		return "", fmt.Errorf("Error adding account: %v", addErr)
+	}
+
+	// Remember the account so GetAccounts can report it.
+	am.accounts = append(am.accounts, account)
+	return account, nil
+}
+
+func (am *AccountManager) GetAccounts() []string {
+	return am.accounts // the manager's own slice (not a copy) of names recorded by CreateAccount
+}
+
+func (am *AccountManager) AssociateAccount(origin, account string, cavs []Caveat) error {
+	caveats, expirations, err := constructCaveats(cavs)
+	if err != nil {
+		return fmt.Errorf("failed to construct caveats: %v", err)
+	}
+	err = am.principalManager.AddOrigin(origin, account, caveats, expirations) // store the origin
+	if err != nil {
+		return fmt.Errorf("failed to associate account: %v", err)
+	}
+	vlog.VI(1).Infof("Associated origin %v with account %v and cavs %v", origin, account, caveats)
+	return nil
+}
+
+func (am *AccountManager) LookupPrincipal(origin string) (security.Principal, error) {
+	return am.principalManager.Principal(origin) // delegates to the principal manager's lookup for origin
+}
+
+func (am *AccountManager) OriginHasAccount(origin string) bool {
+	return am.principalManager.OriginHasAccount(origin) // answered entirely by the principal manager
+}
+
+func (am *AccountManager) PrincipalManager() *principal.PrincipalManager {
+	return am.principalManager // the manager handed to NewAccountManager
+}
+
+// TODO(bprosnitz) Refactor WSPR to remove this.
+func (am *AccountManager) SetMockBlesser(blesser BlesserService) {
+	am.blesser = blesser // replaces the blesser that CreateAccount uses
+}
+
+// constructCaveats converts cavs into security caveats plus the expiration time of every expiry caveat.
+func constructCaveats(cavs []Caveat) ([]security.Caveat, []time.Time, error) {
+	var caveats []security.Caveat
+	var expirations []time.Time
+	for _, c := range cavs {
+		switch c.Type {
+		case "ExpiryCaveat":
+			// Expiry caveats also contribute an entry to expirations.
+			caveat, expiration, err := createExpiryCaveat(c.Args)
+			if err != nil {
+				return nil, nil, err
+			}
+			caveats = append(caveats, caveat)
+			expirations = append(expirations, expiration)
+		case "MethodCaveat":
+			caveat, err := createMethodCaveat(c.Args)
+			if err != nil {
+				return nil, nil, err
+			}
+			caveats = append(caveats, caveat)
+		default:
+			return nil, nil, fmt.Errorf("caveat %v does not exist", c.Type)
+		}
+	}
+	return caveats, expirations, nil
+}
+
+func createExpiryCaveat(arg string) (security.Caveat, time.Time, error) {
+ var zeroTime time.Time
+ dur, err := time.ParseDuration(arg)
+ if err != nil {
+ return security.Caveat{}, zeroTime, fmt.Errorf("time.parseDuration(%v) failed: %v", arg, err)
+ }
+ expirationTime := time.Now().Add(dur)
+ cav, err := security.ExpiryCaveat(expirationTime)
+ if err != nil {
+ return security.Caveat{}, zeroTime, fmt.Errorf("security.ExpiryCaveat(%v) failed: %v", expirationTime, err)
+ }
+ return cav, expirationTime, nil
+}
+
+func createMethodCaveat(a string) (security.Caveat, error) {
+	if a == "" { // strings.Split("", ",") yields [""], never an empty slice, so test the input itself
+		return security.Caveat{}, fmt.Errorf("must pass at least one method")
+	}
+	methods := strings.Split(a, ",") // a is a comma-separated list of method names
+	return security.MethodCaveat(methods[0], methods[1:]...)
+}
diff --git a/services/wspr/internal/account/account.vdl b/services/wspr/internal/account/account.vdl
new file mode 100644
index 0000000..25d7f0d
--- /dev/null
+++ b/services/wspr/internal/account/account.vdl
@@ -0,0 +1,12 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package account
+
+// Caveat describes a restriction on the validity of a blessing/discharge.
+// TODO Remove this
+type Caveat struct {
+ Type string
+ Args string
+}
diff --git a/services/wspr/internal/account/account.vdl.go b/services/wspr/internal/account/account.vdl.go
new file mode 100644
index 0000000..cc306d0
--- /dev/null
+++ b/services/wspr/internal/account/account.vdl.go
@@ -0,0 +1,29 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: account.vdl
+
+package account
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// Caveat describes a restriction on the validity of a blessing/discharge.
+// TODO Remove this
+type Caveat struct {
+ Type string
+ Args string
+}
+
+func (Caveat) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/account.Caveat"
+}) {
+}
+
+func init() {
+ vdl.Register((*Caveat)(nil))
+}
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
new file mode 100644
index 0000000..6c8b51c
--- /dev/null
+++ b/services/wspr/internal/app/app.go
@@ -0,0 +1,763 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The app package contains the struct that keeps per javascript app state and handles translating
+// javascript requests to vanadium requests and vice versa.
+package app
+
+import (
+ "bytes"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "reflect"
+ "sync"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+ "v.io/v23/vtrace"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/lib"
+ "v.io/x/ref/services/wspr/internal/namespace"
+ "v.io/x/ref/services/wspr/internal/principal"
+ "v.io/x/ref/services/wspr/internal/rpc/server"
+)
+
+const (
+	// pkgPath is the prefix of errors in this package.
+	pkgPath = "v.io/x/ref/services/wspr/internal/app"
+)
+
+// Errors registered under pkgPath; all of them are registered with verror.NoRetry.
+var (
+	marshallingError       = verror.Register(pkgPath+".marshallingError", verror.NoRetry, "{1} {2} marshalling error {_}")
+	noResults              = verror.Register(pkgPath+".noResults", verror.NoRetry, "{1} {2} no results from call {_}")
+	badCaveatType          = verror.Register(pkgPath+".badCaveatType", verror.NoRetry, "{1} {2} bad caveat type {_}")
+	unknownBlessings       = verror.Register(pkgPath+".unknownBlessings", verror.NoRetry, "{1} {2} unknown public id {_}")
+	invalidBlessingsHandle = verror.Register(pkgPath+".invalidBlessingsHandle", verror.NoRetry, "{1} {2} invalid blessings handle {_}")
+)
+
+type outstandingRequest struct {
+	stream *outstandingStream // set for streaming rpcs and for flows made by CreateNewFlow; otherwise nil
+	cancel context.CancelFunc // set by HandleVeyronRequest; nil for flows made by CreateNewFlow
+}
+
+// Controller represents all the state of a Vanadium Web App. This is the struct
+// that is in charge of performing all the vanadium operations.
+type Controller struct {
+	// Protects everything.
+	// TODO(bjornick): We need to split this up.
+	sync.Mutex
+
+	// The context of this controller.
+	ctx *context.T
+
+	// The cleanup function for this controller; it cancels ctx.
+	cancel context.CancelFunc
+
+	// The rpc.ListenSpec to use with server.Listen
+	listenSpec *rpc.ListenSpec
+
+	// Used to generate unique ids for requests initiated by the proxy.
+	// These ids will be even so they don't collide with the ids generated
+	// by the client.
+	lastGeneratedId int32
+
+	// Used to keep track of data (streams and cancellation functions) for
+	// outstanding requests.
+	outstandingRequests map[int32]*outstandingRequest
+
+	// Maps flow ids to the server that owns them.
+	flowMap map[int32]*server.Server
+
+	// A manager that handles fetching and caching signatures of remote services.
+	signatureManager lib.SignatureManager
+
+	// We maintain multiple Vanadium servers per pipe for serving JavaScript
+	// services.
+	servers map[uint32]*server.Server
+
+	// Creates a client writer for a given flow. This is a member so that tests can override
+	// the default implementation.
+	writerCreator func(id int32) lib.ClientWriter
+
+	// Cache for all the Blessings that javascript has a handle to.
+	blessingsCache *principal.JSBlessingsHandles
+
+	// reservedServices contains a map of reserved service names. These
+	// are objects that serve requests in wspr without actually making
+	// an outgoing rpc call.
+	reservedServices map[string]rpc.Invoker
+}
+
+// NewController creates a new Controller. writerCreator will be used to create a new flow for rpcs to
+// javascript server. If construction fails, the cancelable context derived here is cancelled before returning.
+func NewController(ctx *context.T, writerCreator func(id int32) lib.ClientWriter, listenSpec *rpc.ListenSpec, namespaceRoots []string, p security.Principal) (*Controller, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	fail := func(err error) (*Controller, error) { cancel(); return nil, err } // release the derived context on error
+	if namespaceRoots != nil {
+		var err error
+		ctx, _, err = v23.SetNewNamespace(ctx, namespaceRoots...)
+		if err != nil {
+			return fail(err)
+		}
+	}
+
+	ctx, _ = vtrace.SetNewTrace(ctx)
+
+	ctx, err := v23.SetPrincipal(ctx, p)
+	if err != nil {
+		return fail(err)
+	}
+
+	controller := &Controller{
+		ctx:            ctx,
+		cancel:         cancel,
+		writerCreator:  writerCreator,
+		listenSpec:     listenSpec,
+		blessingsCache: principal.NewJSBlessingsHandles(),
+	}
+
+	controllerInvoker, err := rpc.ReflectInvoker(ControllerServer(controller))
+	if err != nil {
+		return fail(err)
+	}
+	namespaceInvoker, err := rpc.ReflectInvoker(namespace.New(ctx))
+	if err != nil {
+		return fail(err)
+	}
+	controller.reservedServices = map[string]rpc.Invoker{
+		"__controller": controllerInvoker,
+		"__namespace":  namespaceInvoker,
+	}
+
+	controller.setup()
+	return controller, nil
+}
+
+// finishCall relays any streamed items to w, then waits for the call to finish and writes out the response to w.
+func (c *Controller) finishCall(ctx *context.T, w lib.ClientWriter, clientCall rpc.ClientCall, msg *RpcRequest, span vtrace.Span) {
+	if msg.IsStreaming {
+		for {
+			var item interface{}
+			if err := clientCall.Recv(&item); err != nil {
+				if err == io.EOF {
+					break
+				}
+				w.Error(err) // Send streaming error as is
+				return
+			}
+			vomItem, err := lib.VomEncode(item)
+			if err != nil {
+				w.Error(verror.New(marshallingError, ctx, item, err))
+				continue // an item that cannot be encoded is reported and skipped
+			}
+			if err := w.Send(lib.ResponseStream, vomItem); err != nil {
+				w.Error(verror.New(marshallingError, ctx, item, err)) // keep the cause, as for encode failures
+			}
+		}
+		if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+			w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose", err))
+		}
+	}
+	results := make([]*vdl.Value, msg.NumOutArgs)
+	// This array will have pointers to the values in results.
+	resultptrs := make([]interface{}, msg.NumOutArgs)
+	for i := range results {
+		resultptrs[i] = &results[i]
+	}
+	if err := clientCall.Finish(resultptrs...); err != nil {
+		// return the call system error as is
+		w.Error(err)
+		return
+	}
+	c.sendRPCResponse(ctx, w, span, results)
+}
+
+func (c *Controller) sendRPCResponse(ctx *context.T, w lib.ClientWriter, span vtrace.Span, results []*vdl.Value) {
+	// The span is finished before the trace response is collected from ctx.
+	span.Finish()
+	encoded, err := lib.VomEncode(RpcResponse{
+		OutArgs:       results,
+		TraceResponse: vtrace.GetResponse(ctx),
+	})
+	if err != nil {
+		w.Error(err)
+		return
+	}
+	if sendErr := w.Send(lib.ResponseFinal, encoded); sendErr != nil {
+		w.Error(verror.Convert(marshallingError, ctx, sendErr))
+	}
+}
+
+// callOpts converts each RpcCallOption in opts to its rpc.CallOpt, failing on an unknown option type.
+func (c *Controller) callOpts(opts []RpcCallOption) ([]rpc.CallOpt, error) {
+	var converted []rpc.CallOpt
+	for _, opt := range opts {
+		var callOpt rpc.CallOpt
+		switch v := opt.(type) {
+		case RpcCallOptionAllowedServersPolicy:
+			callOpt = options.AllowedServersPolicy(v.Value)
+		case RpcCallOptionRetryTimeout:
+			callOpt = options.RetryTimeout(v.Value)
+		default:
+			return nil, fmt.Errorf("Unknown RpcCallOption type %T", v)
+		}
+		converted = append(converted, callOpt)
+	}
+	return converted, nil
+}
+
+func (c *Controller) startCall(ctx *context.T, w lib.ClientWriter, msg *RpcRequest, inArgs []interface{}) (rpc.ClientCall, error) {
+	// The remote method is addressed with its first character upper-cased.
+	method := lib.UppercaseFirstCharacter(msg.Method)
+	opts, err := c.callOpts(msg.CallOptions)
+	if err != nil {
+		return nil, err
+	}
+	started, err := v23.GetClient(ctx).StartCall(ctx, msg.Name, method, inArgs, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, method, inArgs, err)
+	}
+	return started, nil
+}
+
+// Implements the serverHelper interface
+
+// CreateNewFlow creates a new server flow that will be used to write out
+// streaming messages to Javascript.
+func (c *Controller) CreateNewFlow(s *server.Server, stream rpc.Stream) *server.Flow {
+	c.Lock()
+	defer c.Unlock()
+	// Proxy-generated ids advance by two for every flow.
+	flowID := c.lastGeneratedId
+	c.lastGeneratedId = flowID + 2
+	c.flowMap[flowID] = s
+	// Register an outstanding stream wrapping the rpc stream under the new id.
+	flowStream := newStream()
+	flowStream.init(stream)
+	c.outstandingRequests[flowID] = &outstandingRequest{stream: flowStream}
+	return &server.Flow{ID: flowID, Writer: c.writerCreator(flowID)}
+}
+
+// CleanupFlow removes the bookkeeping for a previously created flow.
+func (c *Controller) CleanupFlow(id int32) {
+	c.Lock()
+	req := c.outstandingRequests[id]
+	delete(c.flowMap, id)
+	delete(c.outstandingRequests, id)
+	c.Unlock()
+	if req != nil && req.stream != nil { // the stream is ended and awaited without holding the lock
+		req.stream.end()
+		req.stream.waitUntilDone()
+	}
+}
+
+// Context returns the context of this controller.
+func (c *Controller) Context() *context.T {
+	return c.ctx
+}
+
+// AddBlessings stores blessings in the controller's blessings cache and
+// returns the handle to them. JS is only given this handle so that the
+// certificate forest does not have to be shipped to JS and back.
+func (c *Controller) AddBlessings(blessings security.Blessings) principal.BlessingsHandle {
+	handle := c.blessingsCache.Add(blessings)
+	return handle
+}
+
+// Cleanup cancels and ends every outstanding request, stops all servers and
+// finally cancels the controller's own context.
+func (c *Controller) Cleanup() {
+	vlog.VI(0).Info("Cleaning up controller")
+
+	c.Lock()
+	for _, req := range c.outstandingRequests {
+		if req.cancel != nil {
+			req.cancel()
+		}
+		if req.stream != nil {
+			req.stream.end()
+		}
+	}
+	// Snapshot the servers so they can be stopped after the lock is released.
+	toStop := make([]*server.Server, 0, len(c.servers))
+	for _, srv := range c.servers {
+		toStop = append(toStop, srv)
+	}
+	c.Unlock()
+
+	// We must unlock before calling server.Stop otherwise it can deadlock.
+	for _, srv := range toStop {
+		srv.Stop()
+	}
+
+	c.cancel()
+}
+
+func (c *Controller) setup() {
+	c.servers = make(map[uint32]*server.Server)
+	c.flowMap = make(map[int32]*server.Server)
+	c.outstandingRequests = make(map[int32]*outstandingRequest)
+	c.signatureManager = lib.NewSignatureManager() // used by getSignature
+}
+
+// SendOnStream writes data on id's stream. The actual network write will be
+// done asynchronously. If there is an error, it will be sent to w.
+func (c *Controller) SendOnStream(id int32, data string, w lib.ClientWriter) {
+ c.Lock()
+ request := c.outstandingRequests[id]
+ if request == nil || request.stream == nil {
+ vlog.Errorf("unknown stream: %d", id)
+ c.Unlock()
+ return
+ }
+ stream := request.stream
+ c.Unlock()
+ stream.send(data, w)
+}
+
+// sendVeyronRequest makes a vanadium request for the given flow id and blocks until the call finishes.
+// On every return path the outstanding request for id is removed and its cancel function, if any, is run.
+func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *RpcRequest, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream, span vtrace.Span) {
+	defer func() { // deferred so the early error returns below also release the entry and its context
+		c.Lock()
+		if request, ok := c.outstandingRequests[id]; ok {
+			delete(c.outstandingRequests, id)
+			if request.cancel != nil {
+				request.cancel()
+			}
+		}
+		c.Unlock()
+	}()
+	sig, err := c.getSignature(ctx, msg.Name)
+	if err != nil {
+		w.Error(err)
+		return
+	}
+	methName := lib.UppercaseFirstCharacter(msg.Method)
+	methSig, ok := signature.FirstMethod(sig, methName)
+	if !ok {
+		w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
+		return
+	}
+	if len(methSig.InArgs) != len(inArgs) {
+		w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
+		return
+	}
+
+	// We have to make the start call synchronous so we can make sure that we populate
+	// the call map before we can handle a receive call.
+	call, err := c.startCall(ctx, w, msg, inArgs)
+	if err != nil {
+		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
+		return
+	}
+	if stream != nil {
+		stream.init(call)
+	}
+	c.finishCall(ctx, w, call, msg, span)
+}
+
+// TODO(mattr): This is a very limited implementation of ServerCall,
+// but currently none of the methods the controller exports require
+// any of this context information.
+type localCall struct {
+	ctx  *context.T       // returned by Context and VanadiumContext
+	vrpc *RpcRequest      // supplies the values returned by Method and Name
+	tags []*vdl.Value     // returned by MethodTags
+	w    lib.ClientWriter // receives the items and errors produced by Send
+}
+
+// Send VOM-encodes item and writes it to the client writer as a stream response.
+// A failure of either step is reported to the client writer and also returned.
+func (l *localCall) Send(item interface{}) error {
+	vomItem, err := lib.VomEncode(item)
+	if err == nil {
+		err = l.w.Send(lib.ResponseStream, vomItem)
+	}
+	if err == nil {
+		return nil
+	}
+	err = verror.New(marshallingError, l.ctx, item, err) // carries the cause for send failures as well
+	l.w.Error(err)
+	return err
+}
+func (l *localCall) Recv(interface{}) error { return nil } // no-op: nothing is ever received
+func (l *localCall) GrantedBlessings() security.Blessings { return security.Blessings{} }
+func (l *localCall) Server() rpc.Server { return nil }
+func (l *localCall) Context() *context.T { return l.ctx }
+func (l *localCall) Timestamp() (t time.Time) { return } // always the zero time
+func (l *localCall) Method() string { return l.vrpc.Method }
+func (l *localCall) MethodTags() []*vdl.Value { return l.tags }
+func (l *localCall) Name() string { return l.vrpc.Name }
+func (l *localCall) Suffix() string { return "" }
+func (l *localCall) LocalDischarges() map[string]security.Discharge { return nil }
+func (l *localCall) RemoteDischarges() map[string]security.Discharge { return nil }
+func (l *localCall) LocalPrincipal() security.Principal { return nil }
+func (l *localCall) LocalBlessings() security.Blessings { return security.Blessings{} }
+func (l *localCall) RemoteBlessings() security.Blessings { return security.Blessings{} }
+func (l *localCall) LocalEndpoint() naming.Endpoint { return nil }
+func (l *localCall) RemoteEndpoint() naming.Endpoint { return nil }
+func (l *localCall) VanadiumContext() *context.T { return l.ctx } // same context as Context
+
+func (c *Controller) handleInternalCall(ctx *context.T, invoker rpc.Invoker, msg *RpcRequest, decoder *vom.Decoder, w lib.ClientWriter, span vtrace.Span) {
+	internalErr := func(err error) { w.Error(verror.Convert(verror.ErrInternal, ctx, err)) } // report err to the client
+	argptrs, tags, err := invoker.Prepare(msg.Method, int(msg.NumInArgs))
+	if err != nil {
+		internalErr(err)
+		return
+	}
+	for i := range argptrs {
+		if err := decoder.Decode(argptrs[i]); err != nil {
+			internalErr(err)
+			return
+		}
+	}
+	results, err := invoker.Invoke(msg.Method, &localCall{ctx: ctx, vrpc: msg, tags: tags, w: w}, argptrs)
+	if err != nil {
+		internalErr(err)
+		return
+	}
+	if msg.IsStreaming {
+		if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+			w.Error(verror.New(marshallingError, ctx, "ResponseStreamClose"))
+		}
+	}
+
+	// Convert results from []interface{} to []*vdl.Value.
+	vresults := make([]*vdl.Value, 0, len(results))
+	for _, res := range results {
+		vv, err := vdl.ValueFromReflect(reflect.ValueOf(res))
+		if err != nil {
+			internalErr(err)
+			return
+		}
+		vresults = append(vresults, vv)
+	}
+	c.sendRPCResponse(ctx, w, span, vresults)
+}
+
+// HandleCaveatValidationResponse forwards the response to a caveat validation
+// request to the server that owns flow id.
+func (c *Controller) HandleCaveatValidationResponse(id int32, data string) {
+	c.Lock()
+	srv := c.flowMap[id]
+	c.Unlock()
+	if srv != nil {
+		srv.HandleCaveatValidationResponse(id, data)
+		return
+	}
+	vlog.Errorf("unexpected result from JavaScript. No server found matching id %d.", id) // unknown server: ignored
+}
+
+// HandleVeyronRequest decodes a hex-encoded, VOM-encoded RpcRequest from data and starts the
+// vanadium rpc, returning before the rpc has been completed.
+func (c *Controller) HandleVeyronRequest(ctx *context.T, id int32, data string, w lib.ClientWriter) {
+	binbytes, err := hex.DecodeString(data)
+	if err != nil {
+		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
+		return
+	}
+	decoder, err := vom.NewDecoder(bytes.NewReader(binbytes))
+	if err != nil {
+		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
+		return
+	}
+
+	var msg RpcRequest
+	if err := decoder.Decode(&msg); err != nil {
+		w.Error(verror.Convert(verror.ErrInternal, ctx, err))
+		return
+	}
+	vlog.VI(2).Infof("Rpc: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+	spanName := fmt.Sprintf("<wspr>%q.%s", msg.Name, msg.Method)
+	ctx, span := vtrace.SetContinuedTrace(ctx, spanName, msg.TraceRequest)
+
+	// If this message is for an internal service, do a short-circuit dispatch here.
+	if invoker, ok := c.reservedServices[msg.Name]; ok {
+		go c.handleInternalCall(ctx, invoker, &msg, decoder, w, span)
+		return
+	}
+
+	inArgs := make([]interface{}, msg.NumInArgs)
+	for i := range inArgs {
+		var v *vdl.Value
+		if err := decoder.Decode(&v); err != nil {
+			w.Error(err)
+			return
+		}
+		inArgs[i] = v
+	}
+
+	// The cancelable context is created only after every early return above, so that
+	// its cancel function is always handed to the outstanding request and never leaked.
+	var cctx *context.T
+	var cancel context.CancelFunc
+	// TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
+	// However as a rollout strategy we must, otherwise there is a circular
+	// dependency between the WSPR change and the JS change that will follow.
+	if msg.Deadline.IsZero() {
+		cctx, cancel = context.WithCancel(ctx)
+	} else {
+		cctx, cancel = context.WithDeadline(ctx, msg.Deadline.Time)
+	}
+
+	request := &outstandingRequest{cancel: cancel}
+	if msg.IsStreaming {
+		// If this rpc is streaming, we would expect that the client would try to send
+		// on this stream. Since the initial handshake is done asynchronously, we have
+		// to put the outstanding stream in the map before we make the async call so that
+		// the future send know which queue to write to, even if the client call isn't
+		// actually ready yet.
+		request.stream = newStream()
+	}
+	c.Lock()
+	c.outstandingRequests[id] = request
+	go c.sendVeyronRequest(cctx, id, &msg, inArgs, w, request.stream, span)
+	c.Unlock()
+}
+
+// HandleVeyronCancellation cancels the request with the given id if it is still outstanding.
+func (c *Controller) HandleVeyronCancellation(id int32) {
+	c.Lock()
+	defer c.Unlock()
+	request, ok := c.outstandingRequests[id]
+	if ok && request.cancel != nil {
+		request.cancel()
+	}
+}
+
+// CloseStream ends the stream registered for id, logging an error when there is none.
+func (c *Controller) CloseStream(id int32) {
+	c.Lock()
+	defer c.Unlock()
+	if request, ok := c.outstandingRequests[id]; ok && request.stream != nil {
+		request.stream.end()
+	} else {
+		vlog.Errorf("close called on non-existent call: %v", id)
+	}
+}
+
+func (c *Controller) maybeCreateServer(serverId uint32) (*server.Server, error) {
+	c.Lock()
+	defer c.Unlock()
+	if existing, ok := c.servers[serverId]; ok {
+		return existing, nil
+	}
+	created, err := server.NewServer(serverId, c.listenSpec, c) // first use of this id: create and remember it
+	if err != nil {
+		return nil, err
+	}
+	c.servers[serverId] = created
+	return created, nil
+}
+
+// HandleLookupResponse forwards the result of a Dispatcher.Lookup call that was
+// run by the Javascript server to the server owning flow id.
+func (c *Controller) HandleLookupResponse(id int32, data string) {
+	c.Lock()
+	srv := c.flowMap[id]
+	c.Unlock()
+	if srv != nil {
+		srv.HandleLookupResponse(id, data)
+		return
+	}
+	// Ignore unknown responses that don't belong to any channel.
+	vlog.Errorf("unexpected result from JavaScript. No channel "+
+		"for MessageId: %d exists. Ignoring the results.", id)
+}
+
+// HandleAuthResponse forwards the result of an Authorizer.Authorize call that was
+// run by the Javascript server to the server owning flow id.
+func (c *Controller) HandleAuthResponse(id int32, data string) {
+	c.Lock()
+	srv := c.flowMap[id]
+	c.Unlock()
+	if srv != nil {
+		srv.HandleAuthResponse(id, data)
+		return
+	}
+	// Ignore unknown responses that don't belong to any channel.
+	vlog.Errorf("unexpected result from JavaScript. No channel "+
+		"for MessageId: %d exists. Ignoring the results.", id)
+}
+
+// Serve makes sure a server exists for serverId and starts serving it under
+// name on behalf of a javascript server.
+func (c *Controller) Serve(_ rpc.ServerCall, name string, serverId uint32) error {
+	srv, createErr := c.maybeCreateServer(serverId)
+	if createErr != nil {
+		return verror.Convert(verror.ErrInternal, nil, createErr)
+	}
+	vlog.VI(2).Infof("serving under name: %q", name)
+	if serveErr := srv.Serve(name); serveErr != nil {
+		return verror.Convert(verror.ErrInternal, nil, serveErr)
+	}
+	return nil
+}
+
+// Stop instructs WSPR to stop listening for calls for the
+// given javascript server. Unknown server ids are ignored.
+func (c *Controller) Stop(_ rpc.ServerCall, serverId uint32) error {
+	c.Lock()
+	srv := c.servers[serverId]
+	if srv != nil {
+		delete(c.servers, serverId)
+	}
+	c.Unlock()
+	// The server is stopped only after the lock has been released.
+	if srv != nil {
+		srv.Stop()
+	}
+	return nil
+}
+
+// AddName adds a published name to the server for serverId.
+func (c *Controller) AddName(_ rpc.ServerCall, serverId uint32, name string) error {
+	// The server for the pipe is created first if it does not exist already.
+	srv, createErr := c.maybeCreateServer(serverId)
+	if createErr != nil {
+		return verror.Convert(verror.ErrInternal, nil, createErr)
+	}
+	// Publish the additional name.
+	if addErr := srv.AddName(name); addErr != nil {
+		return verror.Convert(verror.ErrInternal, nil, addErr)
+	}
+	return nil
+}
+
+// RemoveName removes a published name from the server for serverId.
+func (c *Controller) RemoveName(_ rpc.ServerCall, serverId uint32, name string) error {
+	// The server for the pipe is created first if it does not exist already.
+	srv, createErr := c.maybeCreateServer(serverId)
+	if createErr != nil {
+		return verror.Convert(verror.ErrInternal, nil, createErr)
+	}
+	// Unpublish the name.
+	srv.RemoveName(name)
+	// Drop the name's entry from the signature cache as well.
+	c.signatureManager.FlushCacheEntry(name)
+	return nil
+}
+
+// HandleServerResponse handles the completion of outstanding calls to JavaScript services
+// by passing the result from JavaScript to the server that owns flow id.
+func (c *Controller) HandleServerResponse(id int32, data string) {
+	c.Lock()
+	srv := c.flowMap[id]
+	c.Unlock()
+	if srv != nil {
+		srv.HandleServerResponse(id, data)
+		return
+	}
+	// Ignore unknown responses that don't belong to any channel.
+	vlog.Errorf("unexpected result from JavaScript. No channel "+
+		"for MessageId: %d exists. Ignoring the results.", id)
+}
+
+// parseVeyronRequest parses a json rpc request into a RpcRequest object.
+func (c *Controller) parseVeyronRequest(data string) (*RpcRequest, error) {
+ var msg RpcRequest
+ if err := lib.VomDecode(data, &msg); err != nil {
+ return nil, err
+ }
+ vlog.VI(2).Infof("RpcRequest: %s.%s(..., streaming=%v)", msg.Name, msg.Method, msg.IsStreaming)
+ return &msg, nil
+}
+
+// getSignature asks the signature manager (which gets and caches signatures) for the signature of name.
+func (c *Controller) getSignature(ctx *context.T, name string) ([]signature.Interface, error) {
+	return c.signatureManager.Signature(ctx, name)
+}
+
+// Signature returns the signature of the remote server name, looked up via getSignature with the context of call.
+func (c *Controller) Signature(call rpc.ServerCall, name string) ([]signature.Interface, error) {
+	return c.getSignature(call.Context(), name)
+}
+
+// UnlinkBlessings removes the blessings behind handle from the blessings cache; it always returns nil.
+func (c *Controller) UnlinkBlessings(_ rpc.ServerCall, handle principal.BlessingsHandle) error {
+	c.blessingsCache.Remove(handle)
+	return nil
+}
+
+// Bless binds extensions of blessings held by this principal to
+// another principal (represented by its public key).
+func (c *Controller) Bless(_ rpc.ServerCall,
+ publicKey string,
+ blessingHandle principal.BlessingsHandle,
+ extension string,
+ caveats []security.Caveat) (string, principal.BlessingsHandle, error) {
+ var inputBlessing security.Blessings
+ if inputBlessing = c.blessingsCache.Get(blessingHandle); inputBlessing.IsZero() {
+ return "", principal.ZeroHandle, verror.New(invalidBlessingsHandle, nil)
+ }
+
+ key, err := principal.DecodePublicKey(publicKey)
+ if err != nil {
+ return "", principal.ZeroHandle, err
+ }
+
+ if len(caveats) == 0 {
+ caveats = append(caveats, security.UnconstrainedUse())
+ }
+
+ p := v23.GetPrincipal(c.ctx)
+ blessings, err := p.Bless(key, inputBlessing, extension, caveats[0], caveats[1:]...)
+ if err != nil {
+ return "", principal.ZeroHandle, err
+ }
+ handle := c.blessingsCache.Add(blessings)
+ return publicKey, handle, nil
+}
+
+// BlessSelf creates a blessing with the provided name and caveats for this principal.
+func (c *Controller) BlessSelf(call rpc.ServerCall,
+	extension string, caveats []security.Caveat) (string, principal.BlessingsHandle, error) {
+	p := v23.GetPrincipal(c.ctx)
+	blessings, err := p.BlessSelf(extension, caveats...) // forward the caller's caveats to the self-blessing
+	if err != nil {
+		return "", principal.ZeroHandle, verror.Convert(verror.ErrInternal, nil, err)
+	}
+
+	handle := c.blessingsCache.Add(blessings)
+	// The encoded public key of this principal is returned alongside the cache handle.
+	encKey, err := principal.EncodePublicKey(p.PublicKey())
+	return encKey, handle, err
+}
+
+// RemoteBlessings starts a call to method on name (with a five second timeout)
+// and returns the blessing names reported for the remote end.
+func (c *Controller) RemoteBlessings(call rpc.ServerCall, name, method string) ([]string, error) {
+	vlog.VI(2).Infof("requesting remote blessings for %q", name)
+	cctx, cancel := context.WithTimeout(call.Context(), 5*time.Second)
+	defer cancel()
+
+	started, err := v23.GetClient(cctx).StartCall(cctx, name, method, nil)
+	if err != nil {
+		return nil, verror.Convert(verror.ErrInternal, cctx, err)
+	}
+	names, _ := started.RemoteBlessings()
+	return names, nil
+}
+
+// SendLogMessage sends msg at the given level as a log response on a writer created for a fresh id.
+func (c *Controller) SendLogMessage(level lib.LogLevel, msg string) error {
+	c.Lock()
+	defer c.Unlock()
+	// Log messages consume a proxy-generated id, advancing the counter by two.
+	id := c.lastGeneratedId
+	c.lastGeneratedId += 2
+	payload := lib.LogMessage{Level: level, Message: msg}
+	return c.writerCreator(id).Send(lib.ResponseLog, payload)
+}
diff --git a/services/wspr/internal/app/app.vdl b/services/wspr/internal/app/app.vdl
new file mode 100644
index 0000000..8f5f122
--- /dev/null
+++ b/services/wspr/internal/app/app.vdl
@@ -0,0 +1,47 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The app package contains the struct that keeps per javascript app state and handles translating
+// javascript requests to vanadium requests and vice versa.
+package app
+
+import (
+ "time"
+
+ "v.io/v23/security"
+ "v.io/v23/vtrace"
+)
+
+type RpcRequest struct {
+ Name string
+ Method string
+ NumInArgs int32
+ NumOutArgs int32
+ IsStreaming bool
+ Deadline time.WireDeadline
+ TraceRequest vtrace.Request
+ CallOptions []RpcCallOption
+}
+
+// TODO(nlacasse): It would be nice if RpcCallOption were a struct with
+// optional types, like:
+//
+// type RpcCallOptions struct {
+// AllowedServersPolicy ?[]security.BlessingPattern
+// RetryTimeout ?time.Duration
+// }
+//
+// Unfortunately, optional types are currently only supported for structs, not
+// slices or primitive types. Once optional types are better supported, switch
+// this to a struct with optional types.
+
+type RpcCallOption union {
+ AllowedServersPolicy []security.BlessingPattern
+ RetryTimeout time.Duration
+}
+
+type RpcResponse struct {
+ OutArgs []any
+ TraceResponse vtrace.Response
+}
diff --git a/services/wspr/internal/app/app.vdl.go b/services/wspr/internal/app/app.vdl.go
new file mode 100644
index 0000000..60dfc1f
--- /dev/null
+++ b/services/wspr/internal/app/app.vdl.go
@@ -0,0 +1,90 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: app.vdl
+
+// The app package contains the struct that keeps per javascript app state and handles translating
+// javascript requests to vanadium requests and vice versa.
+package app
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "time"
+ "v.io/v23/security"
+ time_2 "v.io/v23/vdlroot/time"
+ "v.io/v23/vtrace"
+)
+
+type RpcRequest struct {
+ Name string
+ Method string
+ NumInArgs int32
+ NumOutArgs int32
+ IsStreaming bool
+ Deadline time_2.Deadline
+ TraceRequest vtrace.Request
+ CallOptions []RpcCallOption
+}
+
+func (RpcRequest) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/app.RpcRequest"
+}) {
+}
+
+type (
+ // RpcCallOption represents any single field of the RpcCallOption union type.
+ RpcCallOption interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the RpcCallOption union type.
+ __VDLReflect(__RpcCallOptionReflect)
+ }
+ // RpcCallOptionAllowedServersPolicy represents field AllowedServersPolicy of the RpcCallOption union type.
+ RpcCallOptionAllowedServersPolicy struct{ Value []security.BlessingPattern }
+ // RpcCallOptionRetryTimeout represents field RetryTimeout of the RpcCallOption union type.
+ RpcCallOptionRetryTimeout struct{ Value time.Duration }
+ // __RpcCallOptionReflect describes the RpcCallOption union type.
+ __RpcCallOptionReflect struct {
+ Name string "v.io/x/ref/services/wspr/internal/app.RpcCallOption"
+ Type RpcCallOption
+ Union struct {
+ AllowedServersPolicy RpcCallOptionAllowedServersPolicy
+ RetryTimeout RpcCallOptionRetryTimeout
+ }
+ }
+)
+
+func (x RpcCallOptionAllowedServersPolicy) Index() int { return 0 }
+func (x RpcCallOptionAllowedServersPolicy) Interface() interface{} { return x.Value }
+func (x RpcCallOptionAllowedServersPolicy) Name() string { return "AllowedServersPolicy" }
+func (x RpcCallOptionAllowedServersPolicy) __VDLReflect(__RpcCallOptionReflect) {}
+
+func (x RpcCallOptionRetryTimeout) Index() int { return 1 }
+func (x RpcCallOptionRetryTimeout) Interface() interface{} { return x.Value }
+func (x RpcCallOptionRetryTimeout) Name() string { return "RetryTimeout" }
+func (x RpcCallOptionRetryTimeout) __VDLReflect(__RpcCallOptionReflect) {}
+
+type RpcResponse struct {
+ OutArgs []*vdl.Value
+ TraceResponse vtrace.Response
+}
+
+func (RpcResponse) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/app.RpcResponse"
+}) {
+}
+
+func init() {
+ vdl.Register((*RpcRequest)(nil))
+ vdl.Register((*RpcCallOption)(nil))
+ vdl.Register((*RpcResponse)(nil))
+}
diff --git a/services/wspr/internal/app/app_test.go b/services/wspr/internal/app/app_test.go
new file mode 100644
index 0000000..772714d
--- /dev/null
+++ b/services/wspr/internal/app/app_test.go
@@ -0,0 +1,561 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "bytes"
+ "encoding/hex"
+ "fmt"
+ "reflect"
+ "sync"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ vdltime "v.io/v23/vdlroot/time"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+ "v.io/v23/vtrace"
+
+ "v.io/x/ref/profiles"
+ vsecurity "v.io/x/ref/security"
+ "v.io/x/ref/services/mounttable/mounttablelib"
+ "v.io/x/ref/services/wspr/internal/lib"
+ "v.io/x/ref/services/wspr/internal/lib/testwriter"
+ "v.io/x/ref/services/wspr/internal/rpc/server"
+ "v.io/x/ref/test"
+ "v.io/x/ref/test/testutil"
+)
+
+//go:generate v23 test generate
+
+var testPrincipal = testutil.NewPrincipal("test") // principal handed to the controller created in serveServer
+
+// newBlessedPrincipal returns a new principal that ctx's principal has blessed
+// with the extension "delegate". That blessing is installed as the new
+// principal's default so that it is revealed to all clients and servers.
+func newBlessedPrincipal(ctx *context.T) security.Principal {
+	blesser := v23.GetPrincipal(ctx)
+	blessee := testutil.NewPrincipal()
+	delegated, err := blesser.Bless(blessee.PublicKey(), blesser.BlessingStore().Default(), "delegate", security.UnconstrainedUse())
+	if err == nil {
+		err = vsecurity.SetDefaultBlessings(blessee, delegated)
+	}
+	if err != nil {
+		panic(err)
+	}
+	return blessee
+}
+
+type simpleAdder struct{} // stateless test service: Add, Divide, StreamingAdd
+
+func (simpleAdder) Add(_ rpc.ServerCall, a, b int32) (int32, error) {
+	return a + b, nil
+}
+
+func (simpleAdder) Divide(_ rpc.ServerCall, a, b int32) (int32, error) {
+	if b != 0 {
+		return a / b, nil
+	}
+	return 0, verror.New(verror.ErrBadArg, nil, "div 0") // refuse to divide by zero
+}
+
+// StreamingAdd sends the running total after each received value and returns it once Recv fails.
+func (simpleAdder) StreamingAdd(call rpc.StreamServerCall) (int32, error) {
+	var sum, next int32
+	for call.Recv(&next) == nil {
+		sum += next
+		call.Send(sum)
+	}
+	return sum, nil
+}
+
+var simpleAddrSig = signature.Interface{ // signature TestGetGoServerSignature expects for simpleAdder
+	Doc: "The empty interface contains methods not attached to any interface.",
+	Methods: []signature.Method{
+		{
+			Name:    "Add",
+			InArgs:  []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.Int32Type}},
+			OutArgs: []signature.Arg{{Type: vdl.Int32Type}},
+		},
+		{
+			Name:    "Divide",
+			InArgs:  []signature.Arg{{Type: vdl.Int32Type}, {Type: vdl.Int32Type}},
+			OutArgs: []signature.Arg{{Type: vdl.Int32Type}},
+		},
+		{
+			Name:      "StreamingAdd",
+			OutArgs:   []signature.Arg{{Type: vdl.Int32Type}},
+			InStream:  &signature.Arg{Type: vdl.AnyType},
+			OutStream: &signature.Arg{Type: vdl.AnyType},
+		},
+	},
+}
+
+// startAnyServer creates a server, listens on ctx's listen spec and serves
+// dispatcher; it returns the first endpoint and stops the server on failure.
+func startAnyServer(ctx *context.T, servesMT bool, dispatcher rpc.Dispatcher) (rpc.Server, naming.Endpoint, error) {
+	s, err := v23.NewServer(ctx, options.ServesMountTable(servesMT))
+	if err != nil {
+		return nil, nil, err
+	}
+	endpoints, err := s.Listen(v23.GetListenSpec(ctx))
+	if err == nil {
+		err = s.ServeDispatcher("", dispatcher)
+	}
+	if err != nil {
+		s.Stop() // do not leak a half-started server
+		return nil, nil, err
+	}
+	return s, endpoints[0], nil
+}
+
+func startAdderServer(ctx *context.T) (rpc.Server, naming.Endpoint, error) { // serves a simpleAdder
+	return startAnyServer(ctx, false, testutil.LeafDispatcher(simpleAdder{}, nil)) // false: not a mount table
+}
+
+func startMountTableServer(ctx *context.T) (rpc.Server, naming.Endpoint, error) { // serves a new mount table dispatcher
+	mt, err := mounttablelib.NewMountTableDispatcher("")
+	if err != nil {
+		return nil, nil, err
+	}
+	return startAnyServer(ctx, true, mt) // true: the ServesMountTable option is set
+}
+
+// TestGetGoServerSignature checks that the controller reports the adder
+// server's signature followed by an interface named "__Reserved".
+func TestGetGoServerSignature(t *testing.T) {
+	ctx, shutdown := test.InitForTest()
+	defer shutdown()
+
+	s, endpoint, err := startAdderServer(ctx)
+	if err != nil {
+		t.Fatalf("unable to start server: %v", err)
+	}
+	defer s.Stop()
+
+	spec := v23.GetListenSpec(ctx)
+	spec.Proxy = "mockVeyronProxyEP"
+	controller, err := NewController(ctx, nil, &spec, nil, newBlessedPrincipal(ctx))
+
+	if err != nil {
+		t.Fatalf("Failed to create controller: %v", err)
+	}
+	sig, err := controller.getSignature(ctx, "/"+endpoint.String())
+	if err != nil {
+		t.Fatalf("Failed to get signature: %v", err)
+	}
+	if got, want := len(sig), 2; got != want {
+		t.Fatalf("got signature %#v len %d, want %d", sig, got, want)
+	}
+	if got, want := sig[0], simpleAddrSig; !reflect.DeepEqual(got, want) {
+		t.Errorf("got sig[0] %#v, want: %#v", got, want)
+	}
+	if got, want := sig[1].Name, "__Reserved"; got != want {
+		t.Errorf("got sig[1].Name %#v, want: %#v", got, want)
+	}
+}
+
+type goServerTestCase struct {
+	method          string         // method to invoke on the adder server
+	inArgs          []interface{}  // arguments passed along with the request
+	numOutArgs      int32          // copied into RpcRequest.NumOutArgs
+	streamingInputs []interface{}  // values sent on the stream; non-empty makes the request streaming
+	expectedStream  []lib.Response // responses testwriter.CheckResponses expects from the writer
+	expectedError   error          // error testwriter.CheckResponses expects from the writer
+}
+
+// runGoServerTestCase starts an adder server, issues testCase's request to it
+// through a fresh controller and checks what the controller wrote back. When
+// streaming inputs are given they are fed to the call from a goroutine.
+func runGoServerTestCase(t *testing.T, testCase goServerTestCase) {
+	ctx, shutdown := test.InitForTest()
+	defer shutdown()
+
+	s, endpoint, err := startAdderServer(ctx)
+	if err != nil {
+		t.Fatalf("unable to start server: %v", err)
+	}
+	defer s.Stop()
+
+	spec := v23.GetListenSpec(ctx)
+	spec.Proxy = "mockVeyronProxyEP"
+	controller, err := NewController(ctx, nil, &spec, nil, newBlessedPrincipal(ctx))
+	if err != nil {
+		t.Fatalf("unable to create controller: %v", err)
+	}
+
+	writer := testwriter.Writer{}
+	var stream *outstandingStream
+	if len(testCase.streamingInputs) > 0 {
+		stream = newStream()
+		controller.outstandingRequests[0] = &outstandingRequest{
+			stream: stream,
+		}
+		// Feed the streaming inputs concurrently with the request below.
+		go func() {
+			for _, value := range testCase.streamingInputs {
+				controller.SendOnStream(0, lib.VomEncodeOrDie(value), &writer)
+			}
+			controller.CloseStream(0)
+		}()
+	}
+
+	request := RpcRequest{
+		Name:        "/" + endpoint.String(),
+		Method:      testCase.method,
+		NumInArgs:   int32(len(testCase.inArgs)),
+		NumOutArgs:  testCase.numOutArgs,
+		IsStreaming: stream != nil,
+	}
+	controller.sendVeyronRequest(ctx, 0, &request, testCase.inArgs, &writer, stream, vtrace.GetSpan(ctx))
+
+	if err := testwriter.CheckResponses(&writer, testCase.expectedStream, testCase.expectedError); err != nil {
+		t.Error(err)
+	}
+}
+
+// makeRPCResponse returns lib.VomEncodeOrDie's encoding of an RpcResponse that
+// carries outArgs and an empty trace response.
+func makeRPCResponse(outArgs ...*vdl.Value) string {
+	resp := RpcResponse{OutArgs: outArgs, TraceResponse: vtrace.Response{}}
+	return lib.VomEncodeOrDie(resp)
+}
+
+// TestCallingGoServer calls Add(2, 3) and expects a single final response of 5.
+func TestCallingGoServer(t *testing.T) {
+	final := lib.Response{
+		Message: makeRPCResponse(vdl.Int32Value(5)),
+		Type:    lib.ResponseFinal,
+	}
+	runGoServerTestCase(t, goServerTestCase{
+		method:         "Add",
+		inArgs:         []interface{}{2, 3},
+		numOutArgs:     1,
+		expectedStream: []lib.Response{final},
+	})
+}
+
+// TestCallingGoServerWithError calls Divide(1, 0) and expects the server's
+// ErrBadArg error to be reported.
+func TestCallingGoServerWithError(t *testing.T) {
+	tc := goServerTestCase{method: "Divide", numOutArgs: 1}
+	tc.inArgs = []interface{}{1, 0}
+	tc.expectedError = verror.New(verror.ErrBadArg, nil, "div 0")
+	runGoServerTestCase(t, tc)
+}
+
+// TestCallingGoWithStreaming streams 1, 2, 3, 4 to StreamingAdd and expects the
+// running totals, then a stream close, then a final response of 10.
+func TestCallingGoWithStreaming(t *testing.T) {
+	runGoServerTestCase(t, goServerTestCase{
+		method:          "StreamingAdd",
+		streamingInputs: []interface{}{1, 2, 3, 4},
+		numOutArgs:      1,
+		expectedStream: []lib.Response{
+			{
+				Message: lib.VomEncodeOrDie(int32(1)),
+				Type:    lib.ResponseStream,
+			},
+			{
+				Message: lib.VomEncodeOrDie(int32(3)),
+				Type:    lib.ResponseStream,
+			},
+			{
+				Message: lib.VomEncodeOrDie(int32(6)),
+				Type:    lib.ResponseStream,
+			},
+			{
+				Message: lib.VomEncodeOrDie(int32(10)),
+				Type:    lib.ResponseStream,
+			},
+			// Stream end marker, followed by the result of the call.
+			{Message: nil, Type: lib.ResponseStreamClose},
+			{
+				Message: makeRPCResponse(vdl.Int32Value(10)),
+				Type:    lib.ResponseFinal,
+			},
+		},
+	})
+}
+
+type runningTest struct {
+	controller       *Controller        // controller created by serveServer
+	writer           *testwriter.Writer // serveServer's writer if it is a *testwriter.Writer, else nil
+	mounttableServer rpc.Server         // mount table started by serveServer
+	proxyShutdown    func()             // shutdown function returned by profiles.NewProxy
+}
+
+// makeRequest VOM-encodes rpc followed by each of args with a single encoder
+// and returns the hex encoding of the resulting bytes.
+func makeRequest(rpc RpcRequest, args ...interface{}) (string, error) {
+	var encoded bytes.Buffer
+	enc, err := vom.NewEncoder(&encoded)
+	if err != nil {
+		return "", err
+	}
+	// The request itself goes first, then its arguments in order.
+	for _, item := range append([]interface{}{rpc}, args...) {
+		if err := enc.Encode(item); err != nil {
+			return "", err
+		}
+	}
+	return hex.EncodeToString(encoded.Bytes()), nil
+}
+
+// serveServer starts a mount table and a proxy, creates a controller listening
+// through that proxy and sends it a Serve request for the name "adder".
+func serveServer(ctx *context.T, writer lib.ClientWriter, setController func(*Controller)) (*runningTest, error) {
+	mounttableServer, endpoint, err := startMountTableServer(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("unable to start mounttable: %v", err)
+	}
+
+	proxyShutdown, proxyEndpoint, err := profiles.NewProxy(ctx, "tcp", "127.0.0.1:0", "")
+	if err != nil {
+		return nil, fmt.Errorf("unable to start proxy: %v", err)
+	}
+
+	writerCreator := func(int32) lib.ClientWriter { return writer }
+	spec := v23.GetListenSpec(ctx)
+	spec.Proxy = proxyEndpoint.Name()
+	controller, err := NewController(ctx, writerCreator, &spec, nil, testPrincipal)
+	if err != nil {
+		return nil, err
+	}
+	if setController != nil {
+		setController(controller)
+	}
+	v23.GetNamespace(controller.Context()).SetRoots("/" + endpoint.String())
+
+	req, err := makeRequest(RpcRequest{
+		Name:       "__controller",
+		Method:     "Serve",
+		NumInArgs:  2,
+		NumOutArgs: 1,
+		Deadline:   vdltime.Deadline{},
+	}, "adder", 0)
+	if err != nil {
+		// Do not hand an empty request to the controller when encoding fails.
+		return nil, fmt.Errorf("unable to encode Serve request: %v", err)
+	}
+	controller.HandleVeyronRequest(ctx, 0, req, writer)
+
+	testWriter, _ := writer.(*testwriter.Writer) // nil unless writer is a *testwriter.Writer
+	return &runningTest{controller, testWriter, mounttableServer, proxyShutdown}, nil
+}
+
+// jsServerTestCase describes one test that simulates a Javascript server
+// talking to the App. All the responses from Javascript are mocked and sent
+// back through the method calls; client messages are sent using a go client.
+type jsServerTestCase struct {
+	method string
+	inArgs []interface{}
+	// The set of streaming inputs from the client to the server.
+	// This is passed to the client, which then passes it to the app.
+	clientStream []interface{}
+	// The set of JSON streaming messages sent from Javascript to the
+	// app; the client is expected to receive them in this order.
+	serverStream []interface{}
+	// The final response sent by the Javascript server to the
+	// app.
+	finalResponse *vdl.Value
+	// The final error sent by the Javascript server to the app.
+	err error
+
+	// Whether the Javascript server has an authorizer.
+	// If it does have an authorizer, then err is sent back from the server
+	// to the app.
+	hasAuthorizer bool
+}
+
+// runJsServerTestCase serves a mocked javascript server through a controller,
+// calls testCase.method on it (as "adder/adder") with a Go client and checks
+// the streamed values, the final error ID and, on success, the final response.
+// It finally verifies that stopping the server leaves the controller with none.
+func runJsServerTestCase(t *testing.T, testCase jsServerTestCase) {
+	ctx, shutdown := test.InitForTest()
+	defer shutdown()
+
+	vomClientStream := []string{}
+	for _, m := range testCase.clientStream {
+		vomClientStream = append(vomClientStream, lib.VomEncodeOrDie(m))
+	}
+	mock := &mockJSServer{
+		t:                    t,
+		method:               testCase.method,
+		serviceSignature:     []signature.Interface{simpleAddrSig},
+		expectedClientStream: vomClientStream,
+		serverStream:         testCase.serverStream,
+		hasAuthorizer:        testCase.hasAuthorizer,
+		inArgs:               testCase.inArgs,
+		finalResponse:        testCase.finalResponse,
+		finalError:           testCase.err,
+		controllerReady:      sync.RWMutex{},
+	}
+	rt, err := serveServer(ctx, mock, func(controller *Controller) {
+		mock.controller = controller
+	})
+	// rt is nil when serveServer fails, so err must be checked before the
+	// deferred cleanups below dereference it.
+	if err != nil {
+		t.Fatalf("could not serve server %v", err)
+	}
+	defer rt.mounttableServer.Stop()
+	defer rt.proxyShutdown()
+	defer rt.controller.Cleanup()
+
+	// Get the client that is relevant to the controller so it talks
+	// to the right mounttable.
+	client := v23.GetClient(rt.controller.Context())
+	// And have the client recognize the server, otherwise it won't
+	// authorize calls to it.
+	v23.GetPrincipal(rt.controller.Context()).AddToRoots(v23.GetPrincipal(ctx).BlessingStore().Default())
+
+	call, err := client.StartCall(rt.controller.Context(), "adder/adder", testCase.method, testCase.inArgs)
+	if err != nil {
+		t.Fatalf("failed to start call: %v", err)
+	}
+
+	for _, msg := range testCase.clientStream {
+		if err := call.Send(msg); err != nil {
+			t.Errorf("unexpected error while sending %v: %v", msg, err)
+		}
+	}
+	if err := call.CloseSend(); err != nil {
+		t.Errorf("unexpected error on close: %v", err)
+	}
+
+	expectedStream := testCase.serverStream
+	for {
+		var data interface{}
+		if err := call.Recv(&data); err != nil {
+			break
+		}
+		if len(expectedStream) == 0 {
+			t.Errorf("unexpected stream value: %v", data)
+			continue
+		}
+		if !reflect.DeepEqual(data, expectedStream[0]) {
+			t.Errorf("unexpected stream value: got %v, expected %v", data, expectedStream[0])
+		}
+		expectedStream = expectedStream[1:]
+	}
+
+	var result *vdl.Value
+	err = call.Finish(&result)
+
+	if verror.ErrorID(err) != verror.ErrorID(testCase.err) {
+		t.Errorf("unexpected err: got %#v, expected %#v", err, testCase.err)
+	}
+
+	if err != nil {
+		return
+	}
+
+	if got, want := result, testCase.finalResponse; !vdl.EqualValue(got, want) {
+		t.Errorf("unexected final response: got %v, want %v", got, want)
+	}
+
+	// ensure there is only one server and then stop the server
+	if len(rt.controller.servers) != 1 {
+		t.Errorf("expected only one server but got: %d", len(rt.controller.servers))
+		return
+	}
+	for serverId := range rt.controller.servers {
+		rt.controller.Stop(nil, serverId)
+	}
+
+	// ensure there is no more servers now
+	if len(rt.controller.servers) != 0 {
+		t.Errorf("expected no server after stopping the only one but got: %d", len(rt.controller.servers))
+	}
+}
+
+// TestSimpleJSServer calls Add(1, 2) and expects the mocked javascript
+// server's final response, 3, to reach the client.
+func TestSimpleJSServer(t *testing.T) {
+	tc := jsServerTestCase{method: "Add", finalResponse: vdl.Int32Value(3)}
+	tc.inArgs = []interface{}{int32(1), int32(2)}
+	runJsServerTestCase(t, tc)
+}
+
+// TestJSServerWithAuthorizer repeats the simple Add call against a mocked
+// javascript server that has an authorizer.
+func TestJSServerWithAuthorizer(t *testing.T) {
+	tc := jsServerTestCase{method: "Add", hasAuthorizer: true}
+	tc.inArgs = []interface{}{int32(1), int32(2)}
+	tc.finalResponse = vdl.Int32Value(3)
+	runJsServerTestCase(t, tc)
+}
+
+// TestJSServerWithError expects the mocked server's ErrInternal to reach the client.
+func TestJSServerWithError(t *testing.T) {
+	runJsServerTestCase(t, jsServerTestCase{
+		method: "Divide",
+		inArgs: []interface{}{int32(1), int32(0)},
+		err:    verror.New(verror.ErrInternal, nil),
+	})
+}
+
+// TestJSServerWithAuthorizerAndAuthError expects ErrNoAccess to reach the client.
+func TestJSServerWithAuthorizerAndAuthError(t *testing.T) {
+	runJsServerTestCase(t, jsServerTestCase{
+		method:        "Add",
+		inArgs:        []interface{}{int32(1), int32(2)},
+		hasAuthorizer: true,
+		finalResponse: vdl.Int32Value(3),
+		err:           verror.New(verror.ErrNoAccess, nil),
+	})
+}
+// TestJSServerWihStreamingInputs streams 3 and 4 from the client to the mocked
+// javascript server and expects its final response, 10.
+func TestJSServerWihStreamingInputs(t *testing.T) {
+	tc := jsServerTestCase{method: "StreamingAdd", finalResponse: vdl.Int32Value(10)}
+	tc.clientStream = []interface{}{int32(3), int32(4)}
+	runJsServerTestCase(t, tc)
+}
+
+// TestJSServerWihStreamingOutputs expects 3 and 4 streamed from the mocked
+// javascript server to the client, followed by the final response, 10.
+func TestJSServerWihStreamingOutputs(t *testing.T) {
+	tc := jsServerTestCase{method: "StreamingAdd", finalResponse: vdl.Int32Value(10)}
+	tc.serverStream = []interface{}{int32(3), int32(4)}
+	runJsServerTestCase(t, tc)
+}
+
+// TestJSServerWihStreamingInputsAndOutputs streams 1 and 2 to the mocked server
+// and expects 3 and 4 streamed back, followed by the final response, 10.
+func TestJSServerWihStreamingInputsAndOutputs(t *testing.T) {
+	tc := jsServerTestCase{method: "StreamingAdd", finalResponse: vdl.Int32Value(10)}
+	tc.clientStream = []interface{}{int32(1), int32(2)}
+	tc.serverStream = []interface{}{int32(3), int32(4)}
+	runJsServerTestCase(t, tc)
+}
+
+// TestJSServerWithWrongNumberOfArgs expects ErrWrongNumberOfArgs when Add gets 3 args.
+func TestJSServerWithWrongNumberOfArgs(t *testing.T) {
+	runJsServerTestCase(t, jsServerTestCase{
+		method: "Add",
+		inArgs: []interface{}{int32(1), int32(2), int32(3)},
+		err:    verror.New(server.ErrWrongNumberOfArgs, nil, "Add", 3, 2),
+	})
+}
+
+func TestJSServerWithMethodNotFound(t *testing.T) {
+ methodName := "UnknownMethod"
+ err := verror.New(server.ErrMethodNotFoundInSignature, nil, methodName)
+ runJsServerTestCase(t, jsServerTestCase{
+ method: methodName,
+ inArgs: []interface{}{int32(1), int32(2)},
+ err: err,
+ })
+}
diff --git a/services/wspr/internal/app/controller.vdl b/services/wspr/internal/app/controller.vdl
new file mode 100644
index 0000000..5e7e498
--- /dev/null
+++ b/services/wspr/internal/app/controller.vdl
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "signature"
+
+ "v.io/v23/security"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+type Controller interface {
+ // Serve instructs WSPR to start listening for calls on behalf
+ // of a javascript server.
+ Serve(name string, serverId uint32) error
+ // Stop instructs WSPR to stop listening for calls for the
+ // given javascript server.
+ Stop(serverId uint32) error
+ // AddName adds a published name to an existing server.
+ AddName(serverId uint32, name string) error
+ // RemoveName removes a published name from an existing server.
+ RemoveName(serverId uint32, name string) error
+
+ // UnlinkBlessings removes the given blessings from the blessings store.
+ UnlinkBlessings(handle principal.BlessingsHandle) error
+ // Bless binds extensions of blessings held by this principal to
+ // another principal (represented by its public key).
+ Bless(publicKey string, blessingHandle principal.BlessingsHandle, extension string, caveat []security.Caveat) (string, principal.BlessingsHandle | error)
+ // BlessSelf creates a blessing with the provided name for this principal.
+ BlessSelf(name string, caveats []security.Caveat) (string, principal.BlessingsHandle | error)
+
+ // RemoteBlessings fetches the remote blessings for a given name and method.
+ RemoteBlessings(name, method string) ([]string | error)
+ // Signature fetches the signature for a given name.
+ Signature(name string) ([]signature.Interface | error)
+}
diff --git a/services/wspr/internal/app/controller.vdl.go b/services/wspr/internal/app/controller.vdl.go
new file mode 100644
index 0000000..ffe9ae5
--- /dev/null
+++ b/services/wspr/internal/app/controller.vdl.go
@@ -0,0 +1,343 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: controller.vdl
+
+package app
+
+import (
+ // VDL system imports
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+
+ // VDL user imports
+ "v.io/v23/security"
+ "v.io/v23/vdlroot/signature"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+// ControllerClientMethods is the client interface
+// containing Controller methods.
+type ControllerClientMethods interface {
+ // Serve instructs WSPR to start listening for calls on behalf
+ // of a javascript server.
+ Serve(ctx *context.T, name string, serverId uint32, opts ...rpc.CallOpt) error
+ // Stop instructs WSPR to stop listening for calls for the
+ // given javascript server.
+ Stop(ctx *context.T, serverId uint32, opts ...rpc.CallOpt) error
+ // AddName adds a published name to an existing server.
+ AddName(ctx *context.T, serverId uint32, name string, opts ...rpc.CallOpt) error
+ // RemoveName removes a published name from an existing server.
+ RemoveName(ctx *context.T, serverId uint32, name string, opts ...rpc.CallOpt) error
+ // UnlinkBlessings removes the given blessings from the blessings store.
+ UnlinkBlessings(ctx *context.T, handle principal.BlessingsHandle, opts ...rpc.CallOpt) error
+ // Bless binds extensions of blessings held by this principal to
+ // another principal (represented by its public key).
+ Bless(ctx *context.T, publicKey string, blessingHandle principal.BlessingsHandle, extension string, caveat []security.Caveat, opts ...rpc.CallOpt) (string, principal.BlessingsHandle, error)
+ // BlessSelf creates a blessing with the provided name for this principal.
+ BlessSelf(ctx *context.T, name string, caveats []security.Caveat, opts ...rpc.CallOpt) (string, principal.BlessingsHandle, error)
+ // RemoteBlessings fetches the remote blessings for a given name and method.
+ RemoteBlessings(ctx *context.T, name string, method string, opts ...rpc.CallOpt) ([]string, error)
+ // Signature fetches the signature for a given name.
+ Signature(ctx *context.T, name string, opts ...rpc.CallOpt) ([]signature.Interface, error)
+}
+
+// ControllerClientStub adds universal methods to ControllerClientMethods.
+type ControllerClientStub interface {
+ ControllerClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// ControllerClient returns a client stub for Controller.
+func ControllerClient(name string) ControllerClientStub {
+ return implControllerClientStub{name}
+}
+
+type implControllerClientStub struct {
+ name string
+}
+
+func (c implControllerClientStub) Serve(ctx *context.T, i0 string, i1 uint32, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Serve", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) Stop(ctx *context.T, i0 uint32, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Stop", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) AddName(ctx *context.T, i0 uint32, i1 string, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "AddName", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) RemoveName(ctx *context.T, i0 uint32, i1 string, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "RemoveName", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) UnlinkBlessings(ctx *context.T, i0 principal.BlessingsHandle, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "UnlinkBlessings", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implControllerClientStub) Bless(ctx *context.T, i0 string, i1 principal.BlessingsHandle, i2 string, i3 []security.Caveat, opts ...rpc.CallOpt) (o0 string, o1 principal.BlessingsHandle, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Bless", []interface{}{i0, i1, i2, i3}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0, &o1)
+ return
+}
+
+func (c implControllerClientStub) BlessSelf(ctx *context.T, i0 string, i1 []security.Caveat, opts ...rpc.CallOpt) (o0 string, o1 principal.BlessingsHandle, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "BlessSelf", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0, &o1)
+ return
+}
+
+func (c implControllerClientStub) RemoteBlessings(ctx *context.T, i0 string, i1 string, opts ...rpc.CallOpt) (o0 []string, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "RemoteBlessings", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implControllerClientStub) Signature(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []signature.Interface, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Signature", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+// ControllerServerMethods is the interface a server writer
+// implements for Controller.
+type ControllerServerMethods interface {
+ // Serve instructs WSPR to start listening for calls on behalf
+ // of a javascript server.
+ Serve(call rpc.ServerCall, name string, serverId uint32) error
+ // Stop instructs WSPR to stop listening for calls for the
+ // given javascript server.
+ Stop(call rpc.ServerCall, serverId uint32) error
+ // AddName adds a published name to an existing server.
+ AddName(call rpc.ServerCall, serverId uint32, name string) error
+ // RemoveName removes a published name from an existing server.
+ RemoveName(call rpc.ServerCall, serverId uint32, name string) error
+ // UnlinkBlessings removes the given blessings from the blessings store.
+ UnlinkBlessings(call rpc.ServerCall, handle principal.BlessingsHandle) error
+ // Bless binds extensions of blessings held by this principal to
+ // another principal (represented by its public key).
+ Bless(call rpc.ServerCall, publicKey string, blessingHandle principal.BlessingsHandle, extension string, caveat []security.Caveat) (string, principal.BlessingsHandle, error)
+ // BlessSelf creates a blessing with the provided name for this principal.
+ BlessSelf(call rpc.ServerCall, name string, caveats []security.Caveat) (string, principal.BlessingsHandle, error)
+ // RemoteBlessings fetches the remote blessings for a given name and method.
+ RemoteBlessings(call rpc.ServerCall, name string, method string) ([]string, error)
+ // Signature fetches the signature for a given name.
+ Signature(call rpc.ServerCall, name string) ([]signature.Interface, error)
+}
+
+// ControllerServerStubMethods is the server interface containing
+// Controller methods, as expected by rpc.Server.
+// There is no difference between this interface and ControllerServerMethods
+// since there are no streaming methods.
+type ControllerServerStubMethods ControllerServerMethods
+
+// ControllerServerStub adds universal methods to ControllerServerStubMethods.
+type ControllerServerStub interface {
+ ControllerServerStubMethods
+ // Describe the Controller interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// ControllerServer returns a server stub for Controller.
+// It converts an implementation of ControllerServerMethods into
+// an object that may be used by rpc.Server.
+func ControllerServer(impl ControllerServerMethods) ControllerServerStub {
+ stub := implControllerServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implControllerServerStub struct {
+ impl ControllerServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implControllerServerStub) Serve(call rpc.ServerCall, i0 string, i1 uint32) error {
+ return s.impl.Serve(call, i0, i1)
+}
+
+func (s implControllerServerStub) Stop(call rpc.ServerCall, i0 uint32) error {
+ return s.impl.Stop(call, i0)
+}
+
+func (s implControllerServerStub) AddName(call rpc.ServerCall, i0 uint32, i1 string) error {
+ return s.impl.AddName(call, i0, i1)
+}
+
+func (s implControllerServerStub) RemoveName(call rpc.ServerCall, i0 uint32, i1 string) error {
+ return s.impl.RemoveName(call, i0, i1)
+}
+
+func (s implControllerServerStub) UnlinkBlessings(call rpc.ServerCall, i0 principal.BlessingsHandle) error {
+ return s.impl.UnlinkBlessings(call, i0)
+}
+
+func (s implControllerServerStub) Bless(call rpc.ServerCall, i0 string, i1 principal.BlessingsHandle, i2 string, i3 []security.Caveat) (string, principal.BlessingsHandle, error) {
+ return s.impl.Bless(call, i0, i1, i2, i3)
+}
+
+func (s implControllerServerStub) BlessSelf(call rpc.ServerCall, i0 string, i1 []security.Caveat) (string, principal.BlessingsHandle, error) {
+ return s.impl.BlessSelf(call, i0, i1)
+}
+
+func (s implControllerServerStub) RemoteBlessings(call rpc.ServerCall, i0 string, i1 string) ([]string, error) {
+ return s.impl.RemoteBlessings(call, i0, i1)
+}
+
+func (s implControllerServerStub) Signature(call rpc.ServerCall, i0 string) ([]signature.Interface, error) {
+ return s.impl.Signature(call, i0)
+}
+
+func (s implControllerServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implControllerServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{ControllerDesc}
+}
+
+// ControllerDesc describes the Controller interface.
+var ControllerDesc rpc.InterfaceDesc = descController
+
+// descController hides the desc to keep godoc clean.
+var descController = rpc.InterfaceDesc{
+ Name: "Controller",
+ PkgPath: "v.io/x/ref/services/wspr/internal/app",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "Serve",
+ Doc: "// Serve instructs WSPR to start listening for calls on behalf\n// of a javascript server.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"serverId", ``}, // uint32
+ },
+ },
+ {
+ Name: "Stop",
+ Doc: "// Stop instructs WSPR to stop listening for calls for the\n// given javascript server.",
+ InArgs: []rpc.ArgDesc{
+ {"serverId", ``}, // uint32
+ },
+ },
+ {
+ Name: "AddName",
+ Doc: "// AddName adds a published name to an existing server.",
+ InArgs: []rpc.ArgDesc{
+ {"serverId", ``}, // uint32
+ {"name", ``}, // string
+ },
+ },
+ {
+ Name: "RemoveName",
+ Doc: "// RemoveName removes a published name from an existing server.",
+ InArgs: []rpc.ArgDesc{
+ {"serverId", ``}, // uint32
+ {"name", ``}, // string
+ },
+ },
+ {
+ Name: "UnlinkBlessings",
+ Doc: "// UnlinkBlessings removes the given blessings from the blessings store.",
+ InArgs: []rpc.ArgDesc{
+ {"handle", ``}, // principal.BlessingsHandle
+ },
+ },
+ {
+ Name: "Bless",
+ Doc: "// Bless binds extensions of blessings held by this principal to\n// another principal (represented by its public key).",
+ InArgs: []rpc.ArgDesc{
+ {"publicKey", ``}, // string
+ {"blessingHandle", ``}, // principal.BlessingsHandle
+ {"extension", ``}, // string
+ {"caveat", ``}, // []security.Caveat
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // string
+ {"", ``}, // principal.BlessingsHandle
+ },
+ },
+ {
+ Name: "BlessSelf",
+ Doc: "// BlessSelf creates a blessing with the provided name for this principal.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"caveats", ``}, // []security.Caveat
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // string
+ {"", ``}, // principal.BlessingsHandle
+ },
+ },
+ {
+ Name: "RemoteBlessings",
+ Doc: "// RemoteBlessings fetches the remote blessings for a given name and method.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"method", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "Signature",
+ Doc: "// Signature fetches the signature for a given name.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []signature.Interface
+ },
+ },
+ },
+}
diff --git a/services/wspr/internal/app/messaging.go b/services/wspr/internal/app/messaging.go
new file mode 100644
index 0000000..ff9c7c4
--- /dev/null
+++ b/services/wspr/internal/app/messaging.go
@@ -0,0 +1,182 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "bytes"
+ "fmt"
+ "path/filepath"
+ "runtime"
+
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+ "v.io/v23/vtrace"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/lib"
+)
+
+const (
+ verrorPkgPath = "v.io/x/ref/services/wspr/internal/app"
+)
+
+var (
+ errUnknownMessageType = verror.Register(verrorPkgPath+".unkownMessage", verror.NoRetry, "{1} {2} Unknown message type {_}")
+)
+
+// Incoming message from the javascript client to WSPR.
+type MessageType int32
+
+const (
+ // Making a vanadium client request, streaming or otherwise.
+ VeyronRequestMessage MessageType = 0
+
+ // Serving this under an object name.
+ ServeMessage = 1
+
+ // A response from a service in javascript to a request
+ // from the proxy.
+ ServerResponseMessage = 2
+
+ // Sending streaming data, either from a JS client or JS service.
+ StreamingValueMessage = 3
+
+ // A response that means the stream is closed by the client.
+ StreamCloseMessage = 4
+
+ // A request to get signature of a remote server.
+ SignatureRequestMessage = 5
+
+ // A request to stop a server.
+ StopServerMessage = 6
+
+ // A request to unlink blessings. This request means that
+ // we can remove the given handle from the handle store.
+ UnlinkBlessingsMessage = 8
+
+ // A request to run the lookup function on a dispatcher.
+ LookupResponseMessage = 11
+
+ // A request to run the authorizer for an rpc.
+ AuthResponseMessage = 12
+
+ // A request to run a namespace client method.
+ NamespaceRequestMessage = 13
+
+ // A request to cancel an rpc initiated by the JS.
+ CancelMessage = 17
+
+ // A request to add a new name to server.
+ AddName = 18
+
+ // A request to remove a name from server.
+ RemoveName = 19
+
+ // A request to get the remote blessings of a server.
+ RemoteBlessings = 20
+
+ // A response to a caveat validation request.
+ CaveatValidationResponse = 21
+)
+
+type Message struct {
+ // TODO(bprosnitz) Consider changing this ID to a larger value.
+ // TODO(bprosnitz) Consider making the ID have positive / negative value
+ // depending on whether from/to JS.
+ Id int32
+ // This contains the json encoded payload.
+ Data string
+
+ // Whether it is an rpc request or a serve request.
+ Type MessageType
+}
+
+// HandleIncomingMessage routes a message received from JS to the handler matching its type.
+func (c *Controller) HandleIncomingMessage(msg Message, w lib.ClientWriter) {
+	// TODO(mattr): Get the proper context information from javascript.
+	ctx, _ := vtrace.SetNewTrace(c.Context())
+
+	switch id, payload := msg.Id, msg.Data; msg.Type {
+	case VeyronRequestMessage:
+		c.HandleVeyronRequest(ctx, id, payload, w)
+	case CancelMessage:
+		go c.HandleVeyronCancellation(id)
+	case StreamingValueMessage:
+		// Queue the value on this goroutine: SendOnStream only enqueues the
+		// message (the send itself is not done on this goroutine), and
+		// enqueueing synchronously is what preserves the order of the values.
+		c.SendOnStream(id, payload, w)
+	case StreamCloseMessage:
+		c.CloseStream(id)
+
+	case ServerResponseMessage:
+		go c.HandleServerResponse(id, payload)
+	case LookupResponseMessage:
+		go c.HandleLookupResponse(id, payload)
+	case AuthResponseMessage:
+		go c.HandleAuthResponse(id, payload)
+	case CaveatValidationResponse:
+		go c.HandleCaveatValidationResponse(id, payload)
+
+	default:
+		w.Error(verror.New(errUnknownMessageType, ctx, msg.Type))
+	}
+}
+
+// ConstructOutgoingMessage constructs a message to send to javascript in a consistent format.
+// TODO(bprosnitz) Don't double-encode
+func ConstructOutgoingMessage(messageId int32, messageType lib.ResponseType, data interface{}) (string, error) {
+	var payload bytes.Buffer
+	payloadEnc, err := vom.NewEncoder(&payload)
+	if err != nil {
+		return "", err
+	}
+	response := lib.Response{Type: messageType, Message: data}
+	if err = payloadEnc.Encode(response); err != nil {
+		return "", err
+	}
+	var envelope bytes.Buffer
+	envelopeEnc, err := vom.NewEncoder(&envelope)
+	if err != nil {
+		return "", err
+	}
+	wrapped := Message{Id: messageId, Data: fmt.Sprintf("%x", payload.Bytes())}
+	if err = envelopeEnc.Encode(wrapped); err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("%x", envelope.Bytes()), nil
+}
+
+// FormatAsVerror passes err through verror.Convert (fallback ID verror.ErrUnknown),
+// logs the result prefixed with its code locations, and returns the converted error.
+func FormatAsVerror(err error) error {
+	verr := verror.Convert(verror.ErrUnknown, nil, err)
+
+	// Also log the error. NOTE(review): internal errors get the same level (2) below.
+	var logLevel vlog.Level = 2
+	logErr := fmt.Sprintf("%v", verr)
+
+	// Prefix the message with the code locations associated with verr,
+	// except the last, which is the Convert() above. This does nothing if
+	// err was not a verror error.
+	verrStack := verror.Stack(verr)
+	for i := 0; i < len(verrStack)-1; i++ {
+		pc := verrStack[i]
+		fnc := runtime.FuncForPC(pc)
+		file, line := fnc.FileLine(pc)
+		logErr = fmt.Sprintf("%s:%d: %s", file, line, logErr)
+	}
+
+	// We want to look at the stack three frames up to find where the error actually
+	// occurred. (caller -> websocketErrorResponse/sendError -> generateErrorMessage).
+	if _, file, line, ok := runtime.Caller(3); ok {
+		logErr = fmt.Sprintf("%s:%d: %s", filepath.Base(file), line, logErr)
+	}
+	if verror.ErrorID(verr) == verror.ErrInternal.ID {
+		logLevel = 2
+	}
+	vlog.VI(logLevel).Info(logErr)
+
+	return verr
+}
diff --git a/services/wspr/internal/app/mock_jsServer_test.go b/services/wspr/internal/app/mock_jsServer_test.go
new file mode 100644
index 0000000..febba77
--- /dev/null
+++ b/services/wspr/internal/app/mock_jsServer_test.go
@@ -0,0 +1,321 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync"
+ "testing"
+
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ "v.io/v23/vom"
+ "v.io/x/ref/services/wspr/internal/lib"
+ "v.io/x/ref/services/wspr/internal/principal"
+ "v.io/x/ref/services/wspr/internal/rpc/server"
+)
+
+type mockJSServer struct {
+ controller *Controller
+ t *testing.T
+ method string
+ serviceSignature []signature.Interface
+ sender sync.WaitGroup
+ expectedClientStream []string
+ serverStream []interface{}
+ hasAuthorizer bool
+ authError error
+ inArgs []interface{}
+ controllerReady sync.RWMutex
+ finalResponse *vdl.Value
+ receivedResponse *vdl.Value
+ finalError error
+ hasCalledAuth bool
+ // Right now we keep track of the flow count by hand, but maybe we
+ // should setup a different object to handle each flow, so we
+ // can make sure that both sides are using the same flowId. This
+ // isn't a problem right now because the test doesn't do multiple flows
+ // at the same time.
+ flowCount int32
+ rpcFlow int32
+}
+
+func (m *mockJSServer) Send(responseType lib.ResponseType, msg interface{}) error {
+ switch responseType {
+ case lib.ResponseDispatcherLookup:
+ return m.handleDispatcherLookup(msg)
+ case lib.ResponseAuthRequest:
+ return m.handleAuthRequest(msg)
+ case lib.ResponseServerRequest:
+ return m.handleServerRequest(msg)
+ case lib.ResponseValidate:
+ return m.handleValidationRequest(msg)
+ case lib.ResponseStream:
+ return m.handleStream(msg)
+ case lib.ResponseStreamClose:
+ return m.handleStreamClose(msg)
+ case lib.ResponseFinal:
+ if m.receivedResponse != nil {
+ return fmt.Errorf("Two responses received. First was: %#v. Second was: %#v", m.receivedResponse, msg)
+ }
+ m.receivedResponse = vdl.ValueOf(msg)
+ return nil
+ case lib.ResponseLog:
+ m.flowCount += 2
+ return nil
+ }
+ return fmt.Errorf("Unknown message type: %d", responseType)
+}
+
+func internalErrJSON(args interface{}) string {
+ return fmt.Sprintf(`{"err": {
+ "idAction": {
+ "id": "v.io/v23/verror.Internal",
+ "action": 0
+ },
+ "paramList": ["%v"]}, "results":[null]}`, args)
+}
+
+func (m *mockJSServer) Error(err error) {
+ panic(err)
+}
+
+func normalize(msg interface{}) (map[string]interface{}, error) {
+	// Round-trip the response through JSON so that messages containing
+	// non-exported structs can be compared with reflect.DeepEqual.
+	var encoded bytes.Buffer
+	if err := json.NewEncoder(&encoded).Encode(msg); err != nil {
+		return nil, err
+	}
+
+	var decoded interface{}
+	if err := json.Unmarshal(encoded.Bytes(), &decoded); err != nil {
+		return nil, err
+	}
+	// The type assertion panics if msg did not encode to a JSON object.
+	return decoded.(map[string]interface{}), nil
+}
+
+func (m *mockJSServer) handleDispatcherLookup(v interface{}) error {
+ defer func() {
+ m.flowCount += 2
+ }()
+ m.controllerReady.RLock()
+ defer m.controllerReady.RUnlock()
+ msg, err := normalize(v)
+ if err != nil {
+ m.controller.HandleLookupResponse(m.flowCount, internalErrJSON(err))
+ return nil
+ }
+ expected := map[string]interface{}{"serverId": 0.0, "suffix": "adder"}
+ if !reflect.DeepEqual(msg, expected) {
+ m.controller.HandleLookupResponse(m.flowCount, internalErrJSON(fmt.Sprintf("got: %v, want: %v", msg, expected)))
+ return nil
+ }
+ bytes, err := json.Marshal(map[string]interface{}{
+ "handle": 0,
+ "signature": lib.VomEncodeOrDie(m.serviceSignature),
+ "hasAuthorizer": m.hasAuthorizer,
+ })
+ if err != nil {
+ m.controller.HandleLookupResponse(m.flowCount, internalErrJSON(fmt.Sprintf("failed to serialize %v", err)))
+ return nil
+ }
+ m.controller.HandleLookupResponse(m.flowCount, string(bytes))
+ return nil
+}
+
+// Returns false if the blessing is malformed
+func validateBlessing(blessings principal.JsBlessings) bool {
+ return blessings.Handle != 0 && blessings.PublicKey != ""
+}
+
+func validateEndpoint(ep string) bool {
+ return ep != ""
+}
+
+// handleAuthRequest checks the fields of the VOM-encoded server.AuthRequest in v and
+// answers through HandleAuthResponse: m.authError on success, an internal error otherwise.
+func (m *mockJSServer) handleAuthRequest(v interface{}) error {
+	defer func() {
+		m.flowCount += 2
+	}()
+	m.hasCalledAuth = true
+	if !m.hasAuthorizer {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON("unexpected auth request"))
+		return nil
+	}
+
+	var msg server.AuthRequest
+	if err := lib.VomDecode(v.(string), &msg); err != nil {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("error decoding %v:", err)))
+		return nil
+	}
+
+	if msg.Handle != 0 {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected handle: %v", msg.Handle)))
+		return nil
+	}
+
+	call := msg.Call
+	if field, got, want := "Method", call.Method, lib.LowercaseFirstCharacter(m.method); got != want {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected value for %s: got %v, want %v", field, got, want)))
+		return nil
+	}
+
+	if field, got, want := "Suffix", call.Suffix, "adder"; got != want {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected value for %s: got %v, want %v", field, got, want)))
+		return nil
+	}
+
+	// We expect localBlessings and remoteBlessings to be set and the publicKey be a string
+	if !validateBlessing(call.LocalBlessings) {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("bad localblessing:%v", call.LocalBlessings)))
+		return nil
+	}
+	if !validateBlessing(call.RemoteBlessings) {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("bad remoteblessing:%v", call.RemoteBlessings)))
+		return nil
+	}
+
+	// We expect endpoints to be set
+	if !validateEndpoint(call.LocalEndpoint) {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("bad endpoint:%v", call.LocalEndpoint)))
+		return nil
+	}
+	if !validateEndpoint(call.RemoteEndpoint) {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("bad endpoint:%v", call.RemoteEndpoint)))
+		return nil
+	}
+
+	bytes, err := json.Marshal(map[string]interface{}{
+		"err": m.authError,
+	})
+	if err != nil {
+		m.controller.HandleAuthResponse(m.flowCount, internalErrJSON(fmt.Sprintf("failed to serialize %v", err)))
+		return nil
+	}
+
+	m.controller.HandleAuthResponse(m.flowCount, string(bytes))
+	return nil
+}
+
+func (m *mockJSServer) handleServerRequest(v interface{}) error {
+ defer func() {
+ m.flowCount += 2
+ }()
+
+ if m.hasCalledAuth != m.hasAuthorizer {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON("authorizer hasn't been called yet"))
+ return nil
+ }
+
+ var msg server.ServerRPCRequest
+ if err := lib.VomDecode(v.(string), &msg); err != nil {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON(err))
+ return nil
+ }
+
+ if field, got, want := "Method", msg.Method, lib.LowercaseFirstCharacter(m.method); got != want {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected value for %s: got %v, want %v", field, got, want)))
+ return nil
+ }
+
+ if field, got, want := "Handle", msg.Handle, int32(0); got != want {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected value for %s: got %v, want %v", field, got, want)))
+ return nil
+ }
+
+ if field, got, want := "Args", msg.Args, m.inArgs; !reflect.DeepEqual(got, want) {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected value for %s: got %v, want %v", field, got, want)))
+ return nil
+ }
+
+ call := msg.Call.SecurityCall
+ if field, got, want := "Suffix", call.Suffix, "adder"; got != want {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON(fmt.Sprintf("unexpected value for %s: got %v, want %v", field, got, want)))
+ return nil
+ }
+
+ if !validateBlessing(call.RemoteBlessings) {
+ m.controller.HandleServerResponse(m.flowCount, internalErrJSON(fmt.Sprintf("bad Remoteblessing:%v", call.RemoteBlessings)))
+ return nil
+ }
+
+ m.rpcFlow = m.flowCount
+
+ // We don't return the final response until the stream is closed.
+ m.sender.Add(1)
+ go m.sendServerStream()
+ return nil
+}
+
+func (m *mockJSServer) handleValidationRequest(v interface{}) error {
+ defer func() {
+ m.flowCount += 2
+ }()
+
+ req := v.(server.CaveatValidationRequest)
+ resp := server.CaveatValidationResponse{
+ Results: make([]error, len(req.Cavs)),
+ }
+
+ var b bytes.Buffer
+ enc, err := vom.NewEncoder(&b)
+ if err != nil {
+ panic(err)
+ }
+ if err := enc.Encode(resp); err != nil {
+ panic(err)
+ }
+
+ m.controllerReady.RLock()
+ m.controller.HandleCaveatValidationResponse(m.flowCount, fmt.Sprintf("%x", b.Bytes()))
+ m.controllerReady.RUnlock()
+ return nil
+}
+
+func (m *mockJSServer) sendServerStream() {
+ defer m.sender.Done()
+ m.controllerReady.RLock()
+ for _, v := range m.serverStream {
+ m.controller.SendOnStream(m.rpcFlow, lib.VomEncodeOrDie(v), m)
+ }
+ m.controllerReady.RUnlock()
+}
+
+func (m *mockJSServer) handleStream(msg interface{}) error {
+	got, isString := msg.(string)
+	if pending := len(m.expectedClientStream); !isString || pending == 0 {
+		m.t.Errorf("unexpected stream message: %v", msg)
+		return nil
+	}
+	// Pop the next expected value and compare it with what arrived.
+	want := m.expectedClientStream[0]
+	m.expectedClientStream = m.expectedClientStream[1:]
+	if got != want {
+		m.t.Errorf("unexpected stream message, got %s, want: %s", got, want)
+	}
+	return nil
+}
+
+func (m *mockJSServer) handleStreamClose(msg interface{}) error {
+ m.sender.Wait()
+ reply := lib.ServerRpcReply{
+ Results: []*vdl.Value{m.finalResponse},
+ Err: m.finalError,
+ }
+ vomReply, err := lib.VomEncode(reply)
+ if err != nil {
+ m.t.Fatalf("Failed to serialize the reply: %v", err)
+ }
+ m.controllerReady.RLock()
+ m.controller.HandleServerResponse(m.rpcFlow, vomReply)
+ m.controllerReady.RUnlock()
+ return nil
+}
diff --git a/services/wspr/internal/app/stream.go b/services/wspr/internal/app/stream.go
new file mode 100644
index 0000000..a5c3637
--- /dev/null
+++ b/services/wspr/internal/app/stream.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "fmt"
+
+ "v.io/v23/rpc"
+ "v.io/x/ref/services/wspr/internal/lib"
+)
+
+type initConfig struct {
+ stream rpc.Stream
+}
+
+type Closer interface {
+ CloseSend() error
+}
+
+type message struct {
+ data string
+ writer lib.ClientWriter
+}
+
+// outstandingStream provides a stream-like api with the added ability to
+// queue up messages if the stream hasn't been initialized first. send
+// can be called before init has been called, but no data will be sent
+// until init is called.
+type outstandingStream struct {
+ // The channel on which the stream and the type
+ // of data on the stream is sent after the stream
+ // has been constructed.
+ initChan chan *initConfig
+ // The queue of messages to write out.
+ messages chan *message
+ // done will be notified when the stream has been closed.
+ done chan bool
+ // true once end has closed messages; read by send and written by end without locking.
+ closed bool
+}
+
+func newStream() *outstandingStream {
+ os := &outstandingStream{
+ initChan: make(chan *initConfig, 1),
+ // We allow queueing up to 100 messages before init is called.
+ // TODO(bjornick): Deal with the case that the queue is full.
+ messages: make(chan *message, 100),
+ done: make(chan bool),
+ }
+ go os.loop()
+ return os
+}
+
+func (os *outstandingStream) send(data string, w lib.ClientWriter) {
+ if !os.closed {
+ os.messages <- &message{data, w}
+ }
+}
+
+func (os *outstandingStream) end() {
+ if !os.closed {
+ close(os.messages)
+ os.closed = true
+ }
+}
+
+// Waits until the stream has been closed and all the messages
+// have been drained.
+func (os *outstandingStream) waitUntilDone() {
+ <-os.done
+}
+
+func (os *outstandingStream) loop() {
+ config := <-os.initChan
+ for msg := range os.messages {
+ var item interface{}
+ if err := lib.VomDecode(msg.data, &item); err != nil {
+ msg.writer.Error(fmt.Errorf("failed to decode stream arg from %v: %v", msg.data, err))
+ break
+ }
+ if err := config.stream.Send(item); err != nil {
+ msg.writer.Error(fmt.Errorf("failed to send on stream: %v", err))
+ }
+ }
+ close(os.done)
+ // If this is a stream that has a CloseSend, we need to call it.
+ if call, ok := config.stream.(Closer); ok {
+ call.CloseSend()
+ }
+}
+
+func (os *outstandingStream) init(stream rpc.Stream) {
+ os.initChan <- &initConfig{stream}
+}
diff --git a/services/wspr/internal/app/v23_internal_test.go b/services/wspr/internal/app/v23_internal_test.go
new file mode 100644
index 0000000..9ae5683
--- /dev/null
+++ b/services/wspr/internal/app/v23_internal_test.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+package app
+
+import "testing"
+import "os"
+
+import "v.io/x/ref/test"
+
+func TestMain(m *testing.M) {
+ test.Init()
+ os.Exit(m.Run())
+}
diff --git a/services/wspr/internal/browspr/browspr.go b/services/wspr/internal/browspr/browspr.go
new file mode 100644
index 0000000..a8f1f37
--- /dev/null
+++ b/services/wspr/internal/browspr/browspr.go
@@ -0,0 +1,200 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Browspr is the browser version of WSPR, intended to communicate with javascript through postMessage.
+package browspr
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/vdl"
+ "v.io/v23/vtrace"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/account"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+// Browspr is an intermediary between our javascript code and the vanadium
+// network that allows our javascript library to use vanadium.
+type Browspr struct {
+ ctx *context.T
+ listenSpec *rpc.ListenSpec
+ namespaceRoots []string
+ accountManager *account.AccountManager
+ postMessage func(instanceId int32, ty, msg string)
+ principalManager *principal.PrincipalManager
+
+ // Locks activeInstances
+ mu sync.Mutex
+ activeInstances map[int32]*pipe
+}
+
+// NewBrowspr creates a new Browspr instance.
+func NewBrowspr(ctx *context.T,
+ postMessage func(instanceId int32, ty, msg string),
+ listenSpec *rpc.ListenSpec,
+ identd string,
+ wsNamespaceRoots []string) *Browspr {
+ if listenSpec.Proxy == "" {
+ vlog.Fatalf("a vanadium proxy must be set")
+ }
+ if identd == "" {
+ vlog.Fatalf("an identd server must be set")
+ }
+
+ browspr := &Browspr{
+ listenSpec: listenSpec,
+ namespaceRoots: wsNamespaceRoots,
+ postMessage: postMessage,
+ ctx: ctx,
+ activeInstances: make(map[int32]*pipe),
+ }
+
+ // TODO(nlacasse, bjornick) use a serializer that can actually persist.
+ var err error
+ p := v23.GetPrincipal(ctx)
+ if browspr.principalManager, err = principal.NewPrincipalManager(p, &principal.InMemorySerializer{}); err != nil {
+ vlog.Fatalf("principal.NewPrincipalManager failed: %s", err)
+ }
+
+ browspr.accountManager = account.NewAccountManager(identd, browspr.principalManager)
+
+ return browspr
+}
+
+func (b *Browspr) Shutdown() {
+ // TODO(ataly, bprosnitz): Get rid of this method if possible.
+}
+
+// createInstance creates a new pipe and stores it in the activeInstances map.
+func (b *Browspr) createInstance(instanceId int32, origin string, namespaceRoots []string, proxy string) (*pipe, error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// Refuse to replace an instance that is already open.
+	if _, exists := b.activeInstances[instanceId]; exists {
+		return nil, fmt.Errorf("InstanceId %v already has an open instance.", instanceId)
+	}
+
+	created := newPipe(b, instanceId, origin, namespaceRoots, proxy)
+	if created == nil {
+		return nil, fmt.Errorf("Could not create pipe for instanceId %v and origin %v", instanceId, origin)
+	}
+
+	b.activeInstances[instanceId] = created
+	return created, nil
+}
+
+// HandleMessage handles most messages from javascript and forwards them to a
+// Controller.
+func (b *Browspr) HandleMessage(instanceId int32, origin, msg string) error {
+ b.mu.Lock()
+ p, ok := b.activeInstances[instanceId]
+ b.mu.Unlock()
+ if !ok {
+ return fmt.Errorf("No pipe found for instanceId %v. Must send CreateInstance message first.", instanceId)
+ }
+
+ if origin != p.origin {
+ return fmt.Errorf("Invalid message origin. InstanceId %v has origin %v, but message is from %v.", instanceId, p.origin, origin)
+ }
+
+ return p.handleMessage(msg)
+}
+
+// HandleCleanupRpc cleans up the specified instance state. (For instance,
+// when a browser tab is closed)
+func (b *Browspr) HandleCleanupRpc(val *vdl.Value) (*vdl.Value, error) {
+ var msg CleanupMessage
+ if err := vdl.Convert(&msg, val); err != nil {
+ return nil, fmt.Errorf("HandleCleanupRpc did not receive CleanupMessage, received: %v, %v", val, err)
+ }
+
+ b.mu.Lock()
+
+ if pipe, ok := b.activeInstances[msg.InstanceId]; ok {
+ // We must unlock the mutex before calling cleanup, otherwise
+ // browspr deadlocks.
+ b.mu.Unlock()
+ pipe.cleanup()
+
+ b.mu.Lock()
+ delete(b.activeInstances, msg.InstanceId)
+ }
+
+ b.mu.Unlock()
+
+ return nil, nil
+}
+
+// Handler for creating an account in the principal manager.
+// A valid OAuth2 access token must be supplied in the request body,
+// which is exchanged for blessings from the vanadium blessing server.
+// An account based on the blessings is then added to WSPR's principal
+// manager, and the set of blessing strings are returned to the client.
+func (b *Browspr) HandleAuthCreateAccountRpc(val *vdl.Value) (*vdl.Value, error) {
+ var msg CreateAccountMessage
+ if err := vdl.Convert(&msg, val); err != nil {
+ return nil, fmt.Errorf("HandleAuthCreateAccountRpc did not receive CreateAccountMessage, received: %v, %v", val, err)
+ }
+
+ ctx, _ := vtrace.SetNewTrace(b.ctx)
+ account, err := b.accountManager.CreateAccount(ctx, msg.Token)
+ if err != nil {
+ return nil, err
+ }
+
+ return vdl.ValueFromReflect(reflect.ValueOf(account))
+}
+
+// HandleAuthAssociateAccountRpc associates an account with the specified origin.
+func (b *Browspr) HandleAuthAssociateAccountRpc(val *vdl.Value) (*vdl.Value, error) {
+ var msg AssociateAccountMessage
+ if err := vdl.Convert(&msg, val); err != nil {
+ return nil, fmt.Errorf("HandleAuthAssociateAccountRpc did not receive AssociateAccountMessage, received: %v, %v", val, err)
+ }
+
+ if err := b.accountManager.AssociateAccount(msg.Origin, msg.Account, msg.Caveats); err != nil {
+ return nil, err
+ }
+ return nil, nil
+}
+
+// HandleAuthGetAccountsRpc gets the root account name from the account manager.
+func (b *Browspr) HandleAuthGetAccountsRpc(*vdl.Value) (*vdl.Value, error) {
+ return vdl.ValueFromReflect(reflect.ValueOf(b.accountManager.GetAccounts()))
+}
+
+// HandleAuthOriginHasAccountRpc returns true iff the origin has an associated
+// principal.
+func (b *Browspr) HandleAuthOriginHasAccountRpc(val *vdl.Value) (*vdl.Value, error) {
+ var msg OriginHasAccountMessage
+ if err := vdl.Convert(&msg, val); err != nil {
+ return nil, fmt.Errorf("HandleAuthOriginHasAccountRpc did not receive OriginHasAccountMessage, received: %v, %v", val, err)
+ }
+
+ res := b.accountManager.OriginHasAccount(msg.Origin)
+ return vdl.ValueFromReflect(reflect.ValueOf(res))
+}
+
+// HandleCreateInstanceRpc decodes a CreateInstanceMessage and creates the instance it
+// describes, passing along the origin, namespace roots and proxy carried by the message.
+func (b *Browspr) HandleCreateInstanceRpc(val *vdl.Value) (*vdl.Value, error) {
+	var msg CreateInstanceMessage
+	if err := vdl.Convert(&msg, val); err != nil {
+		return nil, fmt.Errorf("HandleCreateInstanceRpc did not receive CreateInstanceMessage, received: %v, %v", val, err)
+	}
+
+	_, err := b.createInstance(msg.InstanceId, msg.Origin, msg.NamespaceRoots, msg.Proxy)
+	if err != nil {
+		return nil, err
+	}
+
+	return nil, nil
+}
diff --git a/services/wspr/internal/browspr/browspr.vdl b/services/wspr/internal/browspr/browspr.vdl
new file mode 100644
index 0000000..f0016ed
--- /dev/null
+++ b/services/wspr/internal/browspr/browspr.vdl
@@ -0,0 +1,46 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package browspr
+
+import (
+ "v.io/x/ref/services/identity"
+ "v.io/x/ref/services/wspr/internal/account"
+)
+
+type StartMessage struct {
+ Identityd string
+ IdentitydBlessingRoot identity.BlessingRootResponse
+ Proxy string
+ NamespaceRoot string
+ LogLevel int32
+ LogModule string
+}
+
+type AssociateAccountMessage struct {
+ Account string
+ Origin string
+ Caveats []account.Caveat
+}
+
+type CreateAccountMessage struct {
+ Token string
+}
+
+type CleanupMessage struct {
+ InstanceId int32
+}
+
+type OriginHasAccountMessage struct {
+ Origin string
+}
+
+type GetAccountsMessage struct{}
+
+type CreateInstanceMessage struct {
+ InstanceId int32
+ Origin string
+ NamespaceRoots []string
+ Proxy string
+}
diff --git a/services/wspr/internal/browspr/browspr.vdl.go b/services/wspr/internal/browspr/browspr.vdl.go
new file mode 100644
index 0000000..e4a4600
--- /dev/null
+++ b/services/wspr/internal/browspr/browspr.vdl.go
@@ -0,0 +1,99 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: browspr.vdl
+
+package browspr
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/x/ref/services/identity"
+ "v.io/x/ref/services/wspr/internal/account"
+)
+
+type StartMessage struct {
+ Identityd string
+ IdentitydBlessingRoot identity.BlessingRootResponse
+ Proxy string
+ NamespaceRoot string
+ LogLevel int32
+ LogModule string
+}
+
+func (StartMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.StartMessage"
+}) {
+}
+
+type AssociateAccountMessage struct {
+ Account string
+ Origin string
+ Caveats []account.Caveat
+}
+
+func (AssociateAccountMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.AssociateAccountMessage"
+}) {
+}
+
+type CreateAccountMessage struct {
+ Token string
+}
+
+func (CreateAccountMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.CreateAccountMessage"
+}) {
+}
+
+type CleanupMessage struct {
+ InstanceId int32
+}
+
+func (CleanupMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.CleanupMessage"
+}) {
+}
+
+type OriginHasAccountMessage struct {
+ Origin string
+}
+
+func (OriginHasAccountMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.OriginHasAccountMessage"
+}) {
+}
+
+type GetAccountsMessage struct {
+}
+
+func (GetAccountsMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.GetAccountsMessage"
+}) {
+}
+
+type CreateInstanceMessage struct {
+ InstanceId int32
+ Origin string
+ NamespaceRoots []string
+ Proxy string
+}
+
+func (CreateInstanceMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/browspr.CreateInstanceMessage"
+}) {
+}
+
+func init() {
+ vdl.Register((*StartMessage)(nil))
+ vdl.Register((*AssociateAccountMessage)(nil))
+ vdl.Register((*CreateAccountMessage)(nil))
+ vdl.Register((*CleanupMessage)(nil))
+ vdl.Register((*OriginHasAccountMessage)(nil))
+ vdl.Register((*GetAccountsMessage)(nil))
+ vdl.Register((*CreateInstanceMessage)(nil))
+}
diff --git a/services/wspr/internal/browspr/browspr_account_test.go b/services/wspr/internal/browspr/browspr_account_test.go
new file mode 100644
index 0000000..a5390f5
--- /dev/null
+++ b/services/wspr/internal/browspr/browspr_account_test.go
@@ -0,0 +1,219 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package browspr
+
+import (
+ "fmt"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+
+ _ "v.io/x/ref/profiles"
+ "v.io/x/ref/test"
+)
+
+// topLevelName is the first component of every blessing name produced by
+// mockBlesserService.BlessUsingAccessToken.
+const topLevelName = "mock-blesser"
+
+// mockBlesserService is a test double for the blesser: it self-blesses with
+// principal p and numbers each blessing with count.
+type mockBlesserService struct {
+ p security.Principal
+ // count is incremented on every BlessUsingAccessToken call and becomes
+ // the last component of the blessing name.
+ count int
+}
+
+// newMockBlesserService returns a mock blesser that blesses with p and whose
+// call counter starts at zero.
+func newMockBlesserService(p security.Principal) *mockBlesserService {
+	m := new(mockBlesserService)
+	m.p = p
+	return m
+}
+
+// BlessUsingAccessToken ignores the context, token and call options. It bumps
+// the call counter, self-blesses under "<topLevelName><ChainSeparator><count>"
+// and returns the blessing together with that name. On failure the name is
+// empty and the BlessSelf error is returned.
+func (m *mockBlesserService) BlessUsingAccessToken(c *context.T, accessToken string, co ...rpc.CallOpt) (security.Blessings, string, error) {
+	m.count++
+	name := fmt.Sprintf("%s%s%d", topLevelName, security.ChainSeparator, m.count)
+	blessing, err := m.p.BlessSelf(name)
+	if err == nil {
+		return blessing, name, nil
+	}
+	return blessing, "", err
+}
+
+// setup builds a Browspr for tests: it initialises a test context, points the
+// listen spec at a mock proxy, installs a no-op postMessage callback and
+// swaps in a mock blesser. The returned function shuts down browspr first and
+// the test runtime second.
+func setup(t *testing.T) (*Browspr, func()) {
+	ctx, shutdown := test.InitForTest()
+
+	listenSpec := v23.GetListenSpec(ctx)
+	listenSpec.Proxy = "/mock/proxy"
+	noopPostMessage := func(_ int32, _, _ string) {}
+	b := NewBrowspr(ctx, noopPostMessage, &listenSpec, "/mock:1234/identd", nil)
+	b.accountManager.SetMockBlesser(newMockBlesserService(v23.GetPrincipal(b.ctx)))
+
+	teardown := func() {
+		b.Shutdown()
+		shutdown()
+	}
+	return b, teardown
+}
+
+// TestHandleCreateAccount checks that accounts created through
+// HandleAuthCreateAccountRpc show up in HandleAuthGetAccountsRpc, in creation
+// order, and that the principal manager holds non-zero blessings for each.
+func TestHandleCreateAccount(t *testing.T) {
+	browspr, teardown := setup(t)
+	defer teardown()
+
+	// Verify that HandleAuthGetAccountsRpc returns empty.
+	nilValue := vdl.ValueOf(GetAccountsMessage{})
+	a, err := browspr.HandleAuthGetAccountsRpc(nilValue)
+	if err != nil {
+		t.Fatalf("browspr.HandleAuthGetAccountsRpc(%v) failed: %v", nilValue, err)
+	}
+	if a.Len() > 0 {
+		t.Fatalf("Expected accounts to be empty array but got %v", a)
+	}
+
+	// Add one account.
+	message1 := vdl.ValueOf(CreateAccountMessage{
+		Token: "mock-access-token-1",
+	})
+	account1, err := browspr.HandleAuthCreateAccountRpc(message1)
+	if err != nil {
+		t.Fatalf("browspr.HandleAuthCreateAccountRpc(%v) failed: %v", message1, err)
+	}
+
+	// Verify that principalManager has the new account.
+	if b, err := browspr.principalManager.BlessingsForAccount(account1.RawString()); err != nil || b.IsZero() {
+		t.Fatalf("Failed to get Blessings for account %v: got %v, %v", account1, b, err)
+	}
+
+	// Verify that HandleAuthGetAccountsRpc returns the new account.
+	gotAccounts1, err := browspr.HandleAuthGetAccountsRpc(nilValue)
+	if err != nil {
+		t.Fatalf("browspr.HandleAuthGetAccountsRpc(%v) failed: %v", nilValue, err)
+	}
+	if want := vdl.ValueOf([]string{account1.RawString()}); !vdl.EqualValue(want, gotAccounts1) {
+		t.Fatalf("Expected accounts to be %v but got %v", want, gotAccounts1)
+	}
+
+	// Add another account.
+	message2 := vdl.ValueOf(CreateAccountMessage{
+		Token: "mock-access-token-2",
+	})
+	account2, err := browspr.HandleAuthCreateAccountRpc(message2)
+	if err != nil {
+		t.Fatalf("browspr.HandleAuthCreateAccountRpc(%v) failed: %v", message2, err)
+	}
+
+	// Verify that HandleAuthGetAccountsRpc returns both accounts.
+	gotAccounts2, err := browspr.HandleAuthGetAccountsRpc(nilValue)
+	if err != nil {
+		t.Fatalf("browspr.HandleAuthGetAccountsRpc(%v) failed: %v", nilValue, err)
+	}
+	if want := vdl.ValueOf([]string{account1.RawString(), account2.RawString()}); !vdl.EqualValue(want, gotAccounts2) {
+		t.Fatalf("Expected accounts to be %v but got %v", want, gotAccounts2)
+	}
+
+	// Verify that principalManager has both accounts.
+	if b, err := browspr.principalManager.BlessingsForAccount(account1.RawString()); err != nil || b.IsZero() {
+		t.Fatalf("Failed to get Blessings for account %v: got %v, %v", account1, b, err)
+	}
+	if b, err := browspr.principalManager.BlessingsForAccount(account2.RawString()); err != nil || b.IsZero() {
+		t.Fatalf("Failed to get Blessings for account %v: got %v, %v", account2, b, err)
+	}
+}
+
+// TestHandleAssocAccount checks that an origin has no account until
+// HandleAuthAssociateAccountRpc links it to an existing one, after which
+// HandleAuthOriginHasAccountRpc reports true and the principal manager
+// returns a principal for the origin.
+func TestHandleAssocAccount(t *testing.T) {
+	browspr, teardown := setup(t)
+	defer teardown()
+
+	// First create an account.
+	account := "mock-account"
+	principal := v23.GetPrincipal(browspr.ctx)
+	blessing, err := principal.BlessSelf(account)
+	if err != nil {
+		t.Fatalf("principal.BlessSelf(%v) failed: %v", account, err)
+	}
+	if err := browspr.principalManager.AddAccount(account, blessing); err != nil {
+		t.Fatalf("browspr.principalManager.AddAccount(%v, %v) failed; %v", account, blessing, err)
+	}
+
+	origin := "https://my.webapp.com:443"
+
+	// Verify that HandleAuthOriginHasAccountRpc returns false.
+	hasAccountMessage := vdl.ValueOf(OriginHasAccountMessage{
+		Origin: origin,
+	})
+	hasAccount, err := browspr.HandleAuthOriginHasAccountRpc(hasAccountMessage)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if hasAccount.Bool() {
+		t.Fatalf("Expected browspr.HandleAuthOriginHasAccountRpc(%v) to be false but was true", hasAccountMessage)
+	}
+
+	assocAccountMessage := vdl.ValueOf(AssociateAccountMessage{
+		Account: account,
+		Origin:  origin,
+	})
+
+	if _, err := browspr.HandleAuthAssociateAccountRpc(assocAccountMessage); err != nil {
+		t.Fatalf("browspr.HandleAuthAssociateAccountRpc(%v) failed: %v", assocAccountMessage, err)
+	}
+
+	// Verify that HandleAuthOriginHasAccountRpc returns true.
+	hasAccount, err = browspr.HandleAuthOriginHasAccountRpc(hasAccountMessage)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !hasAccount.Bool() {
+		t.Fatalf("Expected browspr.HandleAuthOriginHasAccountRpc(%v) to be true but was false", hasAccountMessage)
+	}
+
+	// Verify that principalManager has the correct principal for the origin.
+	got, err := browspr.principalManager.Principal(origin)
+	if err != nil {
+		t.Fatalf("browspr.principalManager.Principal(%v) failed: %v", origin, err)
+	}
+
+	if got == nil {
+		t.Fatalf("Expected browspr.principalManager.Principal(%v) to return a valid principal, but got %v", origin, got)
+	}
+}
+
+// TestHandleAssocAccountWithMissingAccount checks that associating an origin
+// with an account that was never added fails, creates no principal for the
+// origin, and leaves HandleAuthOriginHasAccountRpc reporting false.
+func TestHandleAssocAccountWithMissingAccount(t *testing.T) {
+	browspr, teardown := setup(t)
+	defer teardown()
+
+	account := "mock-account"
+	origin := "https://my.webapp.com:443"
+	message := vdl.ValueOf(AssociateAccountMessage{
+		Account: account,
+		Origin:  origin,
+	})
+
+	if _, err := browspr.HandleAuthAssociateAccountRpc(message); err == nil {
+		t.Fatalf("browspr.HandleAuthAssociateAccountRpc(%v) should have failed but did not.", message)
+	}
+
+	// Verify that principalManager creates no principal for the origin.
+	got, err := browspr.principalManager.Principal(origin)
+	if err == nil {
+		t.Fatalf("Expected browspr.principalManager.Principal(%v) to fail, but got: %v", origin, got)
+	}
+
+	if got != nil {
+		t.Fatalf("Expected browspr.principalManager.Principal(%v) not to return a principal, but got %v", origin, got)
+	}
+
+	// Verify that HandleAuthOriginHasAccountRpc returns false.
+	hasAccountMessage := vdl.ValueOf(OriginHasAccountMessage{
+		Origin: origin,
+	})
+	hasAccount, err := browspr.HandleAuthOriginHasAccountRpc(hasAccountMessage)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if hasAccount.Bool() {
+		t.Fatalf("Expected browspr.HandleAuthOriginHasAccountRpc(%v) to be false but was true", hasAccountMessage)
+	}
+}
diff --git a/services/wspr/internal/browspr/browspr_test.go b/services/wspr/internal/browspr/browspr_test.go
new file mode 100644
index 0000000..81ab8d0
--- /dev/null
+++ b/services/wspr/internal/browspr/browspr_test.go
@@ -0,0 +1,260 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package browspr
+
+import (
+ "bytes"
+ "encoding/hex"
+ "encoding/json"
+ "strings"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/vdl"
+ vdltime "v.io/v23/vdlroot/time"
+ "v.io/v23/vom"
+
+ "v.io/x/ref/profiles"
+ "v.io/x/ref/services/mounttable/mounttablelib"
+ "v.io/x/ref/services/wspr/internal/app"
+ "v.io/x/ref/services/wspr/internal/lib"
+ "v.io/x/ref/test"
+)
+
+//go:generate v23 test generate
+
+// startMounttable creates a mounttable dispatcher, starts a server flagged as
+// serving a mounttable on the context's listen spec, and serves the
+// dispatcher under the empty name. It returns the server and its first
+// endpoint. If listening or serving fails, the server is stopped before the
+// error is returned so a failed setup does not leave it running.
+func startMounttable(ctx *context.T) (rpc.Server, naming.Endpoint, error) {
+	mt, err := mounttablelib.NewMountTableDispatcher("")
+	if err != nil {
+		return nil, nil, err
+	}
+
+	s, err := v23.NewServer(ctx, options.ServesMountTable(true))
+	if err != nil {
+		return nil, nil, err
+	}
+
+	endpoints, err := s.Listen(v23.GetListenSpec(ctx))
+	if err != nil {
+		s.Stop()
+		return nil, nil, err
+	}
+
+	if err := s.ServeDispatcher("", mt); err != nil {
+		s.Stop()
+		return nil, nil, err
+	}
+
+	return s, endpoints[0], nil
+}
+
+// mockServer is the stateless RPC service used by TestBrowspr.
+type mockServer struct{}
+
+// BasicCall returns txt wrapped in square brackets and never fails.
+func (mockServer) BasicCall(_ rpc.StreamServerCall, txt string) (string, error) {
+	wrapped := "[" + txt + "]"
+	return wrapped, nil
+}
+
+// startMockServer starts a server on the context's listen spec and serves a
+// mockServer under desiredName. It returns the server and its first endpoint.
+// If listening or serving fails, the server is stopped before the error is
+// returned so a failed setup does not leave it running.
+func startMockServer(ctx *context.T, desiredName string) (rpc.Server, naming.Endpoint, error) {
+	// Create a new server instance.
+	s, err := v23.NewServer(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	endpoints, err := s.Listen(v23.GetListenSpec(ctx))
+	if err != nil {
+		s.Stop()
+		return nil, nil, err
+	}
+
+	if err := s.Serve(desiredName, mockServer{}, nil); err != nil {
+		s.Stop()
+		return nil, nil, err
+	}
+
+	return s, endpoints[0], nil
+}
+
+// TestBrowspr runs one RPC end to end through browspr: it starts a proxy, a
+// mounttable and a mock server, creates a browspr instance for a test origin,
+// sends a VOM-encoded "BasicCall" request through HandleMessage, and checks
+// the message handed to the postMessage callback.
+func TestBrowspr(t *testing.T) {
+	ctx, shutdown := test.InitForTest()
+	defer shutdown()
+
+	proxyShutdown, proxyEndpoint, err := profiles.NewProxy(ctx, "tcp", "127.0.0.1:0", "")
+	if err != nil {
+		t.Fatalf("Failed to start proxy: %v", err)
+	}
+	defer proxyShutdown()
+
+	mtServer, mtEndpoint, err := startMounttable(ctx)
+	if err != nil {
+		t.Fatalf("Failed to start mounttable server: %v", err)
+	}
+	defer mtServer.Stop()
+	root := mtEndpoint.Name()
+	if err := v23.GetNamespace(ctx).SetRoots(root); err != nil {
+		t.Fatalf("Failed to set namespace roots: %v", err)
+	}
+
+	mockServerName := "mock/server"
+	mockSrv, mockServerEndpoint, err := startMockServer(ctx, mockServerName)
+	if err != nil {
+		t.Fatalf("Failed to start mock server: %v", err)
+	}
+	defer mockSrv.Stop()
+
+	// Poll the server status until the mock server reports a completed mount
+	// under mockServerName, giving up after one minute.
+	then := time.Now()
+found:
+	for {
+		status := mockSrv.Status()
+		for _, v := range status.Mounts {
+			if v.Name == mockServerName && v.Server == mockServerEndpoint.String() && !v.LastMount.IsZero() {
+				if v.LastMountErr != nil {
+					t.Fatalf("Failed to mount %s: %v", v.Name, v.LastMountErr)
+				}
+				break found
+			}
+		}
+		if time.Since(then) > time.Minute {
+			t.Fatalf("Failed to find mounted server and endpoint: %v: %v", mockServerName, mtEndpoint)
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	mountEntry, err := v23.GetNamespace(ctx).Resolve(ctx, mockServerName)
+	if err != nil {
+		t.Fatalf("Error fetching published names from mounttable: %v", err)
+	}
+
+	// Only entries containing "@tcp" are compared against the mock server.
+	servers := []string{}
+	for _, s := range mountEntry.Servers {
+		if strings.Contains(s.Server, "@tcp") {
+			servers = append(servers, s.Server)
+		}
+	}
+	if len(servers) != 1 || servers[0] != mockServerEndpoint.String() {
+		t.Fatalf("Incorrect names retrieved from mounttable: %v", mountEntry)
+	}
+
+	spec := v23.GetListenSpec(ctx)
+	spec.Proxy = proxyEndpoint.String()
+
+	// The callback stores its arguments and then signals on the channel; the
+	// test reads them only after receiving that signal.
+	receivedResponse := make(chan bool, 1)
+	var receivedInstanceId int32
+	var receivedType string
+	var receivedMsg string
+
+	var postMessageHandler = func(instanceId int32, ty, msg string) {
+		receivedInstanceId = instanceId
+		receivedType = ty
+		receivedMsg = msg
+		receivedResponse <- true
+	}
+
+	if err := v23.GetNamespace(ctx).SetRoots(root); err != nil {
+		t.Fatalf("Failed to set namespace roots: %v", err)
+	}
+	browspr := NewBrowspr(ctx, postMessageHandler, &spec, "/mock:1234/identd", []string{root})
+
+	// browspr sets its namespace root to use the "ws" protocol, but we want to force "tcp" here.
+	browspr.namespaceRoots = []string{root}
+
+	browspr.accountManager.SetMockBlesser(newMockBlesserService(v23.GetPrincipal(ctx)))
+
+	msgInstanceId := int32(11)
+	msgOrigin := "http://test-origin.com"
+
+	// Associate the origin with the root accounts' blessings, otherwise a
+	// dummy account will be used and will be rejected by the authorizer.
+	accountName := "test-account"
+	bp := v23.GetPrincipal(browspr.ctx)
+	if err := browspr.principalManager.AddAccount(accountName, bp.BlessingStore().Default()); err != nil {
+		t.Fatalf("Failed to add account: %v", err)
+	}
+	if err := browspr.accountManager.AssociateAccount(msgOrigin, accountName, nil); err != nil {
+		t.Fatalf("Failed to associate account: %v", err)
+	}
+
+	// Named request rather than rpc so the imported rpc package is not shadowed.
+	request := app.RpcRequest{
+		Name:        mockServerName,
+		Method:      "BasicCall",
+		NumInArgs:   1,
+		NumOutArgs:  1,
+		IsStreaming: false,
+		Deadline:    vdltime.Deadline{},
+	}
+
+	// The payload is the request header followed by its single argument,
+	// VOM-encoded and then hex-encoded.
+	var buf bytes.Buffer
+	encoder, err := vom.NewEncoder(&buf)
+	if err != nil {
+		t.Fatalf("Failed to vom encode rpc message: %v", err)
+	}
+	if err := encoder.Encode(request); err != nil {
+		t.Fatalf("Failed to vom encode rpc message: %v", err)
+	}
+	if err := encoder.Encode("InputValue"); err != nil {
+		t.Fatalf("Failed to vom encode rpc message: %v", err)
+	}
+	vomRPC := hex.EncodeToString(buf.Bytes())
+
+	msg, err := json.Marshal(app.Message{
+		Id:   1,
+		Data: vomRPC,
+		Type: app.VeyronRequestMessage,
+	})
+	if err != nil {
+		t.Fatalf("Failed to marshall app message to json: %v", err)
+	}
+
+	createInstanceMessage := CreateInstanceMessage{
+		InstanceId:     msgInstanceId,
+		Origin:         msgOrigin,
+		NamespaceRoots: nil,
+		Proxy:          "",
+	}
+	// Check this error before it is overwritten by the next call.
+	if _, err := browspr.HandleCreateInstanceRpc(vdl.ValueOf(createInstanceMessage)); err != nil {
+		t.Fatalf("Failed to create instance: %v", err)
+	}
+
+	if err := browspr.HandleMessage(msgInstanceId, msgOrigin, string(msg)); err != nil {
+		t.Fatalf("Error while handling message: %v", err)
+	}
+
+	// Bound the wait so a missing response fails the test instead of hanging it.
+	select {
+	case <-receivedResponse:
+	case <-time.After(time.Minute):
+		t.Fatal("Timed out waiting for a response from browspr")
+	}
+
+	if receivedInstanceId != msgInstanceId {
+		t.Errorf("Received unexpected instance id: %d. Expected: %d", receivedInstanceId, msgInstanceId)
+	}
+	if receivedType != "browsprMsg" {
+		t.Errorf("Received unexpected response type. Expected: %q, but got %q", "browsprMsg", receivedType)
+	}
+
+	var outMsg app.Message
+	if err := lib.VomDecode(receivedMsg, &outMsg); err != nil {
+		t.Fatalf("Failed to unmarshall outgoing message: %v", err)
+	}
+	if outMsg.Id != int32(1) {
+		t.Errorf("Id was %v, expected %v", outMsg.Id, int32(1))
+	}
+	if outMsg.Type != app.VeyronRequestMessage {
+		t.Errorf("Message type was %v, expected %v", outMsg.Type, app.VeyronRequestMessage)
+	}
+
+	var responseMsg lib.Response
+	if err := lib.VomDecode(outMsg.Data, &responseMsg); err != nil {
+		t.Fatalf("Failed to unmarshall outgoing response: %v", err)
+	}
+	if responseMsg.Type != lib.ResponseFinal {
+		t.Errorf("Response type was %v, expected %v", responseMsg.Type, lib.ResponseFinal)
+	}
+	// The remaining checks each depend on the previous step, so a failure
+	// here is fatal rather than letting the test index an empty result.
+	outArg, ok := responseMsg.Message.(string)
+	if !ok {
+		t.Fatalf("Got unexpected response message body of type %T, expected type string", responseMsg.Message)
+	}
+	var result app.RpcResponse
+	if err := lib.VomDecode(outArg, &result); err != nil {
+		t.Fatalf("Failed to vom decode args from %v: %v", outArg, err)
+	}
+	if len(result.OutArgs) != 1 {
+		t.Fatalf("Got %d out args, want 1: %v", len(result.OutArgs), result.OutArgs)
+	}
+	if got, want := result.OutArgs[0], vdl.StringValue("[InputValue]"); !vdl.EqualValue(got, want) {
+		t.Errorf("Result got %v, want %v", got, want)
+	}
+}
diff --git a/services/wspr/internal/browspr/file_serializer_nacl.go b/services/wspr/internal/browspr/file_serializer_nacl.go
new file mode 100644
index 0000000..ceed808
--- /dev/null
+++ b/services/wspr/internal/browspr/file_serializer_nacl.go
@@ -0,0 +1,65 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package browspr
+
+import (
+ "io"
+ "os"
+ "runtime/ppapi"
+)
+
+// fileSerializer implements vsecurity.SerializerReaderWriter that persists state to
+// files with the pepper API.
+type fileSerializer struct {
+ // system is the ppapi file system used to open, create and remove files.
+ system ppapi.FileSystem
+ // data and signature are the currently open file handles; either may be
+ // nil when NewFileSerializer found the corresponding file missing.
+ data *ppapi.FileIO
+ signature *ppapi.FileIO
+
+ // dataFile and signatureFile are the paths passed to NewFileSerializer.
+ dataFile string
+ signatureFile string
+}
+
+// Readers returns the open data and signature files. If either handle is
+// missing it returns nil for both readers and a nil error.
+func (fs *fileSerializer) Readers() (data io.ReadCloser, sig io.ReadCloser, err error) {
+	if fs.data != nil && fs.signature != nil {
+		return fs.data, fs.signature, nil
+	}
+	return nil, nil, nil
+}
+
+// Writers removes any previous data and signature files (ignoring the result
+// of the removal), creates both files afresh, stores the new handles on fs
+// and returns them. A failed Create is returned as the error with nil writers.
+func (fs *fileSerializer) Writers() (data io.WriteCloser, sig io.WriteCloser, err error) {
+	// Remove previous version of the files
+	fs.system.Remove(fs.dataFile)
+	fs.system.Remove(fs.signatureFile)
+
+	fs.data, err = fs.system.Create(fs.dataFile)
+	if err != nil {
+		return nil, nil, err
+	}
+	fs.signature, err = fs.system.Create(fs.signatureFile)
+	if err != nil {
+		return nil, nil, err
+	}
+	return fs.data, fs.signature, nil
+}
+
+// fileNotExist reports whether err is an *os.PathError whose wrapped error
+// message is exactly "file not found". Any other error, a nil error, or a
+// PathError with no wrapped error yields false.
+func fileNotExist(err error) bool {
+	pe, ok := err.(*os.PathError)
+	// Guard against a nil pointer or a nil wrapped error so the Error() call
+	// below cannot dereference nil.
+	if !ok || pe == nil || pe.Err == nil {
+		return false
+	}
+	return pe.Err.Error() == "file not found"
+}
+
+// NewFileSerializer opens dataFile and signatureFile on the given file system
+// and returns a serializer holding both handles. A file that does not exist
+// (as judged by fileNotExist) is not an error; any other open failure is
+// returned. If the signature file fails to open, an already opened data file
+// is closed first so its handle is not leaked.
+func NewFileSerializer(dataFile, signatureFile string, system ppapi.FileSystem) (*fileSerializer, error) {
+	data, err := system.Open(dataFile)
+	if err != nil && !fileNotExist(err) {
+		return nil, err
+	}
+	signature, err := system.Open(signatureFile)
+	if err != nil && !fileNotExist(err) {
+		if data != nil {
+			data.Close()
+		}
+		return nil, err
+	}
+	return &fileSerializer{
+		system:        system,
+		data:          data,
+		signature:     signature,
+		dataFile:      dataFile,
+		signatureFile: signatureFile,
+	}, nil
+}
diff --git a/services/wspr/internal/browspr/pipe.go b/services/wspr/internal/browspr/pipe.go
new file mode 100644
index 0000000..138e3ef
--- /dev/null
+++ b/services/wspr/internal/browspr/pipe.go
@@ -0,0 +1,101 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package browspr
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/app"
+ "v.io/x/ref/services/wspr/internal/lib"
+)
+
+// pipe controls the flow of messages for a specific instance (corresponding to a specific tab).
+type pipe struct {
+ // browspr is the owning Browspr; outgoing messages go through its postMessage.
+ browspr *Browspr
+ // controller receives the incoming messages decoded by handleMessage.
+ controller *app.Controller
+ // origin and instanceId are the values passed to newPipe.
+ origin string
+ instanceId int32
+}
+
+// newPipe creates the pipe for instanceId and origin and attaches a new
+// app.Controller to it.
+//
+// If no principal is associated with origin, the origin is first associated
+// with browspr's dummy account and the lookup is retried. A non-empty proxy
+// overrides the proxy in a copy of browspr's listen spec. Nil namespaceRoots
+// fall back to browspr's defaults.
+//
+// It returns nil when a principal cannot be obtained or the controller cannot
+// be created. Every such failure is logged through vlog.
+func newPipe(b *Browspr, instanceId int32, origin string, namespaceRoots []string, proxy string) *pipe {
+	pp := &pipe{
+		browspr:    b,
+		origin:     origin,
+		instanceId: instanceId,
+	}
+
+	// TODO(bprosnitz) LookupPrincipal() maybe should not take a string in the future.
+	p, err := b.accountManager.LookupPrincipal(origin)
+	if err != nil {
+		// TODO(nlacasse, bjornick): This code should go away once we
+		// start requiring authentication. At that point, we should
+		// just return an error to the client.
+		vlog.Errorf("No principal associated with origin %v, creating a new principal with self-signed blessing from browspr: %v", origin, err)
+
+		dummyAccount, err := b.principalManager.DummyAccount()
+		if err != nil {
+			vlog.Errorf("principalManager.DummyAccount() failed: %v", err)
+			return nil
+		}
+
+		if err := b.accountManager.AssociateAccount(origin, dummyAccount, nil); err != nil {
+			vlog.Errorf("accountManager.AssociateAccount(%v, %v, %v) failed: %v", origin, dummyAccount, nil, err)
+			return nil
+		}
+		// Assigns the outer p; err here is the one declared inside this block.
+		if p, err = b.accountManager.LookupPrincipal(origin); err != nil {
+			vlog.Errorf("accountManager.LookupPrincipal(%v) failed after associating dummy account: %v", origin, err)
+			return nil
+		}
+	}
+
+	// Shallow copy browspr's default listenSpec. If we have passed in a
+	// proxy, set listenSpec.proxy.
+	listenSpec := *b.listenSpec
+	if proxy != "" {
+		listenSpec.Proxy = proxy
+	}
+
+	// If we have been passed in namespace roots, pass them to NewController, otherwise pass browspr's default.
+	if namespaceRoots == nil {
+		namespaceRoots = b.namespaceRoots
+	}
+
+	pp.controller, err = app.NewController(b.ctx, pp.createWriter, &listenSpec, namespaceRoots, p)
+	if err != nil {
+		vlog.Errorf("Could not create controller: %v", err)
+		return nil
+	}
+
+	return pp
+}
+
+// createWriter returns a postMessageWriter bound to this pipe that tags its
+// outgoing messages with messageId.
+func (p *pipe) createWriter(messageId int32) lib.ClientWriter {
+	w := new(postMessageWriter)
+	w.messageId = messageId
+	w.p = p
+	return w
+}
+
+// cleanup logs that the pipe is being cleaned up and then calls Cleanup on
+// the pipe's controller.
+func (p *pipe) cleanup() {
+ vlog.VI(0).Info("Cleaning up pipe")
+ p.controller.Cleanup()
+}
+
+// handleMessage decodes jsonMsg into an app.Message and hands it to the
+// controller together with a writer keyed by the message id. If the JSON
+// cannot be decoded, the error is sent to the client through a writer with a
+// zero message id and also returned.
+func (p *pipe) handleMessage(jsonMsg string) error {
+	var msg app.Message
+	err := json.Unmarshal([]byte(jsonMsg), &msg)
+	if err == nil {
+		p.controller.HandleIncomingMessage(msg, p.createWriter(msg.Id))
+		return nil
+	}
+
+	fullErr := fmt.Errorf("Can't unmarshall message: %s error: %v", jsonMsg, err)
+	// Send the failed to unmarshal error to the client.
+	(&postMessageWriter{p: p}).Error(fullErr)
+	return fullErr
+}
diff --git a/services/wspr/internal/browspr/v23_internal_test.go b/services/wspr/internal/browspr/v23_internal_test.go
new file mode 100644
index 0000000..9b21db0
--- /dev/null
+++ b/services/wspr/internal/browspr/v23_internal_test.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+package browspr
+
+import "testing"
+import "os"
+
+import "v.io/x/ref/test"
+
+// TestMain calls test.Init, runs the package's tests and exits the process
+// with the status returned by m.Run.
+func TestMain(m *testing.M) {
+ test.Init()
+ os.Exit(m.Run())
+}
diff --git a/services/wspr/internal/browspr/writer.go b/services/wspr/internal/browspr/writer.go
new file mode 100644
index 0000000..17222f1
--- /dev/null
+++ b/services/wspr/internal/browspr/writer.go
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package browspr
+
+import (
+ "v.io/x/ref/services/wspr/internal/app"
+ "v.io/x/ref/services/wspr/internal/lib"
+)
+
+// postMessageWriter is a lib.ClientWriter that handles sending messages over postMessage to the extension.
+type postMessageWriter struct {
+ // messageId is the id placed on every outgoing message built by Send.
+ messageId int32
+ // p is the pipe whose browspr and instanceId are used to post messages.
+ p *pipe
+}
+
+// Send builds an outgoing message from the writer's message id, messageType
+// and data, and posts it for the pipe's instance with type "browsprMsg". If
+// the message cannot be built, nothing is posted and the error is returned.
+func (w *postMessageWriter) Send(messageType lib.ResponseType, data interface{}) error {
+	outMsg, err := app.ConstructOutgoingMessage(w.messageId, messageType, data)
+	if err == nil {
+		w.p.browspr.postMessage(w.p.instanceId, "browsprMsg", outMsg)
+	}
+	return err
+}
+
+// Error formats err with app.FormatAsVerror and sends it as a
+// lib.ResponseError message. The error returned by Send is discarded.
+func (w *postMessageWriter) Error(err error) {
+ w.Send(lib.ResponseError, app.FormatAsVerror(err))
+}
diff --git a/services/wspr/internal/channel/channel.go b/services/wspr/internal/channel/channel.go
new file mode 100644
index 0000000..d7863f9
--- /dev/null
+++ b/services/wspr/internal/channel/channel.go
@@ -0,0 +1,113 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package channel
+
+import (
+ "fmt"
+ "sync"
+
+ "v.io/v23/vdl"
+)
+
+// RequestHandler handles the body of an incoming request and returns the
+// body of the response, or an error.
+type RequestHandler func(*vdl.Value) (*vdl.Value, error)
+
+// MessageSender is the callback a Channel uses to send a Message (request or
+// response) out.
+type MessageSender func(Message)
+
+// Channel sends requests and matches responses to them by sequence number,
+// and dispatches incoming requests to handlers registered by type.
+type Channel struct {
+ // messageHandler sends outgoing requests and responses.
+ messageHandler MessageSender
+
+ // lastSeq is the sequence number of the most recent outgoing request.
+ lastSeq uint32
+ // handlers maps a request type to its registered handler.
+ handlers map[string]RequestHandler
+ // pendingResponses maps an outgoing request's sequence number to the
+ // channel on which PerformRpc waits for the response.
+ pendingResponses map[uint32]chan Response
+ // lock is held around accesses to lastSeq, handlers and pendingResponses.
+ lock sync.Mutex
+}
+
+// NewChannel returns a Channel that sends its outgoing messages through
+// messageHandler and starts with no handlers and no pending requests.
+func NewChannel(messageHandler MessageSender) *Channel {
+	c := new(Channel)
+	c.messageHandler = messageHandler
+	c.handlers = make(map[string]RequestHandler)
+	c.pendingResponses = make(map[uint32]chan Response)
+	return c
+}
+
+// PerformRpc sends a request of type typ with the given body and blocks until
+// the response with the same sequence number arrives.
+//
+// It returns the response body. If the response carries a non-empty Err
+// string, that string is returned as the error alongside the body.
+func (c *Channel) PerformRpc(typ string, body *vdl.Value) (*vdl.Value, error) {
+	c.lock.Lock()
+	c.lastSeq++
+	lastSeq := c.lastSeq
+	m := MessageRequest{Request{
+		Type: typ,
+		Seq:  lastSeq,
+		Body: body,
+	}}
+	// Buffered so the goroutine delivering the response does not block.
+	pending := make(chan Response, 1)
+	c.pendingResponses[lastSeq] = pending
+	c.lock.Unlock()
+
+	// The send runs on its own goroutine; this call only waits for the reply.
+	go c.messageHandler(m)
+	response := <-pending
+
+	c.lock.Lock()
+	delete(c.pendingResponses, lastSeq)
+	c.lock.Unlock()
+
+	if response.Err == "" {
+		return response.Body, nil
+	}
+	// The error text comes from the other side of the channel, so it must not
+	// be used as a format string: a '%' in it would garble the message.
+	return response.Body, fmt.Errorf("%s", response.Err)
+}
+
+// RegisterRequestHandler installs handler for requests of type typ, replacing
+// any handler previously registered for that type.
+func (c *Channel) RegisterRequestHandler(typ string, handler RequestHandler) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.handlers[typ] = handler
+}
+
+// handleRequest runs the handler registered for req.Type and sends back a
+// response carrying the request's sequence number, the handler's result and,
+// if the handler failed, its error text.
+//
+// A request whose type has no registered handler is answered with an error
+// response instead of a panic. handleRequest runs on its own goroutine, where
+// a panic would take down the whole process on a single unexpected message.
+func (c *Channel) handleRequest(req Request) {
+	// Look up the handler.
+	c.lock.Lock()
+	handler, ok := c.handlers[req.Type]
+	c.lock.Unlock()
+	if !ok {
+		c.messageHandler(MessageResponse{Response{
+			ReqSeq: req.Seq,
+			Err:    fmt.Sprintf("Unknown handler: %s", req.Type),
+		}})
+		return
+	}
+
+	// Call the handler outside the lock; it may do a lot of work.
+	result, err := handler(req.Body)
+	errMsg := ""
+	if err != nil {
+		errMsg = err.Error()
+	}
+	m := MessageResponse{Response{
+		ReqSeq: req.Seq,
+		Err:    errMsg,
+		Body:   result,
+	}}
+	c.messageHandler(m)
+}
+
+// handleResponse delivers resp to the PerformRpc call waiting on its request
+// sequence number. It panics if no request with that number is pending.
+func (c *Channel) handleResponse(resp Response) {
+	c.lock.Lock()
+	pending, found := c.pendingResponses[resp.ReqSeq]
+	c.lock.Unlock()
+
+	if found {
+		pending <- resp
+		return
+	}
+	panic("Received invalid response code")
+}
+
+// HandleMessage dispatches an incoming message: a request goes to
+// handleRequest and a response to handleResponse, each on a new goroutine.
+// Any other message type causes a panic.
+func (c *Channel) HandleMessage(m Message) {
+	// The handlers run in goroutines so the caller is never blocked. This
+	// matters most for requests, whose handlers can potentially do a lot of
+	// work.
+	switch msg := m.(type) {
+	case MessageResponse:
+		go c.handleResponse(msg.Value)
+	case MessageRequest:
+		go c.handleRequest(msg.Value)
+	default:
+		panic(fmt.Sprintf("Unknown message type: %T", m))
+	}
+}
diff --git a/services/wspr/internal/channel/channel.vdl b/services/wspr/internal/channel/channel.vdl
new file mode 100644
index 0000000..0fc7ebc
--- /dev/null
+++ b/services/wspr/internal/channel/channel.vdl
@@ -0,0 +1,22 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package channel
+
+// Request is one request sent over a channel. Type selects the handler on the
+// receiving side, Seq is the sender's sequence number for the request, and
+// Body is the argument handed to the handler.
+type Request struct {
+ Type string
+ Seq uint32
+ Body any
+}
+
+// Response answers a Request. ReqSeq repeats the Seq of the request being
+// answered, Err is empty on success and otherwise holds the error text, and
+// Body is the handler's result.
+type Response struct {
+ ReqSeq uint32
+ Err string // TODO(bprosnitz) change this back to error when it is possible to do so. (issue 368)
+ Body any
+}
+
+// Message is what travels over a channel: either a Request or a Response.
+type Message union {
+ Request Request
+ Response Response
+}
diff --git a/services/wspr/internal/channel/channel.vdl.go b/services/wspr/internal/channel/channel.vdl.go
new file mode 100644
index 0000000..34c163f
--- /dev/null
+++ b/services/wspr/internal/channel/channel.vdl.go
@@ -0,0 +1,78 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: channel.vdl
+
+package channel
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// Request is one request sent over a channel. Type selects the handler on the
+// receiving side, Seq is the sender's sequence number for the request, and
+// Body is the argument handed to the handler.
+type Request struct {
+ Type string
+ Seq uint32
+ Body *vdl.Value
+}
+
+// __VDLReflect has an empty body; its parameter's struct tag carries the
+// fully qualified name of Request.
+func (Request) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/channel.Request"
+}) {
+}
+
+// Response answers a Request. ReqSeq repeats the Seq of the request being
+// answered, Err is empty on success and otherwise holds the error text, and
+// Body is the handler's result.
+type Response struct {
+ ReqSeq uint32
+ Err string // TODO(bprosnitz) change this back to error when it is possible to do so. (issue 368)
+ Body *vdl.Value
+}
+
+// __VDLReflect has an empty body; its parameter's struct tag carries the
+// fully qualified name of Response.
+func (Response) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/channel.Response"
+}) {
+}
+
+// The Message union is expressed as an interface plus one wrapper struct per
+// union field (MessageRequest, MessageResponse). __MessageReflect describes
+// the union.
+type (
+ // Message represents any single field of the Message union type.
+ Message interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the Message union type.
+ __VDLReflect(__MessageReflect)
+ }
+ // MessageRequest represents field Request of the Message union type.
+ MessageRequest struct{ Value Request }
+ // MessageResponse represents field Response of the Message union type.
+ MessageResponse struct{ Value Response }
+ // __MessageReflect describes the Message union type.
+ __MessageReflect struct {
+ Name string "v.io/x/ref/services/wspr/internal/channel.Message"
+ Type Message
+ Union struct {
+ Request MessageRequest
+ Response MessageResponse
+ }
+ }
+)
+
+// MessageRequest implements Message: index 0, name "Request", value x.Value.
+func (x MessageRequest) Index() int { return 0 }
+func (x MessageRequest) Interface() interface{} { return x.Value }
+func (x MessageRequest) Name() string { return "Request" }
+func (x MessageRequest) __VDLReflect(__MessageReflect) {}
+
+// MessageResponse implements Message: index 1, name "Response", value x.Value.
+func (x MessageResponse) Index() int { return 1 }
+func (x MessageResponse) Interface() interface{} { return x.Value }
+func (x MessageResponse) Name() string { return "Response" }
+func (x MessageResponse) __VDLReflect(__MessageReflect) {}
+
+// init registers Request, Response and Message with the vdl package, passing
+// a nil pointer of each type to vdl.Register.
+func init() {
+ vdl.Register((*Request)(nil))
+ vdl.Register((*Response)(nil))
+ vdl.Register((*Message)(nil))
+}
diff --git a/services/wspr/internal/channel/channel_nacl/channel_nacl.go b/services/wspr/internal/channel/channel_nacl/channel_nacl.go
new file mode 100644
index 0000000..347a355
--- /dev/null
+++ b/services/wspr/internal/channel/channel_nacl/channel_nacl.go
@@ -0,0 +1,72 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package channel_nacl
+
+import (
+ "bytes"
+ "fmt"
+ "runtime/ppapi"
+
+ "v.io/v23/vdl"
+ "v.io/v23/vom"
+ "v.io/x/ref/services/wspr/internal/channel" // contains most of the logic, factored out for testing
+)
+
+// Channel wraps a channel.Channel so that its messages are exchanged with the
+// browser through a ppapi instance.
+type Channel struct {
+ // impl holds the transport-independent channel logic.
+ impl *channel.Channel
+ // ppapiInst is the instance passed to NewChannel.
+ ppapiInst ppapi.Instance
+}
+
+// sendMessageToBrowser VOM-encodes m and posts the resulting bytes to the
+// browser through ppapiInst. It panics if the encoder cannot be created or
+// the message cannot be encoded.
+func sendMessageToBrowser(ppapiInst ppapi.Instance, m channel.Message) {
+	var encoded bytes.Buffer
+	enc, err := vom.NewEncoder(&encoded)
+	if err != nil {
+		panic(fmt.Sprintf("Error beginning encoding: %v", err))
+	}
+	if encErr := enc.Encode(m); encErr != nil {
+		panic(fmt.Sprintf("Error encoding message %v: %v", m, encErr))
+	}
+	ppapiInst.PostMessage(ppapi.VarFromByteSlice(encoded.Bytes()))
+}
+
+// NewChannel returns a Channel whose outgoing messages are sent to the
+// browser through ppapiInst.
+func NewChannel(ppapiInst ppapi.Instance) *Channel {
+	c := &Channel{ppapiInst: ppapiInst}
+	c.impl = channel.NewChannel(func(m channel.Message) {
+		sendMessageToBrowser(ppapiInst, m)
+	})
+	return c
+}
+
+// RegisterRequestHandler forwards to the wrapped channel's
+// RegisterRequestHandler with the same arguments.
+func (c *Channel) RegisterRequestHandler(typ string, handler channel.RequestHandler) {
+ c.impl.RegisterRequestHandler(typ, handler)
+}
+
+// PerformRpc forwards to the wrapped channel's PerformRpc and returns its
+// result unchanged.
+func (c *Channel) PerformRpc(typ string, body *vdl.Value) (*vdl.Value, error) {
+ return c.impl.PerformRpc(typ, body)
+}
+
+// HandleMessage converts v to bytes, VOM-decodes a channel.Message from them
+// and passes it to the wrapped channel. It panics if the conversion, the
+// decoder setup or the decoding fails.
+func (c *Channel) HandleMessage(v ppapi.Var) {
+	// Read input message
+	raw, err := v.AsByteSlice()
+	if err != nil {
+		panic(fmt.Sprintf("Cannot convert message to byte slice: %v", err))
+	}
+
+	dec, err := vom.NewDecoder(bytes.NewBuffer(raw))
+	if err != nil {
+		panic(fmt.Sprintf("Error beginning decoding: %v", err))
+	}
+
+	var m channel.Message
+	if err = dec.Decode(&m); err != nil {
+		panic(fmt.Sprintf("Error decoding message: %v", err))
+	}
+
+	c.impl.HandleMessage(m)
+}
diff --git a/services/wspr/internal/channel/channel_test.go b/services/wspr/internal/channel/channel_test.go
new file mode 100644
index 0000000..4513bd1
--- /dev/null
+++ b/services/wspr/internal/channel/channel_test.go
@@ -0,0 +1,109 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package channel_test
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+
+ "v.io/v23/vdl"
+ "v.io/x/ref/services/wspr/internal/channel"
+)
+
+// TestChannelRpcs wires two channels back to back and runs many RPCs
+// concurrently in both directions, checking each result (or expected error)
+// and the total number of handler invocations.
+func TestChannelRpcs(t *testing.T) {
+ // Two channels are used and different tests send in different directions.
+ // bHandler is assigned after channelB exists; channelA's sender reads it
+ // only when a message is actually sent.
+ var bHandler channel.MessageSender
+ channelA := channel.NewChannel(func(msg channel.Message) {
+ bHandler(msg)
+ })
+ channelB := channel.NewChannel(channelA.HandleMessage)
+ bHandler = channelB.HandleMessage
+
+ type testCase struct {
+ SendChannel *channel.Channel
+ RecvChannel *channel.Channel
+ Type string
+ ReqVal int
+ RespVal int
+ Err error
+ }
+
+ // The list of tests to run concurrently in goroutines.
+ // Half of the tests use a distinct type key each, to test multiple type keys.
+ // Half of the tests use the same type key.
+ // One test returns an error.
+ tests := []testCase{}
+ const reusedTypeName string = "reusedTypeName"
+ expectedNumSuccessfulEachDirection := 128
+ for i := 0; i < expectedNumSuccessfulEachDirection; i++ {
+ tests = append(tests, testCase{channelA, channelB, fmt.Sprintf("Type%d", i), i, i + 1000, nil})
+ tests = append(tests, testCase{channelB, channelA, reusedTypeName, -i - 1, -i - 1001, nil})
+ }
+ expectedNumFailures := 1
+ tests = append(tests, testCase{channelB, channelA, "Type3", 0, 0, fmt.Errorf("TestError")})
+ expectedNumCalls := expectedNumSuccessfulEachDirection*2 + expectedNumFailures
+ // callCountLock guards numCalls, which handlers increment from goroutines.
+ callCountLock := sync.Mutex{}
+ numCalls := 0
+
+ // reusedHandler handles requests to the same type name. It answers with
+ // the request value minus 1000, which matches RespVal for those tests.
+ reusedHandler := func(v *vdl.Value) (*vdl.Value, error) {
+ callCountLock.Lock()
+ numCalls++
+ callCountLock.Unlock()
+
+ return vdl.Int64Value(v.Int() - 1000), nil
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(len(tests))
+ var testGoRoutine = func(i int, test testCase) {
+ defer wg.Done()
+
+ // Get the message handler. Either the reused handler or a unique handler for this
+ // test, depending on the type name.
+ var handler func(v *vdl.Value) (*vdl.Value, error)
+ if test.Type == reusedTypeName {
+ handler = reusedHandler
+ } else {
+ handler = func(v *vdl.Value) (*vdl.Value, error) {
+ callCountLock.Lock()
+ numCalls++
+ callCountLock.Unlock()
+
+ if got, want := v, vdl.Int64Value(int64(test.ReqVal)); !vdl.EqualValue(got, want) {
+ t.Errorf("For test %d, got %v, want %v", i, got, want)
+ }
+ return vdl.Int64Value(int64(test.RespVal)), test.Err
+ }
+ }
+ test.RecvChannel.RegisterRequestHandler(test.Type, handler)
+
+ // Perform the RPC.
+ result, err := test.SendChannel.PerformRpc(test.Type, vdl.Int64Value(int64(test.ReqVal)))
+ if test.Err != nil {
+ if err == nil {
+ t.Errorf("For test %d, expected an error but didn't get one", i)
+ }
+ } else {
+ if err != nil {
+ t.Errorf("For test %d, received unexpected error %v", i, err)
+ return
+ }
+ if got, want := result, vdl.Int64Value(int64(test.RespVal)); !vdl.EqualValue(got, want) {
+ t.Errorf("For test %d, got %v, want %v", i, got, want)
+ }
+ }
+ }
+ for i, test := range tests {
+ go testGoRoutine(i, test)
+ }
+
+ wg.Wait()
+
+ if numCalls != expectedNumCalls {
+ t.Errorf("Expected to receive %d rpcs, but only got %d", expectedNumCalls, numCalls)
+ }
+}
diff --git a/services/wspr/internal/lib/case.go b/services/wspr/internal/lib/case.go
new file mode 100644
index 0000000..4c5fbba
--- /dev/null
+++ b/services/wspr/internal/lib/case.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import "unicode"
+
+// LowercaseFirstCharacter returns s with its first rune converted to lower
+// case and the rest of the string left untouched. The empty string is
+// returned unchanged.
+func LowercaseFirstCharacter(s string) string {
+	if s == "" {
+		return ""
+	}
+	var first rune
+	rest := ""
+	// Ranging over a string yields the byte offset of every rune, so the
+	// offset of the second rune is the encoded width of the first. Slicing
+	// there (rather than at byte 1) keeps multi-byte first characters intact.
+	for i, r := range s {
+		if i > 0 {
+			rest = s[i:]
+			break
+		}
+		first = r
+	}
+	return string(unicode.ToLower(first)) + rest
+}
+
+// UppercaseFirstCharacter returns s with its first rune converted to upper
+// case and the rest of the string left untouched. The empty string is
+// returned unchanged.
+func UppercaseFirstCharacter(s string) string {
+	if s == "" {
+		return ""
+	}
+	var first rune
+	rest := ""
+	// Ranging over a string yields the byte offset of every rune, so the
+	// offset of the second rune is the encoded width of the first. Slicing
+	// there (rather than at byte 1) keeps multi-byte first characters intact.
+	for i, r := range s {
+		if i > 0 {
+			rest = s[i:]
+			break
+		}
+		first = r
+	}
+	return string(unicode.ToUpper(first)) + rest
+}
diff --git a/services/wspr/internal/lib/signature_manager.go b/services/wspr/internal/lib/signature_manager.go
new file mode 100644
index 0000000..41271e8
--- /dev/null
+++ b/services/wspr/internal/lib/signature_manager.go
@@ -0,0 +1,127 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import (
+ "sync"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/rpc/reserved"
+ "v.io/v23/vdlroot/signature"
+ "v.io/v23/verror"
+)
+
+// SignatureManager looks up the interface signatures of services by name.
+type SignatureManager interface {
+ // Signature returns the signatures of the service with the given name.
+ Signature(ctx *context.T, name string, opts ...rpc.CallOpt) ([]signature.Interface, error)
+ // FlushCacheEntry discards any signature cached for the given name.
+ FlushCacheEntry(name string)
+}
+
+// signatureManager can be used to discover the signature of a remote service.
+// It has built-in caching and TTL support.
+type signatureManager struct {
+ // Mutex protects the cache and pendingSignatures maps.
+ sync.Mutex
+
+ // cache maps a service name to its signature and last-accessed time.
+ // TODO(aghassemi) GC for expired cache entries
+ cache map[string]*cacheEntry
+
+ // pendingSignatures keeps track of the currently pending request for each
+ // name so we don't issue multiple signature requests to the same server
+ // simultaneously. The channel is closed once the request has finished.
+ pendingSignatures map[string]chan struct{}
+}
+
+// NewSignatureManager creates and initializes a new SignatureManager with an
+// empty cache and no pending requests.
+func NewSignatureManager() SignatureManager {
+	sm := new(signatureManager)
+	sm.cache = map[string]*cacheEntry{}
+	sm.pendingSignatures = make(map[string]chan struct{})
+	return sm
+}
+
+// ttl is how long a cache entry stays valid, measured from the time it was
+// last accessed.
+const ttl = time.Hour
+
+// cacheEntry is a cached signature together with the time it was last read or
+// written.
+type cacheEntry struct {
+ // sig is the cached signature of the service.
+ sig []signature.Interface
+ // lastAccessed is compared against ttl to decide whether the entry expired.
+ lastAccessed time.Time
+}
+
+// expired reports whether more than ttl has elapsed since the entry was last
+// accessed.
+func (c cacheEntry) expired() bool {
+	age := time.Now().Sub(c.lastAccessed)
+	return age > ttl
+}
+
+// lookupCacheLocked returns the cached signature for name and refreshes the
+// entry's last-accessed time. It returns nil when there is no entry or the
+// entry has expired. Both call sites in Signature hold the lock while calling
+// it.
+func (sm *signatureManager) lookupCacheLocked(name string) []signature.Interface {
+	entry := sm.cache[name]
+	if entry == nil || entry.expired() {
+		return nil
+	}
+	entry.lastAccessed = time.Now()
+	return entry.sig
+}
+
+// Signature fetches the signature for the given service name. It either
+// returns the signature from the cache, or blocks until it has been fetched
+// from the remote server. Concurrent calls for the same name share a single
+// remote request: the first caller performs the fetch while the others wait
+// for it to finish and then read the result from the cache.
+//
+// A caller that is waiting on another caller's fetch gives up and returns
+// ctx.Err() if its own context is done before the fetch completes.
+func (sm *signatureManager) Signature(ctx *context.T, name string, opts ...rpc.CallOpt) ([]signature.Interface, error) {
+	sm.Lock()
+
+	if sigs := sm.lookupCacheLocked(name); sigs != nil {
+		sm.Unlock()
+		return sigs, nil
+	}
+
+	ch, found := sm.pendingSignatures[name]
+	if !found {
+		// No request is in flight for this name, so this call becomes the
+		// one that performs the fetch.
+		ch = make(chan struct{})
+		sm.pendingSignatures[name] = ch
+	}
+	sm.Unlock()
+
+	if found {
+		// Another call is already fetching this signature. Wait for it, but
+		// do not outlive our own context.
+		select {
+		case <-ch:
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+		// If the channel is closed then we know that the outstanding request
+		// finished; if it failed then there will not be a valid entry in the
+		// cache.
+		sm.Lock()
+		result := sm.lookupCacheLocked(name)
+		sm.Unlock()
+		if result == nil {
+			// The fetching call's error is not shared with waiters, so a nil
+			// error is passed as the last parameter.
+			var err error
+			return nil, verror.New(verror.ErrNoServers, ctx, name, err)
+		}
+		return result, nil
+	}
+
+	// Fetch from the remote server without holding the lock.
+	sig, err := reserved.Signature(ctx, name, opts...)
+	sm.Lock()
+	// On cleanup we need to close the channel (waking up any waiters), remove
+	// the entry from the pending map and release the lock.
+	defer func() {
+		close(ch)
+		delete(sm.pendingSignatures, name)
+		sm.Unlock()
+	}()
+	if err != nil {
+		return nil, verror.New(verror.ErrNoServers, ctx, name, err)
+	}
+
+	// Add to the cache.
+	sm.cache[name] = &cacheEntry{
+		sig:          sig,
+		lastAccessed: time.Now(),
+	}
+	return sig, nil
+}
+
+// FlushCacheEntry removes the cached signature for the given name. The lock
+// is held while deleting because Signature reads and writes the same map
+// from other goroutines.
+func (sm *signatureManager) FlushCacheEntry(name string) {
+	sm.Lock()
+	defer sm.Unlock()
+	delete(sm.cache, name)
+}
diff --git a/services/wspr/internal/lib/signature_manager_test.go b/services/wspr/internal/lib/signature_manager_test.go
new file mode 100644
index 0000000..0f0de0e
--- /dev/null
+++ b/services/wspr/internal/lib/signature_manager_test.go
@@ -0,0 +1,169 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import (
+ "reflect"
+ "sync"
+ "testing"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ "v.io/x/ref/profiles/fake"
+ "v.io/x/ref/test"
+)
+
+const (
+ // name is the object name passed to Signature by the tests in this file.
+ name = "/vanadium/name"
+)
+
+// initContext returns a test context whose client is a mock answering
+// "__Signature" calls with a single-method signature, along with that mock
+// client and the shutdown function of the test context.
+func initContext(t *testing.T) (*context.T, clientWithTimesCalled, v23.Shutdown) {
+	method := signature.Method{
+		Name:   "Method1",
+		InArgs: []signature.Arg{{Type: vdl.StringType}},
+	}
+	initialSig := []signature.Interface{{Methods: []signature.Method{method}}}
+	client := newSimpleClient(map[string][]interface{}{
+		"__Signature": {initialSig},
+	})
+
+	ctx, shutdown := test.InitForTest()
+	return fake.SetClient(ctx, client), client, shutdown
+}
+
+// TestFetching checks that Signature returns the signature served by the mock
+// client.
+func TestFetching(t *testing.T) {
+	ctx, _, shutdown := initContext(t)
+	defer shutdown()
+
+	got, err := NewSignatureManager().Signature(ctx, name)
+	if err != nil {
+		t.Fatalf(`Did not expect an error but got %v`, err)
+	}
+
+	wantMethod := signature.Method{
+		Name:   "Method1",
+		InArgs: []signature.Arg{{Type: vdl.StringType}},
+	}
+	want := []signature.Interface{{Methods: []signature.Method{wantMethod}}}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf(`Signature got %v, want %v`, got, want)
+	}
+}
+
+// TestThatCachedAfterFetching checks that a successful fetch stores the
+// returned signature in the manager's cache.
+func TestThatCachedAfterFetching(t *testing.T) {
+	ctx, _, shutdown := initContext(t)
+	defer shutdown()
+
+	sm := NewSignatureManager().(*signatureManager)
+	sig, err := sm.Signature(ctx, name)
+	if err != nil {
+		// Without this check a failed fetch would be misreported below as a
+		// caching problem.
+		t.Errorf(`Did not expect an error but got %v`, err)
+		return
+	}
+	cache, ok := sm.cache[name]
+	if !ok {
+		t.Errorf(`Signature manager did not cache the results`)
+		return
+	}
+	if got, want := cache.sig, sig; !reflect.DeepEqual(got, want) {
+		t.Errorf(`Cached signature got %v, want %v`, got, want)
+	}
+}
+
+// TestThatCacheIsUsed checks that a second lookup of the same name is served
+// from the cache instead of calling the client again.
+func TestThatCacheIsUsed(t *testing.T) {
+	ctx, client, shutdown := initContext(t)
+	defer shutdown()
+
+	sm := NewSignatureManager()
+	for i := 0; i < 2; i++ {
+		sm.Signature(ctx, name)
+	}
+
+	// Only the first lookup should have reached the client's Signature method.
+	if calls := client.TimesCalled("__Signature"); calls != 1 {
+		t.Errorf("Signature cache was not used for the second call")
+	}
+}
+
+// TestThatLastAccessedGetUpdated checks that a cache hit moves the entry's
+// last-accessed time forward.
+func TestThatLastAccessedGetUpdated(t *testing.T) {
+	ctx, _, shutdown := initContext(t)
+	defer shutdown()
+
+	sm := NewSignatureManager().(*signatureManager)
+	sm.Signature(ctx, name)
+
+	// Push the timestamp into the past (but within the ttl) because two
+	// consecutive calls to time.Now() can return identical values.
+	entry := sm.cache[name]
+	entry.lastAccessed = entry.lastAccessed.Add(-ttl / 2)
+	prevAccess := entry.lastAccessed
+
+	// Access again.
+	sm.Signature(ctx, name)
+
+	if newAccess := sm.cache[name].lastAccessed; !newAccess.After(prevAccess) {
+		t.Errorf("LastAccessed was not updated for cache entry")
+	}
+}
+
+// TestThatTTLExpires checks that an entry older than the ttl is fetched from
+// the client again rather than served from the cache.
+func TestThatTTLExpires(t *testing.T) {
+	ctx, client, shutdown := initContext(t)
+	defer shutdown()
+
+	sm := NewSignatureManager().(*signatureManager)
+	sm.Signature(ctx, name)
+
+	// Age the entry beyond the ttl.
+	entry := sm.cache[name]
+	entry.lastAccessed = entry.lastAccessed.Add(-2 * ttl)
+
+	// The second call must go back to the client since the entry has expired.
+	sm.Signature(ctx, name)
+
+	if calls := client.TimesCalled("__Signature"); calls != 2 {
+		t.Errorf("Cache was still used but TTL had passed. It should have been fetched again")
+	}
+}
+
+// TestConcurrency issues two lookups of the same name from separate
+// goroutines and checks that only one request reaches the client.
+func TestConcurrency(t *testing.T) {
+	ctx, client, shutdown := initContext(t)
+	defer shutdown()
+
+	sm := NewSignatureManager().(*signatureManager)
+
+	// Even though the signature calls return immediately in the fake client,
+	// running this with the race detector should find races if the locking is
+	// done poorly.
+	const callers = 2
+	var wg sync.WaitGroup
+	wg.Add(callers)
+	for i := 0; i < callers; i++ {
+		go func() {
+			defer wg.Done()
+			sm.Signature(ctx, name)
+		}()
+	}
+	wg.Wait()
+
+	// The second call should have waited for the first one to finish, so the
+	// client's Signature method is expected to have been called once.
+	if calls := client.TimesCalled("__Signature"); calls != 1 {
+		t.Errorf("__Signature should only be called once.")
+	}
+}
diff --git a/services/wspr/internal/lib/simple_client.go b/services/wspr/internal/lib/simple_client.go
new file mode 100644
index 0000000..f57cfc6
--- /dev/null
+++ b/services/wspr/internal/lib/simple_client.go
@@ -0,0 +1,139 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+ "v.io/v23/vom"
+ "v.io/x/lib/vlog"
+)
+
+// clientWithTimesCalled is an rpc.Client that also reports how often each
+// method has been called.
+type clientWithTimesCalled interface {
+ rpc.Client
+ // TimesCalled returns the number of calls recorded for the given method.
+ TimesCalled(method string) int
+}
+
+// newSimpleClient creates a new mocked rpc client where the given map of
+// method name to outputs is used for evaluating the method calls.
+// It also adds some testing features such as counters for the number of times
+// a method is called.
+func newSimpleClient(methodsResults map[string][]interface{}) clientWithTimesCalled {
+	client := new(simpleMockClient)
+	client.results = methodsResults
+	client.timesCalled = map[string]int{}
+	return client
+}
+
+// simpleMockClient implements rpc.Client by returning canned results for each
+// method name.
+type simpleMockClient struct {
+ // Protects timesCalled
+ sync.Mutex
+
+ // results is a map of method names to results
+ results map[string][]interface{}
+ // timesCalled is a counter for number of times StartCall is called on a specific method name
+ timesCalled map[string]int
+}
+
+// TimesCalled returns number of times the given method has been called. The
+// lock is taken because StartCall updates the counters under the same lock,
+// possibly from another goroutine.
+func (c *simpleMockClient) TimesCalled(method string) int {
+	c.Lock()
+	defer c.Unlock()
+	return c.timesCalled[method]
+}
+
+// StartCall implements rpc.Client. It looks up the canned results for method,
+// hands a private copy of them to the returned call, and counts the call.
+// An error is returned when no results are registered for method.
+func (c *simpleMockClient) StartCall(ctx *context.T, name, method string, args []interface{}, opts ...rpc.CallOpt) (rpc.ClientCall, error) {
+	defer vlog.LogCall()()
+	results, known := c.results[method]
+	if !known {
+		return nil, errors.New(fmt.Sprintf("method %s not found", method))
+	}
+
+	// Copy the results so that they can be modified without affecting the
+	// original. This must be done via vom encode and decode rather than a
+	// direct deep copy because (among other reasons) reflect-based deep copy
+	// on vdl.Type objects will fail because of their private fields. This is
+	// not a problem with vom as it manually creates the type objects. It is
+	// also more realistic to use the same mechanism as the ultimate calls.
+	encoded, err := vom.Encode(results)
+	if err != nil {
+		panic(fmt.Sprintf("Error copying value with vom (failed on encode): %v", err))
+	}
+	var copied []interface{}
+	if err = vom.Decode(encoded, &copied); err != nil {
+		panic(fmt.Sprintf("Error copying value with vom (failed on decode): %v", err))
+	}
+
+	c.Lock()
+	c.timesCalled[method]++
+	c.Unlock()
+
+	return &mockCall{results: copied}, nil
+}
+
+// Close implements rpc.Client. It only logs the call; the mock holds nothing
+// that needs releasing.
+func (*simpleMockClient) Close() {
+ defer vlog.LogCall()()
+}
+
+// mockCall implements rpc.ClientCall. It carries the results that Finish
+// copies into the caller's result pointers.
+type mockCall struct {
+ mockStream
+ // results are the values handed out by Finish, in positional order.
+ results []interface{}
+}
+
+// Cancel implements rpc.ClientCall. It only logs the call.
+func (*mockCall) Cancel() {
+ defer vlog.LogCall()()
+}
+
+// CloseSend implements rpc.ClientCall. It logs the call and always succeeds.
+func (*mockCall) CloseSend() error {
+ defer vlog.LogCall()()
+ return nil
+}
+
+// Finish implements rpc.ClientCall. It converts each stored result into the
+// result pointer at the same position, skipping results that are nil. An
+// error is returned when the number of result pointers does not match the
+// number of stored results; a failed conversion panics.
+func (mc *mockCall) Finish(resultptrs ...interface{}) error {
+	defer vlog.LogCall()()
+	if want, got := len(mc.results), len(resultptrs); got != want {
+		return errors.New(fmt.Sprintf("wrong number of output results; expected resultptrs of size %d but got %d", want, got))
+	}
+	for i := range resultptrs {
+		src := mc.results[i]
+		if src == nil {
+			continue
+		}
+		if err := vdl.Convert(resultptrs[i], src); err != nil {
+			panic(fmt.Sprintf("Error converting out argument %#v: %v", src, err))
+		}
+	}
+	return nil
+}
+
+// RemoteBlessings implements rpc.ClientCall. The mock returns an empty name
+// list and zero-valued blessings.
+func (*mockCall) RemoteBlessings() ([]string, security.Blessings) {
+ return []string{}, security.Blessings{}
+}
+
+// mockStream implements rpc.Stream with operations that do nothing.
+type mockStream struct{}
+
+// Send implements rpc.Stream. The item is discarded and nil is returned.
+func (*mockStream) Send(interface{}) error {
+ defer vlog.LogCall()()
+ return nil
+}
+
+// Recv implements rpc.Stream. The argument is left untouched and nil is
+// returned.
+func (*mockStream) Recv(interface{}) error {
+ defer vlog.LogCall()()
+ return nil
+}
diff --git a/services/wspr/internal/lib/simple_client_test.go b/services/wspr/internal/lib/simple_client_test.go
new file mode 100644
index 0000000..ed7f8c2
--- /dev/null
+++ b/services/wspr/internal/lib/simple_client_test.go
@@ -0,0 +1,143 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import (
+ "testing"
+
+ "v.io/v23/context"
+)
+
+// testContext returns a fresh root context for use in tests. The second value
+// returned by context.RootContext is discarded.
+func testContext() *context.T {
+ ctx, _ := context.RootContext()
+ return ctx
+}
+
+// TestSuccessfulCalls checks that Finish delivers the canned results of each
+// mocked method into the caller's result pointers. Errors returned by Finish
+// are reported instead of being dropped, so a failed conversion cannot be
+// masked by values left over from an earlier call.
+func TestSuccessfulCalls(t *testing.T) {
+
+	method1ExpectedResult := []interface{}{"one", 2}
+	method2ExpectedResult := []interface{}{"one"}
+	method3ExpectedResult := []interface{}{nil}
+
+	client := newSimpleClient(map[string][]interface{}{
+		"method1": method1ExpectedResult,
+		"method2": method2ExpectedResult,
+		"method3": method3ExpectedResult,
+	})
+
+	ctx := testContext()
+
+	// method1
+	method1Call, err := client.StartCall(ctx, "name/obj", "method1", []interface{}{})
+	if err != nil {
+		t.Errorf("StartCall: did not expect an error return")
+		return
+	}
+	var resultOne string
+	var resultTwo int64
+	if err := method1Call.Finish(&resultOne, &resultTwo); err != nil {
+		t.Errorf("FinishCall: did not expect an error return, got %v", err)
+		return
+	}
+	if resultOne != "one" {
+		t.Errorf(`FinishCall: first result was "%v", want "one"`, resultOne)
+		return
+	}
+	if resultTwo != 2 {
+		t.Errorf(`FinishCall: second result was "%v", want 2`, resultTwo)
+		return
+	}
+
+	// method2
+	method2Call, err := client.StartCall(ctx, "name/obj", "method2", []interface{}{})
+	if err != nil {
+		t.Errorf(`StartCall: did not expect an error return`)
+		return
+	}
+	if err := method2Call.Finish(&resultOne); err != nil {
+		t.Errorf("FinishCall: did not expect an error return, got %v", err)
+		return
+	}
+	if resultOne != "one" {
+		t.Errorf(`FinishCall: result "%v", want "one"`, resultOne)
+		return
+	}
+
+	// method3
+	var result interface{}
+	method3Call, err := client.StartCall(ctx, "name/obj", "method3", []interface{}{})
+	if err != nil {
+		t.Errorf(`StartCall: did not expect an error return`)
+		return
+	}
+	if err := method3Call.Finish(&result); err != nil {
+		t.Errorf("FinishCall: did not expect an error return, got %v", err)
+		return
+	}
+	if result != nil {
+		t.Errorf(`FinishCall: result "%v", want nil`, result)
+		return
+	}
+}
+
+// sampleStruct is the struct-typed result used by TestStructResult.
+type sampleStruct struct {
+ Name string
+}
+
+// TestStructResult checks that a struct-valued result is delivered into a
+// struct result pointer.
+func TestStructResult(t *testing.T) {
+	client := newSimpleClient(map[string][]interface{}{
+		"foo": []interface{}{
+			sampleStruct{Name: "bar"},
+		},
+	})
+	ctx := testContext()
+	call, err := client.StartCall(ctx, "name/obj", "foo", []interface{}{})
+	if err != nil {
+		// call is nil on error; bail out instead of dereferencing it below.
+		t.Errorf("StartCall: did not expect an error return, got %v", err)
+		return
+	}
+	var result sampleStruct
+	if err := call.Finish(&result); err != nil {
+		t.Errorf("FinishCall: did not expect an error return, got %v", err)
+		return
+	}
+	if result.Name != "bar" {
+		t.Errorf(`FinishCall: result was "%v", want "bar"`, result.Name)
+		return
+	}
+}
+
+// TestErrorCall checks that StartCall fails for a method name that has no
+// registered results.
+func TestErrorCall(t *testing.T) {
+	client := newSimpleClient(map[string][]interface{}{
+		"bar": {},
+	})
+	if _, err := client.StartCall(testContext(), "name/obj", "wrongMethodName", []interface{}{}); err == nil {
+		t.Errorf(`StartCall: should have returned an error on invalid method name`)
+	}
+}
+
+// TestNumberOfCalls checks that TimesCalled starts at zero for each method
+// and goes up by one with every StartCall on that method.
+func TestNumberOfCalls(t *testing.T) {
+	client := newSimpleClient(map[string][]interface{}{
+		"method1": {},
+		"method2": {},
+	})
+
+	errMsg := "Expected method to be called %d times but it was called %d"
+	ctx := testContext()
+
+	// Each step optionally performs one call and then checks the counter.
+	steps := []struct {
+		method    string
+		callFirst bool
+		want      int
+	}{
+		{"method1", false, 0},
+		{"method1", true, 1},
+		{"method1", true, 2},
+		{"method2", false, 0},
+		{"method2", true, 1},
+	}
+	for _, step := range steps {
+		if step.callFirst {
+			client.StartCall(ctx, "name/of/object", step.method, []interface{}{})
+		}
+		if n := client.TimesCalled(step.method); n != step.want {
+			t.Errorf(errMsg, step.want, n)
+			return
+		}
+	}
+}
diff --git a/services/wspr/internal/lib/testwriter/writer.go b/services/wspr/internal/lib/testwriter/writer.go
new file mode 100644
index 0000000..01c78d4
--- /dev/null
+++ b/services/wspr/internal/lib/testwriter/writer.go
@@ -0,0 +1,112 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package testwriter
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync"
+ "time"
+
+ "v.io/v23/verror"
+ "v.io/x/ref/services/wspr/internal/lib"
+)
+
+// TestHarness is the error-reporting subset of a test object; its Errorf has
+// the same shape as the one on *testing.T.
+type TestHarness interface {
+ Errorf(fmt string, a ...interface{})
+}
+
+// Writer records the responses passed to Send and the error passed to Error so
+// that tests can inspect them afterwards.
+type Writer struct {
+ // Mutex guards Stream and notifier.
+ sync.Mutex
+ // Stream holds the responses received so far, oldest first.
+ Stream []lib.Response // TODO Why not use channel?
+ // err is the last error passed to Error.
+ err error
+ // If this channel is set then a message will be sent
+ // to this channel after each response is appended by Send.
+ notifier chan bool
+}
+
+// Send appends a response of the given type to w.Stream and, if a
+// WaitForMessage call is in progress, wakes it up. It returns an error when
+// the response cannot be round-tripped through JSON.
+func (w *Writer) Send(responseType lib.ResponseType, msg interface{}) error {
+	// We serialize and deserialize the response so that we can do deep equal
+	// with messages that contain non-exported structs.
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(lib.Response{Type: responseType, Message: msg}); err != nil {
+		return err
+	}
+
+	var r lib.Response
+	if err := json.NewDecoder(&buf).Decode(&r); err != nil {
+		return err
+	}
+
+	w.Lock()
+	defer w.Unlock()
+	w.Stream = append(w.Stream, r)
+	if w.notifier != nil {
+		// Never block while holding the lock: the waiter needs the same lock
+		// to re-check the stream length, so a blocking send on a full channel
+		// could deadlock. One buffered wake-up is enough because the waiter
+		// re-reads the length after every notification.
+		select {
+		case w.notifier <- true:
+		default:
+		}
+	}
+	return nil
+}
+
+// ImmediatelyConsumeItem removes and returns the oldest item on the stream
+// without waiting. It returns an error if the stream is empty.
+func (w *Writer) ImmediatelyConsumeItem() (lib.Response, error) {
+	w.Lock()
+	defer w.Unlock()
+
+	if len(w.Stream) == 0 {
+		return lib.Response{}, fmt.Errorf("Expected an item on the stream, none found")
+	}
+
+	head, tail := w.Stream[0], w.Stream[1:]
+	w.Stream = tail
+	return head, nil
+}
+
+// Error records err so that CheckResponses can compare it later. The lock is
+// taken because the writer may be used from several goroutines, as the
+// locking in Send shows.
+func (w *Writer) Error(err error) {
+	w.Lock()
+	defer w.Unlock()
+	w.err = err
+}
+
+// streamLength returns the current number of items in w.Stream, read under
+// the lock.
+func (w *Writer) streamLength() int {
+	w.Lock()
+	n := len(w.Stream)
+	w.Unlock()
+	return n
+}
+
+// WaitForMessage waits until there are at least n messages in w.Stream. It
+// returns an error if 10 seconds pass without a new message arriving while
+// fewer than n messages are present.
+func (w *Writer) WaitForMessage(n int) error {
+	if w.streamLength() >= n {
+		return nil
+	}
+	// A buffer of one lets Send leave a wake-up behind even if we are not in
+	// the select at that moment.
+	notifier := make(chan bool, 1)
+	w.Lock()
+	w.notifier = notifier
+	w.Unlock()
+	// Always uninstall the notifier, including on timeout, so that later
+	// Send calls do not keep signalling a channel nobody reads.
+	defer func() {
+		w.Lock()
+		w.notifier = nil
+		w.Unlock()
+	}()
+	for w.streamLength() < n {
+		select {
+		case <-notifier:
+		case <-time.After(10 * time.Second):
+			return fmt.Errorf("timed out")
+		}
+	}
+	return nil
+}
+
+// CheckResponses compares what w has recorded against the expected stream and
+// error. Streams are compared with reflect.DeepEqual and errors by their
+// verror ID. It returns nil when both match and a descriptive error
+// otherwise.
+func CheckResponses(w *Writer, wantStream []lib.Response, wantErr error) error {
+	// Snapshot the recorded state under the lock; Send may still be running
+	// on another goroutine.
+	w.Lock()
+	gotStream, gotErr := w.Stream, w.err
+	w.Unlock()
+
+	if !reflect.DeepEqual(gotStream, wantStream) {
+		return fmt.Errorf("streams don't match: got %#v, want %#v", gotStream, wantStream)
+	}
+	if verror.ErrorID(gotErr) != verror.ErrorID(wantErr) {
+		return fmt.Errorf("unexpected error, got: %#v, expected: %#v", gotErr, wantErr)
+	}
+	return nil
+}
diff --git a/services/wspr/internal/lib/time.go b/services/wspr/internal/lib/time.go
new file mode 100644
index 0000000..fa37d70
--- /dev/null
+++ b/services/wspr/internal/lib/time.go
@@ -0,0 +1,23 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import "time"
+
+// Javascript uses millisecond time units. This is both because they are the
+// native time unit, and because otherwise JS numbers would not be capable of
+// representing the full time range available to Go programs.
+//
+// TODO(bjornick,toddw): Pick a better sentinel for no timeout, or change to using
+// VDL time.WireDeadline.
+const JSRPCNoTimeout = int64(6307200000000) // 200 years in milliseconds
+
+// GoToJSDuration converts a Go duration to whole milliseconds, truncating
+// toward zero.
+func GoToJSDuration(d time.Duration) int64 {
+	return int64(d) / int64(time.Millisecond)
+}
+
+// JSToGoDuration converts a count of milliseconds to a Go duration.
+func JSToGoDuration(d int64) time.Duration {
+	return time.Millisecond * time.Duration(d)
+}
diff --git a/services/wspr/internal/lib/vom.go b/services/wspr/internal/lib/vom.go
new file mode 100644
index 0000000..d3f437a
--- /dev/null
+++ b/services/wspr/internal/lib/vom.go
@@ -0,0 +1,44 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import (
+ "bytes"
+ "encoding/hex"
+
+ "v.io/v23/vom"
+)
+
+// VomEncode encodes v with vom and returns the bytes as a hex string.
+func VomEncode(v interface{}) (string, error) {
+	buf := new(bytes.Buffer)
+	encoder, err := vom.NewEncoder(buf)
+	if err == nil {
+		err = encoder.Encode(v)
+	}
+	if err != nil {
+		return "", err
+	}
+	return hex.EncodeToString(buf.Bytes()), nil
+}
+
+// VomEncodeOrDie is like VomEncode but panics with the error if encoding
+// fails.
+func VomEncodeOrDie(v interface{}) string {
+	encoded, err := VomEncode(v)
+	if err == nil {
+		return encoded
+	}
+	panic(err)
+}
+
+// VomDecode decodes the hex string data into raw bytes and then vom-decodes
+// those bytes into v. It returns the first error encountered.
+func VomDecode(data string, v interface{}) error {
+	raw, hexErr := hex.DecodeString(data)
+	if hexErr != nil {
+		return hexErr
+	}
+	decoder, newErr := vom.NewDecoder(bytes.NewReader(raw))
+	if newErr != nil {
+		return newErr
+	}
+	return decoder.Decode(v)
+}
diff --git a/services/wspr/internal/lib/writer.go b/services/wspr/internal/lib/writer.go
new file mode 100644
index 0000000..a1ccecc
--- /dev/null
+++ b/services/wspr/internal/lib/writer.go
@@ -0,0 +1,33 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+// ResponseType identifies the kind of message carried by a Response.
+type ResponseType int32
+
+// The known response kinds.
+//
+// NOTE(review): only ResponseFinal is declared with the ResponseType type; the
+// other values are untyped integer constants. They convert implicitly wherever
+// a ResponseType is expected, but default to int elsewhere. Giving them the
+// explicit type would be cleaner — confirm first that no caller relies on them
+// being untyped.
+const (
+ ResponseFinal ResponseType = 0
+ ResponseStream = 1
+ ResponseError = 2
+ ResponseServerRequest = 3
+ ResponseStreamClose = 4
+ ResponseDispatcherLookup = 5
+ ResponseAuthRequest = 6
+ ResponseCancel = 7
+ ResponseValidate = 8 // Request to validate caveats.
+ ResponseLog = 9 // Sends a message to be logged.
+)
+
+// Response is a message paired with the type that says how to interpret it.
+type Response struct {
+ // Type is the kind of response.
+ Type ResponseType
+ // Message is the payload; its concrete type is not constrained here.
+ Message interface{}
+}
+
+// ClientWriter is basically an io.Writer interface that also allows passing
+// errors. This is how the proxy will talk to the javascript/java
+// clients.
+type ClientWriter interface {
+ // Send delivers data tagged with the given message type.
+ Send(messageType ResponseType, data interface{}) error
+
+ // Error reports err to the client.
+ Error(err error)
+}
diff --git a/services/wspr/internal/lib/writer.vdl b/services/wspr/internal/lib/writer.vdl
new file mode 100644
index 0000000..ce94e21
--- /dev/null
+++ b/services/wspr/internal/lib/writer.vdl
@@ -0,0 +1,24 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package lib
+
+import "v.io/v23/vtrace"
+
+// The response from the javascript server to the proxy.
+type ServerRpcReply struct {
+ // Results holds the values returned by the call.
+ Results []any
+ // Err is the error returned by the call, if any.
+ Err error
+ // TraceResponse carries the vtrace data for the call.
+ TraceResponse vtrace.Response
+}
+
+// LogLevel is the level attached to a LogMessage.
+type LogLevel enum {
+ Info
+ Error
+}
+
+// LogMessage is a message to be logged together with its level.
+// NOTE(review): presumably the payload of a ResponseLog response — confirm.
+type LogMessage struct {
+ Level LogLevel
+ Message string
+}
diff --git a/services/wspr/internal/lib/writer.vdl.go b/services/wspr/internal/lib/writer.vdl.go
new file mode 100644
index 0000000..1930733
--- /dev/null
+++ b/services/wspr/internal/lib/writer.vdl.go
@@ -0,0 +1,92 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: writer.vdl
+
+package lib
+
+import (
+ // VDL system imports
+ "fmt"
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/v23/vtrace"
+)
+
+// The response from the javascript server to the proxy.
+type ServerRpcReply struct {
+ Results []*vdl.Value
+ Err error
+ TraceResponse vtrace.Response
+}
+
+func (ServerRpcReply) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/lib.ServerRpcReply"
+}) {
+}
+
+type LogLevel int
+
+const (
+ LogLevelInfo LogLevel = iota
+ LogLevelError
+)
+
+// LogLevelAll holds all labels for LogLevel.
+var LogLevelAll = [...]LogLevel{LogLevelInfo, LogLevelError}
+
+// LogLevelFromString creates a LogLevel from a string label.
+func LogLevelFromString(label string) (x LogLevel, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *LogLevel) Set(label string) error {
+ switch label {
+ case "Info", "info":
+ *x = LogLevelInfo
+ return nil
+ case "Error", "error":
+ *x = LogLevelError
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in lib.LogLevel", label)
+}
+
+// String returns the string label of x.
+func (x LogLevel) String() string {
+ switch x {
+ case LogLevelInfo:
+ return "Info"
+ case LogLevelError:
+ return "Error"
+ }
+ return ""
+}
+
+func (LogLevel) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/lib.LogLevel"
+ Enum struct{ Info, Error string }
+}) {
+}
+
+type LogMessage struct {
+ Level LogLevel
+ Message string
+}
+
+func (LogMessage) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/lib.LogMessage"
+}) {
+}
+
+func init() {
+ vdl.Register((*ServerRpcReply)(nil))
+ vdl.Register((*LogLevel)(nil))
+ vdl.Register((*LogMessage)(nil))
+}
diff --git a/services/wspr/internal/namespace/namespace.vdl b/services/wspr/internal/namespace/namespace.vdl
new file mode 100644
index 0000000..bfb3849
--- /dev/null
+++ b/services/wspr/internal/namespace/namespace.vdl
@@ -0,0 +1,44 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package namespace defines an RPC service that allows remoting of the
+// namespace client library over the wire. This is useful for
+// javascript so it doesn't have to implement the library.
+// This should be kept in sync with the namespace library (v.io/v23/naming).
+package namespace
+
+import (
+ "time"
+
+ "v.io/v23/naming"
+ "v.io/v23/security/access"
+)
+
+// Namespace is the RPC interface through which the namespace client library is
+// made available over the wire.
+type Namespace interface {
+ // Run a glob query and stream the results.
+ Glob(pattern string) stream<_, naming.GlobReply> error
+ // Mount mounts a server under the given name.
+ Mount(name, server string, ttl time.Duration, replace bool) error
+ // Unmount removes an existing mount point.
+ Unmount(name, server string) error
+ // Resolve resolves a name to an address.
+ Resolve(name string) ([]string | error)
+ // ResolveToMountTable resolves a name to the address of the mounttable
+ // directly hosting it.
+ ResolveToMountTable(name string) ([]string | error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(name string) (bool | error)
+ // DisableCache disables the naming cache.
+ DisableCache(disable bool) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots() ([]string | error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(roots []string) error
+ // SetPermissions sets the AccessList in a node in a mount table.
+ SetPermissions(name string, acl access.Permissions, etag string) error
+ // GetPermissions returns the AccessList in a node in a mount table.
+ GetPermissions(name string) (acl access.Permissions, etag string | error)
+ // Delete deletes the name from the mounttable and, if requested, any subtree.
+ Delete(name string, deleteSubtree bool) error
+}
diff --git a/services/wspr/internal/namespace/namespace.vdl.go b/services/wspr/internal/namespace/namespace.vdl.go
new file mode 100644
index 0000000..a15e808
--- /dev/null
+++ b/services/wspr/internal/namespace/namespace.vdl.go
@@ -0,0 +1,552 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: namespace.vdl
+
+// Package namespace defines an RPC service that allows remoting of the
+// namespace client library over the wire. This is useful for
+// javascript so it doesn't have to implement the library.
+// This should be kept in sync with the namespace library (v.io/v23/naming).
+package namespace
+
+import (
+ // VDL system imports
+ "io"
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+
+ // VDL user imports
+ "time"
+ "v.io/v23/naming"
+ "v.io/v23/security/access"
+ _ "v.io/v23/vdlroot/time"
+)
+
+// NamespaceClientMethods is the client interface
+// containing Namespace methods.
+//
+// Every method takes the call context first and accepts optional
+// rpc.CallOpt values last.
+type NamespaceClientMethods interface {
+ // Run a glob query and stream the results.
+ // The results are read from the returned NamespaceGlobClientCall.
+ Glob(ctx *context.T, pattern string, opts ...rpc.CallOpt) (NamespaceGlobClientCall, error)
+ // Mount mounts a server under the given name.
+ Mount(ctx *context.T, name string, server string, ttl time.Duration, replace bool, opts ...rpc.CallOpt) error
+ // Unmount removes an existing mount point.
+ Unmount(ctx *context.T, name string, server string, opts ...rpc.CallOpt) error
+ // Resolve resolves a name to an address.
+ Resolve(ctx *context.T, name string, opts ...rpc.CallOpt) ([]string, error)
+ // ResolveToMountTable resolves a name to the address of the mounttable
+ // directly hosting it.
+ ResolveToMountTable(ctx *context.T, name string, opts ...rpc.CallOpt) ([]string, error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(ctx *context.T, name string, opts ...rpc.CallOpt) (bool, error)
+ // DisableCache disables the naming cache.
+ DisableCache(ctx *context.T, disable bool, opts ...rpc.CallOpt) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots(*context.T, ...rpc.CallOpt) ([]string, error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(ctx *context.T, roots []string, opts ...rpc.CallOpt) error
+ // SetPermissions sets the AccessList in a node in a mount table.
+ SetPermissions(ctx *context.T, name string, acl access.Permissions, etag string, opts ...rpc.CallOpt) error
+ // GetPermissions returns the AccessList in a node in a mount table.
+ GetPermissions(ctx *context.T, name string, opts ...rpc.CallOpt) (acl access.Permissions, etag string, err error)
+ // Delete deletes the name from the mounttable and, if requested, any subtree.
+ Delete(ctx *context.T, name string, deleteSubtree bool, opts ...rpc.CallOpt) error
+}
+
+// NamespaceClientStub adds universal methods to NamespaceClientMethods.
+// It is the type returned by NamespaceClient and is formed by embedding
+// NamespaceClientMethods together with rpc.UniversalServiceMethods.
+type NamespaceClientStub interface {
+ NamespaceClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// NamespaceClient returns a client stub for Namespace. Every call made through
+// the stub is started against the object name supplied here.
+func NamespaceClient(name string) NamespaceClientStub {
+	stub := implNamespaceClientStub{name: name}
+	return stub
+}
+
+// implNamespaceClientStub is the concrete client stub handed out by
+// NamespaceClient.
+type implNamespaceClientStub struct {
+ // name is the object name each method passes to StartCall.
+ name string
+}
+
+func (c implNamespaceClientStub) Glob(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall NamespaceGlobClientCall, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Glob", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ ocall = &implNamespaceGlobClientCall{ClientCall: call}
+ return
+}
+
+func (c implNamespaceClientStub) Mount(ctx *context.T, i0 string, i1 string, i2 time.Duration, i3 bool, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Mount", []interface{}{i0, i1, i2, i3}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) Unmount(ctx *context.T, i0 string, i1 string, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Unmount", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) Resolve(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []string, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Resolve", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) ResolveToMountTable(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 []string, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "ResolveToMountTable", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) FlushCacheEntry(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 bool, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "FlushCacheEntry", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) DisableCache(ctx *context.T, i0 bool, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "DisableCache", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) Roots(ctx *context.T, opts ...rpc.CallOpt) (o0 []string, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Roots", nil, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0)
+ return
+}
+
+func (c implNamespaceClientStub) SetRoots(ctx *context.T, i0 []string, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "SetRoots", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) SetPermissions(ctx *context.T, i0 string, i1 access.Permissions, i2 string, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "SetPermissions", []interface{}{i0, i1, i2}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+func (c implNamespaceClientStub) GetPermissions(ctx *context.T, i0 string, opts ...rpc.CallOpt) (o0 access.Permissions, o1 string, err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetPermissions", []interface{}{i0}, opts...); err != nil {
+ return
+ }
+ err = call.Finish(&o0, &o1)
+ return
+}
+
+func (c implNamespaceClientStub) Delete(ctx *context.T, i0 string, i1 bool, opts ...rpc.CallOpt) (err error) {
+ var call rpc.ClientCall
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "Delete", []interface{}{i0, i1}, opts...); err != nil {
+ return
+ }
+ err = call.Finish()
+ return
+}
+
+// NamespaceGlobClientStream is the client stream for Namespace.Glob.
+// It is receive-only: the only accessor is RecvStream, which yields
+// naming.GlobReply items.
+type NamespaceGlobClientStream interface {
+ // RecvStream returns the receiver side of the Namespace.Glob client stream.
+ RecvStream() interface {
+ // Advance stages an item so that it may be retrieved via Value. Returns
+ // true iff there is an item to retrieve. Advance must be called before
+ // Value is called. May block if an item is not available.
+ Advance() bool
+ // Value returns the item that was staged by Advance. May panic if Advance
+ // returned false or was not called. Never blocks.
+ Value() naming.GlobReply
+ // Err returns any error encountered by Advance. Never blocks.
+ Err() error
+ }
+}
+
+// NamespaceGlobClientCall represents the call returned from Namespace.Glob.
+// It embeds NamespaceGlobClientStream, so streamed results are read through
+// RecvStream.
+type NamespaceGlobClientCall interface {
+ NamespaceGlobClientStream
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
+ //
+ // Finish returns immediately if the call has been canceled; depending on the
+ // timing the output could either be an error signaling cancelation, or the
+ // valid positional return values from the server.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless the call
+ // has been canceled or any of the other methods return an error. Finish should
+ // be called at most once.
+ Finish() error
+}
+
+// implNamespaceGlobClientCall is the NamespaceGlobClientCall implementation
+// returned by the client stub's Glob method.
+type implNamespaceGlobClientCall struct {
+ rpc.ClientCall
+ // valRecv is the destination Advance passes to Recv; Value returns it.
+ valRecv naming.GlobReply
+ // errRecv is the error from the most recent Recv performed by Advance.
+ errRecv error
+}
+
+// RecvStream returns a receiver view over c. The view shares c's staged value
+// and error, so repeated calls observe the same stream state.
+func (c *implNamespaceGlobClientCall) RecvStream() interface {
+	Advance() bool
+	Value() naming.GlobReply
+	Err() error
+} {
+	recv := implNamespaceGlobClientCallRecv{c: c}
+	return recv
+}
+
+// implNamespaceGlobClientCallRecv is the receiver returned by RecvStream; its
+// methods read and write the staged value and error held by the call c.
+type implNamespaceGlobClientCallRecv struct {
+ c *implNamespaceGlobClientCall
+}
+
+// Advance receives the next item into the call's staged value and records the
+// receive error. It reports whether the receive succeeded.
+func (c implNamespaceGlobClientCallRecv) Advance() bool {
+	call := c.c
+	call.errRecv = call.Recv(&call.valRecv)
+	return call.errRecv == nil
+}
+// Value returns the item most recently staged by Advance.
+func (c implNamespaceGlobClientCallRecv) Value() naming.GlobReply {
+ return c.c.valRecv
+}
+// Err returns the error recorded by the last Advance, reporting io.EOF (the
+// normal end of the stream) as nil.
+func (c implNamespaceGlobClientCallRecv) Err() error {
+	if err := c.c.errRecv; err != io.EOF {
+		return err
+	}
+	return nil
+}
+// Finish finishes the underlying rpc.ClientCall; Glob has no positional
+// results, so only the error is returned.
+func (c *implNamespaceGlobClientCall) Finish() (err error) {
+	return c.ClientCall.Finish()
+}
+
+// NamespaceServerMethods is the interface a server writer
+// implements for Namespace.
+//
+// Every method receives the server-side call as its first argument; Glob
+// receives a NamespaceGlobServerCall, which additionally exposes the send
+// stream.
+type NamespaceServerMethods interface {
+ // Run a glob query and stream the results.
+ Glob(call NamespaceGlobServerCall, pattern string) error
+ // Mount mounts a server under the given name.
+ Mount(call rpc.ServerCall, name string, server string, ttl time.Duration, replace bool) error
+ // Unmount removes an existing mount point.
+ Unmount(call rpc.ServerCall, name string, server string) error
+ // Resolve resolves a name to an address.
+ Resolve(call rpc.ServerCall, name string) ([]string, error)
+ // ResolveToMountTable resolves a name to the address of the mounttable
+ // directly hosting it.
+ ResolveToMountTable(call rpc.ServerCall, name string) ([]string, error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(call rpc.ServerCall, name string) (bool, error)
+ // DisableCache disables the naming cache.
+ DisableCache(call rpc.ServerCall, disable bool) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots(rpc.ServerCall) ([]string, error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(call rpc.ServerCall, roots []string) error
+ // SetPermissions sets the AccessList in a node in a mount table.
+ SetPermissions(call rpc.ServerCall, name string, acl access.Permissions, etag string) error
+ // GetPermissions returns the AccessList in a node in a mount table.
+ GetPermissions(call rpc.ServerCall, name string) (acl access.Permissions, etag string, err error)
+ // Delete deletes the name from the mounttable and, if requested, any subtree.
+ Delete(call rpc.ServerCall, name string, deleteSubtree bool) error
+}
+
+// NamespaceServerStubMethods is the server interface containing
+// Namespace methods, as expected by rpc.Server.
+// The only difference between this interface and NamespaceServerMethods
+// is the streaming methods.
+// Concretely, Glob takes the *NamespaceGlobServerCallStub wrapper here instead
+// of the NamespaceGlobServerCall interface.
+type NamespaceServerStubMethods interface {
+ // Run a glob query and stream the results.
+ Glob(call *NamespaceGlobServerCallStub, pattern string) error
+ // Mount mounts a server under the given name.
+ Mount(call rpc.ServerCall, name string, server string, ttl time.Duration, replace bool) error
+ // Unmount removes an existing mount point.
+ Unmount(call rpc.ServerCall, name string, server string) error
+ // Resolve resolves a name to an address.
+ Resolve(call rpc.ServerCall, name string) ([]string, error)
+ // ResolveToMountTable resolves a name to the address of the mounttable
+ // directly hosting it.
+ ResolveToMountTable(call rpc.ServerCall, name string) ([]string, error)
+ // FlushCacheEntry removes the namespace cache entry for a given name.
+ FlushCacheEntry(call rpc.ServerCall, name string) (bool, error)
+ // DisableCache disables the naming cache.
+ DisableCache(call rpc.ServerCall, disable bool) error
+ // Roots returns the addresses of the current mounttable roots.
+ Roots(rpc.ServerCall) ([]string, error)
+ // SetRoots sets the current mounttable roots.
+ SetRoots(call rpc.ServerCall, roots []string) error
+ // SetPermissions sets the AccessList in a node in a mount table.
+ SetPermissions(call rpc.ServerCall, name string, acl access.Permissions, etag string) error
+ // GetPermissions returns the AccessList in a node in a mount table.
+ GetPermissions(call rpc.ServerCall, name string) (acl access.Permissions, etag string, err error)
+ // Delete deletes the name from the mounttable and, if requested, any subtree.
+ Delete(call rpc.ServerCall, name string, deleteSubtree bool) error
+}
+
+// NamespaceServerStub adds universal methods to NamespaceServerStubMethods.
+// It is the type returned by NamespaceServer.
+type NamespaceServerStub interface {
+ NamespaceServerStubMethods
+ // Describe the Namespace interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// NamespaceServer returns a server stub for Namespace.
+// It converts an implementation of NamespaceServerMethods into
+// an object that may be used by rpc.Server.
+func NamespaceServer(impl NamespaceServerMethods) NamespaceServerStub {
+	stub := implNamespaceServerStub{impl: impl}
+	// Pick the GlobState. The stub itself is consulted first, to handle the
+	// case where the user has the Glob method defined in their VDL source; the
+	// implementation is only consulted when the stub yields nothing.
+	gs := rpc.NewGlobState(stub)
+	if gs == nil {
+		gs = rpc.NewGlobState(impl)
+	}
+	stub.gs = gs
+	return stub
+}
+
+// implNamespaceServerStub is the concrete stub built by NamespaceServer.
+type implNamespaceServerStub struct {
+ // impl is the user-supplied implementation every method delegates to.
+ impl NamespaceServerMethods
+ // gs is the GlobState chosen in NamespaceServer; Globber returns it.
+ gs *rpc.GlobState
+}
+
+// Glob forwards the streaming call and its argument to the wrapped
+// implementation.
+func (s implNamespaceServerStub) Glob(call *NamespaceGlobServerCallStub, i0 string) error {
+ return s.impl.Glob(call, i0)
+}
+
+// Mount forwards the call and its arguments, unchanged, to the wrapped
+// implementation.
+func (s implNamespaceServerStub) Mount(call rpc.ServerCall, i0 string, i1 string, i2 time.Duration, i3 bool) error {
+ return s.impl.Mount(call, i0, i1, i2, i3)
+}
+
+// Unmount forwards the call and its arguments, unchanged, to the wrapped
+// implementation.
+func (s implNamespaceServerStub) Unmount(call rpc.ServerCall, i0 string, i1 string) error {
+ return s.impl.Unmount(call, i0, i1)
+}
+
+// Resolve forwards the call and its argument to the wrapped implementation
+// and returns its results as-is.
+func (s implNamespaceServerStub) Resolve(call rpc.ServerCall, i0 string) ([]string, error) {
+ return s.impl.Resolve(call, i0)
+}
+
+// ResolveToMountTable forwards the call and its argument to the wrapped
+// implementation and returns its results as-is.
+func (s implNamespaceServerStub) ResolveToMountTable(call rpc.ServerCall, i0 string) ([]string, error) {
+ return s.impl.ResolveToMountTable(call, i0)
+}
+
+// FlushCacheEntry forwards the call and its argument to the wrapped
+// implementation and returns its results as-is.
+func (s implNamespaceServerStub) FlushCacheEntry(call rpc.ServerCall, i0 string) (bool, error) {
+ return s.impl.FlushCacheEntry(call, i0)
+}
+
+// DisableCache forwards the call and its argument to the wrapped
+// implementation.
+func (s implNamespaceServerStub) DisableCache(call rpc.ServerCall, i0 bool) error {
+ return s.impl.DisableCache(call, i0)
+}
+
+// Roots forwards the call to the wrapped implementation and returns its
+// results as-is.
+func (s implNamespaceServerStub) Roots(call rpc.ServerCall) ([]string, error) {
+ return s.impl.Roots(call)
+}
+
+// SetRoots forwards the call and its argument to the wrapped implementation.
+func (s implNamespaceServerStub) SetRoots(call rpc.ServerCall, i0 []string) error {
+ return s.impl.SetRoots(call, i0)
+}
+
+// SetPermissions forwards the call and its arguments, unchanged, to the
+// wrapped implementation.
+func (s implNamespaceServerStub) SetPermissions(call rpc.ServerCall, i0 string, i1 access.Permissions, i2 string) error {
+ return s.impl.SetPermissions(call, i0, i1, i2)
+}
+
+// GetPermissions forwards the call and its argument to the wrapped
+// implementation and returns its results as-is.
+func (s implNamespaceServerStub) GetPermissions(call rpc.ServerCall, i0 string) (access.Permissions, string, error) {
+ return s.impl.GetPermissions(call, i0)
+}
+
+// Delete forwards the call and its arguments, unchanged, to the wrapped
+// implementation.
+func (s implNamespaceServerStub) Delete(call rpc.ServerCall, i0 string, i1 bool) error {
+ return s.impl.Delete(call, i0, i1)
+}
+
+// Globber returns the GlobState selected when the stub was built by
+// NamespaceServer; it is nil if neither the stub nor the implementation
+// produced one.
+func (s implNamespaceServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+// Describe__ returns the interface descriptions served by this stub: a
+// single-element slice holding NamespaceDesc.
+func (s implNamespaceServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{NamespaceDesc}
+}
+
+// NamespaceDesc describes the Namespace interface.
+// It is initialized from the unexported descNamespace value.
+var NamespaceDesc rpc.InterfaceDesc = descNamespace
+
+// descNamespace hides the desc to keep godoc clean.
+//
+// It lists each Namespace method with its doc string and the names of its
+// input and output arguments. The second element of every argument entry is an
+// empty string (presumably the per-argument doc text, left blank); the
+// trailing comment on each entry records the argument's type.
+var descNamespace = rpc.InterfaceDesc{
+ Name: "Namespace",
+ PkgPath: "v.io/x/ref/services/wspr/internal/namespace",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "Glob",
+ Doc: "// Run a glob query and stream the results.",
+ InArgs: []rpc.ArgDesc{
+ {"pattern", ``}, // string
+ },
+ },
+ {
+ Name: "Mount",
+ Doc: "// Mount mounts a server under the given name.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"server", ``}, // string
+ {"ttl", ``}, // time.Duration
+ {"replace", ``}, // bool
+ },
+ },
+ {
+ Name: "Unmount",
+ Doc: "// Unmount removes an existing mount point.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"server", ``}, // string
+ },
+ },
+ {
+ Name: "Resolve",
+ Doc: "// Resolve resolves a name to an address.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "ResolveToMountTable",
+ Doc: "// ResolveToMountTable resolves a name to the address of the mounttable\n// directly hosting it.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "FlushCacheEntry",
+ Doc: "// FlushCacheEntry removes the namespace cache entry for a given name.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // bool
+ },
+ },
+ {
+ Name: "DisableCache",
+ Doc: "// DisableCache disables the naming cache.",
+ InArgs: []rpc.ArgDesc{
+ {"disable", ``}, // bool
+ },
+ },
+ {
+ Name: "Roots",
+ Doc: "// Roots returns the addresses of the current mounttable roots.",
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []string
+ },
+ },
+ {
+ Name: "SetRoots",
+ Doc: "// SetRoots sets the current mounttable roots.",
+ InArgs: []rpc.ArgDesc{
+ {"roots", ``}, // []string
+ },
+ },
+ {
+ Name: "SetPermissions",
+ Doc: "// SetPermissions sets the AccessList in a node in a mount table.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"acl", ``}, // access.Permissions
+ {"etag", ``}, // string
+ },
+ },
+ {
+ Name: "GetPermissions",
+ Doc: "// GetPermissions returns the AccessList in a node in a mount table.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"acl", ``}, // access.Permissions
+ {"etag", ``}, // string
+ },
+ },
+ {
+ Name: "Delete",
+ Doc: "// Delete deletes the name from the mounttable and, if requested, any subtree.",
+ InArgs: []rpc.ArgDesc{
+ {"name", ``}, // string
+ {"deleteSubtree", ``}, // bool
+ },
+ },
+ },
+}
+
+// NamespaceGlobServerStream is the server stream for Namespace.Glob.
+// It is send-only: the only accessor is SendStream, which accepts
+// naming.GlobReply items.
+type NamespaceGlobServerStream interface {
+ // SendStream returns the send side of the Namespace.Glob server stream.
+ SendStream() interface {
+ // Send places the item onto the output stream. Returns errors encountered
+ // while sending. Blocks if there is no buffer space; will unblock when
+ // buffer space is available.
+ Send(item naming.GlobReply) error
+ }
+}
+
+// NamespaceGlobServerCall represents the context passed to Namespace.Glob.
+// It combines rpc.ServerCall with the Glob send stream.
+type NamespaceGlobServerCall interface {
+ rpc.ServerCall
+ NamespaceGlobServerStream
+}
+
+// NamespaceGlobServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements NamespaceGlobServerCall.
+// The wrapped call is embedded, so its methods are available directly on the
+// stub.
+type NamespaceGlobServerCallStub struct {
+ rpc.StreamServerCall
+}
+
+// Init initializes NamespaceGlobServerCallStub from rpc.StreamServerCall.
+// It stores call as the embedded stream call, replacing any previous one.
+func (s *NamespaceGlobServerCallStub) Init(call rpc.StreamServerCall) {
+ s.StreamServerCall = call
+}
+
+// SendStream returns the send side of the Namespace.Glob server stream. The
+// returned sender forwards every item to s.
+func (s *NamespaceGlobServerCallStub) SendStream() interface {
+	Send(item naming.GlobReply) error
+} {
+	sender := implNamespaceGlobServerCallSend{s: s}
+	return sender
+}
+
+// implNamespaceGlobServerCallSend is the sender returned by SendStream; it
+// holds the call stub that items are sent on.
+type implNamespaceGlobServerCallSend struct {
+ s *NamespaceGlobServerCallStub
+}
+
+// Send forwards item to the Send method of the wrapped call stub and returns
+// its error.
+func (s implNamespaceGlobServerCallSend) Send(item naming.GlobReply) error {
+ return s.s.Send(item)
+}
diff --git a/services/wspr/internal/namespace/request_handler.go b/services/wspr/internal/namespace/request_handler.go
new file mode 100644
index 0000000..ca8c62e
--- /dev/null
+++ b/services/wspr/internal/namespace/request_handler.go
@@ -0,0 +1,111 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package namespace
+
+import (
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/namespace"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security/access"
+ "v.io/v23/verror"
+)
+
+// Server serves the Namespace RPC methods by delegating each of them to a
+// namespace client.
+type Server struct {
+ // ns is the namespace client every method calls into; New sets it.
+ ns namespace.T
+}
+
+// New returns a Server backed by the namespace client obtained from ctx via
+// v23.GetNamespace.
+func New(ctx *context.T) *Server {
+	ns := v23.GetNamespace(ctx)
+	return &Server{ns: ns}
+}
+
+// Glob runs a glob for pattern on the namespace client and streams every
+// result back to the caller.
+//
+// Each item read from the namespace channel is converted to the matching
+// naming.GlobReply union member: *naming.GlobError becomes GlobReplyError and
+// *naming.MountEntry becomes GlobReplyEntry. An item of any other type aborts
+// the call with an internal error instead of sending a nil reply to the
+// client.
+//
+// It returns the error from starting the glob, the first send error, or nil
+// once the channel is exhausted.
+func (s *Server) Glob(call *NamespaceGlobServerCallStub, pattern string) error {
+	// Call Glob on the namespace client instance.
+	ch, err := s.ns.Glob(call.Context(), pattern)
+	if err != nil {
+		return err
+	}
+
+	stream := call.SendStream()
+	for mp := range ch {
+		var reply naming.GlobReply
+		switch v := mp.(type) {
+		case *naming.GlobError:
+			reply = naming.GlobReplyError{*v}
+		case *naming.MountEntry:
+			reply = naming.GlobReplyEntry{*v}
+		default:
+			// Without this branch reply would stay nil and a nil union would
+			// be put on the wire; surface the unexpected item instead.
+			return verror.New(verror.ErrInternal, call.Context(), mp)
+		}
+		if err := stream.Send(reply); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Mount mounts server under name with the given ttl, passing replace through
+// as a naming.ReplaceMount option. A failure from the namespace client is
+// converted with verror.Convert (verror.ErrInternal) before being returned.
+func (s *Server) Mount(call rpc.ServerCall, name, server string, ttl time.Duration, replace bool) error {
+	if err := s.ns.Mount(call.Context(), name, server, ttl, naming.ReplaceMount(replace)); err != nil {
+		return verror.Convert(verror.ErrInternal, call.Context(), err)
+	}
+	return nil
+}
+
+// Unmount removes server from name via the namespace client. The client's
+// error is returned as-is (unlike Mount, it is not passed through
+// verror.Convert).
+func (s *Server) Unmount(call rpc.ServerCall, name, server string) error {
+ return s.ns.Unmount(call.Context(), name, server)
+}
+
+// Resolve resolves name through the namespace client and returns the Names()
+// of the resulting entry. A failure is converted with verror.Convert
+// (verror.ErrInternal) and returned with a nil slice.
+func (s *Server) Resolve(call rpc.ServerCall, name string) ([]string, error) {
+	entry, err := s.ns.Resolve(call.Context(), name)
+	if err != nil {
+		converted := verror.Convert(verror.ErrInternal, call.Context(), err)
+		return nil, converted
+	}
+	names := entry.Names()
+	return names, nil
+}
+
+// ResolveToMountTable resolves name to its hosting mounttable through the
+// namespace client and returns the Names() of the resulting entry. A failure
+// is converted with verror.Convert (verror.ErrInternal) and returned with a
+// nil slice.
+func (s *Server) ResolveToMountTable(call rpc.ServerCall, name string) ([]string, error) {
+	entry, err := s.ns.ResolveToMountTable(call.Context(), name)
+	if err != nil {
+		converted := verror.Convert(verror.ErrInternal, call.Context(), err)
+		return nil, converted
+	}
+	names := entry.Names()
+	return names, nil
+}
+
+// FlushCacheEntry returns whatever the namespace client's FlushCacheEntry
+// reports for name. The error result is always nil and call is unused.
+func (s *Server) FlushCacheEntry(call rpc.ServerCall, name string) (bool, error) {
+ return s.ns.FlushCacheEntry(name), nil
+}
+
+// DisableCache passes naming.DisableCache(disable) to the namespace client's
+// CacheCtl. The value CacheCtl returns is deliberately discarded, and the
+// method always reports success.
+func (s *Server) DisableCache(call rpc.ServerCall, disable bool) error {
+	s.ns.CacheCtl(naming.DisableCache(disable))
+	return nil
+}
+
+// Roots returns the namespace client's current roots. The error result is
+// always nil and call is unused.
+func (s *Server) Roots(call rpc.ServerCall) ([]string, error) {
+ return s.ns.Roots(), nil
+}
+
+// SetRoots replaces the namespace client's roots with roots. A failure is
+// converted with verror.Convert (verror.ErrInternal) before being returned.
+func (s *Server) SetRoots(call rpc.ServerCall, roots []string) error {
+	err := s.ns.SetRoots(roots...)
+	if err == nil {
+		return nil
+	}
+	return verror.Convert(verror.ErrInternal, call.Context(), err)
+}
+
+// SetPermissions forwards name, acl and etag to the namespace client's
+// SetPermissions and returns its error as-is.
+func (s *Server) SetPermissions(call rpc.ServerCall, name string, acl access.Permissions, etag string) error {
+ return s.ns.SetPermissions(call.Context(), name, acl, etag)
+}
+
+// GetPermissions returns the results of the namespace client's GetPermissions
+// for name, unmodified.
+func (s *Server) GetPermissions(call rpc.ServerCall, name string) (access.Permissions, string, error) {
+ return s.ns.GetPermissions(call.Context(), name)
+}
+
+// Delete forwards name and deleteSubtree to the namespace client's Delete and
+// returns its error as-is.
+func (s *Server) Delete(call rpc.ServerCall, name string, deleteSubtree bool) error {
+ return s.ns.Delete(call.Context(), name, deleteSubtree)
+}
diff --git a/services/wspr/internal/principal/blessings.go b/services/wspr/internal/principal/blessings.go
new file mode 100644
index 0000000..37e6317
--- /dev/null
+++ b/services/wspr/internal/principal/blessings.go
@@ -0,0 +1,38 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package principal
+
+import (
+ "encoding/base64"
+
+ "v.io/v23/security"
+)
+
+// ConvertBlessingsToHandle builds the JsBlessings value for blessings: it
+// pairs the supplied handle with the encoded form of the blessings' public key
+// (see EncodePublicKey).
+//
+// It panics if the public key cannot be encoded.
+func ConvertBlessingsToHandle(blessings security.Blessings, handle BlessingsHandle) *JsBlessings {
+	publicKey, err := EncodePublicKey(blessings.PublicKey())
+	if err != nil {
+		panic(err)
+	}
+	jsBlessings := &JsBlessings{Handle: handle, PublicKey: publicKey}
+	return jsBlessings
+}
+
+// EncodePublicKey marshals key to its binary form and returns that form in
+// standard base64. If marshaling fails it returns the empty string and the
+// marshaling error.
+func EncodePublicKey(key security.PublicKey) (string, error) {
+	raw, err := key.MarshalBinary()
+	if err != nil {
+		return "", err
+	}
+	encoded := base64.StdEncoding.EncodeToString(raw)
+	return encoded, nil
+}
+
+// DecodePublicKey is the inverse of EncodePublicKey: it decodes key from
+// standard base64 and unmarshals the bytes with security.UnmarshalPublicKey.
+// A base64 failure is returned with a nil key.
+func DecodePublicKey(key string) (security.PublicKey, error) {
+	raw, err := base64.StdEncoding.DecodeString(key)
+	if err != nil {
+		return nil, err
+	}
+	publicKey, err := security.UnmarshalPublicKey(raw)
+	return publicKey, err
+}
diff --git a/services/wspr/internal/principal/blessings.vdl b/services/wspr/internal/principal/blessings.vdl
new file mode 100644
index 0000000..7793317
--- /dev/null
+++ b/services/wspr/internal/principal/blessings.vdl
@@ -0,0 +1,14 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package principal
+
+// BlessingsHandle is a 32-bit integer handle that stands in for a Blessings
+// object.
+type BlessingsHandle int32
+
+// ZeroHandle is the handle with value 0.
+// NOTE(review): presumably reserved to mean "no blessings" - confirm with callers.
+const ZeroHandle = BlessingsHandle(0)
+
+// JsBlessings pairs a blessings handle with a public key in string form.
+type JsBlessings struct {
+ Handle BlessingsHandle
+ PublicKey string
+}
diff --git a/services/wspr/internal/principal/blessings.vdl.go b/services/wspr/internal/principal/blessings.vdl.go
new file mode 100644
index 0000000..dc078be
--- /dev/null
+++ b/services/wspr/internal/principal/blessings.vdl.go
@@ -0,0 +1,37 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: blessings.vdl
+
+package principal
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// BlessingsHandle is a 32-bit integer handle that stands in for a Blessings
+// object.
+type BlessingsHandle int32
+
+// __VDLReflect carries the type's VDL name in the struct tag of its parameter;
+// the body is intentionally empty.
+func (BlessingsHandle) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/principal.BlessingsHandle"
+}) {
+}
+
+// JsBlessings pairs a blessings handle with a public key in string form.
+type JsBlessings struct {
+ Handle BlessingsHandle
+ PublicKey string
+}
+
+// __VDLReflect carries the type's VDL name in the struct tag of its parameter;
+// the body is intentionally empty.
+func (JsBlessings) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/principal.JsBlessings"
+}) {
+}
+
+// init registers the package's VDL-defined types with the vdl package.
+func init() {
+ vdl.Register((*BlessingsHandle)(nil))
+ vdl.Register((*JsBlessings)(nil))
+}
+
+// ZeroHandle is the handle with value 0.
+const ZeroHandle = BlessingsHandle(0)
diff --git a/services/wspr/internal/principal/js_blessings_store.go b/services/wspr/internal/principal/js_blessings_store.go
new file mode 100644
index 0000000..f660c50
--- /dev/null
+++ b/services/wspr/internal/principal/js_blessings_store.go
@@ -0,0 +1,55 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package principal
+
+import (
+ "sync"
+
+ "v.io/v23/security"
+)
+
+// JSBlessingsHandles is a store for Blessings in use by JS code.
+//
+// We don't pass the full Blessings object to avoid serializing
+// and deserializing a potentially huge forest of blessings.
+// Instead we pass to JS a handle to a Blessings object and have
+// all operations involving cryptographic operations call into go.
+type JSBlessingsHandles struct {
+ // mu is held by Add, Remove and Get while they touch the fields below.
+ mu sync.Mutex
+ // lastHandle is the most recently issued handle; Add increments it.
+ lastHandle BlessingsHandle
+ // store maps each issued handle to its Blessings.
+ store map[BlessingsHandle]security.Blessings
+}
+
+// NewJSBlessingsHandles returns a JSBlessingsHandles with an empty store,
+// ready for use.
+func NewJSBlessingsHandles() *JSBlessingsHandles {
+	handles := new(JSBlessingsHandles)
+	handles.store = make(map[BlessingsHandle]security.Blessings)
+	return handles
+}
+
+// Add stores blessings under a freshly issued handle and returns that handle.
+// Handles are issued by incrementing the previous one, so the first handle
+// returned by a new store is 1.
+func (s *JSBlessingsHandles) Add(blessings security.Blessings) BlessingsHandle {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.lastHandle++
+	s.store[s.lastHandle] = blessings
+	return s.lastHandle
+}
+
+// Remove drops the Blessings stored under handle. Removing a handle that is
+// not in the store is a no-op.
+func (s *JSBlessingsHandles) Remove(handle BlessingsHandle) {
+	s.mu.Lock()
+	delete(s.store, handle)
+	s.mu.Unlock()
+}
+
+// Get returns the Blessings stored under handle. If the store has no entry for
+// handle, the zero Blessings value is returned (Blessings is returned by
+// value, so there is no nil result).
+func (s *JSBlessingsHandles) Get(handle BlessingsHandle) security.Blessings {
+	s.mu.Lock()
+	blessings := s.store[handle]
+	s.mu.Unlock()
+	return blessings
+}
diff --git a/services/wspr/internal/principal/js_blessings_store_test.go b/services/wspr/internal/principal/js_blessings_store_test.go
new file mode 100644
index 0000000..0433504
--- /dev/null
+++ b/services/wspr/internal/principal/js_blessings_store_test.go
@@ -0,0 +1,27 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package principal
+
+import (
+ "reflect"
+ "testing"
+
+ "v.io/x/ref/test/testutil"
+)
+
+// TestJSBlessingStore checks the Add/Get/Remove round trip: a stored Blessings
+// is returned unchanged by Get, and Get yields a zero Blessings once the
+// handle has been removed.
+func TestJSBlessingStore(t *testing.T) {
+	store := NewJSBlessingsHandles()
+	want := blessSelf(testutil.NewPrincipal(), "irrelevant")
+	handle := store.Add(want)
+
+	got := store.Get(handle)
+	if !reflect.DeepEqual(got, want) {
+		t.Fatalf("Get after adding: got: %v, want: %v", got, want)
+	}
+
+	store.Remove(handle)
+	got = store.Get(handle)
+	if !got.IsZero() {
+		t.Fatalf("Get after removing: got: %v, want nil", got)
+	}
+}
diff --git a/services/wspr/internal/principal/principal.go b/services/wspr/internal/principal/principal.go
new file mode 100644
index 0000000..acffdc5
--- /dev/null
+++ b/services/wspr/internal/principal/principal.go
@@ -0,0 +1,351 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package principal implements a principal manager that maps origins to
+// vanadium principals.
+//
+// Each instance of wspr is expected to have a single (human) user at a time
+// that will be signed in and using various apps. A user may have different
+// Blessings, which in practice will be done by having multiple accounts
+// across many Blessing providers (e.g., google, facebook, etc). This is similar
+// to having a master identity that is linked to multiple identities in today's
+// technology. In our case, each user account is represented as a Blessing
+// obtained from the corresponding Blessing provider.
+//
+// Every app/origin is a different principal and has its own public/private key
+// pair, represented by a Principal object that is created by this manager on
+// the app's behalf. For each app/origin, the user may choose which account to
+// use for the app, which results in a blessing generated for the app principal
+// using the selected account's blessing.
+//
+// For example, a user Alice may have an account at Google and Facebook,
+// resulting in the blessings "google/alice@gmail.com" and
+// "facebook/alice@facebook.com". She may then choose to use her Google account
+// for the "googleplay" app, which would result in the creation of a new
+// principal for that app and a blessing "google/alice@gmail.com/googleplay" for
+// that principal.
+//
+// The principal manager only serializes the mapping from apps to (chosen)
+// accounts and the account information, but not the private keys for each app.
+package principal
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net/url"
+ "sync"
+ "time"
+
+ vsecurity "v.io/x/ref/security"
+ "v.io/x/ref/security/serialization"
+
+ "v.io/v23/security"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+)
+
+// permissions is the set of permissions given to an app, containing the account
+// the app has access to and the caveats associated with it.
+type permissions struct {
+	// Account is the name of the account given to the app.
+	Account string
+	// Caveats that must be added to the blessing generated for the app from
+	// the account's blessing.
+	Caveats []security.Caveat
+	// Expirations holds the expiration times of any expiration caveats, kept
+	// as unix seconds so they can be persisted.
+	Expirations []int64
+}
+
+// persistentState is the manager state that save writes and NewPrincipalManager reads back.
+type persistentState struct {
+	// Origins maps each origin to the permissions provided for it (such as
+	// caveats and the account given to the origin).
+	Origins map[string]permissions
+
+	// Accounts maps an account name to the Blessings associated with the
+	// account.
+	Accounts map[string]security.Blessings
+}
+
+const pkgPath = "v.io/x/ref/services/wspr/internal/principal"
+
+// Error IDs registered under pkgPath; PrincipalManager wraps them with verror.New.
+var (
+	errUnknownAccount = verror.Register(pkgPath+".errUnknownAccount", verror.NoRetry, "{1:}{2:} unknown account{:_}")
+	errFailedToCreatePrincipal = verror.Register(pkgPath+".errFailedToCreatePrincipal", verror.NoRetry, "{1:}{2:} failed to create new principal{:_}")
+	errFailedToConstructBlessings = verror.Register(pkgPath+".errFailedToConstructBlessings", verror.NoRetry, "{1:}{2:} failed to construct Blessings to bless with{:_}")
+	errFailedToBlessPrincipal = verror.Register(pkgPath+".errFailedToBlessPrincipal", verror.NoRetry, "{1:}{2:} failed to bless new principal with the provided account{:_}")
+	errFailedToSetDefaultBlessings = verror.Register(pkgPath+".errFailedToSetDefaultBlessings", verror.NoRetry, "{1:}{2:} failed to set account blessings as default{:_}")
+	errFailedToSetAllPrincipalBlessings = verror.Register(pkgPath+".errFailedToSetAllPrincipalBlessings", verror.NoRetry, "{1:}{2:} failed to set account blessings for all principals{:_}")
+	errFailedToAddRoots = verror.Register(pkgPath+".errFailedToAddRoots", verror.NoRetry, "{1:}{2:} failed to add roots of account blessing{:_}")
+)
+
+// bufferCloser implements io.ReadWriteCloser on top of an in-memory bytes.Buffer.
+type bufferCloser struct {
+	bytes.Buffer
+}
+
+func (*bufferCloser) Close() error {
+	return nil // Close always succeeds and leaves the buffered bytes untouched.
+}
+
+// InMemorySerializer implements SerializerReaderWriter. This Serializer should only
+// be used in tests.
+type InMemorySerializer struct {
+	data      bufferCloser // buffer handed out as the data reader/writer
+	signature bufferCloser // buffer handed out as the signature reader/writer
+	hasData   bool         // set by Writers; until then Readers returns nil readers
+}
+
+func (s *InMemorySerializer) Readers() (io.ReadCloser, io.ReadCloser, error) {
+	if s.hasData {
+		return &s.data, &s.signature, nil
+	}
+	return nil, nil, nil // Writers has not been called yet, so there is nothing to read.
+}
+
+func (s *InMemorySerializer) Writers() (io.WriteCloser, io.WriteCloser, error) {
+	s.data.Reset() // Every call starts from empty buffers.
+	s.signature.Reset()
+	s.hasData = true
+	return &s.data, &s.signature, nil
+}
+
+// PrincipalManager manages app principals. We only serialize the accounts
+// associated with this principal manager and the mapping of apps to permissions
+// that they were given.
+type PrincipalManager struct {
+	// mu is held by the methods in this file whenever they read or modify state.
+	mu    sync.Mutex
+	state persistentState
+	// root is the Principal that hosts this PrincipalManager.
+	root security.Principal
+	// serializer provides the readers and writers used to load and save state.
+	serializer vsecurity.SerializerReaderWriter
+
+	// Dummy account name
+	// TODO(bjornick, nlacasse): Remove this once the tests no longer need
+	// it.
+	dummyAccount string
+}
+
+// NewPrincipalManager returns a new PrincipalManager that creates new principals
+// for various app/origins and blesses them with blessings for the provided 'root'
+// principal from the specified blessing provider.
+//
+// It is initialized by reading data from the 'serializer' passed in, which must
+// be non-nil. If the serializer yields a nil data or signature reader, an empty
+// PrincipalManager is returned.
+func NewPrincipalManager(root security.Principal, serializer vsecurity.SerializerReaderWriter) (*PrincipalManager, error) {
+	result := &PrincipalManager{
+		state: persistentState{
+			Origins:  map[string]permissions{},
+			Accounts: map[string]security.Blessings{},
+		},
+		root:       root,
+		serializer: serializer,
+	}
+	data, signature, err := serializer.Readers()
+	if err != nil {
+		return nil, err
+	}
+	if (data == nil) || (signature == nil) {
+		// No serialized data exists, returning an empty PrincipalManager.
+		return result, nil
+	}
+	vr, err := serialization.NewVerifyingReader(data, signature, root.PublicKey())
+	if err != nil {
+		return nil, err
+	}
+
+	decoder, err := vom.NewDecoder(vr)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if err := decoder.Decode(&result.state); err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// save vom-encodes i.state into a signing writer built from i.serializer's
+// writers and i.root. Callers in this file invoke it with i.mu held.
+func (i *PrincipalManager) save() error {
+	data, signature, err := i.serializer.Writers()
+	if err != nil {
+		return err
+	}
+	swc, err := serialization.NewSigningWriteCloser(data, signature, i.root, nil)
+	if err != nil {
+		return err
+	}
+	encoder, err := vom.NewEncoder(swc)
+	if err != nil {
+		return err
+	}
+	if err := encoder.Encode(i.state); err != nil {
+		return err
+	}
+	return swc.Close()
+}
+
+// Principal builds a new Principal for origin, blessed from the account the
+// origin is linked to. It fails with verror.ErrNoExist for an unknown origin
+// and with errUnknownAccount when the linked account has no blessings stored.
+func (i *PrincipalManager) Principal(origin string) (security.Principal, error) {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+	perm, ok := i.state.Origins[origin]
+	if !ok {
+		return nil, verror.New(verror.ErrNoExist, nil, origin)
+	}
+	if blessings, ok := i.state.Accounts[perm.Account]; ok {
+		return i.createPrincipal(origin, blessings, perm.Caveats)
+	}
+	return nil, verror.New(errUnknownAccount, nil, perm.Account)
+}
+
+// OriginHasAccount reports whether origin is usable: it must have been given
+// permissions, none of the expiration times recorded with those permissions
+// may lie in the past, and the account named by the permissions must have
+// blessings stored in the manager.
+func (i *PrincipalManager) OriginHasAccount(origin string) bool {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+	perm, ok := i.state.Origins[origin]
+	if !ok {
+		return false
+	}
+
+	// A single expiration that has already passed disqualifies the origin.
+	now := time.Now()
+	for _, unixExp := range perm.Expirations {
+		if time.Unix(unixExp, 0).Before(now) {
+			return false
+		}
+	}
+
+	// The linked account must still have blessings on record.
+	_, ok = i.state.Accounts[perm.Account]
+	return ok
+}
+
+// BlessingsForAccount looks up the Blessings stored for account. An account
+// the manager does not know yields errUnknownAccount.
+//
+// TODO(ataly, ashankar, bjornick): Modify this method to allow searching
+// for accounts from a specific root blessing provider. One option is
+// that the method could take a set of root blessing providers as argument
+// and then return accounts whose blessings are from one of these providers.
+func (i *PrincipalManager) BlessingsForAccount(account string) (security.Blessings, error) {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	// Happy path first; the zero Blessings accompanies the error otherwise.
+	if blessings, ok := i.state.Accounts[account]; ok {
+		return blessings, nil
+	}
+	return security.Blessings{}, verror.New(errUnknownAccount, nil, account)
+}
+
+// AddAccount stores blessings under account and persists the new state.
+func (i *PrincipalManager) AddAccount(account string, blessings security.Blessings) error {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+	previous, hadPrevious := i.state.Accounts[account]
+	i.state.Accounts[account] = blessings
+	if err := i.save(); err != nil {
+		// Persisting failed: put the map back the way it was.
+		if hadPrevious {
+			i.state.Accounts[account] = previous
+		} else {
+			delete(i.state.Accounts, account)
+		}
+		return err
+	}
+	return nil
+}
+
+// AddOrigin links origin to an account already known to the manager, records
+// the caveats and expirations (as unix seconds), and persists the new state.
+func (i *PrincipalManager) AddOrigin(origin string, account string, caveats []security.Caveat, expirations []time.Time) error {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+	if _, ok := i.state.Accounts[account]; !ok {
+		return verror.New(errUnknownAccount, nil, account)
+	}
+	unixExpirations := make([]int64, len(expirations))
+	for idx, exp := range expirations {
+		unixExpirations[idx] = exp.Unix()
+	}
+	previous, hadPrevious := i.state.Origins[origin]
+	i.state.Origins[origin] = permissions{Account: account, Caveats: caveats, Expirations: unixExpirations}
+	if err := i.save(); err != nil {
+		// Persisting failed: put the map back the way it was.
+		if hadPrevious {
+			i.state.Origins[origin] = previous
+		} else {
+			delete(i.state.Origins, origin)
+		}
+		return err
+	}
+	return nil
+}
+
+// createPrincipal creates a new principal and has i.root bless it with withBlessings
+// under the url-escaped origin; the result becomes its default, all-peers and root blessing.
+func (i *PrincipalManager) createPrincipal(origin string, withBlessings security.Blessings, caveats []security.Caveat) (security.Principal, error) {
+	ret, err := vsecurity.NewPrincipal()
+	if err != nil {
+		return nil, verror.New(errFailedToCreatePrincipal, nil, err)
+	}
+	if len(caveats) == 0 { // The Bless call below reads caveats[0], so at least one is needed.
+		caveats = append(caveats, security.UnconstrainedUse())
+	}
+	// Origins have the form protocol://hostname:port, which is not a valid
+	// blessing extension. Hence we must url-encode.
+	blessings, err := i.root.Bless(ret.PublicKey(), withBlessings, url.QueryEscape(origin), caveats[0], caveats[1:]...)
+	if err != nil {
+		return nil, verror.New(errFailedToBlessPrincipal, nil, err)
+	}
+	if err := ret.BlessingStore().SetDefault(blessings); err != nil {
+		return nil, verror.New(errFailedToSetDefaultBlessings, nil, err)
+	}
+	if _, err := ret.BlessingStore().Set(blessings, security.AllPrincipals); err != nil {
+		return nil, verror.New(errFailedToSetAllPrincipalBlessings, nil, err)
+	}
+	if err := ret.AddToRoots(blessings); err != nil {
+		return nil, verror.New(errFailedToAddRoots, nil, err)
+	}
+	return ret, nil
+}
+
+// DummyAccount adds (on first use) a dummy account holding a self-blessing of
+// the root principal, for use by unauthenticated clients, and returns its name.
+// TODO(nlacasse, bjornick): This should go away once unauthenticated clients
+// are no longer allowed.
+func (i *PrincipalManager) DummyAccount() (string, error) {
+	if i.dummyAccount == "" {
+		// Note: We only set i.dummyAccount once the account has been
+		// successfully created. Otherwise, if an error occurs, the
+		// next time this function is called the account won't exist
+		// but this function will return the name of the account
+		// without trying to create it.
+		dummyAccount := "unauthenticated-dummy-account"
+		blessings, err := i.root.BlessSelf(dummyAccount)
+		if err != nil {
+			return "", fmt.Errorf("i.root.BlessSelf(%v) failed: %v", dummyAccount, err)
+		}
+		// NOTE(review): AddAccount takes i.mu itself; i.dummyAccount is accessed here without it.
+		if err := i.AddAccount(dummyAccount, blessings); err != nil {
+			return "", fmt.Errorf("browspr.principalManager.AddAccount(%v, %v) failed: %v", dummyAccount, blessings, err)
+		}
+		i.dummyAccount = dummyAccount
+	}
+	return i.dummyAccount, nil
+}
diff --git a/services/wspr/internal/principal/principal_test.go b/services/wspr/internal/principal/principal_test.go
new file mode 100644
index 0000000..0bc0dd4
--- /dev/null
+++ b/services/wspr/internal/principal/principal_test.go
@@ -0,0 +1,247 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package principal
+
+import (
+ "bytes"
+ "fmt"
+ "net/url"
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+
+ "v.io/x/ref/test/testutil"
+)
+
+// accountBlessing returns the blessings the tests store for the account called name.
+func accountBlessing(p security.Principal, name string) security.Blessings {
+	// Ideally the account blessing would be from a principal representing the
+	// identity provider, but for testing purposes we mock it out using a self-blessing.
+	return blessSelf(p, name)
+}
+
+type tester struct {
+	googleAccount, facebookAccount, origin string             // names passed to AddAccount / AddOrigin
+	googleBlessings, facebookBlessings     security.Blessings // blessings stored for the two accounts
+}
+
+// testSetters registers both accounts and links t.origin to the google account.
+func (t *tester) testSetters(m *PrincipalManager) error {
+	// Test AddAccount.
+	if err := m.AddAccount(t.googleAccount, t.googleBlessings); err != nil {
+		return fmt.Errorf("AddAccount(%v, %v) failed: %v", t.googleAccount, t.googleBlessings, err)
+	}
+	if err := m.AddAccount(t.facebookAccount, t.facebookBlessings); err != nil {
+		return fmt.Errorf("AddAccount(%v, %v) failed: %v", t.facebookAccount, t.facebookBlessings, err)
+	}
+
+	// Test AddOrigin.
+	cav, err := security.MethodCaveat("Foo")
+	if err != nil {
+		return fmt.Errorf("security.MethodCaveat failed: %v", err)
+	}
+	if err := m.AddOrigin(t.origin, t.googleAccount, []security.Caveat{cav}, nil); err != nil {
+		return fmt.Errorf("AddOrigin failed: %v", err)
+	}
+	// Linking an origin to an unregistered account must fail with errUnknownAccount.
+	if err := matchesErrorID(m.AddOrigin(t.origin, "nonExistingAccount", nil, nil), errUnknownAccount.ID); err != nil {
+		return fmt.Errorf("AddOrigin(..., 'nonExistingAccount', ...): %v", err)
+	}
+	return nil
+}
+
+// testGetters checks Principal and BlessingsForAccount against the state that
+// testSetters installed in m; it returns a descriptive error on any mismatch.
+func (t *tester) testGetters(m *PrincipalManager) error {
+	// Test Principal.
+	pOrigin, err := m.Principal(t.origin)
+	if err != nil {
+		return fmt.Errorf("Principal failed: %v", err)
+	}
+
+	bOrigin := pOrigin.BlessingStore().Default()
+	// Validate the integrity of the bits.
+	buf := new(bytes.Buffer)
+	encoder, err := vom.NewEncoder(buf)
+	if err != nil {
+		return err
+	}
+
+	if err := encoder.Encode(bOrigin); err != nil {
+		return err
+	}
+	decoder, err := vom.NewDecoder(buf)
+	if err != nil {
+		return err
+	}
+	var decoded security.Blessings
+	if err := decoder.Decode(&decoded); err != nil {
+		return err
+	}
+	if !reflect.DeepEqual(decoded, bOrigin) {
+		return fmt.Errorf("reflect.DeepEqual(%v, %v) failed after validBlessing", decoded, bOrigin)
+	}
+	bnames := func(b security.Blessings, method string) ([]string, []security.RejectedBlessing) {
+		ctx, cancel := context.RootContext()
+		defer cancel()
+		ctx = security.SetCall(ctx, security.NewCall(&security.CallParams{
+			LocalPrincipal:  pOrigin,
+			RemoteBlessings: b,
+			Method:          method}))
+		return security.RemoteBlessingNames(ctx)
+	}
+
+	// Validate the blessings in various contexts.
+	want := []string{t.googleAccount + security.ChainSeparator + url.QueryEscape(t.origin)}
+	if got, _ := bnames(bOrigin, "Foo"); !reflect.DeepEqual(got, want) {
+		return fmt.Errorf("with method 'Foo', got blessing: %v, want: %v", got, want)
+	}
+	if got, _ := bnames(bOrigin, "Bar"); len(got) != 0 {
+		return fmt.Errorf("with method 'Bar', got blessing: %v, want empty", got)
+	}
+
+	unknownOrigin := "http://unknown.com:80"
+	_, err = m.Principal(unknownOrigin)
+	if merr := matchesErrorID(err, verror.ErrNoExist.ID); merr != nil {
+		return fmt.Errorf("Principal(%v): %v", unknownOrigin, merr)
+	}
+
+	// Test BlessingsForAccount.
+	if got, err := m.BlessingsForAccount(t.googleAccount); err != nil || !reflect.DeepEqual(got, t.googleBlessings) {
+		return fmt.Errorf("BlessingsForAccount(%v): got: %v, %v want: %v, nil", t.googleAccount, got, err, t.googleBlessings)
+	}
+	if got, err := m.BlessingsForAccount(t.facebookAccount); err != nil || !reflect.DeepEqual(got, t.facebookBlessings) {
+		return fmt.Errorf("BlessingsForAccount(%v): got: %v, %v, want: %v, nil", t.facebookAccount, got, err, t.facebookBlessings)
+	}
+	nonExistingAccount := "nonExistingAccount"
+	if got, err := m.BlessingsForAccount(nonExistingAccount); !got.IsZero() {
+		return fmt.Errorf("BlessingsForAccount(%v): got: %v, want nil", nonExistingAccount, got)
+	} else if merr := matchesError(err, "unknown account"); merr != nil {
+		return fmt.Errorf("BlessingsForAccount(%v) returned error: %v", nonExistingAccount, merr)
+	}
+	return nil
+}
+
+// newTester builds the fixture: two account names, one origin, and a blessing per account from root.
+func newTester(root security.Principal) *tester {
+	fixture := &tester{
+		googleAccount:   "google/alice@gmail.com",
+		facebookAccount: "facebook/alice@facebook.com",
+		origin:          "https://sampleapp-1.com:443",
+	}
+	fixture.googleBlessings = accountBlessing(root, fixture.googleAccount)
+	fixture.facebookBlessings = accountBlessing(root, fixture.facebookAccount)
+	return fixture
+}
+
+// TestPrincipalManager runs the setter and getter checks against one in-memory manager.
+func TestPrincipalManager(t *testing.T) {
+	root := testutil.NewPrincipal()
+	m, err := NewPrincipalManager(root, &InMemorySerializer{})
+	if err != nil {
+		t.Fatalf("NewPrincipalManager failed: %v", err)
+	}
+	mt := newTester(root)
+	if err := mt.testSetters(m); err != nil {
+		t.Fatal(err)
+	}
+	if err := mt.testGetters(m); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestPrincipalManagerPersistence checks that state saved by one manager is loaded by the next.
+func TestPrincipalManagerPersistence(t *testing.T) {
+	root := testutil.NewPrincipal()
+	serializer := &InMemorySerializer{}
+	m, err := NewPrincipalManager(root, serializer)
+	if err != nil {
+		t.Fatalf("NewPrincipalManager failed: %v", err)
+	}
+	mt := newTester(root)
+	if err := mt.testSetters(m); err != nil {
+		t.Fatal(err)
+	}
+	if err := mt.testGetters(m); err != nil {
+		t.Fatal(err)
+	}
+	// A second manager built from the same serializer must pass the same getter checks.
+	if m, err = NewPrincipalManager(root, serializer); err != nil {
+		t.Fatalf("NewPrincipalManager failed: %v", err)
+	}
+	if err := mt.testGetters(m); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestOriginHasAccount covers unknown origins and origins without, with future, and with past expirations.
+func TestOriginHasAccount(t *testing.T) {
+	root := testutil.NewPrincipal()
+	m, err := NewPrincipalManager(root, &InMemorySerializer{})
+	if err != nil {
+		t.Fatalf("NewPrincipalManager failed: %v", err)
+	}
+	googleAccount := "fred/jim@gmail.com"
+	// AddOrigin only accepts accounts the manager knows about, so register it first.
+	if err := m.AddAccount(googleAccount, accountBlessing(root, googleAccount)); err != nil {
+		t.Fatalf("AddAccount failed: %v", err)
+	}
+
+	// Test with unknown origin.
+	unknownOrigin := "http://unknown.com"
+	if m.OriginHasAccount(unknownOrigin) {
+		t.Errorf("Expected m.OriginHasAccount(%v) to be false but it was true.", unknownOrigin)
+	}
+
+	// Test with no expiration caveat.
+	origin1 := "http://origin-1.com"
+	methodCav, err := security.MethodCaveat("Foo")
+	if err != nil {
+		t.Fatalf("security.MethodCaveat failed: %v", err)
+	}
+	if err := m.AddOrigin(origin1, googleAccount, []security.Caveat{methodCav}, nil); err != nil {
+		t.Fatalf("AddOrigin failed: %v", err)
+	}
+
+	if !m.OriginHasAccount(origin1) {
+		t.Errorf("Expected m.OriginHasAccount(%v) to be true but it was false.", origin1)
+	}
+
+	// Test with expiration caveat in the future.
+	origin2 := "http://origin-2.com"
+	futureTime := time.Now().Add(5 * time.Minute)
+	futureExpCav, err := security.ExpiryCaveat(futureTime)
+	if err != nil {
+		t.Fatalf("security.ExpiryCaveat(%v) failed: %v", futureTime, err)
+	}
+	if err := m.AddOrigin(origin2, googleAccount, []security.Caveat{futureExpCav}, []time.Time{futureTime}); err != nil {
+		t.Fatalf("AddOrigin failed: %v", err)
+	}
+
+	if !m.OriginHasAccount(origin2) {
+		t.Errorf("Expected m.OriginHasAccount(%v) to be true but it was false.", origin2)
+	}
+
+	// Test with expiration caveats in the past and future.
+	origin3 := "http://origin-3.com"
+	pastTime := time.Now().Add(-5 * time.Minute)
+	pastExpCav, err := security.ExpiryCaveat(pastTime)
+	if err != nil {
+		t.Fatalf("security.ExpiryCaveat(%v) failed: %v", pastTime, err)
+	}
+
+	if err := m.AddOrigin(origin3, googleAccount, []security.Caveat{futureExpCav, pastExpCav}, []time.Time{futureTime, pastTime}); err != nil {
+		t.Fatalf("AddOrigin failed: %v", err)
+	}
+
+	if m.OriginHasAccount(origin3) {
+		t.Errorf("Expected m.OriginHasAccount(%v) to be false but it was true.", origin3)
+	}
+}
diff --git a/services/wspr/internal/principal/util_test.go b/services/wspr/internal/principal/util_test.go
new file mode 100644
index 0000000..7ee21ed
--- /dev/null
+++ b/services/wspr/internal/principal/util_test.go
@@ -0,0 +1,47 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package principal
+
+import (
+ "fmt"
+ "strings"
+
+ "v.io/v23/security"
+ "v.io/v23/verror"
+)
+
+func blessSelf(p security.Principal, name string) security.Blessings {
+	b, err := p.BlessSelf(name)
+	if err != nil {
+		panic(err) // test-only helper: a failed self-blessing aborts the test binary
+	}
+	return b
+}
+
+// matchesError returns nil if got's text contains want (a nil got only matches an empty want).
+func matchesError(got error, want string) error {
+	switch {
+	case got == nil && want == "":
+		return nil
+	case got == nil:
+		return fmt.Errorf("Got nil error, wanted to match %q", want)
+	case !strings.Contains(got.Error(), want):
+		return fmt.Errorf("Got error %q, wanted to match %q", got, want)
+	}
+	return nil
+}
+
+// matchesErrorID returns nil if got carries the verror ID want (a nil got only matches an empty want).
+func matchesErrorID(got error, want verror.ID) error {
+	switch {
+	case got == nil && len(want) == 0:
+		return nil
+	case got == nil:
+		return fmt.Errorf("Got nil error, wanted to match %q", want)
+	case verror.ErrorID(got) != want:
+		return fmt.Errorf("Got error %q, wanted to match %q", got, want)
+	}
+	return nil
+}
diff --git a/services/wspr/internal/rpc/server/authorizer.go b/services/wspr/internal/rpc/server/authorizer.go
new file mode 100644
index 0000000..80b8542
--- /dev/null
+++ b/services/wspr/internal/rpc/server/authorizer.go
@@ -0,0 +1,17 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package server
+
+import (
+ "v.io/v23/context"
+)
+
+type authorizer struct {
+	authFunc remoteAuthFunc // called with the context of every Authorize call
+}
+
+func (a *authorizer) Authorize(ctx *context.T) error {
+	return a.authFunc(ctx) // the decision is delegated entirely to authFunc
+}
diff --git a/services/wspr/internal/rpc/server/dispatcher.go b/services/wspr/internal/rpc/server/dispatcher.go
new file mode 100644
index 0000000..82efad2
--- /dev/null
+++ b/services/wspr/internal/rpc/server/dispatcher.go
@@ -0,0 +1,167 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package server
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdlroot/signature"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/lib"
+)
+
+type flowFactory interface {
+	createFlow() *Flow    // Lookup calls this once per request to obtain the flow it writes to
+	cleanupFlow(id int32) // called with a flow's ID once the dispatcher is done with it
+}
+
+type invokerFactory interface {
+	createInvoker(handle int32, signature []signature.Interface, hasGlobber bool) (rpc.Invoker, error) // fed from the lookup reply
+}
+
+type authFactory interface {
+	createAuthorizer(handle int32, hasAuthorizer bool) (security.Authorizer, error) // fed from the lookup reply
+}
+
+type lookupIntermediateReply struct {
+	Handle        int32
+	HasAuthorizer bool
+	HasGlobber    bool
+	Signature     string // encoded form; handleLookupResponse decodes it with lib.VomDecode
+	Err           *verror.E
+}
+// lookupReply is the decoded reply that handleLookupResponse hands to the waiting Lookup.
+type lookupReply struct {
+	Handle        int32
+	HasAuthorizer bool
+	HasGlobber    bool
+	Signature     []signature.Interface
+	Err           *verror.E
+}
+// dispatcherRequest is the message Lookup sends with type lib.ResponseDispatcherLookup.
+type dispatcherRequest struct {
+	ServerId uint32 `json:"serverId"`
+	Suffix   string `json:"suffix"`
+}
+
+// dispatcher holds the invoker and the authorizer to be used for lookup.
+type dispatcher struct {
+	mu                 sync.Mutex // held around every access to outstandingLookups
+	serverId           uint32     // copied into each dispatcherRequest
+	flowFactory        flowFactory
+	invokerFactory     invokerFactory
+	authFactory        authFactory
+	outstandingLookups map[int32]chan lookupReply // keyed by flow ID; one entry per Lookup in progress
+}
+
+var _ rpc.Dispatcher = (*dispatcher)(nil)
+
+// newDispatcher wires the three factories into a dispatcher with no outstanding lookups.
+func newDispatcher(serverId uint32, flowFactory flowFactory, invokerFactory invokerFactory, authFactory authFactory) *dispatcher {
+	d := new(dispatcher)
+	d.serverId = serverId
+	d.flowFactory = flowFactory
+	d.invokerFactory = invokerFactory
+	d.authFactory = authFactory
+	d.outstandingLookups = make(map[int32]chan lookupReply)
+	return d
+}
+
+// Cleanup sends an error reply to every outstanding lookup; the sends happen with d.mu held.
+func (d *dispatcher) Cleanup() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	for _, ch := range d.outstandingLookups {
+		verr := verror.Convert(verror.ErrInternal, nil, fmt.Errorf("Cleaning up dispatcher")).(verror.E)
+		ch <- lookupReply{Err: &verr}
+	}
+}
+
+// Lookup implements dispatcher interface Lookup. It sends a dispatcherRequest
+// for suffix over a new flow, blocks until a reply is posted for that flow, and
+// turns the reply into an invoker and an authorizer via the two factories.
+func (d *dispatcher) Lookup(suffix string) (interface{}, security.Authorizer, error) {
+	flow := d.flowFactory.createFlow()
+	d.mu.Lock()
+	ch := make(chan lookupReply, 1)
+	d.outstandingLookups[flow.ID] = ch
+	d.mu.Unlock()
+	// The reply is posted to ch by handleLookupResponse, Cleanup, or the send-failure path below.
+	message := dispatcherRequest{
+		ServerId: d.serverId,
+		Suffix:   suffix,
+	}
+	if err := flow.Writer.Send(lib.ResponseDispatcherLookup, message); err != nil {
+		verr := verror.Convert(verror.ErrInternal, nil, err).(verror.E)
+		ch <- lookupReply{Err: &verr} // ch has capacity 1, so this does not block while ch is empty
+	}
+	reply := <-ch
+
+	d.mu.Lock()
+	delete(d.outstandingLookups, flow.ID)
+	d.mu.Unlock()
+	d.flowFactory.cleanupFlow(flow.ID)
+
+	if reply.Err != nil {
+		return nil, nil, reply.Err
+	}
+	if reply.Handle < 0 {
+		return nil, nil, verror.New(verror.ErrNoExist, nil, "Dispatcher", suffix)
+	}
+
+	invoker, err := d.invokerFactory.createInvoker(reply.Handle, reply.Signature, reply.HasGlobber)
+	if err != nil {
+		return nil, nil, err
+	}
+	auth, err := d.authFactory.createAuthorizer(reply.Handle, reply.HasAuthorizer)
+	if err != nil {
+		return nil, nil, err
+	}
+	return invoker, auth, nil
+}
+
+// handleLookupResponse decodes the JSON in data and posts it to the Lookup waiting on flow id.
+func (d *dispatcher) handleLookupResponse(id int32, data string) {
+	d.mu.Lock()
+	ch := d.outstandingLookups[id]
+	d.mu.Unlock()
+	if ch == nil {
+		d.flowFactory.cleanupFlow(id)
+		vlog.Errorf("unknown invoke request for flow: %d", id)
+		return
+	}
+	// A JSON decoding failure is not dropped: it is forwarded to the waiter as an error reply.
+	var intermediateReply lookupIntermediateReply
+	decoder := json.NewDecoder(bytes.NewBufferString(data))
+	if err := decoder.Decode(&intermediateReply); err != nil {
+		err2 := verror.Convert(verror.ErrInternal, nil, err).(verror.E)
+		intermediateReply = lookupIntermediateReply{Err: &err2}
+		vlog.Errorf("unmarshaling invoke request failed: %v, %s", err, data)
+	}
+	// Signature is copied separately below because it arrives as an encoded string.
+	reply := lookupReply{
+		Handle:        intermediateReply.Handle,
+		HasAuthorizer: intermediateReply.HasAuthorizer,
+		HasGlobber:    intermediateReply.HasGlobber,
+		Err:           intermediateReply.Err,
+	}
+	if reply.Err == nil && intermediateReply.Signature != "" {
+		if err := lib.VomDecode(intermediateReply.Signature, &reply.Signature); err != nil {
+			err2 := verror.Convert(verror.ErrInternal, nil, err).(verror.E)
+			reply.Err = &err2
+		}
+	}
+	ch <- reply
+}
+
+// StopServing implements dispatcher StopServing. It is a no-op.
+func (*dispatcher) StopServing() {
+}
diff --git a/services/wspr/internal/rpc/server/dispatcher_test.go b/services/wspr/internal/rpc/server/dispatcher_test.go
new file mode 100644
index 0000000..b659aab
--- /dev/null
+++ b/services/wspr/internal/rpc/server/dispatcher_test.go
@@ -0,0 +1,200 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package server
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ "v.io/x/ref/services/wspr/internal/lib"
+ "v.io/x/ref/services/wspr/internal/lib/testwriter"
+)
+
+type mockFlowFactory struct {
+	writer testwriter.Writer // receives everything the dispatcher sends on a flow
+}
+
+func (m *mockFlowFactory) createFlow() *Flow {
+	return &Flow{ID: 0, Writer: &m.writer} // every flow gets ID 0 and shares the one writer
+}
+
+func (*mockFlowFactory) cleanupFlow(int32) {}
+
+type mockInvoker struct {
+	handle     int32                 // the arguments given to createInvoker, kept so that
+	sig        []signature.Interface // tests can compare the returned invoker against
+	hasGlobber bool                  // an expected value
+}
+
+func (m mockInvoker) Prepare(string, int) ([]interface{}, []*vdl.Value, error) {
+	return nil, nil, nil
+}
+
+func (mockInvoker) Invoke(string, rpc.StreamServerCall, []interface{}) ([]interface{}, error) {
+	return nil, nil
+}
+
+func (mockInvoker) Globber() *rpc.GlobState {
+	return nil
+}
+
+func (m mockInvoker) Signature(call rpc.ServerCall) ([]signature.Interface, error) {
+	return m.sig, nil
+}
+// MethodSignature looks methodName up in the first interface of m.sig only.
+func (m mockInvoker) MethodSignature(call rpc.ServerCall, methodName string) (signature.Method, error) {
+	method, found := m.sig[0].FindMethod(methodName) // NOTE(review): unchecked index; panics if m.sig is empty
+	if !found {
+		return signature.Method{}, fmt.Errorf("Method %q not found", methodName)
+	}
+	return method, nil
+}
+
+type mockInvokerFactory struct{}
+
+func (mockInvokerFactory) createInvoker(handle int32, sig []signature.Interface, hasGlobber bool) (rpc.Invoker, error) {
+	return &mockInvoker{handle: handle, sig: sig, hasGlobber: hasGlobber}, nil // just records the arguments
+}
+
+type mockAuthorizer struct {
+	handle        int32 // the arguments given to createAuthorizer, kept for comparison in tests
+	hasAuthorizer bool
+}
+
+func (mockAuthorizer) Authorize(*context.T) error { return nil }
+
+type mockAuthorizerFactory struct{}
+
+func (mockAuthorizerFactory) createAuthorizer(handle int32, hasAuthorizer bool) (security.Authorizer, error) {
+	return mockAuthorizer{handle: handle, hasAuthorizer: hasAuthorizer}, nil // just records the arguments
+}
+
+// TestSuccessfulLookup answers a Lookup with handle 1 and no authorizer and checks what comes back.
+func TestSuccessfulLookup(t *testing.T) {
+	flowFactory := &mockFlowFactory{}
+	d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{})
+	expectedSig := []signature.Interface{
+		{Name: "AName"},
+	}
+	go func() {
+		if err := flowFactory.writer.WaitForMessage(1); err != nil {
+			t.Errorf("failed to get dispatch request %v", err)
+			t.Fail()
+		}
+		jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":false,"signature":"%s"}`, lib.VomEncodeOrDie(expectedSig))
+		d.handleLookupResponse(0, jsonResponse)
+	}()
+
+	invoker, auth, err := d.Lookup("a/b")
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	expectedInvoker := &mockInvoker{handle: 1, sig: expectedSig}
+	if !reflect.DeepEqual(invoker, expectedInvoker) {
+		t.Errorf("wrong invoker returned, expected: %#v, got :%#v", expectedInvoker, invoker)
+	}
+
+	expectedAuth := mockAuthorizer{handle: 1, hasAuthorizer: false}
+	if !reflect.DeepEqual(auth, expectedAuth) {
+		t.Errorf("wrong authorizer returned, expected: %v, got :%v", expectedAuth, auth)
+	}
+	// The only message written to the flow should be the lookup request itself.
+	expectedResponses := []lib.Response{
+		{
+			Type: lib.ResponseDispatcherLookup,
+			Message: map[string]interface{}{
+				"serverId": 0.0,
+				"suffix":   "a/b",
+			},
+		},
+	}
+	if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+		t.Error(err)
+	}
+}
+
+// TestSuccessfulLookupWithAuthorizer is the hasAuthorizer=true variant of TestSuccessfulLookup.
+func TestSuccessfulLookupWithAuthorizer(t *testing.T) {
+	flowFactory := &mockFlowFactory{}
+	d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{})
+	expectedSig := []signature.Interface{
+		{Name: "AName"},
+	}
+	go func() {
+		if err := flowFactory.writer.WaitForMessage(1); err != nil {
+			t.Errorf("failed to get dispatch request %v", err)
+			t.Fail()
+		}
+		jsonResponse := fmt.Sprintf(`{"handle":1,"hasAuthorizer":true,"signature":"%s"}`, lib.VomEncodeOrDie(expectedSig))
+		d.handleLookupResponse(0, jsonResponse)
+	}()
+
+	invoker, auth, err := d.Lookup("a/b")
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	expectedInvoker := &mockInvoker{handle: 1, sig: expectedSig}
+	if !reflect.DeepEqual(invoker, expectedInvoker) {
+		t.Errorf("wrong invoker returned, expected: %v, got :%v", expectedInvoker, invoker)
+	}
+
+	expectedAuth := mockAuthorizer{handle: 1, hasAuthorizer: true}
+	if !reflect.DeepEqual(auth, expectedAuth) {
+		t.Errorf("wrong authorizer returned, expected: %v, got :%v", expectedAuth, auth)
+	}
+	// The only message written to the flow should be the lookup request itself.
+	expectedResponses := []lib.Response{
+		{
+			Type: lib.ResponseDispatcherLookup,
+			Message: map[string]interface{}{
+				"serverId": 0.0,
+				"suffix":   "a/b",
+			},
+		},
+	}
+	if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+		t.Error(err)
+	}
+}
+
+// TestFailedLookup answers a Lookup with an error reply and expects Lookup to return an error.
+func TestFailedLookup(t *testing.T) {
+	flowFactory := &mockFlowFactory{}
+	d := newDispatcher(0, flowFactory, mockInvokerFactory{}, mockAuthorizerFactory{})
+	go func() {
+		if err := flowFactory.writer.WaitForMessage(1); err != nil {
+			t.Errorf("failed to get dispatch request %v", err)
+			// Keep going: Lookup below only returns once a reply has been posted.
+		}
+		jsonResponse := `{"err":{"id":"v23/verror.Exists","msg":"bad stuff"}}`
+		d.handleLookupResponse(0, jsonResponse)
+	}()
+
+	_, _, err := d.Lookup("a/b")
+	if err == nil {
+		t.Errorf("expected error, but got none")
+	}
+
+	expectedResponses := []lib.Response{
+		{
+			Type: lib.ResponseDispatcherLookup,
+			Message: map[string]interface{}{
+				"serverId": 0.0,
+				"suffix":   "a/b",
+			},
+		},
+	}
+	if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+		t.Error(err)
+	}
+}
diff --git a/services/wspr/internal/rpc/server/invoker.go b/services/wspr/internal/rpc/server/invoker.go
new file mode 100644
index 0000000..90e86e9
--- /dev/null
+++ b/services/wspr/internal/rpc/server/invoker.go
@@ -0,0 +1,110 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package server
+
+import (
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ "v.io/v23/verror"
+ "v.io/v23/vtrace"
+)
+
+// typedNil is a nil []int slice.
+// NOTE(review): no use of typedNil is visible in this file — confirm it is still needed.
+var typedNil []int
+
+// pkgPath is this package's import path; it prefixes the IDs of the errors registered below.
+const pkgPath = "v.io/x/ref/services/wspr/internal/rpc/server"
+
+// Errors.
+var (
+ // ErrWrongNumberOfArgs is returned by Prepare when the argument count differs from the method signature.
+ ErrWrongNumberOfArgs = verror.Register(pkgPath+".ErrWrongNumberOfArgs", verror.NoRetry, "{1:}{2:} Method {3} got {4} args, want {5}{:_}")
+ // ErrMethodNotFoundInSignature is returned by MethodSignature when the method is not in the invoker's signature.
+ ErrMethodNotFoundInSignature = verror.Register(pkgPath+".ErrMethodNotFoundInSignature", verror.NoRetry, "{1:}{2:} Method {3} not found in signature{:_}")
+)
+
+// invoker holds a delegate function to call on invoke and the signature
+// listing the methods that are available to be called.
+type invoker struct {
+ // delegate function to call when an invoke request comes in
+ invokeFunc remoteInvokeFunc
+
+ // signature is returned by Signature and searched by MethodSignature.
+ signature []signature.Interface
+
+ // globFunc handles Glob__ requests; when nil, Globber reports no glob support.
+ globFunc remoteGlobFunc
+}
+
+// Compile-time check that *invoker implements rpc.Invoker.
+var _ rpc.Invoker = (*invoker)(nil)
+
+// newInvoker builds an rpc.Invoker that answers signature queries from
+// signature and forwards invocations and globs to the supplied functions.
+// globFunc may be nil, in which case the invoker reports no glob support.
+func newInvoker(signature []signature.Interface, invokeFunc remoteInvokeFunc, globFunc remoteGlobFunc) rpc.Invoker {
+	return &invoker{
+		invokeFunc: invokeFunc,
+		signature:  signature,
+		globFunc:   globFunc,
+	}
+}
+
+// Prepare implements the Invoker interface.
+//
+// It looks up methodName in the invoker's signature, rejects calls whose
+// argument count does not match, and returns one zero vdl value per declared
+// input argument together with the method's tags.
+func (i *invoker) Prepare(methodName string, numArgs int) ([]interface{}, []*vdl.Value, error) {
+	sig, err := i.MethodSignature(nil, methodName)
+	if err != nil {
+		return nil, nil, err
+	}
+	want := len(sig.InArgs)
+	if numArgs != want {
+		return nil, nil, verror.New(ErrWrongNumberOfArgs, nil, methodName, numArgs, want)
+	}
+	argptrs := make([]interface{}, want)
+	for idx := range sig.InArgs {
+		argptrs[idx] = vdl.ZeroValue(sig.InArgs[idx].Type)
+	}
+	return argptrs, sig.Tags, nil
+}
+
+// Invoke implements the Invoker interface.
+//
+// It hands the call to the delegate, blocks until a reply arrives on the
+// returned channel, and returns either the reply's error or its results.
+func (i *invoker) Invoke(methodName string, call rpc.StreamServerCall, argptrs []interface{}) ([]interface{}, error) {
+	// Block until the delegate delivers the reply.
+	reply := <-i.invokeFunc(methodName, argptrs, call)
+	if err := reply.Err; err != nil {
+		return nil, err
+	}
+
+	vtrace.GetStore(call.Context()).Merge(reply.TraceResponse)
+
+	// Widen reply.Results from []*vdl.Value to []interface{}.
+	results := make([]interface{}, 0, len(reply.Results))
+	for _, value := range reply.Results {
+		results = append(results, value)
+	}
+	return results, nil
+}
+
+// Globber returns a GlobState that routes glob requests to this invoker, or
+// nil when no glob function was supplied.
+// TODO(bjornick,rthellend): Find a reasonable way to implement this for JS.
+func (i *invoker) Globber() *rpc.GlobState {
+	if i.globFunc != nil {
+		return &rpc.GlobState{AllGlobber: i}
+	}
+	return nil
+}
+
+// Glob__ forwards the glob pattern to the invoker's glob function and returns
+// whatever reply channel and error it produces.
+func (i *invoker) Glob__(call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
+	replies, err := i.globFunc(pattern, call)
+	return replies, err
+}
+
+// Signature returns the interface signatures this invoker was created with.
+// The call argument is not used.
+func (i *invoker) Signature(call rpc.ServerCall) ([]signature.Interface, error) {
+ return i.signature, nil
+}
+
+// MethodSignature returns the signature of the named method, or
+// ErrMethodNotFoundInSignature when the invoker's signature does not contain
+// it. call may be nil; when present its context is attached to the error.
+func (i *invoker) MethodSignature(call rpc.ServerCall, method string) (signature.Method, error) {
+	methodSig, found := signature.FirstMethod(i.signature, method)
+	if found {
+		return methodSig, nil
+	}
+
+	// Prepare passes a nil call, so the context is only read when one exists.
+	var errCtx *context.T
+	if call != nil {
+		errCtx = call.Context()
+	}
+	return signature.Method{}, verror.New(ErrMethodNotFoundInSignature, errCtx, method)
+}
diff --git a/services/wspr/internal/rpc/server/server.go b/services/wspr/internal/rpc/server/server.go
new file mode 100644
index 0000000..3a97693
--- /dev/null
+++ b/services/wspr/internal/rpc/server/server.go
@@ -0,0 +1,727 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// An implementation of a server for WSPR
+
+package server
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+ "v.io/v23/vdl"
+ "v.io/v23/vdlroot/signature"
+ vdltime "v.io/v23/vdlroot/time"
+ "v.io/v23/verror"
+ "v.io/v23/vtrace"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/services/wspr/internal/lib"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+// Flow pairs a flow ID with the ClientWriter used to send messages on that flow.
+type Flow struct {
+ // ID identifies the flow; it is used as the key for outstanding requests.
+ ID int32
+ // Writer sends responses for this flow.
+ Writer lib.ClientWriter
+}
+
+// A request from the proxy to javascript to handle an RPC
+type ServerRPCRequest struct {
+ // ServerId is the id of the Server issuing the request.
+ ServerId uint32
+ // Handle is the handle the invoker was created with.
+ Handle int32
+ // Method is the name of the method to invoke.
+ Method string
+ // Args holds the arguments for the method.
+ Args []interface{}
+ // Call carries the security, deadline and trace information of the call.
+ Call ServerRPCRequestCall
+}
+
+// ServerRPCRequestCall is the per-call metadata sent along with a ServerRPCRequest.
+type ServerRPCRequestCall struct {
+ // SecurityCall describes the security state of the call.
+ SecurityCall SecurityCall
+ // Deadline is the call's deadline; it is left at its zero value when the context has none.
+ Deadline vdltime.Deadline
+ // TraceRequest is the vtrace request taken from the call context.
+ TraceRequest vtrace.Request
+}
+
+// FlowHandler creates and releases the flows a Server uses to talk to javascript.
+type FlowHandler interface {
+ // CreateNewFlow returns a new Flow for server; sender is the stream
+ // associated with the flow and is nil at some call sites.
+ CreateNewFlow(server *Server, sender rpc.Stream) *Flow
+
+ // CleanupFlow releases the flow with the given id.
+ CleanupFlow(id int32)
+}
+
+// HandleStore stores blessings and hands out handles that refer to them.
+type HandleStore interface {
+ // Adds blessings to the store and returns handle to the blessings
+ AddBlessings(blessings security.Blessings) principal.BlessingsHandle
+}
+
+// ServerHelper bundles everything a Server needs from its owner: flow
+// management, blessings storage, log forwarding and a base context.
+type ServerHelper interface {
+ FlowHandler
+ HandleStore
+
+ // SendLogMessage forwards msg at the given level; the Server uses it to
+ // report mount status changes.
+ SendLogMessage(level lib.LogLevel, msg string) error
+
+ // Context returns the context from which NewServer derives the rpc server's context.
+ Context() *context.T
+}
+
+// authReply is the JSON-decoded reply from a javascript authorizer.
+type authReply struct {
+ // Err is nil when authorization succeeded.
+ Err *verror.E
+}
+
+// AuthRequest is a request for a javascript authorizer to run
+// This is exported to make the app test easier.
+type AuthRequest struct {
+ // ServerId is the id of the Server issuing the request.
+ ServerId uint32 `json:"serverId"`
+ // Handle is the handle the authorizer was created with.
+ Handle int32 `json:"handle"`
+ // Call describes the security state of the call being authorized.
+ Call SecurityCall `json:"call"`
+}
+
+// Server wraps an rpc.Server whose dispatching, invocation, authorization and
+// caveat validation are delegated to javascript over flows.
+type Server struct {
+ // serverStateLock should be acquired when starting or stopping the server.
+ // This should be locked before outstandingRequestLock.
+ serverStateLock sync.Mutex
+
+ // The rpc.ListenSpec to use with server.Listen
+ listenSpec *rpc.ListenSpec
+
+ // The server that handles the rpc layer. Listen on this server is
+ // lazily started.
+ server rpc.Server
+
+ // The saved dispatcher to reuse when serve is called multiple times.
+ dispatcher *dispatcher
+
+ // Whether the server is listening.
+ isListening bool
+
+ // The server id.
+ id uint32
+ // helper supplies flows, blessings handles, log forwarding and the base context.
+ helper ServerHelper
+
+ // outstandingRequestLock should be acquired only to update the
+ // outstanding request maps below.
+ outstandingRequestLock sync.Mutex
+
+ // The set of outstanding server requests.
+ outstandingServerRequests map[int32]chan *lib.ServerRpcReply
+
+ // Reply channels of pending authorization requests, keyed by flow ID.
+ outstandingAuthRequests map[int32]chan error
+
+ // Reply channels of pending caveat validation requests, keyed by flow ID.
+ outstandingValidationRequests map[int32]chan []error
+
+ // statusClose will be closed when the server is shutting down, this will
+ // cause the status poller to exit.
+ statusClose chan struct{}
+}
+
+// NewServer creates a Server with the given id and listen spec. The
+// underlying rpc server is created from the helper's context, extended with
+// this Server's caveat validator under the "customChainValidator" key.
+// It returns the error from v23.NewServer if the rpc server cannot be created.
+func NewServer(id uint32, listenSpec *rpc.ListenSpec, helper ServerHelper) (*Server, error) {
+	s := &Server{
+		id:                            id,
+		helper:                        helper,
+		listenSpec:                    listenSpec,
+		outstandingServerRequests:     make(map[int32]chan *lib.ServerRpcReply),
+		outstandingAuthRequests:       make(map[int32]chan error),
+		outstandingValidationRequests: make(map[int32]chan []error),
+	}
+	ctx := context.WithValue(helper.Context(), "customChainValidator", s.wsprCaveatValidator)
+	rpcServer, err := v23.NewServer(ctx)
+	if err != nil {
+		return nil, err
+	}
+	s.server = rpcServer
+	return s, nil
+}
+
+// remoteInvokeFunc is a type of function that can invoke a remote method and
+// communicate the result back via a channel to the caller. The invoker
+// receives exactly one reply from the returned channel per call.
+type remoteInvokeFunc func(methodName string, args []interface{}, call rpc.StreamServerCall) <-chan *lib.ServerRpcReply
+
+// createRemoteInvokerFunc returns a remoteInvokeFunc bound to handle. Each
+// invocation creates a flow, registers a reply channel under the flow ID,
+// sends a VOM-encoded ServerRPCRequest to javascript and returns the reply
+// channel. The reply is delivered by HandleServerResponse, by the error
+// handler when encoding or sending fails, or by the cancellation watcher when
+// the call context is done.
+func (s *Server) createRemoteInvokerFunc(handle int32) remoteInvokeFunc {
+ return func(methodName string, args []interface{}, call rpc.StreamServerCall) <-chan *lib.ServerRpcReply {
+ // The security call is converted before the flow is created.
+ securityCall := s.convertSecurityCall(call.Context(), true)
+
+ flow := s.helper.CreateNewFlow(s, call)
+ // Buffered so that whoever pops the request can send without blocking.
+ replyChan := make(chan *lib.ServerRpcReply, 1)
+ s.outstandingRequestLock.Lock()
+ s.outstandingServerRequests[flow.ID] = replyChan
+ s.outstandingRequestLock.Unlock()
+
+ // timeout stays at its zero value when the context has no deadline.
+ var timeout vdltime.Deadline
+ if deadline, ok := call.Context().Deadline(); ok {
+ timeout.Time = deadline
+ }
+
+ // errHandler fails the request: if it is still outstanding, the error is
+ // delivered on the reply channel and the flow is cleaned up.
+ errHandler := func(err error) <-chan *lib.ServerRpcReply {
+ if ch := s.popServerRequest(flow.ID); ch != nil {
+ stdErr := verror.Convert(verror.ErrInternal, call.Context(), err).(verror.E)
+ ch <- &lib.ServerRpcReply{nil, &stdErr, vtrace.Response{}}
+ s.helper.CleanupFlow(flow.ID)
+ }
+ return replyChan
+ }
+
+ rpcCall := ServerRPCRequestCall{
+ SecurityCall: securityCall,
+ Deadline: timeout,
+ TraceRequest: vtrace.GetRequest(call.Context()),
+ }
+
+ // Send a invocation request to JavaScript
+ message := ServerRPCRequest{
+ ServerId: s.id,
+ Handle: handle,
+ Method: lib.LowercaseFirstCharacter(methodName),
+ Args: args,
+ Call: rpcCall,
+ }
+ vomMessage, err := lib.VomEncode(message)
+ if err != nil {
+ return errHandler(err)
+ }
+ if err := flow.Writer.Send(lib.ResponseServerRequest, vomMessage); err != nil {
+ return errHandler(err)
+ }
+
+ vlog.VI(3).Infof("calling method %q with args %v, MessageID %d assigned\n", methodName, args, flow.ID)
+
+ // Watch for cancellation.
+ go func() {
+ <-call.Context().Done()
+ // A nil channel means the request was already answered or failed.
+ ch := s.popServerRequest(flow.ID)
+ if ch == nil {
+ return
+ }
+
+ // Send a cancel message to the JS server.
+ flow.Writer.Send(lib.ResponseCancel, nil)
+ s.helper.CleanupFlow(flow.ID)
+
+ err := verror.Convert(verror.ErrAborted, call.Context(), call.Context().Err()).(verror.E)
+ ch <- &lib.ServerRpcReply{nil, &err, vtrace.Response{}}
+ }()
+
+ // Forward items received on the call's stream to javascript.
+ go proxyStream(call, flow.Writer)
+
+ return replyChan
+ }
+}
+
+// globStream is the send-only stream handed to a glob flow; items sent on it
+// are forwarded to ch.
+type globStream struct {
+ // ch receives every naming.GlobReply passed to Send and is closed by CloseSend.
+ ch chan naming.GlobReply
+ // ctx is attached to the errors this stream returns.
+ ctx *context.T
+}
+
+// Send forwards item to the glob channel. Items that are not a
+// naming.GlobReply are rejected with verror.ErrBadArg.
+func (g *globStream) Send(item interface{}) error {
+	reply, ok := item.(naming.GlobReply)
+	if !ok {
+		return verror.New(verror.ErrBadArg, g.ctx, item)
+	}
+	g.ch <- reply
+	return nil
+}
+
+// Recv always fails with verror.ErrNoExist: a glob stream only carries
+// replies outward and has nothing to receive.
+func (g *globStream) Recv(itemptr interface{}) error {
+ return verror.New(verror.ErrNoExist, g.ctx, "Can't call recieve on glob stream")
+}
+
+// CloseSend closes the glob channel, signalling that no more replies will be sent.
+// It always returns nil.
+func (g *globStream) CloseSend() error {
+ close(g.ch)
+ return nil
+}
+
+// remoteGlobFunc is a type of function that can invoke a remote glob and
+// communicate the result back via the channel returned. A non-nil error means
+// the request could not be issued.
+type remoteGlobFunc func(pattern string, call rpc.ServerCall) (<-chan naming.GlobReply, error)
+
+// createRemoteGlobFunc returns a remoteGlobFunc bound to handle. Each call
+// creates a flow backed by a globStream, sends a VOM-encoded "Glob__"
+// ServerRPCRequest to javascript and returns the channel on which the glob
+// replies sent through that stream arrive.
+func (s *Server) createRemoteGlobFunc(handle int32) remoteGlobFunc {
+ return func(pattern string, call rpc.ServerCall) (<-chan naming.GlobReply, error) {
+ // Until the tests get fixed, we need to create a security context before creating the flow
+ // because creating the security context creates a flow and flow ids will be off.
+ // See https://github.com/veyron/release-issues/issues/1181
+ securityCall := s.convertSecurityCall(call.Context(), true)
+
+ globChan := make(chan naming.GlobReply, 1)
+ flow := s.helper.CreateNewFlow(s, &globStream{
+ ch: globChan,
+ ctx: call.Context(),
+ })
+ // NOTE(review): nothing in this function receives from replyChan; it is
+ // registered so the request can be popped on response or cancellation.
+ replyChan := make(chan *lib.ServerRpcReply, 1)
+ s.outstandingRequestLock.Lock()
+ s.outstandingServerRequests[flow.ID] = replyChan
+ s.outstandingRequestLock.Unlock()
+
+ // timeout stays at its zero value when the context has no deadline.
+ var timeout vdltime.Deadline
+ if deadline, ok := call.Context().Deadline(); ok {
+ timeout.Time = deadline
+ }
+
+ // errHandler unregisters the request, cleans up the flow if the request
+ // was still outstanding, and returns the error to the caller.
+ errHandler := func(err error) (<-chan naming.GlobReply, error) {
+ if ch := s.popServerRequest(flow.ID); ch != nil {
+ s.helper.CleanupFlow(flow.ID)
+ }
+ return nil, verror.Convert(verror.ErrInternal, call.Context(), err).(verror.E)
+ }
+
+ // NOTE(review): unlike the invoke path, TraceRequest is not populated here.
+ rpcCall := ServerRPCRequestCall{
+ SecurityCall: securityCall,
+ Deadline: timeout,
+ }
+
+ // Send a invocation request to JavaScript
+ message := ServerRPCRequest{
+ ServerId: s.id,
+ Handle: handle,
+ Method: "Glob__",
+ Args: []interface{}{pattern},
+ Call: rpcCall,
+ }
+ vomMessage, err := lib.VomEncode(message)
+ if err != nil {
+ return errHandler(err)
+ }
+ if err := flow.Writer.Send(lib.ResponseServerRequest, vomMessage); err != nil {
+ return errHandler(err)
+ }
+
+ vlog.VI(3).Infof("calling method 'Glob__' with args %v, MessageID %d assigned\n", []interface{}{pattern}, flow.ID)
+
+ // Watch for cancellation.
+ go func() {
+ <-call.Context().Done()
+ // A nil channel means the request was already answered or failed.
+ ch := s.popServerRequest(flow.ID)
+ if ch == nil {
+ return
+ }
+
+ // Send a cancel message to the JS server.
+ flow.Writer.Send(lib.ResponseCancel, nil)
+ s.helper.CleanupFlow(flow.ID)
+
+ err := verror.Convert(verror.ErrAborted, call.Context(), call.Context().Err()).(verror.E)
+ ch <- &lib.ServerRpcReply{nil, &err, vtrace.Response{}}
+ }()
+
+ return globChan, nil
+ }
+}
+
+// proxyStream pumps items from stream to w: every received item is
+// VOM-encoded and sent as a ResponseStream message. When Recv reports an
+// error the loop ends and a ResponseStreamClose message is sent. Any encoding
+// or send failure is reported through w.Error and stops the pump.
+func proxyStream(stream rpc.Stream, w lib.ClientWriter) {
+	var item interface{}
+	for {
+		if recvErr := stream.Recv(&item); recvErr != nil {
+			break
+		}
+		encoded, encErr := lib.VomEncode(item)
+		if encErr != nil {
+			w.Error(verror.Convert(verror.ErrInternal, nil, encErr))
+			return
+		}
+		if sendErr := w.Send(lib.ResponseStream, encoded); sendErr != nil {
+			w.Error(verror.Convert(verror.ErrInternal, nil, sendErr))
+			return
+		}
+	}
+	if closeErr := w.Send(lib.ResponseStreamClose, nil); closeErr != nil {
+		w.Error(verror.Convert(verror.ErrInternal, nil, closeErr))
+	}
+}
+
+// convertBlessingsToHandle stores blessings through the helper and returns
+// the JsBlessings value that refers to the stored handle.
+func (s *Server) convertBlessingsToHandle(blessings security.Blessings) principal.JsBlessings {
+	handle := s.helper.AddBlessings(blessings)
+	jsBlessings := principal.ConvertBlessingsToHandle(blessings, handle)
+	return *jsBlessings
+}
+
+// makeListOfErrors returns a slice of length numErrors in which every
+// element is err.
+func makeListOfErrors(numErrors int, err error) []error {
+	filled := make([]error, numErrors)
+	for idx := range filled {
+		filled[idx] = err
+	}
+	return filled
+}
+
+// validateCavsInJavascript validates caveats in javascript.
+// It resolves each []security.Caveat in cavs to an error (or nil) and collects them in a slice.
+// The returned slice always has len(cavs) entries: on send failure, timeout or
+// a reply of the wrong length, every entry is set to the same error.
+func (s *Server) validateCavsInJavascript(ctx *context.T, cavs [][]security.Caveat) []error {
+ flow := s.helper.CreateNewFlow(s, nil)
+ req := CaveatValidationRequest{
+ Call: s.convertSecurityCall(ctx, false),
+ Cavs: cavs,
+ }
+
+ // Buffered so a reply can be queued without a receiver being ready.
+ replyChan := make(chan []error, 1)
+ s.outstandingRequestLock.Lock()
+ s.outstandingValidationRequests[flow.ID] = replyChan
+ s.outstandingRequestLock.Unlock()
+
+ // Always unregister the request and release the flow on return.
+ defer func() {
+ s.outstandingRequestLock.Lock()
+ delete(s.outstandingValidationRequests, flow.ID)
+ s.outstandingRequestLock.Unlock()
+ s.cleanupFlow(flow.ID)
+ }()
+
+ if err := flow.Writer.Send(lib.ResponseValidate, req); err != nil {
+ vlog.VI(2).Infof("Failed to send validate response: %v", err)
+ // Queue the failure as the reply so the select below returns it.
+ replyChan <- makeListOfErrors(len(cavs), err)
+ }
+
+ // TODO(bprosnitz) Consider using a different timeout than the standard rpc timeout.
+ // timeoutChan stays nil when ctx has no deadline, so that case never fires.
+ var timeoutChan <-chan time.Time
+ if deadline, ok := ctx.Deadline(); ok {
+ timeoutChan = time.After(deadline.Sub(time.Now()))
+ }
+
+ select {
+ case <-timeoutChan:
+ return makeListOfErrors(len(cavs), NewErrCaveatValidationTimeout(ctx))
+ case reply := <-replyChan:
+ if len(reply) != len(cavs) {
+ vlog.VI(2).Infof("Wspr caveat validator received %d results from javascript but expected %d", len(reply), len(cavs))
+ return makeListOfErrors(len(cavs), NewErrInvalidValidationResponseFromJavascript(ctx))
+ }
+
+ return reply
+ }
+}
+
+// wsprCaveatValidator validates caveats for javascript.
+// Certain caveats (PublicKeyThirdPartyCaveatX) are intercepted and handled in go.
+// This calls validateCavsInJavascript to process the remaining caveats in javascript.
+// The result has one entry per chain in cavs, in the same order.
+func (s *Server) wsprCaveatValidator(ctx *context.T, cavs [][]security.Caveat) []error {
+ // validationStatus records the outcome of a chain that was fully decided in go.
+ type validationStatus struct {
+ err error
+ isSet bool
+ }
+ valStatus := make([]validationStatus, len(cavs))
+
+ // Chains that still need javascript validation, in their original relative order.
+ var caveatChainsToValidate [][]security.Caveat
+nextCav:
+ for i, chainCavs := range cavs {
+ var newChainCavs []security.Caveat
+ for _, cav := range chainCavs {
+ switch cav.Id {
+ case security.PublicKeyThirdPartyCaveatX.Id:
+ res := cav.Validate(ctx)
+ if res != nil {
+ // A failed third-party caveat decides the whole chain.
+ valStatus[i] = validationStatus{
+ err: res,
+ isSet: true,
+ }
+ continue nextCav
+ }
+ default:
+ newChainCavs = append(newChainCavs, cav)
+ }
+ }
+ if len(newChainCavs) == 0 {
+ // Nothing is left for javascript, so the chain is valid.
+ valStatus[i] = validationStatus{
+ err: nil,
+ isSet: true,
+ }
+ } else {
+ caveatChainsToValidate = append(caveatChainsToValidate, newChainCavs)
+ }
+ }
+
+ // validateCavsInJavascript returns exactly one result per chain passed in.
+ jsRes := s.validateCavsInJavascript(ctx, caveatChainsToValidate)
+
+ // Merge the go-side results with the javascript results; jsIndex walks
+ // jsRes in the order the undecided chains were appended above.
+ outResults := make([]error, len(cavs))
+ jsIndex := 0
+ for i, status := range valStatus {
+ if status.isSet {
+ outResults[i] = status.err
+ } else {
+ outResults[i] = jsRes[jsIndex]
+ jsIndex++
+ }
+ }
+
+ return outResults
+}
+
+// convertSecurityCall builds the SecurityCall sent to javascript from the
+// security call stored in ctx. Endpoints are left empty when nil and local
+// blessings are left at their zero value when the call has none. Blessing
+// name strings are filled in only when includeBlessingStrings is true.
+func (s *Server) convertSecurityCall(ctx *context.T, includeBlessingStrings bool) SecurityCall {
+	call := security.GetCall(ctx)
+
+	// TODO(bprosnitz) Local/Remote Endpoint should always be non-nil, but isn't
+	// due to a TODO in vc/auth.go
+	var localEP, remoteEP string
+	if ep := call.LocalEndpoint(); ep != nil {
+		localEP = ep.String()
+	}
+	if ep := call.RemoteEndpoint(); ep != nil {
+		remoteEP = ep.String()
+	}
+
+	// Local blessings are registered before the remote ones below.
+	var localJs principal.JsBlessings
+	if local := call.LocalBlessings(); !local.IsZero() {
+		localJs = s.convertBlessingsToHandle(local)
+	}
+
+	tags := call.MethodTags()
+	methodTags := make([]*vdl.Value, len(tags))
+	copy(methodTags, tags)
+
+	result := SecurityCall{
+		Method:          lib.LowercaseFirstCharacter(call.Method()),
+		Suffix:          call.Suffix(),
+		MethodTags:      methodTags,
+		LocalEndpoint:   localEP,
+		RemoteEndpoint:  remoteEP,
+		LocalBlessings:  localJs,
+		RemoteBlessings: s.convertBlessingsToHandle(call.RemoteBlessings()),
+	}
+	if !includeBlessingStrings {
+		return result
+	}
+	result.LocalBlessingStrings = security.LocalBlessingNames(ctx)
+	result.RemoteBlessingStrings, _ = security.RemoteBlessingNames(ctx)
+	return result
+}
+
+// remoteAuthFunc asks a javascript authorizer to authorize the call described
+// by ctx and returns nil when the call is allowed.
+type remoteAuthFunc func(ctx *context.T) error
+
+// createRemoteAuthFunc returns a remoteAuthFunc bound to handle. Each call
+// creates a flow, sends a VOM-encoded AuthRequest to javascript and blocks
+// until a reply arrives on the request's channel. The reply comes from
+// HandleAuthResponse, from Stop, or from this function itself when encoding
+// or sending fails.
+func (s *Server) createRemoteAuthFunc(handle int32) remoteAuthFunc {
+ return func(ctx *context.T) error {
+ // Until the tests get fixed, we need to create a security context before creating the flow
+ // because creating the security context creates a flow and flow ids will be off.
+ securityCall := s.convertSecurityCall(ctx, true)
+
+ flow := s.helper.CreateNewFlow(s, nil)
+ // Buffered so a failure below can be queued before the receive.
+ replyChan := make(chan error, 1)
+ s.outstandingRequestLock.Lock()
+ s.outstandingAuthRequests[flow.ID] = replyChan
+ s.outstandingRequestLock.Unlock()
+ message := AuthRequest{
+ ServerId: s.id,
+ Handle: handle,
+ Call: securityCall,
+ }
+ vlog.VI(0).Infof("Sending out auth request for %v, %v", flow.ID, message)
+
+ vomMessage, err := lib.VomEncode(message)
+ if err != nil {
+ replyChan <- verror.Convert(verror.ErrInternal, nil, err)
+ } else if err := flow.Writer.Send(lib.ResponseAuthRequest, vomMessage); err != nil {
+ replyChan <- verror.Convert(verror.ErrInternal, nil, err)
+ }
+
+ // Blocks until a reply is available; there is no timeout on this wait.
+ err = <-replyChan
+ vlog.VI(0).Infof("going to respond with %v", err)
+ s.outstandingRequestLock.Lock()
+ delete(s.outstandingAuthRequests, flow.ID)
+ s.outstandingRequestLock.Unlock()
+ s.helper.CleanupFlow(flow.ID)
+ return err
+ }
+}
+
+// readStatus polls the rpc server's mount status every ten seconds until
+// statusClose is closed. For each mount it sends a log message through the
+// helper the first time the mount is seen and whenever its last mount error
+// text changes.
+func (s *Server) readStatus() {
+	// Last error text reported per mount name; "" means mounted successfully.
+	reported := map[string]string{}
+	for {
+		for _, mount := range s.server.Status().Mounts {
+			errMsg := ""
+			if mount.LastMountErr != nil {
+				errMsg = mount.LastMountErr.Error()
+			}
+			name := mount.Name
+			previous, seen := reported[name]
+			if !seen || previous != errMsg {
+				if errMsg != "" {
+					s.helper.SendLogMessage(
+						lib.LogLevelError, "serve: "+name+" failed with: "+errMsg)
+				} else {
+					s.helper.SendLogMessage(
+						lib.LogLevelInfo, "serve: "+name+" successfully mounted ")
+				}
+			}
+			reported[name] = errMsg
+		}
+		select {
+		case <-s.statusClose:
+			return
+		case <-time.After(10 * time.Second):
+		}
+	}
+}
+
+// Serve starts serving the javascript-backed dispatcher under name.
+//
+// The dispatcher is created on first use and reused afterwards, and the
+// underlying rpc server starts listening on s.listenSpec the first time Serve
+// is called. The mount-status poller (readStatus) is started at most once:
+// replacing statusClose on a later call would leave the earlier poller with
+// no way to be stopped and would rewrite the field while that poller reads it.
+//
+// It returns the error from Listen or ServeDispatcher, if any.
+func (s *Server) Serve(name string) error {
+	s.serverStateLock.Lock()
+	defer s.serverStateLock.Unlock()
+
+	if s.dispatcher == nil {
+		s.dispatcher = newDispatcher(s.id, s, s, s)
+	}
+
+	if !s.isListening {
+		if _, err := s.server.Listen(*s.listenSpec); err != nil {
+			return err
+		}
+		s.isListening = true
+	}
+	if err := s.server.ServeDispatcher(name, s.dispatcher); err != nil {
+		return err
+	}
+	// Start the status poller only once per Server.
+	if s.statusClose == nil {
+		s.statusClose = make(chan struct{}, 1)
+		go s.readStatus()
+	}
+	return nil
+}
+
+// popServerRequest removes the outstanding server request registered under id
+// and returns its reply channel, or nil when no such request exists. Because
+// the entry is removed under the lock, only one caller obtains the channel.
+func (s *Server) popServerRequest(id int32) chan *lib.ServerRpcReply {
+	s.outstandingRequestLock.Lock()
+	replyChan, found := s.outstandingServerRequests[id]
+	if found {
+		delete(s.outstandingServerRequests, id)
+	}
+	s.outstandingRequestLock.Unlock()
+	return replyChan
+}
+
+// HandleServerResponse delivers javascript's reply for the server request
+// with the given id. data is VOM-decoded into a ServerRpcReply; a decode
+// failure is reported through the reply's Err field. Replies for unknown ids
+// are logged and dropped.
+func (s *Server) HandleServerResponse(id int32, data string) {
+	replyChan := s.popServerRequest(id)
+	if replyChan == nil {
+		// No request is waiting for this id, so there is nobody to notify.
+		vlog.Errorf("unexpected result from JavaScript. No channel for MessageId: %d exists. Ignoring the results.", id)
+		return
+	}
+
+	reply := new(lib.ServerRpcReply)
+	if decodeErr := lib.VomDecode(data, reply); decodeErr != nil {
+		reply.Err = decodeErr
+	}
+
+	vlog.VI(0).Infof("response received from JavaScript server for MessageId %d with result %v", id, *reply)
+	// The flow is released before the waiting invoker is woken up.
+	s.helper.CleanupFlow(id)
+	replyChan <- reply
+}
+
+// HandleLookupResponse forwards javascript's reply to a dispatcher lookup
+// request to the dispatcher.
+// NOTE(review): s.dispatcher is only assigned in Serve — confirm this cannot
+// be called before Serve.
+func (s *Server) HandleLookupResponse(id int32, data string) {
+ s.dispatcher.handleLookupResponse(id, data)
+}
+
+// HandleAuthResponse delivers javascript's reply for the authorization
+// request with the given id. data is JSON-decoded into an authReply; a decode
+// failure is delivered as an internal error. Replies for unknown ids are
+// logged and dropped. The request's map entry is left in place here.
+func (s *Server) HandleAuthResponse(id int32, data string) {
+ s.outstandingRequestLock.Lock()
+ ch := s.outstandingAuthRequests[id]
+ s.outstandingRequestLock.Unlock()
+ if ch == nil {
+ vlog.Errorf("unexpected result from JavaScript. No channel "+
+ "for MessageId: %d exists. Ignoring the results(%s)", id, data)
+ //Ignore unknown responses that don't belong to any channel
+ return
+ }
+ // Decode the result and send it through the channel
+ var reply authReply
+ if decoderErr := json.Unmarshal([]byte(data), &reply); decoderErr != nil {
+ err := verror.Convert(verror.ErrInternal, nil, decoderErr).(verror.E)
+ reply = authReply{Err: &err}
+ }
+
+ vlog.VI(0).Infof("response received from JavaScript server for "+
+ "MessageId %d with result %v", id, reply)
+ s.helper.CleanupFlow(id)
+ // A nil *verror.E stored in an error interface is not a nil error. Instead,
+ // we have to create a variable of the error interface type and only set its
+ // value if the pointer is non-nil.
+ var err error
+ if reply.Err != nil {
+ err = reply.Err
+ }
+ ch <- err
+}
+
+// HandleCaveatValidationResponse delivers javascript's reply for the caveat
+// validation request with the given id. data is VOM-decoded into a
+// CaveatValidationResponse and its Results are sent to the waiter; on a
+// decode failure an empty result list is sent instead. Replies for unknown
+// ids are logged and dropped.
+func (s *Server) HandleCaveatValidationResponse(id int32, data string) {
+	s.outstandingRequestLock.Lock()
+	replyChan := s.outstandingValidationRequests[id]
+	s.outstandingRequestLock.Unlock()
+	if replyChan == nil {
+		// No validation request is waiting for this id.
+		vlog.Errorf("unexpected result from JavaScript. No channel for validation response with MessageId: %d exists. Ignoring the results(%s)", id, data)
+		return
+	}
+
+	var resp CaveatValidationResponse
+	decodeErr := lib.VomDecode(data, &resp)
+	if decodeErr == nil {
+		replyChan <- resp.Results
+		return
+	}
+	vlog.Errorf("failed to decode validation response %q: error %v", data, decodeErr)
+	replyChan <- []error{}
+}
+
+// createFlow asks the helper for a new flow owned by this server, with no
+// associated stream.
+func (s *Server) createFlow() *Flow {
+ return s.helper.CreateNewFlow(s, nil)
+}
+
+// cleanupFlow asks the helper to release the flow with the given id.
+func (s *Server) cleanupFlow(id int32) {
+ s.helper.CleanupFlow(id)
+}
+
+// createInvoker builds an invoker for the javascript object identified by
+// handle. Glob support is wired in only when hasGlobber is true. The returned
+// error is always nil.
+func (s *Server) createInvoker(handle int32, sig []signature.Interface, hasGlobber bool) (rpc.Invoker, error) {
+	invokeFn := s.createRemoteInvokerFunc(handle)
+	var globFn remoteGlobFunc
+	if hasGlobber {
+		globFn = s.createRemoteGlobFunc(handle)
+	}
+	inv := newInvoker(sig, invokeFn, globFn)
+	return inv, nil
+}
+
+// createAuthorizer returns an authorizer that defers to the javascript
+// authorizer identified by handle, or a nil Authorizer when hasAuthorizer is
+// false. The returned error is always nil.
+func (s *Server) createAuthorizer(handle int32, hasAuthorizer bool) (security.Authorizer, error) {
+	if !hasAuthorizer {
+		return nil, nil
+	}
+	return &authorizer{authFunc: s.createRemoteAuthFunc(handle)}, nil
+}
+
+// Stop shuts the server down.
+//
+// It stops the status poller, cleans up the dispatcher, fails every
+// outstanding authorization and server request, stops the underlying rpc
+// server and finally clears the outstanding caveat validation requests.
+//
+// The outstanding request maps are read and reset while holding
+// outstandingRequestLock (taken after serverStateLock, as the lock order
+// requires), and every notification is a non-blocking send: a reply channel
+// has a one-slot buffer, so a full channel means its waiter already has an
+// answer pending and Stop must not block on it while holding the locks.
+func (s *Server) Stop() {
+	stdErr := verror.New(verror.ErrTimeout, nil).(verror.E)
+	result := lib.ServerRpcReply{
+		Results: nil,
+		Err:     &stdErr,
+	}
+	s.serverStateLock.Lock()
+
+	if s.statusClose != nil {
+		// Nothing ever sends on statusClose, so a successful receive means an
+		// earlier Stop already closed it; closing it again would panic.
+		select {
+		case <-s.statusClose:
+		default:
+			close(s.statusClose)
+		}
+	}
+	if s.dispatcher != nil {
+		s.dispatcher.Cleanup()
+	}
+
+	s.outstandingRequestLock.Lock()
+	for _, ch := range s.outstandingAuthRequests {
+		select {
+		case ch <- fmt.Errorf("Cleaning up server"):
+		default:
+		}
+	}
+	for _, ch := range s.outstandingServerRequests {
+		select {
+		case ch <- &result:
+		default:
+		}
+	}
+	s.outstandingAuthRequests = make(map[int32]chan error)
+	s.outstandingServerRequests = make(map[int32]chan *lib.ServerRpcReply)
+	s.outstandingRequestLock.Unlock()
+	s.serverStateLock.Unlock()
+	s.server.Stop()
+
+	// Only clear the validation requests map after stopping. Clearing
+	// it can cause the publisher to get stuck waiting for a caveat validation
+	// that will never be answered, which in turn causes it to not be
+	// able to stop which prevents the server from stopping.
+	s.serverStateLock.Lock()
+	s.outstandingRequestLock.Lock()
+	s.outstandingValidationRequests = make(map[int32]chan []error)
+	s.outstandingRequestLock.Unlock()
+	s.serverStateLock.Unlock()
+}
+
+// AddName forwards to AddName on the underlying rpc server and returns its error.
+func (s *Server) AddName(name string) error {
+ return s.server.AddName(name)
+}
+
+// RemoveName forwards to RemoveName on the underlying rpc server.
+func (s *Server) RemoveName(name string) {
+ s.server.RemoveName(name)
+}
diff --git a/services/wspr/internal/rpc/server/server.vdl b/services/wspr/internal/rpc/server/server.vdl
new file mode 100644
index 0000000..043a986
--- /dev/null
+++ b/services/wspr/internal/rpc/server/server.vdl
@@ -0,0 +1,36 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package server
+
+import (
+ "v.io/v23/security"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+// SecurityCall is the security state of a call as sent to javascript.
+type SecurityCall struct {
+ Method string
+ Suffix string
+ MethodTags []any
+ // Blessings are sent as handles (principal.JsBlessings).
+ LocalBlessings principal.JsBlessings
+ LocalBlessingStrings []string
+ RemoteBlessings principal.JsBlessings
+ RemoteBlessingStrings []string
+ // Endpoints are sent in string form.
+ LocalEndpoint string
+ RemoteEndpoint string
+}
+
+// CaveatValidationRequest asks javascript to validate caveat chains for a call.
+type CaveatValidationRequest struct {
+ // Call is the security state of the call being validated.
+ Call SecurityCall
+ // Cavs holds one list of caveats per chain to validate.
+ Cavs [][]security.Caveat
+}
+
+// CaveatValidationResponse is javascript's answer to a CaveatValidationRequest.
+type CaveatValidationResponse struct {
+ // Results is expected to hold one entry per chain in the request's Cavs.
+ Results []error
+}
+
+// Errors reported when caveat validation in javascript does not complete normally.
+error (
+ // No validation reply arrived before the call's deadline.
+ CaveatValidationTimeout() {"en": "Caveat validation has timed out"}
+ // The validation reply did not have the expected number of results.
+ InvalidValidationResponseFromJavascript() {"en": "Invalid validation response from javascript"}
+)
diff --git a/services/wspr/internal/rpc/server/server.vdl.go b/services/wspr/internal/rpc/server/server.vdl.go
new file mode 100644
index 0000000..21c0041
--- /dev/null
+++ b/services/wspr/internal/rpc/server/server.vdl.go
@@ -0,0 +1,82 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: server.vdl
+
+package server
+
+import (
+ // VDL system imports
+ "v.io/v23/context"
+ "v.io/v23/i18n"
+ "v.io/v23/vdl"
+ "v.io/v23/verror"
+
+ // VDL user imports
+ "v.io/v23/security"
+ "v.io/x/ref/services/wspr/internal/principal"
+)
+
+// SecurityCall is the Go form of the SecurityCall type declared in server.vdl.
+// This file is generated; lasting changes belong in server.vdl.
+type SecurityCall struct {
+ Method string
+ Suffix string
+ MethodTags []*vdl.Value
+ LocalBlessings principal.JsBlessings
+ LocalBlessingStrings []string
+ RemoteBlessings principal.JsBlessings
+ RemoteBlessingStrings []string
+ LocalEndpoint string
+ RemoteEndpoint string
+}
+
+// __VDLReflect carries the type's VDL name in the tag of its parameter.
+func (SecurityCall) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/rpc/server.SecurityCall"
+}) {
+}
+
+// CaveatValidationRequest is the Go form of the CaveatValidationRequest type
+// declared in server.vdl.
+type CaveatValidationRequest struct {
+ Call SecurityCall
+ Cavs [][]security.Caveat
+}
+
+// __VDLReflect carries the type's VDL name in the tag of its parameter.
+func (CaveatValidationRequest) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/rpc/server.CaveatValidationRequest"
+}) {
+}
+
+// CaveatValidationResponse is the Go form of the CaveatValidationResponse
+// type declared in server.vdl.
+type CaveatValidationResponse struct {
+ Results []error
+}
+
+// __VDLReflect carries the type's VDL name in the tag of its parameter.
+func (CaveatValidationResponse) __VDLReflect(struct {
+ Name string "v.io/x/ref/services/wspr/internal/rpc/server.CaveatValidationResponse"
+}) {
+}
+
+// init registers the types declared in this file with the vdl package.
+func init() {
+ vdl.Register((*SecurityCall)(nil))
+ vdl.Register((*CaveatValidationRequest)(nil))
+ vdl.Register((*CaveatValidationResponse)(nil))
+}
+
+// Error IDs generated from the error block in server.vdl.
+var (
+ ErrCaveatValidationTimeout = verror.Register("v.io/x/ref/services/wspr/internal/rpc/server.CaveatValidationTimeout", verror.NoRetry, "{1:}{2:} Caveat validation has timed out")
+ ErrInvalidValidationResponseFromJavascript = verror.Register("v.io/x/ref/services/wspr/internal/rpc/server.InvalidValidationResponseFromJavascript", verror.NoRetry, "{1:}{2:} Invalid validation response from javascript")
+)
+
+// init installs the English message formats for the errors above in the i18n catalogue.
+func init() {
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrCaveatValidationTimeout.ID), "{1:}{2:} Caveat validation has timed out")
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrInvalidValidationResponseFromJavascript.ID), "{1:}{2:} Invalid validation response from javascript")
+}
+
+// NewErrCaveatValidationTimeout returns an error with the ErrCaveatValidationTimeout ID.
+// ctx is passed through to verror.New.
+func NewErrCaveatValidationTimeout(ctx *context.T) error {
+ return verror.New(ErrCaveatValidationTimeout, ctx)
+}
+
+// NewErrInvalidValidationResponseFromJavascript returns an error with the ErrInvalidValidationResponseFromJavascript ID.
+// ctx is passed through to verror.New.
+func NewErrInvalidValidationResponseFromJavascript(ctx *context.T) error {
+ return verror.New(ErrInvalidValidationResponseFromJavascript, ctx)
+}