rpc/stream: fix flaky test
start/idle timeout related tests are flaky in a overloaded machine
(especially in race tests), since it can take longer than the wait
time to show the expected behavior.
This CL changes to wait for the expected notifications forever to remove
this kind of time dependency. But we still need to wait some amount of
time to verify that there is no notification in some test cases. In this
case, I keep the original wait time so that a test can wait long enough
to discover any failure in normal circumstances.
Also added a mutex in faketimer to remove a data race failure.
Change-Id: Iffe36a136776ed63dfb15693bef93752676852f1
diff --git a/profiles/internal/rpc/stream/manager/manager_test.go b/profiles/internal/rpc/stream/manager/manager_test.go
index 0581802..8b551e4 100644
--- a/profiles/internal/rpc/stream/manager/manager_test.go
+++ b/profiles/internal/rpc/stream/manager/manager_test.go
@@ -386,9 +386,6 @@
func TestStartTimeout(t *testing.T) {
const (
startTime = 5 * time.Millisecond
- // We use a long wait time here since it takes some time for the underlying
- // VIF of the other side to be closed especially in race testing.
- waitTime = 250 * time.Millisecond
)
var (
@@ -422,18 +419,11 @@
triggerTimers()
// No VC is opened. The VIF should be closed after start timeout.
- timeout := time.After(waitTime)
- for done := false; !done; {
- select {
- case <-time.After(startTime * 2):
- done = numVIFs(server) == 0
- case <-timeout:
- done = true
+ for range time.Tick(startTime) {
+ if numVIFs(server) == 0 {
+ break
}
}
- if n := numVIFs(server); n != 0 {
- t.Errorf("Server has %d VIFs; want 0\n%v", n, debugString(server))
- }
}
func testIdleTimeout(t *testing.T, testServer bool) {
@@ -499,21 +489,11 @@
f.Close()
// The flow has been closed. The VIF should be closed after idle timeout.
- timeout := time.After(waitTime)
- for done := false; !done; {
- select {
- case <-time.After(idleTime * 2):
- done = numVIFs(client) == 0 && numVIFs(server) == 0
- case <-timeout:
- done = true
+ for range time.Tick(idleTime) {
+ if numVIFs(client) == 0 && numVIFs(server) == 0 {
+ break
}
}
- if n := numVIFs(client); n != 0 {
- t.Errorf("Client has %d VIFs; want 0\n%v", n, debugString(client))
- }
- if n := numVIFs(server); n != 0 {
- t.Errorf("Server has %d VIFs; want 0\n%v", n, debugString(server))
- }
}
func TestIdleTimeout(t *testing.T) { testIdleTimeout(t, false) }
diff --git a/profiles/internal/rpc/stream/proxy/proxy_test.go b/profiles/internal/rpc/stream/proxy/proxy_test.go
index d94dc12..6a08df9 100644
--- a/profiles/internal/rpc/stream/proxy/proxy_test.go
+++ b/profiles/internal/rpc/stream/proxy/proxy_test.go
@@ -354,18 +354,11 @@
flow.Close()
// The flow has been closed. The VC should be closed after idle timeout.
- timeout := time.After(waitTime)
- for done := false; !done; {
- select {
- case <-time.After(idleTime * 2):
- done = proxy.NumProcesses(Proxy) == 1
- case <-timeout:
- done = true
+ for range time.Tick(idleTime) {
+ if proxy.NumProcesses(Proxy) == 1 {
+ break
}
}
- if numProcs := proxy.NumProcesses(Proxy); numProcs != 1 {
- t.Error("Want VC has been closed; still open")
- }
client.ShutdownEndpoint(ep)
diff --git a/profiles/internal/rpc/stream/vif/faketimer.go b/profiles/internal/rpc/stream/vif/faketimer.go
index 65a6137..914c4e2 100644
--- a/profiles/internal/rpc/stream/vif/faketimer.go
+++ b/profiles/internal/rpc/stream/vif/faketimer.go
@@ -69,15 +69,20 @@
func SetFakeTimers() func() {
backup := newTimer
+ var mu sync.Mutex
var wg sync.WaitGroup
release := make(chan struct{})
newTimer = func(d time.Duration, f func()) timer {
+ mu.Lock()
+ defer mu.Unlock()
wg.Add(1)
t := newFakeTimer(d, f)
go t.run(release, &wg)
return t
}
return func() {
+ mu.Lock()
+ defer mu.Unlock()
newTimer = backup
close(release)
wg.Wait()
diff --git a/profiles/internal/rpc/stream/vif/idletimer_test.go b/profiles/internal/rpc/stream/vif/idletimer_test.go
index 1d9ce57..0c61fe4 100644
--- a/profiles/internal/rpc/stream/vif/idletimer_test.go
+++ b/profiles/internal/rpc/stream/vif/idletimer_test.go
@@ -31,14 +31,14 @@
m := newIdleTimerMap(notifyFunc)
// An empty map. Should not be notified.
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
m.Insert(vc1, idleTime)
// A new empty VC. Should be notified.
- if err := WaitForNotifications(notify, waitTime, vc1); err != nil {
+ if err := WaitForNotifications(notify, vc1); err != nil {
t.Error(err)
}
@@ -47,25 +47,25 @@
// A VC with one flow. Should not be notified.
m.InsertFlow(vc1, flow1)
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Try to delete non-existent flow. Should not be notified.
m.DeleteFlow(vc1, flow2)
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Delete the flow. Should be notified.
m.DeleteFlow(vc1, flow1)
- if err := WaitForNotifications(notify, waitTime, vc1); err != nil {
+ if err := WaitForNotifications(notify, vc1); err != nil {
t.Error(err)
}
// Try to delete the deleted flow again. Should not be notified.
m.DeleteFlow(vc1, flow1)
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
@@ -74,7 +74,7 @@
// Delete an empty VC. Should not be notified.
m.Delete(vc1)
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
@@ -87,13 +87,13 @@
// Delete the first flow twice. Should not be notified.
m.DeleteFlow(vc1, flow1)
m.DeleteFlow(vc1, flow1)
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Delete the second flow. Should be notified.
m.DeleteFlow(vc1, flow2)
- if err := WaitForNotifications(notify, waitTime, vc1); err != nil {
+ if err := WaitForNotifications(notify, vc1); err != nil {
t.Error(err)
}
@@ -102,7 +102,7 @@
// Insert a reserved flow. Should be notified.
m.InsertFlow(vc1, flowReserved)
- if err := WaitForNotifications(notify, waitTime, vc1); err != nil {
+ if err := WaitForNotifications(notify, vc1); err != nil {
t.Error(err)
}
@@ -111,7 +111,7 @@
m.Insert(vc2, idleTime)
// Multiple VCs. Should be notified for each.
- if err := WaitForNotifications(notify, waitTime, vc1, vc2); err != nil {
+ if err := WaitForNotifications(notify, vc1, vc2); err != nil {
t.Error(err)
}
@@ -124,7 +124,7 @@
if m.Insert(vc1, idleTime) {
t.Fatal("timer has been stopped, but can insert a vc")
}
- if err := WaitForNotifications(notify, waitTime); err != nil {
+ if err := WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
}
diff --git a/profiles/internal/rpc/stream/vif/testutil_test.go b/profiles/internal/rpc/stream/vif/testutil_test.go
index 36ef1aa..9d35a6a 100644
--- a/profiles/internal/rpc/stream/vif/testutil_test.go
+++ b/profiles/internal/rpc/stream/vif/testutil_test.go
@@ -6,39 +6,35 @@
import (
"fmt"
- "reflect"
"time"
)
-// WaitForNotifications waits till all notifications in 'wants' have been received,
-// or the timeout expires.
-func WaitForNotifications(notify <-chan interface{}, timeout time.Duration, wants ...interface{}) error {
- timer := make(<-chan time.Time)
- if timeout > 0 {
- timer = time.After(timeout)
- }
-
- received := make(map[interface{}]struct{})
- want := make(map[interface{}]struct{})
+// WaitForNotifications waits till all notifications in 'wants' have been received.
+func WaitForNotifications(notify <-chan interface{}, wants ...interface{}) error {
+ expected := make(map[interface{}]struct{})
for _, w := range wants {
- want[w] = struct{}{}
+ expected[w] = struct{}{}
}
+ for len(expected) > 0 {
+ n := <-notify
+ if _, exists := expected[n]; !exists {
+ return fmt.Errorf("unexpected notification %v", n)
+ }
+ delete(expected, n)
+ }
+ return nil
+}
+
+// WaitWithTimeout returns error if any notification has been received before
+// the timeout expires.
+func WaitWithTimeout(notify <-chan interface{}, timeout time.Duration) error {
+ timer := time.After(timeout)
for {
select {
case n := <-notify:
- received[n] = struct{}{}
- if _, exists := want[n]; !exists {
- return fmt.Errorf("unexpected notification %v", n)
- }
- if reflect.DeepEqual(received, want) {
- return nil
- }
+ return fmt.Errorf("unexpected notification %v", n)
case <-timer:
- if len(wants) == 0 {
- // No notification wanted.
- return nil
- }
- return fmt.Errorf("timeout after receiving %v", reflect.ValueOf(received).MapKeys())
+ return nil
}
}
}
diff --git a/profiles/internal/rpc/stream/vif/vif_test.go b/profiles/internal/rpc/stream/vif/vif_test.go
index d598953..e630edc 100644
--- a/profiles/internal/rpc/stream/vif/vif_test.go
+++ b/profiles/internal/rpc/stream/vif/vif_test.go
@@ -320,7 +320,7 @@
// Initially empty. Should not be closed.
vf, remote := newVIF()
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
@@ -329,13 +329,13 @@
if _, _, err := createVC(vf, remote, makeEP(0x10)); err != nil {
t.Fatal(err)
}
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Close the VC. Should be closed.
vf.ShutdownVCs(makeEP(0x10))
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -345,11 +345,11 @@
if err != nil {
t.Fatal(err)
}
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
remote.ShutdownVCs(makeEP(0x10))
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -362,13 +362,13 @@
// Close the first VC twice. Should not be closed.
vf.ShutdownVCs(makeEP(0x10))
vf.ShutdownVCs(makeEP(0x10))
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Close the second VC. Should be closed.
vf.ShutdownVCs(makeEP(0x10 + 1))
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
}
@@ -410,7 +410,7 @@
// No VC opened. Should be closed after the start timeout.
vf, remote, triggerTimers := newVIF()
triggerTimers()
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -420,13 +420,13 @@
t.Fatal(err)
}
triggerTimers()
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Close the VC. Should be closed.
vf.ShutdownVCs(makeEP(0x10))
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
}
@@ -481,7 +481,7 @@
// No active flow. Should be notified.
vf, remote := newVIF()
_, _, _ = newVC(vf, remote)
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -492,7 +492,7 @@
t.Fatal(err)
}
triggerTimers()
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -500,13 +500,13 @@
vf, remote = newVIF()
vc, _, _ := newVC(vf, remote)
f1 := newFlow(vc, remote)
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Close the flow. Should be notified.
f1.Close()
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -519,13 +519,13 @@
// Close the first flow twice. Should not be notified.
f1.Close()
f1.Close()
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
// Close the second flow. Should be notified now.
f2.Close()
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
@@ -537,11 +537,11 @@
t.Fatal(err)
}
acceptFlowAtClient(ln)
- if err := vif.WaitForNotifications(notify, waitTime); err != nil {
+ if err := vif.WaitWithTimeout(notify, waitTime); err != nil {
t.Error(err)
}
f1.Close()
- if err := vif.WaitForNotifications(notify, waitTime, vf, remote); err != nil {
+ if err := vif.WaitForNotifications(notify, vf, remote); err != nil {
t.Error(err)
}
}