blob: e1911aafa1ab55be478c27ab6294631d425c2fb1 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package proxy
2
3import (
4 "errors"
5 "fmt"
6 "net"
7 "sync"
8
9 "veyron/runtimes/google/ipc/stream/id"
10 "veyron/runtimes/google/ipc/stream/message"
11 "veyron/runtimes/google/ipc/stream/vc"
12 "veyron/runtimes/google/ipc/version"
13 "veyron/runtimes/google/lib/bqueue"
14 "veyron/runtimes/google/lib/bqueue/drrqueue"
15 "veyron/runtimes/google/lib/iobuf"
16 "veyron/runtimes/google/lib/upcqueue"
17
18 "veyron2"
19 "veyron2/naming"
20 "veyron2/security"
Asim Shankara263fc52014-06-02 22:00:26 -070021 "veyron2/verror"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070022 "veyron2/vlog"
Asim Shankara263fc52014-06-02 22:00:26 -070023 "veyron2/vom"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070024)
25
// Errors used by the proxy when tearing down VCs.
var (
	// errNoRoutingTableEntry is sent in a CloseVC message to a process that
	// used a VCI for which its routing table has no destination.
	errNoRoutingTableEntry = errors.New("routing table has no entry for the VC")
	// errProcessVanished is sent in a CloseVC message to every destination
	// in a process's routing table when that process is closed.
	errProcessVanished = errors.New("remote process vanished")
	// errDuplicateOpenVC is not referenced in the visible part of this file;
	// NewServerVC closes a duplicate VC with the same text as a string literal.
	errDuplicateOpenVC = errors.New("duplicate OpenVC request")
)
31
// Proxy routes virtual circuit (VC) traffic between multiple underlying
// network connections.
type Proxy struct {
	// ln is the listener whose accepted connections become processes.
	ln net.Listener
	// rid is the routing ID of the proxy itself; it is part of Endpoint and
	// is compared against the destination of incoming OpenVC messages.
	rid naming.RoutingID
	// id is used as the local identity when handshaking VCs that terminate
	// at the proxy.
	id security.PrivateID
	// mu is held while reading or modifying processes.
	mu sync.RWMutex
	// servers tracks the servers currently exported through the proxy.
	servers *servermap
	// processes is the set of processes whose readLoop is running.
	processes map[*process]struct{}
}
42
// process encapsulates the physical network connection and the routing table
// associated with the process at the other end of the network connection.
type process struct {
	// Conn is the network connection to the remote process.
	Conn net.Conn
	// Queue holds the messages that writeLoop writes to Conn.
	Queue *upcqueue.T

	// mu is held by the process methods that access routingTable, nextVCI
	// and servers.
	mu sync.RWMutex
	// routingTable maps a VCI used on this connection to the destination
	// that messages on that VCI are forwarded to. Close sets it to nil.
	routingTable map[id.VC]*destination
	// nextVCI is the VCI that the next call to AllocVCI returns.
	nextVCI id.VC
	servers map[id.VC]*vc.VC // servers wishing to be proxied create a VC that terminates at the proxy

	BQ bqueue.T // Flow control for messages sent on behalf of servers.
}
56
// destination is an entry in the routing table of a process: the process,
// and the VCI on that process, to which messages are forwarded.
type destination struct {
	VCI     id.VC
	Process *process
}
62
// server encapsulates information stored about a server exporting itself via the proxy.
type server struct {
	// Process is the process (network connection) the server's VC arrived on.
	Process *process
	// VC is the VC between the server and the proxy; it terminates at the proxy.
	VC *vc.VC
}
68
69func (s *server) RoutingID() naming.RoutingID { return s.VC.RemoteAddr().RoutingID() }
70func (s *server) Close(err error) {
71 if vc := s.Process.RemoveServerVC(s.VC.VCI()); vc != nil {
72 if err != nil {
73 vc.Close(err.Error())
74 } else {
75 vc.Close("server closed by proxy")
76 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -070077 s.Process.SendCloseVC(s.VC.VCI(), err)
Asim Shankara263fc52014-06-02 22:00:26 -070078 }
Asim Shankara263fc52014-06-02 22:00:26 -070079}
80func (s *server) String() string {
81 return fmt.Sprintf("RoutingID %v on process %v (VCI:%v ID:%v)", s.RoutingID(), s.Process, s.VC.VCI(), s.VC.RemoteID())
82}
83
// servermap is a concurrent-access safe map from the RoutingID of a server exporting itself
// through the proxy to the underlying network connection that the server is found on.
type servermap struct {
	// mu is held by every method that reads or modifies m.
	mu sync.Mutex
	// m maps the routing ID of each proxied server to its server entry.
	m map[naming.RoutingID]*server
}
90
91func (m *servermap) Add(server *server) error {
92 key := server.RoutingID()
93 m.mu.Lock()
94 defer m.mu.Unlock()
95 if m.m[key] != nil {
96 return fmt.Errorf("server with routing id %v is already being proxied", key)
97 }
98 m.m[key] = server
99 proxyLog().Infof("Started proxying server: %v", server)
100 return nil
101}
102
103func (m *servermap) Remove(server *server) {
104 key := server.RoutingID()
105 m.mu.Lock()
106 if m.m[key] != nil {
107 delete(m.m, key)
108 proxyLog().Infof("Stopped proxying server: %v", server)
109 }
110 m.mu.Unlock()
111}
112
113func (m *servermap) Process(rid naming.RoutingID) *process {
114 m.mu.Lock()
115 defer m.mu.Unlock()
116 if s := m.m[rid]; s != nil {
117 return s.Process
118 }
119 return nil
120}
121
122func (m *servermap) List() []string {
123 m.mu.Lock()
124 defer m.mu.Unlock()
125 ret := make([]string, 0, len(m.m))
126 for _, s := range m.m {
127 ret = append(ret, s.String())
128 }
129 return ret
130}
131
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700132// New creates a new Proxy that listens for network connections on the provided
133// (network, address) pair and routes VC traffic between accepted connections.
134func New(rid naming.RoutingID, identity security.PrivateID, network, address string) (*Proxy, error) {
135 ln, err := net.Listen(network, address)
136 if err != nil {
137 return nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
138 }
139 proxy := &Proxy{
140 ln: ln,
141 rid: rid,
142 id: identity,
Asim Shankara263fc52014-06-02 22:00:26 -0700143 servers: &servermap{m: make(map[naming.RoutingID]*server)},
144 processes: make(map[*process]struct{}),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700145 }
146 go proxy.listenLoop()
147 return proxy, nil
148}
149
150func (p *Proxy) listenLoop() {
151 proxyLog().Infof("Proxy listening on (%q, %q): %v", p.ln.Addr().Network(), p.ln.Addr(), p.Endpoint())
152 for {
153 conn, err := p.ln.Accept()
154 if err != nil {
155 proxyLog().Infof("Exiting listenLoop of proxy %q: %v", p.Endpoint(), err)
156 return
157 }
158 process := &process{
159 Conn: conn,
160 Queue: upcqueue.New(),
161 routingTable: make(map[id.VC]*destination),
Asim Shankara263fc52014-06-02 22:00:26 -0700162 servers: make(map[id.VC]*vc.VC),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700163 BQ: drrqueue.New(vc.MaxPayloadSizeBytes),
164 }
165 go writeLoop(process)
Asim Shankara263fc52014-06-02 22:00:26 -0700166 go serverVCsLoop(process)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 go p.readLoop(process)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700168 }
169}
170
171func writeLoop(process *process) {
172 defer processLog().Infof("Exited writeLoop for %v", process)
173 defer process.Close()
174 for {
175 item, err := process.Queue.Get(nil)
176 if err != nil {
177 if err != upcqueue.ErrQueueIsClosed {
178 processLog().Infof("upcqueue.Get failed on %v: %v", process, err)
179 }
180 return
181 }
182 if err = message.WriteTo(process.Conn, item.(message.T)); err != nil {
183 processLog().Infof("message.WriteTo on %v failed: %v", process, err)
184 return
185 }
186 }
187}
188
Asim Shankara263fc52014-06-02 22:00:26 -0700189func serverVCsLoop(process *process) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700190 for {
191 w, bufs, err := process.BQ.Get(nil)
192 if err != nil {
193 return
194 }
195 vci, fid := unpackIDs(w.ID())
Asim Shankara263fc52014-06-02 22:00:26 -0700196 if vc := process.ServerVC(vci); vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700197 queueDataMessages(bufs, vc, fid, process.Queue)
198 if len(bufs) == 0 {
199 m := &message.Data{VCI: vci, Flow: fid}
200 m.SetClose()
201 process.Queue.Put(m)
202 w.Shutdown(true)
203 }
204 continue
205 }
206 releaseBufs(0, bufs)
207 }
208}
209
210func releaseBufs(start int, bufs []*iobuf.Slice) {
211 for i := start; i < len(bufs); i++ {
212 bufs[i].Release()
213 }
214}
215
216func queueDataMessages(bufs []*iobuf.Slice, vc *vc.VC, fid id.Flow, q *upcqueue.T) {
217 for ix, b := range bufs {
218 m := &message.Data{VCI: vc.VCI(), Flow: fid}
219 var err error
220 if m.Payload, err = vc.Encrypt(fid, b); err != nil {
221 msgLog().Infof("vc.Encrypt failed. VC:%v Flow:%v Error:%v", vc, fid, err)
222 releaseBufs(ix+1, bufs)
223 return
224 }
225 if err = q.Put(m); err != nil {
226 msgLog().Infof("Failed to enqueue data message %v: %v", m, err)
227 m.Release()
228 releaseBufs(ix+1, bufs)
229 return
230 }
231 }
232}
233
Asim Shankara263fc52014-06-02 22:00:26 -0700234func (p *Proxy) startProcess(process *process) {
235 p.mu.Lock()
236 p.processes[process] = struct{}{}
237 p.mu.Unlock()
238 processLog().Infof("Started process %v", process)
239}
240
241func (p *Proxy) stopProcess(process *process) {
242 process.Close()
243 p.mu.Lock()
244 delete(p.processes, process)
245 p.mu.Unlock()
246 processLog().Infof("Stopped process %v", process)
247}
248
// readLoop reads messages from process.Conn and dispatches them until a read
// fails or an unrecognized message type arrives. The process is registered
// with the proxy for the duration of the loop and stopped (closed and
// unregistered) when the loop returns.
//
// For Data, OpenFlow and CloseVC messages, a VCI that names a server VC
// (a VC terminating at the proxy) is handled locally; otherwise the message
// is forwarded to the destination in the process's routing table with its VCI
// rewritten to the destination's VCI.
func (p *Proxy) readLoop(process *process) {
	p.startProcess(process)
	defer p.stopProcess(process)

	reader := iobuf.NewReader(iobuf.NewPool(0), process.Conn)
	defer reader.Close()

	for {
		msg, err := message.ReadFrom(reader)
		if err != nil {
			processLog().Infof("Read on %v failed: %v", process, err)
			return
		}
		msgLog().Infof("Received msg: %T = %v", msg, msg)
		// Note: every "break" below leaves the switch, not the for loop.
		switch m := msg.(type) {
		case *message.Data:
			// Data for a VC that terminates at the proxy.
			if vc := process.ServerVC(m.VCI); vc != nil {
				if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
					processLog().Infof("Ignoring data message %v from process %v: %v", m, process, err)
				}
				if m.Close() {
					vc.ShutdownFlow(m.Flow)
				}
				break
			}
			// Data for a routed VC: rewrite the VCI and forward.
			srcVCI := m.VCI
			if d := process.Route(srcVCI); d != nil {
				m.VCI = d.VCI
				if err := d.Process.Queue.Put(m); err != nil {
					// The destination cannot be reached: drop the route and tell the sender.
					process.RemoveRoute(srcVCI)
					process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward data message: %v", err))
				}
				break
			}
			process.SendCloseVC(srcVCI, errNoRoutingTableEntry)
		case *message.OpenFlow:
			// New flow on a VC that terminates at the proxy.
			if vc := process.ServerVC(m.VCI); vc != nil {
				if err := vc.AcceptFlow(m.Flow); err != nil {
					processLog().Infof("OpenFlow %+v on process %v failed: %v", m, process, err)
					// Reply with a Data message that has the close flag set for the rejected flow.
					cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
					cm.SetClose()
					process.Queue.Put(cm)
				}
				vc.ReleaseCounters(m.Flow, m.InitialCounters)
				break
			}
			// New flow on a routed VC: rewrite the VCI and forward.
			srcVCI := m.VCI
			if d := process.Route(srcVCI); d != nil {
				m.VCI = d.VCI
				if err := d.Process.Queue.Put(m); err != nil {
					process.RemoveRoute(srcVCI)
					process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward open flow message: %v", err))
				}
				break
			}
			process.SendCloseVC(srcVCI, errNoRoutingTableEntry)
		case *message.CloseVC:
			// Closing a VC that terminates at the proxy.
			if vc := process.RemoveServerVC(m.VCI); vc != nil {
				vc.Close(m.Error)
				break
			}
			// Closing a routed VC: forward the message and remove the route
			// from both routing tables. The result of Put is not checked.
			srcVCI := m.VCI
			if d := process.Route(srcVCI); d != nil {
				m.VCI = d.VCI
				d.Process.Queue.Put(m)
				d.Process.RemoveRoute(d.VCI)
			}
			process.RemoveRoute(srcVCI)
		case *message.AddReceiveBuffers:
			p.routeCounters(process, m.Counters)
		case *message.OpenVC:
			dstrid := m.DstEndpoint.RoutingID()
			if naming.Compare(dstrid, p.rid) || naming.Compare(dstrid, naming.NullRoutingID) {
				// VC that terminates at the proxy.
				// See protocol.vdl for details on the protocol between the server and the proxy.
				vc := process.NewServerVC(m)
				// route counters after creating the VC so counters to vc are not lost.
				p.routeCounters(process, m.Counters)
				// vc is nil when the OpenVC duplicated an existing server VC.
				if vc != nil {
					server := &server{Process: process, VC: vc}
					go p.runServer(server, vc.HandshakeAcceptedVC(veyron2.LocalID(p.id)))
				}
				break
			}
			// VC to a proxied server: find the process the server is on.
			dstprocess := p.servers.Process(dstrid)
			if dstprocess == nil {
				process.SendCloseVC(m.VCI, fmt.Errorf("no server with routing id %v is being proxied", dstrid))
				p.routeCounters(process, m.Counters)
				break
			}
			srcVCI := m.VCI
			dstVCI := dstprocess.AllocVCI()
			startRoutingVC(srcVCI, dstVCI, process, dstprocess)
			// Forward the OpenVC message.
			// Typically, an OpenVC message is accompanied with Counters for the new VC.
			// Keep that in the forwarded message and route the remaining counters separately.
			counters := m.Counters
			m.Counters = message.NewCounters()
			for cid, bytes := range counters {
				if cid.VCI() == srcVCI {
					m.Counters.Add(dstVCI, cid.Flow(), bytes)
					delete(counters, cid)
				}
			}
			m.VCI = dstVCI
			// The result of Put is not checked here.
			dstprocess.Queue.Put(m)
			p.routeCounters(process, counters)
		default:
			processLog().Infof("Closing %v because of unrecognized message %T", process, m)
			return
		}
	}
}
362
// runServer drives the exchange with a server that opened a VC terminating at
// the proxy. It waits for the handshake result on c, accepts the first flow
// on the VC, decodes a Request from it, registers the server with the proxy,
// and encodes a Response carrying either the proxied endpoint or an error.
// It then blocks until that first flow is closed; any further flows the
// server opens are closed as soon as they are accepted.
//
// The server is closed when runServer returns, and is removed from
// p.servers if it had been added.
func (p *Proxy) runServer(server *server, c <-chan vc.HandshakeResult) {
	// Wait for the VC handshake to finish.
	hr := <-c
	if hr.Error != nil {
		server.Close(hr.Error)
		return
	}
	// See comments in protocol.vdl for the protocol between servers and the proxy.
	conn, err := hr.Listener.Accept()
	if err != nil {
		server.Close(errors.New("failed to accept health check flow"))
		return
	}
	defer server.Close(nil)
	// Make sure VCIs allocated later on this process are greater than the
	// VCI of the server's own VC.
	server.Process.InitVCI(server.VC.VCI())

	var request Request
	var response Response
	if err := vom.NewDecoder(conn).Decode(&request); err != nil {
		response.Error = verror.BadProtocolf("proxy: unable to read Request: %v", err)
	} else if err := p.servers.Add(server); err != nil {
		response.Error = verror.Convert(err)
	} else {
		// Deferred after server.Close(nil), so it runs before it on return.
		defer p.servers.Remove(server)
		ep, err := version.ProxiedEndpoint(server.VC.RemoteAddr().RoutingID(), p.Endpoint())
		if err != nil {
			response.Error = verror.ConvertWithDefault(verror.Internal, err)
		}
		if ep != nil {
			response.Endpoint = ep.String()
		}
	}
	// The Response is sent in every case, including when it only carries an error.
	if err := vom.NewEncoder(conn).Encode(response); err != nil {
		proxyLog().Infof("Failed to encode response %#v for server %v", response, server)
		server.Close(err)
		return
	}
	// Reject all other flows
	go func() {
		for {
			flow, err := hr.Listener.Accept()
			if err != nil {
				return
			}
			flow.Close()
		}
	}()
	// Wait for this flow to be closed.
	<-conn.Closed()
}
412
413func (p *Proxy) routeCounters(process *process, counters message.Counters) {
414 // Since each VC can be routed to a different process, split up the
415 // Counters into one message per VC.
416 // Ideally, would split into one message per process (rather than per
Asim Shankara263fc52014-06-02 22:00:26 -0700417 // flow). This optimization is left an as excercise to the interested.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700418 for cid, bytes := range counters {
419 srcVCI := cid.VCI()
Asim Shankara263fc52014-06-02 22:00:26 -0700420 if vc := process.servers[srcVCI]; vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700421 vc.ReleaseCounters(cid.Flow(), bytes)
422 continue
423 }
424 if d := process.Route(srcVCI); d != nil {
425 c := message.NewCounters()
426 c.Add(d.VCI, cid.Flow(), bytes)
427 if err := d.Process.Queue.Put(&message.AddReceiveBuffers{Counters: c}); err != nil {
428 process.RemoveRoute(srcVCI)
429 process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward receive buffers: %v", err))
430 }
431 }
432 }
433}
434
435func startRoutingVC(srcVCI, dstVCI id.VC, srcProcess, dstProcess *process) {
436 dstProcess.AddRoute(dstVCI, &destination{VCI: srcVCI, Process: srcProcess})
437 srcProcess.AddRoute(srcVCI, &destination{VCI: dstVCI, Process: dstProcess})
438 vcLog().Infof("Routing (VCI %d @ [%s]) <-> (VCI %d @ [%s])", srcVCI, srcProcess, dstVCI, dstProcess)
439}
440
441// Endpoint returns the endpoint of the proxy service. By Dialing a VC to this
442// endpoint, processes can have their services exported through the proxy.
443func (p *Proxy) Endpoint() naming.Endpoint {
444 return version.Endpoint(p.ln.Addr().Network(), p.ln.Addr().String(), p.rid)
445}
446
447// Shutdown stops the proxy service, closing all network connections.
448func (p *Proxy) Shutdown() {
449 p.ln.Close()
450 p.mu.Lock()
451 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700452 for process, _ := range p.processes {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700453 process.Close()
454 }
455}
456
457func (p *process) String() string {
458 r := p.Conn.RemoteAddr()
459 return fmt.Sprintf("(%s, %s)", r.Network(), r)
460}
461
462func (p *process) Route(vci id.VC) *destination {
463 p.mu.RLock()
464 defer p.mu.RUnlock()
465 return p.routingTable[vci]
466}
467
468func (p *process) AddRoute(vci id.VC, d *destination) {
469 p.mu.Lock()
470 p.routingTable[vci] = d
471 p.mu.Unlock()
472}
473
Asim Shankara263fc52014-06-02 22:00:26 -0700474func (p *process) InitVCI(vci id.VC) {
475 p.mu.Lock()
476 if p.nextVCI <= vci {
477 p.nextVCI = vci + 1
478 }
479 p.mu.Unlock()
480}
481
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700482func (p *process) AllocVCI() id.VC {
483 p.mu.Lock()
484 ret := p.nextVCI
485 p.nextVCI += 2
486 p.mu.Unlock()
487 return ret
488}
489
490func (p *process) RemoveRoute(vci id.VC) {
491 p.mu.Lock()
492 delete(p.routingTable, vci)
493 p.mu.Unlock()
494}
495
496func (p *process) SendCloseVC(vci id.VC, err error) {
497 var estr string
498 if err != nil {
499 estr = err.Error()
500 }
501 p.Queue.Put(&message.CloseVC{VCI: vci, Error: estr})
502}
503
// Close tears down the process: it closes every server VC, sends a CloseVC
// with errProcessVanished to every destination in the routing table, and
// closes BQ, Queue and Conn.
//
// Close can be invoked more than once for the same process: writeLoop,
// Proxy.stopProcess and Proxy.Shutdown all call it.
func (p *process) Close() {
	p.mu.Lock()
	// Take the routing table and leave nil behind, so the destinations are
	// notified below without holding p.mu.
	rt := p.routingTable
	p.routingTable = nil
	// The server VCs are closed but stay in p.servers.
	for _, vc := range p.servers {
		vc.Close("net.Conn is closing")
	}
	p.mu.Unlock()

	for _, d := range rt {
		d.Process.SendCloseVC(d.VCI, errProcessVanished)
	}
	p.BQ.Close()
	p.Queue.Close()
	p.Conn.Close()
}
520
Asim Shankara263fc52014-06-02 22:00:26 -0700521func (p *process) ServerVC(vci id.VC) *vc.VC {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700522 p.mu.Lock()
523 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700524 return p.servers[vci]
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700525}
526
Asim Shankara263fc52014-06-02 22:00:26 -0700527func (p *process) NewServerVC(m *message.OpenVC) *vc.VC {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700528 p.mu.Lock()
529 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700530 if vc := p.servers[m.VCI]; vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700531 vc.Close("duplicate OpenVC request")
Asim Shankara263fc52014-06-02 22:00:26 -0700532 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700533 }
534 vc := vc.InternalNew(vc.Params{
535 VCI: m.VCI,
536 LocalEP: m.DstEndpoint,
537 RemoteEP: m.SrcEndpoint,
538 Pool: iobuf.NewPool(0),
539 ReserveBytes: message.HeaderSizeBytes,
540 Helper: p,
541 })
Asim Shankara263fc52014-06-02 22:00:26 -0700542 p.servers[m.VCI] = vc
543 proxyLog().Infof("Registered VC %v from server on process %v", vc, p)
544 return vc
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700545}
546
Asim Shankara263fc52014-06-02 22:00:26 -0700547func (p *process) RemoveServerVC(vci id.VC) *vc.VC {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700548 p.mu.Lock()
549 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700550 if vc := p.servers[vci]; vc != nil {
551 delete(p.servers, vci)
552 proxyLog().Infof("Unregistered server VC %v from process %v", vc, p)
553 return vc
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700554 }
Asim Shankara263fc52014-06-02 22:00:26 -0700555 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700556}
557
558// Make process implement vc.Helper
559func (p *process) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) {
560 msg := &message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)}
561 if err := p.Queue.Put(msg); err != nil {
562 processLog().Infof("Failed to send OpenFlow(%+v) on process %v: %v", msg, p, err)
563 }
564}
565
566func (p *process) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
567 if bytes == 0 {
568 return
569 }
570 msg := &message.AddReceiveBuffers{Counters: message.NewCounters()}
571 msg.Counters.Add(vci, fid, uint32(bytes))
572 if err := p.Queue.Put(msg); err != nil {
573 processLog().Infof("Failed to send AddReceiveBuffers(%+v) on process %v: %v", msg, p, err)
574 }
575}
576
577func (p *process) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
578 return p.BQ.NewWriter(packIDs(vci, fid), 0, vc.DefaultBytesBufferedPerFlow)
579}
580
581// Convenience functions to assist with the logging convention.
582func proxyLog() vlog.InfoLog { return vlog.VI(1) }
583func processLog() vlog.InfoLog { return vlog.VI(2) }
584func vcLog() vlog.InfoLog { return vlog.VI(3) }
585func msgLog() vlog.InfoLog { return vlog.VI(4) }
586
587func packIDs(vci id.VC, fid id.Flow) bqueue.ID {
588 return bqueue.ID(message.MakeCounterID(vci, fid))
589}
590
591func unpackIDs(b bqueue.ID) (id.VC, id.Flow) {
592 cid := message.CounterID(b)
593 return cid.VCI(), cid.Flow()
594}