veyron/runtimes/google/ipc: Impelement a better fix for cancellation goroutine leak.
This version prevents spawning many goroutines in the first place by reusing
the contexts cancel channel. Also fix a goroutine leak when a single
context is used to make a large number of calls over a long period of time
on the client side.
Change-Id: I29fdc6b290b4bbad2ea82c0b399c7deee9dff6f9
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index 1a749d0..f4e177b 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -219,14 +219,7 @@
vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
continue // Try the next server.
}
- timeout := time.Duration(ipc.NoTimeout)
- if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
- timeout = deadline.Sub(time.Now())
- if err := flow.SetDeadline(deadline); err != nil {
- lastErr = verror.Internalf("ipc: flow.SetDeadline failed: %v", err)
- continue
- }
- }
+ flow.SetDeadline(ctx.Done())
// Validate caveats on the server's identity for the context associated with this call.
blessing, err := authorizeServer(flow.LocalID(), flow.RemoteID(), opts)
@@ -243,11 +236,18 @@
if doneChan := ctx.Done(); doneChan != nil {
go func() {
- <-ctx.Done()
- fc.Cancel()
+ select {
+ case <-ctx.Done():
+ fc.Cancel()
+ case <-fc.flow.Closed():
+ }
}()
}
+ timeout := time.Duration(ipc.NoTimeout)
+ if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
+ timeout = deadline.Sub(time.Now())
+ }
if verr := fc.start(suffix, method, args, timeout, blessing); verr != nil {
return nil, verr
}
@@ -356,6 +356,7 @@
}
func (fc *flowClient) start(suffix, method string, args []interface{}, timeout time.Duration, blessing security.PublicID) verror.E {
+
req := ipc.Request{
Suffix: suffix,
Method: method,
diff --git a/runtimes/google/ipc/flow_test.go b/runtimes/google/ipc/flow_test.go
index 34cc921..1a10e49 100644
--- a/runtimes/google/ipc/flow_test.go
+++ b/runtimes/google/ipc/flow_test.go
@@ -4,9 +4,7 @@
"bytes"
"errors"
"fmt"
- "net"
"testing"
- "time"
_ "veyron.io/veyron/veyron/lib/testutil"
isecurity "veyron.io/veyron/veyron/runtimes/google/security"
@@ -34,8 +32,6 @@
func (f *testFlow) Read(b []byte) (int, error) { return f.r.Read(b) }
func (f *testFlow) Write(b []byte) (int, error) { return f.w.Write(b) }
-func (f *testFlow) LocalAddr() net.Addr { return nil }
-func (f *testFlow) RemoteAddr() net.Addr { return nil }
func (f *testFlow) LocalEndpoint() naming.Endpoint { return nil }
func (f *testFlow) RemoteEndpoint() naming.Endpoint { return nil }
func (f *testFlow) LocalID() security.PublicID { return testID.PublicID() }
@@ -43,9 +39,7 @@
func (f *testFlow) LocalPrincipal() security.Principal { return nil }
func (f *testFlow) LocalBlessings() security.Blessings { return nil }
func (f *testFlow) RemoteBlessings() security.Blessings { return nil }
-func (f *testFlow) SetReadDeadline(t time.Time) error { return nil }
-func (f *testFlow) SetWriteDeadline(t time.Time) error { return nil }
-func (f *testFlow) SetDeadline(t time.Time) error { return nil }
+func (f *testFlow) SetDeadline(<-chan struct{}) {}
func (f *testFlow) IsClosed() bool { return false }
func (f *testFlow) Closed() <-chan struct{} { return nil }
func (f *testFlow) Cancel() {}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 38404b7..57d90dc 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -630,6 +630,12 @@
func (fs *flowServer) serve() error {
defer fs.flow.Close()
+ // Here we remove the contexts channel as a deadline to the flow.
+ // We do this to ensure clients get a consistent error when they read/write
+ // after the flow is closed. Otherwise there is a race between the
+ // context cancellation and the flow being closed.
+ defer fs.flow.SetDeadline(nil)
+
results, err := fs.processRequest()
var traceResponse vtrace.Response
@@ -671,45 +677,47 @@
return nil
}
-func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
- start := time.Now()
+func (fs *flowServer) readIPCRequest() (*ipc.Request, verror.E) {
// Set a default timeout before reading from the flow. Without this timeout,
// a client that sends no request or a partial request will retain the flow
// indefinitely (and lock up server resources).
- deadline := start.Add(defaultCallTimeout)
- if verr := fs.setDeadline(deadline); verr != nil {
- return nil, verr
- }
+ initTimer := newTimer(defaultCallTimeout)
+ defer initTimer.Stop()
+ fs.flow.SetDeadline(initTimer.C)
+
// Decode the initial request.
var req ipc.Request
if err := fs.dec.Decode(&req); err != nil {
return nil, verror.BadProtocolf("ipc: request decoding failed: %v", err)
}
+ return &req, nil
+}
+
+func (fs *flowServer) processRequest() ([]interface{}, verror.E) {
+ start := time.Now()
+
+ req, verr := fs.readIPCRequest()
+ if verr != nil {
+ return nil, verr
+ }
fs.method = req.Method
+
// TODO(mattr): Currently this allows users to trigger trace collection
// on the server even if they will not be allowed to collect the
// results later. This might be consider a DOS vector.
spanName := fmt.Sprintf("Server Call: %s.%s", fs.Name(), fs.Method())
fs.T, _ = ivtrace.WithContinuedSpan(fs, spanName, req.TraceRequest)
- // Set the appropriate deadline, if specified.
- if req.Timeout == ipc.NoTimeout {
- deadline = time.Time{}
- } else if req.Timeout > 0 {
- deadline = start.Add(time.Duration(req.Timeout))
- }
- if verr := fs.setDeadline(deadline); verr != nil {
- return nil, verr
- }
-
var cancel context.CancelFunc
- if !deadline.IsZero() {
- fs.T, cancel = fs.WithDeadline(deadline)
+ if req.Timeout != ipc.NoTimeout {
+ fs.T, cancel = fs.WithDeadline(start.Add(time.Duration(req.Timeout)))
} else {
fs.T, cancel = fs.WithCancel()
}
+ fs.flow.SetDeadline(fs.Done())
- // Notify the context when the channel is closed.
+ // Ensure that the context gets cancelled if the flow is closed
+ // due to a network error, or client cancellation.
go func() {
<-fs.flow.Closed()
cancel()
@@ -841,16 +849,6 @@
return vsecurity.NewACLAuthorizer(defaultACL(dc.LocalID())).Authorize(dc)
}
-// setDeadline sets a deadline on the flow. The flow will be cancelled if it
-// is not closed by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (fs *flowServer) setDeadline(deadline time.Time) verror.E {
- if err := fs.flow.SetDeadline(deadline); err != nil {
- return verror.Internalf("ipc: flow SetDeadline failed: %v", err)
- }
- return nil
-}
-
// Send implements the ipc.Stream method.
func (fs *flowServer) Send(item interface{}) error {
defer vlog.LogCall()()
diff --git a/runtimes/google/ipc/stream/crypto/crypto_test.go b/runtimes/google/ipc/stream/crypto/crypto_test.go
index 3c28210..88e0bd3 100644
--- a/runtimes/google/ipc/stream/crypto/crypto_test.go
+++ b/runtimes/google/ipc/stream/crypto/crypto_test.go
@@ -145,7 +145,7 @@
func tlsCrypters(t testing.TB, serverConn, clientConn net.Conn) (Crypter, Crypter) {
crypters := make(chan Crypter)
go func() {
- server, err := NewTLSServer(serverConn, iobuf.NewPool(0))
+ server, err := NewTLSServer(serverConn, serverConn.LocalAddr(), serverConn.RemoteAddr(), iobuf.NewPool(0))
if err != nil {
t.Fatal(err)
}
@@ -153,7 +153,7 @@
}()
go func() {
- client, err := NewTLSClient(clientConn, TLSClientSessionCache{}, iobuf.NewPool(0))
+ client, err := NewTLSClient(clientConn, clientConn.LocalAddr(), clientConn.RemoteAddr(), TLSClientSessionCache{}, iobuf.NewPool(0))
if err != nil {
t.Fatal(err)
}
diff --git a/runtimes/google/ipc/stream/crypto/tls.go b/runtimes/google/ipc/stream/crypto/tls.go
index df0ea97..962ec8c 100644
--- a/runtimes/google/ipc/stream/crypto/tls.go
+++ b/runtimes/google/ipc/stream/crypto/tls.go
@@ -7,6 +7,7 @@
"crypto/tls"
"errors"
"fmt"
+ "io"
"net"
"sync"
"time"
@@ -31,23 +32,23 @@
// NewTLSClient returns a Crypter implementation that uses TLS, assuming
// handshaker was initiated by a client.
-func NewTLSClient(handshaker net.Conn, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
+func NewTLSClient(handshaker io.ReadWriteCloser, local, remote net.Addr, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
var config tls.Config
// TLS + resumption + channel bindings is broken: <https://secure-resumption.com/#channelbindings>.
config.SessionTicketsDisabled = true
config.InsecureSkipVerify = true
config.ClientSessionCache = sessionCache.ClientSessionCache
- return newTLSCrypter(handshaker, &config, pool, false)
+ return newTLSCrypter(handshaker, local, remote, &config, pool, false)
}
// NewTLSServer returns a Crypter implementation that uses TLS, assuming
// handshaker was accepted by a server.
-func NewTLSServer(handshaker net.Conn, pool *iobuf.Pool) (Crypter, error) {
- return newTLSCrypter(handshaker, ServerTLSConfig(), pool, true)
+func NewTLSServer(handshaker io.ReadWriteCloser, local, remote net.Addr, pool *iobuf.Pool) (Crypter, error) {
+ return newTLSCrypter(handshaker, local, remote, ServerTLSConfig(), pool, true)
}
type fakeConn struct {
- handshakeConn net.Conn
+ handshakeConn io.ReadWriteCloser
out bytes.Buffer
in []byte
laddr, raddr net.Addr
@@ -110,8 +111,8 @@
fc *fakeConn
}
-func newTLSCrypter(handshaker net.Conn, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
- fc := &fakeConn{handshakeConn: handshaker, laddr: handshaker.LocalAddr(), raddr: handshaker.RemoteAddr()}
+func newTLSCrypter(handshaker io.ReadWriteCloser, local, remote net.Addr, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
+ fc := &fakeConn{handshakeConn: handshaker, laddr: local, raddr: remote}
var t *tls.Conn
if server {
t = tls.Server(fc, config)
diff --git a/runtimes/google/ipc/stream/crypto/tls_old.go b/runtimes/google/ipc/stream/crypto/tls_old.go
index 15dfa44..73a60f5 100644
--- a/runtimes/google/ipc/stream/crypto/tls_old.go
+++ b/runtimes/google/ipc/stream/crypto/tls_old.go
@@ -13,6 +13,7 @@
"bytes"
"errors"
"fmt"
+ "io"
"net"
"sync"
"time"
@@ -38,23 +39,23 @@
// NewTLSClient returns a Crypter implementation that uses TLS, assuming
// handshaker was initiated by a client.
-func NewTLSClient(handshaker net.Conn, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
+func NewTLSClient(handshaker io.ReadWriteCloser, local, remote net.Addr, sessionCache TLSClientSessionCache, pool *iobuf.Pool) (Crypter, error) {
var config tls.Config
// TLS + resumption + channel bindings is broken: <https://secure-resumption.com/#channelbindings>.
config.SessionTicketsDisabled = true
config.InsecureSkipVerify = true
config.ClientSessionCache = sessionCache.ClientSessionCache
- return newTLSCrypter(handshaker, &config, pool, false)
+ return newTLSCrypter(handshaker, local, remote, &config, pool, false)
}
// NewTLSServer returns a Crypter implementation that uses TLS, assuming
// handshaker was accepted by a server.
-func NewTLSServer(handshaker net.Conn, pool *iobuf.Pool) (Crypter, error) {
- return newTLSCrypter(handshaker, ServerTLSConfig(), pool, true)
+func NewTLSServer(handshaker io.ReadWriteCloser, local, remote net.Addr, pool *iobuf.Pool) (Crypter, error) {
+ return newTLSCrypter(handshaker, local, remote, ServerTLSConfig(), pool, true)
}
type fakeConn struct {
- handshakeConn net.Conn
+ handshakeConn io.ReadWriteCloser
out bytes.Buffer
in []byte
laddr, raddr net.Addr
@@ -117,8 +118,8 @@
fc *fakeConn
}
-func newTLSCrypter(handshaker net.Conn, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
- fc := &fakeConn{handshakeConn: handshaker, laddr: handshaker.LocalAddr(), raddr: handshaker.RemoteAddr()}
+func newTLSCrypter(handshaker io.ReadWriteCloser, local, remote net.Addr, config *tls.Config, pool *iobuf.Pool, server bool) (Crypter, error) {
+ fc := &fakeConn{handshakeConn: handshaker, laddr: local, raddr: remote}
var t *tls.Conn
if server {
t = tls.Server(fc, config)
diff --git a/runtimes/google/ipc/stream/vc/cancel.go b/runtimes/google/ipc/stream/vc/cancel.go
deleted file mode 100644
index 4fa5180..0000000
--- a/runtimes/google/ipc/stream/vc/cancel.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package vc
-
-import "time"
-
-// cancelChannel creates a channel usable by bqueue.Writer.Put and upcqueue.Get
-// to cancel the calls if they have not completed by the provided deadline.
-// It returns two channels, the first is the channel to supply to Put or Get
-// which will be closed when the deadline expires.
-// The second is the quit channel, which can be closed to clean up resources
-// when the first is no longer needed (the deadline is no longer worth enforcing).
-//
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func cancelChannel(deadline time.Time) (expired, quit chan struct{}) {
- if deadline.IsZero() {
- return nil, nil
- }
- expired = make(chan struct{})
- quit = make(chan struct{})
- timer := time.NewTimer(deadline.Sub(time.Now()))
-
- go func() {
- select {
- case <-timer.C:
- case <-quit:
- }
- timer.Stop()
- close(expired)
- }()
- return expired, quit
-}
-
-// timeoutError implements net.Error with Timeout returning true.
-type timeoutError struct{}
-
-func (t timeoutError) Error() string { return "deadline exceeded" }
-func (t timeoutError) Timeout() bool { return true }
-func (t timeoutError) Temporary() bool { return false }
diff --git a/runtimes/google/ipc/stream/vc/cancel_test.go b/runtimes/google/ipc/stream/vc/cancel_test.go
deleted file mode 100644
index 90db06d..0000000
--- a/runtimes/google/ipc/stream/vc/cancel_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package vc
-
-import (
- "testing"
- "time"
-)
-
-func TestCancelChannelNil(t *testing.T) {
- var zero time.Time
- if cancel, _ := cancelChannel(zero); cancel != nil {
- t.Errorf("Got %v want nil with deadline %v", cancel, zero)
- }
-}
-
-func TestCancelChannel(t *testing.T) {
- deadline := time.Now()
- cancel, _ := cancelChannel(deadline)
- if cancel == nil {
- t.Fatalf("Got nil channel for deadline %v", deadline)
- }
- if _, ok := <-cancel; ok {
- t.Errorf("Expected channel to be closed")
- }
-}
-
-func TestCancelChannelQuit(t *testing.T) {
- deadline := time.Now().Add(time.Hour)
- cancel, quit := cancelChannel(deadline)
- close(quit)
- if _, ok := <-cancel; ok {
- t.Errorf("Expected channel to be closed")
- }
-}
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index fb74427..1f5a843 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -2,7 +2,6 @@
import (
"net"
- "time"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
@@ -35,17 +34,11 @@
return nil
}
-// SetDeadline sets a deadline on the flow. The flow will be cancelled if it
-// is not closed by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (f *flow) SetDeadline(t time.Time) error {
- if err := f.SetReadDeadline(t); err != nil {
- return err
- }
- if err := f.SetWriteDeadline(t); err != nil {
- return err
- }
- return nil
+// SetDeadline sets a deadline channel on the flow. Reads and writes
+// will be cancelled if the channel is closed.
+func (f *flow) SetDeadline(deadline <-chan struct{}) {
+ f.reader.SetDeadline(deadline)
+ f.writer.SetDeadline(deadline)
}
// Shutdown closes the flow and discards any queued up write buffers.
diff --git a/runtimes/google/ipc/stream/vc/listener_test.go b/runtimes/google/ipc/stream/vc/listener_test.go
index fd615ab..e8931f7 100644
--- a/runtimes/google/ipc/stream/vc/listener_test.go
+++ b/runtimes/google/ipc/stream/vc/listener_test.go
@@ -1,9 +1,7 @@
package vc
import (
- "net"
"testing"
- "time"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/security"
@@ -12,24 +10,20 @@
type noopFlow struct{}
// net.Conn methods
-func (*noopFlow) Read([]byte) (int, error) { return 0, nil }
-func (*noopFlow) Write([]byte) (int, error) { return 0, nil }
-func (*noopFlow) Close() error { return nil }
-func (*noopFlow) IsClosed() bool { return false }
-func (*noopFlow) Closed() <-chan struct{} { return nil }
-func (*noopFlow) Cancel() {}
-func (*noopFlow) LocalEndpoint() naming.Endpoint { return nil }
-func (*noopFlow) RemoteEndpoint() naming.Endpoint { return nil }
-func (*noopFlow) LocalAddr() net.Addr { return nil }
-func (*noopFlow) RemoteAddr() net.Addr { return nil }
-func (*noopFlow) SetDeadline(t time.Time) error { return nil }
-func (*noopFlow) SetReadDeadline(t time.Time) error { return nil }
-func (*noopFlow) SetWriteDeadline(t time.Time) error { return nil }
+func (*noopFlow) Read([]byte) (int, error) { return 0, nil }
+func (*noopFlow) Write([]byte) (int, error) { return 0, nil }
+func (*noopFlow) Close() error { return nil }
+func (*noopFlow) IsClosed() bool { return false }
+func (*noopFlow) Closed() <-chan struct{} { return nil }
+func (*noopFlow) Cancel() {}
+func (*noopFlow) LocalEndpoint() naming.Endpoint { return nil }
+func (*noopFlow) RemoteEndpoint() naming.Endpoint { return nil }
// Other stream.Flow methods
func (*noopFlow) LocalPrincipal() security.Principal { return nil }
func (*noopFlow) LocalBlessings() security.Blessings { return nil }
func (*noopFlow) RemoteBlessings() security.Blessings { return nil }
+func (*noopFlow) SetDeadline(<-chan struct{}) {}
func (*noopFlow) LocalID() security.PublicID { return nil }
func (*noopFlow) RemoteID() security.PublicID { return nil }
diff --git a/runtimes/google/ipc/stream/vc/reader.go b/runtimes/google/ipc/stream/vc/reader.go
index f1b6f3a..ebe161d 100644
--- a/runtimes/google/ipc/stream/vc/reader.go
+++ b/runtimes/google/ipc/stream/vc/reader.go
@@ -5,7 +5,6 @@
"io"
"sync"
"sync/atomic"
- "time"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
vsync "veyron.io/veyron/veyron/runtimes/google/lib/sync"
@@ -21,13 +20,12 @@
// reader implements the io.Reader and SetReadDeadline interfaces for a Flow,
// backed by iobuf.Slice objects read from a upcqueue.
type reader struct {
- handler readHandler
- src *upcqueue.T
- mu sync.Mutex
- buf *iobuf.Slice // GUARDED_BY(mu)
- deadline chan struct{} // GUARDED_BY(mu)
- cancelDeadline chan struct{} // GUARDED_BY(mu)
- totalBytes uint32
+ handler readHandler
+ src *upcqueue.T
+ mu sync.Mutex
+ buf *iobuf.Slice // GUARDED_BY(mu)
+ deadline <-chan struct{} // GUARDED_BY(mu)
+ totalBytes uint32
}
func newReader(h readHandler) *reader {
@@ -88,19 +86,10 @@
return copied, nil
}
-// SetReadDeadline sets a deadline on read. The read will be cancelled if it
-// does not complete by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (r *reader) SetReadDeadline(t time.Time) error {
- c, q := cancelChannel(t)
+func (r *reader) SetDeadline(deadline <-chan struct{}) {
r.mu.Lock()
- if r.cancelDeadline != nil {
- close(r.cancelDeadline)
- }
- r.deadline = c
- r.cancelDeadline = q
- r.mu.Unlock()
- return nil
+ defer r.mu.Unlock()
+ r.deadline = deadline
}
func (r *reader) BytesRead() uint32 {
@@ -110,3 +99,10 @@
func (r *reader) Put(slice *iobuf.Slice) error {
return r.src.Put(slice)
}
+
+// timeoutError implements net.Error with Timeout returning true.
+type timeoutError struct{}
+
+func (t timeoutError) Error() string { return "deadline exceeded" }
+func (t timeoutError) Timeout() bool { return true }
+func (t timeoutError) Temporary() bool { return false }
diff --git a/runtimes/google/ipc/stream/vc/reader_test.go b/runtimes/google/ipc/stream/vc/reader_test.go
index 4a73158..c9d1dd2 100644
--- a/runtimes/google/ipc/stream/vc/reader_test.go
+++ b/runtimes/google/ipc/stream/vc/reader_test.go
@@ -6,7 +6,6 @@
"reflect"
"testing"
"testing/quick"
- "time"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
)
@@ -91,8 +90,12 @@
func TestReadDeadline(t *testing.T) {
l := &testReadHandler{}
r := newReader(l)
- r.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
defer r.Close()
+
+ deadline := make(chan struct{}, 0)
+ r.SetDeadline(deadline)
+ close(deadline)
+
var buf [1]byte
n, err := r.Read(buf[:])
neterr, ok := err.(net.Error)
diff --git a/runtimes/google/ipc/stream/vc/vc.go b/runtimes/google/ipc/stream/vc/vc.go
index dbe46a2..e239b5a 100644
--- a/runtimes/google/ipc/stream/vc/vc.go
+++ b/runtimes/google/ipc/stream/vc/vc.go
@@ -412,7 +412,7 @@
if err != nil {
return vc.err(fmt.Errorf("failed to create a Flow for setting up TLS: %v", err))
}
- crypter, err := crypto.NewTLSClient(handshakeConn, tlsSessionCache, vc.pool)
+ crypter, err := crypto.NewTLSClient(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), tlsSessionCache, vc.pool)
if err != nil {
return vc.err(fmt.Errorf("failed to setup TLS: %v", err))
}
@@ -536,7 +536,7 @@
vc.mu.Unlock()
// Establish TLS
- crypter, err := crypto.NewTLSServer(handshakeConn, vc.pool)
+ crypter, err := crypto.NewTLSServer(handshakeConn, handshakeConn.LocalEndpoint(), handshakeConn.RemoteEndpoint(), vc.pool)
if err != nil {
sendErr(fmt.Errorf("failed to setup TLS: %v", err))
return
@@ -616,13 +616,8 @@
if err != nil {
return nil, err
}
- return &writer{
- MTU: MaxPayloadSizeBytes,
- Sink: bq,
- Alloc: iobuf.NewAllocator(vc.pool, vc.reserveBytes),
- SharedCounters: vc.sharedCounters,
- closed: make(chan struct{}),
- }, nil
+ alloc := iobuf.NewAllocator(vc.pool, vc.reserveBytes)
+ return newWriter(MaxPayloadSizeBytes, bq, alloc, vc.sharedCounters), nil
}
// findFlowLocked finds the flow id for the provided flow.
diff --git a/runtimes/google/ipc/stream/vc/writer.go b/runtimes/google/ipc/stream/vc/writer.go
index 5114cef..f3f3788 100644
--- a/runtimes/google/ipc/stream/vc/writer.go
+++ b/runtimes/google/ipc/stream/vc/writer.go
@@ -5,7 +5,6 @@
"fmt"
"sync"
"sync/atomic"
- "time"
"veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
@@ -21,12 +20,11 @@
Alloc *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC.
- mu sync.Mutex // Guards call to Writes
- wroteOnce bool // GUARDED_BY(mu)
- deadline chan struct{} // GUARDED_BY(mu)
- cancelDeadline chan struct{} // GUARDED_BY(mu)
- isClosed bool // GUARDED_BY(mu)
- closed chan struct{} // GUARDED_BY(mu)
+ mu sync.Mutex // Guards call to Writes
+ wroteOnce bool // GUARDED_BY(mu)
+ isClosed bool // GUARDED_BY(mu)
+ closed chan struct{} // GUARDED_BY(mu)
+ deadline <-chan struct{} // GUARDED_BY(mu)
// Total number of bytes filled in by all Write calls on this writer.
// Atomic operations are used to manipulate it.
@@ -37,6 +35,16 @@
sharedCountersBorrowed int // GUARDED_BY(muSharedCountersBorrowed)
}
+func newWriter(mtu int, sink bqueue.Writer, alloc *iobuf.Allocator, counters *vsync.Semaphore) *writer {
+ return &writer{
+ MTU: mtu,
+ Sink: sink,
+ Alloc: alloc,
+ SharedCounters: counters,
+ closed: make(chan struct{}),
+ }
+}
+
// Shutdown closes the writer and discards any queued up write buffers, i.e.,
// the bqueue.Get call will not see the buffers queued up at this writer.
// If removeWriter is true the writer will also be removed entirely from the
@@ -138,19 +146,10 @@
return written, nil
}
-// SetWriteDeadline sets a deadline on write. The write will be cancelled if it
-// does not complete by the specified deadline.
-// A zero deadline (time.Time.IsZero) implies that no cancellation is desired.
-func (w *writer) SetWriteDeadline(t time.Time) error {
- c, q := cancelChannel(t)
+func (w *writer) SetDeadline(deadline <-chan struct{}) {
w.mu.Lock()
- if w.cancelDeadline != nil {
- close(w.cancelDeadline)
- }
- w.deadline = c
- w.cancelDeadline = q
- w.mu.Unlock()
- return nil
+ defer w.mu.Unlock()
+ w.deadline = deadline
}
// Release allows the next 'bytes' of data to be removed from the buffer queue
diff --git a/runtimes/google/ipc/stream/vc/writer_test.go b/runtimes/google/ipc/stream/vc/writer_test.go
index 42a263c..1ba4908 100644
--- a/runtimes/google/ipc/stream/vc/writer_test.go
+++ b/runtimes/google/ipc/stream/vc/writer_test.go
@@ -5,7 +5,6 @@
"net"
"reflect"
"testing"
- "time"
"veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
"veyron.io/veyron/veyron/runtimes/google/lib/bqueue/drrqueue"
@@ -28,7 +27,7 @@
shared := sync.NewSemaphore()
shared.IncN(4)
- w := newWriter(bw, shared)
+ w := newTestWriter(bw, shared)
if n, err := w.Write([]byte("abcd")); n != 4 || err != nil {
t.Errorf("Got (%d, %v) want (4, nil)", n, err)
@@ -51,9 +50,11 @@
// Further writes will block since all 10 bytes (provided to NewWriter)
// have been exhausted and Get hasn't been called on bq yet.
- if err := w.SetWriteDeadline(time.Now()); err != nil {
- t.Errorf("Got %v want nil", err)
- }
+ deadline := make(chan struct{}, 0)
+ w.SetDeadline(deadline)
+ close(deadline)
+
+ w.SetDeadline(deadline)
if n, err := w.Write([]byte("k")); n != 0 || !isTimeoutError(err) {
t.Errorf("Got (%d, %v) want (0, timeout error)", n, err)
}
@@ -86,7 +87,7 @@
shared := sync.NewSemaphore()
shared.IncN(4)
- w := newWriter(bw, shared)
+ w := newTestWriter(bw, shared)
w.Close()
if n, err := w.Write([]byte{1, 2}); n != 0 || err != errWriterClosed {
@@ -106,7 +107,7 @@
shared := sync.NewSemaphore()
shared.IncN(2)
- w := newWriter(bw, shared)
+ w := newTestWriter(bw, shared)
data := []byte{1, 2}
if n, err := w.Write(data); n != len(data) || err != nil {
t.Fatalf("Got (%d, %v) want (%d, nil)", n, err, len(data))
@@ -142,7 +143,7 @@
shared := sync.NewSemaphore()
shared.IncN(1)
- w := newWriter(bw, shared)
+ w := newTestWriter(bw, shared)
if n, err := w.Write([]byte{1}); n != 1 || err != nil {
t.Fatalf("Got (%d, %v) want (1, nil)", n, err)
}
@@ -174,7 +175,7 @@
shared := sync.NewSemaphore()
shared.IncN(4)
- w := newWriter(bw, shared)
+ w := newTestWriter(bw, shared)
go w.Close()
<-w.Closed()
@@ -183,14 +184,9 @@
}
}
-func newWriter(bqw bqueue.Writer, shared *sync.Semaphore) *writer {
- return &writer{
- MTU: 16,
- Sink: bqw,
- Alloc: iobuf.NewAllocator(iobuf.NewPool(0), 0),
- SharedCounters: shared,
- closed: make(chan struct{}),
- }
+func newTestWriter(bqw bqueue.Writer, shared *sync.Semaphore) *writer {
+ alloc := iobuf.NewAllocator(iobuf.NewPool(0), 0)
+ return newWriter(16, bqw, alloc, shared)
}
func isTimeoutError(err error) bool {
diff --git a/runtimes/google/ipc/timer.go b/runtimes/google/ipc/timer.go
new file mode 100644
index 0000000..fa148d4
--- /dev/null
+++ b/runtimes/google/ipc/timer.go
@@ -0,0 +1,32 @@
+package ipc
+
+import (
+ "time"
+)
+
+// timer is a replacement for time.Timer, the only difference is that
+// its channel is type chan struct{} and it will be closed when the timer expires,
+// which we need in some places.
+type timer struct {
+ base *time.Timer
+ C <-chan struct{}
+}
+
+func newTimer(d time.Duration) *timer {
+ c := make(chan struct{}, 0)
+ base := time.AfterFunc(d, func() {
+ close(c)
+ })
+ return &timer{
+ base: base,
+ C: c,
+ }
+}
+
+func (t *timer) Stop() bool {
+ return t.base.Stop()
+}
+
+func (t *timer) Reset(d time.Duration) bool {
+ return t.base.Reset(d)
+}
diff --git a/runtimes/google/ipc/timer_test.go b/runtimes/google/ipc/timer_test.go
new file mode 100644
index 0000000..e7345d6
--- /dev/null
+++ b/runtimes/google/ipc/timer_test.go
@@ -0,0 +1,31 @@
+package ipc
+
+import (
+ "testing"
+ "time"
+)
+
+func TestTimer(t *testing.T) {
+ test := newTimer(time.Millisecond)
+ if _, ok := <-test.C; ok {
+ t.Errorf("Expected the channel to be closed.")
+ }
+
+ // Test resetting.
+ test = newTimer(time.Hour)
+ if reset := test.Reset(time.Millisecond); !reset {
+ t.Errorf("Expected to successfully reset.")
+ }
+ if _, ok := <-test.C; ok {
+ t.Errorf("Expected the channel to be closed.")
+ }
+
+ // Test stop.
+ test = newTimer(100 * time.Millisecond)
+ test.Stop()
+ select {
+ case <-test.C:
+ t.Errorf("the test timer should have been stopped.")
+ case <-time.After(200 * time.Millisecond):
+ }
+}