veyron/services/wspr: Support cancellation and deadlines to and from javascript.

Change-Id: Ie6eb0ff489a04a06572500efd44f4150c97dc045
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index f58254e..139a124 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -67,6 +67,7 @@
 	InArgs      []json.RawMessage
 	NumOutArgs  int32
 	IsStreaming bool
+	Timeout     int64
 }
 
 type veyronRPC struct {
@@ -98,6 +99,11 @@
 	Name string
 }
 
+type outstandingRequest struct {
+	stream *outstandingStream
+	cancel context.CancelFunc
+}
+
 // Controller represents all the state of a Veyron Web App.  This is the struct
 // that is in charge performing all the veyron options.
 type Controller struct {
@@ -118,8 +124,9 @@
 	// by the client.
 	lastGeneratedId int64
 
-	// Streams for the outstanding requests.
-	outstandingStreams map[int64]*outstandingStream
+	// Used to keep track of data (streams and cancellation functions) for
+	// outstanding requests.
+	outstandingRequests map[int64]*outstandingRequest
 
 	// Maps flowids to the server that owns them.
 	flowMap map[int64]*server.Server
@@ -260,20 +267,22 @@
 	c.flowMap[id] = s
 	os := newStream()
 	os.init(stream, vom_wiretype.Type{ID: 1})
-	c.outstandingStreams[id] = os
+	c.outstandingRequests[id] = &outstandingRequest{
+		stream: os,
+	}
 	return &server.Flow{ID: id, Writer: c.writerCreator(id)}
 }
 
 // CleanupFlow removes the bookkeping for a previously created flow.
 func (c *Controller) CleanupFlow(id int64) {
 	c.Lock()
-	stream := c.outstandingStreams[id]
-	delete(c.outstandingStreams, id)
+	request := c.outstandingRequests[id]
+	delete(c.outstandingRequests, id)
 	delete(c.flowMap, id)
 	c.Unlock()
-	if stream != nil {
-		stream.end()
-		stream.waitUntilDone()
+	if request != nil && request.stream != nil {
+		request.stream.end()
+		request.stream.waitUntilDone()
 	}
 }
 
@@ -301,8 +310,13 @@
 	c.Lock()
 	defer c.Unlock()
 
-	for _, stream := range c.outstandingStreams {
-		stream.end()
+	for _, request := range c.outstandingRequests {
+		if request.cancel != nil {
+			request.cancel()
+		}
+		if request.stream != nil {
+			request.stream.end()
+		}
 	}
 
 	for _, server := range c.servers {
@@ -314,7 +328,7 @@
 
 func (c *Controller) setup() {
 	c.signatureManager = lib.NewSignatureManager()
-	c.outstandingStreams = make(map[int64]*outstandingStream)
+	c.outstandingRequests = make(map[int64]*outstandingRequest)
 	c.flowMap = make(map[int64]*server.Server)
 	c.servers = make(map[uint64]*server.Server)
 }
@@ -323,14 +337,14 @@
 // done asynchronously.  If there is an error, it will be sent to w.
 func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
 	c.Lock()
-	stream := c.outstandingStreams[id]
+	request := c.outstandingRequests[id]
 	c.Unlock()
 
-	if stream == nil {
+	if request == nil || request.stream == nil {
 		vlog.Errorf("unknown stream: %d", id)
 		return
 	}
-	stream.send(data, w)
+	request.stream.send(data, w)
 }
 
 // SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
@@ -396,11 +410,14 @@
 	}
 
 	c.finishCall(ctx, w, call, &msg)
-	if stream != nil {
-		c.Lock()
-		delete(c.outstandingStreams, id)
-		c.Unlock()
+	c.Lock()
+	if request, ok := c.outstandingRequests[id]; ok {
+		delete(c.outstandingRequests, id)
+		if request.cancel != nil {
+			request.cancel()
+		}
 	}
+	c.Unlock()
 }
 
 // HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
@@ -411,32 +428,56 @@
 		return
 	}
 
-	// If this rpc is streaming, we would expect that the client would try to send
-	// on this stream.  Since the initial handshake is done asynchronously, we have
-	// to put the outstanding stream in the map before we make the async call so that
-	// the future send know which queue to write to, even if the client call isn't
-	// actually ready yet.
-	var stream *outstandingStream
-	if veyronTempMsg.IsStreaming {
-		stream = newStream()
-		c.Lock()
-		c.outstandingStreams[id] = stream
-		c.Unlock()
+	var cctx context.T
+	var cancel context.CancelFunc
+
+	// TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
+	// However as a rollout strategy we must, otherwise there is a circular
+	// dependency between the WSPR change and the JS change that will follow.
+	if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+		cctx, cancel = ctx.WithCancel()
+	} else {
+		cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
 	}
-	go c.sendVeyronRequest(ctx, id, veyronTempMsg, w, stream)
+	var stream *outstandingStream
+
+	request := &outstandingRequest{
+		cancel: cancel,
+	}
+	if veyronTempMsg.IsStreaming {
+		// If this rpc is streaming, we would expect that the client would try to send
+		// on this stream.  Since the initial handshake is done asynchronously, we have
+		// to put the outstanding stream in the map before we make the async call so that
+		// the future send know which queue to write to, even if the client call isn't
+		// actually ready yet.
+		request.stream = newStream()
+	}
+	c.Lock()
+	c.outstandingRequests[id] = request
+	c.Unlock()
+
+	go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, stream)
+}
+
+// HandleVeyronCancellation cancels the request corresponding to the
+// given id if it is still outstanding.
+func (c *Controller) HandleVeyronCancellation(id int64) {
+	c.Lock()
+	defer c.Unlock()
+	if request, ok := c.outstandingRequests[id]; ok && request.cancel != nil {
+		request.cancel()
+	}
 }
 
 // CloseStream closes the stream for a given id.
 func (c *Controller) CloseStream(id int64) {
 	c.Lock()
 	defer c.Unlock()
-	stream := c.outstandingStreams[id]
-	if stream == nil {
-		c.logger.Errorf("close called on non-existent call: %v", id)
+	if request, ok := c.outstandingRequests[id]; ok && request.stream != nil {
+		request.stream.end()
 		return
 	}
-
-	stream.end()
+	c.logger.Errorf("close called on non-existent call: %v", id)
 }
 
 func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {
diff --git a/services/wsprd/app/app_test.go b/services/wsprd/app/app_test.go
index 3297124..9425760 100644
--- a/services/wsprd/app/app_test.go
+++ b/services/wsprd/app/app_test.go
@@ -249,7 +249,9 @@
 	var stream *outstandingStream
 	if len(test.streamingInputs) > 0 {
 		stream = newStream()
-		controller.outstandingStreams[0] = stream
+		controller.outstandingRequests[0] = &outstandingRequest{
+			stream: stream,
+		}
 		go func() {
 			for _, value := range test.streamingInputs {
 				controller.SendOnStream(0, value, &writer)
@@ -267,7 +269,9 @@
 	}
 	controller.sendVeyronRequest(r.NewContext(), 0, &request, &writer, stream)
 
-	testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError, t)
+	if err := testwriter.CheckResponses(&writer, test.expectedStream, test.expectedError); err != nil {
+		t.Error(err)
+	}
 }
 
 func TestCallingGoServer(t *testing.T) {
@@ -666,7 +670,9 @@
 			var result interface{}
 			var err2 error
 			err := call.Finish(&result, &err2)
-			testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
+			if err := testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil); err != nil {
+				t.Error(err)
+			}
 			// We can't do a deep equal with authError because the error returned by the
 			// authorizer is wrapped into another error by the ipc framework.
 			if err == nil {
@@ -732,8 +738,9 @@
 			"Handle":   0.0,
 			"Args":     test.inArgs,
 			"Context": map[string]interface{}{
-				"Name":   "adder",
-				"Suffix": "adder",
+				"Name":    "adder",
+				"Suffix":  "adder",
+				"Timeout": float64(lib.JSIPCNoTimeout),
 				"RemoteBlessings": map[string]interface{}{
 					"Handle":    expectedBlessingsHandle,
 					"PublicKey": publicKey,
@@ -800,7 +807,9 @@
 		t.Errorf("didn't receive expected message: %v", err)
 	}
 
-	testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil, t)
+	if err := testwriter.CheckResponses(rt.writer, expectedWebsocketMessage, nil); err != nil {
+		t.Error(err)
+	}
 }
 
 func TestSimpleJSServer(t *testing.T) {
diff --git a/services/wsprd/ipc/server/dispatcher_test.go b/services/wsprd/ipc/server/dispatcher_test.go
index 2260d8e..f07d1e1 100644
--- a/services/wsprd/ipc/server/dispatcher_test.go
+++ b/services/wsprd/ipc/server/dispatcher_test.go
@@ -107,7 +107,9 @@
 			},
 		},
 	}
-	testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+	if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+		t.Error(err)
+	}
 }
 
 func TestSuccessfulLookupWithAuthorizer(t *testing.T) {
@@ -155,7 +157,9 @@
 			},
 		},
 	}
-	testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+	if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+		t.Error(err)
+	}
 }
 
 func TestFailedLookup(t *testing.T) {
@@ -186,5 +190,7 @@
 			},
 		},
 	}
-	testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil, t)
+	if err := testwriter.CheckResponses(&flowFactory.writer, expectedResponses, nil); err != nil {
+		t.Error(err)
+	}
 }
diff --git a/services/wsprd/ipc/server/server.go b/services/wsprd/ipc/server/server.go
index aca1eee..f879a26 100644
--- a/services/wsprd/ipc/server/server.go
+++ b/services/wsprd/ipc/server/server.go
@@ -5,6 +5,7 @@
 import (
 	"encoding/json"
 	"sync"
+	"time"
 
 	vsecurity "veyron.io/veyron/veyron/security"
 	"veyron.io/wspr/veyron/services/wsprd/identity"
@@ -41,6 +42,7 @@
 
 	// TODO(ataly, ashankar, bjornick): Remove this field once the old security model is killed.
 	RemoteID identity.PublicIDHandle
+	Timeout  int64 // The time period (in ns) between now and the deadline.
 }
 
 // The response from the javascript server to the proxy.
@@ -157,15 +159,23 @@
 		s.mu.Lock()
 		s.outstandingServerRequests[flow.ID] = replyChan
 		s.mu.Unlock()
+
+		timeout := lib.JSIPCNoTimeout
+		if deadline, ok := call.Deadline(); ok {
+			timeout = lib.GoToJSDuration(deadline.Sub(time.Now()))
+		}
+
 		context := serverRPCRequestContext{
-			Suffix: call.Suffix(),
-			Name:   call.Name(),
+			Suffix:  call.Suffix(),
+			Name:    call.Name(),
+			Timeout: timeout,
 		}
 		if s.helper.UseOldModel() {
 			context.RemoteID = s.convertPublicIDToHandle(call.RemoteID())
 		} else {
 			context.RemoteBlessings = s.convertBlessingsToHandle(call.RemoteBlessings())
 		}
+
 		// Send a invocation request to JavaScript
 		message := serverRPCRequest{
 			ServerId: s.id,
@@ -177,9 +187,10 @@
 
 		if err := flow.Writer.Send(lib.ResponseServerRequest, message); err != nil {
 			// Error in marshaling, pass the error through the channel immediately
-			stdErr := verror2.Convert(verror2.Internal, nil, err).(verror2.Standard)
-			replyChan <- &serverRPCReply{nil,
-				&stdErr,
+			if ch := s.popServerRequest(flow.ID); ch != nil {
+				stdErr := verror2.Convert(verror2.Internal, call, err).(verror2.Standard)
+				ch <- &serverRPCReply{nil, &stdErr}
+				s.helper.CleanupFlow(flow.ID)
 			}
 			return replyChan
 		}
@@ -188,7 +199,24 @@
 			"JavaScript server with args %v, MessageId %d was assigned.",
 			methodName, args, flow.ID)
 
+		// Watch for cancellation.
+		go func() {
+			<-call.Done()
+			ch := s.popServerRequest(flow.ID)
+			if ch == nil {
+				return
+			}
+
+			// Send a cancel message to the JS server.
+			flow.Writer.Send(lib.ResponseCancel, nil)
+			s.helper.CleanupFlow(flow.ID)
+
+			err := verror2.Convert(verror2.Aborted, call, call.Err()).(verror2.Standard)
+			ch <- &serverRPCReply{nil, &err}
+		}()
+
 		go proxyStream(call, flow.Writer, s.helper.GetLogger())
+
 		return replyChan
 	}
 }
@@ -280,17 +308,24 @@
 	return s.endpoint, nil
 }
 
-func (s *Server) HandleServerResponse(id int64, data string) {
+func (s *Server) popServerRequest(id int64) chan *serverRPCReply {
 	s.mu.Lock()
+	defer s.mu.Unlock()
 	ch := s.outstandingServerRequests[id]
 	delete(s.outstandingServerRequests, id)
-	s.mu.Unlock()
+
+	return ch
+}
+
+func (s *Server) HandleServerResponse(id int64, data string) {
+	ch := s.popServerRequest(id)
 	if ch == nil {
 		s.helper.GetLogger().Errorf("unexpected result from JavaScript. No channel "+
 			"for MessageId: %d exists. Ignoring the results.", id)
 		//Ignore unknown responses that don't belong to any channel
 		return
 	}
+
 	// Decode the result and send it through the channel
 	var serverReply serverRPCReply
 	if decoderErr := json.Unmarshal([]byte(data), &serverReply); decoderErr != nil {
diff --git a/services/wsprd/lib/testwriter/writer.go b/services/wsprd/lib/testwriter/writer.go
index 01f22ff..093445f 100644
--- a/services/wsprd/lib/testwriter/writer.go
+++ b/services/wsprd/lib/testwriter/writer.go
@@ -86,11 +86,12 @@
 	return nil
 }
 
-func CheckResponses(w *Writer, wantStream []Response, wantErr error, t TestHarness) {
+func CheckResponses(w *Writer, wantStream []Response, wantErr error) error {
 	if got, want := w.Stream, wantStream; !reflect.DeepEqual(got, want) {
-		t.Errorf("streams don't match: got %v, want %v", got, want)
+		return fmt.Errorf("streams don't match: got %v, want %v", got, want)
 	}
 	if got, want := w.err, wantErr; !verror2.Equal(got, want) {
-		t.Errorf("unexpected error, got: %#v, expected: %#v", got, want)
+		return fmt.Errorf("unexpected error, got: %#v, expected: %#v", got, want)
 	}
+	return nil
 }
diff --git a/services/wsprd/lib/time.go b/services/wsprd/lib/time.go
new file mode 100644
index 0000000..c9f89d3
--- /dev/null
+++ b/services/wsprd/lib/time.go
@@ -0,0 +1,23 @@
+package lib
+
+import (
+	"time"
+
+	"veyron.io/veyron/veyron2/ipc"
+)
+
+const nanosecondsPerMillisecond = 1000000
+
+// Javascript uses millisecond time units, not nanoseconds.
+// This is both because they are the native time unit, and because
+// otherwise JS numbers would not be capable of representing the
+// full time range available to Go programs.
+const JSIPCNoTimeout = ipc.NoTimeout / nanosecondsPerMillisecond
+
+func GoToJSDuration(d time.Duration) int64 {
+	return int64(d) / nanosecondsPerMillisecond
+}
+
+func JSToGoDuration(d int64) time.Duration {
+	return time.Duration(d * nanosecondsPerMillisecond)
+}
diff --git a/services/wsprd/lib/writer.go b/services/wsprd/lib/writer.go
index 7c6c18f..61a2cd5 100644
--- a/services/wsprd/lib/writer.go
+++ b/services/wsprd/lib/writer.go
@@ -10,6 +10,7 @@
 	ResponseStreamClose                   = 4
 	ResponseDispatcherLookup              = 5
 	ResponseAuthRequest                   = 6
+	ResponseCancel                        = 7
 )
 
 // This is basically an io.Writer interface, that allows passing error message
diff --git a/services/wsprd/wspr/pipe.go b/services/wsprd/wspr/pipe.go
index feffa05..371fbfd 100644
--- a/services/wsprd/wspr/pipe.go
+++ b/services/wsprd/wspr/pipe.go
@@ -80,6 +80,9 @@
 
 	// A request to create a new random blessings
 	websocketCreateBlessings = 16
+
+	// A request to cancel an rpc initiated by the JS.
+	websocketCancel = 17
 )
 
 type websocketMessage struct {
@@ -300,6 +303,8 @@
 		switch msg.Type {
 		case websocketVeyronRequest:
 			p.controller.HandleVeyronRequest(ctx, msg.Id, msg.Data, ww)
+		case websocketCancel:
+			go p.controller.HandleVeyronCancellation(msg.Id)
 		case websocketStreamingValue:
 			// SendOnStream queues up the message to be sent, but doesn't do the send
 			// on this goroutine.  We need to queue the messages synchronously so that