veyron/runtimes/google/ipc: Implement a better fix for cancellation goroutine leak.

This version prevents spawning many goroutines in the first place by reusing
the context's cancel channel as the flow deadline.  It also fixes a
client-side goroutine leak that occurred when a single context was used to
make a large number of calls over a long period of time.

Change-Id: I29fdc6b290b4bbad2ea82c0b399c7deee9dff6f9
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 1a749d0..f4e177b 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -219,14 +219,7 @@
 			vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
 			continue // Try the next server.
 		}
-		timeout := time.Duration(ipc.NoTimeout)
-		if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
-			timeout = deadline.Sub(time.Now())
-			if err := flow.SetDeadline(deadline); err != nil {
-				lastErr = verror.Internalf("ipc: flow.SetDeadline failed: %v", err)
-				continue
-			}
-		}
+		flow.SetDeadline(ctx.Done())
 
 		// Validate caveats on the server's identity for the context associated with this call.
 		blessing, err := authorizeServer(flow.LocalID(), flow.RemoteID(), opts)
@@ -243,11 +236,18 @@
 
 		if doneChan := ctx.Done(); doneChan != nil {
 			go func() {
-				<-ctx.Done()
-				fc.Cancel()
+				select {
+				case <-ctx.Done():
+					fc.Cancel()
+				case <-fc.flow.Closed():
+				}
 			}()
 		}
 
+		timeout := time.Duration(ipc.NoTimeout)
+		if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
+			timeout = deadline.Sub(time.Now())
+		}
 		if verr := fc.start(suffix, method, args, timeout, blessing); verr != nil {
 			return nil, verr
 		}
@@ -356,6 +356,7 @@
 }
 
 func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessing security.PublicID) verror.E {
+
 	req := ipc.Request{
 		Suffix:        suffix,
 		Method:        method,
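
Why the select above fixes the client-side leak, as a runnable sketch
(watch stands in for the anonymous goroutine; all names are hypothetical):
a watcher blocked only on <-ctx.Done() lives as long as the context, so a
long-lived context making many calls accumulates one parked goroutine per
call.  Also selecting on the flow's Closed() channel lets each watcher
exit when its own call completes.

    package main

    import (
        "fmt"
        "runtime"
        "time"
    )

    // watch cancels if the done channel closes first, but exits as soon
    // as the call's own flow is closed.
    func watch(done, flowClosed <-chan struct{}, cancel func()) {
        select {
        case <-done:
            cancel()
        case <-flowClosed:
        }
    }

    func main() {
        done := make(chan struct{}) // a long-lived ctx.Done(); never closed here
        for i := 0; i < 1000; i++ {
            flowClosed := make(chan struct{})
            go watch(done, flowClosed, func() {})
            close(flowClosed) // the call completes; its watcher may exit
        }
        time.Sleep(50 * time.Millisecond) // let the watchers drain
        // With the old body (<-done; cancel()), ~1000 goroutines would
        // still be parked here.
        fmt.Println("goroutines now:", runtime.NumGoroutine())
    }
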
diff --git a/runtimes/google/ipc/flow_test.go b/runtimes/google/ipc/flow_test.go
index 34cc921..1a10e49 100644
--- a/runtimes/google/ipc/flow_test.go
+++ b/runtimes/google/ipc/flow_test.go
@@ -4,9 +4,7 @@
 	"bytes"
 	"errors"
 	"fmt"
-	"net"
 	"testing"
-	"time"
 
 	_ "veyron.io/veyron/veyron/lib/testutil"
 	isecurity "veyron.io/veyron/veyron/runtimes/google/security"
@@ -34,8 +32,6 @@
 
 func (f *testFlow) Read(b []byte) (int, error)          { return f.r.Read(b) }
 func (f *testFlow) Write(b []byte) (int, error)         { return f.w.Write(b) }
-func (f *testFlow) LocalAddr() net.Addr                 { return nil }
-func (f *testFlow) RemoteAddr() net.Addr                { return nil }
 func (f *testFlow) LocalEndpoint() naming.Endpoint      { return nil }
 func (f *testFlow) RemoteEndpoint() naming.Endpoint     { return nil }
 func (f *testFlow) LocalID() security.PublicID          { return testID.PublicID() }
@@ -43,9 +39,7 @@
 func (f *testFlow) LocalPrincipal() security.Principal  { return nil }
 func (f *testFlow) LocalBlessings() security.Blessings  { return nil }
 func (f *testFlow) RemoteBlessings() security.Blessings { return nil }
-func (f *testFlow) SetReadDeadline(t time.Time) error   { return nil }
-func (f *testFlow) SetWriteDeadline(t time.Time) error  { return nil }
-func (f *testFlow) SetDeadline(t time.Time) error       { return nil }
+func (f *testFlow) SetDeadline(<-chan struct{})         {}
 func (f *testFlow) IsClosed() bool                      { return false }
 func (f *testFlow) Closed() <-chan struct{}             { return nil }
 func (f *testFlow) Cancel()                             {}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 38404b7..57d90dc 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -630,6 +630,12 @@
 
 func (fs *flowServer) serve() error {
 	defer fs.flow.Close()
+	// Here we remove the context's channel as a deadline on the flow.
+	// We do this to ensure clients get a consistent error when they read/write
+	// after the flow is closed.  Otherwise there is a race between the
+	// context cancellation and the flow being closed.
+	defer fs.flow.SetDeadline(nil)
+
 	results, err := fs.processRequest()
 
 	var traceResponse vtrace.Response
@@ -671,45 +677,47 @@
 	return nil
 }
 
-func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
-	start := time.Now()
+func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
 	// Set a default timeout before reading from the flow. Without this timeout,
 	// a client that sends no request or a partial request will retain the flow
 	// indefinitely (and lock up server resources).
-	deadline := start.Add(defaultCallTimeout)
-	if verr := fs.setDeadline(deadline); verr != nil {
-		return nil, verr
-	}
+	initTimer := newTimer(defaultCallTimeout)
+	defer initTimer.Stop()
+	fs.flow.SetDeadline(initTimer.C)
+
 	// Decode the initial request.
 	var req ipc.Request
 	if err := fs.dec.Decode(&req); err != nil {
 		return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
 	}
+	return &req, nil
+}
+
+func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
+	start := time.Now()
+
+	req, verr := fs.readIPCRequest()
+	if verr != nil {
+		return nil, verr
+	}
 	fs.method = req.Method
+
 	// TODO(mattr): Currently this allows users to trigger trace collection
 	// on the server even if they will not be allowed to collect the
 	results later.  This might be considered a DoS vector.
 	spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
 	fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
 
-	// Set the appropriate deadline, if specified.
-	if req.Timeout == ipc.NoTimeout {
-		deadline = time.Time{}
-	} else if req.Timeout > 0 {
-		deadline = start.Add(time.Duration(req.Timeout))
-	}
-	if verr := fs.setDeadline(deadline); verr != nil {
-		return nil, verr
-	}
-
 	var cancel context.CancelFunc
-	if !deadline.IsZero() {
-		fs.T, cancel = fs.WithDeadline(deadline)
+	if req.Timeout != ipc.NoTimeout {
+		fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
 	} else {
 		fs.T, cancel = fs.WithCancel()
 	}
+	fs.flow.SetDeadline(fs.Done())
 
-	// Notify the context when the channel is closed.
+	// Ensure that the context gets cancelled if the flow is closed
+	// due to a network error or client cancellation.
 	go func() {
 		<-fs.flow.Closed()
 		cancel()
@@ -841,16 +849,6 @@
 	return vsecurity.NewACLAuthorizer(defaultACL(dc.LocalID())).Authorize(dc)
 }
 
-// setDeadline sets a deadline on the flow. The flow will be cancelled if it
-// is not closed by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (fs *flowServer) setDeadline(deadline time.Time) verror.E {
-	if err := fs.flow.SetDeadline(deadline); err != nil {
-		return verror.Internalf("ipc: flow SetDeadline failed: %v", err)
-	}
-	return nil
-}
-
 // Send implements the ipc.Stream method.
 func (fs *flowServer) Send(item interface{}) error {
 	defer vlog.LogCall()()
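
The server-side deadline hand-off above, condensed into a runnable sketch
(fakeFlow and decode are stand-ins, not code from this patch): the flow
first gets a timer-backed deadline so a silent client cannot pin it, and
after the request is decoded the per-call context's done channel takes over.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    const defaultCallTimeout = 50 * time.Millisecond

    type fakeFlow struct{ deadline <-chan struct{} }

    func (f *fakeFlow) SetDeadline(d <-chan struct{}) { f.deadline = d }

    // decode mimics reading the initial ipc.Request from a client that
    // never writes: it blocks until the current deadline fires.
    func (f *fakeFlow) decode() error {
        <-f.deadline
        return errors.New("ipc: request decoding failed: deadline exceeded")
    }

    func main() {
        f := &fakeFlow{}

        // Phase 1: a timer-backed channel guards the initial decode.
        initDeadline := make(chan struct{})
        time.AfterFunc(defaultCallTimeout, func() { close(initDeadline) })
        f.SetDeadline(initDeadline)
        fmt.Println(f.decode())

        // Phase 2 (after a successful decode): the call's context takes
        // over via f.SetDeadline(ctx.Done()), and serve() detaches it
        // again with f.SetDeadline(nil) to avoid racing the flow close.
    }
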
diff --git a/runtimes/google/ipc/stream/crypto/crypto_test.go b/runtimes/google/ipc/stream/crypto/crypto_test.go
index 3c28210..88e0bd3 100644
--- a/runtimes/google/ipc/stream/crypto/crypto_test.go
+++ b/runtimes/google/ipc/stream/crypto/crypto_test.go
@@ -145,7 +145,7 @@
 func tlsCrypters(t testing.TB, serverConn, clientConn net.Conn) (Crypter, Crypter) {
 	crypters := make(chan Crypter)
 	go func() {
-		server, err := NewTLSServer(serverConn, iobuf.NewPool(0))
+		server, err := NewTLSServer(serverConn, serverConn.LocalAddr(), serverConn.RemoteAddr(), iobuf.NewPool(0))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -153,7 +153,7 @@
 	}()
 
 	go func() {
-		client, err := NewTLSClient(clientConn, TLSClientSessionCache{}, iobuf.NewPool(0))
+		client, err := NewTLSClient(clientConn, clientConn.LocalAddr(), clientConn.RemoteAddr(), TLSClientSessionCache{}, iobuf.NewPool(0))
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/runtimes/google/ipc/stream/crypto/tls.go b/runtimes/google/ipc/stream/crypto/tls.go
index df0ea97..962ec8c 100644
--- a/runtimes/google/ipc/stream/crypto/tls.go
+++ b/runtimes/google/ipc/stream/crypto/tls.go
@@ -7,6 +7,7 @@
 	"crypto/tls"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"sync"
 	"time"
@@ -31,23 +32,23 @@
 
 // NewTLSClient returns a Crypter implementation that uses TLS, assuming
 // handshaker was initiated by a client.
-func NewTLSClient(handshaker net.Conn, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
+func NewTLSClient(handshaker io.ReadWriteCloser, local, remote net.Addr, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
 	var config tls.Config
 	// TLS + resumption + channel bindings is broken: <https://secure-resumption.com/#channelbindings>.
 	config.SessionTicketsDisabled = true
 	config.InsecureSkipVerify = true
 	config.ClientSessionCache = sessionCache.ClientSessionCache
-	return newTLSCrypter(handshaker, &config, pool, false)
+	return newTLSCrypter(handshaker, local, remote, &config, pool, false)
 }
 
 // NewTLSServer returns a Crypter implementation that uses TLS, assuming
 // handshaker was accepted by a server.
-func NewTLSServer(handshaker net.Conn, pool *iobuf.Pool) (Crypter, error) {
-	return newTLSCrypter(handshaker, ServerTLSConfig(), pool, true)
+func NewTLSServer(handshaker io.ReadWriteCloser, local, remote net.Addr, pool *iobuf.Pool) (Crypter, error) {
+	return newTLSCrypter(handshaker, local, remote, ServerTLSConfig(), pool, true)
 }
 
 type fakeConn struct {
-	handshakeConn net.Conn
+	handshakeConn io.ReadWriteCloser
 	out           bytes.Buffer
 	in            []byte
 	laddr, raddr  net.Addr
@@ -110,8 +111,8 @@
 	fc    *fakeConn
 }
 
-func newTLSCrypter(handshaker net.Conn, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
-	fc := &fakeConn{handshakeConn: handshaker, laddr: handshaker.LocalAddr(), raddr: handshaker.RemoteAddr()}
+func newTLSCrypter(handshaker io.ReadWriteCloser, local, remote net.Addr, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
+	fc := &fakeConn{handshakeConn: handshaker, laddr: local, raddr: remote}
 	var t *tls.Conn
 	if server {
 		t = tls.Server(fc, config)
diff --git a/runtimes/google/ipc/stream/crypto/tls_old.go b/runtimes/google/ipc/stream/crypto/tls_old.go
index 15dfa44..73a60f5 100644
--- a/runtimes/google/ipc/stream/crypto/tls_old.go
+++ b/runtimes/google/ipc/stream/crypto/tls_old.go
@@ -13,6 +13,7 @@
 	"bytes"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"sync"
 	"time"
@@ -38,23 +39,23 @@
 
 // NewTLSClient returns a Crypter implementation that uses TLS, assuming
 // handshaker was initiated by a client.
-func NewTLSClient(handshaker net.Conn, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
+func NewTLSClient(handshaker io.ReadWriteCloser, local, remote net.Addr, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
 	var config tls.Config
 	// TLS + resumption + channel bindings is broken: <https://secure-resumption.com/#channelbindings>.
 	config.SessionTicketsDisabled = true
 	config.InsecureSkipVerify = true
 	config.ClientSessionCache = sessionCache.ClientSessionCache
-	return newTLSCrypter(handshaker, &config, pool, false)
+	return newTLSCrypter(handshaker, local, remote, &config, pool, false)
 }
 
 // NewTLSServer returns a Crypter implementation that uses TLS, assuming
 // handshaker was accepted by a server.
-func NewTLSServer(handshaker net.Conn, pool *iobuf.Pool) (Crypter, error) {
-	return newTLSCrypter(handshaker, ServerTLSConfig(), pool, true)
+func NewTLSServer(handshaker io.ReadWriteCloser, local, remote net.Addr, pool *iobuf.Pool) (Crypter, error) {
+	return newTLSCrypter(handshaker, local, remote, ServerTLSConfig(), pool, true)
 }
 
 type fakeConn struct {
-	handshakeConn net.Conn
+	handshakeConn io.ReadWriteCloser
 	out           bytes.Buffer
 	in            []byte
 	laddr, raddr  net.Addr
@@ -117,8 +118,8 @@
 	fc    *fakeConn
 }
 
-func newTLSCrypter(handshaker net.Conn, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
-	fc := &fakeConn{handshakeConn: handshaker, laddr: handshaker.LocalAddr(), raddr: handshaker.RemoteAddr()}
+func newTLSCrypter(handshaker io.ReadWriteCloser, local, remote net.Addr, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
+	fc := &fakeConn{handshakeConn: handshaker, laddr: local, raddr: remote}
 	var t *tls.Conn
 	if server {
 		t = tls.Server(fc, config)
diff --git a/runtimes/google/ipc/stream/vc/cancel.go b/runtimes/google/ipc/stream/vc/cancel.go
deleted file mode 100644
index 4fa5180..0000000
--- a/runtimes/google/ipc/stream/vc/cancel.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package vc
-
-import "time"
-
-// cancelChannel creates a channel usable by bqueue.Writer.Put and upcqueue.Get
-// to cancel the calls if they have not completed by the provided deadline.
-// It returns two channels, the first is the channel to supply to Put or Get
-// which will be closed when the deadline expires.
-// The second is the quit channel, which can be closed to clean up resources
-// when the first is no longer needed (the deadline is no longer worth enforcing).
-//
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func cancelChannel(deadline time.Time) (expired, quit chan struct{}) {
-	if deadline.IsZero() {
-		return nil, nil
-	}
-	expired = make(chan struct{})
-	quit = make(chan struct{})
-	timer := time.NewTimer(deadline.Sub(time.Now()))
-
-	go func() {
-		select {
-		case <-timer.C:
-		case <-quit:
-		}
-		timer.Stop()
-		close(expired)
-	}()
-	return expired, quit
-}
-
-// timeoutError implements net.Error with Timeout returning true.
-type timeoutError struct{}
-
-func (t timeoutError) Error() string   { return "deadline exceeded" }
-func (t timeoutError) Timeout() bool   { return true }
-func (t timeoutError) Temporary() bool { return false }
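
The deleted helper cost one goroutine per SetReadDeadline/SetWriteDeadline
call, parked until the timer fired or the deadline was replaced.  A
side-by-side sketch of the two shapes (hypothetical function names):

    package main

    import (
        "fmt"
        "time"
    )

    // Old shape: every deadline parks a goroutine until the timer fires.
    func cancelViaGoroutine(d time.Duration) <-chan struct{} {
        expired := make(chan struct{})
        timer := time.NewTimer(d)
        go func() { // one goroutine per deadline
            <-timer.C
            close(expired)
        }()
        return expired
    }

    // New shape: time.AfterFunc schedules the close on a runtime timer,
    // so nothing is parked while waiting (this is what the new
    // runtimes/google/ipc/timer.go below does).
    func cancelViaAfterFunc(d time.Duration) <-chan struct{} {
        expired := make(chan struct{})
        time.AfterFunc(d, func() { close(expired) })
        return expired
    }

    func main() {
        <-cancelViaGoroutine(time.Millisecond)
        <-cancelViaAfterFunc(time.Millisecond)
        fmt.Println("both expired")
    }
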
diff --git a/runtimes/google/ipc/stream/vc/cancel_test.go b/runtimes/google/ipc/stream/vc/cancel_test.go
deleted file mode 100644
index 90db06d..0000000
--- a/runtimes/google/ipc/stream/vc/cancel_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package vc
-
-import (
-	"testing"
-	"time"
-)
-
-func TestCancelChannelNil(t *testing.T) {
-	var zero time.Time
-	if cancel, _ := cancelChannel(zero); cancel != nil {
-		t.Errorf("Got %v want nil with deadline %v", cancel, zero)
-	}
-}
-
-func TestCancelChannel(t *testing.T) {
-	deadline := time.Now()
-	cancel, _ := cancelChannel(deadline)
-	if cancel == nil {
-		t.Fatalf("Got nil channel for deadline %v", deadline)
-	}
-	if _, ok := <-cancel; ok {
-		t.Errorf("Expected channel to be closed")
-	}
-}
-
-func TestCancelChannelQuit(t *testing.T) {
-	deadline := time.Now().Add(time.Hour)
-	cancel, quit := cancelChannel(deadline)
-	close(quit)
-	if _, ok := <-cancel; ok {
-		t.Errorf("Expected channel to be closed")
-	}
-}
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index fb74427..1f5a843 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -2,7 +2,6 @@
 
 import (
 	"net"
-	"time"
 
 	"veyron.io/veyron/veyron2/naming"
 	"veyron.io/veyron/veyron2/security"
@@ -35,17 +34,11 @@
 	return nil
 }
 
-// SetDeadline sets a deadline on the flow. The flow will be cancelled if it
-// is not closed by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (f *flow) SetDeadline(t time.Time) error {
-	if err := f.SetReadDeadline(t); err != nil {
-		return err
-	}
-	if err := f.SetWriteDeadline(t); err != nil {
-		return err
-	}
-	return nil
+// SetDeadline sets a deadline channel on the flow.  Reads and writes
+// will be cancelled if the channel is closed.
+func (f *flow) SetDeadline(deadline <-chan struct{}) {
+	f.reader.SetDeadline(deadline)
+	f.writer.SetDeadline(deadline)
 }
 
 // Shutdown closes the flow and discards any queued up write buffers.
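
Because the deadline is now just a receive-only channel, any cancellation
source plugs straight in.  A minimal sketch, assuming a stdlib-style
context (this codebase uses Veyron's own context type, but the shape is
the same):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // half stands in for one direction of a flow (its reader or writer).
    type half struct{ deadline <-chan struct{} }

    func (h *half) SetDeadline(d <-chan struct{}) { h.deadline = d }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
        defer cancel()

        r, w := &half{}, &half{}
        // One channel feeds both directions, as in flow.SetDeadline above.
        r.SetDeadline(ctx.Done())
        w.SetDeadline(ctx.Done())

        <-r.deadline // both halves observe the same expiry
        fmt.Println("flow deadline fired:", ctx.Err())
    }
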
diff --git a/runtimes/google/ipc/stream/vc/listener_test.go b/runtimes/google/ipc/stream/vc/listener_test.go
index fd615ab..e8931f7 100644
--- a/runtimes/google/ipc/stream/vc/listener_test.go
+++ b/runtimes/google/ipc/stream/vc/listener_test.go
@@ -1,9 +1,7 @@
 package vc
 
 import (
-	"net"
 	"testing"
-	"time"
 
 	"veyron.io/veyron/veyron2/naming"
 	"veyron.io/veyron/veyron2/security"
@@ -12,24 +10,20 @@
 type noopFlow struct{}
 
 // net.Conn methods
-func (*noopFlow) Read([]byte) (int, error)           { return 0, nil }
-func (*noopFlow) Write([]byte) (int, error)          { return 0, nil }
-func (*noopFlow) Close() error                       { return nil }
-func (*noopFlow) IsClosed() bool                     { return false }
-func (*noopFlow) Closed() <-chan struct{}            { return nil }
-func (*noopFlow) Cancel()                            {}
-func (*noopFlow) LocalEndpoint() naming.Endpoint     { return nil }
-func (*noopFlow) RemoteEndpoint() naming.Endpoint    { return nil }
-func (*noopFlow) LocalAddr() net.Addr                { return nil }
-func (*noopFlow) RemoteAddr() net.Addr               { return nil }
-func (*noopFlow) SetDeadline(t time.Time) error      { return nil }
-func (*noopFlow) SetReadDeadline(t time.Time) error  { return nil }
-func (*noopFlow) SetWriteDeadline(t time.Time) error { return nil }
+func (*noopFlow) Read([]byte) (int, error)        { return 0, nil }
+func (*noopFlow) Write([]byte) (int, error)       { return 0, nil }
+func (*noopFlow) Close() error                    { return nil }
+func (*noopFlow) IsClosed() bool                  { return false }
+func (*noopFlow) Closed() <-chan struct{}         { return nil }
+func (*noopFlow) Cancel()                         {}
+func (*noopFlow) LocalEndpoint() naming.Endpoint  { return nil }
+func (*noopFlow) RemoteEndpoint() naming.Endpoint { return nil }
 
 // Other stream.Flow methods
 func (*noopFlow) LocalPrincipal() security.Principal  { return nil }
 func (*noopFlow) LocalBlessings() security.Blessings  { return nil }
 func (*noopFlow) RemoteBlessings() security.Blessings { return nil }
+func (*noopFlow) SetDeadline(<-chan struct{})         {}
 func (*noopFlow) LocalID() security.PublicID          { return nil }
 func (*noopFlow) RemoteID() security.PublicID         { return nil }
 
diff --git a/runtimes/google/ipc/stream/vc/reader.go b/runtimes/google/ipc/stream/vc/reader.go
index f1b6f3a..ebe161d 100644
--- a/runtimes/google/ipc/stream/vc/reader.go
+++ b/runtimes/google/ipc/stream/vc/reader.go
@@ -5,7 +5,6 @@
 	"io"
 	"sync"
 	"sync/atomic"
-	"time"
 
 	"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
 	vsync "veyron.io/veyron/veyron/runtimes/google/lib/sync"
@@ -21,13 +20,12 @@
 // reader implements the io.Reader and SetReadDeadline interfaces for a Flow,
 // backed by iobuf.Slice objects read from a upcqueue.
 type reader struct {
-	handler        readHandler
-	src            *upcqueue.T
-	mu             sync.Mutex
-	buf            *iobuf.Slice  // GUARDED_BY(mu)
-	deadline       chan struct{} // GUARDED_BY(mu)
-	cancelDeadline chan struct{} // GUARDED_BY(mu)
-	totalBytes     uint32
+	handler    readHandler
+	src        *upcqueue.T
+	mu         sync.Mutex
+	buf        *iobuf.Slice    // GUARDED_BY(mu)
+	deadline   <-chan struct{} // GUARDED_BY(mu)
+	totalBytes uint32
 }
 
 func newReader(h readHandler) *reader {
@@ -88,19 +86,10 @@
 	return copied, nil
 }
 
-// SetReadDeadline sets a deadline on read. The read will be cancelled if it
-// does not complete by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (r *reader) SetReadDeadline(t time.Time) error {
-	c, q := cancelChannel(t)
+func (r *reader) SetDeadline(deadline <-chan struct{}) {
 	r.mu.Lock()
-	if r.cancelDeadline != nil {
-		close(r.cancelDeadline)
-	}
-	r.deadline = c
-	r.cancelDeadline = q
-	r.mu.Unlock()
-	return nil
+	defer r.mu.Unlock()
+	r.deadline = deadline
 }
 
 func (r *reader) BytesRead() uint32 {
@@ -110,3 +99,10 @@
 func (r *reader) Put(slice *iobuf.Slice) error {
 	return r.src.Put(slice)
 }
+
+// timeoutError implements net.Error with Timeout returning true.
+type timeoutError struct{}
+
+func (t timeoutError) Error() string   { return "deadline exceeded" }
+func (t timeoutError) Timeout() bool   { return true }
+func (t timeoutError) Temporary() bool { return false }
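
How a read path can consume the stored deadline channel and surface it as
a net.Error, in a self-contained sketch (the real reader presumably hands
r.deadline to its upcqueue Get, per the deleted cancel.go comment; get
below is only illustrative):

    package main

    import (
        "fmt"
        "net"
    )

    // timeoutError mirrors the type above.
    type timeoutError struct{}

    func (timeoutError) Error() string   { return "deadline exceeded" }
    func (timeoutError) Timeout() bool   { return true }
    func (timeoutError) Temporary() bool { return false }

    // get waits for an item, but aborts with timeoutError once the
    // deadline channel is closed.
    func get(items <-chan []byte, deadline <-chan struct{}) ([]byte, error) {
        select {
        case b := <-items:
            return b, nil
        case <-deadline:
            return nil, timeoutError{}
        }
    }

    func main() {
        deadline := make(chan struct{})
        close(deadline) // an already-expired deadline, as in TestReadDeadline

        _, err := get(make(chan []byte), deadline)
        if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
            fmt.Println("read failed with timeout:", neterr)
        }
    }
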
diff --git a/runtimes/google/ipc/stream/vc/reader_test.go b/runtimes/google/ipc/stream/vc/reader_test.go
index 4a73158..c9d1dd2 100644
--- a/runtimes/google/ipc/stream/vc/reader_test.go
+++ b/runtimes/google/ipc/stream/vc/reader_test.go
@@ -6,7 +6,6 @@
 	"reflect"
 	"testing"
 	"testing/quick"
-	"time"
 
 	"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
 )
@@ -91,8 +90,12 @@
 func TestReadDeadline(t *testing.T) {
 	l := &testReadHandler{}
 	r := newReader(l)
-	r.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
 	defer r.Close()
+
+	deadline := make(chan struct{}, 0)
+	r.SetDeadline(deadline)
+	close(deadline)
+
 	var buf [1]byte
 	n, err := r.Read(buf[:])
 	neterr, ok := err.(net.Error)
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index dbe46a2..e239b5a 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -412,7 +412,7 @@
 	if err != nil {
 		return vc.err(fmt.Errorf("failed to create a Flow for setting up TLS: %v", err))
 	}
-	crypter, err := crypto.NewTLSClient(handshakeConn, tlsSessionCache, vc.pool)
+	crypter, err := crypto.NewTLSClient(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), tlsSessionCache, vc.pool)
 	if err != nil {
 		return vc.err(fmt.Errorf("failed to setup TLS: %v", err))
 	}
@@ -536,7 +536,7 @@
 		vc.mu.Unlock()
 
 		// Establish TLS
-		crypter, err := crypto.NewTLSServer(handshakeConn, vc.pool)
+		crypter, err := crypto.NewTLSServer(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), vc.pool)
 		if err != nil {
 			sendErr(fmt.Errorf("failed to setup TLS: %v", err))
 			return
@@ -616,13 +616,8 @@
 	if err != nil {
 		return nil, err
 	}
-	return &writer{
-		MTU:            MaxPayloadSizeBytes,
-		Sink:           bq,
-		Alloc:          iobuf.NewAllocator(vc.pool, vc.reserveBytes),
-		SharedCounters: vc.sharedCounters,
-		closed:         make(chan struct{}),
-	}, nil
+	alloc := iobuf.NewAllocator(vc.pool, vc.reserveBytes)
+	return newWriter(MaxPayloadSizeBytes, bq, alloc, vc.sharedCounters), nil
 }
 
 // findFlowLocked finds the flow id for the provided flow.
diff --git a/runtimes/google/ipc/stream/vc/writer.go b/runtimes/google/ipc/stream/vc/writer.go
index 5114cef..f3f3788 100644
--- a/runtimes/google/ipc/stream/vc/writer.go
+++ b/runtimes/google/ipc/stream/vc/writer.go
@@ -5,7 +5,6 @@
 	"fmt"
 	"sync"
 	"sync/atomic"
-	"time"
 
 	"veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
 	"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
@@ -21,12 +20,11 @@
 	Alloc          *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
 	SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC.
 
-	mu             sync.Mutex    // Guards call to Writes
-	wroteOnce      bool          // GUARDED_BY(mu)
-	deadline       chan struct{} // GUARDED_BY(mu)
-	cancelDeadline chan struct{} // GUARDED_BY(mu)
-	isClosed       bool          // GUARDED_BY(mu)
-	closed         chan struct{} // GUARDED_BY(mu)
+	mu        sync.Mutex      // Guards call to Writes
+	wroteOnce bool            // GUARDED_BY(mu)
+	isClosed  bool            // GUARDED_BY(mu)
+	closed    chan struct{}   // GUARDED_BY(mu)
+	deadline  <-chan struct{} // GUARDED_BY(mu)
 
 	// Total number of bytes filled in by all Write calls on this writer.
 	// Atomic operations are used to manipulate it.
@@ -37,6 +35,16 @@
 	sharedCountersBorrowed   int // GUARDED_BY(muSharedCountersBorrowed)
 }
 
+func newWriter(mtu int, sink bqueue.Writer, alloc *iobuf.Allocator, counters *vsync.Semaphore) *writer {
+	return &writer{
+		MTU:            mtu,
+		Sink:           sink,
+		Alloc:          alloc,
+		SharedCounters: counters,
+		closed:         make(chan struct{}),
+	}
+}
+
 // Shutdown closes the writer and discards any queued up write buffers, i.e.,
 // the bqueue.Get call will not see the buffers queued up at this writer.
 // If removeWriter is true the writer will also be removed entirely from the
@@ -138,19 +146,10 @@
 	return written, nil
 }
 
-// SetWriteDeadline sets a deadline on write. The write will be cancelled if it
-// does not complete by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (w *writer) SetWriteDeadline(t time.Time) error {
-	c, q := cancelChannel(t)
+func (w *writer) SetDeadline(deadline <-chan struct{}) {
 	w.mu.Lock()
-	if w.cancelDeadline != nil {
-		close(w.cancelDeadline)
-	}
-	w.deadline = c
-	w.cancelDeadline = q
-	w.mu.Unlock()
-	return nil
+	defer w.mu.Unlock()
+	w.deadline = deadline
 }
 
 // Release allows the next 'bytes' of data to be removed from the buffer queue
diff --git a/runtimes/google/ipc/stream/vc/writer_test.go b/runtimes/google/ipc/stream/vc/writer_test.go
index 42a263c..1ba4908 100644
--- a/runtimes/google/ipc/stream/vc/writer_test.go
+++ b/runtimes/google/ipc/stream/vc/writer_test.go
@@ -5,7 +5,6 @@
 	"net"
 	"reflect"
 	"testing"
-	"time"
 
 	"veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
 	"veyron.io/veyron/veyron/runtimes/google/lib/bqueue/drrqueue"
@@ -28,7 +27,7 @@
 	shared := sync.NewSemaphore()
 	shared.IncN(4)
 
-	w := newWriter(bw, shared)
+	w := newTestWriter(bw, shared)
 
 	if n, err := w.Write([]byte("abcd")); n != 4 || err != nil {
 		t.Errorf("Got (%d, %v) want (4, nil)", n, err)
@@ -51,9 +50,11 @@
 
 	// Further writes will block since all 10 bytes (provided to NewWriter)
 	// have been exhausted and Get hasn't been called on bq yet.
-	if err := w.SetWriteDeadline(time.Now()); err != nil {
-		t.Errorf("Got %v want nil", err)
-	}
+	deadline := make(chan struct{}, 0)
+	w.SetDeadline(deadline)
+	close(deadline)
+
+	w.SetDeadline(deadline)
 	if n, err := w.Write([]byte("k")); n != 0 || !isTimeoutError(err) {
 		t.Errorf("Got (%d, %v) want (0, timeout error)", n, err)
 	}
@@ -86,7 +87,7 @@
 	shared := sync.NewSemaphore()
 	shared.IncN(4)
 
-	w := newWriter(bw, shared)
+	w := newTestWriter(bw, shared)
 	w.Close()
 
 	if n, err := w.Write([]byte{1, 2}); n != 0 || err != errWriterClosed {
@@ -106,7 +107,7 @@
 	shared := sync.NewSemaphore()
 	shared.IncN(2)
 
-	w := newWriter(bw, shared)
+	w := newTestWriter(bw, shared)
 	data := []byte{1, 2}
 	if n, err := w.Write(data); n != len(data) || err != nil {
 		t.Fatalf("Got (%d, %v) want (%d, nil)", n, err, len(data))
@@ -142,7 +143,7 @@
 
 	shared := sync.NewSemaphore()
 	shared.IncN(1)
-	w := newWriter(bw, shared)
+	w := newTestWriter(bw, shared)
 	if n, err := w.Write([]byte{1}); n != 1 || err != nil {
 		t.Fatalf("Got (%d, %v) want (1, nil)", n, err)
 	}
@@ -174,7 +175,7 @@
 	shared := sync.NewSemaphore()
 	shared.IncN(4)
 
-	w := newWriter(bw, shared)
+	w := newTestWriter(bw, shared)
 	go w.Close()
 	<-w.Closed()
 
@@ -183,14 +184,9 @@
 	}
 }
 
-func newWriter(bqw bqueue.Writer, shared *sync.Semaphore) *writer {
-	return &writer{
-		MTU:            16,
-		Sink:           bqw,
-		Alloc:          iobuf.NewAllocator(iobuf.NewPool(0), 0),
-		SharedCounters: shared,
-		closed:         make(chan struct{}),
-	}
+func newTestWriter(bqw bqueue.Writer, shared *sync.Semaphore) *writer {
+	alloc := iobuf.NewAllocator(iobuf.NewPool(0), 0)
+	return newWriter(16, bqw, alloc, shared)
 }
 
 func isTimeoutError(err error) bool {
diff --git a/runtimes/google/ipc/timer.go b/runtimes/google/ipc/timer.go
new file mode 100644
index 0000000..fa148d4
--- /dev/null
+++ b/runtimes/google/ipc/timer.go
@@ -0,0 +1,32 @@
+package ipc
+
+import (
+	"time"
+)
+
+// timer is a replacement for time.Timer.  The only difference is that
+// its channel has type chan struct{} and is closed when the timer expires,
+// which allows any number of receivers to observe expiry.
+type timer struct {
+	base *time.Timer
+	C    <-chan struct{}
+}
+
+func newTimer(d time.Duration) *timer {
+	c := make(chan struct{}, 0)
+	base := time.AfterFunc(d, func() {
+		close(c)
+	})
+	return &timer{
+		base: base,
+		C:    c,
+	}
+}
+
+func (t *timer) Stop() bool {
+	return t.base.Stop()
+}
+
+func (t *timer) Reset(d time.Duration) bool {
+	return t.base.Reset(d)
+}
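
A brief usage sketch for the timer above (guard is a hypothetical
in-package helper): because C is closed rather than sent on, expiry is
visible to any number of receivers, and receives after expiry never block.

    // guard bounds a blocking receive with the timer above.
    func guard(work <-chan int, d time.Duration) (int, bool) {
        t := newTimer(d)
        defer t.Stop() // release the underlying time.Timer if work wins
        select {
        case v := <-work:
            return v, true
        case <-t.C: // closed on expiry; stays readable afterwards
            return 0, false
        }
    }
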
diff --git a/runtimes/google/ipc/timer_test.go b/runtimes/google/ipc/timer_test.go
new file mode 100644
index 0000000..e7345d6
--- /dev/null
+++ b/runtimes/google/ipc/timer_test.go
@@ -0,0 +1,31 @@
+package ipc
+
+import (
+	"testing"
+	"time"
+)
+
+func TestTimer(t *testing.T) {
+	test := newTimer(time.Millisecond)
+	if _, ok := <-test.C; ok {
+		t.Errorf("Expected the channel to be closed.")
+	}
+
+	// Test resetting.
+	test = newTimer(time.Hour)
+	if reset := test.Reset(time.Millisecond); !reset {
+		t.Errorf("Expected to successfully reset.")
+	}
+	if _, ok := <-test.C; ok {
+		t.Errorf("Expected the channel to be closed.")
+	}
+
+	// Test stop.
+	test = newTimer(100 * time.Millisecond)
+	test.Stop()
+	select {
+	case <-test.C:
+		t.Errorf("the test timer should have been stopped.")
+	case <-time.After(200 * time.Millisecond):
+	}
+}