veyron/services/wspr: Support cancellation and deadlines to and from javascript.

Change-Id: Ie6eb0ff489a04a06572500efd44f4150c97dc045
diff --git a/services/wsprd/app/app.go b/services/wsprd/app/app.go
index f58254e..139a124 100644
--- a/services/wsprd/app/app.go
+++ b/services/wsprd/app/app.go
@@ -67,6 +67,7 @@
 	InArgs      []json.RawMessage
 	NumOutArgs  int32
 	IsStreaming bool
+	Timeout     int64
 }
 
 type veyronRPC struct {
@@ -98,6 +99,11 @@
 	Name string
 }
 
+type outstandingRequest struct {
+	stream *outstandingStream
+	cancel context.CancelFunc
+}
+
 // Controller represents all the state of a Veyron Web App.  This is the struct
 // that is in charge performing all the veyron options.
 type Controller struct {
@@ -118,8 +124,9 @@
 	// by the client.
 	lastGeneratedId int64
 
-	// Streams for the outstanding requests.
-	outstandingStreams map[int64]*outstandingStream
+	// Used to keep track of data (streams and cancellation functions) for
+	// outstanding requests.
+	outstandingRequests map[int64]*outstandingRequest
 
 	// Maps flowids to the server that owns them.
 	flowMap map[int64]*server.Server
@@ -260,20 +267,22 @@
 	c.flowMap[id] = s
 	os := newStream()
 	os.init(stream, vom_wiretype.Type{ID: 1})
-	c.outstandingStreams[id] = os
+	c.outstandingRequests[id] = &outstandingRequest{
+		stream: os,
+	}
 	return &server.Flow{ID: id, Writer: c.writerCreator(id)}
 }
 
 // CleanupFlow removes the bookkeping for a previously created flow.
 func (c *Controller) CleanupFlow(id int64) {
 	c.Lock()
-	stream := c.outstandingStreams[id]
-	delete(c.outstandingStreams, id)
+	request := c.outstandingRequests[id]
+	delete(c.outstandingRequests, id)
 	delete(c.flowMap, id)
 	c.Unlock()
-	if stream != nil {
-		stream.end()
-		stream.waitUntilDone()
+	if request != nil && request.stream != nil {
+		request.stream.end()
+		request.stream.waitUntilDone()
 	}
 }
 
@@ -301,8 +310,13 @@
 	c.Lock()
 	defer c.Unlock()
 
-	for _, stream := range c.outstandingStreams {
-		stream.end()
+	for _, request := range c.outstandingRequests {
+		if request.cancel != nil {
+			request.cancel()
+		}
+		if request.stream != nil {
+			request.stream.end()
+		}
 	}
 
 	for _, server := range c.servers {
@@ -314,7 +328,7 @@
 
 func (c *Controller) setup() {
 	c.signatureManager = lib.NewSignatureManager()
-	c.outstandingStreams = make(map[int64]*outstandingStream)
+	c.outstandingRequests = make(map[int64]*outstandingRequest)
 	c.flowMap = make(map[int64]*server.Server)
 	c.servers = make(map[uint64]*server.Server)
 }
@@ -323,14 +337,14 @@
 // done asynchronously.  If there is an error, it will be sent to w.
 func (c *Controller) SendOnStream(id int64, data string, w lib.ClientWriter) {
 	c.Lock()
-	stream := c.outstandingStreams[id]
+	request := c.outstandingRequests[id]
 	c.Unlock()
 
-	if stream == nil {
+	if request == nil || request.stream == nil {
 		vlog.Errorf("unknown stream: %d", id)
 		return
 	}
-	stream.send(data, w)
+	request.stream.send(data, w)
 }
 
 // SendVeyronRequest makes a veyron request for the given flowId.  If signal is non-nil, it will receive
@@ -396,11 +410,14 @@
 	}
 
 	c.finishCall(ctx, w, call, &msg)
-	if stream != nil {
-		c.Lock()
-		delete(c.outstandingStreams, id)
-		c.Unlock()
+	c.Lock()
+	if request, ok := c.outstandingRequests[id]; ok {
+		delete(c.outstandingRequests, id)
+		if request.cancel != nil {
+			request.cancel()
+		}
 	}
+	c.Unlock()
 }
 
 // HandleVeyronRequest starts a veyron rpc and returns before the rpc has been completed.
@@ -411,32 +428,56 @@
 		return
 	}
 
-	// If this rpc is streaming, we would expect that the client would try to send
-	// on this stream.  Since the initial handshake is done asynchronously, we have
-	// to put the outstanding stream in the map before we make the async call so that
-	// the future send know which queue to write to, even if the client call isn't
-	// actually ready yet.
-	var stream *outstandingStream
-	if veyronTempMsg.IsStreaming {
-		stream = newStream()
-		c.Lock()
-		c.outstandingStreams[id] = stream
-		c.Unlock()
+	var cctx context.T
+	var cancel context.CancelFunc
+
+	// TODO(mattr): To be consistent with go, we should not ignore 0 timeouts.
+	// However as a rollout strategy we must, otherwise there is a circular
+	// dependency between the WSPR change and the JS change that will follow.
+	if veyronTempMsg.Timeout == lib.JSIPCNoTimeout || veyronTempMsg.Timeout == 0 {
+		cctx, cancel = ctx.WithCancel()
+	} else {
+		cctx, cancel = ctx.WithTimeout(lib.JSToGoDuration(veyronTempMsg.Timeout))
 	}
-	go c.sendVeyronRequest(ctx, id, veyronTempMsg, w, stream)
+	request := &outstandingRequest{
+		cancel: cancel,
+	}
+	if veyronTempMsg.IsStreaming {
+		// If this rpc is streaming, we would expect that the client would try to send
+		// on this stream.  Since the initial handshake is done asynchronously, we have
+		// to put the outstanding stream in the map before we make the async call so that
+		// the future send know which queue to write to, even if the client call isn't
+		// actually ready yet.
+		request.stream = newStream()
+	}
+	c.Lock()
+	c.outstandingRequests[id] = request
+	c.Unlock()
+
+	// Hand the goroutine the stream registered above (nil when not streaming),
+	// not a separate never-assigned variable, so both sides share one queue.
+	go c.sendVeyronRequest(cctx, id, veyronTempMsg, w, request.stream)
+}
+
+// HandleVeyronCancellation cancels the request corresponding to the
+// given id if it is still outstanding.
+func (c *Controller) HandleVeyronCancellation(id int64) {
+	c.Lock()
+	defer c.Unlock()
+	if request, ok := c.outstandingRequests[id]; ok && request.cancel != nil {
+		request.cancel()
+	}
 }
 
 // CloseStream closes the stream for a given id.
 func (c *Controller) CloseStream(id int64) {
 	c.Lock()
 	defer c.Unlock()
-	stream := c.outstandingStreams[id]
-	if stream == nil {
-		c.logger.Errorf("close called on non-existent call: %v", id)
+	if request, ok := c.outstandingRequests[id]; ok && request.stream != nil {
+		request.stream.end()
 		return
 	}
-
-	stream.end()
+	c.logger.Errorf("close called on non-existent call: %v", id)
 }
 
 func (c *Controller) maybeCreateServer(serverId uint64) (*server.Server, error) {