blob: ec10fb44578f2eda324c57975d2360516f41ce4e [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package pubsub
6
7import (
8 "fmt"
9 "strings"
10 "sync"
11
12 "v.io/v23/verror"
13)
14
// pkgPath is prepended to the name of every error registered below so that
// each error ID is qualified by this package's import path.
const pkgPath = "v.io/x/ref/lib/pubsub"

// Error IDs returned by the Publisher methods in this file. The {3}
// placeholder in a message is presumably filled in by the first argument
// that follows the context in verror.New (the stream name at every call
// site in this file); confirm against the verror documentation.
var (
	// errNeedNonNilChannel is returned when a caller supplies a nil channel.
	errNeedNonNilChannel = verror.Register(pkgPath+".errNeedNonNilChannel", verror.NoRetry, "must provide a non-nil channel")
	// errStreamExists is returned when a stream name is already in use.
	errStreamExists = verror.Register(pkgPath+".errStreamExists", verror.NoRetry, "stream {3} already exists")
	// errStreamShutDown is returned once the Publisher has been shut down.
	errStreamShutDown = verror.Register(pkgPath+".errStreamShutDown", verror.NoRetry, "stream {3} has been shut down")
	// errStreamDoesntExist is returned when the named stream is unknown.
	errStreamDoesntExist = verror.Register(pkgPath+".errStreamDoesntExist", verror.NoRetry, "stream {3} doesn't exist")
)
23
// A Publisher provides a mechanism for communicating Settings from a set
// of producers to multiple consumers. Each such producer and its associated
// consumers are called a Stream. Operations are provided for creating
// streams (CreateStream) and adding new consumers (ForkStream). Communication
// is implemented using channels, with the producer and consumers providing
// channels to send and receive Settings over. A Stream remembers the last value
// of all Settings that were sent over it; these can be retrieved via ForkStream
// or the Latest method.
//
// The Publisher may be shut down by calling its Shutdown method and
// the producers will be notified via the channel returned by CreateStream,
// at which point they should close the channel they use for publishing Settings.
// If producers fail to close this channel then the Publisher will leak
// goroutines (one per stream) when it is shut down.
type Publisher struct {
	// mu guards shutdown and streams.
	mu sync.RWMutex
	// stop is closed by Shutdown. It is handed to every stream's flow
	// goroutine and is also the channel returned by CreateStream.
	stop chan struct{}
	// shutdown is set to true by Shutdown; ForkStream and CloseFork refuse
	// to operate once it is set.
	shutdown bool
	// streams maps a stream name to the fork that carries its Settings.
	streams map[string]*fork
}
44
// Stream is returned by Latest and ForkStream. It includes the name and
// description of the stream and the most recent value of each Setting that
// flowed through it.
type Stream struct {
	// Name and Description identify the stream itself (not the Settings
	// sent over it).
	Name, Description string
	// Latest is a map of Setting names to the Setting itself.
	Latest map[string]Setting
}
53
54// NewPublisher creates a Publisher.
55func NewPublisher() *Publisher {
56 return &Publisher{
57 streams: make(map[string]*fork),
58 stop: make(chan struct{}),
59 }
60}
61
// fork is the internal state of a single stream: one producer channel
// whose Settings are copied to any number of consumer channels.
type fork struct {
	// The embedded lock guards vals and outs.
	sync.RWMutex
	// desc is the description supplied to CreateStream.
	desc string
	// vals holds the most recent Setting seen for each Setting name.
	vals map[string]Setting
	// in is the producer's channel; flow reads from it until it is closed.
	in <-chan Setting
	// outs holds the consumer channels added by ForkStream and removed by
	// CloseFork.
	outs []chan<- Setting
}
69
70// CreateStream creates a Stream with the provided name and description
71// (note, Settings have their own names and description, these are for the
72// stream). In general, no buffering is required for this channel since
73// the Publisher itself will read from it, however, if the consumers are slow
74// then the publisher may be slow in draining the channel. The publisher
75// should provide additional buffering if this is a concern.
76// Consequently this mechanism should be used for rarely changing Settings,
77// such as network address changes forced by DHCP and hence no buffering
78// will be required. The channel returned by CreateStream is closed when the
79// publisher is shut down and hence the caller should wait for this to occur
80// and then close the channel it has passed to CreateStream.
81func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error) {
82 if ch == nil {
83 return nil, verror.New(errNeedNonNilChannel, nil)
84 }
85 p.mu.Lock()
86 defer p.mu.Unlock()
87 if p.streams[name] != nil {
88 return nil, verror.New(errStreamExists, nil, name)
89 }
90 f := &fork{desc: description, in: ch, vals: make(map[string]Setting)}
91 p.streams[name] = f
92 go f.flow(p.stop)
93 return p.stop, nil
94}
95
96// String returns a string representation of the publisher, including
97// the names and descriptions of all the streams it currently supports.
98func (p *Publisher) String() string {
99 r := ""
100 p.mu.RLock()
101 defer p.mu.RUnlock()
102 if p.shutdown {
103 return "shutdown"
104 }
105 for k, s := range p.streams {
106 r += fmt.Sprintf("(%s: %s) ", k, s.desc)
107 }
108 return strings.TrimRight(r, " ")
109}
110
111// Latest returns information on the requested stream, including the
112// last instance of all Settings, if any, that flowed over it.
113func (p *Publisher) Latest(name string) *Stream {
114 p.mu.Lock()
115 defer p.mu.Unlock()
116 f := p.streams[name]
117 if f == nil {
118 return nil
119 }
120 var r map[string]Setting
121 f.RLock()
122 defer f.RUnlock()
123 for k, v := range f.vals {
124 r[k] = v
125 }
126 return &Stream{Name: name, Description: f.desc, Latest: r}
127}
128
129// ForkStream 'forks' the named stream to add a new consumer. The channel
130// provided is to be used to read Settings sent down the stream. This
131// channel will be closed by the Publisher when it is asked to shut down.
132// The reader on this channel must be able to keep up with the flow of Settings
133// through the Stream in order to avoid blocking all other readers and hence
134// should set an appropriate amount of buffering for the channel it passes in.
135// ForkStream returns the most recent values of all Settings previously
136// sent over the stream, thus allowing its caller to synchronise with the
137// stream.
138func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error) {
139 p.mu.Lock()
140 defer p.mu.Unlock()
141 if p.shutdown {
142 return nil, verror.New(errStreamShutDown, nil, name)
143 }
144 f := p.streams[name]
145 if f == nil {
146 return nil, verror.New(errStreamDoesntExist, nil, name)
147 }
148 f.Lock()
149 defer f.Unlock()
150 r := make(map[string]Setting)
151 for k, v := range f.vals {
152 r[k] = v
153 }
154 f.outs = append(f.outs, ch)
155 return &Stream{Name: name, Description: f.desc, Latest: r}, nil
156}
157
158// CloseFork removes the specified channel from the named stream.
159// The caller must drain the channel before closing it.
160// TODO(cnicolaou): add tests for this.
161func (p *Publisher) CloseFork(name string, ch chan<- Setting) error {
162 p.mu.Lock()
163 defer p.mu.Unlock()
164 if p.shutdown {
165 return verror.New(errStreamShutDown, nil, name)
166 }
167 f := p.streams[name]
168 if f == nil {
169 return verror.New(errStreamDoesntExist, nil, name)
170 }
171 f.Lock()
172 defer f.Unlock()
173 for i, v := range f.outs {
174 if v == ch {
175 f.outs = append(f.outs[0:i], f.outs[i+1:]...)
176 break
177 }
178 }
179 return nil
180}
181
182// Shutdown initiates the process of stopping the operation of the Publisher.
183// All of the channels passed to CreateStream must be closed by their owner
184// to ensure that all goroutines are garbage collected.
185func (p *Publisher) Shutdown() {
186 p.mu.Lock()
187 defer p.mu.Unlock()
188 if p.shutdown {
189 return
190 }
191 p.shutdown = true
192 close(p.stop)
193}
194
195func (f *fork) closeChans() {
196 f.Lock()
197 for _, o := range f.outs {
198 close(o)
199 }
200 f.outs = nil
201 f.Unlock()
202}
203
204func (f *fork) flow(stop chan struct{}) {
205 closed := false
206 for {
207 select {
208 case <-stop:
209 if !closed {
210 f.closeChans()
211 closed = true
212 }
213 case val, ok := <-f.in:
214 if !ok {
215 f.closeChans()
216 return
217 }
218 f.Lock()
219 f.vals[val.Name()] = val
220 cpy := make([]chan<- Setting, len(f.outs))
221 copy(cpy, f.outs)
222 f.Unlock()
223 for _, o := range cpy {
224 // We may well block here.
225 o <- val
226 }
227 }
228 }
229}