Merge "veyron/runtimes/google/ipc/stream/proxy: Add publish address"
diff --git a/services/config/lib/config.go b/services/config/lib/config.go
index 32b0217..2e11c38 100644
--- a/services/config/lib/config.go
+++ b/services/config/lib/config.go
@@ -76,15 +76,10 @@
cs.rwlock.Lock()
mdns := cs.mdns
cs.mdns = nil
- c := cs.done
- cs.done = nil
cs.rwlock.Unlock()
-
- if c != nil {
- close(c)
- }
if mdns != nil {
mdns.Stop()
+ close(cs.done)
}
}
@@ -264,9 +259,14 @@
// TODO(p): Should we also watch the file for changes?
func (cs *configService) watcher() {
cs.rwlock.RLock()
+ if cs.mdns == nil {
+ cs.rwlock.RUnlock()
+ return
+ }
c := cs.mdns.ServiceMemberWatch(cs.service)
cs.mdns.SubscribeToService(cs.service)
cs.rwlock.RUnlock()
+ defer close(c)
for {
select {
case si := <-c:
@@ -299,11 +299,9 @@
cs.atomic.Unlock()
cs.Offer()
case <-cs.done:
- break
+ return
}
}
- close(c)
- return
}
// Get returns the value associated with key.
diff --git a/services/wspr/wsprd/lib/wspr_test.go b/services/wspr/wsprd/lib/wspr_test.go
index 830e3f7..171b39a 100644
--- a/services/wspr/wsprd/lib/wspr_test.go
+++ b/services/wspr/wsprd/lib/wspr_test.go
@@ -177,10 +177,10 @@
}
w.Lock()
w.stream = append(w.stream, resp)
- w.Unlock()
if w.notifier != nil {
w.notifier <- true
}
+ w.Unlock()
return nil
}
@@ -196,7 +196,9 @@
if w.streamLength() >= n {
return nil
}
+ w.Lock()
w.notifier = make(chan bool, 1)
+ w.Unlock()
for w.streamLength() < n {
select {
case <-w.notifier:
@@ -205,7 +207,9 @@
return fmt.Errorf("timed out")
}
}
+ w.Lock()
w.notifier = nil
+ w.Unlock()
return nil
}
@@ -590,6 +594,12 @@
if (err2 != nil || test.err != nil) && !reflect.DeepEqual(err2, test.err) {
t.Errorf("unexected error: got %v, expected %v", err2, test.err)
}
+
+ // Wait until the close streaming messages have been acknowledged.
+ if err := writer.waitForMessage(len(expectedWebsocketMessage)); err != nil {
+ t.Errorf("didn't recieve expected message: %v", err)
+ }
+
checkResponses(&writer, expectedWebsocketMessage, nil, t)
}