blob: 97ccc3544bad8139e62dd7b49c2442ba0f77c06b [file] [log] [blame]
Bogdan Caprita4d67c042014-08-19 10:41:19 -07001package impl
2
3// The config invoker is responsible for answering calls to the config service
4// run as part of the node manager. The config invoker converts RPCs to
5// messages on channels that are used to listen on callbacks coming from child
6// application instances.
7
8import (
9 "strconv"
10 "sync"
Bogdan Caprita78b62162014-08-21 15:35:08 -070011 "time"
Bogdan Caprita4d67c042014-08-19 10:41:19 -070012
Jiri Simsa519c5072014-09-17 21:37:57 -070013 "veyron.io/veyron/veyron2/ipc"
14 "veyron.io/veyron/veyron2/naming"
15 "veyron.io/veyron/veyron2/vlog"
Bogdan Caprita4d67c042014-08-19 10:41:19 -070016)
17
// callbackState is the shared bookkeeping behind the config service: it
// records which channels are waiting for which config keys, and hands out
// the identifiers under which those channels are filed.
type callbackState struct {
	// The embedded mutex is held by the methods of this type, and by
	// configInvoker.Set, while they touch channels or nextCallbackID.
	sync.Mutex
	// channels is a two-level map: the outer key is a callback
	// identifier, the inner key is a config key, and the value is the
	// send side of the channel on which the value for that key is
	// delivered.
	channels map[string]map[string]chan<- string
	// nextCallbackID is the identifier that the next call to generateID
	// will hand out.
	nextCallbackID int64
	// name is the object name under which the node manager's config
	// service is reachable; callback names are derived from it.
	name string
}
31
Bogdan Caprita78b62162014-08-21 15:35:08 -070032func newCallbackState(name string) *callbackState {
Bogdan Caprita4d67c042014-08-19 10:41:19 -070033 return &callbackState{
Bogdan Caprita78b62162014-08-21 15:35:08 -070034 channels: make(map[string]map[string]chan<- string),
35 name: name,
36 }
37}
38
// callbackListener is the handle through which a caller waits for a single
// config value delivered via the callback mechanism.
type callbackListener interface {
	// waitForValue returns the value the listener was created for once it
	// arrives. If nothing arrives within timeout, it gives up and returns
	// an error instead.
	waitForValue(timeout time.Duration) (string, error)
	// cleanup releases whatever state backs the listener. Call it once
	// the listener is no longer needed.
	cleanup()
	// name reports the object name of the config service object that
	// handles the key this listener is waiting on.
	name() string
}
52
53// listener implements callbackListener
54type listener struct {
55 id string
56 cs *callbackState
57 ch <-chan string
58 n string
59}
60
61func (l *listener) waitForValue(timeout time.Duration) (string, error) {
62 select {
63 case value := <-l.ch:
64 return value, nil
65 case <-time.After(timeout):
66 vlog.Errorf("Waiting for callback timed out")
67 return "", errOperationFailed
68 }
69}
70
71func (l *listener) cleanup() {
72 l.cs.unregister(l.id)
73}
74
75func (l *listener) name() string {
76 return l.n
77}
78
79func (c *callbackState) listenFor(key string) callbackListener {
80 id := c.generateID()
David Why Use Two When One Will Do Presotto8b4dbbf2014-11-06 10:50:14 -080081 callbackName := naming.Join(c.name, configSuffix, id)
Bogdan Caprita78b62162014-08-21 15:35:08 -070082 // Make the channel buffered to avoid blocking the Set method when
83 // nothing is receiving on the channel. This happens e.g. when
84 // unregisterCallbacks executes before Set is called.
85 callbackChan := make(chan string, 1)
86 c.register(id, key, callbackChan)
87 return &listener{
88 id: id,
89 cs: c,
90 ch: callbackChan,
91 n: callbackName,
Bogdan Caprita4d67c042014-08-19 10:41:19 -070092 }
93}
94
95func (c *callbackState) generateID() string {
96 c.Lock()
97 defer c.Unlock()
98 c.nextCallbackID++
99 return strconv.FormatInt(c.nextCallbackID-1, 10)
100}
101
Bogdan Caprita78b62162014-08-21 15:35:08 -0700102func (c *callbackState) register(id, key string, channel chan<- string) {
Bogdan Caprita4d67c042014-08-19 10:41:19 -0700103 c.Lock()
104 defer c.Unlock()
105 if _, ok := c.channels[id]; !ok {
Bogdan Caprita78b62162014-08-21 15:35:08 -0700106 c.channels[id] = make(map[string]chan<- string)
Bogdan Caprita4d67c042014-08-19 10:41:19 -0700107 }
108 c.channels[id][key] = channel
109}
110
111func (c *callbackState) unregister(id string) {
112 c.Lock()
113 defer c.Unlock()
114 delete(c.channels, id)
115}
116
117// configInvoker holds the state of a config service invocation.
118type configInvoker struct {
119 callback *callbackState
120 // Suffix contains an identifier for the channel corresponding to the
121 // request.
122 suffix string
123}
124
125func (i *configInvoker) Set(_ ipc.ServerContext, key, value string) error {
126 id := i.suffix
127 i.callback.Lock()
128 if _, ok := i.callback.channels[id]; !ok {
129 i.callback.Unlock()
130 return errInvalidSuffix
131 }
132 channel, ok := i.callback.channels[id][key]
133 i.callback.Unlock()
134 if !ok {
135 return nil
136 }
137 channel <- value
138 return nil
139}