blob: a7a1fcb3c30a95afe171afbd1d24778f1c821656 [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package vif
2
3// Logging guidelines:
4// vlog.VI(1) for per-net.Conn information
5// vlog.VI(2) for per-VC information
6// vlog.VI(3) for per-Flow information
7
8import (
9 "bytes"
Jason Hickey96d30e82014-11-13 07:40:00 -080010 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070011 "fmt"
12 "net"
Robin Thellend5bd72422015-02-17 12:36:38 -080013 "sort"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014 "strings"
15 "sync"
16 "time"
17
Suharsh Sivakumaraf862a52015-02-04 13:50:47 -080018 "v.io/core/veyron/runtimes/google/ipc/stream"
Jiri Simsa764efb72014-12-25 20:57:03 -080019 "v.io/core/veyron/runtimes/google/ipc/stream/crypto"
20 "v.io/core/veyron/runtimes/google/ipc/stream/id"
21 "v.io/core/veyron/runtimes/google/ipc/stream/message"
22 "v.io/core/veyron/runtimes/google/ipc/stream/vc"
23 "v.io/core/veyron/runtimes/google/ipc/version"
24 "v.io/core/veyron/runtimes/google/lib/bqueue"
25 "v.io/core/veyron/runtimes/google/lib/bqueue/drrqueue"
26 "v.io/core/veyron/runtimes/google/lib/iobuf"
27 "v.io/core/veyron/runtimes/google/lib/pcqueue"
28 vsync "v.io/core/veyron/runtimes/google/lib/sync"
29 "v.io/core/veyron/runtimes/google/lib/upcqueue"
Jiri Simsa764efb72014-12-25 20:57:03 -080030 "v.io/core/veyron2/naming"
Todd Wangff73e1f2015-02-10 21:45:52 -080031 "v.io/core/veyron2/verror"
Jiri Simsa764efb72014-12-25 20:57:03 -080032 "v.io/core/veyron2/vlog"
33 "v.io/core/veyron2/vtrace"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070034)
35
Mike Burrows35baad92015-02-10 13:42:53 -080036const pkgPath = "v.io/core/veyron/runtimes/google/ipc/stream/vif"
37
38var (
39 errShuttingDown = verror.Register(pkgPath+".errShuttingDown", verror.NoRetry, "{1:}{2:} underlying network connection({3}) shutting down{:_}")
40)
41
Jiri Simsa5293dcb2014-05-10 09:56:38 -070042// VIF implements a "virtual interface" over an underlying network connection
43// (net.Conn). Just like multiple network connections can be established over a
44// single physical interface, multiple Virtual Circuits (VCs) can be
45// established over a single VIF.
46type VIF struct {
Jungho Ahn4b9a5192015-02-02 13:11:08 -080047 // All reads must be performed through reader, and not directly through conn.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048 conn net.Conn
Jiri Simsa5293dcb2014-05-10 09:56:38 -070049 pool *iobuf.Pool
Jungho Ahn4b9a5192015-02-02 13:11:08 -080050 reader *iobuf.Reader
Jiri Simsa5293dcb2014-05-10 09:56:38 -070051 localEP naming.Endpoint
52
Jason Hickey96d30e82014-11-13 07:40:00 -080053 // control channel encryption.
54 isSetup bool
55 // ctrlCipher is normally guarded by writeMu, however see the exception in
56 // readLoop.
57 ctrlCipher crypto.ControlCipher
58 writeMu sync.Mutex
59
Asim Shankarf5781102014-06-26 22:05:35 -070060 vcMap *vcMap
Tilak Sharma6d7c39c2014-06-27 10:05:37 -070061 wpending, rpending vsync.WaitGroup
Jiri Simsa5293dcb2014-05-10 09:56:38 -070062
63 muListen sync.Mutex
64 acceptor *upcqueue.T // GUARDED_BY(muListen)
65 listenerOpts []stream.ListenerOpt // GUARDED_BY(muListen)
66
67 muNextVCI sync.Mutex
68 nextVCI id.VC
69
70 outgoing bqueue.T
71 expressQ bqueue.Writer
72
73 flowQ bqueue.Writer
74 flowMu sync.Mutex
75 flowCounters message.Counters
76
77 stopQ bqueue.Writer
78
79 // The IPC version range supported by this VIF. In practice this is
80 // non-nil only in testing. nil is equivalent to using the versions
81 // actually supported by this IPC implementation (which is always
82 // what you want outside of tests).
83 versions *version.Range
Jungho Ahn4b9a5192015-02-02 13:11:08 -080084
85 isClosedMu sync.Mutex
86 isClosed bool // GUARDED_BY(isClosedMu)
Robin Thellend2224ffa2015-02-14 21:28:27 -080087
88 // All sets that this VIF is in.
89 muSets sync.Mutex
90 sets []*Set // GUARDED_BY(muSets)
Robin Thellend5bd72422015-02-17 12:36:38 -080091
92 // These counters track the number of messages sent and received by
93 // this VIF.
94 muMsgCounters sync.Mutex
95 msgCounters map[string]int64
Jiri Simsa5293dcb2014-05-10 09:56:38 -070096}
97
98// ConnectorAndFlow represents a Flow and the Connector that can be used to
99// create another Flow over the same underlying VC.
100type ConnectorAndFlow struct {
101 Connector stream.Connector
102 Flow stream.Flow
103}
104
105// Separate out constants that are not exported so that godoc looks nicer for
106// the exported ones.
107const (
108 // Priorities of the buffered queues used for flow control of writes.
109 expressPriority bqueue.Priority = iota
110 flowPriority
111 normalPriority
112 stopPriority
113
114 // Convenience aliases so that the package name "vc" does not
115 // conflict with the variables named "vc".
116 defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow
117 sharedFlowID = vc.SharedFlowID
118)
119
// errAlreadySetup is returned when a HopSetup message arrives on a VIF whose
// control channel has already been set up.
var errAlreadySetup = errors.New("VIF is already setup")
123
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700124// InternalNewDialedVIF creates a new virtual interface over the provided
125// network connection, under the assumption that the conn object was created
126// using net.Dial.
127//
128// As the name suggests, this method is intended for use only within packages
129// placed inside veyron/runtimes/google. Code outside the
130// veyron2/runtimes/google/* packages should never call this method.
Jason Hickey96d30e82014-11-13 07:40:00 -0800131func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, opts ...stream.VCOpt) (*VIF, error) {
Asim Shankarf4864f42014-11-25 18:53:05 -0800132 ctx, principal, dc, err := clientAuthOptions(opts)
Jason Hickey96d30e82014-11-13 07:40:00 -0800133 if err != nil {
134 return nil, err
135 }
Asim Shankarf4864f42014-11-25 18:53:05 -0800136 if ctx != nil {
137 var span vtrace.Span
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800138 ctx, span = vtrace.SetNewSpan(ctx, "InternalNewDialedVIF")
Asim Shankarf4864f42014-11-25 18:53:05 -0800139 span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr())
140 defer span.Finish()
141 }
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800142 pool := iobuf.NewPool(0)
143 reader := iobuf.NewReader(pool, conn)
144 c, err := AuthenticateAsClient(ctx, conn, reader, versions, principal, dc)
Jason Hickey96d30e82014-11-13 07:40:00 -0800145 if err != nil {
146 return nil, err
147 }
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800148 return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, nil, nil, c)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700149}
150
151// InternalNewAcceptedVIF creates a new virtual interface over the provided
152// network connection, under the assumption that the conn object was created
Asim Shankar3d133872014-05-16 23:16:31 -0700153// using an Accept call on a net.Listener object.
154//
155// The returned VIF is also setup for accepting new VCs and Flows with the provided
156// ListenerOpts.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700157//
158// As the name suggests, this method is intended for use only within packages
159// placed inside veyron/runtimes/google. Code outside the
160// veyron/runtimes/google/* packages should never call this method.
Asim Shankar3d133872014-05-16 23:16:31 -0700161func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, lopts ...stream.ListenerOpt) (*VIF, error) {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800162 pool := iobuf.NewPool(0)
163 reader := iobuf.NewReader(pool, conn)
164 return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, upcqueue.New(), lopts, &crypto.NullControlCipher{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700165}
166
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800167func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *version.Range, acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700168 // Some cloud providers (like Google Compute Engine) seem to blackhole
169 // inactive TCP connections, set a TCP keep alive to prevent that.
170 // See: https://developers.google.com/compute/docs/troubleshooting#communicatewithinternet
171 if tcpconn, ok := conn.(*net.TCPConn); ok {
172 if err := tcpconn.SetKeepAlivePeriod(30 * time.Second); err != nil {
173 vlog.Errorf("Failed to set TCP keep alive: %v", err)
174 } else {
175 tcpconn.SetKeepAlive(true)
176 }
177 }
178 var (
179 // Choose IDs that will not conflict with any other (VC, Flow)
180 // pairs. VCI 0 is never used by the application (it is
181 // reserved for control messages), so steal from the Flow space
182 // there.
183 expressID bqueue.ID = packIDs(0, 0)
184 flowID bqueue.ID = packIDs(0, 1)
185 stopID bqueue.ID = packIDs(0, 2)
186 )
187 outgoing := drrqueue.New(vc.MaxPayloadSizeBytes)
188
189 expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
190 if err != nil {
191 return nil, fmt.Errorf("failed to create bqueue.Writer for express messages: %v", err)
192 }
193 expressQ.Release(-1) // Disable flow control
194
195 flowQ, err := outgoing.NewWriter(flowID, flowPriority, flowToken.Size())
196 if err != nil {
197 return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
198 }
199 flowQ.Release(-1) // Disable flow control
200
201 stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
202 if err != nil {
203 return nil, fmt.Errorf("failed to create bqueue.Writer for stopping the write loop: %v", err)
204 }
205 stopQ.Release(-1) // Disable flow control
206
207 localAddr := conn.LocalAddr()
208 ep := version.Endpoint(localAddr.Network(), localAddr.String(), rid)
209 if versions != nil {
210 ep = versions.Endpoint(localAddr.Network(), localAddr.String(), rid)
211 }
212 vif := &VIF{
213 conn: conn,
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800214 pool: pool,
215 reader: reader,
Jason Hickey96d30e82014-11-13 07:40:00 -0800216 ctrlCipher: c,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700217 vcMap: newVCMap(),
Asim Shankar3d133872014-05-16 23:16:31 -0700218 acceptor: acceptor,
219 listenerOpts: listenerOpts,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700220 localEP: ep,
221 nextVCI: initialVCI,
222 outgoing: outgoing,
223 expressQ: expressQ,
224 flowQ: flowQ,
225 flowCounters: message.NewCounters(),
226 stopQ: stopQ,
227 versions: versions,
Robin Thellend5bd72422015-02-17 12:36:38 -0800228 msgCounters: make(map[string]int64),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700229 }
230 go vif.readLoop()
231 go vif.writeLoop()
232 return vif, nil
233}
234
235// Dial creates a new VC to the provided remote identity, authenticating the VC
236// with the provided local identity.
237func (vif *VIF) Dial(remoteEP naming.Endpoint, opts ...stream.VCOpt) (stream.VC, error) {
Asim Shankar7b5c94a2014-10-28 21:42:56 -0700238 vc, err := vif.newVC(vif.allocVCI(), vif.localEP, remoteEP, true)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700239 if err != nil {
240 return nil, err
241 }
242 counters := message.NewCounters()
243 counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow)
244 err = vif.sendOnExpressQ(&message.OpenVC{
245 VCI: vc.VCI(),
246 DstEndpoint: remoteEP,
Asim Shankar7b5c94a2014-10-28 21:42:56 -0700247 SrcEndpoint: vif.localEP,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700248 Counters: counters})
249 if err != nil {
250 err = fmt.Errorf("vif.sendOnExpressQ(OpenVC) failed: %v", err)
251 vc.Close(err.Error())
252 return nil, err
253 }
254 if err := vc.HandshakeDialedVC(opts...); err != nil {
255 vif.vcMap.Delete(vc.VCI())
256 err = fmt.Errorf("VC handshake failed: %v", err)
257 vc.Close(err.Error())
258 return nil, err
259 }
260 return vc, nil
261}
262
Robin Thellend2224ffa2015-02-14 21:28:27 -0800263// addSet adds a set to the list of sets this VIF is in. This method is called
264// by Set.Insert().
265func (vif *VIF) addSet(s *Set) {
266 vif.muSets.Lock()
267 defer vif.muSets.Unlock()
268 vif.sets = append(vif.sets, s)
269}
270
271// removeSet removes a set from the list of sets this VIF is in. This method is
272// called by Set.Delete().
273func (vif *VIF) removeSet(s *Set) {
274 vif.muSets.Lock()
275 defer vif.muSets.Unlock()
276 for ix, vs := range vif.sets {
277 if vs == s {
278 vif.sets = append(vif.sets[:ix], vif.sets[ix+1:]...)
279 return
280 }
281 }
282
283}
284
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700285// Close closes all VCs (and thereby Flows) over the VIF and then closes the
286// underlying network connection after draining all pending writes on those
287// VCs.
288func (vif *VIF) Close() {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800289 vif.isClosedMu.Lock()
290 if vif.isClosed {
291 vif.isClosedMu.Unlock()
292 return
293 }
294 vif.isClosed = true
295 vif.isClosedMu.Unlock()
296
Robin Thellend2224ffa2015-02-14 21:28:27 -0800297 vif.muSets.Lock()
298 sets := vif.sets
299 vif.sets = nil
300 vif.muSets.Unlock()
301 for _, s := range sets {
302 s.Delete(vif)
303 }
304
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700305 vlog.VI(1).Infof("Closing VIF %s", vif)
306 // Stop accepting new VCs.
307 vif.StopAccepting()
308 // Close local datastructures for all existing VCs.
309 vcs := vif.vcMap.Freeze()
310 for _, vc := range vcs {
311 vc.VC.Close("VIF is being closed")
312 }
313 // Wait for the vcWriteLoops to exit (after draining queued up messages).
314 vif.stopQ.Close()
Asim Shankarf5781102014-06-26 22:05:35 -0700315 vif.wpending.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700316 // Close the underlying network connection.
317 // No need to send individual messages to close all pending VCs since
318 // the remote end should know to close all VCs when the VIF's
319 // connection breaks.
320 if err := vif.conn.Close(); err != nil {
321 vlog.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err)
322 }
323}
324
325// StartAccepting begins accepting Flows (and VCs) initiated by the remote end
326// of a VIF. opts is used to setup the listener on newly established VCs.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700327func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {
328 vif.muListen.Lock()
329 defer vif.muListen.Unlock()
330 if vif.acceptor != nil {
331 return fmt.Errorf("already accepting Flows on VIF %v", vif)
332 }
333 vif.acceptor = upcqueue.New()
334 vif.listenerOpts = opts
335 return nil
336}
337
338// StopAccepting prevents any Flows initiated by the remote end of a VIF from
339// being accepted and causes any existing and future calls to Accept to fail
340// immediately.
341func (vif *VIF) StopAccepting() {
342 vif.muListen.Lock()
343 defer vif.muListen.Unlock()
344 if vif.acceptor != nil {
345 vif.acceptor.Shutdown()
346 vif.acceptor = nil
347 vif.listenerOpts = nil
348 }
349}
350
351// Accept returns the (stream.Connector, stream.Flow) pair of a newly
352// established VC and/or Flow.
353//
354// Sample usage:
355// for {
356// cAndf, err := vif.Accept()
357// switch {
358// case err != nil:
359// fmt.Println("Accept error:", err)
360// return
361// case cAndf.Flow == nil:
362// fmt.Println("New VC established:", cAndf.Connector)
363// default:
364// fmt.Println("New flow established")
365// go handleFlow(cAndf.Flow)
366// }
367// }
368func (vif *VIF) Accept() (ConnectorAndFlow, error) {
369 vif.muListen.Lock()
370 acceptor := vif.acceptor
371 vif.muListen.Unlock()
372 if acceptor == nil {
373 return ConnectorAndFlow{}, fmt.Errorf("VCs not accepted on VIF %v", vif)
374 }
375 item, err := acceptor.Get(nil)
376 if err != nil {
377 return ConnectorAndFlow{}, fmt.Errorf("Accept failed: %v", err)
378 }
379 return item.(ConnectorAndFlow), nil
380}
381
382func (vif *VIF) String() string {
383 l := vif.conn.LocalAddr()
384 r := vif.conn.RemoteAddr()
385 return fmt.Sprintf("(%s, %s) <-> (%s, %s)", r.Network(), r, l.Network(), l)
386}
387
388func (vif *VIF) readLoop() {
389 defer vif.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700390 defer vif.stopVCDispatchLoops()
391 for {
Jason Hickey96d30e82014-11-13 07:40:00 -0800392 // vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation
393 // to it is in handleMessage, which runs in the same goroutine, so a
394 // lock is not required here.
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800395 msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700396 if err != nil {
397 vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
398 return
399 }
400 vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800401 if err := vif.handleMessage(msg); err != nil {
402 vlog.VI(1).Infof("Exiting readLoop of VIF %s because of message error: %v", vif, err)
403 return
404 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700405 }
406}
407
Jason Hickey96d30e82014-11-13 07:40:00 -0800408// handleMessage handles a single incoming message. Any error returned is
409// fatal, causing the VIF to close.
410func (vif *VIF) handleMessage(msg message.T) error {
Robin Thellend5bd72422015-02-17 12:36:38 -0800411 vif.muMsgCounters.Lock()
412 vif.msgCounters[fmt.Sprintf("Recv(%T)", msg)]++
413 vif.muMsgCounters.Unlock()
414
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700415 switch m := msg.(type) {
416 case *message.Data:
417 _, rq, _ := vif.vcMap.Find(m.VCI)
418 if rq == nil {
419 vlog.VI(2).Infof("Ignoring message of %d bytes for unrecognized VCI %d on VIF %s", m.Payload.Size(), m.VCI, vif)
420 m.Release()
Jason Hickey96d30e82014-11-13 07:40:00 -0800421 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700422 }
423 if err := rq.Put(m, nil); err != nil {
424 vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err)
425 m.Release()
426 }
427 case *message.OpenVC:
428 vif.muListen.Lock()
429 closed := vif.acceptor == nil || vif.acceptor.IsClosed()
430 lopts := vif.listenerOpts
431 vif.muListen.Unlock()
432 if closed {
433 vlog.VI(2).Infof("Ignoring OpenVC message %+v as VIF %s does not accept VCs", m, vif)
434 vif.sendOnExpressQ(&message.CloseVC{
435 VCI: m.VCI,
436 Error: "VCs not accepted",
437 })
Jason Hickey96d30e82014-11-13 07:40:00 -0800438 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700439 }
440 vc, err := vif.newVC(m.VCI, m.DstEndpoint, m.SrcEndpoint, false)
441 vif.distributeCounters(m.Counters)
442 if err != nil {
443 vif.sendOnExpressQ(&message.CloseVC{
444 VCI: m.VCI,
445 Error: err.Error(),
446 })
Jason Hickey96d30e82014-11-13 07:40:00 -0800447 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700448 }
449 go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(lopts...))
450 case *message.CloseVC:
451 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
452 vif.vcMap.Delete(vc.VCI())
453 vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800454 // TODO(cnicolaou): it would be nice to have a method on VC
455 // to indicate a 'remote close' rather than a 'local one'. This helps
456 // with error reporting since we expect reads/writes to occur
457 // after a remote close, but not after a local close.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700458 vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
Jason Hickey96d30e82014-11-13 07:40:00 -0800459 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700460 }
461 vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
462 case *message.AddReceiveBuffers:
463 vif.distributeCounters(m.Counters)
464 case *message.OpenFlow:
465 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
466 if err := vc.AcceptFlow(m.Flow); err != nil {
467 vlog.VI(3).Infof("OpenFlow %+v on VIF %v failed:%v", m, vif, err)
468 cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
469 cm.SetClose()
470 vif.sendOnExpressQ(cm)
Jason Hickey96d30e82014-11-13 07:40:00 -0800471 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700472 }
473 vc.ReleaseCounters(m.Flow, m.InitialCounters)
Jason Hickey96d30e82014-11-13 07:40:00 -0800474 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700475 }
476 vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800477 case *message.HopSetup:
478 // Configure the VIF. This takes over the conn during negotiation.
479 if vif.isSetup {
480 return errAlreadySetup
481 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800482 vif.muListen.Lock()
Jason Hickey96d30e82014-11-13 07:40:00 -0800483 principal, lBlessings, dischargeClient, err := serverAuthOptions(vif.listenerOpts)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800484 vif.muListen.Unlock()
Jason Hickey96d30e82014-11-13 07:40:00 -0800485 if err != nil {
486 return errVersionNegotiationFailed
487 }
488 vif.writeMu.Lock()
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800489 c, err := AuthenticateAsServer(vif.conn, vif.reader, vif.versions, principal, lBlessings, dischargeClient, m)
Jason Hickey96d30e82014-11-13 07:40:00 -0800490 if err != nil {
491 vif.writeMu.Unlock()
492 return err
493 }
494 vif.ctrlCipher = c
495 vif.writeMu.Unlock()
496 vif.isSetup = true
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700497 default:
498 vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif)
499 }
Jason Hickey96d30e82014-11-13 07:40:00 -0800500 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700501}
502
503func (vif *VIF) vcDispatchLoop(vc *vc.VC, messages *pcqueue.T) {
504 defer vlog.VI(2).Infof("Exiting vcDispatchLoop(%v) on VIF %v", vc, vif)
Asim Shankarf5781102014-06-26 22:05:35 -0700505 defer vif.rpending.Done()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700506 for {
507 qm, err := messages.Get(nil)
508 if err != nil {
509 return
510 }
511 m := qm.(*message.Data)
512 if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
513 vlog.VI(2).Infof("Ignoring data message %v for on VIF %v: %v", m, vif, err)
514 }
515 if m.Close() {
516 vif.shutdownFlow(vc, m.Flow)
517 }
518 }
519}
520
521func (vif *VIF) stopVCDispatchLoops() {
522 vcs := vif.vcMap.Freeze()
523 for _, v := range vcs {
524 v.RQ.Close()
525 }
Asim Shankarf5781102014-06-26 22:05:35 -0700526 vif.rpending.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700527}
528
529func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) {
530 hr := <-c
531 if hr.Error != nil {
532 vif.closeVCAndSendMsg(vc, hr.Error.Error())
533 return
534 }
535
536 vif.muListen.Lock()
537 acceptor := vif.acceptor
538 vif.muListen.Unlock()
539 if acceptor == nil {
540 vif.closeVCAndSendMsg(vc, "Flows no longer being accepted")
541 return
542 }
543
544 // Notify any listeners that a new VC has been established
545 if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil {
546 vif.closeVCAndSendMsg(vc, fmt.Sprintf("VC accept failed: %v", err))
547 return
548 }
549
550 vlog.VI(2).Infof("Running acceptFlowsLoop for VC %v on VIF %v", vc, vif)
551 for {
552 f, err := hr.Listener.Accept()
553 if err != nil {
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800554 vlog.VI(2).Infof("Accept failed on VC %v on VIF %v: %v", vc, vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700555 return
556 }
557 if err := acceptor.Put(ConnectorAndFlow{vc, f}); err != nil {
558 vlog.VI(2).Infof("vif.acceptor.Put(%v, %T) on VIF %v failed: %v", vc, f, vif, err)
559 f.Close()
560 return
561 }
562 }
563}
564
565func (vif *VIF) distributeCounters(counters message.Counters) {
566 for cid, bytes := range counters {
567 vc, _, _ := vif.vcMap.Find(cid.VCI())
568 if vc == nil {
569 vlog.VI(2).Infof("Ignoring counters for non-existent VCI %d on VIF %s", cid.VCI(), vif)
570 continue
571 }
572 vc.ReleaseCounters(cid.Flow(), bytes)
573 }
574}
575
576func (vif *VIF) writeLoop() {
577 defer vif.outgoing.Close()
578 defer vif.stopVCWriteLoops()
579 for {
580 writer, bufs, err := vif.outgoing.Get(nil)
581 if err != nil {
582 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because of bqueue.Get error: %v", vif, err)
583 return
584 }
Robin Thellend5bd72422015-02-17 12:36:38 -0800585 vif.muMsgCounters.Lock()
586 vif.msgCounters[fmt.Sprintf("Send(%T)", writer)]++
587 vif.muMsgCounters.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700588 switch writer {
589 case vif.expressQ:
590 for _, b := range bufs {
Jason Hickey96d30e82014-11-13 07:40:00 -0800591 if err := vif.writeSerializedMessage(b.Contents); err != nil {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800592 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because Control message write failed: %s", vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700593 releaseBufs(bufs)
594 return
595 }
596 b.Release()
597 }
598 case vif.flowQ:
599 msg := &message.AddReceiveBuffers{}
600 // No need to call releaseBufs(bufs) as all bufs are
601 // the exact same value: flowToken.
602 vif.flowMu.Lock()
603 if len(vif.flowCounters) > 0 {
604 msg.Counters = vif.flowCounters
605 vif.flowCounters = message.NewCounters()
606 }
607 vif.flowMu.Unlock()
608 if len(msg.Counters) > 0 {
609 vlog.VI(3).Infof("Sending counters %v on VIF %s", msg.Counters, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800610 if err := vif.writeMessage(msg); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700611 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because AddReceiveBuffers message write failed: %v", vif, err)
612 return
613 }
614 }
615 case vif.stopQ:
616 // Lowest-priority queue which will never have any
617 // buffers, Close is the only method called on it.
618 return
619 default:
620 vif.writeDataMessages(writer, bufs)
621 }
622 }
623}
624
625func (vif *VIF) vcWriteLoop(vc *vc.VC, messages *pcqueue.T) {
626 defer vlog.VI(2).Infof("Exiting vcWriteLoop(%v) on VIF %v", vc, vif)
Asim Shankarf5781102014-06-26 22:05:35 -0700627 defer vif.wpending.Done()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700628 for {
629 qm, err := messages.Get(nil)
630 if err != nil {
631 return
632 }
633 m := qm.(*message.Data)
634 m.Payload, err = vc.Encrypt(m.Flow, m.Payload)
635 if err != nil {
636 vlog.Infof("Encryption failed. Flow:%v VC:%v Error:%v", m.Flow, vc, err)
637 }
638 if m.Close() {
639 // The last bytes written on the flow will be sent out
640 // on vif.conn. Local datastructures for the flow can
641 // be cleaned up now.
642 vif.shutdownFlow(vc, m.Flow)
643 }
644 if err == nil {
Jason Hickey96d30e82014-11-13 07:40:00 -0800645 err = vif.writeMessage(m)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700646 }
647 if err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700648 // TODO(caprita): Calling closeVCAndSendMsg below causes
649 // a race as described in:
650 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
651 //
652 // There should be a finer grained way to fix this, and
653 // there are likely other instances where we should not
654 // be closing the VC.
655 //
656 // For now, commenting out the line below removes the
657 // flakiness from our existing unit tests, but this
658 // needs to be revisited and fixed correctly.
659 //
660 // vif.closeVCAndSendMsg(vc, fmt.Sprintf("write failure: %v", err))
661
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700662 // Drain the queue and exit.
663 for {
664 qm, err := messages.Get(nil)
665 if err != nil {
666 return
667 }
668 qm.(*message.Data).Release()
669 }
670 }
671 }
672}
673
674func (vif *VIF) stopVCWriteLoops() {
675 vcs := vif.vcMap.Freeze()
676 for _, v := range vcs {
677 v.WQ.Close()
678 }
679}
680
681// sendOnExpressQ adds 'msg' to the expressQ (highest priority queue) of messages to write on the wire.
682func (vif *VIF) sendOnExpressQ(msg message.T) error {
Cosmos Nicolaou82d00d82015-02-10 21:31:00 -0800683 vlog.VI(2).Infof("sendOnExpressQ(%T = %+v) on VIF %s", msg, msg, vif)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700684 var buf bytes.Buffer
Jason Hickey96d30e82014-11-13 07:40:00 -0800685 // Don't encrypt yet, because the message ordering isn't yet determined.
686 // Encryption is performed by vif.writeSerializedMessage() when the
687 // message is actually written to vif.conn.
688 vif.writeMu.Lock()
689 c := vif.ctrlCipher
690 vif.writeMu.Unlock()
691 if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(c)); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700692 return err
693 }
694 return vif.expressQ.Put(iobuf.NewSlice(buf.Bytes()), nil)
695}
696
Jason Hickey96d30e82014-11-13 07:40:00 -0800697// writeMessage writes the message to the channel. Writes must be serialized so
698// that the control channel can be encrypted, so we acquire the writeMu.
699func (vif *VIF) writeMessage(msg message.T) error {
700 vif.writeMu.Lock()
701 defer vif.writeMu.Unlock()
702 return message.WriteTo(vif.conn, msg, vif.ctrlCipher)
703}
704
705// Write writes the message to the channel, encrypting the control data. Writes
706// must be serialized so that the control channel can be encrypted, so we
707// acquire the writeMu.
708func (vif *VIF) writeSerializedMessage(msg []byte) error {
709 vif.writeMu.Lock()
710 defer vif.writeMu.Unlock()
711 if err := message.EncryptMessage(msg, vif.ctrlCipher); err != nil {
712 return err
713 }
714 if n, err := vif.conn.Write(msg); err != nil {
715 return fmt.Errorf("write failed: got (%d, %v) for %d byte message", n, err, len(msg))
716 }
717 return nil
718}
719
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700720func (vif *VIF) writeDataMessages(writer bqueue.Writer, bufs []*iobuf.Slice) {
721 vci, fid := unpackIDs(writer.ID())
722 // iobuf.Coalesce will coalesce buffers only if they are adjacent to
723 // each other. In the worst case, each buf will be non-adjacent to the
724 // others and the code below will end up with multiple small writes
725 // instead of a single big one.
726 // Might want to investigate this and see if this needs to be
727 // revisited.
728 bufs = iobuf.Coalesce(bufs, uint(vc.MaxPayloadSizeBytes))
729 _, _, wq := vif.vcMap.Find(vci)
730 if wq == nil {
731 // VC has been removed, stop sending messages
732 vlog.VI(2).Infof("VCI %d on VIF %s was shutdown, dropping %d messages that were pending a write", vci, vif, len(bufs))
733 releaseBufs(bufs)
734 return
735 }
736 last := len(bufs) - 1
737 drained := writer.IsDrained()
738 for i, b := range bufs {
739 d := &message.Data{VCI: vci, Flow: fid, Payload: b}
740 if drained && i == last {
741 d.SetClose()
742 }
743 if err := wq.Put(d, nil); err != nil {
744 releaseBufs(bufs[i:])
745 return
746 }
747 }
748 if len(bufs) == 0 && drained {
749 d := &message.Data{VCI: vci, Flow: fid}
750 d.SetClose()
751 if err := wq.Put(d, nil); err != nil {
752 d.Release()
753 }
754 }
755}
756
757func (vif *VIF) allocVCI() id.VC {
758 vif.muNextVCI.Lock()
759 ret := vif.nextVCI
760 vif.nextVCI += 2
761 vif.muNextVCI.Unlock()
762 return ret
763}
764
765func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, dialed bool) (*vc.VC, error) {
Ryan Brown407b7322014-07-22 14:18:08 -0700766 version, err := version.CommonVersion(localEP, remoteEP)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700767 if vif.versions != nil {
Ryan Brown407b7322014-07-22 14:18:08 -0700768 version, err = vif.versions.CommonVersion(localEP, remoteEP)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700769 }
770 if err != nil {
771 return nil, err
772 }
773 vc := vc.InternalNew(vc.Params{
774 VCI: vci,
775 Dialed: dialed,
776 LocalEP: localEP,
777 RemoteEP: remoteEP,
778 Pool: vif.pool,
Jason Hickey96d30e82014-11-13 07:40:00 -0800779 ReserveBytes: uint(message.HeaderSizeBytes + vif.ctrlCipher.MACSize()),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700780 Helper: vcHelper{vif},
Ryan Brown407b7322014-07-22 14:18:08 -0700781 Version: version,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700782 })
783 added, rq, wq := vif.vcMap.Insert(vc)
Asim Shankarf5781102014-06-26 22:05:35 -0700784 // Start vcWriteLoop
785 if added = added && vif.wpending.TryAdd(); added {
786 go vif.vcWriteLoop(vc, wq)
787 }
788 // Start vcDispatchLoop
789 if added = added && vif.rpending.TryAdd(); added {
790 go vif.vcDispatchLoop(vc, rq)
791 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700792 if !added {
Asim Shankarf5781102014-06-26 22:05:35 -0700793 if rq != nil {
794 rq.Close()
795 }
796 if wq != nil {
797 wq.Close()
798 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700799 vif.vcMap.Delete(vci)
800 vc.Close("underlying network connection shutting down")
Jiri Simsa074bf362015-02-17 09:29:45 -0800801 // We embed an error inside verror.ErrAborted because other layers
Mike Burrows35baad92015-02-10 13:42:53 -0800802 // check for the "Aborted" error as a special case. Perhaps
803 // eventually we'll get rid of the Aborted layer.
Jiri Simsa074bf362015-02-17 09:29:45 -0800804 return nil, verror.New(verror.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700805 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700806 return vc, nil
807}
808
809func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, msg string) {
810 vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, msg)
811 vif.vcMap.Delete(vc.VCI())
812 vc.Close(msg)
813 if err := vif.sendOnExpressQ(&message.CloseVC{
814 VCI: vc.VCI(),
815 Error: msg,
816 }); err != nil {
817 vlog.VI(2).Infof("sendOnExpressQ(CloseVC{VCI:%d,...}) on VIF %v failed: %v", vc.VCI(), vif, err)
818 }
819}
820
821// shutdownFlow clears out all the datastructures associated with fid.
822func (vif *VIF) shutdownFlow(vc *vc.VC, fid id.Flow) {
823 vc.ShutdownFlow(fid)
824 vif.flowMu.Lock()
825 delete(vif.flowCounters, message.MakeCounterID(vc.VCI(), fid))
826 vif.flowMu.Unlock()
827}
828
829// ShutdownVCs closes all VCs established to the provided remote endpoint.
830// Returns the number of VCs that were closed.
831func (vif *VIF) ShutdownVCs(remote naming.Endpoint) int {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700832 vcs := vif.vcMap.List()
833 n := 0
834 for _, vc := range vcs {
Asim Shankar7cf29002014-10-09 00:38:37 -0700835 if naming.Compare(vc.RemoteAddr().RoutingID(), remote.RoutingID()) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700836 vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif)
837 vif.closeVCAndSendMsg(vc, "")
838 n++
839 }
840 }
841 return n
842}
843
844// NumVCs returns the number of VCs established over this VIF.
845func (vif *VIF) NumVCs() int { return vif.vcMap.Size() }
846
847// DebugString returns a descriptive state of the VIF.
848//
849// The returned string is meant for consumptions by humans. The specific format
850// should not be relied upon by any automated processing.
851func (vif *VIF) DebugString() string {
852 vcs := vif.vcMap.List()
853 l := make([]string, 0, len(vcs)+1)
854
855 vif.muNextVCI.Lock() // Needed for vif.nextVCI
Robin Thellend5bd72422015-02-17 12:36:38 -0800856 l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.isSetup, vif.isClosed))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700857 vif.muNextVCI.Unlock()
858
859 for _, vc := range vcs {
860 l = append(l, vc.DebugString())
861 }
Robin Thellend5bd72422015-02-17 12:36:38 -0800862
863 l = append(l, "Message Counters:")
864 ctrs := len(l)
865 vif.muMsgCounters.Lock()
866 for k, v := range vif.msgCounters {
867 l = append(l, fmt.Sprintf(" %-32s %10d", k, v))
868 }
869 vif.muMsgCounters.Unlock()
870 sort.Strings(l[ctrs:])
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700871 return strings.Join(l, "\n")
872}
873
874// Methods and type that implement vc.Helper
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800875//
876// We create a separate type for vc.Helper to hide the vc.Helper methods
877// from the exported method set of VIF.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700878type vcHelper struct{ vif *VIF }
879
880func (h vcHelper) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) {
881 h.vif.sendOnExpressQ(&message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)})
882}
883
884func (h vcHelper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
885 if bytes == 0 {
886 return
887 }
888 h.vif.flowMu.Lock()
889 h.vif.flowCounters.Add(vci, fid, uint32(bytes))
890 h.vif.flowMu.Unlock()
891 h.vif.flowQ.TryPut(flowToken)
892}
893
894func (h vcHelper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
895 return h.vif.outgoing.NewWriter(packIDs(vci, fid), normalPriority, defaultBytesBufferedPerFlow)
896}
897
898// The token added to vif.flowQ.
899var flowToken *iobuf.Slice
900
901func init() {
902 // flowToken must be non-empty otherwise bqueue.Writer.Put will ignore it.
903 flowToken = iobuf.NewSlice(make([]byte, 1))
904}
905
906func packIDs(vci id.VC, fid id.Flow) bqueue.ID {
907 return bqueue.ID(message.MakeCounterID(vci, fid))
908}
909
910func unpackIDs(b bqueue.ID) (id.VC, id.Flow) {
911 cid := message.CounterID(b)
912 return cid.VCI(), cid.Flow()
913}
914
915func releaseBufs(bufs []*iobuf.Slice) {
916 for _, b := range bufs {
917 b.Release()
918 }
919}