runtime/internal/flow: Add markUsed method to conn to cleanup code.
Change-Id: I11fb8fb771440bb32b314e22c85b693a662fac26
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 2a0fec1..6f12546 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -311,3 +311,9 @@
c.internalClose(ctx, err, flows)
}
}
+
+func (c *Conn) markUsed() {
+ c.mu.Lock()
+ c.lastUsedTime = time.Now()
+ c.mu.Unlock()
+}
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index 435a4b4..1e69d74 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -6,7 +6,6 @@
import (
"strconv"
- "time"
"v.io/v23/context"
"v.io/v23/flow"
@@ -51,9 +50,7 @@
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) Read(p []byte) (n int, err error) {
- f.conn.mu.Lock()
- f.conn.lastUsedTime = time.Now()
- f.conn.mu.Unlock()
+ f.conn.markUsed()
var release bool
if n, release, err = f.q.read(f.ctx, p); release {
f.conn.release(f.ctx)
@@ -69,9 +66,7 @@
// Read and ReadMsg should not be called concurrently with themselves
// or each other.
func (f *flw) ReadMsg() (buf []byte, err error) {
- f.conn.mu.Lock()
- f.conn.lastUsedTime = time.Now()
- f.conn.mu.Unlock()
+ f.conn.markUsed()
var release bool
// TODO(mattr): Currently we only ever release counters when some flow
// reads. We may need to do it more or less often. Currently
@@ -93,9 +88,7 @@
}
func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
- f.conn.mu.Lock()
- f.conn.lastUsedTime = time.Now()
- f.conn.mu.Unlock()
+ f.conn.markUsed()
sent := 0
var left []byte
err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {