blob: e356600f13698b610939886b98bdfec3da1b079b [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package vif
2
3// Logging guidelines:
4// vlog.VI(1) for per-net.Conn information
5// vlog.VI(2) for per-VC information
6// vlog.VI(3) for per-Flow information
7
8import (
9 "bytes"
Jason Hickey96d30e82014-11-13 07:40:00 -080010 "errors"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070011 "fmt"
12 "net"
Robin Thellend5bd72422015-02-17 12:36:38 -080013 "sort"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070014 "strings"
15 "sync"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070016
Suharsh Sivakumaraf862a52015-02-04 13:50:47 -080017 "v.io/core/veyron/runtimes/google/ipc/stream"
Jiri Simsa764efb72014-12-25 20:57:03 -080018 "v.io/core/veyron/runtimes/google/ipc/stream/crypto"
19 "v.io/core/veyron/runtimes/google/ipc/stream/id"
20 "v.io/core/veyron/runtimes/google/ipc/stream/message"
21 "v.io/core/veyron/runtimes/google/ipc/stream/vc"
22 "v.io/core/veyron/runtimes/google/ipc/version"
23 "v.io/core/veyron/runtimes/google/lib/bqueue"
24 "v.io/core/veyron/runtimes/google/lib/bqueue/drrqueue"
25 "v.io/core/veyron/runtimes/google/lib/iobuf"
26 "v.io/core/veyron/runtimes/google/lib/pcqueue"
27 vsync "v.io/core/veyron/runtimes/google/lib/sync"
28 "v.io/core/veyron/runtimes/google/lib/upcqueue"
Jiri Simsa6ac95222015-02-23 16:11:49 -080029 "v.io/v23/naming"
30 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080031 "v.io/v23/vtrace"
Jiri Simsa337af232015-02-27 14:36:46 -080032 "v.io/x/lib/vlog"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070033)
34
Mike Burrows35baad92015-02-10 13:42:53 -080035const pkgPath = "v.io/core/veyron/runtimes/google/ipc/stream/vif"
36
37var (
38 errShuttingDown = verror.Register(pkgPath+".errShuttingDown", verror.NoRetry, "{1:}{2:} underlying network connection({3}) shutting down{:_}")
39)
40
Jiri Simsa5293dcb2014-05-10 09:56:38 -070041// VIF implements a "virtual interface" over an underlying network connection
42// (net.Conn). Just like multiple network connections can be established over a
43// single physical interface, multiple Virtual Circuits (VCs) can be
44// established over a single VIF.
45type VIF struct {
Jungho Ahn4b9a5192015-02-02 13:11:08 -080046 // All reads must be performed through reader, and not directly through conn.
Jiri Simsa5293dcb2014-05-10 09:56:38 -070047 conn net.Conn
Jiri Simsa5293dcb2014-05-10 09:56:38 -070048 pool *iobuf.Pool
Jungho Ahn4b9a5192015-02-02 13:11:08 -080049 reader *iobuf.Reader
Jiri Simsa5293dcb2014-05-10 09:56:38 -070050 localEP naming.Endpoint
51
Jason Hickey96d30e82014-11-13 07:40:00 -080052 // control channel encryption.
53 isSetup bool
54 // ctrlCipher is normally guarded by writeMu, however see the exception in
55 // readLoop.
56 ctrlCipher crypto.ControlCipher
57 writeMu sync.Mutex
58
Asim Shankarf5781102014-06-26 22:05:35 -070059 vcMap *vcMap
Tilak Sharma6d7c39c2014-06-27 10:05:37 -070060 wpending, rpending vsync.WaitGroup
Jiri Simsa5293dcb2014-05-10 09:56:38 -070061
62 muListen sync.Mutex
63 acceptor *upcqueue.T // GUARDED_BY(muListen)
64 listenerOpts []stream.ListenerOpt // GUARDED_BY(muListen)
65
66 muNextVCI sync.Mutex
67 nextVCI id.VC
68
69 outgoing bqueue.T
70 expressQ bqueue.Writer
71
72 flowQ bqueue.Writer
73 flowMu sync.Mutex
74 flowCounters message.Counters
75
76 stopQ bqueue.Writer
77
78 // The IPC version range supported by this VIF. In practice this is
79 // non-nil only in testing. nil is equivalent to using the versions
80 // actually supported by this IPC implementation (which is always
81 // what you want outside of tests).
82 versions *version.Range
Jungho Ahn4b9a5192015-02-02 13:11:08 -080083
84 isClosedMu sync.Mutex
85 isClosed bool // GUARDED_BY(isClosedMu)
Robin Thellend2224ffa2015-02-14 21:28:27 -080086
87 // All sets that this VIF is in.
88 muSets sync.Mutex
89 sets []*Set // GUARDED_BY(muSets)
Robin Thellend5bd72422015-02-17 12:36:38 -080090
91 // These counters track the number of messages sent and received by
92 // this VIF.
93 muMsgCounters sync.Mutex
94 msgCounters map[string]int64
Jiri Simsa5293dcb2014-05-10 09:56:38 -070095}
96
97// ConnectorAndFlow represents a Flow and the Connector that can be used to
98// create another Flow over the same underlying VC.
99type ConnectorAndFlow struct {
100 Connector stream.Connector
101 Flow stream.Flow
102}
103
104// Separate out constants that are not exported so that godoc looks nicer for
105// the exported ones.
106const (
107 // Priorities of the buffered queues used for flow control of writes.
108 expressPriority bqueue.Priority = iota
109 flowPriority
110 normalPriority
111 stopPriority
112
113 // Convenience aliases so that the package name "vc" does not
114 // conflict with the variables named "vc".
115 defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow
116 sharedFlowID = vc.SharedFlowID
117)
118
// errAlreadySetup is returned by handleMessage when a HopSetup message
// arrives on a VIF that has already handled one.
var errAlreadySetup = errors.New("VIF is already setup")
122
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700123// InternalNewDialedVIF creates a new virtual interface over the provided
124// network connection, under the assumption that the conn object was created
125// using net.Dial.
126//
127// As the name suggests, this method is intended for use only within packages
128// placed inside veyron/runtimes/google. Code outside the
129// veyron2/runtimes/google/* packages should never call this method.
Jason Hickey96d30e82014-11-13 07:40:00 -0800130func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, opts ...stream.VCOpt) (*VIF, error) {
Asim Shankarf4864f42014-11-25 18:53:05 -0800131 ctx, principal, dc, err := clientAuthOptions(opts)
Jason Hickey96d30e82014-11-13 07:40:00 -0800132 if err != nil {
133 return nil, err
134 }
Asim Shankarf4864f42014-11-25 18:53:05 -0800135 if ctx != nil {
136 var span vtrace.Span
Matt Rosencrantz5f98d942015-01-08 13:48:30 -0800137 ctx, span = vtrace.SetNewSpan(ctx, "InternalNewDialedVIF")
Asim Shankarf4864f42014-11-25 18:53:05 -0800138 span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr())
139 defer span.Finish()
140 }
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800141 pool := iobuf.NewPool(0)
142 reader := iobuf.NewReader(pool, conn)
143 c, err := AuthenticateAsClient(ctx, conn, reader, versions, principal, dc)
Jason Hickey96d30e82014-11-13 07:40:00 -0800144 if err != nil {
145 return nil, err
146 }
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800147 return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, nil, nil, c)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700148}
149
150// InternalNewAcceptedVIF creates a new virtual interface over the provided
151// network connection, under the assumption that the conn object was created
Asim Shankar3d133872014-05-16 23:16:31 -0700152// using an Accept call on a net.Listener object.
153//
154// The returned VIF is also setup for accepting new VCs and Flows with the provided
155// ListenerOpts.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700156//
157// As the name suggests, this method is intended for use only within packages
158// placed inside veyron/runtimes/google. Code outside the
159// veyron/runtimes/google/* packages should never call this method.
Asim Shankar3d133872014-05-16 23:16:31 -0700160func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, lopts ...stream.ListenerOpt) (*VIF, error) {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800161 pool := iobuf.NewPool(0)
162 reader := iobuf.NewReader(pool, conn)
163 return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, upcqueue.New(), lopts, &crypto.NullControlCipher{})
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700164}
165
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800166func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *version.Range, acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700167 var (
168 // Choose IDs that will not conflict with any other (VC, Flow)
169 // pairs. VCI 0 is never used by the application (it is
170 // reserved for control messages), so steal from the Flow space
171 // there.
172 expressID bqueue.ID = packIDs(0, 0)
173 flowID bqueue.ID = packIDs(0, 1)
174 stopID bqueue.ID = packIDs(0, 2)
175 )
176 outgoing := drrqueue.New(vc.MaxPayloadSizeBytes)
177
178 expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
179 if err != nil {
180 return nil, fmt.Errorf("failed to create bqueue.Writer for express messages: %v", err)
181 }
182 expressQ.Release(-1) // Disable flow control
183
184 flowQ, err := outgoing.NewWriter(flowID, flowPriority, flowToken.Size())
185 if err != nil {
186 return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err)
187 }
188 flowQ.Release(-1) // Disable flow control
189
190 stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
191 if err != nil {
192 return nil, fmt.Errorf("failed to create bqueue.Writer for stopping the write loop: %v", err)
193 }
194 stopQ.Release(-1) // Disable flow control
195
196 localAddr := conn.LocalAddr()
197 ep := version.Endpoint(localAddr.Network(), localAddr.String(), rid)
198 if versions != nil {
199 ep = versions.Endpoint(localAddr.Network(), localAddr.String(), rid)
200 }
201 vif := &VIF{
202 conn: conn,
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800203 pool: pool,
204 reader: reader,
Jason Hickey96d30e82014-11-13 07:40:00 -0800205 ctrlCipher: c,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700206 vcMap: newVCMap(),
Asim Shankar3d133872014-05-16 23:16:31 -0700207 acceptor: acceptor,
208 listenerOpts: listenerOpts,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700209 localEP: ep,
210 nextVCI: initialVCI,
211 outgoing: outgoing,
212 expressQ: expressQ,
213 flowQ: flowQ,
214 flowCounters: message.NewCounters(),
215 stopQ: stopQ,
216 versions: versions,
Robin Thellend5bd72422015-02-17 12:36:38 -0800217 msgCounters: make(map[string]int64),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700218 }
219 go vif.readLoop()
220 go vif.writeLoop()
221 return vif, nil
222}
223
224// Dial creates a new VC to the provided remote identity, authenticating the VC
225// with the provided local identity.
226func (vif *VIF) Dial(remoteEP naming.Endpoint, opts ...stream.VCOpt) (stream.VC, error) {
Asim Shankar7b5c94a2014-10-28 21:42:56 -0700227 vc, err := vif.newVC(vif.allocVCI(), vif.localEP, remoteEP, true)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700228 if err != nil {
229 return nil, err
230 }
231 counters := message.NewCounters()
232 counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow)
Asim Shankarcd612e12015-02-24 15:29:52 -0800233 // TODO(ashankar,mattr): If remoteEP/localEP version ranges allow, then
234 // use message.SetupVC instead of message.OpenVC.
235 // Rough outline:
236 // (1) Switch to NaclBox for VC encryption (thus the VC handshake will
237 // no longer require the TLS flow and roundtrips for that).
238 // (2) Send an appropriate SetupVC message in response to a received
239 // SetupVC message.
240 // (3) Use the SetupVC received from the remote end to establish the
241 // exact protocol version to use.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700242 err = vif.sendOnExpressQ(&message.OpenVC{
243 VCI: vc.VCI(),
244 DstEndpoint: remoteEP,
Asim Shankar7b5c94a2014-10-28 21:42:56 -0700245 SrcEndpoint: vif.localEP,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700246 Counters: counters})
247 if err != nil {
248 err = fmt.Errorf("vif.sendOnExpressQ(OpenVC) failed: %v", err)
249 vc.Close(err.Error())
250 return nil, err
251 }
252 if err := vc.HandshakeDialedVC(opts...); err != nil {
253 vif.vcMap.Delete(vc.VCI())
254 err = fmt.Errorf("VC handshake failed: %v", err)
255 vc.Close(err.Error())
256 return nil, err
257 }
258 return vc, nil
259}
260
Robin Thellend2224ffa2015-02-14 21:28:27 -0800261// addSet adds a set to the list of sets this VIF is in. This method is called
262// by Set.Insert().
263func (vif *VIF) addSet(s *Set) {
264 vif.muSets.Lock()
265 defer vif.muSets.Unlock()
266 vif.sets = append(vif.sets, s)
267}
268
269// removeSet removes a set from the list of sets this VIF is in. This method is
270// called by Set.Delete().
271func (vif *VIF) removeSet(s *Set) {
272 vif.muSets.Lock()
273 defer vif.muSets.Unlock()
274 for ix, vs := range vif.sets {
275 if vs == s {
276 vif.sets = append(vif.sets[:ix], vif.sets[ix+1:]...)
277 return
278 }
279 }
280
281}
282
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700283// Close closes all VCs (and thereby Flows) over the VIF and then closes the
284// underlying network connection after draining all pending writes on those
285// VCs.
286func (vif *VIF) Close() {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800287 vif.isClosedMu.Lock()
288 if vif.isClosed {
289 vif.isClosedMu.Unlock()
290 return
291 }
292 vif.isClosed = true
293 vif.isClosedMu.Unlock()
294
Robin Thellend2224ffa2015-02-14 21:28:27 -0800295 vif.muSets.Lock()
296 sets := vif.sets
297 vif.sets = nil
298 vif.muSets.Unlock()
299 for _, s := range sets {
300 s.Delete(vif)
301 }
302
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700303 vlog.VI(1).Infof("Closing VIF %s", vif)
304 // Stop accepting new VCs.
305 vif.StopAccepting()
306 // Close local datastructures for all existing VCs.
307 vcs := vif.vcMap.Freeze()
308 for _, vc := range vcs {
309 vc.VC.Close("VIF is being closed")
310 }
311 // Wait for the vcWriteLoops to exit (after draining queued up messages).
312 vif.stopQ.Close()
Asim Shankarf5781102014-06-26 22:05:35 -0700313 vif.wpending.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700314 // Close the underlying network connection.
315 // No need to send individual messages to close all pending VCs since
316 // the remote end should know to close all VCs when the VIF's
317 // connection breaks.
318 if err := vif.conn.Close(); err != nil {
319 vlog.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err)
320 }
321}
322
323// StartAccepting begins accepting Flows (and VCs) initiated by the remote end
324// of a VIF. opts is used to setup the listener on newly established VCs.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700325func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {
326 vif.muListen.Lock()
327 defer vif.muListen.Unlock()
328 if vif.acceptor != nil {
329 return fmt.Errorf("already accepting Flows on VIF %v", vif)
330 }
331 vif.acceptor = upcqueue.New()
332 vif.listenerOpts = opts
333 return nil
334}
335
336// StopAccepting prevents any Flows initiated by the remote end of a VIF from
337// being accepted and causes any existing and future calls to Accept to fail
338// immediately.
339func (vif *VIF) StopAccepting() {
340 vif.muListen.Lock()
341 defer vif.muListen.Unlock()
342 if vif.acceptor != nil {
343 vif.acceptor.Shutdown()
344 vif.acceptor = nil
345 vif.listenerOpts = nil
346 }
347}
348
349// Accept returns the (stream.Connector, stream.Flow) pair of a newly
350// established VC and/or Flow.
351//
352// Sample usage:
353// for {
354// cAndf, err := vif.Accept()
355// switch {
356// case err != nil:
357// fmt.Println("Accept error:", err)
358// return
359// case cAndf.Flow == nil:
360// fmt.Println("New VC established:", cAndf.Connector)
361// default:
362// fmt.Println("New flow established")
363// go handleFlow(cAndf.Flow)
364// }
365// }
366func (vif *VIF) Accept() (ConnectorAndFlow, error) {
367 vif.muListen.Lock()
368 acceptor := vif.acceptor
369 vif.muListen.Unlock()
370 if acceptor == nil {
371 return ConnectorAndFlow{}, fmt.Errorf("VCs not accepted on VIF %v", vif)
372 }
373 item, err := acceptor.Get(nil)
374 if err != nil {
375 return ConnectorAndFlow{}, fmt.Errorf("Accept failed: %v", err)
376 }
377 return item.(ConnectorAndFlow), nil
378}
379
380func (vif *VIF) String() string {
381 l := vif.conn.LocalAddr()
382 r := vif.conn.RemoteAddr()
383 return fmt.Sprintf("(%s, %s) <-> (%s, %s)", r.Network(), r, l.Network(), l)
384}
385
386func (vif *VIF) readLoop() {
387 defer vif.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700388 defer vif.stopVCDispatchLoops()
389 for {
Jason Hickey96d30e82014-11-13 07:40:00 -0800390 // vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation
391 // to it is in handleMessage, which runs in the same goroutine, so a
392 // lock is not required here.
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800393 msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700394 if err != nil {
395 vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
396 return
397 }
398 vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800399 if err := vif.handleMessage(msg); err != nil {
400 vlog.VI(1).Infof("Exiting readLoop of VIF %s because of message error: %v", vif, err)
401 return
402 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700403 }
404}
405
Jason Hickey96d30e82014-11-13 07:40:00 -0800406// handleMessage handles a single incoming message. Any error returned is
407// fatal, causing the VIF to close.
408func (vif *VIF) handleMessage(msg message.T) error {
Robin Thellend5bd72422015-02-17 12:36:38 -0800409 vif.muMsgCounters.Lock()
410 vif.msgCounters[fmt.Sprintf("Recv(%T)", msg)]++
411 vif.muMsgCounters.Unlock()
412
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700413 switch m := msg.(type) {
414 case *message.Data:
415 _, rq, _ := vif.vcMap.Find(m.VCI)
416 if rq == nil {
417 vlog.VI(2).Infof("Ignoring message of %d bytes for unrecognized VCI %d on VIF %s", m.Payload.Size(), m.VCI, vif)
418 m.Release()
Jason Hickey96d30e82014-11-13 07:40:00 -0800419 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700420 }
421 if err := rq.Put(m, nil); err != nil {
422 vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err)
423 m.Release()
424 }
425 case *message.OpenVC:
426 vif.muListen.Lock()
427 closed := vif.acceptor == nil || vif.acceptor.IsClosed()
428 lopts := vif.listenerOpts
429 vif.muListen.Unlock()
430 if closed {
431 vlog.VI(2).Infof("Ignoring OpenVC message %+v as VIF %s does not accept VCs", m, vif)
432 vif.sendOnExpressQ(&message.CloseVC{
433 VCI: m.VCI,
434 Error: "VCs not accepted",
435 })
Jason Hickey96d30e82014-11-13 07:40:00 -0800436 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700437 }
438 vc, err := vif.newVC(m.VCI, m.DstEndpoint, m.SrcEndpoint, false)
439 vif.distributeCounters(m.Counters)
440 if err != nil {
441 vif.sendOnExpressQ(&message.CloseVC{
442 VCI: m.VCI,
443 Error: err.Error(),
444 })
Jason Hickey96d30e82014-11-13 07:40:00 -0800445 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700446 }
447 go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(lopts...))
Asim Shankarcd612e12015-02-24 15:29:52 -0800448 case *message.SetupVC:
449 // TODO(ashankar,mattr): Handle this! See comment about SetupVC
450 // in vif.Dial
451 vif.distributeCounters(m.Counters)
452 vif.sendOnExpressQ(&message.CloseVC{
453 VCI: m.VCI,
454 Error: "SetupVC handling not implemented yet",
455 })
456 vlog.VI(2).Infof("Received SetupVC message, but handling not yet implemented")
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700457 case *message.CloseVC:
458 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
459 vif.vcMap.Delete(vc.VCI())
460 vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800461 // TODO(cnicolaou): it would be nice to have a method on VC
462 // to indicate a 'remote close' rather than a 'local one'. This helps
463 // with error reporting since we expect reads/writes to occur
464 // after a remote close, but not after a local close.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700465 vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
Jason Hickey96d30e82014-11-13 07:40:00 -0800466 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700467 }
468 vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
469 case *message.AddReceiveBuffers:
470 vif.distributeCounters(m.Counters)
471 case *message.OpenFlow:
472 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
473 if err := vc.AcceptFlow(m.Flow); err != nil {
474 vlog.VI(3).Infof("OpenFlow %+v on VIF %v failed:%v", m, vif, err)
475 cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
476 cm.SetClose()
477 vif.sendOnExpressQ(cm)
Jason Hickey96d30e82014-11-13 07:40:00 -0800478 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700479 }
480 vc.ReleaseCounters(m.Flow, m.InitialCounters)
Jason Hickey96d30e82014-11-13 07:40:00 -0800481 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700482 }
483 vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800484 case *message.HopSetup:
485 // Configure the VIF. This takes over the conn during negotiation.
486 if vif.isSetup {
487 return errAlreadySetup
488 }
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800489 vif.muListen.Lock()
Jason Hickey96d30e82014-11-13 07:40:00 -0800490 principal, lBlessings, dischargeClient, err := serverAuthOptions(vif.listenerOpts)
Cosmos Nicolaou5a8a1252014-12-01 14:14:25 -0800491 vif.muListen.Unlock()
Jason Hickey96d30e82014-11-13 07:40:00 -0800492 if err != nil {
493 return errVersionNegotiationFailed
494 }
495 vif.writeMu.Lock()
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800496 c, err := AuthenticateAsServer(vif.conn, vif.reader, vif.versions, principal, lBlessings, dischargeClient, m)
Jason Hickey96d30e82014-11-13 07:40:00 -0800497 if err != nil {
498 vif.writeMu.Unlock()
499 return err
500 }
501 vif.ctrlCipher = c
502 vif.writeMu.Unlock()
503 vif.isSetup = true
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700504 default:
505 vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif)
506 }
Jason Hickey96d30e82014-11-13 07:40:00 -0800507 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700508}
509
510func (vif *VIF) vcDispatchLoop(vc *vc.VC, messages *pcqueue.T) {
511 defer vlog.VI(2).Infof("Exiting vcDispatchLoop(%v) on VIF %v", vc, vif)
Asim Shankarf5781102014-06-26 22:05:35 -0700512 defer vif.rpending.Done()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700513 for {
514 qm, err := messages.Get(nil)
515 if err != nil {
516 return
517 }
518 m := qm.(*message.Data)
519 if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
520 vlog.VI(2).Infof("Ignoring data message %v for on VIF %v: %v", m, vif, err)
521 }
522 if m.Close() {
523 vif.shutdownFlow(vc, m.Flow)
524 }
525 }
526}
527
528func (vif *VIF) stopVCDispatchLoops() {
529 vcs := vif.vcMap.Freeze()
530 for _, v := range vcs {
531 v.RQ.Close()
532 }
Asim Shankarf5781102014-06-26 22:05:35 -0700533 vif.rpending.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700534}
535
536func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) {
537 hr := <-c
538 if hr.Error != nil {
539 vif.closeVCAndSendMsg(vc, hr.Error.Error())
540 return
541 }
542
543 vif.muListen.Lock()
544 acceptor := vif.acceptor
545 vif.muListen.Unlock()
546 if acceptor == nil {
547 vif.closeVCAndSendMsg(vc, "Flows no longer being accepted")
548 return
549 }
550
551 // Notify any listeners that a new VC has been established
552 if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil {
553 vif.closeVCAndSendMsg(vc, fmt.Sprintf("VC accept failed: %v", err))
554 return
555 }
556
557 vlog.VI(2).Infof("Running acceptFlowsLoop for VC %v on VIF %v", vc, vif)
558 for {
559 f, err := hr.Listener.Accept()
560 if err != nil {
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800561 vlog.VI(2).Infof("Accept failed on VC %v on VIF %v: %v", vc, vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700562 return
563 }
564 if err := acceptor.Put(ConnectorAndFlow{vc, f}); err != nil {
565 vlog.VI(2).Infof("vif.acceptor.Put(%v, %T) on VIF %v failed: %v", vc, f, vif, err)
566 f.Close()
567 return
568 }
569 }
570}
571
572func (vif *VIF) distributeCounters(counters message.Counters) {
573 for cid, bytes := range counters {
574 vc, _, _ := vif.vcMap.Find(cid.VCI())
575 if vc == nil {
576 vlog.VI(2).Infof("Ignoring counters for non-existent VCI %d on VIF %s", cid.VCI(), vif)
577 continue
578 }
579 vc.ReleaseCounters(cid.Flow(), bytes)
580 }
581}
582
583func (vif *VIF) writeLoop() {
584 defer vif.outgoing.Close()
585 defer vif.stopVCWriteLoops()
586 for {
587 writer, bufs, err := vif.outgoing.Get(nil)
588 if err != nil {
589 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because of bqueue.Get error: %v", vif, err)
590 return
591 }
Robin Thellend5bd72422015-02-17 12:36:38 -0800592 vif.muMsgCounters.Lock()
593 vif.msgCounters[fmt.Sprintf("Send(%T)", writer)]++
594 vif.muMsgCounters.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700595 switch writer {
596 case vif.expressQ:
597 for _, b := range bufs {
Jason Hickey96d30e82014-11-13 07:40:00 -0800598 if err := vif.writeSerializedMessage(b.Contents); err != nil {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800599 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because Control message write failed: %s", vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700600 releaseBufs(bufs)
601 return
602 }
603 b.Release()
604 }
605 case vif.flowQ:
606 msg := &message.AddReceiveBuffers{}
607 // No need to call releaseBufs(bufs) as all bufs are
608 // the exact same value: flowToken.
609 vif.flowMu.Lock()
610 if len(vif.flowCounters) > 0 {
611 msg.Counters = vif.flowCounters
612 vif.flowCounters = message.NewCounters()
613 }
614 vif.flowMu.Unlock()
615 if len(msg.Counters) > 0 {
616 vlog.VI(3).Infof("Sending counters %v on VIF %s", msg.Counters, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800617 if err := vif.writeMessage(msg); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700618 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because AddReceiveBuffers message write failed: %v", vif, err)
619 return
620 }
621 }
622 case vif.stopQ:
623 // Lowest-priority queue which will never have any
624 // buffers, Close is the only method called on it.
625 return
626 default:
627 vif.writeDataMessages(writer, bufs)
628 }
629 }
630}
631
632func (vif *VIF) vcWriteLoop(vc *vc.VC, messages *pcqueue.T) {
633 defer vlog.VI(2).Infof("Exiting vcWriteLoop(%v) on VIF %v", vc, vif)
Asim Shankarf5781102014-06-26 22:05:35 -0700634 defer vif.wpending.Done()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700635 for {
636 qm, err := messages.Get(nil)
637 if err != nil {
638 return
639 }
640 m := qm.(*message.Data)
641 m.Payload, err = vc.Encrypt(m.Flow, m.Payload)
642 if err != nil {
643 vlog.Infof("Encryption failed. Flow:%v VC:%v Error:%v", m.Flow, vc, err)
644 }
645 if m.Close() {
646 // The last bytes written on the flow will be sent out
647 // on vif.conn. Local datastructures for the flow can
648 // be cleaned up now.
649 vif.shutdownFlow(vc, m.Flow)
650 }
651 if err == nil {
Jason Hickey96d30e82014-11-13 07:40:00 -0800652 err = vif.writeMessage(m)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700653 }
654 if err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700655 // TODO(caprita): Calling closeVCAndSendMsg below causes
656 // a race as described in:
657 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
658 //
659 // There should be a finer grained way to fix this, and
660 // there are likely other instances where we should not
661 // be closing the VC.
662 //
663 // For now, commenting out the line below removes the
664 // flakiness from our existing unit tests, but this
665 // needs to be revisited and fixed correctly.
666 //
667 // vif.closeVCAndSendMsg(vc, fmt.Sprintf("write failure: %v", err))
668
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700669 // Drain the queue and exit.
670 for {
671 qm, err := messages.Get(nil)
672 if err != nil {
673 return
674 }
675 qm.(*message.Data).Release()
676 }
677 }
678 }
679}
680
681func (vif *VIF) stopVCWriteLoops() {
682 vcs := vif.vcMap.Freeze()
683 for _, v := range vcs {
684 v.WQ.Close()
685 }
686}
687
688// sendOnExpressQ adds 'msg' to the expressQ (highest priority queue) of messages to write on the wire.
689func (vif *VIF) sendOnExpressQ(msg message.T) error {
Cosmos Nicolaou82d00d82015-02-10 21:31:00 -0800690 vlog.VI(2).Infof("sendOnExpressQ(%T = %+v) on VIF %s", msg, msg, vif)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700691 var buf bytes.Buffer
Jason Hickey96d30e82014-11-13 07:40:00 -0800692 // Don't encrypt yet, because the message ordering isn't yet determined.
693 // Encryption is performed by vif.writeSerializedMessage() when the
694 // message is actually written to vif.conn.
695 vif.writeMu.Lock()
696 c := vif.ctrlCipher
697 vif.writeMu.Unlock()
698 if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(c)); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700699 return err
700 }
701 return vif.expressQ.Put(iobuf.NewSlice(buf.Bytes()), nil)
702}
703
Jason Hickey96d30e82014-11-13 07:40:00 -0800704// writeMessage writes the message to the channel. Writes must be serialized so
705// that the control channel can be encrypted, so we acquire the writeMu.
706func (vif *VIF) writeMessage(msg message.T) error {
707 vif.writeMu.Lock()
708 defer vif.writeMu.Unlock()
709 return message.WriteTo(vif.conn, msg, vif.ctrlCipher)
710}
711
712// Write writes the message to the channel, encrypting the control data. Writes
713// must be serialized so that the control channel can be encrypted, so we
714// acquire the writeMu.
715func (vif *VIF) writeSerializedMessage(msg []byte) error {
716 vif.writeMu.Lock()
717 defer vif.writeMu.Unlock()
718 if err := message.EncryptMessage(msg, vif.ctrlCipher); err != nil {
719 return err
720 }
721 if n, err := vif.conn.Write(msg); err != nil {
722 return fmt.Errorf("write failed: got (%d, %v) for %d byte message", n, err, len(msg))
723 }
724 return nil
725}
726
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700727func (vif *VIF) writeDataMessages(writer bqueue.Writer, bufs []*iobuf.Slice) {
728 vci, fid := unpackIDs(writer.ID())
729 // iobuf.Coalesce will coalesce buffers only if they are adjacent to
730 // each other. In the worst case, each buf will be non-adjacent to the
731 // others and the code below will end up with multiple small writes
732 // instead of a single big one.
733 // Might want to investigate this and see if this needs to be
734 // revisited.
735 bufs = iobuf.Coalesce(bufs, uint(vc.MaxPayloadSizeBytes))
736 _, _, wq := vif.vcMap.Find(vci)
737 if wq == nil {
738 // VC has been removed, stop sending messages
739 vlog.VI(2).Infof("VCI %d on VIF %s was shutdown, dropping %d messages that were pending a write", vci, vif, len(bufs))
740 releaseBufs(bufs)
741 return
742 }
743 last := len(bufs) - 1
744 drained := writer.IsDrained()
745 for i, b := range bufs {
746 d := &message.Data{VCI: vci, Flow: fid, Payload: b}
747 if drained && i == last {
748 d.SetClose()
749 }
750 if err := wq.Put(d, nil); err != nil {
751 releaseBufs(bufs[i:])
752 return
753 }
754 }
755 if len(bufs) == 0 && drained {
756 d := &message.Data{VCI: vci, Flow: fid}
757 d.SetClose()
758 if err := wq.Put(d, nil); err != nil {
759 d.Release()
760 }
761 }
762}
763
764func (vif *VIF) allocVCI() id.VC {
765 vif.muNextVCI.Lock()
766 ret := vif.nextVCI
767 vif.nextVCI += 2
768 vif.muNextVCI.Unlock()
769 return ret
770}
771
772func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, dialed bool) (*vc.VC, error) {
Ryan Brown407b7322014-07-22 14:18:08 -0700773 version, err := version.CommonVersion(localEP, remoteEP)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700774 if vif.versions != nil {
Ryan Brown407b7322014-07-22 14:18:08 -0700775 version, err = vif.versions.CommonVersion(localEP, remoteEP)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700776 }
777 if err != nil {
778 return nil, err
779 }
780 vc := vc.InternalNew(vc.Params{
781 VCI: vci,
782 Dialed: dialed,
783 LocalEP: localEP,
784 RemoteEP: remoteEP,
785 Pool: vif.pool,
Jason Hickey96d30e82014-11-13 07:40:00 -0800786 ReserveBytes: uint(message.HeaderSizeBytes + vif.ctrlCipher.MACSize()),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700787 Helper: vcHelper{vif},
Ryan Brown407b7322014-07-22 14:18:08 -0700788 Version: version,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700789 })
790 added, rq, wq := vif.vcMap.Insert(vc)
Asim Shankarf5781102014-06-26 22:05:35 -0700791 // Start vcWriteLoop
792 if added = added && vif.wpending.TryAdd(); added {
793 go vif.vcWriteLoop(vc, wq)
794 }
795 // Start vcDispatchLoop
796 if added = added && vif.rpending.TryAdd(); added {
797 go vif.vcDispatchLoop(vc, rq)
798 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700799 if !added {
Asim Shankarf5781102014-06-26 22:05:35 -0700800 if rq != nil {
801 rq.Close()
802 }
803 if wq != nil {
804 wq.Close()
805 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700806 vif.vcMap.Delete(vci)
807 vc.Close("underlying network connection shutting down")
Jiri Simsa074bf362015-02-17 09:29:45 -0800808 // We embed an error inside verror.ErrAborted because other layers
Mike Burrows35baad92015-02-10 13:42:53 -0800809 // check for the "Aborted" error as a special case. Perhaps
810 // eventually we'll get rid of the Aborted layer.
Jiri Simsa074bf362015-02-17 09:29:45 -0800811 return nil, verror.New(verror.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700812 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700813 return vc, nil
814}
815
816func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, msg string) {
817 vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, msg)
818 vif.vcMap.Delete(vc.VCI())
819 vc.Close(msg)
820 if err := vif.sendOnExpressQ(&message.CloseVC{
821 VCI: vc.VCI(),
822 Error: msg,
823 }); err != nil {
824 vlog.VI(2).Infof("sendOnExpressQ(CloseVC{VCI:%d,...}) on VIF %v failed: %v", vc.VCI(), vif, err)
825 }
826}
827
828// shutdownFlow clears out all the datastructures associated with fid.
829func (vif *VIF) shutdownFlow(vc *vc.VC, fid id.Flow) {
830 vc.ShutdownFlow(fid)
831 vif.flowMu.Lock()
832 delete(vif.flowCounters, message.MakeCounterID(vc.VCI(), fid))
833 vif.flowMu.Unlock()
834}
835
836// ShutdownVCs closes all VCs established to the provided remote endpoint.
837// Returns the number of VCs that were closed.
838func (vif *VIF) ShutdownVCs(remote naming.Endpoint) int {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700839 vcs := vif.vcMap.List()
840 n := 0
841 for _, vc := range vcs {
Asim Shankar7cf29002014-10-09 00:38:37 -0700842 if naming.Compare(vc.RemoteAddr().RoutingID(), remote.RoutingID()) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700843 vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif)
844 vif.closeVCAndSendMsg(vc, "")
845 n++
846 }
847 }
848 return n
849}
850
851// NumVCs returns the number of VCs established over this VIF.
852func (vif *VIF) NumVCs() int { return vif.vcMap.Size() }
853
854// DebugString returns a descriptive state of the VIF.
855//
856// The returned string is meant for consumptions by humans. The specific format
857// should not be relied upon by any automated processing.
858func (vif *VIF) DebugString() string {
859 vcs := vif.vcMap.List()
860 l := make([]string, 0, len(vcs)+1)
861
862 vif.muNextVCI.Lock() // Needed for vif.nextVCI
Robin Thellend5bd72422015-02-17 12:36:38 -0800863 l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.isSetup, vif.isClosed))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700864 vif.muNextVCI.Unlock()
865
866 for _, vc := range vcs {
867 l = append(l, vc.DebugString())
868 }
Robin Thellend5bd72422015-02-17 12:36:38 -0800869
870 l = append(l, "Message Counters:")
871 ctrs := len(l)
872 vif.muMsgCounters.Lock()
873 for k, v := range vif.msgCounters {
874 l = append(l, fmt.Sprintf(" %-32s %10d", k, v))
875 }
876 vif.muMsgCounters.Unlock()
877 sort.Strings(l[ctrs:])
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700878 return strings.Join(l, "\n")
879}
880
881// Methods and type that implement vc.Helper
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800882//
883// We create a separate type for vc.Helper to hide the vc.Helper methods
884// from the exported method set of VIF.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700885type vcHelper struct{ vif *VIF }
886
887func (h vcHelper) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) {
888 h.vif.sendOnExpressQ(&message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)})
889}
890
891func (h vcHelper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
892 if bytes == 0 {
893 return
894 }
895 h.vif.flowMu.Lock()
896 h.vif.flowCounters.Add(vci, fid, uint32(bytes))
897 h.vif.flowMu.Unlock()
898 h.vif.flowQ.TryPut(flowToken)
899}
900
901func (h vcHelper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) {
902 return h.vif.outgoing.NewWriter(packIDs(vci, fid), normalPriority, defaultBytesBufferedPerFlow)
903}
904
905// The token added to vif.flowQ.
906var flowToken *iobuf.Slice
907
908func init() {
909 // flowToken must be non-empty otherwise bqueue.Writer.Put will ignore it.
910 flowToken = iobuf.NewSlice(make([]byte, 1))
911}
912
913func packIDs(vci id.VC, fid id.Flow) bqueue.ID {
914 return bqueue.ID(message.MakeCounterID(vci, fid))
915}
916
917func unpackIDs(b bqueue.ID) (id.VC, id.Flow) {
918 cid := message.CounterID(b)
919 return cid.VCI(), cid.Flow()
920}
921
922func releaseBufs(bufs []*iobuf.Slice) {
923 for _, b := range bufs {
924 b.Release()
925 }
926}