blob: 095357b73066593a2ca39722439d56586bd5ce11 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package proxy
2
3import (
4 "errors"
5 "fmt"
6 "net"
7 "sync"
Cosmos Nicolaoud6c3c9c2014-09-30 15:42:53 -07008
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -08009 "veyron.io/veyron/veyron2/ipc/stream"
Jiri Simsa519c5072014-09-17 21:37:57 -070010 "veyron.io/veyron/veyron2/naming"
11 "veyron.io/veyron/veyron2/security"
12 "veyron.io/veyron/veyron2/verror"
13 "veyron.io/veyron/veyron2/vlog"
14 "veyron.io/veyron/veyron2/vom"
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -080015
Cosmos Nicolaou87c0a552014-12-02 23:05:49 -080016 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
17 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/id"
18 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/message"
19 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
20 "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vif"
21 "veyron.io/veyron/veyron/runtimes/google/ipc/version"
22 "veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
23 "veyron.io/veyron/veyron/runtimes/google/lib/bqueue/drrqueue"
24 "veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
25 "veyron.io/veyron/veyron/runtimes/google/lib/upcqueue"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070026)
27
// Sentinel errors used by the proxy when tearing down virtual circuits.
var (
	// errNoRoutingTableEntry is sent to a process that references a VC for
	// which the proxy has no route.
	errNoRoutingTableEntry = errors.New("routing table has no entry for the VC")

	// errProcessVanished is sent to the peers of a process whose connection
	// is being closed.
	errProcessVanished = errors.New("remote process vanished")

	// errDuplicateOpenVC describes an OpenVC request for a VCI that is
	// already in use.
	errDuplicateOpenVC = errors.New("duplicate OpenVC request")
)
33
34// Proxy routes virtual circuit (VC) traffic between multiple underlying
35// network connections.
36type Proxy struct {
Robin Thellend020cd222014-06-10 14:29:28 -070037 ln net.Listener
38 rid naming.RoutingID
Jason Hickey96d30e82014-11-13 07:40:00 -080039 principal security.Principal
Robin Thellend020cd222014-06-10 14:29:28 -070040 mu sync.RWMutex
41 servers *servermap
42 processes map[*process]struct{}
43 pubAddress string
Jiri Simsa5293dcb2014-05-10 09:56:38 -070044}
45
46// process encapsulates the physical network connection and the routing table
Asim Shankara263fc52014-06-02 22:00:26 -070047// associated with the process at the other end of the network connection.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048type process struct {
Ankur77c7f032014-06-10 16:52:13 -070049 Conn net.Conn
Jason Hickey96d30e82014-11-13 07:40:00 -080050 isSetup bool
51 ctrlCipher crypto.ControlCipher
Ankur77c7f032014-06-10 16:52:13 -070052 Queue *upcqueue.T
Jiri Simsa5293dcb2014-05-10 09:56:38 -070053 mu sync.RWMutex
54 routingTable map[id.VC]*destination
55 nextVCI id.VC
Asim Shankara263fc52014-06-02 22:00:26 -070056 servers map[id.VC]*vc.VC // servers wishing to be proxied create a VC that terminates at the proxy
Ankur77c7f032014-06-10 16:52:13 -070057 BQ bqueue.T // Flow control for messages sent on behalf of servers.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070058}
59
Asim Shankara263fc52014-06-02 22:00:26 -070060// destination is an entry in the routingtable of a process.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070061type destination struct {
62 VCI id.VC
63 Process *process
64}
65
Asim Shankara263fc52014-06-02 22:00:26 -070066// server encapsulates information stored about a server exporting itself via the proxy.
67type server struct {
68 Process *process
69 VC *vc.VC
70}
71
72func (s *server) RoutingID() naming.RoutingID { return s.VC.RemoteAddr().RoutingID() }
73func (s *server) Close(err error) {
74 if vc := s.Process.RemoveServerVC(s.VC.VCI()); vc != nil {
75 if err != nil {
76 vc.Close(err.Error())
77 } else {
78 vc.Close("server closed by proxy")
79 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -070080 s.Process.SendCloseVC(s.VC.VCI(), err)
Asim Shankara263fc52014-06-02 22:00:26 -070081 }
Asim Shankara263fc52014-06-02 22:00:26 -070082}
83func (s *server) String() string {
Asim Shankar220a0152014-10-30 21:21:09 -070084 return fmt.Sprintf("RoutingID %v on process %v (VCI:%v Blessings:%v)", s.RoutingID(), s.Process, s.VC.VCI(), s.VC.RemoteBlessings())
Asim Shankara263fc52014-06-02 22:00:26 -070085}
86
87// servermap is a concurrent-access safe map from the RoutingID of a server exporting itself
88// through the proxy to the underlying network connection that the server is found on.
89type servermap struct {
90 mu sync.Mutex
91 m map[naming.RoutingID]*server
92}
93
94func (m *servermap) Add(server *server) error {
95 key := server.RoutingID()
96 m.mu.Lock()
97 defer m.mu.Unlock()
98 if m.m[key] != nil {
99 return fmt.Errorf("server with routing id %v is already being proxied", key)
100 }
101 m.m[key] = server
102 proxyLog().Infof("Started proxying server: %v", server)
103 return nil
104}
Asim Shankara263fc52014-06-02 22:00:26 -0700105func (m *servermap) Remove(server *server) {
106 key := server.RoutingID()
107 m.mu.Lock()
108 if m.m[key] != nil {
109 delete(m.m, key)
110 proxyLog().Infof("Stopped proxying server: %v", server)
111 }
112 m.mu.Unlock()
113}
Asim Shankara263fc52014-06-02 22:00:26 -0700114func (m *servermap) Process(rid naming.RoutingID) *process {
115 m.mu.Lock()
116 defer m.mu.Unlock()
117 if s := m.m[rid]; s != nil {
118 return s.Process
119 }
120 return nil
121}
Asim Shankara263fc52014-06-02 22:00:26 -0700122func (m *servermap) List() []string {
123 m.mu.Lock()
124 defer m.mu.Unlock()
125 ret := make([]string, 0, len(m.m))
126 for _, s := range m.m {
127 ret = append(ret, s.String())
128 }
129 return ret
130}
131
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700132// New creates a new Proxy that listens for network connections on the provided
133// (network, address) pair and routes VC traffic between accepted connections.
Asim Shankar7cf29002014-10-09 00:38:37 -0700134func New(rid naming.RoutingID, principal security.Principal, network, address, pubAddress string) (*Proxy, error) {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800135 _, listenFn := stream.RegisteredProtocol(network)
136 if listenFn == nil {
137 return nil, fmt.Errorf("unknown network %s", network)
138 }
139 ln, err := listenFn(network, address)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700140 if err != nil {
141 return nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", network, address, err)
142 }
Robin Thellend020cd222014-06-10 14:29:28 -0700143 if len(pubAddress) == 0 {
144 pubAddress = ln.Addr().String()
145 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700146 proxy := &Proxy{
Robin Thellend020cd222014-06-10 14:29:28 -0700147 ln: ln,
148 rid: rid,
Robin Thellend020cd222014-06-10 14:29:28 -0700149 servers: &servermap{m: make(map[naming.RoutingID]*server)},
150 processes: make(map[*process]struct{}),
151 pubAddress: pubAddress,
Jason Hickey96d30e82014-11-13 07:40:00 -0800152 principal: principal,
Ankura3c97652014-07-17 20:01:21 -0700153 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700154 go proxy.listenLoop()
155 return proxy, nil
156}
Ankura3c97652014-07-17 20:01:21 -0700157
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700158func (p *Proxy) listenLoop() {
159 proxyLog().Infof("Proxy listening on (%q, %q): %v", p.ln.Addr().Network(), p.ln.Addr(), p.Endpoint())
160 for {
161 conn, err := p.ln.Accept()
162 if err != nil {
163 proxyLog().Infof("Exiting listenLoop of proxy %q: %v", p.Endpoint(), err)
164 return
165 }
Jason Hickey96d30e82014-11-13 07:40:00 -0800166 go p.acceptProcess(conn)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 }
168}
Jason Hickey96d30e82014-11-13 07:40:00 -0800169
170func (p *Proxy) acceptProcess(conn net.Conn) {
171 process := &process{
172 Conn: conn,
173 ctrlCipher: &crypto.NullControlCipher{},
174 Queue: upcqueue.New(),
175 routingTable: make(map[id.VC]*destination),
176 servers: make(map[id.VC]*vc.VC),
177 BQ: drrqueue.New(vc.MaxPayloadSizeBytes),
178 }
179 go writeLoop(process)
180 go serverVCsLoop(process)
181 go p.readLoop(process)
182}
183
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700184func writeLoop(process *process) {
185 defer processLog().Infof("Exited writeLoop for %v", process)
186 defer process.Close()
187 for {
188 item, err := process.Queue.Get(nil)
189 if err != nil {
190 if err != upcqueue.ErrQueueIsClosed {
191 processLog().Infof("upcqueue.Get failed on %v: %v", process, err)
192 }
193 return
194 }
Jason Hickey96d30e82014-11-13 07:40:00 -0800195 if err = message.WriteTo(process.Conn, item.(message.T), process.ctrlCipher); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700196 processLog().Infof("message.WriteTo on %v failed: %v", process, err)
197 return
198 }
199 }
200}
Asim Shankara263fc52014-06-02 22:00:26 -0700201func serverVCsLoop(process *process) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700202 for {
203 w, bufs, err := process.BQ.Get(nil)
204 if err != nil {
205 return
206 }
207 vci, fid := unpackIDs(w.ID())
Asim Shankara263fc52014-06-02 22:00:26 -0700208 if vc := process.ServerVC(vci); vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700209 queueDataMessages(bufs, vc, fid, process.Queue)
210 if len(bufs) == 0 {
211 m := &message.Data{VCI: vci, Flow: fid}
212 m.SetClose()
213 process.Queue.Put(m)
214 w.Shutdown(true)
215 }
216 continue
217 }
218 releaseBufs(0, bufs)
219 }
220}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700221func releaseBufs(start int, bufs []*iobuf.Slice) {
222 for i := start; i < len(bufs); i++ {
223 bufs[i].Release()
224 }
225}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700226func queueDataMessages(bufs []*iobuf.Slice, vc *vc.VC, fid id.Flow, q *upcqueue.T) {
227 for ix, b := range bufs {
228 m := &message.Data{VCI: vc.VCI(), Flow: fid}
229 var err error
230 if m.Payload, err = vc.Encrypt(fid, b); err != nil {
231 msgLog().Infof("vc.Encrypt failed. VC:%v Flow:%v Error:%v", vc, fid, err)
232 releaseBufs(ix+1, bufs)
233 return
234 }
235 if err = q.Put(m); err != nil {
236 msgLog().Infof("Failed to enqueue data message %v: %v", m, err)
237 m.Release()
238 releaseBufs(ix+1, bufs)
239 return
240 }
241 }
242}
Asim Shankara263fc52014-06-02 22:00:26 -0700243func (p *Proxy) startProcess(process *process) {
244 p.mu.Lock()
245 p.processes[process] = struct{}{}
246 p.mu.Unlock()
247 processLog().Infof("Started process %v", process)
248}
Asim Shankara263fc52014-06-02 22:00:26 -0700249func (p *Proxy) stopProcess(process *process) {
250 process.Close()
251 p.mu.Lock()
252 delete(p.processes, process)
253 p.mu.Unlock()
254 processLog().Infof("Stopped process %v", process)
255}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700256func (p *Proxy) readLoop(process *process) {
Asim Shankara263fc52014-06-02 22:00:26 -0700257 p.startProcess(process)
258 defer p.stopProcess(process)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700259 reader := iobuf.NewReader(iobuf.NewPool(0), process.Conn)
260 defer reader.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700261 for {
Jason Hickey96d30e82014-11-13 07:40:00 -0800262 msg, err := message.ReadFrom(reader, process.ctrlCipher)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700263 if err != nil {
264 processLog().Infof("Read on %v failed: %v", process, err)
265 return
266 }
267 msgLog().Infof("Received msg: %T = %v", msg, msg)
268 switch m := msg.(type) {
269 case *message.Data:
Asim Shankara263fc52014-06-02 22:00:26 -0700270 if vc := process.ServerVC(m.VCI); vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700271 if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
272 processLog().Infof("Ignoring data message %v from process %v: %v", m, process, err)
273 }
274 if m.Close() {
275 vc.ShutdownFlow(m.Flow)
276 }
277 break
278 }
279 srcVCI := m.VCI
280 if d := process.Route(srcVCI); d != nil {
281 m.VCI = d.VCI
282 if err := d.Process.Queue.Put(m); err != nil {
283 process.RemoveRoute(srcVCI)
284 process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward data message: %v", err))
285 }
286 break
287 }
288 process.SendCloseVC(srcVCI, errNoRoutingTableEntry)
289 case *message.OpenFlow:
Asim Shankara263fc52014-06-02 22:00:26 -0700290 if vc := process.ServerVC(m.VCI); vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700291 if err := vc.AcceptFlow(m.Flow); err != nil {
292 processLog().Infof("OpenFlow %+v on process %v failed: %v", m, process, err)
293 cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
294 cm.SetClose()
295 process.Queue.Put(cm)
296 }
297 vc.ReleaseCounters(m.Flow, m.InitialCounters)
298 break
299 }
300 srcVCI := m.VCI
301 if d := process.Route(srcVCI); d != nil {
302 m.VCI = d.VCI
303 if err := d.Process.Queue.Put(m); err != nil {
304 process.RemoveRoute(srcVCI)
305 process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward open flow message: %v", err))
306 }
307 break
308 }
309 process.SendCloseVC(srcVCI, errNoRoutingTableEntry)
310 case *message.CloseVC:
Asim Shankara263fc52014-06-02 22:00:26 -0700311 if vc := process.RemoveServerVC(m.VCI); vc != nil {
312 vc.Close(m.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700313 break
314 }
315 srcVCI := m.VCI
316 if d := process.Route(srcVCI); d != nil {
317 m.VCI = d.VCI
318 d.Process.Queue.Put(m)
319 d.Process.RemoveRoute(d.VCI)
320 }
321 process.RemoveRoute(srcVCI)
322 case *message.AddReceiveBuffers:
323 p.routeCounters(process, m.Counters)
324 case *message.OpenVC:
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700325 dstrid := m.DstEndpoint.RoutingID()
Asim Shankara263fc52014-06-02 22:00:26 -0700326 if naming.Compare(dstrid, p.rid) || naming.Compare(dstrid, naming.NullRoutingID) {
327 // VC that terminates at the proxy.
328 // See protocol.vdl for details on the protocol between the server and the proxy.
Ankur77c7f032014-06-10 16:52:13 -0700329 vcObj := process.NewServerVC(m)
Asim Shankara263fc52014-06-02 22:00:26 -0700330 // route counters after creating the VC so counters to vc are not lost.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700331 p.routeCounters(process, m.Counters)
Ankur77c7f032014-06-10 16:52:13 -0700332 if vcObj != nil {
333 server := &server{Process: process, VC: vcObj}
Jason Hickey96d30e82014-11-13 07:40:00 -0800334 go p.runServer(server, vcObj.HandshakeAcceptedVC(vc.LocalPrincipal{p.principal}))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700335 }
Asim Shankara263fc52014-06-02 22:00:26 -0700336 break
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700337 }
Asim Shankara263fc52014-06-02 22:00:26 -0700338 dstprocess := p.servers.Process(dstrid)
339 if dstprocess == nil {
340 process.SendCloseVC(m.VCI, fmt.Errorf("no server with routing id %v is being proxied", dstrid))
341 p.routeCounters(process, m.Counters)
342 break
343 }
344 srcVCI := m.VCI
345 dstVCI := dstprocess.AllocVCI()
346 startRoutingVC(srcVCI, dstVCI, process, dstprocess)
347 // Forward the OpenVC message.
348 // Typically, an OpenVC message is accompanied with Counters for the new VC.
349 // Keep that in the forwarded message and route the remaining counters separately.
350 counters := m.Counters
351 m.Counters = message.NewCounters()
352 for cid, bytes := range counters {
353 if cid.VCI() == srcVCI {
354 m.Counters.Add(dstVCI, cid.Flow(), bytes)
355 delete(counters, cid)
356 }
357 }
358 m.VCI = dstVCI
359 dstprocess.Queue.Put(m)
360 p.routeCounters(process, counters)
Jason Hickey96d30e82014-11-13 07:40:00 -0800361 case *message.HopSetup:
362 // Set up the hop. This takes over the process during negotiation.
363 if process.isSetup {
364 // Already performed authentication. We don't do it again.
365 processLog().Infof("Process %v is already setup", process)
366 return
367 }
368 var blessings security.Blessings
369 if p.principal != nil {
370 blessings = p.principal.BlessingStore().Default()
371 }
372 c, err := vif.AuthenticateAsServer(process.Conn, nil, p.principal, blessings, nil, m)
373 if err != nil {
374 processLog().Infof("Process %v failed to authenticate: %s", process, err)
375 return
376 }
377 process.ctrlCipher = c
378 process.isSetup = true
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700379 default:
380 processLog().Infof("Closing %v because of unrecognized message %T", process, m)
381 return
382 }
383 }
384}
Asim Shankara263fc52014-06-02 22:00:26 -0700385func (p *Proxy) runServer(server *server, c <-chan vc.HandshakeResult) {
386 hr := <-c
387 if hr.Error != nil {
388 server.Close(hr.Error)
389 return
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700390 }
Asim Shankara263fc52014-06-02 22:00:26 -0700391 // See comments in protocol.vdl for the protocol between servers and the proxy.
392 conn, err := hr.Listener.Accept()
393 if err != nil {
394 server.Close(errors.New("failed to accept health check flow"))
395 return
396 }
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700397 defer server.Close(nil)
Asim Shankara263fc52014-06-02 22:00:26 -0700398 server.Process.InitVCI(server.VC.VCI())
Asim Shankara263fc52014-06-02 22:00:26 -0700399 var request Request
400 var response Response
401 if err := vom.NewDecoder(conn).Decode(&request); err != nil {
402 response.Error = verror.BadProtocolf("proxy: unable to read Request: %v", err)
403 } else if err := p.servers.Add(server); err != nil {
404 response.Error = verror.Convert(err)
405 } else {
406 defer p.servers.Remove(server)
Asim Shankar0ea02ab2014-06-09 11:39:24 -0700407 ep, err := version.ProxiedEndpoint(server.VC.RemoteAddr().RoutingID(), p.Endpoint())
408 if err != nil {
409 response.Error = verror.ConvertWithDefault(verror.Internal, err)
410 }
411 if ep != nil {
412 response.Endpoint = ep.String()
413 }
Asim Shankara263fc52014-06-02 22:00:26 -0700414 }
415 if err := vom.NewEncoder(conn).Encode(response); err != nil {
416 proxyLog().Infof("Failed to encode response %#v for server %v", response, server)
417 server.Close(err)
418 return
419 }
420 // Reject all other flows
421 go func() {
422 for {
423 flow, err := hr.Listener.Accept()
424 if err != nil {
425 return
426 }
427 flow.Close()
428 }
429 }()
430 // Wait for this flow to be closed.
431 <-conn.Closed()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700432}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700433func (p *Proxy) routeCounters(process *process, counters message.Counters) {
434 // Since each VC can be routed to a different process, split up the
435 // Counters into one message per VC.
436 // Ideally, would split into one message per process (rather than per
Asim Shankara263fc52014-06-02 22:00:26 -0700437 // flow). This optimization is left an as excercise to the interested.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700438 for cid, bytes := range counters {
439 srcVCI := cid.VCI()
Asim Shankar2fd7a5f2014-06-23 11:57:53 -0700440 if vc := process.ServerVC(srcVCI); vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700441 vc.ReleaseCounters(cid.Flow(), bytes)
442 continue
443 }
444 if d := process.Route(srcVCI); d != nil {
445 c := message.NewCounters()
446 c.Add(d.VCI, cid.Flow(), bytes)
447 if err := d.Process.Queue.Put(&message.AddReceiveBuffers{Counters: c}); err != nil {
448 process.RemoveRoute(srcVCI)
449 process.SendCloseVC(srcVCI, fmt.Errorf("proxy failed to forward receive buffers: %v", err))
450 }
451 }
452 }
453}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700454func startRoutingVC(srcVCI, dstVCI id.VC, srcProcess, dstProcess *process) {
455 dstProcess.AddRoute(dstVCI, &destination{VCI: srcVCI, Process: srcProcess})
456 srcProcess.AddRoute(srcVCI, &destination{VCI: dstVCI, Process: dstProcess})
457 vcLog().Infof("Routing (VCI %d @ [%s]) <-> (VCI %d @ [%s])", srcVCI, srcProcess, dstVCI, dstProcess)
458}
459
460// Endpoint returns the endpoint of the proxy service. By Dialing a VC to this
461// endpoint, processes can have their services exported through the proxy.
462func (p *Proxy) Endpoint() naming.Endpoint {
Cosmos Nicolaouae8dd212014-12-13 23:43:08 -0800463 ep := version.Endpoint(p.ln.Addr().Network(), p.pubAddress, p.rid)
464 return ep
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700465}
466
467// Shutdown stops the proxy service, closing all network connections.
468func (p *Proxy) Shutdown() {
469 p.ln.Close()
470 p.mu.Lock()
471 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700472 for process, _ := range p.processes {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700473 process.Close()
474 }
475}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700476func (p *process) String() string {
477 r := p.Conn.RemoteAddr()
478 return fmt.Sprintf("(%s, %s)", r.Network(), r)
479}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700480func (p *process) Route(vci id.VC) *destination {
481 p.mu.RLock()
482 defer p.mu.RUnlock()
483 return p.routingTable[vci]
484}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700485func (p *process) AddRoute(vci id.VC, d *destination) {
486 p.mu.Lock()
487 p.routingTable[vci] = d
488 p.mu.Unlock()
489}
Asim Shankara263fc52014-06-02 22:00:26 -0700490func (p *process) InitVCI(vci id.VC) {
491 p.mu.Lock()
492 if p.nextVCI <= vci {
493 p.nextVCI = vci + 1
494 }
495 p.mu.Unlock()
496}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700497func (p *process) AllocVCI() id.VC {
498 p.mu.Lock()
499 ret := p.nextVCI
500 p.nextVCI += 2
501 p.mu.Unlock()
502 return ret
503}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700504func (p *process) RemoveRoute(vci id.VC) {
505 p.mu.Lock()
506 delete(p.routingTable, vci)
507 p.mu.Unlock()
508}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700509func (p *process) SendCloseVC(vci id.VC, err error) {
510 var estr string
511 if err != nil {
512 estr = err.Error()
513 }
514 p.Queue.Put(&message.CloseVC{VCI: vci, Error: estr})
515}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700516func (p *process) Close() {
517 p.mu.Lock()
518 rt := p.routingTable
519 p.routingTable = nil
Asim Shankara263fc52014-06-02 22:00:26 -0700520 for _, vc := range p.servers {
521 vc.Close("net.Conn is closing")
522 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700523 p.mu.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700524 for _, d := range rt {
525 d.Process.SendCloseVC(d.VCI, errProcessVanished)
526 }
527 p.BQ.Close()
528 p.Queue.Close()
529 p.Conn.Close()
530}
Asim Shankara263fc52014-06-02 22:00:26 -0700531func (p *process) ServerVC(vci id.VC) *vc.VC {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700532 p.mu.Lock()
533 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700534 return p.servers[vci]
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700535}
Asim Shankara263fc52014-06-02 22:00:26 -0700536func (p *process) NewServerVC(m *message.OpenVC) *vc.VC {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700537 p.mu.Lock()
538 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700539 if vc := p.servers[m.VCI]; vc != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700540 vc.Close("duplicate OpenVC request")
Asim Shankara263fc52014-06-02 22:00:26 -0700541 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700542 }
Asim Shankar7cf29002014-10-09 00:38:37 -0700543 version, err := version.CommonVersion(m.DstEndpoint, m.SrcEndpoint)
544 if err != nil {
545 p.SendCloseVC(m.VCI, fmt.Errorf("incompatible IPC protocol versions: %v", err))
546 return nil
547 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700548 vc := vc.InternalNew(vc.Params{
549 VCI: m.VCI,
550 LocalEP: m.DstEndpoint,
551 RemoteEP: m.SrcEndpoint,
552 Pool: iobuf.NewPool(0),
553 ReserveBytes: message.HeaderSizeBytes,
554 Helper: p,
Asim Shankar7cf29002014-10-09 00:38:37 -0700555 Version: version,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700556 })
Asim Shankara263fc52014-06-02 22:00:26 -0700557 p.servers[m.VCI] = vc
558 proxyLog().Infof("Registered VC %v from server on process %v", vc, p)
559 return vc
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700560}
Asim Shankara263fc52014-06-02 22:00:26 -0700561func (p *process) RemoveServerVC(vci id.VC) *vc.VC {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700562 p.mu.Lock()
563 defer p.mu.Unlock()
Asim Shankara263fc52014-06-02 22:00:26 -0700564 if vc := p.servers[vci]; vc != nil {
565 delete(p.servers, vci)
566 proxyLog().Infof("Unregistered server VC %v from process %v", vc, p)
567 return vc
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700568 }
Asim Shankara263fc52014-06-02 22:00:26 -0700569 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700570}
571
572// Make process implement vc.Helper
573func (p *process) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) {
574 msg := &message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)}
575 if err := p.Queue.Put(msg); err != nil {
576 processLog().Infof("Failed to send OpenFlow(%+v) on process %v: %v", msg, p, err)
577 }
578}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700579func (p *process) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
580 if bytes == 0 {
581 return
582 }
583 msg := &message.AddReceiveBuffers{Counters: message.NewCounters()}
584 msg.Counters.Add(vci, fid, uint32(bytes))
585 if err := p.Queue.Put(msg); err != nil {
586 processLog().Infof("Failed to send AddReceiveBuffers(%+v) on process %v: %v", msg, p, err)
587 }
588}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700589func (p *process) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
590 return p.BQ.NewWriter(packIDs(vci, fid), 0, vc.DefaultBytesBufferedPerFlow)
591}
592
593// Convenience functions to assist with the logging convention.
594func proxyLog() vlog.InfoLog { return vlog.VI(1) }
595func processLog() vlog.InfoLog { return vlog.VI(2) }
596func vcLog() vlog.InfoLog { return vlog.VI(3) }
597func msgLog() vlog.InfoLog { return vlog.VI(4) }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700598func packIDs(vci id.VC, fid id.Flow) bqueue.ID {
599 return bqueue.ID(message.MakeCounterID(vci, fid))
600}
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700601func unpackIDs(b bqueue.ID) (id.VC, id.Flow) {
602 cid := message.CounterID(b)
603 return cid.VCI(), cid.Flow()
604}