veyron/services/wsprd: Broke up the monolithic lib package into many
smaller packages.
Change-Id: I943b8ad75f96013e04f18837a517b8b2933171d6
diff --git a/services/wspr/wsprd/lib/stream.go b/services/wspr/wsprd/lib/stream.go
deleted file mode 100644
index 062115b..0000000
--- a/services/wspr/wsprd/lib/stream.go
+++ /dev/null
@@ -1,81 +0,0 @@
-// The set of streaming helper objects for wspr.
-
-package lib
-
-import (
- "veyron2/ipc"
-)
-
-// An interface for an asynchronous sender.
-type sender interface {
- // Similar to ipc.Stream.Send, expect that instead of
- // returning an error, w.sendError will be called.
- Send(item interface{}, w clientWriter)
-}
-
-// A message that will be passed to the writeLoop function that will
-// eventually write the message out to the stream.
-type streamMessage struct {
- // The data to put on the stream.
- data interface{}
- // The client writer that will be used to send errors.
- w clientWriter
-}
-
-// A stream that will eventually write messages to the underlying stream.
-// It isn't initialized with a stream, but rather a chan that will eventually
-// provide a stream, so that it can accept sends before the underlying stream
-// has been set up.
-type queueingStream chan *streamMessage
-
-// Creates and returns a queueing stream that will starting writing to the
-// stream provided by the ready channel. It is expected that ready will only
-// provide a single stream.
-// TODO(bjornick): allow for ready to pass an error if the stream had any issues
-// setting up.
-func startQueueingStream(ready chan ipc.Stream) queueingStream {
- s := make(queueingStream, 100)
- go s.writeLoop(ready)
- return s
-}
-
-func (q queueingStream) Send(item interface{}, w clientWriter) {
- // TODO(bjornick): Reject the message if the queue is too long.
- message := streamMessage{data: item, w: w}
- q <- &message
-}
-
-func (q queueingStream) Close() error {
- close(q)
- return nil
-}
-
-func (q queueingStream) writeLoop(ready chan ipc.Stream) {
- stream := <-ready
- for value, ok := <-q; ok; value, ok = <-q {
- if !ok {
- break
- }
- if err := stream.Send(value.data); err != nil {
- value.w.sendError(err)
- }
- }
-
- // If the stream is on the client side, then also close the stream.
- if call, ok := stream.(ipc.Call); ok {
- call.CloseSend()
- }
-}
-
-// A simple struct that wraps a stream with the sender api. It
-// will write to the stream synchronously. Any error will still
-// be written to clientWriter.
-type senderWrapper struct {
- stream ipc.Stream
-}
-
-func (s senderWrapper) Send(item interface{}, w clientWriter) {
- if err := s.stream.Send(item); err != nil {
- w.sendError(err)
- }
-}
diff --git a/services/wsprd/ipc/client/stream.go b/services/wsprd/ipc/client/stream.go
new file mode 100644
index 0000000..4237f8f
--- /dev/null
+++ b/services/wsprd/ipc/client/stream.go
@@ -0,0 +1,62 @@
+// A client stream helper.
+
+package client
+
+import (
+ "veyron/services/wsprd/lib"
+ "veyron2/ipc"
+)
+
+// A message that will be passed to the writeLoop function that will
+// eventually write the message out to the stream.
+type streamMessage struct {
+ // The data to put on the stream.
+ data interface{}
+ // The client writer that will be used to send errors.
+ w lib.ClientWriter
+}
+
+// A stream that will eventually write messages to the underlying stream.
+// It isn't initialized with a stream, but rather a chan that will eventually
+// provide a stream, so that it can accept sends before the underlying stream
+// has been set up.
+type QueueingStream chan *streamMessage
+
+// Creates and returns a queueing stream that will starting writing to the
+// stream provided by the ready channel. It is expected that ready will only
+// provide a single stream.
+// TODO(bjornick): allow for ready to pass an error if the stream had any issues
+// setting up.
+func StartQueueingStream(ready chan ipc.Stream) QueueingStream {
+ s := make(QueueingStream, 100)
+ go s.writeLoop(ready)
+ return s
+}
+
+func (q QueueingStream) Send(item interface{}, w lib.ClientWriter) {
+ // TODO(bjornick): Reject the message if the queue is too long.
+ message := streamMessage{data: item, w: w}
+ q <- &message
+}
+
+func (q QueueingStream) Close() error {
+ close(q)
+ return nil
+}
+
+func (q QueueingStream) writeLoop(ready chan ipc.Stream) {
+ stream := <-ready
+ for value, ok := <-q; ok; value, ok = <-q {
+ if !ok {
+ break
+ }
+ if err := stream.Send(value.data); err != nil {
+ value.w.Error(err)
+ }
+ }
+
+ // If the stream is on the client side, then also close the stream.
+ if call, ok := stream.(ipc.Call); ok {
+ call.CloseSend()
+ }
+}
diff --git a/services/wspr/wsprd/lib/dispatcher.go b/services/wsprd/ipc/server/dispatcher.go
similarity index 97%
rename from services/wspr/wsprd/lib/dispatcher.go
rename to services/wsprd/ipc/server/dispatcher.go
index 97b4aab..7c73701 100644
--- a/services/wspr/wsprd/lib/dispatcher.go
+++ b/services/wsprd/ipc/server/dispatcher.go
@@ -1,4 +1,4 @@
-package lib
+package server
import (
"veyron2/ipc"
diff --git a/services/wspr/wsprd/lib/invoker.go b/services/wsprd/ipc/server/invoker.go
similarity index 96%
rename from services/wspr/wsprd/lib/invoker.go
rename to services/wsprd/ipc/server/invoker.go
index a385d4d..aabafe8 100644
--- a/services/wspr/wsprd/lib/invoker.go
+++ b/services/wsprd/ipc/server/invoker.go
@@ -1,4 +1,4 @@
-package lib
+package server
import (
"veyron2/ipc"
@@ -24,7 +24,7 @@
predefinedInvokers := make(map[string]ipc.Invoker)
// Special handling for predefined "signature" method
- predefinedInvokers[signatureMethodName] = newSignatureInvoker(sig)
+ predefinedInvokers["Signature"] = newSignatureInvoker(sig)
i := &invoker{sig, invokeFunc, predefinedInvokers}
return i, nil
diff --git a/services/wspr/wsprd/lib/server.go b/services/wsprd/ipc/server/server.go
similarity index 65%
rename from services/wspr/wsprd/lib/server.go
rename to services/wsprd/ipc/server/server.go
index 8b95ea2..8b79b15 100644
--- a/services/wspr/wsprd/lib/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -1,6 +1,6 @@
// An implementation of a server for WSPR
-package lib
+package server
import (
"bytes"
@@ -8,17 +8,19 @@
"fmt"
"sync"
+ "veyron/services/wsprd/ipc/stream"
+ "veyron/services/wsprd/lib"
+ "veyron/services/wsprd/signature"
"veyron2"
"veyron2/ipc"
"veyron2/security"
"veyron2/verror"
"veyron2/vlog"
- "veyron2/vom"
)
-type flow struct {
- id int64
- writer clientWriter
+type Flow struct {
+ ID int64
+ Writer lib.ClientWriter
}
// A request from the proxy to javascript to handle an RPC
@@ -35,17 +37,23 @@
Name string
}
-type serverHelper interface {
- createNewFlow(server *server, sender sender) *flow
-
- cleanupFlow(id int64)
-
- getLogger() vlog.Logger
-
- rt() veyron2.Runtime
+// The response from the javascript server to the proxy.
+type serverRPCReply struct {
+ Results []interface{}
+ Err *verror.Standard
}
-type server struct {
+type ServerHelper interface {
+ CreateNewFlow(server *Server, sender stream.Sender) *Flow
+
+ CleanupFlow(id int64)
+
+ GetLogger() vlog.Logger
+
+ RT() veyron2.Runtime
+}
+
+type Server struct {
sync.Mutex
// The server that handles the ipc layer. Listen on this server is
@@ -61,7 +69,7 @@
// The server id.
id uint64
- helper serverHelper
+ helper ServerHelper
// The proxy to listen through.
veyronProxy string
@@ -70,15 +78,15 @@
outstandingServerRequests map[int64]chan *serverRPCReply
}
-func newServer(id uint64, veyronProxy string, helper serverHelper) (*server, error) {
- server := &server{
+func NewServer(id uint64, veyronProxy string, helper ServerHelper) (*Server, error) {
+ server := &Server{
id: id,
helper: helper,
veyronProxy: veyronProxy,
outstandingServerRequests: make(map[int64]chan *serverRPCReply),
}
var err error
- if server.server, err = helper.rt().NewServer(); err != nil {
+ if server.server, err = helper.RT().NewServer(); err != nil {
return nil, err
}
return server, nil
@@ -88,12 +96,12 @@
// communicate the result back via a channel to the caller
type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
-func (s *server) createRemoteInvokerFunc() remoteInvokeFunc {
+func (s *Server) createRemoteInvokerFunc() remoteInvokeFunc {
return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
- flow := s.helper.createNewFlow(s, senderWrapper{stream: call})
+ flow := s.helper.CreateNewFlow(s, senderWrapper{stream: call})
replyChan := make(chan *serverRPCReply, 1)
s.Lock()
- s.outstandingServerRequests[flow.id] = replyChan
+ s.outstandingServerRequests[flow.ID] = replyChan
s.Unlock()
context := serverRPCRequestContext{
Suffix: call.Suffix(),
@@ -102,13 +110,12 @@
// Send a invocation request to JavaScript
message := serverRPCRequest{
ServerId: s.id,
- Method: lowercaseFirstCharacter(methodName),
+ Method: lib.LowercaseFirstCharacter(methodName),
Args: args,
Context: context,
}
- data := response{Type: responseServerRequest, Message: message}
- if err := vom.ObjToJSON(flow.writer, vom.ValueOf(data)); err != nil {
+ if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
// Error in marshaling, pass the error through the channel immediately
replyChan <- &serverRPCReply{nil,
&verror.Standard{
@@ -117,49 +124,32 @@
}
return replyChan
}
- if err := flow.writer.FinishMessage(); err != nil {
- replyChan <- &serverRPCReply{nil,
- &verror.Standard{
- ID: verror.Internal,
- Msg: fmt.Sprintf("WSPR: error finishing message: %v", err)},
- }
- return replyChan
- }
- s.helper.getLogger().VI(3).Infof("request received to call method %q on "+
+ s.helper.GetLogger().VI(3).Infof("request received to call method %q on "+
"JavaScript server with args %v, MessageId %d was assigned.",
- methodName, args, flow.id)
+ methodName, args, flow.ID)
- go proxyStream(call, flow.writer, s.helper.getLogger())
+ go proxyStream(call, flow.Writer, s.helper.GetLogger())
return replyChan
}
}
-func proxyStream(stream ipc.Stream, w clientWriter, logger vlog.Logger) {
+func proxyStream(stream ipc.Stream, w lib.ClientWriter, logger vlog.Logger) {
var item interface{}
for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
- data := response{Type: responseStream, Message: item}
- if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
- w.sendError(verror.Internalf("error marshalling stream: %v:", err))
- return
- }
- if err := w.FinishMessage(); err != nil {
- logger.Error("WSPR: error finishing message", err)
+ if err := w.Send(lib.ResponseStream, item); err != nil {
+ w.Error(verror.Internalf("error marshalling stream: %v:", err))
return
}
}
- if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil {
- w.sendError(verror.Internalf("error closing stream: %v:", err))
- return
- }
- if err := w.FinishMessage(); err != nil {
- logger.Error("WSPR: error finishing message", err)
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.Internalf("error closing stream: %v:", err))
return
}
}
-func (s *server) serve(name string, sig JSONServiceSignature) (string, error) {
+func (s *Server) Serve(name string, sig signature.JSONServiceSignature) (string, error) {
s.Lock()
defer s.Unlock()
@@ -192,17 +182,17 @@
if err := s.server.Serve(name, s.dispatcher); err != nil {
return "", err
}
- s.helper.getLogger().VI(1).Infof("endpoint is %s", s.endpoint)
+ s.helper.GetLogger().VI(1).Infof("endpoint is %s", s.endpoint)
return s.endpoint, nil
}
-func (s *server) handleServerResponse(id int64, data string) {
+func (s *Server) HandleServerResponse(id int64, data string) {
s.Lock()
ch := s.outstandingServerRequests[id]
delete(s.outstandingServerRequests, id)
s.Unlock()
if ch == nil {
- s.helper.getLogger().Errorf("unexpected result from JavaScript. No channel "+
+ s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results.", id)
//Ignore unknown responses that don't belong to any channel
return
@@ -218,13 +208,13 @@
serverReply = serverRPCReply{nil, &err}
}
- s.helper.getLogger().VI(3).Infof("response received from JavaScript server for "+
+ s.helper.GetLogger().VI(3).Infof("response received from JavaScript server for "+
"MessageId %d with result %v", id, serverReply)
- s.helper.cleanupFlow(id)
+ s.helper.CleanupFlow(id)
ch <- &serverReply
}
-func (s *server) Stop() {
+func (s *Server) Stop() {
result := serverRPCReply{
Results: []interface{}{nil},
Err: &verror.Standard{
diff --git a/services/wspr/wsprd/lib/signature_invoker.go b/services/wsprd/ipc/server/signature_invoker.go
similarity index 97%
rename from services/wspr/wsprd/lib/signature_invoker.go
rename to services/wsprd/ipc/server/signature_invoker.go
index 56026e2..6384510 100644
--- a/services/wspr/wsprd/lib/signature_invoker.go
+++ b/services/wsprd/ipc/server/signature_invoker.go
@@ -1,4 +1,4 @@
-package lib
+package server
import (
"veyron2/ipc"
diff --git a/services/wsprd/ipc/server/stream.go b/services/wsprd/ipc/server/stream.go
new file mode 100644
index 0000000..03b39e2
--- /dev/null
+++ b/services/wsprd/ipc/server/stream.go
@@ -0,0 +1,19 @@
+package server
+
+import (
+ "veyron/services/wsprd/lib"
+ "veyron2/ipc"
+)
+
+// A simple struct that wraps a stream with the sender api. It
+// will write to the stream synchronously. Any error will still
+// be written to clientWriter.
+type senderWrapper struct {
+ stream ipc.Stream
+}
+
+func (s senderWrapper) Send(item interface{}, w lib.ClientWriter) {
+ if err := s.stream.Send(item); err != nil {
+ w.Error(err)
+ }
+}
diff --git a/services/wsprd/ipc/stream/stream.go b/services/wsprd/ipc/stream/stream.go
new file mode 100644
index 0000000..57ef1b3
--- /dev/null
+++ b/services/wsprd/ipc/stream/stream.go
@@ -0,0 +1,14 @@
+// The set of streaming helper objects for wspr.
+
+package stream
+
+import (
+ "veyron/services/wsprd/lib"
+)
+
+// An interface for an asynchronous sender.
+type Sender interface {
+ // Similar to ipc.Stream.Send, expect that instead of
+ // returning an error, w.sendError will be called.
+ Send(item interface{}, w lib.ClientWriter)
+}
diff --git a/services/wspr/wsprd/lib/cache.go b/services/wsprd/lib/cache.go
similarity index 100%
rename from services/wspr/wsprd/lib/cache.go
rename to services/wsprd/lib/cache.go
diff --git a/services/wspr/wsprd/lib/case.go b/services/wsprd/lib/case.go
similarity index 67%
rename from services/wspr/wsprd/lib/case.go
rename to services/wsprd/lib/case.go
index b440561..e6ec0d0 100644
--- a/services/wspr/wsprd/lib/case.go
+++ b/services/wsprd/lib/case.go
@@ -2,14 +2,14 @@
import "unicode"
-func lowercaseFirstCharacter(s string) string {
+func LowercaseFirstCharacter(s string) string {
for _, r := range s {
return string(unicode.ToLower(r)) + s[1:]
}
return ""
}
-func uppercaseFirstCharacter(s string) string {
+func UppercaseFirstCharacter(s string) string {
for _, r := range s {
return string(unicode.ToUpper(r)) + s[1:]
}
diff --git a/services/wspr/wsprd/lib/remove_this.go b/services/wsprd/lib/remove_this.go
similarity index 100%
rename from services/wspr/wsprd/lib/remove_this.go
rename to services/wsprd/lib/remove_this.go
diff --git a/services/wspr/wsprd/lib/signature_manager.go b/services/wsprd/lib/signature_manager.go
similarity index 84%
rename from services/wspr/wsprd/lib/signature_manager.go
rename to services/wsprd/lib/signature_manager.go
index 642dfe8..18a76a8 100644
--- a/services/wspr/wsprd/lib/signature_manager.go
+++ b/services/wsprd/lib/signature_manager.go
@@ -8,9 +8,8 @@
"veyron2/ipc"
)
-// NewSignatureManager creates and initialized a new Signature Manager
-func newSignatureManager() *signatureManager {
- return &signatureManager{cache: make(map[string]*cacheEntry)}
+type SignatureManager interface {
+ Signature(ctx context.T, name string, client ipc.Client) (*ipc.ServiceSignature, error)
}
// signatureManager can be used to discover the signature of a remote service
@@ -24,6 +23,11 @@
cache map[string]*cacheEntry
}
+// NewSignatureManager creates and initialized a new Signature Manager
+func NewSignatureManager() SignatureManager {
+ return &signatureManager{cache: make(map[string]*cacheEntry)}
+}
+
const (
// ttl from the last-accessed time.
ttl = time.Duration(time.Hour)
@@ -41,7 +45,7 @@
// signature uses the given client to fetch the signature for the given service name.
// It locks until it fetches the service signature from the remote server, if not a cache hit.
-func (sm *signatureManager) signature(ctx context.T, name string, client ipc.Client) (*ipc.ServiceSignature, error) {
+func (sm *signatureManager) Signature(ctx context.T, name string, client ipc.Client) (*ipc.ServiceSignature, error) {
sm.Lock()
defer sm.Unlock()
@@ -51,7 +55,7 @@
}
// cache expired or not found, fetch it from the remote server
- signatureCall, err := client.StartCall(ctx, name, signatureMethodName, []interface{}{})
+ signatureCall, err := client.StartCall(ctx, name, "Signature", []interface{}{})
if err != nil {
return nil, err
}
diff --git a/services/wspr/wsprd/lib/signature_manager_test.go b/services/wsprd/lib/signature_manager_test.go
similarity index 81%
rename from services/wspr/wsprd/lib/signature_manager_test.go
rename to services/wsprd/lib/signature_manager_test.go
index ac60393..f0caf01 100644
--- a/services/wspr/wsprd/lib/signature_manager_test.go
+++ b/services/wsprd/lib/signature_manager_test.go
@@ -15,6 +15,10 @@
name = "/veyron/name"
)
+func init() {
+ rt.Init()
+}
+
func expectedSignature() ipc.ServiceSignature {
return ipc.ServiceSignature{
Methods: make(map[string]ipc.MethodSignature),
@@ -30,7 +34,7 @@
func client() *mocks_ipc.SimpleMockClient {
return mocks_ipc.NewSimpleClient(
map[string][]interface{}{
- signatureMethodName: []interface{}{expectedSignature(), nil},
+ "Signature": []interface{}{expectedSignature(), nil},
},
)
}
@@ -75,8 +79,8 @@
}
func TestFetching(t *testing.T) {
- sm := newSignatureManager()
- got, err := sm.signature(rt.R().NewContext(), name, client())
+ sm := NewSignatureManager()
+ got, err := sm.Signature(rt.R().NewContext(), name, client())
if err != nil {
t.Errorf(`Did not expect an error but got %v`, err)
return
@@ -86,8 +90,8 @@
}
func TestThatCachedAfterFetching(t *testing.T) {
- sm := newSignatureManager()
- sig, _ := sm.signature(rt.R().NewContext(), name, client())
+ sm := NewSignatureManager().(*signatureManager)
+ sig, _ := sm.Signature(rt.R().NewContext(), name, client())
cache, ok := sm.cache[name]
if !ok {
t.Errorf(`Signature manager did not cache the results`)
@@ -98,30 +102,30 @@
func TestThatCacheIsUsed(t *testing.T) {
client := client()
- sm := newSignatureManager()
+ sm := NewSignatureManager()
// call twice
- sm.signature(rt.R().NewContext(), name, client)
- sm.signature(rt.R().NewContext(), name, client)
+ sm.Signature(rt.R().NewContext(), name, client)
+ sm.Signature(rt.R().NewContext(), name, client)
// expect number of calls to Signature method of client to still be 1 since cache
// should have been used despite the second call
- if client.TimesCalled(signatureMethodName) != 1 {
+ if client.TimesCalled("Signature") != 1 {
t.Errorf("Signature cache was not used for the second call")
}
}
func TestThatLastAccessedGetUpdated(t *testing.T) {
client := client()
- sm := newSignatureManager()
- sm.signature(rt.R().NewContext(), name, client)
+ sm := NewSignatureManager().(*signatureManager)
+ sm.Signature(rt.R().NewContext(), name, client)
// make last accessed be in the past to account for the fact that
// two consecutive calls to time.Now() can return identical values.
sm.cache[name].lastAccessed = sm.cache[name].lastAccessed.Add(-ttl / 2)
prevAccess := sm.cache[name].lastAccessed
// access again
- sm.signature(rt.R().NewContext(), name, client)
+ sm.Signature(rt.R().NewContext(), name, client)
newAccess := sm.cache[name].lastAccessed
if !newAccess.After(prevAccess) {
@@ -131,17 +135,17 @@
func TestThatTTLExpires(t *testing.T) {
client := client()
- sm := newSignatureManager()
- sm.signature(rt.R().NewContext(), name, client)
+ sm := NewSignatureManager().(*signatureManager)
+ sm.Signature(rt.R().NewContext(), name, client)
// make last accessed go over the ttl
sm.cache[name].lastAccessed = sm.cache[name].lastAccessed.Add(-2 * ttl)
// make a second call
- sm.signature(rt.R().NewContext(), name, client)
+ sm.Signature(rt.R().NewContext(), name, client)
// expect number of calls to Signature method of client to be 2 since cache should have expired
- if client.TimesCalled(signatureMethodName) != 2 {
+ if client.TimesCalled("Signature") != 2 {
t.Errorf("Cache was still used but TTL had passed. It should have been fetched again")
}
}
diff --git a/services/wsprd/lib/writer.go b/services/wsprd/lib/writer.go
new file mode 100644
index 0000000..ae18996
--- /dev/null
+++ b/services/wsprd/lib/writer.go
@@ -0,0 +1,19 @@
+package lib
+
+type ResponseType int
+
+const (
+ ResponseFinal ResponseType = 0
+ ResponseStream = 1
+ ResponseError = 2
+ ResponseServerRequest = 3
+ ResponseStreamClose = 4
+)
+
+// This is basically an io.Writer interface, that allows passing error message
+// strings. This is how the proxy will talk to the javascript/java clients.
+type ClientWriter interface {
+ Send(messageType ResponseType, data interface{}) error
+
+ Error(err error)
+}
diff --git a/services/wspr/wsprd/lib/identity.go b/services/wsprd/security/identity.go
similarity index 99%
rename from services/wspr/wsprd/lib/identity.go
rename to services/wsprd/security/identity.go
index 823d4d8..f72ad24 100644
--- a/services/wspr/wsprd/lib/identity.go
+++ b/services/wsprd/security/identity.go
@@ -10,8 +10,7 @@
// information, but not the private keys for each app.
// TODO(bjornick,ataly,ashankar): Have all the accounts share the same private key which will be stored
// in a TPM, so no private key gets serialized to disk.
-
-package lib
+package security
import (
"crypto/ecdsa"
diff --git a/services/wspr/wsprd/lib/identity_test.go b/services/wsprd/security/identity_test.go
similarity index 99%
rename from services/wspr/wsprd/lib/identity_test.go
rename to services/wsprd/security/identity_test.go
index b7ccbad..9b89999 100644
--- a/services/wspr/wsprd/lib/identity_test.go
+++ b/services/wsprd/security/identity_test.go
@@ -1,4 +1,4 @@
-package lib
+package security
import (
"bytes"
diff --git a/services/wspr/wsprd/lib/signature.go b/services/wsprd/signature/signature.go
similarity index 90%
rename from services/wspr/wsprd/lib/signature.go
rename to services/wsprd/signature/signature.go
index 54dd136..0185658 100644
--- a/services/wspr/wsprd/lib/signature.go
+++ b/services/wsprd/signature/signature.go
@@ -1,16 +1,12 @@
-package lib
+package signature
import (
+ "veyron/services/wsprd/lib"
"veyron2/ipc"
"veyron2/vdl/vdlutil"
"veyron2/wiretype"
)
-const (
- // agreed-upon name of the signature method that's available on all services
- signatureMethodName = "Signature"
-)
-
var (
anydataType = wiretype.NamedPrimitiveType{
Name: "veyron2/vdlutil.AnyData",
@@ -49,7 +45,7 @@
jmethSig.InArgs[i] = inarg.Name
}
- jsig[lowercaseFirstCharacter(name)] = jmethSig
+ jsig[lib.LowercaseFirstCharacter(name)] = jmethSig
}
return jsig
@@ -88,7 +84,7 @@
ms.OutStream = anydataTypeID
}
- ss.Methods[uppercaseFirstCharacter(name)] = ms
+ ss.Methods[lib.UppercaseFirstCharacter(name)] = ms
}
ss.TypeDefs = []vdlutil.Any{anydataType, errType}
diff --git a/services/wspr/wsprd/lib/signature_test.go b/services/wsprd/signature/signature_test.go
similarity index 98%
rename from services/wspr/wsprd/lib/signature_test.go
rename to services/wsprd/signature/signature_test.go
index e8e1f4f..250acf5 100644
--- a/services/wspr/wsprd/lib/signature_test.go
+++ b/services/wsprd/signature/signature_test.go
@@ -1,4 +1,4 @@
-package lib
+package signature
import (
"reflect"
diff --git a/services/wspr/wsprd/wspr.go b/services/wsprd/wspr.go
similarity index 81%
rename from services/wspr/wsprd/wspr.go
rename to services/wsprd/wspr.go
index ef4977d..4a01ea0 100644
--- a/services/wspr/wsprd/wspr.go
+++ b/services/wsprd/wspr.go
@@ -4,7 +4,7 @@
"flag"
"veyron/lib/signals"
- "veyron/services/wspr/wsprd/lib"
+ "veyron/services/wsprd/wspr"
)
func main() {
@@ -12,7 +12,7 @@
veyronProxy := flag.String("vproxy", "", "The endpoint for the veyron proxy to publish on. This must be set")
flag.Parse()
- proxy := lib.NewWSPR(*port, *veyronProxy)
+ proxy := wspr.NewWSPR(*port, *veyronProxy)
defer proxy.Shutdown()
go func() {
proxy.Run()
diff --git a/services/wspr/wsprd/lib/writer.go b/services/wsprd/wspr/writer.go
similarity index 60%
rename from services/wspr/wsprd/lib/writer.go
rename to services/wsprd/wspr/writer.go
index 11fd44c..4c85ea0 100644
--- a/services/wspr/wsprd/lib/writer.go
+++ b/services/wsprd/wspr/writer.go
@@ -1,10 +1,13 @@
-package lib
+package wspr
import (
"bytes"
"fmt"
"path/filepath"
"runtime"
+
+ "veyron/services/wsprd/lib"
+
"veyron2/verror"
"veyron2/vlog"
"veyron2/vom"
@@ -12,30 +15,41 @@
"github.com/gorilla/websocket"
)
-// This is basically an io.Writer interface, that allows passing error message
-// strings. This is how the proxy will talk to the javascript/java clients.
-type clientWriter interface {
- Write(p []byte) (int, error)
-
- sendError(err error)
-
- FinishMessage() error
+// Wraps a response to the proxy client and adds a message type.
+type response struct {
+ Type lib.ResponseType
+ Message interface{}
}
// Implements clientWriter interface for sending messages over websockets.
type websocketWriter struct {
ws *websocket.Conn
- buf bytes.Buffer
logger vlog.Logger
id int64
}
-func (w *websocketWriter) Write(p []byte) (int, error) {
- w.buf.Write(p)
- return len(p), nil
+func (w *websocketWriter) Send(messageType lib.ResponseType, data interface{}) error {
+ var buf bytes.Buffer
+ if err := vom.ObjToJSON(&buf, vom.ValueOf(response{Type: messageType, Message: data})); err != nil {
+ w.logger.Error("Failed to marshal with", err)
+ return err
+ }
+
+ wc, err := w.ws.NextWriter(websocket.TextMessage)
+ if err != nil {
+ w.logger.Error("Failed to get a writer from the websocket", err)
+ return err
+ }
+ if err := vom.ObjToJSON(wc, vom.ValueOf(websocketMessage{Id: w.id, Data: buf.String()})); err != nil {
+ w.logger.Error("Failed to write the message", err)
+ return err
+ }
+ wc.Close()
+
+ return nil
}
-func (w *websocketWriter) sendError(err error) {
+func (w *websocketWriter) Error(err error) {
verr := verror.ToStandard(err)
// Also log the error but write internal errors at a more severe log level
@@ -56,26 +70,5 @@
Msg: verr.Error(),
}
- w.buf.Reset()
- if err := vom.ObjToJSON(&w.buf, vom.ValueOf(response{Type: responseError, Message: errMsg})); err != nil {
- w.logger.Error("Failed to marshal with", err)
- return
- }
- if err := w.FinishMessage(); err != nil {
- w.logger.Error("WSPR: error finishing message: ", err)
- return
- }
-}
-
-func (w *websocketWriter) FinishMessage() error {
- wc, err := w.ws.NextWriter(websocket.TextMessage)
- if err != nil {
- return err
- }
- if err := vom.ObjToJSON(wc, vom.ValueOf(websocketMessage{Id: w.id, Data: w.buf.String()})); err != nil {
- return err
- }
- wc.Close()
- w.buf.Reset()
- return nil
+ w.Send(lib.ResponseError, errMsg)
}
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wsprd/wspr/wspr.go
similarity index 80%
rename from services/wspr/wsprd/lib/wspr.go
rename to services/wsprd/wspr/wspr.go
index 596ed12..81e31e9 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wsprd/wspr/wspr.go
@@ -13,7 +13,7 @@
// "IsStreaming" : true/false
// }
//
-package lib
+package wspr
import (
"bytes"
@@ -31,6 +31,11 @@
"sync"
"time"
+ "veyron/services/wsprd/ipc/client"
+ "veyron/services/wsprd/ipc/server"
+ "veyron/services/wsprd/ipc/stream"
+ "veyron/services/wsprd/lib"
+ "veyron/services/wsprd/signature"
"veyron2"
"veyron2/ipc"
"veyron2/rt"
@@ -55,29 +60,13 @@
type WSPR struct {
tlsCert *tls.Certificate
- clientCache *ClientCache
+ clientCache *lib.ClientCache
rt veyron2.Runtime
logger vlog.Logger
port int
veyronProxyEP string
}
-type responseType int
-
-const (
- responseFinal responseType = 0
- responseStream = 1
- responseError = 2
- responseServerRequest = 3
- responseStreamClose = 4
-)
-
-// Wraps a response to the proxy client and adds a message type.
-type response struct {
- Type responseType
- Message interface{}
-}
-
var logger vlog.Logger
// The type of message sent by the JS client to the wspr.
@@ -139,17 +128,11 @@
type serveRequest struct {
Name string
ServerId uint64
- Service JSONServiceSignature
-}
-
-// The response from the javascript server to the proxy.
-type serverRPCReply struct {
- Results []interface{}
- Err *verror.Standard
+ Service signature.JSONServiceSignature
}
// finishCall waits for the call to finish and write out the response to w.
-func (wsp *websocketPipe) finishCall(w clientWriter, clientCall ipc.Call, msg *veyronRPC) {
+func (wsp *websocketPipe) finishCall(w lib.ClientWriter, clientCall ipc.Call, msg *veyronRPC) {
if msg.IsStreaming {
for {
var item interface{}
@@ -157,24 +140,16 @@
if err == io.EOF {
break
}
- w.sendError(err) // Send streaming error as is
+ w.Error(err) // Send streaming error as is
return
}
- data := &response{Type: responseStream, Message: item}
- if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
- w.sendError(verror.Internalf("unable to marshal: %v", item))
- continue
- }
- if err := w.FinishMessage(); err != nil {
- wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
+ if err := w.Send(lib.ResponseStream, item); err != nil {
+ w.Error(verror.Internalf("unable to marshal: %v", item))
}
}
- if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil {
- w.sendError(verror.Internalf("unable to marshal close stream message"))
- }
- if err := w.FinishMessage(); err != nil {
- wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
+ if err := w.Send(lib.ResponseStreamClose, nil); err != nil {
+ w.Error(verror.Internalf("unable to marshal close stream message"))
}
}
@@ -186,30 +161,23 @@
}
if err := clientCall.Finish(resultptrs...); err != nil {
// return the call system error as is
- w.sendError(err)
+ w.Error(err)
return
}
// for now we assume last out argument is always error
if len(results) < 1 {
- w.sendError(verror.Internalf("client call did not return any results"))
+ w.Error(verror.Internalf("client call did not return any results"))
return
}
if err, ok := results[len(results)-1].(error); ok {
// return the call application error as is
- w.sendError(err)
+ w.Error(err)
return
}
- data := response{Type: responseFinal, Message: results[0 : len(results)-1]}
- if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
- w.sendError(verror.Internalf("error marshalling results: %v", err))
- return
- }
-
- if err := w.FinishMessage(); err != nil {
- wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
- return
+ if err := w.Send(lib.ResponseFinal, results[0:len(results)-1]); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
}
}
@@ -229,16 +197,17 @@
return client, nil
}
-func (ctx WSPR) startVeyronRequest(w clientWriter, msg *veyronRPC) (ipc.Call, error) {
+func (ctx WSPR) startVeyronRequest(w lib.ClientWriter, msg *veyronRPC) (ipc.Call, error) {
// Issue request to the endpoint.
client, err := ctx.newClient(msg.PrivateId)
if err != nil {
return nil, err
}
- clientCall, err := client.StartCall(ctx.rt.TODOContext(), msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs)
+ methodName := lib.UppercaseFirstCharacter(msg.Method)
+ clientCall, err := client.StartCall(ctx.rt.TODOContext(), msg.Name, methodName, msg.InArgs)
if err != nil {
- return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, uppercaseFirstCharacter(msg.Method), msg.InArgs, err)
+ return nil, fmt.Errorf("error starting call (name: %v, method: %v, args: %v): %v", msg.Name, methodName, msg.InArgs, err)
}
return clientCall, nil
@@ -311,7 +280,7 @@
}
type outstandingStream struct {
- stream sender
+ stream stream.Sender
inType vom.Type
}
@@ -329,43 +298,43 @@
outstandingStreams map[int64]outstandingStream
// Maps flowids to the server that owns them.
- flowMap map[int64]*server
+ flowMap map[int64]*server.Server
// A manager that handles fetching and caching signature of remote services
- signatureManager *signatureManager
+ signatureManager lib.SignatureManager
// We maintain multiple Veyron server per websocket pipe for serving JavaScript
// services.
- servers map[uint64]*server
+ servers map[uint64]*server.Server
// Creates a client writer for a given flow. This is a member so that tests can override
// the default implementation.
- writerCreator func(id int64) clientWriter
+ writerCreator func(id int64) lib.ClientWriter
}
// Implements the serverHelper interface
-func (wsp *websocketPipe) createNewFlow(server *server, stream sender) *flow {
+func (wsp *websocketPipe) CreateNewFlow(s *server.Server, stream stream.Sender) *server.Flow {
wsp.Lock()
defer wsp.Unlock()
id := wsp.lastGeneratedId
wsp.lastGeneratedId += 2
- wsp.flowMap[id] = server
+ wsp.flowMap[id] = s
wsp.outstandingStreams[id] = outstandingStream{stream, vom_wiretype.Type{ID: 1}}
- return &flow{id: id, writer: wsp.writerCreator(id)}
+ return &server.Flow{ID: id, Writer: wsp.writerCreator(id)}
}
-func (wsp *websocketPipe) cleanupFlow(id int64) {
+func (wsp *websocketPipe) CleanupFlow(id int64) {
wsp.Lock()
defer wsp.Unlock()
delete(wsp.outstandingStreams, id)
delete(wsp.flowMap, id)
}
-func (wsp *websocketPipe) getLogger() vlog.Logger {
+func (wsp *websocketPipe) GetLogger() vlog.Logger {
return wsp.ctx.logger
}
-func (wsp *websocketPipe) rt() veyron2.Runtime {
+func (wsp *websocketPipe) RT() veyron2.Runtime {
return wsp.ctx.rt
}
@@ -385,13 +354,13 @@
}
func (wsp *websocketPipe) setup() {
- wsp.signatureManager = newSignatureManager()
+ wsp.signatureManager = lib.NewSignatureManager()
wsp.outstandingStreams = make(map[int64]outstandingStream)
- wsp.flowMap = make(map[int64]*server)
- wsp.servers = make(map[uint64]*server)
+ wsp.flowMap = make(map[int64]*server.Server)
+ wsp.servers = make(map[uint64]*server.Server)
if wsp.writerCreator == nil {
- wsp.writerCreator = func(id int64) clientWriter {
+ wsp.writerCreator = func(id int64) lib.ClientWriter {
return &websocketWriter{ws: wsp.ws, id: id, logger: wsp.ctx.logger}
}
}
@@ -455,12 +424,12 @@
return nil
}
-func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w clientWriter) {
+func (wsp *websocketPipe) sendParsedMessageOnStream(id int64, msg interface{}, w lib.ClientWriter) {
wsp.Lock()
defer wsp.Unlock()
stream := wsp.outstandingStreams[id].stream
if stream == nil {
- w.sendError(fmt.Errorf("unknown stream"))
+ w.Error(fmt.Errorf("unknown stream"))
return
}
@@ -469,7 +438,7 @@
}
// sendOnStream writes data on id's stream. Returns an error if the send failed.
-func (wsp *websocketPipe) sendOnStream(id int64, data string, w clientWriter) {
+func (wsp *websocketPipe) sendOnStream(id int64, data string, w lib.ClientWriter) {
wsp.Lock()
typ := wsp.outstandingStreams[id].inType
wsp.Unlock()
@@ -485,12 +454,12 @@
wsp.sendParsedMessageOnStream(id, payload, w)
}
-func (wsp *websocketPipe) sendVeyronRequest(id int64, veyronMsg *veyronRPC, w clientWriter, signal chan ipc.Stream) {
+func (wsp *websocketPipe) sendVeyronRequest(id int64, veyronMsg *veyronRPC, w lib.ClientWriter, signal chan ipc.Stream) {
// We have to make the start call synchronous so we can make sure that we populate
// the call map before we can handle a recieve call.
call, err := wsp.ctx.startVeyronRequest(w, veyronMsg)
if err != nil {
- w.sendError(verror.Internalf("can't start Veyron Request: %v", err))
+ w.Error(verror.Internalf("can't start Veyron Request: %v", err))
return
}
@@ -510,7 +479,7 @@
func (wsp *websocketPipe) handleVeyronRequest(id int64, data string, w *websocketWriter) {
veyronMsg, inStreamType, err := wsp.parseVeyronRequest(bytes.NewBufferString(data))
if err != nil {
- w.sendError(verror.Internalf("can't parse Veyron Request: %v", err))
+ w.Error(verror.Internalf("can't parse Veyron Request: %v", err))
return
}
@@ -525,7 +494,7 @@
if veyronMsg.IsStreaming {
signal = make(chan ipc.Stream)
wsp.outstandingStreams[id] = outstandingStream{
- stream: startQueueingStream(signal),
+ stream: client.StartQueueingStream(signal),
inType: inStreamType,
}
}
@@ -541,9 +510,9 @@
return
}
- var call queueingStream
+ var call client.QueueingStream
var ok bool
- if call, ok = stream.(queueingStream); !ok {
+ if call, ok = stream.(client.QueueingStream); !ok {
wsp.ctx.logger.Errorf("can't close server stream: %v", id)
return
}
@@ -602,19 +571,19 @@
case websocketSignatureRequest:
go wsp.handleSignatureRequest(msg.Data, ww)
default:
- ww.sendError(verror.Unknownf("unknown message type: %v", msg.Type))
+ ww.Error(verror.Unknownf("unknown message type: %v", msg.Type))
}
}
wsp.cleanup()
}
-func (wsp *websocketPipe) maybeCreateServer(serverId uint64) (*server, error) {
+func (wsp *websocketPipe) maybeCreateServer(serverId uint64) (*server.Server, error) {
wsp.Lock()
defer wsp.Unlock()
if server, ok := wsp.servers[serverId]; ok {
return server, nil
}
- server, err := newServer(serverId, wsp.ctx.veyronProxyEP, wsp)
+ server, err := server.NewServer(serverId, wsp.ctx.veyronProxyEP, wsp)
if err != nil {
return nil, err
}
@@ -635,29 +604,23 @@
server.Stop()
}
-func (wsp *websocketPipe) serve(serveRequest serveRequest, w clientWriter) {
+func (wsp *websocketPipe) serve(serveRequest serveRequest, w lib.ClientWriter) {
// Create a server for the websocket pipe, if it does not exist already
server, err := wsp.maybeCreateServer(serveRequest.ServerId)
if err != nil {
- w.sendError(verror.Internalf("error creating server: %v", err))
+ w.Error(verror.Internalf("error creating server: %v", err))
}
wsp.ctx.logger.VI(2).Infof("serving under name: %q", serveRequest.Name)
- endpoint, err := server.serve(serveRequest.Name, serveRequest.Service)
+ endpoint, err := server.Serve(serveRequest.Name, serveRequest.Service)
if err != nil {
- w.sendError(verror.Internalf("error serving service: %v", err))
+ w.Error(verror.Internalf("error serving service: %v", err))
return
}
// Send the endpoint back
- endpointData := response{Type: responseFinal, Message: endpoint}
- if err := vom.ObjToJSON(w, vom.ValueOf(endpointData)); err != nil {
- w.sendError(verror.Internalf("error marshalling results: %v", err))
- return
- }
-
- if err := w.FinishMessage(); err != nil {
- wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
+ if err := w.Send(lib.ResponseFinal, endpoint); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
return
}
}
@@ -669,7 +632,7 @@
var serveRequest serveRequest
decoder := json.NewDecoder(bytes.NewBufferString(data))
if err := decoder.Decode(&serveRequest); err != nil {
- w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
return
}
wsp.serve(serveRequest, w)
@@ -681,19 +644,17 @@
var serverId uint64
decoder := json.NewDecoder(bytes.NewBufferString(data))
if err := decoder.Decode(&serverId); err != nil {
- w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
return
}
wsp.removeServer(serverId)
// Send true to indicate stop has finished
- result := response{Type: responseFinal, Message: true}
- if err := vom.ObjToJSON(w, vom.ValueOf(result)); err != nil {
- w.sendError(verror.Internalf("error marshalling results: %v", err))
+ if err := w.Send(lib.ResponseFinal, true); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
return
}
- w.FinishMessage()
}
// handleServerResponse handles the completion of outstanding calls to JavaScript services
@@ -708,7 +669,7 @@
//Ignore unknown responses that don't belong to any channel
return
}
- server.handleServerResponse(id, data)
+ server.HandleServerResponse(id, data)
}
// parseVeyronRequest parses a json rpc request into a veyronRPC object.
@@ -726,12 +687,12 @@
// Fetch and adapt signature from the SignatureManager
ctx := wsp.ctx.rt.TODOContext()
- sig, err := wsp.signatureManager.signature(ctx, tempMsg.Name, client)
+ sig, err := wsp.signatureManager.Signature(ctx, tempMsg.Name, client)
if err != nil {
return nil, nil, verror.Internalf("error getting service signature for %s: %v", tempMsg.Name, err)
}
- methName := uppercaseFirstCharacter(tempMsg.Method)
+ methName := lib.UppercaseFirstCharacter(tempMsg.Method)
methSig, ok := sig.Methods[methName]
if !ok {
return nil, nil, fmt.Errorf("Method not found in signature: %v (full sig: %v)", methName, sig)
@@ -778,7 +739,7 @@
PrivateId string
}
-func (wsp *websocketPipe) getSignature(name string, privateId string) (JSONServiceSignature, error) {
+func (wsp *websocketPipe) getSignature(name string, privateId string) (signature.JSONServiceSignature, error) {
client, err := wsp.ctx.newClient(privateId)
if err != nil {
return nil, verror.Internalf("error creating client: %v", err)
@@ -786,12 +747,12 @@
// Fetch and adapt signature from the SignatureManager
ctx := wsp.ctx.rt.TODOContext()
- sig, err := wsp.signatureManager.signature(ctx, name, client)
+ sig, err := wsp.signatureManager.Signature(ctx, name, client)
if err != nil {
return nil, verror.Internalf("error getting service signature for %s: %v", name, err)
}
- return NewJSONServiceSignature(*sig), nil
+ return signature.NewJSONServiceSignature(*sig), nil
}
// handleSignatureRequest uses signature manager to get and cache signature of a remote server
@@ -800,7 +761,7 @@
var request signatureRequest
decoder := json.NewDecoder(bytes.NewBufferString(data))
if err := decoder.Decode(&request); err != nil {
- w.sendError(verror.Internalf("can't unmarshal JSONMessage: %v", err))
+ w.Error(verror.Internalf("can't unmarshal JSONMessage: %v", err))
return
}
@@ -808,25 +769,20 @@
wsp.ctx.logger.VI(2).Info("private id is", request.PrivateId)
jsSig, err := wsp.getSignature(request.Name, request.PrivateId)
if err != nil {
- w.sendError(err)
+ w.Error(err)
return
}
// Send the signature back
- signatureData := response{Type: responseFinal, Message: jsSig}
- if err := vom.ObjToJSON(w, vom.ValueOf(signatureData)); err != nil {
- w.sendError(verror.Internalf("error marshalling results: %v", err))
- return
- }
- if err := w.FinishMessage(); err != nil {
- w.logger.Error("WSPR: error finishing message: ", err)
+ if err := w.Send(lib.ResponseFinal, jsSig); err != nil {
+ w.Error(verror.Internalf("error marshalling results: %v", err))
return
}
}
func (ctx *WSPR) setup() {
// Cache up to 20 identity.PrivateID->ipc.Client mappings
- ctx.clientCache = NewClientCache(20)
+ ctx.clientCache = lib.NewClientCache(20)
}
// Starts the proxy and listens for requests. This method is blocking.
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wsprd/wspr/wspr_test.go
similarity index 89%
rename from services/wspr/wsprd/lib/wspr_test.go
rename to services/wsprd/wspr/wspr_test.go
index 018270d..d03d7fa 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wsprd/wspr/wspr_test.go
@@ -1,4 +1,4 @@
-package lib
+package wspr
import (
"bytes"
@@ -8,6 +8,9 @@
"sync"
"testing"
"time"
+ "veyron/services/wsprd/ipc/client"
+ "veyron/services/wsprd/lib"
+ "veyron/services/wsprd/signature"
"veyron2"
"veyron2/ipc"
"veyron2/naming"
@@ -132,7 +135,6 @@
type testWriter struct {
sync.Mutex
stream []response
- buf bytes.Buffer
err error
logger vlog.Logger
// If this channel is set then a message will be sent
@@ -140,29 +142,32 @@
notifier chan bool
}
-func (w *testWriter) Write(p []byte) (int, error) {
- return w.buf.Write(p)
-
-}
-
-func (w *testWriter) sendError(err error) {
- w.err = err
-}
-
-func (w *testWriter) FinishMessage() error {
- var resp response
- p := w.buf.Bytes()
- w.buf.Reset()
- if err := json.Unmarshal(p, &resp); err != nil {
+func (w *testWriter) Send(responseType lib.ResponseType, msg interface{}) error {
+ w.Lock()
+ defer w.Unlock()
+ // We serialize and deserialize the reponse so that we can do deep equal with
+ // messages that contain non-exported structs.
+ var buf bytes.Buffer
+ if err := json.NewEncoder(&buf).Encode(response{Type: responseType, Message: msg}); err != nil {
return err
}
- w.Lock()
- w.stream = append(w.stream, resp)
+
+ var r response
+
+ if err := json.NewDecoder(&buf).Decode(&r); err != nil {
+ return err
+ }
+
+ w.stream = append(w.stream, r)
if w.notifier != nil {
w.notifier <- true
}
- w.Unlock()
return nil
+
+}
+
+func (w *testWriter) Error(err error) {
+ w.err = err
}
func (w *testWriter) streamLength() int {
@@ -204,18 +209,18 @@
}
}
-var adderServiceSignature JSONServiceSignature = JSONServiceSignature{
- "add": JSONMethodSignature{
+var adderServiceSignature signature.JSONServiceSignature = signature.JSONServiceSignature{
+ "add": signature.JSONMethodSignature{
InArgs: []string{"A", "B"},
NumOutArgs: 2,
IsStreaming: false,
},
- "divide": JSONMethodSignature{
+ "divide": signature.JSONMethodSignature{
InArgs: []string{"A", "B"},
NumOutArgs: 2,
IsStreaming: false,
},
- "streamingAdd": JSONMethodSignature{
+ "streamingAdd": signature.JSONMethodSignature{
InArgs: []string{},
NumOutArgs: 2,
IsStreaming: true,
@@ -275,7 +280,7 @@
if len(test.streamingInputs) > 0 {
signal = make(chan ipc.Stream, 1)
wsp.outstandingStreams[0] = outstandingStream{
- stream: startQueueingStream(signal),
+ stream: client.StartQueueingStream(signal),
inType: test.streamingInputType,
}
go func() {
@@ -294,6 +299,7 @@
IsStreaming: signal != nil,
}
wsp.sendVeyronRequest(0, &request, &writer, signal)
+
checkResponses(&writer, test.expectedStream, test.expectedError, t)
}
@@ -304,8 +310,8 @@
numOutArgs: 2,
expectedStream: []response{
response{
- Message: []interface{}{float64(5)},
- Type: responseFinal,
+ Message: []interface{}{5.0},
+ Type: lib.ResponseFinal,
},
},
})
@@ -330,27 +336,27 @@
expectedStream: []response{
response{
Message: 1.0,
- Type: responseStream,
+ Type: lib.ResponseStream,
},
response{
Message: 3.0,
- Type: responseStream,
+ Type: lib.ResponseStream,
},
response{
Message: 6.0,
- Type: responseStream,
+ Type: lib.ResponseStream,
},
response{
Message: 10.0,
- Type: responseStream,
+ Type: lib.ResponseStream,
},
response{
Message: nil,
- Type: responseStreamClose,
+ Type: lib.ResponseStreamClose,
},
response{
Message: []interface{}{10.0},
- Type: responseFinal,
+ Type: lib.ResponseFinal,
},
},
})
@@ -385,7 +391,7 @@
writer := testWriter{
logger: wspr.logger,
}
- wsp.writerCreator = func(int64) clientWriter {
+ wsp.writerCreator = func(int64) lib.ClientWriter {
return &writer
}
wsp.setup()
@@ -416,7 +422,7 @@
resp := rt.writer.stream[0]
- if resp.Type != responseFinal {
+ if resp.Type != lib.ResponseFinal {
t.Errorf("unknown stream message Got: %v, expected: serve response", resp)
return
}
@@ -469,14 +475,14 @@
err *verror.Standard
}
-func sendServerStream(t *testing.T, wsp *websocketPipe, test *jsServerTestCase, w clientWriter) {
+func sendServerStream(t *testing.T, wsp *websocketPipe, test *jsServerTestCase, w lib.ClientWriter) {
for _, msg := range test.serverStream {
wsp.sendParsedMessageOnStream(0, msg, w)
}
- serverReply := serverRPCReply{
- Results: []interface{}{test.finalResponse},
- Err: test.err,
+ serverReply := map[string]interface{}{
+ "Results": []interface{}{test.finalResponse},
+ "Err": test.err,
}
bytes, err := json.Marshal(serverReply)
@@ -503,7 +509,7 @@
resp := rt.writer.stream[0]
- if resp.Type != responseFinal {
+ if resp.Type != lib.ResponseFinal {
t.Errorf("unknown stream message Got: %v, expected: serve response", resp)
return
}
@@ -533,14 +539,14 @@
expectedWebsocketMessage := []response{
response{
- Type: responseServerRequest,
+ Type: lib.ResponseServerRequest,
Message: map[string]interface{}{
- "serverId": 0.0,
- "method": lowercaseFirstCharacter(test.method),
- "args": test.inArgs,
- "context": map[string]interface{}{
- "name": "adder",
- "suffix": "adder",
+ "ServerId": 0.0,
+ "Method": lib.LowercaseFirstCharacter(test.method),
+ "Args": test.inArgs,
+ "Context": map[string]interface{}{
+ "Name": "adder",
+ "Suffix": "adder",
},
},
},
@@ -551,7 +557,7 @@
t.Errorf("didn't recieve expected message: %v", err)
}
for _, msg := range test.clientStream {
- expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStream, Message: msg})
+ expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: lib.ResponseStream, Message: msg})
if err := call.Send(msg); err != nil {
t.Errorf("unexpected error while sending %v: %v", msg, err)
}
@@ -562,7 +568,7 @@
t.Errorf("didn't recieve expected message: %v", err)
}
- expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: responseStreamClose})
+ expectedWebsocketMessage = append(expectedWebsocketMessage, response{Type: lib.ResponseStreamClose})
expectedStream := test.serverStream
go sendServerStream(t, rt.wsp, &test, rt.writer)