Merge "veyron/services/wspr: Cleaned up formatting and the logging so they are more consistent."
diff --git a/services/wspr/wsprd/lib/server.go b/services/wspr/wsprd/lib/server.go
index 1ff9a32..058ea42 100644
--- a/services/wspr/wsprd/lib/server.go
+++ b/services/wspr/wsprd/lib/server.go
@@ -32,11 +32,22 @@
type server struct {
sync.Mutex
- server ipc.Server
- endpoint string
- id uint64
- helper serverHelper
- veyronProxy string
+
+ // The server that handles the ipc layer. Listen on this server is
+ // lazily started.
+ server ipc.Server
+ // The endpoint of the server. This is empty until the server has been
+ // started and listen has been called on it.
+ endpoint string
+
+ // The server id.
+ id uint64
+ helper serverHelper
+
+ // The proxy to listen through.
+ veyronProxy string
+
+ // The set of outstanding server requests.
outstandingServerRequests map[int64]chan *serverRPCReply
}
@@ -66,30 +77,6 @@
// communicate the result back via a channel to the caller
type remoteInvokeFunc func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply
-func proxyStream(stream ipc.Stream, w clientWriter) {
- var item interface{}
- for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
- data := response{Type: responseStream, Message: item}
- if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
- w.sendError(verror.Internalf("error marshalling stream: %v:", err))
- return
- }
- if err := w.FinishMessage(); err != nil {
- w.getLogger().VI(2).Info("WSPR: error finishing message", err)
- return
- }
- }
-
- if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil {
- w.sendError(verror.Internalf("error closing stream: %v:", err))
- return
- }
- if err := w.FinishMessage(); err != nil {
- w.getLogger().VI(2).Info("WSPR: error finishing message", err)
- return
- }
-}
-
func (s *server) createRemoteInvokerFunc(serviceName string) remoteInvokeFunc {
return func(methodName string, args []interface{}, call ipc.ServerCall) <-chan *serverRPCReply {
flow := s.helper.createNewFlow(s, senderWrapper{stream: call})
@@ -133,11 +120,35 @@
"JavaScript server %q with args %v, MessageId %d was assigned.",
methodName, serviceName, args, flow.id)
- go proxyStream(call, flow.writer)
+ go proxyStream(call, flow.writer, s.helper.getLogger())
return replyChan
}
}
+func proxyStream(stream ipc.Stream, w clientWriter, logger vlog.Logger) {
+ var item interface{}
+ for err := stream.Recv(&item); err == nil; err = stream.Recv(&item) {
+ data := response{Type: responseStream, Message: item}
+ if err := vom.ObjToJSON(w, vom.ValueOf(data)); err != nil {
+ w.sendError(verror.Internalf("error marshalling stream: %v:", err))
+ return
+ }
+ if err := w.FinishMessage(); err != nil {
+ logger.Error("WSPR: error finishing message", err)
+ return
+ }
+ }
+
+ if err := vom.ObjToJSON(w, vom.ValueOf(response{Type: responseStreamClose})); err != nil {
+ w.sendError(verror.Internalf("error closing stream: %v:", err))
+ return
+ }
+ if err := w.FinishMessage(); err != nil {
+ logger.Error("WSPR: error finishing message", err)
+ return
+ }
+}
+
func (s *server) register(name string, sig JSONServiceSignature) error {
serviceSig, err := sig.ServiceSignature()
if err != nil {
@@ -175,7 +186,7 @@
if err := s.server.Publish(name); err != nil {
return "", err
}
- s.helper.getLogger().VI(0).Infof("endpoint is %s", s.endpoint)
+ s.helper.getLogger().VI(1).Infof("endpoint is %s", s.endpoint)
return s.endpoint, nil
}
@@ -185,7 +196,7 @@
delete(s.outstandingServerRequests, id)
s.Unlock()
if ch == nil {
- s.helper.getLogger().VI(0).Infof("unexpected result from JavaScript. No channel "+
+ s.helper.getLogger().Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results.", id)
//Ignore unknown responses that don't belong to any channel
return
diff --git a/services/wspr/wsprd/lib/writer.go b/services/wspr/wsprd/lib/writer.go
index c8717b3..11fd44c 100644
--- a/services/wspr/wsprd/lib/writer.go
+++ b/services/wspr/wsprd/lib/writer.go
@@ -17,8 +17,6 @@
type clientWriter interface {
Write(p []byte) (int, error)
- getLogger() vlog.Logger
-
sendError(err error)
FinishMessage() error
@@ -28,14 +26,10 @@
type websocketWriter struct {
ws *websocket.Conn
buf bytes.Buffer
- logger vlog.Logger // TODO(bprosnitz) Remove this -- it has nothing to do with websockets!
+ logger vlog.Logger
id int64
}
-func (w *websocketWriter) getLogger() vlog.Logger {
- return w.logger
-}
-
func (w *websocketWriter) Write(p []byte) (int, error) {
w.buf.Write(p)
return len(p), nil
@@ -64,11 +58,11 @@
w.buf.Reset()
if err := vom.ObjToJSON(&w.buf, vom.ValueOf(response{Type: responseError, Message: errMsg})); err != nil {
- w.logger.VI(2).Info("Failed to marshal with", err)
+ w.logger.Error("Failed to marshal with", err)
return
}
if err := w.FinishMessage(); err != nil {
- w.logger.VI(2).Info("WSPR: error finishing message: ", err)
+ w.logger.Error("WSPR: error finishing message: ", err)
return
}
}
diff --git a/services/wspr/wsprd/lib/wspr.go b/services/wspr/wsprd/lib/wspr.go
index 6fc0764..fc10682 100644
--- a/services/wspr/wsprd/lib/wspr.go
+++ b/services/wspr/wsprd/lib/wspr.go
@@ -167,7 +167,7 @@
}
// finishCall waits for the call to finish and write out the response to w.
-func finishCall(w clientWriter, clientCall ipc.Call, msg *veyronRPC) {
+func (wsp *websocketPipe) finishCall(w clientWriter, clientCall ipc.Call, msg *veyronRPC) {
if msg.IsStreaming {
for {
var item interface{}
@@ -184,7 +184,7 @@
continue
}
if err := w.FinishMessage(); err != nil {
- w.getLogger().VI(2).Info("WSPR: error finishing message: ", err)
+ wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
}
}
@@ -192,7 +192,7 @@
w.sendError(verror.Internalf("unable to marshal close stream message"))
}
if err := w.FinishMessage(); err != nil {
- w.getLogger().VI(2).Info("WSPR: error finishing message: ", err)
+ wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
}
}
@@ -226,7 +226,7 @@
}
if err := w.FinishMessage(); err != nil {
- w.getLogger().VI(2).Info("WSPR: error finishing message: ", err)
+ wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
return
}
}
@@ -278,7 +278,7 @@
// be used instead.
var id security.PrivateID
if err := vom.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(msg))).Decode(&id); err != nil {
- logger.VI(0).Info("Could not decode identity:", err)
+ logger.Error("Could not decode identity:", err)
return nil
}
return id
@@ -398,7 +398,6 @@
}
func (wsp *websocketPipe) setup() {
- wsp.ctx.logger.Info("identity is ", wsp.ctx.rt.Identity())
wsp.signatureManager = newSignatureManager()
wsp.outstandingStreams = make(map[int64]sender)
wsp.flowMap = make(map[int64]*server)
@@ -418,7 +417,7 @@
return
} else if err != nil {
http.Error(w, "Internal Error", 500)
- wsp.ctx.logger.VI(0).Infof("websocket upgrade failed: %s", err)
+ wsp.ctx.logger.Errorf("websocket upgrade failed: %s", err)
return
}
@@ -442,11 +441,11 @@
wc, err := wsp.ws.NextWriter(websocket.TextMessage)
if err != nil {
- wsp.ctx.logger.VI(0).Infof("failed to create websocket writer: %s", err)
+ wsp.ctx.logger.Errorf("failed to create websocket writer: %s", err)
return
}
if err := vom.ObjToJSON(wc, vom.ValueOf(msg)); err != nil {
- wsp.ctx.logger.VI(0).Infof("failed to convert wspr config to json: %s", err)
+ wsp.ctx.logger.Errorf("failed to convert wspr config to json: %s", err)
return
}
wc.Close()
@@ -455,9 +454,9 @@
func (wsp *websocketPipe) pingLoop() {
for {
time.Sleep(pingInterval)
- wsp.ctx.logger.VI(2).Infof("ws: ping")
+ wsp.ctx.logger.VI(2).Info("ws: ping")
if err := wsp.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
- wsp.ctx.logger.VI(2).Infof("ws: ping failed")
+ wsp.ctx.logger.Error("ws: ping failed")
return
}
}
@@ -505,7 +504,7 @@
signal <- call
}
- finishCall(w, call, veyronMsg)
+ wsp.finishCall(w, call, veyronMsg)
if signal != nil {
wsp.Lock()
delete(wsp.outstandingStreams, id)
@@ -541,19 +540,19 @@
defer wsp.Unlock()
stream := wsp.outstandingStreams[id]
if stream == nil {
- wsp.ctx.logger.VI(0).Infof("close called on non-existent call: %v", id)
+ wsp.ctx.logger.Errorf("close called on non-existent call: %v", id)
return
}
var call queueingStream
var ok bool
if call, ok = stream.(queueingStream); !ok {
- wsp.ctx.logger.VI(0).Infof("can't close server stream: %v", id)
+ wsp.ctx.logger.Errorf("can't close server stream: %v", id)
return
}
if err := call.Close(); err != nil {
- wsp.ctx.logger.VI(0).Infof("client call close failed with: %v", err)
+ wsp.ctx.logger.Errorf("client call close failed with: %v", err)
}
}
@@ -565,19 +564,19 @@
break
}
if err != nil {
- wsp.ctx.logger.VI(0).Infof("websocket receive: %s", err)
+ wsp.ctx.logger.VI(1).Infof("websocket receive: %s", err)
break
}
if op != websocket.TextMessage {
- wsp.ctx.logger.VI(0).Infof("unexpected websocket op: %v", op)
+ wsp.ctx.logger.Errorf("unexpected websocket op: %v", op)
}
var msg websocketMessage
decoder := json.NewDecoder(r)
if err := decoder.Decode(&msg); err != nil {
errMsg := fmt.Sprintf("can't unmarshall JSONMessage: %v", err)
- wsp.ctx.logger.VI(2).Info(errMsg)
+ wsp.ctx.logger.Error(errMsg)
wsp.ws.WriteMessage(websocket.TextMessage, []byte(errMsg))
continue
}
@@ -653,7 +652,7 @@
}
if err := w.FinishMessage(); err != nil {
- w.getLogger().VI(2).Info("WSPR: error finishing message: ", err)
+ wsp.ctx.logger.Error("WSPR: error finishing message: ", err)
return
}
}
@@ -678,7 +677,7 @@
server := wsp.flowMap[id]
wsp.Unlock()
if server == nil {
- wsp.ctx.logger.VI(0).Infof("unexpected result from JavaScript. No channel "+
+ wsp.ctx.logger.Errorf("unexpected result from JavaScript. No channel "+
"for MessageId: %d exists. Ignoring the results.", id)
//Ignore unknown responses that don't belong to any channel
return
@@ -766,7 +765,6 @@
// handleSignatureRequest uses signature manager to get and cache signature of a remote server
func (wsp *websocketPipe) handleSignatureRequest(data string, w *websocketWriter) {
-
// Decode the request
var request signatureRequest
decoder := json.NewDecoder(bytes.NewBufferString(data))
@@ -790,16 +788,14 @@
return
}
if err := w.FinishMessage(); err != nil {
- w.logger.VI(2).Info("WSPR: error finishing message: ", err)
+ w.logger.Error("WSPR: error finishing message: ", err)
return
}
}
func (ctx *WSPR) setup() {
-
// Cache up to 20 identity.PrivateID->ipc.Client mappings
ctx.clientCache = NewClientCache(20)
-
}
// Starts the proxy and listens for requests. This method is blocking.
@@ -811,7 +807,7 @@
pipe := &websocketPipe{ctx: &ctx}
pipe.start(w, r)
})
- ctx.logger.VI(0).Infof("Listening on port %d.", ctx.port)
+ ctx.logger.VI(1).Infof("Listening on port %d.", ctx.port)
httpErr := http.ListenAndServe(fmt.Sprintf(":%d", ctx.port), nil)
if httpErr != nil {
log.Fatalf("Failed to HTTP serve: %s", httpErr)
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wspr/wsprd/lib/wspr_test.go
index 171b39a..6adfbf4 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wspr/wsprd/lib/wspr_test.go
@@ -160,10 +160,6 @@
}
-func (w *testWriter) getLogger() vlog.Logger {
- return w.logger
-}
-
func (w *testWriter) sendError(err error) {
w.err = err
}